JA
Concurrency
Java core v1.0.0
Java Concurrency
Overview
This skill covers Java concurrency fundamentals, including traditional threading, the Executor framework, synchronization primitives, and modern Virtual Threads (Project Loom). Understanding concurrency is essential for building high-performance, scalable Java applications.
Key Concepts
Thread Lifecycle
┌──────────────────────────────────────────────────────────────┐
│ Thread Lifecycle │
├──────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ │
│ │ NEW │ ← Thread created, not started │
│ └────┬────┘ │
│ │ start() │
│ ▼ │
│ ┌──────────┐ │
│ │ RUNNABLE │ ← Running or ready to run │
│ └────┬─────┘ │
│ │ │
│ ┌────┴──────────────────────────────────────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────┐ ┌─────────────┐ ┌───────────────────┐ │
│ │ BLOCKED │ │ WAITING │ │ TIMED_WAITING │ │
│ └────┬────┘ └──────┬──────┘ └─────────┬─────────┘ │
│ │ │ │ │
│ └────────────────┴──────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────┐ │
│ │ TERMINATED │ │
│ └────────────┘ │
│ │
└──────────────────────────────────────────────────────────────┘
Memory Model (JMM)
The Java Memory Model defines how threads interact through memory:
- Visibility: Changes made by one thread may not be visible to others without synchronization
- Ordering: Compiler/CPU may reorder instructions
- Atomicity: Operations that complete as a single unit
Happens-Before Relationship:
- An unlock of a monitor happens-before every subsequent lock of that same monitor
- A write to a volatile field happens-before every subsequent read of that same field
- A call to Thread.start() happens-before any action in the started thread
- All actions in a thread happen-before another thread successfully returns from Thread.join() on that thread
Best Practices
1. Prefer High-Level Concurrency Utilities
Use java.util.concurrent over raw threads and synchronized blocks.
2. Use Immutability
Immutable objects are inherently thread-safe.
3. Minimize Synchronization Scope
Lock only what’s necessary, and hold each lock for as short a time as possible.
4. Avoid Nested Locks
Nested locks risk deadlock; use a consistent lock ordering.
5. Prefer Virtual Threads for I/O-Bound Work
Virtual threads (Java 21+) are ideal for high-concurrency I/O operations.
Code Examples
Example 1: ExecutorService with Proper Shutdown
/**
 * Runs a batch of tasks on a bounded pool of platform threads and always shuts the pool
 * down afterwards, even when tasks fail, time out, or the caller is interrupted.
 */
public class ExecutorServiceExample {

