Kafka
Delivery Semantics
Kafka core v1.0.0 (note: several code examples below use APIs introduced in Kafka 2.5–2.8+, e.g. `consumer.groupMetadata()` and `EXACTLY_ONCE_V2`)
Kafka Delivery Semantics
Overview
Kafka supports three delivery semantics: at-most-once, at-least-once, and exactly-once. Understanding these semantics and how to achieve them is critical for building reliable event-driven systems.
Key Concepts
Delivery Guarantees
┌─────────────────────────────────────────────────────────────┐
│ Delivery Semantics │
├─────────────────────────────────────────────────────────────┤
│ │
│ At-Most-Once: │
│ ┌────────┐ ┌───────┐ ┌──────────┐ │
│ │Producer│────▶│ Kafka │────▶│ Consumer │ │
│ └────────┘ └───────┘ └──────────┘ │
│ │ │ │
│ └── Might lose messages └── Process once │
│ (no retries, acks=0) or not at all │
│ │
│ At-Least-Once: │
│ ┌────────┐ ┌───────┐ ┌──────────┐ │
│ │Producer│────▶│ Kafka │────▶│ Consumer │ │
│ └────────┘ └───────┘ └──────────┘ │
│ │ │ │
│ └── Retry on failure └── May process same │
│ (acks=all) message twice │
│ │
│ Exactly-Once: │
│ ┌────────┐ ┌───────┐ ┌──────────┐ │
│ │Producer│────▶│ Kafka │────▶│ Consumer │ │
│ └────────┘ └───────┘ └──────────┘ │
│ │ │ │ │
│ └── Idempotent └── Atomic └── Transactional │
│ + Transactional commits read + commit │
│ │
└─────────────────────────────────────────────────────────────┘
Producer Acknowledgment (acks):
┌─────────────────────────────────────────────────────────────┐
│ │
│ acks=0: Fire and forget │
│ Producer ───────▶ Broker (no wait) │
│ • Fastest, lowest durability │
│ │
│ acks=1: Leader acknowledgment │
│ Producer ──▶ Leader ──▶ Response │
│ • Balanced │
│ │
│ acks=all: All ISR acknowledgment │
│ Producer ──▶ Leader ──▶ Replicas ──▶ Response │
│ • Highest durability, slowest │
│ │
└─────────────────────────────────────────────────────────────┘
Best Practices
1. Use acks=all for Critical Data
Ensures data is replicated before acknowledging.
2. Enable Idempotence for Producers
Prevents duplicate messages on retries.
3. Use Transactions for Atomic Operations
When producing to multiple topics/partitions.
4. Implement Idempotent Consumers
Even with exactly-once, handle duplicates gracefully.
5. Configure min.insync.replicas
Set to at least 2 for durability.
Code Examples
Example 1: At-Least-Once Producer
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
public class AtLeastOnceProducer {
private final KafkaProducer<String, String> producer;
public AtLeastOnceProducer(String bootstrapServers) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// At-least-once configuration
props.put(ProducerConfig.ACKS_CONFIG, "all"); // Wait for all replicas
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 2 minutes
// Ordering guarantee with retries
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // Required for ordering with retries
// Batching for throughput
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
this.producer = new KafkaProducer<>(props);
}
public void send(String topic, String key, String value) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
try {
// Synchronous send for guaranteed delivery
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Sent to %s-%d at offset %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while sending", e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof RetriableException) {
// Should not happen with our config, but handle gracefully
throw new RuntimeException("Retriable error after max retries", cause);
}
throw new RuntimeException("Non-retriable error", cause);
}
}
public void sendAsync(String topic, String key, String value, Callback callback) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
// Asynchronous with callback
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// Handle error - message might be lost if not retried
System.err.println("Send failed: " + exception.getMessage());
}
if (callback != null) {
callback.onCompletion(metadata, exception);
}
});
}
public void close() {
producer.flush();
producer.close();
}
}
Example 2: Exactly-Once Producer with Transactions
public class ExactlyOnceProducer {

    private final KafkaProducer<String, String> producer;
    private final String transactionalId;

