Kafka in Action 02 - Stateless Transformations with the DSL
In our first foray into Kafka Streams, we built a stateful word-count application. The count() operation was stateful because it had to remember the previous count for each word in order to increment it. Stateful operations are powerful, but many stream processing tasks are much simpler and don’t require any memory of past events. These are called stateless transformations.
A stateless operation processes each record independently, in isolation. The transformation of one message has no knowledge of, and no impact on, the transformation of any other message. These operations are the fundamental building blocks of any data pipeline, used for cleaning, reshaping, and routing data as it flows.
In this article, we’ll build a practical data pipeline for a fictional e-commerce platform, exploring the most essential stateless operations: filter, map, flatMap, and branch.
The Scenario: Processing E-commerce Orders
Imagine we have a stream of raw order data being published to a Kafka topic called orders. Each message is a simple comma-separated string representing an order: orderId,userId,product,quantity,price.
Our goal is to build a Kafka Streams application that:
- Filters out low-value orders.
- Transforms the raw data into a more readable format.
- Routes orders for different product categories to separate topics for specialized downstream processing.
Core Stateless Operations
1. filter() and filterNot()
The filter() operation is exactly what it sounds like. You provide it with a condition (a Predicate), and it allows records that satisfy the condition to pass through while discarding the rest. filterNot() does the opposite.
- Use Case: We only want to process orders with a total value (quantity * price) greater than $100; a minimal sketch of the predicate follows.
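Assuming orderStream is the raw KStream<String, String> we create in the full pipeline later on, the predicate looks like this:

// Keep only orders whose total value (quantity * price) exceeds $100.
KStream<String, String> highValueOrders = orderStream.filter((key, value) -> {
    String[] parts = value.split(",");
    return Integer.parseInt(parts[3]) * Double.parseDouble(parts[4]) > 100.0;
});

// filterNot() inverts the predicate: everything at or below $100.
KStream<String, String> lowValueOrders = orderStream.filterNot((key, value) -> {
    String[] parts = value.split(",");
    return Integer.parseInt(parts[3]) * Double.parseDouble(parts[4]) > 100.0;
});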
2. map() and mapValues()
These operations transform each record. mapValues() is a convenient and often more efficient shortcut when you only need to change the value of a record: because the key stays intact, Kafka Streams never needs to repartition the data downstream. map() allows you to change the key, the value, or both.
- Use Case: We want to transform our raw CSV string 101,user-A,Laptop,1,1200 into a more human-readable value like "Purchase of 1 Laptop(s) by user-A for a total of $1200" (see the sketch below).
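A minimal sketch of both variants, assuming highValueOrders is the filtered stream from the previous step (the map() re-keying is purely illustrative and not part of the pipeline we build later; KeyValue comes from org.apache.kafka.streams):

// mapValues(): reshape only the value; the key and the record's partition stay intact.
KStream<String, String> formattedOrders = highValueOrders.mapValues(value -> {
    String[] parts = value.split(",");
    return String.format("Purchase of %d %s(s) by %s for a total of $%.2f",
            Integer.parseInt(parts[3]), parts[2], parts[1],
            Integer.parseInt(parts[3]) * Double.parseDouble(parts[4]));
});

// map() can also change the key, e.g. re-key each order by userId.
// Note: changing the key marks the stream for repartitioning downstream.
KStream<String, String> ordersByUser = highValueOrders.map((key, value) ->
        KeyValue.pair(value.split(",")[1], value));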
3. flatMap() and flatMapValues()
These are similar to map, but with a key difference: they can transform a single input record into zero, one, or many output records. The word count example from our previous article is the classic demonstration of flatMapValues: one record containing the sentence "hello kafka streams" was transformed into three records: "hello", "kafka", and "streams".
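As a reminder, that split looks roughly like this (a minimal sketch, assuming a KStream<String, String> named textLines and java.util.Arrays on the import list):

// One input record in, zero or more output records out; each word keeps the original key.
KStream<String, String> words = textLines.flatMapValues(
        line -> Arrays.asList(line.toLowerCase().split("\\W+")));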
4. branch()
The branch() operation is an efficient way to split one stream into several based on a set of conditions. You provide it with multiple predicates, and it returns an array of KStreams, one per predicate; each record is routed to the branch of the first predicate it matches, and records that match none are dropped. This is better than applying multiple filter() operations to the same source stream, because each record is evaluated only until its first match and lands in exactly one branch.
- Use Case: From our stream of high-value orders, we want to create one stream for “Electronics” and another for “Books”.
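One caveat before we use it: branch() still works but has been deprecated since Kafka Streams 2.8 in favour of split(), which returns named branches in a Map instead of an array (Named and Branched live in org.apache.kafka.streams.kstream). The pipeline below sticks with branch(); a rough equivalent with the newer API, with branch names of my own choosing, would look like this:

Map<String, KStream<String, String>> branches = formattedOrders
        .split(Named.as("orders-"))
        .branch((key, value) -> value.contains("Laptop") || value.contains("Camera"), Branched.as("electronics"))
        .branch((key, value) -> value.contains("Book"), Branched.as("books"))
        .defaultBranch(Branched.as("other"));

// Each branch is keyed by the split() prefix plus its name.
branches.get("orders-electronics").to("electronics-orders");
branches.get("orders-books").to("book-orders");
branches.get("orders-other").to("other-orders");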
Building the E-commerce Pipeline
Let’s put these concepts into practice.
1. Create Topics
First, we need our input and output topics.
# Input topic for raw orders
docker-compose exec kafka kafka-topics --create --topic orders --bootstrap-server localhost:9092
# Output topic for high-value electronics orders
docker-compose exec kafka kafka-topics --create --topic electronics-orders --bootstrap-server localhost:9092
# Output topic for high-value book orders
docker-compose exec kafka kafka-topics --create --topic book-orders --bootstrap-server localhost:9092
# Output topic for other high-value items
docker-compose exec kafka kafka-topics --create --topic other-orders --bootstrap-server localhost:9092
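Note: on Kafka CLI versions older than 2.4, --create may insist on explicit sizing; in that case add the partition and replication flags, for example (a single partition and replica is only a local-development assumption):

docker-compose exec kafka kafka-topics --create --topic orders --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092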
2. The Application Code
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

import java.util.Properties;

public class EcommercePipeline {

    public static void main(String[] args) {
        // 1. Create Streams Properties
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "ecommerce-pipeline-app");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 2. Define the processing topology
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orderStream = builder.stream("orders");

        // 2.1) Filter for orders with a total value > $100
        KStream<String, String> highValueOrders = orderStream.filter((key, value) -> {
            try {
                String[] parts = value.split(",");
                int quantity = Integer.parseInt(parts[3]);
                double price = Double.parseDouble(parts[4]);
                return (quantity * price) > 100.0;
            } catch (Exception e) {
                // Malformed record, filter it out
                return false;
            }
        });

        // 2.2) Map the value to a more readable format
        KStream<String, String> formattedOrders = highValueOrders.mapValues(value -> {
            String[] parts = value.split(",");
            String userId = parts[1];
            String product = parts[2];
            int quantity = Integer.parseInt(parts[3]);
            double price = Double.parseDouble(parts[4]);
            return String.format("Purchase of %d %s(s) by %s for a total of $%.2f",
                    quantity, product, userId, quantity * price);
        });

        // 2.3) Branch the formatted stream into different categories
        Predicate<String, String> isElectronics = (key, value) -> value.contains("Laptop") || value.contains("Camera");
        Predicate<String, String> isBook = (key, value) -> value.contains("Book");

        KStream<String, String>[] branchedStreams = formattedOrders.branch(isElectronics, isBook, (key, value) -> true);

        KStream<String, String> electronicsStream = branchedStreams[0];
        KStream<String, String> booksStream = branchedStreams[1];
        KStream<String, String> othersStream = branchedStreams[2];

        // 2.4) Sink each branch to its respective topic
        electronicsStream.to("electronics-orders");
        booksStream.to("book-orders");
        othersStream.to("other-orders");

        // 3. Build and start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
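If you want to sanity-check the topology before touching real topics, the kafka-streams-test-utils module ships a TopologyTestDriver that runs it in-memory. A minimal sketch, reusing the builder and properties from main() and assuming the test-utils dependency is on the classpath:

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

// Drive the topology without a broker or any running containers.
try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties)) {
    TestInputTopic<String, String> orders =
            driver.createInputTopic("orders", new StringSerializer(), new StringSerializer());
    TestOutputTopic<String, String> electronics =
            driver.createOutputTopic("electronics-orders", new StringDeserializer(), new StringDeserializer());

    orders.pipeInput("101", "101,user-A,Laptop,1,1200");
    System.out.println(electronics.readValue());
    // -> Purchase of 1 Laptop(s) by user-A for a total of $1200.00
}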
Running the Pipeline
- Start the Java App: Run the EcommercePipeline application.
- Start a Console Producer: Open a terminal to send order data.
docker-compose exec kafka kafka-console-producer --broker-list localhost:9092 --topic orders
- Start Console Consumers: Open three separate terminals to monitor each output topic.
# Terminal 1: Electronics
docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic electronics-orders --from-beginning

# Terminal 2: Books
docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic book-orders --from-beginning

# Terminal 3: Others
docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic other-orders --from-beginning
- Send Data: In the producer terminal, enter some order data.
>101,user-A,Laptop,1,1200
>102,user-B,Coffee,3,15
>103,user-C,Kafka Book,1,450
>104,user-A,Camera,1,850
>105,user-D,Desk Chair,1,225
- Observe the Results: You will see the messages being routed and transformed correctly.
- The Coffee order (value $45) will be dropped by the filter.
- The Laptop and Camera orders will appear in the electronics-orders consumer.
- The Kafka Book order will appear in the book-orders consumer.
- The Desk Chair order will appear in the other-orders consumer.
- All messages in the output topics will be in the new, human-readable format.
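For example, the electronics-orders consumer should print something like:

Purchase of 1 Laptop(s) by user-A for a total of $1200.00
Purchase of 1 Camera(s) by user-A for a total of $850.00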
What’s Next?
We have now seen how to construct powerful, declarative data pipelines using stateless transformations. These operations are the workhorses of stream processing. However, the real magic of Kafka Streams is its ability to combine these stateless operations with stateful ones.
In our next article, we will revisit stateful processing in depth. We’ll go beyond the simple count() and explore powerful aggregations like reduce and aggregate, and introduce the concept of windowing to perform calculations over specific periods of time.