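From an application's point of view, network I/O is simply a matter of reading from and writing to a Socket. How the connection is established and how packets travel are handled by TCP/IP and the lower-level protocols, which are normally implemented by the operating system and the underlying hardware precisely so that application development stays simple.
On Linux everything is a file, so a Socket is also abstracted as a file descriptor, and for the application sending and receiving messages amounts to reading and writing that descriptor. It still differs from reading a regular file on local disk: because the network is slow and unreliable, a read on a Socket may spend most of its time waiting for packets to arrive. How to keep an application responsive while it reads and writes Sockets is therefore a central question in network programming.
Linux provides five I/O models for network I/O (blocking I/O, non-blocking I/O, I/O multiplexing, signal-driven I/O, and asynchronous I/O); see the related notes for details.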
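Java does not support all of them. It added support for three models step by step:
- Before JDK 1.4, Java supported only blocking I/O (Blocking IO), or BIO.
- JDK 1.4 added the I/O multiplexing model, known as NIO, originally short for New IO. Now that JDK 1.8 has long been the mainstream version there is nothing new about it, so most people prefer to read the name as Non-blocking IO.
- JDK 1.7 upgraded the NIO package with support for asynchronous I/O (Asynchronous IO), or AIO. Because it is an upgrade of the nio package, it is also called NIO 2.0.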
1. Bio
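The simplest approach is to create one handler thread per connection, but with many connections this will inevitably exhaust the server's resources. A somewhat better approach hands connection handling to a thread pool, which caps resource usage: when a burst of connections arrives and the pool's task queue is already full, further requests can be rejected.
This style works directly on connections: as soon as a connection is established the server dedicates a handler to it, and the whole exchange is synchronous and blocking, so it copes poorly with high concurrency.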
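The bounded-pool idea can be tried in isolation. The following is a minimal sketch, not the code used in this post: the class name `BoundedPoolSketch` and the method `countRejected` are made up for illustration, and a latch stands in for handlers that are blocked on slow connections.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of the Echo repository.
final class BoundedPoolSketch {

    /** Submits `tasks` blocking tasks to a fixed pool with a bounded queue; returns how many were rejected. */
    static int countRejected(int threads, int queueSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueSize)); // default handler: AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // stands in for a handler blocked on its connection
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // every worker is busy and the queue is full
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 workers + 3 queue slots can hold 5 tasks, so 5 of the 10 are rejected.
        System.out.println("rejected = " + countRejected(2, 3, 10));
    }
}
```

With a bounded queue the server sheds load instead of accumulating threads or queued work without limit; what to do with a rejected request (drop it, persist it, answer with an error) is a separate decision.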
1.1. BioClient
The logic in BioClient is simple: it sends messages to the given server in a loop. It uses short-lived connections, so the client opens a new connection for every message, sends it, and waits synchronously for the response; the server closes the connection after writing the response back. To simulate concurrency, several clients are created and a barrier (CyclicBarrier) holds them until a batch is ready, so that they connect at roughly the same time.
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/bio/BioClient.java
```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.security.SecureRandom;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Assumed from the call signatures: Apache Commons IO and log4j 1.x.
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class BioClient extends Thread {

    private static final Logger LOG = Logger.getLogger(BioClient.class);
    private static final AtomicInteger msg_index = new AtomicInteger(0);
    private static final SecureRandom random = new SecureRandom();

    private final CyclicBarrier barrier;
    private final String host;
    private final int port;

    public BioClient(String host, int port, int index, CyclicBarrier barrier) throws Exception {
        this.host = host;
        this.port = port;
        this.barrier = barrier;
        this.setName("client-" + index);
    }

    @Override
    public void run() {
        while (true) {
            Socket socket = null;
            try {
                sleep(random.nextInt(1000));
                // Wait until a batch of clients is ready, then connect together.
                barrier.await(3000, TimeUnit.MILLISECONDS);
                socket = new Socket(host, port);
            } catch (Exception e) {
                LOG.error("connect failed: " + e.getMessage());
                return;
            }

            try (OutputStream out = socket.getOutputStream();
                 BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                String msg = "msg" + msg_index.incrementAndGet();
                LOG.info(">> send " + msg);
                out.write(msg.getBytes());
                // Block until the server writes back one line.
                String resp = in.readLine();
                LOG.info("<< " + resp);
            } catch (Exception e) {
                LOG.error("send failed: " + e.getMessage());
                return;
            } finally {
                IOUtils.closeQuietly(socket);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // 30 clients released in batches of 10.
        CyclicBarrier barrier = new CyclicBarrier(10);
        for (int i = 1; i <= 30; i++) {
            new BioClient("127.0.0.1", 4040, i, barrier).start();
        }
    }
}
```
1.2. BioServer
BioServer is also fairly simple: it listens on a port, waits for a Socket, reads the message from it, and hands the message and the connection to a thread pool. This only simulates sending and receiving a message; a real application also has to deal with the message protocol, encoding and decoding, encryption and decryption, and TCP packet sticking and splitting (message framing).
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/bio/BioServer.java
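```java
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;

import io.github.echo.io.bio.handler.CancellableExecutor;
import io.github.echo.io.bio.handler.CancellableFuture;
import io.github.echo.io.bio.handler.CancellableHandler;
// Assumed from the call signatures: Apache Commons IO and log4j 1.x.
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class BioServer extends Thread {

    private static final Logger LOG = Logger.getLogger(BioServer.class);

    private final CancellableExecutor executor = new CancellableExecutor(5, 5, 1);
    private final ServerSocket serverSocket;

    public BioServer(int port) throws IOException {
        this.serverSocket = new ServerSocket(port);
        this.setName("server-accepter");
    }

    @Override
    public void run() {
        String msg = null;
        LOG.info("start listening...");
        while (!interrupted()) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                InputStream input = socket.getInputStream();
                byte[] bytes = new byte[1024];
                // Decode only the bytes actually read, not the whole 1024-byte array.
                int len = input.read(bytes);
                msg = len > 0 ? new String(bytes, 0, len) : "";
                executor.submit(new CancellableHandler(socket, msg));
            } catch (RejectedExecutionException e) {
                IOUtils.closeQuietly(socket);
                if (isInterrupted()) {
                    LOG.info("server already shut down, save or drop the received message: " + msg);
                    return;
                }
                LOG.warn("request exceeds server capacity, save or drop the received message: " + msg);
            } catch (Exception e) {
                IOUtils.closeQuietly(socket);
                LOG.error("receive failed, " + msg, e);
            }
        }
    }

    public void shutdownNow() {
        LOG.info("shutting down, no longer accepting requests...");
        interrupt();

        LOG.info("stopping the handler pool, interrupting running tasks and cancelling queued ones");
        List<Runnable> taskList = executor.shutdownNow();
        for (Runnable task : taskList) {
            // Queued tasks were wrapped by CancellableExecutor.newTaskFor.
            CancellableFuture<?> future = (CancellableFuture<?>) task;
            future.cancel(true);
            LOG.warn("save or drop the unprocessed message: " + future.getMsg());
        }

        LOG.info("closing the listening socket...");
        IOUtils.closeQuietly(serverSocket);
    }

    public static void main(String[] args) throws Exception {
        BioServer server = new BioServer(4040);
        server.start();

        Thread.sleep(5000);
        server.shutdownNow();
    }
}
```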
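The server above takes whatever a single `read` call returns as one message, which only holds for tiny payloads on a quiet connection. One common way to handle packet sticking and splitting is a length prefix. The sketch below is an illustration under that assumption, not the protocol used by this post: the class `LengthPrefixedFraming` and its methods are made up, and no maximum frame size is enforced.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of the Echo repository.
final class LengthPrefixedFraming {

    /** Writes one frame: a 4-byte big-endian length followed by the UTF-8 body. */
    static void writeFrame(OutputStream out, String msg) throws IOException {
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(body.length);
        data.write(body);
        data.flush();
    }

    /** Reads exactly one frame, however the bytes were split or merged in transit. */
    static String readFrame(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int length = data.readInt();
        byte[] body = new byte[length];
        data.readFully(body); // keeps reading until all `length` bytes have arrived
        return new String(body, StandardCharsets.UTF_8);
    }

    /** Writes all messages back to back into one byte stream, then splits them apart again. */
    static List<String> roundTrip(String... messages) {
        try {
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            for (String m : messages) {
                writeFrame(wire, m);
            }
            InputStream in = new ByteArrayInputStream(wire.toByteArray());
            List<String> decoded = new ArrayList<>();
            for (int i = 0; i < messages.length; i++) {
                decoded.add(readFrame(in));
            }
            return decoded;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real server the same `writeFrame` and `readFrame` calls would wrap `socket.getOutputStream()` and `socket.getInputStream()`, and the declared length would be checked against an upper bound before allocating the body.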
1.2.1. CancellableHandler
The thread pool used by BioServer is a small extension of ThreadPoolExecutor, written with service shutdown in mind: when the service is shut down, running tasks should be stopped promptly and the messages they had received should be saved.
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/bio/handler/CancellableExecutor.java
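```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CancellableExecutor extends ThreadPoolExecutor {

    public CancellableExecutor(int corePoolSize, int maximumPoolSize, long keepAliveSeconds) {
        // Bounded queue of 100 tasks: submissions beyond that are rejected.
        super(corePoolSize, maximumPoolSize, keepAliveSeconds, TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(100), new HandleThreadFactory());
    }

    // Wrap CancellableHandler tasks so that the message can be recovered on cancel.
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        if (runnable instanceof CancellableHandler) {
            return new CancellableFuture<>(runnable, value);
        } else {
            return super.newTaskFor(runnable, value);
        }
    }

    private static class HandleThreadFactory implements ThreadFactory {

        private final AtomicInteger index = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "server-handler-" + index.incrementAndGet());
        }
    }
}
```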
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/bio/handler/CancellableFuture.java
```java
import java.util.concurrent.FutureTask;

public class CancellableFuture<T> extends FutureTask<T> {

    private CancellableHandler handler;
    private String msg;

    public CancellableFuture(Runnable runnable, T result) {
        super(runnable, result);
        if (runnable instanceof CancellableHandler) {
            this.handler = (CancellableHandler) runnable;
        }
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (handler != null) {
            // Close the socket and keep the message so the caller can save it.
            this.msg = handler.cancel();
        }
        // Also mark the future itself as cancelled (the original returned true without doing so).
        return super.cancel(mayInterruptIfRunning);
    }

    public String getMsg() {
        return msg;
    }
}
```
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/bio/handler/CancellableHandler.java
```java
import java.io.PrintWriter;
import java.net.Socket;
import java.security.SecureRandom;

// Assumed from the call signatures: Apache Commons IO and log4j 1.x.
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class CancellableHandler implements Runnable {

    private static final Logger LOG = Logger.getLogger(CancellableHandler.class);
    private static final SecureRandom random = new SecureRandom();

    private final Socket socket;
    private final String msg;

    public CancellableHandler(Socket socket, String msg) {
        this.socket = socket;
        this.msg = msg;
    }

    @Override
    public void run() {
        long beginTime = System.currentTimeMillis();
        try (PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
            LOG.info(">>:" + msg);
            // Simulate processing time.
            Thread.sleep(random.nextInt(1000));

            String response = "resp for " + msg;
            out.println(response);
            LOG.info("<<:" + response + " , cost=" + (System.currentTimeMillis() - beginTime) + "ms");
        } catch (InterruptedException e) {
            LOG.warn("handling interrupted, save or drop the message: " + cancel());
        } catch (Exception e) {
            LOG.error("handling failed, save or drop the message: " + cancel());
        } finally {
            IOUtils.closeQuietly(socket);
        }
    }

    // Close the connection and hand back the message that was being processed.
    public String cancel() {
        IOUtils.closeQuietly(socket);
        return msg;
    }
}
```
2. Nio
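NIO has three core parts: the selector (Selector), the channel (Channel), and the buffer (Buffer). They are covered in the related notes and are not repeated here.
NIO is buffer-oriented. Compared with BIO, its main benefit is that an application thread is tied up for a much shorter part of each exchange, which lowers the server's resource consumption and lets it support far more concurrent connections.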
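Because every NIO read and write goes through a Buffer, the put, flip, get, clear cycle appears in all the listings that follow. Here is a minimal sketch of that cycle on its own; the class `BufferSketch` is made up for illustration, and the 64-byte capacity is an arbitrary assumption (the text must fit into it).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class, not part of the Echo repository.
final class BufferSketch {

    /** Writes text into a buffer, flips it, and reads the same bytes back out. */
    static String writeThenRead(String text) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.put(text.getBytes(StandardCharsets.UTF_8)); // write mode: position advances
        buffer.flip();                                     // limit = position, position = 0
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);                                 // read mode: drain up to limit
        buffer.clear();                                    // position = 0, limit = capacity, ready for reuse
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Returns {position, limit} after putting n bytes into a 64-byte buffer and flipping it. */
    static int[] stateAfterFlip(int n) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.put(new byte[n]);
        buffer.flip();
        return new int[] { buffer.position(), buffer.limit() };
    }
}
```

Forgetting `flip()` before handing a buffer to `write` or before `get` is the usual mistake: the position still points past the data, so nothing (or stale content) is processed.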
2.1. NioClient
NioClient creates 10 Sender threads, each of which repeatedly opens a connection and sends a message; the responses are received asynchronously in a single shared Accpeter thread.
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/nio/NioClient.java
```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.SecureRandom;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

// Assumed from the call signatures: log4j 1.x.
import org.apache.log4j.Logger;

public class NioClient {

    private static final Logger LOG = Logger.getLogger(NioClient.class);

    private final SecureRandom random = new SecureRandom();
    private final AtomicInteger msg_index = new AtomicInteger(0);
    private final Selector selector;
    private final int port;
    private final String host;

    public NioClient(String host, int port) throws IOException {
        this.host = host;
        this.port = port;
        this.selector = Selector.open();
    }

    public void start() throws IOException {
        for (int i = 1; i <= 10; i++) {
            new Sender(i).start();
        }
        new Accpeter().start();
    }

    private class Sender extends Thread {

        public Sender(int index) {
            this.setName("client-sender-" + index);
        }

        @Override
        public void run() {
            try {
                while (!interrupted()) {
                    // Connect in blocking mode, then switch to non-blocking for the selector.
                    SocketChannel socketChannel = SocketChannel.open();
                    socketChannel.configureBlocking(true);
                    socketChannel.connect(new InetSocketAddress(host, port));
                    socketChannel.configureBlocking(false);

                    String msg = "msg" + msg_index.incrementAndGet();
                    byte[] bytes = msg.getBytes();
                    ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
                    buffer.put(bytes);
                    buffer.flip();

                    LOG.info(">> send " + msg);
                    socketChannel.write(buffer);
                    // The response is read by the Accpeter thread.
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    sleep(random.nextInt(1000));
                }
            } catch (Exception e) {
                LOG.error("sender stopped, " + e.getMessage());
            }
        }
    }

    private class Accpeter extends Thread {

        public Accpeter() {
            this.setName("client-accpeter");
            this.setDaemon(true);
        }

        @Override
        public void run() {
            try {
                while (!interrupted()) {
                    // Bounded wait: channels registered by Sender threads while this thread
                    // is inside select() are only picked up by a later select call.
                    selector.select(100);
                    for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext();) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }

                        if (key.isReadable()) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            int n = channel.read(buffer);
                            if (n > 0) {
                                buffer.flip();
                                byte[] bytes = new byte[buffer.remaining()];
                                buffer.get(bytes);

                                String resp = new String(bytes);
                                LOG.info("<< " + resp);
                            }
                            if (n != 0) {
                                // Short-lived connection: close after the response (or end of stream),
                                // otherwise the key stays readable and the loop spins.
                                channel.close();
                            }
                        }
                    }
                }
            } catch (Exception e) {
                LOG.error("accpeter stopped, " + e.getMessage());
            }
        }
    }

    public static void main(final String[] args) throws IOException {
        new NioClient("127.0.0.1", 8080).start();
    }
}
```
2.2. NioServer
Which style NioClient uses matters little; the advantage of NIO shows mainly in NioServer. Note that a handler thread is created here only once data is readable, whereas BioServer commits to a connection as soon as the client connects. The time between the client initiating the connection and the data becoming readable on the server accounts for nearly all of the exchange. In the BIO model, many threads therefore sit blocked on their connections waiting for packets to arrive, holding resources while doing no work.
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/nio/NioServer.java
```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.SecureRandom;
import java.util.Iterator;

// Assumed from the call signatures: Apache Commons IO and log4j 1.x.
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class NioServer extends Thread {

    private static final Logger LOG = Logger.getLogger(NioServer.class);
    private static final SecureRandom random = new SecureRandom();

    private final Selector selector;
    private final ServerSocketChannel serverChannel;

    public NioServer(int port) throws IOException {
        this.selector = Selector.open();
        this.serverChannel = ServerSocketChannel.open();
        this.serverChannel.configureBlocking(false);
        this.serverChannel.socket().bind(new InetSocketAddress(port));
        this.serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        this.setName("server-accepter");
        LOG.info("start listening...");
    }

    @Override
    public void run() {
        try {
            while (!interrupted()) {
                selector.select();
                for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext();) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid()) {
                        continue;
                    }

                    if (key.isAcceptable()) {
                        // New connection: only register interest in reads, no thread yet.
                        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                        SocketChannel channel = serverChannel.accept();
                        channel.configureBlocking(false);
                        channel.register(selector, SelectionKey.OP_READ);
                    }

                    if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int n = channel.read(buffer);
                        if (n > 0) {
                            buffer.flip();
                            byte[] bytes = new byte[buffer.remaining()];
                            buffer.get(bytes);

                            // Data has arrived: only now is a handler thread started.
                            String msg = new String(bytes);
                            new Handler(channel, msg).start();
                        } else if (n < 0) {
                            // Peer closed the connection without sending anything.
                            IOUtils.closeQuietly(channel);
                        }
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("receive failed, ", e);
        }
    }

    private class Handler extends Thread {

        private final SocketChannel channel;
        private final String msg;

        public Handler(SocketChannel channel, String msg) {
            this.channel = channel;
            this.msg = msg;
        }

        @Override
        public void run() {
            LOG.info(">> recv " + msg);
            long beginTime = System.currentTimeMillis();
            try {
                String response = "resp for " + msg;
                byte[] bytes = response.getBytes();
                ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
                buffer.put(bytes);
                buffer.flip();

                // Simulate processing time.
                Thread.sleep(random.nextInt(100));
                channel.write(buffer);
                LOG.info("<< " + response + ", cost=" + (System.currentTimeMillis() - beginTime) + "ms");
            } catch (Exception e) {
                LOG.error("handling failed", e);
            } finally {
                IOUtils.closeQuietly(channel);
            }
        }
    }

    public void shutdown() {
        LOG.info("shutting down, no longer accepting requests...");
        interrupt();
        LOG.info("closing channels...");
        IOUtils.closeQuietly(selector);
        IOUtils.closeQuietly(serverChannel);
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        NioServer server = new NioServer(8080);
        server.start();

        Thread.sleep(5000);
        server.shutdown();
    }
}
```
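The core of the server above is that one thread waits on a Selector for many channels and is told which of them has data. That can be seen without any network setup by using in-process pipes. The sketch below is an illustration only: `SelectorSketch` and `selectOne` are made-up names, `java.nio.channels.Pipe` replaces real sockets, and it assumes the short payload arrives in a single read.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class, not part of the Echo repository.
final class SelectorSketch {

    /**
     * Registers `channels` pipes with one selector, writes msg into pipe `target`,
     * and returns "index:payload" for the channel the selector reports as readable.
     */
    static String selectOne(int channels, int target, String msg) {
        try (Selector selector = Selector.open()) {
            Pipe[] pipes = new Pipe[channels];
            for (int i = 0; i < channels; i++) {
                pipes[i] = Pipe.open();
                pipes[i].source().configureBlocking(false);
                // The attachment records which pipe the key belongs to.
                pipes[i].source().register(selector, SelectionKey.OP_READ, i);
            }
            pipes[target].sink().write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));

            StringBuilder result = new StringBuilder();
            if (selector.select(1000) > 0) { // one thread waits on all channels at once
                for (SelectionKey key : selector.selectedKeys()) {
                    Pipe.SourceChannel source = (Pipe.SourceChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(64);
                    source.read(buffer);
                    buffer.flip();
                    result.append(key.attachment()).append(':')
                          .append(StandardCharsets.UTF_8.decode(buffer));
                }
            }
            for (Pipe pipe : pipes) {
                pipe.sink().close();
                pipe.source().close();
            }
            return result.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Idle channels cost the selecting thread nothing; only the channel that actually has data shows up in `selectedKeys()`, which is the property NioServer relies on.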
3. Aio
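NIO moves application threads from blocking on individual Socket connections to blocking on a shared Selector, while AIO provides a truly asynchronous, non-blocking mode based on event callbacks. Even so, the mainstream communication middleware in Java is still built on NIO, the well-known Netty among it, and Tomcat also uses NIO by default. As for the reason, according to Netty's author, implementing things with AIO on Linux does not deliver better performance than NIO.
The following uses the API introduced in Java 1.7 to simulate a simple message exchange. As with the examples above, it is meant only as an introduction, or as a starting point for studying Socket programming in more depth.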
3.1. callback
3.1.1. AioClient
AioClient opens a long-lived connection. Once connected it sends messages in an endless loop and reads each response in a callback.
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/AioClient.java
```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.CountDownLatch;

// Assumed from the call signatures: Apache Commons IO and log4j 1.x.
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class AioClient extends Thread {

    // Fixed: the original passed AioServer.class here.
    private static final Logger LOG = Logger.getLogger(AioClient.class);

    private final AsynchronousSocketChannel socketchannel;
    private final CountDownLatch connectLatch = new CountDownLatch(1);
    private final String host;
    private final int port;
    private int msg_index = 0;

    public AioClient(String host, int port) throws IOException {
        this.host = host;
        this.port = port;
        this.socketchannel = AsynchronousSocketChannel.open();
        this.socketchannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
        this.setName("client");
    }

    @Override
    public void run() {
        socketchannel.connect(new InetSocketAddress(host, port), connectLatch, new ClientConnectHandler());
        try {
            // Sending must wait until the connect callback has fired.
            connectLatch.await();
            while (!interrupted()) {
                String msg = "msg" + ++msg_index;
                byte[] bytes = msg.getBytes();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                buffer.put(bytes);
                buffer.flip();

                LOG.info(">> send " + msg);
                // The buffer is passed as the attachment so the callback can reuse it.
                socketchannel.write(buffer, buffer, new ClientWriteHandler(socketchannel));
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            LOG.error("client stopped, " + e.getMessage());
        }
    }

    public void shutdown() {
        LOG.info("shutting down the client, no longer sending...");
        interrupt();
        LOG.info("closing the connection...");
        IOUtils.closeQuietly(socketchannel);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        AioClient client = new AioClient("127.0.0.1", 8080);
        client.start();
        Thread.sleep(8000);
        client.shutdown();
    }
}
```
The connect callback. A latch, connectLatch, is passed in because, although every step is asynchronous, sending must logically wait until the connection has been established.
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/ClientConnectHandler.java
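```java
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

// Assumed from the call signatures: log4j 1.x.
import org.apache.log4j.Logger;

public class ClientConnectHandler implements CompletionHandler<Void, CountDownLatch> {

    private static final Logger LOG = Logger.getLogger(ClientConnectHandler.class);

    @Override
    public void completed(Void result, CountDownLatch connectLatch) {
        LOG.info("connected...");
        connectLatch.countDown();
    }

    @Override
    public void failed(Throwable e, CountDownLatch connectLatch) {
        LOG.error("connect failed, " + e.getMessage());
        // Release the waiting client thread either way; its next write will then fail.
        connectLatch.countDown();
    }
}
```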
The write callback, which starts a read once the write has completed.
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/ClientWriteHandler.java
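```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

// Assumed from the call signatures: Apache Commons IO and log4j 1.x.
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class ClientWriteHandler implements CompletionHandler<Integer, ByteBuffer> {

    private static final Logger LOG = Logger.getLogger(ClientWriteHandler.class);

    private final AsynchronousSocketChannel socketchannel;

    public ClientWriteHandler(AsynchronousSocketChannel clientChannel) {
        this.socketchannel = clientChannel;
    }

    @Override
    public void completed(final Integer result, ByteBuffer buffer) {
        if (buffer.hasRemaining()) {
            // Partial write: keep writing the rest with the same handler.
            socketchannel.write(buffer, buffer, this);
        } else {
            // Everything sent: reuse the buffer to read the response.
            buffer.clear();
            socketchannel.read(buffer, buffer, new ClientReadHandler(socketchannel));
        }
    }

    @Override
    public void failed(Throwable e, ByteBuffer attachment) {
        LOG.error("write failed, " + e.getMessage());
        IOUtils.closeQuietly(socketchannel);
    }
}
```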
The read callback. By this point the operating system has already copied the data into user space, that is, into buffer, so it can be read directly.
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/ClientReadHandler.java
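```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

// Assumed from the call signatures: Apache Commons IO and log4j 1.x.
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class ClientReadHandler implements CompletionHandler<Integer, ByteBuffer> {

    private static final Logger LOG = Logger.getLogger(ClientReadHandler.class);

    private final AsynchronousSocketChannel socketchannel;

    public ClientReadHandler(AsynchronousSocketChannel clientChannel) {
        this.socketchannel = clientChannel;
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        if (result < 0) {
            // End of stream: the server closed the connection.
            IOUtils.closeQuietly(socketchannel);
            return;
        }
        // The data is already in the buffer when this callback runs.
        buffer.flip();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        String resp = new String(bytes);
        LOG.info("<< " + resp);
    }

    @Override
    public void failed(final Throwable e, ByteBuffer attachment) {
        LOG.error("read failed, " + e.getMessage());
        IOUtils.closeQuietly(socketchannel);
    }
}
```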
3.1.2. AioServer
AioServer contains very little logic: it opens a listening channel and registers an accept callback.
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/AioServer.java
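```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Assumed from the call signatures: Apache Commons IO and log4j 1.x.
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class AioServer extends Thread {

    private static final Logger LOG = Logger.getLogger(AioServer.class);

    private final AsynchronousServerSocketChannel serverSocketChannel;
    private final ServerAcceptHandler acceptHandler = new ServerAcceptHandler();

    public AioServer() throws IOException {
        // Callbacks run on threads of this pool.
        ExecutorService threadPool = new ThreadPoolExecutor(
                0, 10, 5000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(threadPool);
        serverSocketChannel = AsynchronousServerSocketChannel.open(group);
        // SO_REUSEADDR has to be set before bind to take effect (the original set it afterwards).
        serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        serverSocketChannel.bind(new InetSocketAddress(8080));
        this.setName("server");
        LOG.info("start listening...");
    }

    public AsynchronousServerSocketChannel getServerChannel() {
        return serverSocketChannel;
    }

    @Override
    public void run() {
        // Returns immediately; acceptHandler is invoked when a client connects.
        serverSocketChannel.accept(this, acceptHandler);
    }

    public void shutDown() {
        LOG.info("shutting down, no longer accepting requests...");
        interrupt();

        LOG.info("closing the listening channel...");
        IOUtils.closeQuietly(serverSocketChannel);
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        AioServer server = new AioServer();
        server.start();

        Thread.sleep(3000);
    }
}
```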
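The accept callback. When a connection is accepted it simply allocates a Buffer and registers a read callback; the operating system copies the data into the Buffer and invokes the read callback. It then goes on to accept the next connection.
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/ServerAcceptHandler.java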
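```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

// Assumed from the call signatures: log4j 1.x.
import org.apache.log4j.Logger;

public class ServerAcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AioServer> {

    private static final Logger LOG = Logger.getLogger(ServerAcceptHandler.class);

    @Override
    public void completed(AsynchronousSocketChannel serverSocketChannel, AioServer server) {
        // Despite its name, serverSocketChannel is the accepted client connection.
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        serverSocketChannel.read(readBuffer, readBuffer, new ServerReadHandler(serverSocketChannel));
        // Re-arm the accept so the next connection is handled as well.
        server.getServerChannel().accept(server, this);
    }

    @Override
    public void failed(Throwable e, AioServer server) {
        LOG.error("accept failed, " + e.getMessage());
    }
}
```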
The read callback reads the data from the Buffer, then writes the response back and registers a write callback.
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/ServerReadHandler.java
```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

// Assumed from the call signatures: Apache Commons IO and log4j 1.x.
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class ServerReadHandler implements CompletionHandler<Integer, ByteBuffer> {

    private static final Logger LOG = Logger.getLogger(ServerReadHandler.class);

    private final AsynchronousSocketChannel serverSocketChannel;

    public ServerReadHandler(AsynchronousSocketChannel serverSocketChannel) {
        this.serverSocketChannel = serverSocketChannel;
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        if (result < 0) {
            // End of stream: the client closed the connection, so stop the read/write cycle.
            IOUtils.closeQuietly(serverSocketChannel);
            return;
        }
        buffer.flip();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        String msg = new String(bytes);
        LOG.info(">> accept " + msg);

        long beginTime = System.currentTimeMillis();
        String response = "resp for " + msg;
        bytes = response.getBytes();

        // Reuse the same buffer for the response.
        buffer.clear();
        buffer.put(bytes);
        buffer.flip();
        try {
            Thread.sleep(100); // simulate processing time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        LOG.info("<< " + response + ", cost=" + (System.currentTimeMillis() - beginTime) + "ms");
        serverSocketChannel.write(buffer, buffer, new ServerWriteHandler(serverSocketChannel));
    }

    @Override
    public void failed(Throwable e, ByteBuffer attachment) {
        LOG.error("read failed, " + e.getMessage());
        IOUtils.closeQuietly(serverSocketChannel);
    }
}
```
Depending on whether the connection is short-lived or long-lived, the write callback can either close the channel or register another read callback.
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/callback/ServerWriteHandler.java
```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

// Assumed from the call signatures: Apache Commons IO and log4j 1.x.
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class ServerWriteHandler implements CompletionHandler<Integer, ByteBuffer> {

    private static final Logger LOG = Logger.getLogger(ServerWriteHandler.class);

    private final AsynchronousSocketChannel serverSocketChannel;

    public ServerWriteHandler(AsynchronousSocketChannel serverSocketChannel) {
        this.serverSocketChannel = serverSocketChannel;
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        if (buffer.hasRemaining()) {
            // Partial write: keep writing the rest.
            serverSocketChannel.write(buffer, buffer, this);
        } else {
            // Long-lived connection: wait for the next message on the same channel.
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            serverSocketChannel.read(readBuffer, readBuffer, new ServerReadHandler(serverSocketChannel));
        }
    }

    @Override
    public void failed(Throwable e, ByteBuffer attachment) {
        LOG.error("write failed, " + e.getMessage());
        IOUtils.closeQuietly(serverSocketChannel);
    }
}
```
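The callback chain above is spread over several classes. To see the accept and read callbacks fire within a single process, here is a minimal loopback sketch. It is an illustration under stated assumptions, not the post's code: `CallbackReadSketch` and `receiveViaCallback` are made-up names, the default channel group is used instead of a custom thread pool, a `CompletableFuture` carries the result back to the caller, and the message is assumed to be non-empty and to arrive in one read.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of the Echo repository.
final class CallbackReadSketch {

    /** Sends msg over a loopback connection and returns what the server-side read callback saw. */
    static String receiveViaCallback(String msg) {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                     .bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {

            CompletableFuture<String> received = new CompletableFuture<>();

            // accept() returns at once; the handler runs later on a channel-group thread.
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                @Override
                public void completed(AsynchronousSocketChannel peer, Void unused) {
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    peer.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer count, ByteBuffer filled) {
                            filled.flip(); // the data is already in the buffer here
                            received.complete(StandardCharsets.UTF_8.decode(filled).toString());
                            closeQuietly(peer);
                        }

                        @Override
                        public void failed(Throwable e, ByteBuffer filled) {
                            received.completeExceptionally(e);
                            closeQuietly(peer);
                        }
                    });
                }

                @Override
                public void failed(Throwable e, Void unused) {
                    received.completeExceptionally(e);
                }
            });

            client.connect(server.getLocalAddress()).get(2, TimeUnit.SECONDS);
            client.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8))).get(2, TimeUnit.SECONDS);
            return received.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    private static void closeQuietly(AsynchronousSocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // nothing useful to do in a sketch
        }
    }
}
```

The calling thread never blocks in `accept` or `read`; it only waits at the end on the future, which here plays the role that `connectLatch` plays in AioClient.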
3.2. future
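AIO can also be used synchronously, by calling the blocking get() on the Future that each operation returns. The examples below are straightforward and are not discussed further.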
3.2.1. AioClient
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/future/AioClient.java
```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;

// Assumed from the call signatures: Apache Commons IO and log4j 1.x.
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class AioClient extends Thread {

    private static final Logger LOG = Logger.getLogger(AioClient.class);

    private final AsynchronousSocketChannel socketChannel;
    private final String host;
    private final int port;
    private int msg_index = 0;

    public AioClient(String host, int port) throws IOException {
        this.host = host;
        this.port = port;
        this.socketChannel = AsynchronousSocketChannel.open();
        this.setName("client");
    }

    @Override
    public void run() {
        try {
            // connect() returns Future<Void>: get() blocks until connected and throws on failure,
            // so no separate check of the (always null) result is needed.
            socketChannel.connect(new InetSocketAddress(host, port)).get();
        } catch (Exception e) {
            LOG.error("connect failed, " + e.getMessage());
            IOUtils.closeQuietly(socketChannel);
            return;
        }

        LOG.info("connected");
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            while (!interrupted()) {
                String msg = "msg" + ++msg_index;
                byte[] bytes = msg.getBytes();
                buffer.clear();
                buffer.put(bytes);
                buffer.flip();
                LOG.info(">> send " + msg);
                socketChannel.write(buffer).get();

                buffer.clear();
                if (socketChannel.read(buffer).get() < 0) {
                    break; // server closed the connection
                }
                buffer.flip();
                bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                String resp = new String(bytes);
                LOG.info("<< " + resp);
            }
        } catch (Exception e) {
            LOG.error("client stopped, " + e.getMessage());
            IOUtils.closeQuietly(socketChannel);
        }
    }

    public void shutdown() {
        LOG.info("shutting down the client, no longer sending...");
        interrupt();
        LOG.info("closing the connection...");
        IOUtils.closeQuietly(socketChannel);
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        AioClient client = new AioClient("127.0.0.1", 7070);
        client.start();
        Thread.sleep(5000);
        client.shutdown();
    }
}
```
3.2.2. AioServer
Source: https://github.com/shanhm1991/Echo/blob/master/src/main/java/io/github/echo/io/aio/future/AioServer.java
```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.security.SecureRandom;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Assumed from the call signatures: Apache Commons IO and log4j 1.x.
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

public class AioServer extends Thread {

    private static final Logger LOG = Logger.getLogger(AioServer.class);
    private static final SecureRandom random = new SecureRandom();

    private final AsynchronousServerSocketChannel serverSocketChannel;

    public AioServer(int port) throws IOException {
        ExecutorService threadPool = new ThreadPoolExecutor(
                0, 10, 5000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(threadPool);
        this.serverSocketChannel = AsynchronousServerSocketChannel.open(group);
        this.serverSocketChannel.bind(new InetSocketAddress(port));
        this.setName("server");
    }

    @Override
    public void run() {
        LOG.info("start listening...");
        try {
            while (!interrupted()) {
                // get() blocks until a client connects.
                AsynchronousSocketChannel socketChannel = serverSocketChannel.accept().get();
                new Handler(socketChannel).start();
            }
        } catch (Exception e) {
            LOG.error("server stopped, " + e.getMessage());
        }
    }

    private class Handler extends Thread {

        private final AsynchronousSocketChannel socketChannel;

        public Handler(AsynchronousSocketChannel socketChannel) {
            this.socketChannel = socketChannel;
            this.setName("handler");
        }

        @Override
        public void run() {
            ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
            try {
                while (!interrupted()) {
                    buffer.clear();
                    if (socketChannel.read(buffer).get() < 0) {
                        break; // client closed the connection
                    }
                    buffer.flip();
                    byte[] bytes = new byte[buffer.remaining()];
                    buffer.get(bytes);
                    String msg = new String(bytes);
                    LOG.info(">> accept " + msg);

                    long beginTime = System.currentTimeMillis();
                    Thread.sleep(random.nextInt(100)); // simulate processing time
                    String response = "resp for " + msg;
                    bytes = response.getBytes();
                    buffer.clear();
                    buffer.put(bytes);
                    buffer.flip();
                    // Wait for the write to finish before the buffer is cleared for the next read
                    // (the original did not call get() here).
                    socketChannel.write(buffer).get();
                    LOG.info("<< " + response + ", cost=" + (System.currentTimeMillis() - beginTime) + "ms");
                }
            } catch (Exception e) {
                // Only this connection is dropped; the original shut down the whole server here.
                LOG.error("handle error, " + e.getMessage());
            } finally {
                IOUtils.closeQuietly(socketChannel);
            }
        }
    }

    public void shutdown() {
        LOG.info("shutting down, no longer accepting requests...");
        interrupt();

        LOG.info("closing the listening channel...");
        IOUtils.closeQuietly(serverSocketChannel);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        AioServer server = new AioServer(7070);
        server.start();
    }
}
```
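Both sides of the Future style can also be exercised in one process over the loopback interface. The sketch below is an illustration only: `FutureEchoSketch` and `echoOnce` are made-up names, the 2-second timeouts are arbitrary, and it assumes each short message is written and read in a single operation.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of the Echo repository.
final class FutureEchoSketch {

    /** Performs one request/response exchange, blocking on each Future in turn. */
    static String echoOnce(String msg) {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                     .bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {

            // accept() returns immediately; the Future completes when the client connects.
            Future<AsynchronousSocketChannel> pendingAccept = server.accept();
            client.connect(server.getLocalAddress()).get(2, TimeUnit.SECONDS);

            try (AsynchronousSocketChannel peer = pendingAccept.get(2, TimeUnit.SECONDS)) {
                // Client sends the request.
                client.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)))
                      .get(2, TimeUnit.SECONDS);

                // Server side reads it and writes the response.
                ByteBuffer request = ByteBuffer.allocate(1024);
                peer.read(request).get(2, TimeUnit.SECONDS);
                request.flip();
                String response = "resp for " + StandardCharsets.UTF_8.decode(request);
                peer.write(ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8)))
                    .get(2, TimeUnit.SECONDS);

                // Client reads the response.
                ByteBuffer reply = ByteBuffer.allocate(1024);
                client.read(reply).get(2, TimeUnit.SECONDS);
                reply.flip();
                return StandardCharsets.UTF_8.decode(reply).toString();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling `get()` right after every operation makes the code read like blocking I/O; the asynchronous API only pays off when the thread does other work between starting an operation and collecting its result.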
References:
- 《Netty权威指南》 (The Definitive Guide to Netty)
- http://www.tianshouzhi.com/api/tutorials/netty/221