In previous posts, we explored broker request processing, storage layer architecture, and the replication protocol. Today, we dive into how Kafka manages consumer group membership, partition assignment, and offset commits.
Overview
Kafka’s consumer group protocol enables distributed consumption where multiple consumers coordinate to consume partitions in parallel. The protocol provides:
- Automatic partition assignment: Partitions distributed among active consumers
- Rebalancing: Dynamic membership changes trigger reassignment
- Exactly-once semantics: Coordinated offset commits prevent duplicate/lost messages
- Failure detection: Heartbeat mechanism detects crashed consumers
Key Guarantees
- Each partition consumed by exactly one consumer in a group (exclusive assignment)
- At-least-once delivery by default (with proper offset management)
- Exactly-once semantics when using transactional reads (`isolation.level=read_committed`)
- Automatic failover when consumers join/leave
Core Components
Architecture Overview
┌─────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Group Coordinator (Broker 1) │ │
│ │ • Manages consumer group "my-group" │ │
│ │ • Stores state in __consumer_offsets │ │
│ │ • Handles rebalancing │ │
│ │ • Tracks heartbeats │ │
│ └──────────────────────────────────────────────────┘ │
│ ↕ JoinGroup / SyncGroup / Heartbeat │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Partition: __consumer_offsets-X │ │
│ │ Key: (group.id, topic, partition) │ │
│ │ Value: (offset, metadata, timestamp) │ │
│ └──────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
↕ ↕ ↕
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ Consumer 1 │ │ Consumer 2 │ │ Consumer 3 │
│ Group: my-group │ │ Group: my-group │ │ Group: my-group │
│ Assigned: [0,1] │ │ Assigned: [2,3] │ │ Assigned: [4,5] │
│ Generation: 5 │ │ Generation: 5 │ │ Generation: 5 │
└──────────────────┘ └──────────────────┘ └──────────────────┘
Core Classes
Component | Location | Purpose |
---|---|---|
GroupCoordinatorService | coordinator/group/GroupCoordinatorService.java | Manages consumer group lifecycle |
GroupMetadataManager | coordinator/group/GroupMetadataManager.java | Persists group state to __consumer_offsets |
KafkaConsumer | clients/src/…/consumer/KafkaConsumer.java | Client-side consumer implementation |
ConsumerCoordinator | clients/src/…/consumer/ConsumerCoordinator.java | Client-side coordination protocol |
PartitionAssignor | clients/src/…/consumer/PartitionAssignor.java | Assignment strategy interface |
Consumer Group Lifecycle
States and Transitions
A consumer group progresses through these states (showing the happy path - additional transitions exist for error cases and group deletion):
Empty
│
↓ First consumer joins
PreparingRebalance ←─────────┐
│ │
↓ All members joined │
CompletingRebalance │
│ │
↓ Leader sends │
Stable ──────────────────┘
Consumer leaves/crashes
Additional transitions (not shown above):
- PreparingRebalance → Empty: All members leave during rebalance
- CompletingRebalance → PreparingRebalance: Member joins/leaves while waiting for assignment
- Any state → Dead: Group deletion (partition emigration or admin operation)
Group Coordinator Selection
Each consumer group is managed by exactly one coordinator, determined by:
// From GroupCoordinatorService.java:394
public int partitionFor(String groupId) {
return Utils.abs(groupId.hashCode()) % numPartitions;
}
Where `numPartitions` is the number of partitions in the `__consumer_offsets` topic.
The coordinator is the leader of the `__consumer_offsets` partition for that group.
Example:
groupId = "my-app"
groupId.hashCode = 1234567
__consumer_offsets partitions = 50
coordinator partition = abs(1234567) % 50 = 17
coordinator broker = leader of __consumer_offsets-17
This ensures:
- Deterministic coordinator selection
- Load distribution across brokers
- Automatic failover (if coordinator broker fails, partition leader election picks new coordinator)
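Outside the broker, you can reproduce this mapping to predict which `__consumer_offsets` partition (and therefore which broker) will coordinate a group. The following is a minimal illustrative sketch, not Kafka source; the class and method names are mine, and the abs guard only approximates Kafka's `Utils.abs`:
public final class CoordinatorPartitionCalculator {

    // Guard against Integer.MIN_VALUE, for which Math.abs stays negative
    // (an assumption about how Utils.abs behaves, kept here for safety).
    private static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // Same hash-mod scheme as partitionFor(groupId) above.
    static int coordinatorPartitionFor(String groupId, int offsetsTopicPartitions) {
        return abs(groupId.hashCode()) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        // offsets.topic.num.partitions defaults to 50
        System.out.println(coordinatorPartitionFor("my-app", 50)); // prints the coordinator partition
    }
}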
Rebalancing Protocol
The Join-Sync Dance
Rebalancing follows a two-phase protocol:
Phase 1: JoinGroup (Member Discovery)
┌──────────────┐ JoinGroup(group.id, member.id="") ┌──────────────────┐
│ Consumer 1 │ ──────────────────────────────────→ │ │
└──────────────┘ │ │
│ Coordinator │
┌──────────────┐ JoinGroup(group.id, member.id="") │ │
│ Consumer 2 │ ──────────────────────────────────→ │ State: │
└──────────────┘ │ PreparingReb... │
│ │
┌──────────────┐ JoinGroup(group.id, member.id="") │ Waiting for │
│ Consumer 3 │ ──────────────────────────────────→ │ session.timeout │
└──────────────┘ │ or all members │
└──────────────────┘
↓ Timeout or all members joined
┌──────────────┐ ← JoinResponse(generation=5, leader=C1, members=[C1,C2,C3])
│ Consumer 1 │
│ (Leader) │
└──────────────┘
┌──────────────┐ ← JoinResponse(generation=5, leader=C1, members=[])
│ Consumer 2 │
│ (Follower) │
└──────────────┘
┌──────────────┐ ← JoinResponse(generation=5, leader=C1, members=[])
│ Consumer 3 │
│ (Follower) │
└──────────────┘
From GroupMetadataManager.java:6142:
public CoordinatorResult<Void, CoordinatorRecord> classicGroupJoin(
AuthorizableRequestContext context,
JoinGroupRequestData request,
CompletableFuture<JoinGroupResponseData> responseFuture
) {
Group group = groups.get(request.groupId(), Long.MAX_VALUE);
if (group != null) {
if (group.type() == CONSUMER && !group.isEmpty()) {
return classicGroupJoinToConsumerGroup((ConsumerGroup) group, context, request, responseFuture);
} else if (group.type() == CONSUMER || group.type() == CLASSIC || group.type() == STREAMS && group.isEmpty()) {
return classicGroupJoinToClassicGroup(context, request, responseFuture);
} else {
responseFuture.complete(new JoinGroupResponseData()
.setMemberId(UNKNOWN_MEMBER_ID)
.setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code())
);
return EMPTY_RESULT;
}
} else {
return classicGroupJoinToClassicGroup(context, request, responseFuture);
}
}
Key points:
- First joiner becomes the leader (selects assignment strategy)
- Leader receives full member list and metadata
- Followers receive only generation ID
- Coordinator assigns a unique `member.id` to each consumer
Phase 2: SyncGroup (Assignment Distribution)
┌──────────────┐
│ Consumer 1 │ Computes assignment using RangeAssignor/RoundRobinAssignor
│ (Leader) │ assignment = {C1: [p0,p1], C2: [p2,p3], C3: [p4,p5]}
└──────────────┘
│
↓ SyncGroup(generation=5, assignment={...})
┌──────────────────┐
│ Coordinator │ Stores assignment
└──────────────────┘
↓
↓ SyncResponse(assignment=[p0,p1]) ┌──────────────┐
└────────────────────────────────→ │ Consumer 1 │
└──────────────┘
↓ SyncResponse(assignment=[p2,p3]) ┌──────────────┐
└────────────────────────────────→ │ Consumer 2 │
└──────────────┘
↓ SyncResponse(assignment=[p4,p5]) ┌──────────────┐
└────────────────────────────────→ │ Consumer 3 │
└──────────────┘
From GroupMetadataManager.java:7381:
public CoordinatorResult<Void, CoordinatorRecord> classicGroupSync(
AuthorizableRequestContext context,
SyncGroupRequestData request,
CompletableFuture<SyncGroupResponseData> responseFuture
) {
Group group;
try {
group = group(request.groupId());
} catch (GroupIdNotFoundException e) {
responseFuture.complete(new SyncGroupResponseData()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
return EMPTY_RESULT;
}
if (group.type() == CLASSIC) {
return classicGroupSyncToClassicGroup((ClassicGroup) group, context, request, responseFuture);
} else if (group.type() == CONSUMER) {
return classicGroupSyncToConsumerGroup((ConsumerGroup) group, request, responseFuture);
} else {
responseFuture.complete(new SyncGroupResponseData()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
return EMPTY_RESULT;
}
}
Rebalancing Triggers
Rebalancing is triggered when:
- Consumer joins (new member or existing member rejoins after crash)
- Consumer leaves (graceful shutdown via `LeaveGroup`)
- Consumer crashes (detected via heartbeat timeout)
- Subscription changes (consumer changes topic subscriptions)
- Partition count changes (new partitions added to subscribed topics)
From GroupMetadataManager.java:6946:
CoordinatorResult<Void, CoordinatorRecord> prepareRebalance(
ClassicGroup group,
String reason
) {
// If any members are awaiting sync, cancel their request and have them rejoin.
if (group.isInState(COMPLETING_REBALANCE)) {
resetAndPropagateAssignmentWithError(group, Errors.REBALANCE_IN_PROGRESS);
}
// If a sync expiration is pending, cancel it.
removeSyncExpiration(group);
group.transitionTo(PREPARING_REBALANCE);
log.info("Preparing to rebalance group {} in state {} with old generation {} (reason: {}).",
group.groupId(),
group.currentState(),
group.generationId(),
reason);
return EMPTY_RESULT;
}
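On the client side, a rebalance shows up through the ConsumerRebalanceListener callbacks: partitions are revoked when the rebalance starts and assigned once the SyncGroup response is applied. A minimal sketch in the style of the consumer loops later in this post, assuming an existing `consumer`, a `log` instance, and a topic named "orders":
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before ownership is given up (all partitions with the eager
        // protocol, only the reassigned subset with the cooperative protocol):
        // commit progress so the next owner starts from a clean offset.
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called once the assignment from the SyncGroup response has been applied.
        log.info("Assigned partitions: {}", partitions);
    }
});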
Generation ID and Fencing
Each rebalance increments the generation ID, which serves as an epoch to fence stale consumers:
// From ClassicGroup.java:1289
public void initNextGeneration() {
generationId++;
if (!members.isEmpty()) {
setProtocolName(Optional.of(selectProtocol()));
subscribedTopics = computeSubscribedTopics();
transitionTo(COMPLETING_REBALANCE);
} else {
setProtocolName(Optional.empty());
subscribedTopics = computeSubscribedTopics();
transitionTo(EMPTY);
}
clearPendingSyncMembers();
}
Fencing Example:
1. Consumer C1 assigned partition 0 in generation 5
2. Network partition isolates C1
3. Coordinator triggers rebalance → generation 6
4. Consumer C2 assigned partition 0 in generation 6
5. C1 reconnects and tries to commit offset for partition 0
→ Rejected with ILLEGAL_GENERATION error
→ C1 must rejoin and get new assignment
This prevents zombie consumers from committing offsets for partitions they no longer own.
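From the application's point of view, this fencing typically surfaces as a CommitFailedException from commitSync() once the group has moved on to a newer generation. A minimal handling sketch, assuming a batch-commit loop like the one shown later and at-least-once processing (the affected records may be redelivered):
try {
    // After processing the current batch:
    consumer.commitSync();
} catch (CommitFailedException e) {
    // The group already advanced to a newer generation (e.g. this consumer was
    // fenced after exceeding max.poll.interval.ms). Don't retry the commit; the
    // next poll() rejoins the group and fetches a fresh assignment. The records
    // from this batch may be redelivered to their new owner.
    log.warn("Commit fenced by a newer generation; will rejoin on next poll()", e);
}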
Partition Assignment Strategies
Built-in Strategies
Kafka provides three built-in assignment strategies:
1. Range Assignor
Algorithm: Assign partitions per topic in sequential ranges.
From RangeAssignor.java:148:
// Simplified algorithm (actual implementation includes rack-aware assignment):
// 1. For each topic independently:
// - Sort consumers lexicographically
// - Sort partitions numerically
// - Divide partitions evenly: numPartitions / numConsumers
// - First (numPartitions % numConsumers) consumers get one extra partition
public Map<String, List<TopicPartition>> assign(
Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
}
Example:
Topic: topic-A (10 partitions), topic-B (10 partitions)
Consumers: C1, C2, C3
Assignment:
C1: topic-A[0,1,2,3], topic-B[0,1,2,3] (8 partitions)
C2: topic-A[4,5,6], topic-B[4,5,6] (6 partitions)
C3: topic-A[7,8,9], topic-B[7,8,9] (6 partitions)
Issue: Can lead to imbalanced assignment when consuming multiple topics.
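To see where the skew comes from, here is a toy sketch of the per-topic quota math (illustrative only, not the actual RangeAssignor code):
// Toy per-topic range split (not Kafka source): the first
// (numPartitions % numConsumers) consumers get one extra partition,
// and because the split is repeated per topic, the same consumers
// collect the extras every time.
static int partitionCountFor(int consumerIndex, int numPartitions, int numConsumers) {
    int quota = numPartitions / numConsumers;      // 10 / 3 = 3
    int remainder = numPartitions % numConsumers;  // 10 % 3 = 1
    return quota + (consumerIndex < remainder ? 1 : 0);
}
// topic-A: C1 → 4, C2 → 3, C3 → 3; topic-B repeats the split, so C1 ends up with 8 total.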
2. RoundRobin Assignor
Algorithm: Distribute partitions across all topics in a round-robin fashion.
From RoundRobinAssignor.java:104:
// Simplified algorithm:
// 1. Sort all partitions across all topics alphabetically
// 2. Sort all consumers alphabetically
// 3. Use circular iterator to assign partitions round-robin to consumers
// (skipping consumers not subscribed to that partition's topic)
public Map<String, List<TopicPartition>> assign(
Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
CircularIterator<MemberInfo> assigner = new CircularIterator<>(Utils.sorted(memberInfoList));
for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
// Skip to next consumer subscribed to this topic
while (!subscriptions.get(assigner.peek().memberId).topics().contains(partition.topic()))
assigner.next();
assignment.get(assigner.next().memberId).add(partition);
}
return assignment;
}
Example:
Topic: topic-A (5 partitions), topic-B (5 partitions)
Consumers: C1, C2, C3
Sorted partitions: [A-0, A-1, A-2, A-3, A-4, B-0, B-1, B-2, B-3, B-4]
Assignment (round-robin):
C1: A-0, A-3, B-1, B-4 (4 partitions)
C2: A-1, A-4, B-2 (3 partitions)
C3: A-2, B-0, B-3 (3 partitions)
Better balance across consumers, but can cause more partition movement during rebalancing.
3. Sticky Assignor
Algorithm: Minimize partition movement during rebalancing while maintaining balance.
Goals:
- Balanced assignment (like RoundRobin)
- Minimize partition reassignment when members join/leave (preserve existing assignments where possible)
From AbstractStickyAssignor.java:118 (parent class of StickyAssignor):
// Simplified algorithm:
// 1. Extract previous partition assignments from member metadata (owned partitions)
// 2. If all consumers subscribe to same topics:
// - Use ConstrainedAssignmentBuilder (optimized algorithm)
// - Preserves existing assignments where possible
// 3. Otherwise:
// - Use GeneralAssignmentBuilder
// - Compute sticky assignment from scratch
// 4. Balance unassigned partitions across consumers while minimizing movement
public Map<String, List<TopicPartition>> assign(
Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
AbstractAssignmentBuilder assignmentBuilder;
if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, /*...*/)) {
assignmentBuilder = new ConstrainedAssignmentBuilder(/* preserve assignments */);
} else {
assignmentBuilder = new GeneralAssignmentBuilder(/* recompute */);
}
return assignmentBuilder.build();
}
Example (rebalance scenario):
Initial state (3 consumers):
C1: [A-0, A-1, B-0]
C2: [A-2, A-3, B-1]
C3: [A-4, B-2, B-3]
C2 leaves → triggers rebalance
RoundRobin would produce:
C1: [A-0, A-2, A-4, B-1, B-3] ← 4 new partitions (A-2, A-4, B-1, B-3)
C3: [A-1, A-3, B-0, B-2] ← 3 new partitions (A-1, A-3, B-0)
StickyAssignor produces:
C1: [A-0, A-1, B-0, A-2] ← 1 new partition (A-2)
C3: [A-4, B-2, B-3, A-3, B-1] ← 2 new partitions (A-3, B-1)
Partition movement: 3 instead of 7
Benefits:
- Reduces consumer-side state rebuilding (caches, offset windows, etc.)
- Lowers rebalancing latency
- Maintains balance across consumers
Cooperative Rebalancing (Incremental)
Problem with Eager Rebalancing:
- During rebalance, all consumers stop consuming
- Causes consumption lag spike
- Entire partition set reassigned, even if only one consumer joins/leaves
Solution: Cooperative (Incremental) Rebalancing (KIP-429)
Key idea: Only revoke partitions that need to be reassigned.
Traditional Eager Rebalancing:
C1: [p0, p1, p2] → revoke all → stop consuming
C2: [p3, p4, p5] → revoke all → stop consuming
C3 joins
↓ rebalance
C1: [p0, p1] ← resume consuming
C2: [p2, p3] ← resume consuming
C3: [p4, p5] ← resume consuming
Total consumption pause: ~2-5 seconds for all partitions
Cooperative Rebalancing:
C1: [p0, p1, p2] → continue consuming p0, p1
C2: [p3, p4, p5] → continue consuming p3, p4
C3 joins
↓ rebalance (incremental)
C1 revokes p2 → C3 gets p2
C2 revokes p5 → C3 gets p5
Total consumption pause: ~500ms for only reassigned partitions (p2, p5)
From ConsumerCoordinator.java:824:
switch (protocol) {
case EAGER:
// revoke all partitions
revokedPartitions.addAll(subscriptions.assignedPartitions());
exception = rebalanceListenerInvoker.invokePartitionsRevoked(revokedPartitions);
subscriptions.assignFromSubscribed(Collections.emptySet());
break;
case COOPERATIVE:
// only revoke those partitions that are not in the subscription anymore.
Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
revokedPartitions.addAll(ownedPartitions.stream()
.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
.collect(Collectors.toSet()));
if (!revokedPartitions.isEmpty()) {
exception = rebalanceListenerInvoker.invokePartitionsRevoked(revokedPartitions);
ownedPartitions.removeAll(revokedPartitions);
subscriptions.assignFromSubscribed(ownedPartitions);
}
break;
}
Trade-off:
- Eager: Simple, all consumers sync at once (stop-the-world)
- Cooperative: Complex (multi-round rebalancing), but minimal consumption disruption
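On the classic protocol, the rebalancing mode is selected through the assignors listed in `partition.assignment.strategy`. A minimal config sketch (the class names are the real assignor classes; broker address and group id are placeholders):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");

// Eager rebalancing with sticky assignment:
// props.put("partition.assignment.strategy",
//     "org.apache.kafka.clients.consumer.StickyAssignor");

// Cooperative (incremental) rebalancing, KIP-429:
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");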
Offset Management
Offset Commit Protocol
Offsets are stored in the `__consumer_offsets` topic, a compacted log where:
- Key: `(group.id, topic, partition)`
- Value: `(offset, metadata, commit_timestamp, expire_timestamp)`
__consumer_offsets-17:
Key: ("my-app", "orders", 0) → Value: (offset=12345, commit_ts=1697890000)
Key: ("my-app", "orders", 1) → Value: (offset=67890, commit_ts=1697890001)
Key: ("my-app", "orders", 2) → Value: (offset=11111, commit_ts=1697890002)
From OffsetMetadataManager.java:600:
public CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> commitOffset(
AuthorizableRequestContext context,
OffsetCommitRequestData request
) throws ApiException {
Group group = validateOffsetCommit(context, request);
// In the old consumer group protocol, the offset commits maintain the session if
// the group is in Stable or PreparingRebalance state.
if (group.type() == Group.GroupType.CLASSIC) {
ClassicGroup classicGroup = (ClassicGroup) group;
if (classicGroup.isInState(STABLE) || classicGroup.isInState(PREPARING_REBALANCE)) {
groupMetadataManager.rescheduleClassicGroupMemberHeartbeat(
classicGroup,
classicGroup.member(request.memberId())
);
}
}
final OffsetCommitResponseData response = new OffsetCommitResponseData();
final List<CoordinatorRecord> records = new ArrayList<>();
final long currentTimeMs = time.milliseconds();
final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs);
request.topics().forEach(topic -> {
topic.partitions().forEach(partition -> {
// Build offset commit record
final OffsetAndMetadata offsetAndMetadata = OffsetAndMetadata.fromRequest(
topic.topicId(),
partition,
currentTimeMs,
expireTimestampMs
);
// Add record to be written to __consumer_offsets
records.add(CoordinatorRecordHelpers.newOffsetCommitRecord(
request.groupId(),
topic.name(),
partition.partitionIndex(),
offsetAndMetadata,
metadataImage.features().metadataVersion()
));
});
});
return new CoordinatorResult<>(records, response);
}
Commit Strategies
1. Auto-Commit (Default)
// Consumer config
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000"); // Commit every 5 seconds
// Consumer loop (simplified)
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<K, V> record : records) {
processRecord(record);
}
// Auto-commit happens in background
// - Triggered every auto.commit.interval.ms
// - Commits offsets from last poll()
}
Problem: At-least-once with potential duplicates
1. poll() returns messages [100-199]
2. Process messages [100-149]
3. Consumer crashes
4. Auto-commit was scheduled but didn't run
5. Consumer restarts, offset still at 99
6. Re-processes messages [100-149] → DUPLICATES
2. Manual Commit (Sync)
props.put("enable.auto.commit", "false");
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<K, V> record : records) {
processRecord(record);
}
// Synchronous commit after processing batch
consumer.commitSync(); // Blocks until offsets written to __consumer_offsets
}
Guarantee: At-least-once
- If crash occurs after processing but before commit → duplicates on restart
- If crash occurs after commit → no duplicates
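commitSync() also accepts an explicit offset map, which lets you commit per partition as its records finish processing instead of once per batch. Note the +1: the committed value is the offset of the next record to read. A sketch along the lines of the loop above, assuming in-order processing per partition:
while (true) {
    ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
        for (ConsumerRecord<K, V> record : partitionRecords) {
            processRecord(record);
        }
        // Commit only this partition: the offset of the next record to read.
        long nextOffset = partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
        consumer.commitSync(Map.of(partition, new OffsetAndMetadata(nextOffset)));
    }
}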
3. Manual Commit (Async)
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<K, V> record : records) {
processRecord(record);
}
// Asynchronous commit (non-blocking)
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("Offset commit failed", exception);
}
});
}
Trade-off: Lower latency but no guarantee that commit succeeded before crash.
4. Exactly-Once with Transactions
// Producer side (transactional writes)
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic", key, value));
producer.commitTransaction(); // Atomically commits everything sent in the transaction (offsets are included via sendOffsetsToTransaction, shown below)
// Consumer side (read committed only)
props.put("isolation.level", "read_committed");
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<K, V> record : records) {
processRecord(record);
// Send output transactionally
producer.beginTransaction();
producer.send(new ProducerRecord<>("output-topic", ...));
producer.sendOffsetsToTransaction(
Map.of(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)),
consumer.groupMetadata()
);
producer.commitTransaction();
}
}
Guarantee: Exactly-once end-to-end
- Offset commit and output message write are atomic
- If transaction fails, neither offset nor output is committed
- Consumer skips aborted transactions (`isolation.level=read_committed`)
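For the transactional flow above, the producer needs a stable `transactional.id` (which also turns on idempotence), and downstream consumers must only read committed data. A minimal config sketch; the id and addresses are placeholders:
// Producer: a stable, unique transactional.id per instance enables
// initTransactions() and fences zombie producers after restarts.
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("transactional.id", "orders-processor-1");
producerProps.put("enable.idempotence", "true"); // implied by transactional.id; shown for clarity

// Consumer: read only committed transactional data; offsets are committed
// through the producer transaction, so auto-commit stays off.
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-app");
consumerProps.put("isolation.level", "read_committed");
consumerProps.put("enable.auto.commit", "false");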
Heartbeat and Session Management
Heartbeat Thread
Consumers send periodic heartbeats to the coordinator to signal liveness.
From AbstractCoordinator.java:1454:
private class HeartbeatThread extends BaseHeartbeatThread {
@Override
protected void onSend() {
synchronized (AbstractCoordinator.this) {
if (!isRunning() || state != MemberState.STABLE) {
// The coordinator may have failed while sending heartbeat
return;
}
if (coordinatorUnknown()) {
// Try to find the coordinator
lookupCoordinator();
} else {
// Send heartbeat
long now = time.milliseconds();
lastHeartbeatSend = now;
HeartbeatRequest.Builder requestBuilder = new HeartbeatRequest.Builder(
new HeartbeatRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(memberId)
.setGenerationId(generation.generationId)
.setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
);
client.send(coordinator, requestBuilder)
.compose(new HeartbeatResponseHandler(generation));
}
}
}
}
Session Timeout Detection
The coordinator tracks the last heartbeat from each member:
From GroupMetadataManager.java:7660:
public CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeat(
AuthorizableRequestContext context,
HeartbeatRequestData request
) {
Group group;
try {
group = group(request.groupId());
} catch (GroupIdNotFoundException e) {
throw new UnknownMemberIdException(
String.format("Group %s not found.", request.groupId())
);
}
if (group.type() == CLASSIC) {
return classicGroupHeartbeatToClassicGroup((ClassicGroup) group, context, request);
} else if (group.type() == CONSUMER) {
return classicGroupHeartbeatToConsumerGroup((ConsumerGroup) group, context, request);
} else {
throw new UnknownMemberIdException(
String.format("Group %s not found.", request.groupId())
);
}
}
Failure Detection:
1. Consumer C1 sends a heartbeat at t=0s
2. Coordinator sets the session deadline to t=10s (session.timeout.ms = 10s)
3. C1 sends a heartbeat at t=3s → deadline moves to t=13s
4. C1 crashes at t=5s
5. No heartbeat arrives by t=13s
6. Coordinator removes C1 from group
7. Triggers rebalance (generation N → N+1)
8. Remaining consumers rejoin and get new assignments
Configuration Tuning
| Parameter | Default | Purpose | Recommendation |
|---|---|---|---|
| `session.timeout.ms` | 45000 | Max time between heartbeats before member evicted | 10000-30000 (balance detection speed vs stability) |
| `heartbeat.interval.ms` | 3000 | Frequency of heartbeat requests | session.timeout.ms / 3 |
| `max.poll.interval.ms` | 300000 | Max time between poll() calls before rebalance | Set based on max processing time per batch |
| `rebalance.timeout.ms` | 300000 | Max time for all members to rejoin during rebalance | Increase for large groups (1000+ consumers) |
Relationship:
heartbeat.interval.ms < session.timeout.ms < max.poll.interval.ms
Example:
heartbeat.interval.ms = 3000 (3 sec)
session.timeout.ms = 10000 (10 sec)
max.poll.interval.ms = 300000 (5 min)
Why is `max.poll.interval.ms` greater than `session.timeout.ms`?
- The heartbeat thread runs independently of the `poll()` thread
- A consumer can keep heartbeating while processing messages from the last `poll()`
- If `poll()` is not called within `max.poll.interval.ms` → the consumer is assumed stuck → triggers a rebalance
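In consumer code these settings map directly onto client properties; a sketch using the example values above (broker address is a placeholder):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("heartbeat.interval.ms", "3000");   // background heartbeat every 3s
props.put("session.timeout.ms", "10000");     // evicted after 10s without a heartbeat
props.put("max.poll.interval.ms", "300000");  // poll() must be called at least every 5 min
props.put("max.poll.records", "500");         // smaller batches help keep poll() intervals short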
Summary
Kafka’s consumer group protocol provides distributed, fault-tolerant consumption through:
- Group Coordinator: Centralized component managing group membership and partition assignment
- Join-Sync Protocol: Two-phase rebalancing ensures all consumers receive consistent assignments
- Generation IDs: Epoch-based fencing prevents zombie consumers from committing stale offsets
- Heartbeat Mechanism: Detects failed consumers and triggers automatic rebalancing
- Pluggable Assignment Strategies: Range, RoundRobin, Sticky, and Cooperative for different trade-offs
- Offset Commit Protocol: Durable offset storage in the `__consumer_offsets` topic
- Static Membership: Eliminates unnecessary rebalances during rolling restarts
The protocol achieves at-least-once delivery by default, with exactly-once semantics available through transactional APIs.
In the next post, we’ll take a practical approach to consumer groups, applying what we’ve learned so far in this Kafka internals series.
This series explores Apache Kafka’s internal architecture at the code level. All references are to the Apache Kafka 4.1.0+ codebase.