Understanding AQS Mechanism
AbstractQueuedSynchronizer (AQS) is the foundational class behind synchronization utilities such as ReentrantLock, Semaphore, and CountDownLatch. It is an abstract template that supplies the queuing and blocking machinery; a custom lock is implemented by overriding its protected hook methods, such as tryAcquire and tryRelease.
(I). Internal Structure of AQS Template
- Shared variable for resource availability (state)
- Queue for waiting threads (Queue(Node))
- Blocking condition variables (Condition)
- Queue for condition wait (Queue(Node))
1.1 AQS Execution Flow
Take ReentrantLock (exclusive lock) as an example:
ReentrantLock lock = new ReentrantLock();
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
lock.lock();
try {
    //...
} finally {
    lock.unlock();
}
- Multiple threads calling lock() attempt to change the resource flag (state) from 0 to 1 using CAS. The thread whose CAS succeeds owns the lock; the others enter the waiting queue.
- The thread that holds the lock can call await() on a Condition, which releases the resource and moves the thread into that condition's wait queue.
- When the condition is signaled, the thread is moved back to the waiting queue and competes for the resource again, as in step 1.
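The await/signal flow above can be sketched as a small hand-off between two threads. The class name ConditionDemo, the ready flag, and the runOnce helper are illustrative assumptions and do not come from the original example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: one thread waits on a Condition, another signals it.
class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition readyCond = lock.newCondition();
    private boolean ready = false; // guarded by lock

    // Blocks until markReady() has been called.
    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            // await() releases the lock and parks this thread in the condition queue;
            // the loop guards against spurious wakeups.
            while (!ready) {
                readyCond.await();
            }
            // At this point the thread has re-acquired the lock.
        } finally {
            lock.unlock();
        }
    }

    void markReady() {
        lock.lock();
        try {
            ready = true;
            readyCond.signalAll(); // waiters must still re-acquire the lock before returning
        } finally {
            lock.unlock();
        }
    }

    // Runs the hand-off once and reports whether the waiter finished.
    static boolean runOnce() {
        ConditionDemo demo = new ConditionDemo();
        Thread waiter = new Thread(() -> {
            try {
                demo.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        demo.markReady();
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }
}
```

The flag is checked in a loop because a waiter should never assume that returning from await() means the condition actually holds.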
1.2 Internal Properties of AQS Template
1.2.1 state
A volatile int that indicates the availability of the resource. Competing threads try to acquire the resource by updating it with CAS operations.
1.2.2 head/tail
- head (Node): First element in the queue.
- tail (Node): Last element in the queue.
1.2.2 Node Object Property Description
Thread Access Mode
- SHARED: Shared access, allowing multiple threads to execute, e.g., Semaphore/CountDownLatch.
- EXCLUSIVE: Exclusive access, only one thread can execute, e.g., ReentrantLock.
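To make the SHARED mode more concrete, here is a minimal sketch of a one-shot gate built on AQS, loosely modeled on the latch example in the AQS Javadoc. The class name OneShotGate, its methods, and the passThrough helper are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot gate: state 0 = closed, 1 = open.
class OneShotGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // Positive result: success, and later shared acquires may also succeed.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // true lets AQS wake the queued waiters
        }
    }

    private final Sync sync = new Sync();

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void open() {
        sync.releaseShared(1);
    }

    // Starts n waiters, opens the gate, and returns how many got through.
    static int passThrough(int n) {
        OneShotGate gate = new OneShotGate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] waiters = new Thread[n];
        for (int i = 0; i < n; i++) {
            waiters[i] = new Thread(() -> {
                try {
                    gate.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiters[i].start();
        }
        gate.open();
        for (Thread t : waiters) {
            try {
                t.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }
}
```

Unlike an exclusive lock, a single open() call lets every waiting thread proceed, which is the behavior the SHARED mode exists to support.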
Current Thread Status
- waitStatus = 0: The initial value; the node is in the queue and waiting to acquire the lock.
- CANCELLED: 1 indicates the thread has been canceled, for example because of a timeout or an interrupt.
- SIGNAL: -1 indicates the next node's thread is blocked (or about to block) and must be unparked when this node releases or is canceled.
- CONDITION: -2 indicates the node is in the condition wait queue.
- PROPAGATE: -3 indicates subsequent shared acquire operations can proceed.
1.2.3 Main APIs
- isHeldExclusively(): Indicates whether the current thread holds the exclusive resource. Only needed when using conditions.
- tryAcquire(int): Acquires the resource exclusively. Returns true if successful, false otherwise.
- tryRelease(int): Releases the resource exclusively. Returns true if the resource is now fully released, false otherwise.
- tryAcquireShared(int): Acquires the resource in shared mode. Returns negative for failure, 0 for success without remaining resources, and positive for success with remaining resources.
- tryReleaseShared(int): Releases the resource in shared mode. Returns true if it allows waking up subsequent nodes, false otherwise.
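As a rough illustration of how the exclusive hooks fit together, the following sketch builds a non-reentrant mutex on AQS, in the spirit of the example in the AQS Javadoc. The class name SimpleMutex and its methods are hypothetical, and state values 0 (free) and 1 (held) are a convention chosen for this sketch.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex: state 0 = free, 1 = held.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        @Override
        protected boolean tryAcquire(int acquires) {
            // A single CAS from 0 to 1 decides who owns the mutex.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // fully released, so AQS may wake the next waiter
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock() {
        sync.acquire(1); // queues and parks the caller if tryAcquire fails
    }

    boolean tryLock() {
        return sync.tryAcquire(1);
    }

    void unlock() {
        sync.release(1);
    }

    boolean isLocked() {
        return sync.isLocked();
    }
}
```

Only the hook methods contain lock-specific logic; queuing, parking, and waking are inherited from acquire() and release().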
1.2.4 Synchronization Wait Queue
- A doubly linked list of Node objects, connected through each node's prev and next fields.
- Threads that fail to acquire the shared resource are added to this queue.
- When a condition is signaled, nodes from the condition wait queue are transferred to this queue so that their threads can compete for the resource again.
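The queue can be observed from the outside through the inspection methods that ReentrantLock exposes. The sketch below assumes a hypothetical helper class QueueLengthDemo; getQueueLength() is documented as an estimate, so the code polls until every contender is queued.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: threads that fail to acquire the lock show up in the wait queue.
class QueueLengthDemo {
    // Holds the lock, starts n contenders, and returns the queue length
    // observed once all of them are waiting.
    static int queuedWhileHeld(int n) {
        ReentrantLock lock = new ReentrantLock();
        Thread[] contenders = new Thread[n];
        int observed = -1;
        lock.lock();
        try {
            for (int i = 0; i < n; i++) {
                contenders[i] = new Thread(() -> {
                    lock.lock(); // fails while the caller holds the lock, so the thread is enqueued
                    lock.unlock();
                });
                contenders[i].start();
            }
            // Poll until every contender has been added to the queue.
            while (lock.getQueueLength() < n) {
                Thread.yield();
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock();
        }
        for (Thread t : contenders) {
            try {
                t.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return observed;
    }
}
```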
1.2.5 Condition Wait Queue
The condition queue is implemented as a singly linked list using nextWaiter:
- Calling await() appends the thread's node to this queue and blocks the thread.
Why is there a condition wait queue?
Threads must be awakened after meeting certain conditions before they can join the 'synchronization wait queue' to acquire resources.
1.3 How to Implement a Lock
Consider the following questions:
- What happens when a thread fails to acquire the resource and is parked? How does it resume and compete for the resource again?
- If the lock is reentered, the state will be greater than 1. How is it reduced back to 0 during release?
- For reentrant locks, the caller either recurses or invokes other locked methods. Each lock() call is paired with an unlock() call, and each unlock() decrements the state by 1, so the state eventually returns to 0 and the lock becomes free.
Why are the wait queue and the condition wait queue implemented as linked lists of nodes rather than as a queue?
If a thread is placed in the linked list, why is it parked and then unparked?
- Parking and placing the thread in the linked list are two separate operations. A parked thread consumes no CPU while it waits; it is similar to turning off a phone to save battery and then turning it back on later.
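Parking and unparking can be tried in isolation with LockSupport, which is what AQS uses underneath. This is a minimal sketch; the class ParkDemo and the permitGranted flag are illustrative assumptions.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo of park/unpark outside of any lock.
class ParkDemo {
    static boolean parkAndWake() {
        AtomicBoolean permitGranted = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);
        Thread sleeper = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop.
            while (!permitGranted.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        sleeper.start();
        permitGranted.set(true);
        // unpark() before park() is fine: the permit makes the next park() return at once.
        LockSupport.unpark(sleeper);
        try {
            sleeper.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }
}
```

The loop around park() mirrors the retry loop in AQS: waking up is only a hint to check again, not a guarantee that the resource is available.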
How to design fair and non-fair locks, and how to implement reentrant locks?
- Refer to part two, 'Source Code Analysis'.
(II). Source Code Analysis
2.1 Fair vs Non-Fair Lock Implementation
2.1.1 Non-Fair Lock Implementation
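final void lock() {
    // Barge: try to take the lock immediately, ignoring any queued threads
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1); // fall back to the normal acquire-or-enqueue path
}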
2.1.2 Fair Lock Implementation
final void lock() {
    // No barging: the fair tryAcquire checks hasQueuedPredecessors() before its CAS
    acquire(1);
}
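From the caller's side, the policy is selected through the ReentrantLock constructor. The sketch below uses a hypothetical class FairOrderDemo to show that a fair lock hands the lock to waiters in arrival order; each worker is started only after the previous one is already queued, so the arrival order is well defined.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a fair lock grants the lock in queue (arrival) order.
class FairOrderDemo {
    static List<Integer> acquisitionOrder(int n) {
        ReentrantLock lock = new ReentrantLock(true); // true selects the fair policy
        List<Integer> order = new ArrayList<>();      // only touched while holding the lock
        Thread[] workers = new Thread[n];
        lock.lock();
        try {
            for (int i = 0; i < n; i++) {
                final int id = i;
                workers[i] = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(id);
                    } finally {
                        lock.unlock();
                    }
                });
                workers[i].start();
                // Wait until this worker is queued before starting the next one.
                while (!lock.hasQueuedThread(workers[i])) {
                    Thread.yield();
                }
            }
        } finally {
            lock.unlock();
        }
        for (Thread t : workers) {
            try {
                t.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return order;
    }
}
```

A non-fair lock usually offers higher throughput because a running thread can take the lock without a park/unpark round trip, at the cost of possibly overtaking queued threads.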
2.2 Reentrant Lock Implementation
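protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { // lock is free: try to claim it with CAS
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        setState(c + acquires); // reentrant acquire by the owner (the JDK also checks for overflow)
        return true;
    }
    return false;
}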
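The release side is what brings the state back down. The following sketch pairs a reentrant tryAcquire with a matching tryRelease in a hypothetical AQS subclass named ReentrantSync; it is a simplified illustration that omits the overflow check, not the JDK's ReentrantLock source.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical reentrant synchronizer: state holds the number of holds by the owner.
class ReentrantSync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            setState(c + acquires); // only the owner gets here, so no CAS is needed
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
        if (Thread.currentThread() != getExclusiveOwnerThread()) {
            throw new IllegalMonitorStateException();
        }
        int c = getState() - releases;
        boolean free = (c == 0);
        if (free) {
            setExclusiveOwnerThread(null); // last hold released
        }
        setState(c);
        return free; // true only when the lock is completely free
    }

    int holds() {
        return getState();
    }
}
```

Acquiring twice and releasing twice from the same thread moves the hold count through 1, 2, 1, 0, and release() reports true only on the final call.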
2.3 Enqueue and Dequeue Implementation
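public final void acquire(int arg) {
    // Try once; on failure enqueue the thread and wait in the queue
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt(); // re-assert an interrupt that arrived while waiting
}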
2.3.1 Enqueue Implementation
private transient volatile Node head; // Head of the queue
private transient volatile Node tail; // Tail of the queue
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Fast path: a single CAS append when the queue is already initialized
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // Slow path: the queue is empty or the CAS lost a race
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            // Lazily create the dummy head, then retry
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t; // returns the node's predecessor
            }
        }
    }
}
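The same CAS-append pattern can be reproduced outside AQS with AtomicReference. The sketch below is a simplified stand-in with a hypothetical class CasQueueSketch; it mirrors the lazily created dummy head and the retry loop, but leaves out everything related to threads and wait status.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-alone version of the enq() retry loop.
class CasQueueSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    void enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized: install a dummy head, then retry.
                Node dummy = new Node("dummy");
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t; // set prev before publishing the node as tail
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return;
                }
            }
        }
    }

    // Counts real nodes by walking prev links from the tail to the dummy head.
    int size() {
        int n = 0;
        for (Node p = tail.get(); p != null && p != head.get(); p = p.prev) {
            n++;
        }
        return n;
    }

    // Enqueues from several threads at once and returns the final node count.
    static int concurrentEnqueue(int threads, int perThread) {
        CasQueueSketch q = new CasQueueSketch();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    q.enqueue(new Node("n"));
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return q.size();
    }
}
```

Because prev is written before the tail CAS, a backward walk from the tail always sees a complete chain, even though the next link may briefly lag behind.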
2.3.2 Dequeue Implementation
Loops until the resource is acquired. The return value is true if the thread was interrupted while waiting.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // Only the node directly behind head may try to acquire
            if (p == head && tryAcquire(arg)) {
                setHead(node); // the acquiring node becomes the new head (dequeue)
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Otherwise park until woken, remembering any interrupt
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
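The effect of the returned interrupted flag is visible from user code: lock() keeps waiting when the thread is interrupted, and the interrupt status is still set once the lock is finally acquired. The sketch below assumes a hypothetical class InterruptWhileQueuedDemo.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: an interrupt received while queued is not lost.
class InterruptWhileQueuedDemo {
    static boolean interruptFlagAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagSeen = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            lock.lock(); // uninterruptible: keeps waiting even if interrupted
            try {
                flagSeen.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        lock.lock();
        try {
            waiter.start();
            // Wait until the waiter is in the synchronization queue.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            waiter.interrupt(); // does not abort the wait, only records the interrupt
        } finally {
            lock.unlock();
        }
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flagSeen.get();
    }
}
```

Callers that need to abandon the wait on interrupt would use lockInterruptibly() instead, which throws InterruptedException rather than deferring the interrupt.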