当前位置 : 主页 > 编程语言 > 其它开发 >

BlockingQueue 源码分析

来源:互联网 收集:自由互联 发布时间:2022-05-30
学习多线程定时器时遇到 BlockingQueue 阻塞队列,当时的认识仅限于了解其是一个并发阻塞队列,不知如何使用及其原理 1. 介绍 BlockingQueue 首先是一个队列,其次提供了阻塞功能。它看起

学习多线程定时器时遇到 BlockingQueue 阻塞队列,当时的认识仅限于了解其是一个并发阻塞队列,不知如何使用及其原理


1. 介绍

BlockingQueue 首先是一个队列,其次提供了阻塞功能。它看起来很像消息队列可让消息解耦,但其在生产者-消费者模型中通过阻塞又可使二者速度达到平衡。使用阻塞队列无需过多考虑线程安全问题,专注业务逻辑的实现即可


BlockingQueue 有正常的队列功能,即出队与入队。其阻塞体现在:

  • 当队列满时,若有生产者继续往队列添加元素,则阻塞这个生产者线程
  • 当队列为空,若有消费者继续从队列移除元素,则阻塞这个消费者线程
  • 添加元素时,则队列不为空,则唤醒消费者线程
  • 移除元素时,则队列不为满,则唤醒生产者线程

常见的阻塞队列:

  • ArrayBlockingQueue:基于数组的有界阻塞队列
  • LinkedBlockingQueue:基于链表的有界阻塞队列
  • PriorityBlockingQueue:基于堆的优先级无界阻塞队列
  • DelayQueue:基于时间优先级的无界阻塞队列

后面将介绍 ArrayBlockingQueue、LinkedBlockingQueue 两个阻塞队列, 其类继承图如下:







2. Queue 接口

Queue 接口具有队列的基本方法,其不同之处在于同一个功能他有两套方法,两套方法区别于一套是实现返回值,另一套是抛出异常

Throw Exception Return value 增加 add() offer() 删除 remove() poll() 检查 element() peek()





3. BlockingQueue 接口

BlockingQueue 接口在 Queue 的接口上添加多几个方法或重载,最常用的方法有 put 和 take(有阻塞功能)

非阻塞 阻塞线程方法 增加 offer(E e, long timeout, TimeUnit unit):等待超时后返回值 put() 删除 poll(long timeout, TimeUnit unit):等待超时后返回值 take() 包含 contains(Object o) 剩余大小 remainingCapacity() 转移元素 drainTo(Collection<? super E> c)





4. AbstractQueue 抽象类

通过 AbstractQueue 抽象类可知道 Queue 接口的两套方法其实本质是一样的,只不过多了一层抛异常的判断而已

public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> {

    /**
     * 构造器
     */
    protected AbstractQueue() {
    }


    /**
     * add 调用的还是 offer 方法返回值
     */
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    
    /**
     * remove 调用的还是 poll 方法返回值
     */
    public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

    
    /**
     * remove 调用的还是 poll 方法返回值
     */
    public E element() {
        E x = peek();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }


    /**
     * 清空元素
     */
    public void clear() {
        while (poll() != null)
            ;
    }

    
    /**
     * 批量添加元素
     */
    public boolean addAll(Collection<? extends E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        boolean modified = false;
        for (E e : c)
            if (add(e))
                modified = true;
        return modified;
    }
}






5. ArrayBlockingQueue

ArrayBlockingQueue 使用循环数组来存储元素,其大小是确定的,所以属于有界阻塞队列。阻塞功能由可重入锁实现,出队入队都靠同一个锁来控制,并且由条件锁来实现生产者和消费者线程的阻塞和唤醒。


5.1 内部属性
// 存储元素的数组
final Object[] items;

// 可获取元素的下标
int takeIndex;

// 可存放元素的下标
int putIndex;

// 元素总数量
int count;

// 可重入锁
final ReentrantLock lock;

// 非空条件锁,队列为空时阻塞消费者线程
private final Condition notEmpty;

// 非满条件锁,队列满时阻塞生产者线程
private final Condition notFull;

// 自定义的迭代器(可使用默认提供的)
transient Itrs itrs = null;