    /**
     * Creates a transactional producer. The transactional.id must be stable
     * across restarts of the same logical producer so the broker can fence
     * zombie instances.
     */
    public ExactlyOnceProducer(String bootstrapServers, String transactionalId) {
        this.transactionalId = transactionalId;
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Exactly-once configuration
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        // Required settings for transactions
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        // Transaction timeout
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
        this.producer = new KafkaProducer<>(props);
        // Initialize transactions (call once per producer instance)
        producer.initTransactions();
    }

    /**
     * Atomic write to multiple topics: either every record in the list becomes
     * visible to read_committed consumers, or none does.
     *
     * <p>Per the KafkaProducer transactional contract, ProducerFencedException,
     * OutOfOrderSequenceException and AuthorizationException are FATAL: the
     * producer must be closed, not aborted (abortTransaction() would itself
     * throw). Any other KafkaException is recoverable by aborting.
     *
     * @throws RuntimeException wrapping fatal or transaction-level failures
     */
    public void sendAtomically(List<ProducerRecord<String, String>> records) {
        try {
            producer.beginTransaction();
            for (ProducerRecord<String, String> record : records) {
                producer.send(record);
            }
            producer.commitTransaction();
        } catch (ProducerFencedException | InvalidProducerEpochException
                | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal error - the producer is unusable; close and recreate it
            producer.close();
            throw new RuntimeException("Fatal error in transactional producer", e);
        } catch (KafkaException e) {
            // Abort transaction on any recoverable error
            producer.abortTransaction();
            throw new RuntimeException("Transaction failed", e);
        }
    }

    /**
     * Read-process-write pattern with exactly-once: the consumed offsets are
     * committed inside the same transaction as the produced records, so input
     * consumption and output production are atomic.
     *
     * <p>The consumer must have enable.auto.commit=false; downstream readers
     * must use isolation.level=read_committed.
     */
    public void processAndProduce(
            KafkaConsumer<String, String> consumer,
            String outputTopic,
            Function<String, String> processor) {
        try {
            producer.beginTransaction();
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // Process
                String result = processor.apply(record.value());
                // Produce to output topic
                producer.send(new ProducerRecord<>(outputTopic, record.key(), result));
            }
            // Atomically commit consumer offsets with producer transaction
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // Committed offset is the NEXT offset to read, hence +1.
                offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
            }
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (ProducerFencedException | InvalidProducerEpochException
                | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal - same treatment as in sendAtomically()
            producer.close();
            throw new RuntimeException("Fatal error in transactional producer", e);
        } catch (KafkaException e) {
            producer.abortTransaction();
            throw e;
        }
    }

    /** Closes the producer; any open transaction is aborted by the broker. */
    public void close() {
        producer.close();
    }
}
Example 3: Idempotent Consumer
public class IdempotentConsumer {

    private final KafkaConsumer<String, String> consumer;
    private final IdempotencyStore idempotencyStore;
    private final MessageProcessor processor;

    /**
     * Creates a consumer that pairs at-least-once offset handling (manual
     * commit after processing) with an external idempotency store, so
     * redelivered records are detected and skipped instead of reprocessed.
     */
    public IdempotentConsumer(String bootstrapServers, String groupId) {
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // At-least-once: commit after processing
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Read committed transactions only
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // Start from earliest on new group
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        this.consumer = new KafkaConsumer<>(config);
        this.idempotencyStore = new RedisIdempotencyStore();
        this.processor = new MessageProcessor();
    }

    /**
     * Poll loop: every record in a batch is deduplicated and processed,
     * then the batch's offsets are committed. A processing failure
     * propagates out and the consumer is closed without committing, so the
     * failed record is redelivered.
     */
    public void consume(List<String> topics) {
        consumer.subscribe(topics);
        try {
            while (true) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : batch) {
                    handleOnce(record);
                }
                // Commit after successful processing
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }

