RabbitMQ

tech2022-07-17  187

目录

概述核心概念架构图RabbitMQ运行机制工作模式1、简单模式2、Work queues工作队列模式3、订阅模式类型4、Publish/Subscribe发布与订阅模式5、Routing路由模式6、topic模式 docker安装RabbitMQspringboot整合RabbitMQ自定义序列化监听消息1、@RabbitListener2、@RabbitListener+@RabbitHandler3、多线程消费消息 RabbitMQ消息确认机制 - 可靠抵达确认模式 - ConfirmCallback可靠抵达 - ReturnCallback可靠抵达 - ACK消息确认机制发送端消息确认 - 确认模式消费端消息确认 - ack模式 如何保证消息的可靠性一、消息丢失的问题二、消息重复的问题三、消息积压的问题 持久化1、交换机和队列持久化2、消息持久化 消息的状态Spring-RabbitMq 参数配置详解

概述

大多数应用中,可通过消息服务中间件来提升系统异步通信,扩展解耦能力

消息服务中两个重要概念:消息代理(message broker)和目的地(destination) 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地

消息队列主要有两种形式的目的地

队列(queue):点对点消息通信主题(topic):发布(publish)/订阅(subscribe)消息通信

点对点式:

消息发送者发送消息,消息代理将其放入一个队列中,消息接受者从队列中获取消息内容,消息读取后别移出队列消息只有唯一的发送者和接受者,但并不是说只能有一个接受者

发布订阅式:

发送者发布消息到主题,多个接受者(订阅者)监听(订阅)这个主题,那么就会在消息到达同时接受到消息

JMS JAVA消息服务 :基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现。JMS是一个API。RocketMQ没有在mq核心中去实现JMS接口

AMQP

高级消息队列协议,也是一个消息代理的规范,兼容JMS

Spring支持

spring-jms提供了对JMS的支持spring-rabbit提供了对AMQP的支持需要ConnectionFactory的实现来连接消息代理提供 JmsTemplate,RabbitTemplate来发送消息@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息@EnableJms、@EnableRabbit开启支持

SpringBoot自动配置

JmsAutoConfigurationRabbitAutoConfiguration

市面上的MQ产品

ActiveMQ、RabbitMQ、RocketMQ、Kafka

核心概念

架构图

RabbitMQ运行机制

fanout(广播交换机) 2. direct(直连交换机) 3. topic(话题交换机)

工作模式

简单说: 简单模式:一个生产者,一个消费者 work模式:一个生产者,多个消费者,每个消费者获取到的消息唯一。 订阅模式:一个生产者发送的消息会被多个消费者获取。 路由模式:发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key topic模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。

1、简单模式

发送消息时,没有指定交换机,使用的是默认交换机 在上图的模型中,有以下概念: P:生产者,也就是要发送消息的程序 C:消费者:消息的接受者,会一直等待消息到来。 queue:消息队列,图中红色部分。 默认交换机(default exchange):是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。绑定的路由键(routing key)名称与队列名称相同。

2、Work queues工作队列模式

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。 应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

3、订阅模式类型

前面2个案例中,只有3个角色:实际上还是有交换机的,只不过使用的是默认交换机

P:生产者,也就是要发送消息的程序C:消费者:消息的接受者,会一直等待消息到来。queue:消息队列,图中红色部分

而在订阅模型中,多了一个exchange角色,而且过程略有变化: P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机) C:消费者,消息的接受者,会一直等待消息到来。 Queue:消息队列,接收消息、缓存消息。 Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

Fanout:广播,将消息交给所有绑定到交换机的队列Direct:定向,把消息交给符合指定routing key 的队列Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

4、Publish/Subscribe发布与订阅模式

发布订阅模式: 1、每个消费者监听自己的队列。 2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

5、Routing路由模式

6、topic模式

docker安装RabbitMQ

官网:https://www.rabbitmq.com/

启动:如果没下载镜像,会自动下载

docker run --privileged=true -d --name rabbitmq --publish 5671:5671 \ --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \ rabbitmq:management

设置容器开机自启动:可设置,也可以不设置

[root@localhost ~]# docker update ba18b874b305 --restart=always

启动日志查看:

