去Google的protocol buffers简单的绕了一圈终于回来了,大致的了解了protobuf的用法,现在回来和Netty集成在一起看看到底有多香!
protobuf学习路线
Netty的客户端和服务端传递数据的demo已经写了几个了,对Netty在使用上的套路已经有了很清楚的认识:注意各种场景下的Handler使用、重写ChannelInboundHandlerAdapter中各个管道状态的回调方法(channelRegistered、channelUnregistered、channelRead0等)......这里就不在赘述客户端和服务端怎么编写的了。
服务端:
package com.leolee.netty.sixthExample; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import java.net.InetSocketAddress; /** * @ClassName TestServer * @Description: 基于protobuf的服务端 * @Author LeoLee * @Date 2020/9/2 * @Version V1.0 **/ public class TestServer { public static void main(String[] args) throws InterruptedException { //定义线程组 EventLoopGroup为死循环 //boss线程组一直在接收客户端发起的请求,但是不对请求做处理,boss会将接收到的请i交给worker线程组来处理 //实际可以用一个线程组来做客户端的请求接收和处理两件事,但是不推荐 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //启动类定义 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) //子处理器,自定义处理器,服务端可以使用childHandler或者handler,handlerr对应接收线程组(bossGroup),childHandler对应处理线程组(workerGroup) .handler(new LoggingHandler(LogLevel.INFO))//日志处理器 .childHandler(new TestServerInitializer()); //绑定监听端口 ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8899)).sync(); //定义关闭监听 channelFuture.channel().closeFuture().sync(); } finally { //Netty提供的优雅关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } package com.leolee.netty.sixthExample; import com.leolee.protobuf.DataInfo; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; /** * @ClassName TestServerInitializer * @Description: TODO * @Author LeoLee * @Date 2020/9/2 * @Version V1.0 **/ public class TestServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("protobufVarint32FrameDecoder", new ProtobufVarint32FrameDecoder()); pipeline.addLast("protobufDecoder", new ProtobufDecoder(DataInfo.Student.getDefaultInstance())); pipeline.addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast("protobufEncoder", new ProtobufEncoder()); pipeline.addLast(new TestServerHandler()); } } package com.leolee.netty.sixthExample; import com.leolee.protobuf.DataInfo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * @ClassName TestServerHandler * @Description: TODO * @Author LeoLee * @Date 2020/9/2 * @Version V1.0 **/ public class TestServerHandler extends SimpleChannelInboundHandler<DataInfo.Student> { @Override protected void channelRead0(ChannelHandlerContext ctx, DataInfo.Student msg) throws Exception { System.out.println(msg.getName()); System.out.println(msg.getAge()); System.out.println(msg.getAddress()); } }主要注意的就是TestServerInitializer中如下代码
pipeline.addLast("protobufDecoder", new ProtobufDecoder(DataInfo.Student.getDefaultInstance()));以及TestServerHandler继承类SimpleChannelInboundHandler的泛型类型的变化:
public class TestServerHandler extends SimpleChannelInboundHandler<DataInfo.Student> { }这里用到我在protobuf学习(3):编译.proto文件生成Java代码,以及序列化和反序列化message中生成Java code:DataInfo类
该类提供了对Student message的序列化、反序列化等一些列的操作方法来帮助我们极其简单的构造Netty消息传递所需要的数据,同时Netty对protobuf也有很好的支持。
客户端:
package com.leolee.netty.sixthExample; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; /** * @ClassName TestClient * @Description: 基于protobuf的客户端 * @Author LeoLee * @Date 2020/9/2 * @Version V1.0 **/ public class TestClient { public static void main(String[] args) throws InterruptedException { //客户端只需要一个线程组 EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { //声明客户端启动类 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new TestClientInitializer()); ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync(); channelFuture.channel().closeFuture().sync(); } finally { //优雅关闭 eventLoopGroup.shutdownGracefully(); } } } package com.leolee.netty.sixthExample; import com.leolee.protobuf.DataInfo; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; /** * @ClassName TestClientInitializer * @Description: TODO * @Author LeoLee * @Date 2020/9/2 * @Version V1.0 **/ public class TestClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("protobufVarint32FrameDecoder", new ProtobufVarint32FrameDecoder()); pipeline.addLast("protobufDecoder", new ProtobufDecoder(DataInfo.Student.getDefaultInstance())); pipeline.addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast("protobufEncoder", new ProtobufEncoder()); pipeline.addLast("TestClientHandler", new TestClientHandler()); } } package com.leolee.netty.sixthExample; import com.leolee.protobuf.DataInfo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * @ClassName TestClientHandler * @Description: TODO * @Author LeoLee * @Date 2020/9/2 * @Version V1.0 **/ public class TestClientHandler extends SimpleChannelInboundHandler<DataInfo.Student> { @Override protected void channelRead0(ChannelHandlerContext ctx, DataInfo.Student msg) throws Exception { } /** * 功能描述: <br> 连接建立变为活跃状态后,马上向服务端写入Student message * 〈〉 * @Param: [ctx] * @Return: void * @Author: LeoLee * @Date: 2020/9/2 21:11 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { DataInfo.Student student = DataInfo.Student .newBuilder() .setName("LeoLee") .setAge(25) .setAddress("上海") .build(); ctx.channel().writeAndFlush(student); } }运行结果:
九月 02, 2020 10:14:20 下午 io.netty.handler.logging.LoggingHandler channelRegistered 信息: [id: 0x034b9076] REGISTERED 九月 02, 2020 10:14:20 下午 io.netty.handler.logging.LoggingHandler bind 信息: [id: 0x034b9076] BIND: 0.0.0.0/0.0.0.0:8899 九月 02, 2020 10:14:20 下午 io.netty.handler.logging.LoggingHandler channelActive 信息: [id: 0x034b9076, L:/0:0:0:0:0:0:0:0:8899] ACTIVE 九月 02, 2020 10:14:30 下午 io.netty.handler.logging.LoggingHandler channelRead 信息: [id: 0x034b9076, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0x054993bd, L:/127.0.0.1:8899 - R:/127.0.0.1:49587] 九月 02, 2020 10:14:30 下午 io.netty.handler.logging.LoggingHandler channelReadComplete 信息: [id: 0x034b9076, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE LeoLee 25 上海上面的例子很好的演示了Netty集成protobuf的解析和序列化,但是写死了DataInfo.Student.getDefaultInstance()和SimpleChannelInboundHandler<DataInfo.Student>,这样就存在一个问题,就是客户端只能传递DataInfo.Student类型的message,实际应用场景客户端和服务端数据沟通肯定是多种多样的,Netty又没有提供类似于SpringMVC那样的请求路由功能,实际上就是客户端和服务端建立了一个可保持的通讯通道,所有的数据都要从这个通道传递,那么针对于这种实际情况应该怎么处理呢?
在 protobuf 中有一种类型的字段叫做 oneof ,官网是这么解释的:
如果你想在同一时刻在一个多字段的message中只设置其中一个值,你可以使用 oneof 特性来解决,并且节省内存。
Oneof 字段除了共享内存这一特性之外就像其他optional修饰的字段一样,而且在同一时间只能设置一个 Oneof 字段,设置任何成员字段的同时 oneof 将清除其他字段,示例如下:
message SampleMessage { oneof test_oneof { string name = 4; SubMessage sub_message = 9; } }所以我们来创建一下我们 .proto 文件。
syntax = "proto2"; package com.leolee.protobuf; option optimize_for = SPEED;//Can be set to SPEED, CODE_SIZE, or LITE_RUNTIME,This affects the C++ and Java code generators (and possibly third-party generators) in the following ways option java_package = "com.leolee.protobuf"; option java_outer_classname = "DataInfo2"; //生成java code 命令:protoc --java_out=src/main/java/ src/protobuf/Person.proto //----------------多message的根节点 message DataPackage { optional PackageType package_type = 1; oneof Package { Student sudent = 2; Dog dog = 3; } } //数据包类型 enum PackageType { STUDENT = 0; DOG = 1; } //----------------多message message Student { optional string name = 1; optional int32 age = 2; optional string address = 3; } message Dog { optional string dog_name = 1; optional int32 dog_age = 2; }使用命令生成Java code:
protoc --java_out=src/main/java/ src/protobuf/Person.proto关于 protobuf 生成代码的教程请看这里:protobuf学习(3):编译.proto文件生成Java代码,以及序列化和反序列化message
修改之前的客户端和服务端的代码
客户端:
package com.leolee.netty.sixthExample.multiProtocol; import com.leolee.protobuf.DataInfo; import com.leolee.protobuf.DataInfo2; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; /** * @ClassName TestClientInitializer * @Description: TODO * @Author LeoLee * @Date 2020/9/2 * @Version V1.0 **/ public class TestClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("protobufVarint32FrameDecoder", new ProtobufVarint32FrameDecoder()); pipeline.addLast("protobufDecoder", new ProtobufDecoder(DataInfo2.DataPackage.getDefaultInstance())); pipeline.addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast("protobufEncoder", new ProtobufEncoder()); pipeline.addLast("TestClientHandler", new TestClientHandler()); } } package com.leolee.netty.sixthExample.multiProtocol; import com.leolee.protobuf.DataInfo2; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * @ClassName TestClientHandler * @Description: TODO * @Author LeoLee * @Date 2020/9/2 * @Version V1.0 **/ public class TestClientHandler extends SimpleChannelInboundHandler<DataInfo2.DataPackage> { private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); @Override protected void channelRead0(ChannelHandlerContext ctx, DataInfo2.DataPackage msg) throws Exception { } /** * 功能描述: <br> 连接建立变为活跃状态后,马上向服务端写入Student message * 〈〉 * @Param: [ctx] * @Return: void * @Author: LeoLee * @Date: 2020/9/2 21:11 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //一秒执行一次 executor.scheduleAtFixedRate(() -> { //随机生成 0 或者 1 int packType = new Random().nextInt(2); switch (DataInfo2.DataPackage.PackageType.forNumber(packType)) { case STUDENT: System.out.println("发送student"); DataInfo2.DataPackage dataPackage = DataInfo2.DataPackage.newBuilder() .setPackageType(DataInfo2.DataPackage.PackageType.STUDENT) .setSudent(DataInfo2.Student.newBuilder() .setName("LeoLee").setAge(25).setAddress("上海").build()).build(); ctx.channel().writeAndFlush(dataPackage); break; case DOG: System.out.println("发送dog"); DataInfo2.DataPackage dataPackage2 = DataInfo2.DataPackage.newBuilder() .setPackageType(DataInfo2.DataPackage.PackageType.DOG) .setDog(DataInfo2.Dog.newBuilder() .setDogName("恶霸犬").setDogAge(3).build()).build(); ctx.channel().writeAndFlush(dataPackage2); break; } }, 0, 1, TimeUnit.SECONDS); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("客户端出现异常已关闭"); cause.printStackTrace(); ctx.close(); } }服务端:
package com.leolee.netty.sixthExample.multiProtocol; import com.leolee.protobuf.DataInfo2; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; /** * @ClassName TestServerInitializer * @Description: TODO * @Author LeoLee * @Date 2020/9/2 * @Version V1.0 **/ public class TestServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("protobufVarint32FrameDecoder", new ProtobufVarint32FrameDecoder()); pipeline.addLast("protobufDecoder", new ProtobufDecoder(DataInfo2.DataPackage.getDefaultInstance())); pipeline.addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast("protobufEncoder", new ProtobufEncoder()); pipeline.addLast(new TestServerHandler()); } } package com.leolee.netty.sixthExample.multiProtocol; import com.leolee.protobuf.DataInfo2; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * @ClassName TestServerHandler * @Description: TODO * @Author LeoLee * @Date 2020/9/2 * @Version V1.0 **/ public class TestServerHandler extends SimpleChannelInboundHandler<DataInfo2.DataPackage> { @Override protected void channelRead0(ChannelHandlerContext ctx, DataInfo2.DataPackage msg) throws Exception { System.out.println("msg.getPackageType().getNumber():" + msg.getPackageType().getNumber()); //msg.PackageType.forNumber(packType) switch (msg.getPackageType().getNumber()) { case 0: System.out.println(msg.getSudent().getName()); System.out.println(msg.getSudent().getAge()); System.out.println(msg.getSudent().getAddress()); break; case 1: System.out.println(msg.getDog().getDogName()); System.out.println(msg.getDog().getDogAge()); break; } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("服务端出现异常已关闭"); cause.printStackTrace(); ctx.close(); } }执行结果就不贴出来了,自己感受一下。
需要代码的来这里拿嗷:demo项目地址
未完待续,有几个内置handler没解释?