场景1: 企业版需一周内需要上线一个需求。需要 服务端,前端,测试参与。 服务端开发两天完成,前端开发两天完成,但是测试这周排期满了,没有时间测试,那这个需求上线不了。
场景2: 小明需要将A银行卡里的钱,转到B银行。A银行系统扣款成功,B银行入账失败。
场景3: 小王喜欢小唐,但是小唐父母觉得女儿需要个房子住。他们先结婚,几年后再买房。最终小两口还是有房子住
什么是事务
事务(Transaction),一般是指要做的或所做的事情, 通俗讲一般是指一系列操作,要么都成功,要么都不成功,通过确保系统中的独立操作全部成功完成或全部成功取消来维持系统的完整性。
Java实现事务的几种方式:jdbc事务,jta事务,容器事务
本地事务
本地事务一般是指依靠关系型数据库本身的事务特性来实现本地事务。
数据库事务的四大特性 ACID:
A(Atomic) : 原子性,构成事务的所有操作,要么都执行完成,要么全部不执行
C(Consistency):一致性,在事务执行前后,数据库的一致性约束没有被破坏。
I(Isolation): 隔离性,数据库中的事务一般都是并发的,隔离性是指并发的两个事务的执行互不干扰。可设置不同数据库事务隔离级别。(RU, RC, RR, Serializable)
D (Durability):持久性,事务完成之后,该事务对数据的更改会被持久化到数据库,且不会被回滚。
本地事务程序实现
基本流程流程: begin, commit, rollback 操作。
begin transaction; try { write db; commit transaction; } catch(Exception e ){ rollback transaction; }本地事务基本是依靠数据库事务实现。在java中,本地事务基本基于jdbc事务实现,通过Connection 对象控制
public void setAutoCommit(boolean) public void commit() public void rollback()spring 程序中可在方法上添加 @Transactional 注解,或自己通过AOP实现
利: 实现简单。开发少。强一致
局限:局限于单个数据库链接
在分布式系统下。会有多数据库实例,多服务。会产生 跨jvm,同一个数据库;同一个jvm,跨数据库; 跨jvm,跨数据库 三种情况的分布式事务。
理论基础:
分布式系统需要满足 CAP 定理,即:
一致性(consistency):
可用性(avaliable);
分区容忍性(Partition tolerance);
一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容忍性(Partition tolerance)这三项中的两项。即在分布式系统中,满足P的前提下,C和A只能满足其中一个。根据实际业务场景选择。
1) AP:
放弃一致性,追求分区容忍性和可用性。这是很多分布式系统设计时的选择。
接受所查询到的数据在一定的时间内不是最新的数据。如消息异步处理同步;通过查询来确认最终处理结果
2)CP:
放弃可用性,追求强一致性和分区容错性。不是系统不可用,只是性能不佳
如 zookeeper选举; 跨行转账,一次转账请求要等待双方银行系统都完成整个事务才算完成。
3)AC:
放弃分区容忍性。不分区,不部署子节点。可不考虑网络,或其他节点挂掉的问题,实现强一致性或可用性。但不符合现有多数分布式系统设计原则。
多数业务场景,追求性能体验。允许暂时的数据不一致。即AP系统设计
BASE 理论
BASE 是 Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent (最终一致性)三个短语的缩写。BASE理论是对CAP中AP的一个扩展,通过牺牲强一致性来获得可用性,当出现故障允许部分不可用但要保证核心功能可用,允许数据在一段时间内是不一致的,但最终达到一致状态。满足BASE理论的事务,我们称之为“柔性事务”。
1、两段提交 (2PC)
2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase),2是指两个阶段,P是指准备阶段,C是指提交阶段。
基本流程:
XA 规范
X/Open DTP 模型:
包括应用程序( AP ):
事务管理器( TM ): transaction-manager
资源管理器( RM ):DB
通信资源管理器( CRM ):网络通信 (可忽略)
基于TM和RM之间通讯的接口规范为XA规范,可通过数据库的XA协议实现2PC方案。
实现
通过JTA(java transaction api)实现。
在java程序中,如果不使用 JBOSS, WEBLOGIC等容器。可通过 jotm, atomikos等方案实现。
也可通过 seata 框架实现。
利: 强一致性,开发量小,业务不侵入
局限:资源锁定时间长,不适用于高并发场景。 依赖资源层(RM)。
存在的问题:
同步阻塞问题。执行过程中,所有参与节点都是事务阻塞型的。当参与者占有公共资源时,其他第三方节点访问公共资源不得不处于阻塞状态
单点故障。由于协调者的重要性,一旦协调者发生故障。参与者会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)
数据不一致。在二阶段提交的阶段二中,只有部分commit
由上述问题,引入三阶段提交
2、tcc (try、confirm、cancel)
补偿型事务
原理:
各步骤职责:
try阶段:
1)业务检查
2)预留本次业务所需资源
confirm 阶段(try正常执行无异常):
1)执行业务
2)不做业务检查
3)只使用try阶段所预留的资源
4)confirm接口满足幂等(会重试)
cancel 阶段(try执行发生异常时):
1)释放try阶段预留的资源
2)cancel操作满足幂等(重试)
注意事项:
1、空回滚:在没有调用try方法的情况下,调用了cancel。cancel需要识别空回滚,直接返回成功。(如:由于网络异常或子业务宕机等情况,子业务的try方法没有执行或者没有进行资源锁定。在网络或服务恢复之后,会调用cancel方法)
2、执行顺序:在try方法超时的情况下,直接调用cancel方法。会导致cancel逻辑先执行完成,而try方法再进行资源锁定。这种情况下锁定的资源将不能释放。 需要延迟cancel。
3、confirm和cancel保证幂等:因为会失败重试。如果未保证幂等,则会重复使用或者重复释放资源。会导致数据不一致。
利:tcc属于补偿型事务,由多个分支事务组成,分支逻辑执行之后会提交本地事务,不会长时间锁定数据库资源。由业务逻辑实现提交,补偿逻辑。更加灵活
局限:提交,补偿逻辑由业务代码实现,开发量大。有一定的业务侵入。最终一致
2、
框架1: tcc-transaction
github:https://github.com/changmingxie/tcc-transaction
1) TCC 实现
1、try、 confirm、 cancel 在框架中的使用
官方demo
Try 方法:
@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = true) public void makePayment(Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) { System.out.println("order try make payment called.time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss")); }confirm方法:
public void confirmMakePayment(Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) { }cancel方法:
public void cancelMakePayment(Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) { }2、事务管理器
事务管理类:
org.mengyun.tcctransaction.TransactionManager
1) 发起根事务:
/** * 发起根事务 * @return */ public Transaction begin() { // 构建事务,开始状态为 TRYING Transaction transaction = new Transaction(TransactionType.ROOT); // 存储事务 transactionRepository.create(transaction); // 注册添加事务 registerTransaction(transaction); return transaction; }注册添加事务:
/** 设置双向队列,可以实现事务嵌套调用. threadLocal 类型,每个线程独立 */ private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>(); /** * 注册事务: 将事务对象放入到队列中 * @param transaction */ private void registerTransaction(Transaction transaction) { if (CURRENT.get() == null) { CURRENT.set(new LinkedList<Transaction>()); } CURRENT.get().push(transaction); }
2) 发起分支事务
/** * 发起分支事务 * @param transactionContext * @return */ public Transaction propagationNewBegin(TransactionContext transactionContext) { //带参数构建事务,传递原有事务ID(xid) Transaction transaction = new Transaction(transactionContext); //存储事务 transactionRepository.create(transaction); //注册事务 registerTransaction(transaction); return transaction; }
3) 提交事务
/** * 提交事务 * @param asyncCommit */ public void commit(boolean asyncCommit) { // 获取当前事务 final Transaction transaction = getCurrentTransaction(); // 更新事务状态为 confirming transaction.changeStatus(TransactionStatus.CONFIRMING); transactionRepository.update(transaction); // 根据 参数 asyncConfirm 判断是否为异步执行 confirm 逻辑 if (asyncCommit) { try { Long statTime = System.currentTimeMillis(); executorService.submit(new Runnable() { @Override public void run() { commitTransaction(transaction); } }); logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime)); } catch (Throwable commitException) { logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException); throw new ConfirmingException(commitException); } } else { commitTransaction(transaction); } } private void commitTransaction(Transaction transaction) { try { // 调用执行当前事务 confirm method transaction.commit(); // confirm 执行成功后,当前事务整体完成,删除所存储的事务 transactionRepository.delete(transaction); } catch (Throwable commitException) { // 如果执行confirm逻辑异常,则抛出,等待事务恢复逻辑再次处理 logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException); throw new ConfirmingException(commitException); } }
4)回滚事务
/** * 回滚事务 * @param asyncRollback */ public void rollback(boolean asyncRollback) { // 获取当前事务 final Transaction transaction = getCurrentTransaction(); // 更新当前事务状态为取消状态 transaction.changeStatus(TransactionStatus.CANCELLING); transactionRepository.update(transaction); // 根据 asyncCancel 判断是否异步执行回滚逻辑 if (asyncRollback) { try { executorService.submit(new Runnable() { @Override public void run() { rollbackTransaction(transaction); } }); } catch (Throwable rollbackException) { logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException); throw new CancellingException(rollbackException); } } else { rollbackTransaction(transaction); } } private void rollbackTransaction(Transaction transaction) { try { // 调用执行 cancel method transaction.rollback(); // 如果cancel执行成功。则当前事务逻辑完整完成,删除所存储的事务 transactionRepository.delete(transaction); } catch (Throwable rollbackException) { // 如果执行cancel逻辑异常,则抛出,等待事务恢复逻辑进行重试处理 logger.warn("compensable transaction rollback failed, recovery job will try to rollback later.", rollbackException); throw new CancellingException(rollbackException); } }
3、事务拦截(AOP)
拦截注解 @Compensable
@Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) public @interface Compensable { // 事务传播级别 (同 spring 事务的传播级别) public Propagation propagation() default Propagation.REQUIRED; // 确认执行任务的方法名 public String confirmMethod() default ""; // 取消执行任务的方法名 public String cancelMethod() default ""; // 设置参数上下文 public Class<? extends TransactionContextEditor> transactionContextEditor() default DefaultTransactionContextEditor.class; public boolean asyncConfirm() default false; public boolean asyncCancel() default false; }
AOP 拦截:
两个拦截器:
可补偿事务方法执行拦截器: 拦截并执行 confirm、 cancel逻辑
org.mengyun.tcctransaction.interceptor.CompensableTransactionAspect
资源协调拦截器: 添加参与者信息到事务对象中。及设置参数等信息
org.mengyun.tcctransaction.interceptor.ResourceCoordinatorAspect
具体实现:
org.mengyun.tcctransaction.interceptor.CompensableTransactionAspect
通过 AOP 的 @PointCut 和 @Around 对 @Compensable注解进行拦截
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable { Method method = CompensableMethodUtils.getCompensableMethod(pjp); Compensable compensable = method.getAnnotation(Compensable.class); Propagation propagation = compensable.propagation(); // 获取上下文, 如果此时 methodType 是 ROOT(事务入口),则没有上下文 TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()); boolean asyncConfirm = compensable.asyncConfirm(); boolean asyncCancel = compensable.asyncCancel(); boolean isTransactionActive = transactionManager.isTransactionActive(); if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) { throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName()); } MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext); switch (methodType) { case ROOT: return rootMethodProceed(pjp, asyncConfirm, asyncCancel); case PROVIDER: return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel); default: return pjp.proceed(); } }
根事务执行逻辑 (事务处理核心逻辑):
private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable { Object returnValue = null; Transaction transaction = null; try { // 开始事务, 创建及注册事务 transaction = transactionManager.begin(); try { // 执行 try 方法 returnValue = pjp.proceed(); } catch (Throwable tryingException) { /** * 如果try 方法执行异常。且不是自定义(超时等异常),则进行回滚操作(执行cancel逻辑) * 为什么 try 方法超时时,不立马进行回滚: try 方法内进行远程调用且超时时,如果立马进行回滚操作。 * 此时try可能还在执行,正在冻结资源等,会导致cancel操作不能完全释放资源 * */ if (!isDelayCancelException(tryingException)) { logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException); transactionManager.rollback(asyncCancel); } throw tryingException; } // 如果try方法成功执行无异常。则提交事务(执行confirm逻辑) transactionManager.commit(asyncConfirm); } finally { transactionManager.cleanAfterCompletion(transaction); } return returnValue; }
2)事务存储
为实现事务恢复逻辑,在执行异常时持久化。需要保存事务信息及事务执行状态
支持 ZK ,文件,redis,DB等方式持久化
不一一展开。基本的CRUD操作
3)事务恢复
事务恢复,是指在 confirm 或者 cancel操作异常时,进行重试的逻辑。
重试主要通过 quartz 实现,按照配置策略,定时任务查询存储的事务,并根据状态进行重试.
org.mengyun.tcctransaction.spring.recover.DefaultRecoverConfig 默认恢复配置:
public class DefaultRecoverConfig implements RecoverConfig { public static final RecoverConfig INSTANCE = new DefaultRecoverConfig(); /**最大重试次数*/ private int maxRetryCount = 30; /**重试间隔 120 s*/ private int recoverDuration = 120; /**定时任务cron表达式,每分钟执行一次*/ private String cronExpression = "0 */1 * * * ?"; /** 异步执行线程池大小 */ private int asyncTerminateThreadPoolSize = 1024; /**延迟回滚异常 集合*/ private Set<Class<? extends Exception>> delayCancelExceptions = new HashSet<Class<? extends Exception>>(); /** * 延迟回滚异常:包含,乐观锁异常,连接超时异常 */ public DefaultRecoverConfig() { delayCancelExceptions.add(OptimisticLockException.class); delayCancelExceptions.add(SocketTimeoutException.class); } }
定时任务执行:
public void startRecover() { // 查询加载执行异常的事务 List<Transaction> transactions = loadErrorTransactions(); // 恢复执行异常的事务 recoverErrorTransactions(transactions); }
为确保在集群多机情况下,不重复执行任务。会在更新事务状态时加上乐观锁判断,更新成功之后,才进行下一步操作(confirm或cancel等)
public int update(Transaction transaction) { int result = 0; try { result = doUpdate(transaction); if (result > 0) { putToCache(transaction); } else { throw new OptimisticLockException(); } } finally { if (result <= 0) { removeFromCache(transaction); } } return result; }
框架2: https://github.com/Dromara/hmily
3、MQ消息最终一致
基本流程
Rocket MQ 如何实现事务消息:
通过MQ消息来通知其他业务执行事务。 可靠MQ消息需要实现事务消息,上图以rocketMq为例,发送事务消息。
业务B消费消息接口,需要实现: 消费失败后能够重新消费;接口实现幂等。
4、最大努力通知
基本流程:
现有大多数支付系统支付结果的处理逻辑。支付宝,微信,银联在线支付等
适用于 不同公司系统间,http调用场景下处理
支付宝支付时序图:
总结
2PC 最大的诟病是一个阻塞协议。RM在执行分支事务后需要等待TM的决定,此时服务会阻塞并锁定资源。由于其阻塞机制和最差时间复杂度高, 因此,这种设计不能适应随着事务涉及的服务数量增加而扩展的需要,很难用于并发较高以及子事务生命周期较长 (long-running transactions) 的分布式服务中。
TCC在应用层面的处理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能。而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。典型的使用场景:满,登录送优惠券等。
可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。典型的使用场景:注册送积分,登录送优惠券等。
最大努力通知是分布式事务中要求最低的一种,适用于一些最终一致性时间敏感度低的业务;允许发起通知方处理业务失败,在接收通知方收到通知后积极进行失败处理,无论发起通知方如何处理结果都会不影响到接收通知方的后续处理;发起通知方需提供查询执行情况接口,用于接收通知方校对结果。典型的使用场景:银行通知、支付结果通知等。
对比分析:
指标2PCtccMQ消息最大努力通知一致性强一致性最终一致最终一致最终一致吞吐量低中高高实现复杂度简单难中简单