DI
Partitioning
Distributed Systems core v1.0.0
Data Partitioning
Overview
Data partitioning (sharding) divides data across multiple nodes to achieve horizontal scalability. This skill covers partitioning strategies, consistent hashing, rebalancing, and handling cross-partition operations.
Key Concepts
Partitioning Strategies
┌─────────────────────────────────────────────────────────────┐
│ Partitioning Strategies │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. Hash Partitioning: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ key ──▶ hash(key) ──▶ partition = hash % N │ │
│ │ │ │
│ │ Pros: Even distribution │ │
│ │ Cons: No range queries, rebalancing pain │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 2. Range Partitioning: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Partition 0: A-F │ │
│ │ Partition 1: G-M │ │
│ │ Partition 2: N-S │ │
│ │ Partition 3: T-Z │ │
│ │ │ │
│ │ Pros: Range queries, sorted access │ │
│ │ Cons: Hot spots if keys not uniform │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 3. Consistent Hashing: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 0° │ │
│ │ ╱────────╲ │ │
│ │ ╱ N1 ╲ │ │
│ │ 270° •───────• 90° │ │
│ │ ╲ N3 ╱ │ │
│ │ ╲────────╱ │ │
│ │ 180° │ │
│ │ │ │
│ │ Key maps to next node clockwise on ring │ │
│ │ Adding node only moves ~1/N keys │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 4. Compound Partitioning: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Partition key + Sort key │ │
│ │ e.g., (user_id, timestamp) │ │
│ │ │ │
│ │ All items for user on same partition │ │
│ │ Sorted by timestamp within partition │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Best Practices
1. Choose Partition Key Wisely
Choose a key with high cardinality and an even value distribution that aligns with your dominant query pattern.
2. Avoid Hot Partitions
Monitor and add salt to hot keys if needed.
3. Plan for Rebalancing
Design for adding/removing nodes with minimal disruption.
4. Keep Related Data Together
Colocate data accessed together to avoid cross-partition joins.
5. Use Virtual Nodes
Improve load distribution in consistent hashing.
Code Examples
Example 1: Consistent Hashing
/**
 * Consistent hash ring with virtual nodes.
 *
 * <p>Nodes and keys are hashed onto a 64-bit ring; a key is owned by the first
 * node at or clockwise-after its hash position. Each physical node is inserted
 * {@code virtualNodes} times under distinct labels to smooth the distribution.
 *
 * <p>NOTE(review): this class is not thread-safe ({@link TreeMap} is
 * unsynchronized) — callers must synchronize externally if lookups can race
 * with {@link #addNode}/{@link #removeNode}.
 */
public class ConsistentHashRing<N> {
    // Sorted map from ring position (hash) to the owning physical node.
    private final TreeMap<Long, N> ring = new TreeMap<>();
    private final int virtualNodes;
    private final HashFunction hashFunction;

    /**
     * @param virtualNodes ring positions per physical node; larger values
     *                     yield a more even key distribution at the cost of a
     *                     bigger ring map
     */
    public ConsistentHashRing(int virtualNodes) {
        this.virtualNodes = virtualNodes;
        this.hashFunction = Hashing.murmur3_128();
    }

    /** Inserts {@code node} at {@code virtualNodes} positions on the ring. */
    public void addNode(N node) {
        for (int i = 0; i < virtualNodes; i++) {
            long hash = hash(node.toString() + "-" + i);
            ring.put(hash, node);
        }
    }

    /** Removes every virtual position belonging to {@code node}. */
    public void removeNode(N node) {
        for (int i = 0; i < virtualNodes; i++) {
            long hash = hash(node.toString() + "-" + i);
            ring.remove(hash);
        }
    }

