在Java中,任务执行的主要抽象不是Thread
,而是Executor
,它提供了一种标准的方式将任务的提交过程与执行过程解耦开来,为灵活而强大的异步任务执行框架提供了基础,线程池ThreadPoolExecutor
便是在此基础上实现的一套异步执行框架。
Netty也在Executor
的基础上自定义实现了一套高性能的任务执行框架,本文尝试从I/O事件执行器NioEventLoopGroup
作为切入点对其中的部分源码做一些分析,希望能从别人的设计中汲取一些思路和启发。建议在分析Netty中的任务执行机制之前先对Java中原生的任务执行框架有所了解,可以参考笔记:
类继承结构

上图主要站在NioEventLoopGroup
/NioEventLoop
的角度,梳理了一下相关的类结构,整体上是一个装饰器模式,右边ExecutorGroup
是左边Executor
的一个聚合,想当于一个整体的装饰器,而左边Executor
本身也是一个装饰器。
右边ExecutorGroup
的职责是负责管理左边Executor
的生命周期(即创建和关闭),以及给它们分配要执行的任务,它在执行器集合的基础上增加了一个负载均衡的能力,至于这个分配逻辑则由EventExecutorChooserFactory
定义,可以由外部实现;
左边的Executor
也管理了一个内部执行器(由外部提供),并负责它的启动和关闭,但是任务队列由自己管理,也就说它将自己的任务队列交给一个外部的执行器来负责执行。SingleThreadEventLoop
中默认使用的ThreadPerTaskExecutor
来执行,虽然本身是单线程,但是多个Executor
组成的group就相当于一个线程池了。
这样在总体上就显得比较灵活,用户可以很方便地自定义任务分配的策略,以及任务执行的方式。
这里可以与Java线程池的任务执行策略做个比较:
下图是线程池中对于任务异步执行的策略,其通过一个任务队列将任务的提交和执行两个过程解耦,也就是生成-消费模式,这样实现了线程的复用,避免了重复建立线程的开销。但存在一个问题,就是所有的工作线程都阻塞在一个共享的队列上,那么势必会存在锁竞争问题,这样在高并发场景下就可能会成为一个性能瓶颈。

所以改进版的设计就是让每个工作线程维护自己的任务队列,这样就不存在与其它线程竞争的问题了。而NioEventLoopGroup
/ NioEventLoop
采用的就是这种设计,如下图所示

