RE
Bulkhead
Resilience core v1.0.0
Bulkhead Pattern
Overview
The bulkhead pattern isolates resources to prevent failures from cascading. Like ship compartments that contain flooding, bulkheads limit the blast radius of failures by partitioning resources.
Key Concepts
Bulkhead Types
┌─────────────────────────────────────────────────────────────┐
│ Bulkhead Patterns │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. Thread Pool Bulkhead: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Service A Pool (10 threads) ████████████ │ │
│ │ Service B Pool (5 threads) █████ │ │
│ │ Service C Pool (8 threads) ████████ │ │
│ │ │ │
│ │ If Service B is slow, only its 5 threads blocked │ │
│ │ Services A and C continue unaffected │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 2. Semaphore Bulkhead: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Permit count limits concurrent calls │ │
│ │ │ │
│ │ Semaphore(5) ──▶ [■][■][■][□][□] │ │
│ │ ▲ ▲ │ │
│ │ 3 acquired 2 available │ │
│ │ │ │
│ │ Lighter weight than thread pools │ │
│ │ Uses caller's thread │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 3. Partition Bulkhead: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Tenant A ──▶ [Instance 1, Instance 2] │ │
│ │ Tenant B ──▶ [Instance 3, Instance 4] │ │
│ │ Tenant C ──▶ [Instance 5, Instance 6] │ │
│ │ │ │
│ │ Noisy neighbor in Tenant A doesn't affect B or C │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ When to Use Which: │
│ • Thread Pool: Need isolation + timeout │
│ • Semaphore: Lighter weight, caller-thread execution │
│ • Partition: Multi-tenant, blast radius control │
│ │
└─────────────────────────────────────────────────────────────┘
Best Practices
1. Size Based on Expected Load
Analyze traffic patterns to size bulkheads appropriately.
2. Monitor Queue Depths
Alert when queues approach capacity.
3. Use Timeouts with Thread Pools
Prevent thread starvation from slow calls.
4. Different Bulkheads for Critical Services
Protect critical paths with dedicated resources.
5. Consider Rejection Handling
Define what happens when bulkhead is full.
Code Examples
Example 1: Thread Pool Bulkhead
@Configuration
public class BulkheadConfig {
@Bean
public ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry() {
ThreadPoolBulkheadConfig defaultConfig = ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(10)
.coreThreadPoolSize(5)
.queueCapacity(20)
.keepAliveDuration(Duration.ofSeconds(60))
.writableStackTraceEnabled(true)
.build();
ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(defaultConfig);
// High-priority service gets more resources
registry.addConfiguration("high-priority",
ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(20)
.coreThreadPoolSize(10)
.queueCapacity(50)
.build()
);
// Low-priority service gets fewer resources
registry.addConfiguration("low-priority",
ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(5)
.coreThreadPoolSize(2)
.queueCapacity(10)
.build()
);
return registry;
}
}
@Service
public class IsolatedServiceClient {
private final ThreadPoolBulkhead paymentBulkhead;
private final ThreadPoolBulkhead notificationBulkhead;
private final ThreadPoolBulkhead analyticsBulkhead;
public IsolatedServiceClient(ThreadPoolBulkheadRegistry registry) {
this.paymentBulkhead = registry.bulkhead("payment", "high-priority");
this.notificationBulkhead = registry.bulkhead("notification", "default");
this.analyticsBulkhead = registry.bulkhead("analytics", "low-priority");
registerMetrics(paymentBulkhead);
registerMetrics(notificationBulkhead);
registerMetrics(analyticsBulkhead);
}
public CompletableFuture<PaymentResult> processPayment(Payment payment) {
return ThreadPoolBulkhead.decorateCallable(
paymentBulkhead,
() -> paymentService.process(payment)
).toCompletableFuture();
}
public CompletableFuture<Void> sendNotification(Notification notification) {
return ThreadPoolBulkhead.decorateRunnable(
notificationBulkhead,
() -> notificationService.send(notification)
).toCompletableFuture();
}
public CompletableFuture<Void> trackEvent(AnalyticsEvent event) {
return ThreadPoolBulkhead.decorateRunnable(
analyticsBulkhead,
() -> analyticsService.track(event)
).toCompletableFuture()
.exceptionally(e -> {
// Analytics failures are non-critical
log.warn("Analytics tracking failed", e);
return null;
});
}
private void registerMetrics(ThreadPoolBulkhead bulkhead) {
bulkhead.getEventPublisher()
.onCallRejected(event ->
meterRegistry.counter("bulkhead.rejected",
"name", event.getBulkheadName()).increment())
.onCallPermitted(event ->
meterRegistry.counter("bulkhead.permitted",
"name", event.getBulkheadName()).increment())
.onCallFinished(event ->
meterRegistry.timer("bulkhead.duration",
"name", event.getBulkheadName())
.record(event.getEventDuration()));
}
}
Example 2: Semaphore Bulkhead
@Configuration
public class SemaphoreBulkheadConfig {
@Bean
public BulkheadRegistry bulkheadRegistry() {
BulkheadConfig defaultConfig = BulkheadConfig.custom()
.maxConcurrentCalls(25)
.maxWaitDuration(Duration.ofMillis(500))
.writableStackTraceEnabled(true)
.build();
return BulkheadRegistry.of(defaultConfig);
}
}
@Service
public class DatabaseBulkheadService {
private final Bulkhead readBulkhead;
private final Bulkhead writeBulkhead;
private final Repository repository;
public DatabaseBulkheadService(BulkheadRegistry registry) {
// More permits for reads
this.readBulkhead = registry.bulkhead("db-read", BulkheadConfig.custom()
.maxConcurrentCalls(50)
.maxWaitDuration(Duration.ofMillis(100))
.build());
// Fewer permits for writes
this.writeBulkhead = registry.bulkhead("db-write", BulkheadConfig.custom()
.maxConcurrentCalls(10)
.maxWaitDuration(Duration.ofMillis(500))
.build());
}
public Entity findById(String id) {
return Bulkhead.decorateSupplier(readBulkhead, () -> repository.findById(id))
.get();
}
public void save(Entity entity) {
try {
Bulkhead.decorateRunnable(writeBulkhead, () -> repository.save(entity))
.run();
} catch (BulkheadFullException e) {
throw new ServiceOverloadedException("Database write capacity exceeded", e);
}
}
/**
* Check bulkhead status before attempting expensive operation
*/
public boolean canAcceptWrite() {
BulkheadConfig.Metrics metrics = writeBulkhead.getMetrics();
return metrics.getAvailableConcurrentCalls() > 0;
}
/**
* Get current utilization
*/
public double getWriteUtilization() {
BulkheadConfig.Metrics metrics = writeBulkhead.getMetrics();
int max = writeBulkhead.getBulkheadConfig().getMaxConcurrentCalls();
int available = metrics.getAvailableConcurrentCalls();
return (double) (max - available) / max;
}
}
Example 3: Per-Tenant Bulkhead
@Component
public class TenantBulkheadManager {
private final ConcurrentMap<String, Bulkhead> tenantBulkheads = new ConcurrentHashMap<>();
private final TenantConfig tenantConfig;
public Bulkhead getBulkhead(String tenantId) {
return tenantBulkheads.computeIfAbsent(tenantId, this::createBulkhead);
}
private Bulkhead createBulkhead(String tenantId) {
TenantLimits limits = tenantConfig.getLimits(tenantId);
BulkheadConfig config = BulkheadConfig.custom()
.maxConcurrentCalls(limits.getMaxConcurrentCalls())
.maxWaitDuration(Duration.ofMillis(limits.getMaxWaitMs()))
.build();
Bulkhead bulkhead = Bulkhead.of("tenant-" + tenantId, config);
// Monitor per-tenant usage
bulkhead.getEventPublisher()
.onCallRejected(event -> {
log.warn("Tenant {} bulkhead full, request rejected", tenantId);
meterRegistry.counter("tenant.bulkhead.rejected",
"tenant", tenantId).increment();
});
return bulkhead;
}
/**
* Update tenant limits dynamically
*/
public void updateLimits(String tenantId, TenantLimits newLimits) {
tenantBulkheads.compute(tenantId, (id, existing) -> {
BulkheadConfig config = BulkheadConfig.custom()
.maxConcurrentCalls(newLimits.getMaxConcurrentCalls())
.maxWaitDuration(Duration.ofMillis(newLimits.getMaxWaitMs()))
.build();
return Bulkhead.of("tenant-" + id, config);
});
}
/**
* Cleanup inactive tenant bulkheads
*/
@Scheduled(fixedRate = 3600000) // hourly
public void cleanupInactiveTenants() {
Set<String> activeTenants = tenantConfig.getActiveTenants();
tenantBulkheads.keySet().removeIf(id -> !activeTenants.contains(id));
}
}
@Service
public class TenantIsolatedService {
private final TenantBulkheadManager bulkheadManager;
public Result execute(String tenantId, Request request) {
Bulkhead bulkhead = bulkheadManager.getBulkhead(tenantId);
return Try.ofSupplier(
Bulkhead.decorateSupplier(bulkhead, () -> processRequest(request))
).recover(BulkheadFullException.class, e -> {
throw new TenantQuotaExceededException(
"Tenant " + tenantId + " has exceeded concurrent request limit"
);
}).get();
}
}
Example 4: Combined Bulkhead and Circuit Breaker
@Service
public class ResilientExternalService {
private final CircuitBreaker circuitBreaker;
private final ThreadPoolBulkhead bulkhead;
private final Retry retry;
private final TimeLimiter timeLimiter;
public ResilientExternalService(
CircuitBreakerRegistry cbRegistry,
ThreadPoolBulkheadRegistry bulkheadRegistry,
RetryRegistry retryRegistry,
TimeLimiterRegistry timeLimiterRegistry) {
this.circuitBreaker = cbRegistry.circuitBreaker("external");
this.bulkhead = bulkheadRegistry.bulkhead("external");
this.retry = retryRegistry.retry("external");
this.timeLimiter = timeLimiterRegistry.timeLimiter("external");
}
/**
* Full resilience stack:
* Retry -> CircuitBreaker -> Bulkhead -> TimeLimiter -> Call
*/
public CompletableFuture<Response> callExternalService(Request request) {
// Combine all patterns using Decorators
Supplier<CompletableFuture<Response>> supplier = () ->
bulkhead.executeCallable(() -> externalClient.call(request));
Callable<Response> timedCallable = TimeLimiter.decorateCallable(
timeLimiter,
() -> supplier.get().get()
);
Callable<Response> circuitBreakerCallable = CircuitBreaker.decorateCallable(
circuitBreaker,
timedCallable
);
Callable<Response> retryingCallable = Retry.decorateCallable(
retry,
circuitBreakerCallable
);
return CompletableFuture.supplyAsync(() -> {
try {
return retryingCallable.call();
} catch (Exception e) {
throw new CompletionException(e);
}
});
}
/**
* Cleaner approach using Decorators API
*/
public Response callWithDecorators(Request request) {
return Decorators.ofCallable(() -> externalClient.call(request).get())
.withThreadPoolBulkhead(bulkhead)
.withTimeLimiter(timeLimiter)
.withCircuitBreaker(circuitBreaker)
.withRetry(retry)
.withFallback(
List.of(BulkheadFullException.class, CallNotPermittedException.class),
e -> getFallbackResponse(request)
)
.call();
}
private Response getFallbackResponse(Request request) {
// Return cached or default response
return cachedResponses.getOrDefault(request.getKey(), Response.empty());
}
}
Example 5: Adaptive Bulkhead
public class AdaptiveBulkhead {
private final AtomicInteger maxConcurrent = new AtomicInteger(10);
private final AtomicInteger currentConcurrent = new AtomicInteger(0);
private final SlidingWindowCounter successCounter;
private final SlidingWindowCounter failureCounter;
// Limits
private final int minConcurrent;
private final int maxConcurrentLimit;
private final double targetSuccessRate;
public AdaptiveBulkhead(int minConcurrent, int maxConcurrentLimit, double targetSuccessRate) {
this.minConcurrent = minConcurrent;
this.maxConcurrentLimit = maxConcurrentLimit;
this.targetSuccessRate = targetSuccessRate;
this.successCounter = new SlidingWindowCounter(Duration.ofMinutes(1));
this.failureCounter = new SlidingWindowCounter(Duration.ofMinutes(1));
}
public <T> T execute(Supplier<T> operation) {
if (!tryAcquire()) {
throw new BulkheadFullException("Adaptive bulkhead at capacity");
}
try {
T result = operation.get();
recordSuccess();
return result;
} catch (Exception e) {
recordFailure();
throw e;
} finally {
release();
}
}
private boolean tryAcquire() {
while (true) {
int current = currentConcurrent.get();
int max = maxConcurrent.get();
if (current >= max) {
return false;
}
if (currentConcurrent.compareAndSet(current, current + 1)) {
return true;
}
}
}
private void release() {
currentConcurrent.decrementAndGet();
}
private void recordSuccess() {
successCounter.increment();
maybeAdjustLimit();
}
private void recordFailure() {
failureCounter.increment();
maybeAdjustLimit();
}
/**
* AIMD (Additive Increase, Multiplicative Decrease)
*/
@Scheduled(fixedRate = 5000)
public void maybeAdjustLimit() {
long successes = successCounter.getCount();
long failures = failureCounter.getCount();
long total = successes + failures;
if (total < 10) {
return; // Not enough data
}
double successRate = (double) successes / total;
int currentMax = maxConcurrent.get();
if (successRate >= targetSuccessRate) {
// Additive increase
int newMax = Math.min(currentMax + 1, maxConcurrentLimit);
maxConcurrent.set(newMax);
log.debug("Increased bulkhead limit to {}, success rate: {}",
newMax, successRate);
} else {
// Multiplicative decrease
int newMax = Math.max((int) (currentMax * 0.8), minConcurrent);
maxConcurrent.set(newMax);
log.warn("Decreased bulkhead limit to {}, success rate: {}",
newMax, successRate);
}
}
/**
* Get current metrics
*/
public BulkheadMetrics getMetrics() {
return new BulkheadMetrics(
currentConcurrent.get(),
maxConcurrent.get(),
successCounter.getCount(),
failureCounter.getCount()
);
}
}
Anti-Patterns
❌ Shared Thread Pool for All Services
// WRONG - one slow service blocks all
ExecutorService sharedPool = Executors.newFixedThreadPool(10);
serviceA.call(); // Uses shared pool
serviceB.call(); // Uses shared pool
// ✅ CORRECT - isolated pools per service
ThreadPoolBulkhead serviceAPool = ...;
ThreadPoolBulkhead serviceBPool = ...;
❌ Bulkhead Too Small
Under-provisioned bulkheads cause excessive rejections under normal load.