BlockingQueue源码分析
2022/3/2 20:46:17
本文主要是介绍BlockingQueue源码分析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一、阻塞队列简介
队列常被用来解决生产——消费者问题,Java中定义了Queue
接口以及通用的一些抽象方法
public interface Queue<E> extends Collection<E> { // 添加一个元素,添加成功返回true,如果队列满了就抛出异常 boolean add(E e); //添加一个元素,添加成功返回true,如果队列满了返回false boolean offer(E e); // 删除并返回队首元素,队列为空则抛出异常 E remove(); // 移除并返回队首元素,队列为空则返回null E poll(); // 返回队首元素,但并不移除,队列为空则抛出异常 E element(); // 返回队首元素,但并不移除,队列为空则返回null E peek(); }
上面所列举出来的只是普通的队列的通用方法,而Java中的阻塞队列BlockingQueue
,继承了Queue
接口,同时又添加了两个具有阻塞功能的抽象方法,同时又提供了offer()
和poll
两个可阻塞的重载方法:
通过下面阻塞方法的定义可以看出,只要是会被阻塞的方法,都会抛出InterruptedException
异常
public interface BlockingQueue<E> extends Queue<E> { // 添加元素,队列满时,插入线程会被阻塞,直到队列不满 void put(E e) throws InterruptedException; // 移除并返回元素,队列为空时,获取元素线程会被阻塞,直到队列非空 E take() throws InterruptedException; // 可以指定添加元素时线程被阻塞的超时时间 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; // 可以指定获取元素时线程被阻塞的超时时间 E poll(long timeout, TimeUnit unit) throws InterruptedException; }
对BlockingQueue
的常用方法做一个归纳如下:
方法 | 抛出异常 | 返回特定值 | 阻塞 | 指定阻塞时间 |
---|---|---|---|---|
入队 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
出队 | remove() | poll() | take() | poll(time,unit) |
获取队首元素 | element() | peek() | 不支持 | 不支持 |
阻塞队列除了具有可阻塞的特性之外,还有另外一个重要的特性就是容量大小,分为有界和无界。没有绝对意义的上的无界,只是这个界限很大,可以放很多元素。以LinkedBlockingQueue
为例,它的容量大小为Integer.MAX_VALUE
,这是一个非常大的数字,我们通常认为它就是无界的。也有一些阻塞队列是有界的,比如ArrayBlockingQueue
,如果达到最大容量之后,也不会进行扩容。所以一旦满了就无法再往里面放数据了。
BlockingQueue
同时也是线程安全的,它可以保证多线程的情况下,保证生产者和消费者的线程安全,其内部大多都是采用CAS
和ReentrantLock
来保证线程安全,业务代码无需再关注多线程安全的问题,直接向队列里面放或者取就可以了,如图所示:
同时,阻塞队列还启动了资源隔离的作用,在复杂业务中,业务A完成后,只需要将结果丢到队列中即可,不需要关心后面的步骤,业务B会从队列中获取任务来执行对应的业务,实现了业务之间的解耦,也可以提高安全性。
下面就介绍一些常用的阻塞队列和部分核心源码
二、常用阻塞队列及核心源码分析
2.1 ArrayBlockingQueue
ArrayBlockingQueue
是一个典型的有界的线程安全的阻塞队列,初始化时需要指定其容量大小,其内部元素使用数组进行存储,以put()
方法为例,使用ReentrantLock
来保证线程安全,通过条件队列的两个条件notEmpty
和notFull
来进行阻塞和唤醒
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { final Object[] items; int takeIndex; int putIndex; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; public ArrayBlockingQueue(int capacity) { this(capacity, false); } // 加锁保证线程安全 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 使用while而不是if,是为了防止虚假唤醒 while (count == items.length) // 队列满了,生产者阻塞 notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; // 如果添加一个元素队列满了之后,会被putIndex置为0,典型的环形数组的实现 if (++putIndex == items.length) putIndex = 0; count++; // 条件队列转同步队列并唤醒线程 notEmpty.signal(); } }
由于ArrayBlockingQueue
的put()
和take()
方法使用ReentrantLock
进行同步,同时只有一个方法可以执行,所以在高并发的情况下,性能会比较差。
思考:ArrayBlockingQueue为什么采用双指针环形数组的方式?
普通的数组,删除数组元素时需要进行移位操作,导致它的时间复杂度为O(n),而采用双指针环形数组,不需要进行移位,只需要分别移动两个指针即可。
2.2 LinkedBlockingQueue
LinkedBlockingQueue
是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE
,由于这个数值特别大,所以 LinkedBlockingQueue
也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM
错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { static class Node<E> { E item; // 元素内容 Node<E> next; // 下一个元素节点 单链表结构 Node(E x) { item = x; } } // 初始化容量,默认Integer.MAX_VALUE private final int capacity; // 元素个数,因为读写操作的锁分离,这里使用线程安全的计数变量 private final AtomicInteger count = new AtomicInteger(); // 链表头,本身不存储元素信息,其item为null transient Node<E> head; // 链表尾元素 private transient Node<E> last; // 获取元素的锁,锁分离,提高效率 private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); // 添加元素的锁 private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition(); public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 使用put锁,可以被中断 putLock.lockInterruptibly(); try { // 队列满了,阻塞生产者线程 while (count.get() == capacity) { notFull.await(); } enqueue(node); // 返回旧值 c = count.getAndIncrement(); // 可能有很多线程阻塞在notFull这个条件上,而取元素时才会唤醒notFull,此处不用等到取元素时才唤醒 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // 队列之前为空,现在新增了一个元素后,可以直接去唤醒获取元素的线程 if (c == 0) signalNotEmpty(); } }
LinkedBlockingQueue与ArrayBlockingQueue对比
- ArrayBlockingQueue使用一个独占锁,读写不分离,而LinkedBlockeingQueue使用两个独占锁,读写操作锁分离,性能更好
- 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
- 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
- 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
2.3 LinkedBlockingDeque
LinkedBlockingDeque
是对LinkedBlockingQueue
的增强,其顶层接口为Deque
,该接口定义了更加丰富的操作队列的方法,通过方法名就可以看出来,这些方法打破了队列先进先出的固有规则,提供了可以从头部或者尾部操作的API
public interface Deque<E> extends Queue<E> { void addFirst(E e); void addLast(E e); boolean offerFirst(E e); boolean offerLast(E e); E removeFirst(); E removeLast(); E pollFirst(); E pollLast(); E getFirst(); E getLast(); E peekFirst(); E peekLast(); }
BlockingDeque
接口继承了Deque
,同时又提供了几个可阻塞的方法
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> { void putFirst(E e) throws InterruptedException; void putLast(E e) throws InterruptedException; E takeFirst() throws InterruptedException; E takeLast() throws InterruptedException; }
而LinkedBlockingDeque
实现了BlockingDeque
接口,其内部通过双向链表来记录元素,通过一把ReentrantLock
来保证线程安全,该类可以看成是ArrayBlockingQueue
和LinkedBlockingQueue
的结合与增强
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { static final class Node<E> { E item; Node<E> prev; Node<E> next; Node(E x) { item = x; } } transient Node<E> first; transient Node<E> last; private transient int count; private final int capacity; final ReentrantLock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkFirst(node)) notFull.await(); } finally { lock.unlock(); } } private boolean linkFirst(Node<E> node) { // 超过容量,直接返回 if (count >= capacity) return false; Node<E> f = first; node.next = f; first = node; if (last == null) last = node; else f.prev = node; ++count; // 唤醒被阻塞的获取元素的线程 notEmpty.signal(); return true; } }
2.4 SynchronousQueue
SynchronousQueue
是一个没有缓冲的BlockingQueue
,生产者线程对元素的插入操作put()
必须等待消费者的移除操作take()
,其模型如下图:
如上图所示,SynchronousQueue
最大的不同在于,它的容量为0,没有地方来缓存元素,这就导致了每次添加元素都会被阻塞,直到有线程来取元素;同理,取元素也是一样,取元素的线程也会被阻塞,直到有线程添加元素。
由于SynchronousQueue
不需要持有元素,它的作用在于直接传递,所以它非常适用于传递性场景做交换工作,生产者线程和消费者线程同步传递某些信息、事务或任务
SynchronousQueue
常见的一个场景就是在Executors.newCachedThreadPool()
中,因为不确定生产者的请求数量(创建任务),而这些请求又需要被及时处理,那么使用SynchronousQueue
为每个生产者线程分配一个消费者线程就是处理效率最高的方式。线程池会根据需要(新任务到来)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60s之后会被回收。
下面结合源码来分析它的实现原理:
SynchronousQueue
内部抽象类Transferer
提供了任务传递的方法transfer()
,而该方法内部包含了线程阻塞与唤醒的逻辑,而Transferer
有两个实现类TransferQueue
和TransferStack
,可以理解为存储阻塞线程的方式有两种:队列和栈。根据这两者的特性,可以分为公平和非公平的实现,队列满足FIFO(先进先出)
的特性,所以是公平的实现;而栈满足LIFO(后进先出)
的特性,所以是非公平的实现。
下面SynchronousQueue
的构造方法,提供了公平和非公平的选项,默认为非公平实现
public SynchronousQueue() { this(false); } public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
下面以TransferQueue
为例简要分析元素入队和出队的操作,SynchronousQueue
的put()
和take()
都会去调用transfer()
方法添加元素或获取元素
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
下面分析TransferQueue
的transfer()
方法,在分析该方法之前,先看一下它的内部类QNode
,TransferQueue
中使用QNode
来记录元素和被阻塞的线程,其中还利用UNSAFE
来获取元素和下一个节点的偏移量,直接通过CAS修改对应的数值,QNode
中还有很多的CAS
方法这里没有一一列举出来。
static final class TransferQueue<E> extends Transferer<E> { static final class QNode { // 下一个节点的指针 volatile QNode next; // next node in queue // 元素内容 volatile Object item; // CAS'ed to or from null // 被阻塞的线程 volatile Thread waiter; // to control park/unpark // 用于区分节点类型,false表示为取元素,true为添加元素 final boolean isData; // 通过CAS修改下一个节点(多线程安全) boolean casNext(QNode cmp, QNode val) { return next == cmp && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // QNode属性的偏移量 private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { // 根据Unsafe类计算属性在QNode类中的偏移量 UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = QNode.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } // 头节点 transient volatile QNode head; // 尾节点 transient volatile QNode tail; // 初始化时就创建一个元素为null,数据类型为false的节点,头尾节点都指向该该节点 TransferQueue() { QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; } // 计算头尾节点的偏移量,通过CAS直接修改(保证线程安全) private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; private static final long cleanMeOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = TransferQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); cleanMeOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("cleanMe")); } catch (Exception e) { throw new Error(e); } } }
上面介绍了TransferQueue
大致的内部构造,下面重点看transfer()
方法实现,
E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; // 自旋等待初始化完成 if (t == null || h == null) // saw uninitialized value continue; // spin // 为空或者当前节点 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; // 防止其他线程修改,这里再次判断 if (t != tail) // inconsistent read continue; // 如果当前尾节点后面还有节点,则通过CAS把后面的节点修改为尾节点 if (tn != null) { // lagging tail advanceTail(t, tn); continue; } // 如果需要超时阻塞,但超时时间小于0(不能阻塞),直接返回null // put或take方法中中断该线程并抛出中断异常 if (timed && nanos <= 0) // can't wait return null; // 创建一个节点,通过CAS添加到尾节点后面,这个节点可以是取元素的节点,也可以是添加元素的节点 if (s == null) s = new QNode(e, isData); if (!t.casNext(null, s)) // failed to link in continue; // 新的节点添加完成之后,通过CAS将其修改为尾节点 advanceTail(t, s); // swing tail and wait // 自旋阻塞线程 下面重点介绍 Object x = awaitFulfill(s, e, timed, nanos); // 如果返回的节点为当前节点,表示该节点被取消了,直接清除掉 if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; } else { // complementary-mode // 队列不为空,且新的节点类型与队列里面的节点类型不一致(说明可以唤醒线程了) QNode m = h.next; // node to fulfill // 为了保证线程安全,再次判断自旋 if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; // 如果m节点已经被别的线程处理了,这里就修改头节点自旋 if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } // 当前线程去修改头节点,阻塞线程节点出队 advanceHead(h, m); // successfully fulfilled // 唤起m节点被阻塞的线程 LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } }
在transfer()
中有一个重要的方法awaitFulfill
,它会去进行自旋阻塞
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 根据处理器的核数计算自旋次数 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 如果当前线程被中断了,就修改S节点的item属性为当前节点 //然后在判断节点是否被取消时就直接判断其item值是否为当前节点即可 if (w.isInterrupted()) s.tryCancel(e); // 当节点取消就返回 Object x = s.item; if (x != e) return x; // 过了超时时间就取消 if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } // 自旋,达到一定次数之后,填充S节点的waiter属性为当前线程,然后就阻塞 // 至此一个节点的内容就完整了 if (spins > 0) --spins; else if (s.waiter == null) s.waiter = w; else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
2.5 PriorityBlockingQueue
PriorityBlockingQueue
是一个无界的基于数组的优先级阻塞队列,虽然它是无界的,但在初始化的时候,它是可以指定数组初始化容量的,它的无界是基于它可以进行动态扩容而言的。
如果没有指定初始化容量,它默认的容量为11,最大容量为Integer.MAX_VALUE - 8
private static final int DEFAULT_INITIAL_CAPACITY = 11; private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); }
同时,PriorityBlockingQueue
是一个优先级队列,它每次出队都会返回优先级最高或最低的元素,它的构造方法中提供了自定义Comparator
比较器,默认情况下使用自然顺序升序排序。
通过下面的构造方法也可以看出,该队列线程安全是由ReentrantLock
来保证的,同时需要注意的是PriorityBlockingQueue不能保证同等优先级元素的顺序
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
那么PriorityBlockingQueue
如果只是简单的使用数组操作来对插入元素移除进行排序,其性能将是非常低的,而它采用的是最大最小堆的方式来插入或移除数据,大小堆只是逻辑上的一种操作方式而已,其储存结构依然是数组
完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序
二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,二叉堆又可以分为两个类型:大顶堆和小顶堆。
最大最小堆满足以下特性:
- 最大堆:根结点的键值是所有堆结点键值中最大者
- 最小堆:根结点的键值是所有堆结点键值中最小者
下图展示了最小二叉堆的情况:
最大最小堆按照从上到下,从左到右来一次表示索引位置,上图中右下角的数字表示该元素在数组中的索引下标
在最大最小二叉堆中,插入或移除元素时,都可能涉及到元素位置调整,而在二叉堆中,利用元素的下标索引,可以很简单的计算其父节点以及左右节点的下标(以索引下标为t的元素为例):
父节点:P(t) = (t-1) >>> 1 <=> (t-1)/2
左节点:L(t) = t <<< 1 +1 <=> t*2 +1
右节点:R(t) = t <<< 1 + 2 <=> t*2 +2
下面结合源码分析它是如何添加和移除元素的
由于PriorityBlockingQueue
是无界队列,所以添加元素时线程不需要阻塞,容量不够进行扩容就可以了
public void put(E e) { offer(e); // never need to block } public boolean offer(E e) { if (e == null) throw new NullPointerException(); // 加锁保证线程安全 final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; // 如果已经达到当前容量就进行扩容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) // 需要注意如果没有执行比较器,元素类必须实现Comparable接口 siftUpComparable(n, e, array); else // 指定了比较器,就是用自定义的来做比较 siftUpUsingComparator(n, e, array, cmp); size = n + 1; // 添加元素后,直接唤醒被阻塞的获取元素的线程 notEmpty.signal(); } finally { lock.unlock(); } return true; }
扩容的代码如下,tryGrow()
方法在offer()
方法的while
循环体内部,就实现了CAS+自旋的方式来实现线程安全的扩容
private void tryGrow(Object[] array, int oldCap) { // 释放锁,下面通过CAS来进行扩容 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 如果原来的容量小于64,则容量就扩大一倍再+2,否则容量直接扩大为原来的三倍 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); // 容量不能超过最大值 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } // 其他线程在扩容时,当前线程就让出CPU if (newArray == null) // back off if another thread is allocating Thread.yield(); lock.lock(); if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
核心的方法在于siftUpComparable()
和siftUpUsingComparator()
这两个方法,这两个方法才是二叉堆入队的核心方法,以siftUpUsingComparator()
为例来分析
这里面是一个while
循环,进行元素的上浮操作,每次都是获取当前节点的父节点,然后与插入的元素进行比对,如果比较的结果满足最大最小堆的结构,就直接退出循环,否则就换位,继续进行比较,知道满足条件
private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; }
获取元素的代码如下,因为获取元素的线程会被阻塞,所以这个方法会抛出中断异常
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) // 如果没有元素就进行阻塞 notEmpty.await(); } finally { lock.unlock(); } return result; } private E dequeue() { // 最后一个元素的索引下标 int n = size - 1; if (n < 0) return null; else { Object[] array = queue; E result = (E) array[0]; // 取出最后一个元素 E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } } private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) { if (n > 0) { int half = n >>> 1; while (k < half) { // 第一次进来时,取得是第二层的两个节点进行比较 int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; // 如果左节点与右节点比较,满足比较条件,就把右节点的值作为与最后一个节点比较的值 if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } }
2.6 DelayQueue
DelayQueue
是一个支持延时获取元素的阻塞队列,内部采用PriorityQueue
存储元素,同时元素必须实现Delayed
接口,接口的getDelay()
方法可以返回延时时间延时的时间,方法参数为时间工具类TimeUnit
在获取元素是,只有延迟时间到了才能从队列中提取元素。
延迟队列的特点:并不是先进先出,而是按照延迟时间的长短进行排序,下一个被执行的任务排在队列的最前面。
由于队列元素必须实现Delayed
接口,而该接口又继承自Comparable
接口,所以,元素类还要去实现compareTo()
方法,这样在创建队列时就不需要在额外创建Comparator
对象了,元素本身就具有了排序的能力。
下面定义了一个元素类
class DelayObject implements Delayed { private String name; private long time; //延时时间 public DelayObject(String name, long delayTime) { this.name = name; this.time = System.currentTimeMillis() + delayTime; } @Override public long getDelay(TimeUnit unit) { long diff = time - System.currentTimeMillis(); return unit.convert(diff, TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed obj) { if (this.time < ((DelayObject) obj).time) { return -1; } if (this.time > ((DelayObject) obj).time) { return 1; } return 0; } }
使用Demo:
//实例化一个DelayQueue BlockingQueue<DelayObject> blockingQueue = new DelayQueue<>(); //向DelayQueue添加2个元素对象,注意延时时间不同 blockingQueue.put(new DelayObject("lizhi", 1000 * 10)); //延时10秒 blockingQueue.put(new DelayObject("linan", 1000 * 30)); //延时30秒 // 取出lizhi DelayObject lizhi = blockingQueue.take(); // 取出linan DelayObject linan = blockingQueue.take();
下面看一下DelayQueue
的构造,使用ReentrantLock
来保证线程安全,取元素需要进行阻塞,底层使用PriorityQeue
进行存储,这是一个优先级队列,与上面PriorityBlockingQueue
是一样的,只是没有阻塞功能
private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); private final Condition available = lock.newCondition();
下面看一下具体的put()
和take()
public void put(E e) { offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; // 加锁保证线程安全 lock.lock(); try { // 调用PriorityQueue添加元素 // 与PriorityBlockingQueue的逻辑基本一致 q.offer(e); // 如果当前队列只有这一个元素,就去唤醒阻塞的线程 if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
下面是take()
方法,要比put()
方法复杂一些
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 加锁保证线程安全,由于线程可能会被阻塞,所以这里可中断 lock.lockInterruptibly(); try { for (;;) { // 取出队列第一个元素,如果没有,直接让当前线程阻塞 E first = q.peek(); if (first == null) available.await(); else { // 如果延迟时间已到,直接取出该元素 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting // 如果有线程已经在阻塞了,就让当前线程直接去阻塞 if (leader != null) available.await(); else { // 没有线程阻塞,则记录当前线程,然后让当前线程阻塞,阻塞的时间等于最近元素的延迟时间 Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { // 当前线程被唤醒后,重置leader,然后自旋 if (leader == thisThread) leader = null; } } } } } finally { // 出队成功后,如果leader为空,并且当前对了还有元素,就去唤醒下一个被阻塞的线程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
延迟队列的应用场景**:**
- 订单超时关闭:下单后在规定时间内没有付款就取消订单
- 异步短信通知:外卖下单成功60S之后给用户发送短信
- 关闭空闲连接:连接池中,有一些非核心的连接在空闲一段时间后就关闭
三、选择合适的阻塞队列
我们接触的比较多的就是线程中使用阻塞队列,线程池有很多种,不同种类的线程池会根据自己的特点,来选择适合自己的阻塞队列。
- FixedThreadPool(SingleThreadExecutor 同理)选取的是 LinkedBlockingQueue
- CachedThreadPool 选取的是 SynchronousQueue
- ScheduledThreadPool(SingleThreadScheduledExecutor同理)选取的是延迟队列
注:ScheduledThreadPool
中使用的阻塞队列并不是DelayQueue
,而是自定义实现的DelayedWorkQueue
一般从以下几个维度来选择合适的阻塞队列
-
功能
比如是否需要阻塞队列帮我们排序,如优先级排序、延迟执行等。如果有这个需要,就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队列。
-
容量
是否有存储的要求,还是只需要“直接传递”。在考虑这一点的时候,我们知道前面介绍的那几种阻塞队列,有的是容量固定的,如 ArrayBlockingQueue;有的默认是容量无限的,如 LinkedBlockingQueue;而有的里面没有任何容量,如 SynchronousQueue;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞队列的容量是千差万别的,我们需要根据任务数量来推算出合适的容量,从而去选取合适的 BlockingQueue。
-
能够扩容
因为有时我们并不能在初始的时候很好的准确估计队列的大小,因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况,也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反,PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。
-
内存结构
我们分析过 ArrayBlockingQueue 的源码,看到了它的内部结构是“数组”的形式。和它不同的是,LinkedBlockingQueue 的内部是用链表实现的,所以这里就需要我们考虑到,ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。
-
性能
从性能的角度去考虑。比如 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要“直接传递”,而不需要存储的过程。如果我们的场景需要直接传递的话,可以优先考虑 SynchronousQueue。
这篇关于BlockingQueue源码分析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-15PingCAP 黄东旭参与 CCF 秀湖会议,共探开源教育未来
- 2024-05-13PingCAP 戴涛:构建面向未来的金融核心系统
- 2024-05-09flutter3.x_macos桌面os实战
- 2024-05-09Rust中的并发性:Sync 和 Send Traits
- 2024-05-08使用Ollama和OpenWebUI在CPU上玩转Meta Llama3-8B
- 2024-05-08完工标准(DoD)与验收条件(AC)究竟有什么不同?
- 2024-05-084万 star 的 NocoDB 在 sealos 上一键起,轻松把数据库编程智能表格
- 2024-05-08Mac 版Stable Diffusion WebUI的安装
- 2024-05-08解锁CodeGeeX智能问答中3项独有的隐藏技能
- 2024-05-08RAG算法优化+新增代码仓库支持,CodeGeeX的@repo功能效果提升