当前位置 : 主页 > 编程语言 > 其它开发 >

队列——在线程池等有限资源池中的应用

来源:互联网 收集:自由互联 发布时间:2022-06-24
  我们都知道 CPU 资源是优先的,任务的处理速度和线程个数并不是线性相关的。相反,过多的线程反而会导致 CPU 频繁的切换,处理性能下降,所以,线程池的线程数量是预先设置的

  我们都知道 CPU 资源是优先的,任务的处理速度和线程个数并不是线性相关的。相反,过多的线程反而会导致 CPU 频繁的切换,处理性能下降,所以,线程池的线程数量是预先设置的好的。
  那么当我们向一个固定大小的线程池中请求一个线程池时,如果线程池中没有空闲资源了,这个时候线程池如何处理这个请求?时拒绝请求还是排队请求?各种处理策略又是怎样实现的呢?
  实际上,这个问题并不复杂,其底层的数据结构就是我们今天要学的东西,队列。

如何理解“队列”?

  队列这个概念非常好理解,相当于超市排队结算,先来的先结算,不能插队,也就是先进先出,于栈的先进后出相反,但是队列和栈一样,也是一个操作受限的线性表数据结构。队列被多应用于一些偏向底层系统、框架、中间件的开发,比如高性能队列Disruptor、 Linux环形缓存,都用到了循环并发队列; Java concurrent并发包利用ArrayBlockingQueue来实现公平锁等。

顺序队列和链式队列

  那么提炼一个基础的队列行为,它具有先进先出的特性,支持在队尾插入元素,在对头删除元素,那它究竟该如何实现呢?跟栈一样,队列可以用数组来实现,也可以用链表来实现。用数组实现的栈叫作顺序栈,用链表实现的栈叫作链式栈。同样,用数组实现的队列叫作顺序队列,用链表实现的队列叫作链式队列。
  我们先来看一下数组的实现方式:

public class ArrayQueue {

    private String[] items;
    private int n;
    private int head;
    private int tail;

    public ArrayQueue(int capacity) {
        items = new String[capacity];
        n = capacity;
    }

    public boolean enqueue(String item) {
        if (tail == n) {
            return false;
        }

        items[tail] = item;
        tail++;
        return true;
    }

    public String dequeue() {
        if (head == tail) {
            return null;
        }

        String tmp = items[head];
        head++;
        return tmp;
    }

    public String[] getItems() {
        return items;
    }

    public void setItems(String[] items) {
        this.items = items;
    }

    public int getN() {
        return n;
    }

    public void setN(int n) {
        this.n = n;
    }

    public int getHead() {
        return head;
    }

    public void setHead(int head) {
        this.head = head;
    }

    public int getTail() {
        return tail;
    }

    public void setTail(int tail) {
        this.tail = tail;
    }
}

  比起栈的数组实现,队列的数组实现稍微有点儿复杂,但是没关系。我稍微解释一下实现思路,你很容易就能明白了。
  对于栈来说,我们只需要一个栈顶指针就可以了。但是队列需要两个指针:一个是head指针,指向队头;一个是tail指针,指向队尾。你可以借助下图来理解:

  当我们调用两次出队操作之后,队列中head指针指向下标为2的位置, tail指针仍然指向下标为4的位置。也就是说随着不停的出队和入队操作,head 和 tail 会持续移动,当 tail 移动到最右边,及时数组中还有空闲的空间,也无法继续向队列中添加数据了。这个问题如何解决?

  实际上我们可以通过搬移数据的形式,但是不搬移整块空间,看图会好理解一些:

  这样,我们的出队函数 dequeue() 保持不变,需要稍加改动入队函数 enqueue() 的实现:

public boolean enqueue_2(String item) {
    if (tail == n) {
        if (head == 0) {
            // 表示队列已满
            return false;
        }
        // 搬运数据
        for (int i = head; i < tail; i++) {
            items[i-head] = items[i];
        }
        // 搬移完之后重新更新head和tail
        tail -= head;
        head = 0;
    }

    items[tail] = item;
    ++tail;
    return true;
}

  这种实现思路中,出队操作的时间复杂度仍然是O(1)

  接下来,我们再来看下基于链表的队列实现方法。
  基于链表的实现,我们同样需要两个指针: head指针和tail指针。他们分别指向第一个结点和最后一个结点,如图所示,入队时, tail->next= new_node, tail = tail->next;出队时, head = head->next。

