日常开发中对于多线程的使用,一般很少直接new Thread。因为线程的频繁创建、销毁会耗费大量的系统资源。为此基于池化技术的线程池应运而生
Executors类
在Executors类中提供了很多创建线程池的工厂方法。这里介绍下一些常见的工厂方法
newFixedThreadPool
该方法创建一个固定数量线程的线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class TestThreadPool {
public static void test1() { ExecutorService executor = Executors.newFixedThreadPool(3); work(executor); executor.shutdown(); }
private static void work(ExecutorService executor) { for (int i=0; i<10; i++) { executor.submit( new Job("Job-"+i) ); } } }
|
从测试结果可以看到,无论多少个任务,可用的线程数量都是固定的
newCachedThreadPool
该方法创建一个线程池,当没有空闲线程可用时,其会一直创建新的线程来处理任务
1 2 3 4 5 6 7 8 9 10
| public class TestThreadPool {
public static void test2() { ExecutorService executor = Executors.newCachedThreadPool(); work(executor); executor.shutdown(); } }
|
测试结果如下所示
newSingleThreadExecutor
该方法创建的线程池中只有一个线程,故提交至此的任务会依次执行
1 2 3 4 5 6 7 8 9 10
| public class TestThreadPool {
public static void test3() { ExecutorService executor = Executors.newSingleThreadExecutor(); work(executor); executor.shutdown(); } }
|
测试结果如下所示
newScheduledThreadPool
该方法创建的线程池可用于执行定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class TestThreadPool {
public static void test4() {
Consumer<String> task = (String taskName) -> { String now = DateUtil.format( new Date(), "HH:mm:ss" ); String info = "<Time>: " + now + " [Thread]: " + Thread.currentThread().getName() + " <Job>: " + taskName; System.out.println( info ); try{ Thread.sleep(1000*5); }catch (InterruptedException e) { e.printStackTrace(); } };
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); System.out.println( "<Time>: " + DateUtil.format( new Date(), "HH:mm:ss") );
executorService.schedule( ()->task.accept("OneTime"), 10, TimeUnit.SECONDS );
executorService.scheduleAtFixedRate( ()->task.accept("fixedRate"), 30, 10, TimeUnit.SECONDS);
executorService.scheduleWithFixedDelay( ()->task.accept("fixedDelay") , 30, 10, TimeUnit.SECONDS); } }
|
其中上述代码中scheduleAtFixedRate、scheduleWithFixedDelay方法第三个参数的含义分别是两次任务开始执行的间隔时间、上一次任务结束至本次任务开始的间隔时间。与SpringBoot中的@Scheduled(fixedRate)、@Scheduled(fixedDelay)注解的用途类似。值得一提的是,对于scheduleAtFixedRate而言,当 我们指定的两次任务开始执行的间隔时间 小于 该任务执行一次所需的耗时 时,将会以 该任务执行所需的耗时 作为 两次任务开始执行的实际间隔时间
测试结果如下所示。从蓝框可以看出,三个任务的第一次执行时机均按指定的延时时间(分别延迟10秒、30秒、30秒)启动;从绿框可知,对于名为fixedRate的任务而言,每次开始执行的间隔为10秒;从红框可知,对于名为fixedDelay的任务而言,每次开始执行的间隔为15秒。因为其上一次任务结束至本次任务开始的时间间隔为10秒,加上该任务本身耗时5秒,故累计为15秒
ThreadPoolExecutor类
概述
事实上对于上述的工厂方法而言,其内部是使用线程池ThreadPoolExecutor类。该类的继承结构如下所示
与线程类似。对于线程池而言,其整个生命周期阶段也存在若干不同的状态。具体如下
- Running:该状态下,线程池可以接受新任务,并能够处理阻塞队列中的任务
- ShutDown:该状态下,线程池不再可以接受新任务,但能够继续处理阻塞队列中的任务
- Stop:该状态下,线程池不再可以接受新任务,也不会继续处理阻塞队列中的任务。同时会中断正在处理的任务
- Tidying:该状态下,线程池中的工作线程数量为0。并且会调用terminated()钩子方法(hook method)
- Terminated:当terminated()钩子方法(hook method)执行完毕后,线程池进入该状态
各状态的变化流程如下所示
值得一提的是,在ThreadPoolExecutor的实现过程中。其通过一个AtomicInteger类型的原子变量ctl实现了对线程池状态、工作线程数的记录。具体来说,是将高3位用于表示线程池状态,剩余位表示工作线程数。runStateOf方法用于获取线程池状态信息,workerCountOf方法用于获取工作线程数
在实际应用过程中,线程池ThreadPoolExecutor常见参数如下:
- corePoolSize:线程池的核心线程数
- maximumPoolSize:线程池的最大线程数
- keepAliveTime:空闲线程的超时时间,用于终止空闲线程。通常其只对线程池中超过corePoolSize的多余线程生效。除非allowCoreThreadTimeOut属性设为true,才会对核心线程生效
- unit:keepAliveTime参数的时间单位。其可选值定义在枚举类TimeUnit中
- workQueue:任务的阻塞队列
- handler:当 任务队列workQueue已满 且 线程数已达到maximumPoolSize ,提交新任务时的拒绝策略
其在接收任务后的基本流程如下所示
拒绝策略
JDK提供了四种拒绝策略,均实现了RejectedExecutionHandler接口
DiscardPolicy 丢弃策略
该策略下当提交的任务 无空闲线程执行 或 任务队列已满 时,则会直接被丢弃且不会产生任何异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class RejectedPolicyDemo {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );
public static void test1() { System.out.println("-------------------- 拒绝策略:直接丢弃 --------------------"); executor.setRejectedExecutionHandler( new ThreadPoolExecutor.DiscardPolicy() ); executeJob( "clean" );
executor.shutdown(); }
private static void executeJob(String name) { for (int i=0; i<10; i++) { executor.submit( new Job( name+"-"+i) ); } } }
|
测试结果如下所示,#3-#9号任务被直接丢弃了
DiscardOldestPolicy 丢弃最老策略
该策略下当提交的任务 无空闲线程执行 或 任务队列已满 时,则会丢弃队列中最旧的任务以释放空间来存储该任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class RejectedPolicyDemo {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );
public static void test2(){ System.out.println("-------------------- 拒绝策略:丢弃队列中最旧的任务 --------------------"); executor.setRejectedExecutionHandler( new ThreadPoolExecutor.DiscardOldestPolicy() ); executeJob( "register" );
executor.shutdown(); } }
|
测试结果如下所示。当#0、#1号任务在执行时,#2~#9号任务不断被存储到队列、然后被丢弃以存放最新的任务。所以#9号任务最终被保留并执行
AbortPolicy 中止策略
在该策略下,当线程池无法继续接收提交的任务时会抛出RejectedExecutionException异常。其也是线程池的默认拒绝策略。显然抛出异常的方式可以让开发者更好的把握系统的运行状态。当然在此种拒绝策略下,我们需要处理好其所抛出的异常,以免打断当前的执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class RejectedPolicyDemo {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );
public static void test3(){ System.out.println("-------------------- 拒绝策略:抛异常 --------------------"); executor.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() ); try{ executeJob( "request" ); }catch (RejectedExecutionException e) { System.out.println("[Error] 提交到线程池的任务量过多"); } executor.shutdown(); } }
|
测试结果如下所示
CallerRunsPolicy 调用者执行策略
在该策略下,当线程池无法继续接收提交的任务时,其会交由调用者(提交任务的线程)去执行完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class RejectedPolicyDemo {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );
public static void test4() { System.out.println("-------------------- 拒绝策略:由调用者执行 --------------------"); executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() ); executeJob( "caller" );
executor.shutdown(); } }
|
测试结果如下所示,符合预期。线程池无法继续接收新任务时,其会被提交任务的线程(即这里的main线程)执行完成
Note
- 当线程池的拒绝策略为DiscardPolicy、DiscardOldestPolicy时,则对于被拒绝任务的Future实例而言。如果在其上调用无参的get()方法,则会导致一直被阻塞。故在此种场景下,推荐使用支持超时机制的get()方法。测试代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class ThreadPoolDemo {
@Test public void test1() throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());
Future future1 = executor.submit( taskFactory("Task 1") ); Future future2 = executor.submit( taskFactory("Task 2") ); Future future3 = executor.submit( taskFactory("Task 3") );
future1.get(); System.out.println("future 1 Over");
future2.get(); System.out.println("future 2 Over");
future3.get(); System.out.println("future 3 Over"); }
private static Runnable taskFactory(String taskName) { return () -> { try{Thread.sleep(3000);} catch (Exception e){} System.out.println(taskName + ": Over"); }; } }
|
测试结果如下所示,符合预期
- 线程池使用完毕后,应通过 shutdown() 方法进行关闭。实例代码如下所示
1 2 3 4 5 6 7 8 9 10
| public static void main(String[] args) { System.out.println("Hello World"); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); threadPoolExecutor.execute( ()-> System.out.println("Test Task") );
System.out.println("Main Over"); }
|
测试结果如下所示,左下角处的红色方块表明由于线程池未关闭,JVM依然存在并未退出。正确做法是放开上述代码中对线程池关闭操作的注释
参考文献
- Java并发编程之美 翟陆续、薛宾田著