Lecture from 29.04.2025 | Video: Video ETHZ
Last Week Recap
Let’s briefly review the key topics covered previously:
- Locks & Locks with Atomics: We explored how locks provide mutual exclusion. We saw that implementing locks using only atomic reads/writes (like Peterson’s, Filter, Bakery locks) often requires memory proportional to the number of threads (O(N)). In contrast, hardware atomic read-modify-write (RMW) operations (TAS, CAS, LL/SC) enable constant memory (O(1)) lock implementations like spinlocks (TASLock, TATAS, TATAS with exponential backoff).
- (Dead)locks Introduction: We formally defined deadlocks as cycles in the resource allocation graph and discussed fundamental avoidance strategies: global/ordered locking and non-overlapping critical sections.
- Rendezvous & Barriers: We looked at these large-scale synchronization patterns and highlighted some common pitfalls when implementing them naively, especially with semaphores.
Learning Goals Today
Building on our previous discussions, today we aim to:
- Understand and implement solutions for the Producer/Consumer problem using various synchronization techniques.
- Introduce Monitors, a higher-level synchronization construct that bundles locking with condition management (`wait`/`signal`).
- Briefly touch upon Reader/Writer locks as another specialized synchronization mechanism.
- Recognize common patterns and pitfalls in concurrent programming.
Producer Consumer Pattern
A fundamental pattern in concurrent and parallel programming is the Producer-Consumer pattern.
- Scenario: One or more “producer” threads generate data (items, tasks, etc.), and one or more “consumer” threads process that data.
- Communication: They need a shared buffer or queue to pass data from producers to consumers.
- Simple Case (T0 → T1): If one thread T0 produces data X and passes it directly to thread T1, and only T1 uses X after it’s passed, direct synchronization on X itself might not be needed (as access is sequentialized). However, a synchronized mechanism is still required to safely pass X from T0 to T1 (e.g., putting it in a shared queue).
This pattern is foundational for building data-flow programs and pipelines, where the output of one stage (thread/process) becomes the input for the next. A pipeline node typically dequeues an item from an input queue, processes it, and enqueues the result to an output queue.
// Conceptual Pipeline Node Logic
while (true) {
    input = q_in.dequeue();
    output = do_something(input);
    q_out.enqueue(output);
}
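As a hedged, runnable variant of this conceptual loop, the following sketch uses Java's built-in `ArrayBlockingQueue` as the channel; the `doSomething` transformation and the use of `Long` items are illustrative assumptions, not from the lecture:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// One pipeline stage: dequeue from qIn, transform, enqueue to qOut
class PipelineStage implements Runnable {
    private final BlockingQueue<Long> qIn;
    private final BlockingQueue<Long> qOut;

    PipelineStage(BlockingQueue<Long> qIn, BlockingQueue<Long> qOut) {
        this.qIn = qIn;
        this.qOut = qOut;
    }

    private long doSomething(long x) {
        return 2 * x; // illustrative transformation
    }

    public void run() {
        try {
            while (true) {
                long input = qIn.take();        // blocks while qIn is empty
                qOut.put(doSomething(input));   // blocks while qOut is full
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // allow the stage to be shut down
        }
    }
}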
The communication channel between producers and consumers is often implemented as a shared queue, acting as a buffer. Producers `enqueue` items, and consumers `dequeue` items.
The pattern can involve multiple producers adding to the same queue and multiple consumers removing from it. This necessitates that the queue implementation itself is thread-safe.
Bounded Buffer Implementation
A common implementation uses a bounded buffer (a queue with a fixed size limit), often realized as a circular buffer (an array with wrap-around semantics).
- An array `buffer` stores the items.
- An `in` index points to the next slot for enqueueing.
- An `out` index points to the next slot for dequeueing.
- Indices wrap around using the modulo operator (`% size`).
This slide illustrates the movement of the `in` and `out` pointers and the `count` for enqueue and dequeue operations in a circular buffer.
Let’s start building a concurrent queue implementation in Java.
class Queue {
    private int in;        // index for next enqueue
    private int out;       // index for next dequeue
    private int size;      // queue capacity
    private long[] buffer; // array to hold items

    Queue(int size) {
        this.size = size;
        in = out = 0;
        buffer = new long[size];
    }

    // Helper to calculate next index with wrap-around
    private int next(int i) {
        return (i + 1) % size;
    }

    // Basic (non-thread-safe) enqueue
    public void doEnqueue(long item) {
        buffer[in] = item;
        in = next(in);
    }

    // Basic (non-thread-safe) dequeue
    public long doDequeue() {
        long item = buffer[out];
        out = next(out);
        return item;
    }

    // -- Attempt at thread-safe versions --
    public synchronized void enqueue(long item) {
        // What if queue is full?
        doEnqueue(item);
    }

    public synchronized long dequeue() {
        // What if queue is empty?
        long item = doDequeue();
        return item;
    }
}
The `synchronized` keyword ensures mutual exclusion (only one thread can be executing `enqueue` or `dequeue` at a time for a given `Queue` instance), preventing low-level data races on `in`, `out`, and the `buffer`.
Problem: What happens if a producer calls `enqueue` when the buffer is full, or a consumer calls `dequeue` when the buffer is empty?
We need helper functions to check the buffer state:
// Inside Queue class...

// Checks if the buffer is full
// (in == out means empty; next(in) == out means full, so one slot always stays unused)
public boolean isFull() {
    return next(in) == out; // Leave one slot empty to distinguish full from empty
}

// Checks if the buffer is empty
public boolean isEmpty() {
    return in == out;
}
(Note: Leaving one slot unused is a common way to differentiate between a full and an empty circular buffer when only the `in` and `out` indices are used. Alternatively, a separate `count` variable could be used, which itself requires synchronization.)
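A minimal sketch of that count-based alternative (the `count` field is an addition for illustration; it must only be touched while holding the queue's lock, e.g., inside `synchronized` methods):

// Inside Queue class... (count-based variant)
private int count = 0; // number of items currently buffered

public boolean isFull() {
    return count == size;
}

public boolean isEmpty() {
    return count == 0;
}

// doEnqueue would additionally do count++, doDequeue would do count--.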
Now, let’s try to handle the full/empty conditions within the synchronized methods:
// Inside Queue class...
public synchronized void enqueue(long item) {
    while (isFull()) {
        ; // Spin-wait (BAD!) - holds the lock!
    }
    doEnqueue(item);
}

public synchronized long dequeue() {
    while (isEmpty()) {
        ; // Spin-wait (BAD!) - holds the lock!
    }
    return doDequeue();
}
Major Problem: This introduces potential deadlock and definitely poor performance. If `enqueue` finds the queue full, it spins while holding the lock, so no consumer can acquire the lock to call `dequeue` and make space. Similarly, if `dequeue` finds the queue empty, it spins while holding the lock, preventing any producer from acquiring the lock to `enqueue` an item. The threads block forever inside an infinite loop while holding exactly the resource other threads need to make progress.
Attempt with sleep()
Maybe we can release the lock and sleep for a while if the condition isn’t met?
// Inside Queue class... (Illustrative, enqueue only)
public void enqueue(long item) throws InterruptedException {
    long timeout = 10; // retry interval in ms; an arbitrary, illustrative choice
    while (true) {
        synchronized (this) { // Acquire lock
            if (!isFull()) {
                doEnqueue(item);
                return; // Success!
            }
            // If full, release lock by exiting synchronized block
        }
        // Lock is released; sleep for a bit before retrying
        Thread.sleep(timeout); // Sleep *without* holding the lock!
    }
}
Problem: This avoids holding the lock while waiting, but:
- Busy Waiting / Polling: Threads repeatedly wake up, re-acquire the lock, check the condition, and potentially go back to sleep. This is inefficient.
- Choosing `timeout`: What's the right sleep duration? Too short, and it's just inefficient polling. Too long, and threads react slowly to state changes.
- No Direct Notification: Ideally, a waiting thread should be notified exactly when the condition it's waiting for (e.g., queue not full) becomes true. `sleep` doesn't provide this.
Producer/Consumer with Semaphores
Semaphores seem suitable here because they have a built-in counter and waiting mechanism.
Idea: Use three semaphores:
- `nonEmpty`: Counts the number of items currently in the buffer. Consumers wait on this. Initialized to 0.
- `nonFull`: Counts the number of empty slots available in the buffer. Producers wait on this. Initialized to `size`.
- `manipulation` (or `mutex`): A binary semaphore (initialized to 1) to ensure mutual exclusion when actually modifying the buffer indices (`in`, `out`) and data.
import java.util.concurrent.Semaphore;

class Queue {
    int in, out, size;
    long[] buf;
    Semaphore nonEmpty, nonFull, manipulation;

    Queue(int s) {
        size = s;
        buf = new long[size];
        in = out = 0;
        nonEmpty = new Semaphore(0);       // Initially 0 items
        nonFull = new Semaphore(size);     // Initially 'size' empty slots
        manipulation = new Semaphore(1);   // Mutex for buffer access
    }
    // ... enqueue/dequeue methods ...
}
Implementation Attempt 1:
// Inside Queue class...
void enqueue(long x) {
    try {
        manipulation.acquire();  // Lock buffer access
        nonFull.acquire();       // Wait for an empty slot & decrement slot count
        buf[in] = x;
        in = (in + 1) % size;
    } catch (InterruptedException ex) {}
    finally {
        manipulation.release();  // Unlock buffer access
        nonEmpty.release();      // Increment item count / signal consumers
    }
}

long dequeue() {
    long x = 0;
    try {
        manipulation.acquire();  // Lock buffer access
        nonEmpty.acquire();      // Wait for an item & decrement item count
        x = buf[out];
        out = (out + 1) % size;
    } catch (InterruptedException ex) {}
    finally {
        manipulation.release();  // Unlock buffer access
        nonFull.release();       // Increment empty slot count / signal producers
    }
    return x;
}
Problem: Potential Deadlock!
Consider this scenario:
- Buffer is full: `nonFull`'s count is 0, `nonEmpty`'s count is `size`.
- Producer: calls `enqueue`, acquires the `manipulation` lock, then calls `nonFull.acquire()` and blocks (since the count is 0). It holds the `manipulation` lock while blocked.
- Consumer: calls `dequeue` and tries to acquire the `manipulation` lock, but the Producer holds it. The Consumer blocks.
- The Producer is waiting for `nonFull` (needs the Consumer to `release` it), and the Consumer is waiting for `manipulation` (held by the Producer). Deadlock!

The cycle is: the Producer holds `manipulation` and waits for `nonFull`; the Consumer needs `manipulation` to execute and eventually call `nonFull.release()`.
Corrected Semaphore Implementation: Change the order of acquisition. Acquire the "condition" semaphore (`nonFull` or `nonEmpty`) before acquiring the mutex (`manipulation`).
// Inside Queue class...
void enqueue(long x) {
    try {
        nonFull.acquire();       // Wait for an empty slot FIRST
        manipulation.acquire();  // THEN lock buffer access
        buf[in] = x;
        in = (in + 1) % size;
    } catch (InterruptedException ex) {} // Basic exception handling
    finally {
        manipulation.release();  // Unlock buffer access
        nonEmpty.release();      // Signal consumers AFTER modification
    }
}

long dequeue() {
    long x = 0;
    try {
        nonEmpty.acquire();      // Wait for an item FIRST
        manipulation.acquire();  // THEN lock buffer access
        x = buf[out];
        out = (out + 1) % size;
    } catch (InterruptedException ex) {} // Basic exception handling
    finally {
        manipulation.release();  // Unlock buffer access
        nonFull.release();       // Signal producers AFTER modification
    }
    return x;
}
This version avoids the deadlock because a thread only acquires the mutex after it knows it can proceed (i.e., after its condition semaphore acquire succeeds). (Note: Interrupt handling might still need refinement for robust production code.)
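A minimal, illustrative driver for this corrected queue (the capacity of 8 and the thread bodies are assumptions, not from the lecture):

// One producer and one consumer sharing the semaphore-based queue
public class Main {
    public static void main(String[] args) {
        Queue q = new Queue(8);

        Thread producer = new Thread(() -> {
            for (long i = 0; i < 100; i++) {
                q.enqueue(i);                    // blocks while the buffer is full
            }
        });

        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                System.out.println(q.dequeue()); // blocks while the buffer is empty
            }
        });

        producer.start();
        consumer.start();
    }
}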
Why Are Semaphores (and Locks) Problematic?
While semaphores work, they are considered somewhat “unstructured”:
- Discipline Required: Correct use depends heavily on the programmer correctly pairing `acquire` and `release` calls, acquiring locks/semaphores in the right order, and using the correct initial values. Errors are easy to make.
- Deadlock Prone: As seen, incorrect ordering easily leads to deadlocks.
- Condition Waiting: Basic locks/semaphores don't cleanly integrate waiting for an arbitrary condition (like `isFull() == false`) with mutual exclusion. We either spin (bad) or use complex semaphore arrangements.
What we need: A construct that combines mutual exclusion (like a lock) with the ability to efficiently wait for a specific condition while temporarily releasing the lock, and be notified when that condition might have become true.
Monitors
Monitors, invented by Tony Hoare and Per Brinch Hansen, provide this higher-level abstraction.
- Definition: A monitor is an abstract data structure (like a class or module) where:
- Data is encapsulated.
- Operations (methods) on that data execute with implicit mutual exclusion – only one thread can be executing any method of the monitor instance at a time.
- It includes mechanisms for threads to wait for conditions and signal other threads when conditions change.
Conceptual Difference: Instead of scattering `lock`/`unlock` (or `acquire`/`release`) calls throughout the code (left side), the monitor encapsulates the shared data and code, implicitly managing the locking (right side).
Producer/Consumer with Monitors
How would our queue look conceptually with a monitor?
// Monitor-based Queue (Conceptual; 'monitor' is not a Java keyword)
public monitor class BoundedQueue {
    // internal state (buffer, in, out, etc.)

    public void enqueue(long item) {     // Implicit mutual exclusion
        while (isFull()) {
            wait_until_not_full();       // Wait if condition not met
        }
        doEnqueue(item);
        signal_that_not_empty();         // Notify waiting consumers
    }

    public long dequeue() {              // Implicit mutual exclusion
        while (isEmpty()) {
            wait_until_not_empty();      // Wait if condition not met
        }
        long item = doDequeue();
        signal_that_not_full();          // Notify waiting producers
        return item;
    }
}
The monitor handles the locking. We just need primitives to `wait` for conditions and `signal` when conditions change.
Monitor Semantics for Condition Handling:
When a thread inside a monitor operation finds that a condition doesn't hold (e.g., `isFull()` is true):
- It needs to atomically release the monitor lock.
- It needs to wait until the condition potentially becomes true.
- Another thread, upon changing the state (e.g., making the queue not full), needs to signal waiting threads.
- A waiting thread, upon being signaled, needs to re-acquire the monitor lock before resuming execution.
In-Depth Explanation of Synchronization Primitives
To understand the necessity of these primitives at a hardware level, we must first consider the fundamental problem they solve: concurrent access to shared memory locations by multiple threads or processes. Without proper synchronization, this can lead to race conditions, where the outcome of the program depends on the unpredictable order of execution of these concurrent entities, resulting in data corruption and incorrect behavior.
The Core Problem: Uncontrolled Concurrent Access
At the hardware level, multiple CPU cores (or even the same core rapidly switching between threads) can simultaneously attempt to read and write to the same memory address. Basic load and store instructions are not atomic with respect to other cores or threads. For example, a seemingly simple increment operation (`count++`) typically involves multiple machine instructions (read, increment in a register, write back). If two threads execute this concurrently, the reads and writes can interleave, leading to lost updates (e.g., both threads read the same value, increment it, and write it back, resulting in only one increment instead of two).
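To make the lost update concrete, here is a minimal, illustrative Java demonstration (the iteration count is an arbitrary choice); the printed total usually falls short of 200000:

// Demonstrates the lost-update race on an unsynchronized counter
public class LostUpdate {
    static int count = 0; // shared, unprotected

    public static void main(String[] args) throws InterruptedException {
        Runnable body = () -> {
            for (int i = 0; i < 100_000; i++) {
                count++; // read, increment, write back: not atomic
            }
        };
        Thread t1 = new Thread(body);
        Thread t2 = new Thread(body);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(count); // usually < 200000: updates were lost
    }
}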
Why Simple Approaches Fail

Naive attempts to avoid this, like simply hoping for the best or relying on scheduling order, are inherently unreliable and non-deterministic. We need mechanisms that enforce some form of order or exclusivity when accessing shared resources.
Hardware Support for Atomicity: The Foundation
Modern CPUs provide atomic instructions that are crucial for implementing synchronization primitives. These instructions guarantee that a sequence of operations on a memory location will be performed indivisibly, without any other processor or thread being able to interfere. Common atomic instructions include:
- Test-and-Set (TAS): Atomically checks if a memory location is zero and, if so, sets it to one, returning the original value.
- Compare-and-Swap (CAS): Atomically compares the value of a memory location with an expected value. If they match, it writes a new value to the location.
- Fetch-and-Add: Atomically increments (or decrements) the value of a memory location and returns the original value.
These atomic instructions are the building blocks upon which higher-level synchronization primitives are constructed. They typically rely on hardware mechanisms like cache coherence protocols and bus locking to ensure atomicity across multiple cores.
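In Java, these instructions surface through the `java.util.concurrent.atomic` classes. As a minimal sketch, the canonical CAS retry loop (building a fetch-and-add from compare-and-swap) looks like this; the class name is illustrative:

import java.util.concurrent.atomic.AtomicInteger;

// Fetch-and-add expressed as a CAS retry loop
class CasCounter {
    private final AtomicInteger value = new AtomicInteger(0);

    int fetchAndIncrement() {
        int old;
        do {
            old = value.get();                        // read the current value
        } while (!value.compareAndSet(old, old + 1)); // retry if another thread interfered
        return old;                                   // return the original value
    }
}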
1. Lock (Mutex): Ensuring Mutual Exclusion
Why Needed: The primary purpose of a lock (or mutex, for mutual exclusion) is to protect a critical section of code that accesses shared resources. It ensures that only one thread can be executing within this section at any given time, preventing race conditions.
Hardware Level: Locks are often implemented using the atomic instructions mentioned above. For instance, a simple spinlock can be implemented using TAS. A thread trying to acquire the lock repeatedly executes TAS on a shared memory location. If TAS returns zero (meaning the lock was free), the thread has acquired the lock (the location is now one). To release the lock, the thread simply sets the memory location back to zero.
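A minimal sketch of such a TAS spinlock in Java, using `AtomicBoolean.getAndSet(true)` as the test-and-set primitive (the class name is an assumption):

import java.util.concurrent.atomic.AtomicBoolean;

// TAS spinlock: getAndSet(true) atomically sets the flag and returns the old value
class SpinLock {
    private final AtomicBoolean locked = new AtomicBoolean(false);

    void lock() {
        while (locked.getAndSet(true)) {
            ; // spin: the lock was already held
        }
    }

    void unlock() {
        locked.set(false); // set the "memory location" back to zero/false
    }
}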
Spinlocks vs. Blocking Locks: While simple, spinlocks can be inefficient if the lock is held for a long time, as waiting threads will continuously consume CPU cycles. To address this, operating systems often implement blocking locks. When a thread tries to acquire a blocking lock that is already held, the OS puts the thread into a waiting queue and deschedules it. When the lock is released, the OS wakes up one of the waiting threads, which then attempts to acquire the lock. This involves OS-level context switching, which has its own overhead but can be more efficient for longer wait times.
2. Semaphore: Controlling Access to a Limited Number of Resources
Why Needed: Semaphores generalize the concept of a lock. Instead of just allowing one thread, a semaphore allows a specified number of threads to access a resource concurrently. This is useful for managing pools of resources (e.g., database connections, network sockets) or for signaling between threads.
Hardware Level: Semaphores typically maintain a counter representing the number of available resources or permits. The `acquire` operation decrements this counter, and the `release` operation increments it. These operations need to be atomic; atomic instructions like Fetch-and-Decrement and Fetch-and-Increment can be used. When a thread tries to acquire a semaphore and the counter is zero, it needs to wait. As with blocking locks, this waiting is often managed by the operating system, which puts the waiting thread into a queue associated with the semaphore and wakes it up when a permit becomes available.
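Java's `java.util.concurrent.Semaphore` exposes exactly this counter. Here is an illustrative sketch guarding a pool of three resources (the pool size and usage pattern are assumptions):

import java.util.concurrent.Semaphore;

// At most 3 threads may use the pooled resource concurrently
class PoolGuard {
    private final Semaphore permits = new Semaphore(3);

    void useResource() throws InterruptedException {
        permits.acquire();     // blocks while all 3 permits are taken
        try {
            // ... use one pooled resource (connection, socket, ...) ...
        } finally {
            permits.release(); // always hand the permit back
        }
    }
}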
3. Barrier: Synchronizing the Progress of Multiple Threads

Why Needed: Barriers are essential in parallel algorithms where multiple threads are working on different parts of a problem. A barrier ensures that all participating threads reach a certain point in their execution before any of them can proceed further. This is crucial when the next stage of computation depends on the results computed by all threads in the previous stage.
Hardware Level: Implementing a barrier typically involves a shared counter and a mechanism for threads to wait. When a thread reaches the barrier, it atomically increments the counter. The last thread to reach the barrier will notice that the counter has reached the total number of participating threads. At this point, all waiting threads need to be signaled to resume execution. This signaling can be done using various mechanisms, such as setting a flag in shared memory or using OS-level signaling primitives. Efficient barrier implementations often consider cache coherence and strive to minimize contention on the shared counter. Some architectures also provide specialized hardware instructions or features to optimize barrier synchronization.
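As a hedged illustration of the shared-counter scheme just described, here is a minimal single-use barrier built on Java's monitor primitives (real implementations are reusable and minimize contention on the counter):

// Single-use barrier: a shared counter plus wait/notifyAll
class SimpleBarrier {
    private final int parties; // number of participating threads
    private int arrived = 0;

    SimpleBarrier(int parties) {
        this.parties = parties;
    }

    synchronized void await() throws InterruptedException {
        arrived++;
        if (arrived == parties) {
            notifyAll();                // last thread releases all waiters
        } else {
            while (arrived < parties) { // re-check guards against spurious wakeups
                wait();
            }
        }
    }
}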
The “Why” of Barriers: Without barriers, threads might proceed at different speeds, and some might start working on the next stage of a parallel computation before others have finished the previous stage, leading to incorrect results.
4. Monitor: High-Level Synchronization with Condition Variables
Why Needed: Monitors provide a higher-level, more structured approach to synchronization. They encapsulate shared data and the synchronization mechanisms (a lock and condition variables) needed to access it safely. This helps in writing more robust and easier-to-understand concurrent code by enforcing certain programming patterns.
Hardware Level: Monitors are typically implemented using the lower-level primitives we’ve discussed: locks and condition variables. The lock ensures mutual exclusion for accessing the monitor’s data. Condition variables provide a mechanism for threads to wait for specific conditions to become true while holding the monitor’s lock.
Condition Variables: When a thread finds that a necessary condition for it to proceed is false, it can call `wait()` on a condition variable. This operation atomically releases the monitor's lock and puts the thread to sleep (typically managed by the OS). Another thread, when it changes the state such that the condition might now be true, can call `signal()` (to wake up one waiting thread) or `broadcast()` (to wake up all waiting threads) on the same condition variable. The awakened thread(s) then re-acquire the monitor's lock and re-check the condition.

The "Why" of Monitors: Monitors help prevent common synchronization errors, such as forgetting to release a lock or using incorrect signaling mechanisms. The encapsulation of data and synchronization logic makes concurrent programs easier to reason about and maintain. Condition variables are crucial for implementing more complex synchronization patterns where simple mutual exclusion is not sufficient (e.g., the producer-consumer problem).
Conclusion: The Necessity at the Hardware Level
These synchronization primitives are essential because they address the fundamental challenges of concurrent programming at the hardware level. They rely on atomic hardware instructions to provide guarantees of indivisibility, which are crucial for preventing race conditions and ensuring data integrity in the presence of multiple concurrent threads or processes. The evolution from simple locks to more sophisticated constructs like semaphores, barriers, and monitors reflects the increasing complexity of concurrent applications and the need for more structured and efficient ways to manage shared resources and coordinate the execution of parallel tasks. The operating system plays a significant role in managing blocking and waiting threads associated with these primitives, interacting directly with the hardware’s scheduling mechanisms.
Monitors in Java (`synchronized`, `wait`, `notify`, `notifyAll`)
Java provides monitor semantics using the intrinsic lock associated with every object, combined with methods from the `Object` class:
- Mutual Exclusion: The `synchronized` keyword acquires/releases the object's intrinsic lock.
- Condition Waiting/Signaling:
  - `wait()`: Must be called while holding the lock. Atomically releases the lock and puts the current thread into a "wait set" for that object. The thread sleeps until notified. Upon waking, it must re-acquire the lock before proceeding.
  - `notify()`: Must be called while holding the lock. Wakes up one arbitrary thread waiting on this object's wait set. The woken thread competes to re-acquire the lock.
  - `notifyAll()`: Must be called while holding the lock. Wakes up all threads waiting on this object's wait set. They all compete to re-acquire the lock.
Producer/Consumer with Java Monitors:
class Queue {
    // ... state variables, constructor, helpers ...

    synchronized void enqueue(long x) {
        while (isFull()) {  // MUST use while, not if!
            try {
                wait();     // Releases lock on 'this', waits
            } catch (InterruptedException e) { /* handle */ }
            // Re-acquires lock upon waking
        }
        doEnqueue(x);
        notifyAll();        // Notify any waiting consumers
    }

    synchronized long dequeue() {
        while (isEmpty()) { // MUST use while, not if!
            try {
                wait();     // Releases lock on 'this', waits
            } catch (InterruptedException e) { /* handle */ }
            // Re-acquires lock upon waking
        }
        long item = doDequeue();
        notifyAll();        // Notify any waiting producers
        return item;
    }
}
Important Questions & Refinements:
- `while` vs `if` around `wait()`: You must use `while (condition) wait();`. Why? Because a thread can wake up from `wait()` for reasons other than the specific condition being true (spurious wakeups), or another thread might have changed the condition again between the `notify` and the woken thread re-acquiring the lock. The `while` loop ensures the condition is re-checked after waking. Using `if` can lead to errors.
- `notify()` vs `notifyAll()`: Why `notifyAll()` here? If we used `notify()`, we might accidentally wake up another producer when the queue becomes not empty (instead of a consumer), or wake another consumer when the queue becomes not full (instead of a producer). `notifyAll()` wakes everyone, and the `while` loop lets only the threads whose condition is actually met proceed. `notify()` can be used as an optimization only if you are certain that any woken thread can make progress and that only threads waiting for the specific signaled condition need to be woken. `notifyAll()` is generally safer.
Java Monitor Implementation Details
- Thread States: `wait()` moves a thread from RUNNABLE to WAITING (or TIMED_WAITING if `wait(timeout)` is used). `notify`/`notifyAll` moves waiting threads to BLOCKED (competing for re-entry).
- Monitor Queues: Conceptually, each object monitor has:
  - An entry queue: threads waiting to acquire the lock initially (via `synchronized`).
  - A wait set (condition queue): threads that called `wait()` and are waiting for a notification.
- Notification Process: When `notify`/`notifyAll` is called, thread(s) are moved from the wait set to the entry queue to compete for the lock again once the notifying thread releases it.
Monitor Notification Semantics: What happens exactly when thread A calls `notify` while holding the lock, waking thread B?
- Signal and Wait: Thread A hands the lock directly to B, and A goes to the entry queue (waiting to reacquire).
- Signal and Continue: Thread A keeps the lock and continues executing; B is moved to the entry queue (and will compete for the lock only when A releases it).
- Others: Signal and Exit, Signal and Urgent Wait, ...
Implementing a Semaphore using Monitors:
Java uses Signal and Continue. This is important! When thread P calls `notify()`, waking Q:
- P continues executing and still holds the monitor lock.
- Q moves from the wait set to the entry queue but remains blocked.
- Other threads (like R) might try to acquire the lock.
- When P exits the synchronized block (releasing the lock), Q and R (and any others in the entry queue) compete non-deterministically to acquire the lock.
- If R acquires the lock before Q, R might change the state such that the condition Q was waiting for is no longer true by the time Q eventually acquires the lock and resumes after its `wait()`.
The Cure: The `while` Loop!

This "signal and continue" semantics is precisely why the condition must be re-checked in a `while` loop after returning from `wait()`. The state might have changed between the notification and the woken thread re-acquiring the lock.
// Semaphore acquire() as a monitor method; 'number' counts available permits
synchronized void acquire() {
    // If the condition check were an 'if', Q might proceed with number == 0!
    while (number <= 0) {
        try { wait(); }
        catch (InterruptedException e) { }
    }
    // Re-checking the condition ensures safety even if R snuck in.
    number--;
}
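For completeness, here is a hedged sketch of the full monitor-based counting semaphore this snippet belongs to (the class name and constructor are illustrative; only `number` comes from the snippet above):

// Counting semaphore implemented with Java's monitor primitives (sketch)
class MonitorSemaphore {
    private int number; // available permits

    MonitorSemaphore(int initial) {
        number = initial;
    }

    synchronized void acquire() throws InterruptedException {
        while (number <= 0) {
            wait();      // releases the monitor lock while waiting
        }
        number--;
    }

    synchronized void release() {
        number++;
        notifyAll();     // safe with signal-and-continue: waiters re-check in their while loop
    }
}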
Alternative: `java.util.concurrent.locks.Condition`

Intrinsic locks (`synchronized`/`wait`/`notify`) have limitations:
- One implicit lock per object.
- `wait`/`notify` work on a single, implicit condition set per object.
- Tied to block structure.
The `java.util.concurrent.locks` package provides more flexibility:
- The `Lock` interface (e.g., `ReentrantLock`) provides explicit `lock()` and `unlock()`.
- A `Lock` object can create multiple `Condition` objects associated with it using `lock.newCondition()`.
`Condition` Interface:
- Each `Condition` object represents a separate wait set associated with the same `Lock`.
- `await()`: Called while holding the associated `Lock`. Atomically releases the `Lock` and waits on this specific condition. Re-acquires the `Lock` before returning. The application condition must still be re-checked in a `while` loop. ("await" stands for atomic wait.)
- `signal()`: Called while holding the associated `Lock`. Wakes up one thread waiting on this specific condition.
- `signalAll()`: Called while holding the associated `Lock`. Wakes up all threads waiting on this specific condition.
This allows creating more complex synchronization logic, like separate conditions for "not full" and "not empty" associated with the same buffer lock, potentially allowing more targeted `signal()` calls instead of always using `signalAll()`.
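A minimal sketch of such a two-condition bounded buffer (the class and field names are illustrative; the `Lock`/`Condition` wiring follows the standard `java.util.concurrent` pattern):

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Bounded buffer with one lock and two targeted conditions
class CondQueue {
    private final long[] buffer;
    private int in = 0, out = 0, count = 0;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    CondQueue(int size) {
        buffer = new long[size];
    }

    void enqueue(long item) throws InterruptedException {
        lock.lock();
        try {
            while (count == buffer.length) {
                notFull.await();   // wait specifically for "not full"
            }
            buffer[in] = item;
            in = (in + 1) % buffer.length;
            count++;
            notEmpty.signal();     // only consumers wait here, so signal() suffices
        } finally {
            lock.unlock();
        }
    }

    long dequeue() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();  // wait specifically for "not empty"
            }
            long item = buffer[out];
            out = (out + 1) % buffer.length;
            count--;
            notFull.signal();      // only producers wait here
            return item;
        } finally {
            lock.unlock();
        }
    }
}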
Queue (Producer/Consumer) using Monitors
Monitors now allow us to write thread-safe code much more elegantly.
Continue here: 20 Producer Consumer, Sleeping Barber, Conditional Waits, Reader Writer Locks, Lock Granularity