[root@localhost ~]# docker logs rabbitmq

注: 页面访问:http://192.168.59.131:15672/ 输入默认账号: guest : guest

springboot整合RabbitMQ

依赖:

<!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

启动类:@EnableRabbit

配置文件:

spring: rabbitmq: host: 192.168.59.131 port: 5672 ##虚拟主机 virtual-host: /

测试1:创建Exchange、Queue、Binding

@Autowired private AmqpAdmin amqpAdmin; /** * 1、如何创建Exchange、Queue、Binding * 1)、使用AmqpAdmin进行创建 */ @Test void contextLoads() { DirectExchange directExchange=new DirectExchange("hello-java-exchange",true,false,null); amqpAdmin.declareExchange(directExchange); log.info("Exchange[{}]创建成功","hello-java-exchange"); } /** * 创建队列 */ @Test void creatQueue(){ Queue queue = new Queue("hello-java-queue",true,false,false); amqpAdmin.declareQueue(queue); log.info("Queue[{}]创建成功","hello-java-queue"); } /** * 创建一个绑定关系 */ @Test public void createBinding(){ /** * 参数一:目的地 * 参数二:目的地的类型 * 参数三:交换机 * 参数四:路由键 * 参数五:携带的相关参数 */ Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello-java-exchange","hello.world",null); amqpAdmin.declareBinding(binding); log.info("Binding[{}]创建成功","hello-java-binding"); }

exchanges创建后,可在页面查看;绑定关系以及队列也都可查看 测试2:如何收发消息 注:如果发送的消息是一个对象,需要将对象序列化

@Autowired private RabbitTemplate rabbitTemplate; /** * 收发消息 */ @Test void sendMsg(){ rabbitTemplate.convertAndSend("hello-java-exchange","hello.world","双方都"); log.info("消息发送完成"); } /** * 收发消息 */ @Test void sendMsg(){ OrderEntity orderEntity=new OrderEntity(); orderEntity.setId(1l); orderEntity.setBillContent("哈哈"); rabbitTemplate.convertAndSend("hello-java-exchange","hello.world",orderEntity); log.info("消息发送完成"); }

发现序列化的方式不是json格式,能不能序列化成json格式呢?

自定义序列化

源码查看: RabbitAutoConfiguration: configure方法:进入到RabbitTemplateConfigurer的configure方法 没在RabbitTemplateConfigurer类里发现messageConverter是怎么初始化的,看看RabbitTemplateConfigurer是怎么初始化的,回到RabbitAutoConfiguration 明白了,messageConverter是通过从容器中注入的,也就是我们往容器中放入一个messageConverter,即可。

那么默认的messageConverter是啥呢? 到RabbitTemplate中查看(因为之前template.setMessageConverter(this.messageConverter);) 看看这个默认的SimpleMessageConverter,发现它将对象序列化为一个byte数组 看看MessageConverter接口,都提供了哪些实现类 找到了,只要我们往容器里添加这个实现类即可

配置类:

@Configuration public class RabbitMQConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }

再次测试:发现发送消息为对象时,序列化的方式变成了json

监听消息

1、@RabbitListener

标在方法上 入参自动接受监听到的消息。

/** * @RabbitListener:可以标在类和方法上 * 属性queues:需要监听的所有队列 * * 入参自动接受监听到的消息。 * 类型为:org.springframework.amqp.core.Message * * 参数可以写以下类型: * 1.Message:原生消息详细信息 * message.getBody()获取消息体 * message.getMessageProperties()获取消息头 * 2.T<发送的消息的类型> OrderEntity orderEntity * 入参写消息的类型,监听到消息时,消息自动封装到入参中,就可以不需要使用原生Message.getBody()去获取消息了 * 3.Channel chanhel:当前传输数据的通道 *Queue: 可以很多服务来监听。只要收到消息,队列就删除消息,而且只能有一个收到此消息 * 场景: * 1.订单服务启动多个,监听同一个队列,队列里的消息,只能被一个订单的服务消息。一个消息,只能被一个客户端收到。 * 2.只有一个消息完全处理完,方法执行完,才能去接受下一个消息。 */ @RabbitListener(queues = {"hello-java-queue"}) public void recieveMessage(Message message, OrderEntity orderEntity, Channel chanhel){ /** * 消息体 */ byte[] body = message.getBody(); /** * 消息头 */ MessageProperties messageProperties = message.getMessageProperties(); System.out.println("接收到消息...:"+message); System.out.println("内容:"+orderEntity); }

