DI
Consistency Models
Distributed Systems core v1.0.0
Consistency Models
Overview
Consistency models define the guarantees a distributed system provides about the order and visibility of operations. This skill covers strong consistency, eventual consistency, causal consistency, and the tradeoffs between them.
Key Concepts
Consistency Spectrum
┌─────────────────────────────────────────────────────────────┐
│ Consistency Spectrum │
├─────────────────────────────────────────────────────────────┤
│ │
│ Stronger ◄────────────────────────────────────▶ Weaker │
│ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌───────┐ │
│ │Linearizable│ │ Sequential │ │ Causal │ │Eventual│ │
│ │ │ │ │ │ │ │ │ │
│ │• Real-time │ │• Program │ │• Respects │ │• All │ │
│ │ ordering │ │ order │ │ causality │ │ replicas│
│ │• Single │ │• Global │ │• Partial │ │ converge│
│ │ copy │ │ order │ │ order │ │ │ │
│ └────────────┘ └────────────┘ └────────────┘ └───────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ Latency +++ Latency ++ Latency + Latency │
│ Availability - Availability - Availability + Avail +++ │
│ │
│ CAP Theorem: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ During network partition, choose: │ │
│ │ │ │
│ │ Consistency (CP) Availability (AP) │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Reject ops │ │ Accept ops │ │ │
│ │ │ until healed│ │ may diverge │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ │ Examples: │ │
│ │ CP: ZooKeeper, etcd AP: Cassandra, DynamoDB │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
PACELC Theorem
PACELC extends CAP: if a Partition occurs, trade Availability against Consistency (PA vs PC); Else, in normal operation, trade Latency against Consistency (EL vs EC).
| Scenario | Choice | Examples |
|---|---|---|
| Partition | Consistency (PC) | HBase, VoltDB |
| Partition | Availability (PA) | Cassandra, DynamoDB |
| Else (normal) | Latency (EL) | Cassandra |
| Else (normal) | Consistency (EC) | PAXOS stores |
Best Practices
1. Match Consistency to Use Case
Bank transactions need strong consistency; social feeds can tolerate eventual consistency.
2. Use Read-Your-Writes
Users should see their own updates immediately.
3. Consider Session Consistency
Maintain consistency within a user session.
4. Document Consistency Guarantees
Make explicit what clients can expect.
5. Test with Chaos Engineering
Verify behavior under partition scenarios.
Code Examples
Example 1: Read-Your-Writes Consistency
public class ReadYourWritesClient {

    // Replica set; by convention index 0 is the primary and the rest are read
    // replicas — NOTE(review): confirm against the deployment's topology.
    private final List<ReplicaClient> replicas;

    // Per-thread high-water mark of this caller's last acknowledged write.
    // ThreadLocal only gives read-your-writes when one user's requests stay on
    // one thread; a session-scoped token is needed otherwise.
    private final ThreadLocal<Long> lastWriteTimestamp = new ThreadLocal<>();

    /**
     * Creates a client over the given replica set.
     * FIX: the original declared the final {@code replicas} field but no
     * constructor, so it did not compile.
     *
     * @param replicas non-empty replica list; the first entry is the primary
     */
    public ReadYourWritesClient(List<ReplicaClient> replicas) {
        if (replicas.isEmpty()) {
            throw new IllegalArgumentException("replicas must not be empty");
        }
        this.replicas = List.copyOf(replicas);
    }

    /**
     * Writes to the primary and waits for acknowledgment, recording the
     * resulting timestamp so later reads on this thread can insist on it.
     */
    public WriteResult write(String key, byte[] value) {
        WriteRequest request = new WriteRequest(key, value, System.currentTimeMillis());
        // Write to primary
        ReplicaClient primary = getPrimary();
        WriteResult result = primary.write(request);
        // Track timestamp for read-your-writes
        lastWriteTimestamp.set(result.getTimestamp());
        return result;
    }

