MQ如何保证消息不丢失

tech2024-02-02  98

一、Rabbit MQ 基础知识

1.生产者(Producer): 投递消息的一方

生产者创建消息,然后对消息体进行序列化+消息的标签发送给RabbitMQ。

2.消费者(Consumer): 接收消息的一方

消费者连接到RabbitMQ 服务器,并订阅队列中消息。消费者接收到消息后,先进行反序列化,然后消费进行处理。

3.Broker: 消息中间件的服务节点

对于RabbitMQ来说,一个RabbitMQ broker可以简单看作一个RabbitMQ服务节点或服务实例。

4.队列(Queue):用于存储消息的对象

RabbitMQ 中消息都只存储在队列中。

注意:多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询),给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

5.交换器、绑定、路由键

交换器(Exchange): 生产者把消息发送到交换器,由交换器将消息路由到一个或多个队列中(可以把它想像成路由器的功能)。如果路由不到,或许返回给生产者,或许直接丢弃。如果消息不允许有丢失的情况,需要在生产者发送消息之前在确保交换器、队列、绑定都已经创建好。绑定(Binding):RabbitMQ通过绑定将交换器与队列关联起来,在绑定的时候会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列。路由键(RoutingKey):生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和RoutingKey相匹配时,消息被路由到对应的队列中。RabbitMQ Java API中都把BindingKey 和RoutingKey 看作RoutingKey,这里需要注意,不要混淆。 在Spring 中 BindingBuilder中使用的RoutingKey 其实就是 BindingKey;而在RabbitTemplate 中发送消息方法中使用的路由键,就是RoutingKey。

6.交换器类型

fanout: 它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。交换器会将接收到的消息,为各个队列各复制一份,并放入这些队列之中,所以发送消息时不需要RoutingKey,绑定时也不需要BindingKey。direct: direct类型的交换器路由规则也很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。在实际使用过程中,我们通常把 BindingKey和RoutingKey 设置为队列名称,以方便使用。topic: topic类型与direct类型都是通过BindingKey和RoutingKey 进匹配进行路由消息,但direct类型是要求BindingKey和RoutingKey完全匹配,而topic类型是通过BindingKey中的“*”和“#”两个特殊字符进行模糊匹配来路由消息。其中*用于匹配一个单词,#用于匹配多个单词(可以是零个),单词与单词之间使用“.” 进行分隔。

 

二、两个配置

publisher-confirms: true  支持发布确认, 用于记录消息没发送成功的情况

publisher-returns: true  支持发布返回, 交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Redtun命令将消息返回给生产者。用于记录消息没发送成功的情况

spring.rabbitmq.publisher-confirms 和 spring.rabbitmq.publisher-returns 只是对生产者有用,建议都设置成true,当消息没有发送成功时会进行回调,同时需要实现MessageProducerCallback 接口,并注入到Spring中

@Configuration public class ProducerConfiguration { @Bean public MessageProducerCallback messageProducerCallback() { return new MessageProducerCallback() { /** * 消息发送到RabbitMQ 交换器中情况,注意,此时还没有发送到消息者中 <br/> * 当 spring.rabbitmq.publisher-confirms设置为true时生效 <br/> * 在onConfirm中是没有原message的,所以无法在这个函数中调用重发,confirmCallback只有一个通知的作用 */ @Override public void onConfirm(String id, boolean ack, String cause) { if (ack) { System.out.println("消息 " + id + " 已经成功发送到交换器中"); } else { System.err.println("消息 " + id + " 因 " + cause + " 未能发送到交换器中"); } } @Override public void onReturned(String messageId, Object message, int replyCode, String replyText, String exchange,String routingKey) { System.err.println("消息:" + messageId + " 发送到交换器:" + exchange + " 后,无法根据自身的类型和路由键:" + routingKey + " 找到一个符合条件的队列"); } }; } }

 

三、生产者保证消息不丢失的措施

为了保证消息能发送到MQ,需要将 spring.rabbitmq.publisher-confirms 设置为true, 通过MQ的confirm功能确认消息是否发送成功。处理流程如下:

1.发送消息时,先把消息进行本地缓存;

2.发送消息;

3.在收到confirmCallback 消息后,如果是成功ack的则将消息从缓存中删除;

4.在应用启动时开启定时任务,定时检查本地缓存中,超过一定时间没有ack的消息,并进行重发送;

5.应用运行过程中,会每隔1秒将本缓存写到磁盘;应用在下次启动时,将其重新加载回本地缓存,进行重新处理。为了保证性能不是实时保证数据,所以可能会丢失1秒的数据;在程序重启时,有可能没有收到ack消息,所以可能会造成重发。

四、消费者保证消息不丢失的措施

方案1:

默认使用自动ACK模式进行消费消息,如果需要保证消息不会有丢失,必须使用手动ACK模式,设置方法:consumer.setAcknowledgeMode(AcknowledgeMode.MANUAL); 当MessageHandler.onMessage抛异常时,将会发nack消息给MQ,将当前消息重新入队列,进行下一次投递。

在高并发场景下,使用手动ACK模式可能会造成消息堆积比较严重,为了解决这个问题,建议消息接收到后,直接写入数据库,写入成功后,再执行ACK,然后再使用定时任务去扫数据库的数据进行消费。一方面保证消息不丢失,另一方面还可以保证消息不容易堆积。

方案2:

生产者发送消息

1.1 生产者发送消息到RabbitMQ服务器

1.2 RabbitMQ通过异步confirm方式通知生产者消息接收完成

1.3 生产者可以异步将消息存入ES(备份消息),防止MQ服务器重启导致消息丢失,当然交换机、队列和消息现在是持久化状态

消费者消费消息

2.1 消费者接收数据

2.2 接收到消息首先写入ES

2.3 业务处理完成后删除ES中到消息

补发定时任务

3 定时任务补发消费失败消息并更新补发次数 (这步只能补发到交换机上,不能指定发送到具体的队列,如果是fanout模式,在消费者里需判断,非本队列的消息需丢弃掉)

报警定时任务

4 定时任务抓去补发超过3次消息钉钉报警人工干预

最新回复(0)