Netty 中的通道 Channel 是对Java原生网络编程Api的封装,以TCP编程为例 ,在java中,有两种常用方式:
- 基于BIO,在JDK1.4之前,可以使用 java.net 包中的
ServerSocket
/Socket
来代表服务端和客户端;
- 基于NIO,在Jdk1.4引入Nio之后,可以使用 java.nio.channels 包中的
ServerSocketChannel
/SocketChannel
来代表服务端与客户端;
在Netty中,分别进行了封装:
- 使用
OioServerSocketChannel
/OioSocketChannel
对ServerSocket
/Socket
进行了封装;
- 使用
NioServerSocketChannel
/NioSocketChannel
对ServerSocketChannel
/SocketChannel
进行了封装;
封装主要有两个方面,首先通过ChannelConfig
来封装与Channel相关的配置,然后将对Channel的操作抽象成一个接口ChannelHander
,并通过链表的形式来进行组织,以便定义操作的顺序,本文尝试从这两个角度对Netty中的Channel进行一些分析。
另外,在分析之前,最好先对Linux I/O模型,以及Java中原生Nio的相关Api有所了解,具体可以参考相关笔记:
继承结构

对于BIO相关的类这里不再赘述,主要说下NioServerSocketChannel
/NioSocketChannel
,它们都继承自AbstractNioChannel
,其维护了与java Nio中Channel的对应关系,并提供了获取方法
io.netty.channel.nio.AbstractNioChannel1 2 3
| protected SelectableChannel javaChannel() { return ch; }
|
这里声明的返回类型是SelectableChannel
,不过子类实现中会返回具体的SocketChannel
或ServerSocketChannel
io.netty.channel.socket.nio.NioServerSocketChannel1 2 3 4
| @Override protected ServerSocketChannel javaChannel() { return (ServerSocketChannel) super.javaChannel(); }
|
io.netty.channel.socket.nio.NioSocketChannel1 2 3 4
| @Override protected SocketChannel javaChannel() { return (SocketChannel) super.javaChannel(); }
|
1. ChannelConfig
ChannelConfig
只是一个配置接口,其实现可以理解成一个配置集,内部维护了一个Map,然后定义了一些对配置的操作方法
io.netty.channel.ChannelConfig1 2 3 4 5 6 7
| public interface ChannelConfig { Map<ChannelOption<?>, Object> getOptions(); boolean setOptions(Map<ChannelOption<?>, ?> options); <T> T getOption(ChannelOption<T> option); <T> boolean setOption(ChannelOption<T> option, T value); }
|
Channel
中提供了配置集ChannelConfig的获取方法
io.netty.channel.Channel
并在创建时,负责对ChannelConfig进行初始化,比如
io.netty.channel.socket.nio.NioServerSocketChannel1 2 3 4
| public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
|
io.netty.channel.socket.nio.NioSocketChannel1 2 3 4
| public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); }
|
可以看到,这里在初始化时传入了一个Socket
,因为Netty只是对java网络编程Api的一层封装,因此最终对配置的设置还是要应用到原生的Socket对象上才能生效。比如:
io.netty.channel.socket.DefaultServerSocketChannelConfig1 2 3 4 5 6 7 8 9
| @Override public SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) { try { javaSocket.setTcpNoDelay(tcpNoDelay); } catch (SocketException e) { throw new ChannelException(e); } return this; }
|
1.1. Constant
通常对于Map中维护的配置项,都会预定义一些常量key,最直接的方式是定义一些常量String,比如
1 2 3 4 5
| public class Constant { public static final String CONFIG1 ="config1"; public static final String CONFIG2 ="config2"; }
|
但这样有一个问题,就是使用者无法知道应该给每个配置项设置什么类型的值,所以好一点的方式是通过泛型类来对常量key做一层包装,通过泛型来限制配置值的类型,顺便可以定义一些操作,比如值校验
https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/netty/constant/Constant.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class Constant<T> {
public static final Constant<Integer> CONFIG1 = new Constant<>("config1"); public static final Constant<String> CONFIG2 = new Constant<>("config2");
private final String name; public Constant(String name){ this.name = name; } public String name(){ return name; } public boolean valid(T value) { return true; } }
|
然后在实现配置管理类的时候,就可以在get
/put
中根据key实例指定的参数类型来对值的类型进行限制了
https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/netty/constant/XXXConfig.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class XXXConfig { @SuppressWarnings("rawtypes") private Map<Constant, Object> map = new HashMap<>(); public <T> void put(Constant<T> constant, T t){ constant.valid(t); map.put(constant, t); } @SuppressWarnings("unchecked") public <T> T get(Constant<T> constant){ return (T)map.get(constant); } }
|
比如以下示例,对于配置config1
的设置或取值,只能是int类型
https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/netty/constant/_Main.java1 2 3
| XXXConfig configs = new XXXConfig(); configs.put(Constant.CONFIG1, 123); System.out.println(configs.get(Constant.CONFIG1));
|
在Netty,对于Channel的配置项,使用的ChannelOption
来封装,其思路也是类似的,只是还做了一些其它处理,比如定义了一个常量池来管理Constant,其内部也是通过Map来进行维护,这样相比直接用static final定义的好处,是可以通过string获取Constant,再获取对应的值
io.netty.util.ConstantPool1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public abstract class ConstantPool<T extends Constant<T>> {
private final ConcurrentMap<String, T> constants = PlatformDependent.newConcurrentHashMap(); public T valueOf(String name) { return getOrCreate(checkNonEmpty(name, "name")); } private T getOrCreate(String name) { T constant = constants.get(name); if (constant == null) { final T tempConstant = newConstant(nextId(), name); constant = constants.putIfAbsent(name, tempConstant); if (constant == null) { return tempConstant; } } return constant; } protected abstract T newConstant(int id, String name); }
|
1.2. ChannelOption
下面看一下具体Channel中支持的一些配置项
- ChannelConfig中支持一些通用的ChannelOption
1 2 3 4 5 6 7 8 9 10 11
| ChannelOption.CONNECT_TIMEOUT_MILLIS ChannelOption.WRITE_SPIN_COUNT ChannelOption.ALLOCATOR ChannelOption.AUTO_READ ChannelOption.MAX_MESSAGES_PER_READ ChannelOption.RCVBUF_ALLOCATOR ChannelOption.ALLOCATOR ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK ChannelOption.WRITE_BUFFER_LOW_WATER_MARK ChannelOption.MESSAGE_SIZE_ESTIMATOR ChannelOption.AUTO_CLOSE
|
- SocketChannelConfig在ChannelConfig的基础上支持了一些额外的ChannelOption
1 2 3 4 5 6 7 8
| ChannelOption.SO_KEEPALIVE ChannelOption.SO_REUSEADDR ChannelOption.SO_LINGER ChannelOption.TCP_NODELAY ChannelOption.SO_RCVBUF ChannelOption.SO_SNDBUF ChannelOption.IP_TOS ChannelOption.ALLOW_HALF_CLOSURE
|
- ServerSocketChannelConfig又在ChannelConfig基础上支持了一些额外的ChannelOption
1 2 3
| ChannelOption.SO_REUSEADDR ChannelOption.SO_RCVBUF ChannelOption.SO_BACKLOG
|
其中SocketChannelConfig
和ServerSocketChannelConfig
中支持的ChannelOption
基本上与TCP连接参数相关,也就是每个ChannelOption
与 java.net.StandardSocketOptions 中定义标准TCP参数相对应
下面看下这些参数的含义:
1
| ChannelOption.SO_KEEPALIVE
|
套接字本身也有一套心跳保活机制的,在双方TCP建立连接后(进入ESTABLISHED状态),并且在2小时(可以通过 SO_KEEPALIVE 选项设置)左右上层没有任何数据传输的情况下,这套机制便会被激活。
但这套机制只是操作系统底层使用的一个被动机制,理论上不应该被上层应用层使用。当系统关闭一个由 KEEPALIVE 机制检查出来的死连接时,是不会主动通知上层应用的,应用只有在调用相应的IO操作时才能检查出来,所以,应用层最好还是自己实现一套保活机制比较靠谱。
在《UNIX网络编程第1卷》中也有阐述:
SO_KEEPALIVE 用于检测对方主机是否崩溃,避免(服务器)永远阻塞于TCP连接的输入。设置该选项后,如果2小时内在此套接口的任一方向都没有数据交换,TCP就自动给对方发一个保持存活探测分节(keepalive probe)。这是一个对方必须响应的TCP分节.它会导致以下三种情况:
1.对方接收一切正常:以期望的ACK响应。2小时后,TCP将发出另一个探测分节;
2.对方已崩溃且已重新启动:以RST响应。套接口的待处理错误被置为ECONNRESET,套接口本身则被关闭;
3.对方无任何响应:源自berkeley的TCP发送另外8个探测分节,相隔75秒一个,试图得到一个响应。在发出第一个探测分节11分钟15秒后若仍无响应就放弃,套接口的待处理错误被置为ETIMEOUT,套接口本身则被关闭。如ICMP错误是“host unreachable(主机不可达)”,说明对方主机并没有崩溃,但是不可达,这种情况下待处理错误被置为 EHOSTUNREACH。
1
| ChannelOption.SO_REUSEADDR
|
当调用 closesocket 关闭套接字时,SO_LINGER 将决定系统如何处理残存在套接字发送队列中的数据。处理方式无非两种:丢弃,或者将数据继续发送至对端。事实上,SO_LINGER并不被推荐使用,大多数情况下推荐使用默认的关闭方式。SO_LINGER 以秒为单位,最大取值为65535,也就是说,如果在指定时间内残余数据尚未发送完成,那么也立即关闭。
1
| ChannelOption.TCP_NODELAY
|
在TCP/IP协议中,无论发送多少数据,总是要在数据前面加上协议头,同时,对方接收到数据,也需要发送ACK表示确认。为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据,以提供发送包中数据的比例。
TCP_NODELAY用于是否开启Nagle算法,其目的就是为了尽可能发送大块数据,避免网络中充斥着许多小数据包。如果要求高实时性,一有数据就马上发送,那么可以置为true关闭Nagle算法,而如果要减少发送次数以减少网络交互,则可以置为false,等累积一定大小后再发。
1
| ChannelOption.SO_BACKLOG
|
BACKLOG用于构造服务端套接字ServerSocket,标识当服务器请求处理线程全满时,用于临时存放已完成TCP连接三次握手的请求的队列最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。在Netty中,默认从文件/proc/sys/net/core/somaxconn中读取,如果没有读到,默认取3072。
2. ChannelHandler
对于Channel
的I/O事件处理,Netty中抽象出了一个ChannelHandler
接口,并且针对输入和输出,分成了ChannelInboundHandler
和ChannelOutboundHandler
,其继承结构如下

