Kafka
Failure Recovery
Kafka operations v1.0.0
Kafka Failure Recovery
Overview
Kafka is designed for fault tolerance, but proper configuration and handling is required to ensure reliable operation. This skill covers broker failures, consumer failures, producer failures, and recovery strategies.
Key Concepts
Failure Scenarios
┌─────────────────────────────────────────────────────────────┐
│ Kafka Failure Scenarios │
├─────────────────────────────────────────────────────────────┤
│ │
│ Broker Failure: │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │Broker 1 │ │Broker 2 │ │Broker 3 │ │
│ │ P0(L) │ │ P0(F) │ │ P1(L) │ │
│ │ ✗ │ │ ▲ │ │ │ │
│ └────┬────┘ └──┴──────┘ └─────────┘ │
│ │ │ │
│ └──────────┘ Follower becomes Leader │
│ │
│ Consumer Failure: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Consumer Group: order-processors │ │
│ │ │ │
│ │ Before: C1[P0,P1] C2[P2,P3] C3[P4,P5] │ │
│ │ ↓ C2 dies │ │
│ │ After: C1[P0,P1,P2] C3[P3,P4,P5] │ │
│ │ (Rebalance redistributes partitions) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Producer Failure: │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Scenario 1: Network partition │ │
│ │ Producer ──✗──▶ Broker │ │
│ │ Action: Retry with exponential backoff │ │
│ │ │ │
│ │ Scenario 2: Broker down │ │
│ │ Action: Metadata refresh, send to new leader │ │
│ │ │ │
│ │ Scenario 3: Producer crash │ │
│ │ Action: New producer, idempotency handles dupes │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Best Practices
1. Configure Proper Replication
Use replication factor of 3 with min.insync.replicas = 2.
2. Handle Rebalances Gracefully
Implement proper shutdown and rebalance listeners.
3. Implement Circuit Breakers
Prevent cascading failures when Kafka is unavailable.
4. Monitor Lag and Configure Alerts
Detect consumer failures through lag monitoring.
5. Test Failure Scenarios
Regularly test broker failures, consumer crashes, and network partitions.
Code Examples
Example 1: Resilient Producer
/**
 * Kafka producer hardened for broker/network failures: idempotent sends with
 * unbounded broker-side retries, an application-side retry loop with
 * exponential backoff, and a circuit breaker that sheds load when Kafka is
 * entirely unavailable.
 */
public class ResilientProducer {
    private final KafkaProducer<String, String> producer;
    private final CircuitBreaker circuitBreaker;
    private final MeterRegistry meterRegistry;

    /**
     * Creates a producer that reports metrics to the Micrometer global registry.
     *
     * @param bootstrapServers comma-separated Kafka broker addresses
     */
    public ResilientProducer(String bootstrapServers) {
        this(bootstrapServers, Metrics.globalRegistry);
    }

