springcloud 微服务组件 Hystrix 之源码分析(十五)

tech2022-07-09  188

今天我们分析Hystrix 的源码:废话少说,从jar包开始:

 

 

1、点击进入 HystrixCircuitBreakerConfiguration  创建 hystrixCommand 注解的切面类

 2、找到HystrixCommand  注解,发现对应业务接口上的注解,说明这个业务类会生成代理

客户端调用主要走这个方法:

3、点击CommandExecutor进入:

先看同步方法:

进入:HystrixCommand 核心类,后面的fegin和zuul 组件都用到这个类,调用同步阻塞方法

点击query方法:

点击toObservable方法进入 AbstractCommand 里面有多个匿名内部类,属于Rsjava 代码写法风格

点击 applyHystrixSemantics 调用匿名类 里面的call方法

进入发现allRequest()方法 是是否允许请求正常的业务方法,此为熔断的功能,比如:

 

4、进入:熔断器是否开启(配置)、是否关闭(配置)、没有开启、允许单个请求; 点击isOpen(),最大请求数、错误百分比、原子操作、记录时间; 点击allowSingleTest(),单个测试请求 对比时间

返回刚才源代码: private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent executionHook.onStart(_cmd); /* determine if we're allowed to execute */ if (circuitBreaker.allowRequest()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore();//信号量 final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; if (executionSemaphore.tryAcquire()) {//获取请求 try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd)//信号量执行的主方法 .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback();//信号量降级方法 } } else { return handleShortCircuitViaFallback();//降级方法 } }

 5、点击 getExecutionSemaphore  找到信号量配置,和开发代码对比

 

信号量的值是否达到最大

点击 executeCommandAndObserve 方法:里面有超时时间设置

点击executeCommandWithSpecifiedIsolation 线程池策略:

 ctrl+t 对这个方法  getExecutionObservable,找到核心钩子方法

 

 

 5、进入反射调用的方法:

 这条核心路执行完了,信号量和线程池策略实现思路大同小异。

点击wrapWithAllOnNextHooks 调用匿名类里面的call方法:

3、线程池的创建及超时控制

 

 

这里创建了线程其他的,比如熔断器开启,线程池,信号量都满了,则会走到降级方法

这里也是会反射调用到 fallback 方法,fallback 降级方法也是有信号量和线程池的大小控制 的,也就是信号量或线程池是多少大小,fallback 降级方法也会接收多少降级的请求

Hystrix 源码分析结束,明天我们分析Fegin的源码,敬请期待。

最新回复(0)