NIO Channel & Selector

words: 2.2k    views:    time: 8min
I/O


Channel 与 Stream 是一个级别,只不过 Stream 是单向的,比如InputStreamOutputStream,而 Channel 可以是单向的,也可以是双向的,既可以同时支持读和写操作,具体由其实现的接口所决定

java.nio.channels
1
2
3
4
5
6
7
public interface ReadableByteChannel extends Channel {
public int read(ByteBuffer dst) throws IOException;
}

public interface WritableByteChannel extends Channel{
public int write(ByteBuffer src) throws IOException;
}

可以看到 read 和 write 方法接受的都是一个 ByteBuffer 参数,其中Channel.read是向ByteBuffer中put数据,然后应用从ByteBuffer中获取,而Channel.write是从ByteBuffer中get数据,然后发送给其他远程主机。两者均返回字节数,然后缓冲区的position位置也会前移对应的字节数,如果只进行了部分传输,缓冲区可以被重新提交给通道并从上次中断的地方继续传输,该过程可以重复进行直到缓冲区的hasRemaining()方法返回false

1. Socket Channel

Channel的类继承机构如下图所示,类似于I/O的分类,通道也可以分为文件file通道,和套接字socket通道

在socket通道类中,DatagramChannel和SocketChannel实现了读写接口,而ServerSocketChannel只负责监听传入的连接和创建新的SocketChannel对象,本身不传输数据。

它们内部都依赖一个socket对象,即Socket、ServerSocket或DatagramSocket,可以通过socket()方法获取。反过来socket也可以通过getChannel()获取其对应的通道,但前提是这个socket对象必须是由通道类创建的,而不是直接new的。

1.1. ServerSocketChannel

ServerSocketChannel是一个基于通道的socket监听器,其任务与ServerSocket相同,只是增加了通道语义,能够在非阻塞模式下运行

静态工厂open()可以负责创建一个新的实例,但是它没有绑定端口,需要获取对应的ServerSocket再进行端口绑定以及一些其它设置

1
2
3
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverChannel.socket();
serverSocket.bind(new InetSocketAddress(port));

如果在ServerSocket上调用accept(),将会阻塞直到返回一个Socket对象,而在ServerSocketChannel上调用accept(),则会返回一个SocketChannel,但它支持非阻塞模式运行,如果没有连接接入,accept()会立即返回null。

1.2. SocketChannel

SocketChannel作用与Socket一样,用于封装点对点、有序的网络连接。它负责发起一个到监听服务的连接,直到连接成功或失败。

1
2
3
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(true); // 默认
socketChannel.connect(new InetSocketAddress(host, port));

类似的,可以通过静态工厂open()创建实例,其内部与一个Socket关联,同时提供了非阻塞创建连接的能力,也就是不等连接建立完成就立即返回。如果没有立即完成,可以将其注册到选择器上,然后等连接建立成功了再进行处理。

2. Selector

Selector依赖操作系统提供的一个轮询服务接口,用于对多个注册的连接进行监听。它是NIO能提高服务并发能力的关键,使得应用可以用一个线程来监听多个Socket连接的事件,比如连接建立,数据到达等,当发现有Socket事件就绪之后再返回应用线程,然后应用线程可以拿到对应的SocketChannel后再进行处理。

2.1. open

通过静态工厂方法open()可以创建Selector实例,其内部通过默认的SelectorProvider获取一个新的实例

1
2
3
4
5
Selector selector = Selector.open( );

// 等价于
// SelectorProvider provider = SelectorProvider.provider();
// Selector abstractSelector = provider.openSelector();

2.2. register

注册操作由通道负责定义,因为它是通道的行为。由于通道支持非阻塞模式,那么就需要一种方式来知道通道在什么时候操作就绪,而方式就是将自己其注册到Selector上。

java.nio.channelsSelectableChannel
1
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;

要注意的是,在注册到Selector上时,通道必须处于非阻塞模式,否则将抛出IllegalBlockingModeException异常,并且一旦被注册,就不能再回到阻塞模式。

一个通道可以被注册到多个选择器上,但对每个选择器而言,最好只注册一次。如果一个Selector上多次注册同一个Channel,返回的SelectionKey将是同一个实例,即后面注册的感兴趣的操作类型会覆盖之前的。

2.3. SelectionKey

具体在通道注册时可以指定感兴趣的事件,并且可以绑定一个对象,然后会将这些信息封装在SelectionKey中,并由register()返回

