Kafka in Action 01 - Introduction to Kafka Streams

In our foundational “Hello Kafka” series, we mastered the essentials of the Kafka ecosystem. We learned how to produce and consume messages, manage ordering with keys, and build scalable, reliable consumers. This knowledge is crucial, as the Producer and Consumer APIs are the bedrock of any Kafka-based application.

However, you might have noticed that our consumer logic was wrapped in a while(true) loop. We were responsible for polling, processing, and managing commits manually. This is powerful but low-level. What if our goal isn’t just to move data from point A to point B, but to transform, filter, aggregate, or enrich that data as it flows?
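
As a reminder, that low-level pattern looks roughly like this. This is a minimal sketch, assuming an already configured KafkaConsumer with manual commits enabled and an illustrative topic name:

consumer.subscribe(Collections.singletonList("orders"));
while (true) {
    // poll for the next batch of records
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // application-specific transform / filter / enrich logic goes here
    }
    // manual offset management
    consumer.commitSync();
}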

This is where the Kafka Streams API comes in. It’s time to level up from Kafka’s “assembly language” and start building sophisticated, real-time applications with ease.

What is Kafka Streams?

Kafka Streams is a client library, not a separate processing cluster. It allows you to build powerful stream processing applications that you can run just like any other Java application. It leverages the core concepts of Kafka—topics for storage, partitions for parallelism, and consumer groups for scalability—to provide a highly scalable and fault-tolerant framework for real-time data processing.

The key difference is the level of abstraction. Instead of thinking about individual messages in a poll loop, you start to think in terms of continuous, unbounded streams of data and the operations you want to apply to them.

Key features include:

  • High-Level DSL: A rich, functional API with operations like map, filter, groupBy, and join (see the short sketch right after this list).
  • Stateful Processing: The ability to perform stateful operations like counting, summing, or windowing, with fault-tolerant state stores managed by Kafka itself.
  • No Dedicated Cluster: It’s just a library. You package it with your application and deploy it wherever you like.
  • Elastic Scalability: To increase processing power, you simply start more instances of your application. Kafka Streams handles the rebalancing automatically, just like a consumer group.
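
To give a feel for the DSL, here is a minimal sketch of a filter-and-transform pipeline. The topic names and the "PAID" check are hypothetical placeholders, and the snippet assumes a StreamsBuilder like the one we create in the full example later in this post:

KStream<String, String> orders = builder.stream("orders");

orders
        // keep only records whose value marks the order as paid (illustrative check)
        .filter((orderId, payload) -> payload.contains("PAID"))
        // transform each value
        .mapValues(payload -> payload.toUpperCase())
        // write the results to another topic
        .to("paid-orders");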

Core Concepts: Streams and Tables

The Kafka Streams DSL is built on two primary abstractions:

  1. KStream (Stream): A KStream is an unbounded sequence of records, where each record is a key-value pair. You can think of it as an append-only event log in which every new record is an independent, immutable event. For example, a stream of all user clicks on a website.

  2. KTable (Table): A KTable is a representation of the current state of a stream. Each incoming record is treated as an update (an upsert) for its key, so for any given key a KTable holds only the single, most recent value. Think of it as a database table that is continuously updated by the events in a stream. For example, a table holding the current location of every delivery truck.

This duality between streams and tables is a powerful concept that allows you to model a vast range of real-time problems.
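
In code, both abstractions come from the same StreamsBuilder. A minimal sketch, with hypothetical topic names (the imports match the full example below):

StreamsBuilder builder = new StreamsBuilder();

// KStream: every click is an independent event that is never overwritten
KStream<String, String> clicks = builder.stream("user-clicks");

// KTable: only the latest value per key (the truck id) is kept
KTable<String, String> truckLocations = builder.table("truck-locations");

// The duality works both ways: a table exposes its updates as a stream again
KStream<String, String> locationUpdates = truckLocations.toStream();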

Your First Streams App: The “Hello, World” of Real-Time

The canonical “Hello, World!” for stream processing is a word count application. It’s a perfect example because it demonstrates both stateless and stateful operations.

Our Goal: Read sentences from an input topic, split them into words, count the occurrences of each word in real-time, and publish the running counts to an output topic.

1. Project Setup

First, add the kafka-streams dependency to your pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.0</version>
</dependency>

2. Create Topics

We’ll need an input and an output topic.

# Input topic for our sentences
docker-compose exec kafka kafka-topics --create --topic word-count-input --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

# Output topic for the word counts
docker-compose exec kafka kafka-topics --create --topic word-count-output --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

3. The Application Code

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApp {
    public static void main(String[] args) {
        // 1. Create Streams Properties
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 2. Define the processing topology
        StreamsBuilder builder = new StreamsBuilder();

        // 2.1) Stream from the input topic
        KStream<String, String> textLines = builder.stream("word-count-input");

        // 2.2) Map values to lowercase, flatmap to split by space, group by word, and count
        KTable<String, Long> wordCounts = textLines
                // "Kafka is great" -> "kafka is great"
                .mapValues(value -> value.toLowerCase())
                // "kafka is great" -> ["kafka", "is", "great"]
                .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
                // Select the word as the new key for grouping
                .selectKey((key, word) -> word)
                // Group by the key (the word)
                .groupByKey()
                // Count the occurrences
                .count();

        // 2.3) Write the results back to the output topic
        wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));

        // 3. Build and start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();

        // Add a shutdown hook for graceful closure
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Running the Application

  1. Start the Java App: Run the main method in WordCountApp.
  2. Start a Console Producer: Open a terminal and start a producer to send sentences to our input topic.

    docker-compose exec kafka kafka-console-producer --bootstrap-server localhost:9092 --topic word-count-input
    
  3. Start a Console Consumer: In a second terminal, start a consumer to watch the output topic. Note the value.deserializer property, which is needed to correctly read the Long count values.

    docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic word-count-output \
    --from-beginning \
    --property print.key=true \
    --property key.separator=":" \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
    
  4. Send Data! In the producer terminal, type sentences and press enter.
    
    >hello kafka streams
    >hello world
    >kafka streams is powerful
    
  5. Watch the Magic! In the consumer terminal, you will see the word counts update in real-time:
    
    hello:1
    kafka:1
    streams:1
    hello:2
    world:1
    kafka:2
    streams:2
    is:1
    powerful:1
    

What’s Next?

In just a few lines of declarative code, we built a complete, fault-tolerant, stateful stream processing application. We didn’t need a while loop or manual offset management; the Kafka Streams library handled all the complexity for us.

In the next part of the “Kafka in Action” series, we will dive deeper into the rich set of stateless transformations available in the DSL, such as filter, map, and branch, to build more sophisticated data processing pipelines. Stay tuned!

This post is licensed under CC BY 4.0 by the author.