https://developers.google.com/protocol-buffers/docs/proto【国内看不了,需要才科学上网能看】
IDEA下protobuf的具体安装和使用:https://blog.csdn.net/qq_36903261/article/details/108384951
在NettyClientHandler里修改channelActive方法
//当通道就绪后就会触发 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //发送一个Student对象到服务器 StudentPOJO.Student stu1 = StudentPOJO.Student.newBuilder().setId(1).setName("张三").build(); ctx.writeAndFlush(stu1); }NettyClient NettyServer 修改NettyServerHandler里的channelRead方法
//读取数据事件 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //读取从客户端发送的StudentPojo.student StudentPOJO.Student student = (StudentPOJO.Student) msg; System.out.println("客户端发送的数据 id:" + student.getId() + " name:" + student.getName()); }NettyServerHandler的第二种写法
public class NettyServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> { @Override protected void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception { //读取从客户端发送的StudentPojo.student StudentPOJO.Student student = msg; System.out.println("客户端发送的数据 id:" + student.getId() + " name:" + student.getName()); } ...Student.proto
syntax = "proto3"; option optimize_for = SPEED; //加快解析 //option java_package = "com.angenin.netty.codec2"; //指定生成到哪个包下,这步可以省略,因为生成的文件是在target包下的,生成后还是得移动位置 option java_outer_classname = "MyDataInfo"; //外部类名 //protobuf可以使用 message管理其他的 message message MyMessage{ //定义一个枚举类型 enum DataType { StudentType = 0; //在proto3 要求enum的编号从0开始 WorkerType = 1; } //用data_type来标识是哪一个枚举类型 DataType data_type = 1; //表示每次枚举类型最多只能出现下面其中的一个,节省空间 oneof dataBody { Student student = 2; //上面1已经表明了,所以这里从2开始 Worker worker = 3; } } message Student{ int32 id = 1; string name = 2; } message Worker{ string name = 1; int32 age = 2; }按照之前的步骤生成文件。
修改NettyClientHandler的channelActive方法
@Override //当通道就绪后就会触发 public void channelActive(ChannelHandlerContext ctx) throws Exception { //随机的发送Student或Worker对象 int random = new Random().nextInt(3); MyDataInfo.MyMessage myMessage = null; if (0 == random){ //发送Student myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType) .setStudent(MyDataInfo.Student.newBuilder().setId(1).setName("张三").build()).build(); }else{ //发送Worker myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType) .setWorker(MyDataInfo.Worker.newBuilder().setName("李四").setAge(20).build()).build(); } ctx.writeAndFlush(myMessage); }NettyServer NettyServerHandler
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception { //根据dataType来显示不同的信息 MyDataInfo.MyMessage.DataType dataType = msg.getDataType(); if (dataType == MyDataInfo.MyMessage.DataType.StudentType) { MyDataInfo.Student student = msg.getStudent(); System.out.println("student id: " + student.getId() + " name: " + student.getName()); }else if(dataType == MyDataInfo.MyMessage.DataType.WorkerType){ MyDataInfo.Worker worker = msg.getWorker(); System.out.println("worker name: " + worker.getName() + " age: " + worker.getAge()); }else{ System.out.println("传输的类型不正确"); } } ...服务器端:
服务器 -> 客户端:出站(out)客户端 -> 服务器:入站(in)客户端:
服务器 -> 客户端:入站(in)客户端 -> 服务器:出站(out)具体是入站还是出站,要看是从哪个角度出发的。
出站(out)需要编码,入站(in)需要解码。
MyByteToLongDecoder
public class MyByteToLongDecoder extends ByteToMessageDecoder { /** * 入站解码 * decode 会根据接收的数据,被多次调用,直到确认没有新的元素被添加到list中,或者是ByteBuf in中没有更多的可读字节为止 * 如果list<> out 不为空,就会将list的内容传递给下一个channelinboundhandler进行处理,改处理器也会被调用多次 * @param ctx 上下文对象 * @param in 入站的 ByteBuf * @param out List集合,将解码后的数据传给下一个handler进行处理 * @throws Exception */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //客户端发送abcdabcdabcdabcdabcd,调用了三次decode,第三次没通过if,结束调用 System.out.println("MyByteToLongDecoder 被调用"); //因为long为8个字节,如果小于8,说明不是long类型 if (in.readableBytes() >= 8){ out.add(in.readLong()); } } }MyLongToByteEncoder
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> { //出站编码 @Override protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception { System.out.println("MyLongToByteEncoder 的 encode 方法被调用..."); System.out.println("msg=" + msg); out.writeLong(msg); } }MyServerHandler
public class MyServerHandler extends SimpleChannelInboundHandler<Long> { @Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { System.out.println("从客户端" + ctx.channel().remoteAddress() + "读取到的long:" + msg); //读取到客户端发送的数据后,给客户端发送一个long ctx.writeAndFlush(98765L); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }MyServerInitializer
public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //入站的handler进行解码 MyByteToLongDecoder,解码后传给下一个handler,即MyServerHandler //入站时,按照添加的顺序从上到下依次执行handler(解码) //出站时,按照添加的顺序从下到上依次执行handler(编码) //入站和出站虽然在同一条handler链,但是由于每个handler的上下文对象中会有两个属性,一个代表入站操作,一个代表出站操作,所以并不会同时执行 //入站:inbound=true,outbound=false //出站:inbound=false,outbound=true pipeline.addLast(new MyByteToLongDecoder()); //出站,往客户端发送数据 pipeline.addLast(new MyLongToByteEncoder()); //自定义的handler,处理业务逻辑 pipeline.addLast(new MyServerHandler()); } }MyServer
public class MyServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new MyServerInitializer()); //自定义一个初始化类 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }MyClientHandler
public class MyClientHandler extends SimpleChannelInboundHandler<Long> { @Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { System.out.println("服务器" + ctx.channel().remoteAddress() + "回复的信息:" + msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("MyClientHandler 发送数据"); //发送一个Long ctx.writeAndFlush(123456L); //发送 "abcdabcdabcdabcdabcd" //ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcdabcd", CharsetUtil.UTF_8)); //分析 //1. abcdabcdabcdabcdabcd 是 16个字节 //2. 该处理器的前一个handler 是 MyLongToByteEncoder,因为是出站,所以反而先执行MyLongToByteEncoder //3. 那么为什么发送的不是long类型就不执行 MyLongToByteEncoder呢?因为其父类是 MessageToByteEncoder //4. MessageToByteEncoder中的 方法会判断是否是有处理的类型(泛型指定的类型),如果是就编码处理,不是就直接写出,不进行编码 /* @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) { //在这里判断是否是要处理的类型 ... encode(ctx, cast, buf); //是要处理的类型,进行编码 ... } else { ctx.write(msg, promise); //不是泛型指定的类型,直接写出,不编码 } ... } */ //5. 因此我们在编写 Encoder 时,要注意传入的数据类型和要处理的数据类型要一致 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }MyClientInitializer
public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //加入一个出站handler,对数据进行编码 //出站是按照顺序先下后上,即先执行MyClientHandler后再执行MyLongToByteEncoder pipeline.addLast(new MyLongToByteEncoder()); //入站和出站虽然在同一条handler链,但是由于每个handler的上下文对象中会有两个属性,一个代表入站操作,一个代表出站操作,所以并不会同时执行 //入站:inbound=true,outbound=false //出站:inbound=false,outbound=true //入站,先上后下(解码) pipeline.addLast(new MyByteToLongDecoder()); //加入一个自定义handler,处理业务 pipeline.addLast(new MyClientHandler()); } }MyClient
public class MyClient { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new MyClientInitializer()); //自定义初始化对象 ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } }
MyByteToLongDecoder2
//Void代表不需要状态管理 public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyByteToLongDecoder2 被调用"); //MyByteToLongDecoder // if (in.readableBytes() >= 8){ out.add(in.readLong()); } //MyByteToLongDecoder2 //继承 ReplayingDecoder 后,不需要判断数据类型是否足够读取,因为内部会进行处理判断 out.add(in.readLong()); } }
在编写Netty 程序时,如果没有做处理,就会发生粘包和拆包的问题。
MyServerHandler
public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { byte[] buffer = new byte[msg.readableBytes()]; msg.readBytes(buffer); //将buffer转成字符串 String message = new String(buffer, CharsetUtil.UTF_8); System.out.println("服务器接收到数据 " + message); System.out.println("服务器接收到的消息量 " + (++this.count)); //服务器回送一个随机id给客户端 ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", CharsetUtil.UTF_8); ctx.writeAndFlush(responseByteBuf); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }MyServerInitializer
public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new MyServerHandler()); } }MyServer
public class MyServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new MyServerInitializer()); //自定义一个初始化类 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }MyClientHandler
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //往服务器端发送10条数据 for (int i = 0; i < 10; i++) { ByteBuf byteBuf = Unpooled.copiedBuffer("hello,server " + i, CharsetUtil.UTF_8); ctx.writeAndFlush(byteBuf); } } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { byte[] buffer = new byte[msg.readableBytes()]; msg.readBytes(buffer); //将buffer转成字符串 String message = new String(buffer, CharsetUtil.UTF_8); System.out.println("客户端接收到数据 " + message); System.out.println("客户端接收到的消息量 " + (++this.count)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }MyClientInitializer
public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new MyClientHandler()); } }MyClient
public class MyClient { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new MyClientInitializer()); //自定义初始化对象 ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } }
MessageProtocol
//协议包 public class MessageProtocol { private int len; //关键 private byte[] content; get... set...MyMessageEncoder
//编码器 public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> { @Override protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception { System.out.println("MyMessageEncoder 被调用了"); out.writeInt(msg.getLen()); out.writeBytes(msg.getContent()); } }MyMessageDecoder
//解码器 public class MyMessageDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyMessageDecoder 被调用"); //将传送过来的二进制字节码转换成 MessageProtocol 对象(数据包) int length = in.readInt(); byte[] content = new byte[length]; in.readBytes(content); //封装成 MessageProtocol 对象,放入 out,传递给下一个handler 进行业务处理 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLen(length); messageProtocol.setContent(content); out.add(messageProtocol); } }MyServerHandler
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { //接收到数据,进行处理 int len = msg.getLen(); byte[] content = msg.getContent(); System.out.println("服务器接收到信息:长度=" + len + ";内容=" + new String(content, CharsetUtil.UTF_8)); System.out.println("服务器接收到的信息包数量=" + (++this.count)); //回复给客户端消息 byte[] responseContent = UUID.randomUUID().toString().getBytes("utf-8"); int responseLen = responseContent.length; //构建一个协议包 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLen(responseLen); messageProtocol.setContent(responseContent); ctx.writeAndFlush(messageProtocol); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常:" + cause.getMessage()); ctx.close(); } }MyServerInitializer
public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //入站,由上向下,先解码 pipeline.addLast(new MyMessageDecoder()); //出站,由下向上,后编码 pipeline.addLast(new MyMessageEncoder()); pipeline.addLast(new MyServerHandler()); } }MyClientHandler
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> { private int count; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //往服务器端发送5条数据 for (int i = 0; i < 5; i++) { String message = "你好鸭~"; byte[] content = message.getBytes(CharsetUtil.UTF_8); int length = content.length; //创建协议包对象 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLen(length); messageProtocol.setContent(content); ctx.writeAndFlush(messageProtocol); } } @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { //接收服务器回复的数据,进行处理 int len = msg.getLen(); byte[] content = msg.getContent(); System.out.println("客户端接收到信息:长度=" + len + ";内容=" + new String(content, CharsetUtil.UTF_8)); System.out.println("客户端接收到的信息包数量=" + (++this.count)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常:" + cause.getMessage()); ctx.close(); } }MyClientInitializer
public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //出站,由下向上,后编码 pipeline.addLast(new MyMessageEncoder()); //入站,由上向下,先解码 pipeline.addLast(new MyMessageDecoder()); pipeline.addLast(new MyClientHandler()); } }MyServer和MyClient代码同上,不变。
下一篇笔记:Netty入门学习笔记(五)
学习视频(p73-p91):https://www.bilibili.com/video/BV1DJ411m7NR?p=73