循环队列

  我们刚才用数组来实现队列的时候,在tail==n时,会有数据搬移操作,这样入队操作性能就会受到影响。那有没有办法能够避免数据搬移呢?我们来看看循环队列的解决思路。
  循环队列,顾名思义,它长得像一个环。原本数组是有头有尾的,是一条直线。现在我们把首尾相连,扳成了一个环。我画了一张图,你可以直观地感受一下。

  我们可以看到,图中这个队列的大小为8,当前head=4, tail=7。当有一个新的元素a入队时,我们放入下标为7的位置。但这个时候,我们并不把tail更新为8,而是将其在环中后移一位,到下标为0的位置。当再有一个元素b入队时,我们将b放入下标为0的位置,然后tail加1更新为1。所以,在a, b依次入队之后,循环队列中的元素就变成了下面的样子:

  通过这样的方式我们就避免了数据搬移的麻烦,但是判空条件就麻烦起来了,队列为空的判断条件仍然是head == tail。但队列满的判断条件就稍微有点复杂了:

  对照着上图,tail=3, head=4, n=8,所以总结一下规律就是: (3+1)%8=4。多画几张队满的图,你就会发现,当队满时, (tail+1)%n=head
  如果还没怎么理解,就可以看下代码:

public class CircularQueue {
  // 数组: items,数组大小: n
  private String[] items;
  private int n = 0;
  // head表示队头下标, tail表示队尾下标
  private int head = 0;
  private int tail = 0;
  // 申请一个大小为capacity的数组
  public CircularQueue(int capacity) {
    items = new String[capacity];
    n = capacity;
  }
  // 入队
  public boolean enqueue(String item) {
    // 队列满了
    if ((tail + 1) % n == head) return false;
    items[tail] = item;
    tail = (tail + 1) % n;
    return true;
  }
  // 出队
  public String dequeue() {
    // 如果head == tail 表示队列为空
    if (head == tail) return null;
    String ret = items[head];
    head = (head + 1) % n;
    return ret;
  }
}
阻塞队列和并发队列

  上面这些队列说实话开发都很少涉足,倒是另外一些像是阻塞队列和并发队列应用却比较广泛。阻塞队列其实就是在队列基础上增加了阻塞操作。简单来说,就是在队列为空的时候,从队头取数据会被阻塞。因为此时还没有数据可取,直到队列中有了数据才能返回;如果队列已经满了,那么插入数据的操作就会被阻塞,直到队列中有空闲位置后再插入数据,然后再返回。
  你应该已经发现了,上述的定义就是一个“生产者-消费者模型”!是的,我们可以使用阻塞队列,轻松实现一个“生产者-消费者模型”!
  这种基于阻塞队列实现的“生产者-消费者模型”,可以有效地协调生产和消费的速度。当“生产者”生产数据的速度过快, “消费者”来不及消费时,存储数据的队列很快就会满了。这个时候,生产者就阻塞等待,直到“消费者”消费了数据, “生产者”才会被唤醒继续“生产”。
  前面我们讲了阻塞队列,在多线程情况下,会有多个线程同时操作队列,这个时候就会存在线程安全问题,那如何实现一个线程安全的队列呢?
  线程安全的队列我们叫作并发队列。最简单直接的实现方式是直接在enqueue()、 dequeue()方法上加锁,但是锁粒度大并发度会比较低,同一时刻仅允许一个存或者取操作。实际上,基于数组的循环队列,利用CAS原子操作,可以实现非常高效的并发队列。这也是循环队列比链式队列应用更加广泛的原因。

开篇解答

  队列的知识就讲完了,我们现在回过来看下开篇的问题。线程池没有空闲线程时,新的任务请求线程资源时,线程池该如何处理?各种处理策略又是如何实现的?
  我们一般有两种处理策略。第一种是非阻塞的处理方式,直接拒绝任务请求;另一种是阻塞的处理方式,将请求排队,等到有空闲线程时,取出排队的请求继续处理。那如何存储排队的请求呢?其实就是所谓的拒绝策略。
  需要注意的是队列的选择,基于链表的实现方式,可以实现一个支持无限排队的无界队列(unbounded queue),但是可能会导致过多的请求排队等待,请求处理的响应时间过长。所以,针对响应时间比较敏感的系统,基于链表实现的无限排队的线程池是不合适的;而基于数组实现的有界队列(bounded queue),队列的大小有限,所以线程池中排队的请求超过队列大小时,接下来的请求就会被拒绝,这种方式对响应时间敏感的系统来说,就相对更加合理。

上一篇:Redis和Mysql保持数据一致性
下一篇:没有了
网友评论