2、@RabbitListener+@RabbitHandler

@RabbitListener标在类上,监听队列 @RabbitHandler标在方法上,重载区分不同的消息

通一个队列,可能发给通一个客户端,不同类型的消息。 这种场景:

消息可能是OrderReturnReasonEntity 或OrderEntity 类型的

@Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg") public String sendMsg(@RequestParam(value = "num",defaultValue = "10") Integer num){ for (int i = 0; i < num; i++) { if (i%2==0){ OrderReturnReasonEntity orderReturnReasonEntity=new OrderReturnReasonEntity(); orderReturnReasonEntity.setId(1L); orderReturnReasonEntity.setCreateTime(new Date()); orderReturnReasonEntity.setName("嘻嘻"+i); //最后一个参数表示消息的唯一id rabbitTemplate.convertAndSend("hello-java-exchange","hello.world", orderReturnReasonEntity,new CorrelationData(UUID.randomUUID().toString())); }else { OrderEntity orderEntity=new OrderEntity(); orderEntity.setId(1L); orderEntity.setBillContent("哈哈"+i); rabbitTemplate.convertAndSend("hello-java-exchange","hello.world", orderEntity,new CorrelationData(UUID.randomUUID().toString())); } } log.info("消息发送完成"); return "ok"; }

监听队列: @RabbitListener+@RabbitHandler 监听队列

/** * 监听hello-java-queue队列 */ @RabbitListener(queues = {"hello-java-queue"}) @Service("orderItemService") public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService { /** * 获取队列里OrderEntity类型的消息 * @param orderEntity */ @RabbitHandler public void recieveMessage( OrderEntity orderEntity){ System.out.println("内容OrderEntity:"+orderEntity); } /** * 获取队列里OrderReturnReasonEntity类型的消息 * @param orderReturnReasonEntity */ @RabbitHandler public void recieveMessage(OrderReturnReasonEntity orderReturnReasonEntity){ System.out.println("内容OrderReturnReasonEntity:"+orderReturnReasonEntity); } }

3、多线程消费消息

默认情况下,消费消息是单线程消费的