SelectonKey中预定义了4种事件,对应的也提供了方法来检测事件是否就绪,其实现就是一个简单的与运算

java.nio.channels.SelectionKey
1
2
3
4
5
6
7
8
9
public static final int OP_READ = 1 << 0;     // 读
public static final int OP_WRITE = 1 << 2; // 写
public static final int OP_CONNECT = 1 << 3; // 连接建立
public static final int OP_ACCEPT = 1 << 4; // 接收连接

public final boolean isReadable(); // (readyOps() & OP_ACCEPT) != 0
public final boolean isWritable(); // (readyOps() & OP_WRITE) != 0
public final boolean isConnectable(); // (readyOps() & OP_CONNECT) != 0
public final boolean isAcceptable(); // (readyOps() & OP_ACCEPT) != 0

同时,SelectonKey也提供了一些方法来获取注册时相关的信息,比如通道或事件

java.nio.channels.SelectionKey
1
2
3
4
5
6
public abstract SelectableChannel channel(); // 获取SelectionKey关联的channel
public abstract Selector selector(); // 获取selectionKey关联的Selector

public final Object attachment(); // 获取绑定的对象
public abstract int interestOps(); // 感兴趣的操作
public abstract int readyOps(); // 感兴趣的操作中,已经就绪的操作

另外,SelectonKey还提供了一些方法来修改注册时相关的信息,比如感兴趣的事件和绑定对象。Netty中在注册通道时就分成了两步,首先注册时将事件指定为0,即对任何事件都不感兴趣,然后成功之后在通过interestOps设置感兴趣的事件。

java.nio.channels.SelectionKey
1
2
public abstract SelectionKey interestOps(int ops);
public final Object attach(Object ob);

2.4. select

Selector的核心在于其提供的选择方法,可以概括下其思路:

  1. 通道各自将自己注册到Selector上,并声明感兴趣的事件;
  2. 然后调用Selector的select方法,当有通道就绪后select会返回,并将就绪的通道放入一个集合中;
  3. 在select返回后,便可以从集合中获取SelectonKey,然后就能获取对应的通道进行处理;

对于具体的选择操作,Selector提供了三种方式:

java.nio.channels.Selector
1
2
3
public abstract int select() throws IOException;               // 阻塞直到有通道准备就绪
public abstract int select (long timeout) throws IOException; // 阻塞直到有通道准备就绪,或者超过指定时间
public abstract int selectNow() throws IOException; // 非阻塞,立即返回

其实,Selector中维护了三个键SelectonKey集合

已注册的键集合(Registered):已经注册的键集合,并不是所有注册过的键都仍然有效。可以通过keys()方法返回,但这个集合不可以直接修改,试图这么做的话会抛出UnsupportedOperationException异常。

已选择的键的集合(Selected):已注册并且已就绪的键集合,通过selectedKeys()返回,其中每个键对应的通道都被选择器判断为就绪。

已取消的键的集合(Cancelled):已注册并且调用过cancel()的键集合,这些键已经无效,但它们还没有被注销。该集合为选择器私有。

选择是累积的,一旦选择器将一个键添加到它的已选择的键集合中,它就不会再移除这个键。并且,一旦一个键处于已选择的键集合中,这个键的 readyOPS 将只会被设置,而不会被清理。这样就将键的管理操作交给了开发人员,比如移除那些已处理过的键,或者重新注册。

  • 唤醒选择

对于阻塞在select()上的选择操作,有三种唤醒方式

wakeup():使得选择器上的第一个还没有返回的选择操作立即返回,如果当前没有在进行中的选择,那么下一次对select()方法的调用将立即返回,但之后的选择操作则正常进行。

close():使得所有在选择操作中阻塞的线程都被唤醒,就像wakeup()方法被调用了一样。另外,与选择器相关的通道将会被注销,而键将被取消,注意这里并没有说会关闭通道连接

interrupt():即中断线程的语义,这里不再赘述。

  • 线程安全

要注意的是,选择器是线程安全的,但其包含的键集合并不是

1
2
3
protected Set<SelectionKey> selectedKeys = new HashSet();                   // sun.nio.ch.SelectorImpl
protected HashSet<SelectionKey> keys = new HashSet(); // sun.nio.ch.SelectorImpl
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();// java.nio.channels.spi.AbstractSelector


参考:

  1. http://www.tianshouzhi.com/api/tutorials/netty/316
  2. http://www.tianshouzhi.com/api/tutorials/netty/318
  3. https://stackoverflow.com/questions/24543675/does-selector-close-closes-all-client-sockets