一、阻塞队列 BlockingQueue
在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
1.1、BlockingQueue的基本原理
先来解释一下阻塞队列:
如上图:
- 1、生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;
- 2、消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。
阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。
阻塞队列的常用方法
查阅BlockingQueue总结了以下阻塞队列的方法:
1、boolean add(E e)
- 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回IllegalStateException异常。
2、boolean offer(E e)
- 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回false。
3、void put(E e)
- 直接在队列中插入元素,当无可用空间时候,阻塞等待。
4、boolean offer(E e, long timeout, TimeUnit unit)
- 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false。
5、E take()
- 获取并移除队列头部的元素,无元素时候阻塞等待。
6、E poll( long time, timeunit unit)
- 获取并移除队列头部的元素,无元素时候阻塞等待指定时间。
7、boolean remove()
- 获取并移除队列头部的元素,无元素时候会抛出NoSuchElementException异常。
8、E element()
- 不移除的情况下返回列头部的元素,无元素时候会抛出NoSuchElementException异常。
9、E peek()
- 不移除的情况下返回列头部的元素,队列为空无元素时返回null。
注意:
根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。 以上支持阻塞和超时的方法都是能够响应中断的。
1.2、BlockingQueue的实现
BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。
下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。
二、LinkedBlockingQueue
LinkedBlockingQueue也是一个阻塞队列,相比于ArrayBlockingQueue,他的底层是使用链表(单向链表)实现的,而且是一个可有界可无界的队列,在生产和消费的时候使用了两把锁,提高并发,是一个高效的阻塞队列。
LinkedBlockingQueue底层的数据结构是链表,这一点很容易验证,在源码中,我们可以看到它有一个内部类Node,基本源码如下所示:
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //链表节点定义 static class Node<E> { //节点中存放的值 E item; //下一个节点 /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } } }从上面的注释可以知道,当某个node节点的next节点为null的时候,说明当前节点是最后一个节点。
LinkedBlockingQueue的基本成员属性如下代码所示:
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 队列容量,最大为Integer.MAX_VALUE */ private final int capacity; /** 队列长度 */ private final AtomicInteger count = new AtomicInteger(); /** * 头结点 * Invariant: head.item == null */ transient Node<E> head; /** * 尾结点 * Invariant: last.next == null */ private transient Node<E> last; /** 移除操作的锁,take/poll方法用到 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 移除操作需要等待的条件notEmpty,与takeLock绑定 */ private final Condition notEmpty = takeLock.newCondition(); /** 入队操作的锁,put/offer方法用到 */ private final ReentrantLock putLock = new ReentrantLock(); /** 入队操作需要等待的条件notFull,与putLock绑定 */ private final Condition notFull = putLock.newCondition(); }可以看到,LinkedBlockingQueue内部是用单向链表实现的,并且它有两把锁:takeLock和putLock,以及对应的两个等待条件:notEmpty和notFull。takeLock控制同一时刻只有一个线程从队列头部获取/移除元素,putLock控制同一时刻只有一个线程在队列尾部添加元素。
2.1、构造函数
- 容量大小可以由构造函数的capacity设定,默认为:Integer.MAX_VALUE
2.2、阻塞入队
LinkedBlockingQueue提供的入队的方法有多个,包括add、offer、put。
2.2.1、add(E e)方法
其中add(E e)调用的就是offer(E e),offer方法入队成功返回true,入队失败(队列已满或者阻塞超时)会返回false,那么add方法调用offer方法返回false的话,那么就抛出异常,代码如下:
public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } }2.2.2、offer(E e)方法
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public boolean offer(E e) { // 如果存入的值为null,直接抛出空指针异常 if (e == null) throw new NullPointerException(); // 获取队列元素个数 final AtomicInteger count = this.count; if (count.get() == capacity) //如果已经满了,直接返回失败 return false; // 预先设置c为 -1,约定负数为入队失败 int c = -1; Node<E> node = new Node<E>(e); // 获取入队锁 final ReentrantLock putLock = this.putLock; putLock.lock(); try { //双重判断 if (count.get() < capacity) { //加入链表 enqueue(node); c = count.getAndIncrement(); // 队列元素个数+1,此时c为元素入队前的个数,也就是比当前队列元素个数少1 if (c + 1 < capacity) //唤醒生产者线程,继续插入 // 如果添加数据后还队列还没有满, //则继续调用notFull的signal方法唤醒其他等待在入队的线程,继续插入 notFull.signal(); } } finally { // 释放锁 putLock.unlock(); } if (c == 0) //说明里面有一个元素,唤醒消费者 signalNotEmpty(); return c >= 0; } }2.2.3、offer(E e, long timeout, TimeUnit unit)方法
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // 如果存入的值为null,直接抛出空指针异常 if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); // 预先设置c为 -1,约定负数为入队失败 int c = -1; // 获取入队锁 final ReentrantLock putLock = this.putLock; // 获取队列元素个数 final AtomicInteger count = this.count; // 加锁 putLock.lockInterruptibly(); try { while (count.get() == capacity) { // 如果超时时间过了队列仍然是满的话就直接返回false if (nanos <= 0) return false; // 否则调用awaitNanos等待,超时会返回<= 0L nanos = notFull.awaitNanos(nanos); } // 如果上述没有阻塞,也就是队列没有满,那么这里直接入队 enqueue(new Node<E>(e)); // 队列元素个数+1,此时c为元素入队前的个数,也就是比当前队列元素个数少1 c = count.getAndIncrement(); if (c + 1 < capacity) // 如果添加数据后还队列还没有满, //则继续调用notFull的signal方法唤醒其他等待在入队的线程 notFull.signal(); } finally { // 释放锁 putLock.unlock(); } // c==0说明队列中有一个元素了,那么就需要唤醒其他正在等待出队的线程 // 这一点可能不好理解,c = count.getAndIncrement();理解了就差不多 if (c == 0) signalNotEmpty(); return true; } }我们一起总结一下上述的入队源码:
-
1、入队第一步,上锁,这样保证了线程安全,保证了同一时刻只能有一个入队线程在操作队列。
-
2、如果队列满了,那么会产生阻塞,如果阻塞时间过了,队列依旧是满的,那么将返回false,放弃入队。
-
3、如果队列没有满,那么直接将入队元素加入到队列的尾部,然后检查当前队列是否满了,如果没有满,则唤醒其他入队线程。
-
4、最后检查入队前的队列是否为空(c==0就表示当前入队操作前,是一个空队列),如果为空,那么就有可能存在等待出队的线程在阻塞着,那么在这里进行唤醒。
2.2.4、put(E e)方法
对于put方法,它也是入队的一个方法,这个方法和offer方法原理几乎一致,最大的区别在于put方法没有阻塞超时时间,如果队列满了,那么执行put方法的线程将一直阻塞下去。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public void put(E e) throws InterruptedException { // 如果存入的值为null,直接抛出空指针异常 if (e == null) throw new NullPointerException(); // 预先设置c为 -1,约定负数为入队失败 int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; // 使用AtomicInteger保证原子性 final AtomicInteger count = this.count; // 获取put锁 putLock.lockInterruptibly(); try { // 如果队列满了,则进入put条件队列等待 while (count.get() == capacity) { notFull.await(); } // 队列不满,或者被取数线程唤醒了,那么会继续执行 // 这里会往阻塞队列末尾添加一个数据 enqueue(node); c = count.getAndIncrement(); // 如果队列不满,则唤醒等待时间最长的put线程 if (c + 1 < capacity) notFull.signal(); } finally { // 释放put锁 putLock.unlock(); } // 如果队列为空,再次获取put锁,然后唤醒等待时间最长的put线程 if (c == 0) signalNotEmpty(); } //直接放到链表的尾部 private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } }2.3、阻塞出队
2.3.1、remove()方法
public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { public E remove() // 调用poll()方法出队 E x = poll(); if (x != null) // 如果有元素出队就返回这个元素 return x; else // 如果没有元素出队就抛出异常 throw new NoSuchElementException(); } }2.3.2、poll()方法
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E poll() { final AtomicInteger count = this.count; //如果队列为空,直接返回空 if (count.get() == 0) return null; E x = null; int c = -1; // 获取take锁 final ReentrantLock takeLock = this.takeLock; // 上锁 takeLock.lock(); try { // 如果队列不空 if (count.get() > 0) { //调用dequeue获取队列中的数据 x = dequeue(); // 阻塞队列数量减1 c = count.getAndDecrement(); // 如果阻塞队列数量不为空,那么唤醒等待时间最长的take线程 if (c > 1) // 释放take锁 notEmpty.signal(); } } finally { // 解锁 takeLock.unlock(); } // 如果c == capacity就是说队列中有一个空位,唤醒入队线程 if (c == capacity) signalNotFull(); return x; } }2.3.3、poll(long timeout, TimeUnit unit)方法
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; // 获取take锁 final ReentrantLock takeLock = this.takeLock; // 上锁 takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 如果队列空了,则进入take条件队列等待 // 且如果阻塞时间过期,那么将返回null if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } // 在超时时间内返回,则调用dequeue获取队列中的数据 x = dequeue(); // 阻塞队列数量减1 c = count.getAndDecrement(); // 如果c > 1,说明队列中还有节点元素,那么继续唤醒其他出队线程 if (c > 1) notEmpty.signal(); } finally { // 解锁 takeLock.unlock(); } // 如果c == capacity就是说队列中有一个空位,唤醒入队线程 if (c == capacity) signalNotFull(); return x; } }2.3.4、take()方法
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; // 获取take锁 final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 如果队列空了,则进入take条件队列等待 while (count.get() == 0) { notEmpty.await(); } // 获取到第一个节点,非哑节点 x = dequeue(); // 阻塞队列数量减1 c = count.getAndDecrement(); // 如果阻塞队列数量不为空,那么唤醒等待时间最长的take线程 if (c > 1) notEmpty.signal(); } finally { // 释放take锁 takeLock.unlock(); } // 如果队列满了,再次获取take锁,然后唤醒等待时间最长的take线程 if (c == capacity) signalNotFull(); return x; } //通过这个方法可以看出,链表的首节点的值是null,每次获取元素的时候 //先把首节点干掉,然后从第二个节点获取值 private E dequeue() { Node<E> h = head; // 获取第一个元素结点first Node<E> first = h.next; // 将头结点自引用,并被垃圾回收掉 h.next = h; // help GC // 将头结点指向第一个元素结点first head = first; // 获取第一个元素结点的值 E x = first.item; // 将第一个元素结点的值置为null,成为新的哑节点 first.item = null; // 返回被移除的节点元素值 return x; } }take和put操作如下图所示:
- 1、队列第一个节点为哑节点,占位用的;
- 2、put操作一直往链表后面追加节点;
- 3、take操作从链表头取节点;
三、ArrayBlockingQueue与LinkedBlockingQueue对比
队列 是否阻塞 是否有界 线程安全 适用场景 ArrayBlockingQueue √ √ 一把ReentrantLock锁 生产消费模型,平衡处理速度 LinkedBlockingQueue √ 可配置 两把ReentrantLock锁 生产消费模型,平衡处理速度3.1、ArrayBlockingQueue
- 数据结构:数组,存储空间预先分配,无需动态申请空间,使用过程中内存开销较小;
3.2、LinkedBlockingQueue:
- 数据结构:单项链表,存储空间动态申请,会增加JVM垃圾回收负担;
- 两把锁,并发性能较好;
- 可设置为无界,吞吐量比较大,但是不稳定,入队速度太快有可能导致内存溢出。
参考: https://www.itzhai.com/articles/graphical-blocking-queue.html
https://segmentfault.com/a/1190000039174436
https://cloud.tencent.com/developer/article/1609320