我们都知道RocketMQ是高性能的消息中间件,其高性能不仅体现在其优秀的消息吞吐量上,也体现在其基于Netty实现的高性能通信能力上。接下来将通过几篇文章来阐述RocketMQ的通信模块。通过RocketMQ对于通信模块的设计分析,我们在日后需要设计中间件关于通信模块时,其实也可以参考以及借鉴已经成熟的中间件的设计,同时结合自身业务进行改进。
通信架构说明以NameServer启动为例消息编解码总结RocketMQ的网络通信模块主要实现在remoting模块中,从模块中我们可以得知RocketMQ使用netty进行底层的通信实现,同时在protocol中自定义了通信协议。
最主要的类关系如下所示:
(1)RemotingService接口
RemotingService 作为顶层接口定义了三个主要的方法,主要包括启动netty服务、关闭netty服务以及注册RPC钩子处理请求前后的逻辑。
public interface RemotingService { //开启服务 void start(); //停止服务 void shutdown(); //注册RPC钩子 void registerRPCHook(RPCHook rpcHook); }(2)RPCHook 接口
其中RPCHook 接口定义了请求前后进行的逻辑处理,
public interface RPCHook { void doBeforeRequest(final String remoteAddr, final RemotingCommand request); void doAfterResponse(final String remoteAddr, final RemotingCommand request, final RemotingCommand response); }(3)服务端与客户端接口
RemotingServer 和 RemotingClient 接口分别继承了RemotingService 接口,并进行了自己的业务扩展。 RemotingServer 接口
public interface RemotingServer extends RemotingService { //注册处理请求的处理器, 根据requestCode, 获取处理器,处理请求 void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor); int localListenPort(); Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode); RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException; void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; //单向发送消息,只管发送消息,不管消息发送的结果 void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; }(1)通信协议设计 (图片来自于网络)
(2)编码 remoting模块对于消息进行了自定义协议,将发送的消息以及收到的消息封装为RemotingCommand对象。
public ByteBuffer encode() { // 1> header length size int length = 4; // 2> header data length byte[] headerData = this.headerEncode(); length += headerData.length; // 3> body data length if (this.body != null) { length += body.length; } ByteBuffer result = ByteBuffer.allocate(4 + length); // length result.putInt(length); // header length result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); // body data; if (this.body != null) { result.put(this.body); } result.flip(); return result; }(3)解码
public static RemotingCommand decode(final ByteBuffer byteBuffer) { // 获取byteBuffer的总长度 int length = byteBuffer.limit(); int oriHeaderLen = byteBuffer.getInt(); int headerLength = getHeaderLength(oriHeaderLen); // 保存header data byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; // 获取消息体的数据 byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; }在了解remoting模块的核心接口之后,我们接下来看下具体的实现过程。其实在如NameServer启动过程中,它本身就会作为一个Netty的服务端进行启动。我们这里先忽略掉NameServer启动过程中的其他的配置操作,着重对Netty作为服务端启动的流程。大致的启动流程如下所示: NameServer实际作为Netty服务端启动底层网络连接的,我们都知道它的作用是作为服务端提供给Broker进行注册以及客户端向其拉取路由信息。 NameServer启动过程中实际是创建了NettyRemotingServer,而NettyRemotingServer是RocketMQ自己开发的网络连接组件,当然它的底层实际是基于Netty的接口实现的ServerBootstrap。下列是start的方法,同样我们只关注Netty服务器的启动。
public static NamesrvController start(final NamesrvController controller) throws Exception { if (null == controller) { throw new IllegalArgumentException("NamesrvController is null"); } //初始化 boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } //通过Runtime类注册了一个JVM关闭时的shutdown的钩子 Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { controller.shutdown(); return null; } })); controller.start(); return controller; }其中初始化的方法如下所示:
public boolean initialize() { //加载配置 this.kvConfigManager.load(); //构建Netty服务器 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); //Netty的分作线程池 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); //将工作线程池分配给Netty服务器 this.registerProcessor(); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } return true; }初始化完成之后进行启动,我们可以看到实际启动的是NettyRemotingServer。
public void start() throws Exception { this.remotingServer.start(); if (this.fileWatchService != null) { this.fileWatchService.start(); } }NettyRemotingServer启动过程如下代码所示:
@Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } }); //配置启动Netty服务器 ServerBootstrap childHandler = //各种网络配置 this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) //设置网络请求处理器,当Netty服务器收到网络请求后,就会有这些Handler进行处理 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) .addLast(defaultEventExecutorGroup, //编解码 new NettyEncoder(), new NettyDecoder(), //空闲连接管理 new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //网络连接管理 new NettyConnectManageHandler(), //网络请求处理 new NettyServerHandler() ); } }); if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } try { //启动Netty服务器,绑定对应的端口号 ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); }通过以上分析可知,RocketMQ实际是在原生Netty之上进行了自己的封装。最后一张图来说明NameServer启动过程中关于Netty启动的部分。在后续的文章中我们再着重分析RocketMQ如何高效使用Netty框架。