当前位置 : 主页 > 编程语言 > java >

Java并发编程——ArrayBlockingQueue

来源:互联网 收集:自由互联 发布时间:2023-02-04
一、阻塞队列 BlockingQueue 在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的

一、阻塞队列 BlockingQueue

在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

1.1、BlockingQueue的基本原理

先来解释一下阻塞队列:

如上图:

  • 1、生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;
  • 2、消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。

阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。

阻塞队列的常用方法

查阅BlockingQueue总结了以下阻塞队列的方法:

1、boolean add(E e)

  • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回IllegalStateException异常。

2、boolean offer(E e)

  • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回false。

3、void put(E e)

  • 直接在队列中插入元素,当无可用空间时候,阻塞等待。

4、boolean offer(E e, long timeout, TimeUnit unit)

  • 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false。

5、E take()

  • 获取并移除队列头部的元素,无元素时候阻塞等待。

6、E poll( long time, timeunit unit)

  • 获取并移除队列头部的元素,无元素时候阻塞等待指定时间。

7、boolean remove()

  • 获取并移除队列头部的元素,无元素时候会抛出NoSuchElementException异常。

8、E element()

  • 不移除的情况下返回列头部的元素,无元素时候会抛出NoSuchElementException异常。

9、E peek()

  • 不移除的情况下返回列头部的元素,队列为空无元素时返回null。

注意:

根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。 以上支持阻塞和超时的方法都是能够响应中断的。

1.2、BlockingQueue的实现

BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。

 

下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。

二、ArrayBlockingQueue

ArrayBlockingQueue使用的数据结构是数组

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 队列中元素保存的地方 */ final Object[] items; /** 取元素的指针 记录下一次操作的位置 */ int takeIndex; /** 放元素的指针 记录下一次操作的位置 */ int putIndex; /** 元素数量 */ int count; /** 保证并发访问的锁 */ final ReentrantLock lock; /** 等待出队的条件 消费者监视器 */ private final Condition notEmpty; /** 等待入队的条件 生产者监视器 */ private final Condition notFull; }

构造函数

  • 容量大小有构造函数的capacity参数决定。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //必须传入容量,可以控制重入锁是公平还是非公平 public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); // 初始化数组 this.items = new Object[capacity]; // 创建重入锁及两个条件 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); //final修饰的变量不会发生指令重排 final ReentrantLock lock = this.lock; lock.lock(); // 保证可见性 不是为了互斥 防止指令重排 保证item的安全 try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } }

2.1、入队

2.1.1、add(E e)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public boolean add(E e) { return super.add(e); } } public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { // AbstractQueue 调用offer(e)如果成功返回true,如果失败抛出异常 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } }

2.1.2、offer(E e)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public boolean offer(E e) { // 元素不可为空 checkNotNull(e); final ReentrantLock lock = this.lock; // 加锁 lock.lock(); try { if (count == items.length) // 如果数组满了就返回false return false; else { // 如果数组没满就调用入队方法并返回true enqueue(e); return true; } } finally { //释放锁 lock.unlock(); } } }

2.1.3、put(E e) 方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public void put(E e) throws InterruptedException { checkNotNull(e); // 获取ReentrantLock锁 final ReentrantLock lock = this.lock; // 加锁,如果线程中断了抛出异常 lock.lockInterruptibly(); try { // 如果队列满了,则进入条件队列进行等待 while (count == items.length) notFull.await(); // 队列不满,或者被取数线程唤醒了,那么会继续执行 // 这里会往阻塞队列添加一个数据,然后唤醒等待时间最长的取数线程 enqueue(e); } finally { // 释放ReentrantLock锁 lock.unlock(); } } }

2.1.4、offer(E e, long timeout, TimeUnit unit)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; // 如果数组满了,就阻塞nanos纳秒,如果唤醒这个线程时依然没有空间且时间到了就返回false nanos = notFull.awaitNanos(nanos); } //入队 enqueue(e); return true; } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // 把元素直接放在放指针的位置上 items[putIndex] = x; // 如果放指针到数组尽头了,就返回头部 if (++putIndex == items.length) putIndex = 0; // 数量加1 count++; // 唤醒notEmpty,因为入队了一个元素,所以肯定不为空了 notEmpty.signal(); } }
  • add(e)时如果队列满了则抛出异常;
  • offer(e)时如果队列满了则返回false;
  • put(e)时如果队列满了则使用notFull等待;
  • offer(e, timeout, unit)时如果队列满了则等待一段时间后如果队列依然满就返回false;
  • 利用放指针循环使用数组来存储元素;

2.2、出队

2.2.1、 remove()方法

public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { public E remove() // 调用poll()方法出队 E x = poll(); if (x != null) // 如果有元素出队就返回这个元素 return x; else // 如果没有元素出队就抛出异常 throw new NoSuchElementException(); } }

2.2.2、 poll()方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //如果队列里没有数据就直接返回null //否则从队列头部出队 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } }

2.2.3、 poll(long timeout, TimeUnit unit)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果队列无元素,则阻塞等待nanos纳秒 // 如果下一次这个线程获得了锁但队列依然无元素且已超时就返回null while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } } }

2.2.4、 take()方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //加锁,如果线程中断了抛出异常 lock.lockInterruptibly(); try { //队列中不存元素 while (count == 0) /* * 一直等待条件notEmpty,即被其他线程唤醒 * (唤醒其实就是,有线程将一个元素入队了,然后调用notEmpty.signal() * 唤醒其他等待这个条件的线程,同时队列也不空了) */ notEmpty.await(); //否则出队 return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 取取指针位置的元素 E x = (E) items[takeIndex]; // 把取指针位置设为null items[takeIndex] = null; // 取指针前移,如果数组到头了就返回数组前端循环利用 if (++takeIndex == items.length) takeIndex = 0; // 元素数量减1 count--; if (itrs != null) itrs.elementDequeued(); // 唤醒notFull条件 notFull.signal(); return x; } }
  • remove()时如果队列为空则抛出异常;
  • poll()时如果队列为空则返回null;
  • take()时如果队列为空则阻塞等待在条件notEmpty上;
  • poll(timeout, unit)时如果队列为空则阻塞等待一段时间后如果还为空就返回null;
  • 利用取指针循环从数组中取元素;

如下图,以put和take方法为例:

这里put和take使用了同一个ReentrantLock,不能并发执行。

2.3、缺点

  • a、队列长度固定且必须在初始化时指定,所以使用之前一定要慎重考虑好容量;

  • b、如果消费速度跟不上入队速度,则会导致提供者线程一直阻塞,且越阻塞越多,非常危险;

  • c、只使用了一个锁来控制入队出队,效率较低。

参考: https://www.itzhai.com/articles/graphical-blocking-queue.html

https://zhuanlan.zhihu.com/p/224946304

上一篇:Java并发编程——LinkedBlockingQueue
下一篇:没有了
网友评论