    /**
     * Reads {@code key}, guaranteeing the result is at least as fresh as this
     * thread's last write. Stale or unavailable replicas are skipped; the
     * primary is the fallback of last resort.
     */
    public ReadResult read(String key) {
        Long minTimestamp = lastWriteTimestamp.get();
        // Try each replica until we find one with fresh enough data.
        for (ReplicaClient replica : getReadReplicas()) {
            try {
                ReadResult result = replica.read(key);
                if (minTimestamp == null || result.getTimestamp() >= minTimestamp) {
                    return result;
                }
                // This replica is stale — try the next one.
            } catch (Exception e) {
                // Best-effort failover: an unreachable replica is not fatal;
                // the next candidate (or the primary below) serves the read.
            }
        }
        // Fall back to primary, which by definition has the latest write.
        return getPrimary().read(key);
    }

    /**
     * Alternative: caller supplies the minimum version it must observe.
     * FIX: now skips unavailable replicas like {@link #read(String)} — in the
     * original a single replica failure aborted the whole read instead of
     * trying the remaining replicas.
     *
     * @throws StaleReadException if no reachable replica has caught up
     */
    public ReadResult readWithVersion(String key, Long expectedMinVersion) {
        for (ReplicaClient replica : getReadReplicas()) {
            try {
                ReadResult result = replica.read(key);
                if (expectedMinVersion == null || result.getVersion() >= expectedMinVersion) {
                    return result;
                }
            } catch (Exception e) {
                // Unreachable replica — try the next one.
            }
        }
        throw new StaleReadException("No replica has required version: " + expectedMinVersion);
    }

    // FIX: referenced but undefined in the original. Primary is the first
    // replica by convention (see constructor note).
    private ReplicaClient getPrimary() {
        return replicas.get(0);
    }

    // FIX: referenced but undefined in the original. Every non-primary
    // replica serves reads.
    private List<ReplicaClient> getReadReplicas() {
        return replicas.subList(1, replicas.size());
    }
}
Example 2: Causal Consistency with Vector Clocks
public class VectorClock implements Comparable<VectorClock> {
private final Map<String, Long> clock;
public VectorClock() {
this.clock = new ConcurrentHashMap<>();
}
public VectorClock(Map<String, Long> clock) {
this.clock = new ConcurrentHashMap<>(clock);
}
public void increment(String nodeId) {
clock.merge(nodeId, 1L, Long::sum);
}
public void merge(VectorClock other) {
other.clock.forEach((node, time) ->
clock.merge(node, time, Math::max)
);
}
/**
* Check if this clock happened-before other
*/
public boolean happenedBefore(VectorClock other) {
boolean atLeastOneLess = false;
Set<String> allNodes = new HashSet<>(clock.keySet());
allNodes.addAll(other.clock.keySet());
for (String node : allNodes) {
long thisTime = clock.getOrDefault(node, 0L);
long otherTime = other.clock.getOrDefault(node, 0L);
if (thisTime > otherTime) {
return false; // This is not before other
}
if (thisTime < otherTime) {
atLeastOneLess = true;
}
}
return atLeastOneLess;
}
/**
* Check if clocks are concurrent (neither happened-before the other)
*/
public boolean isConcurrentWith(VectorClock other) {
return !this.happenedBefore(other) && !other.happenedBefore(this);
}
public Map<String, Long> toMap() {
return new HashMap<>(clock);
}
}
/**
 * A replica that serves causally consistent reads: a read blocks until every
 * write in the client's causal past is locally visible.
 */
public class CausallyConsistentStore {

    // This replica's id — the vector-clock component we increment on writes.
    private final String nodeId;
    private final Map<String, VersionedValue> store = new ConcurrentHashMap<>();
    // This replica's view of logical time; advances on writes and merges.
    private final VectorClock localClock;

    /**
     * FIX: the original declared final {@code nodeId}/{@code localClock} but
     * no constructor, so the class did not compile and the fields were never
     * initialized.
     *
     * @param nodeId unique identifier of this replica
     */
    public CausallyConsistentStore(String nodeId) {
        this.nodeId = nodeId;
        this.localClock = new VectorClock();
    }