    /**
     * Creates a producer with an explicit metrics registry.
     *
     * @param bootstrapServers comma-separated Kafka broker addresses
     * @param meterRegistry    registry for success/failure counters
     */
    public ResilientProducer(String bootstrapServers, MeterRegistry meterRegistry) {
        // FIX: meterRegistry was declared final but never assigned, which does not
        // compile and would have NPE'd in sendWithRetry(). It is now injected,
        // with the global registry as a backward-compatible default.
        this.meterRegistry = meterRegistry;
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Fault tolerance: require all in-sync replicas to ack, and let the
        // idempotent producer retry indefinitely within the delivery timeout
        // (retries cannot reorder or duplicate records when idempotence is on).
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 300000); // 5 minutes
        // Transient-failure handling: bound each request and each blocking send()
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
        // Refresh metadata at least once a minute so leader changes are picked up
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
        this.producer = new KafkaProducer<>(props);
        // Circuit breaker guards against total Kafka unavailability: trips after
        // 50% failures in a 10-call window, probes with 3 calls after 30s open.
        this.circuitBreaker = CircuitBreaker.of("kafka-producer",
            CircuitBreakerConfig.custom()
                .failureRateThreshold(50)
                .waitDurationInOpenState(Duration.ofSeconds(30))
                .permittedNumberOfCallsInHalfOpenState(3)
                .slidingWindowSize(10)
                .recordExceptions(TimeoutException.class, KafkaException.class)
                // Config/auth errors are not broker-health signals; don't trip on them
                .ignoreExceptions(InvalidTopicException.class, AuthorizationException.class)
                .build()
        );
    }

    /**
     * Sends asynchronously through the circuit breaker.
     *
     * @return future completing with the record metadata, or exceptionally if
     *         the breaker is open or all retries are exhausted
     */
    public CompletableFuture<RecordMetadata> send(String topic, String key, String value) {
        return CompletableFuture.supplyAsync(() ->
            circuitBreaker.executeSupplier(() -> sendWithRetry(topic, key, value))
        );
    }

    /**
     * Synchronous send with up to 3 attempts and exponential backoff.
     * Non-retriable errors fail immediately; retriable ones are retried.
     *
     * @throws RuntimeException on non-retriable error, interruption, or after
     *                          all attempts fail
     */
    private RecordMetadata sendWithRetry(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        int maxAttempts = 3;
        Exception lastException = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                Future<RecordMetadata> future = producer.send(record);
                RecordMetadata metadata = future.get(30, TimeUnit.SECONDS);
                meterRegistry.counter("kafka.producer.success").increment();
                return metadata;
            } catch (ExecutionException e) {
                lastException = (Exception) e.getCause();
                if (isRetriable(lastException)) {
                    log.warn("Retriable error on attempt {}: {}", attempt, lastException.getMessage());
                    if (attempt < maxAttempts) {
                        sleepWithBackoff(attempt);
                    }
                } else {
                    // Non-retriable - fail immediately
                    throw new RuntimeException("Non-retriable error", lastException);
                }
            } catch (TimeoutException e) {
                lastException = e;
                log.warn("Timeout on attempt {}", attempt);
                if (attempt < maxAttempts) {
                    sleepWithBackoff(attempt);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted", e);
            }
        }
        meterRegistry.counter("kafka.producer.failure").increment();
        throw new RuntimeException("Failed after " + maxAttempts + " attempts", lastException);
    }

    /** Classifies errors worth retrying (leadership moves, network blips, timeouts). */
    private boolean isRetriable(Exception e) {
        return e instanceof RetriableException ||
               e instanceof TimeoutException ||
               e instanceof NotLeaderForPartitionException ||
               e instanceof NetworkException;
    }

    /** Sleeps 200ms, 400ms, 800ms... for attempts 1, 2, 3..., capped at 10s. */
    private void sleepWithBackoff(int attempt) {
        try {
            long backoff = (long) Math.pow(2, attempt) * 100;
            Thread.sleep(Math.min(backoff, 10000));
        } catch (InterruptedException e) {
            // FIX: previously the interrupt flag was restored but the retry loop
            // kept running and could block again. Abort instead, consistent with
            // the InterruptedException handling in sendWithRetry().
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted during retry backoff", e);
        }
    }

    /**
     * Graceful shutdown: flush buffered records, then close with a bounded wait.
     */
    public void close() {
        // Flush pending messages
        producer.flush();
        // Close with timeout
        producer.close(Duration.ofSeconds(30));
    }
}
Example 2: Consumer Rebalance Handling
/**
 * Kafka consumer that survives rebalances without losing or double-committing
 * offsets: manual commits, cooperative sticky assignment, a rebalance listener
 * that commits before partitions are revoked, and a wakeup-based shutdown.
 */
public class ResilientConsumer {
    private final KafkaConsumer<String, String> consumer;
    // Offsets of records processed since the last commit; keyed by partition.
    // ConcurrentHashMap because shutdown() may run on another thread.
    private final Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new ConcurrentHashMap<>();
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

