前言
创建一个线程,最简单的做法是new
一个线程,但是当大量请求过来时(类比窗口卖票的场景), 创建这么多个线程的开销就很大了。 今天整理下线程池相关知识,相对于传统做法,线程池的优势还是很明显的: 节省了创建和销毁线程的时间,提高了任务执行效率,也就增加了CPU的吞吐能力
线程池的优势
引用一下方腾飞的话, 合理利用线程池能够带来三个好处。
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。
Executors中的线程池
j.u.c的Executors中默认提供了一些方便的线程池创建:
静态方法 | 线程池类型 | 说明 | 返回值的实际实现 |
---|---|---|---|
newCachedThreadPool() | 可缓存的线程池 | 如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。 | ThreadPoolExecutor |
newFixedThreadPool(int) | 固定线程池 | 每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。 | ThreadPoolExecutor |
newScheduledThreadPool(int) | 定时及周期性线程池 | 此线程池支持定时以及周期性执行任务的需求。 | ScheduledThreadPoolExecutor |
newSingleThreadExecutor() | 单线程的线程池 | 这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。 | FinalizableDelegatedExecutorService |
线程池的使用
public static void main(String[] args) {
//ExecutorService pool = Executors. newSingleThreadExecutor();
//ExecutorService pool = Executors.newFixedThreadPool(2);
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
pool.execute(() -> System.out.println(Thread.currentThread().getName() + "正在执行..."));
}
pool.shutdown();
}
如上述测试程序,如果ExecutorService
的实现pool是newSingleThreadExecutor
的时,输出
pool-1-thread-1正在执行...
pool-1-thread-1正在执行...
pool-1-thread-1正在执行...
pool-1-thread-1正在执行...
pool-1-thread-1正在执行...
如果ExecutorService
的实现pool是newFixedThreadPool
的时,参数设置大小为2,输出
pool-1-thread-1正在执行...
pool-1-thread-2正在执行...
pool-1-thread-1正在执行...
pool-1-thread-2正在执行...
pool-1-thread-1正在执行...
如果ExecutorService
的实现pool是newCachedThreadPool
的时,输出
pool-1-thread-1正在执行...
pool-1-thread-3正在执行...
pool-1-thread-2正在执行...
pool-1-thread-4正在执行...
pool-1-thread-5正在执行...
如果是newScheduledThreadPool
,则使用方法有些不同:
public static void main(String[] args) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(2); // 创建一个可定时的线程池
// 每2秒打印一次
exec.scheduleAtFixedRate(() -> System.out.println(System.currentTimeMillis() / 1000), 1, 2, TimeUnit.SECONDS);
// 线程池中某个线程出错
exec.scheduleAtFixedRate(() -> {
System.out.println("池中一个线程出错了!");
throw new RuntimeException();
}, 1, 2, TimeUnit.SECONDS);
}
上述程序输出如下,可见池中的线程是隔离的
1540794708
池中一个线程出错了!
1540794710
1540794712
1540794714
核心ThreadPoolExecutor
无论你从上诉任何一种静态方法进去,其最终都是离不开这个类:ThreadPoolExecutor
构造器:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
解释一下这几个参数:
corePoolSize
(核心线程池大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads
方法,线程池会提前创建并启动所有基本线程。maximumPoolSize
(线程池最大大小,其值=核心线程数+其他线程数):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。keepAliveTime
(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。TimeUnit
(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。workQueue
(任务队列):用于保存等待执行的任务的阻塞队列。关于阻塞队列,可参考之前写的浅析java并发包(三):阻塞队列(BlockingQueue)threadFactory
(线程工厂):用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助。RejectedExecutionHandler
(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略:- AbortPolicy:中止策略,默认。直接抛出异常。
- CallerRunsPolicy:“调用者运行”策略,任务回退到调用者,从而降低了新任务的流量,只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
- 第五种,通过实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
线程池运行流程
一图胜前言,直接引用来自方腾飞《聊聊并发》:
图码结合,从线程池的执行方法ThreadPoolExecutor.execute
分析。分3种,我在代码里用注释标出
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();// 获取线程数
// 1.如果线程数小于核心线程数,则addWorker创建线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.线程是否RUNNING状态且尝试加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再检查, 任务队列不在运行且从队列中删除,执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 运行任务数量为0,则移除核心线程外的线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.非RUNNING状态或者加入队列失败,尝试创建非核心线程直到maxPoolSize,如果失败则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
配置线程池
知道了线程池的参数和运行规则,那么如何配置线程池呢?
一. 线程池的大小。主要是根据任务类型:计算密集型 or I/O密集型 or 二者皆可?
计算密集型的任务,通常情况线程池大小=Ncpu+1最优;
I/O密集型任务,由于线程不会一直执行,通常设置为Ncpu*2。
混合型,最好因地制宜,拆分为CPU密集型和I/O密集型处理。
一个通用公式:
Ncpu:cpu的个数
Ucpu:使用cpu的个数
W/C:计算时间等待率
Nthreads=Ncpu * Ucpu * (1 + W/C)
二. 阻塞队列的选择。主要分为3种,有界队列、无界队列、同步移交。
有界队列有助于避免资源耗尽,大部分情况比较适合。使用有界队列时,队列大小与线程池大小必须一起调节。当线程池较小而队列较大时,有助于减少内存使用量,降低CPU使用率,同时减少上下文切换,但代价是可能会限制吞吐量。
无界队列可以通过使用SynchronousQueue来避免排队。只有当线程池是无界的或者可以拒绝任务时,SynchronousQueue才有实际价值。
三.关于ThreadPoolExecutor的扩展性。它提供了几个可以在子类改写的方法:beforeExecute
,afterExecute
,terminated
。可以利用这些方法在线程执行前、后、以及销毁时做一些特别操作,比如添加日志、计时、监控、收集统计信息等功能。
总结
本文主要参考了:
- java自带线程池和队列详细讲解
- 聊聊并发(三)Java线程池的分析和使用
- java并发编程实战
对这些优秀博文书籍进行了一次聚合总结吧,感谢原作者。同时,对java线程池也更加的了解一些。