一、前言
Jdk中独占锁的实现除了使用关键字synchronized外,还可以使用ReentrantLock。虽然在性能上ReentrantLock和synchronized没有什么区别,但ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。
使用synchronized结合Object上的wait和notify方法可以实现线程间的等待通知机制。Condition同样可以实现这个功能,而且相比前者使用起来更清晰也更简单。前者是java底层级别的,后者是语言级别的,后者可控制性和扩展性更好。
与wait/notify区别
-
1.Condition能够支持不响应中断,而通过使用Object方式不支持。
-
2.Condition能够支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个。
-
3.Condition能够支持超时时间的设置,而Object不支持。
二、Condition实现生产者和消费者模式
为了方便理解,我们先写一个用condition实现的生产者消费者的例子。
/** * @Description: 演示Condition实现生产者和消费者模式 */ public class ConditionDemo2 { private int queueSize = 10; private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize); private ReentrantLock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); public static void main(String[] args) { ConditionDemo2 conditionDemo2 = new ConditionDemo2(); Producer producer = conditionDemo2.new Producer(); Consumer consumer = conditionDemo2.new Consumer(); new Thread(producer).start(); new Thread(consumer).start(); } class Consumer implements Runnable{ @Override public void run() { consume(); } public void consume(){ while(true){ lock.lock(); try{ while (queue.size() == 0){ System.out.println("队列空,等待数据"); notEmpty.await(); } Integer poll = queue.poll();//走过await()证明队列不为空,取出数据 System.out.println("消费者消费数据:"+poll+",队列剩余数据数量:"+queue.size()); notFull.signalAll();//获取数据之后,队列肯定有空闲,那么唤醒生产者进行生产 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } } class Producer implements Runnable{ @Override public void run() { produce(); } public void produce(){ while(true){ lock.lock(); try{ while (queue.size() == queueSize){ System.out.println("队列已满,等待空余"); notFull.await(); } queue.offer(1);//走过await()证明队列有空闲,开始往队列里生产数据 System.out.println("生产者向队列生产一个数据,队列剩余空间:"+(queueSize-queue.size())); notEmpty.signalAll();//向队列生产数据之后,队列不为空,那么唤醒消费者进行消费 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } } }Condition注意点
- 实际上,如果说Lock用来替代synchronized,那么Condition就是用来代替相对应的Object.wait/notify的,所以在用法和性质上,几乎一样。
- await方法会自动释放持有的Lock锁,和Object.wait一样,不需要自己手动先释放锁。
- 调用await的时候,必须持有锁,否则会抛异常,和Object.wait一样。
三、原理分析
在AQS中存在两个FIFO队列:同步队列(等待队列)和条件队列。
同步队列(等待队列):ReentrantLock实现原理
本文主要是讲condition实现原理(即条件队列),条件队列是由Condition内部实现的,是一个虚拟的FIFO单向队列,在AQS中同步队列、等待队列组成关系:
-
1、AQS中tail 和 head主要构成了一个FIFO双向的同步队列。
-
2、AQS中condition构成了一个FIFO单向条件队列。condition是AQS内部类,每个Condition对象中保存了firstWaiter和lastWaiter作为队列首节点和尾节点,每个节点使用Node.nextWaiter保存下一个节点的引用,因此等待队列是一个单向队列。
3.1 队列关系
在Object的监视器(monitor)模型上,一个对象拥有一个同步队列和一个等待队列;而并发包中的AQS上拥有一个同步队列和多个等待队列。两者的具体实现原理的有所不同,但在多线程下等待/唤醒 操作的思路有相同之处,Object的监视器模型 和 AQS对同步队列、等待队列对应关系如下图
3.1.1、Object的监视器模型同步、等待队列对应关系图
3.1.2、AQS中同步队列、条件队列对应关系图
当多线程并发访问AQS的lock()、await()、single()方法时,同步队列和等待队列变化处理过程包括:
- 1、多个线程执行lock()方法时,线程会竞争获取同步锁state,获取成功的线程占有锁state、获取失败的线程会封装成node加入到AQS的同步队列中,等待锁state的释放。
- 2、等获取了state锁的线程(同步队列中head节点)执行await()方法时,condition会将当前线程封装成一个新的node添加到condition等待队列的尾部,同时阻塞(waiting),直到被唤醒。
- 3、等获取了state锁的线程(同步队列中head节点)single()方法时,condition会将等待队列首节点移动到同步队列的尾部,直到获取同步锁state才被唤醒。
3.2 Condition的实现
3.2.1 等待的实现
当线程调用Condition.await()方法时,将会把前线程封装成node节点,并将节点加入等待队列的尾部,然后释放同步state状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。当前线程加入Condition的等待队列逻辑如下图:
-
1、能够调用Condition.await()方法的节点是获取了同步state锁的node,即同步队列中的head节点;调用Condition的await()方法(或者以await开头的方法)会使当前线程进入等待队列并释放锁、唤醒同步队列中的后继节点,最后线程状态变为等待状态。
-
2、Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。
-
3、调用Condition.await()节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了state锁的线程,也就是说该过程是由锁来保证线程安全的。
3.2.2 通知的实现
整个signal()的过程可以总结如下:
-
1、执行signal()唤醒线程时,先判断当前线程是否是同步锁state持有线程,所以能够调用signal()方法的线程一定持有了同步锁state。
-
2、自旋唤醒等待队列的firstWaiter(首节点),在唤醒firstWaiter节点之前,会将等待队列首节点移到同步队列中。
四、源码分析
可以看到,想要获得一个condition对象,需要首先通过一个ReentrantLock锁来创建,而最终调用其实为AQS中的内部类ConditionObject。
Condition condition = lock.newCondition(); public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { final ConditionObject newCondition() { return new ConditionObject(); } } } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable { /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { } } }condition是要和lock配合使用的,而lock的实现原理又依赖于AQS,所以AQS内部实现了ConditionObject。我们知道在锁机制的实现上,AQS内部维护了一个双向的同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列。condition内部也是使用相似的方式,内部维护了一个单向的条件队列,所有调用condition.await方法的线程会加入到条件队列中,并且线程状态转换为等待状态。
ConditionObject中有两个成员变量:头节点firstWaiter 和 尾节点lastWaiter ,条件队列的成员Node 复用了实现同步队列的内部类Node。用nextWaiter保存了下一个等待节点。
用Object的方式Object对象监视器上只能拥有一个同步队列和一个等待队列,而使用Lock可以有有一个同步队列和多个等待队列。可以多次调用lock.newCondition()创建多个Condition,所以一个Lock可以持有多个等待队列。
4.1 await等待
只有线程获取到lock之后,才可以使用condition的await方法。假设此时线程1获取到了ReentrantLock锁,在执行代码逻辑的时候,发现某些条件不符合,于是调用了**condition.await();**代码:
此时AQS主要执行以下动作:
- 线程1把自己包装成节点,waitStatus设为CONDITION(-2),追加到ConditionObject中的条件队列(每个ConditionObject有一个自己的条件队列);
- 线程1释放锁,把state设置为0;
- 然后唤醒等待队列中head节点的下一个节点;
保存新节点addConditionWaiter()方法如下。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable { private Node addConditionWaiter() { Node t = lastWaiter; // 清除被取消的尾节点 if (t != null && t.waitStatus != Node.CONDITION) { //解除关联 unlinkCancelledWaiters(); t = lastWaiter; } //将当前线程保存在Node中 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else //队尾插入 t.nextWaiter = node; //更新lastWaiter lastWaiter = node; return node; } } }将当前节点保存到新建立的Node,如果等待队列的firstWaiter为null的话(等待队列为空队列),则将firstWaiter指向当前的Node,否则,更新lastWaiter(尾节点)即可。可以看出等待队列是一个不带头结点的链式队列,而AQS中的同步队列是一个带头结点的链式队列。
将当前节点插入到等待对列之后,会调用fullyRelease,使当前线程释放lock。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { //成功释放同步状态 failed = false; return savedState; } else { //不成功释放同步状态抛出异常 throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } }方法内部调用AQS的模板方法release方法释放AQS的同步状态,并且唤醒在同步队列中头结点的后继节点引用的线程,如果释放成功则正常返回,若失败的话就抛出异常。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable { public final void await() throws InterruptedException { //...... while (!isOnSyncQueue(node)) { // 3. 当前线程进入到等待状态 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //...... } } }当线程第一次调用condition.await()方法时,会进入到这个while()循环中,然后通过LockSupport.park(this)方法使得当前线程进入等待状态,那么要想退出这个await方法第一个前提条件自然而然的是要先退出这个while循环,有两种可能:
-
1.逻辑走到break退出while循环(当前等待的线程被中断)
-
2.while循环中的逻辑判断为false(当前节点被移动到了同步队列中,即另外线程调用的condition的signal或者signalAll方法)。
总的说就是当前线程被中断或者调用condition.signal/condition.signalAll方法当前节点移动到了同步队列后 ,这是当前线程退出await方法的前提条件。当退出while循环后就会调用acquireQueued(node, savedState)(之前Reentlock中讲过),自旋过程中线程不断尝试获取同步状态,直至获取lock成功。这也说明了退出await方法必须是已经获得了condition关联的lock。
4.2 signal唤醒
当另一个线程执行了 condition.signal之后,主要是做了以下事情:
- 1、把条件队列中的第一个节点追加到等待队列中;
- 2、把等待队列原来尾节点的waitStatus设置为SIGNAL。
然后继续处理自己的事情,自己的事情处理完成之后,会释放锁,唤醒等待队列中head节点的下一个节点线程进行工作。
调用condition的signal唤醒一个等待在condition上的线程(头节点),将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回,源码如下。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable { public final void signal() { //1. 先检测当前线程是否已经获取lock if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //2. 获取条件队列中第一个节点,之后的操作都是针对这个节点 Node first = firstWaiter; if (first != null) doSignal(first); } } }signal方法首先会检测当前线程是否已经获取lock,没有获取lock会直接抛出异常,再调用doSignal传入头节点。doSignal方法源码为:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable { private void doSignal(Node first) { do { // 已经是尾节点了 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 将头结点从条件队列中移除 first.nextWaiter = null; // while中transferForSignal方法对头结点做真正的处理 // 将等待队列中的 Node 转移至 AQS 同步队列, 不成功且还有节点则继续循环 } while (!transferForSignal(first) && // 队列还有节点 (first = firstWaiter) != null); } } }具体逻辑请看注释,真正对头节点做处理的逻辑在transferForSignal,该方法源码为:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功 final boolean transferForSignal(Node node) { // 更新状态为0 // 如果状态已经不是 Node.CONDITION, 说明被取消了 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //将该节点移入到AQS同步队列尾部 Node p = enq(node); int ws = p.waitStatus; // 上一个节点被取消 // 上一个节点不能设置状态为 Node.SIGNAL if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // unpark 取消阻塞, 让线程重新同步状态 LockSupport.unpark(node.thread); return true; } }4.3 signalAll()源码
signalAll()会从首节点循环遍历条件队列,将条件队列中的所有节点移到同步队列中去。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable { public final void signalAll() { //1. 先检测当前线程是否已经获取lock if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } //遍历条件队列,将条件队列中的node移动到同步队列中 private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; //移动节点到同步队列中 transferForSignal(first); first = next; } while (first != null); } } }4.4 不可打断等待 - 直到被唤醒
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable { public final void awaitUninterruptibly() { // 添加一个 Node 至等待队列 Node node = addConditionWaiter(); // 释放节点持有的锁 int savedState = fullyRelease(node); boolean interrupted = false; // 如果该节点还没有转移至 AQS 队列, 阻塞 while (!isOnSyncQueue(node)) { // park 阻塞 LockSupport.park(this); // 如果被打断, 仅设置打断状态 if (Thread.interrupted()) interrupted = true; } // 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列 if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } } }4.5 等待 - 直到被唤醒或打断或超时
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable { public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 添加一个 Node 至等待队列, Node node = addConditionWaiter(); // 释放节点持有的锁 int savedState = fullyRelease(node); // 获得最后期限 final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; // 如果该节点还没有转移至 AQS 队列, 阻塞 while (!isOnSyncQueue(node)) { // 已超时, 退出等待队列 if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } // park 阻塞一定时间, spinForTimeoutThreshold 为 1000 ns if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 如果被打断, 退出等待队列 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } // 退出等待队列后, 还需要获得 AQS 队列的锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 所有已取消的 Node 从队列链表删除 if (node.nextWaiter != null) unlinkCancelledWaiters(); // 应用打断模式 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } } }4.6 await恢复后继续执行
被唤醒的如果是之前执行了await方法的线程,那么该线程会接着就像往await方法里面阻塞处的下面继续执行,下面是源码:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable { public final void await() throws InterruptedException { //如果当前线程中断,抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 1. 添加新节点,将当前线程保存其中,并且添加到等待队列队尾 Node node = addConditionWaiter(); // 2. 释放当前线程所占用的lock,并且唤醒同步队列中的下一个节点 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 3. 当前线程进入到等待状态 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 4. 自旋等待获取到同步状态(即获取到lock) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled //删除无效的等待节点 unlinkCancelledWaiters(); // 5. 处理被中断的情况 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } } }可以发现,这里主要是判断到当前线程节点已经放入等待队列了,那么会尝试获取锁,获取成功则继续往下执行代码。
只有线程获取到ReentrantLock的锁之后才可以继续往下执行,中间可能会因为执行await而进入条件队列并释放锁,最后又会被唤醒重新获取锁,继续往下执行。最后按照书写规范,我们一定会在代码中执行ReentrantLock.unlock()释放锁,然后继续唤醒等待队列后续线程继续执行。
总结
-
1、Condition等待通知的本质就是条件队列 和 同步队列的交互的过程,跟object的wait()/notify()机制一样;Condition是基于同步锁state实现的,而objec是基于monitor模式实现的。
-
2、一个lock(AQS)可以有多个Condition,即多个条件队列,只有一个同步队列。
-
3、Condition.await()方法执行时,会将同步队列里的head锁释放掉,把线程封装成新node添加到条件队列中;Condition.signal()方法执行时,会把条件队列中的首节点移到同步队列中去,直到锁state被获取才被唤醒。
参考: https://www.itzhai.com/articles/analysis-of-reentrantlocks-condition-principle.html
https://blog.csdn.net/e891377/article/details/104715461
https://blog.csdn.net/weixin_42103620/article/details/117331593
https://blog.csdn.net/sinat_32873711/article/details/106619981
https://blog.csdn.net/qq_33996921/article/details/106629546