In the previous post, we explored how Kafka's broker processes requests using the reactor pattern. Today, we dive into how Kafka efficiently stores and retrieves billions of messages using a sophisticated storage architecture built on log segments, memory-mapped files, and clever indexing.
Overview
Kafka's storage layer is designed around an append-only log abstraction. Each partition is stored as a UnifiedLog, which consists of multiple log segments (immutable once sealed). This architecture enables:
- Sequential writes: hundreds of MB/s on spinning disks vs ~100KB/s for random writes
- Zero-copy reads: Direct page cache to network transfers
- Efficient retention: Delete entire segments, no compaction needed
- Simple recovery: Replay from last checkpoint
Partition (UnifiedLog)
├── Log Segments (ordered by base offset)
│   ├── 00000000000000000000.log       (actual messages)
│   ├── 00000000000000000000.index     (offset → position)
│   ├── 00000000000000000000.timeindex (timestamp → offset)
│   └── 00000000000000000000.txnindex  (transaction metadata)
├── 00000000000000100000.log
├── 00000000000000100000.index
└── ...
Architecture Components
Core Classes
Component | Location | Purpose |
---|---|---|
UnifiedLog | storage/internals/log/UnifiedLog.java | Partition-level abstraction managing segments |
LogSegment | storage/internals/log/LogSegment.java | Individual segment with data + indexes |
FileRecords | clients/common/record/FileRecords.java | File-backed message storage |
OffsetIndex | storage/internals/log/OffsetIndex.java | Maps offsets to file positions |
TimeIndex | storage/internals/log/TimeIndex.java | Maps timestamps to offsets |
File Layout on Disk
Example for topic my-topic, partition 0:
/kafka-logs/my-topic-0/
├── 00000000000000000000.log       (1GB, sealed)
├── 00000000000000000000.index     (10MB)
├── 00000000000000000000.timeindex (10MB)
├── 00000000000001000000.log       (1GB, sealed)
├── 00000000000001000000.index     (10MB)
├── 00000000000001000000.timeindex (10MB)
├── 00000000000002000000.log       (active, growing)
├── 00000000000002000000.index     (active, sparse)
├── 00000000000002000000.timeindex (active, sparse)
├── leader-epoch-checkpoint        (epoch boundaries)
└── partition.metadata             (topic ID)
Naming convention: {baseOffset}.{extension}, where the base offset is zero-padded to 20 digits for lexicographic sorting.
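To make the convention concrete, here is a minimal sketch (a hypothetical helper for illustration, not Kafka's own utility) that maps a base offset to a segment file name:

static String segmentFileName(long baseOffset, String extension) {
    // 20-digit zero-padded base offset, so a plain string sort matches offset order
    return String.format("%020d.%s", baseOffset, extension);
}
// segmentFileName(1_000_000, "log") -> "00000000000001000000.log"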
Log Segments
A log segment is the fundamental storage unit in Kafka. Each segment contains:
- `.log` file - The actual message data (FileRecords)
- `.index` file - Offset index for fast lookups
- `.timeindex` file - Time-based index
- `.txnindex` file - Transaction index (for transactional data)
Segment Rolling
Segments are immutable once sealed. A new segment is created ("rolled") when:
// LogSegment.java:167
public boolean shouldRoll(RollParams rollParams) throws IOException {
boolean reachedRollMs = timeWaitedForRoll(...) > rollParams.maxSegmentMs() - rollJitterMs;
int size = size();
return size > rollParams.maxSegmentBytes() - rollParams.messagesSize() ||
(size > 0 && reachedRollMs) ||
offsetIndex().isFull() || timeIndex().isFull() ||
!canConvertToRelativeOffset(rollParams.maxOffsetInMessages());
}
Trigger conditions:
- Segment size exceeds `segment.bytes` (default: 1GB)
- Age exceeds `segment.ms` (default: 7 days)
- Offset index is full
- Time index is full
- Offset overflow (can't fit in a 4-byte relative offset)
Indexing Strategy
Memory-Mapped Files
Kafka uses memory-mapped I/O for indexes to achieve high performance.
Why Memory Mapping?
From AbstractIndex.java:
// AbstractIndex.java:100
private void createAndAssignMmap() throws IOException {
RandomAccessFile raf = new RandomAccessFile(file, writable ? "rw" : "r");
try {
if (newlyCreated) {
// Pre-allocate the file
raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize()));
}
long length = raf.length();
MappedByteBuffer mmap = createMappedBuffer(raf, newlyCreated, length, writable, entrySize());
this.length = length;
this.mmap = mmap;
} finally {
Utils.closeQuietly(raf, "index " + file.getName());
}
}
Benefits of memory-mapped files:
Benefit | Description |
---|---|
Zero-copy | OS maps file directly to memory; no read() syscalls |
Page cache | OS manages caching; warm indexes stay in RAM |
Concurrent reads | Multiple threads read without locks (coordinated via remapLock) |
Lazy loading | Pages loaded on-demand via page faults |
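To make the mechanics concrete, here is a small self-contained sketch (hypothetical file name and sizes, not Kafka code) of memory-mapping a pre-allocated index file with java.nio:

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapIndexSketch {
    public static void main(String[] args) throws IOException {
        Path file = Path.of("00000000000000000000.index"); // hypothetical index file
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Pre-allocate to an exact multiple of the 8-byte offset-index entry size
            long maxIndexSize = 10 * 1024 * 1024;
            long length = (maxIndexSize / 8) * 8;
            MappedByteBuffer mmap = ch.map(FileChannel.MapMode.READ_WRITE, 0, length);
            // Writes go through the mapping straight to the page cache; no write() syscalls
            mmap.putInt(42);    // 4-byte relative offset
            mmap.putInt(4096);  // 4-byte physical position
        }
    }
}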
Index Structure
OffsetIndex (OffsetIndex.java:54):
- Maps logical offset → physical file position
- Sparse index: one entry every `index.interval.bytes` (default: 4KB)
- Entry size: 8 bytes (4-byte relative offset + 4-byte position)
- Uses binary search for lookups
// OffsetIndex.java:97
public OffsetPosition lookup(long targetOffset) {
return inRemapReadLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY);
if (slot == -1)
return new OffsetPosition(baseOffset(), 0);
else
return parseEntry(idx, slot);
});
}
TimeIndex (TimeIndex.java:54):
- Maps timestamp → offset
- Also sparse (one entry per `index.interval.bytes`)
- Entry size: 12 bytes (8-byte timestamp + 4-byte relative offset)
- Guarantees monotonically increasing timestamps
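Given these entry layouts, decoding a looked-up slot is just a pair of buffer reads. A sketch of the layout (my illustration, not Kafka's parseEntry; idx is the mapped buffer, slot the entry number):

import java.nio.ByteBuffer;

final class IndexEntrySketch {
    // 8-byte OffsetIndex entry: 4-byte relative offset + 4-byte position
    static long[] offsetEntry(ByteBuffer idx, int slot, long baseOffset) {
        long offset = baseOffset + idx.getInt(slot * 8);
        long position = idx.getInt(slot * 8 + 4);
        return new long[] {offset, position};
    }

    // 12-byte TimeIndex entry: 8-byte timestamp + 4-byte relative offset
    static long[] timeEntry(ByteBuffer idx, int slot, long baseOffset) {
        long timestamp = idx.getLong(slot * 12);
        long offset = baseOffset + idx.getInt(slot * 12 + 8);
        return new long[] {timestamp, offset};
    }
}

Storing offsets relative to the segment's base offset is what keeps entries small: a 4-byte int suffices until a segment spans more offsets than fit in an int, which is exactly the overflow condition that forces a roll.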
Data Flow
Write Path: Appending Messages
Let's trace a produce request writing to disk:
Producer → Broker → UnifiedLog.append() → LogSegment.append() → FileRecords → Disk
Step-by-step Flow
1. Entry point (UnifiedLog.java:1081):
private LogAppendInfo append(MemoryRecords records, ...) {
// Validate records
LogAppendInfo appendInfo = analyzeAndValidateRecords(records, ...);
synchronized (lock) {
// Assign offsets
PrimitiveRef.LongRef offset = PrimitiveRef.ofLong(localLog.logEndOffset());
// Validate and possibly compress
LogValidator validator = new LogValidator(...);
ValidationResult validationResult = validator.validateMessagesAndAssignOffsets(...);
// Get or create active segment
LogSegment segment = localLog.maybeRoll(...);
// Append to segment
segment.append(appendInfo.lastOffset(), ..., validationResult.validatedRecords());
// Update indexes
updateLogEndOffset(appendInfo.lastOffset() + 1);
}
}
2. Write to FileRecords (FileRecords.java:193):
public int append(MemoryRecords records) throws IOException {
int written = records.writeFullyTo(channel); // Direct write to FileChannel
size.getAndAdd(written);
return written;
}
3. Update index (LogSegment.java - invoked during append):
// Update offset index every index.interval.bytes
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex().append(offset, physicalPosition);
timeIndex().append(timestamp, offset);
bytesSinceLastIndexEntry = 0;
}
Write Performance
Technique | Benefit |
---|---|
Append-only | Sequential writes (hundreds of MB/s) vs random writes (~100KB/s) on HDDs |
Batching | Multiple records per write syscall |
Page cache | OS buffers writes; sync based on flush.ms |
Channel-based I/O | FileChannel writes avoid extra copies through the JVM heap |
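One subtlety worth making explicit: an append returns once the bytes reach the page cache; the fsync is decoupled, and by default Kafka leans on replication rather than per-write fsync for durability. A minimal sketch of the distinction (hypothetical file and batch, not Kafka code):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FlushSketch {
    public static void main(String[] args) throws IOException {
        Path path = Path.of("00000000000002000000.log"); // hypothetical active segment
        ByteBuffer batch = ByteBuffer.wrap("record-batch-bytes".getBytes());
        try (FileChannel ch = FileChannel.open(path,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            ch.write(batch); // lands in the OS page cache and returns quickly
            ch.force(true);  // explicit fsync: flush data + metadata to the device
        }
    }
}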
Read Path: Fetching Messages
Fetch flow:
Consumer → Broker → UnifiedLog.read() → LogSegment.read() → FileRecords → Zero-copy transfer
Step-by-step Flow
1. Find starting segment (UnifiedLog.java:1604):
public FetchDataInfo read(long startOffset, int maxLength, FetchIsolation isolation, ...) {
checkLogStartOffset(startOffset);
// Determine max offset based on isolation level
LogOffsetMetadata maxOffsetMetadata = switch (isolation) {
case LOG_END -> localLog.logEndOffsetMetadata();
case HIGH_WATERMARK -> fetchHighWatermarkMetadata();
case TXN_COMMITTED -> fetchLastStableOffsetMetadata();
};
return localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, ...);
}
2. Use offset index to find position (simplified):
// Find the segment containing startOffset (floor lookup on sorted base offsets)
LogSegment segment = segments.floorEntry(startOffset).getValue();
// Look up position in offset index
OffsetPosition startPosition = segment.offsetIndex().lookup(startOffset);
// Read from file starting at that position
FileRecords records = segment.log().slice(startPosition.position(), maxLength);
3. Zero-copy transfer (for network sends):
// FileRecords can transfer directly to socket via sendfile()
long bytesTransferred = records.writeTo(channel, position, maxSize);
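This zero-copy path boils down to FileChannel.transferTo, which the JVM implements with sendfile(2) on Linux. A standalone sketch of the same pattern (a hypothetical method, not FileRecords itself):

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SendfileSketch {
    // Transfer `count` bytes of a segment file to a socket without copying
    // them through the JVM heap; on Linux this becomes a sendfile(2) call.
    static long sendSlice(Path segment, SocketChannel socket,
                          long position, long count) throws IOException {
        try (FileChannel log = FileChannel.open(segment, StandardOpenOption.READ)) {
            return log.transferTo(position, count, socket);
        }
    }
}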
Binary Search for Offset Lookup
The offset index uses a clever binary search implementation:
// AbstractIndex.java
protected int largestLowerBoundSlotFor(ByteBuffer idx, long target, IndexSearchType searchEntity) {
// Binary search for largest entry <= target
int lo = 0;
int hi = entries() - 1;
while (lo < hi) {
int mid = (lo + hi + 1) >>> 1; // midpoint rounded up so lo always advances
long found = getSearchEntity(idx, mid, searchEntity);
if (found == target)
return mid;
else if (found < target)
lo = mid;
else
hi = mid - 1;
}
return lo;
}
Time complexity: O(log N) where N = number of index entries
Example:
- Segment has 1GB of data
- Index entry every 4KB → ~250,000 entries
- Binary search: ~18 comparisons to find offset
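To see the largest-lower-bound semantics in isolation, here is a toy version over a sorted long[] (a restatement of the search above for illustration, not the Kafka code):

// Returns the index of the largest entry <= target, or -1 if none exists.
static int largestLowerBound(long[] entries, long target) {
    if (entries.length == 0 || entries[0] > target)
        return -1;
    int lo = 0, hi = entries.length - 1;
    while (lo < hi) {
        int mid = (lo + hi + 1) >>> 1; // round up so lo always advances
        if (entries[mid] <= target)
            lo = mid;
        else
            hi = mid - 1;
    }
    return lo;
}
// largestLowerBound(new long[] {0, 4096, 8192}, 5000) == 1
// i.e., the entry at slot 1 is where the subsequent linear scan begins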
Log Retention & Compaction
Kafka supports two retention policies:
- Delete - Remove old segments based on time/size
- Compact - Keep only the latest value for each key
Log Compaction
From LogCleaner.java:49:
The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy.
A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
Compaction Process:
- Build key→offset map for dirty section (OffsetMap)
- Recopy segments, omitting obsolete messages
- Merge small segments to avoid fragmentation
- Swap cleaned segments atomically
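The whole process can be illustrated with a simplified in-memory, two-pass sketch (assumption: records fit in memory; the real cleaner streams segments and bounds memory with the OffsetMap described next):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {
    record Rec(long offset, String key, String value) {}

    static List<Rec> compact(List<Rec> log) {
        // Pass 1: build key -> latest offset map (stand-in for the OffsetMap)
        Map<String, Long> latest = new HashMap<>();
        for (Rec r : log)
            latest.merge(r.key(), r.offset(), Math::max);
        // Pass 2: recopy, keeping a record only if it is the latest for its key
        List<Rec> cleaned = new ArrayList<>();
        for (Rec r : log)
            if (latest.get(r.key()) == r.offset())
                cleaned.add(r);
        return cleaned;
    }
}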
OffsetMap: Memory-Efficient Deduplication
The OffsetMap (SkimpyOffsetMap.java) is a specialized hash table:
- Preallocated array: no resizing, fixed memory footprint
- Open addressing: linear probing for collisions
- 24-byte entries: 16-byte MD5 key hash + 8-byte offset
- Configurable size: `log.cleaner.dedupe.buffer.size` (default: 128MB)
- Capacity: 128MB / 24B ≈ 5.5M unique keys per cleaning pass
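A stripped-down illustration of this preallocated, probe-on-collision design (my simplification using long hashes in parallel arrays; SkimpyOffsetMap packs MD5 hashes and offsets into a single byte array):

// Fixed-capacity hash -> latest-offset map; never resizes, and assumes it is
// sized so it never fills completely (as the cleaner's dedupe buffer is).
public class FixedOffsetMap {
    private final long[] hashes;
    private final long[] offsets;
    private final boolean[] used;

    FixedOffsetMap(int capacity) {
        hashes = new long[capacity];
        offsets = new long[capacity];
        used = new boolean[capacity];
    }

    void put(long keyHash, long offset) {
        int slot = probe(keyHash);
        hashes[slot] = keyHash;
        // Keep only the largest (latest) offset seen for this key
        offsets[slot] = used[slot] ? Math.max(offsets[slot], offset) : offset;
        used[slot] = true;
    }

    long get(long keyHash) {
        int slot = probe(keyHash);
        return used[slot] ? offsets[slot] : -1L;
    }

    private int probe(long keyHash) {
        int slot = Math.floorMod((int) keyHash, hashes.length);
        while (used[slot] && hashes[slot] != keyHash)
            slot = (slot + 1) % hashes.length; // linear probing on collision
        return slot;
    }
}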
Retention Policies
Time-based retention (`log.retention.ms`):
// Delete segments older than retention time
segments.forEach(segment -> {
if (segment.largestTimestamp() < now - retentionMs) {
deleteSegment(segment);
}
});
Size-based retention (`log.retention.bytes`):
// Delete oldest segments until size < retention.bytes
while (totalSize > retentionBytes && segments.size() > 1) {
    LogSegment oldest = segments.firstEntry().getValue();
    totalSize -= oldest.size();
    deleteSegment(oldest);
}
Design Patterns & Performance
Key Design Patterns
Pattern | Implementation | Benefit |
---|---|---|
Log-structured storage | Append-only segments | Sequential writes, simple recovery |
Sparse indexing | Entry every 4KB | Memory-efficient with good lookup speed |
Memory mapping | MappedByteBuffer for indexes | Zero-copy, OS page cache integration |
Segment rolling | Immutable sealed segments | Parallel operations, easy deletion |
Zero-copy transfer | sendfile() for network | Bypass kernel→user→kernel copies |
Tiered storage | Local + remote segments | Cost efficiency for old data |
Summary
Kafka's storage layer achieves exceptional performance through:
- Append-only log segments - Leverage sequential I/O
- Memory-mapped indexes - Fast lookups with OS page cache
- Sparse indexing - Balance memory vs lookup speed
- Zero-copy transfers - Minimize data copying
- Segment-based lifecycle - Simple retention and compaction
The design allows Kafka to handle millions of messages/second while maintaining durability and efficient disk usage.
In the next post, we'll explore the replication protocol: how Kafka ensures data durability across brokers using leader-follower replication and the high-watermark mechanism.
References in this post point to the Apache Kafka 4.1.0+ codebase, specifically the storage module.