Skip to content
Home / Skills / Java / Concurrency
JA

Concurrency

Java core v1.0.0

Java Concurrency

Overview

This skill covers Java concurrency fundamentals, including traditional threading, the Executor framework, synchronization primitives, and modern Virtual Threads (Project Loom). Understanding concurrency is essential for building high-performance, scalable Java applications.


Key Concepts

Thread Lifecycle

┌──────────────────────────────────────────────────────────────┐
│                    Thread Lifecycle                           │
├──────────────────────────────────────────────────────────────┤
│                                                               │
│    ┌─────────┐                                               │
│    │   NEW   │ ← Thread created, not started                 │
│    └────┬────┘                                               │
│         │ start()                                            │
│         ▼                                                    │
│    ┌──────────┐                                              │
│    │ RUNNABLE │ ← Running or ready to run                    │
│    └────┬─────┘                                              │
│         │                                                    │
│    ┌────┴──────────────────────────────────────┐            │
│    │                                            │            │
│    ▼                                            ▼            │
│ ┌─────────┐    ┌─────────────┐    ┌───────────────────┐     │
│ │ BLOCKED │    │   WAITING   │    │  TIMED_WAITING    │     │
│ └────┬────┘    └──────┬──────┘    └─────────┬─────────┘     │
│      │                │                      │               │
│      └────────────────┴──────────────────────┘               │
│                       │                                      │
│                       ▼                                      │
│                 ┌────────────┐                               │
│                 │ TERMINATED │                               │
│                 └────────────┘                               │
│                                                               │
└──────────────────────────────────────────────────────────────┘

Memory Model (JMM)

The Java Memory Model defines how threads interact through memory:

  • Visibility: Changes made by one thread may not be visible to others without synchronization
  • Ordering: Compiler/CPU may reorder instructions
  • Atomicity: Operations that complete as a single unit

Happens-Before Relationship:

  • Unlock of a monitor happens-before every subsequent lock
  • Write to volatile happens-before every subsequent read
  • Thread.start() happens-before any action in started thread
  • All actions in a thread happen-before Thread.join() returns

Best Practices

1. Prefer High-Level Concurrency Utilities

Use java.util.concurrent over raw threads and synchronized blocks.

2. Use Immutability

Immutable objects are inherently thread-safe.

3. Minimize Synchronization Scope

Lock only what’s necessary, for as short as possible.

4. Avoid Nested Locks

Nested locks risk deadlock; use a consistent lock ordering.

5. Prefer Virtual Threads for I/O-Bound Work

Virtual threads (Java 21+) are ideal for high-concurrency I/O operations.


Code Examples

Example 1: ExecutorService with Proper Shutdown

public class ExecutorServiceExample {
    
    public void processTasksWithExecutor(List<Runnable> tasks) {
        // Use try-with-resources for auto-close (Java 19+)
        // Or manually manage lifecycle
        ExecutorService executor = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors()
        );
        
        try {
            // Submit all tasks
            List<Future<?>> futures = tasks.stream()
                .map(executor::submit)
                .toList();
            
            // Wait for all to complete
            for (Future<?> future : futures) {
                try {
                    future.get(30, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    future.cancel(true);
                    log.warn("Task timed out and was cancelled");
                } catch (ExecutionException e) {
                    log.error("Task failed", e.getCause());
                }
            }
        } finally {
            shutdownExecutor(executor);
        }
    }
    
    private void shutdownExecutor(ExecutorService executor) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    log.error("Executor did not terminate");
                }
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

Example 2: CompletableFuture Composition

public class CompletableFutureExample {
    
    private final UserService userService;
    private final OrderService orderService;
    private final RecommendationService recommendationService;
    
    public CompletableFuture<UserDashboard> loadDashboardAsync(String userId) {
        // Execute independent operations in parallel
        CompletableFuture<User> userFuture = CompletableFuture
            .supplyAsync(() -> userService.findById(userId));
        
        CompletableFuture<List<Order>> ordersFuture = CompletableFuture
            .supplyAsync(() -> orderService.findByUserId(userId));
        
        CompletableFuture<List<Product>> recommendationsFuture = CompletableFuture
            .supplyAsync(() -> recommendationService.getRecommendations(userId));
        
        // Combine results
        return userFuture
            .thenCombine(ordersFuture, (user, orders) -> 
                new UserDashboard.Builder()
                    .user(user)
                    .orders(orders)
                    .build())
            .thenCombine(recommendationsFuture, (dashboard, recommendations) ->
                dashboard.withRecommendations(recommendations))
            .exceptionally(ex -> {
                log.error("Failed to load dashboard", ex);
                return UserDashboard.empty(userId);
            });
    }
    
