Kafka
Ordering Guarantees
Kafka advanced v1.0.0
Kafka Message Ordering
Overview
Kafka guarantees message ordering within a partition, not across partitions. This skill covers how to design partition keys, handle ordering requirements, and work around ordering limitations.
Key Concepts
Ordering Guarantees
┌─────────────────────────────────────────────────────────────┐
│ Kafka Ordering Model │
├─────────────────────────────────────────────────────────────┤
│ │
│ Topic: orders (3 partitions) │
│ │
│ Partition 0: [A1] → [A2] → [A3] → [A4] │
│ ↑ │
│ Strictly ordered within partition │
│ │
│ Partition 1: [B1] → [B2] → [B3] │
│ │
│ Partition 2: [C1] → [C2] → [C3] → [C4] → [C5] │
│ │
│ Consumer sees A1 before A2, but may see B1 before A1 │
│ │
│ Ordering Strategies: │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ 1. Single Partition (1 partition) │ │
│ │ ✓ Global ordering │ │
│ │ ✗ No parallelism, throughput limited │ │
│ │ │ │
│ │ 2. Key-Based Partitioning │ │
│ │ ✓ Ordering per key (entity) │ │
│ │ ✓ Parallel processing of different keys │ │
│ │ │ │
│ │ 3. Event Sequencing (sequence numbers) │ │
│ │ ✓ Detect out-of-order at consumer │ │
│ │ ✓ Works across partitions │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Producer Ordering Configuration
| Config | Value | Effect |
|---|---|---|
| max.in.flight.requests.per.connection | 1 | Strict ordering (slow) |
| max.in.flight.requests.per.connection | 5 | Ordering with idempotence |
| enable.idempotence | true | Required for ordering with in-flight > 1 |
Best Practices
1. Design Partition Keys Carefully
Choose keys that group related events that need ordering.
2. Use Idempotent Producer
Enables ordering with higher throughput.
3. Include Sequence Numbers
For detecting out-of-order delivery at consumer.
4. Avoid Hot Partitions
Ensure even key distribution across partitions.
5. Single Consumer per Partition
For strict ordering, ensure one consumer per partition.
Code Examples
Example 1: Key-Based Ordering
/**
 * Producer that preserves per-order event ordering by using the order id as
 * the record key, so every event of one order lands on the same partition.
 *
 * Configured with an idempotent producer (acks=all, idempotence on) so that
 * up to 5 in-flight requests can be used without risking reordering.
 */
public class OrderedProducer {

    private final KafkaProducer<String, OrderEvent> producer;

    public OrderedProducer(String bootstrapServers) {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Idempotence keeps ordering intact even with several in-flight batches.
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        this.producer = new KafkaProducer<>(config);
    }

    /**
     * Events for the same order will be ordered because they share the same key
     */
    public void sendOrderEvents(Order order, List<OrderEvent> events) {
        String partitionKey = order.getId().toString();
        for (OrderEvent evt : events) {
            // Same key = same partition = ordered
            ProducerRecord<String, OrderEvent> outgoing =
                new ProducerRecord<>("order-events", partitionKey, evt);
            // Attach the sequence number so consumers can validate ordering.
            byte[] seqBytes = String.valueOf(evt.getSequenceNumber()).getBytes();
            outgoing.headers().add("sequence", seqBytes);
            producer.send(outgoing, (metadata, exception) -> {
                if (exception == null) {
                    log.debug("Sent event {} to partition {} offset {}",
                            evt.getType(), metadata.partition(), metadata.offset());
                } else {
                    handleSendError(order, evt, exception);
                }
            });
        }
    }

    /**
     * Multi-entity ordering: events for related entities need same key
     */
    public void sendWithCorrelation(String correlationKey, Object event) {
        // The correlation key (e.g. session ID, transaction ID) routes all
        // related events to one partition so they stay ordered.
        producer.send(new ProducerRecord<>("events", correlationKey, event));
    }
}
Example 2: Ordered Consumer
/**
 * Kafka consumer for "order-events" that validates per-key sequence numbers
 * to detect duplicates, out-of-order delivery, and gaps (missing events).
 *
 * NOTE(review): lastSeenSequence grows by one entry per key and is never
 * evicted or reset on rebalance — confirm whether that is handled elsewhere.
 */
