Kafka Internals
Kafka core v1.0.0
Overview
Apache Kafka is a distributed event streaming platform. Understanding Kafka’s internal architecture—brokers, partitions, replication, and storage—is essential for building reliable, high-throughput event-driven systems.
Key Concepts
Kafka Architecture
┌─────────────────────────────────────────────────────────────┐
│ Kafka Cluster │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │Broker 1 │ │Broker 2 │ │Broker 3 │ │
│ │ │ │ │ │ │ │
│ │ Topic A │ │ Topic A │ │ Topic A │ │
│ │ P0(L) │ │ P0(F) │ │ P1(L) │ │
│ │ P1(F) │ │ P2(L) │ │ P2(F) │ │
│ │ │ │ │ │ │ │
│ │ Topic B │ │ Topic B │ │ Topic B │ │
│ │ P0(F) │ │ P0(L) │ │ P1(L) │ │
│ │ P1(F) │ │ P1(F) │ │ P0(F) │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │
│ └────────────┼────────────┘ │
│ │ │
│ ┌─────┴─────┐ │
│ │Controller │ (elected broker) │
│ └───────────┘ │
│ │ │
│ ┌─────────────────────┐ │
│ │ KRaft / ZooKeeper │ │
│ │ (metadata store) │ │
│ └─────────────────────┘ │
│ │
│ L = Leader, F = Follower │
│ Replication Factor = 3 │
│ │
└─────────────────────────────────────────────────────────────┘
Partition Storage:
┌─────────────────────────────────────────────────────────────┐
│ Partition 0 │
│ ┌─────────┬─────────┬─────────┬─────────┬─────────┐ │
│ │Segment 0│Segment 1│Segment 2│Segment 3│Segment 4│ │
│ │ (old) │ │ │ │ (active)│ │
│ └─────────┴─────────┴─────────┴─────────┴─────────┘ │
│ │
│ Each Segment: │
│ ├── 00000000000000000000.log (messages) │
│ ├── 00000000000000000000.index (offset → position) │
│ └── 00000000000000000000.timeindex (timestamp → offset) │
│ │
└─────────────────────────────────────────────────────────────┘
Key Metrics
| Metric | Description | Target |
|---|---|---|
| Under-replicated partitions | Partitions without full ISR | 0 |
| Offline partitions | Partitions without leader | 0 |
| Request latency (p99) | Produce/Fetch latency | < 100ms |
| Consumer lag | Messages behind head | Near 0 |
Best Practices
1. Plan Partition Count Carefully
More partitions = higher parallelism but more overhead.
2. Set Appropriate Replication Factor
Use 3 for production data durability.
3. Monitor ISR Shrinkage
Under-replicated partitions indicate problems.
4. Configure Retention Properly
Balance storage costs with replay requirements.
5. Use KRaft for New Clusters
ZooKeeper mode is deprecated.
Code Examples
Example 1: Topic Configuration
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.TopicConfig;
/**
 * Administrative helper for creating, inspecting, and resizing Kafka topics
 * via the {@link AdminClient}.
 *
 * <p>Not thread-safe beyond what {@code AdminClient} itself guarantees.
 * Call {@link #close()} when done to release client resources.
 */
public class TopicManagement {

    private final AdminClient adminClient;

    /**
     * @param bootstrapServers comma-separated {@code host:port} list used to
     *                         discover the cluster
     */
    public TopicManagement(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        this.adminClient = AdminClient.create(props);
    }

    /**
     * Creates a delete-retention topic with production defaults: 7-day /
     * 10 GB-per-partition retention, daily 1 GB segments, min ISR 2, lz4
     * compression, and a 10 MB max message size.
     *
     * @throws ExecutionException   if the broker rejects the creation
     *                              (e.g. topic already exists)
     * @throws InterruptedException if interrupted while waiting for completion
     */
    public void createTopic(String topicName, int partitions, short replicationFactor)
            throws ExecutionException, InterruptedException {
        Map<String, String> configs = new HashMap<>();

        // Retention: bound by both age and size so a hot partition cannot fill the disk.
        configs.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(7 * 24 * 60 * 60 * 1000L)); // 7 days
        configs.put(TopicConfig.RETENTION_BYTES_CONFIG, String.valueOf(10L * 1024 * 1024 * 1024)); // 10GB per partition

