Java I/O 网络

words: 5.2k    views:    time: 26min
I/O


对于网络I/O通信,其实站在应用程序的角度,就是面向Socket进行读写的过程,至于如何建立连接,数据包如何传输等问题则由TCP/IP以及更底层的协议保障,这些通常由操作系统和底层硬件负责实现,它们的目的就是希望能简化应用程序的开发。

在Linux系统中,一切皆文件,所以Socket也统一抽象为一个文件描述符,那么对应用程序来讲,收发消息就是对一个文件描述符进行读写。但是它与读写普通的本地磁盘文件也有区别,由于网络的延迟和不可靠性,在读Socket的过程中可能会有大量的时间花在等待数据包的到达上,因此如何提高应用程序在读写Socket过程中的响应性,就成了网络通信编程中一个很重要的问题。

针对网络I/O,Linux提供了五种IO模型,具体可以参考笔记

不过Java并没有全部支持,而是逐步选择支持了三种模型:
在JDK1.4之前,Java的IO模型只支持阻塞式IO(Blocking IO),简称为BIO;
在JDK1.4时,Java支持了I/O多路复用模型, 简称NIO,即新IO模型,不过现在JDK1.8早已成为主流版本,已经没什么新鲜了,所以更多的人愿意将它理解为非阻塞I/O,即None-Blocking IO;
在JDK1.7时,Java对NIO包进行了升级,支持异步I/O(Asynchronous IO),简称为AIO,因为是对nio包的升级,所有也称为NIO2.0;

1. Bio

最简单的办法是每建立一个连接,就创建一个连接处理线程,但这样问题是当连接数很多时,必然会耗尽服务器资源。于是好一点的做法是将连接处理丢给线程池,这样可以对使用的资源做一个限制,比如当大量连接过来时,如果线程池任务队列已满,那么可以拒绝处理。

这样的处理方式是直接面向连接的,也就是一旦建立连接,服务端就创建对应的处理,并且整个过程是同步阻塞的,因此很难应付高并发场景。

1.1. BioClient

BioClient中的逻辑很简单,就是循环向指定服务发送消息,不过这里采用的短连接,客户端每次发消息都重新建立连接,发送消息后同步等待响应,然后由服务端在写回响应后负责关闭。另外,为了模拟并发场景,这里创建了多个客户端,并用了一个栅栏,尽量等到一批客户端都准备好时,再同时发起连接

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/bio/BioClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class BioClient extends Thread {

private static final Logger LOG = Logger.getLogger(BioClient.class);

private static AtomicInteger msg_index = new AtomicInteger(0);

private static SecureRandom random = new SecureRandom();

private final CyclicBarrier barrier;

private final String host;

private final int port;

public BioClient(String host, int port, int index, CyclicBarrier barrier) throws Exception {
this.host = host;
this.port = port;
this.barrier = barrier;
this.setName("client-" + index);
}

@Override
public void run() {
while (true) {
Socket socket = null; // 短连接消息,每次消息都重新建立连接
try {
sleep(random.nextInt(1000));
barrier.await(3000, TimeUnit.MILLISECONDS);
socket = new Socket(host, port);
} catch (Exception e) {
LOG.error("连接失败:" + e.getMessage());
return;
}

try(OutputStream out = socket.getOutputStream();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))){
String msg = "msg" + msg_index.incrementAndGet();
LOG.info(">> send " + msg);
out.write(msg.getBytes());
String resp = in.readLine();
LOG.info("<< " + resp);
}catch(Exception e){
LOG.error("发送失败:" + e.getMessage());
return;
}finally {
IOUtils.closeQuietly(socket);
}
}
}

public static void main(String[] args) throws Exception {
CyclicBarrier barrier = new CyclicBarrier(10); // 模拟并发消息
for (int i = 1; i <= 30; i++) {
new BioClient("127.0.0.1", 4040, i, barrier).start();
}
}
}

1.2. BioServer

