Hello Kafka 05 - Delivery Semantics & Manual Commits
Welcome to the final installment of the “Hello Kafka” series. We’ve come a long way: from producing our first message, to consuming it, enhancing our producer with keys and callbacks, and finally, scaling our consumers with consumer groups.
In our last article, we touched on a critical topic: consumer offsets. We relied on the default “auto-commit” behavior, where the consumer client periodically commits, in the background, the offsets of the messages it has polled. While convenient, this approach can be risky in production and can lead to either lost messages or duplicate processing.
Today, we’ll dive into the heart of reliable message processing by exploring delivery semantics and taking manual control of our offset commits.
The Problem with Auto-Commit
The default setting, enable.auto.commit=true, tells the consumer to commit the latest offset it has received from the poll() method every five seconds (configurable with auto.commit.interval.ms). This creates a window for error:
- Data Loss (At-most-once): Imagine the consumer polls a batch of messages. The auto-commit timer fires, committing the offset for this batch. Immediately after, the consumer crashes while processing the very first message in the batch. When it restarts, it will begin reading from the committed offset, completely skipping the messages it never got to process.
- Data Duplication (At-least-once): Now imagine the consumer polls a batch, successfully processes all of them, but then crashes before the auto-commit timer has a chance to fire. When it restarts, it will begin from the last committed offset, re-fetching and re-processing the same batch of messages.
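For reference, this is roughly what the default behavior looks like if you spell it out in the consumer configuration (both values below are the defaults, so you normally would not set them at all; they are shown here only to make the five-second window visible):
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");      // the default: auto-commit is on
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // the default: commit roughly every 5 seconds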
For many applications, reprocessing messages is acceptable, but losing them is not. To build a truly robust system, we need to explicitly tell Kafka when we are “done” with a message.
Understanding Delivery Semantics
When building a messaging system, you must decide on the processing guarantee you need:
- At-most-once: Messages might be lost but are never reprocessed. This is the fastest but least reliable option. You get this if you commit offsets before you process the messages.
- At-least-once: Messages are never lost but might be reprocessed. This is the most common and generally preferred guarantee. You achieve this by committing offsets after you finish processing the messages. If a crash happens between processing and committing, you reprocess on restart.
- Exactly-once: Every message is delivered and processed once, and only once. This is the holy grail of messaging and is significantly more complex to achieve, often requiring support for transactions across the producer, Kafka, and the consumer’s data store.
For most use cases, at-least-once is the target. We can achieve this by disabling auto-commit and managing the offset ourselves.
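To make the contrast concrete, here is a minimal sketch of the at-most-once pattern, where the commit happens right after poll() and before any processing. It assumes a consumer configured as in the earlier articles, and process() is just a hypothetical stand-in for your business logic; the at-least-once version, which we build in full below, simply moves the commit to after the processing loop.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    consumer.commitSync(); // at-most-once: offsets are committed before any record is processed
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical business logic; if we crash here, these records are never read again
    }
}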
Taking Control: Manual Offset Commits
The first step is to change our consumer’s configuration:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
With this setting, the consumer will never commit offsets automatically. It is now our responsibility to do so using one of two methods:
- consumer.commitSync(): This performs a synchronous, or blocking, commit. The application will pause until the offset commit is acknowledged by the Kafka broker or an exception is thrown. This is simple and reliable, but it reduces throughput because your application is blocked during the commit.
- consumer.commitAsync(): This performs a non-blocking commit. It sends the request and continues immediately. You can provide an optional callback to check whether the commit was successful. This is much faster but adds complexity in handling commit failures (a failed async commit that is retried might commit an old offset after a newer one has already succeeded).
For reliability and simplicity, commitSync() is the best place to start.
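That said, if you do reach for commitAsync(), the optional callback at least lets you observe failures. Here is a minimal sketch, reusing the consumer and logger from the full example below:
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.error("Async commit failed for offsets {}", offsets, exception);
    }
});
A common pattern is to use commitAsync() inside the poll loop for throughput and a final commitSync() during shutdown, so the last processed offsets are committed reliably before the consumer closes.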
Our Final Consumer: A Robust Implementation
Let’s modify our consumer to use manual synchronous commits to guarantee at-least-once processing.
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerWithManualCommit {

    private static final Logger log = LoggerFactory.getLogger(ConsumerWithManualCommit.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-java-application");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // ★★★ The key change: disable auto-commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Get a reference to the current thread
        final Thread mainThread = Thread.currentThread();

        // Add a shutdown hook to cleanly close the consumer
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
            consumer.wakeup();

            // Join the main thread to allow the execution of the code in the main thread
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        try {
            consumer.subscribe(Collections.singletonList("my-first-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    log.info("Processing record -> Key: {}, Value: {}, Partition: {}, Offset: {}",
                            record.key(), record.value(), record.partition(), record.offset());
                    // Imagine some business logic here to process the record
                }

                // After processing the whole batch, we commit the offsets
                if (!records.isEmpty()) {
                    consumer.commitSync();
                    log.info("Offsets have been committed synchronously!");
                }
            }
        } catch (WakeupException e) {
            log.info("Wake up exception! This is expected for a clean shutdown.");
        } catch (Exception e) {
            log.error("Unexpected exception", e);
        } finally {
            log.info("The consumer is now gracefully closing...");
            consumer.close();
        }
    }
}
In this code, we poll for records and process them in a loop, just like before. The critical difference is that after the loop finishes for a given batch, we call consumer.commitSync(). This tells Kafka, “I have successfully processed all messages up to this point.” If our application crashes mid-batch, before the commit, we will simply re-process that batch upon restart, thus guaranteeing we never lose data.
We’ve also added a proper shutdown hook. Calling consumer.wakeup() from another thread is the correct way to break out of the poll() loop and allow the consumer to close its connections cleanly.
Conclusion of the Series
Over these five articles, we have built a solid foundation in the world of Java and Kafka. We’ve learned to produce messages with keys for ordering, consume them individually and in scaled-out consumer groups, and finally, to manage offsets manually for reliable, at-least-once processing.
This is just the beginning of what’s possible. From here, you can explore powerful frameworks like Kafka Streams for real-time stream processing, KSQL for querying your data streams with SQL, and the Confluent Schema Registry for managing data evolution.