Netty入门学习笔记(四)

tech2022-10-10  100

文章目录

七、Google Protobuf编码和解码的基本介绍Netty 本身的编码解码的机制和问题分析 ProtobufProtobuf基本介绍和使用示意图Protobuf快速入门实例Protobuf快速入门实例2 八、Netty编解码器和handler的调用机制基本说明编码解码器解码器-ByteToMessageDecoderNetty的handler链的调用机制解码器-ReplayingDecoder其它的解码编码器Log4j 整合到Netty 九、TCP 粘包和拆包及解决方案基本介绍现象实例解决方案

七、Google Protobuf

编码和解码的基本介绍

Netty 本身的编码解码的机制和问题分析

Protobuf

Protobuf基本介绍和使用示意图

https://developers.google.com/protocol-buffers/docs/proto【国内看不了,需要才科学上网能看】

Protobuf快速入门实例

IDEA下protobuf的具体安装和使用:https://blog.csdn.net/qq_36903261/article/details/108384951

在NettyClientHandler里修改channelActive方法

//当通道就绪后就会触发 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //发送一个Student对象到服务器 StudentPOJO.Student stu1 = StudentPOJO.Student.newBuilder().setId(1).setName("张三").build(); ctx.writeAndFlush(stu1); }

NettyClient NettyServer 修改NettyServerHandler里的channelRead方法

//读取数据事件 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //读取从客户端发送的StudentPojo.student StudentPOJO.Student student = (StudentPOJO.Student) msg; System.out.println("客户端发送的数据 id:" + student.getId() + " name:" + student.getName()); }

NettyServerHandler的第二种写法

