rabbitMq 企业开始实战-direct

tech2024-03-09  83

第三章 rabbitMq 企业开始实战-direct


文章目录

场景分析一、定义direct 交换机二、定义消息发送方法1.引入库2.定义消费端


场景分析

现在需要实现针对不同的系统日志级别保存到不同的媒介中。 如有 info,debug,error ,现在需要将 info,debug 已文件形式保存,error 需要保存到数据库中 用第二章 的 fanout 模式是否可以实现? 明显满足不了我们上面的需求,所以我们需要下面这种模式来实现 区别是在原来的基础上对广播又加了一个条件


direct 实现方式如下图所示:

一、定义direct 交换机

示例:注意它与fanout 模式的区别

最大区别如图 下面是整理代码

package net.getbang.rabbitmq.direct; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 定义并初始化diretc 交换机,队列,以及将队列绑定到 交换机上 */ @Configuration public class DiretcBuildConfig { /** * 定义一个Direct类型的交换机 * @return */ @Bean DirectExchange directExchange(){ return new DirectExchange("direct-test"); } /** * 定义存文件队列 */ @Bean public Queue queueSaveText(){ return new Queue("direct.text"); } /** * 定义保存数据库队列 */ @Bean public Queue queueSaveDB(){ return new Queue("direct.db"); } /** * 将队列绑定到交换机上 */ @Bean Binding bindingDirectText(Queue queueSaveText, DirectExchange directExchange) { return BindingBuilder.bind(queueSaveText).to(directExchange).with("text"); } @Bean Binding bindingDirectDB(Queue queueSaveDB, DirectExchange directExchange) { return BindingBuilder.bind(queueSaveDB).to(directExchange).with("db"); } }

二、定义消息发送方法

1.引入库

代码如下(示例):

package net.getbang.rabbitmq.producer; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * direct 消息发送 */ @Component public class DirectProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendDirectMessage(String exchangeName,String routingKey, Object message){ rabbitTemplate.convertAndSend(exchangeName,routingKey,message); } }

从上面代码对比之前的fanout 消息发送端多一个routingKey

2.定义消费端

代码如下(示例):

package net.getbang.rabbitmq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * */ @Component public class DirectListener { @RabbitListener(queues = "direct.text") public void reviceFanout(Object message){ System.out.println("DirectListener.reviceDirect 接收的消息:"+message); System.out.println("保存到文件系统"); } @RabbitListener(queues = "direct.db") public void reviceFanout2(Object message){ System.out.println("DirectListener.reviceDirect2 接收的消息:"+message); System.out.println("保存到数据库"); } }

编写测试类

@Autowired DirectProducer directProducer; /** * diretc 模式发送消息测试 * */ @Test public void sendDircetMessage(){ Map<String,String> messageBody =new HashMap(2); messageBody.put("info","我是info 日志"); directProducer.sendDirectMessage("direct-test","text",messageBody); }

接收消息成功

然后按上面需求发送消息测试

@Test public void sendDircetMessage(){ Map<String,String> messageBody =new HashMap(2); messageBody.put("info","我是info 日志"); directProducer.sendDirectMessage("direct-test","text",messageBody); Map<String,String> messageBody2 =new HashMap(2); messageBody2.put("debug","我是debug 日志"); directProducer.sendDirectMessage("direct-test","text",messageBody2); Map<String,String> messageBody3 =new HashMap(2); messageBody3.put("error","我是error 日志"); directProducer.sendDirectMessage("direct-test","db",messageBody3); }

测试结果: 发现发送了三条,接收了2条?why?

问题:针对上面的需求 现又有改动,需要对error 日志里的系统报错提取出来放到文件系统去,只要业务的error ?怎么处理?

代码地址:https://code.aliyun.com/411741962/rabbitmq-springboot.git

最新回复(0)