Kafka Consumer Processing Workflow and Thread Safety
This article covers the core aspects of Kafka's consumer processing workflow, focusing on thread safety, group coordination, and partition rebalancing.
Thread Safety Considerations
Thread safety ensures that a class behaves correctly when accessed by multiple threads concurrently, without requiring additional synchronization in the calling code.
Producer Thread Safety
KafkaProducer is designed to be thread-safe. Its mutable internal state, such as the record accumulator, is guarded by the producer's own internal synchronization, so callers need no external locking. This design allows a single KafkaProducer instance to be shared safely across multiple threads, and sharing one instance is the recommended pattern, since it pools connections and batches records more efficiently than one producer per thread.
Example of a multi-threaded producer implementation:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.*;

public class MultiThreadedProducer {
    private static final int MESSAGE_COUNT = 1000;
    private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);

    private static class ProducerTask implements Runnable {
        private final ProducerRecord<String, String> message;
        private final KafkaProducer<String, String> producer;

        public ProducerTask(ProducerRecord<String, String> message, KafkaProducer<String, String> producer) {
            this.message = message;
            this.producer = producer;
        }

        @Override
        public void run() {
            String threadId = Thread.currentThread().getId() + "-" + System.identityHashCode(producer);
            try {
                producer.send(message, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                    if (metadata != null) {
                        System.out.println(threadId + " | Offset: " + metadata.offset() + ", Partition: " + metadata.partition());
                    }
                });
                System.out.println(threadId + ": Sent message [" + message + "]");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Count down even on failure so main() cannot block forever on await()
                latch.countDown();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // A single producer instance is shared by every task in the pool
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key" + i, "value" + i);
                executor.submit(new ProducerTask(record, producer));
            }
            latch.await();
        } finally {
            producer.close();
            executor.shutdown();
        }
    }
}
Consumer Thread Safety
KafkaConsumer is not thread-safe: it detects concurrent access from multiple threads and rejects it with a ConcurrentModificationException. The recommended approach for multi-threaded consumption is thread confinement, where each thread instantiates and exclusively uses its own KafkaConsumer object.
Example of a multi-threaded consumer implementation:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;

public class MultiThreadedConsumer {
    private static final int THREAD_COUNT = 2;
    private static final ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

    private static class ConsumerTask implements Runnable {
        // Thread confinement: each task owns its consumer; no other thread touches it
        private final KafkaConsumer<String, String> consumer;

        public ConsumerTask(Properties config, String topic) {
            this.consumer = new KafkaConsumer<>(config);
            consumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            String threadName = Thread.currentThread().getName();
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(threadName + " | Topic: " + record.topic() + ", Partition: " + record.partition() + ", Offset: " + record.offset() + ", Key: " + record.key() + ", Value: " + record.value());
                    }
                }
            } catch (WakeupException e) {
                // Expected when another thread calls consumer.wakeup() to shut this task down
            } finally {
                consumer.close();
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.submit(new ConsumerTask(props, "test-topic"));
        }
    }
}
Group Coordination
When a consumer joins a group, it sends a JoinGroup request to the group coordinator. The first consumer to join becomes the group leader, responsible for assigning partitions to all members. The leader sends the assignment to the coordinator, which then distributes it to each consumer. This process occurs during group changes, such as new consumers joining or existing ones leaving, or when partitions are added to a topic.
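The strategy the leader runs to compute the assignment is a client-side setting. As a minimal sketch (the strategy choice here is illustrative, not prescriptive), the assignor can be selected via partition.assignment.strategy; the Java client ships RangeAssignor (the classic default), RoundRobinAssignor, StickyAssignor, and CooperativeStickyAssignor:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RangeAssignor;

public class AssignmentStrategyConfig {
    // Selects the assignor that the leader consumer executes after a JoinGroup
    // round; every member must list compatible strategies for the group to form.
    public static Properties strategyConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
        return props;
    }
}
```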
Group Coordinator
The group coordinator is a server-side component in Kafka that manages consumer groups. Key responsibilities include:
- Electing a leader consumer.
- Processing join requests.
- Synchronizing partition assignments after rebalancing.
- Maintaining heartbeat detection with consumers.
- Storing consumed offsets in the __consumer_offsets topic.
There are multiple group coordinators, each corresponding to a partition of the __consumer_offsets topic. By default, this topic has 50 partitions, and a consumer group is assigned to one based on the hash of its group.id.
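The mapping can be sketched as follows; this mirrors the formula Kafka uses (a non-negative hash of group.id modulo the partition count), where the bit mask plays the role of Kafka's Utils.abs, which unlike Math.abs also handles Integer.MIN_VALUE:

```java
public class CoordinatorPartition {
    // Maps a group.id to its partition of __consumer_offsets. The broker that
    // leads this partition acts as the group's coordinator.
    static int coordinatorPartition(String groupId, int partitionCount) {
        // & 0x7fffffff clears the sign bit, giving a non-negative hash
        return (groupId.hashCode() & 0x7fffffff) % partitionCount;
    }

    public static void main(String[] args) {
        // 50 is the default partition count of __consumer_offsets
        System.out.println("test-group -> partition " + coordinatorPartition("test-group", 50));
    }
}
```

Because the mapping is deterministic, every member of a group, and every offset commit it makes, lands on the same coordinator.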
Consumer Coordinator
Each consumer client has a consumer coordinator that interacts with the group coordinator. Its functions include:
- Sending join and sync requests.
- Handling leave requests.
- Maintaining a heartbeat thread.
- Submitting offset commits.
Consumer Join Process
- On startup or reconnection, a consumer sends a JoinGroup request.
- After joining, it receives a callback and sends a SyncGroup request to get the partition assignment.
- On shutdown or failure, a LeaveGroup request is triggered, or the group coordinator may remove the consumer based on heartbeat failures.
- Once joined, the consumer maintains a heartbeat to stay connected.
Offset Storage
Consumers commit offsets to the __consumer_offsets topic, which Kafka manages internally. Each message in this topic uses a key-value format, where the key is a combination of group.id, topic, and partition number, and the value is the offset.
To view consumer group metadata, use the kafka-consumer-groups script:
kafka-consumer-groups.bat --bootstrap-server 127.0.0.1:9092 --group test-group --describe
Partition Rebalancing
Rebalancing occurs when consumers join or leave a group, or when partitions are added to a topic. It ensures high availability and scalability but can cause temporary unavailability as consumers reassign partitions.
Consumers maintain membership through heartbeats to the group coordinator. If a consumer fails to send heartbeats within session.timeout.ms, it is considered dead, triggering a rebalance. Separately, a consumer that does not call poll() within max.poll.interval.ms is also considered failed and leaves the group, which likewise triggers a rebalance.
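As a minimal sketch of how these timeouts relate (the values below are illustrative, and defaults differ across Kafka versions):

```java
import java.util.Properties;

public class RebalanceTimeouts {
    public static Properties timeoutConfig() {
        Properties props = new Properties();
        // Liveness: the coordinator evicts a consumer that sends no heartbeat
        // within session.timeout.ms, triggering a rebalance.
        props.put("session.timeout.ms", "45000");
        // Heartbeats are sent from a background thread every
        // heartbeat.interval.ms; keep this well below the session timeout
        // (one third is a common rule of thumb).
        props.put("heartbeat.interval.ms", "15000");
        // Processing liveness: a consumer that does not call poll() within
        // max.poll.interval.ms is also removed from the group.
        props.put("max.poll.interval.ms", "300000");
        return props;
    }
}
```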
Offset Commit Issues
Consumers track their position in partitions using offsets, committing them to __consumer_offsets. During rebalancing, consumers read the last committed offset to resume processing. Problems can arise if:
- Committed offsets are lower than the last processed offset, causing duplicate message processing.
- Committed offsets are higher, leading to message loss.
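A common way to rule out the second problem (loss) is to disable auto-commit and commit only after a batch has been fully processed, accepting possible duplicates instead. Here is a sketch of one iteration of such a loop; the process step is a hypothetical placeholder, and enable.auto.commit=false is assumed in the consumer configuration:

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.time.Duration;

public class AtLeastOnceLoop {
    // Process first, commit after: a crash between processing and commitSync()
    // replays the batch (duplicates) rather than losing it.
    public static int pollOnce(Consumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            // Hypothetical per-record processing step
            System.out.println(record.partition() + "-" + record.offset() + ": " + record.value());
        }
        if (!records.isEmpty()) {
            // Committed offset = last processed offset + 1
            consumer.commitSync();
        }
        return records.count();
    }
}
```

A real application calls pollOnce(consumer) inside its poll loop; committing before processing would invert the trade-off and risk loss instead of duplicates.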
Rebalance Listener Implementation
To handle rebalancing events, implement a ConsumerRebalanceListener when subscribing to topics. It provides two methods:
- onPartitionsRevoked: Called before rebalancing starts, allowing offset commits.
- onPartitionsAssigned: Called after rebalancing completes, before message consumption begins.
Example usage with a 3-partition topic:
consumer.subscribe(Collections.singletonList("rebalance-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit offsets for the partitions about to be revoked, so the next
        // owner resumes from the last processed position
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Start consumption after assignment; seek() here if offsets are
        // stored outside Kafka
    }
});
Rebalancing triggers these methods to manage partition ownership transitions effectively.