        // Segments roll daily or at 1 GB, whichever comes first; smaller segments
        // let retention/compaction reclaim space sooner.
        configs.put(TopicConfig.SEGMENT_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000L)); // 1 day
        configs.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(1024 * 1024 * 1024)); // 1GB

        // With RF=3, min ISR 2 tolerates one replica loss while still accepting acks=all writes.
        configs.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2");

        configs.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        // Must not exceed the brokers' replica.fetch.max.bytes or replication stalls.
        configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(10 * 1024 * 1024)); // 10MB

        createTopicWithConfigs(topicName, partitions, replicationFactor, configs);
        System.out.println("Created topic: " + topicName);
    }

    /**
     * Creates a log-compacted topic (latest value per key retained), suitable
     * for changelog/state topics. Hourly segments keep the compaction backlog small.
     *
     * @throws ExecutionException   if the broker rejects the creation
     * @throws InterruptedException if interrupted while waiting for completion
     */
    public void createCompactedTopic(String topicName, int partitions, short replicationFactor)
            throws ExecutionException, InterruptedException {
        Map<String, String> configs = new HashMap<>();

        configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
        // Give consumers at least 1h to observe every update before it may be compacted away.
        configs.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, String.valueOf(60 * 60 * 1000L)); // 1 hour
        // Tombstones survive 1 day so lagging consumers still see deletions.
        configs.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000L)); // 1 day
        configs.put(TopicConfig.SEGMENT_MS_CONFIG, String.valueOf(60 * 60 * 1000L)); // 1 hour segments
        configs.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2");

        createTopicWithConfigs(topicName, partitions, replicationFactor, configs);
    }

    /** Shared creation path: submits the NewTopic and blocks until the controller applies it. */
    private void createTopicWithConfigs(String topicName, int partitions, short replicationFactor,
                                        Map<String, String> configs)
            throws ExecutionException, InterruptedException {
        NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
                .configs(configs);
        adminClient.createTopics(List.of(newTopic)).all().get(); // wait for completion
    }

    /**
     * Returns full metadata (partitions, leaders, ISR) for the given topics.
     */
    public Map<String, TopicDescription> describeTopics(Collection<String> topicNames)
            throws ExecutionException, InterruptedException {
        return adminClient.describeTopics(topicNames).allTopicNames().get();
    }

    /**
     * Grows a topic to {@code newPartitionCount} partitions. Partition count can
     * only increase; note that key-to-partition mapping changes for existing keys.
     */
    public void increasePartitions(String topicName, int newPartitionCount)
            throws ExecutionException, InterruptedException {
        Map<String, NewPartitions> newPartitions = Map.of(
                topicName, NewPartitions.increaseTo(newPartitionCount)
        );
        adminClient.createPartitions(newPartitions).all().get();
    }

    /** Releases the underlying admin client. The instance is unusable afterwards. */
    public void close() {
        adminClient.close();
    }
}
Example 2: Understanding Partition Assignment
/**
 * Examples of how records are mapped to partitions: key hashing, explicit
 * partition assignment, and a custom business-logic partitioner.
 */
public class PartitionAssignment {

    /**
     * Demonstrates how messages are assigned to partitions by the default
     * partitioner (hash of the key) and by explicit partition selection.
     */
    public static class KeyPartitioningExample {

        private final KafkaProducer<String, String> producer;

        public KeyPartitioningExample(String bootstrapServers) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            this.producer = new KafkaProducer<>(props);
        }