    /** Processes one record unless the store shows it was already handled. */
    private void handleOnce(ConsumerRecord<String, String> record) {
        String idempotencyKey = generateIdempotencyKey(record);
        if (idempotencyStore.hasBeenProcessed(idempotencyKey)) {
            System.out.printf("Skipping duplicate: %s%n", idempotencyKey);
            return;
        }
        try {
            processor.process(record);
            // Mark as processed (with TTL for cleanup)
            idempotencyStore.markProcessed(idempotencyKey, Duration.ofDays(7));
        } catch (Exception e) {
            // Don't mark as processed - will be retried
            System.err.printf("Failed to process %s: %s%n",
                idempotencyKey, e.getMessage());
            throw e;
        }
    }

    /** A topic-partition-offset triple uniquely identifies a delivery. */
    private String generateIdempotencyKey(ConsumerRecord<String, String> record) {
        return String.format("%s-%d-%d",
            record.topic(), record.partition(), record.offset());
    }
}
/**
 * Idempotency store: tracks which message keys have already been handled so
 * a consumer can skip redeliveries.
 * NOTE(review): implementations should document thread-safety if the store
 * is shared across consumer threads — not established by this interface.
 */
interface IdempotencyStore {
/** Returns true if {@link #markProcessed} was called for this key and the entry has not yet expired. */
boolean hasBeenProcessed(String key);
/** Records the key as processed; the entry may be evicted after {@code ttl}. */
void markProcessed(String key, Duration ttl);
}
// Redis implementation
class RedisIdempotencyStore implements IdempotencyStore {

    private final JedisPool jedisPool;

    /**
     * No-arg constructor for existing callers; uses Jedis defaults
     * (a local Redis instance). Note the original declared a blank final
     * {@code jedisPool} with no constructor, which cannot compile.
     */
    RedisIdempotencyStore() {
        this(new JedisPool());
    }

    /**
     * Preferred constructor: inject the pool so connection settings and
     * lifecycle are owned by the caller.
     */
    RedisIdempotencyStore(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    /** True if the key was marked processed and its Redis TTL has not expired. */
    @Override
    public boolean hasBeenProcessed(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.exists("processed:" + key);
        }
    }

    /** Stores the key with a Redis expiry so old entries clean themselves up. */
    @Override
    public void markProcessed(String key, Duration ttl) {
        try (Jedis jedis = jedisPool.getResource()) {
            // int cast is safe for the TTLs used here (7 days = 604 800 s).
            jedis.setex("processed:" + key, (int) ttl.toSeconds(), "1");
        }
    }
}
Example 4: Exactly-Once Stream Processing
/**
 * Kafka Streams topology with exactly-once (EOS v2) processing: maintains a
 * per-key order count in a persistent, changelog-backed state store and
 * forwards each order enriched with its running count.
 *
 * NOTE(review): EXACTLY_ONCE_V2 and the lambda-style uncaught-exception
 * handler require Kafka Streams 2.8+ (brokers 2.5+) — confirm the client
 * version in use; the document header says "Kafka core v1.0.0", which
 * predates these APIs.
 */
