Kafka in Action 03 - Stateful Processing with Windows and Aggregations
In our journey so far, we’ve explored the stateless, record-by-record transformations that form the backbone of data pipelines. We routed e-commerce orders based on their content. But the true power of stream processing is unlocked when we analyze data over time.
However, in the real world, “all time” is rarely what we need. Business questions are almost always time-bound: “How many users logged in during the last hour?”, “What was our total sales revenue in the last 5 minutes?”, “What’s the average sensor temperature per minute?”.
To answer these questions, we need to combine two powerful concepts: sophisticated aggregations and time-bound windows. This article will dive deep into building stateful, time-windowed analytics.
Beyond count(): Advanced Aggregations
While count() is useful, it’s just one tool in the toolbox. Kafka Streams provides more generic and powerful aggregation operators.
reduce(): This operator is used when you want to combine all records for a key into a single result of the same type. You provide a Reducer lambda that takes the current aggregate value and the next record's value and returns the new aggregate value. A classic example is summing a stream of numbers.

aggregate(): This is the most general and powerful aggregation tool. It allows you to produce a result that has a different type from your input stream. It requires two main components:

- An Initializer that provides the starting value for your aggregation (e.g., zero for a sum, or an empty list).
- An Aggregator lambda that takes the current key, the new record's value, and the current aggregate value, and returns the new aggregate value.
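To make the contrast concrete, here is a minimal sketch. It assumes a hypothetical "amounts" topic carrying per-order amounts with String keys and Double values; the topic name and types are illustrative, not part of our e-commerce app:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();

// Hypothetical input: per-order amounts keyed by product
KStream<String, Double> amounts =
        builder.stream("amounts", Consumed.with(Serdes.String(), Serdes.Double()));

// reduce(): input and result share the same type (Double -> Double)
KTable<String, Double> totals = amounts
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
        .reduce(Double::sum); // (aggValue, newValue) -> aggValue + newValue

// aggregate(): the result type (Long) can differ from the input type (Double)
KTable<String, Long> counts = amounts
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
        .aggregate(
                () -> 0L,                       // Initializer: starting value
                (key, amount, agg) -> agg + 1L, // Aggregator: fold each record in
                Materialized.with(Serdes.String(), Serdes.Long()) // state store serdes
        );

Notice that reduce() never needs an initializer: the first record for a key simply becomes the initial aggregate.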
Bounding State with Windows
Performing an aggregation over an unbounded stream means the state required to store the result will grow forever. This is not sustainable. Windows are the mechanism for bounding our calculations to specific slices of time.
Kafka Streams offers several windowing strategies, but the most common is the Tumbling Window.
A tumbling window is defined by a fixed size and represents a series of fixed-size, non-overlapping time intervals. For example, a 1-minute tumbling window will capture events from [12:00, 12:01), then [12:01, 12:02), and so on. Every event belongs to one and only one window.
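In code, a tumbling window is declared by its size alone. As a point of contrast, adding an advance interval smaller than the size turns it into an overlapping hopping window. A minimal sketch (the durations are illustrative):

import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

// Tumbling: fixed 1-minute buckets, no overlap, no grace for late records
TimeWindows tumbling = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1));

// Hopping, for contrast: 1-minute windows starting every 30 seconds,
// so the windows overlap and each event lands in two of them
TimeWindows hopping = TimeWindows
        .ofSizeWithNoGrace(Duration.ofMinutes(1))
        .advanceBy(Duration.ofSeconds(30));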
Building a Real-Time Sales Dashboard
Let’s apply these concepts to our e-commerce scenario. Our goal is to calculate the total sales revenue for each product over 1-minute tumbling windows.
A Quick Detour: Handling Structured Data with Serdes
So far, we’ve used simple strings. This is brittle. A real application uses structured data like JSON or Avro. To handle this, Kafka Streams uses Serdes (Serializer/Deserializer). A Serde is a helper that knows how to convert a Java object to bytes (for writing to Kafka) and back again (for reading from Kafka).
While you can write your own, it’s common to use a library. For this example, we’ll just model our data with a POJO (Plain Old Java Object) and stick to a simpler data format to focus on the windowing logic.
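For reference, this is roughly what wiring up a custom Serde looks like. A minimal sketch, where the toJson/fromJson helpers are hypothetical placeholders; a real application would delegate to a JSON library such as Jackson, or use Avro with a schema registry:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// Serdes.serdeFrom pairs a Serializer (object -> bytes) with a
// Deserializer (bytes -> object). toJson/fromJson are hypothetical.
Serde<Order> orderSerde = Serdes.serdeFrom(
        (topic, order) -> toJson(order).getBytes(StandardCharsets.UTF_8),
        (topic, bytes) -> fromJson(new String(bytes, StandardCharsets.UTF_8))
);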
1. Create Topics
We’ll use our orders topic as the input and create a new output topic.
# Input topic for raw orders (re-use from last article)
# docker-compose exec kafka kafka-topics --create --topic orders --bootstrap-server localhost:9092

# Output topic for our windowed analytics
docker-compose exec kafka kafka-topics --create --topic sales-per-minute --bootstrap-server localhost:9092
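To confirm both topics exist, you can list the topics on the broker:

docker-compose exec kafka kafka-topics --list --bootstrap-server localhost:9092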
2. The Application Code
Our application will read orders, group them by product (e.g., “Laptop”, “Kafka Book”), and then use a 1-minute tumbling window to sum up the total revenue for each product within that minute.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Properties;
public class WindowedSalesAnalytics {
// Simple POJO to represent our data
public static class Order {
public String product;
public double value;
public Order(String product, double value) {
this.product = product;
this.value = value;
}
}
public static void main(String[] args) {
// 1. Create Streams Properties
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-sales-app");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 2. Define the processing topology
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> orderStream = builder.stream("orders");
// Parse the raw CSV string (order_id,user,product,quantity,price)
// into a structured Order object, with revenue = quantity * price
KStream<String, Order> structuredOrders = orderStream.mapValues(value -> {
try {
String[] parts = value.split(",");
String product = parts[2];
int quantity = Integer.parseInt(parts[3]);
double price = Double.parseDouble(parts[4]);
return new Order(product, quantity * price);
} catch (Exception e) {
// Return null for malformed records
return null;
}
}).filter((key, value) -> value != null); // Filter out malformed records
// The core logic: re-key, group, window, and aggregate
KTable<Windowed<String>, Double> aggregatedSales = structuredOrders
// Re-key each order by product and keep just its revenue, so the
// repartition topic only needs the built-in String/Double serdes
.map((key, order) -> KeyValue.pair(order.product, order.value))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
// Define a 1-minute tumbling window
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
// Aggregate the sales values
.aggregate(
() -> 0.0, // Initializer: start sum at 0
(aggKey, newValue, aggValue) -> aggValue + newValue, // Aggregator: add new revenue
Materialized.with(Serdes.String(), Serdes.Double()) // Serdes for state store
);
// Convert the windowed KTable back to a KStream to sink it to a topic.
// The window metadata is folded into the value, and the key is unwrapped
// to a plain String so the built-in serdes can be used when producing.
aggregatedSales.toStream()
.map((windowedKey, value) -> KeyValue.pair(
windowedKey.key(),
String.format("Total sales for %s in window [%s to %s] is $%.2f",
windowedKey.key(),
windowedKey.window().startTime(),
windowedKey.window().endTime(),
value)))
.to("sales-per-minute", Produced.with(Serdes.String(), Serdes.String()));
// 3. Build and start the Kafka Streams application
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Running the Time-Sensitive Pipeline
- Start the App: Run the WindowedSalesAnalytics application.
- Start Producer and Consumer: Set up a producer for the orders topic and a consumer for the sales-per-minute topic (example commands below).
- Send Data: In the producer, send some order data. Try to send a few within the same minute, then wait for a minute to pass and send some more.
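For reference, these are the kinds of console commands that fit the docker-compose setup used in this series (broker address assumed to be localhost:9092):

# Producer for sending orders
docker-compose exec kafka kafka-console-producer --topic orders --bootstrap-server localhost:9092

# Consumer for watching the windowed results
docker-compose exec kafka kafka-console-consumer --topic sales-per-minute --bootstrap-server localhost:9092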
# (Send these within the same minute)
>101,user-A,Laptop,1,1200
>102,user-C,Kafka Book,1,450
>103,user-A,Laptop,1,1350

# (Wait for a minute to pass on the clock)

# (Send these in the next minute)
>104,user-B,Kafka Book,2,450
>105,user-D,Camera,1,750
- Observe the Windowed Results: Your consumer will show the aggregated results for each window. (By default, Kafka Streams emits incremental updates as records arrive, throttled by its record cache, so you may see intermediate totals; the last update per window reflects the final sum. The sketch after this list shows how to emit only final results.)
- After the first minute, you’ll see results like (timestamps abbreviated for readability):

Total sales for Laptop in window [12:01:00 to 12:02:00] is $2550.00
Total sales for Kafka Book in window [12:01:00 to 12:02:00] is $450.00
- After the second minute, you’ll see a new set of results:

Total sales for Kafka Book in window [12:02:00 to 12:03:00] is $900.00
Total sales for Camera in window [12:02:00 to 12:03:00] is $750.00
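If you want the topology to publish exactly one final result per window instead of a stream of updates, Kafka Streams offers suppress(). A minimal sketch, reusing the aggregatedSales table from the main example:

import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;

// Hold back intermediate updates and forward exactly one record per
// window once stream time passes the window end (plus grace, zero here)
KTable<Windowed<String>, Double> finalSales = aggregatedSales
        .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));

Keep in mind that stream time only advances as new records arrive, so a window’s final result is emitted once a later record pushes stream time past the window boundary.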
Conclusion & What’s Next?
You have now built a true, real-time analytics pipeline. By combining aggregations with windows, you can manage state effectively and answer complex, time-bound business questions as data arrives. We’ve also seen the importance of Serdes for handling structured data, moving us closer to production-ready applications.
But what if our data is spread across multiple topics? A stream of orders is useful, but it would be much more valuable if we could combine it with a stream of customer-details to enrich it in real time. This operation, a cornerstone of data processing, is called a Join. In our next article, we will explore how to perform stream-to-stream and stream-to-table joins in Kafka Streams.