前言
今天看源码,从 DelayQueue
中发现了一些有意思的事情: Leader-follower线程模型。 顺便记录一下源码解析和Leader-follower线程模型的思想。
DelayQueue
主要成员
如下四个:
// 一把可重入锁
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列,根据Delay排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
// Leader-follower线程模型的 leader
private Thread leader = null;
// 条件,用于阻塞通知
private final Condition available = lock.newCondition();
Delayed接口
能放入延迟队列的元素必须实现Delayed
接口,其设计也比较明显。 实现Delayed
接口的对象必须实现:
getDelay
方法:此对象的剩余时间compareTo
方法:用于排序,而且要与getDelay
方法一致的排序public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
总体思路
- 优先级无界队列
PriorityQueue
当做容器。 - 入队的元素必须实现
Delayed
接口. - 出队时获取队列头的元素,队列头的元素是根据
Delayed
排序获取的最先过期的元素。
核心方法
主要是入队offer
方法和出队take
方法。
offer(E e)
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); // 插入优先级队列
if (q.peek() == e) { // 如果队首元素是当前元素
leader = null; // leader置空
available.signal(); // 唤醒等待线程
}
return true;
} finally {
lock.unlock();
}
}
take()
等待直到有一个过期的元素可用,获取并删除队首元素。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek(); // 获取队首元素
// 如果队首元素为空,则等待唤醒
if (first == null)
available.await();
else {
// 获取超时时间
long delay = first.getDelay(NANOSECONDS);
// 已过期即 delay<=0,直接从队列中出队并返回
if (delay <= 0)
return q.poll();
// 下面这个很关键
first = null; // don't retain ref while waiting
// 如果leader不为null,说明有其他线程在操作,则等待唤醒
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
// leader设置为当前线程
leader = thisThread;
try {
// 超时等待
available.awaitNanos(delay);
} finally {
// 释放leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 最终还是需要判断释放leader已被释放,并唤醒
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
简单明了:
- 首先是获取队首元素,如果队首元素已过期 ,则直接出队。
- 否则设置
first = null
,这里设置为null的主要目的是为了避免内存泄漏。如果 leader != null 则表示当前有线程占用,则阻塞。 - 否则设置leader为当前线程,然后调用awaitNanos()方法超时等待。
注:first = null
的作用是避免其他线程持有 first 的引用并且不会被回收,避免了线程多的情况造成内存泄漏的可能。
Leader-follower线程模型
前面DelayQueue
里涉及到的leader
有些摸不清楚为啥要这样,于是补充了一下Leader-follower线程模型知识,如下一图胜千言:
如图,涉及三种状态:
leading
:leader
线程负责监听事件。following
:follower
为其他线程,处于等待状态。processing
:leader
处理事件时状态轮转为processing
状态。
过程如下:
- 一个新的事件到来时:
leader
线程监听到此事件,follower
线程们等待。 leader
线程接受请求:leader
线程轮转为processing
状态,leader
线程会释放自己leader
的角色,follower
线程们里会挑选出一个新的线程作为leader
线程,其余的follower
则会继续等待。leader
线程处理请求:之前的leader
线程(processing
状态)会开始处理事件,处理完成以后变成一个follower
角色进行等待唤醒。
优势:
- 接受请求和进行处理使用的是同一个线程,这避免了线程上下文切换和线程通讯数据拷贝。
- 不需要消息队列。
适用场景:线程可以快速的完成工作任务的情况。
总结
从延迟队列DelayQueue
的源码分析,了解到了Leader-follower线程模型。
DelayQueue
实现还是比较简单的,只是增加了只获取过期任务的条件。DelayQueue
源码里涉及避免内存泄漏的代码,再今后自己写高并发代码的情况下提供了一种思路。- Leader-follower线程模型,它的优势与适用场景,也提供了多线程任务的一种思路。