基于Netty实现dubbo RPC调用

tech2025-08-15  6

Dubbo底层使用Netty作为网络通讯框架,根据Netty手写简单的RPC框架

具体步骤:

定义一个接口和抽象方法,用于消费者和服务提供者之间的约定

定义一个服务提供者,该类需要监听消费者的请求,并按照约定返回数据

定义一个服务消费者,该类需要透明的调用自己不存在的方法,内部使用Netty请求提供者返回数据

 

定义接口协议和抽象方法,如下的HelloService接口

//提供给服务消息者和服务提供者使用 public interface HelloService { public String run(String msg); }

定义服务提供者

public class NettyServer { public static void startServer(String host, int port){ startServer0(host,port); } //初始Netty ,启动 private static void startServer0(String host, int port){ NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyServerHandler()); } }); ChannelFuture channelFuture = bootstrap.bind(host, port).sync(); log.info("服务提供者开始提供服务"); channelFuture.channel().closeFuture().sync(); }catch (Exception e){ log.error("netty server start error, {}", e); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取客户端发送的消息,并调用自己的服务 System.out.println("收到消费者的消息:" + msg); //客户端在调用服务器API,需要定义一个协议 //要求:每次在发消息的时候都必须以“HelloServer#” 开头 if(msg.toString().startsWith("HelloService#")){ String result = new HelloServiceImpl().run(msg.toString().substring(msg.toString().lastIndexOf("#") + 1)); ctx.writeAndFlush(result); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("NettyServerHandler run error", cause); ctx.close(); } }

并通过启动类启动NettyServer服务,监听消费者的请求

public class ServerBootStrap { public static void main(String[] args) { NettyServer.startServer("127.0.0.1", 7000); } }

服务提供者中提供服务HelloServiceImpl,实现HelloService接口!

public class HelloServiceImpl implements HelloService { //当有消费者 调用该方法时就返回一个字符串 @Override public String run(String msg) { System.out.println("收到客户端消息: " + msg); if(msg != null){ return "hello too, response"; }else{ return "hello too"; } } }

编写服务消费者,服务消费者提供HelloService的一个代理对象,在代理对象中初始化NettyClient并设置相关参数,获取请求结果

public class NettyClient { //创建线程池 private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static NettyClientHandler clientHandler = null; //创建代理对象 public Object getBean(final Class<?> serviceClass, final String providerName){ return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{serviceClass}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if(clientHandler == null) { initClient();//初始化client,建立连接成功后执行handler的channelActive方法 } //设置要发给服务器的信息,providerName:协议头,args[0] 就是客户端调用api的参数 clientHandler.setParam(providerName + args[0]); //执行setParam方法 return executor.submit(clientHandler).get();//call方法被调用 } }); } private static void initClient(){ clientHandler = new NettyClientHandler(); NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try{ bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(clientHandler); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync(); //channelFuture.channel().closeFuture().sync(); }catch (InterruptedException e) { e.printStackTrace(); } // }finally { // group.shutdownGracefully(); // } } }

消费者的Handler,通过wait和notify方法实现参数的发送和结果的响应

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable { private ChannelHandlerContext context; private String result;//返回的结果 private String param;//客户端调用方法传入的参数 //被代理对象调用,发送数据给服务器,等待被唤醒 @Override public synchronized Object call() throws Exception { context.writeAndFlush(param); System.out.println("call1 方法被调用" + param); wait();//等待ChannelRead方法获取服务器的结果后,唤醒,channelRead方法执行后,result就有结果了 System.out.println("call2 方法被调用"); return result; } //与服务器连接成功后就会被调用,第一个被调用 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive 方法被调用"); context = ctx; //因为在其他方法需要使用上下文 System.out.println("context " + context); } //与收到服务器的数据后调用 @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channelRead 方法被调用"); result = msg.toString(); notify();//唤醒等待的线程 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } void setParam(String param){ System.out.println("setParam"); this.param = param; } }

启动消费者,通过创建代理对象(创建代理对象过程中就会根据接口类型,发送请求,获取请求结果)请求某个接口的响应

public class ClientBootstrap { public static final String PROVIDE_NAME = "HelloService#"; public static void main(String[] args) { NettyClient consumer = new NettyClient(); //创建代理对象 HelloService helloService =(HelloService) consumer.getBean(HelloService.class, PROVIDE_NAME); //通过代理对象调用服务提供者的服务 for(;;){ try { Thread.sleep(4*1000); } catch (InterruptedException e) { e.printStackTrace(); } String result = helloService.run("hello, dubbo"); System.out.println("调用的结果: "+ result); } } }

需要注意的是消费者的线程组不能shutDown!

启动服务端和客户端,服务端运行结果:

收到消费者的消息:HelloService#hello, dubbo 收到客户端消息: hello, dubbo 收到消费者的消息:HelloService#hello, dubbo 收到客户端消息: hello, dubbo

客户端运行结果

channelActive 方法被调用 setParam 11:55:20.990 [pool-1-thread-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768 11:55:20.990 [pool-1-thread-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2 11:55:20.990 [pool-1-thread-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16 11:55:20.990 [pool-1-thread-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8 call1 方法被调用HelloService#hello, dubbo context ChannelHandlerContext(NettyClientHandler#0, [id: 0xd0bbfebc, L:/127.0.0.1:59593 - R:/127.0.0.1:7000]) 11:55:21.008 [nioEventLoopGroup-2-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true 11:55:21.009 [nioEventLoopGroup-2-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@6769d414 channelRead 方法被调用 call2 方法被调用 调用的结果: hello too, response setParam call1 方法被调用HelloService#hello, dubbo channelRead 方法被调用 call2 方法被调用 调用的结果: hello too, response

最新回复(0)