Netty NioEventLoopGroup

———— 4.1.72.Final
words: 6.4k    views:    time: 26min

在Java中,任务执行的主要抽象不是Thread,而是Executor,它提供了一种标准的方式将任务的提交过程与执行过程解耦开来,为灵活而强大的异步任务执行框架提供了基础,线程池ThreadPoolExecutor便是在此基础上实现的一套异步执行框架。

Netty也在Executor的基础上自定义实现了一套高性能的任务执行框架,本文尝试从I/O事件执行器NioEventLoopGroup作为切入点对其中的部分源码做一些分析,希望能从别人的设计中汲取一些思路和启发。建议在分析Netty中的任务执行机制之前先对Java中原生的任务执行框架有所了解,可以参考笔记:

类继承结构

上图主要站在NioEventLoopGroup/NioEventLoop的角度,梳理了一下相关的类结构,整体上是一个装饰器模式,右边ExecutorGroup是左边Executor的一个聚合,想当于一个整体的装饰器,而左边Executor本身也是一个装饰器。

右边ExecutorGroup的职责是负责管理左边Executor的生命周期(即创建和关闭),以及给它们分配要执行的任务,它在执行器集合的基础上增加了一个负载均衡的能力,至于这个分配逻辑则由EventExecutorChooserFactory定义,可以由外部实现;

左边的Executor也管理了一个内部执行器(由外部提供),并负责它的启动和关闭,但是任务队列由自己管理,也就说它将自己的任务队列交给一个外部的执行器来负责执行。SingleThreadEventLoop中默认使用的ThreadPerTaskExecutor来执行,虽然本身是单线程,但是多个Executor组成的group就相当于一个线程池了。

这样在总体上就显得比较灵活,用户可以很方便地自定义任务分配的策略,以及任务执行的方式。

这里可以与Java线程池的任务执行策略做个比较:

下图是线程池中对于任务异步执行的策略,其通过一个任务队列将任务的提交和执行两个过程解耦,也就是生成-消费模式,这样实现了线程的复用,避免了重复建立线程的开销。但存在一个问题,就是所有的工作线程都阻塞在一个共享的队列上,那么势必会存在锁竞争问题,这样在高并发场景下就可能会成为一个性能瓶颈。

所以改进版的设计就是让每个工作线程维护自己的任务队列,这样就不存在与其它线程竞争的问题了。而NioEventLoopGroup / NioEventLoop采用的就是这种设计,如下图所示

1. NioEventLoopGroup

下面以NioEventLoopGroup为例,从其初始化、任务执行execute、以及关闭shutdownGracefully的过程,对源码做一些分析

1.1. new

1
EventLoopGroup eventGroup = new NioEventLoopGroup();

通过跟代码,会发现其初始化逻辑最终在MultithreadEventExecutorGroup中,这里主要做了三件事:

  1. 初始化一个执行器实例executor,默认使用ThreadPerTaskExecutor
  2. 初始化子执行器,这里就是NioEventLoop,默认共创建2倍的cpu个数;
  3. 初始化任务选择器,默认的策略是平均分配,实现方式根据具体子执行器的个数不同略有差异;
io.netty.util.concurrent.MultithreadEventExecutorGroup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// nThreads = 2 * cpu, executor = null, chooserFactory = DefaultEventExecutorChooserFactory.INSTANCE
// args = [SelectorProvider.provider(), DefaultSelectStrategyFactory.INSTANCE, RejectedExecutionHandlers.reject()]
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
checkPositive(nThreads, "nThreads");

if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
// ... ... 如果失败了,则关闭前面已经创建的
}
}
}

chooser = chooserFactory.newChooser(children);

// 这里创建一个共享的结束事件监听器,然后分发给每个子执行器,
// 目的是让最后一个关闭的子执行器帮忙将状态置为完成,也就是关闭这个动作完成了
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

// ... ...
}
1.1.1. ThreadPerTaskExecutor

默认创建的执行器是在每次执行任务时都创建一个新的线程