BioServer中也比较简单,即监听端口,然后等待Socket,并从中读取消息,然后将消息和连接丢给线程池处理,当然这里只是简单模拟下消息的发送和接收,实际应用中还有消息协议、编解码、加解密、以及后面的TCP粘包/拆包等问题需要考虑。

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/bio/BioServer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class BioServer extends Thread {

private static final Logger LOG = Logger.getLogger(BioServer.class);

private final CancellableExecutor executor = new CancellableExecutor(5, 5, 1);;

private final ServerSocket serverSocket;

public BioServer(int port) throws IOException {
this.serverSocket = new ServerSocket(port);
this.setName("server-accepter");
}

@Override
public void run() {
String msg = null;
LOG.info("启动监听...");
while (!interrupted()) {
try {
Socket socket = serverSocket.accept();
InputStream input = socket.getInputStream();

// 根据约定的消息协议和编解码,对读取的字节码进行解析,这里简单消息简单处理
byte[] bytes = new byte[1024];
input.read(bytes);
msg = new String(bytes);

// 根据协议选择对应的Handler进行处理,这里还是简单处理
executor.submit(new Handler(socket, msg));
} catch (RejectedExecutionException e){
if(isInterrupted()){
LOG.info("服务已经被关闭,保存或丢弃接收的消息:" + msg);
return;
}
LOG.warn("请求超过服务负载,保存或丢弃接收的消息:" + msg);
} catch (Exception e) {
LOG.error("消息接收异常," + msg , e);
}
}
}

public void shutdownNow() {
LOG.info("关闭服务,停止接收请求...");
interrupt();// 中断accepter

LOG.info("关闭任务线程池,中断正在处理事的任务,以及还在等待处理的任务");
List<Runnable> taskList = executor.shutdownNow();
if (!taskList.isEmpty()) {
for (Runnable task : taskList) {
CancellableFuture<?> future = (CancellableFuture<?>) task;
future.cancel(true); // 关闭socket,保存消息
LOG.warn("保存或丢弃未处理的消息:" + future.getMsg());
}
}

LOG.info("断开连接...");
IOUtils.closeQuietly(serverSocket);
}

public static void main(String[] args) throws Exception {
BioServer server = new BioServer(4040);
server.start();

Thread.sleep(5000);
server.shutdownNow();
}
}
1.2.1. CancellableHandler

另外,对于BioServer中使用的线程池,这里做了一个简单的扩展,主要是考虑服务关闭的问题,目的是希望在服务关闭时,能及时关闭正在执行的任务,并保存它们收到的信息。

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/bio/handler/CancellableExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CancellableExecutor extends ThreadPoolExecutor {

public CancellableExecutor(int corePoolSize, int maximumPoolSize, long keepAliveSeconds) {
super(corePoolSize, maximumPoolSize, keepAliveSeconds, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(100), new HandleThreadFactory());
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable,T value) {
if (runnable instanceof CancellableHandler) {
return new CancellableFuture<>(runnable, value);
} else {
return super.newTaskFor(runnable,value);
}
}

private static class HandleThreadFactory implements ThreadFactory {
private AtomicInteger index = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "server-handler-" + index.incrementAndGet());
}
}
}
https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/bio/handler/CancellableFuture.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CancellableFuture<T> extends FutureTask<T> {

private CancellableHandler handler;

private String msg;

public CancellableFuture(Runnable runnable, T result) {
super(runnable, result);
if(runnable instanceof CancellableHandler){
this.handler = (CancellableHandler)runnable;
}
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if(handler != null){
this.msg = handler.cancel();
return true;
}else{
return super.cancel(mayInterruptIfRunning);
}
}

public String getMsg(){
return msg;
}
}
https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/bio/handler/CancellableHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class CancellableHandler implements Runnable {

private static final Logger LOG = Logger.getLogger(CancellableHandler.class);

private static SecureRandom random = new SecureRandom();

private final Socket socket;

private final String msg;

public CancellableHandler(Socket socket, String msg){
this.socket = socket;
this.msg = msg;
}

@Override
public void run() {
long beginTime = System.currentTimeMillis();
try(PrintWriter out = new PrintWriter(socket.getOutputStream(), true)){
LOG.info(">>:" + msg);
Thread.sleep(random.nextInt(1000)); // 模拟处理耗时

String response = "resp for " + msg;
out.println(response);
LOG.info("<<:" + response
+ " , cost=" + (System.currentTimeMillis() - beginTime) + "ms");
} catch (InterruptedException e) {
LOG.warn("中断处理,保存或丢弃消息:" + cancel());
} catch (Exception e) {
LOG.error("处理异常,保存或丢弃消息:" + cancel());
} finally {
IOUtils.closeQuietly(socket); // 对于短连接, 比如Htpp,直接在响应后关闭连接
}
}

public String cancel() {
IOUtils.closeQuietly(socket);
return msg;
}
}

2. Nio

NIO主要有三大核心部分:选择器Selector、通道Channel、缓冲区Buffer,这在相关的笔记中已经有过介绍,这里不再赘述

