Fork me on GitHub

浅析java并发包(三):阻塞队列(BlockingQueue)

前言

阻塞队列 (BlockingQueue)是j.u.c下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式: 当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。

接口 BlockingQueue

提供的方法API

从API文档上看,BlockingQueue定义的方法有四种形式,具有不同的操作方式,不能立即满足,但可能在将来的某个时间点满足: 一个抛出异常,第二个返回一个特殊值( null或false ,具体取决于操作),第三个程序将无限期地阻止当前线程,直到操作成功为止, 而第四个程序块在放弃之前只有给定的最大时限。 这些方法总结在下表中:

method\wayThrows exceptionSpecial valueBlocksTimes out
Insertadd(e)offer(e)put(e)offer(e, time, unit)
Removeremove()poll()take()poll(time, unit)
Examineelement()peek()not applicablenot applicable

简单解释一下四种行为方式:

  • 抛异常(Throws exception):如果试图的操作无法立即执行,抛一个异常。
  • 特定值(Special value):如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  • 阻塞(Blocks):如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  • 超时(Times out):如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是true / false)。

使用场景

阻塞队列的典型使用场景就是生产者 - 消费者场景,它可以安全的与多个生产者和多个消费者一起使用。 引用API文档里的例子:

class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

其他特性

  • 不接受null元素。当尝试对一个阻塞队列执行add,put或offer一个null对象时,会抛出NullPointerException。
  • BlockingQueue实现是线程安全的。 所有排队方法使用内部锁或其他形式的并发控制在原子上实现其效果。

BlockingQueue常见实现类与介绍

  • 有界阻塞队列:ArrayBlockingQueue,LinkedBlockingQueue…
  • 无界阻塞队列:PriorityBlockingQueue,DelayQueue…
  • 优先级阻塞队列:PriorityBlockingQueue
  • 延迟阻塞队列:DelayQueue

ArrayBlockingQueue

有界阻塞队列,FIFO队列,内部是通过数组实现的,大小固定,创建后容量无法修改。尝试put成满的队列的元件将导致在操作阻挡; 尝试take从空队列的元件将类似地阻塞。

举个栗子:

public class BlockingQueueTest {
    /**
     * 实例化一个队列,队列中的容量为10
     */
    private static BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);

    public static void main(String[] args) {
        ScheduledExecutorService product = Executors.newScheduledThreadPool(1);
        Random random = new Random();
        product.scheduleAtFixedRate(() -> {
            int value = random.nextInt(101);
            try {
                blockingQueue.offer(value);  //offer()方法就是往队列的尾部设置值
                System.out.println("已经往队列里加入数据:" + value);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }, 0, 100, TimeUnit.MILLISECONDS);  //每100毫秒执行线程

        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000);
                    Integer poll = blockingQueue.poll();// 弹出
                    System.out.println("已经从队列里取出数据:" + poll);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

以上实现一个固定队列的例子,容量为10,可以通过构造器创建容量大小为n的阻塞队列。

LinkedBlockingQueue

有界阻塞队列,FIFO队列,基于链表实现,创建时可以不指定容量,不指定时默认容量Integer.MAX_VALUE。队列的节点可以动态扩展,只是不能超过容量。 这一点上与ArrayBlockingQueue区别在于ArrayBlockingQueue创建后不能再扩展队列的元素了。

使用方式,将上面的BlockingQueueTest里的阻塞队列的实现改为LinkedBlockingQueue即可。

PriorityBlockingQueue

无界阻塞队列,优先级排序。 PriorityBlockingQueue只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容。 所以,虽然这个队列逻辑上无界,但是可能会耗尽资源导致OOM的问题。 PriorityBlockingQueue的内部排序默认是自然排序,也可通过omparator指定排序规则,便于自定义优先级逻辑。

PriorityBlockingQueue内部也是通过数组实现,数组的容量可以动态扩展,源码如下:

/**
 * Tries to grow array to accommodate at least one more element
 * (but normally expand by about 50%), giving up (allowing retry)
 * on contention (which we expect to be rare). Call only while
 * holding lock.
 *
 * @param array the heap array
 * @param oldCap the length of the array
 */
private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

DelayQueue

无界阻塞队列,其元素是一个Delayed元素,其元素只能在其延迟到期时才被使用。 在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

其使用场景:

  • 定时任务调度。通过DelayQueue保存执行的任务和执行时间,当从DelayQueue中获取到任务时立即执行,从而实现定时调度。
  • 缓存有效期。缓存元素的有效期,当循环获取队列的元素时,只要获取到就说明缓存有效期过了。

其他队列

除了常用的队列以外,jdk还提供了一些其他的实现,比如:SynchronousQueueLinkedTransferQueueLinkedBlockingDeque等, 各有特色与其使用场景。

-------------本文结束,感谢您的阅读-------------
贵在坚持,如果您觉得本文还不错,不妨打赏一下~
0%