
Consistency Models

Distributed Systems core v1.0.0


Overview

Consistency models define the guarantees a distributed system provides about the order and visibility of operations. This skill covers strong consistency, eventual consistency, causal consistency, and the tradeoffs between them.


Key Concepts

Consistency Spectrum

┌────────────────────────────────────────────────────────────────┐
│                      Consistency Spectrum                      │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  Stronger ◄────────────────────────────────────▶ Weaker        │
│                                                                │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐  ┌──────────┐  │
│  │Linearizable│  │ Sequential │  │   Causal   │  │ Eventual │  │
│  │            │  │            │  │            │  │          │  │
│  │• Real-time │  │• Program   │  │• Respects  │  │• All     │  │
│  │  ordering  │  │  order     │  │  causality │  │  replicas│  │
│  │• Single    │  │• Global    │  │• Partial   │  │  converge│  │
│  │  copy      │  │  order     │  │  order     │  │          │  │
│  └────────────┘  └────────────┘  └────────────┘  └──────────┘  │
│        │               │               │              │        │
│        ▼               ▼               ▼              ▼        │
│  Latency +++     Latency ++      Latency +       Latency       │
│  Availability -  Availability -  Availability +  Avail +++     │
│                                                                │
│  CAP Theorem:                                                  │
│  ┌─────────────────────────────────────────────────────┐       │
│  │ During network partition, choose:                   │       │
│  │                                                     │       │
│  │    Consistency (CP)        Availability (AP)        │       │
│  │    ┌─────────────┐         ┌─────────────┐          │       │
│  │    │ Reject ops  │         │ Accept ops  │          │       │
│  │    │ until healed│         │ may diverge │          │       │
│  │    └─────────────┘         └─────────────┘          │       │
│  │                                                     │       │
│  │ Examples:                                           │       │
│  │ CP: ZooKeeper, etcd        AP: Cassandra, DynamoDB  │       │
│  └─────────────────────────────────────────────────────┘       │
│                                                                │
└────────────────────────────────────────────────────────────────┘
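
The weak end of the spectrum can be made concrete with a toy two-replica sketch (all class and method names here are illustrative, not from any real system): a write lands on one replica and only reaches the other on an explicit sync, so a read routed to the lagging replica returns stale data.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of eventual consistency: two replicas that converge only
// when an explicit anti-entropy sync runs.
public class StaleReadDemo {

    private final Map<String, String> replicaA = new HashMap<>();
    private final Map<String, String> replicaB = new HashMap<>();

    public void writeToA(String key, String value) {
        replicaA.put(key, value);   // B is not updated synchronously
    }

    public String readFromB(String key) {
        return replicaB.get(key);   // may lag behind A
    }

    public void sync() {
        replicaB.putAll(replicaA);  // eventual convergence
    }
}
```

Before `sync()` runs, a read from replica B misses the write to A entirely; afterwards both replicas agree.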

PACELC Theorem

When a Partition occurs, the system trades Availability against Consistency (the CAP choice); Else, during normal operation, it trades Latency against Consistency.

Scenario        Choice             Examples
Partition       Consistency (PC)   HBase, VoltDB
Partition       Availability (PA)  Cassandra, DynamoDB
Else (normal)   Latency (EL)       Cassandra
Else (normal)   Consistency (EC)   Paxos-based stores

Best Practices

1. Match Consistency to Use Case

Bank transactions require strong consistency; social feeds can tolerate eventual consistency.

2. Use Read-Your-Writes

Users should see their own updates immediately.

3. Consider Session Consistency

Maintain consistency within a user session.

4. Document Consistency Guarantees

Make explicit what clients can expect.

5. Test with Chaos Engineering

Verify behavior under partition scenarios.
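
As a minimal sketch of practice 5 (all names here are hypothetical), a chaos test can inject a simulated partition and assert the store's declared behavior, here CP-style write rejection, instead of discovering it in production:

```java
// Toy partition-injection harness: a boolean flag stands in for a real
// network fault injector that would drop traffic between replicas.
public class PartitionProbe {

    private boolean partitioned = false;

    public void startPartition() { partitioned = true; }
    public void healPartition()  { partitioned = false; }

    // CP-style store: refuse writes while a majority is unreachable.
    public boolean tryWrite(String key, String value) {
        return !partitioned;
    }
}
```

A real harness would cut traffic at the network layer (packet filtering or a proxy) rather than flip a flag, but the assertions stay the same: writes fail during the partition and succeed after it heals.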


Code Examples

Example 1: Read-Your-Writes Consistency

public class ReadYourWritesClient {
    
    private final List<ReplicaClient> replicas;
    private final ThreadLocal<Long> lastWriteTimestamp = new ThreadLocal<>();
    
    public ReadYourWritesClient(List<ReplicaClient> replicas) {
        this.replicas = replicas;
    }
    
    /**
     * Write to primary and wait for acknowledgment
     */
    public WriteResult write(String key, byte[] value) {
        WriteRequest request = new WriteRequest(key, value, System.currentTimeMillis());
        
        // Write to primary
        ReplicaClient primary = getPrimary();
        WriteResult result = primary.write(request);
        
        // Track timestamp for read-your-writes
        lastWriteTimestamp.set(result.getTimestamp());
        
        return result;
    }
    
    /**
     * Read with minimum timestamp guarantee
     */
    public ReadResult read(String key) {
        Long minTimestamp = lastWriteTimestamp.get();
        
        // Try each replica until we find one with fresh enough data
        for (ReplicaClient replica : getReadReplicas()) {
            try {
                ReadResult result = replica.read(key);
                
                if (minTimestamp == null || result.getTimestamp() >= minTimestamp) {
                    return result;
                }
                
                // This replica is stale, try another
            } catch (Exception e) {
                // Replica unavailable, try another
            }
        }
        
        // Fall back to primary
        return getPrimary().read(key);
    }
    
    /**
     * Alternative: Include version in response for client-side validation
     */
    public ReadResult readWithVersion(String key, Long expectedMinVersion) {
        for (ReplicaClient replica : getReadReplicas()) {
            ReadResult result = replica.read(key);
            
            if (expectedMinVersion == null || result.getVersion() >= expectedMinVersion) {
                return result;
            }
        }
        
        throw new StaleReadException("No replica has required version: " + expectedMinVersion);
    }
}

Example 2: Causal Consistency with Vector Clocks

public class VectorClock implements Comparable<VectorClock> {
    
    private final Map<String, Long> clock;
    
    public VectorClock() {
        this.clock = new ConcurrentHashMap<>();
    }
    
    public VectorClock(Map<String, Long> clock) {
        this.clock = new ConcurrentHashMap<>(clock);
    }
    
    public void increment(String nodeId) {
        clock.merge(nodeId, 1L, Long::sum);
    }
    
    public void merge(VectorClock other) {
        other.clock.forEach((node, time) -> 
            clock.merge(node, time, Math::max)
        );
    }
    
    /**
     * Check if this clock happened-before other
     */
    public boolean happenedBefore(VectorClock other) {
        boolean atLeastOneLess = false;
        
        Set<String> allNodes = new HashSet<>(clock.keySet());
        allNodes.addAll(other.clock.keySet());
        
        for (String node : allNodes) {
            long thisTime = clock.getOrDefault(node, 0L);
            long otherTime = other.clock.getOrDefault(node, 0L);
            
            if (thisTime > otherTime) {
                return false; // This is not before other
            }
            if (thisTime < otherTime) {
                atLeastOneLess = true;
            }
        }
        
        return atLeastOneLess;
    }
    
    /**
     * Check if clocks are concurrent (neither happened-before the other)
     */
    public boolean isConcurrentWith(VectorClock other) {
        return !this.happenedBefore(other) && !other.happenedBefore(this);
    }
    
    public Map<String, Long> toMap() {
        return new HashMap<>(clock);
    }
}

public class CausallyConsistentStore {
    
    private final String nodeId;
    private final Map<String, VersionedValue> store = new ConcurrentHashMap<>();
    private final VectorClock localClock = new VectorClock();
    
    public CausallyConsistentStore(String nodeId) {
        this.nodeId = nodeId;
    }
    
    public WriteResult write(String key, byte[] value, VectorClock clientClock) {
        // Merge client's causal context
        localClock.merge(clientClock);
        
        // Increment for this write
        localClock.increment(nodeId);
        
        // Store with new clock
        VectorClock writeTime = new VectorClock(localClock.toMap());
        store.put(key, new VersionedValue(value, writeTime));
        
        return new WriteResult(key, writeTime);
    }
    
    public ReadResult read(String key, VectorClock clientClock) {
        // Wait until we have all causally preceding writes
        waitForCausalDependencies(clientClock);
        
        VersionedValue value = store.get(key);
        
        // Merge clocks
        if (value != null) {
            localClock.merge(value.getClock());
        }
        
        return new ReadResult(value, new VectorClock(localClock.toMap()));
    }
    
    private void waitForCausalDependencies(VectorClock required) {
        // Block until the local clock dominates 'required': every entry of
        // required must be <= the corresponding local entry.
        while (required.toMap().entrySet().stream().anyMatch(
                en -> localClock.toMap().getOrDefault(en.getKey(), 0L) < en.getValue())) {
            try {
                Thread.sleep(10); // wait for replication to catch up
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted waiting for causal dependencies", e);
            }
        }
    }
    
    /**
     * Handle concurrent writes - Last-Writer-Wins or merge
     */
    public void handleReplicatedWrite(String key, byte[] value, VectorClock writeClock) {
        store.compute(key, (k, existing) -> {
            if (existing == null) {
                return new VersionedValue(value, writeClock);
            }
            
            if (writeClock.happenedBefore(existing.getClock())) {
                // Old write, ignore
                return existing;
            }
            
            if (existing.getClock().happenedBefore(writeClock)) {
                // New write, accept
                return new VersionedValue(value, writeClock);
            }
            
            // Concurrent - need conflict resolution
            return resolveConflict(existing, new VersionedValue(value, writeClock));
        });
        
        localClock.merge(writeClock);
    }
}
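
The happened-before test above can be exercised in isolation. This standalone map-based sketch mirrors VectorClock.happenedBefore and shows that {A=1} precedes {A=2}, while {A=1} and {B=1} are concurrent:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Standalone happened-before comparison on plain count maps, mirroring
// the VectorClock logic above.
public class ClockCompareDemo {

    static boolean happenedBefore(Map<String, Long> a, Map<String, Long> b) {
        Set<String> nodes = new HashSet<>(a.keySet());
        nodes.addAll(b.keySet());
        boolean strictlyLess = false;
        for (String node : nodes) {
            long ta = a.getOrDefault(node, 0L);
            long tb = b.getOrDefault(node, 0L);
            if (ta > tb) return false;  // a is ahead somewhere: not before b
            if (ta < tb) strictlyLess = true;
        }
        return strictlyLess;
    }

    static boolean concurrent(Map<String, Long> a, Map<String, Long> b) {
        return !happenedBefore(a, b) && !happenedBefore(b, a);
    }

    public static String demo() {
        Map<String, Long> a = Map.of("A", 1L);
        Map<String, Long> b = Map.of("A", 2L);
        Map<String, Long> c = Map.of("B", 1L);
        return (happenedBefore(a, b) ? "a<b" : "!") + ","
             + (concurrent(a, c) ? "a||c" : "!");
    }
}
```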

Example 3: Eventual Consistency with CRDTs

/**
 * Conflict-free Replicated Data Types
 * Automatically converge without coordination
 */
public interface CRDT<T extends CRDT<T>> {
    T merge(T other); // must be commutative, associative, and idempotent
}

/**
 * G-Counter: Grow-only counter
 */
public class GCounter implements CRDT<GCounter> {
    
    private final Map<String, Long> counts;
    
    public GCounter() {
        this.counts = new ConcurrentHashMap<>();
    }
    
    public void increment(String nodeId) {
        counts.merge(nodeId, 1L, Long::sum);
    }
    
    public long value() {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }
    
    @Override
    public GCounter merge(GCounter other) {
        GCounter merged = new GCounter();
        
        Set<String> allNodes = new HashSet<>(counts.keySet());
        allNodes.addAll(other.counts.keySet());
        
        for (String node : allNodes) {
            long max = Math.max(
                counts.getOrDefault(node, 0L),
                other.counts.getOrDefault(node, 0L)
            );
            merged.counts.put(node, max);
        }
        
        return merged;
    }
}
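
To see the convergence claim concretely, here is a standalone map-based sketch of the same entry-wise-max merge: two replicas increment independently, and merging in either order yields the same total.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of G-Counter convergence, mirroring GCounter.merge
// above: merge is an entry-wise max over per-node counts.
public class GCounterDemo {

    static Map<String, Long> merge(Map<String, Long> a, Map<String, Long> b) {
        Map<String, Long> out = new HashMap<>(a);
        b.forEach((node, n) -> out.merge(node, n, Math::max));
        return out;
    }

    static long value(Map<String, Long> counts) {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }

    public static long convergedTotal() {
        Map<String, Long> a = new HashMap<>();
        Map<String, Long> b = new HashMap<>();
        a.put("node-a", 2L);  // node A incremented twice
        b.put("node-b", 3L);  // node B incremented three times
        long left = value(merge(a, b));
        long right = value(merge(b, a));
        return left == right ? left : -1;  // -1 would signal divergence
    }
}
```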

/**
 * LWW-Register: Last-Writer-Wins Register
 */
public class LWWRegister<T> implements CRDT<LWWRegister<T>> {
    
    private T value;
    private long timestamp = Long.MIN_VALUE;
    private String nodeId = ""; // tie-breaker for equal timestamps
    
    public void set(T value, long timestamp, String nodeId) {
        if (timestamp > this.timestamp || 
            (timestamp == this.timestamp && nodeId.compareTo(this.nodeId) > 0)) {
            this.value = value;
            this.timestamp = timestamp;
            this.nodeId = nodeId;
        }
    }
    
    public T get() {
        return value;
    }
    
    @Override
    public LWWRegister<T> merge(LWWRegister<T> other) {
        LWWRegister<T> merged = new LWWRegister<>();
        
        if (this.timestamp > other.timestamp ||
            (this.timestamp == other.timestamp && 
             this.nodeId.compareTo(other.nodeId) > 0)) {
            merged.value = this.value;
            merged.timestamp = this.timestamp;
            merged.nodeId = this.nodeId;
        } else {
            merged.value = other.value;
            merged.timestamp = other.timestamp;
            merged.nodeId = other.nodeId;
        }
        
        return merged;
    }
}

/**
 * OR-Set: Observed-Remove Set
 */
public class ORSet<E> implements CRDT<ORSet<E>> {
    
    // Each element has unique tags for each add operation
    private final Map<E, Set<UUID>> elements = new ConcurrentHashMap<>();
    private final Set<UUID> tombstones = ConcurrentHashMap.newKeySet();
    
    public void add(E element) {
        UUID tag = UUID.randomUUID();
        elements.computeIfAbsent(element, k -> ConcurrentHashMap.newKeySet()).add(tag);
    }
    
    public void remove(E element) {
        Set<UUID> tags = elements.get(element);
        if (tags != null) {
            tombstones.addAll(tags);
            tags.clear();
        }
    }
    
    public boolean contains(E element) {
        Set<UUID> tags = elements.get(element);
        if (tags == null) return false;
        return tags.stream().anyMatch(tag -> !tombstones.contains(tag));
    }
    
    public Set<E> elements() {
        return elements.entrySet().stream()
            .filter(e -> e.getValue().stream().anyMatch(tag -> !tombstones.contains(tag)))
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
    }
    
    @Override
    public ORSet<E> merge(ORSet<E> other) {
        ORSet<E> merged = new ORSet<>();
        
        // Merge all elements and tags
        Set<E> allElements = new HashSet<>(elements.keySet());
        allElements.addAll(other.elements.keySet());
        
        for (E element : allElements) {
            Set<UUID> mergedTags = new HashSet<>();
            if (elements.containsKey(element)) {
                mergedTags.addAll(elements.get(element));
            }
            if (other.elements.containsKey(element)) {
                mergedTags.addAll(other.elements.get(element));
            }
            if (!mergedTags.isEmpty()) {
                merged.elements.put(element, ConcurrentHashMap.newKeySet());
                merged.elements.get(element).addAll(mergedTags);
            }
        }
        
        // Merge tombstones
        merged.tombstones.addAll(tombstones);
        merged.tombstones.addAll(other.tombstones);
        
        return merged;
    }
}
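
The tag/tombstone scheme is what gives the OR-Set its "add wins" behavior: a remove tombstones only the tags it has observed, so a later re-add with a fresh tag becomes visible again. A standalone sketch of that sequence:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Standalone sketch of OR-Set add/remove/re-add, mirroring the tag and
// tombstone bookkeeping in ORSet above.
public class ORSetDemo {

    public static boolean addWins() {
        Map<String, Set<UUID>> tags = new HashMap<>();
        Set<UUID> tombstones = new HashSet<>();

        // add("x") with a fresh unique tag
        tags.computeIfAbsent("x", k -> new HashSet<>()).add(UUID.randomUUID());
        // remove("x"): tombstone every tag observed so far
        tombstones.addAll(tags.get("x"));
        // re-add("x") with a new tag, unknown to the tombstone set
        tags.get("x").add(UUID.randomUUID());

        // contains("x"): true iff some tag is not tombstoned
        return tags.get("x").stream().anyMatch(t -> !tombstones.contains(t));
    }
}
```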

Example 4: Session Consistency

public class SessionConsistentClient {
    
    private final ReplicaRouter router;
    private final String sessionId;
    
    // Track read and write bounds for this session
    private volatile long lastWriteSequence = 0;
    private volatile String lastWriteReplica = null;
    
    public SessionConsistentClient(ReplicaRouter router) {
        this.router = router;
        this.sessionId = UUID.randomUUID().toString();
    }
    
    public WriteResult write(String key, byte[] value) {
        ReplicaClient primary = router.getPrimary();
        
        WriteRequest request = WriteRequest.builder()
            .key(key)
            .value(value)
            .sessionId(sessionId)
            .build();
        
        WriteResult result = primary.write(request);
        
        // Track for future reads
        lastWriteSequence = result.getSequence();
        lastWriteReplica = primary.getId();
        
        return result;
    }
    
    public ReadResult read(String key) {
        ReadRequest request = ReadRequest.builder()
            .key(key)
            .sessionId(sessionId)
            .minSequence(lastWriteSequence) // Read-your-writes
            .preferredReplica(lastWriteReplica) // Sticky session
            .build();
        
        // Try sticky replica first
        if (lastWriteReplica != null) {
            try {
                ReplicaClient sticky = router.getReplica(lastWriteReplica);
                ReadResult result = sticky.readWithMinSequence(request);
                if (result.getSequence() >= lastWriteSequence) {
                    return result;
                }
            } catch (Exception e) {
                // Sticky replica unavailable
            }
        }
        
        // Fall back to any replica that's caught up
        for (ReplicaClient replica : router.getReadReplicas()) {
            try {
                ReadResult result = replica.readWithMinSequence(request);
                if (result.getSequence() >= lastWriteSequence) {
                    lastWriteReplica = replica.getId();
                    return result;
                }
            } catch (Exception e) {
                continue;
            }
        }
        
        // Last resort: read from primary
        return router.getPrimary().read(request);
    }
}

Example 5: Consistency Level Selection

public enum ConsistencyLevel {
    ONE,            // Fastest, lowest durability
    QUORUM,         // Balanced
    ALL,            // Slowest, highest durability
    LOCAL_QUORUM,   // Within datacenter
    EACH_QUORUM     // Quorum in each datacenter
}

public class TunableConsistencyStore {
    
    private final List<ReplicaClient> replicas;
    private final int replicationFactor;
    
    public TunableConsistencyStore(List<ReplicaClient> replicas) {
        this.replicas = replicas;
        this.replicationFactor = replicas.size();
    }
    
    public WriteResult write(String key, byte[] value, ConsistencyLevel level) {
        int required = getRequiredAcks(level);
        
        List<CompletableFuture<WriteAck>> futures = replicas.stream()
            .map(replica -> replica.writeAsync(key, value))
            .collect(Collectors.toList());
        
        // Wait for required number of acks
        AtomicInteger ackCount = new AtomicInteger(0);
        CompletableFuture<WriteResult> resultFuture = new CompletableFuture<>();
        
        for (CompletableFuture<WriteAck> future : futures) {
            future.whenComplete((ack, error) -> {
                if (ack != null && ack.isSuccess()) {
                    if (ackCount.incrementAndGet() >= required) {
                        resultFuture.complete(new WriteResult(true));
                    }
                }
            });
        }
        
        try {
            return resultFuture.get(5, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            throw new WriteTimeoutException(
                "Only " + ackCount.get() + "/" + required + " replicas acknowledged"
            );
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Write failed", e);
        }
    }
    
    public ReadResult read(String key, ConsistencyLevel level) {
        int required = getRequiredAcks(level); // same quorum sizing for reads
        
        List<CompletableFuture<ReadResponse>> futures = replicas.stream()
            .map(replica -> replica.readAsync(key))
            .collect(Collectors.toList());
        
        List<ReadResponse> responses = new CopyOnWriteArrayList<>();
        CompletableFuture<ReadResult> resultFuture = new CompletableFuture<>();
        
        for (CompletableFuture<ReadResponse> future : futures) {
            future.whenComplete((response, error) -> {
                if (response != null) {
                    responses.add(response);
                    
                    if (responses.size() >= required) {
                        // Read repair: return newest, repair stale replicas
                        ReadResponse newest = responses.stream()
                            .max(Comparator.comparing(ReadResponse::getTimestamp))
                            .orElseThrow();
                        
                        // Async repair
                        repairStaleReplicas(key, newest, responses);
                        
                        resultFuture.complete(new ReadResult(newest.getValue()));
                    }
                }
            });
        }
        
        try {
            return resultFuture.get(5, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            throw new ReadTimeoutException(
                "Only " + responses.size() + "/" + required + " replicas responded"
            );
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Read failed", e);
        }
    }
    
    private int getRequiredAcks(ConsistencyLevel level) {
        return switch (level) {
            case ONE -> 1;
            case QUORUM, LOCAL_QUORUM -> replicationFactor / 2 + 1;
            case ALL -> replicationFactor;
            case EACH_QUORUM -> replicationFactor; // Simplified
        };
    }
    
    private void repairStaleReplicas(String key, ReadResponse newest, 
                                      List<ReadResponse> responses) {
        for (ReadResponse response : responses) {
            if (response.getTimestamp() < newest.getTimestamp()) {
                // Async write to repair
                response.getReplica().writeAsync(key, newest.getValue());
            }
        }
    }
}
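
The quorum sizing in getRequiredAcks follows from simple arithmetic: any read set of size R intersects any write set of size W whenever R + W > N, which is why majority quorums (N/2 + 1) on both paths guarantee that a read observes the latest acknowledged write. A standalone check:

```java
// Quorum intersection arithmetic underlying tunable consistency:
// reads and writes overlap on at least one replica iff R + W > N.
public class QuorumMath {

    public static boolean overlaps(int n, int r, int w) {
        return r + w > n;
    }

    public static int quorum(int n) {
        return n / 2 + 1;  // majority, as in getRequiredAcks for QUORUM
    }
}
```

For example, with N = 3, QUORUM reads and writes use 2 replicas each and always overlap, while ONE/ONE (R = W = 1) does not.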

Anti-Patterns

❌ Assuming Strong Consistency

// WRONG - assuming read sees previous write
write(key, value);
Object result = read(key); // May not see value in eventual consistency!

// ✅ CORRECT - use appropriate consistency level
write(key, value, ConsistencyLevel.QUORUM);
Object result = read(key, ConsistencyLevel.QUORUM);

❌ Ignoring Partition Behavior

Always test how your system behaves during network partitions.

