data-driven-docs

Living documentation for evolving technologies

View on GitHub

Concurrency Utilities

Advanced Java 5+ Est. Time: 25min


Table of Contents


Overview

Java provides high-level synchronization utilities in java.util.concurrent for coordinating threads.

Main Utilities:

Advantages over wait/notify:

Back to top


CountDownLatch

Purpose: One or more threads wait until N operations (counted down) complete.

Key Characteristics:

Basic Usage

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int numWorkers = 3;
        CountDownLatch latch = new CountDownLatch(numWorkers);

        // Start worker threads
        for (int i = 0; i < numWorkers; i++) {
            int workerId = i;
            new Thread(() -> {
                System.out.println("Worker " + workerId + " starting");
                doWork();
                System.out.println("Worker " + workerId + " finished");
                latch.countDown();  // Decrement count
            }).start();
        }

        // Wait for all workers to finish
        latch.await();  // Blocks until count reaches 0
        System.out.println("All workers finished!");
    }

    static void doWork() {
        try { Thread.sleep(1000); } catch (InterruptedException e) {}
    }
}

Output:

Worker 0 starting
Worker 1 starting
Worker 2 starting
Worker 0 finished
Worker 1 finished
Worker 2 finished
All workers finished!

Use Case: Service Startup

public class ApplicationStartup {
    private static final int NUM_SERVICES = 5;
    private final CountDownLatch latch = new CountDownLatch(NUM_SERVICES);

    public void startServices() throws InterruptedException {
        startService("Database", latch);
        startService("Cache", latch);
        startService("MessageQueue", latch);
        startService("WebServer", latch);
        startService("Scheduler", latch);

        // Wait for all services to start
        latch.await();
        System.out.println("All services started. Application ready!");
    }

    private void startService(String name, CountDownLatch latch) {
        new Thread(() -> {
            System.out.println("Starting " + name + "...");
            // Initialize service
            try { Thread.sleep(2000); } catch (InterruptedException e) {}
            System.out.println(name + " started");
            latch.countDown();
        }).start();
    }
}

Key Methods

CountDownLatch latch = new CountDownLatch(5);

// Decrement count (called by worker threads)
latch.countDown();

// Wait indefinitely for count to reach 0
latch.await();

// Wait with timeout
boolean done = latch.await(10, TimeUnit.SECONDS);
if (!done) {
    System.out.println("Timeout! Not all workers finished");
}

// Get current count
long current = latch.getCount();

Back to top


CyclicBarrier

Purpose: N threads wait for each other at a synchronization point, then all proceed together.

Key Characteristics:

Basic Usage

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int numThreads = 3;
        CyclicBarrier barrier = new CyclicBarrier(numThreads, () -> {
            System.out.println("All threads reached barrier. Proceeding...");
        });

        for (int i = 0; i < numThreads; i++) {
            int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("Thread " + threadId + " doing part 1");
                    Thread.sleep(1000 * threadId);

                    barrier.await();  // Wait for all threads

                    System.out.println("Thread " + threadId + " doing part 2");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Output:

Thread 0 doing part 1
Thread 1 doing part 1
Thread 2 doing part 1
(all wait at barrier)
All threads reached barrier. Proceeding...
Thread 0 doing part 2
Thread 1 doing part 2
Thread 2 doing part 2

Use Case: Parallel Matrix Processing

public class ParallelMatrixProcessor {
    private final int numThreads = 4;
    private final CyclicBarrier barrier;
    private double[][] matrix;

    public ParallelMatrixProcessor() {
        barrier = new CyclicBarrier(numThreads, () -> {
            System.out.println("Phase complete. Matrix normalized.");
        });
    }

    public void processMatrix(double[][] data) {
        this.matrix = data;
        int rowsPerThread = matrix.length / numThreads;

        for (int i = 0; i < numThreads; i++) {
            int startRow = i * rowsPerThread;
            int endRow = (i == numThreads - 1) ? matrix.length : (i + 1) * rowsPerThread;

            new Thread(() -> processRows(startRow, endRow)).start();
        }
    }

    private void processRows(int startRow, int endRow) {
        try {
            // Phase 1: Multiply by 2
            for (int i = startRow; i < endRow; i++) {
                for (int j = 0; j < matrix[i].length; j++) {
                    matrix[i][j] *= 2;
                }
            }
            barrier.await();  // Synchronize after phase 1

            // Phase 2: Add 10
            for (int i = startRow; i < endRow; i++) {
                for (int j = 0; j < matrix[i].length; j++) {
                    matrix[i][j] += 10;
                }
            }
            barrier.await();  // Synchronize after phase 2

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Key Methods

CyclicBarrier barrier = new CyclicBarrier(3);

// Wait at barrier (blocks until all threads arrive)
barrier.await();

// Wait with timeout
barrier.await(5, TimeUnit.SECONDS);

// Get number of waiting threads
int waiting = barrier.getNumberWaiting();

// Get required number of parties
int parties = barrier.getParties();

// Reset the barrier (releases all waiting threads with BrokenBarrierException)
barrier.reset();

// Check if barrier is broken
boolean broken = barrier.isBroken();

Back to top


Semaphore

Purpose: Control access to a resource pool with N permits.

Key Characteristics:

Basic Usage

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    // Allow max 3 threads to access resource simultaneously
    private static final Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            int threadId = i;
            new Thread(() -> accessResource(threadId)).start();
        }
    }

    static void accessResource(int threadId) {
        try {
            semaphore.acquire();  // Acquire permit (blocks if none available)
            System.out.println("Thread " + threadId + " accessing resource");
            Thread.sleep(2000);  // Simulate resource usage
            System.out.println("Thread " + threadId + " releasing resource");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();  // Release permit
        }
    }
}

Output: Only 3 threads access resource at a time.

Use Case: Database Connection Pool

public class ConnectionPool {
    private final Semaphore semaphore;
    private final List<Connection> connections = new ArrayList<>();

    public ConnectionPool(int poolSize) {
        semaphore = new Semaphore(poolSize);
        for (int i = 0; i < poolSize; i++) {
            connections.add(createConnection());
        }
    }

    public Connection getConnection() throws InterruptedException {
        semaphore.acquire();  // Wait for available connection
        return getAvailableConnection();
    }

    public void releaseConnection(Connection conn) {
        returnConnection(conn);
        semaphore.release();  // Make connection available
    }

    // Try to get connection without blocking
    public Connection tryGetConnection() {
        if (semaphore.tryAcquire()) {
            return getAvailableConnection();
        }
        return null;  // No connection available
    }

    // Try with timeout
    public Connection tryGetConnection(long timeout, TimeUnit unit)
            throws InterruptedException {
        if (semaphore.tryAcquire(timeout, unit)) {
            return getAvailableConnection();
        }
        return null;
    }
}

Fair vs Unfair Semaphore

// Unfair (default) - better performance
Semaphore unfair = new Semaphore(5);

// Fair - FIFO ordering (prevents starvation)
Semaphore fair = new Semaphore(5, true);

Key Methods

Semaphore semaphore = new Semaphore(5);

// Acquire one permit
semaphore.acquire();

// Acquire N permits
semaphore.acquire(3);

// Release permit
semaphore.release();

// Try to acquire without blocking
boolean acquired = semaphore.tryAcquire();

// Try with timeout
acquired = semaphore.tryAcquire(10, TimeUnit.SECONDS);

// Get available permits
int available = semaphore.availablePermits();

// Check if fair
boolean fair = semaphore.isFair();

Back to top


Phaser

Purpose: Flexible multi-phase synchronization (Java 7+). Combines features of CountDownLatch and CyclicBarrier with dynamic thread registration.

Key Characteristics:

Basic Usage

import java.util.concurrent.Phaser;

public class PhaserDemo {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1);  // Register main thread

        for (int i = 0; i < 3; i++) {
            int threadId = i;
            phaser.register();  // Register each worker
            new Thread(() -> {
                // Phase 0
                System.out.println("Thread " + threadId + " - Phase 0");
                phaser.arriveAndAwaitAdvance();  // Wait for all

                // Phase 1
                System.out.println("Thread " + threadId + " - Phase 1");
                phaser.arriveAndAwaitAdvance();

                // Phase 2
                System.out.println("Thread " + threadId + " - Phase 2");
                phaser.arriveAndDeregister();  // Deregister after completion
            }).start();
        }

        phaser.arriveAndDeregister();  // Main thread deregisters
    }
}

Use Case: Multi-Phase Task Execution

