当前位置 : 主页 > 编程语言 > java >

Java并发编程——LinkedBlockingDeque

来源:互联网 收集:自由互联 发布时间:2023-02-04
一、阻塞队列 BlockingQueue 在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的

一、阻塞队列 BlockingQueue

在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

1.1、BlockingQueue的基本原理

先来解释一下阻塞队列:

如上图:

  • 1、生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;
  • 2、消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。

阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。

阻塞队列的常用方法

查阅BlockingQueue总结了以下阻塞队列的方法:

1、boolean add(E e)

  • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回IllegalStateException异常。

2、boolean offer(E e)

  • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回false。

3、void put(E e)

  • 直接在队列中插入元素,当无可用空间时候,阻塞等待。

4、boolean offer(E e, long timeout, TimeUnit unit)

  • 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false。

5、E take()

  • 获取并移除队列头部的元素,无元素时候阻塞等待。

6、E poll( long time, timeunit unit)

  • 获取并移除队列头部的元素,无元素时候阻塞等待指定时间。

7、boolean remove()

  • 获取并移除队列头部的元素,无元素时候会抛出NoSuchElementException异常。

8、E element()

  • 不移除的情况下返回列头部的元素,无元素时候会抛出NoSuchElementException异常。

9、E peek()

  • 不移除的情况下返回列头部的元素,队列为空无元素时返回null。

注意:

根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。 以上支持阻塞和超时的方法都是能够响应中断的。

1.2、BlockingQueue的实现

BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。

 

下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。

二、LinkedBlockingDeque

  • LinkedBlockingDeque是一个由链表结构(双向链表)组成的双向阻塞队列,即可以从队列的两端插入和移除元素,支持FIFO和FILO。

  • 相比于其他阻塞队列,LinkedBlockingDeque多了addFirst、addLast、peekFirst、peekLast等方法。

  • LinkedBlockingDeque是可选容量的,默认容量大小为Integer.MAX_VALUE。

  • LinkedBlockingDequeConcurrentLinkedDeque类似,都是一种双端队列的结构,只不过LinkedBlockingDeque同时也是一种阻塞队列。

注意:LinkedBlockingDeque底层利用ReentrantLock实现同步,并不像ConcurrentLinkedDeque那样采用无锁算法。

如何使用 LinkedBlockingDeque

  • 1、并发场景下,需要作为双端队列使用时,如果只是作为 FIFO 队列使用,则 LinkedBlockingQueue 的性能更高。
  • 2、指定队列的容量,以避免生产速率远高于消费速率时资源耗尽的问题。

使用 LinkedBlockingDeque 的风险

  • 1、未指定容量的情况下,生产速率远高于消费速率时,会导致内存耗尽而 OOM。
  • 2、高并发场景下,性能远低于 LinkedBlockingQueue。
  • 3、由于需要维持前后节点的链接,内存消耗也高于 LinkedBlockingQueue。

2.1、内部结构

LinkedBlockingDeque内部是双链表的结构,结点Node的定义如下:

public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { /** 双向链表节点 */ static final class Node<E> { /** * 节点元素,如果节点已经被移除,则为 null */ E item; /** * 前驱结点指针. */ Node<E> prev; /** * 后驱结点指针. */ Node<E> next; Node(E x) { item = x; } } }

2.2、成员属性

public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { /** * 头结点 */ transient Node<E> first; /** * 尾结点 */ transient Node<E> last; /** 队列中个数 */ private transient int count; /** 队列长度,可以使用构造注入,如未设定,默认为无界队列 */ private final int capacity; /** 显示锁 */ final ReentrantLock lock = new ReentrantLock(); /** 消费队列(队列为空时,无法消费,线程阻塞) */ private final Condition notEmpty = lock.newCondition(); /** 生产队列(队列满时,无法入队,线程阻塞) */ private final Condition notFull = lock.newCondition(); }

2.3、构造函数

LinkedBlockingDeque一共三种构造器,不指定容量时,默认为Integer.MAX_VALUE:

public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { /** * 默认构造器. */ public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } /** * 指定容量的构造器. */ public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } /** * 从已有集合构造队列. */ public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock lock = this.lock; lock.lock(); // Never contended, but necessary for visibility try { for (E e : c) { if (e == null) throw new NullPointerException(); if (!linkLast(new Node<E>(e))) throw new IllegalStateException("Deque full"); } } finally { lock.unlock(); } } }

2.4、队首入队

初始:

队首插入结点node:

  • 1、将目标元素 e 添加到队列头部,如果队列已满,则阻塞等待有可用空间后重试
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { /** * 将目标元素 e 添加到队列头部,如果队列已满,则阻塞等待有可用空间后重试 */ public void putFirst(E e) throws InterruptedException { // 如果存入的值为null,直接抛出空指针异常 if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { // 尝试在头部添加元素 while (!linkFirst(node)) // 当前线程在非满条件上等待 notFull.await(); } finally { //释放锁 lock.unlock(); } } private boolean linkFirst(Node<E> node) { // 队列已满,则直接返回 false if (count >= capacity) return false; // 读取头节点 Node<E> f = first; // 将旧头结点链接到目标节点之后 node.next = f; // 写入新头节点 first = node; // 1)当前元素为第一个添加到队列中的元素 if (last == null) // 写入尾节点 last = node; else // 将旧头节点的前置节点设置为新头结点 f.prev = node; // 递增计数 ++count; // 唤醒在非空条件上阻塞等待的线程来读取元素 notEmpty.signal(); return true; } }
  • 2、如果队列已满,则直接返回 false,否则将目标元素 e 添加到队列头部
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { /** * 如果队列已满,则直接返回 false,否则将目标元素 e 添加到队列头部 */ public boolean offerFirst(E e) { // 如果存入的值为null,直接抛出空指针异常 if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { // 尝试在头部添加元素 return linkFirst(node); } finally { //释放锁 lock.unlock(); } } }
  • 3、在指定的超时时间内尝试将目标元素 e 添加到队列头部,成功则返回 true
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { /** * 在指定的超时时间内尝试将目标元素 e 添加到队列头部,成功则返回 true */ public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException { // 如果存入的值为null,直接抛出空指针异常 if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 头结点添加失败 while (!linkFirst(node)) { // 已经超时则直接返回 if (nanos <= 0) return false; // 当前线程在非满条件上阻塞等待,唤醒后再次尝试添加 nanos = notFull.awaitNanos(nanos); } return true; } finally { lock.unlock(); } } }

2.4、队尾入队

初始:

队尾插入结点node:

  • 1、将目标元素 e 添加到队列尾部,如果队列已满,则阻塞等待有可用空间后重试
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { public void put(E e) throws InterruptedException { putLast(e); } /** * 将目标元素 e 添加到队列尾部,如果队列已满,则阻塞等待有可用空间后重试 */ public void putLast(E e) throws InterruptedException { // 如果存入的值为null,直接抛出空指针异常 if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { // 尝试将节点链接到队列尾部 while (!linkLast(node)) // 队列已满,当前线程在非满条件上阻塞等待,被唤醒后再次尝试 notFull.await(); } finally { //释放锁 lock.unlock(); } } private boolean linkLast(Node<E> node) { // 队列已满,则直接返回 false if (count >= capacity) return false; // 读取尾节点 Node<E> l = last; // 将目标节点链接到尾节点之后 node.prev = l; // 写入尾节点为新增节点 last = node; // 1)当前元素是第一个加入队列的元素 if (first == null) // 写入头结点 first = node; else // 将旧尾节点的后置节点更新为新增节点 l.next = node; // 递增总数 ++count; // 唤醒在非空条件上等待的线程 notEmpty.signal(); return true; } }
  • 2、如果队列已满,则直接返回 false,否则将目标元素 e 添加到队列尾部
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { public boolean offer(E e) { return offerLast(e); } /** * 如果队列已满,则直接返回 false,否则将目标元素 e 添加到队列尾部 */ public boolean offerLast(E e) { // 如果存入的值为null,直接抛出空指针异常 if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { // 尝试将节点链接到队列尾部 return linkLast(node); } finally { //释放锁 lock.unlock(); } } }
  • 3、在指定的超时时间内尝试将目标元素 e 添加到队列尾部,成功则返回 true
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { return offerLast(e, timeout, unit); } /** * 在指定的超时时间内尝试将目标元素 e 添加到队列尾部,成功则返回 true */ public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException { // 如果存入的值为null,直接抛出空指针异常 if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; //加锁 lock.lockInterruptibly(); try { // 尝试将目标元素 e 添加到队列尾部 while (!linkLast(node)) { // 已经超时则直接返回 false if (nanos <= 0) return false; // 当前线程在非满条件上阻塞等待,被唤醒后再次尝试 nanos = notFull.awaitNanos(nanos); } return true; } finally { //释放锁 lock.unlock(); } } }

2.5、队首出队

初始:

删除队首结点:

  • 1、移除并返回头部节点,如果队列为空,则阻塞等待有可用元素之后重试
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { public E take() throws InterruptedException { return takeFirst(); } /** * 移除并返回头部节点,如果队列为空,则阻塞等待有可用元素之后重试 */ public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; // 尝试移除并返回头部节点 while ( (x = unlinkFirst()) == null) // 队列为空,则阻塞等待有可用元素之后重试 notEmpty.await(); return x; } finally { lock.unlock(); } } }
  • 2、如果队列为空,则立即返回 null,否则移除并返回头部元素
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { public E poll() { return pollFirst(); } /** * 如果队列为空,则立即返回 null,否则移除并返回头部元素 */ public E pollFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkFirst(); } finally { lock.unlock(); } } /** * 从队首删除一个元素, 失败则返回null. */ private E unlinkFirst() { // 获取首节点 Node<E> f = first; // 首节点为null,则返回null if (f == null) return null; // 获取首节点的后继节点 Node<E> n = f.next; // 移除first,将首节点更新为n E item = f.item; f.item = null; f.next = f; // help GC first = n; // 移除首节点后,为空队列 if (n == null) last = null; else // 将新的首节点的前驱节点设置为null n.prev = null; --count; // 唤醒阻塞在notFull上的线程 notFull.signal(); return item; } }
  • 3、在指定的超时时间内尝试移除并返回头部元素,如果已经超时,则返回 null
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { public E poll(long timeout, TimeUnit unit) throws InterruptedException { return pollFirst(timeout, unit); } /** * 在指定的超时时间内尝试移除并返回头部元素,如果已经超时,则返回 null */ public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { E x; // 尝试移除并返回头部元素 while ( (x = unlinkFirst()) == null) { // 已经超时则返回 null if (nanos <= 0) return null; // 当前线程在非空条件上阻塞等待,被唤醒后进行重试 nanos = notEmpty.awaitNanos(nanos); } // 移除成功则直接返回头部元素 return x; } finally { lock.unlock(); } } }

2.6、队尾出队

初始:

删除队尾结点:

  • 1、移除并返回尾部节点,如果队列为空,则阻塞等待有可用元素之后重试
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { /** * 移除并返回尾部节点,如果队列为空,则阻塞等待有可用元素之后重试 */ public E takeLast() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; // 尝试移除并返回尾部节点 while ( (x = unlinkLast()) == null) // 队列为空,则阻塞等待有可用元素之后重试 notEmpty.await(); return x; } finally { lock.unlock(); } } /** * 从队尾删除一个元素, 失败则返回null. */ private E unlinkLast() { // 获取尾节点 Node<E> l = last; // 尾节点为null,则返回null if (l == null) return null; // 获取尾节点的前驱节点 Node<E> p = l.prev; // 移除尾节点,将尾节点更新为p E item = l.item; l.item = null; l.prev = l; // help GC last = p; // 移除尾节点后,为空队列 if (p == null) first = null; else // 将新的尾节点的后继节点设置为null p.next = null; --count; // 唤醒阻塞在notFull上的线程 notFull.signal(); return item; } }
  • 2、如果队列为空,则立即返回 null,否则移除并返回尾部元素
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { /** * 如果队列为空,则抛出异常,否则移除并返回尾部元素 */ public E removeLast() { E x = pollLast(); if (x == null) throw new NoSuchElementException(); return x; } /** * 如果队列为空,则立即返回 null,否则移除并返回尾部元素 */ public E pollLast() { final ReentrantLock lock = this.lock; lock.lock(); try { //从队尾删除一个元素, 失败则返回null. return unlinkLast(); } finally { lock.unlock(); } } }
  • 3、在指定的超时时间内尝试移除并返回尾部元素,如果已经超时,则返回 null
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { public E pollLast(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { E x; // 尝试移除并返回尾部元素 while ( (x = unlinkLast()) == null) { // 已经超时则返回 null if (nanos <= 0) return null; // 当前线程在非空条件上阻塞等待,被唤醒后进行重试 nanos = notEmpty.awaitNanos(nanos); } // 移除成功则直接返回尾部元素 return x; } finally { lock.unlock(); } } }

三、LinkedBlockingQueue与LinkedBlockingDeque对比

LinkedBlockingQueue

  • FIFO;
  • 读写分开两个ReentrantLock;

LinkedBlockingDeque

  • FIFO & FILO;
  • 全局一把ReentrantLock;

参考: https://www.itzhai.com/articles/graphical-blocking-queue.html

https://www.cnblogs.com/zhuxudong/p/10079511.html

https://segmentfault.com/a/1190000016398508

https://www.cnblogs.com/snake107/p/12035580.html

上一篇:Java并发编程——ForkJoinPool
下一篇:没有了
网友评论