Kafka in Action 02 - Stateless Transformations with the DSL

In our first foray into Kafka Streams we built a stateful word-count application. The count() operation was stateful because it had to remember the previous count for each word to increment it. While powerful, many stream processing tasks are much simpler and don’t require memory of past events. These are called stateless transformations.

A stateless operation processes each record independently, in isolation. The transformation of one message has no knowledge of, and no impact on, the transformation of any other message. These operations are the fundamental building blocks of any data pipeline, used for cleaning, reshaping, and routing data as it flows.

In this article, we’ll build a practical data pipeline for a fictional e-commerce platform, exploring the most essential stateless operations: filter, map, flatMap, and branch.

The Scenario: Processing E-commerce Orders

Imagine we have a stream of raw order data being published to a Kafka topic called orders. Each message is a simple comma-separated string representing an order: orderId,userId,product,quantity,price.

Our goal is to build a Kafka Streams application that:

  1. Filters out low-value orders.
  2. Transforms the raw data into a more readable format.
  3. Routes orders for different product categories to separate topics for specialized downstream processing.

Core Stateless Operations

1. filter() and filterNot()

The filter() operation is exactly what it sounds like. You provide it with a condition (a Predicate), and it allows records that satisfy the condition to pass through while discarding the rest. filterNot() does the opposite.

  • Use Case: We only want to process orders with a total value (quantity * price) greater than $100.
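
This is the predicate used in the full pipeline below, shown here in isolation (the try/catch guarding against malformed records is omitted for brevity):

KStream<String, String> highValueOrders = orderStream.filter((key, value) -> {
    // fields: orderId,userId,product,quantity,price
    String[] parts = value.split(",");
    return Integer.parseInt(parts[3]) * Double.parseDouble(parts[4]) > 100.0;
});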

2. map() and mapValues()

These operations transform each record. mapValues() is a convenient shortcut when you only need to change the value of a record, keeping its key intact; because the key is guaranteed not to change, Kafka Streams never has to repartition the data downstream, which often makes it more efficient. map() allows you to change the key, the value, or both.

  • Use Case: We want to transform our raw CSV string 101,user-A,Laptop,1,1200 into a more human-readable value like Purchase of 1 Laptop(s) by user-A for a total of $1200.00.
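
The mapValues() version of this transformation appears in the full pipeline below. For contrast, here is a hypothetical map() that re-keys each order by its userId while leaving the value untouched (not part of the pipeline, purely illustrative):

// requires import org.apache.kafka.streams.KeyValue;
KStream<String, String> keyedByUser = orderStream.map((key, value) ->
        KeyValue.pair(value.split(",")[1], value)); // new key = userId, same value

Because the key changes here, Kafka Streams marks this stream for repartitioning before any downstream stateful operation, which is exactly the cost mapValues() avoids.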

3. flatMap() and flatMapValues()

These are similar to map, but with a key difference: they can transform a single input record into zero, one, or many output records. The word count example from our previous article is the classic demonstration of flatMapValues: one record containing the sentence "hello kafka streams" was transformed into three records: "hello", "kafka", and "streams".
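
As a refresher, that splitting step looks roughly like this, assuming a textLines stream of sentences (one input value in, a list of words out, the key untouched):

// requires import java.util.Arrays;
KStream<String, String> words = textLines.flatMapValues(value ->
        Arrays.asList(value.toLowerCase().split("\\W+")));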

4. branch()

The branch() operation splits one stream into several based on a set of conditions. You provide it with multiple predicates, and it returns an array of KStreams, one per predicate; each record is routed to the stream of the first predicate it matches, and records that match none are dropped. Because every record is evaluated only once against the chain of predicates, this is more efficient than applying several independent filter() operations to the same source stream.

  • Use Case: From our stream of high-value orders, we want to create one stream for “Electronics” and another for “Books”.
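
A note on versions: the varargs branch() used in the full pipeline below still works, but it has been deprecated since Kafka Streams 2.8 in favor of split(). On a newer client the same routing looks roughly like this (branch names are illustrative; Named and Branched come from org.apache.kafka.streams.kstream):

Map<String, KStream<String, String>> branches = formattedOrders
        .split(Named.as("orders-"))                        // prefix for the map keys
        .branch(isElectronics, Branched.as("electronics")) // key "orders-electronics"
        .branch(isBook, Branched.as("books"))              // key "orders-books"
        .defaultBranch(Branched.as("other"));              // key "orders-other"

KStream<String, String> electronicsStream = branches.get("orders-electronics");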

Building the E-commerce Pipeline

Let’s put these concepts into practice.

1. Create Topics

First, we need our input and output topics.

# Input topic for raw orders
docker-compose exec kafka kafka-topics --create --topic orders --bootstrap-server localhost:9092

# Output topic for high-value electronics orders
docker-compose exec kafka kafka-topics --create --topic electronics-orders --bootstrap-server localhost:9092

# Output topic for high-value book orders
docker-compose exec kafka kafka-topics --create --topic book-orders --bootstrap-server localhost:9092

# Output topic for other high-value items
docker-compose exec kafka kafka-topics --create --topic other-orders --bootstrap-server localhost:9092

2. The Application Code

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

import java.util.Properties;

public class EcommercePipeline {
    public static void main(String[] args) {
        // 1. Create Streams Properties
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "ecommerce-pipeline-app");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 2. Define the processing topology
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> orderStream = builder.stream("orders");

        // 2.1) Filter for orders with a total value > $100
        KStream<String, String> highValueOrders = orderStream.filter((key, value) -> {
            try {
                String[] parts = value.split(",");
                int quantity = Integer.parseInt(parts[3]);
                double price = Double.parseDouble(parts[4]);
                return (quantity * price) > 100.0;
            } catch (Exception e) {
                // Malformed record, filter it out
                return false;
            }
        });

        // 2.2) Map the value to a more readable format
        KStream<String, String> formattedOrders = highValueOrders.mapValues(value -> {
            String[] parts = value.split(",");
            String userId = parts[1];
            String product = parts[2];
            int quantity = Integer.parseInt(parts[3]);
            double price = Double.parseDouble(parts[4]);
            return String.format("Purchase of %d %s(s) by %s for a total of $%.2f",
                    quantity, product, userId, quantity * price);
        });

        // 2.3) Branch the formatted stream into different categories
        Predicate<String, String> isElectronics = (key, value) -> value.contains("Laptop") || value.contains("Camera");
        Predicate<String, String> isBook = (key, value) -> value.contains("Book");

        KStream<String, String>[] branchedStreams = formattedOrders.branch(isElectronics, isBook, (key, value) -> true);

        KStream<String, String> electronicsStream = branchedStreams[0];
        KStream<String, String> booksStream = branchedStreams[1];
        KStream<String, String> othersStream = branchedStreams[2];
        
        // 2.4) Sink each branch to its respective topic
        electronicsStream.to("electronics-orders");
        booksStream.to("book-orders");
        othersStream.to("other-orders");

        // 3. Build and start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Running the Pipeline

  1. Start the Java App: Run the EcommercePipeline application.
  2. Start a Console Producer: Open a terminal to send order data.
    
    docker-compose exec kafka kafka-console-producer --bootstrap-server localhost:9092 --topic orders
    
  3. Start Console Consumers: Open three separate terminals to monitor each output topic.
    
    # Terminal 1: Electronics
    docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic electronics-orders --from-beginning
    
    # Terminal 2: Books
    docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic book-orders --from-beginning
    
    # Terminal 3: Others
    docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic other-orders --from-beginning
    
  4. Send Data: In the producer terminal, enter some order data.
    
    >101,user-A,Laptop,1,1200
    >102,user-B,Coffee,3,15
    >103,user-C,Kafka Book,1,450
    >104,user-A,Camera,1,850
    >105,user-D,Desk Chair,1,225
    
  5. Observe the Results: You will see the messages being routed and transformed correctly.
    • The Coffee order (total value $45, below the $100 threshold) will be dropped by the filter.
    • The Laptop and Camera orders will appear in the electronics-orders consumer.
    • The Kafka Book order will appear in the book-orders consumer.
    • The Desk Chair order will appear in the other-orders consumer.
    • All messages in the output topics will be in the new, human-readable format.
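
For example, given the format string in the application, the electronics-orders consumer should print something like:

Purchase of 1 Laptop(s) by user-A for a total of $1200.00
Purchase of 1 Camera(s) by user-A for a total of $850.00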

What’s Next?

We have now seen how to construct powerful, declarative data pipelines using stateless transformations. These operations are the workhorses of stream processing. However, the real magic of Kafka Streams is its ability to combine these stateless operations with stateful ones.

In our next article, we will revisit stateful processing in depth. We’ll go beyond the simple count() and explore powerful aggregations like reduce and aggregate, and introduce the concept of windowing to perform calculations over specific periods of time.

This post is licensed under CC BY 4.0 by the author.