ThreadPoolExecutor的创建

  • 当我们需要创建线程池时,我们可以使用Executors工具类创建相应的如FixedThreadPool、SingleThreadPool、CachedThreadPool等线程池。

  • 注意实际上并不是存在FixedThreadPool、SingleThreadPool、CachedThreadPool这三个类,只是Executors通过调用ThreadPoolExecutor的构造方法传入不同的参数,从而创建了不同的ThreadPoolExecutor。

  • 但是很多公司规范如阿里巴巴编码规范规定不能使用Executors工具类来创建线程池,因为其创建的线程池要么没有限制任务的最大值,要么没有限制工作线程的最大值,会导致OOM异常。

  • 有时候如果我们没有设置executor.allowCoreThreadTimeOut(true)或者没有执行executor.shutdown();,使用线程池的进程总是没有退出,这是因为核心线程的状态是WAITING (parking),而只有当JVM中只存在守护线程的时候,JVM才会彻底退出。如下图(图源极客时间)所示,如果想要一个线程终止,唯一的途径是让其从RUNNABLE状态转换为TERMINATED。那么如何让线程从WAITING状态转换为RUNNABLE呢?可以使用中断来终止线程,executor的shutdown方法正是调用了interruptIdleWorkers方法来一一中断工作线程,使用二阶段终止模式优雅地终止工作线程的。

线程状态

  • 我们可以自己封装一个ThreadPoolExecutorBuilder来创建ThreadPoolExecutor,封装如下,可以创建一个有界的默认参数线程池,也可以方便地自定义参数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class ThreadPoolExecutorBuilder {
private int corePoolSize;
private int maximumPoolSize;
private long keepAliveTime;
private TimeUnit unit;
private BlockingQueue<Runnable> workQueue;
private RejectedExecutionHandler handler;
private ThreadFactory threadFactory;
private static AtomicInteger threadId = new AtomicInteger();

/**
* 默认构造方法设置默认参数
*/
public ThreadPoolExecutorBuilder() {
int processors = Runtime.getRuntime().availableProcessors();
// 核心工作线程以及最大空闲线程数目
this.corePoolSize = processors;
this.maximumPoolSize = 2 * processors;
// 空闲线程的最大存活时间(注意参数生效的条件)
// 当线程池中线程数量大于corePoolSize(核心线程数量)或设置了allowCoreThreadTimeOut(是否允许空闲核心线程超时)时,线程会根据keepAliveTime的值进行活性检查,一旦超时便销毁线程
this.keepAliveTime = 8;
this.unit = TimeUnit.SECONDS;
// 任务队列以及饱和处理策略
this.workQueue = new ArrayBlockingQueue<>(16);
this.handler = new ThreadPoolExecutor.AbortPolicy();
// 创建线程的工厂,一般可以用来对线程命名
this.threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "threadPoolWorker-" + threadId.getAndIncrement());
}
};
}

public ThreadPoolExecutor build() {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

public ThreadPoolExecutorBuilder setCapacity(int corePoolSize, int maximumPoolSize) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
return this;
}

public ThreadPoolExecutorBuilder setKeepAliveTime(long keepAliveTime, TimeUnit unit) {
this.keepAliveTime = keepAliveTime;
this.unit = unit;
return this;
}

public ThreadPoolExecutorBuilder setWorkQueueAndRejectHandler(BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this.workQueue = workQueue;
this.handler = handler;
return this;
}

public ThreadPoolExecutorBuilder setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}
}