io.netty.util.concurrent.ThreadPerTaskExecutor
1
2
3
4
5
6
7
public final class ThreadPerTaskExecutor implements Executor {
// ... ...
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}

对于创建的线程由默认的线程工厂定义,这里它将任务和线程分别包装成FastThreadLocalRunnableFastThreadLocalThread,至于这样做的目的后面再进行分析(TODO),这里先不讨论

io.netty.util.concurrent.DefaultThreadFactory
1
2
3
4
5
6
7
8
9
10
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
// ... ...
return t;
}

protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
1.1.2. newChild

初始化子执行器,这里就是创建NioEventLoop

io.netty.channel.nio.NioEventLoopGroup
1
2
3
4
5
6
7
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
// ... ...
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}

NioEventLoop是一个单线程执行器,其初始化过程中将传入的执行器executor又装饰了一层,目的主要是为了线程在执行任务的过程中,能通过ThreadLocal找到自己所对应的执行器。

根据传入的ThreadPerTaskExecutor可以知道,后面SingleThreadEventExecutor在真正执行任务时将会创建一个新的线程,并将这个线程记住,也就是绑定了,但是反过来如果线程在执行任务的过程想知道自己是在哪个SingleThreadEventExecutor中创建的,就只能通过ThreadLocal了。

io.netty.util.concurrent.SingleThreadEventExecutor
1
2
3
4
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, ...) {
// ... ...
this.executor = ThreadExecutorMap.apply(executor, this);
}
io.netty.util.internal.ThreadExecutorMap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Executor() {
@Override
public void execute(final Runnable command) {
executor.execute(apply(command, eventExecutor)); // 执行任务时创建新的线程
}
};
}

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}
1.1.3. newChooser

初始化选择器,默认由工厂DefaultEventExecutorChooserFactory创建,其策略是平均分配,至于实现方式则根据执行器总数是否为2的幂,分成了2种情况:如果是,则直接减一然后进行与运算即可,否则只能通过取余运算。

不过平均分配也可能存在问题,即如果任务的执行耗时不一样,那么可能存在这样的场景:某个子执行器被分配了几个耗时长的任务一直在执行,而其他的子执行器却被闲置。

这个问题有两种解决思路,一种是从任务分配者的角度,在分配任务时根据实际情况,将任务分配给一个当前负载程度较低的子执行器来执行。另一种是从任务执行者的角度出发,可以让空闲的执行者尝试从其它执行者的任务队列尾部获取待执行的任务,类似工作密取模式,这样一定程度上就实现了执行者之间的自平衡。

两种思路各有优缺点,前者分配者很难真正了解执行器的负载情况,只能通过任务数来比较个大概,比如执行器A有2个待执行任务,每个任务耗时5min,而执行器B有5个待执行任务,每个任务耗时2s,这种情况分配者就很难解决,因为它无法知道任务的具体耗时,只有执行者自己在真正执行后才知道什么时候能结束。第二种理论上应该可以解决这个问题,但是实现起来会比较复杂,而且让空闲的执行器从其它执行器的任务队列中获取任务,这个过程中会存在大量的无效竞争,这又回到了共享队列的问题上了,因此最终效果可能适得其反。

所以在使用时需要注意,只有当任务都是同类型并且相互独立时,性能才能达到最佳,也就是说这个问题完全可以由用户自己避免。

io.netty.util.concurrent.DefaultEventExecutorChooserFactory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

private DefaultEventExecutorChooserFactory() { }

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}

private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
// ... ...
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
// ... ...
@Override
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}

1.2. execute

任务的提交入口由ExecutorGroup对外提供,然后它再通过某种策略将任务分配给子Executor,而子Executor将任务保存到队列,并最终由委托给自己依赖的执行器执行。

1.2.1. AbstractEventExecutorGroup

对于NioEventLoopGroup,其逻辑在父类AbstractEventExecutorGroup中实现,next()就是选择一个子执行器进行执行

io.netty.util.concurrent.AbstractEventExecutorGroup
1
2
3
4
5
6
7
8
9
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {

@Override
public void execute(Runnable command) {
next().execute(command);
}

// ... ... 其它的 next().XXX
}
1.2.2. SingleThreadEventExecutor