    /**
     * Returns the node owning {@code key}: the first ring entry at or after
     * the key's hash, wrapping around to the first entry if none follows.
     *
     * @throws IllegalStateException if the ring is empty
     */
    public N getNode(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("No nodes in ring");
        }
        long hash = hash(key);
        // Find first node >= hash (clockwise)
        Map.Entry<Long, N> entry = ring.ceilingEntry(hash);
        // Wrap around to first node
        if (entry == null) {
            entry = ring.firstEntry();
        }
        return entry.getValue();
    }

    /**
     * Returns up to {@code count} distinct physical nodes for replication,
     * walking clockwise from the key's position (and wrapping). Returns fewer
     * than {@code count} if the ring holds fewer distinct nodes.
     *
     * @throws IllegalStateException if the ring is empty
     */
    public List<N> getNodes(String key, int count) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("No nodes in ring");
        }
        long hash = hash(key);
        List<N> nodes = new ArrayList<>();
        Set<N> seen = new HashSet<>();
        // Start from ceiling and walk clockwise; `seen` collapses virtual
        // nodes of the same physical node.
        NavigableMap<Long, N> tailMap = ring.tailMap(hash, true);
        for (N node : tailMap.values()) {
            if (seen.add(node)) {
                nodes.add(node);
                if (nodes.size() >= count) {
                    return nodes;
                }
            }
        }
        // Wrap around past position 0.
        for (N node : ring.values()) {
            if (seen.add(node)) {
                nodes.add(node);
                if (nodes.size() >= count) {
                    return nodes;
                }
            }
        }
        return nodes;
    }

    private long hash(String key) {
        return hashFunction.hashString(key, StandardCharsets.UTF_8).asLong();
    }

    /**
     * Calculates which of {@code allKeys} would move to {@code newNode} if it
     * were added to the ring.
     *
     * <p>FIX: the original added {@code newNode} and never removed it, so a
     * "what-if" query permanently mutated the ring despite its "temporarily
     * add" comment. The node is now removed in a finally block, restoring the
     * ring to its pre-call state even if a lookup throws.
     */
    public Map<String, N> getKeysToMove(N newNode, Collection<String> allKeys) {
        Map<String, N> toMove = new HashMap<>();
        addNode(newNode);
        try {
            for (String key : allKeys) {
                N owner = getNode(key);
                if (owner.equals(newNode)) {
                    toMove.put(key, newNode);
                }
            }
        } finally {
            // Undo the temporary addition — this method must be side-effect free.
            removeNode(newNode);
        }
        return toMove;
    }
}
Example 2: Partition-Aware Router
/**
 * Maps keys to partitions and routes operations to partition replicas,
 * preferring the leader and failing over to followers.
 */
public class PartitionRouter {
    private final List<PartitionInfo> partitions;
    private final ConsistentHashRing<String> ring;
    private final ReplicaPlacementStrategy replicaStrategy;

    /**
     * @param nodes             physical node identifiers
     * @param partitionCount    fixed number of logical partitions
     * @param replicationFactor replicas (including the leader) per partition
     */
    public PartitionRouter(List<String> nodes, int partitionCount, int replicationFactor) {
        this.ring = new ConsistentHashRing<>(100); // Virtual nodes
        this.replicaStrategy = new RackAwareReplicaPlacement(replicationFactor);
        this.partitions = new ArrayList<>();
        // Initialize ring with nodes
        nodes.forEach(ring::addNode);
        // Create partition map; replicas.get(0) becomes the leader, which
        // tryReplicas() below relies on when it skips index 0.
        for (int i = 0; i < partitionCount; i++) {
            String partitionKey = "partition-" + i;
            List<String> replicas = ring.getNodes(partitionKey, replicationFactor);
            partitions.add(new PartitionInfo(i, replicas.get(0), replicas));
        }
    }

    /**
     * Returns the partition index for {@code key}, always in
     * {@code [0, partitionCount)}.
     */
    public int getPartition(String key) {
        long hash = Hashing.murmur3_128()
            .hashString(key, StandardCharsets.UTF_8)
            .asLong();
        // FIX: the original used Math.abs((int) (hash % n)), which is wrong
        // twice over — the int cast can change the value of the long
        // remainder, and Math.abs(Integer.MIN_VALUE) is still negative.
        // floorMod always yields a non-negative value in [0, n).
        return Math.floorMod(hash, partitions.size());
    }

    /** Returns the partition metadata for {@code key}. */
    public PartitionInfo getPartitionInfo(String key) {
        return partitions.get(getPartition(key));
    }

    /** Returns the leader node for the partition owning {@code key}. */
    public String getLeader(String key) {
        return getPartitionInfo(key).getLeader();
    }

    /** Returns all replica nodes (leader first) for {@code key}'s partition. */
    public List<String> getReplicas(String key) {
        return getPartitionInfo(key).getReplicas();
    }