    /**
     * @param bootstrapServers comma-separated Kafka broker addresses
     * @param groupId          consumer group id
     */
    public ResilientConsumer(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Manual commit for rebalance safety: only commit what we processed
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Rebalance configuration: 30s session, heartbeats at 1/3 of that,
        // and up to 5 minutes between polls before we are considered dead
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        // Cooperative rebalancing (Kafka 2.4+) avoids stop-the-world revocations
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            CooperativeStickyAssignor.class.getName());
        this.consumer = new KafkaConsumer<>(props);
    }

    /**
     * Poll loop. Blocks until {@link #shutdown()} is called; commits processed
     * offsets after each batch and once more on exit.
     *
     * @param topics topics to subscribe to
     */
    public void consume(List<String> topics) {
        consumer.subscribe(topics, new RebalanceHandler());
        try {
            while (!shutdownRequested.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    continue;
                }
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        processRecord(record);
                        // Track processed offset (+1: commit points at next record to read)
                        TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                        pendingOffsets.put(partition, new OffsetAndMetadata(record.offset() + 1));
                    } catch (Exception e) {
                        handleProcessingError(record, e);
                    }
                }
                // Commit processed offsets
                commitPendingOffsets();
            }
        } catch (WakeupException e) {
            // FIX: shutdown() calls consumer.wakeup(), which makes poll() throw
            // WakeupException. Previously it propagated to the caller on every
            // graceful shutdown; now it is swallowed when shutdown was requested
            // and rethrown only if the wakeup was unexpected.
            if (!shutdownRequested.get()) {
                throw e;
            }
        } finally {
            // Final commit and cleanup
            commitPendingOffsets();
            consumer.close();
        }
    }

    /**
     * Commits and clears pending offsets. A CommitFailedException means a
     * rebalance took the partitions away; the offsets are dropped because the
     * new owner will re-read from the last successful commit.
     */
    private void commitPendingOffsets() {
        if (!pendingOffsets.isEmpty()) {
            try {
                consumer.commitSync(new HashMap<>(pendingOffsets), Duration.ofSeconds(10));
                pendingOffsets.clear();
            } catch (CommitFailedException e) {
                log.error("Commit failed - rebalance occurred", e);
                pendingOffsets.clear();
            }
        }
    }

    /**
     * Commits offsets for revoked partitions before ownership transfers, and
     * cleans up per-partition state on assignment changes.
     */
    private class RebalanceHandler implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            log.info("Partitions revoked: {}", partitions);
            // Commit offsets for revoked partitions so the next owner resumes
            // exactly where we stopped
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (TopicPartition partition : partitions) {
                OffsetAndMetadata offset = pendingOffsets.remove(partition);
                if (offset != null) {
                    toCommit.put(partition, offset);
                }
            }
            if (!toCommit.isEmpty()) {
                try {
                    consumer.commitSync(toCommit);
                    log.info("Committed offsets for revoked partitions: {}", toCommit);
                } catch (Exception e) {
                    log.error("Failed to commit on revocation", e);
                }
            }
            // Cleanup any partition-specific state
            cleanupPartitionState(partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            log.info("Partitions assigned: {}", partitions);
            // Initialize state for new partitions
            initializePartitionState(partitions);
        }

        @Override
        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            // Called when partitions are lost without revocation
            // (e.g., consumer crash or session timeout)
            log.warn("Partitions lost: {}", partitions);
            // Cannot commit here - partitions already reassigned
            pendingOffsets.keySet().removeAll(partitions);
            cleanupPartitionState(partitions);
        }
    }

    /** Requests a graceful stop; safe to call from any thread. */
    public void shutdown() {
        shutdownRequested.set(true);
        consumer.wakeup(); // Interrupt poll
    }
}
Example 3: Dead Letter Queue Pattern
/**
 * Dead-letter-queue pattern: failed records are retried via a per-topic
 * ".retry" topic (with exponential-backoff metadata) up to maxRetries times,
 * then parked on the DLQ topic with full diagnostic headers.
 */
public class DeadLetterQueueHandler {
    private final KafkaProducer<String, String> dlqProducer;
    private final String dlqTopic;
    private final int maxRetries;

    /**
     * @param bootstrapServers comma-separated Kafka broker addresses
     * @param dlqTopic         topic receiving permanently failed records
     * @param maxRetries       attempts before a record is sent to the DLQ
     */
    public DeadLetterQueueHandler(String bootstrapServers, String dlqTopic, int maxRetries) {
        this.dlqTopic = dlqTopic;
        this.maxRetries = maxRetries;
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        this.dlqProducer = new KafkaProducer<>(props);
    }

    /**
     * Runs the processor on the record value; on failure, routes the record to
     * the retry topic (if attempts remain) or the DLQ.
     */
    public void processWithDLQ(ConsumerRecord<String, String> record,
                               ThrowingConsumer<String> processor) {
        // Get retry count from headers
        int retryCount = getRetryCount(record);
        try {
            processor.accept(record.value());
        } catch (Exception e) {
            if (retryCount < maxRetries) {
                // Send to retry topic
                sendToRetryTopic(record, retryCount + 1, e);
            } else {
                // Max retries exceeded - send to DLQ
                sendToDLQ(record, e);
            }
        }
    }