    /**
     * Submits every task to a fixed-size pool (one thread per available processor) and
     * waits up to 30 seconds for each of them, in submission order.
     *
     * <p>A task that times out is cancelled and a task that fails is logged. If the calling
     * thread is interrupted while waiting, all tasks are cancelled, the interrupt flag is
     * restored, and the method returns early. The pool is shut down in every case.
     *
     * @param tasks the tasks to run; must not be {@code null}
     */
    public void processTasksWithExecutor(List<Runnable> tasks) {
        // try-with-resources (Java 19+) would also close the executor; the lifecycle is
        // managed manually here so that shutdown can be bounded by a timeout.
        ExecutorService executor = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );
        try {
            // Submit all tasks. The explicit type witness keeps the element type Future<?>.
            List<Future<?>> futures = tasks.stream()
                    .<Future<?>>map(executor::submit)
                    .toList();

            // Wait for all to complete
            for (Future<?> future : futures) {
                try {
                    future.get(30, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    future.cancel(true);
                    log.warn("Task timed out and was cancelled");
                } catch (ExecutionException e) {
                    log.error("Task failed", e.getCause());
                } catch (InterruptedException e) {
                    // The caller asked us to stop: abandon the remaining work, keep the
                    // interrupt visible to the caller, and let finally shut the pool down.
                    futures.forEach(pending -> pending.cancel(true));
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        } finally {
            shutdownExecutor(executor);
        }
    }

    /**
     * Two-phase shutdown: stop accepting work and wait up to 60 seconds, then interrupt
     * running tasks and wait up to another 60 seconds before giving up.
     */
    private void shutdownExecutor(ExecutorService executor) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    log.error("Executor did not terminate");
                }
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: force shutdown and preserve the interrupt status.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
Example 2: CompletableFuture Composition
/**
 * Composes independent asynchronous lookups into a single dashboard result.
 */
public class CompletableFutureExample {
    private final UserService userService;
    private final OrderService orderService;
    private final RecommendationService recommendationService;

    /**
     * Loads the user, their orders, and their recommendations in parallel and combines them.
     *
     * <p>Each lookup is started with {@code supplyAsync} and no explicit executor, so it
     * runs on {@code CompletableFuture}'s default asynchronous executor.
     *
     * @param userId the user whose dashboard is loaded
     * @return a future completing with the combined dashboard, or with
     *         {@code UserDashboard.empty(userId)} if any stage fails
     */
    public CompletableFuture<UserDashboard> loadDashboardAsync(String userId) {
        // Execute independent operations in parallel
        CompletableFuture<User> userFuture = CompletableFuture
                .supplyAsync(() -> userService.findById(userId));
        CompletableFuture<List<Order>> ordersFuture = CompletableFuture
                .supplyAsync(() -> orderService.findByUserId(userId));
        CompletableFuture<List<Product>> recommendationsFuture = CompletableFuture
                .supplyAsync(() -> recommendationService.getRecommendations(userId));

        // Combine results
        return userFuture
                .thenCombine(ordersFuture, (user, orders) ->
                        new UserDashboard.Builder()
                                .user(user)
                                .orders(orders)
                                .build())
                .thenCombine(recommendationsFuture, (dashboard, recommendations) ->
                        dashboard.withRecommendations(recommendations))
                .exceptionally(ex -> {
                    log.error("Failed to load dashboard", ex);
                    return UserDashboard.empty(userId);
                });
    }

    /**
     * Loads the dashboard with a 5-second timeout, falling back to a cached copy on timeout.
     *
     * @param userId the user whose dashboard is loaded
     * @return a future completing with the live dashboard, or the cached dashboard if the
     *         load timed out; any other failure is propagated as a {@code CompletionException}
     */
    public CompletableFuture<UserDashboard> loadDashboardWithTimeout(String userId) {
        return loadDashboardAsync(userId)
                .orTimeout(5, TimeUnit.SECONDS)
                .exceptionally(ex -> {
                    // orTimeout returns the same future and completes it with a bare
                    // TimeoutException, so no CompletionException wrapper is present here.
                    // A dependent stage would see the wrapper, so unwrap only if it exists.
                    Throwable cause = ex;
                    if (ex instanceof CompletionException && ex.getCause() != null) {
                        cause = ex.getCause();
                    }
                    if (cause instanceof TimeoutException) {
                        log.warn("Dashboard load timed out for user: {}", userId);
                        return loadCachedDashboard(userId);
                    }
                    // Propagate other failures without double-wrapping.
                    if (ex instanceof CompletionException alreadyWrapped) {
                        throw alreadyWrapped;
                    }
                    throw new CompletionException(ex);
                });
    }
}
Example 3: Virtual Threads (Java 21+)
/**
 * Shows the main ways to use virtual threads: direct creation, a per-task executor,
 * structured concurrency, and choosing between virtual and platform threads.
 */
public class VirtualThreadsExample {

    /** Starts a single virtual thread that prints whether it is virtual. */
    public void basicVirtualThread() {
        Thread.startVirtualThread(() -> {
            System.out.println("Running in virtual thread: " +
                    Thread.currentThread().isVirtual());
        });
    }

    /**
     * Fetches every URL concurrently, one virtual thread per request.
     *
     * @param urls the endpoints to fetch
     * @return the responses, in the same order as {@code urls}
     * @throws IllegalStateException if a request fails or the caller is interrupted
     */
    public List<ApiResponse> fetchAllEndpoints(List<String> urls) {
        // Closing the executor (try-with-resources) waits for submitted tasks to finish.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<ApiResponse>> futures = urls.stream()
                    .map(url -> executor.submit(() -> fetchUrl(url)))
                    .toList();
            return futures.stream()
                    .map(this::getResult)
                    .toList();
        }
    }

    /**
     * Loads the three parts of a user profile as sibling subtasks.
     * Structured concurrency is a preview API in Java 21.
     *
     * @param userId the user whose profile is loaded
     * @return the assembled profile
     * @throws Exception if the caller is interrupted or any subtask fails
     */
    public UserProfile loadUserProfile(String userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Subtask<User> userTask = scope.fork(() -> userService.findById(userId));
            Subtask<Settings> settingsTask = scope.fork(() -> settingsService.getSettings(userId));
            Subtask<Avatar> avatarTask = scope.fork(() -> avatarService.getAvatar(userId));

            scope.join();          // Wait for all
            scope.throwIfFailed(); // Propagate errors

            return new UserProfile(
                    userTask.get(),
                    settingsTask.get(),
                    avatarTask.get()
            );
        }
    }

