Spring Cloud H版

tech2023-12-01  34

openFeign 日志

日志级别

NONE: 默认的,不显示任何日志

BASIC: 仅记录请求方法、URL、响应状态码及执行时间

HEADERS: 除了BASIC中定义的信息之外,还有请求和响应头信息

FULL: 除了HEADERS中定义信息之外,还有请求和响应的正文及原数据

Feign调用服务的默认时长是1秒钟,也就是如果超过1秒没连接上或者超过1秒没响应,那么会相应的报错。而实际情况是因为业务的不同可能出现超出1秒的情况,这时我们需要调整超时时间。本文来看下怎么去设置。

全局配置

Feign 的负载均衡底层用的就是 Ribbon   在application.properties中添加如下配置,超过5秒没连接上报连接超时,如果超过5秒没有响应,报请求超时

# 设置 ribbon: #建立连接后从服务器读取到可用资源所用的最大超时时间 ReadTimeout: 5000 #指的是建立连接所用的最大超时时间 ConnectTimeout: 5000

Hystrix

hystrix对应的中文名字是“豪猪”,豪猪周身长满了刺,能保护自己不受天敌的伤害,代表了一种防御机制,这与hystrix本身的功能不谋而合,因此Netflix团队将该框架命名为Hystrix,并使用了对应的卡通形象做作为logo。

Hystrix是一个用于处理分布式系统的延迟和容错的开源库,在分布式系统里,许多依赖不可避免的会调用失败,比如超时、异常等。

Hystrix能够保证在一个依赖出问题的情况下,不会导致整体服务失败,避免级联故障,以提高分布式系统的弹性

“断路器” 本身是一种开关装置,当某个服务单元发生故障之后,通过断路器的故障监控(类似熔断器保险丝),向调用方返回一个符合预期的、可处理的备选响应(FallBack),而不是长时间的等待或者抛出调用方无法处理的异常,这样就保证了服务调用方的线程不会长时间、不必要地占用,从而避免了发生故障在分布式系统中的蔓延,乃至雪崩。

1. 服务降级

服务器忙,请稍后再试,不让客户端等待 并立刻返回一个友好提示,fallback

哪些情况会触发降级?

程序运行异常超时服务熔断触发服务降级线程池/信号量打满也会导致服务降级

服务降级可以在服务提供方 也可以在消费方 ,据需求而定,一般在提供方做服务降级