    /**
     * Applies a client write. The client's causal context is merged first so
     * the new version dominates everything the client has already observed.
     */
    public WriteResult write(String key, byte[] value, VectorClock clientClock) {
        // Merge client's causal context
        localClock.merge(clientClock);
        // Increment for this write
        localClock.increment(nodeId);
        // Store under an immutable snapshot of the new clock
        VectorClock writeTime = new VectorClock(localClock.toMap());
        store.put(key, new VersionedValue(value, writeTime));
        return new WriteResult(key, writeTime);
    }

    /**
     * Reads {@code key}, blocking until all causally preceding writes (per the
     * client's clock) have replicated here.
     */
    public ReadResult read(String key, VectorClock clientClock) {
        waitForCausalDependencies(clientClock);
        VersionedValue value = store.get(key);
        // Fold the returned version into our clock so subsequent operations
        // are causally after this read.
        if (value != null) {
            localClock.merge(value.getClock());
        }
        return new ReadResult(value, new VectorClock(localClock.toMap()));
    }

    /**
     * Blocks until the local clock has caught up to {@code required}.
     * FIX: the original loop condition was inverted — it spun while the local
     * clock was already caught up and returned immediately while it was still
     * behind — and it did not handle the checked InterruptedException that
     * Thread.sleep declares (a compile error).
     */
    private void waitForCausalDependencies(VectorClock required) {
        // Caught up once required <= local: either required strictly
        // happened-before local, or the two clocks hold identical entries.
        while (!required.happenedBefore(localClock)
                && !required.toMap().equals(localClock.toMap())) {
            try {
                Thread.sleep(10); // wait for replication to catch up
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                throw new IllegalStateException(
                    "Interrupted while waiting for causal dependencies", e);
            }
        }
    }

    /**
     * Applies a write received via replication: keep whichever version is
     * causally newer; defer genuinely concurrent versions to
     * {@link #resolveConflict}.
     */
    public void handleReplicatedWrite(String key, byte[] value, VectorClock writeClock) {
        store.compute(key, (k, existing) -> {
            if (existing == null) {
                return new VersionedValue(value, writeClock);
            }
            if (writeClock.happenedBefore(existing.getClock())) {
                // Stale write — keep what we have.
                return existing;
            }
            if (existing.getClock().happenedBefore(writeClock)) {
                // Causally newer write — accept it.
                return new VersionedValue(value, writeClock);
            }
            // Concurrent: stamp the incoming bytes with a clock dominating both
            // versions so the resolution itself is causally after each write.
            VectorClock merged = new VectorClock(existing.getClock().toMap());
            merged.merge(writeClock);
            return resolveConflict(existing, new VersionedValue(value, merged));
        });
        localClock.merge(writeClock);
    }

    /**
     * Conflict policy for concurrent versions. FIX: the original referenced
     * this method but never defined it (a compile error). The default keeps
     * the incoming version (arrival-order last-writer-wins) — NOTE(review):
     * override with a domain-specific merge where values must be combined.
     */
    protected VersionedValue resolveConflict(VersionedValue existing, VersionedValue incoming) {
        return incoming;
    }
}
Example 3: Eventual Consistency with CRDTs
/**
* Conflict-free Replicated Data Types
* Automatically converge without coordination
*/
/**
 * A state-based (convergent) replicated data type: replicas mutate locally and
 * periodically exchange whole state. For convergence, {@code merge} must be
 * commutative, associative, and idempotent.
 *
 * @param <T> the concrete CRDT type (self-referential bound)
 */
public interface CRDT<T extends CRDT<T>> {
    /** Combines another replica's state into a new converged state. */
    T merge(T other);

    /** Encodes this replica's state for shipping to peers. */
    byte[] serialize();
}
/**
* G-Counter: Grow-only counter
*/
/**
 * G-Counter: a grow-only distributed counter. Each node increments only its
 * own slot; the counter value is the sum of all slots, and merge takes the
 * pointwise maximum — so replicas converge regardless of merge order.
 */
public class GCounter implements CRDT<GCounter> {

