
Kafka operations v1.0.0

Kafka Failure Recovery

Overview

Kafka is designed for fault tolerance, but reliable operation still requires deliberate configuration and error handling. This skill covers broker failures, consumer failures, producer failures, and the recovery strategies for each.


Key Concepts

Failure Scenarios

┌─────────────────────────────────────────────────────────────┐
│                    Kafka Failure Scenarios                   │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Broker Failure:                                             │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐                      │
│  │Broker 1 │  │Broker 2 │  │Broker 3 │                      │
│  │  P0(L)  │  │  P0(F)  │  │  P1(L)  │                      │
│  │   ✗     │  │  ▲      │  │         │                      │
│  └────┬────┘  └──┴──────┘  └─────────┘                      │
│       │          │                                           │
│       └──────────┘ Follower becomes Leader                  │
│                                                              │
│  Consumer Failure:                                           │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ Consumer Group: order-processors                     │   │
│  │                                                      │   │
│  │ Before: C1[P0,P1] C2[P2,P3] C3[P4,P5]              │   │
│  │                     ↓ C2 dies                        │   │
│  │ After:  C1[P0,P1,P2] C3[P3,P4,P5]                   │   │
│  │         (Rebalance redistributes partitions)         │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                              │
│  Producer Failure:                                           │
│  ┌───────────────────────────────────────────────────────┐ │
│  │ Scenario 1: Network partition                         │ │
│  │   Producer ──✗──▶ Broker                             │ │
│  │   Action: Retry with exponential backoff              │ │
│  │                                                       │ │
│  │ Scenario 2: Broker down                              │ │
│  │   Action: Metadata refresh, send to new leader       │ │
│  │                                                       │ │
│  │ Scenario 3: Producer crash                           │ │
│  │   Action: New producer, idempotency handles dupes    │ │
│  └───────────────────────────────────────────────────────┘ │
│                                                              │
└─────────────────────────────────────────────────────────────┘
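The "retry with exponential backoff" in Scenario 1 is usually implemented with jitter so that many producers do not retry in lockstep. A minimal sketch (the helper name is illustrative, not part of the Kafka client API):

```java
import java.util.concurrent.ThreadLocalRandom;

class Backoff {

    // Full-jitter exponential backoff: a random delay in [0, min(cap, base * 2^attempt)].
    // attempt is 0-based; the shift is clamped so base * 2^attempt cannot overflow.
    static long nextDelayMs(int attempt, long baseMs, long capMs) {
        long exp = Math.min(capMs, baseMs << Math.min(attempt, 30));
        return ThreadLocalRandom.current().nextLong(exp + 1);
    }
}
```

Note that the Kafka producer already retries internally (`retries`, `retry.backoff.ms`); application-level backoff like this applies to retries you perform around `send()` yourself.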

Best Practices

1. Configure Proper Replication

Use replication factor of 3 with min.insync.replicas = 2.
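At the topic level these settings are plain config overrides; a sketch of the map you would pass to `NewTopic#configs(...)` when creating the topic with AdminClient (the config keys are standard Kafka topic configs, the helper itself is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

class TopicDurabilityConfig {

    // Topic-level overrides for a durable topic. With a replication factor of 3
    // and min.insync.replicas=2, acks=all writes survive one broker failure.
    static Map<String, String> durableTopicConfigs() {
        Map<String, String> configs = new HashMap<>();
        configs.put("min.insync.replicas", "2");
        configs.put("unclean.leader.election.enable", "false"); // never trade durability for availability
        return configs;
    }
}
```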

2. Handle Rebalances Gracefully

Implement proper shutdown and rebalance listeners.

3. Implement Circuit Breakers

Prevent cascading failures when Kafka is unavailable.

4. Monitor Lag and Alerts

Detect consumer failures through lag monitoring.

5. Test Failure Scenarios

Regularly test broker failures, consumer crashes, and network partitions.


Code Examples

Example 1: Resilient Producer

public class ResilientProducer {
    
    private final KafkaProducer<String, String> producer;
    private final CircuitBreaker circuitBreaker;
    private final MeterRegistry meterRegistry = new SimpleMeterRegistry(); // in production, inject a shared registry instead
    
    public ResilientProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        // Fault tolerance configuration
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 300000); // 5 minutes
        
        // Handle transient failures
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
        
        // Metadata refresh
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
        
        this.producer = new KafkaProducer<>(props);
        
        // Circuit breaker for total Kafka failure
        this.circuitBreaker = CircuitBreaker.of("kafka-producer",
            CircuitBreakerConfig.custom()
                .failureRateThreshold(50)
                .waitDurationInOpenState(Duration.ofSeconds(30))
                .permittedNumberOfCallsInHalfOpenState(3)
                .slidingWindowSize(10)
                .recordExceptions(TimeoutException.class, KafkaException.class)
                .ignoreExceptions(InvalidTopicException.class, AuthorizationException.class)
                .build()
        );
    }
    
    public CompletableFuture<RecordMetadata> send(String topic, String key, String value) {
        return CompletableFuture.supplyAsync(() -> 
            circuitBreaker.executeSupplier(() -> sendWithRetry(topic, key, value))
        );
    }
    
    private RecordMetadata sendWithRetry(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        
        int maxAttempts = 3;
        Exception lastException = null;
        
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                Future<RecordMetadata> future = producer.send(record);
                RecordMetadata metadata = future.get(30, TimeUnit.SECONDS);
                
                meterRegistry.counter("kafka.producer.success").increment();
                return metadata;
                
            } catch (ExecutionException e) {
                lastException = (Exception) e.getCause();
                
                if (isRetriable(lastException)) {
                    log.warn("Retriable error on attempt {}: {}", attempt, lastException.getMessage());
                    
                    if (attempt < maxAttempts) {
                        sleepWithBackoff(attempt);
                    }
                } else {
                    // Non-retriable - fail immediately
                    throw new RuntimeException("Non-retriable error", lastException);
                }
                
            } catch (TimeoutException e) {
                lastException = e;
                log.warn("Timeout on attempt {}", attempt);
                
                if (attempt < maxAttempts) {
                    sleepWithBackoff(attempt);
                }
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted", e);
            }
        }
        
        meterRegistry.counter("kafka.producer.failure").increment();
        throw new RuntimeException("Failed after " + maxAttempts + " attempts", lastException);
    }
    
    private boolean isRetriable(Exception e) {
        return e instanceof RetriableException ||
               e instanceof TimeoutException ||
               e instanceof NotLeaderOrFollowerException || // NotLeaderForPartitionException before Kafka 2.6
               e instanceof NetworkException;
    }
    
    private void sleepWithBackoff(int attempt) {
        try {
            long backoff = (long) Math.pow(2, attempt) * 100;
            Thread.sleep(Math.min(backoff, 10000));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    /**
     * Graceful shutdown
     */
    public void close() {
        // Flush pending messages
        producer.flush();
        
        // Close with timeout
        producer.close(Duration.ofSeconds(30));
    }
}

Example 2: Consumer Rebalance Handling

public class ResilientConsumer {
    
    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new ConcurrentHashMap<>();
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    
    public ResilientConsumer(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        
        // Manual commit for rebalance safety
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
        // Rebalance configuration
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        
        // Cooperative rebalancing (Kafka 2.4+)
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            CooperativeStickyAssignor.class.getName());
        
        this.consumer = new KafkaConsumer<>(props);
    }
    
    public void consume(List<String> topics) {
        consumer.subscribe(topics, new RebalanceHandler());
        
        try {
            while (!shutdownRequested.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                if (records.isEmpty()) {
                    continue;
                }
                
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        processRecord(record);
                        
                        // Track processed offset
                        TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                        pendingOffsets.put(partition, new OffsetAndMetadata(record.offset() + 1));
                        
                    } catch (Exception e) {
                        handleProcessingError(record, e);
                    }
                }
                
                // Commit processed offsets
                commitPendingOffsets();
            }
        } catch (WakeupException e) {
            // wakeup() makes poll() throw WakeupException; only propagate
            // if it was not triggered by shutdown()
            if (!shutdownRequested.get()) {
                throw e;
            }
        } finally {
            // Final commit and cleanup
            commitPendingOffsets();
            consumer.close();
        }
    }
    
    private void commitPendingOffsets() {
        if (!pendingOffsets.isEmpty()) {
            try {
                consumer.commitSync(new HashMap<>(pendingOffsets), Duration.ofSeconds(10));
                pendingOffsets.clear();
            } catch (CommitFailedException e) {
                log.error("Commit failed - rebalance occurred", e);
                pendingOffsets.clear();
            }
        }
    }
    
    private class RebalanceHandler implements ConsumerRebalanceListener {
        
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            log.info("Partitions revoked: {}", partitions);
            
            // Commit offsets for revoked partitions
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (TopicPartition partition : partitions) {
                OffsetAndMetadata offset = pendingOffsets.remove(partition);
                if (offset != null) {
                    toCommit.put(partition, offset);
                }
            }
            
            if (!toCommit.isEmpty()) {
                try {
                    consumer.commitSync(toCommit);
                    log.info("Committed offsets for revoked partitions: {}", toCommit);
                } catch (Exception e) {
                    log.error("Failed to commit on revocation", e);
                }
            }
            
            // Cleanup any partition-specific state
            cleanupPartitionState(partitions);
        }
        
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            log.info("Partitions assigned: {}", partitions);
            
            // Initialize state for new partitions
            initializePartitionState(partitions);
        }
        
        @Override
        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            // Called when partitions are lost without revocation
            // (e.g., consumer crash or session timeout)
            log.warn("Partitions lost: {}", partitions);
            
            // Cannot commit here - partitions already reassigned
            pendingOffsets.keySet().removeAll(partitions);
            cleanupPartitionState(partitions);
        }
    }
    
    public void shutdown() {
        shutdownRequested.set(true);
        consumer.wakeup(); // Interrupt poll
    }
}

Example 3: Dead Letter Queue Pattern

public class DeadLetterQueueHandler {
    
    private final KafkaProducer<String, String> dlqProducer;
    private final String dlqTopic;
    private final int maxRetries;
    
    public DeadLetterQueueHandler(String bootstrapServers, String dlqTopic, int maxRetries) {
        this.dlqTopic = dlqTopic;
        this.maxRetries = maxRetries;
        
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        this.dlqProducer = new KafkaProducer<>(props);
    }
    
    public void processWithDLQ(ConsumerRecord<String, String> record, 
                               ThrowingConsumer<String> processor) {
        
        // Get retry count from headers
        int retryCount = getRetryCount(record);
        
        try {
            processor.accept(record.value());
            
        } catch (Exception e) {
            if (retryCount < maxRetries) {
                // Send to retry topic
                sendToRetryTopic(record, retryCount + 1, e);
            } else {
                // Max retries exceeded - send to DLQ
                sendToDLQ(record, e);
            }
        }
    }
    
    private void sendToRetryTopic(ConsumerRecord<String, String> original, 
                                   int retryCount, Exception error) {
        
        String retryTopic = original.topic() + ".retry";
        
        ProducerRecord<String, String> retry = new ProducerRecord<>(
            retryTopic,
            original.key(),
            original.value()
        );
        
        // Add metadata
        retry.headers().add("retry-count", String.valueOf(retryCount).getBytes());
        retry.headers().add("original-topic", original.topic().getBytes());
        retry.headers().add("original-partition", String.valueOf(original.partition()).getBytes());
        retry.headers().add("original-offset", String.valueOf(original.offset()).getBytes());
        retry.headers().add("error-message", String.valueOf(error.getMessage()).getBytes()); // getMessage() may be null
        retry.headers().add("retry-timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
        
        // Calculate delay for exponential backoff
        long delay = (long) Math.pow(2, retryCount) * 1000; // 2s, 4s, 8s...
        retry.headers().add("process-after", String.valueOf(System.currentTimeMillis() + delay).getBytes());
        
        dlqProducer.send(retry);
        
        log.info("Sent to retry topic {} (attempt {})", retryTopic, retryCount);
    }
    
    private void sendToDLQ(ConsumerRecord<String, String> original, Exception error) {
        ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
            dlqTopic,
            original.key(),
            original.value()
        );
        
        // Add comprehensive metadata for investigation
        dlqRecord.headers().add("original-topic", original.topic().getBytes());
        dlqRecord.headers().add("original-partition", String.valueOf(original.partition()).getBytes());
        dlqRecord.headers().add("original-offset", String.valueOf(original.offset()).getBytes());
        dlqRecord.headers().add("original-timestamp", String.valueOf(original.timestamp()).getBytes());
        dlqRecord.headers().add("error-class", error.getClass().getName().getBytes());
        dlqRecord.headers().add("error-message", String.valueOf(error.getMessage()).getBytes()); // getMessage() may be null
        dlqRecord.headers().add("error-stacktrace", getStackTrace(error).getBytes());
        dlqRecord.headers().add("dlq-timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
        
        dlqProducer.send(dlqRecord, (metadata, exception) -> {
            if (exception != null) {
                log.error("Failed to send to DLQ!", exception);
                // Critical - consider alternative handling
            } else {
                log.warn("Sent to DLQ: topic={}, partition={}, offset={}",
                    original.topic(), original.partition(), original.offset());
            }
        });
    }
    
    private int getRetryCount(ConsumerRecord<String, String> record) {
        Header header = record.headers().lastHeader("retry-count");
        if (header == null) {
            return 0;
        }
        return Integer.parseInt(new String(header.value()));
    }
    
    private String getStackTrace(Exception e) {
        StringWriter sw = new StringWriter();
        e.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }
}
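The `ThrowingConsumer` used by `processWithDLQ` is not a JDK type (`java.util.function.Consumer#accept` does not declare checked exceptions); a minimal definition:

```java
@FunctionalInterface
interface ThrowingConsumer<T> {
    void accept(T value) throws Exception;
}
```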

Example 4: Broker Failure Recovery

public class BrokerFailureHandler {
    
    private final AdminClient adminClient;
    private final MeterRegistry meterRegistry;
    
    /**
     * Monitor cluster health and handle broker failures
     */
    @Scheduled(fixedRate = 30000)
    public void monitorClusterHealth() throws ExecutionException, InterruptedException {
        DescribeClusterResult cluster = adminClient.describeCluster();
        
        Collection<Node> nodes = cluster.nodes().get();
        Node controller = cluster.controller().get();
        
        meterRegistry.gauge("kafka.cluster.broker_count", nodes.size());
        
        // Check for under-replicated partitions
        checkUnderReplicated();
        
        // Check for offline partitions
        checkOfflinePartitions();
        
        // Check for leader imbalance
        checkLeaderBalance(nodes);
    }
    
    private void checkUnderReplicated() throws ExecutionException, InterruptedException {
        Set<String> topics = adminClient.listTopics().names().get();
        Map<String, TopicDescription> descriptions = 
            adminClient.describeTopics(topics).allTopicNames().get();
        
        int underReplicated = 0;
        
        for (TopicDescription topic : descriptions.values()) {
            for (TopicPartitionInfo partition : topic.partitions()) {
                if (partition.isr().size() < partition.replicas().size()) {
                    underReplicated++;
                    
                    log.warn("Under-replicated: {}-{} (ISR: {}/{})",
                        topic.name(), partition.partition(),
                        partition.isr().size(), partition.replicas().size());
                }
            }
        }
        
        meterRegistry.gauge("kafka.partitions.under_replicated", underReplicated);
        
        if (underReplicated > 0) {
            alertOps("Under-replicated partitions detected: " + underReplicated);
        }
    }
    
    private void checkOfflinePartitions() throws ExecutionException, InterruptedException {
        Set<String> topics = adminClient.listTopics().names().get();
        Map<String, TopicDescription> descriptions = 
            adminClient.describeTopics(topics).allTopicNames().get();
        
        List<String> offline = new ArrayList<>();
        
        for (TopicDescription topic : descriptions.values()) {
            for (TopicPartitionInfo partition : topic.partitions()) {
                if (partition.leader() == null) {
                    offline.add(topic.name() + "-" + partition.partition());
                }
            }
        }
        
        meterRegistry.gauge("kafka.partitions.offline", offline.size());
        
        if (!offline.isEmpty()) {
            alertOps("CRITICAL: Offline partitions: " + offline);
        }
    }
    
    /**
     * Trigger leader rebalance if needed
     */
    public void triggerLeaderRebalance() throws ExecutionException, InterruptedException {
        // Trigger preferred leader election
        Set<String> topics = adminClient.listTopics().names().get();
        Map<String, TopicDescription> descriptions = 
            adminClient.describeTopics(topics).allTopicNames().get();
        
        Set<TopicPartition> partitionsToRebalance = new HashSet<>();
        
        for (TopicDescription topic : descriptions.values()) {
            for (TopicPartitionInfo partition : topic.partitions()) {
                if (partition.leader() != null && !partition.replicas().isEmpty()) {
                    Node preferredLeader = partition.replicas().get(0);
                    if (!partition.leader().equals(preferredLeader)) {
                        partitionsToRebalance.add(
                            new TopicPartition(topic.name(), partition.partition()));
                    }
                }
            }
        }
        
        if (!partitionsToRebalance.isEmpty()) {
            log.info("Triggering leader election for {} partitions", partitionsToRebalance.size());
            
            adminClient.electLeaders(
                ElectionType.PREFERRED, 
                partitionsToRebalance
            ).all().get();
        }
    }
    
    /**
     * Recovery procedure after broker comes back online
     */
    public void handleBrokerRecovery(int brokerId) {
        log.info("Broker {} recovered, initiating recovery procedures", brokerId);
        
        // Wait for ISR to catch up
        try {
            waitForIsrCatchup(brokerId, Duration.ofMinutes(10));
            
            // Trigger leader rebalance
            triggerLeaderRebalance();
            
            log.info("Broker {} recovery complete", brokerId);
            
        } catch (Exception e) {
            log.error("Broker {} recovery failed", brokerId, e);
            alertOps("Broker recovery failed: " + brokerId);
        }
    }
    
    private void waitForIsrCatchup(int brokerId, Duration timeout) 
            throws ExecutionException, InterruptedException, TimeoutException {
        
        Instant deadline = Instant.now().plus(timeout);
        
        while (Instant.now().isBefore(deadline)) {
            boolean allCaughtUp = checkBrokerIsrStatus(brokerId);
            
            if (allCaughtUp) {
                log.info("Broker {} ISR caught up", brokerId);
                return;
            }
            
            Thread.sleep(5000);
        }
        
        throw new TimeoutException("Broker " + brokerId + " ISR catch-up timeout");
    }
}

Example 5: Consumer Lag Monitoring

public class ConsumerLagMonitor {
    
    private final AdminClient adminClient;
    private final MeterRegistry meterRegistry;
    private final AlertService alertService;
    
    // Previous committed offsets per group, used by checkForStalledConsumers
    private final Map<String, Map<TopicPartition, Long>> lastSeenOffsets = new ConcurrentHashMap<>();
    
    @Scheduled(fixedRate = 10000)
    public void monitorConsumerLag() {
        try {
            List<String> groupIds = adminClient.listConsumerGroups()
                .all().get().stream()
                .map(ConsumerGroupListing::groupId)
                .filter(id -> !id.startsWith("_")) // Exclude internal groups
                .collect(Collectors.toList());
            
            for (String groupId : groupIds) {
                monitorGroup(groupId);
            }
            
        } catch (Exception e) {
            log.error("Failed to monitor consumer lag", e);
        }
    }
    
    private void monitorGroup(String groupId) throws ExecutionException, InterruptedException {
        // Get group state
        ConsumerGroupDescription description = adminClient
            .describeConsumerGroups(List.of(groupId))
            .describedGroups().get(groupId).get();
        
        ConsumerGroupState state = description.state();
        meterRegistry.gauge("kafka.consumer.state",
            Tags.of("group", groupId),
            state.ordinal());
        
        if (state != ConsumerGroupState.STABLE) {
            log.warn("Consumer group {} is in state: {}", groupId, state);
        }
        
        // Get committed offsets
        Map<TopicPartition, OffsetAndMetadata> offsets = adminClient
            .listConsumerGroupOffsets(groupId)
            .partitionsToOffsetAndMetadata().get();
        
        if (offsets.isEmpty()) {
            return;
        }
        
        // Get end offsets
        Map<TopicPartition, OffsetSpec> endOffsetRequest = offsets.keySet().stream()
            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
            adminClient.listOffsets(endOffsetRequest).all().get();
        
        // Calculate lag per partition
        long totalLag = 0;
        Map<String, Long> topicLag = new HashMap<>();
        
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            long committed = entry.getValue().offset();
            long end = endOffsets.get(tp).offset();
            long lag = end - committed;
            
            totalLag += lag;
            topicLag.merge(tp.topic(), lag, Long::sum);
            
            // Record per-partition metric
            meterRegistry.gauge("kafka.consumer.lag",
                Tags.of(
                    "group", groupId,
                    "topic", tp.topic(),
                    "partition", String.valueOf(tp.partition())
                ),
                lag
            );
        }
        
        // Record total lag
        meterRegistry.gauge("kafka.consumer.total_lag",
            Tags.of("group", groupId),
            totalLag
        );
        
        // Alert on high lag
        if (totalLag > 10000) {
            alertService.warn(String.format(
                "High consumer lag for group %s: %d messages behind",
                groupId, totalLag
            ));
        }
        
        // Check for stalled consumers
        checkForStalledConsumers(groupId, offsets);
    }
    
    private void checkForStalledConsumers(String groupId, 
                                          Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Compare with previous offsets
        Map<TopicPartition, Long> previousOffsets = lastSeenOffsets.get(groupId);
        
        if (previousOffsets != null) {
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : currentOffsets.entrySet()) {
                TopicPartition tp = entry.getKey();
                long current = entry.getValue().offset();
                Long previous = previousOffsets.get(tp);
                
                if (previous != null && current == previous) {
                    log.warn("Consumer stalled: group={}, partition={}", groupId, tp);
                    
                    meterRegistry.counter("kafka.consumer.stalled",
                        Tags.of("group", groupId, "topic", tp.topic())
                    ).increment();
                }
            }
        }
        
        // Update last seen
        lastSeenOffsets.put(groupId, currentOffsets.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())));
    }
}
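The lag arithmetic in monitorGroup reduces to end offset minus committed offset per partition; isolating it makes the logic unit-testable. A sketch with plain String partition keys so it carries no Kafka dependency:

```java
import java.util.Map;

class LagMath {

    // Total lag = sum over partitions of (log-end offset - committed offset).
    // Clamped at zero: a commit observed after the end-offset snapshot can
    // briefly appear "ahead" of the log end.
    static long totalLag(Map<String, Long> committed, Map<String, Long> endOffsets) {
        long total = 0;
        for (Map.Entry<String, Long> e : committed.entrySet()) {
            Long end = endOffsets.get(e.getKey());
            if (end != null) {
                total += Math.max(0L, end - e.getValue());
            }
        }
        return total;
    }
}
```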

Anti-Patterns

❌ No Graceful Shutdown

// WRONG - abrupt shutdown
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.close(); // Doesn't commit pending offsets
}));

// ✅ CORRECT - graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup(); // Signal shutdown
    // Main loop will commit and close properly
}));

❌ Ignoring Rebalance Events

Always implement ConsumerRebalanceListener to commit offsets before partitions are revoked.


References