NIO实现服务器和客户端简单通讯与群聊系统

tech2022-09-28  75

参考 https://www.bilibili.com/video/BV1DJ411m7NR

相关API汇总

public abstract class Buffer { //JDK1.4时,引入的api public final int capacity( )//返回此缓冲区的容量 public final int position( )//返回此缓冲区的位置 public final Buffer position (int newPositio)//设置此缓冲区的位置 public final int limit( )//返回此缓冲区的限制 public final Buffer limit (int newLimit)//设置此缓冲区的限制 public final Buffer mark( )//在此缓冲区的位置设置标记 public final Buffer reset( )//将此缓冲区的位置重置为以前标记的位置 public final Buffer clear( )//清除此缓冲区, 即将各个标记恢复到初始状态,但是数据并 public final Buffer flip( )//反转此缓冲区 public final Buffer rewind( )//重绕此缓冲区 public final int remaining( )//返回当前位置与限制之间的元素数 public final boolean hasRemaining( )//告知在当前位置和限制之间是否有元素 public abstract boolean isReadOnly( );//告知此缓冲区是否为只读缓冲区 //JDK1.6时引入的api public abstract boolean hasArray();//告知此缓冲区是否具有可访问的底层实现数组 public abstract Object array();//返回此缓冲区的底层实现数组 public abstract int arrayOffset();//返回此缓冲区的底层实现数组中第一个缓冲区元素的 public abstract boolean isDirect();//告知此缓冲区是否为直接缓冲区 } public abstract class ByteBuffer { //缓冲区创建相关api public static ByteBuffer allocateDirect(int capacity)//创建直接缓冲区 public static ByteBuffer allocate(int capacity)//设置缓冲区的初始容量 public static ByteBuffer wrap(byte[] array)//把一个数组放到缓冲区中使用 //构造初始化位置offset和上界length的缓冲区 public static ByteBuffer wrap(byte[] array,int offset, int length) //缓存区存取相关API public abstract byte get( );//从当前位置position上get,get之后,position会自动+1 public abstract byte get (int index);//从绝对位置get public abstract ByteBuffer put (byte b);//从当前位置上添加,put之后,position会自动+1 public abstract ByteBuffer put (int index, byte b);//从绝对位置上put } FileChannel //主要用来对本地文件进行 IO 操作,常见的方法有 1) public int read(ByteBuffer dst) //从通道读取数据并放到缓冲区中 2) public int write(ByteBuffer src) //把缓冲区的数据写到通道中 3) public long transferFrom(ReadableByteChannel src, long position, long count)//从目标通道中复制数据到当前通道 4) public long transferTo(long position, long count, WritableByteChannel target)//把数据从当前通道复制给目标通道 //Selector 类是一个抽象类, 常用方法和说明如下: public abstract class Selector implements Closeable { public static Selector open();//得到一个选择器对象 //监控所有注册的通道,当其中有IO 操作可以进行时,将对应的SelectionKey 加入到内部集合中并返回,参数用来设置超时时间 public int select(long timeout); selector.select()//阻塞 selector.select(1000);//阻塞1000毫秒,在1000毫秒后返回 selector.selectNow();//不阻塞,立马返还 selector.wakeup();//唤醒selector public Set<SelectionKey> selectedKeys();//从内部集合中得到所有的SelectionKey } /** * 1) SelectionKey,表示 Selector 和网络通道的注册关系, 共四种: * int OP_ACCEPT:有新的网络连接可以 accept,值为 16 * int OP_CONNECT:代表连接已经建立,值为 8 * int OP_READ:代表读操作,值为 1 * int OP_WRITE:代表写操作,值为 4 * * 源码中: * public static final int OP_READ = 1 << 0; * public static final int OP_WRITE = 1 << 2; * public static final int OP_CONNECT = 1 << 3; * public static final int OP_ACCEPT = 1 << 4; */ public abstract class SelectionKey { public abstract Selector selector();//得到与之关联的Selector 对象 public abstract SelectableChannel channel();//得到与之关联的通道 public final Object attachment();//得到与之关联的共享数据 public abstract SelectionKey interestOps(int ops);//设置或改变监听事件 public final boolean isAcceptable();//是否可以 accept public final boolean isReadable();//是否可以读 public final boolean isWritable(); // 是否可以写 } //在服务器端监听新的客户端Socket 连接 public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel{ public static ServerSocketChannel open()//得到一个 ServerSocketChannel 通道 public final ServerSocketChannel bind(SocketAddress local)//设置服务器端端口号 public final SelectableChannel configureBlocking(boolean block)//设置阻塞或非阻塞模式,取值false 表示采用非阻塞模式 public SocketChannel accept()//接受一个连接,返回代表这个连接的通道对象 public final SelectionKey register(Selector sel, int ops)//注册一个选择器并设置监听事件 } //网络 IO 通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。 public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel{ public static SocketChannel open();//得到一个 SocketChannel 通道 public final SelectableChannel configureBlocking(boolean block);//设置阻塞或非阻塞模式,取值false 表示采用非阻塞模式 public boolean connect(SocketAddress remote);//连接服务器 public boolean finishConnect();//如果上面的方法连接失败,接下来就要通过该方法完成连接操作 public int write(ByteBuffer src);//往通道里写数据 public int read(ByteBuffer dst);//从通道里读数据 public final SelectionKey register(Selector sel, int ops, Object att);//注册一个选择器并设置监听事件,最后一个参数可以设置共享数据 public final void close();//关闭通道 }

server与client实现

原理概述:

当客户端连接时,会通过ServerSocketChannel 得到SocketChannelSelector 进行监听 select方法,返回有事件发生的通道的个数.将socketChannel注册到Selector上,register(Selector sel,int ops),一个selector上可以注册多个SocketChannel注册后返回一个Selectionkey,会和该Selector关联(集合)进一步得到各个Selectionkey(有事件发生)在通过SelectionKey反向获取SocketChannel,方法 channel()可以通过得到的channel,完成业务处理

NIOServer

public class NIOServer { public static void main(String[] args) throws Exception{ //创建ServerSocketChannel -> ServerSocket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //得到一个Selecor对象 Selector selector = Selector.open(); //绑定一个端口6666, 在服务器端监听 serverSocketChannel.socket().bind(new InetSocketAddress(6666)); //设置为非阻塞 serverSocketChannel.configureBlocking(false); //把 serverSocketChannel 注册到 selector 关心 事件为 OP_ACCEPT serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("注册后的selectionkey 数量=" + selector.keys().size()); // 1 //循环等待客户端连接 while (true) { //监控所有注册的通道,当其中有IO 操作可以进行时,将 //对应的SelectionKey 加入到内部集合中并返回,参数用来设置超时时间 //这里我们等待1秒,如果没有事件发生, 返回 if(selector.select(1000) == 0) { //没有事件发生 System.out.println("服务器等待了1秒,无连接"); continue; } //如果返回的>0, 就获取到相关的 selectionKey集合 //1.如果返回的>0, 表示已经获取到关注的事件 //2. selector.selectedKeys() 返回关注事件的集合 // 通过 selectionKeys 反向获取通道 Set<SelectionKey> selectionKeys = selector.selectedKeys(); System.out.println("selectionKeys 数量 = " + selectionKeys.size()); //遍历 Set<SelectionKey>, 使用迭代器遍历 Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()) { //获取到SelectionKey SelectionKey key = keyIterator.next(); //根据key 对应的通道发生的事件做相应处理 if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客户端连接 //该该客户端生成一个 SocketChannel SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode()); //将 SocketChannel 设置为非阻塞 socketChannel.configureBlocking(false); //将socketChannel 注册到selector, 关注事件为 OP_READ, 同时给socketChannel关联一个Buffer socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size()); //2,3,4.. } if(key.isReadable()) { //发生 OP_READ //通过key 反向获取到对应channel SocketChannel channel = (SocketChannel)key.channel(); //获取到该channel关联的buffer ByteBuffer buffer = (ByteBuffer)key.attachment(); channel.read(buffer); System.out.println("form 客户端 " + new String(buffer.array())); } //手动从集合中移动当前的selectionKey, 防止重复操作 keyIterator.remove(); } } } }

NIOClient

public class NIOClient { public static void main(String[] args) throws Exception{ //得到一个网络通道 SocketChannel socketChannel = SocketChannel.open(); //设置非阻塞 socketChannel.configureBlocking(false); //提供服务器端的ip 和 端口 InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666); //连接服务器 if (!socketChannel.connect(inetSocketAddress)) { while (!socketChannel.finishConnect()) { System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作.."); } } //...如果连接成功,就发送数据 String str = "hello"; //Wraps a byte array into a buffer ByteBuffer buffer = ByteBuffer.wrap(str.getBytes()); //发送数据,将 buffer 数据写入 channel socketChannel.write(buffer); System.in.read(); } }

群聊系统实现

服务端: 服务端类首先会进行初始化 ,然后会进行监听,若是连接事件注册到seletor即可,并输出上线的客户端信息,若是read事件将消息打印并要转发给其它客户端。

public class GroupChatServer { //定义属性 private Selector selector; private ServerSocketChannel listenChannel; private static final int PORT = 6667; //构造器 //初始化工作 public GroupChatServer() { try { //得到选择器 selector = Selector.open(); //ServerSocketChannel listenChannel = ServerSocketChannel.open(); //绑定端口 listenChannel.socket().bind(new InetSocketAddress(PORT)); //设置非阻塞模式 listenChannel.configureBlocking(false); //将该listenChannel 注册到selector listenChannel.register(selector, SelectionKey.OP_ACCEPT); }catch (IOException e) { e.printStackTrace(); } } //监听 public void listen() { System.out.println("监听线程: " + Thread.currentThread().getName()); try { //循环处理 while (true) { int count = selector.select(); if(count > 0) {//有事件处理 //遍历得到selectionKey 集合 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { //取出selectionkey SelectionKey key = iterator.next(); //监听到accept if(key.isAcceptable()) { SocketChannel sc = listenChannel.accept(); sc.configureBlocking(false); //将该 sc 注册到seletor sc.register(selector, SelectionKey.OP_READ); //提示 System.out.println(sc.getRemoteAddress() + " 上线 "); } if(key.isReadable()) { //通道发送read事件,即通道是可读的状态 //处理读 (专门写方法..) readData(key); } //当前的key 删除,防止重复处理 iterator.remove(); } } else { System.out.println("等待...."); } } }catch (Exception e) { e.printStackTrace(); }finally { //发生异常处理.... } } //读取客户端消息 private void readData(SelectionKey key) { //取到关联的channle SocketChannel channel = null; try { //得到channel channel = (SocketChannel) key.channel(); //创建buffer ByteBuffer buffer = ByteBuffer.allocate(1024); int count = channel.read(buffer); //根据count的值做处理 if(count > 0) { //把缓存区的数据转成字符串 String msg = new String(buffer.array()); //输出该消息 System.out.println("form 客户端: " + msg); //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理 sendInfoToOtherClients(msg, channel); } }catch (IOException e) { try { System.out.println(channel.getRemoteAddress() + " 离线了.."); //取消注册 key.cancel(); //关闭通道 channel.close(); }catch (IOException e2) { e2.printStackTrace();; } } } //转发消息给其它客户(通道) private void sendInfoToOtherClients(String msg, SocketChannel self ) throws IOException{ System.out.println("服务器转发消息中..."); System.out.println("服务器转发数据给客户端线程: " + Thread.currentThread().getName()); //遍历 所有注册到selector 上的 SocketChannel,并排除 self for(SelectionKey key: selector.keys()) { //通过 key 取出对应的 SocketChannel Channel targetChannel = key.channel(); //排除自己 if(targetChannel instanceof SocketChannel && targetChannel != self) { //转型 SocketChannel dest = (SocketChannel)targetChannel; //将msg 存储到buffer ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); //将buffer 的数据写入 通道 dest.write(buffer); } } } public static void main(String[] args) { //创建服务器对象 GroupChatServer groupChatServer = new GroupChatServer(); groupChatServer.listen(); } }

客户端

public class GroupChatClient { //定义相关的属性 private final String HOST = "127.0.0.1"; // 服务器的ip private final int PORT = 6667; //服务器端口 private Selector selector; private SocketChannel socketChannel; private String username; //构造器, 完成初始化工作 public GroupChatClient() throws IOException { selector = Selector.open(); //连接服务器 socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT)); //设置非阻塞 socketChannel.configureBlocking(false); //将channel 注册到selector socketChannel.register(selector, SelectionKey.OP_READ); //得到username username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + " is ok..."); } //向服务器发送消息 public void sendInfo(String info) { info = username + " 说:" + info; try { socketChannel.write(ByteBuffer.wrap(info.getBytes())); }catch (IOException e) { e.printStackTrace(); } } //读取从服务器端回复的消息 public void readInfo() { try { int readChannels = selector.select(); if(readChannels > 0) {//有可以用的通道 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if(key.isReadable()) { //得到相关的通道 SocketChannel sc = (SocketChannel) key.channel(); //得到一个Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //读取 sc.read(buffer); //把读到的缓冲区的数据转成字符串 String msg = new String(buffer.array()); System.out.println(msg.trim()); } } iterator.remove(); //删除当前的selectionKey, 防止重复操作 } else { //System.out.println("没有可以用的通道..."); } }catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { //启动我们客户端 GroupChatClient chatClient = new GroupChatClient(); //启动一个线程, 每个3秒,读取从服务器发送数据 new Thread() { public void run() { while (true) { chatClient.readInfo(); try { Thread.currentThread().sleep(3000); }catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); //发送数据给服务器端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String s = scanner.nextLine(); chatClient.sendInfo(s); } } }

可启动一个服务端 两个客户端进行实验 GroupChatClient输入消息 查看服务端打印结果及是否转发 查看GroupChatClient(1)是否接收到 GroupChatClient发出的信息

最新回复(0)