    // node id -> that node's monotonically increasing contribution
    private final Map<String, Long> counts;

    public GCounter() {
        this.counts = new ConcurrentHashMap<>();
    }

    /** Adds one to this node's contribution. */
    public void increment(String nodeId) {
        counts.merge(nodeId, 1L, Long::sum);
    }

    /** Current counter value: the sum of every node's contribution. */
    public long value() {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }

    /**
     * Pointwise maximum of the two per-node maps — commutative, associative,
     * and idempotent.
     */
    @Override
    public GCounter merge(GCounter other) {
        GCounter merged = new GCounter();
        Set<String> allNodes = new HashSet<>(counts.keySet());
        allNodes.addAll(other.counts.keySet());
        for (String node : allNodes) {
            long max = Math.max(
                counts.getOrDefault(node, 0L),
                other.counts.getOrDefault(node, 0L)
            );
            merged.counts.put(node, max);
        }
        return merged;
    }

    /**
     * FIX: the original omitted this method required by the CRDT interface,
     * so the class did not compile. Deterministic text encoding: "node=count"
     * entries joined in sorted key order.
     */
    @Override
    public byte[] serialize() {
        String encoded = counts.entrySet().stream()
            .sorted(Map.Entry.comparingByKey())
            .map(e -> e.getKey() + "=" + e.getValue())
            .collect(Collectors.joining(";"));
        return encoded.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}
/**
* LWW-Register: Last-Writer-Wins Register
*/
/**
 * LWW-Register: a single-value register where the write with the highest
 * timestamp wins; equal timestamps break ties deterministically on nodeId so
 * every replica picks the same winner.
 */
public class LWWRegister<T> implements CRDT<LWWRegister<T>> {

    private T value;
    private long timestamp;   // write time (wall clock or hybrid)
    private String nodeId;    // tie-breaker; null means "never written"

    /**
     * Accepts the write iff it orders after the current state.
     * FIX: the original dereferenced a null {@code nodeId} tie-breaker when an
     * empty register received a write carrying timestamp 0 (NPE).
     */
    public void set(T value, long timestamp, String nodeId) {
        if (acceptsWrite(timestamp, nodeId)) {
            this.value = value;
            this.timestamp = timestamp;
            this.nodeId = nodeId;
        }
    }

    public T get() {
        return value;
    }

    // True when (ts, node) orders strictly after this register's state.
    private boolean acceptsWrite(long ts, String node) {
        if (this.nodeId == null) {
            return true; // empty register: first write always wins
        }
        if (ts != this.timestamp) {
            return ts > this.timestamp;
        }
        return node.compareTo(this.nodeId) > 0; // deterministic tie-break
    }

    /**
     * Keeps whichever register holds the later write.
     * FIX: the original NPE'd when merging a never-written register (null
     * nodeId) at equal timestamps; an empty register now simply loses.
     */
    @Override
    public LWWRegister<T> merge(LWWRegister<T> other) {
        LWWRegister<T> winner = this.beats(other) ? this : other;
        LWWRegister<T> merged = new LWWRegister<>();
        merged.value = winner.value;
        merged.timestamp = winner.timestamp;
        merged.nodeId = winner.nodeId;
        return merged;
    }

    // Total order over registers: empty loses to anything, then timestamp,
    // then nodeId as the deterministic tie-break.
    private boolean beats(LWWRegister<T> other) {
        if (this.nodeId == null) {
            return false;
        }
        if (other.nodeId == null) {
            return true;
        }
        if (this.timestamp != other.timestamp) {
            return this.timestamp > other.timestamp;
        }
        return this.nodeId.compareTo(other.nodeId) > 0;
    }

    /**
     * FIX: the original omitted this method required by the CRDT interface,
     * so the class did not compile. Lossy debug encoding (value via toString)
     * — NOTE(review): production replication needs a proper value codec.
     */
    @Override
    public byte[] serialize() {
        String encoded = timestamp + "@" + nodeId + ":" + value;
        return encoded.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}
/**
* OR-Set: Observed-Remove Set
*/
/**
 * OR-Set: Observed-Remove Set. Every add creates a unique tag; remove
 * tombstones only the tags it has observed, so a concurrent add (with a fresh
 * tag) survives a concurrent remove — "add wins" for unobserved additions.
 */
public class ORSet<E> implements CRDT<ORSet<E>> {