    /**
     * Picks an executor suited to the workload. Every call creates a new executor that the
     * caller is responsible for shutting down.
     *
     * @param taskType the kind of work to be executed
     * @return a newly created executor for that kind of work
     */
    public ExecutorService chooseExecutor(TaskType taskType) {
        return switch (taskType) {
            // Virtual threads for I/O-bound, high concurrency
            case IO_BOUND -> Executors.newVirtualThreadPerTaskExecutor();
            // Platform threads for CPU-bound, compute intensive
            case CPU_BOUND -> Executors.newFixedThreadPool(
                    Runtime.getRuntime().availableProcessors()
            );
            // Cached for mixed workloads with bursty traffic
            case MIXED -> Executors.newCachedThreadPool();
        };
    }

    /** Performs the blocking HTTP call; blocking I/O is what virtual threads are good at. */
    private ApiResponse fetchUrl(String url) {
        return httpClient.get(url);
    }

    /**
     * Waits for one fetch to finish and unwraps its result, converting the checked
     * exceptions of {@code Future.get()} so it can be used as a stream mapping function.
     */
    private ApiResponse getResult(Future<ApiResponse> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore flag for the caller
            throw new IllegalStateException("Interrupted while waiting for a response", e);
        } catch (ExecutionException e) {
            // Surface the task's own failure rather than the ExecutionException wrapper.
            throw new IllegalStateException("Request failed", e.getCause());
        }
    }
}
Example 4: Thread-Safe Data Structures
/**
 * Tour of the thread-safe collections and counters in java.util.concurrent:
 * a concurrent cache, atomic counters, a bounded work queue, and a copy-on-write
 * listener list.
 */
public class ThreadSafeDataStructures {

    // ---- ConcurrentHashMap as a cache ----
    private final ConcurrentHashMap<String, User> userCache = new ConcurrentHashMap<>();

    /** Returns the cached user for the id, loading it through the repository on a miss. */
    public User getOrLoadUser(String userId) {
        return userCache.computeIfAbsent(userId, this::loadUser);
    }

    // Mapping function for the cache; a null result means no entry is stored.
    private User loadUser(String id) {
        log.info("Loading user from database: {}", id);
        return userRepository.findById(id).orElse(null);
    }

    // ---- Atomic counters ----
    private final AtomicLong requestCounter = new AtomicLong(0);
    private final LongAdder highContentionCounter = new LongAdder();

    /** Bumps both counters once per request. */
    public void recordRequest() {
        requestCounter.incrementAndGet();    // Suits moderate contention
        highContentionCounter.increment();   // Scales better under high contention
    }

