NIO Channel & Selector
words: 2.2k views: time: 8minChannel 与 Stream 是一个级别,只不过 Stream 是单向的,比如InputStream
或OutputStream
,而 Channel 可以是单向的,也可以是双向的,既可以同时支持读和写操作,具体由其实现的接口所决定
1 | public interface ReadableByteChannel extends Channel { |
可以看到 read 和 write 方法接受的都是一个 ByteBuffer 参数,其中Channel.read是向ByteBuffer中put数据,然后应用从ByteBuffer中获取,而Channel.write是从ByteBuffer中get数据,然后发送给其他远程主机。两者均返回字节数,然后缓冲区的position位置也会前移对应的字节数,如果只进行了部分传输,缓冲区可以被重新提交给通道并从上次中断的地方继续传输,该过程可以重复进行直到缓冲区的hasRemaining()
方法返回false
1. Socket Channel
Channel的类继承机构如下图所示,类似于I/O的分类,通道也可以分为文件file通道,和套接字socket通道
在socket通道类中,DatagramChannel和SocketChannel实现了读写接口,而ServerSocketChannel只负责监听传入的连接和创建新的SocketChannel对象,本身不传输数据。
它们内部都依赖一个socket对象,即Socket、ServerSocket或DatagramSocket,可以通过socket()
方法获取。反过来socket也可以通过getChannel()
获取其对应的通道,但前提是这个socket对象必须是由通道类创建的,而不是直接new的。
1.1. ServerSocketChannel
ServerSocketChannel是一个基于通道的socket监听器,其任务与ServerSocket相同,只是增加了通道语义,能够在非阻塞模式下运行
静态工厂open()
可以负责创建一个新的实例,但是它没有绑定端口,需要获取对应的ServerSocket再进行端口绑定以及一些其它设置
1 | ServerSocketChannel serverChannel = ServerSocketChannel.open(); |
如果在ServerSocket上调用accept()
,将会阻塞直到返回一个Socket对象,而在ServerSocketChannel上调用accept()
,则会返回一个SocketChannel,但它支持非阻塞模式运行,如果没有连接接入,accept()
会立即返回null。
1.2. SocketChannel
SocketChannel作用与Socket一样,用于封装点对点、有序的网络连接。它负责发起一个到监听服务的连接,直到连接成功或失败。
1 | SocketChannel socketChannel = SocketChannel.open(); |
类似的,可以通过静态工厂open()
创建实例,其内部与一个Socket关联,同时提供了非阻塞创建连接的能力,也就是不等连接建立完成就立即返回。如果没有立即完成,可以将其注册到选择器上,然后等连接建立成功了再进行处理。
2. Selector
Selector依赖操作系统提供的一个轮询服务接口,用于对多个注册的连接进行监听。它是NIO能提高服务并发能力的关键,使得应用可以用一个线程来监听多个Socket连接的事件,比如连接建立,数据到达等,当发现有Socket事件就绪之后再返回应用线程,然后应用线程可以拿到对应的SocketChannel后再进行处理。
2.1. open
通过静态工厂方法open()
可以创建Selector实例,其内部通过默认的SelectorProvider获取一个新的实例
1 | Selector selector = Selector.open( ); |
2.2. register
注册操作由通道负责定义,因为它是通道的行为。由于通道支持非阻塞模式,那么就需要一种方式来知道通道在什么时候操作就绪,而方式就是将自己其注册到Selector上。
1 | public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException; |
要注意的是,在注册到Selector上时,通道必须处于非阻塞模式,否则将抛出IllegalBlockingModeException异常,并且一旦被注册,就不能再回到阻塞模式。
一个通道可以被注册到多个选择器上,但对每个选择器而言,最好只注册一次。如果一个Selector上多次注册同一个Channel,返回的SelectionKey将是同一个实例,即后面注册的感兴趣的操作类型会覆盖之前的。
2.3. SelectionKey
具体在通道注册时可以指定感兴趣的事件,并且可以绑定一个对象,然后会将这些信息封装在SelectionKey中,并由register()
返回
SelectonKey中预定义了4种事件,对应的也提供了方法来检测事件是否就绪,其实现就是一个简单的与运算
1 | public static final int OP_READ = 1 << 0; // 读 |
同时,SelectonKey也提供了一些方法来获取注册时相关的信息,比如通道或事件
1 | public abstract SelectableChannel channel(); // 获取SelectionKey关联的channel |
另外,SelectonKey还提供了一些方法来修改注册时相关的信息,比如感兴趣的事件和绑定对象。Netty中在注册通道时就分成了两步,首先注册时将事件指定为0,即对任何事件都不感兴趣,然后成功之后在通过interestOps
设置感兴趣的事件。
1 | public abstract SelectionKey interestOps(int ops); |
2.4. select
Selector的核心在于其提供的选择方法,可以概括下其思路:
- 通道各自将自己注册到Selector上,并声明感兴趣的事件;
- 然后调用Selector的select方法,当有通道就绪后select会返回,并将就绪的通道放入一个集合中;
- 在select返回后,便可以从集合中获取SelectonKey,然后就能获取对应的通道进行处理;
对于具体的选择操作,Selector提供了三种方式:
1 | public abstract int select() throws IOException; // 阻塞直到有通道准备就绪 |
其实,Selector中维护了三个键SelectonKey集合
已注册的键集合(Registered):已经注册的键集合,并不是所有注册过的键都仍然有效。可以通过keys()
方法返回,但这个集合不可以直接修改,试图这么做的话会抛出UnsupportedOperationException异常。
已选择的键的集合(Selected):已注册并且已就绪的键集合,通过selectedKeys()
返回,其中每个键对应的通道都被选择器判断为就绪。
已取消的键的集合(Cancelled):已注册并且调用过cancel()
的键集合,这些键已经无效,但它们还没有被注销。该集合为选择器私有。
选择是累积的,一旦选择器将一个键添加到它的已选择的键集合中,它就不会再移除这个键。并且,一旦一个键处于已选择的键集合中,这个键的 readyOPS 将只会被设置,而不会被清理。这样就将键的管理操作交给了开发人员,比如移除那些已处理过的键,或者重新注册。
- 唤醒选择
对于阻塞在select()
上的选择操作,有三种唤醒方式
wakeup():使得选择器上的第一个还没有返回的选择操作立即返回,如果当前没有在进行中的选择,那么下一次对select()方法的调用将立即返回,但之后的选择操作则正常进行。
close():使得所有在选择操作中阻塞的线程都被唤醒,就像wakeup()方法被调用了一样。另外,与选择器相关的通道将会被注销,而键将被取消,注意这里并没有说会关闭通道连接。
interrupt():即中断线程的语义,这里不再赘述。
- 线程安全
要注意的是,选择器是线程安全的,但其包含的键集合并不是
1 | protected Set<SelectionKey> selectedKeys = new HashSet(); // sun.nio.ch.SelectorImpl |
参考: