线程池源码分析及动态更新大小实现

本文主要聊聊java线程池ThreadPoolExecutor的源码实现,以及如何通过Apollo配置中心来达到动态调整线程池大小

任务执行过程简述

首先我还是用图文来说明一下线程池执行任务的大致原理,防止大家一看代码就晕圈

线程池中的每一个线程我们称之为worker线程,因为源码中就是一个继承了AQS,实现了Runnable的Worker类,线程池分为2个部分:core线程,超出coreSize到maxSize之间的非core线程

线程池初始化时,里面是没有线程的,当然你也可以调用prestartCoreThread/prestartAllCoreThreads方法来启动一个或所有core线程,也就是所谓的预热,这样当第一个任务添加到线程池时不至于等待线程的创建,也就是说在一定程度上提高了响应速度

线程池中的每一个worker线程(即所有线程)在创建时,都可以传入一个任务(firstTask),之后worker线程启动,就会执行这个任务,但是不能每次来一个任务就创建一个worker线程,这失去了池化的意义,这时就有一个缓冲队列(workQueue),它和worker线程的关系可以用MQ中的消费者循环从broker拉取消息来类比,其实就是最简单的生产者消费者模式,生产者就是我们的应用程序,我们既可以在创建worker线程时,传入任务并执行,也可以在core线程数达到上限时,暂时将任务放入workQueue中,等待core worker线程轮询获取,并执行

线程池里所有的线程都有一个存活时间,由keepAliveTime参数控制,它默认只针对非core线程,如果想对core线程也设置存活时间,需要设置allowCoreThreadTimeOut为true

那么非core线程什么时候创建呢?

超出coreSize的线程会在workQueue达到容量上限之后,才会创建,那么思考一个问题,假设我的队列很大,可以容纳1000个任务,亦或者是LinkedBlockingQueue这种无界队列,里面的任务在未达到workQueue最大容量之前,又由于core线程处理之前的任务过慢,那么队列尾部的任务只有等待,如果这是一个处理http请求的任务,那么很容易造成接口超时

workQueue的大小也取决于业务特性,要求响应速度快的应该设置小,极端情况你可以用SynchronousQueue,如果业务讲求吞吐量,对延迟要求并不严苛,可以稍微设置大一些

如果workQueue满了,线程数也达到了maxSize了,会出现什么情况?

此时才是拒绝(reject)策略执行的时候,也就是RejectedExecutionHandler会处理这种情况,默认是AbortPolicy,它会抛出一个RejectedExecutionException,其它实现还有CallerRunsPolicy让我们自己定义处理方式的策略

什么时候情况下容易会出现reject

首先就是我们的maxPoolSize设置的太小了,反映在真实的业务上就是对业务流量的评估不足,亦或者是不可预料的突发流量(微博明星出轨算一个)

任务执行的流程图大致是这个样子的,这里需要说明的:因为需要执行任务而创建线程,那么这个任务对于该线程而言就是第一个任务(firstTask),那么执行完后,线程不会销毁,它会一直轮回阻塞队列,从中获取任务

下面对源码的分析主要都是依据上述图文

业务实践

延迟与吞吐量

延迟与吞吐量总是成反比的,根据不同的业务特性要做出取舍,比如要求响应速度快的业务,它就要去延迟低,比如app加载首页,而一些吞吐量大的业务,它并不需要多快的响应速度,典型的场景如报表
那么针对这些业务场景,如何去调整线程池参数呢?

侧重响应速度的业务,它需要执行的任务可能很多,但是都比较快,我们应该用更多的线程去执行任务,而等待队列就不能太大,否则任务积压会导致延迟增高,甚至超时
侧重吞吐量的业务,例如报表,离线计算,并不需要在很快的时间内完成,而是需要保证在单位时间内尽可能的执行更多的任务,也就是追求吞吐量,那么可以在保证有充分的线程数情况下,尽可能的用等待队列去缓存任务

源码分析

经过上面的讲解是希望大家不要害怕线程池的原理,它并不难,甚至比一些复杂的业务代码简单多了

使用案例

下面是一个ThreadPoolExecutor的使用案例,没有什么难点,我这里只是记录了RejectedExecutionException的发生数,因为在生产环境中我们是需要对线程池监控的,比如把该错误发生至falcon监控,并报警给研发负责人

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static AtomicInteger order = new AtomicInteger(0);

private static AtomicInteger rejectCounter = new AtomicInteger(0);

private static BlockingQueue<Runnable> blockingDeque = new ArrayBlockingQueue<Runnable>(10);

public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS, blockingDeque,
(r) -> {
Thread t = new Thread(r,"thread-pool-demo-" + order.getAndIncrement());
return t;
},
(r, e) -> {
rejectCounter.getAndIncrement();
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
});

executor.allowCoreThreadTimeOut(true);

executor.submit(() -> {
System.out.println("executing ...");
sleep(3);
});
}

线程的状态

int类型占4个字节,共32位,这里用前3位表示线程池的状态(rs),后29位表示线程的个数(wc)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池的状态流转如下:

线程池状态

任务的执行

我们通常调用submit方法传入任务,submit源码很简单,封装成RunnableFuture之后,调用execute方法执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 新增一个core线程,任务作为该线程的firstTask,并启动了worker线程去执行任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// core满了,添加到workQueue
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 注意上面的状态是isRunning,现在没有运行了,就删除任务
if (! isRunning(recheck) && remove(command))
reject(command); // reject任务
// worker线程数为0,我理解这里可能是core有超时回收的设置
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// workQueue添加失败了(满了),才创建非core的worker线程,注意第二个参数是false
else if (!addWorker(command, false))
// 创建非core worker线程失败了,就reject任务
reject(command);
}