    // ---- Bounded BlockingQueue for producer/consumer ----
    private final BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(1000);

    /**
     * Enqueues a task, waiting up to 5 seconds for space in the queue.
     *
     * @throws QueueFullException if no space became available in time
     * @throws InterruptedException if interrupted while waiting
     */
    public void produce(Task task) throws InterruptedException {
        if (taskQueue.offer(task, 5, TimeUnit.SECONDS)) {
            return;
        }
        throw new QueueFullException("Task queue is full");
    }

    /** Processes queued tasks until the current thread is interrupted. */
    public void consume() {
        Thread worker = Thread.currentThread();
        try {
            while (!worker.isInterrupted()) {
                Task next = taskQueue.poll(1, TimeUnit.SECONDS);
                if (next == null) {
                    continue; // Nothing arrived within the poll window; re-check the flag
                }
                processTask(next);
            }
        } catch (InterruptedException e) {
            worker.interrupt(); // Keep the interrupt visible after leaving the loop
        }
    }

    // ---- CopyOnWriteArrayList for read-heavy listener lists ----
    private final CopyOnWriteArrayList<EventListener> listeners = new CopyOnWriteArrayList<>();

    /** Registers a listener for future events. */
    public void addListener(EventListener listener) {
        listeners.add(listener);
    }

    /** Delivers the event to every listener; traversal tolerates concurrent modification. */
    public void notifyListeners(Event event) {
        listeners.forEach(registered -> registered.onEvent(event));
    }
}
Example 5: Synchronization Primitives
/**
 * Examples of explicit synchronization primitives: timed lock acquisition, read/write
 * locking, optimistic reads, one-shot latches, and semaphore-bounded concurrency.
 */
public class SynchronizationPrimitives {

