Fork me on GitHub

浅析java并发包(六):线程池那些事儿

前言

创建一个线程,最简单的做法是new一个线程,但是当大量请求过来时(类比窗口卖票的场景), 创建这么多个线程的开销就很大了。 今天整理下线程池相关知识,相对于传统做法,线程池的优势还是很明显的: 节省了创建和销毁线程的时间,提高了任务执行效率,也就增加了CPU的吞吐能力

线程池的优势

引用一下方腾飞的话, 合理利用线程池能够带来三个好处。

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

Executors中的线程池

j.u.c的Executors中默认提供了一些方便的线程池创建:

静态方法线程池类型说明返回值的实际实现
newCachedThreadPool()可缓存的线程池如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。ThreadPoolExecutor
newFixedThreadPool(int)固定线程池每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。ThreadPoolExecutor
newScheduledThreadPool(int)定时及周期性线程池此线程池支持定时以及周期性执行任务的需求。ScheduledThreadPoolExecutor
newSingleThreadExecutor()单线程的线程池这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。FinalizableDelegatedExecutorService

线程池的使用

public static void main(String[] args) {
    //ExecutorService pool = Executors. newSingleThreadExecutor();
    //ExecutorService pool = Executors.newFixedThreadPool(2);
    ExecutorService pool = Executors.newCachedThreadPool();

    for (int i = 0; i < 5; i++) {
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + "正在执行..."));
    }
    pool.shutdown();
}

如上述测试程序,如果ExecutorService的实现pool是newSingleThreadExecutor的时,输出

pool-1-thread-1正在执行...
pool-1-thread-1正在执行...
pool-1-thread-1正在执行...
pool-1-thread-1正在执行...
pool-1-thread-1正在执行...

如果ExecutorService的实现pool是newFixedThreadPool的时,参数设置大小为2,输出

pool-1-thread-1正在执行...
pool-1-thread-2正在执行...
pool-1-thread-1正在执行...
pool-1-thread-2正在执行...
pool-1-thread-1正在执行...

如果ExecutorService的实现pool是newCachedThreadPool的时,输出

pool-1-thread-1正在执行...
pool-1-thread-3正在执行...
pool-1-thread-2正在执行...
pool-1-thread-4正在执行...
pool-1-thread-5正在执行...

如果是newScheduledThreadPool,则使用方法有些不同:

public static void main(String[] args) {
    ScheduledExecutorService exec = Executors.newScheduledThreadPool(2); // 创建一个可定时的线程池

    // 每2秒打印一次
    exec.scheduleAtFixedRate(() -> System.out.println(System.currentTimeMillis() / 1000), 1, 2, TimeUnit.SECONDS);

    // 线程池中某个线程出错
    exec.scheduleAtFixedRate(() -> {
        System.out.println("池中一个线程出错了!");
        throw new RuntimeException();
    }, 1, 2, TimeUnit.SECONDS);
}

上述程序输出如下,可见池中的线程是隔离的

1540794708
池中一个线程出错了!
1540794710
1540794712
1540794714

核心ThreadPoolExecutor

无论你从上诉任何一种静态方法进去,其最终都是离不开这个类:ThreadPoolExecutor

构造器:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

解释一下这几个参数:

  1. corePoolSize(核心线程池大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
  2. maximumPoolSize(线程池最大大小,其值=核心线程数+其他线程数):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
  3. keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
  4. TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
  5. workQueue(任务队列):用于保存等待执行的任务的阻塞队列。关于阻塞队列,可参考之前写的浅析java并发包(三):阻塞队列(BlockingQueue)
  6. threadFactory(线程工厂):用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助。
  7. RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略:
    • AbortPolicy:中止策略,默认。直接抛出异常。
    • CallerRunsPolicy:“调用者运行”策略,任务回退到调用者,从而降低了新任务的流量,只用调用者所在线程来运行任务。
    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    • DiscardPolicy:不处理,丢弃掉。
    • 第五种,通过实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。

线程池运行流程

一图胜前言,直接引用来自方腾飞《聊聊并发》:

图码结合,从线程池的执行方法ThreadPoolExecutor.execute分析。分3种,我在代码里用注释标出

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();// 获取线程数
    // 1.如果线程数小于核心线程数,则addWorker创建线程执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2.线程是否RUNNING状态且尝试加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //再检查, 任务队列不在运行且从队列中删除,执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 运行任务数量为0,则移除核心线程外的线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3.非RUNNING状态或者加入队列失败,尝试创建非核心线程直到maxPoolSize,如果失败则执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

配置线程池

知道了线程池的参数和运行规则,那么如何配置线程池呢?

一. 线程池的大小。主要是根据任务类型:计算密集型 or I/O密集型 or 二者皆可?

计算密集型的任务,通常情况线程池大小=Ncpu+1最优;

I/O密集型任务,由于线程不会一直执行,通常设置为Ncpu*2。

混合型,最好因地制宜,拆分为CPU密集型和I/O密集型处理。

一个通用公式:

Ncpu:cpu的个数
Ucpu:使用cpu的个数
W/C:计算时间等待率
Nthreads=Ncpu * Ucpu * (1 + W/C)

二. 阻塞队列的选择。主要分为3种,有界队列、无界队列、同步移交。

有界队列有助于避免资源耗尽,大部分情况比较适合。使用有界队列时,队列大小与线程池大小必须一起调节。当线程池较小而队列较大时,有助于减少内存使用量,降低CPU使用率,同时减少上下文切换,但代价是可能会限制吞吐量。

无界队列可以通过使用SynchronousQueue来避免排队。只有当线程池是无界的或者可以拒绝任务时,SynchronousQueue才有实际价值。

三.关于ThreadPoolExecutor的扩展性。它提供了几个可以在子类改写的方法:beforeExecute,afterExecute,terminated。可以利用这些方法在线程执行前、后、以及销毁时做一些特别操作,比如添加日志、计时、监控、收集统计信息等功能。

总结

本文主要参考了:

对这些优秀博文书籍进行了一次聚合总结吧,感谢原作者。同时,对java线程池也更加的了解一些。

-------------本文结束,感谢您的阅读-------------
贵在坚持,如果您觉得本文还不错,不妨打赏一下~
0%