    /** Republishes the record to "&lt;topic&gt;.retry" with retry/backoff metadata headers. */
    private void sendToRetryTopic(ConsumerRecord<String, String> original,
                                  int retryCount, Exception error) {
        String retryTopic = original.topic() + ".retry";
        ProducerRecord<String, String> retry = new ProducerRecord<>(
            retryTopic,
            original.key(),
            original.value()
        );
        // Add metadata
        retry.headers().add("retry-count", utf8(String.valueOf(retryCount)));
        retry.headers().add("original-topic", utf8(original.topic()));
        retry.headers().add("original-partition", utf8(String.valueOf(original.partition())));
        retry.headers().add("original-offset", utf8(String.valueOf(original.offset())));
        // FIX: error.getMessage() may be null (e.g. bare NullPointerException),
        // which previously NPE'd here; utf8() substitutes an empty string.
        retry.headers().add("error-message", utf8(error.getMessage()));
        retry.headers().add("retry-timestamp", utf8(String.valueOf(System.currentTimeMillis())));
        // Calculate delay for exponential backoff
        long delay = (long) Math.pow(2, retryCount) * 1000; // 2s, 4s, 8s...
        retry.headers().add("process-after", utf8(String.valueOf(System.currentTimeMillis() + delay)));
        dlqProducer.send(retry);
        log.info("Sent to retry topic {} (attempt {})", retryTopic, retryCount);
    }

    /** Parks the record on the DLQ with full provenance and error diagnostics. */
    private void sendToDLQ(ConsumerRecord<String, String> original, Exception error) {
        ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
            dlqTopic,
            original.key(),
            original.value()
        );
        // Add comprehensive metadata for investigation
        dlqRecord.headers().add("original-topic", utf8(original.topic()));
        dlqRecord.headers().add("original-partition", utf8(String.valueOf(original.partition())));
        dlqRecord.headers().add("original-offset", utf8(String.valueOf(original.offset())));
        dlqRecord.headers().add("original-timestamp", utf8(String.valueOf(original.timestamp())));
        dlqRecord.headers().add("error-class", utf8(error.getClass().getName()));
        // FIX: null-safe (see sendToRetryTopic)
        dlqRecord.headers().add("error-message", utf8(error.getMessage()));
        dlqRecord.headers().add("error-stacktrace", utf8(getStackTrace(error)));
        dlqRecord.headers().add("dlq-timestamp", utf8(String.valueOf(System.currentTimeMillis())));
        dlqProducer.send(dlqRecord, (metadata, exception) -> {
            if (exception != null) {
                log.error("Failed to send to DLQ!", exception);
                // Critical - consider alternative handling
            } else {
                log.warn("Sent to DLQ: topic={}, partition={}, offset={}",
                    original.topic(), original.partition(), original.offset());
            }
        });
    }

    /**
     * Null-safe UTF-8 encoding for header values.
     * FIX: the original called getBytes() with the platform default charset,
     * so headers could differ across JVMs; UTF-8 is now explicit.
     */
    private static byte[] utf8(String value) {
        return (value == null ? "" : value)
            .getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Reads the retry count from the "retry-count" header.
     *
     * @return 0 when the header is absent or unparseable (treat as first attempt)
     */
    private int getRetryCount(ConsumerRecord<String, String> record) {
        Header header = record.headers().lastHeader("retry-count");
        if (header == null) {
            return 0;
        }
        try {
            return Integer.parseInt(new String(header.value(),
                java.nio.charset.StandardCharsets.UTF_8));
        } catch (NumberFormatException e) {
            // FIX: a corrupt header previously crashed processing; fall back to 0
            log.warn("Unparseable retry-count header on {}-{}@{}",
                record.topic(), record.partition(), record.offset());
            return 0;
        }
    }

    /** Renders the full stack trace of an exception as a string. */
    private String getStackTrace(Exception e) {
        StringWriter sw = new StringWriter();
        e.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }
}
Example 4: Broker Failure Recovery
/**
 * Cluster-health watchdog: periodically checks broker count, under-replicated
 * and offline partitions, triggers preferred-leader elections, and drives the
 * recovery procedure after a broker returns.
 */
public class BrokerFailureHandler {
    private final AdminClient adminClient;
    private final MeterRegistry meterRegistry;

    /**
     * FIX: both fields were declared final but never assigned, which does not
     * compile; they are now constructor-injected.
     *
     * @param adminClient   Kafka admin client for cluster introspection
     * @param meterRegistry registry for cluster-health gauges
     */
    public BrokerFailureHandler(AdminClient adminClient, MeterRegistry meterRegistry) {
        this.adminClient = adminClient;
        this.meterRegistry = meterRegistry;
    }