@RabbitListener(queues = {"gulimall.news"}) @Service public class Listener2 { /** * 获取队列里OrderStep类型的消息 * @param orderEntity */ @RabbitHandler public void recieveMessage( OrderStep orderEntity){ //消费消息 System.out.println("当前线程:"+Thread.currentThread().getId()+" "+Thread.currentThread().getName()+" 内容OrderEntity:"+orderEntity); } //默认是单线程消费消息 //当前线程:49 org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 内容OrderEntity:OrderStep{orderId=1039, desc='创建'} //当前线程:49 org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 内容OrderEntity:OrderStep{orderId=1039, desc='付款'} //当前线程:49 org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 内容OrderEntity:OrderStep{orderId=1039, desc='推送'} //当前线程:49 org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 内容OrderEntity:OrderStep{orderId=1039, desc='完成'} //设置多线程消费消息:好处增加处理消息的效率,坏处,可能打乱消息的消费顺序 //spring.rabbitmq.listener.simple.concurrency=10 //spring.rabbitmq.listener.simple.max-concurrency=10 //当前线程:53 org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-4 内容OrderEntity:OrderStep{orderId=1039, desc='完成'} //当前线程:55 org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-6 内容OrderEntity:OrderStep{orderId=1039, desc='推送'} //当前线程:51 org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2 内容OrderEntity:OrderStep{orderId=1039, desc='付款'} //当前线程:59 org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-10 内容OrderEntity:OrderStep{orderId=1039, desc='创建'} }

RabbitMQ消息确认机制 - 可靠抵达

保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍。为此引入确认机制消息发布端publisher confirmCallback 确认模式消息发布端publisher returnCallback 未投递到queue退回模式consumer ack模式

确认模式 - ConfirmCallback

消息只要被broker接受到就会执行confirmCallback,如果是集群模式下,需要所有broker接受到才会调用confirmCallback被broker接受到只表示message已经到达服务器,并不能保证消息一定会被投递到目标queue里。所以需要用到接下来的returnCallback如果要使用,需要开启,通过配置的方式开启:spring.rabbitmq.publisher-confirms=true,废弃了,变成publisher-confirm-type=correlated编码模式开启,在创建connectionFactory的时候,设置PublishConfirms(true)选项,开启confirmcallbackCorrelationData:用来表示当前消息唯一性

可靠抵达 - ReturnCallback

confirm模式只能保证消息到达broker,不能保证消息准确投递目标queue里。在有些业务场景下,我们需要保证消息一定要投递到目标queue里,此时就需要用到return退回模式这样如果未能投递到目标queue里将调用ReturnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据spring.rabbitmq.publisher-returns=true ##开启发送端确认,消息从broker抵达队列的确认spring.rabbitmq.template.mandatory=true ##只要抵达队列,以异步模式,优先回调returnsConfi

可靠抵达 - ACK消息确认机制

消费者获取到消息,成功处理,可以回复Ack给Broker

basic.ack用于肯定确认;broker将移除此消息basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量basic.reject用于否定确认;同上,但不能批量 -默认,消息被消费者受到,就会从broker的queue中移除

queue无消费者,消息依然会被存储,直到消费者消费

消费者受到消息,默认自动ack。但是如果无法确认此消息是否被处理完成,或者成功处理,可以开启手动ack模式

消息处理成功,ack(),接受下一条消息,此消息broker就会移除消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack消息一直没有调用ack或者nack()方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人

发送端消息确认 - 确认模式

配置类:

@Configuration public class RabbitMQConfig { @Autowired private RabbitTemplate rabbitTemplate; @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 定制RabbitTemplate * 1、服务器broker收到消息就回调 * 1.spring.rabbitmq.publisher-confirm-type=correlated ##开启发送端确认,消息到达broker进行确认 * 2.设置确认回调ConfirmCallback方法 *2、消息正确抵达队列进行回调 * 1.spring.rabbitmq.publisher-returns=true ##开启发送端确认,消息从broker抵达队列的确认 * spring.rabbitmq.template.mandatory=true ##只要抵达队列,以异步模式,优先回调returnsConfirm * */ //@PostConstruct:当前类的构造器执行完后,执行这个方法 @PostConstruct public void initRabbitTemplate(){ //设置消息抵达broker回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 触发时机:只要消息抵达Broker,消息服务器,ack=true * @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id) * @param ack 消息是否成功收到 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("correlationData["+correlationData+"]==>ack["+ack+"]==>cause["+cause+"]"); } }); //设置消息抵达队列回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 触发时机:只要消息没有投递给指定的队列,就触发失败回调 * 比如:客户端发送消息时,指定的路由键,对不上绑定关系中的路由键, * 那么会在交换机发送消息到队列的过程中,由于没有一个绑定关系上的路由键能匹配 * 肯定会发送失败 * @param message 投递失败的消息的详细信息 * @param replyCode 回复的状态码 * @param replyText 回复的内容 * @param exchange 当时这个消息发给哪个交换机 * @param routingKey 当时这个消息的路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"); System.out.println("Fail Message["+messageId+"]==>replyCode["+replyCode+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]"); } }); } }

配合文件: controller:还是上面的controller,但是第二个发送的消息,故意让消息的路由建和绑定关系中的路由建匹配不上

测试: 结果分析 所有消息从客户端发往broker都是成功的,因为都回调了ConfirmCallback 但是消息从broker发往队列时,有一半的消息发送失败了,这些发送失败的消息,回调了ReturnCallback

correlationData[CorrelationData [id=a99aeeda-158b-426c-9b04-9420927ecbff]]==>ack[true]==>cause[null] Fail Message[99ab7dbc-0b9c-4976-9673-39d7db7be955]==>replyCode[312]==>exchange[hello-java-exchange]==>routingKey[hello.world22] correlationData[CorrelationData [id=99ab7dbc-0b9c-4976-9673-39d7db7be955]]==>ack[true]==>cause[null] Fail Message[dcf262d2-9778-4f8f-8cb4-975c04511fac]==>replyCode[312]==>exchange[hello-java-exchange]==>routingKey[hello.world22] correlationData[CorrelationData [id=dcf262d2-9778-4f8f-8cb4-975c04511fac]]==>ack[true]==>cause[null] correlationData[CorrelationData [id=4fab4e2a-2c06-4ac7-8218-14213fdbaf66]]==>ack[true]==>cause[null] Fail Message[5a3edfe4-c3ce-495c-b0a5-473427c18a5d]==>replyCode[312]==>exchange[hello-java-exchange]==>routingKey[hello.world22] correlationData[CorrelationData [id=5a3edfe4-c3ce-495c-b0a5-473427c18a5d]]==>ack[true]==>cause[null] correlationData[CorrelationData [id=e5e40ee7-5eeb-47a6-afc0-3ab24e713dcb]]==>ack[true]==>cause[null] correlationData[CorrelationData [id=0f4fe776-886f-47af-b5e5-75a61640371e]]==>ack[true]==>cause[null] 2020-09-03 17:54:35.056 INFO 13912 --- [nio-9010-exec-1] c.l.g.order.controller.RabbitController : 消息发送完成 Fail Message[12d065b5-b88e-4a80-b61c-45e0ad7ebab9]==>replyCode[312]==>exchange[hello-java-exchange]==>routingKey[hello.world22] correlationData[CorrelationData [id=12d065b5-b88e-4a80-b61c-45e0ad7ebab9]]==>ack[true]==>cause[null] Fail Message[73bd7464-b109-4b5d-bb15-d6236c8bff92]==>replyCode[312]==>exchange[hello-java-exchange]==>routingKey[hello.world22] correlationData[CorrelationData [id=73bd7464-b109-4b5d-bb15-d6236c8bff92]]==>ack[true]==>cause[null] correlationData[CorrelationData [id=87520e24-9461-4e47-9abb-d2d4a94460f0]]==>ack[true]==>cause[null]

这些都是发送端的消息确认机制,消费端的往下看

消费端消息确认 - ack模式

保证每个消息被正确消费,此时broker才可以删除这个消息。

消费端确认(保证每个消息被正确消息,此时才可以broker删除这个消息) 1.默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息 问题: 消费端收到很多消息,自动回复给服务器ack,服务器就删除了消息,但是在处理消息的过程中,消费端宕机了, 可能只处理成功一个消息,其他全丢失了。 解决:手动确认模式。只要我们没有明确告诉MQ。消息被消费,没有Ack给服务端,消息就一直是unacked状态。 即使消费端宕机,消息也不会丢失,会重新变为ready状态,下一次有新的服务端连接进来就发给它 2.如何手动签收消息 channel.basicAck(deliveryTag, false);签收消息;业务成功完成,就应该签收 channel.basicNack(deliveryTag,false,true);拒签消息;业务失败,就拒签 spring.rabbitmq.listener.simple.acknowledge-mode=manual

配置文件: ##默认是auto模式,自动回复。只要消费端监听到消息,消费端就自动回复ack确认消息,服务端就删除消息 ##manual 手动ack模式 spring.rabbitmq.listener.simple.acknowledge-mode=manual

之前的服务修改: 使用参数Channel 来手动签收

/** * 获取队列里OrderReturnReasonEntity类型的消息 * @param orderReturnReasonEntity */ @RabbitHandler public void recieveMessage(Message message,OrderReturnReasonEntity orderReturnReasonEntity,Channel channel) { //消费消息 System.out.println("内容OrderReturnReasonEntity:" + orderReturnReasonEntity); //消息的标签 long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.println("deliveryTag==>" + deliveryTag); try { //消息的tag%2==0才签收 if (deliveryTag%2==0) { //参数二:是否批量签收消息。 false:只回复当前消息已经处理完的ack给服务端 channel.basicAck(deliveryTag, false); System.out.println("签收消息..."+deliveryTag); }else { /** * 拒绝签收消息 * 参数三:requeue=false丢弃消息;requeue=true将消息发回服务器,重新入队。入队后,再重新发给消费端 * 参数二:是否批量拒绝,false只拒绝当前消息 * 参数一: 当前消息的tag */ channel.basicNack(deliveryTag,false,true); //同上,只不过少了一个参数 //channel.basicReject(); System.out.println("没有签收消息..."+deliveryTag); } }catch (Exception e){ log.error("网络中断!"); } }

controller是上面那张图的controller

测试:发了10个消息,有5个由于路由对不上没发到队列上。只有5个成功发到了队列上,这5个,有些第一次就被签收了,有些被返回队列,又重新发过去,消费

内容OrderReturnReasonEntity:OrderReturnReasonEntity(id=1, name=嘻嘻0, sort=null, status=null, createTime=Thu Sep 03 21:13:53 CST 2020) deliveryTag==>1 没有签收消息...1 内容OrderReturnReasonEntity:OrderReturnReasonEntity(id=1, name=嘻嘻2, sort=null, status=null, createTime=Thu Sep 03 21:13:53 CST 2020) deliveryTag==>2 签收消息...2 内容OrderReturnReasonEntity:OrderReturnReasonEntity(id=1, name=嘻嘻4, sort=null, status=null, createTime=Thu Sep 03 21:13:53 CST 2020) deliveryTag==>3 没有签收消息...3 内容OrderReturnReasonEntity:OrderReturnReasonEntity(id=1, name=嘻嘻6, sort=null, status=null, createTime=Thu Sep 03 21:13:53 CST 2020) deliveryTag==>4 签收消息...4 内容OrderReturnReasonEntity:OrderReturnReasonEntity(id=1, name=嘻嘻8, sort=null, status=null, createTime=Thu Sep 03 21:13:53 CST 2020) deliveryTag==>5 没有签收消息...5 内容OrderReturnReasonEntity:OrderReturnReasonEntity(id=1, name=嘻嘻0, sort=null, status=null, createTime=Thu Sep 03 21:13:53 CST 2020) deliveryTag==>6 签收消息...6 内容OrderReturnReasonEntity:OrderReturnReasonEntity(id=1, name=嘻嘻4, sort=null, status=null, createTime=Thu Sep 03 21:13:53 CST 2020) deliveryTag==>7 没有签收消息...7 内容OrderReturnReasonEntity:OrderReturnReasonEntity(id=1, name=嘻嘻8, sort=null, status=null, createTime=Thu Sep 03 21:13:53 CST 2020) deliveryTag==>8 签收消息...8 内容OrderReturnReasonEntity:OrderReturnReasonEntity(id=1, name=嘻嘻4, sort=null, status=null, createTime=Thu Sep 03 21:13:53 CST 2020) deliveryTag==>9 没有签收消息...9 内容OrderReturnReasonEntity:OrderReturnReasonEntity(id=1, name=嘻嘻4, sort=null, status=null, createTime=Thu Sep 03 21:13:53 CST 2020) deliveryTag==>10 签收消息...10

通过发送端消息确认 - 确认模式 和消费端消息确认 - ack模式,来达到消息的百分百不丢失

如何保证消息的可靠性

一、消息丢失的问题

消息发送出去,由于网络问题没有抵达服务器

做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,每一个消息做好日志记录,记录下消息的详细信息,采用定期扫描重发的方式做好日志记录,每个消息状态是否都被服务器收到应该记录做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发 DROP TABLE IF EXISTS `mq_message`; CREATE TABLE `mq_message` ( `message_id` char(32) NOT NULL COMMENT '消息id', `content` text COMMENT '消息内容,json格式', `to_exchane` varchar(255) DEFAULT NULL COMMENT '消息的要发给哪个交换机', `routing_key` varchar(255) DEFAULT NULL COMMENT '消息的路由建', `class_type` varchar(255) DEFAULT NULL COMMENT '消息内容的类型', `message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已经发送 2-错误抵达 3-已抵达', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `update_time` datetime DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`message_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; 消息抵达Broker,Broker要将消息写入磁盘才算成功(持久化)。此时Broker尚未持久化,宕机了。 publisher消息发送端也必须加入确认回调机制,确认成功的消息,修改数据库消息的状态这个回调机制,在发送端消息确认 - 确认模式中已经有提到了 自动ACK的状态下。消费者收到消息,但没来得及消费消息然后宕机了,但是服务端已经删除消息了 一定开启手动ack,消费成功才移除,失败或者没来得及处理就noAck并重新入队手动ack,消费端消息确认 - ack模式也提到了

小结: 1.做好消息确认机制(publisher消息发布者[回调机制],consumer消息消费者[手动ack]) 2.每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送。

二、消息重复的问题

消息消费成功,事务已经提交,ack时,机器宕机,导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消息者消息消费失败,由于重试机制,自动又将消息发送出去,这个是应该的,消费失败,就应该重新消费,没啥问题成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送 消费者的业务消费的接口应该设计为幂等性的,比如:扣减库存有工作单的状态标志,判断当前的库存的状态为锁定状态才进行解锁,否则不解锁使用防重表,发送消息每一个都有业务的唯一标识,处理过就不用处理rabbitmq的每一个消息都有redelivered字段,可以获取是否被重新投递过来,而不是第一次投递过来

消息重复的问题其实就是幂等性问题:

https://blog.csdn.net/weixin_42412601/article/details/108412439

三、消息积压的问题

消费者宕机积压消费者消费能力不足积压发送者发送的流量太大

解决:

上线更多的消费者,进行正常消费上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理

持久化

1、交换机和队列持久化

使用的是spring-boot-starter-amqp时 交换机: 队列: spring代码中,交换机和队列默认都是持久化的

2、消息持久化

需要将消息的投递模式(delivery_mode)设置为2(也就是持久化)。

当我们使用RabbitTemplate调用了convertAndSend(String exchange, String routingKey, final Object object) 方法。默认就是持久化模式。 源码:

public void convertAndSend(String exchange, String routingKey, Object object, CorrelationData correlationData) throws AmqpException { this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData); } protected Message convertMessageIfNecessary(Object object) { return object instanceof Message ? (Message)object : this.getRequiredMessageConverter().toMessage(object, new MessageProperties()); } public MessageProperties() { this.deliveryMode = DEFAULT_DELIVERY_MODE; this.priority = DEFAULT_PRIORITY; } public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE; static { DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT; DEFAULT_PRIORITY = 0; } public static int toInt(MessageDeliveryMode mode) { switch(mode) { case NON_PERSISTENT: return 1; //持久化 case PERSISTENT: return 2; default: return -1; } }

注意:

持久化的消息在到达队列时就被写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,只有在内存吃紧的时候才会从内存中清除。非持久化的消息一般只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间。

但要注意的是,将所有的消息都设置为持久化,会严重影响RabbitMQ的性能,写入硬盘的速度比写入内存的速度慢的不只一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐率,在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。

在某种应用场景,如大流量的订单交易系统,为了不影响性能,我们可以不设置持久化,但是我们会定时扫描数据库中的未发送成功的消息,进行重试发送,实际应用场景,我们其实有很多解决方案。

消息的状态

RabbitMQ中消息message的三种状态 Ready:待消费的消息总数。 Unacked:待应答(待确认)的消息总数。 Total:总数Ready+Unacked。

Spring-RabbitMq 参数配置详解

基础信息:

spring.rabbitmq.host: 默认localhost spring.rabbitmq.port: 默认5672 spring.rabbitmq.username: 用户名 spring.rabbitmq.password: 密码 spring.rabbitmq.virtual-host: 连接到代理时用的虚拟主机 spring.rabbitmq.addresses: 连接到server的地址列表(以逗号分隔),先addresses后host spring.rabbitmq.requested-heartbeat: 请求心跳超时时间,0为不指定,如果不指定时间单位默认为妙 spring.rabbitmq.publisher-confirms: 是否启用【发布确认】,默认false spring.rabbitmq.publisher-returns: 是否启用【发布返回】,默认false spring.rabbitmq.connection-timeout: 连接超时时间,单位毫秒,0表示永不超时

SSL:

spring.rabbitmq.ssl.enabled: 是否支持ssl,默认false spring.rabbitmq.ssl.key-store: 持有SSL certificate的key store的路径 spring.rabbitmq.ssl.key-store-password: 访问key store的密码 spring.rabbitmq.ssl.trust-store: 持有SSL certificates的Trust store spring.rabbitmq.ssl.trust-store-password: 访问trust store的密码 spring.rabbitmq.ssl.trust-store-type=JKS:Trust store 类型. spring.rabbitmq.ssl.algorithm: ssl使用的算法,默认由rabiitClient配置 spring.rabbitmq.ssl.validate-server-certificate=true:是否启用服务端证书验证 spring.rabbitmq.ssl.verify-hostname=true 是否启用主机验证

缓存cache:

spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量 spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel spring.rabbitmq.cache.connection.size: 缓存的channel数,只有是CONNECTION模式时生效 spring.rabbitmq.cache.connection.mode=channel: 连接工厂缓存模式:channel 和 connection

Listener:

spring.rabbitmq.listener.type=simple: 容器类型:simple或direct,默认是simple spring.rabbitmq.listener.simple.auto-startup=true: 是否启动时自动启动容器 spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是none、manual【手动ack模式】和auto;默认auto spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量 spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量 spring.rabbitmq.listener.simple.prefetch: 一个消费者最多可处理的nack消息数量,如果有事务的话,必须大于等于transaction数量. spring.rabbitmq.listener.simple.transaction-size: 当ack模式为auto时,一个事务(ack间)处理的消息数量,最好是小于等于prefetch的数量.若大于prefetch, 则prefetch将增加到这个值 spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系) spring.rabbitmq.listener.simple.missing-queues-fatal=true 若容器声明的队列在代理上不可用,是否失败; 或者运行时一个多多个队列被删除,是否停止容器 spring.rabbitmq.listener.simple.idle-event-interval: 发布空闲容器的时间间隔,单位毫秒 spring.rabbitmq.listener.simple.retry.enabled=false: 监听重试是否可用 spring.rabbitmq.listener.simple.retry.max-attempts=3: 最大重试次数 spring.rabbitmq.listener.simple.retry.max-interval=10000ms: 最大重试时间间隔 spring.rabbitmq.listener.simple.retry.initial-interval=1000ms:第一次和第二次尝试传递消息的时间间隔 spring.rabbitmq.listener.simple.retry.multiplier=1: 应用于上一重试间隔的乘数 spring.rabbitmq.listener.simple.retry.stateless=true: 重试时有状态or无状态 spring.rabbitmq.listener.direct.acknowledge-mode= ack模式 spring.rabbitmq.listener.direct.auto-startup=true 是否在启动时自动启动容器 spring.rabbitmq.listener.direct.consumers-per-queue= 每个队列消费者数量. spring.rabbitmq.listener.direct.default-requeue-rejected= 默认是否将拒绝传送的消息重新入队. spring.rabbitmq.listener.direct.idle-event-interval= 空闲容器事件发布时间间隔. spring.rabbitmq.listener.direct.missing-queues-fatal=false若容器声明的队列在代理上不可用,是否失败. spring.rabbitmq.listener.direct.prefetch= 每个消费者可最大处理的nack消息数量. spring.rabbitmq.listener.direct.retry.enabled=false 是否启用发布重试机制. spring.rabbitmq.listener.direct.retry.initial-interval=1000ms # Duration between the first and second attempt to deliver a message. spring.rabbitmq.listener.direct.retry.max-attempts=3 # Maximum number of attempts to deliver a message. spring.rabbitmq.listener.direct.retry.max-interval=10000ms # Maximum duration between attempts. spring.rabbitmq.listener.direct.retry.multiplier=1 # Multiplier to apply to the previous retry interval. spring.rabbitmq.listener.direct.retry.stateless=true # Whether retries are stateless or stateful.

simple和direct容器: https://blog.csdn.net/yingziisme/article/details/86418580

公平分配(Fair dispatch)的问题: 消息的分发,若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。那么,这可能会出现问题,假如某个节点处理消息的速度快, 但是因为公平分配.,这个节点的利用率就比较低了; 为了解决这个问题,我们可以使用 channel.prefetch(n)

n = 1 channel.prefetch(n)
最新回复(0)