dubbo笔记+源码刨析

tech2023-07-31  98

会不断更新!冲冲冲!跳转连接

https://blog.csdn.net/qq_35349982/category_10317485.html

dubbo笔记

1.概念

RPC全称为remote procedure call,即远程过程调用。

借助RPC可以做到像本地调用一样调用远程服务,是一种进程间的通信方式。

Java RMI 指的是远程方法调用 (Remote Method Invocation),是java原生支持的远程调用 ,采用JRMP(JavaRemote Messageing protocol)作为通信协议,可以认为是纯java版本的分布式远程调用解决方案, RMI主要用于不同虚拟机之间的通信

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)

官网: http://dubbo.apache.org/zh-cn/docs/user/quick-start.html

2.配置方式

xml配置: http://dubbo.apache.org/zh-cn/docs/user/quick-start.html

注解配置:http://dubbo.apache.org/zh-cn/docs/user/configuration/annotation.html

@EnableDubbo(scanBasePackages = "com.lagou.service.impl") //类似于启动类 //config文件 //加载报扫描 //生命周期 @EnableDubboConfig @DubboComponentScan @EnableDubboLifecycle public @interface EnableDubbo { }

3. SPI简介

SPI 全称为 (Service Provider Interface) ,是JDK内置的一种服务提供发现机制。 目前有不少框架用它来做服务的扩展发现,简单来说,它就是一种动态替换发现的机制。使用SPI机制的优势是实现解耦,使得第三方服务模块的装配控制逻辑与调用者的业务代码分离。

1.java的SPI

1.约定

1、当服务提供者提供了接口的一种具体实现后,在META-INF/services目录下创建一个以“接口全限定名”为命名的文件,内容为实现类的全限定名;

2、接口实现类所在的jar包放在主程序的classpath中;

3、主程序通过java.util.ServiceLoader动态装载实现模块,它通过扫描META-INF/services目录下的配置文件找到实现类的全限定名,把类加载到JVM;

4、SPI的实现类必须携带一个无参构造方法;

2.例子

数据库驱动加载接口实现类的加载 JDBC加载不同类型数据库的驱动

日志门面接口实现类加载 SLF4J加载不同提供商的日志实现类

Spring Spring中大量使用了SPI,比如:对servlet3.0规范对ServletContainerInitializer的实现、自动类型转换Type Conversion SPI(Converter SPI、Formatter SPI)等

Dubbo中也大量使用SPI的方式实现框架的扩展, 不过它对Java提供的原生SPI做了封装,允许用户扩展实Filter接口

3.code

1.api层

public interface HelloService { String sayHello(); }

2.impl层

public class DogHelloService implements HelloService { @Override public String sayHello() { return "wang wang"; } } public class HumanHelloService implements HelloService { @Override public String sayHello() { return "hello 你好"; } }

resources层

新建 META-INF/services/ 目录

后面跟上接口的包名

META-INF/services/com.lagou.service.HelloService

com.lagou.service.impl.DogHelloService com.lagou.service.impl.HumanHelloService

3.调用层

public static void main(String[] args) { final ServiceLoader<HelloService> helloServices = ServiceLoader.load(HelloService.class); for (HelloService helloService : helloServices){ System.out.println(helloService.getClass().getName() + ":" + helloService.sayHello()); } }

2.dubbo的SPI

1.约定

使用注解 @SPI

@Adaptive 主要解决的问题是如何动态的选择具体的扩展点。通过getAdaptiveExtension 统一对指定接口对应的所有扩展点进行封装

配置到 META-INF/dubbo/下 + com.lagou.service.HelloService(接口路径)

dubbo自己做SPI的目的 1. JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加 载,会很浪费资源 2. 如果有扩展点加载失败,则所有扩展点无法使用 3. 提供了对扩展点包装的功能(Adaptive),并且还支持通过set的方式对其他的扩展点进行注入

2.code

1.api层

@SPI("human") public interface HelloService { String sayHello(); @Adaptive String sayHello(URL url); }

2.impi层

//================================== public class DogHelloService implements HelloService{ @Override public String sayHello() { return "wang wang"; } @Override public String sayHello(URL url) { return "wang url"; } } //================================== public class HumanHelloService implements HelloService{ @Override public String sayHello() { return "hello 你好"; } @Override public String sayHello(URL url) { return "hello url"; } }

在Resources的META-INF/dubbo/com.lagou.service.HelloService目录下

human=com.lagou.service.impl.HumanHelloService dog=com.lagou.service.impl.DogHelloService

3.调用层

