RabbitMq实战之(二)springboot整合RabbitMQ

tech2025-01-12  5

RabbitMQ

相关组件SpringBoot整合RabbitMQ自定义配置Bean相关组件 RabbitMQ发送消息和接收消息其他发送和接收消息方式实战

相关组件

生产者:用于发送消息 消费者:用于监听、接收、消费和处理消息 消息:可以看做是实际数据,在底层架构中是通过二进制的数据进行保存的 队列:指的是消息的暂存区和存储区,可以看做是一个中转站,消息经过这个中转站,便到了消费者手中。 交换机:同样也是消息的中转站,用于首次接收和分发消息,其中包括Fanout,Direct和Topic,Headers 路由:相当于秘钥,地址或者第三者,一般不单独使用,而是与交换机绑定在一起,将消息路由分支到指定队列

SpringBoot整合RabbitMQ

<!—RabbitMQ的起步依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>1.3.3.RELEASE</version> </dependency> #RabbitMQ配置 spring.rabbitmq.virtual-host=/ #RabbitMQ服务器所在的Host,在这里连接本地即可 spring.rabbitmq.host=127.0.0.1 #5672为RabbitMQ提供服务时的端口号 spring.rabbitmq.port=5672 #guest和guest为连接到RabbitMQ服务器的用户名和密码 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #这是自定义变量,表示本地开发环境 mq.env=local

自定义配置Bean相关组件

package com.learn.boot.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** zlx * RabbitMQ自定义注入配置Bean相关组件 */ @Configuration public class RabbitmqConfig { private static final Logger log= LoggerFactory.getLogger(RabbitmqConfig.class); //自动设置RabbitMQ的连接工厂实例 @Autowired private CachingConnectionFactory connectionFactory; //自动设置消息监听器所在的容器工厂配置类实例 @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; /** * 下面为单一消费者实例的配置 * @return */ @Bean(name = "singleListenerContainer") public SimpleRabbitListenerContainerFactory listenerContainer(){ //定义消息监听器所在的容器工厂 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); //设置容器工厂所用的实例 factory.setConnectionFactory(connectionFactory); //设置消息在传输中的格式,在这里采用JSON的格式进行传输 factory.setMessageConverter(new Jackson2JsonMessageConverter()); //设置并发消费者实例的初始数量,在这里为1个 factory.setConcurrentConsumers(1); //设置并发消费者实例的最大数量,在这里为1个 factory.setMaxConcurrentConsumers(1); //设置并发消费者实例中每个实例拉取的消息数量,在这里为1个 factory.setPrefetchCount(1); return factory; } /** *下面为多个消费者实例的配置,主要是针对高并发业务场景的配置 * @return */ @Bean(name = "multiListenerContainer") public SimpleRabbitListenerContainerFactory multiListenerContainer(){ //定义消息监听器所在的容器工厂 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); //设置容器工厂所用的实例 factoryConfigurer.configure(factory,connectionFactory); //设置消息在传输中的格式,在这里采用JSON的格式进行传输 factory.setMessageConverter(new Jackson2JsonMessageConverter()); //设置消息的确认消费模式,在这里为NONE,表示不需要确认消费 factory.setAcknowledgeMode(AcknowledgeMode.NONE); //设置并发消费者实例的初始数量,在这里为10个 factory.setConcurrentConsumers(10); //设置并发消费者实例的最大数量,在这里为15个 factory.setMaxConcurrentConsumers(15); //设置并发消费者实例中每个实例拉取的消息数量,在这里为10个 factory.setPrefetchCount(10); return factory; } @Bean public RabbitTemplate rabbitTemplate(){ //设置“发送消息后返回确认信息” connectionFactory.setPublisherReturns(true); //构造发送消息组件实例对象 RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 这里是为了消息确认和手动ACK rabbitTemplate.setMandatory(true); // 设置消息在传输中的格式,在这里采用JSON的格式进行传输 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); // 发送消息后,如果发送成功,则输出“消息发送成功”的反馈信息 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息发送成功:correlationData({}),ack({}), cause({})", correlationData,ack,cause); } }); //发送消息后,如果发送失败,则输出“消息丢失”的反馈信息 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息丢失:exchange({}),route({}),replyCode ({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText, message); } }); //最终返回RabbitMQ的操作组件实例RabbitTemplate return rabbitTemplate; } }

RabbitMQ发送消息和接收消息

在RabbitmqConfig 这个类添加交换机、队列、绑定key

// 创建队列 @Bean(name = "basicQueue") public Queue basicQueue() { /* // durable :是否持久化(宕机以后重新发布) // exclusive : 是否排外,当前队列只能被一个消费者消费 // autoDelete 如果这个队列没有消费者,队列是否被删除 // arguments 指定当前队列的其他信息 */ return new Queue("basicQueue",true,false,false,null); } @Bean public DirectExchange basicExchange(){ // 跟创建队列一样 return new DirectExchange("basicExchange",true,false); } @Bean public Binding basicBinding(DirectExchange directExchange,Queue queue) { // 绑定路由key return BindingBuilder.bind(queue).to(directExchange).with("basicExchange-basicQueue-key"); } package com.learn.boot.controller; import com.learn.boot.resultVo.ResultVo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * author: zlx * date: 2020-1-1 17:17 * desc: **/ @RestController @RequestMapping("/rabbit") public class RabbitMQController { //定义日志 private static final Logger log= LoggerFactory.getLogger (RabbitMQController.class); @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/rabbitTest") public ResultVo rabbitMQTest(@RequestParam String name) { rabbitTemplate.convertAndSend("basicExchange", "basicExchange-basicQueue-key", name); return ResultVo.success("生产者发送信息成功"); } @RabbitListener(queues = "basicQueue",containerFactory = "singleListenerContainer") public void consumeMsg(Object msg){ try { log.info("基本消息模型-消费者-监听消费消息:{} ",msg.toString()); }catch (Exception e){ log.error("基本消息模型-消费者-发生异常:",e.fillInStackTrace()); } } }

其他发送和接收消息方式实战

下面用一个实体对象进行传输

package com.learn.boot.model; import lombok.Data; import lombok.ToString; import org.springframework.format.annotation.DateTimeFormat; import java.io.Serializable; @Data @ToString public class User implements Serializable { private int age; private String name; } @RequestMapping("/rabbitTestObject") public ResultVo rabbitTestObject(@RequestBody User user) { // 该方法通过MessagePostProcessor实现类直接指定待发送消息的 类型,如上述代码中指定消息为Person类型。 // rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.convertAndSend("basicExchange", "basicExchange-basicQueue-key", user, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //获取消息的属性 MessageProperties messageProperties = message.getMessageProperties(); //设置消息的持久化模式 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); //设置消息的类型(在这里指定消息类型为Person类型) messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class); //返回消息实例 return message; } }); log.info("生产者发送对象消息{}",user); return ResultVo.success("生产者发送信息成功"); } @RabbitListener(queues = "basicQueue",containerFactory = "singleListenerContainer") public void consumeMsg(@Payload User user){ try { log.info("基本消息模型-消费者-监听消费消息:{} ",user); }catch (Exception e){ log.error("基本消息模型-消费者-发生异常:",e.fillInStackTrace()); } }

最新回复(0)