public class NettyServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> { @Override protected void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception { //读取从客户端发送的StudentPojo.student StudentPOJO.Student student = msg; System.out.println("客户端发送的数据 id:" + student.getId() + " name:" + student.getName()); } ...

Protobuf快速入门实例2

Student.proto

syntax = "proto3"; option optimize_for = SPEED; //加快解析 //option java_package = "com.angenin.netty.codec2"; //指定生成到哪个包下,这步可以省略,因为生成的文件是在target包下的,生成后还是得移动位置 option java_outer_classname = "MyDataInfo"; //外部类名 //protobuf可以使用 message管理其他的 message message MyMessage{ //定义一个枚举类型 enum DataType { StudentType = 0; //在proto3 要求enum的编号从0开始 WorkerType = 1; } //用data_type来标识是哪一个枚举类型 DataType data_type = 1; //表示每次枚举类型最多只能出现下面其中的一个,节省空间 oneof dataBody { Student student = 2; //上面1已经表明了,所以这里从2开始 Worker worker = 3; } } message Student{ int32 id = 1; string name = 2; } message Worker{ string name = 1; int32 age = 2; }

按照之前的步骤生成文件。

修改NettyClientHandler的channelActive方法

@Override //当通道就绪后就会触发 public void channelActive(ChannelHandlerContext ctx) throws Exception { //随机的发送Student或Worker对象 int random = new Random().nextInt(3); MyDataInfo.MyMessage myMessage = null; if (0 == random){ //发送Student myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType) .setStudent(MyDataInfo.Student.newBuilder().setId(1).setName("张三").build()).build(); }else{ //发送Worker myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType) .setWorker(MyDataInfo.Worker.newBuilder().setName("李四").setAge(20).build()).build(); } ctx.writeAndFlush(myMessage); }

NettyServer NettyServerHandler

public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception { //根据dataType来显示不同的信息 MyDataInfo.MyMessage.DataType dataType = msg.getDataType(); if (dataType == MyDataInfo.MyMessage.DataType.StudentType) { MyDataInfo.Student student = msg.getStudent(); System.out.println("student id: " + student.getId() + " name: " + student.getName()); }else if(dataType == MyDataInfo.MyMessage.DataType.WorkerType){ MyDataInfo.Worker worker = msg.getWorker(); System.out.println("worker name: " + worker.getName() + " age: " + worker.getAge()); }else{ System.out.println("传输的类型不正确"); } } ...

八、Netty编解码器和handler的调用机制

基本说明

服务器端:

服务器 -> 客户端:出站(out)客户端 -> 服务器:入站(in)

客户端:

服务器 -> 客户端:入站(in)客户端 -> 服务器:出站(out)

具体是入站还是出站,要看是从哪个角度出发的。

编码解码器

出站(out)需要编码,入站(in)需要解码。

解码器-ByteToMessageDecoder

Netty的handler链的调用机制

MyByteToLongDecoder

public class MyByteToLongDecoder extends ByteToMessageDecoder { /** * 入站解码 * decode 会根据接收的数据,被多次调用,直到确认没有新的元素被添加到list中,或者是ByteBuf in中没有更多的可读字节为止 * 如果list<> out 不为空,就会将list的内容传递给下一个channelinboundhandler进行处理,改处理器也会被调用多次 * @param ctx 上下文对象 * @param in 入站的 ByteBuf * @param out List集合,将解码后的数据传给下一个handler进行处理 * @throws Exception */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //客户端发送abcdabcdabcdabcdabcd,调用了三次decode,第三次没通过if,结束调用 System.out.println("MyByteToLongDecoder 被调用"); //因为long为8个字节,如果小于8,说明不是long类型 if (in.readableBytes() >= 8){ out.add(in.readLong()); } } }

MyLongToByteEncoder

public class MyLongToByteEncoder extends MessageToByteEncoder<Long> { //出站编码 @Override protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception { System.out.println("MyLongToByteEncoder 的 encode 方法被调用..."); System.out.println("msg=" + msg); out.writeLong(msg); } }

MyServerHandler

public class MyServerHandler extends SimpleChannelInboundHandler<Long> { @Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { System.out.println("从客户端" + ctx.channel().remoteAddress() + "读取到的long:" + msg); //读取到客户端发送的数据后,给客户端发送一个long ctx.writeAndFlush(98765L); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

MyServerInitializer

public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //入站的handler进行解码 MyByteToLongDecoder,解码后传给下一个handler,即MyServerHandler //入站时,按照添加的顺序从上到下依次执行handler(解码) //出站时,按照添加的顺序从下到上依次执行handler(编码) //入站和出站虽然在同一条handler链,但是由于每个handler的上下文对象中会有两个属性,一个代表入站操作,一个代表出站操作,所以并不会同时执行 //入站:inbound=true,outbound=false //出站:inbound=false,outbound=true pipeline.addLast(new MyByteToLongDecoder()); //出站,往客户端发送数据 pipeline.addLast(new MyLongToByteEncoder()); //自定义的handler,处理业务逻辑 pipeline.addLast(new MyServerHandler()); } }

MyServer

public class MyServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new MyServerInitializer()); //自定义一个初始化类 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

MyClientHandler

public class MyClientHandler extends SimpleChannelInboundHandler<Long> { @Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { System.out.println("服务器" + ctx.channel().remoteAddress() + "回复的信息:" + msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("MyClientHandler 发送数据"); //发送一个Long ctx.writeAndFlush(123456L); //发送 "abcdabcdabcdabcdabcd" //ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcdabcd", CharsetUtil.UTF_8)); //分析 //1. abcdabcdabcdabcdabcd 是 16个字节 //2. 该处理器的前一个handler 是 MyLongToByteEncoder,因为是出站,所以反而先执行MyLongToByteEncoder //3. 那么为什么发送的不是long类型就不执行 MyLongToByteEncoder呢?因为其父类是 MessageToByteEncoder //4. MessageToByteEncoder中的 方法会判断是否是有处理的类型(泛型指定的类型),如果是就编码处理,不是就直接写出,不进行编码 /* @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) { //在这里判断是否是要处理的类型 ... encode(ctx, cast, buf); //是要处理的类型,进行编码 ... } else { ctx.write(msg, promise); //不是泛型指定的类型,直接写出,不编码 } ... } */ //5. 因此我们在编写 Encoder 时,要注意传入的数据类型和要处理的数据类型要一致 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

MyClientInitializer

public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //加入一个出站handler,对数据进行编码 //出站是按照顺序先下后上,即先执行MyClientHandler后再执行MyLongToByteEncoder pipeline.addLast(new MyLongToByteEncoder()); //入站和出站虽然在同一条handler链,但是由于每个handler的上下文对象中会有两个属性,一个代表入站操作,一个代表出站操作,所以并不会同时执行 //入站:inbound=true,outbound=false //出站:inbound=false,outbound=true //入站,先上后下(解码) pipeline.addLast(new MyByteToLongDecoder()); //加入一个自定义handler,处理业务 pipeline.addLast(new MyClientHandler()); } }

MyClient

public class MyClient { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new MyClientInitializer()); //自定义初始化对象 ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } }

解码器-ReplayingDecoder

MyByteToLongDecoder2

//Void代表不需要状态管理 public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyByteToLongDecoder2 被调用"); //MyByteToLongDecoder // if (in.readableBytes() >= 8){ out.add(in.readLong()); } //MyByteToLongDecoder2 //继承 ReplayingDecoder 后,不需要判断数据类型是否足够读取,因为内部会进行处理判断 out.add(in.readLong()); } }

其它的解码编码器

Log4j 整合到Netty

在Maven 中添加对Log4j的依赖 在 pom.xml <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> <scope>test</scope> </dependency> 配置 Log4j , 在 resources/log4j.propertieslog4j.rootLogger=DEBUG, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%p] %C{1} - %m%n 运行项目即可。

九、TCP 粘包和拆包及解决方案

基本介绍

现象实例

在编写Netty 程序时,如果没有做处理,就会发生粘包和拆包的问题。

MyServerHandler

public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { byte[] buffer = new byte[msg.readableBytes()]; msg.readBytes(buffer); //将buffer转成字符串 String message = new String(buffer, CharsetUtil.UTF_8); System.out.println("服务器接收到数据 " + message); System.out.println("服务器接收到的消息量 " + (++this.count)); //服务器回送一个随机id给客户端 ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", CharsetUtil.UTF_8); ctx.writeAndFlush(responseByteBuf); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

MyServerInitializer

public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new MyServerHandler()); } }

MyServer

public class MyServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new MyServerInitializer()); //自定义一个初始化类 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

MyClientHandler

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //往服务器端发送10条数据 for (int i = 0; i < 10; i++) { ByteBuf byteBuf = Unpooled.copiedBuffer("hello,server " + i, CharsetUtil.UTF_8); ctx.writeAndFlush(byteBuf); } } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { byte[] buffer = new byte[msg.readableBytes()]; msg.readBytes(buffer); //将buffer转成字符串 String message = new String(buffer, CharsetUtil.UTF_8); System.out.println("客户端接收到数据 " + message); System.out.println("客户端接收到的消息量 " + (++this.count)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

MyClientInitializer

public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new MyClientHandler()); } }

MyClient

public class MyClient { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new MyClientInitializer()); //自定义初始化对象 ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } }

解决方案

MessageProtocol

//协议包 public class MessageProtocol { private int len; //关键 private byte[] content; get... set...

MyMessageEncoder

//编码器 public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> { @Override protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception { System.out.println("MyMessageEncoder 被调用了"); out.writeInt(msg.getLen()); out.writeBytes(msg.getContent()); } }

MyMessageDecoder

//解码器 public class MyMessageDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyMessageDecoder 被调用"); //将传送过来的二进制字节码转换成 MessageProtocol 对象(数据包) int length = in.readInt(); byte[] content = new byte[length]; in.readBytes(content); //封装成 MessageProtocol 对象,放入 out,传递给下一个handler 进行业务处理 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLen(length); messageProtocol.setContent(content); out.add(messageProtocol); } }

MyServerHandler

public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { //接收到数据,进行处理 int len = msg.getLen(); byte[] content = msg.getContent(); System.out.println("服务器接收到信息:长度=" + len + ";内容=" + new String(content, CharsetUtil.UTF_8)); System.out.println("服务器接收到的信息包数量=" + (++this.count)); //回复给客户端消息 byte[] responseContent = UUID.randomUUID().toString().getBytes("utf-8"); int responseLen = responseContent.length; //构建一个协议包 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLen(responseLen); messageProtocol.setContent(responseContent); ctx.writeAndFlush(messageProtocol); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常:" + cause.getMessage()); ctx.close(); } }

MyServerInitializer

public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //入站,由上向下,先解码 pipeline.addLast(new MyMessageDecoder()); //出站,由下向上,后编码 pipeline.addLast(new MyMessageEncoder()); pipeline.addLast(new MyServerHandler()); } }

MyClientHandler

public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> { private int count; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //往服务器端发送5条数据 for (int i = 0; i < 5; i++) { String message = "你好鸭~"; byte[] content = message.getBytes(CharsetUtil.UTF_8); int length = content.length; //创建协议包对象 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLen(length); messageProtocol.setContent(content); ctx.writeAndFlush(messageProtocol); } } @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { //接收服务器回复的数据,进行处理 int len = msg.getLen(); byte[] content = msg.getContent(); System.out.println("客户端接收到信息:长度=" + len + ";内容=" + new String(content, CharsetUtil.UTF_8)); System.out.println("客户端接收到的信息包数量=" + (++this.count)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常:" + cause.getMessage()); ctx.close(); } }

MyClientInitializer

public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //出站,由下向上,后编码 pipeline.addLast(new MyMessageEncoder()); //入站,由上向下,先解码 pipeline.addLast(new MyMessageDecoder()); pipeline.addLast(new MyClientHandler()); } }

MyServer和MyClient代码同上,不变。

下一篇笔记:Netty入门学习笔记(五)

学习视频(p73-p91):https://www.bilibili.com/video/BV1DJ411m7NR?p=73

最新回复(0)