    // ReentrantLock with try-lock pattern
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    /**
     * Runs the critical operation if the lock can be acquired within 5 seconds.
     *
     * <p>If the thread is interrupted, the interrupt flag is restored and the method
     * returns. The lock is released only if it was acquired.
     *
     * @throws LockTimeoutException if the lock could not be acquired in time
     */
    public void lockWithTimeout() {
        boolean acquired = false;
        try {
            acquired = lock.tryLock(5, TimeUnit.SECONDS);
            if (acquired) {
                // Critical section
                performCriticalOperation();
            } else {
                throw new LockTimeoutException("Could not acquire lock");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Unlocking a lock we do not hold would throw, hence the flag.
            if (acquired) {
                lock.unlock();
            }
        }
    }

    // ReadWriteLock for read-heavy workloads
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    // Guarded by rwLock. Final so the guarded map can never be swapped for another one.
    private final Map<String, String> cache = new HashMap<>();

    /** Returns the cached value for the key (or {@code null}) under the shared read lock. */
    public String read(String key) {
        rwLock.readLock().lock();
        try {
            return cache.get(key);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    /** Stores the value for the key under the exclusive write lock. */
    public void write(String key, String value) {
        rwLock.writeLock().lock();
        try {
            cache.put(key, value);
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    // StampedLock for optimistic reads
    private final StampedLock stampedLock = new StampedLock();
    private double x, y;

    /**
     * Computes the distance of the point (x, y) from the origin, reading the coordinates
     * optimistically and falling back to a read lock if that read cannot be validated.
     */
    public double distanceFromOrigin() {
        // Try optimistic read first: copy the fields, then check the stamp is still valid.
        long stamp = stampedLock.tryOptimisticRead();
        double currentX = x, currentY = y;
        if (!stampedLock.validate(stamp)) {
            // Fall back to read lock and re-read both fields consistently.
            stamp = stampedLock.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    /**
     * Initializes every service on its own virtual thread and waits up to 30 seconds for
     * all of them to finish.
     *
     * <p>The latch is counted down in a {@code finally} block, so this method verifies
     * that every initializer finished, not that it succeeded.
     *
     * @throws InitializationException if not all initializers finished in time
     * @throws InterruptedException if interrupted while waiting
     */
    public void waitForDependencies(List<Service> services) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(services.size());
        for (Service service : services) {
            Thread.startVirtualThread(() -> {
                try {
                    service.initialize();
                } finally {
                    latch.countDown();
                }
            });
        }
        boolean completed = latch.await(30, TimeUnit.SECONDS);
        if (!completed) {
            throw new InitializationException("Services did not initialize in time");
        }
    }

    // Semaphore for limiting concurrency: at most 10 requests are processed at once.
    // It bounds simultaneous work, not requests per unit of time.
    private final Semaphore rateLimiter = new Semaphore(10);

    /**
     * Processes the request once a permit is available, releasing the permit afterwards.
     *
     * @throws InterruptedException if interrupted while waiting for a permit
     */
    public Result processWithRateLimit(Request request) throws InterruptedException {
        rateLimiter.acquire();
        try {
            return process(request);
        } finally {
            rateLimiter.release();
        }
    }
}
Anti-Patterns
❌ Double-Checked Locking Without Volatile
// WRONG - broken without volatile: the first null check below runs outside the
// synchronized block, so nothing orders it after the write to `instance`.
private static Singleton instance;
public static Singleton getInstance() {
if (instance == null) { // Unsynchronized fast-path read
synchronized (Singleton.class) {
if (instance == null) { // Re-check under the lock so only one thread constructs
instance = new Singleton(); // May see partially constructed object
}
}
}
return instance;
}
// ✅ CORRECT - with volatile: the volatile write happens-before every later read
// of the field, so readers never observe a partially constructed Singleton.
private static volatile Singleton instance;
❌ Synchronizing on Non-Final Field
// WRONG - the monitor belongs to the object, not the field: if `lock` is reassigned,
// threads can synchronize on different objects and enter the block at the same time.
private Object lock = new Object();
public void unsafeMethod() {
synchronized (lock) { // lock reference could change!
// ...
}
}
// ✅ CORRECT - a final field always refers to the same monitor object
private final Object lock = new Object();
❌ Catching InterruptedException Without Restoring
// WRONG - Thread.sleep clears the interrupt status when it throws
// InterruptedException; logging alone hides the interruption from callers.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Swallowing interrupt flag
log.error("Interrupted", e);
}
// ✅ CORRECT - re-assert the interrupt, then abort the current operation
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore flag
throw new RuntimeException("Operation interrupted", e);
}
Testing Strategies
Testing with ExecutorService
@Test
void shouldProcessTasksConcurrently() throws Exception {
    // One worker thread keeps execution order deterministic for the test.
    ExecutorService singleThread = Executors.newSingleThreadExecutor();
    try {
        Future<String> pending = singleThread.submit(() -> "result");
        // Bound the wait so a hung task fails the test instead of blocking it.
        String actual = pending.get(1, TimeUnit.SECONDS);
        assertThat(actual).isEqualTo("result");
    } finally {
        singleThread.shutdown();
    }
}
// Testing race conditions with CountDownLatch
@Test
void shouldHandleConcurrentUpdates() throws Exception {
    ConcurrentCounter counter = new ConcurrentCounter();
    int threadCount = 100;
    // startLatch holds every worker at the gate; doneLatch reports when all have finished.
    CountDownLatch startLatch = new CountDownLatch(1);
    CountDownLatch doneLatch = new CountDownLatch(threadCount);

    for (int i = 0; i < threadCount; i++) {
        Thread.startVirtualThread(() -> {
            try {
                startLatch.await();
                counter.increment();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                doneLatch.countDown();
            }
        });
    }

    startLatch.countDown(); // Start all threads simultaneously

    // Fail on the timeout itself: if workers are still running, a counter mismatch
    // below would point at the wrong problem.
    assertThat(doneLatch.await(10, TimeUnit.SECONDS)).isTrue();
    assertThat(counter.getValue()).isEqualTo(threadCount);
}