【netty in action】学习笔记-第三章

tech2022-08-21  138

【netty in action】学习笔记-第三章

netty程序起源于一个名叫bootstrap的类,你可以把它看成一个启动程序,netty通过它隐藏了很多启动的细节。netty可以支持很多协议和处理数据的方式,这一切得益于netty里的一个核心概念handlers,也就是各种各样的handler组成的集合。

比如像我这钱做个基于nio的网络编程,网络编程里面经常遇到的一个问题就是拆包,粘包的概念。如下图所示:

正常情况下,服务端先收到packet2处理,然后收到packet1处理。这样没有问题。当时由于TCP缓冲去的存在,很多时候我们会先收到packet2和packet1的一部分,或者先收到packet2的一部分,然后再接收到剩下的部分。

netty内置了一些handler来处理这种拆包,粘包的情况,比如

FixedLengthFrameDecoderLineBasedFrameDecoderDelimiterBasedFrameDecoderLengthFieldBasedFrameDecoder

最后这个是比较常用的,这里不展开这些handler的用法,后面的章节遇到了再详细讲。

我们把上一章的示例程序放在这里方便分析。

public class EchoServer { public int port; public EchoServer(int port) { this.port = port; } public void start() { EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoServerHandler()); } }); ChannelFuture f = b.bind().sync(); System.out.println(EchoServer.class.getName() + "started and listen on " + f.channel().localAddress()); f.channel().closeFuture().sync(); }catch (Exception e) { }finally { group.shutdownGracefully(); } } public static void main(String[] args) { new EchoServer(8888).start(); } }

关于ChannelHanler

Channel是netty里的核心概念,处理Channel的组件叫ChannelHandler。

如上图所示,从大类上分就是输入和输出两个接口,我们自己定义的handler一般都是继承自这两个接口。不过大部分时候我们只需要继承ChannelInboundHandlerAdapter就可以了。这个抽象类实现了ChannelInboundHandler接口,默认的行为是把数据转发到下一个handler。对于输出类的也是类似,总之*Adapter相关的类简化了我们很多工作,还有一些其它的Adapter类比如:

ChannelHandlerAdapterChannelInboundHandlerAdapterChannelOutboundHandlerAdapterChannelDuplexHandlerAdapter

ChannelInitializer负责把channelHanlder添加到ChannelPipeline。后者是netty里用来串联起来handler的管理员。ChannelPipeline这里我认为可以理解成责任链模式,跟spring的过滤器实现机制有点类似。

当一个输入的数据或者事件过来的时候,它会沿着ChannelPipeline的头部遍历ChannelInboundHandler类型的handler进行处理直到结束。输出的处理也是类似。

这里还有一个概念叫ChannelHandlerContext,每当一个ChannelHanler被添加到ChannelPipeline的时候,netty给它分配一个ChannelHandlerContext。它做什么用呢?

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException { //msg为接收到的客户端传递的数据 个人这边直接传的json 数据 ByteBuf readMessage= (ByteBuf) msg; //获取客户端的请求地址 取到的值为客户端的 ip+端口号 String url=ctx.channel().remoteAddress().toString(); System.out.println("获取客户端请求地址:" + url); ...

在这个示例中,接收到数据后,我们通过ChannelHandlerContext获取到客户端的请求地址。

关于ChannelFutrue

在上面的代码里也看到ChannelFuture的概念。我个人的理解它是对java并发包Future的封装,扩展了后者的功能。Future是个通用的表示异步操作的结果。但是ChannelFuture只和异步I/O操作结果有关,它只关心channel。

我们知道java里的Future获取结果是用get方法,这个方法是阻塞的直到操作完成。而ChannelFuture可以让我们添加一个GenericFutureListener以便在操作完成时通知我们。(netty真的是处处都异步啊)

public interface ChannelFuture extends Future<Void> { Channel channel(); ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1); ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... var1); ...

EventLoopGroup和EventLoop

上面这张图有几个知识点:

EventLoopGroup是EventLoop的集合注册一个Channel后,Netty将这个Channel绑定到一个EventLoopI/O处理也是同一个EventLoop

EventLoop可以认为是只有一个线程的线程池。

EventLoop和EventLoopGroup的关系可以用下面这个图:

上面这个图可能有点奇怪,因为我们前面说过EventLoopGroup包含一个或者多个EventLoop,但是这个图上二者的关系又像是继承的关系。这个其实并不矛盾,一个是逻辑上的包含关系,一个是类定义上的is-a关系。

Bootstrap前面的章节也简单说过,包括服务端的ServerBootstrap和客户端的Bootstrap。既然前面说到了EventLoopGroup,二者有一个区别就是,客户端的Bootstrap只需要一个EventLoopGroup,而服务端需要两个。

正如上图所示,服务端的两个EventLoopGroup一个负责处理连接请求,一个负责处理I/O逻辑。这样做的好处当然是提高吞吐量。当然netty支持我们只配置一个EventLoopGroup,很多情况下这种模式也会运行的很好。

编码器和解码器

编码器和解码器其实都是前面讲的ChannelHandler的具体实现。首先要搞清楚一个问题就是为啥需要编码器和解码器。netty底层进行网络传输的数据都是byte,但是我们应用中传输的数据都是由业务含义的数据,可能一句话,也可能是几个数字。把byte数据转换为业务含义的数据的过程叫解码,反之是编码。

netty内置了一些常用的编解码器,比如ByteToMessageDecoder和MessageToByteEncoder,听名字你就知道含义了。下面是一个示例:

编码器,

public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> { @Override protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception { System.out.println("IntegerToByteEncoder encode msg is " + msg); out.writeInt(msg); } }

解码器,

public class ByteToIntegerDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // Check if there are at least 4 bytes readable if (in.readableBytes() >= 4) { int n = in.readInt(); System.out.println("ByteToIntegerDecoder decode msg is " + n); // Read integer from inbound ByteBuf // add to the List of decodec messages out.add(n); } } }
最新回复(0)