    // With timeout and fallback
    public CompletableFuture<UserDashboard> loadDashboardWithTimeout(String userId) {
        return loadDashboardAsync(userId)
            .orTimeout(5, TimeUnit.SECONDS)
            .exceptionally(ex -> {
                if (ex.getCause() instanceof TimeoutException) {
                    log.warn("Dashboard load timed out for user: {}", userId);
                    return loadCachedDashboard(userId);
                }
                throw new CompletionException(ex);
            });
    }
}

Example 3: Virtual Threads (Java 21+)

public class VirtualThreadsExample {
    
    // Creating virtual threads directly
    public void basicVirtualThread() {
        Thread.startVirtualThread(() -> {
            System.out.println("Running in virtual thread: " + 
                Thread.currentThread().isVirtual());
        });
    }
    
    // Virtual thread executor for I/O-bound tasks
    public List<ApiResponse> fetchAllEndpoints(List<String> urls) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<ApiResponse>> futures = urls.stream()
                .map(url -> executor.submit(() -> fetchUrl(url)))
                .toList();
            
            return futures.stream()
                .map(this::getResult)
                .toList();
        }
    }
    
    // Structured concurrency (Preview in Java 21)
    public UserProfile loadUserProfile(String userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            
            Subtask<User> userTask = scope.fork(() -> userService.findById(userId));
            Subtask<Settings> settingsTask = scope.fork(() -> settingsService.getSettings(userId));
            Subtask<Avatar> avatarTask = scope.fork(() -> avatarService.getAvatar(userId));
            
            scope.join();           // Wait for all
            scope.throwIfFailed();  // Propagate errors
            
            return new UserProfile(
                userTask.get(),
                settingsTask.get(),
                avatarTask.get()
            );
        }
    }
    
    // Platform threads vs Virtual threads decision
    public ExecutorService chooseExecutor(TaskType taskType) {
        return switch (taskType) {
            // Virtual threads for I/O-bound, high concurrency
            case IO_BOUND -> Executors.newVirtualThreadPerTaskExecutor();
            
            // Platform threads for CPU-bound, compute intensive
            case CPU_BOUND -> Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
            );
            
            // Cached for mixed workloads with bursty traffic
            case MIXED -> Executors.newCachedThreadPool();
        };
    }
    
    private ApiResponse fetchUrl(String url) {
        // I/O operation - perfect for virtual threads
        return httpClient.get(url);
    }
}

Example 4: Thread-Safe Data Structures

public class ThreadSafeDataStructures {
    
    // ConcurrentHashMap for thread-safe caching
    private final ConcurrentHashMap<String, User> userCache = new ConcurrentHashMap<>();
    
    public User getOrLoadUser(String userId) {
        return userCache.computeIfAbsent(userId, id -> {
            log.info("Loading user from database: {}", id);
            return userRepository.findById(id).orElse(null);
        });
    }
    
    // Atomic operations
    private final AtomicLong requestCounter = new AtomicLong(0);
    private final LongAdder highContentionCounter = new LongAdder();
    
    public void recordRequest() {
        // For moderate contention
        requestCounter.incrementAndGet();
        
        // For high contention (better scalability)
        highContentionCounter.increment();
    }
    
    // BlockingQueue for producer-consumer
    private final BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(1000);
    
    public void produce(Task task) throws InterruptedException {
        // Blocks if queue is full
        boolean added = taskQueue.offer(task, 5, TimeUnit.SECONDS);
        if (!added) {
            throw new QueueFullException("Task queue is full");
        }
    }
    