在实现输入输出事件处理器时,可以直接继承适配器ChannelInboundHandlerAdapter
或ChannelOutboundHandlerAdapter
2.1. ChannelPipeline
通常对于IO事件的处理,会分成几个阶段,每个阶段执行不同的操作,比如处理TCP拆包/粘包问题、编解码、以及业务处理,因此通常需要编写多个ChannelHandler
,并以特定的顺序进行组织。
于是,Netty中定义了管道的概念,通过ChannelPipeline
来定义ChannelHandler
之间的处理顺序,每个 Channel 对象创建的时候,都会初始化一个私有的管道,可以通过pipeline()
方法获取。
io.netty.channel.AbstractChannel1 2 3 4 5 6 7 8 9 10
| protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| ServerBootstrap serverBoot = new ServerBootstrap(); serverBoot.group(acceptGroup, handleGroup).channel(NioServerSocketChannel.class); serverBoot.option(ChannelOption.SO_BACKLOG, 1024); serverBoot.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast("name1", new InboundHandlerA()); channel.pipeline().addLast("name2", new InboundHandlerB()); channel.pipeline().addLast("name3", new OutboundHandlerA()); channel.pipeline().addLast("name4", new OutboundHandlerB()); channel.pipeline().addLast("name5", new InOutboundHandlerX()); } });
|
上述代码表示服务在收到一个SocketChannel
的时候,通过initChannel()
来对其进行设置,这里就是获取其管道并向其中添加了一些ChannelHander
,而添加的顺序也就定义了在收到读/写事件时调用handler处理的顺序。要注意的是,当处理输入事件时,输出事件处理器是不会发生作用的,反之亦然。另外,输入处理器的调用顺序是正序的,而输出处理的调用则相反。
2.2. ChannelHandlerContext
管道ChannelPipeline
中负责维护ChannelHander
的顺序,最简单的方式是直接丢到一个List
中便可,但这里其实是通过链表的形式,首先将其包装成一个ChannelHandlerContext
io.netty.channel.DefaultChannelPipeline1 2 3 4 5 6 7 8 9
| @Override public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) { synchronized (this) { checkDuplicateName(name); AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); addLast0(name, newCtx); } return this; }
|
ChannelHandlerContext
中会记住前后节点,包装一层节点的另一个好处是在调用Handler处理的前后还可以做一些额外代理操作
io.netty.channel.AbstractChannelHandlerContext1 2 3 4 5 6 7 8
| abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev; }
|
DefaultChannelPipeline
中会记住链表的哨兵节点,并负责对链表进行维护
io.netty.channel.DefaultChannelPipeline1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail;
private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; } }
|
ChannelHander、ChannelPipeline、ChannelHandlerContext的调用过程:
以输入事件为例,可以看到ChannelInboundHandler
中定义了9个事件处理
io.netty.channel.ChannelInboundHandler1 2 3 4 5 6 7 8 9 10 11
| public interface ChannelInboundHandler extends ChannelHandler { void channelRegistered(ChannelHandlerContext ctx) throws Exception; void channelUnregistered(ChannelHandlerContext ctx) throws Exception; void channelActive(ChannelHandlerContext ctx) throws Exception; void channelInactive(ChannelHandlerContext ctx) throws Exception; void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception; void channelReadComplete(ChannelHandlerContext ctx) throws Exception; void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception; void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; }
|
然后在ChannelPipeline
和ChannelHandlerContext
中,也可以看到与之对应的9个以fire开头的方法


