Hello Kafka 02 - Consuming Messages
Welcome to the second part of our “Hello Kafka” series! In the first article, we successfully sent our very first message, “hello, kafka!”, to a topic using a Java producer. But sending a message is only half the story. The real power of Kafka is unlocked when another application can read and process that message.
Today, we’ll build the other side of the coin: a Java consumer that will subscribe to our topic and read the message we sent.
The Role of the Consumer
A Kafka consumer’s job is to subscribe to one or more topics and read the messages in the order they were produced. But there’s a key concept that makes consumers incredibly powerful and scalable: Consumer Groups.
When you start a consumer, you assign it a group.id.
- If multiple consumers have the same group.id, they work together to process messages from a topic. Kafka automatically distributes the topic's partitions among the consumers in that group, ensuring each message is processed by only one consumer in the group. This is how you achieve parallel processing and load balancing.
- If multiple consumers have different group.ids, each one receives a full copy of all the messages from the topic. This is ideal when different applications need to process the same data stream independently.
For our example, we’ll start with a single consumer in a group.
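To make that concrete, here is a minimal sketch of the one configuration line that controls this behavior. The group names here are hypothetical, and properties and ConsumerConfig come from the consumer setup we'll write below:

// Load balancing: every instance of this application that uses the same
// group id shares the work; Kafka splits the topic's partitions among them.
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");

// Independent copy: a *different* application would use its own group id
// instead, and would then receive every message on the topic regardless of
// what the "order-processors" group has already consumed.
// properties.put(ConsumerConfig.GROUP_ID_CONFIG, "audit-service");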
Your First Java Consumer: Reading the Message
The consumer needs the same Kafka client library as our producer. If you're continuing in the same Maven project, you're already set up:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>
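(If you're using Gradle rather than Maven, the same library is available under the coordinate org.apache.kafka:kafka-clients:3.6.0.)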
Here’s a simple Java class that will act as our Kafka consumer:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {

    private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);

    public static void main(String[] args) {
        // 1. Create Consumer Properties
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-java-application");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 2. Create the Consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 3. Subscribe consumer to our topic(s)
        consumer.subscribe(Collections.singletonList("my-first-topic"));

        // 4. Poll for new data
        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                log.info("Key: " + record.key() + ", Value: " + record.value());
                log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
            }
        }
    }
}
Dissecting the Code:
- Create Consumer Properties:
  - BOOTSTRAP_SERVERS_CONFIG: Same as the producer, this points to our Kafka broker.
  - KEY_DESERIALIZER_CLASS_CONFIG & VALUE_DESERIALIZER_CLASS_CONFIG: The inverse of the producer's serializers. The consumer receives raw bytes from Kafka and needs to know how to turn them back into a Java String, so we use the StringDeserializer.
  - GROUP_ID_CONFIG: A unique string that identifies the consumer group this consumer belongs to. It's a required property.
  - AUTO_OFFSET_RESET_CONFIG: Tells the consumer where to start when it has no saved position (offset). earliest means it reads from the very beginning of the topic; latest means it only reads new messages sent after it starts. Note that this setting only applies when the group has no committed offset yet; on later restarts, the consumer resumes from its last committed position.
- Create the Consumer: We instantiate a KafkaConsumer with our configured properties.
- Subscribe to Topic(s): The subscribe() method is how we tell Kafka what we're interested in. Here, we subscribe to our my-first-topic. You can pass a list of multiple topic names here.
- The Poll Loop: This is the heart of the consumer.
  - Consumers are not pushed messages; they actively ask, or "poll", Kafka for new ones. consumer.poll(Duration.ofMillis(100)) asks Kafka if there are any new records, waiting up to 100 milliseconds for them to arrive. If there are none, it returns an empty collection.
  - The while (true) loop keeps our consumer running and ready to process new messages as they arrive (see the shutdown sketch after this list for how to exit the loop cleanly).
  - When records are fetched, we iterate through them and can access their key, value, partition, and offset.
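One detail the example glosses over: the while (true) loop never exits, so consumer.close() is never called and the consumer can't leave its group cleanly. Below is a minimal, optional sketch of one common shutdown pattern built on consumer.wakeup(), which causes a blocked poll() to throw a WakeupException (import org.apache.kafka.common.errors.WakeupException). It replaces steps 3 and 4 of the example above; treat it as a sketch of the idea rather than production-ready code.

// Register a JVM shutdown hook (triggered by Ctrl+C or a normal kill) that
// interrupts the poll loop from another thread.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    log.info("Shutdown detected, calling consumer.wakeup()...");
    consumer.wakeup(); // any blocked poll() will now throw WakeupException
    try {
        mainThread.join(); // wait for the poll loop to finish before the JVM exits
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    consumer.subscribe(Collections.singletonList("my-first-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            log.info("Key: " + record.key() + ", Value: " + record.value());
        }
    }
} catch (WakeupException e) {
    // Expected during shutdown; nothing to handle here.
} finally {
    consumer.close(); // commits offsets and leaves the group cleanly
}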
Seeing it in Action
- First, run the SimpleConsumer application's main method. You'll see it start up, but nothing will be printed to the console yet. It's now polling, waiting for messages.
- Now, go back to the code from Part 1 and run the SimpleProducer application.
- As soon as the producer sends the message, switch back to the console for your SimpleConsumer. You should see the message details printed out instantly!
INFO: Key: null, Value: hello, kafka!
INFO: Partition: 0, Offset: 0
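(A quick tip: if you don't want to re-run the Java producer, you can also send test messages from a terminal with the console producer script that ships with Kafka. Assuming a local installation, run bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-first-topic from your Kafka directory; each line you type is sent as a message and should show up in the consumer's log.)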
Congratulations! You have now built a complete, end-to-end event streaming pipeline. A producer sent a message, Kafka stored it durably, and a consumer read and processed it.
In the next part of our series, we'll dive deeper into producers: how to handle send confirmations using callbacks, and the important role that message keys play in partitioning. Stay tuned!