Advanced Java

Concurrency

Module 6

Modern Concurrent Programming in Java

Why Concurrency?

Benefits:


Challenges:

Creating Threads

Method 1: Extending Thread class

public class MyThread extends Thread {
    public void run() {
        System.out.println("Thread running: " + getName());
    }
}

MyThread thread = new MyThread();
thread.start();

Method 2: Implementing Runnable (Preferred)

public class MyTask implements Runnable {
    public void run() {
        System.out.println("Task running");
    }
}

Thread thread = new Thread(new MyTask());
thread.start();

Method 3: Lambda (Modern)

Thread thread = new Thread(() -> {
    System.out.println("Lambda task running");
});
thread.start();

Thread Lifecycle

Thread Methods

Thread thread = new Thread(() -> {
    System.out.println("Working...");
    try {
        Thread.sleep(1000); // Sleep for 1 second
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

thread.start();           // Start the thread
thread.join();            // Wait for thread to complete
thread.interrupt();       // Interrupt the thread
thread.isAlive();         // Check if thread has started and not yet terminated
thread.getName();         // Get thread name
thread.setName("Worker"); // Set thread name
Thread.currentThread();   // Get current thread

The Problem: Race Conditions

public class Counter {
    private int count = 0;
    
    public void increment() {
        count++; // NOT thread-safe! (read-modify-write)
    }
    
    public int getCount() {
        return count;
    }
}

// Multiple threads incrementing
Counter counter = new Counter();
Thread t1 = new Thread(() -> {
    for (int i = 0; i < 1000; i++) counter.increment();
});
Thread t2 = new Thread(() -> {
    for (int i = 0; i < 1000; i++) counter.increment();
});
t1.start();
t2.start();
t1.join();
t2.join();

System.out.println(counter.getCount()); // Often less than 2000: updates are lost to the race condition

Solution 1: Synchronized

public class Counter {
    private int count = 0;
    
    public synchronized void increment() {
        count++;
    }
    
    public synchronized int getCount() {
        return count;
    }
}

// Or synchronized block
public class Counter {
    private int count = 0;
    private final Object lock = new Object();
    
    public void increment() {
        synchronized(lock) {
            count++;
        }
    }
}

Note: synchronized adds locking overhead, which grows when many threads contend for the same lock
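
To check that the synchronized counter fixes the race, the two-thread test from the previous slide can be rerun against it. This is a minimal sketch; the class names SafeCounter and SynchronizedDemo are made up for illustration.

```java
// Hypothetical names for illustration: SafeCounter, SynchronizedDemo.
class SafeCounter {
    private int count = 0;

    // Only one thread at a time can hold this object's monitor,
    // so the read-modify-write in count++ cannot interleave.
    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }
}

class SynchronizedDemo {
    // Runs two threads that each increment a shared counter perThread times.
    static int runTwoThreads(int perThread) throws InterruptedException {
        SafeCounter counter = new SafeCounter();
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) counter.increment();
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        t1.join(); // wait for both workers before reading the result
        t2.join();
        return counter.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTwoThreads(1000)); // 2000
    }
}
```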

Solution 2: Atomic Classes

import java.util.concurrent.atomic.AtomicInteger;

public class Counter {
    private final AtomicInteger count = new AtomicInteger(0);
    
    public void increment() {
        count.incrementAndGet();
    }
    
    public int getCount() {
        return count.get();
    }
}

// Other atomic classes
AtomicLong atomicLong = new AtomicLong();
AtomicBoolean atomicBoolean = new AtomicBoolean();
AtomicReference<String> atomicRef = new AtomicReference<>();

Atomic classes use lock-free compare-and-set operations and are often faster than synchronized for single-variable updates
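
Atomic classes also cover compound updates beyond incrementing. The sketch below tracks a running maximum with accumulateAndGet, which retries a compare-and-set until the update applies. MaxTracker and its demo method are hypothetical names, not part of the module.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration.
class MaxTracker {
    private final AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);

    // Atomically replaces the stored value with max(stored, value)
    public void record(int value) {
        max.accumulateAndGet(value, Math::max);
    }

    public int get() {
        return max.get();
    }

    // Two threads record disjoint ranges; the largest value overall is 1999.
    static int demo() throws InterruptedException {
        MaxTracker tracker = new MaxTracker();
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) tracker.record(i);
        });
        Thread t2 = new Thread(() -> {
            for (int i = 1000; i < 2000; i++) tracker.record(i);
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        return tracker.get();
    }
}
```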

Executor Framework

Better alternative to manually creating threads:

import java.util.concurrent.*;

// Create thread pool with 5 threads
ExecutorService executor = Executors.newFixedThreadPool(5);

// Submit tasks
executor.submit(() -> {
    System.out.println("Task 1 executing");
});

executor.submit(() -> {
    System.out.println("Task 2 executing");
});

// Shutdown executor
executor.shutdown();

// Wait for all tasks to complete
executor.awaitTermination(1, TimeUnit.MINUTES);
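
awaitTermination returns false if the timeout elapses first, so the result is worth checking. One common way to combine shutdown(), awaitTermination() and shutdownNow() is sketched below; the helper name shutdownAndAwait and the class ShutdownDemo are assumptions for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class for illustration.
class ShutdownDemo {
    // Returns true if the pool terminated, false if tasks were still running.
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; already submitted tasks still run
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // interrupt tasks that are still running
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(() -> System.out.println("Task 1 executing"));
        executor.submit(() -> System.out.println("Task 2 executing"));
        System.out.println("Terminated: " + shutdownAndAwait(executor, 5, TimeUnit.SECONDS));
    }
}
```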

Types of Executors

// Fixed thread pool
ExecutorService fixed = Executors.newFixedThreadPool(10);

// Cached thread pool (creates threads as needed)
ExecutorService cached = Executors.newCachedThreadPool();

// Single thread executor
ExecutorService single = Executors.newSingleThreadExecutor();

// Scheduled executor (for delayed/periodic tasks)
ScheduledExecutorService scheduled = 
    Executors.newScheduledThreadPool(5);

scheduled.schedule(() -> {
    System.out.println("Execute after 5 seconds");
}, 5, TimeUnit.SECONDS);

scheduled.scheduleAtFixedRate(() -> {
    System.out.println("Execute every 10 seconds");
}, 0, 10, TimeUnit.SECONDS);

Callable and Future

Callable is like Runnable but can return a value:

import java.util.concurrent.*;

Callable<Integer> task = () -> {
    Thread.sleep(2000);
    return 42;
};

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(task);

System.out.println("Task submitted");

// Do other work while task executes

// Get result (blocks if not ready)
try {
    Integer result = future.get(); // Blocks until complete
    System.out.println("Result: " + result);
    
    // Or with timeout
    Integer result2 = future.get(3, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

executor.shutdown();
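
When there are several Callables, invokeAll submits them together and returns one Future per task, in the same order as the input list. A minimal sketch, assuming a made-up class InvokeAllDemo that sums squares:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class for illustration.
class InvokeAllDemo {
    // Sums the squares of 1..n, computing each square as a separate Callable.
    static int sumOfSquares(int n) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int value = i; // lambdas can only capture effectively final variables
                tasks.add(() -> value * value);
            }
            // invokeAll blocks until every task has completed
            List<Future<Integer>> futures = executor.invokeAll(tasks);
            int sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get(); // tasks are already done, so get() does not wait
            }
            return sum;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(10)); // 385
    }
}
```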

CompletableFuture

Modern async programming in Java:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // Runs in separate thread
    return "Hello";
});

future.thenApply(s -> s + " World")
      .thenAccept(System.out::println)
      .join();

// Combine multiple futures
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<String> combined = future1.thenCombine(future2,
    (s1, s2) -> s1 + " " + s2);

System.out.println(combined.join()); // Hello World

CompletableFuture - Advanced

// Chain async operations
CompletableFuture.supplyAsync(() -> getUserId())
    .thenApplyAsync(id -> fetchUser(id))
    .thenApplyAsync(user -> enrichUserData(user))
    .thenAccept(user -> saveUser(user))
    .exceptionally(ex -> {
        log.error("Error: " + ex.getMessage());
        return null;
    });

// Run multiple tasks and wait for all
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Task1");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Task2");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "Task3");

// allOf completes when every future is done; it carries no result itself
CompletableFuture<Void> allOf = CompletableFuture.allOf(f1, f2, f3);
allOf.join();

// Or wait for any to complete
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(f1, f2, f3);
System.out.println("First result: " + anyOf.join());
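
CompletableFuture.allOf returns a CompletableFuture<Void>, so the individual results still have to be read from the original futures. One possible way to gather them into a list is sketched below; allResults and AllOfResultsDemo are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical helper class for illustration.
class AllOfResultsDemo {
    // Completes with all results, in the same order as the input list.
    static <T> CompletableFuture<List<T>> allResults(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> all =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // By the time this stage runs, every future is done, so join() does not block
        return all.thenApply(ignored ->
            futures.stream()
                   .map(CompletableFuture::join)
                   .collect(Collectors.toList()));
    }

    static List<String> demo() {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Task1");
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Task2");
        CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "Task3");
        List<CompletableFuture<String>> futures = Arrays.asList(f1, f2, f3);
        return allResults(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [Task1, Task2, Task3]
    }
}
```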


Concurrent Collections

Thread-safe collections from java.util.concurrent:

// Thread-safe List
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

// Thread-safe Set
ConcurrentSkipListSet<String> set = new ConcurrentSkipListSet<>();

// Thread-safe Map
ConcurrentHashMap<String, User> map = new ConcurrentHashMap<>();
map.put("john", new User("John"));
map.putIfAbsent("jane", new User("Jane"));
map.computeIfAbsent("bob", k -> new User("Bob"));

// Thread-safe Queue
ConcurrentLinkedQueue<Task> queue = new ConcurrentLinkedQueue<>();
queue.offer(new Task());
Task task = queue.poll(); // Returns null if the queue is empty

// Blocking Queue (producer-consumer)
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
blockingQueue.put("item"); // Blocks only if a bounded queue is full
String item = blockingQueue.take(); // Blocks if queue is empty
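
A concurrent collection makes each single call thread-safe, but a get() followed by a put() is still two separate steps. For read-modify-write updates, ConcurrentHashMap offers atomic methods such as merge. A minimal sketch with a made-up WordCountDemo class, in which every thread counts the same word array:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class for illustration.
class WordCountDemo {
    // Each of `threads` workers counts every word once, all into one shared map.
    static Map<String, Integer> countInParallel(String[] words, int threads)
            throws InterruptedException {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (String word : words) {
                    // merge is atomic per key: insert 1, or add 1 to the existing count
                    counts.merge(word, 1, Integer::sum);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Integer> counts = countInParallel(new String[] {"a", "b", "a"}, 4);
        System.out.println(counts.get("a") + " " + counts.get("b")); // 8 4
    }
}
```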

Producer-Consumer Pattern

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); // Bounded: put() blocks when 10 items are waiting

// Producer thread
Thread producer = new Thread(() -> {
    try {
        for (int i = 0; i < 100; i++) {
            queue.put(i);
            System.out.println("Produced: " + i);
            Thread.sleep(100);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

// Consumer thread
Thread consumer = new Thread(() -> {
    try {
        while (true) {
            Integer item = queue.take();
            System.out.println("Consumed: " + item);
            Thread.sleep(200);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

producer.start();
consumer.start();
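
The consumer above loops forever. One common way to stop it cleanly is a "poison pill": the producer enqueues a sentinel value after its last item, and the consumer exits when it sees it. The sketch below assumes real items are never negative; PoisonPillDemo and POISON_PILL are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class for illustration.
class PoisonPillDemo {
    // Sentinel value; assumes real items are never negative
    private static final Integer POISON_PILL = -1;

    static List<Integer> run(int itemCount) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
        // Written only by the consumer thread and read after join()
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.put(i);
                }
                queue.put(POISON_PILL); // signal that no more items will arrive
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer item = queue.take();
                    if (item.equals(POISON_PILL)) {
                        break; // producer is done; leave the loop
                    }
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return consumed;
    }
}
```

With several consumers, the producer would need to enqueue one pill per consumer.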

CountDownLatch

Wait for multiple threads to complete:

int workerCount = 3;
CountDownLatch latch = new CountDownLatch(workerCount);

for (int i = 0; i < workerCount; i++) {
    new Thread(() -> {
        try {
            System.out.println("Worker starting");
            Thread.sleep(2000);
            System.out.println("Worker completed");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            latch.countDown(); // Decrement count
        }
    }).start();
}

// Main thread waits for all workers
latch.await();
System.out.println("All workers completed");
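
await() with no arguments waits indefinitely if a worker never counts down. The overload await(timeout, unit) returns false when the timeout elapses first. A minimal sketch, with the hypothetical names LatchTimeoutDemo and awaitWorkers:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical class for illustration.
class LatchTimeoutDemo {
    // Starts workers that each sleep for workMillis, then waits up to timeoutMillis.
    // Returns true if all workers finished in time.
    static boolean awaitWorkers(int workerCount, long workMillis, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(workerCount);
        for (int i = 0; i < workerCount; i++) {
            Thread worker = new Thread(() -> {
                try {
                    Thread.sleep(workMillis); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            });
            worker.setDaemon(true); // do not keep the JVM alive after a timeout
            worker.start();
        }
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(awaitWorkers(3, 50, 5000));  // true
        System.out.println(awaitWorkers(3, 2000, 100)); // false
    }
}
```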

CyclicBarrier

Synchronization point for multiple threads:

int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
    System.out.println("All threads reached barrier!");
});

for (int i = 0; i < parties; i++) {
    new Thread(() -> {
        try {
            System.out.println(Thread.currentThread().getName() + " working");
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + " waiting at barrier");
            barrier.await(); // Wait for others
            System.out.println(Thread.currentThread().getName() + " passed barrier");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }).start();
}
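
Unlike a CountDownLatch, a CyclicBarrier resets after each trip and can be reused for the next phase. The sketch below runs several rounds through one barrier and counts how often the barrier action fires; BarrierRoundsDemo and runRounds are made-up names.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration.
class BarrierRoundsDemo {
    // Returns how many times the barrier action ran.
    static int runRounds(int parties, int rounds) throws InterruptedException {
        AtomicInteger completedRounds = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive
        CyclicBarrier barrier = new CyclicBarrier(parties, completedRounds::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        // ... per-round work would go here ...
                        barrier.await(); // barrier resets automatically after each trip
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party was interrupted or failed; give up on remaining rounds
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return completedRounds.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runRounds(3, 5)); // 5
    }
}
```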

ReadWriteLock

Optimize for read-heavy scenarios:

public class Cache {
    private final Map<String, String> data = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    
    public String get(String key) {
        lock.readLock().lock(); // Multiple readers can access
        try {
            return data.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }
    
    public void put(String key, String value) {
        lock.writeLock().lock(); // Only one writer
        try {
            data.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }
}

Virtual Threads (Java 21+)

Lightweight threads for massive concurrency:

// Create virtual thread
Thread vThread = Thread.startVirtualThread(() -> {
    System.out.println("Virtual thread running");
});

// Or use executor
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    for (int i = 0; i < 10_000; i++) {
        executor.submit(() -> {
            // Can create millions of these!
            Thread.sleep(Duration.ofSeconds(1));
            return "Result";
        });
    }
}

// Virtual threads are well suited to I/O-bound tasks
CompletableFuture.supplyAsync(() -> {
    // Database call
    return fetchFromDatabase();
}, Executors.newVirtualThreadPerTaskExecutor());

Virtual threads use far less memory than platform threads!

Thread Safety Best Practices

  • Prefer immutability: Immutable objects are always thread-safe
  • Use concurrent collections: Instead of synchronizing regular collections
  • Minimize lock scope: Hold locks for minimal time
  • Avoid nested locks: Can cause deadlocks
  • Use high-level concurrency utilities: ExecutorService, CompletableFuture
  • Document thread safety: Clearly state if class is thread-safe
  • Test concurrency issues: Use tools like jcstress for stress-testing correctness and JMH for benchmarking
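
As an illustration of the first point, an immutable class has only final fields, no setters, and returns a new object for every "change". The Money class below is a hypothetical example, not part of the module.

```java
// Hypothetical immutable value class for illustration.
final class Money {
    private final String currency;
    private final long cents;

    Money(String currency, long cents) {
        this.currency = currency;
        this.cents = cents;
    }

    String currency() {
        return currency;
    }

    long cents() {
        return cents;
    }

    // No field is ever modified; a "change" produces a new instance,
    // so threads holding the old instance are unaffected.
    Money plus(long extraCents) {
        return new Money(currency, cents + extraCents);
    }

    public static void main(String[] args) {
        Money a = new Money("USD", 100);
        Money b = a.plus(50);
        System.out.println(a.cents() + " " + b.cents()); // 100 150
    }
}
```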

Common Pitfalls

1. Calling run() instead of start()

Thread t = new Thread(() -> System.out.println("Hello"));
t.run(); // WRONG! Runs in current thread
t.start(); // CORRECT! Runs in new thread
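
The difference is easy to observe by recording which thread executes the body. A minimal sketch; RunVsStartDemo and the thread names are made up for illustration.

```java
// Hypothetical class for illustration.
class RunVsStartDemo {
    // Returns the name of the thread that executed the body for run() and for start().
    static String[] namesSeen() throws InterruptedException {
        String[] seen = new String[2];

        Thread viaRun = new Thread(
            () -> seen[0] = Thread.currentThread().getName(), "worker-run");
        viaRun.run(); // plain method call: the body executes on the calling thread

        Thread viaStart = new Thread(
            () -> seen[1] = Thread.currentThread().getName(), "worker-start");
        viaStart.start(); // the body executes on the new thread
        viaStart.join();

        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        String[] seen = namesSeen();
        System.out.println("run():   " + seen[0]); // the caller's thread name
        System.out.println("start(): " + seen[1]); // worker-start
    }
}
```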

2. Not handling InterruptedException

try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // IMPORTANT! Restore interrupt status
    // Handle interruption
}
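
Restoring the flag matters because sleep() clears the interrupt status when it throws. In the sketch below, the loop condition only sees the interrupt because the catch block sets the flag again. InterruptDemo and stopsOnInterrupt are hypothetical names.

```java
// Hypothetical class for illustration.
class InterruptDemo {
    // Returns true if the worker stopped after being interrupted.
    static boolean stopsOnInterrupt() throws InterruptedException {
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(50); // simulated work
                } catch (InterruptedException e) {
                    // sleep() cleared the flag; set it again so the loop exits
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.start();
        worker.interrupt();
        worker.join(2000); // wait at most 2 seconds for the worker to stop
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(stopsOnInterrupt()); // true
    }
}
```

If the catch block were left empty, the flag would stay cleared and the loop would keep running.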

3. Shared mutable state without synchronization

private int counter = 0; // NOT thread-safe
private final AtomicInteger counter = new AtomicInteger(); // Thread-safe

Exercise 1: Multi-threaded File Processor

Task: Process multiple files concurrently


Requirements:

  1. Create ExecutorService with fixed thread pool
  2. Submit tasks to read and process files
  3. Use CompletableFuture for async processing
  4. Aggregate results from all files
  5. Handle errors gracefully
  6. Measure performance improvement vs single-threaded

Exercise 2: Thread-Safe Cache

Task: Implement a thread-safe cache with expiration


Requirements:

  1. Use ConcurrentHashMap for storage
  2. Implement get() and put() methods
  3. Add automatic expiration after 5 minutes
  4. Use ScheduledExecutorService for cleanup
  5. Write multi-threaded tests
  6. Measure cache hit rate

Exercise 3: Producer-Consumer System

Task: Build a job processing system


Requirements:

  1. Use BlockingQueue for job queue
  2. Create multiple producer threads generating jobs
  3. Create multiple consumer threads processing jobs
  4. Implement graceful shutdown
  5. Add monitoring (jobs processed, queue size)
  6. Handle backpressure when queue is full

Summary

In this module, you learned:

  • ✓ Thread creation and lifecycle
  • ✓ Synchronization and thread safety
  • ✓ Executor framework for thread pools
  • ✓ CompletableFuture for async programming
  • ✓ Concurrent collections
  • ✓ Synchronization utilities (CountDownLatch, CyclicBarrier)
  • ✓ Virtual Threads for massive concurrency
  • ✓ Best practices and common pitfalls

Next Module: JVM Performance & Optimization

Resources