    /**
     * Monitor cluster health and handle broker failures.
     * Runs every 30 seconds.
     */
    @Scheduled(fixedRate = 30000)
    public void monitorClusterHealth() throws ExecutionException, InterruptedException {
        DescribeClusterResult cluster = adminClient.describeCluster();
        Collection<Node> nodes = cluster.nodes().get();
        // Resolving the controller also verifies the cluster has an active one
        Node controller = cluster.controller().get();
        // NOTE(review): Micrometer gauges hold their value source weakly — a
        // gauge registered with a boxed snapshot like this can report NaN once
        // the box is GC'd. Consider a long-lived AtomicInteger; verify in prod.
        meterRegistry.gauge("kafka.cluster.broker_count", nodes.size());
        // Check for under-replicated partitions
        checkUnderReplicated();
        // Check for offline partitions
        checkOfflinePartitions();
        // Check for leader imbalance
        checkLeaderBalance(nodes);
    }

    /**
     * Flags partitions whose in-sync replica set is smaller than the replica
     * set (a broker is down or lagging) and alerts ops if any exist.
     */
    private void checkUnderReplicated() throws ExecutionException, InterruptedException {
        Set<String> topics = adminClient.listTopics().names().get();
        Map<String, TopicDescription> descriptions =
            adminClient.describeTopics(topics).allTopicNames().get();
        int underReplicated = 0;
        for (TopicDescription topic : descriptions.values()) {
            for (TopicPartitionInfo partition : topic.partitions()) {
                if (partition.isr().size() < partition.replicas().size()) {
                    underReplicated++;
                    log.warn("Under-replicated: {}-{} (ISR: {}/{})",
                        topic.name(), partition.partition(),
                        partition.isr().size(), partition.replicas().size());
                }
            }
        }
        meterRegistry.gauge("kafka.partitions.under_replicated", underReplicated);
        if (underReplicated > 0) {
            alertOps("Under-replicated partitions detected: " + underReplicated);
        }
    }

    /**
     * Flags partitions with no leader at all — these reject both reads and
     * writes, so any occurrence is critical.
     */
    private void checkOfflinePartitions() throws ExecutionException, InterruptedException {
        Set<String> topics = adminClient.listTopics().names().get();
        Map<String, TopicDescription> descriptions =
            adminClient.describeTopics(topics).allTopicNames().get();
        List<String> offline = new ArrayList<>();
        for (TopicDescription topic : descriptions.values()) {
            for (TopicPartitionInfo partition : topic.partitions()) {
                if (partition.leader() == null) {
                    offline.add(topic.name() + "-" + partition.partition());
                }
            }
        }
        meterRegistry.gauge("kafka.partitions.offline", offline.size());
        if (!offline.isEmpty()) {
            alertOps("CRITICAL: Offline partitions: " + offline);
        }
    }

    /**
     * Trigger leader rebalance if needed: elects the preferred leader (first
     * replica in the assignment) for every partition currently led elsewhere.
     */
    public void triggerLeaderRebalance() throws ExecutionException, InterruptedException {
        // Trigger preferred leader election
        Set<String> topics = adminClient.listTopics().names().get();
        Map<String, TopicDescription> descriptions =
            adminClient.describeTopics(topics).allTopicNames().get();
        Set<TopicPartition> partitionsToRebalance = new HashSet<>();
        for (TopicDescription topic : descriptions.values()) {
            for (TopicPartitionInfo partition : topic.partitions()) {
                if (partition.leader() != null && !partition.replicas().isEmpty()) {
                    // The preferred leader is by convention the first replica
                    Node preferredLeader = partition.replicas().get(0);
                    if (!partition.leader().equals(preferredLeader)) {
                        partitionsToRebalance.add(
                            new TopicPartition(topic.name(), partition.partition()));
                    }
                }
            }
        }
        if (!partitionsToRebalance.isEmpty()) {
            log.info("Triggering leader election for {} partitions", partitionsToRebalance.size());
            adminClient.electLeaders(
                ElectionType.PREFERRED,
                partitionsToRebalance
            ).all().get();
        }
    }

    /**
     * Recovery procedure after broker comes back online: wait for its replicas
     * to rejoin the ISR, then rebalance leadership back onto it.
     */
    public void handleBrokerRecovery(int brokerId) {
        log.info("Broker {} recovered, initiating recovery procedures", brokerId);
        // Wait for ISR to catch up
        try {
            waitForIsrCatchup(brokerId, Duration.ofMinutes(10));
            // Trigger leader rebalance
            triggerLeaderRebalance();
            log.info("Broker {} recovery complete", brokerId);
        } catch (Exception e) {
            log.error("Broker {} recovery failed", brokerId, e);
            alertOps("Broker recovery failed: " + brokerId);
        }
    }

