
Kafka core v1.0.0

Kafka Delivery Semantics

Overview

Kafka supports three delivery semantics: at-most-once, at-least-once, and exactly-once. Understanding these semantics and how to achieve them is critical for building reliable event-driven systems.


Key Concepts

Delivery Guarantees

┌─────────────────────────────────────────────────────────────┐
│                    Delivery Semantics                        │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  At-Most-Once:                                               │
│  ┌────────┐     ┌───────┐     ┌──────────┐                 │
│  │Producer│────▶│ Kafka │────▶│ Consumer │                 │
│  └────────┘     └───────┘     └──────────┘                 │
│      │                              │                        │
│      └── Might lose messages        └── Process once        │
│          (no retries, acks=0)           or not at all       │
│                                                              │
│  At-Least-Once:                                              │
│  ┌────────┐     ┌───────┐     ┌──────────┐                 │
│  │Producer│────▶│ Kafka │────▶│ Consumer │                 │
│  └────────┘     └───────┘     └──────────┘                 │
│      │                              │                        │
│      └── Retry on failure           └── May process same    │
│          (acks=all)                     message twice       │
│                                                              │
│  Exactly-Once:                                               │
│  ┌────────┐     ┌───────┐     ┌──────────┐                 │
│  │Producer│────▶│ Kafka │────▶│ Consumer │                 │
│  └────────┘     └───────┘     └──────────┘                 │
│      │               │              │                        │
│      └── Idempotent  └── Atomic     └── Transactional       │
│          + Transactional commits        read + commit       │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Producer Acknowledgment (acks):
┌─────────────────────────────────────────────────────────────┐
│                                                              │
│  acks=0:  Fire and forget                                   │
│           Producer ───────▶ Broker (no wait)                │
│           • Fastest, lowest durability                      │
│                                                              │
│  acks=1:  Leader acknowledgment                             │
│           Producer ──▶ Leader ──▶ Response                  │
│           • Balanced                                         │
│                                                              │
│  acks=all: All ISR acknowledgment                           │
│           Producer ──▶ Leader ──▶ Replicas ──▶ Response     │
│           • Highest durability, slowest                     │
│                                                              │
└─────────────────────────────────────────────────────────────┘
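The trade-offs above map directly onto producer configuration. A minimal sketch using plain java.util.Properties and Kafka's string config keys (the broker address is a placeholder; in application code you would normally use the ProducerConfig constants):

```java
import java.util.Properties;

public class AcksConfigSketch {

    // Build producer properties for a chosen durability level:
    // "0" = fire-and-forget, "1" = leader only, "all" = all in-sync replicas
    static Properties producerProps(String bootstrapServers, String acks) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", acks);
        if ("all".equals(acks)) {
            // acks=all only delivers its durability promise with retries enabled
            props.put("retries", String.valueOf(Integer.MAX_VALUE));
        }
        return props;
    }

    public static void main(String[] args) {
        Properties durable = producerProps("localhost:9092", "all");
        System.out.println(durable.getProperty("acks")); // prints "all"
    }
}
```

Pairing acks=all with high retries is what turns "highest durability" into at-least-once delivery, as Example 1 below does with the full configuration.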

Best Practices

1. Use acks=all for Critical Data

Ensures a record is replicated to all in-sync replicas before the broker acknowledges it.

2. Enable Idempotence for Producers

Prevents the broker from writing duplicate records when the producer retries a failed send.

3. Use Transactions for Atomic Operations

Use transactions when one logical operation produces to multiple topics or partitions, or when consumer offsets must be committed atomically with produced output.

4. Implement Idempotent Consumers

Even with exactly-once semantics, handle duplicates gracefully: Kafka's guarantee covers reads, processing, and writes within Kafka, not external side effects such as database writes or API calls.

5. Configure min.insync.replicas

Set it to at least 2 (with a replication factor of 3) so that acks=all writes survive a single broker failure.
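As a sketch of practice 5, min.insync.replicas can be set per topic at creation time with the AdminClient. This assumes a reachable broker at localhost:9092; the topic name and partition count are illustrative:

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class DurableTopicSetup {

    public static void main(String[] args) throws Exception {
        Map<String, Object> conf = Map.of(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(conf)) {
            // Replication factor 3 with min.insync.replicas=2: acks=all writes
            // succeed while at least 2 replicas are in sync, so the topic
            // tolerates one broker failure without losing availability
            NewTopic topic = new NewTopic("orders", 6, (short) 3)
                .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```

With min.insync.replicas=2 and fewer than 2 in-sync replicas available, acks=all producers receive NotEnoughReplicasException rather than silently accepting under-replicated writes.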


Code Examples

Example 1: At-Least-Once Producer

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.StringSerializer;

public class AtLeastOnceProducer {
    
    private final KafkaProducer<String, String> producer;
    
    public AtLeastOnceProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        // At-least-once configuration
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // Wait for all replicas
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 2 minutes
        
        // Ordering guarantee with retries
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // Required for ordering with retries
        
        // Batching for throughput
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        
        this.producer = new KafkaProducer<>(props);
    }
    
    public void send(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        
        try {
            // Synchronous send for guaranteed delivery
            RecordMetadata metadata = producer.send(record).get();
            
            System.out.printf("Sent to %s-%d at offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
                
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while sending", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RetriableException) {
                // Should not happen with our config, but handle gracefully
                throw new RuntimeException("Retriable error after max retries", cause);
            }
            throw new RuntimeException("Non-retriable error", cause);
        }
    }
    
    public void sendAsync(String topic, String key, String value, Callback callback) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        
        // Asynchronous with callback
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // Handle error - message might be lost if not retried
                System.err.println("Send failed: " + exception.getMessage());
            }
            if (callback != null) {
                callback.onCompletion(metadata, exception);
            }
        });
    }
    
    public void close() {
        producer.flush();
        producer.close();
    }
}

Example 2: Exactly-Once Producer with Transactions

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExactlyOnceProducer {
    
    private final KafkaProducer<String, String> producer;
    private final String transactionalId;
    
    public ExactlyOnceProducer(String bootstrapServers, String transactionalId) {
        this.transactionalId = transactionalId;
        
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        // Exactly-once configuration
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        
        // Required settings for transactions
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        
        // Transaction timeout
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
        
        this.producer = new KafkaProducer<>(props);
        
        // Initialize transactions (call once per producer instance)
        producer.initTransactions();
    }
    
    /**
     * Atomic write to multiple topics
     */
    public void sendAtomically(List<ProducerRecord<String, String>> records) {
        try {
            producer.beginTransaction();
            
            for (ProducerRecord<String, String> record : records) {
                producer.send(record);
            }
            
            producer.commitTransaction();
            
        } catch (ProducerFencedException | InvalidProducerEpochException e) {
            // Fatal error - another producer with same transactional.id is active
            producer.close();
            throw new RuntimeException("Producer fenced", e);
            
        } catch (KafkaException e) {
            // Abort transaction on any error
            producer.abortTransaction();
            throw new RuntimeException("Transaction failed", e);
        }
    }
    
    /**
     * Read-process-write pattern with exactly-once.
     * The consumer must have enable.auto.commit=false and
     * isolation.level=read_committed (see Example 3).
     */
    public void processAndProduce(
            KafkaConsumer<String, String> consumer,
            String outputTopic,
            Function<String, String> processor) {
        
        try {
            producer.beginTransaction();
            
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, String> record : records) {
                // Process
                String result = processor.apply(record.value());
                
                // Produce to output topic
                producer.send(new ProducerRecord<>(outputTopic, record.key(), result));
            }
            
            // Atomically commit consumer offsets with producer transaction
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
            }
            
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
            
        } catch (ProducerFencedException | InvalidProducerEpochException e) {
            producer.close();
            throw new RuntimeException("Producer fenced", e);
            
        } catch (KafkaException e) {
            producer.abortTransaction();
            throw e;
        }
    }
    
    public void close() {
        producer.close();
    }
}

Example 3: Idempotent Consumer

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class IdempotentConsumer {
    
    private final KafkaConsumer<String, String> consumer;
    private final IdempotencyStore idempotencyStore;
    private final MessageProcessor processor;
    
    public IdempotentConsumer(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        
        // At-least-once: commit after processing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
        // Read committed transactions only
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        
        // Start from earliest on new group
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        this.consumer = new KafkaConsumer<>(props);
        this.idempotencyStore = new RedisIdempotencyStore();
        this.processor = new MessageProcessor();
    }
    
    public void consume(List<String> topics) {
        consumer.subscribe(topics);
        
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    String idempotencyKey = generateIdempotencyKey(record);
                    
                    // Check if already processed
                    if (idempotencyStore.hasBeenProcessed(idempotencyKey)) {
                        System.out.printf("Skipping duplicate: %s%n", idempotencyKey);
                        continue;
                    }
                    
                    try {
                        // Process message
                        processor.process(record);
                        
                        // Mark as processed (with TTL for cleanup)
                        idempotencyStore.markProcessed(idempotencyKey, Duration.ofDays(7));
                        
                    } catch (Exception e) {
                        // Don't mark as processed - will be retried
                        System.err.printf("Failed to process %s: %s%n", 
                            idempotencyKey, e.getMessage());
                        throw e;
                    }
                }
                
                // Commit after successful processing
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
    
    private String generateIdempotencyKey(ConsumerRecord<String, String> record) {
        // Unique key based on topic, partition, and offset
        return String.format("%s-%d-%d",
            record.topic(), record.partition(), record.offset());
    }
}

// Idempotency store interface
interface IdempotencyStore {
    boolean hasBeenProcessed(String key);
    void markProcessed(String key, Duration ttl);
}

// Application-specific processing (stub so the example compiles)
class MessageProcessor {
    void process(ConsumerRecord<String, String> record) {
        // business logic goes here
    }
}

// Redis implementation
class RedisIdempotencyStore implements IdempotencyStore {
    
    private final JedisPool jedisPool;
    
    RedisIdempotencyStore() {
        // Default local Redis; point at your cluster in production
        this.jedisPool = new JedisPool("localhost", 6379);
    }
    
    @Override
    public boolean hasBeenProcessed(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.exists("processed:" + key);
        }
    }
    
    @Override
    public void markProcessed(String key, Duration ttl) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.setex("processed:" + key, (int) ttl.toSeconds(), "1");
        }
    }
}
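For tests or single-instance deployments, the same interface can be backed by an in-memory map. A sketch with lazy expiry (the Redis version above remains the right choice when multiple consumer instances must share dedup state):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;

// (interface repeated here so this sketch compiles standalone)
interface IdempotencyStore {
    boolean hasBeenProcessed(String key);
    void markProcessed(String key, Duration ttl);
}

// In-memory store: maps each key to its expiry instant,
// purging expired entries lazily on lookup
class InMemoryIdempotencyStore implements IdempotencyStore {

    private final ConcurrentHashMap<String, Instant> processed = new ConcurrentHashMap<>();

    @Override
    public boolean hasBeenProcessed(String key) {
        Instant expiry = processed.get(key);
        if (expiry == null) {
            return false;
        }
        if (Instant.now().isAfter(expiry)) {
            processed.remove(key); // TTL elapsed: forget the key
            return false;
        }
        return true;
    }

    @Override
    public void markProcessed(String key, Duration ttl) {
        processed.put(key, Instant.now().plus(ttl));
    }
}
```

Note that in-memory state is lost on restart, so after a crash the consumer falls back to at-least-once behavior until the map repopulates.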

Example 4: Exactly-Once Stream Processing

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class ExactlyOnceStreamProcessor {
    
    public void runStream() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        // Exactly-once semantics
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        
        // Commit interval
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        
        // Changelog replication
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Input stream
        KStream<String, String> orders = builder.stream("orders");
        
        // State store with changelog (for exactly-once state)
        StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores
            .keyValueStoreBuilder(
                Stores.persistentKeyValueStore("order-counts"),
                Serdes.String(),
                Serdes.Long()
            )
            .withCachingEnabled()
            .withLoggingEnabled(Map.of()); // Changelog for recovery
        
        builder.addStateStore(storeBuilder);
        
        // Transform with state
        KStream<String, String> enriched = orders.transformValues(
            () -> new ValueTransformerWithKey<String, String, String>() {
                private KeyValueStore<String, Long> store;
                
                @Override
                public void init(ProcessorContext context) {
                    this.store = context.getStateStore("order-counts");
                }
                
                @Override
                public String transform(String key, String value) {
                    Long count = store.get(key);
                    count = (count == null) ? 1L : count + 1;
                    store.put(key, count);
                    
                    // Return enriched value
                    return value + "|count=" + count;
                }
                
                @Override
                public void close() {}
            },
            "order-counts"
        );
        
        // Output (exactly-once to output topic)
        enriched.to("enriched-orders");
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        // Handle uncaught exceptions
        streams.setUncaughtExceptionHandler(exception -> {
            System.err.println("Stream error: " + exception.getMessage());
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
        });
        
        streams.start();
        
        // Shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Example 5: Consumer with Manual Offset Management

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

public class ManualOffsetConsumer {
    
    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, Long> processedOffsets = new ConcurrentHashMap<>();
    
    public ManualOffsetConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }
    
    public void consumeWithManualOffset(List<String> topics) {
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit processed offsets before rebalance
                commitProcessedOffsets();
            }
            
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Optionally seek to specific offsets
                for (TopicPartition partition : partitions) {
                    Long storedOffset = loadOffsetFromExternalStore(partition);
                    if (storedOffset != null) {
                        consumer.seek(partition, storedOffset);
                    }
                }
            }
        });
        
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record);
                    
                    // Track processed offset
                    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                    processedOffsets.put(partition, record.offset() + 1);
                }
                
                // Periodic commit
                if (shouldCommit()) {
                    commitProcessedOffsets();
                }
            }
        } finally {
            commitProcessedOffsets();
            consumer.close();
        }
    }
    
    private void commitProcessedOffsets() {
        if (processedOffsets.isEmpty()) {
            return;
        }
        
        Map<TopicPartition, OffsetAndMetadata> offsets = processedOffsets.entrySet().stream()
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> new OffsetAndMetadata(e.getValue())
            ));
        
        try {
            // Also persist to external store for recovery
            saveOffsetsToExternalStore(offsets);
            
            // Commit to Kafka
            consumer.commitSync(offsets);
            
            processedOffsets.clear();
            
        } catch (CommitFailedException e) {
            System.err.println("Commit failed: " + e.getMessage());
            // Handle - possibly reprocess
        }
    }
    
    private void saveOffsetsToExternalStore(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Persist to database for recovery after crash
        // This enables exactly-once with external state
    }
    
    private Long loadOffsetFromExternalStore(TopicPartition partition) {
        // Load from database
        return null; // Return null to use Kafka-stored offset
    }
    
    private void processRecord(ConsumerRecord<String, String> record) {
        // Application-specific processing
    }
    
    private boolean shouldCommit() {
        // e.g., commit once enough records have accumulated
        return processedOffsets.size() >= 100;
    }
}

Anti-Patterns

❌ Auto-Commit with At-Least-Once

// WRONG - may lose messages
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
// If crash happens after auto-commit but before processing, messages are lost

// ✅ CORRECT - manual commit after processing
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// ... process records ...
consumer.commitSync();

❌ Not Handling Producer Fencing

// WRONG - ignoring ProducerFencedException
try {
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction(); // Wrong for fencing!
}

// ✅ CORRECT - close on fencing
try {
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    producer.close(); // Must close and create new producer
    throw e;
}
