Fork me on GitHub

从DelayQueue到 Leader-follower线程模型

前言

今天看源码,从 DelayQueue中发现了一些有意思的事情: Leader-follower线程模型。 顺便记录一下源码解析和Leader-follower线程模型的思想。

DelayQueue

主要成员

如下四个:

// 一把可重入锁
private final transient ReentrantLock lock = new ReentrantLock();

// 优先级队列,根据Delay排序
private final PriorityQueue<E> q = new PriorityQueue<E>();

// Leader-follower线程模型的 leader
private Thread leader = null;

// 条件,用于阻塞通知
private final Condition available = lock.newCondition();

Delayed接口

能放入延迟队列的元素必须实现Delayed接口,其设计也比较明显。 实现Delayed接口的对象必须实现:

  • getDelay方法:此对象的剩余时间
  • compareTo方法:用于排序,而且要与getDelay方法一致的排序

    public interface Delayed extends Comparable<Delayed> {
    
      long getDelay(TimeUnit unit);
    }
    

总体思路

  1. 优先级无界队列PriorityQueue当做容器。
  2. 入队的元素必须实现Delayed接口.
  3. 出队时获取队列头的元素,队列头的元素是根据Delayed排序获取的最先过期的元素。

核心方法

主要是入队offer方法和出队take方法。

offer(E e)

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e); // 插入优先级队列
        if (q.peek() == e) { // 如果队首元素是当前元素
            leader = null; // leader置空
            available.signal(); // 唤醒等待线程
        }
        return true;
    } finally {
        lock.unlock();
    }
}

take()

等待直到有一个过期的元素可用,获取并删除队首元素。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek(); // 获取队首元素
            // 如果队首元素为空,则等待唤醒
            if (first == null)
                available.await();
            else {
                // 获取超时时间
                long delay = first.getDelay(NANOSECONDS);
                // 已过期即 delay<=0,直接从队列中出队并返回
                if (delay <= 0)
                    return q.poll();
                // 下面这个很关键
                first = null; // don't retain ref while waiting
                // 如果leader不为null,说明有其他线程在操作,则等待唤醒
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    // leader设置为当前线程
                    leader = thisThread;
                    try {
                        // 超时等待
                        available.awaitNanos(delay);
                    } finally {
                        // 释放leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 最终还是需要判断释放leader已被释放,并唤醒
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

简单明了:

  1. 首先是获取队首元素,如果队首元素已过期 ,则直接出队。
  2. 否则设置 first = null,这里设置为null的主要目的是为了避免内存泄漏。如果 leader != null 则表示当前有线程占用,则阻塞。
  3. 否则设置leader为当前线程,然后调用awaitNanos()方法超时等待。

注:first = null的作用是避免其他线程持有 first 的引用并且不会被回收,避免了线程多的情况造成内存泄漏的可能。

Leader-follower线程模型

前面DelayQueue里涉及到的leader有些摸不清楚为啥要这样,于是补充了一下Leader-follower线程模型知识,如下一图胜千言:

如图,涉及三种状态:

  • leading:leader线程负责监听事件。
  • following:follower为其他线程,处于等待状态。
  • processing:leader处理事件时状态轮转为processing状态。

过程如下:

  1. 一个新的事件到来时:leader线程监听到此事件,follower线程们等待。
  2. leader线程接受请求: leader线程轮转为processing状态,leader线程会释放自己leader的角色,follower线程们里会挑选出一个新的线程作为leader线程,其余的follower则会继续等待。
  3. leader线程处理请求:之前的leader线程(processing状态)会开始处理事件,处理完成以后变成一个follower角色进行等待唤醒。

优势

  • 接受请求和进行处理使用的是同一个线程,这避免了线程上下文切换和线程通讯数据拷贝。
  • 不需要消息队列。

适用场景:线程可以快速的完成工作任务的情况。

总结

从延迟队列DelayQueue的源码分析,了解到了Leader-follower线程模型。

  • DelayQueue实现还是比较简单的,只是增加了只获取过期任务的条件。
  • DelayQueue源码里涉及避免内存泄漏的代码,再今后自己写高并发代码的情况下提供了一种思路。
  • Leader-follower线程模型,它的优势与适用场景,也提供了多线程任务的一种思路。
-------------本文结束,感谢您的阅读-------------
贵在坚持,如果您觉得本文还不错,不妨打赏一下~
0%