消息队列之支付超时,取消订单(死信队列)

tech2024-06-07  82

业务场景

创建订单10分钟之后自动支付订单超时取消…等等…

实现方式

最简单的方式,定时扫表;例如每分钟扫表一次十分钟之后未支付的订单进行主动支付 ; 优点: 简单 缺点: 每分钟全局扫表,浪费资源,有一分钟延迟

使用RabbitMq 实现 RabbitMq实现延迟队列 优点: 开源,现成的稳定的实现方案; 缺点: RabbitMq是一个消息中间件;延迟队列只是其中一个小功能,如果团队技术栈中本来就是使用RabbitMq那还好,如果不是,那为了使用延迟队列而去部署一套RabbitMq成本有点大;

使用Java中的延迟队列,DelayQueue 优点: java.util.concurrent包下一个延迟队列,简单易用;拿来即用 缺点: 单机、不能持久化、宕机任务丢失等等;

本文在这里选用rabbitmq死信队列的方式实现支付超时,取消订单。

1.配置消息队列处开启手动ack 2.定义一个生产者

/** * 生产者 ->生产消息 * 作者:Chris On 2020/09/01 10:44 * 这里声明的amqpTemplate接口,这个接口包含了发送和接收消息的一般操作,换种说法,它不是某个实现所专有的,所以AMQP存在于名称里。这个接口的实现与AMQP协议的实现紧密关联。 * this.amqpTemplate.convertAndSend的第一个参数为延迟交换机的名称,第二个为延时消费routing-key,第三个参数为order操作对象,第四个参数为消息 */ @Component @Slf4j public class DelaySender { // AMQP 高级消息队列协议 @Autowired private AmqpTemplate amqpTemplate; public void sendDelay(ConcurrentHashMap order) { log.info("【订单生成时间】" + new Date().toString() +"【1分钟后检查订单是否已经支付】" + order.toString() ); this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> { // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间 message.getMessageProperties().setExpiration(1 * 1000 * 60 + ""); return message; }); } }

3.消费者->消费消息,由于我们在配置处开启了手动ack,这里消费者消费到消息后要进行应答,否则消息服务器会以为这条消息没处理掉

/** * 接收者 ->消费者 * By Chris On 2020/09/01 10:37 */ @Component @Slf4j public class DelayReceiver { @Resource private MyOrderMasterMapper myOrderMasterMapper; @Autowired private RedisUtil redisUtil; @Autowired private SkuMapper skuMapper; @RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME}) public void orderDelayQueue(ConcurrentHashMap msgMap, Message message, Channel channel) { log.info("消费者接收到消息:{}",message.getBody()); try{ if(ToolUtil.isNotEmpty(message)){ log.info("【orderDelayQueue 监听的消息】 - 【消费时间】 - [{}]- 【订单内容】 - [{}]", new Date(), msgMap.toString()); OrderMaster order = new OrderMaster(); order.setId(Integer.valueOf(msgMap.get("orderId").toString())); order = myOrderMasterMapper.selectById(order.getId()); if (order.getStatus() == 5) {//待支付 order.setStatus(4); order.setUpdReason("订单超时未支付,取消订单"); myOrderMasterMapper.updateById(order); skuMapper.updStockById(msgMap.get("goodsId").toString(),msgMap.get("orderNum").toString()); redisUtil.incr("seckill-"+msgMap.get("goodsId"),Long.parseLong(msgMap.get("orderNum").toString())); } //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }catch (Exception e){ e.printStackTrace(); //丢弃这条消息 try { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } catch (IOException ioException) { ioException.printStackTrace(); } } } }

4.测试接口

@RequestMapping("/sendDelay") public Object sendDelay() { ConcurrentHashMap msgMap = new ConcurrentHashMap(); msgMap.put("orderId","2345123123"); msgMap.put("goodsId",1); msgMap.put("orderNum",1); delaySender.sendDelay(msgMap); return "test--ok"; }

5.测试接口,下订单,此时订单的状态为5:未支付

队列中存在一条消息 1分钟后消息被消费 订单已经被取消,ok,大功告成! 贴上死信队列的配置

@Configuration @Slf4j public class DelayRabbitConfig { // 延迟队列 TTL 名称 private static final String ORDER_DELAY_QUEUE = "order.delay.queue"; // DLX,dead letter发送到的 exchange // 延时消息就是发送到该交换机的 public static final String ORDER_DELAY_EXCHANGE = "order.delay.exchange"; // routing key 名称 // 具体消息发送在该 routingKey 的 public static final String ORDER_DELAY_ROUTING_KEY = "order_delay"; //立即消费的队列名称 public static final String ORDER_QUEUE_NAME = "order.queue"; // 立即消费的exchange public static final String ORDER_EXCHANGE_NAME = "order.exchange"; //立即消费 routing key 名称 public static final String ORDER_ROUTING_KEY = "order"; /** * 创建一个延时队列 */ @Bean public Queue delayOrderQueue() { Map<String, Object> params = new HashMap<>(); // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称, params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME); // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。 params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY); return new Queue(ORDER_DELAY_QUEUE, true, false, false, params); } /** * 创建一个立即消费队列 */ @Bean public Queue orderQueue() { // 第一个参数为queue的名字,第二个参数为是否支持持久化 return new Queue(ORDER_QUEUE_NAME, true); } /** * 延迟交换机 */ @Bean public DirectExchange orderDelayExchange() { // 一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除, // 第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数 // new DirectExchange(ORDER_DELAY_EXCHANGE,true,false); return new DirectExchange(ORDER_DELAY_EXCHANGE); } /** * 立即消费交换机 */ @Bean public TopicExchange orderTopicExchange() { return new TopicExchange(ORDER_EXCHANGE_NAME); } /** * 把延时队列和 订单延迟交换的exchange进行绑定 * @return */ @Bean public Binding dlxBinding() { return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY); } /** * 把立即队列和 立即交换的exchange进行绑定 * @return */ @Bean public Binding orderBinding() { // TODO 如果要让延迟队列之间有关联,这里的 routingKey 和 绑定的交换机很关键 return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY); } }
最新回复(0)