方法上指定服务降价 (vip服务 一对一) @GetMapping("/consumer/payment/hystrix/timeout/{id}") @HystrixCommand(fallbackMethod = "paymentInfo_TimeOutHandler", commandProperties = { @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1000") }) String paymentInfo_TimeOut(@PathVariable("id") Integer id) { return hystrixService.paymentInfo_TimeOut(id); } String paymentInfo_TimeOutHandler(@PathVariable("id") Integer id) { return "我是消费方80,对方支付系统繁忙,请稍后再试 o(╯□╰)o"; } 类上指定默认的服务降级处理方法 @DefaultProperties(defaultFallback = "payment_Global_FallbackMethod") // controller层来控制指定默认的降级 public class OrderHystrixController { @Resource private PaymentHystrixService hystrixService; ... ... ... // 全局fallback方法, 在类上通过@DefaultProperties注解指定 public String payment_Global_FallbackMethod() { return "Global异常处理信息,请稍后再试 o(╥﹏╥)o"; } } feign调用的接口类里 在指定的服务提供者上 指定服务降级的类 (该接口的实现类) @Component @FeignClient(value = "CLOUD-PROVIDER-HYSTRIX-PAYMENT", fallback = PaymentFallbackService.class) //指定服务出问题时,降级处理的类(实现类) public interface PaymentHystrixService { @GetMapping("/payment/hystrix/ok/{id}") String paymentInfo_OK(@PathVariable("id") Integer id); @GetMapping("/payment/hystrix/timeout/{id}") String paymentInfo_TimeOut(@PathVariable("id") Integer id); } @Component public class PaymentFallbackService implements PaymentHystrixService { @Override public String paymentInfo_OK(Integer id) { return "----PaymentFallbackService fall back paymentInfo_OK , o(╥﹏╥)o"; } @Override public String paymentInfo_TimeOut(Integer id) { return "----PaymentFallbackService fall back paymentInfo_TimeOut , o(╥﹏╥)o"; } }

2. 服务熔断

类比保险丝达到最大服务访问后,直接拒绝访问,拉闸限电,然后调用服务降级的方法并返回友好提示

服务的降级–>进而熔断–>恢复调用链路

熔断机制

​ 熔断机制是应对雪崩效应的一种微服务链路保护机制。当扇出链路的某个微服务出错不可用或者响应时间太长时,会进行服务降级,进而熔断该节点微服务的调用,快速返回错误的响应信息。 当检测掉该节点微服务调用响应正常后,恢复调用链路。

在微服务中使用Hystrix 作为断路器时,通常涉及到一下三个重要的指标参数(这里是写在@HystrixProperties注解中,当然实际项目中可以全局配置在yml或properties中)

// ** 服务熔断 ** @HystrixCommand(fallbackMethod = "paymentCircuitBreaker_fallback", commandProperties = { // 以下配置解读 在时间窗口期内(10s内)10次请求中 有6次(60%)都是失败 @HystrixProperty(name = "circuitBreaker.enabled", value = "true"), // 是否开启断路器 @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "10"), // 请求次数 @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "10000"), // 请求窗口期 @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "60") // 失败率阈值 }) public String paymentCircuitBreaker(@PathVariable("id") Integer id) { if (id < 0) { throw new RuntimeException("*** id 不能为负数"); } String serialNumber = IdUtil.simpleUUID(); return Thread.currentThread().getName() + "\t 调用成功,流水号: " + serialNumber; } public String paymentCircuitBreaker_fallback(@PathVariable("id") Integer id) { return "id 不能为负数,请稍后再试, o(╥﹏╥)o , id : " + id; }

1.circuitBreaker.sleepWindowInMilliseconds

断路器的快照时间窗,也叫做窗口期。可以理解为一个触发断路器的周期时间值,默认为10秒(10000)。

2.circuitBreaker.requestVolumeThreshold

断路器的窗口期内触发断路的请求阈值,默认为20。换句话说,假如某个窗口期内的请求总数都不到该配置值,那么断路器连发生的资格都没有。断路器在该窗口期内将不会被打开。

3.circuitBreaker.errorThresholdPercentage

断路器的窗口期内能够容忍的错误百分比阈值,默认为50(也就是说默认容忍50%的错误率)。打个比方,假如一个窗口期内,发生了100次服务请求,其中50次出现了错误。在这样的情况下,断路器将会被打开。在该窗口期结束之前,即使第51次请求没有发生异常,也将被执行fallback逻辑。


综上所述,在以上三个参数缺省的情况下,Hystrix断路器触发的默认策略为:

在10秒内,发生20次以上的请求时,假如错误率达到50%以上,则断路器将被打开。(当一个窗口期过去的时候,断路器将变成半开(HALF-OPEN)状态,如果这时候发生的请求正常,则关闭,否则又打开)

断路器被触发打开时,窗口期还没过去,即便现在是正常请求,也返回的是 fallback. 即服务当前处于服务熔断状态

3. 服务限流

秒杀等高并发操作,严禁一窝蜂的过来拥挤,大家排队,一秒钟N个,有序进行

Hystrix Dashboard

​ 除了隔离依赖服务的调用以外,Hystrix还提供了 准实时的调用监控(Hystrix Dashboard),Hystrix会持续地记录所有通过Hystrix发起的请求的执行信息,并以统计报表和图形的形式展示给用户,包括没秒执行多少请求多少成功,多少失败等。Netflix通过 hystrix-metrics-event-stream 项目实现了对以上指标的监控。Spring Cloud也提供了Hystrix Dashboard的整合,对监控内容转化为可视化界面

坑点,被监控的服务端必须注册一个Bean

@Bean public ServletRegistrationBean setServlet() { HystrixMetricsStreamServlet streamServlet = new HystrixMetricsStreamServlet(); ServletRegistrationBean registrationBean = new ServletRegistrationBean(streamServlet); registrationBean.setLoadOnStartup(1); registrationBean.addUrlMappings("/hystrix.stream"); registrationBean.setName("HystrixMetricsStreamServlet"); return registrationBean; }

Spring Cloud Gateway

​ Spring Cloud Gateway 是 Spring Cloud 新推出的网关框架,之前是 Netflix Zuul。网关通常在项目中为了简化

前端的调用逻辑,同时也简化内部服务之间互相调用的复杂度;具体作用就是转发服务,接收并转发所有内外

部的客户端调用;

能干嘛?

反向代理鉴权流量监控熔断日志监控…

微服务架构中网关的位置 :

Gateway 是基于 异步非阻塞模型上进行开发的,性能方面不需要担心

具有如下特性:

基于Spring Framework5,Project Reactor 和 Springboot 2.0 进行构建动态路由:能够匹配任何请求属性可以对路由指定 Predicate (断言) 和 Filter (过滤器)集成Hystrix的断路器功能集成Spring Cloud服务发现功能易于编写的 Predicate 和 Filter请求限流功能支持路径重写

Spring Cloud Gateway 与 Zuul的区别:

在spring cloud Finchley 正式版之前,Spring Cloud 推荐的网关是 Netflix 提供的 Zuul

Zuul 1.x 是一个基于阻塞 I/O 的 API GatewayZuul 1.x 基于Servlet 2.5使用阻塞架构 ,它不支持任何长连接(如 WebSocket),Zuul的设计模式和Nginx较为想象,每次I/O操作都是从工作线程中选择一个执行,请求线程被阻塞到工作线程完成,但是差别是Nginx采用C++实现,Zuul是用Java实现,而JVM本身会有第一次加载较慢的情况,使得Zuul的性能相对较差Zuul 2.x 理念更先进,想基于Netty非阻塞和支持长连接,但Spring Cloud目前没有整合。Zuul 2.x 的性能较 Zuul 1.x 有较大的提升。在性能方面,根据官方提示的基准测试 Spring Cloud Gateway 的 RPS (每秒请求数)是 Zuul 的1.6倍。Spring Cloud Gateway 建立在 Spring Framework 5、Project Reactor和Spring Boot 2 之上,使用非阻塞APISpring Cloud Gateway 还支持WebSocket,并且与Spring紧密集成拥有更好的开发体验

三个核心概念

路由(Route) :路由是构建网关的基本模块,它由ID,目标URI,一系列的断言和过滤器组成,如果断言为true则匹配该路由断言(Predicate) :可以匹配HTTP请求中的所有内容(请求头或者请求参数),如果请求与断言相匹配则进行路由过滤(Filter) :指的是Spring框架中GatewayFilter的实例,使用过滤器,可以在请求被路由前或者之后对请求进行修改

总结:

​ web请求,通过一些匹配条件,定位到真正的服务节点。并在这个转发过程的前后,进行一些精细化控制。

predicate就是我们匹配的条件;而filter,就可以理解为一个无所不能的拦截器。

有了这两个元素,再加上目标uri,就可以实现一个具体的路由了

官网总结:

​ 客户端向Spring Cloud Gateway发出请求。然后在Gateway Handler Mapping中找到与请求相匹配的路由,将其发送到Gateway Web Handler。

​ Handler再通过指定的过滤器链来将请求发送到我们实际的服务执行业务逻辑,然后返回。

过滤器之间用虚线分开是因为过滤器可能会在发送代理之前(Pre)或者之后(Post)执行业务逻辑

网关微服务注意点

​ 不要引入web依赖,即不要在pom里面加上下面这个依赖

<dependency>--> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>

路由配置方式一:配置文件

spring: application: name: cloud-gateway cloud: gateway: routes: - id: payment_routh # 路由的ID,没有固定规则 按要求唯一,建议配合服务名使用 uri: http://localhost:8001 # 匹配后提供服务的路由地址 predicates: - Path=/payment/get/** # 断言,路径相匹配的进行路由 - id: payment_routh2 uri: http://localhost:8001 predicates: - Path=/payment/lb/**

路由配置方式二:配置类

@Configuration public class GatewayConfig { @Bean public RouteLocator customRouteLocator(RouteLocatorBuilder routeLocatorBuilder) { RouteLocatorBuilder.Builder routes = routeLocatorBuilder.routes(); return routes.route("path_route_bd", r -> r.path("/guonei").uri("http://news.baidu.com/guonei")).build(); } }

我们发现网关微服务启动的时候打印了这个日志

之前 predicates 中配置的 - Path 其实就是对应的这里面的一种

Route Predicate Factories 是什么?

​ Spring Cloud Gateway将路由匹配作为Spring WebFlux HandlerMapping基础架构的一部分。

​ Spring Cloud Gateway包括许多内置的Route Predicate工厂。

​ 所有这些Predicate都与Http请求的不同属性匹配。多个Route Predicate工厂可以进行组合

​ Spring Cloud Gateway创建Route对象时,使用RoutePredicateFactory创建Predicate对象,Prdicate对象可以赋值给Route.

​ Spring Cloud Gateway包含许多内置的Route Predicate Factories。

​ 所有这些谓词都匹配HTTP请求的不同属性。多种谓词工厂可以组合,并通过逻辑and。

项目小问题 :

​ 启动项目时候,端口被占用。

​ 查看当前端口被是否被占用,被哪个进程占用,如查看8001是否被占用: lsof -i :8001

​ 结束这个进程:sudo kill -9 44131 这个44131就是上面命令查出来的进程id

jinchengming@MacBook-Pro ~ % lsof -i :8001 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 44131 jinchengming 173u IPv6 0x2160b173638ae111 0t0 TCP *:vcom-tunnel (LISTEN) jinchengming@MacBook-Pro ~ % sudo kill -9 44131

常用断言配置:

uri: lb://CLOUD-PAYMENT-SERVICE predicates: - Path=/payment/lb/** - After=2020-08-12T10:25:00.668+08:00[Asia/Shanghai] # 表示必须满足在这个时间之后 这个格式的时间可用ZonedDateTime来生成 # - Between=2020-08-10T10:25:00.668+08:00[Asia/Shanghai],2020-08-20T10:25:00.668+08:00[Asia/Shanghai] # 时间段内通过 # - Cookie=username,chengming #请求必须带这个cookie: "username=chengming" # - Header=X-Request-Id,\d+

过滤器配置:

过滤器配置也可以向断言一样在yml配置文件中配置,分为全局的和定制的配置

一般网关中我们使用java配置类通过实现两个接口来做全局配置,如下面这段代码就是一个简单的全局日志过滤器

@Component @Slf4j public class MyLogGatewayFilter implements GlobalFilter, Ordered { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("*********** come in MyLogGatewayFilter : " + new Date()); String uname = exchange.getRequest().getQueryParams().getFirst("uname"); if (null == uname) { log.info("*********** 用户名为null ,非法用户,o(╥﹏╥)o"); exchange.getResponse().setStatusCode(HttpStatus.NOT_ACCEPTABLE); return exchange.getResponse().setComplete(); } return chain.filter(exchange); } @Override public int getOrder() { return 0; // 顺序 数字越小,优先级越高 } }

curl测试网关接口

curl http://localhost:9527/payment/lb --cookie "username=chengming" # 带cookie curl http://localhost:9527/payment/lb -H "X-Request-Id:12" # 带请求头

Spring Cloud Config

​ Spring Cloud Config 为微服务架构中的微服务提供集中化的外部配置支持,配置服务器为各个不同的微服务应用的所有环境提供一个中心化的外部配置。

​ Spring Cloud Config分为服务端和客户端两部分。

服务端也称为分布式配置中心,他是一个独立的微服务应用,用来连接配置服务器并为客户端提供获取配置信息,加密/解密信息等访问接口

客户端则是通过指定的配置中心来管理应用资源,以及与业务相关的配置内容,并在启动的时候从配置中心获取和加载配置信息,配置服务器默认采用git来存储配置信息,这样有助于对环境配置进行版本管理,并且可以通过git客户端工具来方便的管理和访问配置内容。

能干嘛?

集中管理配置文件不同环境不同配置,动态化的配置更新,分环境部署比如dev/test/prod/beta/release运行期间动态调整配置,不再需要在每个服务部署的机器上编写配置文件,服务会向配置中心统一拉取配置自己的信息当配置发生变动时,服务不需要重启即可感知到配置的变化并应用新的配置将配置信息以REST接口的形式暴露

bootstrap.yml

与application.yml相比,application.yml是用户级的资源配置项,bootstrap.yml是系统级的,优先级更高

Spring Cloud会创建一个**“Bootstrap Context”,作为Spring应用的Application Context的父上下文**。初始化的时候,“Bootstrap Context”负责从外部源加载配置属性并解析配置。这两个上下文共享一个从外部获取的Environment。

Bootstrap 属性有高优先级,默认情况下,它们不会被本地配置覆盖 。Bootstrap context 和 Application Context 有着不同的约定,所以新增了一个 bootstrap.yml文件,保证 Bootstrap Context 和 Application Context 配置分离。

配置中心的配置

server: port: 3344 spring: application: name: cloud-config-center cloud: config: server: git: uri: https://github.com/jinchengming/springcloud-config.git search-paths: - springcloud-config # 读取分支 label: master eureka: client: service-url: defaultZone: http://eureka7001.com:7001/eureka

主启动类需要加上开启配置中心(服务端)的注解

@SpringBootApplication @EnableConfigServer public class ConfigCenterMain { public static void main(String[] args) { SpringApplication.run(ConfigCenterMain.class, args); } }

客户端配置文件 bootstrap.yml

server: port: 3355 spring: application: name: config-client cloud: config: label: master name: config profile: dev # 这三个组合起来就能找到github上的文件 uri: http://localhost:3344 # 配置中心地址 eureka: client: service-url: defaultZone: http://localhost:7001/eureka # 暴露监控端点 management: endpoints: web: exposure: include: "*"

客户端启动类

@SpringBootApplication @EnableEurekaClient public class ConfigClientMain3355 { public static void main(String[] args) { SpringApplication.run(ConfigClientMain3355.class, args); } }

向外暴露的restful接口

@RestController @RefreshScope public class ConfigClientController { @Value("${config.info}") private String configInfo; @GetMapping("/configInfo") public String getConfigInfo() { return configInfo; } }

总结: 因为我们配置中心微服务3344是直接获取的github上的文件,所以请求3344的地址 (http://localhost:3344/master/config-dev.yml) 是可以获取github上实时更新的文件内容

但是我们的客户端3355是通过服务端3344来获取远程的配置文件,测试发现客户端必须重启才能获取github上更新后的文件内容,所以配置文件中家里暴露监控配置,rest接口类 加了 @RefreshScope注解来让客户端可刷新。

需要注意的是 必须通过调用POST 通知接口才能让客户端主动刷新

curl -X POST "http://localhost:3355/actuator/refresh"

Spring Cloud Bus

消息总线, Bus 支持两种消息代理:RabbitMQ 和 Kafka

Spring Cloud Bus 配合 Spring Cloud Config 使用可以实现配置的动态刷新

Spring Cloud Bus 是用来将分布式系统的节点与轻量级消息系统链接起来的框架

整合了 Java的时间处理机制和消息中间件的功能。 目前只支持 RabbitMQ和Kafka。

Spring Cloud Bus 能管理和传播分布式系统间的消息,就像一个分布式执行器,可用于广播状态更改、事件推送等,也可以当做微服务间的通信通道

什么是总线?

​ 在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共用的消息主题,并让系统中所有的微服务实例都连接上来。由于该主题中产生的消息会被所有实例监听和消费,所以称它为消息总线。在总线上的各个实例,都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息。

基本原理

ConfigClient实例都监听MQ中同一个topic(默认是==springCloudBus==)。当一个服务刷新数据的时候,它会把这个信息放到Topic中,这样其它监听同一个Topic的服务就能得到通知,然后去更新自身的配置。

设计思想

利用消息总线触发一个客户端/bus/refresh,而刷新所有客户端的配置

利用消息总线触发一个服务端ConfigServer的/bus/refresh端点,而刷新所有客户端的配置

我们使用的是第二种思想,思想一不适合原因:

打破了微服务的职责单一性,因为微服务本身是业务模块,它本不应该承担配置刷新的职责。破坏了微服务各节点的对等性有一定的局限性。例如微服务在迁移时,它的网络地址常常发生变化,此时如果想要做到自动刷新,那就会增加更多的修改

为了测试 新加一个和3355 相同的微服务 3366

然后根据上面的思想,先动服务端 3344

改动一 : 引入新的依赖

<!-- 添加消息总线 mq 支持--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency>

改动二: 配置文件加上 rabbitmq的配置

spring: rabbitmq: host: localhost port: 5672 username: guest password: guest # rabbitmq 相关配置,暴露bus刷新配置的端点 management: endpoints: # 暴露bus刷新配置的端点 web: exposure: include: 'bus-refresh'

客户端修改

改动一:添加消息依赖 spring-cloud-starter-bus-amqp 和服务端相同

改动二:配置文件加上rabbitmq配置

spring: rabbitmq: host: localhost port: 5672 username: guest password: guest

测试 : 这时候我们不再需要对每个客户端微服务用POST请求通知,只需要对服务端3344发POST即可

curl -X POST "http://localhost:3344/actuator/bus-refresh"

打开rabbitmq的控制台,也证明了上面所介绍的实现原理

定点通知

在post请求服务端时指定要通知的客户端,可以利用微服务名和端口号来实现定点通知

如只通知3355,不通知3366

curl -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355"

Spring Cloud Stream

即消息驱动

是什么?

​ 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。可以类比 JDBC

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架

应用程序通过inputs 或者 outputs 来与 Spring Cloud Stream 中binder对象交互

通过我们配置来binding(绑定),而Spring Cloud Stream 的binder对象负责与消息中间件交互。

所以,我们只需要搞清楚如何与Spring Cloud Stream 交互就可以方便使用消息驱动的方式

通常使用Spring Integration 来连接消息代理中间件以实现消息时间驱动。

Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念

目前仅支持 Rabbit 、Kafka。

stream凭什么可以统一底层差异?

​ 在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,他们的实现细节上会有较大的差异

通过定义绑定器作为中间层,完美地实现了 应用程序与消息中间件细节之间的隔离。

通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

Spring Cloud Stream标准流程套路

Binder : 很方便的连接中间件,屏蔽差异Channel : 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置Source/Sink : 简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

编码API和注解:

组成说明Middleware中间件,目前只支持RabbitMQ和KafkaBinderBinder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现@Input注解标识输入通道,通过该输入通道接收到的消息进入应用程序@Output注解标识输出通道,发布的消息将通过该通道离开应用程序@StreamListener监听队列,用于消费者的队列的消息接收@EnableBinding指信道channel和exchange绑到在一起

编码:新建三个模块

8801 : 作为生产者进行发消息8802、8803 : 作为消费者接收消息

生产者 8801

pom文件:

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> ... ...

配置文件:

server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑到的rabbitmq的服务消息 defaultRabbit: # 表示定义的名称,用于与binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关配置环境 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 output: # 表示是一个消息生产者 发送 destination: studyExchange # 表示要使用的exchange的名称定义 content-type: application/json binder: defaultRabbit # 设置要绑到的消息服务的具体设置 eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 lease-expiration-duration-in-seconds: 5 instance-id: send-8001.com prefer-ip-address: true

消息发送service

// 发送接口 public interface MsgProvider { void send(); } // 消息发送实现类 @EnableBinding(Source.class) // 定义消息的推送管道 public class MsgProviderImpl implements MsgProvider { @Resource private MessageChannel output; // 消息发送管道 @Override public void send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("******** " + serial); } }

提供测试接口

@RestController public class SendMessageController { @Resource private MsgProvider msgProvider; @GetMapping("/sendMessage") public String sendMessage() { msgProvider.send(); return "ok"; } }

消费者 8802

pom文件同生产者,主要是配置文件不同

在对stream的配置中 output 表示生产者,input表示消费者

server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑到的rabbitmq的服务消息 defaultRabbit: # 表示定义的名称,用于与binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关配置环境 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 表示是一个消息消费者 destination: studyExchange # 表示要使用的exchange的名称定义 content-type: application/json binder: defaultRabbit # 设置要绑到的消息服务的具体设置 eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 lease-expiration-duration-in-seconds: 5 instance-id: receive-8002.com prefer-ip-address: true

编写一个监听类,实时获取消息队列中的消息

@Component @EnableBinding(Sink.class) // 定义小心接收管道 public class ReceiveMessageListener { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println(serverPort + " 接收到的消息:\t" + message.getPayload()); } }

**注意:**这里用到的Message相关的类都是springframework包下的

添加一个消费者 8803 (完全克隆8802 只是改了端口号,模拟客户端集群)

启动测试,8801send message之后,8802和8803同时收到所以消息,这种逻辑是不合理的,我们称之为 重复消费

如:一个订单系统生产订单 只能被一个支付系统完成支付,如果支付集群中的多个微服务都消费,则会造成多次扣款,数据错误

我们可以使用Stream 中的**消息分组**来解决。

**注意:**在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组可以重复消费。

进入rabbitmq管理页面,因为我们没有配置过分组,系统默认生产了两个分组,对应的是8802和8803两个消费微服务,所以造成了重复消费

修改配置文件,加上group配置(两个消费者都设置一样的分组)

bindings: # 服务的整合处理 input: # 表示是一个消息消费者 destination: studyExchange # 表示要使用的exchange的名称定义 content-type: application/json binder: defaultRabbit # 设置要绑到的消息服务的具体设置 group: streamStudy # 设置分组,解决重复消费

重启动后,再观察mq中的变化

**注意:**添加消息分组还可以解决消息持久化。即服务停机后,再次上线还是可以消费未被消费的消息

Spring Cloud Sleuth

在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的服务节点调用来协同产生最后的请求结果。每一个前段请求都会形成一条复杂的分布式服务调用链路,链路中的任何一环出现高延时或错误都会引起整个请求最后的失败。

Spring Cloud Sleuth提供了一套完整的服务跟踪的解决方案,在分布式系统中提供追踪解决方案并且兼容支持了zipkin

最新回复(0)