    /**
     * Routes an operation to the appropriate node: leader first, then each
     * follower in order until one succeeds.
     *
     * @return future completing with the first successful result, or failing
     *         with {@link NoAvailableReplicaException} when every replica fails
     */
    public <T> CompletableFuture<T> route(String key, Function<String, CompletableFuture<T>> operation) {
        PartitionInfo partition = getPartitionInfo(key);
        // Try leader first
        return operation.apply(partition.getLeader())
            .exceptionallyCompose(e -> {
                // Fallback to replicas
                return tryReplicas(partition.getReplicas(), operation);
            });
        }

    private <T> CompletableFuture<T> tryReplicas(
            List<String> replicas,
            Function<String, CompletableFuture<T>> operation) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // Index 1 skips replicas.get(0), which is the leader (already tried).
        AtomicInteger index = new AtomicInteger(1);
        tryNextReplica(replicas, operation, result, index);
        return result;
    }

    // Recursively tries each follower; completes `result` on first success,
    // or exceptionally once the replica list is exhausted.
    private <T> void tryNextReplica(
            List<String> replicas,
            Function<String, CompletableFuture<T>> operation,
            CompletableFuture<T> result,
            AtomicInteger index) {
        int i = index.getAndIncrement();
        if (i >= replicas.size()) {
            result.completeExceptionally(new NoAvailableReplicaException());
            return;
        }
        operation.apply(replicas.get(i))
            .whenComplete((value, error) -> {
                if (error == null) {
                    result.complete(value);
                } else {
                    tryNextReplica(replicas, operation, result, index);
                }
            });
    }
}
/**
 * Immutable metadata for one partition. Lombok {@code @Value} generates the
 * all-args constructor, getters ({@code getLeader()} etc.), and
 * equals/hashCode/toString.
 */
@Value
class PartitionInfo {
// Partition index in [0, partitionCount).
int id;
// Node serving writes; by construction in PartitionRouter this is replicas.get(0).
String leader;
// All nodes holding a copy of this partition, leader first.
List<String> replicas;
}
Example 3: Hot Partition Handling
/**
 * Detects hot keys by access counting and mitigates them by salting writes
 * across multiple partitions and fanning out reads over the salt variants.
 */
public class HotKeyHandler {
    // FIX: the salt-bucket count was the magic number 10 duplicated in
    // getSaltedKey and readHotKey; a single constant keeps them in sync.
    private static final int SALT_BUCKETS = 10;

    private final PartitionRouter router;
    private final ConcurrentMap<String, AtomicLong> keyAccessCounts = new ConcurrentHashMap<>();
    private final long hotKeyThreshold;
    private final Set<String> hotKeys = ConcurrentHashMap.newKeySet();

    /**
     * FIX: the original declared the final fields {@code router} and
     * {@code hotKeyThreshold} with no constructor to assign them, which does
     * not compile.
     *
     * @param router          partition router used for hot-key mitigation
     * @param hotKeyThreshold accesses per window above which a key is hot
     */
    public HotKeyHandler(PartitionRouter router, long hotKeyThreshold) {
        this.router = router;
        this.hotKeyThreshold = hotKeyThreshold;
    }

    /**
     * Records one access to {@code key}; fires {@link #handleHotKey} exactly
     * once when the key first crosses the threshold (Set.add returns true
     * only for the winning thread).
     */
    public void recordAccess(String key) {
        long count = keyAccessCounts
            .computeIfAbsent(key, k -> new AtomicLong())
            .incrementAndGet();
        if (count > hotKeyThreshold && hotKeys.add(key)) {
            handleHotKey(key);
        }
    }

    private void handleHotKey(String key) {
        log.warn("Hot key detected: {}", key);
        // Options:
        // 1. Add replicas for read scaling
        // 2. Add salt to split writes
        // 3. Cache aggressively
        // 4. Rate limit
    }

    /**
     * Salts hot keys so writes spread across partitions; non-hot keys and
     * reads pass through unchanged (reads must fan out over all variants —
     * see {@link #readHotKey}).
     */
    public String getSaltedKey(String key, boolean isWrite) {
        if (!hotKeys.contains(key)) {
            return key;
        }
        if (isWrite) {
            // Append random salt for writes
            int salt = ThreadLocalRandom.current().nextInt(SALT_BUCKETS);
            return key + "#" + salt;
        } else {
            // Reads must check all salted variants
            return key; // Handle in read logic
        }
    }