NIO的方式是面向缓存的,相对于BIO,这样最大的意义在于降低了通信过程中对应用线程的持有时间,这样降低的服务的资源消耗也就能支持更大规模的并发了。

2.1. NioClient

NioClient中创建了10个Sender线程,分别重复创建连接和发送消息,然后在一个统一的Accpeter线程中异步接收响应

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/nio/NioClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public class NioClient {

private static final Logger LOG = Logger.getLogger(NioClient.class);

private final SecureRandom random = new SecureRandom();

private final AtomicInteger msg_index = new AtomicInteger(0);

private final Selector selector;

private final int port;

private final String host;

public NioClient(String host, int port) throws IOException{
this.host = host;
this.port = port;
this.selector = Selector.open();
}

public void start() throws IOException{
for(int i = 1; i <= 10; i++){
new Sender(i).start();
}
new Accpeter().start();
}

private class Sender extends Thread {

public Sender(int index){
this.setName("client-sender-" + index);
}

@Override
public void run() {
try{
while (!interrupted()) {
// 这里先同步进行连接,当然也可以通过SelectionKey.OP_CONNECT去监听异步连接
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(true);
socketChannel.connect(new InetSocketAddress(host, port));
// 然后再设置为异步,进行消息发送
socketChannel.configureBlocking(false);

String msg = "msg" + msg_index.incrementAndGet();
byte[] bytes = msg.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();

LOG.info(">> send " + msg);
socketChannel.write(buffer); // 写入通道,并注册读事件
socketChannel.register(selector, SelectionKey.OP_READ);

sleep(random.nextInt(1000));
}
}catch(Exception e){
LOG.error("sender stopped, " + e.getMessage());
}
}
}

private class Accpeter extends Thread {

public Accpeter(){
this.setName("client-accpeter");
this.setDaemon(true);
}

@Override
public void run() {
try{
while (!interrupted()) {
selector.select();
for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext();) {
SelectionKey key = it.next();
if (!key.isValid()) {
it.remove();
continue;
}

SocketChannel channel = (SocketChannel) key.channel();
if(key.isReadable()){
it.remove();
ByteBuffer buffer = ByteBuffer.allocate(1024);
if(channel.read(buffer) > 0){
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);

String resp = new String(bytes);
LOG.info("<< " + resp);
}
}
}
}
}catch(Exception e){
LOG.error("accpeter stopped, " + e.getMessage());
}
}
}

public static void main(final String[] args) throws IOException {
new NioClient("127.0.0.1", 8080).start();
}
}

2.2. NioServer

NioClient选择怎样的方式其实并不敏感,主要还是在NioServer能体现NIO优势,注意这里创建处理线程的时机是在数据可读时,之前BioServer中是在客户端发起连接时就创建,而在客户端发起连接到服务端数据可读之间所需的时间几乎占了整个通信过程的绝大部分。因此,在BioServer中会存在很多处理线程阻塞在连接上,等待数据包分组到达,其实就是浪费了很多资源但其实什么事都没干。

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/nio/NioServer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
public class NioServer extends Thread {

private static final Logger LOG = Logger.getLogger(NioServer.class);

private static SecureRandom random = new SecureRandom();

private final Selector selector;

private final ServerSocketChannel serverChannel;

public NioServer(int port) throws IOException {
this.selector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
this.serverChannel.configureBlocking(false);
this.serverChannel.socket().bind(new InetSocketAddress(port));
this.serverChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("server-accepter");
LOG.info("启动监听...");
}

@Override
public void run() {
try {
while (!interrupted()) {
selector.select();
for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext();) {
SelectionKey key = it.next();
it.remove();
if (!key.isValid()) {
continue;
}

if (key.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
}

if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
if(channel.read(buffer) > 0){
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);

String msg = new String(bytes);
new Handler(channel, msg).start();
}
}
}
}
} catch (Exception e) {
LOG.error("接收异常,",e);
}
}

private class Handler extends Thread {

private SocketChannel channel;

private String msg;

public Handler(SocketChannel channel, String msg){
this.channel = channel;
this.msg = msg;
}

@Override
public void run(){
LOG.info(">> recv " + msg);

long beginTime = System.currentTimeMillis();
try {
String response = "resp for " + msg;
byte[] bytes = response.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();

Thread.sleep(random.nextInt(100));
channel.write(buffer); // 这里假设是短消息一次写完
LOG.info("<< " + response + ", cost=" + (System.currentTimeMillis() - beginTime) + "ms");
} catch (Exception e){
LOG.error("处理异常", e);
}finally{
IOUtils.closeQuietly(channel);
}
}
}