5.2 关键方法
// 入队,添加元素后队列则不为空,会唤醒被阻塞的消费者线程
private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}
// 出队,移除元素后队列则不满,唤醒被阻塞的生产者线程
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

5.3 常用方法

put、take 是最常用的方法,对应添加与移除元素

添加元素时,锁住整个数组。若队列满了则阻塞当前添加元素的线程,否则添加完元素后唤醒消费者线程进行消费

// 添加元素,
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    
    // 获取锁
    lock.lockInterruptibly();
    try {
        
        // 若队满,则阻塞生产者线程
        while (count == items.length)
            notFull.await();
        
        // 入队
        enqueue(e);
    } finally {
        
        // 释放锁
        lock.unlock();
    }
}

移除元素时,锁住整个数组。若队列空了则阻塞当前移除元素的线程,否则移除完元素后唤醒生产者线程进行生产

// 移除元素
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    
    // 获取锁
    lock.lockInterruptibly();
    try {
        
        // 队空则阻塞消费者线程
        while (count == 0)
            notEmpty.await();
        
        // 出队
        return dequeue();
    } finally {
        
        // 释放锁
        lock.unlock();
    }
}

5.4 使用示例

下面来模拟一个生产-消费者场景。发现刚开始生产者一直生产商品,到队满后则被阻塞,此后消费者消费了一个商品后,生产者才生产一个商品

public class BlockingQueueTest {
    public static void main(String[] args) {

        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(5);
        
        // 生产者线程,每秒生产一个商品
        new Thread(()->{
            while (true) {
                try {
                    Thread.sleep(1000);
                    System.out.println("生产者生产了商品");
                    blockingQueue.put(new Integer(1));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();


        // 消费者线程,每5秒消费一个商品
        new Thread(()->{
            while (true) {
                try {
                    Thread.sleep(5000);
                    System.out.println("消费者消费了商品");
                    blockingQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}






6. LinkedBlockingQueue

LinkedBlockingQueue 底层是一个单向链表,可指定链表大小(不指定默认为 Integer.MAX_VALUE),实现原理和 ArrayBlockingQueue 类似,不同在于其链表的结构可实现同时出队和入队,此时需要两把锁来控制并发,所以其出队入队是不影响的


下面的实现类是 删除简化很多代码之后的结果 ,为了简洁的展示实现原理

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
  
    /**
     * 单向链表的节点
     */
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) {
        	item = x;
        }
    }

    /** 最大容量 */
    private final int capacity;

    /** 现有元素总数量,原子操作 */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 链表头
     */
    transient Node<E> head;

    /**
     * 链表尾
     */
    private transient Node<E> last;

    /** take方法的可重入锁,控制并发 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** take方法的条件锁 */
    private final Condition notEmpty = takeLock.newCondition();

    /** put方法的可重入锁,控制并发 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** put方法的条件锁 */
    private final Condition notFull = putLock.newCondition();

    /**
     * 唤醒非空阻塞的线程
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * 唤醒非满阻塞的线程
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * 链表尾入队
     */
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    /**
     * 链表头出队
     */
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    
	/**
	 * 入队
	 */
	public void put(E e) throws InterruptedException {
		if (e == null) throw new NullPointerException();
       
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
         
        // 获取 put 方法锁,锁住链表尾部
        putLock.lockInterruptibly();
        try {
           
            // 队满时,阻塞添加元素的线程
            while (count.get() == capacity) {
                notFull.await();
            }
            
            // 入队操作
            enqueue(node);
            c = count.getAndIncrement();
            
            // 添加元素后队列还没满,唤醒非满阻塞的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        
        // 说明队列为空了,唤醒被阻塞的生产者线程
        if (c == 0)
            signalNotEmpty();
    }
    
    
    /**
	 * 出队
	 */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        
        // 获取 take 方法锁,锁住链表头部
        takeLock.lockInterruptibly();
        try {
            
            // 队空时,阻塞获取元素的线程
            while (count.get() == 0) {
                notEmpty.await();
            }
            
            // 出队
            x = dequeue();
            c = count.getAndDecrement();
            
            // 出队后,队列不为空,继续唤醒出队线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        
		// 队满了,唤醒被阻塞的消费者线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
 }


上一篇:【cs231n】损失函数及梯度下降优化
下一篇:没有了
网友评论