11. Spring Cloud Stream


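Spring Cloud Stream hides the operations of the underlying message middleware, so application code does not deal with the broker directly.

Create the message provider (pom dependencies):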

<dependencies>
    <!-- Spring Cloud Stream with the RabbitMQ binder -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- This service registers with Eureka as a client -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.smy.springcloud</groupId>
        <artifactId>common-api</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>

YAML configuration:

server:
  port: 8801
spring:
  application:
    name: STREAM-RABBITMQ-PROVIDER
  cloud:
    stream:
      binders:                      # message middleware this service binds to
        defaultRabbit:              # binder name, referenced by the binding below
          type: rabbit
          environment:              # RabbitMQ connection settings
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                username: guest
                password: guest
      bindings:
        output:                     # output channel (matches Source.OUTPUT)
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: http://peer1:7001/eureka,http://peer2:7002/eureka
  instance:
    instance-id: stream-rabbitmq-8801
    prefer-ip-address: true
    lease-renewal-interval-in-seconds: 1
    lease-expiration-duration-in-seconds: 2

Main class:

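import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication
@EnableEurekaClient
public class StreamRabbutmqMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamRabbutmqMain8801.class, args);
    }
}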

The sending logic (pay attention to which packages are imported):

package com.xx.service.impl;

import cn.hutool.core.lang.UUID;
import com.smy.service.IMessageItf;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import javax.annotation.Resource;

/**
 * @author: xx
 * @createDate: 2020/9/4 7:08
 */
@EnableBinding(Source.class) // defines the channel used to push messages
public class IMessageImpl implements IMessageItf {

    // Injected by name: the "output" channel declared by Source
    @Resource
    private MessageChannel output;

    @Override
    public String send() {
        String uuid = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(uuid).build());
        System.out.println("Sent message: " + uuid);
        return uuid;
    }
}

Controller:

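import com.smy.service.IMessageItf;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class IMessageController {

    @Resource
    private IMessageItf iMessageItf;

    // Each GET /sendMessage publishes one message and returns its UUID
    @GetMapping("/sendMessage")
    public String sendMessage() {
        return iMessageItf.send();
    }
}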

Building the consumer: the pom and the main class are the same as the provider's.

YAML configuration:

server:
  port: 8802
spring:
  application:
    name: STREAM-RABBITMQ-CONSOMER
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                username: guest
                password: guest
      bindings:
        input:                      # input channel (matches Sink.INPUT)
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: http://peer1:7001/eureka,http://peer2:7002/eureka
  instance:
    instance-id: stream-rabbitmq-8802
    prefer-ip-address: true
    lease-renewal-interval-in-seconds: 1
    lease-expiration-duration-in-seconds: 2

Message listener:

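import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class) // binds the "input" channel
public class StreamListenerController {
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("Message: " + message.getPayload());
    }
}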

Request the provider's address: http://127.0.0.1:8801/sendMessage. That completes the simplest possible publish and subscribe setup.
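The request above can also be issued from code instead of a browser. The following is a minimal sketch using the JDK's built-in `java.net.http.HttpClient` (Java 11+). The class name `SendMessageClient` and its methods are hypothetical helpers introduced for illustration; only the host, port, and `/sendMessage` path come from the setup above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper that calls the provider's /sendMessage endpoint. */
final class SendMessageClient {

    /** Builds the GET request for http://host:port/sendMessage without sending it. */
    static HttpRequest buildRequest(String host, int port) {
        URI uri = URI.create("http://" + host + ":" + port + "/sendMessage");
        return HttpRequest.newBuilder(uri).GET().build();
    }

    /** Sends the request and returns the response body (the UUID the provider published). */
    static String trigger(String host, int port) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(buildRequest(host, port), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Requires the provider on port 8801 to be running.
        System.out.println("Published message id: " + trigger("127.0.0.1", 8801));
    }
}
```

Each call should make the provider print one line and the consumer print the same UUID, assuming both services and RabbitMQ are up.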

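Duplicate consumption and message persistence.

Duplicate consumption: consumers in the same group compete with each other, so each message is consumed only once within that group.

Consumers in different groups each receive their own copy of every message, which is where duplicate consumption comes from. For example, two consumer instances placed in two different groups will both process the same message.

To prevent duplicate consumption, put the consumers in the same group by adding group: xxx to the binding in the YAML file.

Persistence: once a service is configured with a group, a consumer that is stopped and restarted automatically pulls the messages that went unconsumed in the meantime. A consumer without a group does not get those messages back.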
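The group rules above can be illustrated with a small in-memory model. This is a plain-Java sketch, not Spring Cloud Stream or RabbitMQ code: the class `GroupDeliveryModel`, its methods, and the consumer names are hypothetical, and round-robin dispatch inside a group is an assumption standing in for the broker's competing-consumer behavior.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model: every group gets a copy of each message; one member per group handles it. */
final class GroupDeliveryModel {
    // group name -> consumers registered in that group
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // group name -> number of messages already dispatched to that group
    private final Map<String, Integer> dispatched = new LinkedHashMap<>();
    // consumer name -> payloads it has received
    private final Map<String, List<String>> received = new LinkedHashMap<>();

    /** Registers a consumer (names are assumed unique) in the given group. */
    void subscribe(String group, String consumer) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(consumer);
        dispatched.putIfAbsent(group, 0);
        received.putIfAbsent(consumer, new ArrayList<>());
    }

    /** Delivers one copy of the payload to each group, round-robin among its members. */
    void publish(String payload) {
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            List<String> members = entry.getValue();
            int count = dispatched.get(entry.getKey());
            String chosen = members.get(count % members.size());
            received.get(chosen).add(payload);
            dispatched.put(entry.getKey(), count + 1);
        }
    }

    List<String> receivedBy(String consumer) {
        return received.getOrDefault(consumer, new ArrayList<>());
    }

    public static void main(String[] args) {
        GroupDeliveryModel model = new GroupDeliveryModel();
        model.subscribe("rabbitmqA", "consumer-1");
        model.subscribe("rabbitmqA", "consumer-2"); // same group: competes with consumer-1
        model.subscribe("rabbitmqB", "consumer-3"); // different group: gets every message
        model.publish("m1");
        model.publish("m2");
        for (String name : List.of("consumer-1", "consumer-2", "consumer-3")) {
            System.out.println(name + " -> " + model.receivedBy(name));
        }
    }
}
```

In this model, consumer-1 and consumer-2 split the two messages between them, while consumer-3 receives both. That is the duplicate consumption that placing all instances in one group avoids.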

spring:
  application:
    name: STREAM-RABBITMQ-CONSOMER
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                username: guest
                password: guest
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
          group: rabbitmqA          # consumer group; instances sharing it compete for messages
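The effect of a group on persistence can be sketched with another small model. It assumes the commonly described RabbitMQ binder behavior: a grouped consumer reads from a durable queue that outlives it, while an ungrouped consumer gets an anonymous queue that disappears when it disconnects. The class `ConsumerQueueModel` and its methods are hypothetical, and nothing here talks to a real broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of one consumer's queue, either durable (grouped) or anonymous (ungrouped). */
final class ConsumerQueueModel {
    private final boolean durable;                        // true when a group is configured
    private final Deque<String> backlog = new ArrayDeque<>();
    private final List<String> consumed = new ArrayList<>();
    private boolean queueExists = false;                  // created on the first start
    private boolean online = false;

    ConsumerQueueModel(boolean durable) {
        this.durable = durable;
    }

    /** Consumer starts: the queue is (re)declared and any backlog is drained. */
    void start() {
        queueExists = true;
        online = true;
        while (!backlog.isEmpty()) {
            consumed.add(backlog.poll());
        }
    }

    /** Consumer stops: an anonymous queue is removed together with its contents. */
    void stop() {
        online = false;
        if (!durable) {
            queueExists = false;
            backlog.clear();
        }
    }

    /** A message reaches the exchange; it is kept only if this consumer's queue exists. */
    void publish(String payload) {
        if (!queueExists) {
            return; // no queue bound, so the message never reaches this consumer
        }
        if (online) {
            consumed.add(payload);
        } else {
            backlog.add(payload);
        }
    }

    List<String> consumed() {
        return consumed;
    }

    public static void main(String[] args) {
        for (boolean grouped : new boolean[] {true, false}) {
            ConsumerQueueModel consumer = new ConsumerQueueModel(grouped);
            consumer.start();
            consumer.publish("before-stop");
            consumer.stop();
            consumer.publish("during-downtime");
            consumer.start();
            System.out.println((grouped ? "grouped" : "ungrouped") + " -> " + consumer.consumed());
        }
    }
}
```

In the model, the grouped consumer ends up with both messages after its restart, and the ungrouped one only has the message it saw before stopping.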