public void shutdown(){
LOG.info("关闭服务,停止接收请求...");
interrupt(); //中断accepter

LOG.info("断开连接...");
IOUtils.closeQuietly(selector);
}

public static void main(String[] args) throws InterruptedException, IOException {
NioServer server = new NioServer(8080);
server.start();

Thread.sleep(5000);
server.shutdown();
}
}

3. Aio

NIO将应用线程由阻塞在Socket连接上,改进成阻塞在统一的Selector上,而AIO则真正意义上实现了基于事件回调的异步非阻塞模式。不过目前Java中的主流消息中间件还是基于NIO实现,比如大名鼎鼎的Netty,其它比如Tomcat中默认的方式也是NIO,至于原因,据Netty作者说是因为在Linux中通过AIO的方式实现并不能比NIO带来更高的性能。

下面通过Java 1.7封装的Api简单模拟一个消息收发过程,与上面的示例一样,这里仅仅是作为一个了解,或者作为更深入了解网络Socket编程的一个开始。

3.1. callback

3.1.1. AioClient

AioClient这里创建了一个长连接,在连接成功之后,无限循环发送消息,并在回调中进行响应读取

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/AioClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class AioClient extends Thread {

private static final Logger LOG = Logger.getLogger(AioServer.class);

private final AsynchronousSocketChannel socketchannel;

private final CountDownLatch connectLatch = new CountDownLatch(1);

private final String host;

private final int port;

private int msg_index = 0;

public AioClient(String host, int port) throws IOException {
this.host = host;
this.port = port;
this.socketchannel = AsynchronousSocketChannel.open();
this.socketchannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
this.setName("client");
}

@Override
public void run() {
socketchannel.connect(new InetSocketAddress(host, port), connectLatch, new ClientConnectHandler()); // 异步连接
try{
connectLatch.await();
while (!interrupted()) {
String msg = "msg" + ++msg_index;
byte[] bytes = msg.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(1024); // 简单消息,这里申请个固定的缓存
buffer.put(bytes);
buffer.flip();

LOG.info(">> send " + msg);
socketchannel.write(buffer, buffer, new ClientWriteHandler(socketchannel)); // 异步写消息,事件回调
Thread.sleep(1000);
}
} catch (Exception e) {
LOG.error("client stopped, " + e.getMessage());
}
}

public void shutdown() {
LOG.info("关闭客户端,停止发送...");
interrupt();

LOG.info("断开连接...");
IOUtils.closeQuietly(socketchannel);
}

public static void main(String[] args) throws IOException, InterruptedException {
AioClient client = new AioClient("127.0.0.1", 8080);
client.start();

Thread.sleep(8000);
client.shutdown();
}
}

连接事件回调,这里传入一个闭锁connectLatch,因为虽然步骤都是异步的,但在逻辑上发送消息应该要等到连接成功之后才能进行

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/ClientConnectHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ClientConnectHandler implements CompletionHandler<Void, CountDownLatch> {

private static final Logger LOG = Logger.getLogger(ClientConnectHandler.class);

@Override
public void completed (Void result, CountDownLatch connectLatch) {
LOG.info("连接成功...");
connectLatch.countDown();
}

@Override
public void failed(Throwable e, CountDownLatch connectLatch) {
LOG.error("连接失败," + e.getMessage());
connectLatch.countDown();
}
}

写事件回调,即在写入完成之后进行读取

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/ClientWriteHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ClientWriteHandler implements CompletionHandler<Integer, ByteBuffer> {

private static final Logger LOG = Logger.getLogger(ClientWriteHandler.class);

private AsynchronousSocketChannel socketchannel;

public ClientWriteHandler(AsynchronousSocketChannel clientChannel) {
this.socketchannel = clientChannel;
}

@Override
public void completed(final Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
socketchannel.write(buffer, buffer, this);
}else {
buffer.clear();
socketchannel.read(buffer, buffer, new ClientReadHandler(socketchannel)); // 异步读响应
}
}

@Override
public void failed(Throwable e, ByteBuffer attachment) {
LOG.error("write failed, " + e.getMessage());
IOUtils.closeQuietly(socketchannel);
}
}

读事件回调,这时操作系统已经帮忙将数据拷贝到了应用空间,即这里的buffer,所以直接读取就行

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/ClientReadHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ClientReadHandler implements CompletionHandler<Integer, ByteBuffer> {

private static final Logger LOG = Logger.getLogger(ClientReadHandler.class);

private AsynchronousSocketChannel socketchannel;

public ClientReadHandler(AsynchronousSocketChannel clientChannel) {
this.socketchannel = clientChannel;
}

@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);