public class OrderedConsumer {
private final KafkaConsumer<String, OrderEvent> consumer;
// Highest sequence number processed so far, per record key.
private final Map<String, Long> lastSeenSequence = new ConcurrentHashMap<>();
/**
 * @param bootstrapServers Kafka bootstrap server list
 * @param groupId consumer group id
 */
public OrderedConsumer(String bootstrapServers, String groupId) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// Single-threaded consumer for ordering
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// Manual commit for reliable processing
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
this.consumer = new KafkaConsumer<>(props);
}
/**
 * Poll loop: processes each batch grouped by partition so records within a
 * partition are handled in offset order, then commits synchronously.
 * Never returns; runs until the thread is terminated.
 */
public void consume() {
consumer.subscribe(List.of("order-events"));
while (true) {
ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(100));
// Process records in partition order
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, OrderEvent>> partitionRecords =
records.records(partition);
for (ConsumerRecord<String, OrderEvent> record : partitionRecords) {
processWithOrderValidation(record);
}
}
// Commit only after the whole batch was processed (at-least-once semantics).
consumer.commitSync();
}
}
/**
 * Validates the event's sequence number against the last one seen for the
 * same key, then processes it. Behavior by case:
 * - sequence == last seen: duplicate, silently skipped
 * - sequence <  last seen: out of order; delegated to handleOutOfOrder and
 *   NOT processed here (lastSeenSequence is intentionally left unchanged)
 * - sequence >  last seen + 1: gap; logged, delegated to handleGap, then
 *   the event is still processed
 */
private void processWithOrderValidation(ConsumerRecord<String, OrderEvent> record) {
String key = record.key();
OrderEvent event = record.value();
long sequence = event.getSequenceNumber();
// Check sequence
Long lastSequence = lastSeenSequence.get(key);
if (lastSequence != null && sequence <= lastSequence) {
// Out of order or duplicate
log.warn("Out of order event for {}: expected > {}, got {}",
key, lastSequence, sequence);
if (sequence == lastSequence) {
// Duplicate - skip
return;
}
// Handle out-of-order based on business requirements
handleOutOfOrder(record, lastSequence);
return;
}
if (lastSequence != null && sequence > lastSequence + 1) {
// Gap detected - missing events
log.warn("Gap detected for {}: expected {}, got {}",
key, lastSequence + 1, sequence);
handleGap(record, lastSequence);
}
// Process event
processEvent(event);
// Update last seen (only reached on the success / gap paths).
lastSeenSequence.put(key, sequence);
}
// Extension point: called when an event older than the last processed one
// arrives (and is not an exact duplicate). Intentionally a no-op here.
private void handleOutOfOrder(ConsumerRecord<String, OrderEvent> record, long lastSequence) {
// Options:
// 1. Buffer and reorder
// 2. Reject and alert
// 3. Process anyway if business allows
}
// Extension point: called when one or more sequence numbers were skipped.
// The gapped event is still processed by the caller after this returns.
private void handleGap(ConsumerRecord<String, OrderEvent> record, long lastSequence) {
// Options:
// 1. Wait for missing events (with timeout)
// 2. Request replay of missing events
// 3. Continue with warning
}
}
Example 3: Event Reordering Buffer
/**
 * Buffers out-of-order events per key and releases them in sequence order.
 *
 * Events are expected to carry consecutive sequence numbers starting at 1 for
 * each key. In-order events are released immediately together with any
 * buffered direct successors; future events are parked in a per-key TreeMap
 * until their predecessors arrive; past events (duplicates or very late) are
 * dropped.
 *
 * NOTE(review): maxWaitTime is stored but not yet enforced — cleanup() needs
 * event timestamps to honor it.
 */
public class EventReorderBuffer<K, V> {

    // Pending (future) events per key, ordered by sequence number.
    private final Map<K, TreeMap<Long, V>> buffers = new ConcurrentHashMap<>();
    // Next sequence number expected for each key (starts at 1).
    private final Map<K, Long> expectedSequences = new ConcurrentHashMap<>();
    private final Duration maxWaitTime;
    private final int maxBufferSize;

    public EventReorderBuffer(Duration maxWaitTime, int maxBufferSize) {
        this.maxWaitTime = maxWaitTime;
        this.maxBufferSize = maxBufferSize;
    }