调用全部拓展加载器 public static void main(String[] args) { // 获取扩展加载器 ExtensionLoader<HelloService> extensionLoader = ExtensionLoader.getExtensionLoader(HelloService.class); // 遍历所有的支持的扩展点 META-INF.dubbo Set<String> extensions = extensionLoader.getSupportedExtensions(); for (String extension : extensions){ String result = extensionLoader.getExtension(extension).sayHello(); System.out.println(result); } } 调用指定的 public static void main(String[] args) { //test://localhost/hello? 可以先随便写 // hello.service=dog 表示 HelloService类的 配置的key为dog的接口 URL url = URL.valueOf("test://localhost/hello?hello.service=dog"); HelloService adaptiveExtension = ExtensionLoader.getExtensionLoader(HelloService.class).getAdaptiveExtension(); String msg = adaptiveExtension.sayHello(url); System.out.println(msg); }

4.dubbo使用

1.api模块

//公共模块 service-api public interface HelloService { String sayHello(String name); }

2.provider模块

//package com.lagou.service.impl; //dubbo的service注解 @Service public class HelloServiceImpl implements HelloService { @Override public String sayHello(String name) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello:"+name; } } //====Main方法 public class DubboPureMain { public static void main(String[] args) throws Exception{ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProviderConfiguration.class); context.start(); System.in.read(); } @Configuration @EnableDubbo(scanBasePackages = "com.lagou.service.impl") @PropertySource("classpath:/dubbo-provider.properties") static class ProviderConfiguration{ @Bean public RegistryConfig registryConfig(){ RegistryConfig registryConfig = new RegistryConfig(); registryConfig.setAddress("zookeeper://127.0.0.1:2181?timeout=10000"); //registryConfig.setTimeout(10000); return registryConfig; } } }

resources中

dubbo-consumer.properties

dubbo.application.name=service-consumer dubbo.registry.address=zookeeper://127.0.0.1:2181 dubbo.consumer.timeout=4000 ##运维命令 dubbo.application.qosEnable=true dubbo.application.qosPort=33333 dubbo.application.qosAcceptForeignIp=false

3.consumer模块

@Component public class ComsumerComponet { //dubbo的reference注解 @Reference private HelloService helloService; public String sayHello(String name){ return helloService.sayHello(name); } } //================================= //Main方法 public class AnnotationConsumerMain { public static void main(String[] args) throws Exception { System.out.println("-------------"); AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class); context.start(); // 获取消费者组件 ComsumerComponet service = context.getBean(ComsumerComponet.class); while(true){ System.in.read(); String hello = service.sayHello("world"); System.out.println("result:"+hello); } } @Configuration @PropertySource("classpath:/dubbo-consumer.properties") @ComponentScan(basePackages = "com.lagou.bean") @EnableDubbo static class ConsumerConfiguration{ } }

#=============

3.过滤器配置

@Activate(group = {CommonConstants.CONSUMER,CommonConstants.PROVIDER}) public class DubboInvokeFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { long startTime = System.currentTimeMillis(); try { // 执行方法 return invoker.invoke(invocation); } finally { System.out.println("invoke time:"+(System.currentTimeMillis()-startTime) + "毫秒"); } } }

在Resources配置文件中配置

META-INF/dubbo/org.apache.dubbo.rpc.Filter

timeFilter=com.lagou.filter.DubboInvokeFilter

在使用的模块中 添加依赖即可

4.负载均衡

public class OnlyFirstLoadbalancer implements LoadBalance { @Override public <T> Invoker<T> select(List<Invoker<T>> list, URL url, Invocation invocation) throws RpcException { // 所有的服务提供者 按照IP + 端口排序 选择第一个 return list.stream().sorted((i1,i2)->{ final int ipCompare = i1.getUrl().getIp().compareTo(i2.getUrl().getIp()); if(ipCompare == 0){ return Integer.compare(i1.getUrl().getPort(),i2.getUrl().getPort()); } return ipCompare; }).findFirst().get(); } }

在Resources配置文件中配置

META-INF/dubbo/org.apache.dubbo.rpc.cluster.LoadBalance

onlyFirst=com.laogu.loadbalance.OnlyFirstLoadbalancer

在Counsumer中使用

@Component public class ConsumerComponent { //在dubbo的注解这块 添加负载均衡 @Reference(loadbalance = "onlyFirst") private HelloService helloService; public String sayHello(String name, int timeToWait) { return helloService.sayHello(name, timeToWait); } }

5.线程池

官网 : http://dubbo.apache.org/zh-cn/docs/user/demos/thread-model.html

public class WachingThreadPool extends FixedThreadPool implements Runnable{ private static final Logger LOGGER = LoggerFactory.getLogger(WachingThreadPool.class); // 定义线程池使用的阀值 private static final double ALARM_PERCENT = 0.90; private final Map<URL, ThreadPoolExecutor> THREAD_POOLS = new ConcurrentHashMap<>(); public WachingThreadPool(){ // 每隔3秒打印线程使用情况 Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this,1,3, TimeUnit.SECONDS); } // 通过父类创建线程池 @Override public Executor getExecutor(URL url) { final Executor executor = super.getExecutor(url); if(executor instanceof ThreadPoolExecutor){ THREAD_POOLS.put(url,(ThreadPoolExecutor)executor); } return executor; } @Override public void run() { // 遍历线程池 for (Map.Entry<URL,ThreadPoolExecutor> entry: THREAD_POOLS.entrySet()){ final URL url = entry.getKey(); final ThreadPoolExecutor executor = entry.getValue(); // 计算相关指标 final int activeCount = executor.getActiveCount(); final int poolSize = executor.getCorePoolSize(); double usedPercent = activeCount / (poolSize*1.0); LOGGER.info("线程池执行状态:[{}/{}:{}%]",activeCount,poolSize,usedPercent*100); if (usedPercent > ALARM_PERCENT){ LOGGER.error("超出警戒线! host:{} 当前使用率是:{},URL:{}",url.getIp(),usedPercent*100,url); } } } } public class WachingThreadPool extends FixedThreadPool implements Runnable{ private static final Logger LOGGER = LoggerFactory.getLogger(WachingThreadPool.class); // 定义线程池使用的阀值 private static final double ALARM_PERCENT = 0.90; private final Map<URL, ThreadPoolExecutor> THREAD_POOLS = new ConcurrentHashMap<>(); public WachingThreadPool(){ // 每隔3秒打印线程使用情况 Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this,1,3, TimeUnit.SECONDS); } // 通过父类创建线程池 @Override public Executor getExecutor(URL url) { final Executor executor = super.getExecutor(url); if(executor instanceof ThreadPoolExecutor){ THREAD_POOLS.put(url,(ThreadPoolExecutor)executor); } return executor; } @Override public void run() { // 遍历线程池 for (Map.Entry<URL,ThreadPoolExecutor> entry: THREAD_POOLS.entrySet()){ final URL url = entry.getKey(); final ThreadPoolExecutor executor = entry.getValue(); // 计算相关指标 final int activeCount = executor.getActiveCount(); final int poolSize = executor.getCorePoolSize(); double usedPercent = activeCount / (poolSize*1.0); LOGGER.info("线程池执行状态:[{}/{}:{}%]",activeCount,poolSize,usedPercent*100); if (usedPercent > ALARM_PERCENT){ LOGGER.error("超出警戒线! host:{} 当前使用率是:{},URL:{}",url.getIp(),usedPercent*100,url); } } } }

在provider中配置文件中添加线程池

dubbo.application.name=dubbo-demo-annotation-provider dubbo.protocol.name=dubbo dubbo.protocol.port=20885 #dubbo.protocol.host=192.168.1.109 dubbo.provider.threadpool=watching #dubbo.protocol.telnet=clear,exit,help,status,log,ls,ps,cd,pwd,invoke,trace,count,select,shutdown

6.服务降级

服务降级,当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务有策略的降低服务级别,以释放服务器资源,保证核心任务的正常运行。

在 dubbo 管理控制台配置服务降级

屏蔽和容错

mock=force:return+null

表示消费方对该服务的方法调用都直接返回 null 值,不发起远程调用。用来屏蔽不重要服务不可用时对调用方的影响。

mock=fail:return+null

表示消费方对该服务的方法调用在失败后,再返回 null 值,不抛异常。用来容忍不重要服务不稳定时对调用方的影响。

指定返回简单值或者null <dubbo:reference id="xxService" check="false" interface="com.xx.XxService" timeout="3000" mock="return null" /> <dubbo:reference id="xxService2" check="false" interface="com.xx.XxService2" timeout="3000" mock="return 1234" /> 使用java代码 动态写入配置中心 RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension() ; Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://IP:端 口")); registry.register(URL.valueOf("override://0.0.0.0/com.foo.BarService?category=configurators&dynamic=false&application=foo&mock=force:return+null"));

dubbo的控制台客户端

1.从git 上下载项目 https://github.com/apache/dubbo-admin 2.修改项目下的dubbo.properties文件 注意 dubbo.registry.address对应的值需要对应当前使用的Zookeeper的ip地址和端口号 dubbo.registry.address=zookeeper://zk所在机器ip:zk端口 dubbo.admin.root.password=root dubbo.admin.guest.password=guest 3.切换到项目所在的路径 使用mvn 打包 mvn clean package -Dmaven.test.skip=true 4.java 命令运行 java -jar 对应的jar包

dubbo的补充

1.dubbo中的Url

protocol:一般是 dubbo 中的各种协议 如:dubbo thrift http zkusername/password:用户名/密码host/port:主机/端口path:接口名称parameters:参数键值对

任意的一个领域中的一个实现都可以认为是一类 URL,dubbo 使用 URL 来统一描述了元数据,配置信息,贯穿在整个框架之中。

http://dubbo.apache.org/zh-cn/blog/introduction-to-dubbo-url.html

dubbo的设计

http://dubbo.apache.org/zh-cn/docs/dev/design.html

作业思路

作业二

服务端:

​ 定义一个类,三个方法,用随机数100秒 随机

过滤器:

​ 方法执行时,记录时间存储到Map

​ key是 方法名

​ value是一个Map

​ key是方法最后的执行时间

​ value是方法的执行消耗时间

消费端:

​ 开启两个线程

​ 1.循环执行三个方法

​ 2.读取Map,key

​ 根据key剔除掉一分钟之前的数据

​ Map按照时间戳排序

​ T90取size()*90的数据

消费端

//消费端 public class AnnotationConsumerMain { public static void main(String[] args) throws Exception { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class); context.start(); ConsumerComponent service = context.getBean(ConsumerComponent.class); //定义线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,2,100, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10)); //定义三个方法执行的方法 Runnable consumer = new Thread(() -> { try { service.consumer(); } catch (InterruptedException e) { e.printStackTrace(); } }); threadPoolExecutor.execute(consumer); //定义监控的方法 Runnable consumer1 = new Thread(() -> { PrintMethodCost.print(); }); threadPoolExecutor.execute(consumer1); } @Configuration @PropertySource("classpath:/dubbo-consumer.properties") //@EnableDubbo(scanBasePackages = "com.lagou.bean") @ComponentScan("com.lagou.bean") @EnableDubbo static class ConsumerConfiguration { } } ///监控的方法 public class PrintMethodCost { public static void print() { while (true) { //取执行方法的集合 //key 方法名 //value 是 方法的最后执行时间-执行毫秒数 Map<String, HashMap<Long, Long>> methodCount = MethodCostTimeCount.getMethodCount(); long currentTimeMillis = System.currentTimeMillis(); //现在的时间-60秒就是 开始的时间; long oneMinuteBefore = currentTimeMillis - 60000; for (Map.Entry<String, HashMap<Long, Long>> entry : methodCount.entrySet()) { //移除 Iterator iterator = entry.getValue().entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<Long, Long> key = (Map.Entry<Long, Long>) iterator.next(); if (key.getKey() < oneMinuteBefore) { iterator.remove(); } } HashMap<Long, Long> oneMinuteMap = entry.getValue(); //按照value值进行升序排序 List<Map.Entry<Long, Long>> list = new ArrayList<>(oneMinuteMap.entrySet()); /* Collections.sort(list, new Comparator<Map.Entry<Long, Long>>() { public int compare(Map.Entry<Long, Long> o1, Map.Entry<Long, Long> o2) { return o1.getValue().compareTo(o2.getValue());//利用String类的compareTo方法 } });*/ Collections.sort(list, Comparator.comparing(Map.Entry::getValue)); int index90 = (int) (list.size() * 0.9); int index99 = (int) (list.size() * 0.99); Long tp90Time = list.get(index90).getKey(); Long tp90Cost = list.get(index90).getValue(); System.out.println("T90方法名="+entry.getKey() + "毫秒数=" + tp90Cost + " ms,方法执行时间 : " + tp90Time); Long tp99Time = list.get(index99).getKey(); Long tp99Cost = list.get(index99).getValue(); System.out.println("T99方法名="+entry.getKey() + "毫秒数=" + tp99Cost + " ms,方法执行时间 : " + tp99Time); } try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { System.out.println("Thread sleep exception!"); } System.out.println("=============================="); } } } //执行方法的 @Component public class ConsumerComponent { @Reference private HelloService helloService; private volatile Integer num =0; private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,8,100, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1000)); public void consumer() throws InterruptedException { Runnable runnableA = new Thread(() -> helloService.sayHello1()); Runnable runnableB = new Thread(() -> helloService.sayHello2()); Runnable runnableC = new Thread(() -> helloService.sayHello3()); while (true){ TimeUnit.MILLISECONDS.sleep(70); try{ threadPoolExecutor.execute(runnableA); threadPoolExecutor.execute(runnableB); threadPoolExecutor.execute(runnableC); } catch (RejectedExecutionException e){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException ex) { System.out.println("Thread sleep exception!"); } } } } }

过滤器

//过滤器存储类 public class MethodCostTimeCount { public static Map<String, HashMap<Long, Long>> methodCount = new ConcurrentHashMap<>(); public static void put(String method, Long time, Long cost){ HashMap<Long, Long> hashMap = methodCount.get(method); if(hashMap == null){ hashMap = new HashMap<>(); methodCount.put(method, hashMap); } hashMap.put(time, cost); } public static Map<String, HashMap<Long, Long>> getMethodCount() { return methodCount; } } @Activate(group = {CommonConstants.CONSUMER}) public class TPMonitorFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { String methodName = invocation.getMethodName(); long startTime = System.currentTimeMillis(); try { // 执行方法 return invoker.invoke(invocation); } finally { long endTime = System.currentTimeMillis(); long costTime = endTime - startTime; //添加到集合 MethodCostTimeCount.put(methodName, endTime, costTime); System.out.println("invoke time:"+costTime + "毫秒"); } } }

附录

1.spring注解开发AnnotationConfigApplicationContext的使用

使用AnnotationConfigApplicationContext可以实现基于Java的配置类(包括各种注解)加载Spring的应用上下文。避免使用application.xml进行配置。相比XML配置,更加便捷。

2.spring的上下文

Spring有两个核心接口:BeanFactory和ApplicationContext,其中ApplicationContext是BeanFactory的子接口。他们都可代表Spring容器,Spring容器是生成Bean实例的工厂,并且管理容器中的Bean。

Spring容器最基本的接口就是BeanFactor。BeanFactory负责配置、创建、管理Bean,

他有一个子接口:ApplicationContext,因此也称之为Spring上下文。

Spring容器负责管理Bean与Bean之间的依赖关系。

https://www.cnblogs.com/chenssy/archive/2012/11/15/2772287.html

3.java8的Stream流编程

Stream 是 Java8 中处理集合的关键抽象概念

中间操作

​ filter:过滤流中的某些元素 ​ limit(n):获取n个元素 ​ skip(n):跳过n元素,配合limit(n)可实现分页 ​ distinct:通过流中元素的 hashCode() 和 equals() 去除重复元素

Stream<Integer> stream = Stream.of(6, 4, 6, 7, 3, 9, 8, 10, 12, 14, 14); Stream<Integer> newStream = stream.filter(s -> s > 5) //6 6 7 9 8 10 12 14 14 .distinct() //6 7 9 8 10 12 14 .skip(2) //9 8 10 12 14 .limit(2); //9 8 newStream.forEach(System.out::println);

https://blog.csdn.net/y_k_y/article/details/84633001

4.Thread.sleep()和TimeUnit.SECONDS.sleep()的区别与联系

TimeUnit对Thread.sleep方法的包装,实现是一样的,只是多了时间单位转换和验证

源码底层

//TimeUnit源码 public void sleep(long timeout) throws InterruptedException { if (timeout > 0) { long ms = toMillis(timeout); int ns = excessNanos(timeout, ms); Thread.sleep(ms, ns); } }

5.Future是什么?使用?

https://www.cnblogs.com/cz123/p/7693064.html

https://www.jianshu.com/p/b8952f07ee5d

6.TP90、TP99耗时监控设计与实现

https://blog.csdn.net/u012472945/article/details/105611155

TP50:指在一个时间段内(如5分钟),统计该方法每次调用所消耗的时间,并将这些时间按从小到大的顺序进行排序,取第50%的那个值作为TP50的值;配置此监控指标对应的报警阀值后,需要保证在这个时间段内该方法所有调用的消耗时间至少有50%的值要小于此阀值,否则系统将会报警

7.粘包与沾包

8.线程池OOM

https://mp.weixin.qq.com/s/1ZRkW4ND8y1KqRSFY_7_-A

作业

dubbo定义拦截器获取 :Ip获取白名单 :https://www.jianshu.com/p/98d68d57f62a

单词

manual 手动的

Extension 拓展

retention 保留

Policy 政策

exhaust 废弃,疲惫

excess超过

convenience 方便,适宜

Scheduled 安排

business 商业,业务

cluster 群,聚集

protocol 协议

attachment附件

最新回复(0)