NioEventLoop是一个单线程执行器,其执行上下文在父类SingleThreadEventExecutor中定义。

这里SingleThreadEventExecutor将启动的时机,延迟到了真正执行任务时进行,即在收到分配的任务后,首先将任务添加到任务队列中,然后检查当前状态,如果没启动则尝试进行启动,接着再检查状态是否已经关闭,如果关闭了则尝试将刚才添加的任务删除掉,如果删除成功了再执行reject(),具体下面注释已经很详细了。

io.netty.util.concurrent.SingleThreadEventExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}

private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task); // 首先将任务添加到任务队列
if (!inEventLoop) { // 如果没启动,则尝试进行启动
startThread();
if (isShutdown()) { // 如果已经关闭,则再尝试将添加到队列中的任务删除掉
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// 如果任务队列不支持删除操作,那么最好的选择就是继续执行下去,
// 并希望能够在真正关闭之前这个任务能被pick-up并消费掉,否则最差的情况就是在结束时打印一下日志。
}
if (reject) { // 如果上面删除成功,则执行reject()逻辑,比如用户可以根据反馈将任务保存或者怎么样
reject();
}
}
}

if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}

// 通过CAS操作保证在并发情况下,只有一个线程能将状态置为ST_STARTED,然后执行启动;并且如果它失败了,其它线程还可以重试
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}

具体启动的逻辑,就是委托依赖的执行器executor执行一个由子类实现的run()方法,通常是一个无限循环从队列中阻塞获取任务并执行的过程,这里负责定义了任务执行前后的一些上下文处理,比如在执行前记录下执行的线程,以及在退出时做一些善后处理。

根据前面的分析知道,这里默认依赖的执行器是ThreadPerTaskExecutor,它会在执行时帮忙拉起一个新的线程,然后这里会在任务起跑前通过Thread.currentThread()将线程记下来。

io.netty.util.concurrent.SingleThreadEventExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doStartThread() {
assert thread == null; // 启动之前,thread应该是null
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread(); // 记录当前执行器中负责执行任务的线程
if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run(); // 子类实现,一般就是循环从队列中获取并执行任务
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) { // 通过CAS操作,确保状态改为关闭
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN
|| STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)){
break;
}
}

// ... ... confirmShutdown 善后处理,下面再讨论
}
}
});
}
1.2.3. NioEventLoop

至于如何执行任务,具体交给子类实现,一般就是无限循环阻塞获取任务并执行,如果获取为 null,则退出。不过前面说过,对于任务的管理统一由父类管理,SingleThreadEventExecutor提供了统一的从队列获取任务的方法,根据实现的接口,为了同时支持定时任务,这里用了两个任务队列,taskQueuescheduledTaskQueue

说下takeTask()思路:

首先从队列scheduledTaskQueuepeek(获取列头但不删除)一个定时任务,如果为空,则表示当前没有定时任务,那么就从任务队列taskQueue中阻塞获取;否则看下这个定时任务是否还有剩余的等待时间,如果有那么再把这段时间用于从taskQueue上阻塞获取,如果这段时间内没获取到则将这个定时任务从scheduledTaskQueue挪到taskQueue中,这样统一返回从taskQueue中获取的任务,并巧妙地解决了关于定时任务线程等待的问题。

虽然统一从taskQueue中获取任务,但是这里优先检查队列scheduledTaskQueue,并且将到了时间的定时任务挪到taskQueue中,这样做可以解决一个问题,就是如果任务队列taskQueue中一直有任务,那么定时队列中的任务将一直没有机会执行,直到最后关闭检查的时候,这样的就任务的定时计划就没有意义了。不过这里定时任务的执行可能也是滞后的,并且滞后的时间是不确定的,比如在定时任务不为空并且还没到执行时机的过程中,获取到了一个较耗时的任务,或者在将定时任务挪到taskQueue之前,其它线程添加了很多任务,都会导致定时任务被延后执行,所以单独使用taskQueuescheduledTaskQueue都没有问题,但同时使用时需要了解这个问题。

