Hello Kafka 03 - Producer Deep Dive with Keys & Callbacks

In the first two articles of our “Hello Kafka” series, we established a basic communication channel. We learned how to produce a message to a topic and then consume that message with a separate application. While functional, our producer was a “fire-and-forget” implementation. We sent the message and just hoped for the best.

In the real world, we need more control and reliability. What if the message fails to send? And how does Kafka handle related messages that need to be processed in a specific order?

This post will enhance our producer by exploring two fundamental concepts: Message Keys for ordering and Callbacks for handling send confirmations.

The Power of the Key: Guaranteeing Order

So far, our ProducerRecord has only contained a topic and a value. However, a record can also carry an optional key, which sits between the topic and the value in the constructor:

// Our old record
new ProducerRecord<>("my-first-topic", "hello, kafka!");

// A record with a key
new ProducerRecord<>("my-first-topic", "user_id_123", "User logged in");

What does this key do? Its primary purpose is to control which of a topic’s partitions a message lands in.

By default, Kafka’s producer works like this:

  • If the key is null, the producer spreads messages across partitions to balance the load. Classically this was round-robin (Message 1 -> Partition 0, Message 2 -> Partition 1, and so on); since Kafka 2.4 the default is a “sticky” partitioner that fills a batch for one partition before moving to the next.
  • If a key is provided, the producer hashes the key and uses that hash to determine the partition.

This leads to a critical guarantee from Kafka: all messages with the same key will always be written to the same partition. Since consumers read messages from a single partition in the exact order they were written, this ensures that all messages for a specific key are processed sequentially.
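
Conceptually, the key-to-partition mapping works like the sketch below. (This is a simplification; the real producer uses the murmur2 helper from Kafka’s internal Utils class, so treat this as an illustration rather than an API to depend on.)

import org.apache.kafka.common.utils.Utils;

public class PartitionSketch {
    // The same serialized key bytes always produce the same hash,
    // so a given key always maps to the same partition.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

One caveat: this mapping is only stable while the partition count stays the same. If you add partitions to a topic later, keys get remapped, and the per-key ordering guarantee only applies to messages written after the change.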

Imagine tracking a user’s actions on a website. By using the user_id as the key for all events related to that user, you can guarantee that their “login” event is processed before their “add to cart” event, and both of those before their “checkout” event.
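
In code, that just means reusing the same key for every event in the sequence (assuming the producer from our earlier posts is in scope):

// All three events share the key "user_id_123", so they are routed to the
// same partition and will be read back in exactly this order.
producer.send(new ProducerRecord<>("my-first-topic", "user_id_123", "login"));
producer.send(new ProducerRecord<>("my-first-topic", "user_id_123", "add to cart"));
producer.send(new ProducerRecord<>("my-first-topic", "user_id_123", "checkout"));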

Don’t Just Fire and Forget: Using Callbacks

Our producer’s send() method is asynchronous. It adds the message to an internal buffer and returns immediately, allowing our application to continue working without waiting for a network round-trip. This is great for performance, but it means we don’t know if the message actually made it to the Kafka broker.
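
Under the hood, send() actually returns a Future<RecordMetadata>, so you could block on it to make the send synchronous. That is a quick sanity check, but it sacrifices throughput because every message then waits for its own network round-trip (this snippet assumes the producer and record from our examples):

// Synchronous variant: get() blocks until the broker acknowledges the record,
// and throws ExecutionException if the send ultimately failed.
RecordMetadata metadata = producer.send(record).get();
log.info("Written to partition {} at offset {}", metadata.partition(), metadata.offset());

Callbacks give us the same delivery information without blocking.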

To solve this, the send() method can accept a Callback object as a second argument. This callback is a piece of code that will be executed once the broker acknowledges receipt of the message (or if sending failed).

The callback has a single method, onCompletion, which receives two arguments: RecordMetadata and an Exception.

  • If the send was successful, RecordMetadata will be populated with details like the topic, partition, and offset of the new message. The Exception will be null.
  • If the send failed, RecordMetadata will be null, and the Exception will describe what went wrong (e.g., the broker was unreachable).
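
Before lambdas, you would implement the interface explicitly, which makes the contract easy to see (again assuming the producer and record from our examples):

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            // Success: metadata tells us exactly where the record landed
            log.info("Stored in {}-{} at offset {}",
                    metadata.topic(), metadata.partition(), metadata.offset());
        } else {
            // Failure: metadata is null and the exception explains why
            log.error("Send failed", exception);
        }
    }
});

One caveat worth knowing: the callback executes on the producer’s I/O thread, so keep it lightweight and avoid blocking calls inside it.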

Putting It All Together: An Enhanced Producer

Let’s modify our producer code to send several messages with keys and use a callback to log the result of each send.

First, to better demonstrate partitioning, let’s recreate our topic with 3 partitions instead of just one.

# First, delete the old topic
docker-compose exec kafka kafka-topics --delete --topic my-first-topic --bootstrap-server localhost:9092

# Now, create it with 3 partitions
docker-compose exec kafka kafka-topics --create --topic my-first-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
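
If you want to double-check the result, --describe should list the three partitions:

docker-compose exec kafka kafka-topics --describe --topic my-first-topic --bootstrap-server localhost:9092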

Now, let’s update our Java code:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class ProducerWithKeysAndCallback {

    private static final Logger log = LoggerFactory.getLogger(ProducerWithKeysAndCallback.class);

    public static void main(String[] args) throws InterruptedException {
        // 1. Create Producer Properties
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 2. Create the Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            String topic = "my-first-topic";
            String key = "id_" + (i % 3); // We'll have keys id_0, id_1, id_2
            String value = "message " + i;

            // 3. Create a producer record
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

            // 4. Send data with a callback
            producer.send(record, (metadata, exception) -> {
                // Executes every time a record is successfully sent or an exception is thrown
                if (exception == null) {
                    // The record was successfully sent
                    log.info("Received new metadata. \n" +
                            "Topic: " + metadata.topic() + "\n" +
                            "Key: " + record.key() + "\n" +
                            "Partition: " + metadata.partition() + "\n" +
                            "Offset: " + metadata.offset() + "\n" +
                            "Timestamp: " + metadata.timestamp());
                } else {
                    log.error("Error while producing", exception);
                }
            });
            
            // A small pause so each send and its callback are easy to follow in the logs
            Thread.sleep(500);
        }

        // 5. Flush and close producer
        // This is crucial to ensure the last messages are sent before the app exits
        producer.flush();
        producer.close();
    }
}

When you run this code, you’ll see in the logs that all messages with the key id_0 go to one partition, all messages with id_1 go to another, and all with id_2 go to the third. You’ve now taken control of your data’s ordering! You also have the peace of mind that comes with knowing whether each message was successfully delivered.

In our next article, we’ll shift our focus back to the consumer. We’ll explore the dynamics of consumer groups, see how multiple consumers can work together to process a topic in parallel, and discuss how Kafka keeps track of what’s been read.
