Netty NioEventLoop

———— 4.1.72.Final
words: 1.4k    views:    time: 7min

Java Nio是对多路复用IO模型的实现,其核心在于借助操作系统提供的选择器实现对连接的监听,然后应用服务只需对就绪的连接事件进行处理即可,比如可读可写等,这样就降低了应用服务在并发场景下的线程资源压力。

Netty中设计了专门的NioEventLoop来封装对Nio事件的处理,其思路是给每个NioEventLoop初始化一个选择器Selector,然后将通道对Selector的注册转化成对NioEventLoop的注册,这样可以让每个NioEventLoop各司其职,只负责处理注册到自己身上的通道事件,从而即实现了通道事件的多线程处理,同时又避免了过程中的线程安全问题。

1. register

关于通道的注册有三种情况:

  1. 客户端SocketChannel注册;
  2. 服务端ServerSocketChannel注册;
  3. 由服务端ServerSocketChannel收到连接后创建的SocketChannel注册;

Netty将这三种情况都抽象成了统一的接口register,由MultithreadEventLoopGroup对外提供

io.netty.channel.MultithreadEventLoopGroup
1
2
3
4
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

然后在具体SingleThreadEventLoop中注册之前,会先将Channel包装成一个DefaultChannelPromise,关于Promise的作用在之前笔记[Netty Future & Promise]中已经有过介绍,至于这里的作用则是用来表示注册操作的成功或失败

io.netty.channel.SingleThreadEventLoop
1
2
3
4
5
6
7
8
9
10
11
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this)); // 使用Promise绑定channel和EventLoop
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise); // 反过来再让channel绑定Promise和EventLoop
return promise;
}

另外,这里还有一个意图就是让PromiseChannelEventLoop相互关联,其相关类图可以如下所示:

最后真正的注册还是交给Channel自己来进行,它将注册分成了两步,第一次注册时声明对任何事件都不感兴趣

io.netty.channel.AbstractChannel#AbstractUnsafe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) { // 不允许重复注册,通过在Channel中添加一个registered标识位来实现
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) { // EventLoop是否支持处理此Channel,具体子类清楚哪种类型的EventLoop可以处理自己
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

AbstractChannel.this.eventLoop = eventLoop; // 在Channel中绑定EventLoop

// 下面即进行注册,区别是当前线程自己操作还是交给执行器来注册
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

private void register0(ChannelPromise promise) {
try {
// 将操作置为不可中断,并且检查通道是否仍处于连接状态
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}

boolean firstRegistration = neverRegistered; // 记个变量,确保下面fireChannelActive事件只在首次注册时被调用
doRegister();
neverRegistered = false;
registered = true;

pipeline.invokeHandlerAddedIfNeeded();

safeSetSuccess(promise); // 先设置成功,如果下面注册异常了再设置成失败
pipeline.fireChannelRegistered(); // 回调事件ChannelRegistered
if (isActive()) { // 对客户端就是连接成功了isConnected(),服务端则是地址绑定成功了isBound()
if (firstRegistration) {
pipeline.fireChannelActive(); // 只在首次注册时回调事件ChannelActive
} else if (config().isAutoRead()) { // 如果通道没有立即就绪,并且设置了autoRead(),则注册到选择器上
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
1.1. doRegister
io.netty.channel.nio.AbstractNioChannel#AbstractNioUnsafe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow(); // 第一次捕获CancelledKeyException,则清掉缓存进行重试
selected = true;
} else {
throw e; // 理论不应该再次出现CancelledKeyException,不知道什么原因,可能是JDK bug
}
}
}
}
1.2. doBeginRead
io.netty.channel.AbstractChannel#AbstractUnsafe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public final void beginRead() {
assertEventLoop(); // assert !registered || eventLoop.inEventLoop();
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
io.netty.channel.nio.AbstractNioChannel#AbstractNioUnsafe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;

final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}

2. execute

io.netty.channel.nio.NioEventLoop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}

selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}

if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}


参考:

  1. https://juejin.cn/post/6844903732652539911
  2. https://www.jianshu.com/p/9e5e45a23309
  3. https://www.jianshu.com/p/f7cb63f0d1a1