String resp = new String(bytes);
LOG.info("<< " + resp);
}

@Override
public void failed(final Throwable e, ByteBuffer attachment) {
LOG.error("read failed, " + e.getMessage());
IOUtils.closeQuietly(socketchannel);
}
}
3.1.2. AioServer

AioServer中逻辑很少,只是创建一个监听并注册一个监听回调处理

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/AioServer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class AioServer extends Thread {

private static final Logger LOG = Logger.getLogger(AioServer.class);

private final AsynchronousServerSocketChannel serverSocketChannel;

private ServerAcceptHandler acceptHandler = new ServerAcceptHandler();

public AioServer() throws IOException {
ExecutorService threadPool = new ThreadPoolExecutor(
0, 10, 5000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(threadPool);
serverSocketChannel = AsynchronousServerSocketChannel.open(group); // 指定任务线程组
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
this.setName("server");
LOG.info("启动监听...");
}

public AsynchronousServerSocketChannel getServerChannel() {
return serverSocketChannel;
}

@Override
public void run() {
serverSocketChannel.accept(this, acceptHandler);
}

public void shutDown(){
LOG.info("关闭服务,停止接收请求...");
interrupt(); //中断accepter

LOG.info("断开连接...");
IOUtils.closeQuietly(serverSocketChannel);
}

public static void main(String[] args) throws InterruptedException, IOException {
AioServer server = new AioServer();
server.start();

Thread.sleep(3000); // 只要客户端一直在发消息保证任务线程存活,服务就不会关闭
}
}

监听回调,当听到一个连接后,直接申请一个Buffer并注册一个读事件回调器即可,操作系统会负责将数据拷贝到Buffer并帮忙调用读事件回调器,然后继续监听下一次连接

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/ServerAcceptHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ServerAcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AioServer> {

private static final Logger LOG = Logger.getLogger(ServerAcceptHandler.class);

@Override
public void completed(AsynchronousSocketChannel serverSocketChannel, AioServer server) {
// 注册异步读事件
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
serverSocketChannel.read(readBuffer, readBuffer, new ServerReadHandler(serverSocketChannel));
// 继续监听下一次消息
server.getServerChannel().accept(server, this);
}

@Override
public void failed(Throwable e, AioServer server) {
LOG.error("accept failed, " + e.getMessage());
}
}

读事件回调,即读取Buffer中的数据,然后写回响应并注册一个写事件回调器

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/ServerReadHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ServerReadHandler implements CompletionHandler<Integer, ByteBuffer> {

private static final Logger LOG = Logger.getLogger(ServerReadHandler.class);

private AsynchronousSocketChannel serverSocketChannel;

public ServerReadHandler(AsynchronousSocketChannel serverSocketChannel) {
this.serverSocketChannel = serverSocketChannel;
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String msg = new String(bytes);
LOG.info(">> accept " + msg);

long beginTime = System.currentTimeMillis();
String response = "resp for " + msg;
bytes = response.getBytes();

buffer.clear();
buffer.put(bytes);
buffer.flip();
try {
Thread.sleep(100); // 模拟处理耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
LOG.info("<< " + response + ", cost=" + (System.currentTimeMillis() - beginTime) + "ms");
serverSocketChannel.write(buffer, buffer, new ServerWriteHandler(serverSocketChannel));
}

@Override
public void failed(Throwable e, ByteBuffer attachment) {
LOG.error("read failed, " + e.getMessage());
IOUtils.closeQuietly(serverSocketChannel);
}
}

写事件回调中根据是短连接还是长连接,可以选择是关闭通道,还是继续注册一个读事件回调

https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/ServerWriteHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ServerWriteHandler implements CompletionHandler<Integer, ByteBuffer> {

private static final Logger LOG = Logger.getLogger(ServerWriteHandler.class);

private AsynchronousSocketChannel serverSocketChannel;

public ServerWriteHandler(AsynchronousSocketChannel serverSocketChannel) {
this.serverSocketChannel = serverSocketChannel;
}

@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining())
//如果没有发送完,就继续发送直到完成
serverSocketChannel.write(buffer, buffer, this);
else{
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
serverSocketChannel.read(readBuffer, readBuffer, new ServerReadHandler(serverSocketChannel));
}
}

@Override
public void failed(Throwable e, ByteBuffer attachment) {
LOG.error("write failed, " + e.getMessage());
IOUtils.closeQuietly(serverSocketChannel);
}
}

