生产者:用于发送消息 消费者:用于监听、接收、消费和处理消息 消息:可以看做是实际数据,在底层架构中是通过二进制的数据进行保存的 队列:指的是消息的暂存区和存储区,可以看做是一个中转站,消息经过这个中转站,便到了消费者手中。 交换机:同样也是消息的中转站,用于首次接收和分发消息,其中包括Fanout,Direct和Topic,Headers 路由:相当于秘钥,地址或者第三者,一般不单独使用,而是与交换机绑定在一起,将消息路由分支到指定队列
在RabbitmqConfig 这个类添加交换机、队列、绑定key
// 创建队列 @Bean(name = "basicQueue") public Queue basicQueue() { /* // durable :是否持久化(宕机以后重新发布) // exclusive : 是否排外,当前队列只能被一个消费者消费 // autoDelete 如果这个队列没有消费者,队列是否被删除 // arguments 指定当前队列的其他信息 */ return new Queue("basicQueue",true,false,false,null); } @Bean public DirectExchange basicExchange(){ // 跟创建队列一样 return new DirectExchange("basicExchange",true,false); } @Bean public Binding basicBinding(DirectExchange directExchange,Queue queue) { // 绑定路由key return BindingBuilder.bind(queue).to(directExchange).with("basicExchange-basicQueue-key"); } package com.learn.boot.controller; import com.learn.boot.resultVo.ResultVo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * author: zlx * date: 2020-1-1 17:17 * desc: **/ @RestController @RequestMapping("/rabbit") public class RabbitMQController { //定义日志 private static final Logger log= LoggerFactory.getLogger (RabbitMQController.class); @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/rabbitTest") public ResultVo rabbitMQTest(@RequestParam String name) { rabbitTemplate.convertAndSend("basicExchange", "basicExchange-basicQueue-key", name); return ResultVo.success("生产者发送信息成功"); } @RabbitListener(queues = "basicQueue",containerFactory = "singleListenerContainer") public void consumeMsg(Object msg){ try { log.info("基本消息模型-消费者-监听消费消息:{} ",msg.toString()); }catch (Exception e){ log.error("基本消息模型-消费者-发生异常:",e.fillInStackTrace()); } } }下面用一个实体对象进行传输
package com.learn.boot.model; import lombok.Data; import lombok.ToString; import org.springframework.format.annotation.DateTimeFormat; import java.io.Serializable; @Data @ToString public class User implements Serializable { private int age; private String name; } @RequestMapping("/rabbitTestObject") public ResultVo rabbitTestObject(@RequestBody User user) { // 该方法通过MessagePostProcessor实现类直接指定待发送消息的 类型,如上述代码中指定消息为Person类型。 // rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.convertAndSend("basicExchange", "basicExchange-basicQueue-key", user, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //获取消息的属性 MessageProperties messageProperties = message.getMessageProperties(); //设置消息的持久化模式 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); //设置消息的类型(在这里指定消息类型为Person类型) messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class); //返回消息实例 return message; } }); log.info("生产者发送对象消息{}",user); return ResultVo.success("生产者发送信息成功"); } @RabbitListener(queues = "basicQueue",containerFactory = "singleListenerContainer") public void consumeMsg(@Payload User user){ try { log.info("基本消息模型-消费者-监听消费消息:{} ",user); }catch (Exception e){ log.error("基本消息模型-消费者-发生异常:",e.fillInStackTrace()); } }