本文基于dubbo v2.6.x
我们在《深度解析dubbo网络传输层Transporter》介绍Transporter的时候,在其实现类中都创建了个对应的Server实现类对象返回,Server是它们的抽象接口,我们来看下Server都抽象了哪些功能
public interface Server extends Endpoint, Resetable { /** * is bound. * 是否绑定本地端口,提供服务,即是否启动成功,可连接,可接受消息 * @return bound */ boolean isBound(); /** * get channels. * 获取所有的channel ,所有客户端 * @return channels */ Collection<Channel> getChannels(); /** * get channel. * 根据地址获取channel * @param remoteAddress * @return channel */ Channel getChannel(InetSocketAddress remoteAddress); @Deprecated void reset(com.alibaba.dubbo.common.Parameters parameters); }isBound()这个方法,主要是用来检查Server端是否可用,是否能够正常使用 getChannels() 该方法获取该服务端与客户端们的连接通道。 getChannel(InetSocketAddress remoteAddress) 该方法是通过对端的addr 获取对应的连接通道。 reset() 这个reset方法已经弃用了,我们可以看到它继承Resetable 接口,Resetable接口提供了reset 重置方法。 我们看下从它父接口继承过来的功能有哪些, Endpoint 端点的意思,它抽象了一个端应该有的功能,有关闭,获取该端的addr,获取url,获取ChannelHandler的还有发送消息send的功能。 接下来我们来看下它的继承关系UML类图: 红线右侧主要是Exchange层,也就是信息交换层的实现,本文主要讲解网络传输层的实现,也就是左侧的一堆。
AbstractServer 是Server的一个抽象实现,它主要是对打开服务器前的参数处理,以及关闭服务器的处理。同时它还是一个ChannelHandler,会实现connected,disconnected 方法,我们先来看下它的主要功能,然后再来解析下它的其他功能
// 线程池 ExecutorService executor; // 服务地址 private InetSocketAddress localAddress; //绑定地址 private InetSocketAddress bindAddress; // 服务器最大可接受连接数 private int accepts; // 空闲超时时间 private int idleTimeout = 600; //600 seconds接下来看下构造方法: 首先是获取获取服务器绑定ip 与绑定port,如果anyhost 参数值是true或者你这个绑定ip 是localhost的话,绑定ip就是0.0.0.0(这个其实就是监听本机上所有ipv4的地址),接着获取accepts 参数值,该参数值表示服务器最大可接受连接数,缺省是0(0并不是最大连接是0个,而是不对连接数做限制),获取idle.timeout 空闲超时时间参数值,缺省是600*1000 也就是600s(10分钟),接着调用doOpen 方法打开服务器,该方法是个抽象方法,需要子类进行具体实现,最后就是从dataStore中获取线程池了(这个线程池是实例化某个handler 委托类的时候set进去的。)dataStore 其实可以理解为写入,取出数据的容器。 我们再来看下close 关闭方法: 先是调用ExecutorUtil 这个工具类的优雅关闭方法,用来关闭executor 这个线程池,这个线程池其实就是用来执行用户远程调用的线程池,先是调用线程池关闭方法, 一旦超过了这个timeout,就调用线程池立即关闭的方法。 最后调用close()方法。 这里close方法给线程池100ms来关闭。最后调用doClose 方法,这个doClose 是个抽象方法,需要子类具体实现。 好了,到这打开,关闭方法都要解析完成了,我们再来解析下其他方法: 我们上面说过AbstractServer 还是一个ChannelHandler,当有客户端连接过来的时候,就会调用connected(Channel ch)方法,其中ch就是那个连接的通道,首先是判断服务器状态,接着就是判断当前连接数有没有超过accepts参数,0的话不做限制,超过了就关闭这个ch,最后是调用交给父类的connected 处理。 当断开连接的时候会调用到disconnected 方法,这里就是判断连接还有多少个,如果就一个的话就会打印警告日志,最后交给父类的disconnected方法处理。
这里我们看下netty4 实现的Server , NettyServer 继承AbstractServer ,他要实现 doOpen 与doClose 方法。 我们先来看下它的成员变量
/** * 通道集合 * Channel 是dubbo的 channel */ private Map<String, Channel> channels; // <ip:port, channel> private ServerBootstrap bootstrap;// netty ServerBootstrap /** * netty 的channel */ private io.netty.channel.Channel channel; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup;构造方法:
public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); }就是调用父类的构造,可以看到handler 在这里又被包装了一堆,关于ChannelHandler 我们后面会有讲解。 接下来看下 doOpen 打开服务器的方法: 这里就是netty的api使用,这里解释下几个参数,我们都知道netty 是 selector 模型的,bossGroup专门用来接受连接请求,然后workerGroup用来处理用户请求响应的,也就是read与write 的。bossGroup 线程数是1 这个没啥好说的,然后这个workerGroup线程数是根据iothreads参数值来创建的, 这里缺省是cpu 核心数+1 与32 比较大小,选择那个小的。 ChannelOption.TCP_NODELAY这个参数是true , 就是禁止使用nagle算法,nagle算法是tcp里面的一个参数,使用这个参数就是防止小消息的发送,所以它要等一会, 不使用nagle算法 能够快速发出。ChannelOption.SO_REUSEADDR这个参数是port的重用也是 tcp里面的一个参数,这个能实现快速启动服务器。ChannelOption.ALLOCATOR 这个是重用缓冲区
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);这行需要注意,就是编解码的适配器,在后面我们可以看到 添加一个解码器,一个编码器,一个 nettyServerHandler 最后通过bootstrap的bind方法 获得channel。到这doOpen就结束了。 我们再来看下doClose方法实现:
channel.close();先是channel的关闭,然后获取所有channel ,进行循环关闭,最后是将bossGroup 与workerGroup 关闭,清空存放channel的map。 获取所有的channel 方法getChannels : 可以看到getChannels就是从channels 这个map中获取所有的value ,然后移除那些断开连接的channel,返会那堆channel。 isBound 方法就是 调用channel的isActive方法。
@Override public boolean isBound() { return channel.isActive(); }通过addr 获取对应的channel 方法getChannel: 就是从channels这个channel缓存map中查找对应addr(ip:port)的channel。
@Override public Channel getChannel(InetSocketAddress remoteAddress) { return channels.get(NetUtils.toAddressString(remoteAddress)); }