RocketMQ事务消息思路

tech2025-11-07  5

通过消息队列 RocketMQ 事务消息,能达到分布式事务的最终一致。

模拟A账户转账给B账户操作,这个分布式事务有两个子事务 子事务A:AReduceTransaction()代表A账户扣款 子事务B:BIncreaseTransaction() 代表B账户收款

业务服务器实例一

一:向消息队列服务器发送半消息(半消息无法被消费),如果收到消息队列服务器的ACK代表发送成功,则执行本地事务,(AReduceTransaction),若本地事务执行成功,会通知消息队列服务器将半消息转为正常消息(可以被消费),否则删除半消息。

@RestController @Log4j public class Controller { @Autowired private Producer producer; @GetMapping("/transactionMessage") String send() { ProduceMessage message = ProduceMessage.fromString("YZL_TEST", "J-TEST-5", "lazlaz"); LocalTransactionExecuter localTransactionExecuter = new LocalTransactionExecuter() { @Override public TransactionStatus execute(Message message, Object o) { log.info("【半消息发送成功】 收到ACK messageId = {}", message.getMsgID()); log.info("开始执行本地事务(用户A扣款)"); return AReduceTransaction(message); } }; message.setLocalTransactionExecuter(localTransactionExecuter); message.setShardingKey("abc"); producer.send(message); return "success"; } /** * 用户A付款 账户余额较少 */ TransactionStatus AReduceTransaction(Message message) { try { // 业务逻辑开始 log.info("本地事务(用户A扣款)执行成功,通知半消息转为真正消息"); RedisUtils.set(message.getMsgID(), true); // 业务逻辑结束 // 断电点 return TransactionStatus.CommitTransaction; } catch (Exception e) { RedisUtils.set(message.getMsgID(), false); log.error("本地事务(用户A扣款)执行失败,通知半消息删除"); return TransactionStatus.RollbackTransaction; } } }

二:在业务执行过程中,可能存在网络错误如上述代码(断电点),此时虽然子事务A执行成功了,但是并没有通知消息队列服务器将半消息转为正常消息,导致无法执行子事务B。因此在初始化的时候需要指定回查方法,该方法会按照设置的指定时间间隔进行执行,执行一定次数后放弃。

// 回查方法 public class LocalTransactionCheckerImpl implements LocalTransactionChecker { @Override public TransactionStatus check(Message message) { if (RedisUtils.get(message.getMsgID()) == null) { log.info("未收到本地事务(用户A扣款)执行结果,继续定时回查"); return TransactionStatus.Unknow; } if (RedisUtils.get(message.getMsgID())) { log.info("本地事务(用户A扣款)执行成功,通知半消息转为真正消息"); return TransactionStatus.CommitTransaction; } log.info("本地事务(用户A扣款)执行失败,通知半消息删除"); return TransactionStatus.RollbackTransaction; } }

业务服务器实例二

收到消息后执行第二阶段的事务操作,若事务失败则进行重试,直到操作成功或到达最大重试次数后进行后续干预。

public class MessageListener implements MessageListener { @Override public String getId() { return ""; } @Override public String getTopic() { return "YZL_TEST"; } @Override public String getTag() { return "J-TEST-5"; } @Override public void process(ConsumeMessage message) { log.info("远程事务(用户B收款)开始执行"); BIncreaseTransaction(); } /** * 用户B收款 账户余额增加 */ void BIncreaseTransaction() { // 业务逻辑 } }

更多前沿技术,面试技巧,内推信息请扫码关注公众号“云计算平台技术”

最新回复(0)