对于执行器的关闭,这里通过return null来体现。具体有两个出口,对于普通任务获取的阻塞是检测是否遇到唤醒任务WAKEUP_TASK,这样能保证在关闭事件这个时刻之前提交的任务都能被执行,而对于定时任务的等待过程则直接检查是否有中断请求,这里应该也能统一使用WAKEUP_TASK,但作者没有这样做,可能有其它原因,没再去深究。

io.netty.util.concurrent.SingleThreadEventExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
protected Runnable takeTask() {
assert inEventLoop();
if (!(taskQueue instanceof BlockingQueue)) {
throw new UnsupportedOperationException();
}

BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
for (;;) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); // 首先看下是否存在定时任务
if (scheduledTask == null) { // 如果不存在,则在taskQueue上阻塞获取
Runnable task = null;
try {
task = taskQueue.take();
if (task == WAKEUP_TASK) {
task = null;
}
} catch (InterruptedException e) {
// 忽略中断请求,上面通过预定义的 WAKEUP_TASK 进行唤醒并退出
}
return task;
} else {
long delayNanos = scheduledTask.delayNanos();
Runnable task = null;
if (delayNanos > 0) { // 定时任务还没到执行时机,则先尝试从taskQueue中或取,并指定超时
try {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// 响应中断,返回null表示退出
return null;
}
}

if (task == null) { // 指定时间内没获取到任务,那么将定时任务挪到taskQueue中
// See https://github.com/netty/netty/issues/1614
fetchFromScheduledTaskQueue();
task = taskQueue.poll();
}

if (task != null) {
return task;
}
}
}
}

private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return true;
}
if (!taskQueue.offer(scheduledTask)) { // 如果taskQueue中没有空间了,那么还放回scheduledTaskQueue中,下次再试
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
}
}

事实上NioEventLoop除了要处理上述队列中的任务,还要处理注册通过的I/O读写事件,不过那又涉及到Nio的一些处理逻辑,考虑到需要一定的篇幅,而且跟本文叙述的角度也不一样,后面再单独笔记进行整理。

1.3. shutdownGracefully

如果优雅地关闭自己,是所有设计良好的服务都需要考虑的问题。对于执行器的关闭入口,同样由ExecutorGroup对外提供,其逻辑就是逐个关闭子执行器,并返回一个Future用来表示关闭是否完成。

1.3.1. MultithreadEventExecutorGroup

对于NioEventLoopGroup,其实现在MultithreadEventExecutorGroup中定义。这里返回的是一个共享的terminationFuture,它封装在一个共享的关闭事件监听器FutureListener中,其中维护了一个关闭计数,并且这个监听器由所有子执行器共享,然后每个子执行器在结束时都会检查执行关闭事件,也就是将这个计数加1,那么最后一个子执行器在关闭后将会发现关闭计数等于子执行器总数,那么就由它帮忙将关闭状态置为完成。

io.netty.util.concurrent.MultithreadEventExecutorGroup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventExecutor l: children) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
return terminationFuture();
}

@Override
public Future<?> terminationFuture() {
return terminationFuture;
}

// ... ...
}
1.3.2. SingleThreadEventExecutor

对于每个具体的子执行器,它需要在收到关闭通知后妥善的处理好此前已经提交的任务,为了解决这个问题,它首先自定义了一套状态

io.netty.util.concurrent.SingleThreadEventExecutor
1
2
3
4
5
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;

当外部线程请求关闭时,首先将它的状态置为ST_SHUTTING_DOWN,并添加一个唤醒任务WAKEUP_TASK,执行线程将在检测到唤醒任务后退出阻塞获取,并检测到状态ST_SHUTTING_DOWN,然后它应该尽快执行完当前队列中所有的任务,并将状态置为ST_SHUTDOWN,最后再做一些清理工作,具体见下面源码分析。

1.3.2.1 shutdownGracefully

shutdownGracefully由外部线程调用,主要是负责将状态置为ST_SHUTTING_DOWN,并做一些准备工作,然后尝试唤醒当前执行线程,具体方式通过提交唤醒任务WAKEUP_TASK,以及调用子类自定义的实现