很明显这是一个责任链模式的应用,即首先ChannelPipeline
某个事件被调用,然后它依次调用链表中ChannelHandlerContext
对应的事件,进而再调用ChannelHander
对应的事件,直至整个链表调用完为止。
下面看下具体事件被调用的时机:
fireChannelRegistered()
在注册时调用,如果通道激活了并且是首次注册则调用fireChannelActive()
io.netty.channel.AbstractChannel#AbstractUnsafe1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| private void register0(ChannelPromise promise) { try { boolean firstRegistration = neverRegistered; pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { } }
|
io.netty.channel.nio.AbstractNioByteChannel#NioByteUnsafe1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Override public void read() { try { int totalReadAmount = 0; boolean readPendingReset = false; do { pipeline.fireChannelRead(byteBuf); } while (++ messages < maxMessagesPerRead); pipeline.fireChannelReadComplete(); } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } finally { } }
|
fireExceptionCaught()
在读取数据出错时被调用
io.netty.channel.nio.AbstractNioByteChannel#NioByteUnsafe1 2 3 4 5 6 7
| private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { pipeline.fireExceptionCaught(cause); if (close || cause instanceof IOException) { closeOnRead(pipeline); } }
|
fireUserEventTriggered(Object event)
当正在读取数据时,如果连接关闭,则调用此方法
上述异常处理时,会判断异常类型是否是IOException,或者连接是否关闭。如果是,则调用closeOnRead方法,这个方法内部会调用fireUserEventTriggered(Object event)
io.netty.channel.nio.AbstractNioByteChannel#NioByteUnsafe1 2 3 4 5 6 7 8 9 10 11 12
| private void closeOnRead(ChannelPipeline pipeline) { SelectionKey key = selectionKey(); setInputShutdown(); if (isOpen()) { if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { key.interestOps(key.interestOps() & ~readInterestOp); pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); } else { close(voidPromise()); } } }
|
fireChannelWritabilityChanged()
当有数据需要输出的时候被调用
io.netty.channel.ChannelOutboundBuffer1 2 3 4 5 6 7 8 9
| void incrementPendingOutboundBytes(int size) { long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) { channel.pipeline().fireChannelWritabilityChanged(); } } }
|
参考:
- http://www.tianshouzhi.com/api/tutorials/netty/338
- http://www.tianshouzhi.com/api/tutorials/netty/339
- http://www.tianshouzhi.com/api/tutorials/netty/340