
Ordering Guarantees

Kafka advanced v1.0.0

Kafka Message Ordering

Overview

Kafka guarantees message ordering within a partition, not across partitions. This skill covers how to design partition keys, handle ordering requirements, and work around ordering limitations.
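To make the per-partition guarantee concrete: a record's key determines its partition, so equal keys always land on the same partition and are consumed in production order. The sketch below is illustrative only; Kafka's default partitioner hashes keys with murmur2, not `String.hashCode()`, but the principle is identical.

```java
import java.util.List;

public class KeyPartitioningDemo {

    // Illustrative only: Kafka's default partitioner uses murmur2.
    // floorMod keeps the result non-negative even when hashCode() is negative.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        // Every occurrence of the same key maps to the same partition,
        // so per-key ordering is preserved even with multiple partitions
        for (String key : List.of("order-1", "order-2", "order-1", "order-3", "order-1")) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Note that the partition count is baked into the mapping: increasing the number of partitions later remaps existing keys, which silently breaks per-key ordering across the resize.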


Key Concepts

Ordering Guarantees

┌─────────────────────────────────────────────────────────────┐
│                    Kafka Ordering Model                      │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Topic: orders (3 partitions)                               │
│                                                              │
│  Partition 0:  [A1] → [A2] → [A3] → [A4]                   │
│                 ↑                                            │
│                 Strictly ordered within partition            │
│                                                              │
│  Partition 1:  [B1] → [B2] → [B3]                          │
│                                                              │
│  Partition 2:  [C1] → [C2] → [C3] → [C4] → [C5]           │
│                                                              │
│  Consumer sees A1 before A2, but may see B1 before A1      │
│                                                              │
│  Ordering Strategies:                                        │
│  ┌───────────────────────────────────────────────────────┐ │
│  │ 1. Single Partition (1 partition)                     │ │
│  │    ✓ Global ordering                                  │ │
│  │    ✗ No parallelism, throughput limited              │ │
│  │                                                       │ │
│  │ 2. Key-Based Partitioning                             │ │
│  │    ✓ Ordering per key (entity)                       │ │
│  │    ✓ Parallel processing of different keys          │ │
│  │                                                       │ │
│  │ 3. Event Sequencing (sequence numbers)               │ │
│  │    ✓ Detect out-of-order at consumer                │ │
│  │    ✓ Works across partitions                        │ │
│  └───────────────────────────────────────────────────────┘ │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Producer Ordering Configuration

Config                                  Value   Effect
max.in.flight.requests.per.connection   1       Strict ordering (slow)
max.in.flight.requests.per.connection   5       Ordering preserved when idempotence is enabled
enable.idempotence                      true    Required for ordering with in-flight > 1
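These settings translate directly into producer properties. A minimal sketch, using raw config strings so it stands alone without a Kafka dependency (real code would use the `ProducerConfig` constants, as in Example 1 below):

```java
import java.util.Properties;

public class OrderingConfigDemo {

    // Producer properties for ordered delivery with good throughput.
    static Properties orderingProps() {
        Properties props = new Properties();
        props.put("enable.idempotence", "true");                 // broker de-duplicates retries
        props.put("acks", "all");                                // required by idempotence
        props.put("max.in.flight.requests.per.connection", "5"); // <= 5 keeps ordering when idempotent
        return props;
    }

    public static void main(String[] args) {
        System.out.println(orderingProps());
    }
}
```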

Best Practices

1. Design Partition Keys Carefully

Choose a key that groups every event that must be processed in order, such as an order ID or account ID.

2. Use Idempotent Producer

With enable.idempotence=true, ordering is preserved with up to five in-flight requests, giving much higher throughput than limiting in-flight requests to one.

3. Include Sequence Numbers

Embed a sequence number in each event so the consumer can detect out-of-order or missing deliveries.

4. Avoid Hot Partitions

Ensure even key distribution across partitions.

5. Single Consumer per Partition

For strict ordering, ensure one consumer per partition.


Code Examples

Example 1: Key-Based Ordering

public class OrderedProducer {
    
    private final KafkaProducer<String, OrderEvent> producer;
    
    public OrderedProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        // Ordering with high throughput
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        
        this.producer = new KafkaProducer<>(props);
    }
    
    /**
     * Events for the same order will be ordered because they share the same key
     */
    public void sendOrderEvents(Order order, List<OrderEvent> events) {
        String orderKey = order.getId().toString();
        
        for (OrderEvent event : events) {
            ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
                "order-events",
                orderKey,  // Same key = same partition = ordered
                event
            );
            
            // Add sequence for consumer validation
            record.headers().add("sequence", 
                String.valueOf(event.getSequenceNumber()).getBytes());
            
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    handleSendError(order, event, exception);
                } else {
                    log.debug("Sent event {} to partition {} offset {}",
                        event.getType(), metadata.partition(), metadata.offset());
                }
            });
        }
    }
    
    /**
     * Multi-entity ordering: events for related entities need same key
     */
    public void sendWithCorrelation(String correlationKey, Object event) {
        // Use correlation key (e.g., session ID, transaction ID)
        // to ensure related events are ordered
        ProducerRecord<String, Object> record = new ProducerRecord<>(
            "events",
            correlationKey,
            event
        );
        
        producer.send(record);
    }
}

Example 2: Ordered Consumer

public class OrderedConsumer {
    
    private final KafkaConsumer<String, OrderEvent> consumer;
    private final Map<String, Long> lastSeenSequence = new ConcurrentHashMap<>();
    
    public OrderedConsumer(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        
        // Bound the batch size per poll; ordering within a partition is
        // preserved as long as records are processed on a single thread
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        
        // Manual commit for reliable processing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
        this.consumer = new KafkaConsumer<>(props);
    }
    
    public void consume() {
        consumer.subscribe(List.of("order-events"));
        
        while (true) {
            ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(100));
            
            // Process records in partition order
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, OrderEvent>> partitionRecords = 
                    records.records(partition);
                
                for (ConsumerRecord<String, OrderEvent> record : partitionRecords) {
                    processWithOrderValidation(record);
                }
            }
            
            consumer.commitSync();
        }
    }
    
    private void processWithOrderValidation(ConsumerRecord<String, OrderEvent> record) {
        String key = record.key();
        OrderEvent event = record.value();
        long sequence = event.getSequenceNumber();
        
        // Check sequence
        Long lastSequence = lastSeenSequence.get(key);
        
        if (lastSequence != null && sequence <= lastSequence) {
            // Out of order or duplicate
            log.warn("Out of order event for {}: expected > {}, got {}",
                key, lastSequence, sequence);
            
            if (sequence == lastSequence) {
                // Duplicate - skip
                return;
            }
            
            // Handle out-of-order based on business requirements
            handleOutOfOrder(record, lastSequence);
            return;
        }
        
        if (lastSequence != null && sequence > lastSequence + 1) {
            // Gap detected - missing events
            log.warn("Gap detected for {}: expected {}, got {}",
                key, lastSequence + 1, sequence);
            
            handleGap(record, lastSequence);
        }
        
        // Process event
        processEvent(event);
        
        // Update last seen
        lastSeenSequence.put(key, sequence);
    }
    
    private void handleOutOfOrder(ConsumerRecord<String, OrderEvent> record, long lastSequence) {
        // Options:
        // 1. Buffer and reorder
        // 2. Reject and alert
        // 3. Process anyway if business allows
    }
    
    private void handleGap(ConsumerRecord<String, OrderEvent> record, long lastSequence) {
        // Options:
        // 1. Wait for missing events (with timeout)
        // 2. Request replay of missing events
        // 3. Continue with warning
    }
}

Example 3: Event Reordering Buffer

public class EventReorderBuffer<K, V> {
    
    private final Map<K, TreeMap<Long, V>> buffers = new ConcurrentHashMap<>();
    private final Map<K, Long> expectedSequences = new ConcurrentHashMap<>();
    private final Duration maxWaitTime;
    private final int maxBufferSize;
    
    public EventReorderBuffer(Duration maxWaitTime, int maxBufferSize) {
        this.maxWaitTime = maxWaitTime;
        this.maxBufferSize = maxBufferSize;
    }
    
    public List<V> addAndFlush(K key, long sequence, V event) {
        TreeMap<Long, V> buffer = buffers.computeIfAbsent(key, k -> new TreeMap<>());
        Long expected = expectedSequences.getOrDefault(key, 1L);
        
        if (sequence == expected) {
            // In order - process immediately and flush any buffered
            List<V> toProcess = new ArrayList<>();
            toProcess.add(event);
            expectedSequences.put(key, sequence + 1);
            
            // Check for buffered events that are now in order
            // (compare primitive longs to avoid Long reference-equality pitfalls)
            while (!buffer.isEmpty()
                    && buffer.firstKey().longValue() == expectedSequences.get(key)) {
                toProcess.add(buffer.pollFirstEntry().getValue());
                expectedSequences.merge(key, 1L, Long::sum);
            }
            
            return toProcess;
            
        } else if (sequence > expected) {
            // Future event - buffer it
            if (buffer.size() >= maxBufferSize) {
                // Buffer full - force flush
                log.warn("Buffer full for {}, forcing flush", key);
                return forceFlush(key);
            }
            
            buffer.put(sequence, event);
            return List.of();
            
        } else {
            // Past event - duplicate or very late
            log.warn("Received past event for {}: expected {}, got {}", key, expected, sequence);
            return List.of(); // Skip
        }
    }
    
    public List<V> forceFlush(K key) {
        TreeMap<Long, V> buffer = buffers.get(key);
        if (buffer == null || buffer.isEmpty()) {
            return List.of();
        }
        
        // Capture the highest buffered sequence BEFORE clearing,
        // then advance the expected sequence past it
        long nextExpected = buffer.lastKey() + 1;
        List<V> events = new ArrayList<>(buffer.values());
        buffer.clear();
        expectedSequences.put(key, nextExpected);
        
        return events;
    }
    
    /**
     * Periodic cleanup of stale buffers
     */
    @Scheduled(fixedRate = 60000)
    public void cleanup() {
        buffers.entrySet().removeIf(entry -> {
            TreeMap<Long, V> buffer = entry.getValue();
            if (buffer.isEmpty()) {
                return true;
            }
            
            // Check if oldest entry is too old
            // (Requires timestamp in events)
            return false;
        });
    }
}

Example 4: Partition-Aware Consumer

public class PartitionAwareConsumer {
    
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final Map<Integer, BlockingQueue<ConsumerRecord<String, String>>> partitionQueues =
        new ConcurrentHashMap<>();
    
    /**
     * Process partitions in parallel while maintaining order within each partition
     */
    public void consumeWithParallelPartitions() {
        KafkaConsumer<String, String> consumer = createConsumer();
        consumer.subscribe(List.of("events"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Wait for in-flight processing to drain before the
                // partition is reassigned, so ordering is not broken
                for (TopicPartition partition : partitions) {
                    BlockingQueue<ConsumerRecord<String, String>> queue = 
                        partitionQueues.get(partition.partition());
                    if (queue != null) {
                        while (!queue.isEmpty()) {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                    }
                }
            }
            
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                for (TopicPartition partition : partitions) {
                    BlockingQueue<ConsumerRecord<String, String>> queue = 
                        new LinkedBlockingQueue<>(1000);
                    partitionQueues.put(partition.partition(), queue);
                    
                    // Start worker for this partition
                    executor.submit(() -> processPartition(partition.partition(), queue));
                }
            }
        });
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            
            // Route to partition-specific queues
            for (ConsumerRecord<String, String> record : records) {
                BlockingQueue<ConsumerRecord<String, String>> queue = 
                    partitionQueues.get(record.partition());
                
                try {
                    queue.put(record); // Blocks if queue is full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            
            // NOTE: committing here acknowledges records that may still be
            // queued but unprocessed; for at-least-once guarantees, track
            // per-partition processed offsets and commit those instead
            consumer.commitSync();
        }
    }
    
    private void processPartition(int partition, 
                                   BlockingQueue<ConsumerRecord<String, String>> queue) {
        while (true) {
            try {
                ConsumerRecord<String, String> record = queue.take();
                
                // Process in order for this partition
                process(record);
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

Example 5: Avoiding Hot Partitions

public class BalancedPartitioner implements Partitioner {
    
    private final AtomicLong counter = new AtomicLong(0);
    private final Map<String, Integer> stickyAssignments = new ConcurrentHashMap<>();
    // Known hot keys; could be loaded from config or updated dynamically
    private final Set<String> hotKeySet = ConcurrentHashMap.newKeySet();
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
        
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        if (keyBytes == null) {
            // No key: round-robin for even distribution
            return (int) (counter.getAndIncrement() % numPartitions);
        }
        
        String keyString = key.toString();
        
        // Check for known hot keys
        if (isHotKey(keyString)) {
            // Spread hot keys across multiple partitions
            // Using key + sequence to distribute
            return spreadHotKey(keyString, numPartitions);
        }
        
        // Standard hashing for regular keys (floorMod avoids a negative
        // result when hashCode() returns Integer.MIN_VALUE)
        return Math.floorMod(keyString.hashCode(), numPartitions);
    }
    
    private boolean isHotKey(String key) {
        // Could be configured or learned dynamically
        // Examples: popular products, high-activity users
        return key.startsWith("POPULAR_") || hotKeySet.contains(key);
    }
    
    private int spreadHotKey(String key, int numPartitions) {
        // Add sequence to spread across partitions
        // Note: This breaks per-key ordering!
        // Only use when ordering is not required
        long seq = counter.getAndIncrement();
        String spreadKey = key + "-" + (seq % 10);
        return Math.floorMod(spreadKey.hashCode(), numPartitions);
    }
    
    /**
     * Sticky partitioner for batching efficiency
     */
    public int stickyPartition(String topic, Object key, byte[] keyBytes,
                               Object value, byte[] valueBytes, Cluster cluster) {
        
        if (keyBytes != null) {
            // Keys go to deterministic partition
            return Math.floorMod(key.toString().hashCode(), cluster.partitionsForTopic(topic).size());
        }
        
        // Null keys: stick to same partition for batching
        String threadId = String.valueOf(Thread.currentThread().getId());
        return stickyAssignments.computeIfAbsent(threadId, 
            t -> ThreadLocalRandom.current().nextInt(cluster.partitionsForTopic(topic).size())
        );
    }
    
    @Override
    public void close() {}
    
    @Override
    public void configure(Map<String, ?> configs) {}
}

Anti-Patterns

❌ Assuming Global Ordering

// WRONG - assumes messages arrive in send order
producer.send(new ProducerRecord<>("topic", null, "message1"));
producer.send(new ProducerRecord<>("topic", null, "message2"));
// Without key, messages may go to different partitions = no ordering

// ✅ CORRECT - use key for ordering
producer.send(new ProducerRecord<>("topic", "order-123", "message1"));
producer.send(new ProducerRecord<>("topic", "order-123", "message2"));
// Same key = same partition = ordered

❌ Multiple Consumers per Partition

When using consumer groups, each partition is assigned to exactly one consumer. Trying to have multiple consumers process the same partition breaks ordering.
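To see why adding consumers beyond the partition count does not help, here is a toy assignment sketch (not Kafka's actual assignor code): with four consumers and three partitions, each partition goes to exactly one consumer, and the fourth consumer receives nothing and sits idle.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentDemo {

    // Toy round-robin assignment, for illustration only.
    // Invariant shared with real assignors: every partition is owned by
    // exactly one consumer in the group at any time.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 4 consumers, 3 partitions: "c4" is assigned no partitions
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3));
    }
}
```

If you need more parallelism than the partition count allows, either increase partitions (mindful that this remaps keys) or fan out within a consumer per partition, as in Example 4.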


References