前言
今天看源码,从 DelayQueue中发现了一些有意思的事情: Leader-follower线程模型。 顺便记录一下源码解析和Leader-follower线程模型的思想。
DelayQueue
主要成员
如下四个:
// 一把可重入锁
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列,根据Delay排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
// Leader-follower线程模型的 leader
private Thread leader = null;
// 条件,用于阻塞通知
private final Condition available = lock.newCondition();
Delayed接口
能放入延迟队列的元素必须实现Delayed接口,其设计也比较明显。 实现Delayed接口的对象必须实现:
getDelay方法:此对象的剩余时间compareTo方法:用于排序,而且要与getDelay方法一致的排序public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
总体思路
- 优先级无界队列
PriorityQueue当做容器。 - 入队的元素必须实现
Delayed接口. - 出队时获取队列头的元素,队列头的元素是根据
Delayed排序获取的最先过期的元素。
核心方法
主要是入队offer方法和出队take方法。
offer(E e)
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); // 插入优先级队列
if (q.peek() == e) { // 如果队首元素是当前元素
leader = null; // leader置空
available.signal(); // 唤醒等待线程
}
return true;
} finally {
lock.unlock();
}
}
take()
等待直到有一个过期的元素可用,获取并删除队首元素。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek(); // 获取队首元素
// 如果队首元素为空,则等待唤醒
if (first == null)
available.await();
else {
// 获取超时时间
long delay = first.getDelay(NANOSECONDS);
// 已过期即 delay<=0,直接从队列中出队并返回
if (delay <= 0)
return q.poll();
// 下面这个很关键
first = null; // don't retain ref while waiting
// 如果leader不为null,说明有其他线程在操作,则等待唤醒
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
// leader设置为当前线程
leader = thisThread;
try {
// 超时等待
available.awaitNanos(delay);
} finally {
// 释放leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 最终还是需要判断释放leader已被释放,并唤醒
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
简单明了:
- 首先是获取队首元素,如果队首元素已过期 ,则直接出队。
- 否则设置
first = null,这里设置为null的主要目的是为了避免内存泄漏。如果 leader != null 则表示当前有线程占用,则阻塞。 - 否则设置leader为当前线程,然后调用awaitNanos()方法超时等待。
注:first = null的作用是避免其他线程持有 first 的引用并且不会被回收,避免了线程多的情况造成内存泄漏的可能。
Leader-follower线程模型
前面DelayQueue里涉及到的leader有些摸不清楚为啥要这样,于是补充了一下Leader-follower线程模型知识,如下一图胜千言:

如图,涉及三种状态:
leading:leader线程负责监听事件。following:follower为其他线程,处于等待状态。processing:leader处理事件时状态轮转为processing状态。
过程如下:
- 一个新的事件到来时:
leader线程监听到此事件,follower线程们等待。 leader线程接受请求:leader线程轮转为processing状态,leader线程会释放自己leader的角色,follower线程们里会挑选出一个新的线程作为leader线程,其余的follower则会继续等待。leader线程处理请求:之前的leader线程(processing状态)会开始处理事件,处理完成以后变成一个follower角色进行等待唤醒。
优势:
- 接受请求和进行处理使用的是同一个线程,这避免了线程上下文切换和线程通讯数据拷贝。
- 不需要消息队列。
适用场景:线程可以快速的完成工作任务的情况。
总结
从延迟队列DelayQueue的源码分析,了解到了Leader-follower线程模型。
DelayQueue实现还是比较简单的,只是增加了只获取过期任务的条件。DelayQueue源码里涉及避免内存泄漏的代码,再今后自己写高并发代码的情况下提供了一种思路。- Leader-follower线程模型,它的优势与适用场景,也提供了多线程任务的一种思路。