    // Element -> set of unique tags, one per add operation.
    private final Map<E, Set<UUID>> elements = new ConcurrentHashMap<>();
    // Tags whose additions have been removed (observed-remove).
    private final Set<UUID> tombstones = ConcurrentHashMap.newKeySet();

    /** Adds the element under a fresh unique tag. */
    public void add(E element) {
        UUID tag = UUID.randomUUID();
        elements.computeIfAbsent(element, k -> ConcurrentHashMap.newKeySet()).add(tag);
    }

    /** Removes by tombstoning every currently observed tag of the element. */
    public void remove(E element) {
        Set<UUID> tags = elements.get(element);
        if (tags != null) {
            tombstones.addAll(tags);
            tags.clear();
        }
    }

    /** Present iff at least one of the element's tags is not tombstoned. */
    public boolean contains(E element) {
        Set<UUID> tags = elements.get(element);
        if (tags == null) return false;
        return tags.stream().anyMatch(tag -> !tombstones.contains(tag));
    }

    /** Snapshot of all live (non-tombstoned) elements. */
    public Set<E> elements() {
        return elements.entrySet().stream()
            .filter(e -> e.getValue().stream().anyMatch(tag -> !tombstones.contains(tag)))
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
    }

    /**
     * Union of tags and union of tombstones — tombstoned tags stay dead after
     * the merge, while tags only one side has seen survive.
     */
    @Override
    public ORSet<E> merge(ORSet<E> other) {
        ORSet<E> merged = new ORSet<>();
        // Merge all elements and tags
        Set<E> allElements = new HashSet<>(elements.keySet());
        allElements.addAll(other.elements.keySet());
        for (E element : allElements) {
            Set<UUID> mergedTags = new HashSet<>();
            if (elements.containsKey(element)) {
                mergedTags.addAll(elements.get(element));
            }
            if (other.elements.containsKey(element)) {
                mergedTags.addAll(other.elements.get(element));
            }
            if (!mergedTags.isEmpty()) {
                merged.elements.put(element, ConcurrentHashMap.newKeySet());
                merged.elements.get(element).addAll(mergedTags);
            }
        }
        // Merge tombstones
        merged.tombstones.addAll(tombstones);
        merged.tombstones.addAll(other.tombstones);
        return merged;
    }