        /**
         * Sends ten records that share one key; the default partitioner hashes
         * the key, so all ten land on the same partition (preserving per-key order).
         */
        public void demonstratePartitioning(String topic) {
            // Same key always goes to same partition
            String customerId = "customer-123";
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        topic,
                        customerId, // Key determines partition
                        "Order " + i
                );
                producer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("Sent to partition %d, offset %d%n",
                                metadata.partition(), metadata.offset());
                    }
                });
            }
            // All 10 messages will go to the same partition
        }

        /** Bypasses the partitioner entirely by naming the partition in the record. */
        public void sendToSpecificPartition(String topic, int partition) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    topic,
                    partition, // Explicit partition
                    "key",
                    "value"
            );
            producer.send(record);
        }
    }

    /**
     * Custom partitioner that reserves partition 0 for high-priority keys and
     * hashes all other keys over the remaining partitions.
     */
    public static class PriorityPartitioner implements Partitioner {

        private static final int HIGH_PRIORITY_PARTITION = 0;

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();

            if (key == null) {
                // No key: spread randomly. (Note: this is random, not strict round-robin.)
                return ThreadLocalRandom.current().nextInt(numPartitions);
            }

            String keyString = key.toString();

            // Route high-priority keys to the dedicated partition.
            if (keyString.startsWith("PRIORITY_HIGH")) {
                return HIGH_PRIORITY_PARTITION;
            }

            // Mask the sign bit rather than Math.abs(): Math.abs(Integer.MIN_VALUE)
            // is still negative and would produce an invalid partition number.
            int hash = keyString.hashCode() & 0x7fffffff;

            // Keep normal traffic off the dedicated priority partition whenever
            // more than one partition exists, so "dedicated" actually holds.
            if (numPartitions > 1) {
                return 1 + hash % (numPartitions - 1);
            }
            return hash % numPartitions;
        }

        @Override
        public void close() {}

        @Override
        public void configure(Map<String, ?> configs) {}
    }
}
Example 3: Broker Configuration
# Server Basics
# NOTE(review): in KRaft mode, node.id replaces broker.id; configuring both with
# different values (the original had broker.id=0 and node.id=1) fails at startup.
# broker.id removed — node.id below is the single identity.
# A CONTROLLER listener is required because process.roles includes "controller";
# it uses port 9094 since 9093 is already taken by the SSL listener.
listeners=PLAINTEXT://:9092,SSL://:9093,CONTROLLER://:9094
advertised.listeners=PLAINTEXT://broker1.example.com:9092
inter.broker.listener.name=PLAINTEXT

# KRaft Mode (ZooKeeper-less)
process.roles=broker,controller
node.id=1
# Voters must reference each node's CONTROLLER listener port (9094, not the SSL port).
controller.quorum.voters=1@broker1:9094,2@broker2:9094,3@broker3:9094
controller.listener.names=CONTROLLER

# Log Configuration
# Multiple data dirs spread partitions across disks (JBOD).
log.dirs=/var/kafka/data1,/var/kafka/data2
num.partitions=12
default.replication.factor=3
# min ISR 2 with RF 3 tolerates one replica failure for acks=all producers.
min.insync.replicas=2
# Never elect an out-of-sync replica as leader (prevents data loss at the cost of availability).
unclean.leader.election.enable=false

# Log Retention
log.retention.hours=168
# -1 = no size-based limit; time-based retention (7 days) governs.
log.retention.bytes=-1
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# Network and I/O
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Replication
num.replica.fetchers=4
# Must be >= the largest max.message.bytes of any topic, or replication stalls.
replica.fetch.max.bytes=10485760
replica.socket.timeout.ms=30000
# Follower falls out of ISR after lagging this long.
replica.lag.time.max.ms=30000

# Producer/Consumer Quotas
# NOTE(review): these static defaults are deprecated and removed in Kafka 3.0+;
# prefer dynamic quotas set via kafka-configs.sh.
quota.producer.default=10485760
quota.consumer.default=10485760

# Request Processing
queued.max.requests=500
request.timeout.ms=30000

# Compression
# "producer" = store batches exactly as the producer compressed them (no broker recompression).
compression.type=producer
Example 4: Monitoring Kafka Internals
/**
 * Operational monitoring for a Kafka cluster: replication health,
 * consumer-group lag, and producer client metrics.
 *
 * <p>Designed to be invoked periodically (e.g. from a scheduler); gauge values
 * are updated in place on each poll.
 */