public class ExactlyOnceStreamProcessor {
/** Builds, configures, and starts the stream; returns once start() is called. */
public void runStream() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Exactly-once semantics (EOS v2: one transactional producer per stream thread)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Commit interval — with EOS this is also the transaction commit cadence
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
// Changelog replication: internal topics survive a single broker failure
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
StreamsBuilder builder = new StreamsBuilder();
// Input stream
KStream<String, String> orders = builder.stream("orders");
// State store with changelog (for exactly-once state)
StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores
.keyValueStoreBuilder(
Stores.persistentKeyValueStore("order-counts"),
Serdes.String(),
Serdes.Long()
)
.withCachingEnabled()
.withLoggingEnabled(Map.of()); // Changelog for recovery
builder.addStateStore(storeBuilder);
// Transform with state; the store name is passed so the task wires it in
KStream<String, String> enriched = orders.transformValues(
() -> new ValueTransformerWithKey<String, String, String>() {
private KeyValueStore<String, Long> store;
@Override
public void init(ProcessorContext context) {
// Generic getStateStore needs Streams 2.7+; older clients require a cast here
this.store = context.getStateStore("order-counts");
}
@Override
public String transform(String key, String value) {
// Increment the running count for this key (1 on first sight)
Long count = store.get(key);
count = (count == null) ? 1L : count + 1;
store.put(key, count);
// Return enriched value
return value + "|count=" + count;
}
@Override
public void close() {}
},
"order-counts"
);
// Output (exactly-once to output topic)
enriched.to("enriched-orders");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// Handle uncaught exceptions; REPLACE_THREAD keeps the application running
streams.setUncaughtExceptionHandler(exception -> {
System.err.println("Stream error: " + exception.getMessage());
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
streams.start();
// Shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Example 5: Consumer with Manual Offset Management
public class ManualOffsetConsumer {

    /** Minimum interval between commits of tracked offsets. */
    private static final long COMMIT_INTERVAL_MS = 5_000L;

    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, Long> processedOffsets = new ConcurrentHashMap<>();
    private long lastCommitMs = System.currentTimeMillis();

    /**
     * The consumer is injected so the caller controls its configuration
     * (it must have enable.auto.commit=false for this pattern). Note the
     * original declared a blank final {@code consumer} with no constructor,
     * which cannot compile.
     */
    public ManualOffsetConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    /**
     * Poll loop with manual, dual offset management: offsets are committed
     * both to an external store (for recovery / exactly-once with external
     * state) and to Kafka, on a rebalance and on a periodic schedule.
     */
    public void consumeWithManualOffset(List<String> topics) {
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit processed offsets before rebalance
                commitProcessedOffsets();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Resume from externally stored offsets when available
                for (TopicPartition partition : partitions) {
                    Long storedOffset = loadOffsetFromExternalStore(partition);
                    if (storedOffset != null) {
                        consumer.seek(partition, storedOffset);
                    }
                }
            }
        });
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record);
                    // Track the NEXT offset to read (committed-offset semantics)
                    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                    processedOffsets.put(partition, record.offset() + 1);
                }
                // Periodic commit
                if (shouldCommit()) {
                    commitProcessedOffsets();
                }
            }
        } finally {
            commitProcessedOffsets();
            consumer.close();
        }
    }

    /** Business processing hook; intentionally minimal for the example. */
    private void processRecord(ConsumerRecord<String, String> record) {
        // Replace with real processing logic.
    }

    /** Time-based commit policy: commit at most every COMMIT_INTERVAL_MS. */
    private boolean shouldCommit() {
        return System.currentTimeMillis() - lastCommitMs >= COMMIT_INTERVAL_MS;
    }

    /** Commits tracked offsets externally first, then to Kafka. */
    private void commitProcessedOffsets() {
        if (processedOffsets.isEmpty()) {
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> offsets = processedOffsets.entrySet().stream()
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> new OffsetAndMetadata(e.getValue())
            ));
        try {
            // Persist externally first so a crash between the two writes leaves
            // the external store ahead; onPartitionsAssigned seeks to it anyway.
            saveOffsetsToExternalStore(offsets);
            // Commit to Kafka
            consumer.commitSync(offsets);
            processedOffsets.clear();
            lastCommitMs = System.currentTimeMillis();
        } catch (CommitFailedException e) {
            // Offsets stay in processedOffsets and are retried on the next commit.
            System.err.println("Commit failed: " + e.getMessage());
        }
    }

    private void saveOffsetsToExternalStore(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Persist to database for recovery after crash
        // This enables exactly-once with external state
    }

    private Long loadOffsetFromExternalStore(TopicPartition partition) {
        // Load from database
        return null; // Return null to use Kafka-stored offset
    }
}
Anti-Patterns
❌ Auto-Commit with At-Least-Once
// WRONG - may lose messages
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
// If crash happens after auto-commit but before processing, messages are lost
// ✅ CORRECT - manual commit after processing
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// ... process records ...
consumer.commitSync();
❌ Not Handling Producer Fencing
// WRONG - ignoring ProducerFencedException
try {
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction(); // Wrong for fencing!
}
// ✅ CORRECT - close on fencing
try {
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close(); // Must close and create new producer
throw e;
}