    /**
     * Reads a hot key by querying every salt variant and merging the results;
     * a variant whose read fails is treated as absent (best-effort).
     *
     * @return merged value, or {@code null} when no variant yields a value
     */
    public <T> CompletableFuture<T> readHotKey(
            String key,
            Function<String, CompletableFuture<T>> reader,
            BinaryOperator<T> merger) {
        if (!hotKeys.contains(key)) {
            return reader.apply(key);
        }
        // Read from all salt variants
        List<CompletableFuture<T>> futures = new ArrayList<>();
        for (int salt = 0; salt < SALT_BUCKETS; salt++) {
            String saltedKey = key + "#" + salt;
            futures.add(reader.apply(saltedKey).exceptionally(e -> null));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .reduce(merger)
                .orElse(null));
    }

    /** Periodic reset so the hot set tracks the current access pattern. */
    @Scheduled(fixedRate = 60000)
    public void resetCounters() {
        // Snapshot-and-zero each counter so the next window starts fresh.
        Map<String, Long> snapshot = new HashMap<>();
        keyAccessCounts.forEach((k, v) -> snapshot.put(k, v.getAndSet(0)));
        // FIX: drop zeroed entries — the original kept every key ever seen,
        // so the counter map grew without bound. Entries touched since the
        // snapshot are non-zero and survive.
        keyAccessCounts.entrySet().removeIf(e -> e.getValue().get() == 0);
        // FIX: rebuild the hot set without the original's clear-then-add
        // window, during which a genuinely hot key briefly lost its special
        // handling. addAll-then-retainAll keeps still-hot keys continuously.
        Set<String> stillHot = snapshot.entrySet().stream()
            .filter(e -> e.getValue() > hotKeyThreshold)
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
        hotKeys.addAll(stillHot);
        hotKeys.retainAll(stillHot);
    }
}
Example 4: Partition Rebalancing
/**
 * Rebalances partitions across nodes using copy / catch-up / switch / cleanup
 * phases, keeping the read-only window as short as possible.
 */
public class PartitionRebalancer {
    private final PartitionRouter router;
    private final DataMigrator migrator;

    /**
     * Rebalances after adding a node: computes the delta between current and
     * new assignments and moves only the affected partitions, sequentially.
     */
    public void addNode(String newNode) {
        log.info("Adding node: {}", newNode);
        // Calculate new partition assignments
        Map<Integer, List<String>> currentAssignments = router.getAllAssignments();
        Map<Integer, List<String>> newAssignments = calculateNewAssignments(
            currentAssignments, newNode
        );
        // Find partitions that need to move
        List<PartitionMove> moves = calculateMoves(currentAssignments, newAssignments);
        log.info("Need to move {} partitions", moves.size());
        // Execute moves with minimal disruption
        for (PartitionMove move : moves) {
            executeMove(move);
        }
        // Update router
        router.updateAssignments(newAssignments);
    }

    /**
     * Moves a single partition. Writes are only paused during the brief
     * read-only final sync (phase 3); phases 1–2 run with the source live.
     */
    private void executeMove(PartitionMove move) {
        log.info("Moving partition {} from {} to {}",
            move.getPartitionId(), move.getSource(), move.getTarget());
        try {
            // Phase 1: Copy data to new location
            migrator.copyPartition(move.getPartitionId(), move.getSource(), move.getTarget());
            // Phase 2: Catch up with writes during copy
            migrator.syncWrites(move.getPartitionId(), move.getSource(), move.getTarget());
            // Phase 3: Switch ownership (brief pause)
            router.setPartitionReadOnly(move.getPartitionId(), true);
            migrator.finalSync(move.getPartitionId(), move.getSource(), move.getTarget());
            router.updatePartitionOwner(move.getPartitionId(), move.getTarget());
            router.setPartitionReadOnly(move.getPartitionId(), false);
            // Phase 4: Clean up old data
            migrator.scheduleCleanup(move.getPartitionId(), move.getSource());
        } catch (Exception e) {
            log.error("Failed to move partition {}", move.getPartitionId(), e);
            // Rollback: re-enable writes (a no-op if we failed before phase 3).
            router.setPartitionReadOnly(move.getPartitionId(), false);
            throw e;
        }
    }

