原来微服务之间负载均衡 调用需要通过 restTemplate + Ribbon 实现.feign 实现了创建一个被调用方 接口的同样格式的接口,再加上feign的注解就可以实现微服务之间的调用.类似于 mybatis 的 @mapper 注解
feign使用在消费端
需要注意的是,在接口上需要添加@FeignClient 注解,里面填写的是 服务提供方在 EUREKA 里面注册的名称
直接在使用的地方自动注入即可tips: 1. get请求参数的时候,参数一定需要加上@RequestParam 注解,和入参一致
feign 客户端默认等待时间1秒钟,但是有的时候一些业务逻辑使得生产者超过了一秒钟,就需要设置feign的超时时间
ribbon: # 等待读取的最大时间 ReadTimeout: 5000 # 建立链接可以的最大时间 ConnectTimeout: 5000可能在工作中需要知道请求的详细信息,feign支持打印http信息
配置一个bean 设置打印级别,直接设置全部就好了 @Bean Logger.Level feignLevel(){ return Logger.Level.FULL; } 配置文件中指定那个feign-client 的打印级别是多少 logging: level: com.zc.feign.ProviderClient: debug思路.总体方向肯定是feign扫描到所有 @FeignClient 注解标记的接口创建动态代理 生成实体类存入 Spring容器中. 和其他功能一样,想要使用Feign功能需要在启动类上引入 @EnableFeignClients注解.所以一定是这个注解 引入了配置项做的事情.
@Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) @Documented @Import({FeignClientsRegistrar.class}) public @interface EnableFeignClients { }可以看到引入了一个FeignClientsRegistrar 类,我们再来看这个类是干嘛的 实现了 ImportBeanDefinitionRegistrar 接口.该接口的含义是可以自定义导入组件到容器中.
我们来分析一下这个方法中实现了哪些事情
@Override public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { // 判断有没有 EnableFeignClients 注解,如果有的话完成一些配置信息 registerDefaultConfiguration(metadata, registry); // 重点,从名字都可以看出这里注册了feign的客户端 registerFeignClients(metadata, registry); }删了很多乱七八糟的东西,看源码就是看个大概思路.我们可以看到在 registerFeignClients 方法中 扫描了资源下的所有 FeignClient 注解的类然后遍历调用了 registerFeignClient 方法 扫描注解的包名在@EnableFeignClients中指定,和springboot注解不同
ClassPathScanningCandidateComponentProvider scanner = getScanner(); scanner.setResourceLoader(this.resourceLoader); Set<String> basePackages; Map<String, Object> attrs = metadata .getAnnotationAttributes(EnableFeignClients.class.getName()); AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter( FeignClient.class); basePackages = getBasePackages(metadata); for (String basePackage : basePackages) { Set<BeanDefinition> candidateComponents = scanner .findCandidateComponents(basePackage); for (BeanDefinition candidateComponent : candidateComponents) { Map<String, Object> attributes = annotationMetadata .getAnnotationAttributes( FeignClient.class.getCanonicalName()); String name = getClientName(attributes); registerClientConfiguration(registry, name, attributes.get("configuration")); registerFeignClient(registry, annotationMetadata, attributes); } } }最后我们来看单个被标记了 @FeignClient 的类会怎么处理
private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata, Map<String, Object> attributes) { String className = annotationMetadata.getClassName(); // 基于FeignClientFactoryBean 创建了一个definition,然后把上面的配置信息导入进来创建实体到容器中 BeanDefinitionBuilder definition = BeanDefinitionBuilder .genericBeanDefinition(FeignClientFactoryBean.class); definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE); String alias = contextId + "FeignClient"; AbstractBeanDefinition beanDefinition = definition.getBeanDefinition(); BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, new String[] { alias }); BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry); }这个方法大体的意思就是根据一个 beanfactory 创建了一个 definition,然后把标记了@feignclient 的类的信息存到 definition中然后在通过工具类注入到容器中. 所以我们重点需要看 FeignClientFactoryBean 这个工厂做了什么 众所周知, FactoryBean 实现了object方法,返回的类就会被放入容器中.于是看看 Loadbalance 做了什么 可以看到spring创建了一个 LoadBalancerFeignClient 最后也是基于这个类创建的动态代理
默认的调用处理器 FeignInvocationHandler是一个相对简单的类,有一个非常重要Map类型成员 dispatch 映射,保存着远程接口方法到MethodHandler方法处理器的映射。
容器中保存这方法和 MethodHandler 的映射关系. feign 默认使用 SynchronousMethodHandler 该接口就是发送http请求然后返回内容
public Object invoke(Object[] argv) throws Throwable { RequestTemplate template = buildTemplateFromArgs.create(argv); Options options = findOptions(argv); Retryer retryer = this.retryer.clone(); while (true) { try { return executeAndDecode(template, options); } catch (RetryableException e) { try { retryer.continueOrPropagate(e); } catch (RetryableException th) { Throwable cause = th.getCause(); if (propagationPolicy == UNWRAP && cause != null) { throw cause; } else { throw th; } } if (logLevel != Logger.Level.NONE) { logger.logRetry(metadata.configKey(), logLevel); } continue; } } }上面是 重试机制的逻辑 真正发送请求的是 executeAndDecode 方法
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable { // 省略 Request request = targetRequest(template); response = client.execute(request, options); }其中的 client 就是 LoadBalancerFeignClient
@Override public Response execute(Request request, Request.Options options) throws IOException { try { URI asUri = URI.create(request.url()); String clientName = asUri.getHost(); URI uriWithoutHost = cleanUrl(request.url(), clientName); FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest( this.delegate, request, uriWithoutHost); IClientConfig requestConfig = getClientConfig(options, clientName); return lbClient(clientName) .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse(); } catch (ClientException e) { IOException io = findIOException(e); if (io != null) { throw io; } throw new RuntimeException(e); } }上面的源码中创建了 FeignLoadBalancer 来执行 executeWithLoadBalancer 方法
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig); try { return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); } catch (Exception e) { Throwable t = e.getCause(); if (t instanceof ClientException) { throw (ClientException) t; } else { throw new ClientException(e); } } }LoadBalancerCommand 中有一个 selectServer 方法,可以知道是用来选择服务器的.
private Observable<Server> selectServer() { return Observable.create(new OnSubscribe<Server>() { @Override public void call(Subscriber<? super Server> next) { try { Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey); next.onNext(server); next.onCompleted(); } catch (Exception e) { next.onError(e); } } }); }在 getServerFromLoadBalancer 方法中,我们看到了熟悉的 ILoadBalancer ,于是就回到了 Ribbon 的负载均衡选择服务器.至此,openfeign 的负载均衡请求原理就通了
总体流程就是
扫描所有被@FeignClient注解的类循环为每个类创建一个FeignClientFactoryBean工厂类FeignClientFactoryBean 类中基于 LoadBalancerFeignClient 创建了一个动态代理 存入容器中在执行方法的时候 创建了 FeignLoadBalancer 类 里面又创建了 LoadBalancerCommand 在 该类的 selectServer 方法中 通过 LoadBalancerContext 里面用 ILoadBalancer 负载均衡得到服务器.SynchronousMethodHandler 中的重试策略 只是对于 feign 之间调用超时支持,并不能实现方法内部如果报错去另一个服务去尝试.
思路: feign 默认的重试只能支持 RetryableException 而且 自己在while true 里面循环,做不到 判断请求之后下次请求就剔除该服务器. 所以我的思路是, 通过 AOP feignclient 代理请求,并且自定义 负载均衡器 IRule 在选择了服务器之后,把服务器信息存入ThreadLocal 中,并且 提供自定义 ErrorDecoder 判断如果服务器响应体内返回的是500 则 抛出自定义异常,并且在 ThreadLocal 中把 IRule 传过来的当前服务器信息加入该次请求的错误服务器列表中. 在AOP 中捕获异常判断如果是自定义异常则继续重试. 之后的重试 IRule 会取出不再错误服务列表中的服务.