1. NioEventLoopGroup
下面以NioEventLoopGroup
为例,从其初始化、任务执行execute
、以及关闭shutdownGracefully
的过程,对源码做一些分析
1.1. new
1
| EventLoopGroup eventGroup = new NioEventLoopGroup();
|
通过跟代码,会发现其初始化逻辑最终在MultithreadEventExecutorGroup
中,这里主要做了三件事:
- 初始化一个执行器实例
executor
,默认使用ThreadPerTaskExecutor
;
- 初始化子执行器,这里就是
NioEventLoop
,默认共创建2倍的cpu个数;
- 初始化任务选择器,默认的策略是平均分配,实现方式根据具体子执行器的个数不同略有差异;
io.netty.util.concurrent.MultithreadEventExecutorGroup1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { checkPositive(nThreads, "nThreads");
if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); }
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { } } }
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); }
}
|
1.1.1. ThreadPerTaskExecutor
默认创建的执行器是在每次执行任务时都创建一个新的线程
io.netty.util.concurrent.ThreadPerTaskExecutor1 2 3 4 5 6 7
| public final class ThreadPerTaskExecutor implements Executor { @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }
|
对于创建的线程由默认的线程工厂定义,这里它将任务和线程分别包装成FastThreadLocalRunnable
和FastThreadLocalThread
,至于这样做的目的后面再进行分析(TODO),这里先不讨论
io.netty.util.concurrent.DefaultThreadFactory1 2 3 4 5 6 7 8 9 10
| @Override public Thread newThread(Runnable r) { Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); return t; }
protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(threadGroup, r, name); }
|
1.1.2. newChild
初始化子执行器,这里就是创建NioEventLoop
io.netty.channel.nio.NioEventLoopGroup1 2 3 4 5 6 7
| @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, selectorProvider, selectStrategyFactory.newSelectStrategy(), rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory); }
|
NioEventLoop
是一个单线程执行器,其初始化过程中将传入的执行器executor
又装饰了一层,目的主要是为了线程在执行任务的过程中,能通过ThreadLocal
找到自己所对应的执行器。
根据传入的ThreadPerTaskExecutor
可以知道,后面SingleThreadEventExecutor
在真正执行任务时将会创建一个新的线程,并将这个线程记住,也就是绑定了,但是反过来如果线程在执行任务的过程想知道自己是在哪个SingleThreadEventExecutor
中创建的,就只能通过ThreadLocal
了。
io.netty.util.concurrent.SingleThreadEventExecutor1 2 3 4
| protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, ...) { this.executor = ThreadExecutorMap.apply(executor, this); }
|
io.netty.util.internal.ThreadExecutorMap1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public static Executor apply(final Executor executor, final EventExecutor eventExecutor) { ObjectUtil.checkNotNull(executor, "executor"); ObjectUtil.checkNotNull(eventExecutor, "eventExecutor"); return new Executor() { @Override public void execute(final Runnable command) { executor.execute(apply(command, eventExecutor)); } }; } public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(eventExecutor, "eventExecutor"); return new Runnable() { @Override public void run() { setCurrentEventExecutor(eventExecutor); try { command.run(); } finally { setCurrentEventExecutor(null); } } }; }
|
1.1.3. newChooser
初始化选择器,默认由工厂DefaultEventExecutorChooserFactory
创建,其策略是平均分配,至于实现方式则根据执行器总数是否为2的幂,分成了2种情况:如果是,则直接减一然后进行与运算即可,否则只能通过取余运算。
不过平均分配也可能存在问题,即如果任务的执行耗时不一样,那么可能存在这样的场景:某个子执行器被分配了几个耗时长的任务一直在执行,而其他的子执行器却被闲置。
这个问题有两种解决思路,一种是从任务分配者的角度,在分配任务时根据实际情况,将任务分配给一个当前负载程度较低的子执行器来执行。另一种是从任务执行者的角度出发,可以让空闲的执行者尝试从其它执行者的任务队列尾部获取待执行的任务,类似工作密取模式,这样一定程度上就实现了执行者之间的自平衡。
两种思路各有优缺点,前者分配者很难真正了解执行器的负载情况,只能通过任务数来比较个大概,比如执行器A有2个待执行任务,每个任务耗时5min,而执行器B有5个待执行任务,每个任务耗时2s,这种情况分配者就很难解决,因为它无法知道任务的具体耗时,只有执行者自己在真正执行后才知道什么时候能结束。第二种理论上应该可以解决这个问题,但是实现起来会比较复杂,而且让空闲的执行器从其它执行器的任务队列中获取任务,这个过程中会存在大量的无效竞争,这又回到了共享队列的问题上了,因此最终效果可能适得其反。
所以在使用时需要注意,只有当任务都是同类型并且相互独立时,性能才能达到最佳,也就是说这个问题完全可以由用户自己避免。
io.netty.util.concurrent.DefaultEventExecutorChooserFactory1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } }
private static boolean isPowerOfTwo(int val) { return (val & -val) == val; }
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } }
private static final class GenericEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)]; } } }
|
1.2. execute
任务的提交入口由ExecutorGroup
对外提供,然后它再通过某种策略将任务分配给子Executor
,而子Executor
将任务保存到队列,并最终由委托给自己依赖的执行器执行。
1.2.1. AbstractEventExecutorGroup
对于NioEventLoopGroup
,其逻辑在父类AbstractEventExecutorGroup
中实现,next()
就是选择一个子执行器进行执行
io.netty.util.concurrent.AbstractEventExecutorGroup1 2 3 4 5 6 7 8 9
| public abstract class AbstractEventExecutorGroup implements EventExecutorGroup { @Override public void execute(Runnable command) { next().execute(command); } }
|
1.2.2. SingleThreadEventExecutor
NioEventLoop
是一个单线程执行器,其执行上下文在父类SingleThreadEventExecutor
中定义。
这里SingleThreadEventExecutor
将启动的时机,延迟到了真正执行任务时进行,即在收到分配的任务后,首先将任务添加到任务队列中,然后检查当前状态,如果没启动则尝试进行启动,接着再检查状态是否已经关闭,如果关闭了则尝试将刚才添加的任务删除掉,如果删除成功了再执行reject()
,具体下面注释已经很详细了。
io.netty.util.concurrent.SingleThreadEventExecutor1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| @Override public void execute(Runnable task) { ObjectUtil.checkNotNull(task, "task"); execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); } private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { } if (reject) { reject(); } } }
if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } }
private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } } }
|
具体启动的逻辑,就是委托依赖的执行器executor
执行一个由子类实现的run()
方法,通常是一个无限循环从队列中阻塞获取任务并执行的过程,这里负责定义了任务执行前后的一些上下文处理,比如在执行前记录下执行的线程,以及在退出时做一些善后处理。
根据前面的分析知道,这里默认依赖的执行器是ThreadPerTaskExecutor
,它会在执行时帮忙拉起一个新的线程,然后这里会在任务起跑前通过Thread.currentThread()
将线程记下来。
io.netty.util.concurrent.SingleThreadEventExecutor1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); }
boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)){ break; } }
} } }); }
|
1.2.3. NioEventLoop
至于如何执行任务,具体交给子类实现,一般就是无限循环阻塞获取任务并执行,如果获取为 null,则退出。不过前面说过,对于任务的管理统一由父类管理,SingleThreadEventExecutor
提供了统一的从队列获取任务的方法,根据实现的接口,为了同时支持定时任务,这里用了两个任务队列,taskQueue
和scheduledTaskQueue
说下takeTask()
思路:
首先从队列scheduledTaskQueue
中peek
(获取列头但不删除)一个定时任务,如果为空,则表示当前没有定时任务,那么就从任务队列taskQueue
中阻塞获取;否则看下这个定时任务是否还有剩余的等待时间,如果有那么再把这段时间用于从taskQueue
上阻塞获取,如果这段时间内没获取到则将这个定时任务从scheduledTaskQueue
挪到taskQueue
中,这样统一返回从taskQueue
中获取的任务,并巧妙地解决了关于定时任务线程等待的问题。
虽然统一从taskQueue
中获取任务,但是这里优先检查队列scheduledTaskQueue
,并且将到了时间的定时任务挪到taskQueue
中,这样做可以解决一个问题,就是如果任务队列taskQueue
中一直有任务,那么定时队列中的任务将一直没有机会执行,直到最后关闭检查的时候,这样的就任务的定时计划就没有意义了。不过这里定时任务的执行可能也是滞后的,并且滞后的时间是不确定的,比如在定时任务不为空并且还没到执行时机的过程中,获取到了一个较耗时的任务,或者在将定时任务挪到taskQueue
之前,其它线程添加了很多任务,都会导致定时任务被延后执行,所以单独使用taskQueue
或scheduledTaskQueue
都没有问题,但同时使用时需要了解这个问题。
对于执行器的关闭,这里通过return null
来体现。具体有两个出口,对于普通任务获取的阻塞是检测是否遇到唤醒任务WAKEUP_TASK
,这样能保证在关闭事件这个时刻之前提交的任务都能被执行,而对于定时任务的等待过程则直接检查是否有中断请求,这里应该也能统一使用WAKEUP_TASK
,但作者没有这样做,可能有其它原因,没再去深究。
io.netty.util.concurrent.SingleThreadEventExecutor1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| protected Runnable takeTask() { assert inEventLoop(); if (!(taskQueue instanceof BlockingQueue)) { throw new UnsupportedOperationException(); }
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue; for (;;) { ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); if (scheduledTask == null) { Runnable task = null; try { task = taskQueue.take(); if (task == WAKEUP_TASK) { task = null; } } catch (InterruptedException e) { } return task; } else { long delayNanos = scheduledTask.delayNanos(); Runnable task = null; if (delayNanos > 0) { try { task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { return null; } } if (task == null) { fetchFromScheduledTaskQueue(); task = taskQueue.poll(); }
if (task != null) { return task; } } } }
private boolean fetchFromScheduledTaskQueue() { if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { return true; } long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for (;;) { Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { return true; } if (!taskQueue.offer(scheduledTask)) { scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask); return false; } } }
|
事实上NioEventLoop
除了要处理上述队列中的任务,还要处理注册通过的I/O读写事件,不过那又涉及到Nio的一些处理逻辑,考虑到需要一定的篇幅,而且跟本文叙述的角度也不一样,后面再单独笔记进行整理。
1.3. shutdownGracefully
如果优雅地关闭自己,是所有设计良好的服务都需要考虑的问题。对于执行器的关闭入口,同样由ExecutorGroup
对外提供,其逻辑就是逐个关闭子执行器,并返回一个Future
用来表示关闭是否完成。
1.3.1. MultithreadEventExecutorGroup
对于NioEventLoopGroup
,其实现在MultithreadEventExecutorGroup
中定义。这里返回的是一个共享的terminationFuture
,它封装在一个共享的关闭事件监听器FutureListener
中,其中维护了一个关闭计数,并且这个监听器由所有子执行器共享,然后每个子执行器在结束时都会检查执行关闭事件,也就是将这个计数加1,那么最后一个子执行器在关闭后将会发现关闭计数等于子执行器总数,那么就由它帮忙将关闭状态置为完成。
io.netty.util.concurrent.MultithreadEventExecutorGroup1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
@Override public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { for (EventExecutor l: children) { l.shutdownGracefully(quietPeriod, timeout, unit); } return terminationFuture(); }
@Override public Future<?> terminationFuture() { return terminationFuture; } }
|
1.3.2. SingleThreadEventExecutor
对于每个具体的子执行器,它需要在收到关闭通知后妥善的处理好此前已经提交的任务,为了解决这个问题,它首先自定义了一套状态
io.netty.util.concurrent.SingleThreadEventExecutor1 2 3 4 5
| private static final int ST_NOT_STARTED = 1; private static final int ST_STARTED = 2; private static final int ST_SHUTTING_DOWN = 3; private static final int ST_SHUTDOWN = 4; private static final int ST_TERMINATED = 5;
|
当外部线程请求关闭时,首先将它的状态置为ST_SHUTTING_DOWN
,并添加一个唤醒任务WAKEUP_TASK
,执行线程将在检测到唤醒任务后退出阻塞获取,并检测到状态ST_SHUTTING_DOWN
,然后它应该尽快执行完当前队列中所有的任务,并将状态置为ST_SHUTDOWN
,最后再做一些清理工作,具体见下面源码分析。
1.3.2.1 shutdownGracefully
shutdownGracefully
由外部线程调用,主要是负责将状态置为ST_SHUTTING_DOWN
,并做一些准备工作,然后尝试唤醒当前执行线程,具体方式通过提交唤醒任务WAKEUP_TASK
,以及调用子类自定义的实现
io.netty.util.concurrent.SingleThreadEventExecutor1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| @Override public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod"); if (timeout < quietPeriod) { throw new IllegalArgumentException( "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))"); } ObjectUtil.checkNotNull(unit, "unit");
if (isShuttingDown()) { return terminationFuture(); }
boolean inEventLoop = inEventLoop(); boolean wakeup; int oldState; for (;;) { if (isShuttingDown()) { return terminationFuture(); } int newState; wakeup = true; oldState = state; if (inEventLoop) { newState = ST_SHUTTING_DOWN; } else { switch (oldState) { case ST_NOT_STARTED: case ST_STARTED: newState = ST_SHUTTING_DOWN; break; default: newState = oldState; wakeup = false; } } if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { break; } } gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); gracefulShutdownTimeout = unit.toNanos(timeout); if (ensureThreadStarted(oldState)) { return terminationFuture; }
if (wakeup) { taskQueue.offer(WAKEUP_TASK); if (!addTaskWakesUp) { wakeup(inEventLoop); } }
return terminationFuture(); }
|
1.3.2.2 doStartThread
执行线程在跳出任务获取的阻塞后,将会看到状态已经是ST_SHUTTING_DOWN
,如果不是它自己设置(其它场景,如果子类实现的有问题,没有循环从队列中获取执行任务,而是就获取并实现第一个任务,那么就真的没执行一个任务创建一次线程了)。
然后调用confirmShutdown()
,确认队列中已有的任务都被执行后再将状态置为ST_SHUTDOWN
,最后再做些释放资源或事件触发等操作。
io.netty.util.concurrent.SingleThreadEventExecutor1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { try { } finally { for (;;) { int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)){ break; } }
if (success && gracefulShutdownStartTime == 0) { if (logger.isErrorEnabled()) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " + "be called before run() implementation terminates."); } }
try { for (;;) { if (confirmShutdown()) { break; } }
for (;;) { int oldState = state; if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) { break; } }
confirmShutdown(); } finally { try { cleanup(); } finally { FastThreadLocal.removeAll(); STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.countDown(); int numUserTasks = drainTasks(); if (numUserTasks > 0 && logger.isWarnEnabled()) { logger.warn("An event executor terminated with " + "non-empty task queue (" + numUserTasks + ')'); } terminationFuture.setSuccess(null); } } } } }); }
|
1.3.2.3 confirmShutdown
confirmShutdown
用来确保关闭时队列中的任务都能被执行,当然也是有限制的,根据关闭线程提供的参数静默时间quietPeriod
(默认:2s),和超时时间timeout
(默认:15s)来控制,比如如果执行完了但是静默期还没结束,那么在等等,而如果超过了超时时间,即便没执行完也返回。
io.netty.util.concurrent.SingleThreadEventExecutor1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| protected boolean confirmShutdown() { if (!isShuttingDown()) { return false; }
if (!inEventLoop()) { throw new IllegalStateException("must be invoked from an event loop"); }
cancelScheduledTasks();
if (gracefulShutdownStartTime == 0) { gracefulShutdownStartTime = ScheduledFutureTask.nanoTime(); }
if (runAllTasks() || runShutdownHooks()) { if (isShutdown()) { return true; }
if (gracefulShutdownQuietPeriod == 0) { return true; } taskQueue.offer(WAKEUP_TASK); return false; }
final long nanoTime = ScheduledFutureTask.nanoTime(); if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { return true; }
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) { taskQueue.offer(WAKEUP_TASK); try { Thread.sleep(100); } catch (InterruptedException e) { } return false; }
return true; }
|
2. 示例
在了解了Netty提供的任务执行框架之后,我们也可以借助它来实现自己的执行器了,而且性能可以很容易的超过线程池的实现
- 首先定义执行器组,这里直接使用默认的参数,然后覆盖子执行器的创建即可
https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/netty/executor/TaskExecutorGroup.java1 2 3 4 5 6 7 8 9 10 11
| public class TaskExecutorGroup extends MultithreadEventExecutorGroup {
protected TaskExecutorGroup(int n) { super(n, null, DefaultEventExecutorChooserFactory.INSTANCE, new Object[3]); }
@Override protected EventExecutor newChild(Executor executor, Object... args) throws Exception { return new TaskExecutor(); } }
|
- 定义具体任务执行器,这里很简单,就是循环从队列获取任务并执行,然后在退出前调用
confirmShutdown()
https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/netty/executor/TaskExecutor.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class TaskExecutor extends SingleThreadEventExecutor { private static AtomicInteger index = new AtomicInteger();
protected TaskExecutor() { super(null, new ThreadFactory(){ @Override public Thread newThread(Runnable r) { return new Thread("task-executor-" + index.incrementAndGet()){ @Override public void run() { r.run(); } }; } }, true); } @Override protected void run() { Runnable r; while((r = takeTask()) != null){ r.run(); } confirmShutdown(); } }
|
https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/netty/executor/_Main.java1 2 3 4 5 6 7 8 9 10 11 12
| public static void main(String[] args) throws InterruptedException { TaskExecutorGroup exec = new TaskExecutorGroup(4); for(int i = 0; i < 20; i++){ exec.submit(new Runnable(){ @Override public void run() { LOG.info(Thread.currentThread().getName()); } }); } exec.shutdownGracefully(); }
|
参考:
- http://www.tianshouzhi.com/api/tutorials/netty/334