Kafka Internals

Kafka core v1.0.0

Overview

Apache Kafka is a distributed event streaming platform. Understanding Kafka’s internal architecture—brokers, partitions, replication, and storage—is essential for building reliable, high-throughput event-driven systems.


Key Concepts

Kafka Architecture

┌─────────────────────────────────────────────────────────────┐
│                    Kafka Cluster                             │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐                      │
│  │Broker 1 │  │Broker 2 │  │Broker 3 │                      │
│  │         │  │         │  │         │                      │
│  │ Topic A │  │ Topic A │  │ Topic A │                      │
│  │  P0(L)  │  │  P0(F)  │  │  P1(L)  │                      │
│  │  P1(F)  │  │  P2(L)  │  │  P2(F)  │                      │
│  │         │  │         │  │         │                      │
│  │ Topic B │  │ Topic B │  │ Topic B │                      │
│  │  P0(F)  │  │  P0(L)  │  │  P1(L)  │                      │
│  │  P1(F)  │  │  P1(F)  │  │  P0(F)  │                      │
│  └─────────┘  └─────────┘  └─────────┘                      │
│       │            │            │                           │
│       └────────────┼────────────┘                           │
│                    │                                         │
│              ┌─────┴─────┐                                  │
│              │Controller │ (elected broker)                 │
│              └───────────┘                                  │
│                    │                                         │
│         ┌─────────────────────┐                             │
│         │ KRaft / ZooKeeper   │                             │
│         │ (metadata store)    │                             │
│         └─────────────────────┘                             │
│                                                              │
│  L = Leader, F = Follower                                   │
│  Replication Factor = 3                                      │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Partition Storage:
┌─────────────────────────────────────────────────────────────┐
│  Partition 0                                                 │
│  ┌─────────┬─────────┬─────────┬─────────┬─────────┐       │
│  │Segment 0│Segment 1│Segment 2│Segment 3│Segment 4│       │
│  │ (old)   │         │         │         │ (active)│       │
│  └─────────┴─────────┴─────────┴─────────┴─────────┘       │
│                                                              │
│  Each Segment:                                               │
│  ├── 00000000000000000000.log    (messages)                 │
│  ├── 00000000000000000000.index  (offset → position)        │
│  └── 00000000000000000000.timeindex (timestamp → offset)    │
│                                                              │
└─────────────────────────────────────────────────────────────┘
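The 20-digit file names in the diagram encode each segment's base offset: a segment is named for the absolute offset of its first record, zero-padded to 20 digits. A small sketch of the naming scheme (the helper names here are mine, not Kafka's):

```java
public class SegmentNames {

    // Each segment file is named after the absolute offset of the first
    // record it contains, zero-padded to 20 digits
    static long baseOffset(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }

    static String fileName(long baseOffset, String suffix) {
        return String.format("%020d.%s", baseOffset, suffix);
    }

    public static void main(String[] args) {
        // The first segment starts at offset 0, hence 00000000000000000000.log
        System.out.println(fileName(0, "log"));
        System.out.println(baseOffset("00000000000000368769.index")); // 368769
    }
}
```

This is why a fetch for an arbitrary offset starts by picking the segment whose base offset is the largest one at or below the target.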

Key Metrics

Metric                      | Description                   | Target
--------------------------- | ----------------------------- | --------
Under-replicated partitions | Partitions without a full ISR | 0
Offline partitions          | Partitions without a leader   | 0
Request latency (p99)       | Produce/Fetch latency         | < 100 ms
Consumer lag                | Messages behind the log head  | near 0
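Every broker also exports these health signals over JMX, which is usually how they reach a dashboard. A minimal polling sketch; the host/port are assumptions (enable JMX on the broker, e.g. via `JMX_PORT`), while the two MBean names are the standard Kafka ones:

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxHealthCheck {

    // Builds a canonical Kafka MBean name, e.g.
    // kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
    static String mbean(String domain, String type, String name) {
        return domain + ":type=" + type + ",name=" + name;
    }

    public static void main(String[] args) throws Exception {
        // Host and port are illustrative; point this at a broker with JMX enabled
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");

        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();

            Object underReplicated = conn.getAttribute(new ObjectName(
                mbean("kafka.server", "ReplicaManager", "UnderReplicatedPartitions")),
                "Value");
            Object offline = conn.getAttribute(new ObjectName(
                mbean("kafka.controller", "KafkaController", "OfflinePartitionsCount")),
                "Value");

            // Both should be 0 on a healthy cluster
            System.out.println("Under-replicated partitions: " + underReplicated);
            System.out.println("Offline partitions: " + offline);
        }
    }
}
```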

Best Practices

1. Plan Partition Count Carefully

More partitions mean higher consumer parallelism, but each partition costs open file handles, replication traffic, and leader-election work after a broker failure.
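One way to make "plan carefully" concrete is the common throughput rule of thumb: divide the target topic throughput by the measured per-partition throughput on both the produce and consume side, and keep the larger result. A sketch of that arithmetic (all MB/s figures below are illustrative assumptions, not benchmarks):

```java
public class PartitionSizing {

    // Rule-of-thumb sizing: enough partitions to hit the target throughput
    // on both the produce side and the consume side
    static int suggestedPartitions(double targetMBps,
                                   double producerMBpsPerPartition,
                                   double consumerMBpsPerPartition) {
        int forProduce = (int) Math.ceil(targetMBps / producerMBpsPerPartition);
        int forConsume = (int) Math.ceil(targetMBps / consumerMBpsPerPartition);
        return Math.max(forProduce, forConsume);
    }

    public static void main(String[] args) {
        // 100 MB/s target, 10 MB/s per partition producing, 5 MB/s consuming
        System.out.println(suggestedPartitions(100, 10, 5)); // 20
    }
}
```

Remember that partitions can be added later but never removed, and adding them changes key-to-partition mapping for keyed topics.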

2. Set Appropriate Replication Factor

Use a replication factor of 3 for production data durability, paired with min.insync.replicas=2 so writes survive a single broker failure.
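Replication factor alone does not guarantee durability: the topic also needs min.insync.replicas=2, and the producer needs acks=all, or an acknowledged write can still be lost. A minimal producer-side sketch using plain property keys (the timeout values are illustrative defaults):

```java
import java.util.Properties;

public class DurableProducerConfig {

    // With replication.factor=3 and min.insync.replicas=2 on the topic,
    // the producer must also request acks=all or the broker may ack a
    // write that only the leader has seen
    static Properties durableProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");                // wait for all in-sync replicas
        props.put("enable.idempotence", "true"); // no duplicates on retry
        props.put("retries", Integer.toString(Integer.MAX_VALUE));
        props.put("delivery.timeout.ms", "120000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(durableProps("broker1:9092").getProperty("acks")); // all
    }
}
```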

3. Monitor ISR Shrinkage

Under-replicated partitions (ISR smaller than the full replica set) indicate failing brokers, slow disks, or network problems.

4. Configure Retention Properly

Balance storage costs with replay requirements.

5. Use KRaft for New Clusters

ZooKeeper mode is deprecated and removed entirely in Kafka 4.0; all new clusters should start in KRaft mode.


Code Examples

Example 1: Topic Configuration

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.TopicConfig;

public class TopicManagement {
    
    private final AdminClient adminClient;
    
    public TopicManagement(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        this.adminClient = AdminClient.create(props);
    }
    
    public void createTopic(String topicName, int partitions, short replicationFactor) 
            throws ExecutionException, InterruptedException {
        
        // Topic configuration
        Map<String, String> configs = new HashMap<>();
        
        // Retention
        configs.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(7 * 24 * 60 * 60 * 1000L)); // 7 days
        configs.put(TopicConfig.RETENTION_BYTES_CONFIG, String.valueOf(10L * 1024 * 1024 * 1024)); // 10GB per partition
        
        // Compaction for state topics
        // configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
        
        // Segment configuration
        configs.put(TopicConfig.SEGMENT_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000L)); // 1 day
        configs.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(1024 * 1024 * 1024)); // 1GB
        
        // Replication
        configs.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2");
        
        // Compression
        configs.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        
        // Message size
        configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(10 * 1024 * 1024)); // 10MB
        
        NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
            .configs(configs);
        
        CreateTopicsResult result = adminClient.createTopics(List.of(newTopic));
        result.all().get(); // Wait for completion
        
        System.out.println("Created topic: " + topicName);
    }
    
    public void createCompactedTopic(String topicName, int partitions, short replicationFactor)
            throws ExecutionException, InterruptedException {
        
        Map<String, String> configs = new HashMap<>();
        
        // Compaction settings
        configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
        configs.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, String.valueOf(60 * 60 * 1000L)); // 1 hour
        configs.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000L)); // 1 day
        configs.put(TopicConfig.SEGMENT_MS_CONFIG, String.valueOf(60 * 60 * 1000L)); // 1 hour segments
        
        configs.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2");
        
        NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
            .configs(configs);
        
        adminClient.createTopics(List.of(newTopic)).all().get();
    }
    
    public Map<String, TopicDescription> describeTopics(Collection<String> topicNames)
            throws ExecutionException, InterruptedException {
        return adminClient.describeTopics(topicNames).allTopicNames().get();
    }
    
    public void increasePartitions(String topicName, int newPartitionCount)
            throws ExecutionException, InterruptedException {
        
        Map<String, NewPartitions> newPartitions = Map.of(
            topicName, NewPartitions.increaseTo(newPartitionCount)
        );
        
        adminClient.createPartitions(newPartitions).all().get();
    }
    
    public void close() {
        adminClient.close();
    }
}

Example 2: Understanding Partition Assignment

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;

public class PartitionAssignment {
    
    /**
     * Demonstrates how messages are assigned to partitions
     */
    public static class KeyPartitioningExample {
        
        private final KafkaProducer<String, String> producer;
        
        public KeyPartitioningExample(String bootstrapServers) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            
            this.producer = new KafkaProducer<>(props);
        }
        
        public void demonstratePartitioning(String topic) {
            // Same key always goes to same partition
            String customerId = "customer-123";
            
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                    topic,
                    customerId,  // Key determines partition
                    "Order " + i
                );
                
                producer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("Sent to partition %d, offset %d%n",
                            metadata.partition(), metadata.offset());
                    }
                });
            }
            // All 10 messages will go to the same partition
        }
        
        public void sendToSpecificPartition(String topic, int partition) {
            // Explicit partition assignment
            ProducerRecord<String, String> record = new ProducerRecord<>(
                topic,
                partition,  // Explicit partition
                "key",
                "value"
            );
            
            producer.send(record);
        }
    }
    
    /**
     * Custom partitioner for business logic
     */
    public static class PriorityPartitioner implements Partitioner {
        
        private static final int HIGH_PRIORITY_PARTITION = 0;
        
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                            Object value, byte[] valueBytes, Cluster cluster) {
            
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            
            if (key == null) {
                // Spread keyless records randomly (the built-in partitioner
                // instead uses sticky batching for null keys)
                return ThreadLocalRandom.current().nextInt(numPartitions);
            }
            
            String keyString = key.toString();
            
            // Route high-priority keys to a dedicated partition
            if (keyString.startsWith("PRIORITY_HIGH")) {
                return HIGH_PRIORITY_PARTITION;
            }
            
            // Non-negative hash for the rest; mask the sign bit because
            // Math.abs(Integer.MIN_VALUE) is still negative. Note that
            // ordinary keys can also hash to the priority partition.
            return (keyString.hashCode() & 0x7fffffff) % numPartitions;
        }
        
        @Override
        public void close() {}
        
        @Override
        public void configure(Map<String, ?> configs) {}
    }
}
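Two caveats about the hash fallback in `PriorityPartitioner` are worth spelling out. First, `Math.abs(Integer.MIN_VALUE)` overflows and stays negative, so masking the sign bit is the safe idiom. Second, Kafka's built-in default partitioner hashes the serialized key bytes with murmur2, not Java's `hashCode()`, so a `hashCode()`-based custom partitioner will not reproduce the default placement. A small sketch of the first point:

```java
public class SafeHashing {

    // Mask the sign bit instead of calling Math.abs: abs(Integer.MIN_VALUE)
    // is still negative and would produce a negative partition number
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE)); // still -2147483648
        System.out.println(partitionFor("customer-123", 12));
    }
}
```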

Example 3: Broker Configuration

# Server Basics (KRaft mode: node.id replaces broker.id)
node.id=1
listeners=PLAINTEXT://:9092,SSL://:9093,CONTROLLER://:9094
advertised.listeners=PLAINTEXT://broker1.example.com:9092
inter.broker.listener.name=PLAINTEXT
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT

# KRaft Mode (ZooKeeper-less)
process.roles=broker,controller
controller.quorum.voters=1@broker1:9094,2@broker2:9094,3@broker3:9094
controller.listener.names=CONTROLLER

# Log Configuration
log.dirs=/var/kafka/data1,/var/kafka/data2
num.partitions=12
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false

# Log Retention
log.retention.hours=168
log.retention.bytes=-1
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# Network and I/O
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Replication
num.replica.fetchers=4
replica.fetch.max.bytes=10485760
replica.socket.timeout.ms=30000
replica.lag.time.max.ms=30000

# Producer/Consumer Quotas
quota.producer.default=10485760
quota.consumer.default=10485760

# Request Processing
queued.max.requests=500
request.timeout.ms=30000

# Compression
compression.type=producer
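
A broker file like this is easy to get subtly wrong; for example, setting min.insync.replicas equal to the replication factor halts every acks=all producer the moment one broker restarts. A small stdlib-only sanity check over a properties fragment; the rule names and thresholds here are my own illustrative choices, not a Kafka API:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class BrokerConfigCheck {

    // min.insync.replicas must stay below the replication factor (so one
    // broker can restart without blocking acks=all writes), and unclean
    // leader election must be off for durability
    static boolean isSane(Properties p) {
        int rf = Integer.parseInt(p.getProperty("default.replication.factor", "1"));
        int minIsr = Integer.parseInt(p.getProperty("min.insync.replicas", "1"));
        boolean unclean = Boolean.parseBoolean(
            p.getProperty("unclean.leader.election.enable", "false"));
        return minIsr < rf && !unclean;
    }

    public static void main(String[] args) throws IOException {
        Properties p = new Properties();
        p.load(new StringReader(
            "default.replication.factor=3\n" +
            "min.insync.replicas=2\n" +
            "unclean.leader.election.enable=false\n"));
        System.out.println(isSane(p)); // true
    }
}
```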

Example 4: Monitoring Kafka Internals

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;

public class KafkaMonitoring {
    
    private final AdminClient adminClient;
    private final MeterRegistry meterRegistry;
    
    public KafkaMonitoring(AdminClient adminClient, MeterRegistry meterRegistry) {
        this.adminClient = adminClient;
        this.meterRegistry = meterRegistry;
    }
    
    public void monitorClusterHealth() throws ExecutionException, InterruptedException {
        // Describe cluster
        DescribeClusterResult clusterResult = adminClient.describeCluster();
        Collection<Node> nodes = clusterResult.nodes().get();
        Node controller = clusterResult.controller().get();
        
        System.out.printf("Cluster has %d brokers, controller is %d%n",
            nodes.size(), controller.id());
        
        // Check for under-replicated partitions
        Map<String, TopicDescription> topics = adminClient.describeTopics(
            adminClient.listTopics().names().get()
        ).allTopicNames().get();
        
        int underReplicated = 0;
        int offline = 0;
        
        for (TopicDescription topic : topics.values()) {
            for (TopicPartitionInfo partition : topic.partitions()) {
                if (partition.leader() == null) {
                    offline++;
                    System.err.printf("OFFLINE: %s-%d%n", topic.name(), partition.partition());
                } else if (partition.isr().size() < partition.replicas().size()) {
                    underReplicated++;
                    System.err.printf("UNDER-REPLICATED: %s-%d (ISR: %d/%d)%n",
                        topic.name(), partition.partition(),
                        partition.isr().size(), partition.replicas().size());
                }
            }
        }
        
        // Record metrics (note: Micrometer gauges hold weak references, so a
        // long-lived service should gauge a strong field such as an
        // AtomicInteger rather than these local values)
        meterRegistry.gauge("kafka.partitions.offline", offline);
        meterRegistry.gauge("kafka.partitions.under_replicated", underReplicated);
    }
    
    public void monitorConsumerLag(String groupId) throws ExecutionException, InterruptedException {
        // Get consumer group offsets
        Map<TopicPartition, OffsetAndMetadata> offsets = adminClient
            .listConsumerGroupOffsets(groupId)
            .partitionsToOffsetAndMetadata()
            .get();
        
        // Get end offsets (high watermarks)
        Map<TopicPartition, OffsetSpec> offsetRequest = offsets.keySet().stream()
            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = 
            adminClient.listOffsets(offsetRequest).all().get();
        
        // Calculate lag
        long totalLag = 0;
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            long committed = entry.getValue().offset();
            long end = endOffsets.get(tp).offset();
            long lag = end - committed;
            
            System.out.printf("%s-%d: lag = %d%n", tp.topic(), tp.partition(), lag);
            
            meterRegistry.gauge(
                "kafka.consumer.lag",
                Tags.of("group", groupId, "topic", tp.topic(), "partition", String.valueOf(tp.partition())),
                lag
            );
            
            totalLag += lag;
        }
        
        System.out.printf("Total lag for %s: %d%n", groupId, totalLag);
    }
    
    public void monitorProducerMetrics(KafkaProducer<?, ?> producer) {
        // Get producer metrics
        Map<MetricName, ? extends Metric> metrics = producer.metrics();
        
        metrics.forEach((name, metric) -> {
            if (name.group().equals("producer-metrics")) {
                switch (name.name()) {
                    case "record-send-rate":
                    case "request-latency-avg":
                    case "request-latency-max":
                    case "batch-size-avg":
                    case "records-per-request-avg":
                        System.out.printf("%s: %s%n", name.name(), metric.metricValue());
                        break;
                }
            }
        });
    }
}

Example 5: Log Segment Analysis

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Instant;

public class LogSegmentAnalysis {
    
    /**
     * Analyze log segment files for debugging
     */
    public void analyzeLogSegment(Path segmentPath) throws IOException {
        // Log segment file structure
        Path logFile = segmentPath.resolve("00000000000000000000.log");
        Path indexFile = segmentPath.resolve("00000000000000000000.index");
        Path timeIndexFile = segmentPath.resolve("00000000000000000000.timeindex");
        
        // Walk the record batches sequentially: read each 61-byte v2 batch
        // header, then skip over the payload using the batch-length field
        try (FileChannel channel = FileChannel.open(logFile, StandardOpenOption.READ)) {
            ByteBuffer header = ByteBuffer.allocate(61);
            long fileSize = channel.size();
            long position = 0;
            
            while (position + 61 <= fileSize) {
                header.clear();
                channel.read(header, position);
                header.flip();
                
                long baseOffset = header.getLong();
                int batchLength = header.getInt();      // bytes after this field
                int partitionLeaderEpoch = header.getInt();
                byte magic = header.get();              // 2 for the current format
                int crc = header.getInt();
                short attributes = header.getShort();   // compression/txn flags
                int lastOffsetDelta = header.getInt();
                long firstTimestamp = header.getLong();
                long maxTimestamp = header.getLong();
                long producerId = header.getLong();
                short producerEpoch = header.getShort();
                int baseSequence = header.getInt();
                int recordCount = header.getInt();
                
                System.out.printf("Batch: offset=%d, records=%d, timestamp=%s%n",
                    baseOffset, recordCount, Instant.ofEpochMilli(firstTimestamp));
                
                // Next batch starts after baseOffset (8) + length (4) + payload
                position += 12 + batchLength;
            }
        }
    }
    
    /**
     * Demonstrates offset lookup using the (sparse) offset index.
     * Index entries store offsets relative to the segment's base offset,
     * so pass targetOffset - segmentBaseOffset. Because the index is
     * sparse, this returns the file position of the largest indexed entry
     * at or below the target; the broker then scans forward from there.
     */
    public long findPositionForOffset(Path indexFile, long targetRelativeOffset) throws IOException {
        // Index entries are 8 bytes: 4-byte relative offset + 4-byte file position
        try (FileChannel channel = FileChannel.open(indexFile, StandardOpenOption.READ)) {
            long fileSize = channel.size();
            int entryCount = (int) (fileSize / 8);
            
            // Floor binary search over the sorted entries
            int low = 0;
            int high = entryCount - 1;
            long bestPosition = 0; // start of segment if no entry precedes target
            
            ByteBuffer buffer = ByteBuffer.allocate(8);
            
            while (low <= high) {
                int mid = (low + high) >>> 1;
                channel.position(mid * 8L);
                buffer.clear();
                channel.read(buffer);
                buffer.flip();
                
                int relativeOffset = buffer.getInt();
                int position = buffer.getInt();
                
                if (relativeOffset <= targetRelativeOffset) {
                    bestPosition = position; // candidate floor entry
                    low = mid + 1;
                } else {
                    high = mid - 1;
                }
            }
            
            return bestPosition;
        }
    }
}

Anti-Patterns

❌ Too Many Small Partitions

# WRONG - creates memory pressure
num.partitions=1000
default.replication.factor=3
# 1000 partitions x 3 replicas = 3000 partition replicas spread across
# the cluster = high overhead

# ✅ CORRECT - right-size partitions
num.partitions=12  # scale based on consumer parallelism
default.replication.factor=3

❌ Ignoring Under-Replicated Partitions

Under-replicated partitions indicate failing brokers or network issues. Always monitor and alert on this metric.


References