Distributed Systems core v1.0.0
Distributed Consensus
Overview
Distributed consensus enables multiple nodes to agree on a single value or sequence of values despite failures. This skill covers consensus algorithms (Raft, Paxos), leader election, and quorum-based systems.
Key Concepts
Consensus Problem
┌─────────────────────────────────────────────────────────────┐
│ Distributed Consensus │
├─────────────────────────────────────────────────────────────┤
│ │
│ The Problem: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ How do N nodes agree on a value when: │ │
│ │ • Nodes may crash │ │
│ │ • Network may partition │ │
│ │ • Messages may be delayed/reordered │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Raft Algorithm (Leader-Based): │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌────────┐ votes ┌────────┐ │ │
│ │ │Follower│◄────────────▶│Follower│ │ │
│ │ └────┬───┘ └───┬────┘ │ │
│ │ │ │ │ │
│ │ │ ┌────────┐ │ │ │
│ │ └───▶│ Leader │◄───────┘ │ │
│ │ └────┬───┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Log Replication (AppendEntries) │ │
│ │ │ │
│ │ States: Follower ──▶ Candidate ──▶ Leader │ │
│ │ election timeout wins majority │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Quorum: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ N nodes, majority quorum = ⌊N/2⌋ + 1 │ │
│ │ │ │
│ │ 3 nodes: need 2 for quorum (tolerate 1 failure) │ │
│ │ 5 nodes: need 3 for quorum (tolerate 2 failures) │ │
│ │ 7 nodes: need 4 for quorum (tolerate 3 failures) │ │
│ │ │ │
│ │ Any two quorums overlap = consistency guaranteed │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
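The quorum arithmetic in the box above fits in a few lines (a minimal sketch; the `Quorum` class name is illustrative):

```java
public class Quorum {
    // Majority quorum for an N-node cluster: floor(N/2) + 1
    public static int majority(int n) {
        return n / 2 + 1;
    }

    // With a majority quorum, the cluster keeps operating as long as
    // at most N - majority(N) nodes fail
    public static int faultTolerance(int n) {
        return n - majority(n);
    }
}
```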
Consensus Properties
| Property | Description |
|---|---|
| Agreement | All correct nodes decide on the same value |
| Validity | Decided value was proposed by some node |
| Termination | All correct nodes eventually decide |
| Integrity | Each node decides at most once |
Best Practices
1. Use an Odd Number of Nodes
Even-numbered clusters raise the quorum size without raising fault tolerance: 4 nodes still tolerate only 1 failure, the same as 3. Prefer 3, 5, or 7 nodes.
2. Configure Appropriate Timeouts
Balance between failure detection speed and false positives.
3. Monitor Raft Metrics
Track leader changes, commit latency, and log size.
4. Plan for Network Partitions
Design for split-brain scenarios.
5. Use Existing Implementations
Don’t implement consensus from scratch in production.
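For timeout configuration (practice 2), a common rule of thumb is that a follower should miss several heartbeats before suspecting the leader. The sketch below encodes one such check; the 3x ratio is an assumption, and the right values depend on your network's latency distribution:

```java
public class TimeoutCheck {
    // Rule of thumb (assumed here): electionTimeoutMin >= 3 * heartbeatInterval,
    // so a single dropped heartbeat does not trigger a spurious election
    public static boolean sane(long heartbeatMs, long electionTimeoutMinMs) {
        return electionTimeoutMinMs >= 3 * heartbeatMs;
    }
}
```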
Code Examples
Example 1: Leader Election
public class LeaderElection {
    private static final Logger log = LoggerFactory.getLogger(LeaderElection.class);
    private final String nodeId;
    private final List<String> clusterNodes;
    private final RpcClient rpcClient;
    private final AtomicReference<String> currentLeader = new AtomicReference<>();
    private final AtomicLong currentTerm = new AtomicLong(0);
    private final AtomicReference<NodeState> state = new AtomicReference<>(NodeState.FOLLOWER);
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private volatile ScheduledFuture<?> electionTimer;
    private static final long ELECTION_TIMEOUT_MIN = 150; // ms
    private static final long ELECTION_TIMEOUT_MAX = 300; // ms
    private static final long HEARTBEAT_INTERVAL = 50;    // ms

    public LeaderElection(String nodeId, List<String> clusterNodes, RpcClient rpcClient) {
        this.nodeId = nodeId;
        this.clusterNodes = clusterNodes;
        this.rpcClient = rpcClient;
    }
enum NodeState {
FOLLOWER, CANDIDATE, LEADER
}
public void start() {
resetElectionTimer();
}
private void resetElectionTimer() {
if (electionTimer != null) {
electionTimer.cancel(false);
}
// Random timeout to prevent split votes
long timeout = ELECTION_TIMEOUT_MIN +
ThreadLocalRandom.current().nextLong(ELECTION_TIMEOUT_MAX - ELECTION_TIMEOUT_MIN);
electionTimer = scheduler.schedule(
this::startElection,
timeout,
TimeUnit.MILLISECONDS
);
}
private void startElection() {
// Transition to candidate
state.set(NodeState.CANDIDATE);
long newTerm = currentTerm.incrementAndGet();
log.info("Starting election for term {}", newTerm);
// Vote for self
AtomicInteger votes = new AtomicInteger(1);
int majority = (clusterNodes.size() / 2) + 1;
        // Request votes from other nodes; count each granted vote
        CompletableFuture<?>[] voteFutures = clusterNodes.stream()
            .filter(node -> !node.equals(nodeId))
            .map(node -> requestVote(node, newTerm)
                .thenAccept(granted -> {
                    if (granted) {
                        votes.incrementAndGet();
                    }
                }))
            .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(voteFutures)
            .orTimeout(ELECTION_TIMEOUT_MIN, TimeUnit.MILLISECONDS)
            .whenComplete((result, error) -> {
                if (state.get() == NodeState.CANDIDATE && currentTerm.get() == newTerm) {
                    if (votes.get() >= majority) {
                        becomeLeader();
                    } else {
                        // Failed to get a majority; return to follower and retry
                        // after a fresh randomized timeout
                        state.set(NodeState.FOLLOWER);
                        resetElectionTimer();
                    }
                }
            });
    }
    private CompletableFuture<Boolean> requestVote(String node, long term) {
        VoteRequest request = new VoteRequest(nodeId, term);
        return rpcClient.sendVoteRequest(node, request)
            .thenApply(response -> {
                if (response.isGranted()) {
                    return true;
                }
                if (response.getTerm() > currentTerm.get()) {
                    // Discovered a higher term, step down
                    stepDown(response.getTerm());
                }
                return false;
            })
            .exceptionally(e -> false); // Treat unreachable nodes as "no" votes
    }
private void becomeLeader() {
log.info("Became leader for term {}", currentTerm.get());
state.set(NodeState.LEADER);
currentLeader.set(nodeId);
// Cancel election timer
if (electionTimer != null) {
electionTimer.cancel(false);
}
    // Start heartbeats; sendHeartbeats() becomes a no-op if we later step down
scheduler.scheduleAtFixedRate(
this::sendHeartbeats,
0,
HEARTBEAT_INTERVAL,
TimeUnit.MILLISECONDS
);
}
private void sendHeartbeats() {
if (state.get() != NodeState.LEADER) {
return;
}
long term = currentTerm.get();
clusterNodes.stream()
.filter(node -> !node.equals(nodeId))
.forEach(node -> {
AppendEntriesRequest heartbeat = new AppendEntriesRequest(
nodeId, term, Collections.emptyList()
);
rpcClient.sendAppendEntries(node, heartbeat)
.whenComplete((response, error) -> {
if (response != null && response.getTerm() > term) {
stepDown(response.getTerm());
}
});
});
}
public void handleAppendEntries(AppendEntriesRequest request) {
if (request.getTerm() >= currentTerm.get()) {
currentTerm.set(request.getTerm());
currentLeader.set(request.getLeaderId());
state.set(NodeState.FOLLOWER);
resetElectionTimer();
}
}
private void stepDown(long newTerm) {
log.info("Stepping down, discovered term {}", newTerm);
currentTerm.set(newTerm);
state.set(NodeState.FOLLOWER);
currentLeader.set(null);
resetElectionTimer();
}
}
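The example above omits the follower side of voting. The term-comparison rules it relies on can be isolated as pure functions (a simplified sketch: per-term `votedFor` tracking and log up-to-dateness checks are omitted):

```java
public class TermRules {
    // A node steps down whenever it observes a term higher than its own
    public static boolean shouldStepDown(long myTerm, long observedTerm) {
        return observedTerm > myTerm;
    }

    // A follower grants a vote for a higher term, or for the current term
    // only if it has not yet voted (at most one vote per term)
    public static boolean shouldGrantVote(long myTerm, long candidateTerm, String votedFor) {
        return candidateTerm > myTerm
            || (candidateTerm == myTerm && votedFor == null);
    }
}
```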
Example 2: Log Replication
public class RaftLog {
    private final List<LogEntry> entries = new ArrayList<>();
    private final StateMachine stateMachine;
    // 0-based log; -1 means no entry has been committed/applied yet
    private volatile long commitIndex = -1;
    private volatile long lastApplied = -1;
    // Leader-side replication progress, tracked per follower
    private final Map<String, Long> nextIndex = new ConcurrentHashMap<>();
    private final Map<String, Long> matchIndex = new ConcurrentHashMap<>();

    public RaftLog(StateMachine stateMachine) {
        this.stateMachine = stateMachine;
    }
public synchronized long appendEntry(Object command, long term) {
long index = entries.size();
entries.add(new LogEntry(index, term, command));
return index;
}
public synchronized void appendEntries(long prevLogIndex, long prevLogTerm,
List<LogEntry> newEntries) {
// Check log consistency
if (prevLogIndex >= 0) {
if (prevLogIndex >= entries.size()) {
throw new LogInconsistencyException("Missing entries");
}
if (entries.get((int) prevLogIndex).getTerm() != prevLogTerm) {
throw new LogInconsistencyException("Term mismatch");
}
}
// Append new entries (may overwrite conflicting entries)
for (LogEntry entry : newEntries) {
int index = (int) entry.getIndex();
if (index < entries.size()) {
if (entries.get(index).getTerm() != entry.getTerm()) {
// Remove conflicting entries
entries.subList(index, entries.size()).clear();
entries.add(entry);
}
} else {
entries.add(entry);
}
}
}
public void replicateToFollower(String followerId, RpcClient rpcClient,
long currentTerm, String leaderId) {
long next = nextIndex.getOrDefault(followerId, entries.size());
long prevIndex = next - 1;
long prevTerm = prevIndex >= 0 ? entries.get((int) prevIndex).getTerm() : 0;
List<LogEntry> entriesToSend = new ArrayList<>();
for (long i = next; i < entries.size(); i++) {
entriesToSend.add(entries.get((int) i));
}
AppendEntriesRequest request = new AppendEntriesRequest(
leaderId, currentTerm, prevIndex, prevTerm,
entriesToSend, commitIndex
);
rpcClient.sendAppendEntries(followerId, request)
.thenAccept(response -> {
if (response.isSuccess()) {
// Update indices on success
long lastSent = next + entriesToSend.size() - 1;
nextIndex.put(followerId, lastSent + 1);
matchIndex.put(followerId, lastSent);
// Try to advance commit index
maybeAdvanceCommitIndex(currentTerm);
                } else {
                    // Log inconsistency: back up nextIndex and retry from an earlier entry
                    nextIndex.compute(followerId, (k, v) ->
                        Math.max(0, (v == null ? entries.size() : v) - 1));
                }
});
}
private synchronized void maybeAdvanceCommitIndex(long currentTerm) {
// Find highest index replicated on majority
for (long n = entries.size() - 1; n > commitIndex; n--) {
if (entries.get((int) n).getTerm() != currentTerm) {
continue; // Only commit current term entries
}
int replicatedCount = 1; // Self
for (long matchIdx : matchIndex.values()) {
if (matchIdx >= n) {
replicatedCount++;
}
}
int majority = (matchIndex.size() + 1) / 2 + 1;
if (replicatedCount >= majority) {
commitIndex = n;
applyCommittedEntries();
break;
}
}
}
private void applyCommittedEntries() {
while (lastApplied < commitIndex) {
lastApplied++;
LogEntry entry = entries.get((int) lastApplied);
stateMachine.apply(entry.getCommand());
}
}
}
@Value
class LogEntry {
long index;
long term;
Object command;
}
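The conflict rule inside `appendEntries` can be demonstrated standalone. In this sketch each entry is a `{index, term}` pair; the names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

public class ConflictDemo {
    // If an incoming entry's term disagrees with the existing entry at the
    // same index, everything from that index onward is discarded first
    public static List<long[]> reconcile(List<long[]> log, List<long[]> incoming) {
        List<long[]> result = new ArrayList<>(log);
        for (long[] e : incoming) {
            int idx = (int) e[0];
            if (idx < result.size()) {
                if (result.get(idx)[1] != e[1]) {
                    result.subList(idx, result.size()).clear();
                    result.add(e);
                }
                // Same index and term: entry already present, nothing to do
            } else {
                result.add(e);
            }
        }
        return result;
    }
}
```

A follower holding entries `(0,t1) (1,t1) (2,t2)` that receives `(1,t3)` truncates back to index 1 before appending.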
Example 3: Distributed Lock with Consensus
public class ConsensusBasedLock {
private final String lockName;
private final EtcdClient etcdClient;
private final String nodeId;
private volatile long leaseId;
private volatile boolean held;
public ConsensusBasedLock(String lockName, EtcdClient etcdClient) {
this.lockName = lockName;
this.etcdClient = etcdClient;
this.nodeId = UUID.randomUUID().toString();
}
    public boolean tryLock(Duration timeout) throws Exception {
        // Create a lease so the lock is released automatically if this node dies
        LeaseGrantResponse lease = etcdClient.getLeaseClient()
            .grant(timeout.toSeconds())
            .get();
        this.leaseId = lease.getID();
        String lockKey = "/locks/" + lockName;
        if (tryAcquire(lockKey)) {
            held = true;
            startKeepAlive();
            return true;
        }
        // Lock held by another node, optionally wait
        return waitForLock(lockKey, timeout);
    }
    private boolean tryAcquire(String lockKey) throws Exception {
        // Atomic compare-and-swap: put the key only if it does not exist yet
        TxnResponse txn = etcdClient.getKVClient().txn()
            .If(new Cmp(
                ByteSequence.from(lockKey, StandardCharsets.UTF_8),
                Cmp.Op.EQUAL,
                CmpTarget.version(0) // Key doesn't exist
            ))
            .Then(Op.put(
                ByteSequence.from(lockKey, StandardCharsets.UTF_8),
                ByteSequence.from(nodeId, StandardCharsets.UTF_8),
                PutOption.newBuilder().withLeaseId(leaseId).build()
            ))
            .commit()
            .get();
        return txn.isSucceeded();
    }
    private boolean waitForLock(String lockKey, Duration timeout) throws Exception {
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (System.currentTimeMillis() < deadline) {
            if (tryAcquire(lockKey)) {
                held = true;
                startKeepAlive();
                return true;
            }
            // Simple polling; watching the key's DELETE event would avoid busy-waiting
            Thread.sleep(100);
        }
        return false;
    }
private void startKeepAlive() {
etcdClient.getLeaseClient().keepAlive(
leaseId,
new StreamObserver<LeaseKeepAliveResponse>() {
@Override
public void onNext(LeaseKeepAliveResponse response) {
// Lease renewed
}
@Override
public void onError(Throwable t) {
log.error("Lease keep-alive failed", t);
held = false;
}
@Override
public void onCompleted() {
held = false;
}
}
);
}
public void unlock() {
if (!held) {
return;
}
try {
String lockKey = "/locks/" + lockName;
// Only delete if we still hold the lock
etcdClient.getKVClient().txn()
.If(new Cmp(
ByteSequence.from(lockKey, StandardCharsets.UTF_8),
Cmp.Op.EQUAL,
CmpTarget.value(ByteSequence.from(nodeId, StandardCharsets.UTF_8))
))
.Then(Op.delete(
ByteSequence.from(lockKey, StandardCharsets.UTF_8),
DeleteOption.DEFAULT
))
.commit()
.get();
etcdClient.getLeaseClient().revoke(leaseId).get();
} catch (Exception e) {
log.error("Failed to release lock", e);
} finally {
held = false;
}
}
}
Example 4: Quorum Read/Write
public class QuorumReplicator<T> {
private final List<ReplicaClient> replicas;
private final int writeQuorum;
private final int readQuorum;
public QuorumReplicator(List<ReplicaClient> replicas) {
this.replicas = replicas;
int n = replicas.size();
// Ensure read + write quorum > n for consistency
this.writeQuorum = n / 2 + 1;
this.readQuorum = n / 2 + 1;
}
public CompletableFuture<Void> write(String key, T value, long version) {
WriteRequest<T> request = new WriteRequest<>(key, value, version);
List<CompletableFuture<WriteResponse>> futures = replicas.stream()
.map(replica -> replica.write(request)
.exceptionally(e -> WriteResponse.failed(e)))
.collect(Collectors.toList());
return waitForQuorum(futures, writeQuorum)
.thenAccept(responses -> {
long successCount = responses.stream()
.filter(WriteResponse::isSuccess)
.count();
if (successCount < writeQuorum) {
throw new QuorumNotReachedException(
"Write quorum not reached: " + successCount + "/" + writeQuorum
);
}
});
}
public CompletableFuture<VersionedValue<T>> read(String key) {
ReadRequest request = new ReadRequest(key);
List<CompletableFuture<ReadResponse<T>>> futures = replicas.stream()
.map(replica -> replica.read(request)
.exceptionally(e -> ReadResponse.failed(e)))
.collect(Collectors.toList());
return waitForQuorum(futures, readQuorum)
.thenApply(responses -> {
List<ReadResponse<T>> successful = responses.stream()
.filter(ReadResponse::isSuccess)
.collect(Collectors.toList());
if (successful.size() < readQuorum) {
throw new QuorumNotReachedException(
"Read quorum not reached: " + successful.size() + "/" + readQuorum
);
}
// Return value with highest version
return successful.stream()
.map(ReadResponse::getValue)
.max(Comparator.comparing(VersionedValue::getVersion))
.orElseThrow();
});
}
    // Completes once `quorum` responses (successful or failed) have arrived, or
    // when all futures finish; callers must still verify the success count
    private <R> CompletableFuture<List<R>> waitForQuorum(
            List<CompletableFuture<R>> futures, int quorum) {
        List<R> results = new CopyOnWriteArrayList<>();
CompletableFuture<List<R>> quorumFuture = new CompletableFuture<>();
AtomicInteger completed = new AtomicInteger(0);
for (CompletableFuture<R> future : futures) {
future.whenComplete((result, error) -> {
if (result != null) {
results.add(result);
}
int count = completed.incrementAndGet();
if (results.size() >= quorum && !quorumFuture.isDone()) {
quorumFuture.complete(new ArrayList<>(results));
} else if (count == futures.size() && !quorumFuture.isDone()) {
quorumFuture.complete(new ArrayList<>(results));
}
});
}
return quorumFuture;
}
}
@Value
class VersionedValue<T> implements Serializable {
    T value;     // must itself be serializable for snapshotting
    long version;
    long timestamp;
}
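`QuorumReplicator` picks R = W = ⌊N/2⌋ + 1, but any split of read and write quorum sizes works as long as they overlap. The condition can be sketched as:

```java
public class QuorumCondition {
    // R + W > N guarantees every read quorum intersects every write quorum,
    // so a read always sees at least one replica with the latest write
    public static boolean consistent(int n, int r, int w) {
        return r + w > n;
    }
}
```

For example, a read-heavy system on 5 replicas could use R = 1, W = 5 and still satisfy the condition.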
Example 5: Consensus State Machine
public interface StateMachine {
Object apply(Command command);
byte[] snapshot();
void restore(byte[] snapshot);
}
public class KeyValueStateMachine implements StateMachine {
private final ConcurrentMap<String, VersionedValue<byte[]>> store =
new ConcurrentHashMap<>();
    @Override
    public Object apply(Command command) {
        if (command instanceof PutCommand put) {
            // Versions must be derived deterministically (here: per-key increments).
            // Wall-clock versions would make replicas applying the same log diverge;
            // the timestamp below is kept for observability only
            VersionedValue<byte[]> value = store.compute(put.getKey(), (k, current) ->
                new VersionedValue<>(
                    put.getValue(),
                    current == null ? 1 : current.getVersion() + 1,
                    System.currentTimeMillis()
                ));
            return value.getVersion();
} else if (command instanceof GetCommand get) {
return store.get(get.getKey());
} else if (command instanceof DeleteCommand delete) {
return store.remove(delete.getKey());
} else if (command instanceof CompareAndSwapCommand cas) {
return store.compute(cas.getKey(), (k, current) -> {
if (current == null && cas.getExpectedVersion() == 0) {
return new VersionedValue<>(
cas.getNewValue(), 1, System.currentTimeMillis()
);
}
if (current != null && current.getVersion() == cas.getExpectedVersion()) {
return new VersionedValue<>(
cas.getNewValue(),
current.getVersion() + 1,
System.currentTimeMillis()
);
}
return current; // CAS failed
});
}
throw new UnsupportedOperationException("Unknown command: " + command);
}
@Override
public byte[] snapshot() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(new HashMap<>(store));
return baos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("Failed to create snapshot", e);
}
}
@Override
public void restore(byte[] snapshot) {
try {
ObjectInputStream ois = new ObjectInputStream(
new ByteArrayInputStream(snapshot)
);
@SuppressWarnings("unchecked")
Map<String, VersionedValue<byte[]>> restored =
(Map<String, VersionedValue<byte[]>>) ois.readObject();
store.clear();
store.putAll(restored);
} catch (Exception e) {
throw new RuntimeException("Failed to restore snapshot", e);
}
}
}
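Stripped of the state machine, the snapshot/restore pattern above is a plain Java-serialization round trip. The sketch below uses a `HashMap<String, Long>` stand-in for the store:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

public class SnapshotDemo {
    public static byte[] snapshot(Map<String, Long> store) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(new HashMap<>(store)); // copy for a consistent view
        }
        return baos.toByteArray();
    }

    @SuppressWarnings("unchecked")
    public static Map<String, Long> restore(byte[] snapshot)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(snapshot))) {
            return (Map<String, Long>) ois.readObject();
        }
    }
}
```

In practice Java serialization is a convenience choice; a production system would typically use a versioned binary format.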
Anti-Patterns
❌ Implementing Consensus from Scratch
// WRONG - home-grown consensus is error-prone
public class MyConsensus {
// This will have bugs!
}
// ✅ CORRECT - use proven implementations
// etcd, ZooKeeper, Consul for coordination
// Raft implementations: Apache Ratis, JRaft
❌ Even Number of Nodes
3 nodes: tolerate 1 failure, need 2 for quorum
4 nodes: tolerate 1 failure, need 3 for quorum (same tolerance, more nodes)