SpringCloudStream之RabbitMq

tech2025-10-08  3

SpringCloudStream之RabbitMq


日期:2020/09/04


pom.xml引入,spring-cloud版本Greenwich.SR6,spring-boot版本2.1.13.RELEASE,stream-rabbit版本2.1.4.RELEASE

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>

yml中配置

2.1 生产者服务配置
spring: stream: binders: # 绑定消息中间件,可以同时绑定多个不同的中间件 his-rabbit: # 中间件配置名称(唯一) type: rabbit # 指定具体的中间件,可以是rabbit、kafka、redis等 environment: spring: rabbitmq: # rabbitmq配置 host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / bindings: # 绑定消息通道,可以同时绑定多个不同的消息通道 output-order: # 消息通道名称,此处是一个生产者 binder: his-rabbit # 指定消息中间件(spring.stream.binders.his-rabbit) destination: his.exchange.stream # 相当于rabbitmq中交换机名称,交换机模式默认是topic content-type: application/json # 消息类型:text/plain(普通文本),application/json(JSON消息) producer: # 生产者 partition-count: 10 # 指定参与消息分区的消费端节点数量为10(0-9)个 partition-key-expression: "0" # 指定消费者的实例索引(consumer.instance-index),此处必须用""括起来,否者启动就会报错
2.2 消费者服务配置
spring: stream: binders: # 绑定消息中间件,可以同时绑定多个不同的中间件 his-rabbit: # 中间件配置名称(唯一) type: rabbit # 指定具体的中间件,可以是rabbit、kafka、redis等 environment: spring: rabbitmq: # rabbitmq配置 host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / bindings: # 绑定消息通道,可以同时绑定多个不同的消息通道 input-order: # 消息通道名称,此处是一个消费者 binder: his-rabbit # 指定消息中间件(spring.stream.binders.his-rabbit) destination: his.exchange.stream # 相当于rabbitmq中交换机名称,交换机模式默认是topic,与生产者绑定同一个交换机 content-type: application/json # 消息类型:text/plain(普通文本),application/json(JSON消息) group: his.group.finance # 分组名称,消费者必须指定,可持久化消息并且分组名称相同的消费者,不会重复消费 consumer: # 消费者 partitioned: true # 开启分区 instance-index: 0 # 实例索引,相当于rabbitmq中RoutingKey,此时该消费者接收producer.partition-key-expression=0的消息

以上配置无误可以正确连接rabbitmq,但会报rabbitmq健康检查无法连接的错误,虽然不影响使用,但是可以通过关闭健康检查去除报错提示,yml中配置如下

# 关闭rabbitmq健康检查 management: health: rabbit: enabled: false

自定义消息通道,包含一个生产者通道,一个消费者通道

package com.choice.his.finance.producer; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * <p> * 自定义消息通道,包含一个生产者通道,一个消费者通道 * </p> * * @author cczhaoyc@163.com * @version v_1.0.0 * @date 2020/9/4 9:44 */ public interface IProcessor { /** * 输出通道的名称,对应yml配置文件中spring.stream.bindings.output-order,名称必须相同 */ String OUTPUT_ORDER = "output-order"; /** * @return 输出通道,即生产者通道 */ @Output(IProcessor.OUTPUT_ORDER) MessageChannel outputOrder(); /** * 输入通道的名称,对应yml配置文件中spring.stream.bindings.input-order,名称必须相同 */ String INPUT_ORDER = "input-order"; /** * @return 输入通道,即消费者通道 */ @Input(IProcessor.INPUT_ORDER) SubscribableChannel inputOrder(); }

生产者消息接口与实现

package com.choice.his.finance.producer; /** * <p> * 生产者消息接口 * </p> * * @author cczhaoyc@163.com * @version v_1.0.0 * @date 2020/9/1 17:09 */ public interface ProducerService { /** * <p> * 发送消息 * </p> * * @author cczhaoyc@163.com * @date 2020/9/4 10:06 */ String send(); } package com.choice.his.finance.producer.impl; import com.choice.his.finance.producer.IProcessor; import com.choice.his.finance.producer.ProducerService; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.Date; /** * <p> * 生产者消息接口实现 * </p> * * @author cczhaoyc@163.com * @version v_1.0.0 * @date 2020/9/1 17:13 */ @EnableBinding(IProcessor.class)// 绑定消息通道 public class ProducerServiceImpl implements ProducerService { @Resource(name = IProcessor.OUTPUT_ORDER)// IProcessor.OUTPUT_ORDER为自定义输出通道,即生产者 private MessageChannel outputOrder; @Override public String send() { final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String msg = "his-finance-send----->" + sdf.format(new Date()); outputOrder.send(MessageBuilder.withPayload(msg).build()); return msg; } }

生产者消息Controller

package com.choice.his.finance.controller; import com.choice.his.finance.producer.ProducerService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * <p> * 生产者消息 * </p> * * @author cczhaoyc@163.com * @version v_1.0.0 * @date 2020/9/1 17:28 */ @RestController @RequestMapping("/msg") public class ProducerController { @Autowired private ProducerService producerService; @GetMapping("/send") public String send() { return producerService.send(); } }

**注:**以上为生产者全部配置与代码


消费者业务实现

package com.choice.his.finance.consumer; import com.choice.his.finance.producer.IProcessor; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; /** * <p> * 消费者业务类 * </p> * * @author cczhaoyc@163.com * @version v_1.0.0 * @date 2020/9/1 16:58 */ @EnableBinding(IProcessor.class)// 绑定消息通道 public class ConsumerService { @StreamListener(IProcessor.INPUT_ORDER)// 监听IProcessor.INPUT_ORDER通道,即消费者 public void input(Message<String> msg) { System.out.println("消费者-his-finance-01号:msg=" + msg.getPayload()); } }

**注:**以上为消费者全部配置与代码


rabbitmq管理界面

Exchanges菜单会创建一个his.exchange.stream的交换机,类型为topic(生产者、消费者)Queues菜单会创建一个his.exchange.stream.his.group.finance-0队列(仅消费者配置产生)Queues队列名称由yml中配置的(destination点group横杠instance-index),3部分组成
最新回复(0)