Netty Channel

———— 4.1.72.Final
words: 3.8k    views:    time: 16min

Netty 中的通道 Channel 是对Java原生网络编程Api的封装,以TCP编程为例 ,在java中,有两种常用方式:

  • 基于BIO,在JDK1.4之前,可以使用 java.net 包中的ServerSocket/Socket来代表服务端和客户端;
  • 基于NIO,在Jdk1.4引入Nio之后,可以使用 java.nio.channels 包中的ServerSocketChannel/SocketChannel来代表服务端与客户端;

在Netty中,分别进行了封装:

  • 使用OioServerSocketChannel/OioSocketChannelServerSocket/Socket进行了封装;
  • 使用NioServerSocketChannel/NioSocketChannelServerSocketChannel/SocketChannel进行了封装;

封装主要有两个方面,首先通过ChannelConfig来封装与Channel相关的配置,然后将对Channel的操作抽象成一个接口ChannelHander,并通过链表的形式来进行组织,以便定义操作的顺序,本文尝试从这两个角度对Netty中的Channel进行一些分析。

另外,在分析之前,最好先对Linux I/O模型,以及Java中原生Nio的相关Api有所了解,具体可以参考相关笔记:

继承结构

对于BIO相关的类这里不再赘述,主要说下NioServerSocketChannel/NioSocketChannel,它们都继承自AbstractNioChannel,其维护了与java Nio中Channel的对应关系,并提供了获取方法

io.netty.channel.nio.AbstractNioChannel
1
2
3
protected SelectableChannel javaChannel() {
return ch;
}

这里声明的返回类型是SelectableChannel,不过子类实现中会返回具体的SocketChannelServerSocketChannel

io.netty.channel.socket.nio.NioServerSocketChannel
1
2
3
4
@Override
protected ServerSocketChannel javaChannel() {
return (ServerSocketChannel) super.javaChannel();
}
io.netty.channel.socket.nio.NioSocketChannel
1
2
3
4
@Override
protected SocketChannel javaChannel() {
return (SocketChannel) super.javaChannel();
}

1. ChannelConfig

ChannelConfig只是一个配置接口,其实现可以理解成一个配置集,内部维护了一个Map,然后定义了一些对配置的操作方法

io.netty.channel.ChannelConfig
1
2
3
4
5
6
7
public interface ChannelConfig {
// ... ...
Map<ChannelOption<?>, Object> getOptions(); // 获取所有配置
boolean setOptions(Map<ChannelOption<?>, ?> options); // 替换所有配置
<T> T getOption(ChannelOption<T> option); // 获取以某个ChannelOption为key的参数值
<T> boolean setOption(ChannelOption<T> option, T value); // 替换某个ChannelOption为key的参数值
}

Channel中提供了配置集ChannelConfig的获取方法

io.netty.channel.Channel
1
ChannelConfig config();

并在创建时,负责对ChannelConfig进行初始化,比如

  • NioServerSocketChannel
io.netty.channel.socket.nio.NioServerSocketChannel
1
2
3
4
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
  • NioSocketChannel
io.netty.channel.socket.nio.NioSocketChannel
1
2
3
4
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}

可以看到,这里在初始化时传入了一个Socket,因为Netty只是对java网络编程Api的一层封装,因此最终对配置的设置还是要应用到原生的Socket对象上才能生效。比如:

io.netty.channel.socket.DefaultServerSocketChannelConfig
1
2
3
4
5
6
7
8
9
@Override
public SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {
try {
javaSocket.setTcpNoDelay(tcpNoDelay);
} catch (SocketException e) {
throw new ChannelException(e);
}
return this;
}
1.1. Constant

通常对于Map中维护的配置项,都会预定义一些常量key,最直接的方式是定义一些常量String,比如

1
2
3
4
5
public class Constant {
public static final String CONFIG1 ="config1";
public static final String CONFIG2 ="config2";
// ...
}

但这样有一个问题,就是使用者无法知道应该给每个配置项设置什么类型的值,所以好一点的方式是通过泛型类来对常量key做一层包装,通过泛型来限制配置值的类型,顺便可以定义一些操作,比如值校验

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/netty/constant/Constant.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Constant<T> {

public static final Constant<Integer> CONFIG1 = new Constant<>("config1");
public static final Constant<String> CONFIG2 = new Constant<>("config2");

private final String name;

public Constant(String name){
this.name = name;
}

public String name(){
return name;
}

public boolean valid(T value) {
// 自定义值校验 ...
return true;
}
}

