0%

Java线程池之ThreadPoolExecutor

日常开发中对于多线程的使用,一般很少直接new Thread。因为线程的频繁创建、销毁会耗费大量的系统资源。为此基于池化技术的线程池应运而生

abstract.jpeg

Executors类

在Executors类中提供了很多创建线程池的工厂方法。这里介绍下一些常见的工厂方法

newFixedThreadPool

该方法创建一个固定数量线程的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class TestThreadPool {
/**
* 定长的线程池
*/
public static void test1() {
ExecutorService executor = Executors.newFixedThreadPool(3);
work(executor);
executor.shutdown();
}

private static void work(ExecutorService executor) {
for (int i=0; i<10; i++) {
executor.submit( new Job("Job-"+i) );
}
}
}

从测试结果可以看到,无论多少个任务,可用的线程数量都是固定的

figure 1.jpeg

newCachedThreadPool

该方法创建一个线程池,当没有空闲线程可用时,其会一直创建新的线程来处理任务

1
2
3
4
5
6
7
8
9
10
public class TestThreadPool {
/**
* 可缓存的线程池
*/
public static void test2() {
ExecutorService executor = Executors.newCachedThreadPool();
work(executor);
executor.shutdown();
}
}

测试结果如下所示

figure 2.jpeg

newSingleThreadExecutor

该方法创建的线程池中只有一个线程,故提交至此的任务会依次执行

1
2
3
4
5
6
7
8
9
10
public class TestThreadPool {
/**
* 单线程的线程池
*/
public static void test3() {
ExecutorService executor = Executors.newSingleThreadExecutor();
work(executor);
executor.shutdown();
}
}

测试结果如下所示

figure 3.jpeg

newScheduledThreadPool

该方法创建的线程池可用于执行定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TestThreadPool {
/**
* 定时任务的线程池
*/
public static void test4() {

Consumer<String> task = (String taskName) -> {
String now = DateUtil.format( new Date(), "HH:mm:ss" );
String info = "<Time>: " + now + " [Thread]: " + Thread.currentThread().getName() + " <Job>: " + taskName;
System.out.println( info );
try{
// 耗时模拟:5秒
Thread.sleep(1000*5);
}catch (InterruptedException e) {
e.printStackTrace();
}
};

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
System.out.println( "<Time>: " + DateUtil.format( new Date(), "HH:mm:ss") );

// 一次性任务, 延迟10秒后执行
executorService.schedule( ()->task.accept("OneTime"), 10, TimeUnit.SECONDS );

// 定时任务: 延迟30秒启动,每次间隔10秒开始执行
executorService.scheduleAtFixedRate( ()->task.accept("fixedRate"), 30, 10, TimeUnit.SECONDS);

// 定时任务: 延迟30秒启动,每次完成10秒后再次执行
executorService.scheduleWithFixedDelay( ()->task.accept("fixedDelay") , 30, 10, TimeUnit.SECONDS);
}
}

其中上述代码中scheduleAtFixedRate、scheduleWithFixedDelay方法第三个参数的含义分别是两次任务开始执行的间隔时间上一次任务结束至本次任务开始的间隔时间。与SpringBoot中的@Scheduled(fixedRate)、@Scheduled(fixedDelay)注解的用途类似。值得一提的是,对于scheduleAtFixedRate而言,当 我们指定的两次任务开始执行的间隔时间 小于 该任务执行一次所需的耗时 时,将会以 该任务执行所需的耗时 作为 两次任务开始执行的实际间隔时间

测试结果如下所示。从蓝框可以看出,三个任务的第一次执行时机均按指定的延时时间(分别延迟10秒、30秒、30秒)启动;从绿框可知,对于名为fixedRate的任务而言,每次开始执行的间隔为10秒;从红框可知,对于名为fixedDelay的任务而言,每次开始执行的间隔为15秒。因为其上一次任务结束至本次任务开始的时间间隔为10秒,加上该任务本身耗时5秒,故累计为15秒

figure 4.jpeg

ThreadPoolExecutor类

概述

事实上对于上述的工厂方法而言,其内部是使用线程池ThreadPoolExecutor类。该类的继承结构如下所示

figure 5.jpeg

与线程类似。对于线程池而言,其整个生命周期阶段也存在若干不同的状态。具体如下

  • Running:该状态下,线程池可以接受新任务,并能够处理阻塞队列中的任务
  • ShutDown:该状态下,线程池不再可以接受新任务,但能够继续处理阻塞队列中的任务
  • Stop:该状态下,线程池不再可以接受新任务,也不会继续处理阻塞队列中的任务。同时会中断正在处理的任务
  • Tidying:该状态下,线程池中的工作线程数量为0。并且会调用terminated()钩子方法(hook method)
  • Terminated:当terminated()钩子方法(hook method)执行完毕后,线程池进入该状态

各状态的变化流程如下所示

figure 6.jpeg

值得一提的是,在ThreadPoolExecutor的实现过程中。其通过一个AtomicInteger类型的原子变量ctl实现了对线程池状态、工作线程数的记录。具体来说,是将高3位用于表示线程池状态,剩余位表示工作线程数。runStateOf方法用于获取线程池状态信息,workerCountOf方法用于获取工作线程数

figure 7.jpeg