    /**
     * FIX: the original omitted this method required by the CRDT interface,
     * so the class did not compile. Deterministic, lossy (String.valueOf)
     * snapshot of live elements plus tombstoned tags — NOTE(review):
     * production replication needs a proper element codec carrying the tags.
     */
    @Override
    public byte[] serialize() {
        String live = elements().stream()
            .map(String::valueOf)
            .sorted()
            .collect(Collectors.joining(","));
        String dead = tombstones.stream()
            .map(UUID::toString)
            .sorted()
            .collect(Collectors.joining(","));
        return (live + "|" + dead).getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}
Example 4: Session Consistency
/**
 * Client providing session consistency: within one session, reads observe the
 * session's own writes (read-your-writes) and prefer a "sticky" replica.
 * NOTE(review): fields are volatile, but the read path's check-then-act on
 * lastWriteSequence/lastWriteReplica assumes one thread per session — confirm.
 */
public class SessionConsistentClient {
    private final ReplicaRouter router;
    // Stable per-client session identifier attached to every request.
    private final String sessionId;
    // Track read and write bounds for this session
    private volatile long lastWriteSequence = 0;          // last acked write's sequence
    private volatile String lastWriteReplica = null;      // sticky read target
    public SessionConsistentClient(ReplicaRouter router) {
        this.router = router;
        this.sessionId = UUID.randomUUID().toString();
    }
    /**
     * Writes through the primary, then records the resulting sequence number
     * and replica id so later reads in this session can insist on them.
     */
    public WriteResult write(String key, byte[] value) {
        ReplicaClient primary = router.getPrimary();
        WriteRequest request = WriteRequest.builder()
            .key(key)
            .value(value)
            .sessionId(sessionId)
            .build();
        WriteResult result = primary.write(request);
        // Track for future reads
        lastWriteSequence = result.getSequence();
        lastWriteReplica = primary.getId();
        return result;
    }
    /**
     * Session-consistent read: sticky replica first, then any replica that has
     * caught up to this session's last write, then the primary as last resort.
     */
    public ReadResult read(String key) {
        ReadRequest request = ReadRequest.builder()
            .key(key)
            .sessionId(sessionId)
            .minSequence(lastWriteSequence) // Read-your-writes
            .preferredReplica(lastWriteReplica) // Sticky session
            .build();
        // Try sticky replica first
        if (lastWriteReplica != null) {
            try {
                ReplicaClient sticky = router.getReplica(lastWriteReplica);
                ReadResult result = sticky.readWithMinSequence(request);
                if (result.getSequence() >= lastWriteSequence) {
                    return result;
                }
                // Sticky replica fell behind — fall through to the scan below.
            } catch (Exception e) {
                // Sticky replica unavailable — best-effort failover to others.
            }
        }
        // Fall back to any replica that's caught up
        for (ReplicaClient replica : router.getReadReplicas()) {
            try {
                ReadResult result = replica.readWithMinSequence(request);
                if (result.getSequence() >= lastWriteSequence) {
                    // Re-stick the session to this caught-up replica.
                    lastWriteReplica = replica.getId();
                    return result;
                }
            } catch (Exception e) {
                // Replica unavailable — try the next one.
                continue;
            }
        }
        // Last resort: read from primary
        return router.getPrimary().read(request);
    }
}
Example 5: Consistency Level Selection
/**
 * Per-operation consistency level in the Cassandra/Dynamo style: stronger
 * levels require acknowledgments from more replicas, trading latency and
 * availability for durability and freshness.
 */
public enum ConsistencyLevel {
    ONE,          // Single replica ack — fastest, lowest durability
    QUORUM,       // Majority of replicas — balanced latency vs. safety
    ALL,          // Every replica — slowest, highest durability
    LOCAL_QUORUM, // Majority within the local datacenter only
    EACH_QUORUM   // Majority in every datacenter
}
/**
 * Store with per-operation tunable consistency: writes wait for a configurable
 * number of replica acks, reads wait for a configurable number of responses
 * and perform read repair on stale replicas.
 */
public class TunableConsistencyStore {

    /** How long to wait for the required replica acks/responses. */
    private static final long TIMEOUT_SECONDS = 5;

    private final List<ReplicaClient> replicas;
    // Number of copies of each key; quorum sizes are computed against this.
    private final int replicationFactor;

    /**
     * FIX: the original declared final fields but no constructor, so the
     * class did not compile.
     */
    public TunableConsistencyStore(List<ReplicaClient> replicas, int replicationFactor) {
        this.replicas = List.copyOf(replicas);
        this.replicationFactor = replicationFactor;
    }

