Spring Cloud Stream 之 RabbitMQ
日期:2020/09/04
pom.xml引入,spring-cloud版本Greenwich.SR6,spring-boot版本2.1.13.RELEASE,stream-rabbit版本2.1.4.RELEASE
<!-- Spring Cloud Stream binder for RabbitMQ. -->
<!-- No <version> is declared here; presumably it is managed by the Spring Cloud BOM - confirm in the parent pom. -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
yml中配置
2.1 生产者服务配置
# Producer-side Spring Cloud Stream configuration.
# All stream properties live under the spring.cloud.stream prefix.
spring:
  cloud:
    stream:
      binders:
        # Named binder definition; bindings below refer to it by this key.
        his-rabbit:
          type: rabbit
          # Connection settings used by this binder only.
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                username: guest
                password: guest
                virtual-host: /
      bindings:
        # Binding name; must equal IProcessor.OUTPUT_ORDER.
        output-order:
          # Must equal the binder key declared under "binders".
          binder: his-rabbit
          # Name of the exchange the messages are published to.
          destination: his.exchange.stream
          content-type: application/json
          producer:
            partition-count: 10
            # Constant SpEL expression: every message gets the same partition key.
            partition-key-expression: "0"
2.2 消费者服务配置
# Consumer-side Spring Cloud Stream configuration.
# All stream properties live under the spring.cloud.stream prefix.
spring:
  cloud:
    stream:
      binders:
        # Named binder definition; bindings below refer to it by this key.
        his-rabbit:
          type: rabbit
          # Connection settings used by this binder only.
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                username: guest
                password: guest
                virtual-host: /
      bindings:
        # Binding name; must equal IProcessor.INPUT_ORDER.
        input-order:
          # Must equal the binder key declared under "binders".
          binder: his-rabbit
          # Same exchange the producer publishes to.
          destination: his.exchange.stream
          content-type: application/json
          # Consumer group; becomes part of the queue name.
          group: his.group.finance
          consumer:
            # Consume from a partitioned producer.
            partitioned: true
            # Partition index served by this instance; becomes the "-0" queue suffix.
            instance-index: 0
以上配置无误时即可正确连接 RabbitMQ,但启动时会报 RabbitMQ 健康检查无法连接的错误。该错误不影响使用,可以通过关闭健康检查去除报错提示,yml 中配置如下:
# Disable the Actuator RabbitMQ health indicator to suppress the health-check connection error.
management:
  health:
    rabbit:
      enabled: false
自定义消息通道,包含一个生产者通道,一个消费者通道
package com
.choice
.his
.finance
.producer
;
import org
.springframework
.cloud
.stream
.annotation
.Input
;
import org
.springframework
.cloud
.stream
.annotation
.Output
;
import org
.springframework
.messaging
.MessageChannel
;
import org
.springframework
.messaging
.SubscribableChannel
;
/**
 * Declares the custom message channels used with Spring Cloud Stream:
 * one outbound (producer) channel and one inbound (consumer) channel.
 *
 * <p>The constants are the binding names; they correspond to the
 * {@code output-order} and {@code input-order} entries under
 * {@code bindings} in the yml configuration.
 */
public interface IProcessor {

    /** Binding name of the outbound channel. */
    String OUTPUT_ORDER = "output-order";

    /** Binding name of the inbound channel. */
    String INPUT_ORDER = "input-order";

    /**
     * Outbound channel bound under {@link #OUTPUT_ORDER}.
     *
     * @return the channel messages are sent to
     */
    @Output(OUTPUT_ORDER)
    MessageChannel outputOrder();

    /**
     * Inbound channel bound under {@link #INPUT_ORDER}.
     *
     * @return the channel messages are received from
     */
    @Input(INPUT_ORDER)
    SubscribableChannel inputOrder();
}
生产者消息接口与实现
package com
.choice
.his
.finance
.producer
;
/**
 * Contract for the producer side: publishes one message.
 */