public class KafkaMonitoring {

    private final AdminClient adminClient;
    private final MeterRegistry meterRegistry;

    // Micrometer gauges observe a live Number that must be strongly referenced;
    // calling meterRegistry.gauge(...) repeatedly with a primitive registers the
    // meter once and silently ignores every later value. Each gauge is therefore
    // backed by an AtomicLong held here and registered exactly once.
    private final ConcurrentMap<String, AtomicLong> gaugeValues = new ConcurrentHashMap<>();

    /**
     * @param adminClient   client for cluster metadata queries (not closed by this class)
     * @param meterRegistry registry receiving the health gauges
     */
    public KafkaMonitoring(AdminClient adminClient, MeterRegistry meterRegistry) {
        // The final fields were previously declared without any constructor,
        // leaving them unassigned (a compile error); they must be injected.
        this.adminClient = Objects.requireNonNull(adminClient, "adminClient");
        this.meterRegistry = Objects.requireNonNull(meterRegistry, "meterRegistry");
    }

    /**
     * Checks broker membership and scans every topic for offline and
     * under-replicated partitions, publishing both counts as gauges.
     *
     * @throws ExecutionException   if any admin request fails
     * @throws InterruptedException if interrupted while waiting on responses
     */
    public void monitorClusterHealth() throws ExecutionException, InterruptedException {
        DescribeClusterResult clusterResult = adminClient.describeCluster();
        Collection<Node> nodes = clusterResult.nodes().get();
        Node controller = clusterResult.controller().get();
        System.out.printf("Cluster has %d brokers, controller is %d%n",
                nodes.size(), controller.id());

        // Describe every topic to inspect leader and ISR state per partition.
        Map<String, TopicDescription> topics = adminClient.describeTopics(
                adminClient.listTopics().names().get()
        ).allTopicNames().get();

        int underReplicated = 0;
        int offline = 0;
        for (TopicDescription topic : topics.values()) {
            for (TopicPartitionInfo partition : topic.partitions()) {
                if (partition.leader() == null) {
                    // No leader at all: partition is unavailable for reads and writes.
                    offline++;
                    System.err.printf("OFFLINE: %s-%d%n", topic.name(), partition.partition());
                } else if (partition.isr().size() < partition.replicas().size()) {
                    // ISR smaller than the replica set: a follower is lagging or down.
                    underReplicated++;
                    System.err.printf("UNDER-REPLICATED: %s-%d (ISR: %d/%d)%n",
                            topic.name(), partition.partition(),
                            partition.isr().size(), partition.replicas().size());
                }
            }
        }

        setGauge("kafka.partitions.offline", Tags.empty(), offline);
        setGauge("kafka.partitions.under_replicated", Tags.empty(), underReplicated);
    }

