In previous posts, we explored broker request processing and the storage layer architecture. Today, we dive into Kafka's replication protocol (i.e., the mechanism that ensures data durability and availability across a distributed cluster).
Overview
Kafka's replication protocol is a leader-follower replication scheme where each partition has:
- One leader replica that handles all reads and writes
- Multiple follower replicas that replicate data from the leader
- An ISR (In-Sync Replicas) set tracking which replicas are caught up
The protocol provides:
- Linearizable writes with configurable durability (`acks=all`)
- High availability through automatic leader failover
- Strong consistency guarantees for committed data
- Tunable trade-offs between latency, throughput, and durability
CAP Theorem Trade-offs
Kafka's replication protocol navigates the CAP theorem by choosing CP (Consistency + Partition tolerance) over availability in the face of network partitions:
- Consistency (C): Achieved through the ISR mechanism and the high watermark. With `acks=all` and `min.insync.replicas > 1`, all committed data is guaranteed to be replicated to multiple brokers before acknowledgment.
- Availability (A): Sacrificed when `ISR.size < min.insync.replicas`. In this scenario, Kafka rejects writes to maintain consistency guarantees.
- Partition tolerance (P): Built in through the distributed architecture. Kafka continues operating despite network partitions, but may become unavailable for writes if too few replicas remain in sync.

Configurable trade-off: Setting `min.insync.replicas = 1` shifts toward AP (prioritizing availability), allowing writes as long as the leader is available, at the cost of potential data loss during leader failures.
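To make this trade-off concrete, here is a minimal sketch of a CP-leaning producer setup using the Java client API from Scala. The broker address and topic name (`payments`) are placeholders, and the topic is assumed to have been created with replication factor 3 and `min.insync.replicas=2`:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object CpLeaningProducer extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder address
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  // acks=all: wait until all in-sync replicas have the record before acknowledging
  props.put(ProducerConfig.ACKS_CONFIG, "all")
  // Idempotence avoids duplicates on retry; it requires acks=all
  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")

  val producer = new KafkaProducer[String, String](props)
  // With the topic configured with min.insync.replicas=2 and RF=3, this send fails
  // with NotEnoughReplicasException if fewer than 2 replicas are currently in sync.
  producer.send(new ProducerRecord[String, String]("payments", "key", "value")).get()
  producer.close()
}
```

Setting `min.insync.replicas=1` on the topic instead would let this producer keep writing with only the leader alive, trading durability for availability.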
Key Components
Core Classes
| Component | Location | Purpose |
|---|---|---|
| Partition | core/cluster/Partition.scala | Manages per-partition replication state |
| ReplicaManager | core/server/ReplicaManager.scala | Coordinates replication across partitions |
| ReplicaFetcherThread | core/server/ReplicaFetcherThread.scala | Follower fetch mechanism |
| AlterPartitionManager | - | Manages ISR change requests to the controller |
Replication Topology
┌───────────────────────────────────────────────────┐
│                Controller (KRaft)                 │
│  • Manages cluster metadata                       │
│  • Approves ISR changes via AlterPartition        │
│  • Elects leaders when needed                     │
└───────────────────────────────────────────────────┘
         ▲                          │
   AlterPartition              LeaderAndIsr
         │                          ▼
┌─────────────────────────────────────────────────────┐
│                 Partition: topic-0                  │
│  Leader: Broker 1                                   │
│  ISR: {1, 2, 3}                                     │
│  LEO: 1000    HW: 950                               │
│  ┌────────────────────────────────────────────────┐ │
│  │ LocalLog                                       │ │
│  │ [msg0...msg999]                                │ │
│  └────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
     ▲ Fetch (offset=950)      ▲ Fetch (offset=950)
┌────────────────────┐    ┌────────────────────┐
│ Follower Broker 2  │    │ Follower Broker 3  │
│ LEO: 950           │    │ LEO: 950           │
│ ISR member         │    │ ISR member         │
└────────────────────┘    └────────────────────┘
High Watermark (HW) and Log End Offset (LEO)
Definitions
Log End Offset (LEO): The next offset to be written to a log
- Each replica maintains its own LEO
- The leader tracks the LEO of every replica via fetch requests

High Watermark (HW): The maximum offset that is guaranteed to be replicated to all ISR members
- Marks the boundary between committed and uncommitted data
- Consumers can read only up to the HW (with `read_committed` isolation, reads are further limited to the last stable offset); in other words, the HW is the upper bound on the offsets consumers can read
- HW = min(LEO of all ISR members)
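To make the `HW = min(LEO of all ISR members)` rule concrete, here is a toy calculation (plain Scala, not broker code; the replica IDs and offsets are invented):

```scala
// Toy model: leader is replica 1, ISR = {1, 2, 3}
val leoByReplica = Map(1 -> 1000L, 2 -> 950L, 3 -> 980L) // LEO per replica (hypothetical)
val isr = Set(1, 2, 3)

// The high watermark is the minimum LEO across ISR members:
val highWatermark = isr.map(leoByReplica).min // = 950

// Offsets [0, 950) are committed and visible to consumers;
// offsets [950, 1000) exist only on some replicas and are not yet committed.
```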
HW Advancement Logic
From Partition.scala:1159:
private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, currentTimeMs: Long): Boolean = {
if (isUnderMinIsr) {
trace(s"Not increasing HWM because partition is under min ISR")
return false
}
val leaderLogEndOffset = leaderLog.logEndOffsetMetadata
var newHighWatermark = leaderLogEndOffset
remoteReplicasMap.forEach { (_, replica) =>
val replicaState = replica.stateSnapshot
// Consider replica if it's in ISR or caught up and eligible to join
if (replicaState.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
(partitionState.maximalIsr.contains(replica.brokerId) ||
shouldWaitForReplicaToJoinIsr)) {
newHighWatermark = replicaState.logEndOffsetMetadata
}
}
leaderLog.maybeIncrementHighWatermark(newHighWatermark).toScala match {
case Some(oldHighWatermark) =>
debug(s"High watermark updated from $oldHighWatermark to $newHighWatermark")
true
case None =>
false
}
}
Key insights:
- HW only advances if `ISR.size >= min.insync.replicas`
- Uses the "maximal ISR" concept (includes pending additions) for safety
- New replicas joining the ISR are waited on to prevent premature HW advancement
High Watermark Propagation
┌───────────────────────────────────────────────────┐
│                Leader (Broker 1)                  │
│  LEO: 1000                                        │
│  HW: 950  (min of all ISR LEOs)                   │
└───────────────────────────────────────────────────┘
          ▲
          ├── Fetch Request (from Broker 2)
          │     Response includes leader HW=950
┌──────────────────────────┐
│  Follower (Broker 2)     │
│  LEO: 950                │
│  Updates local HW: 950   │
└──────────────────────────┘
From ReplicaFetcherThread.scala:130:
log.maybeUpdateHighWatermark(partitionData.highWatermark).ifPresent { newHighWatermark =>
maybeUpdateHighWatermarkMessage = s"and updated replica high watermark to $newHighWatermark"
partitionsWithNewHighWatermark += topicPartition
}
In-Sync Replicas (ISR)
ISR Semantics
The ISR is the dynamic set of replicas that are:
- Caught up to the leader's LEO (within `replica.lag.time.max.ms`)
- Eligible to become leader (not fenced, not shutting down)
- Have matching broker epochs
ISR Expansion
A follower is added to ISR when:
- Its LEO >= the leader's HW
- Its LEO >= the leader epoch start offset
- It's not fenced and not in controlled shutdown
- Broker epoch matches (for fencing)
From Partition.scala:1056:
private def isFollowerInSync(followerReplica: Replica): Boolean = {
leaderLogIfLocal.exists { leaderLog =>
val followerEndOffset = followerReplica.stateSnapshot.logEndOffset
followerEndOffset >= leaderLog.highWatermark &&
leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)
}
}
Why both conditions?
- `followerEndOffset >= HW`: Ensures the follower has all committed data
- `followerEndOffset >= leaderEpochStartOffset`: Prevents the follower from joining the ISR before catching up on the current leader epoch's data
ISR Shrinking
From Partition.scala:1282:
private def isFollowerOutOfSync(replicaId: Int,
leaderEndOffset: Long,
currentTimeMs: Long,
maxLagMs: Long): Boolean = {
getReplica(replicaId).fold(true) { followerReplica =>
!followerReplica.stateSnapshot.isCaughtUp(leaderEndOffset, currentTimeMs, maxLagMs)
}
}
A replica is removed from the ISR when:
- It hasn't fetched up to the leader's LEO within `replica.lag.time.max.ms`
- It's fenced or shutting down

Two types of lag:
- Stuck followers: the LEO hasn't advanced for `maxLagMs`
- Slow followers: the replica hasn't caught up to the leader's LEO within `maxLagMs`

Both are handled by tracking `lastCaughtUpTimeMs`, which records the last time the replica was fully caught up (a simplified sketch of this check follows below).
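The shape of that check can be sketched as follows (a condensed, illustrative stand-in for the real caught-up logic in the replica state; field and parameter names are simplified):

```scala
// Simplified model of the follower-lag check (illustrative, not the actual Replica code).
final case class ReplicaState(
  logEndOffset: Long,       // follower's LEO as last reported via fetch
  lastCaughtUpTimeMs: Long  // last time the follower's fetch reached the leader's LEO
) {
  // A follower counts as caught up if it is at the leader's LEO right now,
  // or was fully caught up within the last maxLagMs milliseconds.
  // This single rule covers both "stuck" followers (LEO not advancing)
  // and "slow" followers (advancing but never reaching the leader's LEO).
  def isCaughtUp(leaderEndOffset: Long, currentTimeMs: Long, maxLagMs: Long): Boolean =
    leaderEndOffset == logEndOffset || currentTimeMs - lastCaughtUpTimeMs <= maxLagMs
}

// Example: the leader would shrink the ISR for this replica once the lag window passes.
val state = ReplicaState(logEndOffset = 900L, lastCaughtUpTimeMs = 1000L)
val outOfSync = !state.isCaughtUp(leaderEndOffset = 1000L, currentTimeMs = 40000L, maxLagMs = 30000L) // true
```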
AlterPartition: ISR Change Protocol
The Problem
Before KIP-497, ISR changes were written directly to ZooKeeper, causing:
- High ZK write load (100s of writes/sec)
- Frequent leadership changes during instability
- No transactional guarantees
The Solution: AlterPartition Request
Leaders propose ISR changes to the controller via the `AlterPartition` request:
┌────────────────────────────────────────────────────┐
│            Partition Leader (Broker 1)             │
│  1. Detects follower caught up or lagging          │
│  2. Prepares new ISR state                         │
│  3. Optimistically updates local ISR               │
└────────────────────────────────────────────────────┘
         │
         │ AlterPartition(partitionEpoch, leaderEpoch, newISR, brokerEpochs)
         ▼
┌────────────────────────────────────────────────────┐
│                     Controller                     │
│  1. Validates partition epoch                      │
│  2. Validates leader epoch                         │
│  3. Checks broker epochs (fencing)                 │
│  4. Approves or rejects change                     │
└────────────────────────────────────────────────────┘
         │
         │ AlterPartitionResponse(success/failure, newPartitionEpoch)
         ▼
┌────────────────────────────────────────────────────┐
│            Partition Leader (Broker 1)             │
│  1. On success: commits ISR change locally         │
│  2. On failure: rolls back to last committed       │
└────────────────────────────────────────────────────┘
From Partition.scala:1754:
private def prepareIsrExpand(
currentState: CommittedPartitionState,
newInSyncReplicaId: Int
): PendingExpandIsr = {
val isrToSend = partitionState.isr + newInSyncReplicaId
val isrWithBrokerEpoch = addBrokerEpochToIsr(isrToSend.toList).asJava
val newLeaderAndIsr = new LeaderAndIsr(
localBrokerId,
leaderEpoch,
partitionState.leaderRecoveryState,
isrWithBrokerEpoch,
partitionEpoch
)
val updatedState = PendingExpandIsr(
newInSyncReplicaId,
newLeaderAndIsr,
currentState
)
partitionState = updatedState // Optimistic update
updatedState
}
Partition State Machine
┌──────────────────────────────┐
│  CommittedPartitionState     │
│  ISR: {1, 2, 3}              │
│  partitionEpoch: 42          │
│  isInflight: false           │
└──────────────────────────────┘
               │ ISR change detected
               ▼
┌──────────────────────────────┐
│  PendingExpandIsr/ShrinkIsr  │
│  maximalISR: {1, 2, 3, 4}    │ ← Used for HW calculation
│  committed ISR: {1, 2, 3}    │ ← Fallback on failure
│  isInflight: true            │
└──────────────────────────────┘
               │ AlterPartition response
               ▼
┌──────────────────────────────┐
│  CommittedPartitionState     │
│  ISR: {1, 2, 3, 4}           │
│  partitionEpoch: 43          │
│  isInflight: false           │
└──────────────────────────────┘
Maximal ISR (KIP-497), sketched below:
- The union of the committed ISR and the in-flight (proposed) ISR
- For ISR expansion: includes the replica being added before the controller confirms
- For ISR shrinking: still includes the replicas pending removal until the controller confirms
- Used for HW advancement to maintain safety

This ensures:
- ISR expansions take effect optimistically (the larger maximal ISR is more restrictive for HW advancement, hence safe)
- ISR shrinks take effect pessimistically (HW advancement keeps waiting on the removed replicas until the controller confirms)
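The following sketch models that rule. It is a simplified stand-in for the committed/pending partition states in `Partition.scala`; the class and field names are condensed and not the actual definitions:

```scala
// Simplified model of committed vs. pending ISR state (illustrative only).
sealed trait PartitionState {
  def isr: Set[Int]          // last ISR confirmed by the controller
  def maximalIsr: Set[Int]   // ISR used for HW advancement while a change is in flight
  def isInflight: Boolean
}

final case class Committed(isr: Set[Int]) extends PartitionState {
  val maximalIsr: Set[Int] = isr
  val isInflight = false
}

// Expansion: the new replica is already counted for HW purposes,
// so the HW cannot advance past data the new member is still missing.
final case class PendingExpand(isr: Set[Int], adding: Int) extends PartitionState {
  val maximalIsr: Set[Int] = isr + adding
  val isInflight = true
}

// Shrink: the replicas being removed still count for HW purposes
// until the controller confirms the smaller ISR.
final case class PendingShrink(isr: Set[Int], removing: Set[Int]) extends PartitionState {
  val maximalIsr: Set[Int] = isr // committed ISR still contains `removing`
  val isInflight = true
}
```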
Follower Fetch Protocol
Fetch Thread Architecture
Each follower broker runs `ReplicaFetcherThread` instances to replicate from leaders:
┌───────────────────────────────────────────────┐
│               Follower Broker 2               │
│                                               │
│  ReplicaFetcherManager                        │
│   ├─ FetcherThread-1 → Leader Broker 1        │
│   │    └─ Partitions: {topic-0, topic-1}      │
│   └─ FetcherThread-2 → Leader Broker 3        │
│        └─ Partitions: {topic-2, topic-3}      │
└───────────────────────────────────────────────┘
Fetch Loop
From ReplicaFetcherThread.scala:
override def processPartitionData(
topicPartition: TopicPartition,
fetchOffset: Long,
partitionLeaderEpoch: Int,
partitionData: FetchData
): Option[LogAppendInfo] = {
val partition = replicaMgr.getPartitionOrException(topicPartition)
val log = partition.localLogOrException
val records = toMemoryRecords(FetchResponse.recordsOrFail(partitionData))
// Sanity check: fetch offset should match LEO
if (fetchOffset != log.logEndOffset)
throw new IllegalStateException(s"Offset mismatch for $topicPartition")
// Append to local log
val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(
records,
isFuture = false,
partitionLeaderEpoch
)
// Update high watermark from leader
log.maybeUpdateHighWatermark(partitionData.highWatermark).ifPresent { newHW =>
partitionsWithNewHighWatermark += topicPartition
}
logAppendInfo
}
Key steps:
- Fetch records from leader starting at local LEO
- Validate offset consistency
- Append to local log
- Update local HW from leaderβs HW in fetch response
- Track partitions with new HW to complete delayed fetch requests
Truncation on Divergence
When a follower's log diverges from the leader (e.g., after a leader failover), it must truncate:
From ReplicaFetcherThread.scala:165:
override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = {
val partition = replicaMgr.getPartitionOrException(tp)
partition.truncateTo(offsetTruncationState.offset, isFuture = false)
// Mark future replica for truncation if this is the last truncation
if (offsetTruncationState.truncationCompleted)
replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(
brokerConfig.brokerId,
tp,
offsetTruncationState.offset
)
}
Divergence detection:
- The follower sends `lastFetchedEpoch` in its fetch request
- The leader checks whether its log has diverged at that epoch
- If diverged, the leader returns a `divergingEpoch` in the response
- The follower truncates to the divergence point (see the sketch below)
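The truncation decision itself can be sketched as follows (illustrative only; the real handling lives in the fetcher thread logic, and `DivergingEpoch`/`truncationOffset` here are invented names):

```scala
// Illustrative only: given the divergingEpoch returned by the leader,
// the follower truncates its log back to the point where the two logs still agree.
final case class DivergingEpoch(epoch: Int, endOffset: Long) // as reported by the leader

def truncationOffset(followerLogEndOffset: Long, diverging: DivergingEpoch): Long =
  // Keep only the prefix both logs share: nothing past the leader's end offset
  // for that epoch, and nothing past what the follower actually has.
  math.min(diverging.endOffset, followerLogEndOffset)

// Example: the follower wrote up to 1020 under a stale leader epoch, but the current
// leader's log for that epoch ends at 1000 -> the follower truncates to 1000 and refetches.
val offset = truncationOffset(followerLogEndOffset = 1020L, DivergingEpoch(epoch = 7, endOffset = 1000L))
```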
Leader Election
Election Triggers
Leader election occurs when:
- Current leader fails (broker crash, network partition)
- Current leader is fenced (controller detects staleness)
- Manual leadership transfer (preferred leader election)
Election Process
┌──────────────────────────────────────────────┐
│     Controller detects leader failure        │
└──────────────────────────────────────────────┘
                      │
                      ▼
┌──────────────────────────────────────────────┐
│     Select new leader from ISR               │
│     Priority: First replica in ISR           │
└──────────────────────────────────────────────┘
                      │
                      ▼
┌──────────────────────────────────────────────┐
│     Increment partition epoch                │
│     Increment leader epoch                   │
└──────────────────────────────────────────────┘
                      │
                      ▼
┌──────────────────────────────────────────────┐
│     Send LeaderAndIsr to all replicas        │
│     (via metadata log in KRaft)              │
└──────────────────────────────────────────────┘
                      │
                      ▼
┌──────────────────────────────────────────────┐
│     New leader: makeLeader()                 │
│     Followers: makeFollower()                │
└──────────────────────────────────────────────┘
makeLeader Transition
From Partition.scala:736:
def makeLeader(partitionRegistration: PartitionRegistration,
isNew: Boolean,
highWatermarkCheckpoints: OffsetCheckpoints,
topicId: Option[Uuid]): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
// Validate partition epoch
if (partitionRegistration.partitionEpoch < partitionEpoch) {
stateChangeLogger.info(s"Skipped become-leader for $topicPartition since " +
s"partition epoch ${partitionRegistration.partitionEpoch} < current $partitionEpoch")
return false
}
val isNewLeader = !isLeader
val isNewLeaderEpoch = partitionRegistration.leaderEpoch > leaderEpoch
// Update ISR and assignment
updateAssignmentAndIsr(
replicas = partitionRegistration.replicas,
isLeader = true,
isr = partitionRegistration.isr.toSet,
...
)
createLogIfNotExists(...)
val leaderLog = localLogOrException
if (isNewLeaderEpoch) {
val leaderEpochStartOffset = leaderLog.logEndOffset
leaderLog.assignEpochStartOffset(partitionRegistration.leaderEpoch, leaderEpochStartOffset)
// Reset replica states
remoteReplicas.foreach { replica =>
replica.resetReplicaState(currentTimeMs, leaderEpochStartOffset, ...)
}
leaderEpoch = partitionRegistration.leaderEpoch
leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
}
partitionEpoch = partitionRegistration.partitionEpoch
leaderReplicaIdOpt = Some(localBrokerId)
(maybeIncrementLeaderHW(leaderLog), isNewLeader)
}
if (leaderHWIncremented)
tryCompleteDelayedRequests()
isNewLeader
}
Critical points:
- Epoch validation: Rejects stale metadata
- Leader epoch start offset: Cached to help followers truncate correctly
- Replica state reset: All followers start with unknown LEO
- HW advancement: May immediately advance if ISR size >= minISR
Durability Guarantees
Producer Acknowledgment Semantics
| acks | Behavior | Durability | Latency |
|---|---|---|---|
| 0 | Fire-and-forget | None | Lowest |
| 1 | Leader appends to log | Leader crash loses data | Medium |
| all | All ISR replicas append | No data loss (with `min.insync.replicas > 1`) | Highest |
From Partition.scala:1368:
def appendRecordsToLeader(records: MemoryRecords,
origin: AppendOrigin,
requiredAcks: Int,
requestLocal: RequestLocal): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal match {
case Some(leaderLog) =>
val minIsr = effectiveMinIsr(leaderLog)
val inSyncSize = partitionState.isr.size
// Check min.insync.replicas for acks=all
if (inSyncSize < minIsr && requiredAcks == -1) {
throw new NotEnoughReplicasException(
s"ISR size $inSyncSize < min.isr $minIsr for partition $topicPartition")
}
val info = leaderLog.appendAsLeader(records, this.leaderEpoch, origin, ...)
(info, maybeIncrementLeaderHW(leaderLog))
case None =>
throw new NotLeaderOrFollowerException(...)
}
}
info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else LeaderHwChange.SAME)
}
Delayed Produce (acks=all)
When `acks=all`, the producer response is delayed until:
- All ISR replicas have fetched the data (LEO >= requiredOffset), OR
- The timeout expires (`request.timeout.ms`)
From Partition.scala:1096:
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
leaderLogIfLocal match {
case Some(leaderLog) =>
val curMaximalIsr = partitionState.maximalIsr
val minIsr = effectiveMinIsr(leaderLog)
if (leaderLog.highWatermark >= requiredOffset) {
// HW advanced - all ISR replicas have the data
if (minIsr <= curMaximalIsr.size)
(true, Errors.NONE)
else
(true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
} else {
(false, Errors.NONE) // Still waiting
}
case None =>
(false, Errors.NOT_LEADER_OR_FOLLOWER)
}
}
The purgatory pattern ensures:
- Handler threads don't block waiting for replication
- Responses sent immediately when replicas catch up
- Timeouts handled gracefully
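A minimal sketch of that idea follows (heavily simplified; the real implementation is the `DelayedProduce` operation in the produce purgatory, and the names below are invented):

```scala
// Heavily simplified model of a delayed produce operation (illustrative only).
trait DelayedOp {
  def tryComplete(): Boolean   // called when watched partitions make progress
  def onComplete(): Unit       // send the response
  def onExpiration(): Unit     // called if the timeout fires first
}

final class DelayedProduceSketch(
  requiredOffset: Long,                    // last offset of the produced batch + 1
  highWatermark: () => Long,               // reads the partition's current HW
  respond: Either[String, Unit] => Unit    // callback to the network layer
) extends DelayedOp {

  // Complete as soon as the HW has advanced past the produced records,
  // i.e. every ISR replica has fetched them.
  def tryComplete(): Boolean =
    if (highWatermark() >= requiredOffset) { onComplete(); true } else false

  def onComplete(): Unit = respond(Right(()))

  // The timeout elapsed before enough replicas caught up.
  def onExpiration(): Unit = respond(Left("REQUEST_TIMED_OUT"))
}
```

Handler threads park such operations in a purgatory keyed by partition and re-run `tryComplete()` whenever a follower fetch advances the HW, so no thread blocks while waiting.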
Replica Fencing with Broker Epochs
The Problem
Without fencing, zombie brokers (network-partitioned but still running) could:
- Continue accepting writes as leader
- Corrupt data after a new leader is elected
- Cause split-brain scenarios
Split-brain example:
1. Broker 1 is leader for partition topic-0
2. Network partition isolates Broker 1 from the controller
3. Controller elects Broker 2 as new leader (increments partition epoch)
4. Without fencing:
   ├─ Broker 1 (zombie): Still thinks it's leader, accepts writes at offset 1000+
   └─ Broker 2 (new leader): Also accepts writes at offset 1000+
5. Result: Two divergent logs with different data at same offsets
6. When partition heals: Data corruption, inconsistent replicas
With broker epoch fencing, Broker 1's writes are rejected because its broker epoch is stale, preventing the split-brain.
The Solution: Broker Epochs
Each broker maintains a monotonically increasing broker epoch:
- Incremented on every registration with controller
- Sent with every fetch request
- Validated during ISR changes
From Partition.scala:1063:
private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
val mayBeReplica = getReplica(followerReplicaId)
if (mayBeReplica.isEmpty) return false
val storedBrokerEpoch = mayBeReplica.get.stateSnapshot.brokerEpoch
val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(followerReplicaId)
!metadataCache.isBrokerFenced(followerReplicaId) &&
!metadataCache.isBrokerShuttingDown(followerReplicaId) &&
isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
}
Fencing flow:
- Broker crashes and restarts
- Broker re-registers with controller, gets new broker epoch
- Old fetch requests (with old broker epoch) are rejected
- Replica cannot join ISR until broker epochs match
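The epoch comparison behind this flow can be sketched as follows (a condensed, illustrative version of the eligibility check that ignores compatibility cases handled in the real code):

```scala
// Illustrative only: a follower's fetch-reported broker epoch must match the
// controller's view of that broker before the replica may (re)join the ISR.
def isBrokerEpochEligible(
  storedBrokerEpoch: Option[Long],  // epoch the follower sent with its fetch requests
  cachedBrokerEpoch: Option[Long]   // epoch in the broker's metadata cache (from the controller)
): Boolean =
  (storedBrokerEpoch, cachedBrokerEpoch) match {
    case (Some(stored), Some(cached)) => stored == cached // stale epoch => zombie, keep it out
    case _                            => false            // unknown broker or no epoch reported
  }
```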
Configuration Tuning
Replication Factor
replication.factor = 3  # Three replicas per partition; with min.insync.replicas=2, acks=all writes tolerate 1 broker failure
Trade-offs:
- Higher RF → more durability, higher storage cost, higher replication overhead
- Typical: RF=3 for production, RF=1 for development
min.insync.replicas
min.insync.replicas = 2 # With RF=3, tolerates 1 failure
Formula: `min.insync.replicas = f + 1`, where `f` is the number of broker failures an acknowledged write can survive. For example, with RF=3 and `min.insync.replicas = 2`, every `acks=all` write lands on at least 2 replicas, so it survives the permanent loss of any single broker (f = 1).
Impact:
- Affects write availability (requires minISR replicas to accept writes)
- Affects HW advancement (HW only advances if ISR >= minISR)
replica.lag.time.max.ms
replica.lag.time.max.ms = 30000 # 30 seconds
Meaning: Maximum time a follower can be out of sync before removal from ISR
Considerations:
- Too low → ISR thrashing during transient slowness
- Too high → delayed detection of failed replicas
unclean.leader.election.enable
unclean.leader.election.enable = false # Default: prioritize consistency
When `true`: Allows out-of-sync replicas to become leader
Risk: Potential data loss, non-monotonic HW
Use case: Preferring availability over consistency (rare), i.e., shifting toward AP rather than CP in CAP terms
KIP-966: Enhanced Durability with ELR
KIP-966 introduces Eligible Leader Replicas (ELR) to address the "last replica standing" problem. This feature was added in Kafka 4.0 (preview) and enabled by default for new clusters in Kafka 4.1.
The Problem
With traditional ISR:
- ISR = {1, 2, 3}, minISR = 2
- Replica 3 lags and is removed → ISR = {1, 2}
- Replica 2 fails → ISR = {1}
- If Replica 1 fails, partition becomes unavailable (no ISR member)
The Solution: ELR
ELR is a "safety net" containing replicas that:
- Were removed from ISR
- Still host the complete committed log
- Are managed by the controller
Election priority:
- Select from ISR (if available)
- Select from ELR (if ISR empty)
- Unclean election (if both empty and enabled)
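A toy sketch of that priority order (not controller code; the function and parameter names are invented):

```scala
// Illustrative leader selection order under KIP-966 (simplified).
def selectLeader(
  isr: Seq[Int],
  elr: Seq[Int],
  otherReplicas: Seq[Int],
  uncleanElectionEnabled: Boolean
): Option[Int] =
  isr.headOption                              // 1. prefer an in-sync replica
    .orElse(elr.headOption)                   // 2. otherwise an eligible leader replica (full committed log)
    .orElse(
      if (uncleanElectionEnabled) otherReplicas.headOption // 3. last resort: may lose committed data
      else None                                            //    stay leaderless rather than risk data loss
    )
```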
Benefits:
- Reduces data loss risk
- Maintains availability during cascading failures
- Provides recovery quorum
Monitoring ISR Health
Key Metrics
| Metric | Meaning | Alert Threshold |
|---|---|---|
| UnderReplicatedPartitions | Partitions with replicas < RF | > 0 |
| OfflinePartitionsCount | Partitions with no leader | > 0 |
| UnderMinIsrPartitionCount | Partitions with ISR < minISR | > 0 |
| IsrExpandsPerSec | ISR additions/sec | High variance |
| IsrShrinksPerSec | ISR removals/sec | Sustained increase |
| FailedIsrUpdatesPerSec | Failed AlterPartition requests | > 0 |
From ReplicaManager.scala:312:
val isrExpandRate: Meter = metricsGroup.newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
val isrShrinkRate: Meter = metricsGroup.newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
val failedIsrUpdatesRate: Meter = metricsGroup.newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
Common ISR Issues
ISR Thrashing:
- Symptom: High `IsrExpandsPerSec` and `IsrShrinksPerSec`
- Causes: Network instability, GC pauses, slow disks
- Solution: Increase `replica.lag.time.max.ms`, investigate broker performance

Persistent UnderMinIsr:
- Symptom: Sustained `UnderMinIsrPartitionCount > 0`
- Causes: Broker failures, disk failures, network partitions
- Solution: Replace failed brokers, check partition reassignment

Failed ISR Updates:
- Symptom: `FailedIsrUpdatesPerSec > 0`
- Causes: Controller unavailable, partition epoch conflicts
- Solution: Check controller health, investigate the metadata log
Summary
Kafka's replication protocol achieves strong durability and availability through:
- Leader-follower architecture with dynamic ISR tracking
- High watermark mechanism for committed data safety
- AlterPartition protocol for transactional ISR changes with controller validation
- Broker epoch fencing to prevent split-brain scenarios
- Configurable trade-offs between latency, throughput, and durability
- Delayed produce pattern for efficient `acks=all` handling
The protocol's key innovation is the separation of replication (ISR) from availability (leader election), allowing Kafka to maintain strong consistency while providing high availability through automatic failover.
In the next post, we'll explore consumer groups and coordination (i.e., how Kafka manages consumer group membership, partition assignment, and offset commits).
This series explores Apache Kafka's internal architecture at the code level. All references are to the Apache Kafka 4.1.0+ codebase.