Distributed Consensus

Overview

Distributed consensus enables multiple nodes to agree on a single value or sequence of values despite failures. This skill covers consensus algorithms (Raft, Paxos), leader election, and quorum-based systems.


Key Concepts

Consensus Problem

┌─────────────────────────────────────────────────────────────┐
│                 Distributed Consensus                        │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  The Problem:                                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ How do N nodes agree on a value when:               │   │
│  │ • Nodes may crash                                    │   │
│  │ • Network may partition                              │   │
│  │ • Messages may be delayed/reordered                 │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                              │
│  Raft Algorithm (Leader-Based):                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                                                      │   │
│  │   ┌────────┐    votes     ┌────────┐               │   │
│  │   │Follower│◄────────────▶│Follower│               │   │
│  │   └────┬───┘              └───┬────┘               │   │
│  │        │                      │                     │   │
│  │        │    ┌────────┐        │                    │   │
│  │        └───▶│ Leader │◄───────┘                    │   │
│  │             └────┬───┘                              │   │
│  │                  │                                  │   │
│  │                  ▼                                  │   │
│  │        Log Replication (AppendEntries)             │   │
│  │                                                      │   │
│  │  States: Follower ──▶ Candidate ──▶ Leader         │   │
│  │                 election timeout    wins majority   │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                              │
│  Quorum:                                                    │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ N nodes, majority quorum = ⌊N/2⌋ + 1                │   │
│  │                                                      │   │
│  │ 3 nodes: need 2 for quorum (tolerate 1 failure)    │   │
│  │ 5 nodes: need 3 for quorum (tolerate 2 failures)   │   │
│  │ 7 nodes: need 4 for quorum (tolerate 3 failures)   │   │
│  │                                                      │   │
│  │ Any two quorums overlap = consistency guaranteed    │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Consensus Properties

Property      Description
Agreement     All correct nodes decide on the same value
Validity      The decided value was proposed by some node
Termination   All correct nodes eventually decide
Integrity     Each node decides at most once

Best Practices

1. Use Odd Number of Nodes

Odd cluster sizes (3, 5, 7) give the best fault tolerance per node: adding a fourth node to a three-node cluster raises the quorum from 2 to 3 but still tolerates only one failure.
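
A minimal sketch that makes the trade-off concrete:

public final class QuorumMath {
    static int quorum(int n)    { return n / 2 + 1; }     // = ⌊N/2⌋ + 1
    static int tolerated(int n) { return n - quorum(n); } // failures survivable

    public static void main(String[] args) {
        for (int n = 3; n <= 8; n++) {
            System.out.printf("N=%d quorum=%d tolerates=%d%n",
                n, quorum(n), tolerated(n));
        }
        // Output shows N=3 and N=4 both tolerate 1; N=5 and N=6 both tolerate 2
    }
}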

2. Configure Appropriate Timeouts

Election timeouts trade failure-detection speed against false positives: too short and transient network delays trigger spurious elections; too long and a dead leader stalls the cluster.
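
The usual Raft guidance is broadcastTime ≪ electionTimeout ≪ MTBF. A minimal sketch, with all numbers illustrative assumptions rather than recommendations:

// Sketch: sanity-checking timeout choices (values are illustrative)
static void validateTimeouts() {
    long broadcastTimeMs     = 10;   // measured p99 intra-cluster round trip (assumption)
    long electionTimeoutMs   = 200;  // roughly 10-20x the broadcast time
    long heartbeatIntervalMs = electionTimeoutMs / 4; // several heartbeats per timeout

    if (electionTimeoutMs < 10 * broadcastTimeMs) {
        throw new IllegalStateException(
            "Election timeout too close to network latency; expect spurious elections");
    }
    if (heartbeatIntervalMs * 3 > electionTimeoutMs) {
        throw new IllegalStateException(
            "Heartbeats too sparse relative to the election timeout");
    }
}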

3. Monitor Raft Metrics

Track leader changes (frequent elections signal instability), commit latency, and log size.
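
A minimal sketch using Micrometer; the metric names and the RaftMetrics wrapper are assumptions, and the timer below covers only the local append (true commit latency runs until the entry is committed):

import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class RaftMetrics {
    private final MeterRegistry registry = new SimpleMeterRegistry();
    private final Counter leaderChanges =
        Counter.builder("raft.leader.changes").register(registry);
    private final Timer commitLatency =
        Timer.builder("raft.commit.latency").register(registry);

    void onLeaderChange() {
        leaderChanges.increment(); // frequent increments = unstable cluster
    }

    long timedAppend(RaftLog log, Object command, long term) {
        Timer.Sample sample = Timer.start(registry);
        try {
            return log.appendEntry(command, term); // local append only
        } finally {
            sample.stop(commitLatency);
        }
    }
}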

4. Plan for Network Partitions

Design for split-brain: with majority quorums, the minority side of a partition must stop accepting writes (and, for linearizability, reads) rather than diverge.
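
A sketch of the leader-side check, assuming lastAckMillis is updated whenever a follower acknowledges a heartbeat:

// Sketch: a leader in the minority partition must stop serving.
private final Map<String, Long> lastAckMillis = new ConcurrentHashMap<>();

boolean hasMajorityContact(int clusterSize, long leaseMillis) {
    long cutoff = System.currentTimeMillis() - leaseMillis;
    long reachable = 1; // count self
    for (long ack : lastAckMillis.values()) {
        if (ack >= cutoff) reachable++;
    }
    return reachable >= clusterSize / 2 + 1;
}

// Before a linearizable read or a write:
// if (!hasMajorityContact(clusterNodes.size(), electionTimeoutMs)) reject();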

5. Use Existing Implementations

Don’t implement consensus from scratch in production; use a battle-tested system (etcd, ZooKeeper, Consul) or an embeddable Raft library (Apache Ratis, JRaft).
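
For instance, leader election via Apache Curator's recipes on top of ZooKeeper takes a few lines (a sketch; the connection string and doLeaderWork are placeholders):

CuratorFramework client = CuratorFrameworkFactory.newClient(
    "zk1:2181,zk2:2181,zk3:2181",             // placeholder connection string
    new ExponentialBackoffRetry(1000, 3));
client.start();

LeaderSelector selector = new LeaderSelector(client, "/election/my-service",
    new LeaderSelectorListenerAdapter() {
        @Override
        public void takeLeadership(CuratorFramework c) throws Exception {
            doLeaderWork(); // this node is leader until the method returns (placeholder)
        }
    });
selector.autoRequeue(); // re-enter the election after losing leadership
selector.start();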


Code Examples

Example 1: Leader Election

public class LeaderElection {
    
    private final String nodeId;
    private final List<String> clusterNodes;
    private final AtomicReference<String> currentLeader = new AtomicReference<>();
    private final AtomicLong currentTerm = new AtomicLong(0);
    private final AtomicReference<NodeState> state = new AtomicReference<>(NodeState.FOLLOWER);
    
    private final RpcClient rpcClient; // cluster RPC transport (injected)
    private static final Logger log = LoggerFactory.getLogger(LeaderElection.class);
    
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private volatile ScheduledFuture<?> electionTimer;
    private volatile ScheduledFuture<?> heartbeatTimer;
    
    private static final long ELECTION_TIMEOUT_MIN = 150;
    private static final long ELECTION_TIMEOUT_MAX = 300;
    private static final long HEARTBEAT_INTERVAL = 50;
    
    enum NodeState {
        FOLLOWER, CANDIDATE, LEADER
    }
    
    public void start() {
        resetElectionTimer();
    }
    
    private void resetElectionTimer() {
        if (electionTimer != null) {
            electionTimer.cancel(false);
        }
        
        // Random timeout to prevent split votes
        long timeout = ELECTION_TIMEOUT_MIN + 
            ThreadLocalRandom.current().nextLong(ELECTION_TIMEOUT_MAX - ELECTION_TIMEOUT_MIN);
        
        electionTimer = scheduler.schedule(
            this::startElection,
            timeout,
            TimeUnit.MILLISECONDS
        );
    }
    
    private void startElection() {
        // Transition to candidate
        state.set(NodeState.CANDIDATE);
        long newTerm = currentTerm.incrementAndGet();
        
        log.info("Starting election for term {}", newTerm);
        
        // Vote for self
        AtomicInteger votes = new AtomicInteger(1);
        int majority = (clusterNodes.size() / 2) + 1;
        
        // Request votes from other nodes
        CompletableFuture<?>[] voteFutures = clusterNodes.stream()
            .filter(node -> !node.equals(nodeId))
            .map(node -> requestVote(node, newTerm, votes))
            .toArray(CompletableFuture[]::new);
        
        CompletableFuture.allOf(voteFutures)
            .orTimeout(ELECTION_TIMEOUT_MIN, TimeUnit.MILLISECONDS)
            .whenComplete((result, error) -> {
                if (state.get() == NodeState.CANDIDATE && currentTerm.get() == newTerm) {
                    if (votes.get() >= majority) {
                        becomeLeader();
                    } else {
                        // Failed to get majority, restart election
                        state.set(NodeState.FOLLOWER);
                        resetElectionTimer();
                    }
                }
            });
    }
    
    private CompletableFuture<Boolean> requestVote(String node, long term, AtomicInteger votes) {
        // Send vote request to node
        VoteRequest request = new VoteRequest(nodeId, term);
        
        return rpcClient.sendVoteRequest(node, request)
            .thenApply(response -> {
                if (response.isGranted()) {
                    votes.incrementAndGet();
                    return true;
                }
                if (response.getTerm() > currentTerm.get()) {
                    // Discovered higher term, step down
                    stepDown(response.getTerm());
                }
                return false;
            })
            .exceptionally(e -> false);
    }
    
    private void becomeLeader() {
        log.info("Became leader for term {}", currentTerm.get());
        state.set(NodeState.LEADER);
        currentLeader.set(nodeId);
        
        // Cancel election timer
        if (electionTimer != null) {
            electionTimer.cancel(false);
        }
        
        // Start periodic heartbeats to assert leadership
        heartbeatTimer = scheduler.scheduleAtFixedRate(
            this::sendHeartbeats,
            0,
            HEARTBEAT_INTERVAL,
            TimeUnit.MILLISECONDS
        );
    }
    
    private void sendHeartbeats() {
        if (state.get() != NodeState.LEADER) {
            return;
        }
        
        long term = currentTerm.get();
        
        clusterNodes.stream()
            .filter(node -> !node.equals(nodeId))
            .forEach(node -> {
                AppendEntriesRequest heartbeat = new AppendEntriesRequest(
                    nodeId, term, Collections.emptyList()
                );
                
                rpcClient.sendAppendEntries(node, heartbeat)
                    .whenComplete((response, error) -> {
                        if (response != null && response.getTerm() > term) {
                            stepDown(response.getTerm());
                        }
                    });
            });
    }
    
    public void handleAppendEntries(AppendEntriesRequest request) {
        if (request.getTerm() >= currentTerm.get()) {
            currentTerm.set(request.getTerm());
            currentLeader.set(request.getLeaderId());
            state.set(NodeState.FOLLOWER);
            resetElectionTimer();
        }
    }
    
    private void stepDown(long newTerm) {
        log.info("Stepping down, discovered term {}", newTerm);
        currentTerm.set(newTerm);
        state.set(NodeState.FOLLOWER);
        currentLeader.set(null);
        if (heartbeatTimer != null) {
            heartbeatTimer.cancel(false);
        }
        resetElectionTimer();
    }
}
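
Wiring this up on each node (a sketch; the constructor and the RPC-server hookup are assumed, since the example omits them):

// Sketch: one LeaderElection per node; inbound AppendEntries RPCs
// must reach handleAppendEntries to suppress elections
LeaderElection node = new LeaderElection(/* nodeId, clusterNodes, rpcClient */);
node.start(); // starts as FOLLOWER; an election fires if no heartbeat arrives

// rpcServer.onAppendEntries(node::handleAppendEntries);  // pseudo-wiring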

Example 2: Log Replication

public class RaftLog {
    
    private final List<LogEntry> entries = new ArrayList<>();
    private final StateMachine stateMachine; // applies committed commands (injected)
    private volatile long commitIndex = -1;  // highest committed index; -1 = none (0-based log)
    private volatile long lastApplied = -1;  // highest index applied to the state machine
    
    // Leader state
    private final Map<String, Long> nextIndex = new ConcurrentHashMap<>();
    private final Map<String, Long> matchIndex = new ConcurrentHashMap<>();
    
    public synchronized long appendEntry(Object command, long term) {
        long index = entries.size();
        entries.add(new LogEntry(index, term, command));
        return index;
    }
    
    public synchronized void appendEntries(long prevLogIndex, long prevLogTerm, 
                                           List<LogEntry> newEntries) {
        // Check log consistency
        if (prevLogIndex >= 0) {
            if (prevLogIndex >= entries.size()) {
                throw new LogInconsistencyException("Missing entries");
            }
            if (entries.get((int) prevLogIndex).getTerm() != prevLogTerm) {
                throw new LogInconsistencyException("Term mismatch");
            }
        }
        
        // Append new entries (may overwrite conflicting entries)
        for (LogEntry entry : newEntries) {
            int index = (int) entry.getIndex();
            if (index < entries.size()) {
                if (entries.get(index).getTerm() != entry.getTerm()) {
                    // Remove conflicting entries
                    entries.subList(index, entries.size()).clear();
                    entries.add(entry);
                }
            } else {
                entries.add(entry);
            }
        }
    }
    
    public void replicateToFollower(String followerId, RpcClient rpcClient, 
                                    long currentTerm, String leaderId) {
        long next = nextIndex.getOrDefault(followerId, entries.size());
        long prevIndex = next - 1;
        long prevTerm = prevIndex >= 0 ? entries.get((int) prevIndex).getTerm() : 0;
        
        List<LogEntry> entriesToSend = new ArrayList<>();
        for (long i = next; i < entries.size(); i++) {
            entriesToSend.add(entries.get((int) i));
        }
        
        AppendEntriesRequest request = new AppendEntriesRequest(
            leaderId, currentTerm, prevIndex, prevTerm, 
            entriesToSend, commitIndex
        );
        
        rpcClient.sendAppendEntries(followerId, request)
            .thenAccept(response -> {
                if (response.isSuccess()) {
                    // Update indices on success
                    long lastSent = next + entriesToSend.size() - 1;
                    nextIndex.put(followerId, lastSent + 1);
                    matchIndex.put(followerId, lastSent);
                    
                    // Try to advance commit index
                    maybeAdvanceCommitIndex(currentTerm);
                } else {
                    // Decrement nextIndex and retry
                    nextIndex.compute(followerId, (k, v) ->
                        Math.max(0, (v == null ? entries.size() : v) - 1));
                }
            });
    }
    
    private synchronized void maybeAdvanceCommitIndex(long currentTerm) {
        // Find highest index replicated on majority
        for (long n = entries.size() - 1; n > commitIndex; n--) {
            if (entries.get((int) n).getTerm() != currentTerm) {
                continue; // Only commit current term entries
            }
            
            int replicatedCount = 1; // Self
            for (long matchIdx : matchIndex.values()) {
                if (matchIdx >= n) {
                    replicatedCount++;
                }
            }
            
            int majority = (matchIndex.size() + 1) / 2 + 1;
            if (replicatedCount >= majority) {
                commitIndex = n;
                applyCommittedEntries();
                break;
            }
        }
    }
    
    private void applyCommittedEntries() {
        while (lastApplied < commitIndex) {
            lastApplied++;
            LogEntry entry = entries.get((int) lastApplied);
            stateMachine.apply(entry.getCommand());
        }
    }
}

@Value
class LogEntry {
    long index;
    long term;
    Object command;
}
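
On the leader, the write path ties Examples 1 and 2 together (a sketch; the command type, follower set, and surrounding variables are assumptions):

// Sketch: leader-side write path
Object command = new PutCommand("user:42", "alice".getBytes()); // constructor assumed
long index = raftLog.appendEntry(command, currentTerm);         // 1. append locally

for (String follower : followers) {                             // 2. replicate
    raftLog.replicateToFollower(follower, rpcClient, currentTerm, nodeId);
}
// 3. the entry is committed (and applied) once maybeAdvanceCommitIndex
//    observes a majority matchIndex at or beyond `index`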

Example 3: Distributed Lock with Consensus

public class ConsensusBasedLock {
    
    private final String lockName;
    private final EtcdClient etcdClient;
    private final String nodeId;
    
    private volatile long leaseId;
    private volatile boolean held;
    
    public ConsensusBasedLock(String lockName, EtcdClient etcdClient) {
        this.lockName = lockName;
        this.etcdClient = etcdClient;
        this.nodeId = UUID.randomUUID().toString();
    }
    
    public boolean tryLock(Duration timeout) throws Exception {
        // Create a lease so the lock is released automatically if this node dies
        LeaseGrantResponse lease = etcdClient.getLeaseClient()
            .grant(timeout.toSeconds())
            .get();
        this.leaseId = lease.getID();
        
        String lockKey = "/locks/" + lockName;
        
        if (tryAcquire(lockKey)) {
            held = true;
            startKeepAlive();
            return true;
        }
        
        // Lock held by another node; retry until the deadline
        return waitForLock(lockKey, timeout);
    }
    
    private boolean tryAcquire(String lockKey) throws Exception {
        // Acquire atomically: put our nodeId only if the key does not exist yet
        TxnResponse txn = etcdClient.getKVClient().txn()
            .If(new Cmp(
                ByteSequence.from(lockKey, StandardCharsets.UTF_8),
                Cmp.Op.EQUAL,
                CmpTarget.version(0) // version 0 means the key doesn't exist
            ))
            .Then(Op.put(
                ByteSequence.from(lockKey, StandardCharsets.UTF_8),
                ByteSequence.from(nodeId, StandardCharsets.UTF_8),
                PutOption.newBuilder().withLeaseId(leaseId).build()
            ))
            .commit()
            .get();
        
        return txn.isSucceeded();
    }
    
    private boolean waitForLock(String lockKey, Duration timeout) throws Exception {
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        
        // Simple polling loop; a production version would watch the key
        // and react to its DELETE event instead of sleeping
        while (System.currentTimeMillis() < deadline) {
            if (tryAcquire(lockKey)) {
                held = true;
                startKeepAlive();
                return true;
            }
            Thread.sleep(100);
        }
        
        return false;
    }
    
    private void startKeepAlive() {
        etcdClient.getLeaseClient().keepAlive(
            leaseId,
            new StreamObserver<LeaseKeepAliveResponse>() {
                @Override
                public void onNext(LeaseKeepAliveResponse response) {
                    // Lease renewed
                }
                
                @Override
                public void onError(Throwable t) {
                    log.error("Lease keep-alive failed", t);
                    held = false;
                }
                
                @Override
                public void onCompleted() {
                    held = false;
                }
            }
        );
    }
    
    public void unlock() {
        if (!held) {
            return;
        }
        
        try {
            String lockKey = "/locks/" + lockName;
            
            // Only delete if we still hold the lock
            etcdClient.getKVClient().txn()
                .If(new Cmp(
                    ByteSequence.from(lockKey, StandardCharsets.UTF_8),
                    Cmp.Op.EQUAL,
                    CmpTarget.value(ByteSequence.from(nodeId, StandardCharsets.UTF_8))
                ))
                .Then(Op.delete(
                    ByteSequence.from(lockKey, StandardCharsets.UTF_8),
                    DeleteOption.DEFAULT
                ))
                .commit()
                .get();
                
            etcdClient.getLeaseClient().revoke(leaseId).get();
            
        } catch (Exception e) {
            log.error("Failed to release lock", e);
        } finally {
            held = false;
        }
    }
}
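
Typical usage, with release guarded by try/finally (a sketch; rebalanceInventory is a placeholder for the critical section):

ConsensusBasedLock lock = new ConsensusBasedLock("inventory-rebalance", etcdClient);

if (lock.tryLock(Duration.ofSeconds(30))) {
    try {
        rebalanceInventory(); // critical section (placeholder)
    } finally {
        lock.unlock();        // always release, even on exceptions
    }
}
// else: another node holds the lock; back off or retry later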

Example 4: Quorum Read/Write

public class QuorumReplicator<T> {
    
    private final List<ReplicaClient> replicas;
    private final int writeQuorum;
    private final int readQuorum;
    
    public QuorumReplicator(List<ReplicaClient> replicas) {
        this.replicas = replicas;
        int n = replicas.size();
        // Ensure read + write quorum > n for consistency
        this.writeQuorum = n / 2 + 1;
        this.readQuorum = n / 2 + 1;
    }
    
    public CompletableFuture<Void> write(String key, T value, long version) {
        WriteRequest<T> request = new WriteRequest<>(key, value, version);
        
        List<CompletableFuture<WriteResponse>> futures = replicas.stream()
            .map(replica -> replica.write(request)
                .exceptionally(e -> WriteResponse.failed(e)))
            .collect(Collectors.toList());
        
        return waitForQuorum(futures, writeQuorum, WriteResponse::isSuccess)
            .thenAccept(successes -> {
                if (successes.size() < writeQuorum) {
                    throw new QuorumNotReachedException(
                        "Write quorum not reached: " + successes.size() + "/" + writeQuorum
                    );
                }
            });
    }
    
    public CompletableFuture<VersionedValue<T>> read(String key) {
        ReadRequest request = new ReadRequest(key);
        
        List<CompletableFuture<ReadResponse<T>>> futures = replicas.stream()
            .map(replica -> replica.read(request)
                .exceptionally(e -> ReadResponse.failed(e)))
            .collect(Collectors.toList());
        
        return waitForQuorum(futures, readQuorum, ReadResponse::isSuccess)
            .thenApply(successful -> {
                if (successful.size() < readQuorum) {
                    throw new QuorumNotReachedException(
                        "Read quorum not reached: " + successful.size() + "/" + readQuorum
                    );
                }
                
                // Return the value with the highest version (read repair omitted)
                return successful.stream()
                    .map(ReadResponse::getValue)
                    .max(Comparator.comparing(VersionedValue::getVersion))
                    .orElseThrow();
            });
    }
    
    private <R> CompletableFuture<List<R>> waitForQuorum(
            List<CompletableFuture<R>> futures, int quorum, Predicate<R> isSuccess) {
        
        List<R> successes = new CopyOnWriteArrayList<>();
        CompletableFuture<List<R>> quorumFuture = new CompletableFuture<>();
        AtomicInteger completed = new AtomicInteger(0);
        
        for (CompletableFuture<R> future : futures) {
            future.whenComplete((result, error) -> {
                // Only successful responses count toward the quorum
                if (result != null && isSuccess.test(result)) {
                    successes.add(result);
                }
                
                int count = completed.incrementAndGet();
                
                if (successes.size() >= quorum && !quorumFuture.isDone()) {
                    // Quorum reached: complete early without waiting for stragglers
                    quorumFuture.complete(new ArrayList<>(successes));
                } else if (count == futures.size() && !quorumFuture.isDone()) {
                    // Everyone answered but quorum was missed; callers check the size
                    quorumFuture.complete(new ArrayList<>(successes));
                }
            });
        }
        
        return quorumFuture;
    }
}

@Value
class VersionedValue<T> implements Serializable { // Serializable for the snapshot in Example 5
    T value;
    long version;
    long timestamp;
}
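
Because R + W > N, every read quorum intersects every write quorum, so a read always reaches at least one replica holding the latest committed version. Usage sketch (replica construction is assumed):

// Sketch: 5 replicas -> read and write quorums of 3 (R + W = 6 > N = 5)
QuorumReplicator<String> replicator = new QuorumReplicator<>(replicas);

replicator.write("user:42", "alice", 1L)
    .thenCompose(ignored -> replicator.read("user:42"))
    .thenAccept(v -> System.out.println(v.getValue() + " @ version " + v.getVersion()))
    .join();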

Example 5: Consensus State Machine

public interface StateMachine {
    Object apply(Command command);
    byte[] snapshot();
    void restore(byte[] snapshot);
}

public class KeyValueStateMachine implements StateMachine {
    
    private final ConcurrentMap<String, VersionedValue<byte[]>> store = 
        new ConcurrentHashMap<>();
    
    @Override
    public Object apply(Command command) {
        if (command instanceof PutCommand put) {
            // Versions must be deterministic across replicas, so derive them
            // from the previous stored version rather than the wall clock
            VersionedValue<byte[]> updated = store.compute(put.getKey(), (k, current) ->
                new VersionedValue<>(
                    put.getValue(),
                    (current == null ? 0 : current.getVersion()) + 1,
                    System.currentTimeMillis() // informational only
                ));
            return updated.getVersion();
            
        } else if (command instanceof GetCommand get) {
            return store.get(get.getKey());
            
        } else if (command instanceof DeleteCommand delete) {
            return store.remove(delete.getKey());
            
        } else if (command instanceof CompareAndSwapCommand cas) {
            return store.compute(cas.getKey(), (k, current) -> {
                if (current == null && cas.getExpectedVersion() == 0) {
                    return new VersionedValue<>(
                        cas.getNewValue(), 1, System.currentTimeMillis()
                    );
                }
                if (current != null && current.getVersion() == cas.getExpectedVersion()) {
                    return new VersionedValue<>(
                        cas.getNewValue(), 
                        current.getVersion() + 1, 
                        System.currentTimeMillis()
                    );
                }
                return current; // CAS failed
            });
        }
        
        throw new UnsupportedOperationException("Unknown command: " + command);
    }
    
    @Override
    public byte[] snapshot() {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(new HashMap<>(store));
            return baos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Failed to create snapshot", e);
        }
    }
    
    @Override
    public void restore(byte[] snapshot) {
        try {
            ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(snapshot)
            );
            @SuppressWarnings("unchecked")
            Map<String, VersionedValue<byte[]>> restored = 
                (Map<String, VersionedValue<byte[]>>) ois.readObject();
            store.clear();
            store.putAll(restored);
        } catch (Exception e) {
            throw new RuntimeException("Failed to restore snapshot", e);
        }
    }
}
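
This is the component applyCommittedEntries drives in Example 2: every replica applies the same committed commands in log order and therefore converges on the same state. A usage sketch (command constructors are assumptions):

StateMachine sm = new KeyValueStateMachine();
sm.apply(new PutCommand("user:42", "alice".getBytes())); // constructor assumed

byte[] snap = sm.snapshot();     // e.g. taken before log compaction

StateMachine recovering = new KeyValueStateMachine();
recovering.restore(snap);        // a new or recovering replica catches up from the snapshot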

Anti-Patterns

❌ Implementing Consensus from Scratch

// WRONG - home-grown consensus is error-prone
public class MyConsensus {
    // This will have bugs!
}

// ✅ CORRECT - use proven implementations
// etcd, ZooKeeper, Consul for coordination
// Raft implementations: Apache Ratis, JRaft

❌ Even Number of Nodes

3 nodes: tolerate 1 failure, need 2 for quorum
4 nodes: tolerate 1 failure, need 3 for quorum (same tolerance, more nodes)

