FanoutExchange具有广播的作用,当消息进入这个中转站的时候,交换机会检查哪个队列跟自己绑定一起的,找到相应的队列后,由队列对应的消费者进行监听消费。 就是一对多的原理 还是在RabbitMQConfig下添加交换机,路由
package com.learn.boot.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.validation.BindingResult; /** zlx * RabbitMQ自定义注入配置Bean相关组件 */ @Configuration public class RabbitmqConfig { private static final Logger log= LoggerFactory.getLogger(RabbitmqConfig.class); // 自动设置RabbitMQ的连接工厂实例 @Autowired private CachingConnectionFactory connectionFactory; // 自动设置消息监听器所在的容器工厂配置类实例 @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; /** * 下面为单一消费者实例的配置 * @return */ @Bean(name = "singleListenerContainer") public SimpleRabbitListenerContainerFactory listenerContainer(){ // 定义消息监听器所在的容器工厂 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); // 设置容器工厂所用的实例 factory.setConnectionFactory(connectionFactory); // 设置消息在传输中的格式,在这里采用JSON的格式进行传输 factory.setMessageConverter(new Jackson2JsonMessageConverter()); // 设置并发消费者实例的初始数量,在这里为1个 factory.setConcurrentConsumers(1); // 设置并发消费者实例的最大数量,在这里为1个 factory.setMaxConcurrentConsumers(1); // 设置并发消费者实例中每个实例拉取的消息数量,在这里为1个 factory.setPrefetchCount(1); return factory; } /** *下面为多个消费者实例的配置,主要是针对高并发业务场景的配置 * @return */ @Bean(name = "multiListenerContainer") public SimpleRabbitListenerContainerFactory multiListenerContainer(){ // 定义消息监听器所在的容器工厂 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); // 设置容器工厂所用的实例 factoryConfigurer.configure(factory,connectionFactory); // 设置消息在传输中的格式,在这里采用JSON的格式进行传输 factory.setMessageConverter(new Jackson2JsonMessageConverter()); // 设置消息的确认消费模式,在这里为NONE,表示不需要确认消费 factory.setAcknowledgeMode(AcknowledgeMode.NONE); // 设置并发消费者实例的初始数量,在这里为10个 factory.setConcurrentConsumers(10); // 设置并发消费者实例的最大数量,在这里为15个 factory.setMaxConcurrentConsumers(15); // 设置并发消费者实例中每个实例拉取的消息数量,在这里为10个 factory.setPrefetchCount(10); return factory; } @Bean public RabbitTemplate rabbitTemplate(){ // 设置“发送消息后返回确认信息” connectionFactory.setPublisherReturns(true); // 构造发送消息组件实例对象 RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 这里是为了消息确认和手动ACK rabbitTemplate.setMandatory(true); // 设置消息在传输中的格式,在这里采用JSON的格式进行传输 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); // 发送消息后,如果发送成功,则输出“消息发送成功”的反馈信息 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息发送成功:correlationData({}),ack({}), cause({})", correlationData,ack,cause); } }); // 发送消息后,如果发送失败,则输出“消息丢失”的反馈信息 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息丢失:exchange({}),route({}),replyCode ({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText, message); } }); // 最终返回RabbitMQ的操作组件实例RabbitTemplate return rabbitTemplate; } // 创建队列 @Bean(name = "basicQueue") public Queue basicQueue() { /* // durable :是否持久化(宕机以后重新发布) // exclusive : 是否排外,当前队列只能被一个消费者消费 // autoDelete 如果这个队列没有消费者,队列是否被删除 // arguments 指定当前队列的其他信息 */ return new Queue("basicQueue",true,false,false,null); } @Bean public DirectExchange basicExchange(){ // 跟创建队列一样 return new DirectExchange("basicExchange",true,false); } @Bean public Binding basicBinding() { // 绑定路由key return BindingBuilder.bind(basicQueue()).to(basicExchange()).with("basicExchange-basicQueue-key"); } /**创建消息模型FanoutExchange ------------------------------------------------------------------------------**/ /** FanoutExchange 交换机 * @return */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange",true,false); } //创建队列1 @Bean(name = "fanoutQueueOne") public Queue fanoutQueueOne(){ return new Queue("fanoutQueueOne",true,false,false,null); } //创建队列2 @Bean(name = "fanoutQueueTwo") public Queue fanoutQueueTwo(){ return new Queue("fanoutQueueTwo",true,false,false,null); } //创建绑定1 @Bean public Binding fanoutBindingOne(){ return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange()); } //创建绑定2 @Bean public Binding fanoutBindingTwo(){ return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange()); } } @RequestMapping("/rabbitTestFanoutExchange") public ResultVo rabbitTestFanoutExchange(@RequestBody User user) { // 注意这里不需要指定路由key了,绑定交换机就行 rabbitTemplate.convertAndSend("fanoutExchange", "", user, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //获取消息的属性 MessageProperties messageProperties = message.getMessageProperties(); //设置消息的持久化模式 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); //设置消息的类型(在这里指定消息类型为Person类型) messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class); //返回消息实例 return message; } }); log.info("生产者发送对象消息{}",user); return ResultVo.success("生产者发送信息成功"); } @RabbitListener(queues = "fanoutQueueOne",containerFactory = "singleListenerContainer") public void receiveFanoutResultOne(@Payload User user){ try { log.info("基本消息模型-消费者1-监听消费消息:{} ",user); }catch (Exception e){ log.error("基本消息模型-消费者1-发生异常:",e.fillInStackTrace()); } } @RabbitListener(queues = "fanoutQueueTwo",containerFactory = "singleListenerContainer") public void receiveFanoutResultTwo(@Payload User user){ try { log.info("消息模型-消费者2-监听消费消息:{} ",user); }catch (Exception e){ log.error("基本消息模型-消费者2-发生异常:",e.fillInStackTrace()); } }这种模式适用于业务数据需要广播式的传播,比如用户操作写日志,将用户操作的日志封装成实体类,并将其序列化
这是最正规的消息模式,这个必须交换机和路由绑定,才能把消息发送到队列。 这应该是RabbitMQ最简单的消息模型,在上面的基础上,绑定一个固有的路由key就行了
/**创建消息模型directExchange----------------------------------------------------------------- **/ //创建交换机directExchange @Bean public DirectExchange directExchange(){ return new DirectExchange("directExchange",true,false); } //创建队列1 @Bean(name = "directQueueOne") public Queue directQueueOne(){ return new Queue("directQueueOne",true); } //创建队列2 @Bean(name = "directQueueTwo") public Queue directQueueTwo(){ return new Queue("directQueueOne",true); } //创建绑定1 @Bean public Binding directBindingOne(){ return BindingBuilder.bind(directQueueOne()).to(directExchange()). with("directExchange-key-one"); } //创建绑定2 @Bean public Binding directBindingTwo(){ return BindingBuilder.bind(directQueueTwo()).to(directExchange()). with("directExchange-key-two"); } @RequestMapping("/rabbitTestDirectExchange") public ResultVo rabbitTestDirectExchange(@RequestBody User user) { // 注意这里不需要指定路由key了,绑定交换机就行 rabbitTemplate.convertAndSend("directExchange", "directExchange-key-one", user, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //获取消息的属性 MessageProperties messageProperties = message.getMessageProperties(); //设置消息的持久化模式 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); //设置消息的类型(在这里指定消息类型为User类型) messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class); //返回消息实例 return message; } }); log.info("生产者发送对象消息{}",user); return ResultVo.success("生产者发送信息成功"); } @RequestMapping("/rabbitTestDirectExchangeTwo") public ResultVo rabbitTestDirectExchangeTwo(@RequestBody User user) { // 注意这里不需要指定路由key了,绑定交换机就行 rabbitTemplate.convertAndSend("directExchange", "directExchange-key-two", user, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //获取消息的属性 MessageProperties messageProperties = message.getMessageProperties(); //设置消息的持久化模式 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); //设置消息的类型(在这里指定消息类型为User类型) messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class); //返回消息实例 return message; } }); log.info("生产者发送对象消息{}",user); return ResultVo.success("生产者发送信息成功"); } @RabbitListener(queues = "directQueueOne",containerFactory = "singleListenerContainer") public void receiveDirectQueueOne(@Payload User user){ try { log.info("direct消息模型-消费者1-监听消费消息:{} ",user); }catch (Exception e){ log.error("direct消息模型-消费者1-发生异常:",e.fillInStackTrace()); } } @RabbitListener(queues = "directQueueTwo",containerFactory = "singleListenerContainer") public void receiveDirectQueueTwo(@Payload User user){ try { log.info("direct消息模型-消费者2-监听消费消息:{} ",user); }catch (Exception e){ log.error("direct消息模型-消费者2-发生异常:",e.fillInStackTrace()); } }指定路由key:directExchange-key-one 指定路由key:directExchange-key-two
这种消息模型,最大的特点就是支持通配路由,以*和#作为通配符,从而绑定不同的队列,*表示一个特定的单词,而#表示任意 slow.red.dog
fast.red.monkey