io.netty.util.concurrent.SingleThreadEventExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// quietPeriod = 2s, timeout = 15s
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
if (timeout < quietPeriod) {
throw new IllegalArgumentException(
"timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
}
ObjectUtil.checkNotNull(unit, "unit");

if (isShuttingDown()) { // 如果状态已经是ST_SHUTTING_DOWN,则有其它正在执行关闭,直接返回
return terminationFuture();
}

boolean inEventLoop = inEventLoop();
boolean wakeup;
int oldState;
for (;;) { // 循环CAS操作,保证状态置为 ST_SHUTTING_DOWN
if (isShuttingDown()) {
return terminationFuture(); // 如果被其它线程关闭了,则自己退出
}

int newState;
wakeup = true;
oldState = state;
if (inEventLoop) { // 如果关闭线程是自己,那么状态肯定是ST_STARTED,所以直接置为ST_SHUTTING_DOWN
newState = ST_SHUTTING_DOWN;
} else {
switch (oldState) { // 其它线程如果看到是ST_NOT_STARTED或ST_STARTED,则置为ST_SHUTTING_DOWN,否则保持不变
case ST_NOT_STARTED:
case ST_STARTED:
newState = ST_SHUTTING_DOWN;
break;
default:
newState = oldState;
wakeup = false;
}
}
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
break;
}
}

// 在执行器中记录关闭参数,即静默周期,后超时时间
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
gracefulShutdownTimeout = unit.toNanos(timeout);

// 确保执行启动:如果状态为ST_NOT_STARTED,则尝试启动并返回true,否则返回false
if (ensureThreadStarted(oldState)) {
return terminationFuture;
}

// 如果上一步返回false,则添加一个唤醒任务,唤醒可能阻塞的线程
// wakeup 确保只有成功将状态置为ST_SHUTTING_DOWN的线程才会执行
if (wakeup) {
taskQueue.offer(WAKEUP_TASK);
// 添加唤醒任务只是实现唤醒默认的一种策略,可能具体执行器并不是通过这种策略,
// 比如NioEventLoop在构造时,addTaskWakesUp就置为false,说明它自定义了唤醒操作
if (!addTaskWakesUp) {
wakeup(inEventLoop); // 调用子类唤醒策略
}
}

return terminationFuture();
}
1.3.2.2 doStartThread

执行线程在跳出任务获取的阻塞后,将会看到状态已经是ST_SHUTTING_DOWN,如果不是它自己设置(其它场景,如果子类实现的有问题,没有循环从队列中获取执行任务,而是就获取并实现第一个任务,那么就真的没执行一个任务创建一次线程了)。

然后调用confirmShutdown(),确认队列中已有的任务都被执行后再将状态置为ST_SHUTDOWN,最后再做些释放资源或事件触发等操作。

io.netty.util.concurrent.SingleThreadEventExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// ... ...
try {
// ... ...
} finally {
for (;;) { // 1. CAS操作保证状态置为ST_SHUTTING_DOWN
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN
|| STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)){
break;
}
}

// 警告提示:子类实现在退出时,应该主动调用confirmShutdown()
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}

try {
// 2. 万一子类没有调用,这里做个托底,目的是确保当前已提交的任务都能被执行
for (;;) {
if (confirmShutdown()) {
break;
}
}

// 3. CAS操作保证状态置为 ST_SHUTDOWN,这之后不可以再提交任务
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}

// 最后在做一下保证操作吧
confirmShutdown();
} finally {
try {
cleanup();
} finally { // 释放资源
FastThreadLocal.removeAll(); // 清空ThreadLocal
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); // 状态置为ST_TERMINATED
threadLock.countDown(); // 放开 threadLock
int numUserTasks = drainTasks(); // 如果结束时队列中还有任务,则打印一个警告日志,理论上不应该出现
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}
terminationFuture.setSuccess(null); // future置为完成
}
}
}
}
});
}
1.3.2.3 confirmShutdown

