Kafka in Action 04 - Enriching Data with Joins
In our “Kafka in Action” series, we’ve progressively built more complex applications. We started by transforming single streams of data, then moved on to performing time-windowed aggregations to derive insights. However, in any real-world system, data is rarely isolated. Valuable context is often spread across multiple systems and, consequently, multiple Kafka topics.
To create a truly complete picture of an event, we need to combine these disparate sources. This process of combining and augmenting data is called enrichment, and the primary tool for achieving it in Kafka Streams is the Join.
This article will explore how to perform joins in Kafka Streams, focusing on the most common and powerful pattern for data enrichment: the KStream-KTable join.
A World of Joins
Kafka Streams provides a rich set of join operations, each suited for different scenarios. The three main types are:
- KStream-KStream Join: This joins two streams of events. Since both streams are unbounded, this join must be performed within a window to prevent the state from growing infinitely. It’s used to correlate events that occur within a certain time proximity, like “Find all users who viewed a product and then added it to their cart within 5 minutes.” (A minimal sketch of such a windowed join follows this list.)
- KTable-KTable Join: This is analogous to a standard database join between two tables. It combines two KTables, and the result is a new KTable that is updated whenever a record in either of the source tables changes.
- KStream-KTable Join: This is the workhorse of data enrichment. It takes a stream of events (the KStream, representing “what is happening”) and joins it with a table of reference data (the KTable, representing “facts about the world”). For every new record in the stream, Kafka Streams performs a lookup in the table using the record’s key and combines the two. This is exactly what we need to enrich an event with contextual information.
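To make the first of these concrete, here is a minimal sketch of a windowed KStream-KStream join. The product-views and cart-additions topics are hypothetical (both assumed to be keyed by userId), the snippet assumes the same StreamsBuilder and String serdes used later in this article plus imports for org.apache.kafka.streams.kstream.JoinWindows and java.time.Duration, and JoinWindows.ofTimeDifferenceWithNoGrace requires Kafka Streams 3.0 or newer (older releases use JoinWindows.of).

// Hypothetical example: correlate a product view with a cart addition
// by the same user (same key) within a 5-minute window.
KStream<String, String> views = builder.stream("product-views");
KStream<String, String> cartAdditions = builder.stream("cart-additions");

KStream<String, String> viewedThenCarted = views.join(
        cartAdditions,
        // ValueJoiner: combine the two event values into one result
        (view, cartAdd) -> "viewed=" + view + ", addedToCart=" + cartAdd,
        // Only pair events whose timestamps lie within 5 minutes of each other
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);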
The Scenario: Enriching Payments with User Profiles
Let’s build an application that enriches a stream of payment events.
- We have a payments topic where each record represents a transaction. The key is the userId, and the value is the payment amount.
- We have a user-profiles topic that contains the latest profile information for each user. The key is the userId, and the value contains their name and location.
Our Goal: For every new payment that arrives, we want to look up the user’s profile information and create a new, enriched payment event that includes the user’s name and location, publishing it to a new topic.
The Importance of Keys (and Co-partitioning)
Before we write the code, there’s a critical prerequisite for joins to work: the join is performed on the message key. This means that if we want to join a payment for user-A with the profile for user-A, both records must have the key user-A.
Furthermore, the input topics must be co-partitioned: they need the same number of partitions, and their producers must partition records by key in the same way. Kafka Streams can then guarantee that records with the same key from both topics are processed by the same stream task, allowing the join to be performed locally without any slow and expensive network communication. (If co-partitioning is not possible, a GlobalKTable is an alternative; a sketch follows the application code later in this article.)
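If a topic is not already keyed the way the join needs, the stream can be re-keyed first. The snippet below is only a sketch under that assumption: payments-by-txn-id is a hypothetical topic keyed by transaction id whose value starts with the userId, and selectKey marks the stream for repartitioning so the downstream join still operates on co-partitioned data.

// Hypothetical topic keyed by transaction id; value format: "userId,amount"
KStream<String, String> paymentsByTxn = builder.stream("payments-by-txn-id");

// Re-key each record by userId so it can be joined against user-profiles.
// Kafka Streams inserts an internal repartition topic before the join.
KStream<String, String> paymentsByUser =
        paymentsByTxn.selectKey((txnId, value) -> value.split(",")[0]);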
Building the Enrichment Pipeline
1. Create Topics
We need two input topics and one output topic. Make sure they have the same number of partitions (we’ll use 2).
# Input topic for payments
docker-compose exec kafka kafka-topics --create --topic payments --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1
# Input topic for user profiles (our "table")
docker-compose exec kafka kafka-topics --create --topic user-profiles --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1
# Output topic for the enriched results
docker-compose exec kafka kafka-topics --create --topic enriched-payments --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1
2. The Application Code
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Properties;

public class StreamEnrichmentJoin {

    public static void main(String[] args) {
        // 1. Create Streams Properties
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-enrichment-app");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 2. Define the processing topology
        StreamsBuilder builder = new StreamsBuilder();

        // 2.1) Get the stream of payments
        KStream<String, String> paymentStream = builder.stream("payments");

        // 2.2) Get the user profiles as a KTable. This tells Streams to materialize
        // the topic into a queryable state store.
        KTable<String, String> userProfiles = builder.table("user-profiles");

        // 2.3) Join the stream with the table
        KStream<String, String> enrichedStream = paymentStream.join(userProfiles,
                // The ValueJoiner lambda: defines how to combine the two values
                (paymentValue, profileValue) -> {
                    String amount = paymentValue;
                    String name = profileValue.split(",")[0];
                    String location = profileValue.split(",")[1];
                    return String.format("Payment of $%s by %s from %s", amount, name, location);
                }
        );

        // 2.4) Sink the enriched stream to the output topic
        enrichedStream.to("enriched-payments");

        // 3. Build and start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
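As promised in the co-partitioning section: if the reference data is small enough to replicate to every application instance, a GlobalKTable avoids the co-partitioning requirement entirely. The following is only a sketch of that variant; it reuses paymentStream from the code above, needs the org.apache.kafka.streams.kstream.GlobalKTable import, and keeps the value-combining logic deliberately simple. Note the extra KeyValueMapper argument that selects which key to look up in the global table.

// A GlobalKTable is fully replicated to every instance, so the payments topic
// does not need to be co-partitioned with user-profiles.
GlobalKTable<String, String> userProfilesGlobal = builder.globalTable("user-profiles");

KStream<String, String> enrichedViaGlobal = paymentStream.join(
        userProfilesGlobal,
        // KeyValueMapper: which key to look up in the global table (here, the record key)
        (paymentKey, paymentValue) -> paymentKey,
        // ValueJoiner: combine payment and profile values, as before
        (paymentValue, profileValue) -> paymentValue + " paid by " + profileValue
);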
Running the Join in Real-Time
- Start the Java App: Run the StreamEnrichmentJoin application.
- Start Console Producers & Consumer: Open three terminals.
# Terminal 1: Produce user profiles (Key:Value)
docker-compose exec kafka kafka-console-producer --broker-list localhost:9092 --topic user-profiles --property "parse.key=true" --property "key.separator=:"
# Terminal 2: Produce payments (Key:Value)
docker-compose exec kafka kafka-console-producer --broker-list localhost:9092 --topic payments --property "parse.key=true" --property "key.separator=:"
# Terminal 3: Consume the enriched results
docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic enriched-payments --from-beginning
- Populate the Table: In the user-profiles producer (Terminal 1), add some reference data.
user-A:Alice,USA
user-B:Bob,Canada
- Send Events: Now, in the payments producer (Terminal 2), send some events.
user-A:150.00
user-C:99.99
user-B:25.50
- Observe the Enriched Output: In Terminal 3, you will see the joined results appear instantly.
Payment of $150.00 by Alice from USA
Payment of $25.50 by Bob from Canada
Notice that the payment for user-C was dropped. This is because the default join is an inner join; if a key exists in the stream but not in the table, the record is discarded. If you need to process every event, even if enrichment fails, you would use a leftJoin() (a sketch of that variant follows this walkthrough).
- Update the Table: Now, update a user’s profile in Terminal 1.
user-A:Alice,Germany
- Send Another Event: Send another payment for user-A in Terminal 2.
user-A:300.00
The consumer will immediately show the new, updated profile data in the enriched event:
Payment of $300.00 by Alice from Germany
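If you wanted the user-C payment to survive instead of being dropped, the join in the application code could be swapped for a leftJoin. The snippet below is a sketch of that one change: with leftJoin, a payment with no matching profile still produces a result, and profileValue arrives as null, so the code falls back to a placeholder.

// leftJoin emits a result for every payment; profileValue is null when
// no profile exists for that key, so handle that case explicitly.
KStream<String, String> enrichedStream = paymentStream.leftJoin(userProfiles,
        (paymentValue, profileValue) -> {
            if (profileValue == null) {
                return String.format("Payment of $%s by unknown user", paymentValue);
            }
            String name = profileValue.split(",")[0];
            String location = profileValue.split(",")[1];
            return String.format("Payment of $%s by %s from %s", paymentValue, name, location);
        }
);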
Conclusion & What’s Next?
Joins are the key to unlocking the full context of your data, transforming simple event streams into rich, insightful information flows. The KStream-KTable join, in particular, is a fundamental pattern for any real-time enrichment pipeline.
We have now covered the core development aspects of Kafka Streams: stateless transformations, stateful aggregations and windowing, and now, contextual enrichment with joins. We’ve built powerful applications, but they’ve all been running locally on our machine. The final step in our journey is to understand how to take a Streams application to production.
In the final article of the “Kafka in Action” series, we will explore the operational side of Kafka Streams, discussing deployment, scalability, and how to query your application’s state directly using a powerful feature called Interactive Queries.