Hello Kafka 04 - Scaling with Consumer Groups & Rebalancing

So far in our journey, we’ve built a producer that can guarantee message ordering with keys and confirm delivery with callbacks. We also have a simple consumer that reads messages from a topic. But what happens when the rate of incoming messages becomes too high for a single consumer to handle?

This is where Kafka’s architecture truly shines. The solution isn’t to vertically scale the consumer (i.e., give it a more powerful machine), but to scale horizontally by adding more consumers. This is achieved through the powerful concept of Consumer Groups.

Consumer Groups: The Key to Parallelism

We briefly introduced the group.id property in Part 2. Let’s now explore its profound implications.

When multiple consumer instances are started with the same group.id, they join the same consumer group and coordinate to process messages from a topic. Kafka automatically distributes the topic’s partitions among the active consumers in that group.

Here’s the golden rule: Within a consumer group, a single partition can only be assigned to one consumer at a time.

Let’s visualize this with the 3-partition topic we created in the last article:

  • Scenario 1: 1 Consumer, 3 Partitions
    • The single consumer is assigned all three partitions and reads messages from all of them. Note that it does not drain one partition before moving to the next; each poll can return a batch containing records from any of its assigned partitions.
  • Scenario 2: 3 Consumers, 3 Partitions
    • This is the ideal state for parallelism. Kafka assigns one partition to each consumer. Consumer A gets Partition 0, Consumer B gets Partition 1, and Consumer C gets Partition 2. The group’s processing throughput is now roughly three times that of a single consumer.
  • Scenario 3: 4 Consumers, 3 Partitions
    • Three of the consumers will each be assigned a partition. The fourth consumer will sit idle, receiving no messages (though it serves as a hot standby, ready to take over if another instance fails). A group can never have more consumers doing useful work than the topic has partitions, which makes partition count a critical factor when designing your topics. If you want to check a topic’s partition count from code, see the sketch after this list.
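
As a quick aside, the consumer API itself can tell you how many partitions a topic has, which is handy when deciding how many instances to run. A minimal sketch, assuming the same properties object used by the SimpleConsumer shown later in this article:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;

// A minimal sketch: query the partition count for a topic.
// Assumes `properties` is configured exactly as in SimpleConsumer below.
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
    List<PartitionInfo> partitions = consumer.partitionsFor("my-first-topic");
    System.out.println("my-first-topic has " + partitions.size() + " partitions");
}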

The Magic of a Rebalance

This automatic assignment of partitions is managed through a process called a rebalance. A rebalance is triggered whenever the group’s membership, or the metadata of the topics it subscribes to, changes:

  1. A new consumer instance joins the group.
  2. An existing consumer leaves the group (either by shutting down cleanly or by crashing, in which case the broker detects its missed heartbeats).
  3. The topic’s metadata changes (e.g., more partitions are added).

When a rebalance occurs, Kafka briefly pauses all consumers in the group, revokes their current partition assignments, and re-assigns partitions across the new set of active consumers. (Strictly speaking, this stop-the-world pause is how the classic, eager rebalance protocol works; newer clients can opt into cooperative rebalancing, which only pauses the partitions that actually move.) This is what makes a Kafka consumer group elastic and fault-tolerant. If one instance crashes, its assigned partitions are automatically handed over to the remaining healthy instances.
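
You can also observe these assignment changes from inside your own code by registering a ConsumerRebalanceListener when subscribing. The sketch below is illustrative, not required for the experiment that follows; it simply logs which partitions are taken away and handed out:

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;

// A minimal sketch: log partition movement during a rebalance.
// Assumes `consumer` is the KafkaConsumer<String, String> from SimpleConsumer below;
// this call replaces the plain consumer.subscribe(...) shown there.
consumer.subscribe(Collections.singletonList("my-first-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Invoked before the rebalance takes partitions away from this instance;
        // a natural place to commit offsets for work already completed.
        System.out.println("Revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Invoked after the rebalance hands this instance its (possibly new) partitions.
        System.out.println("Assigned: " + partitions);
    }
});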

Seeing It in Action

Let’s demonstrate this. We’ll use the same SimpleConsumer code from Part 2. The key is that we will run multiple instances of it, all configured with the same group.id.

// From Part 2 - No changes needed!
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {

    private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        // This is the crucial part!
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-java-application"); 
        
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("my-first-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                log.info("Consumed record: Key: " + record.key() + ", Value: " + record.value() +
                         ", Partition: " + record.partition());
            }
        }
    }
}

Experiment:

  1. Start one consumer: Open a terminal or use your IDE to run the SimpleConsumer application.
  2. Start the producer: Run the ProducerWithKeysAndCallback application from Part 3. You will see the single consumer processing messages from all three partitions.
  3. Start a second consumer: While the first one is still running, open another terminal or IDE instance and start SimpleConsumer again. You will see log messages indicating a rebalance has occurred. Now, messages from some partitions will appear in the first consumer’s log, and messages from the other partition(s) will appear in the second’s.
  4. Start a third consumer: Repeat the process once more. After another rebalance, you will likely see each consumer “owning” and processing messages from exactly one partition. (You can confirm the assignment with the CLI command described after this list.)
  5. Stop one consumer: Use Ctrl+C to stop one of the consumer instances. You’ll see another rebalance, and the partitions that were owned by the stopped consumer will be re-assigned to the two remaining consumers.
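
While the experiment is running, you can also inspect the group’s state from another terminal with the kafka-consumer-groups tool that ships with Kafka. A command along the lines of kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-java-application prints each partition together with the consumer currently assigned to it, plus the committed offset and lag. Run it before and after starting or stopping an instance to watch the assignment change.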

Keeping Track: Offsets and Commits

How does a consumer group know which messages it has already processed, especially after a rebalance?

Kafka doesn’t track this on a per-message basis. Instead, each consumer group tracks its progress for each partition by storing an offset: a number that indicates the position of the next message to be read. For example, if the group’s committed offset for Partition 0 is 42, the next record it will read from that partition is the one at offset 42.

By default (the configuration in our SimpleConsumer), the consumer automatically “commits” this offset back to Kafka periodically in the background. This is controlled by the enable.auto.commit property, which defaults to true. While simple, this approach has risks: if the consumer crashes after fetching messages but before processing them, the offset for those messages may already have been auto-committed, so they will never be processed and are effectively lost.
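
For reference, these are the two consumer settings that govern this behavior; the values shown are the client defaults, so our SimpleConsumer gets them without setting anything:

// Auto-commit is on by default; offsets are committed in the background
// every auto.commit.interval.ms milliseconds (5 seconds by default).
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");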

Managing offsets correctly is vital for building reliable systems. The default auto-commit is convenient for getting started, but for production systems, you often need more explicit control.

In the final article of this series, we will tackle these advanced but crucial topics. We’ll explore different delivery semantics (at-least-once, at-most-once, and exactly-once processing) and demonstrate how to take manual control of offset commits to build a truly robust Kafka consumer.