    /**
     * Polls every 5 seconds until the broker's replicas are all in sync or the
     * timeout elapses. checkBrokerIsrStatus is assumed implemented elsewhere.
     *
     * FIX: TimeoutException is checked but was missing from the throws clause,
     * so the original did not compile.
     *
     * @throws TimeoutException if the broker has not caught up within timeout
     */
    private void waitForIsrCatchup(int brokerId, Duration timeout)
            throws ExecutionException, InterruptedException, TimeoutException {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            boolean allCaughtUp = checkBrokerIsrStatus(brokerId);
            if (allCaughtUp) {
                log.info("Broker {} ISR caught up", brokerId);
                return;
            }
            Thread.sleep(5000);
        }
        throw new TimeoutException("Broker " + brokerId + " ISR catch-up timeout");
    }
}
Example 5: Consumer Lag Monitoring
public class ConsumerLagMonitor {
private final AdminClient adminClient;
private final MeterRegistry meterRegistry;
private final AlertService alertService;
@Scheduled(fixedRate = 10000)
public void monitorConsumerLag() {
try {
List<String> groupIds = adminClient.listConsumerGroups()
.all().get().stream()
.map(ConsumerGroupListing::groupId)
.filter(id -> !id.startsWith("_")) // Exclude internal groups
.collect(Collectors.toList());
for (String groupId : groupIds) {
monitorGroup(groupId);
}
} catch (Exception e) {
log.error("Failed to monitor consumer lag", e);
}
}
private void monitorGroup(String groupId) throws ExecutionException, InterruptedException {
// Get group state
ConsumerGroupDescription description = adminClient
.describeConsumerGroups(List.of(groupId))
.describedGroups().get(groupId).get();
ConsumerGroupState state = description.state();
meterRegistry.gauge("kafka.consumer.state",
Tags.of("group", groupId),
state.ordinal());
if (state != ConsumerGroupState.STABLE) {
log.warn("Consumer group {} is in state: {}", groupId, state);
}
// Get committed offsets
Map<TopicPartition, OffsetAndMetadata> offsets = adminClient
.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata().get();
if (offsets.isEmpty()) {
return;
}
// Get end offsets
Map<TopicPartition, OffsetSpec> endOffsetRequest = offsets.keySet().stream()
.collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
adminClient.listOffsets(endOffsetRequest).all().get();
// Calculate lag per partition
long totalLag = 0;
Map<String, Long> topicLag = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
TopicPartition tp = entry.getKey();
long committed = entry.getValue().offset();
long end = endOffsets.get(tp).offset();
long lag = end - committed;
totalLag += lag;
topicLag.merge(tp.topic(), lag, Long::sum);
// Record per-partition metric
meterRegistry.gauge("kafka.consumer.lag",
Tags.of(
"group", groupId,
"topic", tp.topic(),
"partition", String.valueOf(tp.partition())
),
lag
);
}
// Record total lag
meterRegistry.gauge("kafka.consumer.total_lag",
Tags.of("group", groupId),
totalLag
);
// Alert on high lag
if (totalLag > 10000) {
alertService.warn(String.format(
"High consumer lag for group %s: %d messages behind",
groupId, totalLag
));
}
// Check for stalled consumers
checkForStalledConsumers(groupId, offsets);
}
private void checkForStalledConsumers(String groupId,
Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
// Compare with previous offsets
Map<TopicPartition, Long> previousOffsets = lastSeenOffsets.get(groupId);
if (previousOffsets != null) {
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : currentOffsets.entrySet()) {
TopicPartition tp = entry.getKey();
long current = entry.getValue().offset();
Long previous = previousOffsets.get(tp);
if (previous != null && current == previous) {
log.warn("Consumer stalled: group={}, partition={}", groupId, tp);
meterRegistry.counter("kafka.consumer.stalled",
Tags.of("group", groupId, "topic", tp.topic())
).increment();
}
}
}
// Update last seen
lastSeenOffsets.put(groupId, currentOffsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())));
}
}
Anti-Patterns
❌ No Graceful Shutdown
// WRONG - abrupt shutdown
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.close(); // Doesn't commit pending offsets
}));
// ✅ CORRECT - graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.wakeup(); // Signal shutdown
// Main loop will commit and close properly
}));
❌ Ignoring Rebalance Events
Always implement ConsumerRebalanceListener to commit offsets before partitions are revoked.