    /**
     * Adds an event and returns the (possibly empty) list of events that are
     * now deliverable in sequence order.
     *
     * @param key      grouping key (e.g. order id)
     * @param sequence sequence number of the event within its key (1-based)
     * @param event    the event payload
     * @return events ready to process, in sequence order
     */
    public List<V> addAndFlush(K key, long sequence, V event) {
        TreeMap<Long, V> buffer = buffers.computeIfAbsent(key, k -> new TreeMap<>());
        long expected = expectedSequences.getOrDefault(key, 1L);
        if (sequence == expected) {
            // In order - process immediately and flush any buffered successors.
            List<V> toProcess = new ArrayList<>();
            toProcess.add(event);
            long next = sequence + 1;
            // BUGFIX: compare as primitive long. The original compared two
            // boxed Longs with ==, which is reference equality and fails for
            // values outside the Long cache range (-128..127).
            while (!buffer.isEmpty() && buffer.firstKey() == next) {
                toProcess.add(buffer.pollFirstEntry().getValue());
                next++;
            }
            expectedSequences.put(key, next);
            return toProcess;
        } else if (sequence > expected) {
            // Future event - buffer it until its predecessors arrive.
            if (buffer.size() >= maxBufferSize) {
                // Buffer full - force flush.
                // BUGFIX: include the triggering event in the flush; the
                // original silently dropped it.
                log.warn("Buffer full for {}, forcing flush", key);
                buffer.put(sequence, event);
                return forceFlush(key);
            }
            buffer.put(sequence, event);
            return List.of();
        } else {
            // Past event - duplicate or very late
            log.warn("Received past event for {}: expected {}, got {}", key, expected, sequence);
            return List.of(); // Skip
        }
    }

    /**
     * Flushes all buffered events for a key in sequence order (even if gaps
     * remain) and advances the expected sequence past the highest flushed one.
     *
     * @return the flushed events, or an empty list if nothing was buffered
     */
    public List<V> forceFlush(K key) {
        TreeMap<Long, V> buffer = buffers.get(key);
        if (buffer == null || buffer.isEmpty()) {
            return List.of();
        }
        // BUGFIX: read lastKey() BEFORE clearing. The original called
        // buffer.lastKey() after buffer.clear(), which always throws
        // NoSuchElementException; its events.isEmpty() guard was dead code.
        long nextExpected = buffer.lastKey() + 1;
        List<V> events = new ArrayList<>(buffer.values());
        buffer.clear();
        expectedSequences.put(key, nextExpected);
        return events;
    }

    /**
     * Periodic cleanup of stale buffers
     */
    @Scheduled(fixedRate = 60000)
    public void cleanup() {
        buffers.entrySet().removeIf(entry -> {
            TreeMap<Long, V> buffer = entry.getValue();
            if (buffer.isEmpty()) {
                return true;
            }
            // Check if oldest entry is too old
            // (Requires timestamp in events)
            return false;
        });
    }
}
Example 4: Partition-Aware Consumer
/**
 * Consumes a topic with one worker thread per assigned partition: records are
 * routed to a bounded per-partition queue, so different partitions are
 * processed in parallel while ordering within each partition is preserved.
 */
public class PartitionAwareConsumer {

    private final ExecutorService executor;
    private final Map<Integer, BlockingQueue<ConsumerRecord<String, String>>> partitionQueues;

    // BUGFIX: the final fields were declared but never assigned, which does
    // not compile. Initialize them in an explicit no-arg constructor.
    public PartitionAwareConsumer() {
        this.executor = Executors.newCachedThreadPool();
        this.partitionQueues = new ConcurrentHashMap<>();
    }