public class MultiPhaseTask {
    public static void main(String[] args) {
        Phaser phaser = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("=== Phase " + phase + " completed ===");
                return registeredParties == 0;  // Terminate if no parties left
            }
        };

        int numWorkers = 3;
        for (int i = 0; i < numWorkers; i++) {
            int workerId = i;
            phaser.register();
            new Thread(() -> {
                // Phase 0: Download
                System.out.println("Worker " + workerId + ": Downloading");
                sleep(1000);
                phaser.arriveAndAwaitAdvance();

                // Phase 1: Process
                System.out.println("Worker " + workerId + ": Processing");
                sleep(1000);
                phaser.arriveAndAwaitAdvance();

                // Phase 2: Upload
                System.out.println("Worker " + workerId + ": Uploading");
                sleep(1000);
                phaser.arriveAndDeregister();
            }).start();
        }
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) {}
    }
}

Key Methods

Phaser phaser = new Phaser(3);  // 3 registered parties

// Register a new party
phaser.register();

// Arrive and wait for others
phaser.arriveAndAwaitAdvance();

// Arrive without waiting
phaser.arrive();

// Arrive and deregister
phaser.arriveAndDeregister();

// Get current phase number
int phase = phaser.getPhase();

// Get registered parties
int parties = phaser.getRegisteredParties();

// Get arrived parties in current phase
int arrived = phaser.getArrivedParties();

// Check if terminated
boolean terminated = phaser.isTerminated();

Back to top


Exchanger

Purpose: Two threads exchange data at a synchronization point.

Basic Usage

import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        // Producer thread
        new Thread(() -> {
            try {
                String data = "Data from producer";
                System.out.println("Producer sending: " + data);
                String received = exchanger.exchange(data);
                System.out.println("Producer received: " + received);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // Consumer thread
        new Thread(() -> {
            try {
                Thread.sleep(1000);
                String data = "Data from consumer";
                System.out.println("Consumer sending: " + data);
                String received = exchanger.exchange(data);
                System.out.println("Consumer received: " + received);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

Output:

Producer sending: Data from producer
Consumer sending: Data from consumer
Producer received: Data from consumer
Consumer received: Data from producer

Use Case: Buffer Swapping

public class BufferSwapping {
    private final Exchanger<List<Integer>> exchanger = new Exchanger<>();

    // Producer thread
    public void producer() {
        List<Integer> buffer = new ArrayList<>();
        try {
            while (true) {
                // Fill buffer
                for (int i = 0; i < 100; i++) {
                    buffer.add(generateData());
                }
                // Exchange full buffer for empty buffer
                buffer = exchanger.exchange(buffer);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Consumer thread
    public void consumer() {
        List<Integer> buffer = new ArrayList<>();
        try {
            while (true) {
                // Exchange empty buffer for full buffer
                buffer = exchanger.exchange(buffer);
                // Process buffer
                processData(buffer);
                buffer.clear();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Back to top


Comparison Summary

Utility Purpose Reusable Parties Direction
CountDownLatch Wait for N operations ❌ No Fixed One-way (count down)
CyclicBarrier N threads wait for each other ✅ Yes Fixed Mutual wait
Semaphore Limit concurrent access ✅ Yes Dynamic Resource control
Phaser Multi-phase synchronization ✅ Yes Dynamic Multi-phase
Exchanger Exchange data between 2 threads ✅ Yes Fixed (2) Data exchange

When to use:

Back to top


Best Practices

1. Always Handle InterruptedException

try {
    latch.await();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();  // Restore interrupt status
    // Handle interruption appropriately
}

2. Use try-finally with Semaphore

semaphore.acquire();
try {
    // Use resource
} finally {
    semaphore.release();  // Always release
}

3. Set Timeouts to Avoid Indefinite Waits

if (!latch.await(10, TimeUnit.SECONDS)) {
    throw new TimeoutException("Latch timeout");
}

4. Consider Fairness for Semaphores

// Fair semaphore prevents starvation
Semaphore semaphore = new Semaphore(permits, true);

Back to top


Common Pitfalls

❌ 1. Forgetting to countDown()

latch.countDown();  // If forgotten, await() waits forever!

❌ 2. Not Releasing Semaphore Permit

semaphore.acquire();
// If exception occurs and no finally block, permit lost!

❌ 3. Wrong Party Count

// Creating barrier for 3 parties but only 2 threads call await()
CyclicBarrier barrier = new CyclicBarrier(3);  // Deadlock!

❌ 4. Reusing CountDownLatch

CountDownLatch latch = new CountDownLatch(5);
// Cannot be reset! Use CyclicBarrier or Phaser instead

Back to top


Ref.

Official Documentation:

Books:

Guides:

Related Topics:


Get Started | Java Concurrency | Java 8