一、阻塞队列 BlockingQueue 在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的
一、阻塞队列 BlockingQueue
在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
1.1、BlockingQueue的基本原理
先来解释一下阻塞队列:
如上图:
- 1、生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;
- 2、消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。
阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。
阻塞队列的常用方法
查阅BlockingQueue总结了以下阻塞队列的方法:
1、boolean add(E e)
- 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回IllegalStateException异常。
2、boolean offer(E e)
- 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回false。
3、void put(E e)
- 直接在队列中插入元素,当无可用空间时候,阻塞等待。
4、boolean offer(E e, long timeout, TimeUnit unit)
- 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false。
5、E take()
- 获取并移除队列头部的元素,无元素时候阻塞等待。
6、E poll( long time, timeunit unit)
- 获取并移除队列头部的元素,无元素时候阻塞等待指定时间。
7、boolean remove()
- 获取并移除队列头部的元素,无元素时候会抛出NoSuchElementException异常。
8、E element()
- 不移除的情况下返回列头部的元素,无元素时候会抛出NoSuchElementException异常。
9、E peek()
- 不移除的情况下返回列头部的元素,队列为空无元素时返回null。
注意:
根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。 以上支持阻塞和超时的方法都是能够响应中断的。
1.2、BlockingQueue的实现
BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。
下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。
二、ArrayBlockingQueue
ArrayBlockingQueue使用的数据结构是数组
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 队列中元素保存的地方 */ final Object[] items; /** 取元素的指针 记录下一次操作的位置 */ int takeIndex; /** 放元素的指针 记录下一次操作的位置 */ int putIndex; /** 元素数量 */ int count; /** 保证并发访问的锁 */ final ReentrantLock lock; /** 等待出队的条件 消费者监视器 */ private final Condition notEmpty; /** 等待入队的条件 生产者监视器 */ private final Condition notFull; }构造函数
- 容量大小有构造函数的capacity参数决定。
2.1、入队
2.1.1、add(E e)方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public boolean add(E e) { return super.add(e); } } public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { // AbstractQueue 调用offer(e)如果成功返回true,如果失败抛出异常 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } }2.1.2、offer(E e)方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public boolean offer(E e) { // 元素不可为空 checkNotNull(e); final ReentrantLock lock = this.lock; // 加锁 lock.lock(); try { if (count == items.length) // 如果数组满了就返回false return false; else { // 如果数组没满就调用入队方法并返回true enqueue(e); return true; } } finally { //释放锁 lock.unlock(); } } }2.1.3、put(E e) 方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public void put(E e) throws InterruptedException { checkNotNull(e); // 获取ReentrantLock锁 final ReentrantLock lock = this.lock; // 加锁,如果线程中断了抛出异常 lock.lockInterruptibly(); try { // 如果队列满了,则进入条件队列进行等待 while (count == items.length) notFull.await(); // 队列不满,或者被取数线程唤醒了,那么会继续执行 // 这里会往阻塞队列添加一个数据,然后唤醒等待时间最长的取数线程 enqueue(e); } finally { // 释放ReentrantLock锁 lock.unlock(); } } }2.1.4、offer(E e, long timeout, TimeUnit unit)方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; // 如果数组满了,就阻塞nanos纳秒,如果唤醒这个线程时依然没有空间且时间到了就返回false nanos = notFull.awaitNanos(nanos); } //入队 enqueue(e); return true; } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // 把元素直接放在放指针的位置上 items[putIndex] = x; // 如果放指针到数组尽头了,就返回头部 if (++putIndex == items.length) putIndex = 0; // 数量加1 count++; // 唤醒notEmpty,因为入队了一个元素,所以肯定不为空了 notEmpty.signal(); } }- add(e)时如果队列满了则抛出异常;
- offer(e)时如果队列满了则返回false;
- put(e)时如果队列满了则使用notFull等待;
- offer(e, timeout, unit)时如果队列满了则等待一段时间后如果队列依然满就返回false;
- 利用放指针循环使用数组来存储元素;
2.2、出队
2.2.1、 remove()方法
public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { public E remove() // 调用poll()方法出队 E x = poll(); if (x != null) // 如果有元素出队就返回这个元素 return x; else // 如果没有元素出队就抛出异常 throw new NoSuchElementException(); } }2.2.2、 poll()方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //如果队列里没有数据就直接返回null //否则从队列头部出队 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } }2.2.3、 poll(long timeout, TimeUnit unit)方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果队列无元素,则阻塞等待nanos纳秒 // 如果下一次这个线程获得了锁但队列依然无元素且已超时就返回null while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } } }2.2.4、 take()方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //加锁,如果线程中断了抛出异常 lock.lockInterruptibly(); try { //队列中不存元素 while (count == 0) /* * 一直等待条件notEmpty,即被其他线程唤醒 * (唤醒其实就是,有线程将一个元素入队了,然后调用notEmpty.signal() * 唤醒其他等待这个条件的线程,同时队列也不空了) */ notEmpty.await(); //否则出队 return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 取取指针位置的元素 E x = (E) items[takeIndex]; // 把取指针位置设为null items[takeIndex] = null; // 取指针前移,如果数组到头了就返回数组前端循环利用 if (++takeIndex == items.length) takeIndex = 0; // 元素数量减1 count--; if (itrs != null) itrs.elementDequeued(); // 唤醒notFull条件 notFull.signal(); return x; } }- remove()时如果队列为空则抛出异常;
- poll()时如果队列为空则返回null;
- take()时如果队列为空则阻塞等待在条件notEmpty上;
- poll(timeout, unit)时如果队列为空则阻塞等待一段时间后如果还为空就返回null;
- 利用取指针循环从数组中取元素;
如下图,以put和take方法为例:
这里put和take使用了同一个ReentrantLock,不能并发执行。
2.3、缺点
-
a、队列长度固定且必须在初始化时指定,所以使用之前一定要慎重考虑好容量;
-
b、如果消费速度跟不上入队速度,则会导致提供者线程一直阻塞,且越阻塞越多,非常危险;
-
c、只使用了一个锁来控制入队出队,效率较低。
参考: https://www.itzhai.com/articles/graphical-blocking-queue.html
https://zhuanlan.zhihu.com/p/224946304