    /**
     * Writes to every replica asynchronously and returns once the number of
     * successful acks required by {@code level} has arrived.
     *
     * @throws WriteTimeoutException if too few replicas ack within the timeout
     */
    public WriteResult write(String key, byte[] value, ConsistencyLevel level) {
        int required = getRequiredAcks(level);
        List<CompletableFuture<WriteAck>> futures = replicas.stream()
            .map(replica -> replica.writeAsync(key, value))
            .collect(Collectors.toList());
        // Count acks as they arrive; complete once we reach the threshold.
        AtomicInteger ackCount = new AtomicInteger(0);
        CompletableFuture<WriteResult> resultFuture = new CompletableFuture<>();
        for (CompletableFuture<WriteAck> future : futures) {
            future.whenComplete((ack, error) -> {
                // Failed or errored replicas simply never count toward quorum.
                if (ack != null && ack.isSuccess()) {
                    if (ackCount.incrementAndGet() >= required) {
                        resultFuture.complete(new WriteResult(true));
                    }
                }
            });
        }
        try {
            return resultFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            throw new WriteTimeoutException(
                "Only " + ackCount.get() + "/" + required + " replicas acknowledged"
            );
        } catch (InterruptedException e) {
            // FIX: checked exceptions from Future.get were uncaught (compile error).
            Thread.currentThread().interrupt(); // preserve interrupt status
            throw new IllegalStateException("Interrupted awaiting write acks", e);
        } catch (ExecutionException e) {
            // resultFuture is only ever completed normally; kept for the
            // checked signature — surface the cause if it ever happens.
            throw new IllegalStateException("Write coordination failed", e.getCause());
        }
    }

    /**
     * Reads from every replica asynchronously, answers with the newest of the
     * first {@code required} responses, and repairs stale replicas in the
     * background (read repair).
     *
     * @throws ReadTimeoutException if too few replicas respond within the timeout
     */
    public ReadResult read(String key, ConsistencyLevel level) {
        int required = getRequiredReads(level);
        List<CompletableFuture<ReadResponse>> futures = replicas.stream()
            .map(replica -> replica.readAsync(key))
            .collect(Collectors.toList());
        List<ReadResponse> responses = new CopyOnWriteArrayList<>();
        CompletableFuture<ReadResult> resultFuture = new CompletableFuture<>();
        for (CompletableFuture<ReadResponse> future : futures) {
            future.whenComplete((response, error) -> {
                // Errored replicas never count toward the read quorum.
                if (response != null) {
                    responses.add(response);
                    if (responses.size() >= required) {
                        // Read repair: return newest, repair stale replicas.
                        ReadResponse newest = responses.stream()
                            .max(Comparator.comparing(ReadResponse::getTimestamp))
                            .orElseThrow();
                        // Async repair — does not delay the caller.
                        repairStaleReplicas(key, newest, responses);
                        resultFuture.complete(new ReadResult(newest.getValue()));
                    }
                }
            });
        }
        try {
            return resultFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            throw new ReadTimeoutException(
                "Only " + responses.size() + "/" + required + " replicas responded"
            );
        } catch (InterruptedException e) {
            // FIX: checked exceptions from Future.get were uncaught (compile error).
            Thread.currentThread().interrupt(); // preserve interrupt status
            throw new IllegalStateException("Interrupted awaiting read responses", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Read coordination failed", e.getCause());
        }
    }

    /** Number of write acks each level demands. */
    private int getRequiredAcks(ConsistencyLevel level) {
        return switch (level) {
            case ONE -> 1;
            case QUORUM, LOCAL_QUORUM -> replicationFactor / 2 + 1;
            case ALL -> replicationFactor;
            case EACH_QUORUM -> replicationFactor; // Simplified: treated as ALL
        };
    }

    /**
     * FIX: read(..) called this, but the original never defined it (compile
     * error). The read quorum mirrors the write-side ack requirement.
     */
    private int getRequiredReads(ConsistencyLevel level) {
        return getRequiredAcks(level);
    }

    /** Pushes the newest value to every replica that returned an older one. */
    private void repairStaleReplicas(String key, ReadResponse newest,
                                     List<ReadResponse> responses) {
        for (ReadResponse response : responses) {
            if (response.getTimestamp() < newest.getTimestamp()) {
                // Fire-and-forget write to bring the stale replica up to date.
                response.getReplica().writeAsync(key, newest.getValue());
            }
        }
    }
}
Anti-Patterns
❌ Assuming Strong Consistency
// WRONG - assuming read sees previous write
write(key, value);
Object result = read(key); // May not see value in eventual consistency!
// ✅ CORRECT - use appropriate consistency level
write(key, value, ConsistencyLevel.QUORUM);
Object result = read(key, ConsistencyLevel.QUORUM);
❌ Ignoring Partition Behavior
Always test how your system behaves during network partitions.