配合我前面的文字解释及下面的注释,过程十分简单,我们用一个流程图来描述下

execute方法详情

创建worker线程并执行任务的逻辑在addWorker方法中

创建worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();int rs = runStateOf(c);


// Check if queue empty only if necessary.
// SHUTDOWN,STOP,TIDYING,TERMINATED四种状态返回失败
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
// >corePoolSize需要放入workQueue,>maximumPoolSize也会返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;


// cas (rs,wc) + 1,rs在高位,这里是wc+1
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果状态变了,需要重新循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建一个worker
w = new Worker(firstTask);
// 被代理的线程
final Thread t = w.thread;
if (t != null) {
// 该锁主要针对 HashSet<Worker> workers
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 线程已经启动过了
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加到Set<Worker>
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

这里核心逻辑也就是创建了work线程并传入任务:new Worker(firstTask),之后启动了线程t,那么线程t与worker的关系又是什么呢?

Worker继承AQS,同时实现了Runnable,构造方法中可以看到获取到了我们传入的ThreadFactory(),创建了Thread线程对象,同时传入当前worker对象,即new Thread(new Runnable)的形式

1
2
3
4
5
6
7
class Worker extends AbstractQueuedSynchronizer implements Runnable {
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}

上面的addWorker方法已经启动了线程: t.start(), 那么剩下来的就很简单了,看Worker的run方法即可

获取并执行任务

runWorker方法主要就是获取task并执行,逻辑十分简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public void run() {
runWorker(this);
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
// 获取完立马置为null,下面的while之后的task就为null了,就会从workQueue中获取
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 任务从2个地方获取
// 1. 创建线程时,它的firstTask
// 2. 从workQueue中获取,getTask方法,返回null说明rs>=Stop或者队列为空
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);// 啥也没做

Throwable thrown = null;
try {
// 执行我们的任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);// 啥也没做
}
} finally {
task = null;
// 已执行的任务数加1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

getTask方法是从workQueue中获取任务,此处也可以看出keepAliveTime就是从队列中获取的超时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// worker需不需要淘汰,core,或者非core都有空闲时间淘汰策略
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// timed 表示core在设置了allowCoreThreadTimeOut会被淘汰,非core之外的也会被淘汰
// 空闲时间=worker从workQueue里获取任务超时的时间
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); // 不需要淘汰只会是core线程没有设置超时的情况,这里用take会一直等待
// 返回获取到的任务
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

关于线程池的常见问题

队列太长导致超时

假设A服务在接收到上游服务的一个请求后用线程池处理,但是队列设置的太大,假设某一个处理环境速度变慢,比如数据库处理了异常,导致大量任务积压,那么上游服务就会调用A服务超时

maxSize线程数太小

maxSize太小,同时等待队列也不大,在任务堆积满了之后,就会reject任务,有可能抛出大量的RejectedExecutionException

父子任务同用一个线程池

这种问题很隐蔽,假设任务的执行分为3步,step2需要执行的逻辑较为复杂,或者某一时刻,redis/mysql等出现了阻塞,那么就会hang在step2,就会导致父任务一直占用线程,得不到释放,正确的做法是把step2单独作为一个子任务放入到另一个线程池

1
2
3
4
5
Task{
step1();
step2();
step3();
}

动态调整线程池大小

线程池大小的设定,根据CPU密集型和IO密集型划分,但是理论毕竟是理论,它受限于以下环境:一台服务器上只有一个应用,一个应用也只有一个线程池,同时业务的流量一直都是平稳的,没有突发性
但实际应用中一台服务器上一般都有日志收集,sidecar等各类agent,一个应用中也有多个线程池,业务流量在上下班,用餐时间段高也是很正常的事

那么动态调整线程池大小就很有必要了,基于Apollo可以很轻松的实现以下方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@ApolloConfigChangeListener
public void listen(ConfigChangeEvent event) {
log.info("changing thread pool info ...");

Set<String> changeKeys = event.changedKeys();

if(changeKeys.contains(CORE_SIZE) || changeKeys.contains(MAX_SIZE)) {
if(coreSize > maxSize) {
log.warn("core size must less or equals max size");
return;
}
// 必须都重新设置一遍
threadPoolTaskExecutor.setCorePoolSize(coreSize);
threadPoolTaskExecutor.setMaxPoolSize(maxSize);
}
}

那么原理是怎样的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
// 差值
int delta = corePoolSize - this.corePoolSize;
// 重新复制
this.corePoolSize = corePoolSize;

// 如果原来的线程数大于新的线程数,则会终止
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
// 创建线程
while (k-- > 0 && addWorker(null, true)) {
// workQueue为空就先停止创建,毕竟没必要
if (workQueue.isEmpty())
break;
}
}
}
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}

至于workQueue的大小动态调整可以复制一下LinkedBlockingQueue实现一个ResizeableQueue,去掉capacity的final修饰符并提供set方法即可

总结

ThreadPoolExecutor线程池重要的是worker线程的创建,以及任务的获取,总的来说就是先创建core线程,core满了就放入workQueue,workQueue也满了就创建worker线程至maxSize,如果超出了maxSize就执行reject

引用:Java线程池实现原理及其在美团业务中的实践

Author: 紫夜
Link: https://greedypirate.github.io/2020/04/14/线程池源码分析及动态更新大小实现/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.
支付宝打赏
微信打赏