会不断更新!冲冲冲!跳转连接
https://blog.csdn.net/qq_35349982/category_10317485.html
dubbo笔记
1.概念
RPC全称为remote procedure call,即远程过程调用。
借助RPC可以做到像本地调用一样调用远程服务,是一种进程间的通信方式。
Java RMI 指的是远程方法调用 (Remote Method Invocation),是java原生支持的远程调用 ,采用JRMP(JavaRemote Messageing protocol)作为通信协议,可以认为是纯java版本的分布式远程调用解决方案, RMI主要用于不同虚拟机之间的通信
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)
官网: http://dubbo.apache.org/zh-cn/docs/user/quick-start.html
2.配置方式
xml配置: http://dubbo.apache.org/zh-cn/docs/user/quick-start.html
注解配置:http://dubbo.apache.org/zh-cn/docs/user/configuration/annotation.html
@EnableDubbo(scanBasePackages
= "com.lagou.service.impl")
@EnableDubboConfig
@DubboComponentScan
@EnableDubboLifecycle
public @
interface EnableDubbo {
}
3. SPI简介
SPI 全称为 (Service Provider Interface) ,是JDK内置的一种服务提供发现机制。 目前有不少框架用它来做服务的扩展发现,简单来说,它就是一种动态替换发现的机制。使用SPI机制的优势是实现解耦,使得第三方服务模块的装配控制逻辑与调用者的业务代码分离。
1.java的SPI
1.约定
1、当服务提供者提供了接口的一种具体实现后,在META-INF/services目录下创建一个以“接口全限定名”为命名的文件,内容为实现类的全限定名;
2、接口实现类所在的jar包放在主程序的classpath中;
3、主程序通过java.util.ServiceLoader动态装载实现模块,它通过扫描META-INF/services目录下的配置文件找到实现类的全限定名,把类加载到JVM;
4、SPI的实现类必须携带一个无参构造方法;
2.例子
数据库驱动加载接口实现类的加载 JDBC加载不同类型数据库的驱动
日志门面接口实现类加载 SLF4J加载不同提供商的日志实现类
Spring Spring中大量使用了SPI,比如:对servlet3.0规范对ServletContainerInitializer的实现、自动类型转换Type Conversion SPI(Converter SPI、Formatter SPI)等
Dubbo中也大量使用SPI的方式实现框架的扩展, 不过它对Java提供的原生SPI做了封装,允许用户扩展实Filter接口
3.code
1.api层
public interface HelloService {
String
sayHello();
}
2.impl层
public class DogHelloService implements HelloService {
@Override
public String
sayHello() {
return "wang wang";
}
}
public class HumanHelloService implements HelloService {
@Override
public String
sayHello() {
return "hello 你好";
}
}
resources层
新建 META-INF/services/ 目录
后面跟上接口的包名
META-INF/services/com.lagou.service.HelloService
com.lagou.service.impl.DogHelloService
com.lagou.service.impl.HumanHelloService
3.调用层
public static void main(String
[] args
) {
final ServiceLoader
<HelloService> helloServices
= ServiceLoader
.load(HelloService
.class);
for (HelloService helloService
: helloServices
){
System
.out
.println(helloService
.getClass().getName() + ":" + helloService
.sayHello());
}
}
2.dubbo的SPI
1.约定
使用注解 @SPI
@Adaptive 主要解决的问题是如何动态的选择具体的扩展点。通过getAdaptiveExtension 统一对指定接口对应的所有扩展点进行封装
配置到 META-INF/dubbo/下 + com.lagou.service.HelloService(接口路径)
dubbo自己做SPI的目的
1. JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加 载,会很浪费资源
2. 如果有扩展点加载失败,则所有扩展点无法使用
3. 提供了对扩展点包装的功能(Adaptive),并且还支持通过set的方式对其他的扩展点进行注入
2.code
1.api层
@SPI("human")
public interface HelloService {
String
sayHello();
@Adaptive
String
sayHello(URL url
);
}
2.impi层
public class DogHelloService implements HelloService{
@Override
public String
sayHello() {
return "wang wang";
}
@Override
public String
sayHello(URL url
) {
return "wang url";
}
}
public class HumanHelloService implements HelloService{
@Override
public String
sayHello() {
return "hello 你好";
}
@Override
public String
sayHello(URL url
) {
return "hello url";
}
}
在Resources的META-INF/dubbo/com.lagou.service.HelloService目录下
human=com.lagou.service.impl.HumanHelloService
dog=com.lagou.service.impl.DogHelloService
3.调用层
调用全部拓展加载器
public static void main(String
[] args
) {
ExtensionLoader
<HelloService> extensionLoader
= ExtensionLoader
.getExtensionLoader(HelloService
.class);
Set
<String> extensions
= extensionLoader
.getSupportedExtensions();
for (String extension
: extensions
){
String result
= extensionLoader
.getExtension(extension
).sayHello();
System
.out
.println(result
);
}
}
调用指定的
public static void main(String
[] args
) {
URL url
= URL
.valueOf("test://localhost/hello?hello.service=dog");
HelloService adaptiveExtension
= ExtensionLoader
.getExtensionLoader(HelloService
.class).getAdaptiveExtension();
String msg
= adaptiveExtension
.sayHello(url
);
System
.out
.println(msg
);
}
4.dubbo使用
1.api模块
public interface HelloService {
String
sayHello(String name
);
}
2.provider模块
@Service
public class HelloServiceImpl implements HelloService {
@Override
public String
sayHello(String name
) {
try {
Thread
.sleep(3000);
} catch (InterruptedException e
) {
e
.printStackTrace();
}
return "hello:"+name
;
}
}
public class DubboPureMain {
public static void main(String
[] args
) throws Exception
{
AnnotationConfigApplicationContext context
= new AnnotationConfigApplicationContext(ProviderConfiguration
.class);
context
.start();
System
.in
.read();
}
@Configuration
@EnableDubbo(scanBasePackages
= "com.lagou.service.impl")
@PropertySource("classpath:/dubbo-provider.properties")
static class ProviderConfiguration{
@Bean
public RegistryConfig
registryConfig(){
RegistryConfig registryConfig
= new RegistryConfig();
registryConfig
.setAddress("zookeeper://127.0.0.1:2181?timeout=10000");
return registryConfig
;
}
}
}
resources中
dubbo-consumer.properties
dubbo.application.name=service-consumer
dubbo.registry.address=zookeeper://127.0.0.1:2181
dubbo.consumer.timeout=4000
##运维命令
dubbo.application.qosEnable=true
dubbo.application.qosPort=33333
dubbo.application.qosAcceptForeignIp=false
3.consumer模块
@Component
public class ComsumerComponet {
@Reference
private HelloService helloService
;
public String
sayHello(String name
){
return helloService
.sayHello(name
);
}
}
public class AnnotationConsumerMain {
public static void main(String
[] args
) throws Exception
{
System
.out
.println("-------------");
AnnotationConfigApplicationContext context
= new AnnotationConfigApplicationContext(ConsumerConfiguration
.class);
context
.start();
ComsumerComponet service
= context
.getBean(ComsumerComponet
.class);
while(true){
System
.in
.read();
String hello
= service
.sayHello("world");
System
.out
.println("result:"+hello
);
}
}
@Configuration
@PropertySource("classpath:/dubbo-consumer.properties")
@ComponentScan(basePackages
= "com.lagou.bean")
@EnableDubbo
static class ConsumerConfiguration{
}
}
#=============
3.过滤器配置
@Activate(group
= {CommonConstants
.CONSUMER
,CommonConstants
.PROVIDER
})
public class DubboInvokeFilter implements Filter {
@Override
public Result
invoke(Invoker
<?> invoker
, Invocation invocation
) throws RpcException
{
long startTime
= System
.currentTimeMillis();
try {
return invoker
.invoke(invocation
);
} finally {
System
.out
.println("invoke time:"+(System
.currentTimeMillis()-startTime
) + "毫秒");
}
}
}
在Resources配置文件中配置
META-INF/dubbo/org.apache.dubbo.rpc.Filter
timeFilter=com.lagou.filter.DubboInvokeFilter
在使用的模块中 添加依赖即可
4.负载均衡
public class OnlyFirstLoadbalancer implements LoadBalance {
@Override
public <T> Invoker
<T> select(List
<Invoker
<T>> list
, URL url
, Invocation invocation
) throws RpcException
{
return list
.stream().sorted((i1
,i2
)->{
final int ipCompare
= i1
.getUrl().getIp().compareTo(i2
.getUrl().getIp());
if(ipCompare
== 0){
return Integer
.compare(i1
.getUrl().getPort(),i2
.getUrl().getPort());
}
return ipCompare
;
}).findFirst().get();
}
}
在Resources配置文件中配置
META-INF/dubbo/org.apache.dubbo.rpc.cluster.LoadBalance
onlyFirst=com.laogu.loadbalance.OnlyFirstLoadbalancer
在Counsumer中使用
@Component
public class ConsumerComponent {
@Reference(loadbalance
= "onlyFirst")
private HelloService helloService
;
public String
sayHello(String name
, int timeToWait
) {
return helloService
.sayHello(name
, timeToWait
);
}
}
5.线程池
官网 : http://dubbo.apache.org/zh-cn/docs/user/demos/thread-model.html
public class WachingThreadPool extends FixedThreadPool implements Runnable{
private static final Logger LOGGER
= LoggerFactory
.getLogger(WachingThreadPool
.class);
private static final double ALARM_PERCENT
= 0.90;
private final Map
<URL, ThreadPoolExecutor> THREAD_POOLS
= new ConcurrentHashMap<>();
public WachingThreadPool(){
Executors
.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this,1,3, TimeUnit
.SECONDS
);
}
@Override
public Executor
getExecutor(URL url
) {
final Executor executor
= super.getExecutor(url
);
if(executor
instanceof ThreadPoolExecutor){
THREAD_POOLS
.put(url
,(ThreadPoolExecutor
)executor
);
}
return executor
;
}
@Override
public void run() {
for (Map
.Entry
<URL,ThreadPoolExecutor> entry
: THREAD_POOLS
.entrySet()){
final URL url
= entry
.getKey();
final ThreadPoolExecutor executor
= entry
.getValue();
final int activeCount
= executor
.getActiveCount();
final int poolSize
= executor
.getCorePoolSize();
double usedPercent
= activeCount
/ (poolSize
*1.0);
LOGGER
.info("线程池执行状态:[{}/{}:{}%]",activeCount
,poolSize
,usedPercent
*100);
if (usedPercent
> ALARM_PERCENT
){
LOGGER
.error("超出警戒线! host:{} 当前使用率是:{},URL:{}",url
.getIp(),usedPercent
*100,url
);
}
}
}
}
public class WachingThreadPool extends FixedThreadPool implements Runnable{
private static final Logger LOGGER
= LoggerFactory
.getLogger(WachingThreadPool
.class);
private static final double ALARM_PERCENT
= 0.90;
private final Map
<URL, ThreadPoolExecutor> THREAD_POOLS
= new ConcurrentHashMap<>();
public WachingThreadPool(){
Executors
.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this,1,3, TimeUnit
.SECONDS
);
}
@Override
public Executor
getExecutor(URL url
) {
final Executor executor
= super.getExecutor(url
);
if(executor
instanceof ThreadPoolExecutor){
THREAD_POOLS
.put(url
,(ThreadPoolExecutor
)executor
);
}
return executor
;
}
@Override
public void run() {
for (Map
.Entry
<URL,ThreadPoolExecutor> entry
: THREAD_POOLS
.entrySet()){
final URL url
= entry
.getKey();
final ThreadPoolExecutor executor
= entry
.getValue();
final int activeCount
= executor
.getActiveCount();
final int poolSize
= executor
.getCorePoolSize();
double usedPercent
= activeCount
/ (poolSize
*1.0);
LOGGER
.info("线程池执行状态:[{}/{}:{}%]",activeCount
,poolSize
,usedPercent
*100);
if (usedPercent
> ALARM_PERCENT
){
LOGGER
.error("超出警戒线! host:{} 当前使用率是:{},URL:{}",url
.getIp(),usedPercent
*100,url
);
}
}
}
}
在provider中配置文件中添加线程池
dubbo.application.name=dubbo-demo-annotation-provider
dubbo.protocol.name=dubbo
dubbo.protocol.port=20885
#dubbo.protocol.host=192.168.1.109
dubbo.provider.threadpool=watching
#dubbo.protocol.telnet=clear,exit,help,status,log,ls,ps,cd,pwd,invoke,trace,count,select,shutdown
6.服务降级
服务降级,当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务有策略的降低服务级别,以释放服务器资源,保证核心任务的正常运行。
在 dubbo 管理控制台配置服务降级
屏蔽和容错
mock=force:return+null
表示消费方对该服务的方法调用都直接返回 null 值,不发起远程调用。用来屏蔽不重要服务不可用时对调用方的影响。
mock=fail:return+null
表示消费方对该服务的方法调用在失败后,再返回 null 值,不抛异常。用来容忍不重要服务不稳定时对调用方的影响。
指定返回简单值或者null
<dubbo:reference id="xxService" check="false" interface="com.xx.XxService" timeout="3000" mock="return null" />
<dubbo:reference id="xxService2" check="false" interface="com.xx.XxService2" timeout="3000" mock="return 1234" />
使用java代码 动态写入配置中心
RegistryFactory registryFactory
= ExtensionLoader
.getExtensionLoader(RegistryFactory
.class).getAdaptiveExtension() ;
Registry registry
= registryFactory
.getRegistry(URL
.valueOf("zookeeper://IP:端 口")); registry
.register(URL
.valueOf("override://0.0.0.0/com.foo.BarService?category=configurators&dynamic=false&application=foo&mock=force:return+null"));
dubbo的控制台客户端
1.从git 上下载项目 https://github.com/apache/dubbo-admin
2.修改项目下的dubbo.properties文件
注意
dubbo.registry.address对应的值需要对应当前使用的Zookeeper的ip地址和端口号
dubbo.registry.address=zookeeper://zk所在机器ip:zk端口
dubbo.admin.root.password=root
dubbo.admin.guest.password=guest
3.切换到项目所在的路径 使用mvn 打包 mvn clean package -Dmaven.test.skip=true
4.java 命令运行 java -jar 对应的jar包
dubbo的补充
1.dubbo中的Url
protocol:一般是 dubbo 中的各种协议 如:dubbo thrift http zkusername/password:用户名/密码host/port:主机/端口path:接口名称parameters:参数键值对
任意的一个领域中的一个实现都可以认为是一类 URL,dubbo 使用 URL 来统一描述了元数据,配置信息,贯穿在整个框架之中。
http://dubbo.apache.org/zh-cn/blog/introduction-to-dubbo-url.html
dubbo的设计
http://dubbo.apache.org/zh-cn/docs/dev/design.html
作业思路
作业二
服务端:
定义一个类,三个方法,用随机数100秒 随机
过滤器:
方法执行时,记录时间存储到Map
key是 方法名
value是一个Map
key是方法最后的执行时间
value是方法的执行消耗时间
消费端:
开启两个线程
1.循环执行三个方法
2.读取Map,key
根据key剔除掉一分钟之前的数据
Map按照时间戳排序
T90取size()*90的数据
消费端
public class AnnotationConsumerMain {
public static void main(String
[] args
) throws Exception
{
AnnotationConfigApplicationContext context
= new AnnotationConfigApplicationContext(ConsumerConfiguration
.class);
context
.start();
ConsumerComponent service
= context
.getBean(ConsumerComponent
.class);
ThreadPoolExecutor threadPoolExecutor
= new ThreadPoolExecutor(2,2,100, TimeUnit
.MILLISECONDS
,new ArrayBlockingQueue<>(10));
Runnable consumer
= new Thread(() -> {
try {
service
.consumer();
} catch (InterruptedException e
) {
e
.printStackTrace();
}
});
threadPoolExecutor
.execute(consumer
);
Runnable consumer1
= new Thread(() -> {
PrintMethodCost
.print();
});
threadPoolExecutor
.execute(consumer1
);
}
@Configuration
@PropertySource("classpath:/dubbo-consumer.properties")
@ComponentScan("com.lagou.bean")
@EnableDubbo
static class ConsumerConfiguration {
}
}
public class PrintMethodCost {
public static void print() {
while (true) {
Map
<String
, HashMap
<Long, Long>> methodCount
= MethodCostTimeCount
.getMethodCount();
long currentTimeMillis
= System
.currentTimeMillis();
long oneMinuteBefore
= currentTimeMillis
- 60000;
for (Map
.Entry
<String
, HashMap
<Long, Long>> entry
: methodCount
.entrySet()) {
Iterator iterator
= entry
.getValue().entrySet().iterator();
while (iterator
.hasNext()) {
Map
.Entry
<Long, Long> key
= (Map
.Entry
<Long, Long>) iterator
.next();
if (key
.getKey() < oneMinuteBefore
) {
iterator
.remove();
}
}
HashMap
<Long, Long> oneMinuteMap
= entry
.getValue();
List
<Map
.Entry
<Long, Long>> list
= new ArrayList<>(oneMinuteMap
.entrySet());
Collections
.sort(list
, Comparator
.comparing(Map
.Entry
::getValue
));
int index90
= (int) (list
.size() * 0.9);
int index99
= (int) (list
.size() * 0.99);
Long tp90Time
= list
.get(index90
).getKey();
Long tp90Cost
= list
.get(index90
).getValue();
System
.out
.println("T90方法名="+entry
.getKey() + "毫秒数=" + tp90Cost
+ " ms,方法执行时间 : " + tp90Time
);
Long tp99Time
= list
.get(index99
).getKey();
Long tp99Cost
= list
.get(index99
).getValue();
System
.out
.println("T99方法名="+entry
.getKey() + "毫秒数=" + tp99Cost
+ " ms,方法执行时间 : " + tp99Time
);
}
try {
TimeUnit
.SECONDS
.sleep(5);
} catch (InterruptedException e
) {
System
.out
.println("Thread sleep exception!");
}
System
.out
.println("==============================");
}
}
}
@Component
public class ConsumerComponent {
@Reference
private HelloService helloService
;
private volatile Integer num
=0;
private ThreadPoolExecutor threadPoolExecutor
= new ThreadPoolExecutor(8,8,100, TimeUnit
.MILLISECONDS
,new ArrayBlockingQueue<>(1000));
public void consumer() throws InterruptedException
{
Runnable runnableA
= new Thread(() -> helloService
.sayHello1());
Runnable runnableB
= new Thread(() -> helloService
.sayHello2());
Runnable runnableC
= new Thread(() -> helloService
.sayHello3());
while (true){
TimeUnit
.MILLISECONDS
.sleep(70);
try{
threadPoolExecutor
.execute(runnableA
);
threadPoolExecutor
.execute(runnableB
);
threadPoolExecutor
.execute(runnableC
);
} catch (RejectedExecutionException e
){
try {
TimeUnit
.SECONDS
.sleep(1);
} catch (InterruptedException ex
) {
System
.out
.println("Thread sleep exception!");
}
}
}
}
}
过滤器
public class MethodCostTimeCount {
public static Map
<String
, HashMap
<Long, Long>> methodCount
= new ConcurrentHashMap<>();
public static void put(String method
, Long time
, Long cost
){
HashMap
<Long, Long> hashMap
= methodCount
.get(method
);
if(hashMap
== null
){
hashMap
= new HashMap<>();
methodCount
.put(method
, hashMap
);
}
hashMap
.put(time
, cost
);
}
public static Map
<String
, HashMap
<Long, Long>> getMethodCount() {
return methodCount
;
}
}
@Activate(group
= {CommonConstants
.CONSUMER
})
public class TPMonitorFilter implements Filter {
@Override
public Result
invoke(Invoker
<?> invoker
, Invocation invocation
) throws RpcException
{
String methodName
= invocation
.getMethodName();
long startTime
= System
.currentTimeMillis();
try {
return invoker
.invoke(invocation
);
} finally {
long endTime
= System
.currentTimeMillis();
long costTime
= endTime
- startTime
;
MethodCostTimeCount
.put(methodName
, endTime
, costTime
);
System
.out
.println("invoke time:"+costTime
+ "毫秒");
}
}
}
附录
1.spring注解开发AnnotationConfigApplicationContext的使用
使用AnnotationConfigApplicationContext可以实现基于Java的配置类(包括各种注解)加载Spring的应用上下文。避免使用application.xml进行配置。相比XML配置,更加便捷。
2.spring的上下文
Spring有两个核心接口:BeanFactory和ApplicationContext,其中ApplicationContext是BeanFactory的子接口。他们都可代表Spring容器,Spring容器是生成Bean实例的工厂,并且管理容器中的Bean。
Spring容器最基本的接口就是BeanFactor。BeanFactory负责配置、创建、管理Bean,
他有一个子接口:ApplicationContext,因此也称之为Spring上下文。
Spring容器负责管理Bean与Bean之间的依赖关系。
https://www.cnblogs.com/chenssy/archive/2012/11/15/2772287.html
3.java8的Stream流编程
Stream 是 Java8 中处理集合的关键抽象概念
中间操作
filter:过滤流中的某些元素 limit(n):获取n个元素 skip(n):跳过n元素,配合limit(n)可实现分页 distinct:通过流中元素的 hashCode() 和 equals() 去除重复元素
Stream
<Integer> stream
= Stream
.of(6, 4, 6, 7, 3, 9, 8, 10, 12, 14, 14);
Stream
<Integer> newStream
= stream
.filter(s
-> s
> 5)
.distinct()
.skip(2)
.limit(2);
newStream
.forEach(System
.out
::println
);
https://blog.csdn.net/y_k_y/article/details/84633001
4.Thread.sleep()和TimeUnit.SECONDS.sleep()的区别与联系
TimeUnit对Thread.sleep方法的包装,实现是一样的,只是多了时间单位转换和验证
源码底层
public void sleep(long timeout
) throws InterruptedException
{
if (timeout
> 0) {
long ms
= toMillis(timeout
);
int ns
= excessNanos(timeout
, ms
);
Thread
.sleep(ms
, ns
);
}
}
5.Future是什么?使用?
https://www.cnblogs.com/cz123/p/7693064.html
https://www.jianshu.com/p/b8952f07ee5d
6.TP90、TP99耗时监控设计与实现
https://blog.csdn.net/u012472945/article/details/105611155
TP50:指在一个时间段内(如5分钟),统计该方法每次调用所消耗的时间,并将这些时间按从小到大的顺序进行排序,取第50%的那个值作为TP50的值;配置此监控指标对应的报警阀值后,需要保证在这个时间段内该方法所有调用的消耗时间至少有50%的值要小于此阀值,否则系统将会报警
7.粘包与沾包
8.线程池OOM
https://mp.weixin.qq.com/s/1ZRkW4ND8y1KqRSFY_7_-A
作业
dubbo定义拦截器获取 :Ip获取白名单 :https://www.jianshu.com/p/98d68d57f62a
单词
manual 手动的
Extension 拓展
retention 保留
Policy 政策
exhaust 废弃,疲惫
excess超过
convenience 方便,适宜
Scheduled 安排
business 商业,业务
cluster 群,聚集
protocol 协议
attachment附件