AIO是java中IO模型的一种,作为NIO的改进和增强随JDK1.7版本更新被集成在JDK的nio包中,因此AIO也被称作是NIO2.0。区别于传统的BIO(Blocking IO,同步阻塞式模型,JDK1.4之前就存在于JDK中,NIO于JDK1.4版本发布更新)的阻塞式读写,AIO提供了从建立连接到读、写的全异步操作。AIO可用于异步的文件读写和网络通信。
异步IO则采用“订阅-通知”模式:即应用程序向操作系统注册IO监听,然后继续做自己的事情。当操作系统发生IO事件,并且准备好数据后,在主动通知应用程序,触发相应的函数。
点击图片可以放大查看 JAVA AIO框架在windows下使用windows IOCP技术,在Linux下使用epoll多路复用IO技术模拟异步IO,这个从JAVA AIO框架的部分类设计上就可以看出来。例如框架中,在Windows下负责实现套接字通道的具体类是“sun.nio.ch.WindowsAsynchronousSocketChannelImpl”,在Linux下负责实现套接字通道的具体类是“sun.nio.ch.UnixAsynchronousServerSocketChannelImpl”
实现一个最简单的AIO socket通信server、client,主要需要这些相关的类和接口:
AsynchronousServerSocketChannel 服务端Socket通道类,负责服务端Socket的创建和监听;
AsynchronousSocketChannel 客户端Socket通道类,负责客户端消息读写;
CompletionHandler<A,V> 消息处理回调接口,是一个负责消费异步IO操作结果的消息处理器;
ByteBuffer 负责承载通信过程中需要读、写的消息。
此外,还有可选的用于异步通道资源共享的AsynchronousChannelGroup类,接下来将一一介绍这些类的主要接口及使用。
服务端
package com.sgcc.controller; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * AIO 服务端 */ public class AIOServer { private final int port; public static void main(String args[]) { int port = 8000; new AIOServer(port); } public AIOServer(int port) { this.port = port; listen(); } private void listen() { try { ExecutorService executorService = Executors.newCachedThreadPool(); AsynchronousChannelGroup threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1); //过调用AsynchronousServerSocketChannel的静态方法open()来创建AsynchronousServerSocketChannel实例: //AsynchronousServerSocketChannel的使用和ServerSocketChannel一样需要经过三个步骤:创建/打开通道、绑定地址和端口和监听客户端连接请求。 final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(threadGroup); server.bind(new InetSocketAddress(port)); System.out.println("服务已启动, 监听端口" + port); server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { final ByteBuffer buffer = ByteBuffer.allocateDirect(1024); public void completed(AsynchronousSocketChannel result, Object attachment) { System.out.println("IO 操作成功, 开始获取数据"); try { buffer.clear(); result.read(buffer).get(); buffer.flip(); result.write(buffer); buffer.flip(); } catch (Exception e) { System.out.println(e.toString()); } finally { try { result.close(); server.accept(null, this); } catch (Exception e) { System.out.println(e.toString()); } } System.out.println("操作完成"); } @Override public void failed(Throwable exc, Object attachment) { System.out.println("IO 操作是失败: " + exc); } }); try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException ex) { System.out.println(ex); } } catch (IOException e) { System.out.println(e); } } }客户端
package com.sgcc.controller; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; /** * AIO 客户端 */ public class AIOClient { private final AsynchronousSocketChannel client; public AIOClient() throws Exception { client = AsynchronousSocketChannel.open(); } public void connect(String host, int port) throws Exception { client.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Void>() { @Override public void completed(Void result, Void attachment) { try { client.write(ByteBuffer.wrap("这是一条测试数据".getBytes())).get(); System.out.println("已发送至服务器"); } catch (Exception ex) { ex.printStackTrace(); } } @Override public void failed(Throwable exc, Void attachment) { exc.printStackTrace(); } }); final ByteBuffer bb = ByteBuffer.allocate(1024); client.read(bb, null, new CompletionHandler<Integer, Object>() { @Override public void completed(Integer result, Object attachment) { System.out.println("IO 操作完成" + result); System.out.println("获取反馈结果" + new String(bb.array())); } @Override public void failed(Throwable exc, Object attachment) { exc.printStackTrace(); } }); try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException ex) { System.out.println(ex); } } public static void main(String args[]) throws Exception { new AIOClient().connect("localhost", 8000); } }