然后在实现配置管理类的时候,就可以在get/put中根据key实例指定的参数类型来对值的类型进行限制了

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/netty/constant/XXXConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class XXXConfig {

@SuppressWarnings("rawtypes")
private Map<Constant, Object> map = new HashMap<>();

public <T> void put(Constant<T> constant, T t){
constant.valid(t); // 值校验
map.put(constant, t);
}

@SuppressWarnings("unchecked")
public <T> T get(Constant<T> constant){
return (T)map.get(constant);
}
}

比如以下示例,对于配置config1的设置或取值,只能是int类型

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/netty/constant/_Main.java
1
2
3
XXXConfig configs = new XXXConfig();
configs.put(Constant.CONFIG1, 123); // put或get的值只能是int型
System.out.println(configs.get(Constant.CONFIG1));

在Netty,对于Channel的配置项,使用的ChannelOption来封装,其思路也是类似的,只是还做了一些其它处理,比如定义了一个常量池来管理Constant,其内部也是通过Map来进行维护,这样相比直接用static final定义的好处,是可以通过string获取Constant,再获取对应的值

io.netty.util.ConstantPool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public abstract class ConstantPool<T extends Constant<T>> {

private final ConcurrentMap<String, T> constants = PlatformDependent.newConcurrentHashMap();

public T valueOf(String name) {
return getOrCreate(checkNonEmpty(name, "name"));
}

private T getOrCreate(String name) { // 线程安全考虑,如果已经存在就直接使用,丢弃本次创建的
T constant = constants.get(name);
if (constant == null) {
final T tempConstant = newConstant(nextId(), name);
constant = constants.putIfAbsent(name, tempConstant);
if (constant == null) {
return tempConstant;
}
}
return constant;
}

protected abstract T newConstant(int id, String name);

// ... ...
}
1.2. ChannelOption

下面看一下具体Channel中支持的一些配置项

  • ChannelConfig中支持一些通用的ChannelOption
1
2
3
4
5
6
7
8
9
10
11
ChannelOption.CONNECT_TIMEOUT_MILLIS
ChannelOption.WRITE_SPIN_COUNT
ChannelOption.ALLOCATOR
ChannelOption.AUTO_READ
ChannelOption.MAX_MESSAGES_PER_READ
ChannelOption.RCVBUF_ALLOCATOR
ChannelOption.ALLOCATOR
ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
ChannelOption.MESSAGE_SIZE_ESTIMATOR
ChannelOption.AUTO_CLOSE
  • SocketChannelConfig在ChannelConfig的基础上支持了一些额外的ChannelOption
1
2
3
4
5
6
7
8
ChannelOption.SO_KEEPALIVE
ChannelOption.SO_REUSEADDR
ChannelOption.SO_LINGER
ChannelOption.TCP_NODELAY
ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF
ChannelOption.IP_TOS
ChannelOption.ALLOW_HALF_CLOSURE
  • ServerSocketChannelConfig又在ChannelConfig基础上支持了一些额外的ChannelOption
1
2
3
ChannelOption.SO_REUSEADDR
ChannelOption.SO_RCVBUF
ChannelOption.SO_BACKLOG

其中SocketChannelConfigServerSocketChannelConfig中支持的ChannelOption基本上与TCP连接参数相关,也就是每个ChannelOption与 java.net.StandardSocketOptions 中定义标准TCP参数相对应

下面看下这些参数的含义:

1
ChannelOption.SO_KEEPALIVE // 对应StandardSocketOptions.SO_KEEPALIVE 是否启用心跳机制,默认false

套接字本身也有一套心跳保活机制的,在双方TCP建立连接后(进入ESTABLISHED状态),并且在2小时(可以通过 SO_KEEPALIVE 选项设置)左右上层没有任何数据传输的情况下,这套机制便会被激活。

但这套机制只是操作系统底层使用的一个被动机制,理论上不应该被上层应用层使用。当系统关闭一个由 KEEPALIVE 机制检查出来的死连接时,是不会主动通知上层应用的,应用只有在调用相应的IO操作时才能检查出来,所以,应用层最好还是自己实现一套保活机制比较靠谱

在《UNIX网络编程第1卷》中也有阐述:

SO_KEEPALIVE 用于检测对方主机是否崩溃,避免(服务器)永远阻塞于TCP连接的输入。设置该选项后,如果2小时内在此套接口的任一方向都没有数据交换,TCP就自动给对方发一个保持存活探测分节(keepalive probe)。这是一个对方必须响应的TCP分节.它会导致以下三种情况:
1.对方接收一切正常:以期望的ACK响应。2小时后,TCP将发出另一个探测分节;
2.对方已崩溃且已重新启动:以RST响应。套接口的待处理错误被置为ECONNRESET,套接口本身则被关闭;
3.对方无任何响应:源自berkeley的TCP发送另外8个探测分节,相隔75秒一个,试图得到一个响应。在发出第一个探测分节11分钟15秒后若仍无响应就放弃,套接口的待处理错误被置为ETIMEOUT,套接口本身则被关闭。如ICMP错误是“host unreachable(主机不可达)”,说明对方主机并没有崩溃,但是不可达,这种情况下待处理错误被置为 EHOSTUNREACH。

