本文已在公众号上发布,感谢关注,期待和你交流。
AQS源码二探-JUC系列 共享模式 doAcquireShared这个方法是共享模式下获取资源失败,执行入队和等待操作,等待的线程在被唤醒后也在这个方法中自旋执行,直到拿到资源。
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
// 新节点入队,SHARED模式
final Node node = addWaiter(Node.SHARED);
// 标识是否获取失败
boolean failed = true;
try {
// 标识是否线程是否中断
boolean interrupted = false;
for (;;) {
// 获得当前节点的前节点
final Node p = node.predecessor();
// 前节点是头节点
if (p == head) {
// 尝试获取资源【1】
int r = tryAcquireShared(arg);
// 返回值大于等于0算获取成功
if (r >= 0) {
// 设置头节点【2】
setHeadAndPropagate(node, r);
// 断开前节点next引用
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 取消尝试获取锁的节点
cancelAcquire(node);
}
}
-
【1】,这个入队等待方法和独占模式下的实现逻辑基本是一致的,共享模式使用
tryAcquireShared
获取资源方法返回值使用int来表示成功或失败的。获取成功,就会进行设置head操作,把原来的head移出队列 -
【2】,独占模式的代码这里获取资源成功后是调用
setHead
,共享模式这里是调用setHeadAndPropagate
方法,setHeadAndPropagate
方法执行了setHead后还调用了doReleaseShared()
触发了尝试唤醒节点的操作。在释放资源的时候触发唤醒节点的操作很好理解,而这里是头节点后继节点获取资源成功后为什么需要触发唤醒节点呢?对这点特别展开分析一下:
- release唤醒操作都是从head往后找节点,并且只会找到一个,并没有一次找出多个去唤醒的能力
- 在独占模式时的release操作只需要判断head不为null并且head的waitStatus不是取消状态就会去唤醒后继节点,因为独占模式下只有一个线程能获取到资源,队列里等待的节点也只有head后的最近一个有效节点需要唤醒,所以这个被唤醒的节点没有必要再去判断是否需要唤醒自己的后继节点,只需要依靠自己的release就可以了
- 共享模式下多个线程同时可以获取资源成功,也就意味着同时多个线程释放资源,那么仍然是依赖head往后找有一个有效节点唤醒必然不满足要求了,假设已经有多个线程在在队列等待资源释放,此时瞬间释放了两个资源,释放资源的时候会去唤醒head后继节点,这两个释放线程拿到的head是同一个,所以这时候的唤醒就不能做到唤醒head后两个有效节点,而期望的是唤醒两个,这一点是理解独占和共享模式之间在唤醒操作上的不同处理方式的关键。
理解以上几点后,再思考下只需要补充head节点发生变化时触发唤醒后继节点就可以满足要求了,如果队列中等待的节点依次被唤醒成为head然后继续往后唤醒节点,形式上看起来像一个传播(propagate)的动作。
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);//设置头节点【1】
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 判断是否需要进行唤醒操作【2】
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
-
【1】如前方法名所示,先设置head然后进行传播(doReleaseShared)
-
【2】第一个if判断分解如下:
- 1,
propagate > 0
,这个判读为true表示还有资源可以获取,直接进入if - 2,
h == null || h.waitStatus < 0
,这个h是设置head前存好的老的head,这个先判断了老head的waitStatus如果是小于0的就进入if,先判空是防空指针 - 3,
(h = head) == null || h.waitStatus < 0
,前面两个都不成立,最后会判断新的head的waitStatus如果是小于0的就进入if
里面第二个if判断只需确定后继节点是共享模式,就调用doReleaseShared方法。
- 1,
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
@ReservedStackAccess
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releaseShared执行成功就调用doReleaseShared即可完成释放资源,重点看doReleaseShared的代码。
doReleaseShared这里已经知道调用doReleaseShared的地方有两处,一处是释放资源的时候(releaseShared),一处是等待队列头节点的后继节点获得资源设置新的head后调用(setHeadAndPropagate)
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
// 循环内执行【1】
for (;;) {
Node h = head;
// 【2】
if (h != null && h != tail) {
int ws = h.waitStatus;
// 头节点waitStatus为SIGNAL【3】
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 头节点waitStatus为0【4】
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 出循环唯一判断就是head节点没有变化
if (h == head) // loop if head changed
break;
}
}
- 【1】这个方法的功能就是来唤醒head的后继节点的,这里却来了个死循环,退出条件是执行到最后head没有变化,也就是说在执行的时候无论是符合条件触发了unparkSuccessor还是没有触发,只要最后判断head变化了,那么还得继续循环执行唤醒head后继节点的逻辑,也就是说并不是一个线程进入这个方法逻辑完成唤醒就结束了,只要head不断在变化,那么可能会有很多线程同时在执行这段逻辑而不会退出循环,直到
tryAcquireShared
返回负数,可以想象很多线程同时在执行这个循环代码,如果也刚好tryAcquireShared
能成功的情况,这样是能够让那些等待的节点更快的被唤醒并且获得资源。 - 【2】
h != null && h != tail
这个判断都会认为是能够判断此时等待队列是否有等待被唤醒节点,所以进入后面的逻辑。其实这里还有一种场景就是初始化头节点执行了cas设置head,tail还是为空状态,在前篇中重点分析过。那么在这里尝试唤醒head后继节点的逻辑里是可以忽略这个场景的。 - 【3】如果head的waitStatus为SIGNAL,那么意味着有资格唤醒后继节点,不过需要先cas修改waitStatus到0成功才行,如果修改失败,表示已经有线程抢先一步做了同样的事情,所以没有必要去下面判断head是否变化了,直接继续执行循环。
- 【4】如果head的waitStatus为0,cas修改为PROPAGATE,如果修改失败,说明已经有线程抢先一步做了同样的事情,和前面一样直接继续循环执行。关于PROPAGATE这个状态后面详细分析。
在AQS的注释和API中发现有根据是否会抛出InterruptedException,分成了 uninterruptible mode 和 interruptible mode。比如acquire(int arg)
和acquireInterruptibly(int arg)
,acquireShared(int arg)
和acquireSharedInterruptibly(int arg)
本质区别就是在是否执行过程中对中断的响应。
以acquireSharedInterruptibly为例
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
进入方法就先判断线程中断标志,看起来是非常关心执行代码的线程是不是中断了,哈哈。
doAcquireSharedInterruptibly方法:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();【1】
}
} finally {
if (failed)
cancelAcquire(node);【3】
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();【2】返回线程中断标志
}
-
【1】【2】【3】,线程等待的地方是在parkAndCheckInterrupt的
LockSupport.park(this);
这行代码,唤醒时继续从这行代码下开始执行。这里涉及到一个比较关键的信息就是LockSupport.park(this);
里面是调用UNSAFE.park
方法,这个方法是native方法,因为这里没有等待时间的设置,只有两种可能会唤醒这个线程:1,调用了unPark方法;2,线程中断。线程醒来的时候并不能确认是unPark唤醒的还是中断唤醒的。所以无论如何都会在唤醒线程后第一步【2】中就是得到中断标志,如果线程已中断parkAndCheckInterrupt返回true,就会抛出InterruptedException
异常。抛出异常就会退出自旋,也会执行到
cancelAcquire(node)
。关于这个方法前篇已经详细介绍,它会把传入的node的waitstatus设置为CANCELLED状态。
而对于uninterruptible mode的实现就是在发现唤醒线程中断标志是true后,就执行Thread.currentThread().interrupt();
再次设置中断,然后并不处理中断。使用者可以在外面检查线程中断状态进行处理。
除了中断以外,AQS还提供了有超时机制的API,这个能力的基础是基于Unsafe的park(boolean isAbsolute, long time)方法。
比如AQS中doAcquireSharedNanos方法:
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)// 参数判断
return false;
// 计算到期时间【1】
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
// 计算剩余时间 【2】
nanosTimeout = deadline - System.nanoTime();
// 如果已经超时就返回【3】
if (nanosTimeout <= 0L)
return false;
// 对剩余时间有限制【4】
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 处理中断【5】
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 【1】注意,在进入for循环之前就计算出过期时间,这个过期时间是不会变的
- 【2】,【3】,【4】在原来的自旋逻辑里加入了自旋计算超时剩余时间,每次循环是先使用deadline计算出剩余超时时间,已经超时就返回,这个for循环可能会执行多次,在执行过程中时间就会消耗,有可能还未进入park操作,就已经超时了。而在执行park之前的条件是等待时间必须超过
spinForTimeoutThreshold
(1000微秒),如果连1000微秒都不到了,就没必要把线程弄等待了,直接再执行一下循环代码消耗掉时间,从【3】判断退出循环 - 【5】AQS中
doAcquireSharedNanos
和doAcquireNanos
方法都会在处理中断
下面介绍下使用共享模式扩展的两个同步器:Semaphore和CountDownLatch。
SemaphoreSemaphore经常用于限制访问资源的线程数量。
以一个停车场的运作为例。简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆直接进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入外面的一辆进去,如果又离开两辆,则又可以放入两辆,如此往复。
在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用。
— 百度百科
和ReentrantLock实现相似,它内部也有一个Sync类,然后子类NonfairSync和FairSync分别实现非公平和公平模式。
NonfairSync.tryAcquireSharedSemaphore自定实现tryAcquireShared方法。
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// Sync
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // 自旋【1】
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) //cas【2】
return remaining;
}
}
- 【1】,【2】nonfairTryAcquireShared实现是一个循环+cas操作,它们操作的是AQS中的state字段,先获取当前state的值,然后使用请求的数量计算剩余的数量,如果计算的剩余是小于0,直接返回这个负数值,表示获取失败;如果是大于等于0,表示有机会获取成功,那么就cas尝试更新state值,这里没有考虑ABA问题,因为出现ABA情况对于此时的逻辑也是正确的。当然,cas成功那就是获取成功,如果cas失败,没事,循环继续执行。方法只有两种返回可能,一种cas成功,返回大于等于0的值;一种计算出剩余小于0,返回小于0的值。
对于公平场景,Semaphore实现和ReentrantLock也是一致:
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
核心就是hasQueuedPredecessors方法,在前篇中已经详细分析,其他代码和非公平模式代码一致。
Sync.tryReleaseShared无论公平还是非公平,释放资源操作都是执行AQS的releaseShared方法,从而执行到tryReleaseShared方法。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
和前面的获取操作一样,释放操作需要修改state值也是使用循环+cas结合使用,对int值overflow的情况做了抛出异常处理。
Semaphore例子以下是例子代码,Semaphore的初始化值是2,一共5个线程。首先两个线程acquire成功,另三个个线程进入队列等待,然后5秒后,两个线程分别执行release,等待队列中的线程被唤醒,队列中前两个线程获取成功,最后一个线程继续等待直到再有线程执行release。
public class SemaphoreTest {
static Semaphore semaphore = new Semaphore(2);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executorService.submit(new Task());
}
}
static class Task implements Runnable{
@Override
public void run() {
try {
semaphore.acquire();
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName()+" semaphore release");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
以下使用图示例进行说明:
1,三个线程进入等待队列等待,state值被前两个线程获取成功后从2变为了0,注意因为head指向的节点是作为虚节点所以实际等待队列中此时为4个节点,最后一个节点因为没有后继节点所以它的waitstatus为0,其他节点的waitstatus都为SIGNAL。
2,5秒后,前面获取成功的两个线程先后执行release,会执行到doReleaseShared
方法唤醒head的后继节点,我们假设一种并发场景:在doReleaseShared
方法代码中第一个线程执行了compareAndSetWaitStatus(h, Node.SIGNAL, 0)
成功,并且唤醒一个后继节点线程T3,此时head的状态是0,就符合了下面的判断,所以第二个线程会执行compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
把head的状态更新为PROPAGATE,然后假设被唤醒的线程还没有更新head指向,那么第二个线程在判断h == head
为true后退出。被唤醒的线程就继续执行doAcquireShared
的自旋代码,因为资源已经释放,它能获取成功,然后执行setHeadAndPropagate
方法设置head和继续唤醒后继节点,因为这个场景里老head状态是PROPAGATE,新head状态是SIGNAL,随便哪一个都符合唤醒后继节点的要求。唤醒T4后,也会获取成功,执行setHeadAndPropagate
,新head状态是SIGNAL所以会执行doReleaseShared
,并且符合唤醒后继节点条件,就会唤醒T5,注意这里就2个资源,但是会多唤醒一个线程,T5被唤醒后执行doAcquireShared
的自旋代码,但是资源不足获取失败,然后修改head节点的状态为SIGNAL,线程继续等待。
允许一个或多个线程等待一组操作在其他线程中执行完成。注释中翻译一下它作用,可以作为一个闩锁,所有调用await的线程都会在锁前面等待,直到它调用countDown;或者初始化为N的CountDownLatch可以用来让一个线程等待,直到N线程完成某个动作,或者某个动作已经完成N次。
要实现CountDownLatch的功能,使用AQS的state计数,用同步队列阻塞线程,实现应该也简单。
源码实现没有公平和非公平,内部也实现了一个继承AbstractQueuedSynchronizer的子类Sync。
Sync.tryAcquireShared这个tryAcquireShared和前面同步器实现都不同,入参acquires不需要使用,也不会更改state值,而是判断当state为0的时候,来获取的操作能成功,否则获取失败。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
Sync.tryReleaseShared
自定义的释放操作确保state值只能减到0,不会减到负数,仍然是循环+cas操作实现。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
await和countDown
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
await是获取,countDown是释放。在初始化CountDownLatch的时候需要传入一个countDown数值,表示线程通过await之前必须调用countDown的次数,初始化state为countDown,每次执行countDown后state值就减1。
从前面的实现代码可以看出CountDownLatch的初始化后,在state没有变成0之前执行await就会进入到同步队列等待,每次执行countDown,state就会减1,state变成0的时候触发唤醒等待线程。
CountDownLatch类注释上写了它两个典型用法:
- 1,某一个线程开始运行前需要等待一个或多个线程都完成。一个线程先执行await等待,然后根据初始化的值,执行这个数量的countDown次数,可以一个线程执行,也可以多个线程执行,state被减到小于等于0的时候触发唤醒前面等待的一个线程。
- 2,可以实现多个线程并行执行的场景,多个线程开始执行前都在await上等待,然后用一个countDown方法就可以几乎同时唤醒多个等待的线程。
在共享模式中waitStatus的PROPAGATE状态显得有点突兀,然而在网上一查,原来是有一个bug的故事,仔细查看这个bug的场景以及修复代码。我觉得非常有助于理解共享模式下释放操作唤醒head后继节点和被唤醒线程获取成功后唤醒后继节点时产生的竞争场景。
bug描述:链接
修复代码记录:链接
在bug描述中有复现测试代码,也有详细的描述,可以两者结合查看帮助理解。
Doug Lea说:
There was a race condition that could prevent a releaseShared from being propagated through to a parked thread, leaving the thread parked while a permit existed.
这里稍作解释:
查看原来代码,对于release是否进行唤醒后继节点的判断:(h != null && h.waitStatus != 0
,对于setHeadAndPropagate是否进行唤醒后继节点的判断:propagate > 0 && node.waitStatus != 0
。
当一个release发生,被唤醒的节点获取到最后一个资源(propagate=0),此时head的状态是0,再有一个release发生,条件不满足唤醒后继节点退出,前面被唤醒的线程执行setHeadAndPropagate,也不会去唤醒后继节点,而此时资源是有的,但是不能触发唤醒后继节点了。
所以引入了PROPAGATE状态,这个状态是在等待队列中有等待节点的时候判断到head节点是0的情况,意味着一定是head的后继节点已经被唤醒在执行,就把head的节点改为PROPAGATE状态,如果是最后一个资源(propagate=0),但是也是判断h.waitStatus < 0依旧可以触发唤醒。在unparkSuccessor方法中只要状态小于0的都会重置成0,PROPAGATE也不影响其他流程。
总结1,共享模式和独占模式在实现上的的区别是共享模式需要处理并发释放和并发唤醒的场景,而独占模式只需要处理单个唤醒和单个唤醒的场景。所以在共享模式下获取成功也会触发后继节点的唤醒。
2,AQS支持中断模式和非中断模式,另外超时的API也是会处理中断的,而对于中断的场景会使节点的状态改为取消状态,而不是当时就进行删除,这些取消状态的节点在唤醒的时候就会从队列中切断连接剔除出去。
3,上面介绍了基于AQS共享模式实现的同步器Semaphore和CountDownLatch,在AQS内部机制基础上实现这两个同步器非常简单了,这也是AQS的厉害之处吧。
4,关于AQS源码这是第二篇,后续预计还有2篇文章,下一篇主要介绍condition队列的实现。