目录
- 概览
- 使用方法
- 1. add | remove | element
- 2. offer | poll | peek
- 3. put | take
- 4. offer | poll (timeout)
- 源码解析
- 说明
- 队列容器
- 关键成员变量
- 初始化
- put方法
- 总结
概览
1. 基于链表的可选有界阻塞队列。根据FIFO的出入队顺序,从队列头部检索和获取元素,在队列尾部插入新元素。
2. 当作为有界阻塞队列,在队列空间不足时,put方法将会一直阻塞直到有多余空间才会执行插入元素操作,take方法则相反,只到队列内元素不为空时,才将队列元素逐个取出。
3. 队列容量不指定时,默认为Integer.MAX_VALUE,此时可以看作无界队列。
4. 使用非公平锁进行并发控制。所有方法都是线程安全的。
使用方法
下面的文章给出了阻塞队列的四种基本用法:
了解BlockingQueue 大体框架和实现思路
LinkedBlockingQueue实现了BlockingQueue类。
在BlockingQueue中,方法被分为如下四类:
Throws exception
:操作未实现时(正常流程下的执行)抛出异常Special value
:根据操作的实际情况,返回特定值,例如null、false(这些失败可能是线程中断、队列为空引起的)Blocks
:阻塞当前线程,直到当前线程可以成功执行Times out
:尝试指定时间后,放弃执行
Throws exception
Special value
Blocks
Times out
新增
add(E e)
offer(E e)
put(E e)
offer(E e, long timeout, TimeUnit unit)
删除
remove()
poll()
take()
poll(long timeout, TimeUnit unit)
查询
element()
peek()
1. add | remove | element
这三个方法在BlockingQueue的定义中,都会在操作未实现时,抛出异常。
add(E e)
:在队尾添加元素e,add内部调用offer方法实现。因此,元素e为空时,抛出NullPointerException异常;插入失败时,抛出IllegalStateException异常。remove
:删除队首元素,内部调用poll方法。队首无数据时,抛出NoSuchElementException异常。element
:检索队首元素。队首无数据时,抛出NoSuchElementException异常。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>(); blockingQue.add(1); blockingQue.remove(1); blockingQue.remove(); // NoSuchElementException blockingQue.element(); // NoSuchElementException
2. offer | poll | peek
根据操作的实际情况,返回特定值,例如null、false(这些失败可能是线程中断、队列为空引起的)
offer(E e)
:在队尾添加元素e,元素e为空时,抛出NullPointerException异常;插入失败时返回false。poll
:删除队首元素。删除失败时返回false。peek
:检索队首元素。队首无数据时,返回null。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>(); blockingQue.offer(1); blockingQue.poll(); Integer peek = blockingQue.peek(); // 返回null
3. put | take
阻塞当前线程,直到当前线程可以成功执行。
put(E e)
:在队尾添加元素e,元素e为空时,抛出NullPointerException异常。当队列满时,阻塞put线程,等待队列被消费后,队列容量不满时,该阻塞线程继续尝试在队尾插入元素。该方法在阻塞时可以被中断,并抛出InterruptedException异常。take
:删除并获取队首元素。队首元素不为空时返回。队首元素为空,阻塞take线程,等待队列不为空时,再次尝试消费队首元素。该方法在阻塞时可以被中断,并抛出InterruptedException异常。
注意:阻塞时,不会解除锁占用。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>(); try { blockingQue.put(1); blockingQue.take(); } catch (InterruptedException e) { // 线程被中断 e.printStackTrace(); }
4. offer | poll (timeout)
尝试指定时间后,放弃执行
offer(E e, long timeout, TimeUnit unit)
:在队尾添加元素e,元素e为空时,抛出NullPointerException异常;当队列容量满时,线程休眠一定时间后再次查看队列容量,当该休眠时间大于等于timeout后,此时队列还满则返回false。不满时,尝试入队。需要注意的是,由于伪唤醒机制的存在,线程可能在timeout这个时间段内的任意一点被唤醒,如果队列容易不满,则会直接执行入队操作。阻塞时,当前线程被中断抛出InterruptedException异常。poll(long timeout, TimeUnit unit)
:删除队首元素。poll与offer对应的,当队列为空的时候,线程休眠一定时间。休眠时,当前线程被中断抛出InterruptedException异常。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>(); try { blockingQue.offer(1, 100, TimeUnit.MILLISECONDS); blockingQue.poll(100, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); }
当然除了上述的阻塞队列的基本操作外,LinkedBlockingQueue还具有集合Collection的性质。因此集合中的通用方法也可以使用。
源码解析
说明
本次源码分析主要按照下面几个步骤进行:
1. 保存队列数据的容器以及出入队方法
2. 主要成员变量以及作用
3. 主要方法分析
队列容器
结构图
仅有数据item和后继next的单向节点,结构简单。
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
next三种情况
A 普通节点的真实后继
B 真正的队首节点,item=null(队首节点恒为head.next)
C 队尾节点,next=null
- item: null -> first -> …… -> last
- next: first -> second -> ……-> null
入队操作
// 入队 private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; // last.next = node; last = node; }
步骤1
步骤2
出队操作
// 出队 private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; // 形成引用链闭环,JVM根据可达性分析时,GC root的引用链与该对象之间不可达,进行GC h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
步骤1
步骤2
关键成员变量
队列入队和出队锁分离,都使用了非公平锁。
这里的count属性需要注意下,这里使用了原子类保证操作的原子性。后面的入队和出队,将会频繁使用它。
/** 容量, 初始化不设置时默认为Integer.MAX_VALUE*/ private final int capacity; /** 当前队列内的元素数量 */ private final AtomicInteger count = new AtomicInteger(); /** * 队首 * 不变量: head.item == null */ transient Node<E> head; /** * 队尾 * 不变量: last.next == null */ private transient Node<E> last; /** 出队操作公用锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 用于出队操作的阻塞和唤醒 出队的话,只需要考虑队列是否为空 */ private final Condition notEmpty = takeLock.newCondition(); /** 入队操作公用锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** 用于入队操作的阻塞和唤醒 入队只需要考虑队列空间是否足够*/ private final Condition notFull = putLock.newCondition();
初始化
三个构造函数
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } // 自定义容量 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } // 初始化时,批量添加集合中的元素 public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
put方法
put方法几个关注点
- 释放锁的时机
- 执行入队操作
- 唤醒生产者的时机
- 唤醒消费者的时机
这几点是整个阻塞操作的核心,可以在下面的分析中仔细观察。
注:由于阻塞队列就是基于生产者-消费者模型的,因此,下文中都把调用put方法的线程称为生产者,调用take方法的线程称为消费者。
总体分析
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; // -1代表操作异常 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 如果线程没有被标记为中断,则获取锁 putLock.lockInterruptibly(); try { while (count.get() == capacity) { // 这里是线程在执行put操作时唯一一个执行过程中释放锁的地方 notFull.await(); // 容量已满,等待被消费后唤醒 } // 添加元素,更新容量 enqueue(node); c = count.getAndIncrement(); // 队列容量有余时,在这里再次唤醒一个其他的生产者线程(或者说消费者消费速度大于生产) if (c + 1 < capacity) notFull.signal(); } finally { // 释放锁 putLock.unlock(); } // 唤醒一个消费者 if (c == 0) signalNotEmpty(); }
count属性并发问题
这里需要重点关注count,由于有两把锁,count可以同时被putLock、takeLock操作,那么这里是否会产生并发问题。
分析如下:
A. 只有putLock或takeLock一把锁操作:就是单线程操作,没影响,不产生并发问题。
其他所有put操作都处于await的状态或者竞争锁状态,其他线程也因为获取不到锁而无法执行,只有等该节点添加完成释放锁,其他线程才有机会继续执行。
while (count.get() == capacity) { notFull.await(); // 容量已满,等待被消费后唤醒 }
B. putLock和takeLock同时操作:我们假设两个线程一个获取到putLock,一个获取到了takeLock(同时最多也只有两个线程操作count)。
// put while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); // take while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal();
由于count是原子类那么count的所有读写操作必然是一个串联的操作,而非并行操作,因此也不存在并发问题,如下图(顺序可能不同):
唤醒消费者
代码的最后一段,会有唤醒一个消费者的操作。
// 唤醒一个等待中的消费者 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
刚开始看到的时候很疑惑,为什么是c == 0才唤醒。如果生产者入队成功,那么c应该为如下值:
c = count.getAndIncrement();
后面看了一下count.getAndIncrement()方法定义才发现自己记混了,count.getAndIncrement()是一个原子操作,且返回值的是操作前的值。
ok,现在没问题了。
count >= 0,也就是说,只有在生产者入队前队列为空,入队成功之后才会唤醒一个消费者消费。
take方法
take方法与put方法大致相似,只是与put做相反操作。
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 队列元素为空,停止消费,让出锁并等待被唤醒 while (count.get() == 0) { notEmpty.await(); } // 移除队首元素,并更新容量 x = dequeue(); c = count.getAndDecrement(); // 生产速度大于消费速度,唤醒一个其他消费者进行消费 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 消费之前队列已满,消费后唤醒一个生产者 if (c == capacity) signalNotFull(); return x; } // 唤醒生产者 /** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
总结
总体上看LinkedBlockingQueue类不难,整个生产-消费的流程实现也比较简单。源码已经把该介绍的东西都讲得很明白了,我这属于依葫芦画瓢顺着源码注释写出来的。这么一写,自己这个类的印象就很深刻了。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持自由互联。