    /**
     * Process partitions in parallel while maintaining order within each partition
     */
    public void consumeWithParallelPartitions() {
        KafkaConsumer<String, String> consumer = createConsumer();
        consumer.subscribe(List.of("events"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Wait for in-flight processing to drain before ownership moves.
                for (TopicPartition partition : partitions) {
                    BlockingQueue<ConsumerRecord<String, String>> queue =
                        partitionQueues.get(partition.partition());
                    if (queue == null) {
                        continue;
                    }
                    // BUGFIX: Thread.sleep throws the checked
                    // InterruptedException, which the original neither caught
                    // nor declared (compile error). Restore the interrupt flag
                    // and stop waiting if interrupted.
                    try {
                        while (!queue.isEmpty()) {
                            Thread.sleep(100);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                for (TopicPartition partition : partitions) {
                    BlockingQueue<ConsumerRecord<String, String>> queue =
                        new LinkedBlockingQueue<>(1000);
                    partitionQueues.put(partition.partition(), queue);
                    // Start worker for this partition
                    executor.submit(() -> processPartition(partition.partition(), queue));
                }
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // Route to partition-specific queues
            for (ConsumerRecord<String, String> record : records) {
                BlockingQueue<ConsumerRecord<String, String>> queue =
                    partitionQueues.get(record.partition());
                if (queue == null) {
                    // Defensive: should not happen because onPartitionsAssigned
                    // runs before poll() returns records for a new partition.
                    continue;
                }
                try {
                    queue.put(record); // Blocks if queue is full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            // Commit periodically
            consumer.commitSync();
        }
    }

    // Drains one partition's queue, processing its records strictly in order.
    // Exits when the worker thread is interrupted.
    private void processPartition(int partition,
            BlockingQueue<ConsumerRecord<String, String>> queue) {
        while (true) {
            try {
                ConsumerRecord<String, String> record = queue.take();
                // Process in order for this partition
                process(record);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
Example 5: Avoiding Hot Partitions
/**
 * Custom partitioner that evens out load: keyless records are round-robined,
 * known hot keys are spread over several partitions (deliberately sacrificing
 * per-key ordering), and all other keys hash deterministically.
 */
public class BalancedPartitioner implements Partitioner {

    private final AtomicLong counter = new AtomicLong(0);
    private final Map<String, Integer> stickyAssignments = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // No key: round-robin for even distribution.
            // BUGFIX: floorMod keeps the result non-negative even after the
            // counter eventually overflows Long.MAX_VALUE; the original cast
            // could then yield a negative partition.
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        String keyString = key.toString();
        // Check for known hot keys
        if (isHotKey(keyString)) {
            // Spread hot keys across multiple partitions
            // Using key + sequence to distribute
            return spreadHotKey(keyString, numPartitions);
        }
        // Standard consistent hashing for regular keys.
        // BUGFIX: Math.abs(Integer.MIN_VALUE) is still negative, so the
        // original could return a negative partition; floorMod is always
        // in [0, numPartitions).
        return Math.floorMod(keyString.hashCode(), numPartitions);
    }

    // True for keys known to receive disproportionate traffic.
    private boolean isHotKey(String key) {
        // Could be configured or learned dynamically
        // Examples: popular products, high-activity users
        return key.startsWith("POPULAR_") || hotKeySet.contains(key);
    }

    // Distributes one hot key over up to 10 derived buckets.
    private int spreadHotKey(String key, int numPartitions) {
        // Add sequence to spread across partitions
        // Note: This breaks per-key ordering!
        // Only use when ordering is not required
        long seq = counter.getAndIncrement();
        String spreadKey = key + "-" + (seq % 10);
        // BUGFIX: floorMod instead of Math.abs (see partition()).
        return Math.floorMod(spreadKey.hashCode(), numPartitions);
    }

    /**
     * Sticky partitioner for batching efficiency
     */
    public int stickyPartition(String topic, Object key, byte[] keyBytes,
                               Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes != null) {
            // Keys go to a deterministic partition.
            // BUGFIX: floorMod instead of Math.abs (see partition()).
            return Math.floorMod(key.toString().hashCode(),
                    cluster.partitionsForTopic(topic).size());
        }
        // Null keys: stick to the same partition per thread for batching.
        String threadId = String.valueOf(Thread.currentThread().getId());
        return stickyAssignments.computeIfAbsent(threadId,
            t -> ThreadLocalRandom.current().nextInt(cluster.partitionsForTopic(topic).size())
        );
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
Anti-Patterns
❌ Assuming Global Ordering
// WRONG - assumes messages arrive in send order
producer.send(new ProducerRecord<>("topic", null, "message1"));
producer.send(new ProducerRecord<>("topic", null, "message2"));
// Without key, messages may go to different partitions = no ordering
// ✅ CORRECT - use key for ordering
producer.send(new ProducerRecord<>("topic", "order-123", "message1"));
producer.send(new ProducerRecord<>("topic", "order-123", "message2"));
// Same key = same partition = ordered
❌ Multiple Consumers per Partition
When using consumer groups, each partition is assigned to exactly one consumer. Trying to have multiple consumers process the same partition breaks ordering.