    public void consume() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Task task = taskQueue.poll(1, TimeUnit.SECONDS);
                if (task != null) {
                    processTask(task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    
    // Copy-on-write for read-heavy workloads
    private final CopyOnWriteArrayList<EventListener> listeners = new CopyOnWriteArrayList<>();
    
    public void addListener(EventListener listener) {
        listeners.add(listener);
    }
    
    public void notifyListeners(Event event) {
        // Safe iteration even with concurrent modifications
        for (EventListener listener : listeners) {
            listener.onEvent(event);
        }
    }
}

Example 5: Synchronization Primitives

public class SynchronizationPrimitives {
    
    // ReentrantLock with try-lock pattern
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    
    public void lockWithTimeout() {
        boolean acquired = false;
        try {
            acquired = lock.tryLock(5, TimeUnit.SECONDS);
            if (acquired) {
                // Critical section
                performCriticalOperation();
            } else {
                throw new LockTimeoutException("Could not acquire lock");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (acquired) {
                lock.unlock();
            }
        }
    }
    
    // ReadWriteLock for read-heavy workloads
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private Map<String, String> cache = new HashMap<>();
    
    public String read(String key) {
        rwLock.readLock().lock();
        try {
            return cache.get(key);
        } finally {
            rwLock.readLock().unlock();
        }
    }
    
    public void write(String key, String value) {
        rwLock.writeLock().lock();
        try {
            cache.put(key, value);
        } finally {
            rwLock.writeLock().unlock();
        }
    }
    
    // StampedLock for optimistic reads
    private final StampedLock stampedLock = new StampedLock();
    private double x, y;
    
    public double distanceFromOrigin() {
        // Try optimistic read first
        long stamp = stampedLock.tryOptimisticRead();
        double currentX = x, currentY = y;
        
        if (!stampedLock.validate(stamp)) {
            // Fall back to read lock
            stamp = stampedLock.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp);
            }
        }
        
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
    
    // CountDownLatch for one-time synchronization
    public void waitForDependencies(List<Service> services) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(services.size());
        
        for (Service service : services) {
            Thread.startVirtualThread(() -> {
                try {
                    service.initialize();
                } finally {
                    latch.countDown();
                }
            });
        }
        
        boolean completed = latch.await(30, TimeUnit.SECONDS);
        if (!completed) {
            throw new InitializationException("Services did not initialize in time");
        }
    }
    
    // Semaphore for rate limiting
    private final Semaphore rateLimiter = new Semaphore(10); // 10 concurrent
    
    public Result processWithRateLimit(Request request) throws InterruptedException {
        rateLimiter.acquire();
        try {
            return process(request);
        } finally {
            rateLimiter.release();
        }
    }
}

Anti-Patterns

❌ Double-Checked Locking Without Volatile

// WRONG - broken without volatile
private static Singleton instance;

public static Singleton getInstance() {
    if (instance == null) {
        synchronized (Singleton.class) {
            if (instance == null) {
                instance = new Singleton(); // May see partially constructed object
            }
        }
    }
    return instance;
}

// ✅ CORRECT - with volatile
private static volatile Singleton instance;

❌ Synchronizing on Non-Final Field

// WRONG
private Object lock = new Object();

public void unsafeMethod() {
    synchronized (lock) { // lock reference could change!
        // ...
    }
}

// ✅ CORRECT
private final Object lock = new Object();

❌ Catching InterruptedException Without Restoring

// WRONG
try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    // Swallowing interrupt flag
    log.error("Interrupted", e);
}

// ✅ CORRECT
try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // Restore flag
    throw new RuntimeException("Operation interrupted", e);
}

Testing Strategies

Testing with ExecutorService

@Test
void shouldProcessTasksConcurrently() throws Exception {
    // Use a single-threaded executor for deterministic tests
    ExecutorService executor = Executors.newSingleThreadExecutor();
    
    try {
        Future<String> future = executor.submit(() -> "result");
        assertThat(future.get(1, TimeUnit.SECONDS)).isEqualTo("result");
    } finally {
        executor.shutdown();
    }
}

// Testing race conditions with CountDownLatch
@Test
void shouldHandleConcurrentUpdates() throws Exception {
    ConcurrentCounter counter = new ConcurrentCounter();
    int threadCount = 100;
    CountDownLatch startLatch = new CountDownLatch(1);
    CountDownLatch doneLatch = new CountDownLatch(threadCount);
    
    for (int i = 0; i < threadCount; i++) {
        Thread.startVirtualThread(() -> {
            try {
                startLatch.await();
                counter.increment();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                doneLatch.countDown();
            }
        });
    }
    
    startLatch.countDown(); // Start all threads simultaneously
    doneLatch.await(10, TimeUnit.SECONDS);
    
    assertThat(counter.getValue()).isEqualTo(threadCount);
}

References