confirmShutdown用来确保关闭时队列中的任务都能被执行,当然也是有限制的,根据关闭线程提供的参数静默时间quietPeriod(默认:2s),和超时时间timeout(默认:15s)来控制,比如如果执行完了但是静默期还没结束,那么在等等,而如果超过了超时时间,即便没执行完也返回。

io.netty.util.concurrent.SingleThreadEventExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
protected boolean confirmShutdown() {
if (!isShuttingDown()) { // 只有状态为ST_SHUTTING_DOWN,才需要停止任务
return false;
}

if (!inEventLoop()) { // 真正的关闭操作只能由执行线程自己执行,其它线程只是负责通知
throw new IllegalStateException("must be invoked from an event loop");
}

cancelScheduledTasks(); // 取消定时任务

if (gracefulShutdownStartTime == 0) { // 设置关闭的开始时间,即第一次调用关闭的时间
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
}

// 如果执行完队列中所有的任务,以及Shutdown钩子,按理说可以结束了,但是如果有静默期,那么还再等一等
if (runAllTasks() || runShutdownHooks()) {
if (isShutdown()) { // 其它关闭入口比如shutdown(),会直接将状态置为ST_SHUTDOWN,那么这里直接返回
return true;
}

if (gracefulShutdownQuietPeriod == 0) { // 如果静默期为0,也直接返回
return true;
}

taskQueue.offer(WAKEUP_TASK); // 否则添加一个唤醒任务,等下次重试
return false;
}

// 否则即便没有执行结束,但如果状态已经是ST_SHUTDOWN,或者已经超过了设定的超时时间,那么也返回
final long nanoTime = ScheduledFutureTask.nanoTime();
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}

// 如果距离上一次执行的时间还在静默期之内,那么添加一个WAKEUP_TASK,并休眠100ms,等下次重试
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
taskQueue.offer(WAKEUP_TASK);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
return false;
}

// 走到这里就表示在最近一个静默期内没有添加过任务,那么基本上可以安全的关闭了,
// 但是这并不绝对,因为我们无法假设用户会不会在随后再次调用execute()方法
return true;
}

2. 示例

在了解了Netty提供的任务执行框架之后,我们也可以借助它来实现自己的执行器了,而且性能可以很容易的超过线程池的实现

  • 首先定义执行器组,这里直接使用默认的参数,然后覆盖子执行器的创建即可
https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/netty/executor/TaskExecutorGroup.java
1
2
3
4
5
6
7
8
9
10
11
public class TaskExecutorGroup extends MultithreadEventExecutorGroup {

protected TaskExecutorGroup(int n) { // 使用默认的单线程执行,以及默认选择器
super(n, null, DefaultEventExecutorChooserFactory.INSTANCE, new Object[3]);
}

@Override
protected EventExecutor newChild(Executor executor, Object... args) throws Exception {
return new TaskExecutor();
}
}
  • 定义具体任务执行器,这里很简单,就是循环从队列获取任务并执行,然后在退出前调用confirmShutdown()
https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/netty/executor/TaskExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TaskExecutor extends SingleThreadEventExecutor {

private static AtomicInteger index = new AtomicInteger();

protected TaskExecutor() {
super(null, new ThreadFactory(){
@Override
public Thread newThread(Runnable r) {
return new Thread("task-executor-" + index.incrementAndGet()){
@Override
public void run() {
r.run();
}
};
}
}, true);
}

@Override
protected void run() {
Runnable r;
while((r = takeTask()) != null){
r.run();
}
confirmShutdown(); // 退出之前执行 confirmShutdown()
}
}
  • 使用和关闭简单如下,与线程池类似
https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/netty/executor/_Main.java
1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws InterruptedException {
TaskExecutorGroup exec = new TaskExecutorGroup(4);
for(int i = 0; i < 20; i++){
exec.submit(new Runnable(){
@Override
public void run() {
LOG.info(Thread.currentThread().getName());
}
});
}
exec.shutdownGracefully();
}


参考:

  1. http://www.tianshouzhi.com/api/tutorials/netty/334