3.2. future

AIO的也可以通过同步方式进行通信,通过事件返回的Future,进行get()阻塞调用,具体示例如下,比较简单,就不赘述了

3.2.1. AioClient
https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/future/AioClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class AioClient extends Thread {

private static final Logger LOG = Logger.getLogger(AioClient.class);

private final AsynchronousSocketChannel socketChannel;

private final String host;

private final int port;

private int msg_index = 0;

public AioClient(String host, int port) throws IOException {
this.host = host;
this.port = port;
this.socketChannel = AsynchronousSocketChannel.open();
this.setName("client");
}

@Override
public void run(){
Void isConnect = null;
try {
isConnect = socketChannel.connect(new InetSocketAddress(host, port)).get();//阻塞
} catch (Exception e) {
LOG.error("连接异常," + e.getMessage());
IOUtils.closeQuietly(socketChannel);
return;
}
if(!(isConnect == null)){
IOUtils.closeQuietly(socketChannel);
LOG.error("连接失败");
}

LOG.info("连接成功"); // 返回null表示连接成功
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
while (!interrupted()) {
String msg = "msg" + ++msg_index;
byte[] bytes = msg.getBytes();
buffer.clear();
buffer.put(bytes);
buffer.flip();
LOG.info(">> send " + msg);
socketChannel.write(buffer).get();

buffer.clear();
socketChannel.read(buffer).get();
buffer.flip();
bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String resp = new String(bytes);
LOG.info("<< " + resp);
}
}catch (Exception e) {
LOG.error("client stopped, " + e.getMessage());
IOUtils.closeQuietly(socketChannel);
}
}

public void shutdown() {
LOG.info("关闭客户端,停止发送...");
interrupt();

LOG.info("断开连接...");
IOUtils.closeQuietly(socketChannel);
}

public static void main(String[] args) throws InterruptedException, IOException {
AioClient client = new AioClient("127.0.0.1", 7070);
client.start();

Thread.sleep(5000);
client.shutdown();
}
}
3.2.2. AioServer
https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/future/AioServer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public class AioServer extends Thread{ 

private static final Logger LOG = Logger.getLogger(AioServer.class);

private static SecureRandom random = new SecureRandom();

private final AsynchronousServerSocketChannel serverSocketChannel;

public AioServer(int port) throws IOException{
ExecutorService threadPool = new ThreadPoolExecutor(
0, 10, 5000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(threadPool);
this.serverSocketChannel = AsynchronousServerSocketChannel.open(group);
this.serverSocketChannel.bind(new InetSocketAddress(port));
this.setName("server");
}

@Override
public void run(){
LOG.info("启动监听...");
try{
while (!interrupted()) {
AsynchronousSocketChannel socketChannel = serverSocketChannel.accept().get(); // 同步等待消息
new Handler(socketChannel).start(); // 这里只有一个连接,所以没有用线程池,不过这里是长链接
}
}catch(Exception e){
LOG.error("server stopped, " + e.getMessage());
return;
}
}

private class Handler extends Thread {

private final AsynchronousSocketChannel socketChannel;

public Handler(AsynchronousSocketChannel socketChannel){
this.socketChannel = socketChannel;
this.setName("handler");
}

@Override
public void run() {
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
try{
while (!interrupted()) {
buffer.clear();
socketChannel.read(buffer).get(); // 同步读取
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String msg = new String(bytes);
LOG.info(">> accept " + msg);

long beginTime = System.currentTimeMillis();
Thread.sleep(random.nextInt(100));

String response = "resp for " + msg;
bytes = response.getBytes();
buffer.clear();
buffer.put(bytes);
buffer.flip();
socketChannel.write(buffer); // 这里没有同步等待
LOG.info("<< " + response + ", cost=" + (System.currentTimeMillis() - beginTime) + "ms");
}
}catch(Exception e){
LOG.error("handle error, " + e.getMessage());
shutdown();
}
}
}

public void shutdown(){
LOG.info("关闭服务,停止接收请求...");
interrupt(); //中断accepter

LOG.info("断开连接...");
IOUtils.closeQuietly(serverSocketChannel);
}

public static void main(String[] args) throws IOException, InterruptedException {
AioServer server = new AioServer(7070);
server.start();
}
}


参考:

  1. 《Netty权威指南》
  2. http://www.tianshouzhi.com/api/tutorials/netty/221