1
ChannelOption.SO_REUSEADDR // 对应StandardSocketOptions.SO_REUSEADDR 是否重用处于TIME_WAIT状态的地址,默认false
1
ChannelOption.SO_LINGER // 对应StandardSocketOptions.SO_LINGER 套接字策略

当调用 closesocket 关闭套接字时,SO_LINGER 将决定系统如何处理残存在套接字发送队列中的数据。处理方式无非两种:丢弃,或者将数据继续发送至对端。事实上,SO_LINGER并不被推荐使用,大多数情况下推荐使用默认的关闭方式。SO_LINGER 以秒为单位,最大取值为65535,也就是说,如果在指定时间内残余数据尚未发送完成,那么也立即关闭。

1
ChannelOption.SO_SNDBUF // 对应StandardSocketOptions.SO_SNDBUF 发送缓冲区的大小,默认8K
1
ChannelOption.SO_RCVBUF // 对应StandardSocketOptions.SO_RCVBUF 接收缓冲区的大小,默认8K
1
ChannelOption.TCP_NODELAY // 对应StandardSocketOptions.TCP_NODELAY 是否一有数据就马上发送,默认为false

在TCP/IP协议中,无论发送多少数据,总是要在数据前面加上协议头,同时,对方接收到数据,也需要发送ACK表示确认。为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据,以提供发送包中数据的比例。

TCP_NODELAY用于是否开启Nagle算法,其目的就是为了尽可能发送大块数据,避免网络中充斥着许多小数据包。如果要求高实时性,一有数据就马上发送,那么可以置为true关闭Nagle算法,而如果要减少发送次数以减少网络交互,则可以置为false,等累积一定大小后再发。

1
ChannelOption.IP_TOS // 对应StandardSocketOptions.IP_TOS
1
ChannelOption.SO_BACKLOG // 对应StandardSocketOptions.SO_BACKLOG

BACKLOG用于构造服务端套接字ServerSocket,标识当服务器请求处理线程全满时,用于临时存放已完成TCP连接三次握手的请求的队列最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。在Netty中,默认从文件/proc/sys/net/core/somaxconn中读取,如果没有读到,默认取3072。

2. ChannelHandler

对于Channel的I/O事件处理,Netty中抽象出了一个ChannelHandler接口,并且针对输入和输出,分成了ChannelInboundHandlerChannelOutboundHandler,其继承结构如下

在实现输入输出事件处理器时,可以直接继承适配器ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter

2.1. ChannelPipeline

通常对于IO事件的处理,会分成几个阶段,每个阶段执行不同的操作,比如处理TCP拆包/粘包问题、编解码、以及业务处理,因此通常需要编写多个ChannelHandler,并以特定的顺序进行组织。

于是,Netty中定义了管道的概念,通过ChannelPipeline来定义ChannelHandler之间的处理顺序,每个 Channel 对象创建的时候,都会初始化一个私有的管道,可以通过pipeline()方法获取。

io.netty.channel.AbstractChannel
1
2
3
4
5
6
7
8
9
10
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
  • Netty服务端的创建:
1
2
3
4
5
6
7
8
9
10
11
12
13
ServerBootstrap serverBoot = new ServerBootstrap();
serverBoot.group(acceptGroup, handleGroup).channel(NioServerSocketChannel.class);
serverBoot.option(ChannelOption.SO_BACKLOG, 1024);
serverBoot.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast("name1", new InboundHandlerA());
channel.pipeline().addLast("name2", new InboundHandlerB());
channel.pipeline().addLast("name3", new OutboundHandlerA());
channel.pipeline().addLast("name4", new OutboundHandlerB());
channel.pipeline().addLast("name5", new InOutboundHandlerX());
}
});

上述代码表示服务在收到一个SocketChannel的时候,通过initChannel()来对其进行设置,这里就是获取其管道并向其中添加了一些ChannelHander,而添加的顺序也就定义了在收到读/写事件时调用handler处理的顺序。要注意的是,当处理输入事件时,输出事件处理器是不会发生作用的,反之亦然。另外,输入处理器的调用顺序是正序的,而输出处理的调用则相反

2.2. ChannelHandlerContext

管道ChannelPipeline中负责维护ChannelHander的顺序,最简单的方式是直接丢到一个List中便可,但这里其实是通过链表的形式,首先将其包装成一个ChannelHandlerContext

io.netty.channel.DefaultChannelPipeline
1
2
3
4
5
6
7
8
9
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name); // 每种类型的handler实例只允许添加一次
AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
addLast0(name, newCtx);
}
return this;
}

ChannelHandlerContext中会记住前后节点,包装一层节点的另一个好处是在调用Handler处理的前后还可以做一些额外代理操作

io.netty.channel.AbstractChannelHandlerContext
1
2
3
4
5
6
7
8
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {

volatile AbstractChannelHandlerContext next; // 上一个节点
volatile AbstractChannelHandlerContext prev; // 下一个节点

// ... ...
}

DefaultChannelPipeline中会记住链表的哨兵节点,并负责对链表进行维护

io.netty.channel.DefaultChannelPipeline
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class DefaultChannelPipeline implements ChannelPipeline {

// ... ...

final AbstractChannelHandlerContext head; // 链表头节点
final AbstractChannelHandlerContext tail; // 链表尾节点

// ... ...

private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
}

ChannelHander、ChannelPipeline、ChannelHandlerContext的调用过程:

以输入事件为例,可以看到ChannelInboundHandler中定义了9个事件处理

io.netty.channel.ChannelInboundHandler
1
2
3
4
5
6
7
8
9
10
11
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

然后在ChannelPipelineChannelHandlerContext中,也可以看到与之对应的9个以fire开头的方法

很明显这是一个责任链模式的应用,即首先ChannelPipeline某个事件被调用,然后它依次调用链表中ChannelHandlerContext对应的事件,进而再调用ChannelHander对应的事件,直至整个链表调用完为止。

下面看下具体事件被调用的时机:

  • fireChannelRegistered()在注册时调用,如果通道激活了并且是首次注册则调用fireChannelActive()
io.netty.channel.AbstractChannel#AbstractUnsafe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void register0(ChannelPromise promise) {
try {
// ... ...
boolean firstRegistration = neverRegistered; // channel是否首次注册

// ... ...
pipeline.fireChannelRegistered(); // 回调 ChannelRegistered
if (isActive()) { // 对客户端就是连接成功了isConnected(),服务端则是地址绑定成功了isBound()
if (firstRegistration) {
pipeline.fireChannelActive(); // 只在首次注册时回调事件ChannelActive
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// ... ...
}
}
  • 类似的,fireChannelInactive()fireChannelUnregistered()则在取消注册时进行调用

  • fireChannelRead(Object msg)fireChannelReadComplete()在数据需要读取时被触发

io.netty.channel.nio.AbstractNioByteChannel#NioByteUnsafe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public void read() {
// ... ...
try {
int totalReadAmount = 0;
boolean readPendingReset = false;
do {
// ... ...
pipeline.fireChannelRead(byteBuf); //有数据要读取,调用fireChannelRead
// ... ...
} while (++ messages < maxMessagesPerRead);

pipeline.fireChannelReadComplete(); //数据读取完成,调用fireChannelReadComplete()
// ... ...
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
} finally {
// ... ...
}
}
  • fireExceptionCaught()在读取数据出错时被调用
io.netty.channel.nio.AbstractNioByteChannel#NioByteUnsafe
1
2
3
4
5
6
7
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
// ... ...
pipeline.fireExceptionCaught(cause); // 调用fireExceptionCaught
if (close || cause instanceof IOException) {
closeOnRead(pipeline);
}
}
  • fireUserEventTriggered(Object event)当正在读取数据时,如果连接关闭,则调用此方法

上述异常处理时,会判断异常类型是否是IOException,或者连接是否关闭。如果是,则调用closeOnRead方法,这个方法内部会调用fireUserEventTriggered(Object event)

io.netty.channel.nio.AbstractNioByteChannel#NioByteUnsafe
1
2
3
4
5
6
7
8
9
10
11
12
private void closeOnRead(ChannelPipeline pipeline) {
SelectionKey key = selectionKey();
setInputShutdown();
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
key.interestOps(key.interestOps() & ~readInterestOp);
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); // 调用fireUserEventTriggered方法
} else {
close(voidPromise());
}
}
}
  • fireChannelWritabilityChanged()当有数据需要输出的时候被调用
io.netty.channel.ChannelOutboundBuffer
1
2
3
4
5
6
7
8
9
void incrementPendingOutboundBytes(int size) {
// ... ...
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) {
channel.pipeline().fireChannelWritabilityChanged(); // 需要输出数据,调用fireChannelWritabilityChanged()
}
}
}


参考:

  1. http://www.tianshouzhi.com/api/tutorials/netty/338
  2. http://www.tianshouzhi.com/api/tutorials/netty/339
  3. http://www.tianshouzhi.com/api/tutorials/netty/340