public interface ProducerService {

    /**
     * Sends a message.
     *
     * @return a string describing the send; the implementation shown in this
     *         document returns the payload text it sent
     */
    String send();
}
package com
.choice
.his
.finance
.producer
.impl
;
import com
.choice
.his
.finance
.producer
.IProcessor
;
import com
.choice
.his
.finance
.producer
.ProducerService
;
import org
.springframework
.cloud
.stream
.annotation
.EnableBinding
;
import org
.springframework
.messaging
.MessageChannel
;
import org
.springframework
.messaging
.support
.MessageBuilder
;
import javax
.annotation
.Resource
;
import java
.text
.SimpleDateFormat
;
import java
.util
.Date
;
/**
 * {@link ProducerService} implementation that publishes a timestamped text
 * message to the outbound channel declared in {@link IProcessor}.
 */
@EnableBinding(IProcessor.class)
public class ProducerServiceImpl implements ProducerService {

    /** Outbound channel, injected by the bean name {@link IProcessor#OUTPUT_ORDER}. */
    @Resource(name = IProcessor.OUTPUT_ORDER)
    private MessageChannel outputOrder;

    /**
     * Builds a message containing the current time and sends it.
     *
     * @return the payload text that was handed to the channel
     */
    @Override
    public String send() {
        // A new formatter is created on every call, so no formatter state is shared.
        String timestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        String payload = "his-finance-send----->" + timestamp;

        // NOTE(review): the result of send(...) is not checked here.
        outputOrder.send(MessageBuilder.withPayload(payload).build());
        return payload;
    }
}
生产者消息Controller
package com
.choice
.his
.finance
.controller
;
import com
.choice
.his
.finance
.producer
.ProducerService
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.web
.bind
.annotation
.GetMapping
;
import org
.springframework
.web
.bind
.annotation
.RequestMapping
;
import org
.springframework
.web
.bind
.annotation
.RestController
;
/**
 * REST entry point for triggering a message send; mapped under {@code /msg}.
 */
@RestController
@RequestMapping("/msg")
public class ProducerController {

    /** Service that performs the actual send. */
    @Autowired
    private ProducerService producerService;

    /**
     * Handles {@code GET /msg/send} by delegating to {@link ProducerService#send()}.
     *
     * @return whatever the service returns, passed through unchanged
     */
    @GetMapping("/send")
    public String send() {
        final String sentMessage = producerService.send();
        return sentMessage;
    }
}
**注:**以上为生产者全部配置与代码
消费者业务实现
package com
.choice
.his
.finance
.consumer
;
import com
.choice
.his
.finance
.producer
.IProcessor
;
import org
.springframework
.cloud
.stream
.annotation
.EnableBinding
;
import org
.springframework
.cloud
.stream
.annotation
.StreamListener
;
import org
.springframework
.messaging
.Message
;
/**
 * Consumer side: listens on the inbound channel declared in {@link IProcessor}
 * and prints each received payload to standard output.
 */
@EnableBinding(IProcessor.class)
public class ConsumerService {

    /**
     * Receives one message from the {@link IProcessor#INPUT_ORDER} binding.
     *
     * @param msg the incoming message with a {@code String} payload
     */
    @StreamListener(IProcessor.INPUT_ORDER)
    public void input(Message<String> msg) {
        final String payload = msg.getPayload();
        System.out.println("消费者-his-finance-01号:msg=" + payload);
    }
}
**注:**以上为消费者全部配置与代码
rabbitmq管理界面
- Exchanges 菜单:会创建一个名为 his.exchange.stream 的交换机,类型为 topic(生产者、消费者配置均会产生)。
- Queues 菜单:会创建一个名为 his.exchange.stream.his.group.finance-0 的队列(仅消费者配置产生)。
- 队列名称由 yml 中配置的三部分组成:destination + "." + group + "-" + instance-index。