RabbitMq的死信队列

tech2022-09-05  109

死信队列

DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。

消息变成死信,可能是由于以下的原因:

消息被拒绝消息过期队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。

要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。

定义交换机和队列

/** * @author WGR * @create 2020/9/2 -- 16:26 */ @Configuration public class RabbitMQDLXConfig { @Bean("my_dlx_queue") public Queue myDlxQueue(){ return QueueBuilder.durable("my_dlx_queue").build(); } @Bean("my_dlx_exchange") public Exchange myDlxExchange(){ return ExchangeBuilder.directExchange("my_dlx_exchange").durable(true).build(); } //绑定队列和交换机 @Bean public Binding myTtlDlx1(@Qualifier("my_dlx_queue") Queue queue, @Qualifier("my_dlx_exchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange) .with("my_ttl_dlx").noargs(); } //绑定队列和交换机 @Bean public Binding myTtlDlx2(@Qualifier("my_dlx_queue") Queue queue, @Qualifier("my_dlx_exchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange) .with("my_max_dlx").noargs(); } //声明队列 @Bean("my_ttl_dlx_queue") public Queue myTtlDlxQueue(){ Map<String,Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange","my_dlx_exchange"); arguments.put("x-dead-letter-routing-key","my_ttl_dlx"); arguments.put("x-message-ttl",60000); return QueueBuilder.durable("my_ttl_dlx_queue").withArguments(arguments).build(); } //声明队列 @Bean("my_max_dlx_queue") public Queue myMaxDlxQueue(){ Map<String,Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange","my_dlx_exchange"); arguments.put("x-dead-letter-routing-key","my_max_dlx"); arguments.put("x-max-length",2); return QueueBuilder.durable("my_max_dlx_queue").withArguments(arguments).build(); } @Bean("my_normal_exchange") public Exchange myNormalExchange(){ return ExchangeBuilder.directExchange("my_normal_exchange").durable(true).build(); } //绑定队列和交换机 @Bean public Binding myTtlDlx3(@Qualifier("my_ttl_dlx_queue") Queue queue, @Qualifier("my_normal_exchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange) .with("my_ttl_dlx").noargs(); } //绑定队列和交换机 @Bean public Binding myTtlDlx4(@Qualifier("my_max_dlx_queue") Queue queue, @Qualifier("my_normal_exchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange) .with("my_max_dlx").noargs(); } }

进行测试:

/** * 过期消息投递到死信队列 * 投递到一个正常的队列,但是该队列有设置过期时间,到过期时间之后消息会被投递到死信交换机(队列) */ @Test public void dlxTTLMessageTest(){ rabbitTemplate.convertAndSend( "my_normal_exchange", "my_ttl_dlx", "测试过期消息;6秒过期后会被投递到死信交换机2222"); } /** * 消息长度超过2,会投递到死信队列中 */ @Test public void dlxMaxMessageTest(){ rabbitTemplate.convertAndSend( "my_normal_exchange", "my_max_dlx", "发送消息4:消息长度超过2,会被投递到死信队列中!"); rabbitTemplate.convertAndSend( "my_normal_exchange", "my_max_dlx", "发送消息5:消息长度超过2,会被投递到死信队列中!"); rabbitTemplate.convertAndSend( "my_normal_exchange", "my_max_dlx", "发送消息6:消息长度超过2,会被投递到死信队列中!"); }

3)流程

具体因为队列消息过期而被投递到死信队列的流程:

最新回复(0)