    /**
     * Parallel rebalancing, bounded to {@code maxConcurrentMoves} moves in
     * flight at once.
     *
     * <p>NOTE(review): moves run on the common ForkJoinPool and block on the
     * semaphore and migration I/O — consider passing a dedicated executor.
     */
    public CompletableFuture<Void> rebalanceAsync(int maxConcurrentMoves) {
        Map<Integer, List<String>> current = router.getAllAssignments();
        Map<Integer, List<String>> target = calculateBalancedAssignments(current);
        List<PartitionMove> moves = calculateMoves(current, target);
        Semaphore semaphore = new Semaphore(maxConcurrentMoves);
        List<CompletableFuture<Void>> futures = moves.stream()
            .map(move -> CompletableFuture.runAsync(() -> {
                // FIX: the original acquired inside the try whose finally
                // released, so an interrupted acquire() still released a
                // permit it never obtained, inflating the semaphore and
                // breaking the concurrency limit. Acquire before the try;
                // release only what was acquired.
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // skip this move; permit was never taken
                }
                try {
                    executeMove(move);
                } finally {
                    semaphore.release();
                }
            }))
            .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }
}
/**
 * Immutable description of one partition relocation. Lombok {@code @Value}
 * generates the constructor, getters, and equals/hashCode/toString.
 */
@Value
class PartitionMove {
// Partition being relocated.
int partitionId;
// Node currently holding the partition.
String source;
// Node that will own the partition after the move.
String target;
}
Example 5: Cross-Partition Queries
public class CrossPartitionQueryExecutor {
private final PartitionRouter router;
private final ExecutorService executor;
/**
* Scatter-Gather pattern for cross-partition queries
*/
public <T> List<T> scatterGather(
Query query,
Function<Query, List<T>> partitionExecutor,
Comparator<T> ordering,
int limit) {
// Scatter: send to all partitions
List<CompletableFuture<List<T>>> futures = router.getAllPartitions().stream()
.map(partition -> CompletableFuture.supplyAsync(
() -> partitionExecutor.apply(query.forPartition(partition)),
executor
))
.collect(Collectors.toList());
// Gather: collect and merge results
List<T> allResults = futures.stream()
.map(f -> {
try {
return f.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("Partition query failed", e);
return Collections.<T>emptyList();
}
})
.flatMap(List::stream)
.collect(Collectors.toList());
// Sort and limit
return allResults.stream()
.sorted(ordering)
.limit(limit)
.collect(Collectors.toList());
}
/**
* Parallel aggregation across partitions
*/
public <T, R> R aggregate(
Query query,
Function<Query, T> partitionExecutor,
R identity,
BiFunction<R, T, R> accumulator,
BinaryOperator<R> combiner) {
return router.getAllPartitions().parallelStream()
.map(partition -> partitionExecutor.apply(query.forPartition(partition)))
.reduce(identity, accumulator, combiner);
}
/**
* Two-phase commit for cross-partition transactions
*/
public <T> T executeTransaction(
List<String> keys,
Function<TransactionContext, T> transaction) {
// Group keys by partition
Map<Integer, List<String>> keysByPartition = keys.stream()
.collect(Collectors.groupingBy(router::getPartition));
if (keysByPartition.size() == 1) {
// Single partition - simple case
return executeSinglePartitionTx(
keysByPartition.keySet().iterator().next(),
transaction
);
}
// Multi-partition - need 2PC
String txId = UUID.randomUUID().toString();
TransactionContext ctx = new TransactionContext(txId);
try {
// Phase 1: Prepare
for (int partition : keysByPartition.keySet()) {
boolean prepared = preparePartition(partition, txId, keysByPartition.get(partition));
if (!prepared) {
throw new TransactionAbortedException("Prepare failed for partition " + partition);
}
}
// Execute transaction logic
T result = transaction.apply(ctx);
// Phase 2: Commit
for (int partition : keysByPartition.keySet()) {
commitPartition(partition, txId);
}
return result;
} catch (Exception e) {
// Abort all partitions
for (int partition : keysByPartition.keySet()) {
try {
abortPartition(partition, txId);
} catch (Exception ex) {
log.error("Failed to abort partition {}", partition, ex);
}
}
throw e;
}
}
}
Anti-Patterns
❌ Modulo Partitioning with Changing Node Count
// WRONG - adding a node moves almost all data
int partition = hash(key) % nodeCount;
// ✅ CORRECT - use consistent hashing
// Only about 1/N of all keys move when a node is added (N = node count)
❌ Cross-Partition Joins in Hot Path
Avoid designs requiring frequent cross-partition operations.