在实际应用过程中,线程池ThreadPoolExecutor常见参数如下:

  • corePoolSize:线程池的核心线程数
  • maximumPoolSize:线程池的最大线程数
  • keepAliveTime:空闲线程的超时时间,用于终止空闲线程。通常其只对线程池中超过corePoolSize的多余线程生效。除非allowCoreThreadTimeOut属性设为true,才会对核心线程生效
  • unit:keepAliveTime参数的时间单位。其可选值定义在枚举类TimeUnit中
  • workQueue:任务的阻塞队列
  • handler:当 任务队列workQueue已满 且 线程数已达到maximumPoolSize ,提交新任务时的拒绝策略

其在接收任务后的基本流程如下所示

figure 8.jpeg

拒绝策略

JDK提供了四种拒绝策略,均实现了RejectedExecutionHandler接口

DiscardPolicy 丢弃策略

该策略下当提交的任务 无空闲线程执行 或 任务队列已满 时,则会直接被丢弃且不会产生任何异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RejectedPolicyDemo {

/**
* 创建一个线程池,其最多只会创建2个线程,任务队列最多存放1个任务
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );

public static void test1() {
// 拒绝策略:直接丢弃
System.out.println("-------------------- 拒绝策略:直接丢弃 --------------------");
executor.setRejectedExecutionHandler( new ThreadPoolExecutor.DiscardPolicy() );
executeJob( "clean" );

executor.shutdown();
}

private static void executeJob(String name) {
for (int i=0; i<10; i++) {
executor.submit( new Job( name+"-"+i) );
}
}
}

测试结果如下所示,#3-#9号任务被直接丢弃了

figure 9.jpeg

DiscardOldestPolicy 丢弃最老策略

该策略下当提交的任务 无空闲线程执行 或 任务队列已满 时,则会丢弃队列中最旧的任务以释放空间来存储该任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class RejectedPolicyDemo {

/**
* 创建一个线程池,其最多只会创建2个线程,任务队列最多存放1个任务
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );

public static void test2(){
// 拒绝策略:丢弃队列中最旧的任务
System.out.println("-------------------- 拒绝策略:丢弃队列中最旧的任务 --------------------");
executor.setRejectedExecutionHandler( new ThreadPoolExecutor.DiscardOldestPolicy() );
executeJob( "register" );

executor.shutdown();
}
}

测试结果如下所示。当#0、#1号任务在执行时,#2~#9号任务不断被存储到队列、然后被丢弃以存放最新的任务。所以#9号任务最终被保留并执行

figure 10.jpeg

AbortPolicy 中止策略

在该策略下,当线程池无法继续接收提交的任务时会抛出RejectedExecutionException异常。其也是线程池的默认拒绝策略。显然抛出异常的方式可以让开发者更好的把握系统的运行状态。当然在此种拒绝策略下,我们需要处理好其所抛出的异常,以免打断当前的执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class RejectedPolicyDemo {

/**
* 创建一个线程池,其最多只会创建2个线程,任务队列最多存放1个任务
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );

public static void test3(){
// 拒绝策略:抛异常
System.out.println("-------------------- 拒绝策略:抛异常 --------------------");
executor.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() );
try{
executeJob( "request" );
}catch (RejectedExecutionException e) {
System.out.println("[Error] 提交到线程池的任务量过多");
}
executor.shutdown();
}
}

测试结果如下所示

figure 11.jpeg

CallerRunsPolicy 调用者执行策略

在该策略下,当线程池无法继续接收提交的任务时,其会交由调用者(提交任务的线程)去执行完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class RejectedPolicyDemo {

/**
* 创建一个线程池,其最多只会创建2个线程,任务队列最多存放1个任务
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) );

public static void test4() {
// 拒绝策略:由调用者执行
System.out.println("-------------------- 拒绝策略:由调用者执行 --------------------");
executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() );
executeJob( "caller" );

executor.shutdown();
}
}

测试结果如下所示,符合预期。线程池无法继续接收新任务时,其会被提交任务的线程(即这里的main线程)执行完成

figure 12.jpeg

Note

  • 当线程池的拒绝策略为DiscardPolicy、DiscardOldestPolicy时,则对于被拒绝任务的Future实例而言。如果在其上调用无参的get()方法,则会导致一直被阻塞。故在此种场景下,推荐使用支持超时机制的get()方法。测试代码如下所示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ThreadPoolDemo {

@Test
public void test1() throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());

Future future1 = executor.submit( taskFactory("Task 1") );
Future future2 = executor.submit( taskFactory("Task 2") );
Future future3 = executor.submit( taskFactory("Task 3") );

future1.get();
System.out.println("future 1 Over");

future2.get();
System.out.println("future 2 Over");

future3.get();
System.out.println("future 3 Over");
}

private static Runnable taskFactory(String taskName) {
return () -> {
// 模拟业务耗时
try{Thread.sleep(3000);} catch (Exception e){}
System.out.println(taskName + ": Over");
};
}
}

测试结果如下所示,符合预期

figure 13.jpeg

  • 线程池使用完毕后,应通过 shutdown() 方法进行关闭。实例代码如下所示
1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
System.out.println("Hello World");
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
threadPoolExecutor.execute( ()-> System.out.println("Test Task") );

// 关闭线程池
//threadPoolExecutor.shutdown();

System.out.println("Main Over");
}

测试结果如下所示,左下角处的红色方块表明由于线程池未关闭,JVM依然存在并未退出。正确做法是放开上述代码中对线程池关闭操作的注释

figure 14.jpeg

参考文献

  1. Java并发编程之美 翟陆续、薛宾田著
请我喝杯咖啡捏~

欢迎关注我的微信公众号:青灯抽丝