    /**
     * Computes per-partition and total lag for a consumer group by comparing
     * committed offsets against the partitions' latest (end) offsets.
     *
     * @throws ExecutionException   if any admin request fails
     * @throws InterruptedException if interrupted while waiting on responses
     */
    public void monitorConsumerLag(String groupId) throws ExecutionException, InterruptedException {
        // Committed offsets for every partition the group has consumed.
        Map<TopicPartition, OffsetAndMetadata> offsets = adminClient
                .listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata()
                .get();

        // End offsets (high watermarks) for the same partitions.
        Map<TopicPartition, OffsetSpec> offsetRequest = offsets.keySet().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                adminClient.listOffsets(offsetRequest).all().get();

        long totalLag = 0;
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            ListOffsetsResult.ListOffsetsResultInfo endInfo = endOffsets.get(tp);
            if (endInfo == null) {
                // Partition vanished between the two queries (e.g. topic deleted); skip.
                continue;
            }
            long committed = entry.getValue().offset();
            long lag = endInfo.offset() - committed;
            System.out.printf("%s-%d: lag = %d%n", tp.topic(), tp.partition(), lag);
            setGauge("kafka.consumer.lag",
                    Tags.of("group", groupId, "topic", tp.topic(),
                            "partition", String.valueOf(tp.partition())),
                    lag);
            totalLag += lag;
        }
        System.out.printf("Total lag for %s: %d%n", groupId, totalLag);
    }

    /** Registers the gauge on first use, then updates its backing AtomicLong in place. */
    private void setGauge(String name, Tags tags, long value) {
        gaugeValues.computeIfAbsent(name + tags,
                        key -> meterRegistry.gauge(name, tags, new AtomicLong()))
                .set(value);
    }

    /**
     * Prints a selection of throughput/latency/batching metrics exposed by the
     * producer's own metrics map.
     */
    public void monitorProducerMetrics(KafkaProducer<?, ?> producer) {
        Map<MetricName, ? extends Metric> metrics = producer.metrics();
        metrics.forEach((name, metric) -> {
            if (name.group().equals("producer-metrics")) {
                switch (name.name()) {
                    case "record-send-rate":
                    case "request-latency-avg":
                    case "request-latency-max":
                    case "batch-size-avg":
                    case "records-per-request-avg":
                        System.out.printf("%s: %s%n", name.name(), metric.metricValue());
                        break;
                }
            }
        });
    }
}
Example 5: Log Segment Analysis
public class LogSegmentAnalysis {
/**
* Analyze log segment files for debugging
*/
public void analyzeLogSegment(Path segmentPath) throws IOException {
// Log segment file structure
Path logFile = segmentPath.resolve("00000000000000000000.log");
Path indexFile = segmentPath.resolve("00000000000000000000.index");
Path timeIndexFile = segmentPath.resolve("00000000000000000000.timeindex");
// Read log file
try (FileChannel channel = FileChannel.open(logFile, StandardOpenOption.READ)) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
while (channel.read(buffer) > 0) {
buffer.flip();
// Parse record batch header
long baseOffset = buffer.getLong();
int batchLength = buffer.getInt();
int partitionLeaderEpoch = buffer.getInt();
byte magic = buffer.get();
int crc = buffer.getInt();
short attributes = buffer.getShort();
int lastOffsetDelta = buffer.getInt();
long firstTimestamp = buffer.getLong();
long maxTimestamp = buffer.getLong();
long producerId = buffer.getLong();
short producerEpoch = buffer.getShort();
int baseSequence = buffer.getInt();
int recordCount = buffer.getInt();
System.out.printf("Batch: offset=%d, records=%d, timestamp=%s%n",
baseOffset, recordCount, Instant.ofEpochMilli(firstTimestamp));
buffer.clear();
}
}
}
/**
* Demonstrates offset lookup using index
*/
public long findPositionForOffset(Path indexFile, long targetOffset) throws IOException {
// Index entries are 8 bytes: 4 bytes offset + 4 bytes position
try (FileChannel channel = FileChannel.open(indexFile, StandardOpenOption.READ)) {
long fileSize = channel.size();
int entryCount = (int) (fileSize / 8);
// Binary search
int low = 0;
int high = entryCount - 1;
ByteBuffer buffer = ByteBuffer.allocate(8);
while (low <= high) {
int mid = (low + high) / 2;
channel.position(mid * 8L);
buffer.clear();
channel.read(buffer);
buffer.flip();
int relativeOffset = buffer.getInt();
int position = buffer.getInt();
if (relativeOffset < targetOffset) {
low = mid + 1;
} else if (relativeOffset > targetOffset) {
high = mid - 1;
} else {
return position;
}
}
return -1; // Not found
}
}
}
Anti-Patterns
❌ Too Many Small Partitions
# WRONG - creates memory pressure
num.partitions=1000
default.replication.factor=3
# = 3,000 partition replicas across the cluster = high controller and memory overhead
# ✅ CORRECT - right-size partitions
num.partitions=12  # Scale based on consumer parallelism
default.replication.factor=3
❌ Ignoring Under-Replicated Partitions
Under-replicated partitions indicate failing brokers or network issues. Always monitor and alert on this metric.