Kafka in Action 04 - Enriching Data with Joins

In our “Kafka in Action” series, we’ve progressively built more complex applications. We started by transforming single streams of data, then moved on to performing time-windowed aggregations to derive insights. However, in any real-world system, data is rarely isolated. Valuable context is often spread across multiple systems and, consequently, multiple Kafka topics.

To create a truly complete picture of an event, we need to combine these disparate sources. This process of combining and augmenting data is called enrichment, and the primary tool for achieving it in Kafka Streams is the Join.

This article will explore how to perform joins in Kafka Streams, focusing on the most common and powerful pattern for data enrichment: the KStream-KTable join.

A World of Joins

Kafka Streams provides a rich set of join operations, each suited for different scenarios. The three main types are:

  1. KStream-KStream Join: This joins two streams of events. Since both streams are unbounded, this join must be performed within a window to prevent the state from growing infinitely. It’s used to correlate events that occur within a certain time proximity, like “Find all users who viewed a product and then added it to their cart within 5 minutes” (see the sketch after this list).

  2. KTable-KTable Join: This is analogous to a standard database join between two tables. It combines two KTables, and the result is a new KTable that is updated whenever a record in either of the source tables changes.

  3. KStream-KTable Join: This is the workhorse of data enrichment. It takes a stream of events (the KStream, representing “what is happening”) and joins it with a table of reference data (the KTable, representing “facts about the world”). For every new record in the stream, Kafka Streams performs a lookup in the table using the record’s key and combines the two. This is exactly what we need to enrich an event with contextual information.
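
As a taste of the first type, here is a minimal sketch of a windowed KStream-KStream join for the “viewed then added to cart” example. The topic names product-views and cart-adds are hypothetical, and the JoinWindows API shown assumes Kafka Streams 3.x; the snippet would live inside a topology definition like the one we build later in this article.

import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;

// ... inside the topology definition, both streams keyed by userId ...
KStream<String, String> views = builder.stream("product-views");
KStream<String, String> cartAdds = builder.stream("cart-adds");

// Pair a view with a cart-add for the same user only when their timestamps
// fall within 5 minutes of each other; the window keeps the join state bounded.
KStream<String, String> correlated = views.join(
        cartAdds,
        (view, cartAdd) -> "viewed=" + view + ", addedToCart=" + cartAdd,
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));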

The Scenario: Enriching Payments with User Profiles

Let’s build an application that enriches a stream of payment events.

  • We have a payments topic where each record represents a transaction. The key is the userId, and the value is the payment amount.
  • We have a user-profiles topic that contains the latest profile information for each user. The key is the userId, and the value contains their name and location.

Our Goal: For every new payment that arrives, we want to look up the user’s profile information, create a new, enriched payment event that includes the user’s name and location, and publish it to a new topic.

The Importance of Keys (and Co-partitioning)

Before we write the code, there’s a critical prerequisite for joins to work: the join is performed on the message key. This means that if we want to join a payment for user-A with the profile for user-A, both records must have the key user-A.

Furthermore, the input topics must be co-partitioned: they must have the same number of partitions, and both must be partitioned by key in the same way (the default key-hashing partitioner takes care of this). Kafka Streams can then guarantee that records with the same key from both topics are processed by the same task on the same machine, allowing the join to be performed locally without any slow and expensive network communication.
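
In our scenario both topics are already keyed by userId, so no extra work is needed. If the payments were keyed differently (say by a transactionId, with the userId embedded in the value), you would have to re-key the stream before joining. A minimal sketch, assuming a hypothetical stream paymentsByTransactionId whose values look like "userId,amount":

// selectKey re-keys each record by userId. Kafka Streams then routes the stream
// through an internal repartition topic, so records end up co-partitioned with
// the user-profiles topic before the join runs.
KStream<String, String> rekeyedPayments = paymentsByTransactionId
        .selectKey((transactionId, value) -> value.split(",")[0]);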

Building the Enrichment Pipeline

1. Create Topics

We need two input topics and one output topic. Make sure they have the same number of partitions (we’ll use 2).

# Input topic for payments
docker-compose exec kafka kafka-topics --create --topic payments --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1

# Input topic for user profiles (our "table")
docker-compose exec kafka kafka-topics --create --topic user-profiles --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1

# Output topic for the enriched results
docker-compose exec kafka kafka-topics --create --topic enriched-payments --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1
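
If you prefer creating topics from Java instead of the CLI, a minimal sketch using the AdminClient (same topic names, partition counts, and replication factor as above) could look like this:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // try-with-resources closes the client when we are done
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Arrays.asList(
                    new NewTopic("payments", 2, (short) 1),
                    new NewTopic("user-profiles", 2, (short) 1),
                    new NewTopic("enriched-payments", 2, (short) 1)
            )).all().get(); // block until the broker confirms creation
        }
    }
}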

2. The Application Code

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class StreamEnrichmentJoin {
    public static void main(String[] args) {
        // 1. Create Streams Properties
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-enrichment-app");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 2. Define the processing topology
        StreamsBuilder builder = new StreamsBuilder();

        // 2.1) Get the stream of payments
        KStream<String, String> paymentStream = builder.stream("payments");

        // 2.2) Get the user profiles as a KTable. This tells Streams to materialize
        // the topic into a queryable state store.
        KTable<String, String> userProfiles = builder.table("user-profiles");

        // 2.3) Join the stream with the table
        KStream<String, String> enrichedStream = paymentStream.join(userProfiles,
                // The ValueJoiner lambda: defines how to combine the two values
                (paymentValue, profileValue) -> {
                    String[] profile = profileValue.split(",");
                    String name = profile[0];
                    String location = profile[1];
                    return String.format("Payment of $%s by %s from %s", paymentValue, name, location);
                }
        );

        // 2.4) Sink the enriched stream to the output topic
        enrichedStream.to("enriched-payments");
        
        // 3. Build and start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
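
A side note on the KTable choice: if the reference data is small enough to fit on every instance, a GlobalKTable is an alternative that sidesteps the co-partitioning requirement entirely, because the full topic is replicated to each application instance. A minimal sketch of the same join using a GlobalKTable (note the extra mapper argument that derives the table lookup key from the stream record):

import org.apache.kafka.streams.kstream.GlobalKTable;

// ... inside the topology definition ...
GlobalKTable<String, String> globalProfiles = builder.globalTable("user-profiles");

KStream<String, String> enriched = paymentStream.join(
        globalProfiles,
        (userId, paymentValue) -> userId,   // map each stream record to a table key
        (paymentValue, profileValue) -> paymentValue + " / " + profileValue);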

Running the Join in Real-Time

  1. Start the Java App: Run the StreamEnrichmentJoin application.
  2. Start Console Producers & Consumer: Open three terminals.
    
    # Terminal 1: Produce user profiles (Key:Value)
    docker-compose exec kafka kafka-console-producer --bootstrap-server localhost:9092 --topic user-profiles --property "parse.key=true" --property "key.separator=:"
    
    # Terminal 2: Produce payments (Key:Value)
    docker-compose exec kafka kafka-console-producer --bootstrap-server localhost:9092 --topic payments --property "parse.key=true" --property "key.separator=:"
    
    # Terminal 3: Consume the enriched results
    docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic enriched-payments --from-beginning
    
  3. Populate the Table: In the user-profiles producer (Terminal 1), add some reference data.
    
    user-A:Alice,USA
    user-B:Bob,Canada
    
  4. Send Events: Now, in the payments producer (Terminal 2), send some events.
    
    user-A:150.00
    user-C:99.99
    user-B:25.50
    
  5. Observe the Enriched Output: In Terminal 3, you will see the joined results appear instantly.
    
    Payment of $150.00 by Alice from USA
    Payment of $25.50 by Bob from Canada
    

    Notice that the payment for user-C was dropped. This is because the default join is an inner join: if a key exists in the stream but not in the table, the record is discarded. If you need to process every event, even when enrichment fails, use a leftJoin() instead; a sketch follows these steps.

  6. Update the Table: Now, update a user’s profile in Terminal 1.
    
    user-A:Alice,Germany
    
  7. Send Another Event: Send another payment for user-A in Terminal 2.
    
    user-A:300.00
    

    The consumer will immediately show the new, updated profile data in the enriched event:

    
    Payment of $300.00 by Alice from Germany
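
For completeness, here is a minimal leftJoin() variant of the join step from our topology. Every payment is emitted even when no matching profile exists yet; in that case profileValue arrives as null, so the ValueJoiner must handle it:

// leftJoin produces a result for every stream record; profileValue is null when
// the key has no match in the KTable, so guard against it explicitly.
KStream<String, String> enrichedStream = paymentStream.leftJoin(userProfiles,
        (paymentValue, profileValue) -> {
            if (profileValue == null) {
                return String.format("Payment of $%s by unknown user", paymentValue);
            }
            String[] profile = profileValue.split(",");
            return String.format("Payment of $%s by %s from %s",
                    paymentValue, profile[0], profile[1]);
        });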
    

Conclusion & What’s Next?

Joins are the key to unlocking the full context of your data, transforming simple event streams into rich, insightful information flows. The KStream-KTable join, in particular, is a fundamental pattern for any real-time enrichment pipeline.

We have now covered the core development aspects of Kafka Streams: stateless transformations, stateful aggregations and windowing, and now, contextual enrichment with joins. We’ve built powerful applications, but they’ve all been running locally on our machine. The final step in our journey is to understand how to take a Streams application to production.

In the final article of the “Kafka in Action” series, we will explore the operational side of Kafka Streams, discussing deployment, scalability, and how to query your application’s state directly using a powerful feature called Interactive Queries.
