Spring: How Transaction Handling Is Implemented

tech2023-05-08  118

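Implementation of transaction handling

Spring's unified transaction handling is built on AOP underneath: TransactionProxyFactoryBean uses AOP to weave transaction handling into the target as advice. The actual transaction operations on the data source (commit, rollback) are carried out by a transaction manager (TransactionManager). TransactionAspectSupport connects the two, so that the TransactionManager's transaction handling can be driven through AOP.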

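Configuring the transaction interceptor

The transaction interceptor intercepts calls to the target's methods and then has the TransactionManager manage the transaction around each call.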
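The idea of intercepting a target method and bracketing it with transaction calls can be sketched with a plain JDK dynamic proxy. This is only an illustration under stated assumptions: RecordingTxManager, AccountService and proxyFor are hypothetical names, not Spring classes, and the "transaction manager" merely records the calls it receives.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this is not Spring's API.
class TxProxySketch {

    /** Stand-in for a transaction manager; it only records what was asked of it. */
    static final class RecordingTxManager {
        final List<String> events = new ArrayList<>();
        void begin()    { events.add("begin"); }
        void commit()   { events.add("commit"); }
        void rollback() { events.add("rollback"); }
    }

    interface AccountService {
        void transfer(int amount);
    }

    /** Wraps the target in a proxy whose handler brackets each call with begin/commit/rollback. */
    static AccountService proxyFor(AccountService target, RecordingTxManager tm) {
        InvocationHandler handler = (proxy, method, args) -> {
            tm.begin();
            try {
                Object result = method.invoke(target, args);
                tm.commit();
                return result;
            } catch (InvocationTargetException ex) {
                // The target method failed: roll back and rethrow the original exception.
                tm.rollback();
                throw ex.getCause();
            }
        };
        return (AccountService) Proxy.newProxyInstance(
                AccountService.class.getClassLoader(),
                new Class<?>[] {AccountService.class},
                handler);
    }

    /** Runs one call through the proxy and returns the recorded transaction events. */
    static List<String> run(int amount) {
        RecordingTxManager tm = new RecordingTxManager();
        AccountService target = a -> {
            if (a < 0) {
                throw new IllegalArgumentException("negative amount");
            }
        };
        try {
            proxyFor(target, tm).transfer(amount);
        } catch (IllegalArgumentException expected) {
            // The failure still reaches the caller after the rollback.
        }
        return tm.events;
    }

    public static void main(String[] args) {
        System.out.println(run(10));
        System.out.println(run(-1));
    }
}
```

A successful call records begin followed by commit; a failing call records begin followed by rollback. Spring's interceptor follows the same outline but decides between commit and rollback using configurable rules.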

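The transaction interceptor is configured in AbstractSingletonProxyFactoryBean, the parent class of TransactionProxyFactoryBean. During bean initialization, Spring first calls the before-initialization callbacks of the bean post-processors; if the bean is an InitializingBean, its afterPropertiesSet method runs next; only then do the init-method and the post-processors' after-initialization callbacks run, in that order.

The afterPropertiesSet method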

// AbstractSingletonProxyFactoryBean#afterPropertiesSet
public void afterPropertiesSet() {
    // <1> Check the type of target: it must be a bean reference
    if (this.target == null) {
        throw new IllegalArgumentException("Property 'target' is required");
    }
    if (this.target instanceof String) {
        throw new IllegalArgumentException("'target' needs to be a bean reference, not a bean name as value");
    }
    if (this.proxyClassLoader == null) {
        this.proxyClassLoader = ClassUtils.getDefaultClassLoader();
    }

    // Create a ProxyFactory. It is close in function to AOP's ProxyFactoryBean:
    // it can manage interceptors and create proxy objects.
    ProxyFactory proxyFactory = new ProxyFactory();

    // <2> Wrap the interceptors as advisors. As in AOP, ProxyFactory manages advisors
    // and, through them, the advisor chain.
    // Order: preInterceptors first, then the advisor for TransactionInterceptor, then postInterceptors.
    if (this.preInterceptors != null) {
        for (Object interceptor : this.preInterceptors) {
            proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(interceptor));
        }
    }

    // Add the main interceptor (typically an Advisor).
    // <3> Create the advisor for TransactionInterceptor
    proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(createMainInterceptor()));

    if (this.postInterceptors != null) {
        for (Object interceptor : this.postInterceptors) {
            proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(interceptor));
        }
    }

    // Configure the proxyFactory; these settings are needed when it creates the proxy
    proxyFactory.copyFrom(this);

    TargetSource targetSource = createTargetSource(this.target);
    proxyFactory.setTargetSource(targetSource);

    if (this.proxyInterfaces != null) {
        proxyFactory.setInterfaces(this.proxyInterfaces);
    }
    else if (!isProxyTargetClass()) {
        // Rely on AOP infrastructure to tell us what interfaces to proxy.
        Class<?> targetClass = targetSource.getTargetClass();
        if (targetClass != null) {
            proxyFactory.setInterfaces(ClassUtils.getAllInterfacesForClass(targetClass, this.proxyClassLoader));
        }
    }

    postProcessProxyFactory(proxyFactory);

    // <4> Create the proxy, the same way as in AOP (the interceptor chain has already been built above)
    this.proxy = proxyFactory.getProxy(this.proxyClassLoader);
}

The wrap call at <3> is the same method that Spring AOP uses. What matters is what createMainInterceptor does. An advisor needs both an Advice and a Pointcut.

If a pointcut has already been configured, a DefaultPointcutAdvisor is used; otherwise a TransactionAttributeSourceAdvisor is created.

// TransactionProxyFactoryBean
private final TransactionInterceptor transactionInterceptor = new TransactionInterceptor();

protected Object createMainInterceptor() {
    // Run some sanity checks on the interceptor
    this.transactionInterceptor.afterPropertiesSet();
    if (this.pointcut != null) {
        return new DefaultPointcutAdvisor(this.pointcut, this.transactionInterceptor);
    }
    else {
        // Rely on default pointcut.
        return new TransactionAttributeSourceAdvisor(this.transactionInterceptor);
    }
}

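Reading the transaction configuration

The TransactionAttributeSourceAdvisor class

This is the advisor most commonly used for transaction handling. An advisor needs an Advice and a Pointcut, so how are they supplied here? The Advice is the TransactionInterceptor itself. The Pointcut is a TransactionAttributeSourcePointcut, which reads its information from the configured transaction attributes.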

// TransactionAttributeSourceAdvisor
private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
    @Override
    @Nullable
    protected TransactionAttributeSource getTransactionAttributeSource() {
        return (transactionInterceptor != null ? transactionInterceptor.getTransactionAttributeSource() : null);
    }
};

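The TransactionAttributeSourcePointcut class

This class implements the check that decides whether a given method should be intercepted for transaction handling.

It asks the TransactionAttributeSource whether a TransactionAttribute matches the method (matched by method name in the case of NameMatchTransactionAttributeSource); if one does, the method is intercepted. The TransactionAttributeSource is obtained from the TransactionInterceptor, where it was set when the interceptor's dependencies were injected.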

// TransactionAttributeSourcePointcut#matches
public boolean matches(Method method, Class<?> targetClass) {
    TransactionAttributeSource tas = getTransactionAttributeSource();
    return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}

// TransactionAspectSupport#setTransactionAttributes
public void setTransactionAttributes(Properties transactionAttributes) {
    NameMatchTransactionAttributeSource tas = new NameMatchTransactionAttributeSource();
    tas.setProperties(transactionAttributes);
    this.transactionAttributeSource = tas;
}

// NameMatchTransactionAttributeSource#getTransactionAttribute
/** Keys are method names; values are TransactionAttributes. */
private Map<String, TransactionAttribute> nameMap = new HashMap<>();

public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
    if (!ClassUtils.isUserLevelMethod(method)) {
        return null;
    }

    // Look for direct name match.
    String methodName = method.getName();
    TransactionAttribute attr = this.nameMap.get(methodName);

    if (attr == null) {
        // Look for most specific name match.
        String bestNameMatch = null;
        for (String mappedName : this.nameMap.keySet()) {
            if (isMatch(methodName, mappedName) &&
                    (bestNameMatch == null || bestNameMatch.length() <= mappedName.length())) {
                attr = this.nameMap.get(mappedName);
                bestNameMatch = mappedName;
            }
        }
    }

    return attr;
}
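The "direct match first, otherwise the longest matching pattern wins" lookup can be tried out in isolation. The sketch below is a simplified stand-in, not Spring's code: attribute values are plain strings, and isMatch is a hypothetical matcher that only understands the "xxx*", "*xxx" and "*xxx*" shapes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for the name-based lookup; all names here are illustrative.
class NameMatchSketch {

    /** Hypothetical matcher: supports only "xxx*", "*xxx", "*xxx*" and exact names. */
    static boolean isMatch(String methodName, String mappedName) {
        boolean leading = mappedName.startsWith("*");
        boolean trailing = mappedName.length() > 1 && mappedName.endsWith("*");
        String core = mappedName.substring(leading ? 1 : 0, mappedName.length() - (trailing ? 1 : 0));
        if (leading && trailing) {
            return methodName.contains(core);
        }
        if (leading) {
            return methodName.endsWith(core);
        }
        if (trailing) {
            return methodName.startsWith(core);
        }
        return methodName.equals(core);
    }

    /** Direct name match first; otherwise the longest matching pattern wins. */
    static String lookup(Map<String, String> nameMap, String methodName) {
        String attr = nameMap.get(methodName);
        if (attr == null) {
            String bestNameMatch = null;
            for (String mappedName : nameMap.keySet()) {
                if (isMatch(methodName, mappedName)
                        && (bestNameMatch == null || bestNameMatch.length() <= mappedName.length())) {
                    attr = nameMap.get(mappedName);
                    bestNameMatch = mappedName;
                }
            }
        }
        return attr;
    }

    /** Example configuration; the attribute strings are illustrative values only. */
    static Map<String, String> demoMap() {
        Map<String, String> map = new LinkedHashMap<>();
        map.put("*", "PROPAGATION_SUPPORTS,readOnly");
        map.put("get*", "PROPAGATION_REQUIRED,readOnly");
        map.put("getAccount*", "PROPAGATION_REQUIRES_NEW");
        map.put("save", "PROPAGATION_REQUIRED");
        return map;
    }

    public static void main(String[] args) {
        Map<String, String> map = demoMap();
        System.out.println(lookup(map, "getAccountById"));
        System.out.println(lookup(map, "getUser"));
        System.out.println(lookup(map, "save"));
        System.out.println(lookup(map, "delete"));
    }
}
```

With this configuration, getAccountById resolves to the "getAccount*" entry because it is the longest pattern that matches, while delete falls through to the catch-all "*".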

What the transaction interceptor does

The proxy object is obtained from TransactionProxyFactoryBean's getObject method.

// AbstractSingletonProxyFactoryBean#getObject
public Object getObject() {
    if (this.proxy == null) {
        throw new FactoryBeanNotInitializedException();
    }
    return this.proxy;
}

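The proxy field is set when afterPropertiesSet runs. As in plain AOP, the proxy is created through DefaultAopProxyFactory's createAopProxy method, so the way the advisor chain runs around the target is the same as well. The only difference is the extra interceptor dedicated to transactions, TransactionInterceptor.

When the chain of MethodInterceptors is traversed, each one's invoke method is executed. TransactionInterceptor's invoke is therefore where transaction handling is applied to the target's method, and it is where the TransactionManager gets used.

The invoke method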
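To make the chain traversal concrete, here is a minimal sketch of an interceptor chain in which each interceptor calls proceed() to hand control to the next one and, at the end, to the target. Interceptor and Invocation are hypothetical simplifications, not the AOP Alliance or Spring types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified chain; not the real MethodInterceptor/ReflectiveMethodInvocation.
class InterceptorChainSketch {

    interface Interceptor {
        Object invoke(Invocation invocation);
    }

    static final class Invocation {
        private final List<Interceptor> chain;
        private final Supplier<Object> target;
        private int index = 0;

        Invocation(List<Interceptor> chain, Supplier<Object> target) {
            this.chain = chain;
            this.target = target;
        }

        /** Calls the next interceptor, or the target once the chain is exhausted. */
        Object proceed() {
            if (index == chain.size()) {
                return target.get();
            }
            return chain.get(index++).invoke(this);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Interceptor pre = inv -> {
            log.add("pre:before");
            Object r = inv.proceed();
            log.add("pre:after");
            return r;
        };
        // The "transaction" interceptor is just another link in the chain.
        Interceptor tx = inv -> {
            log.add("tx:begin");
            Object r = inv.proceed();
            log.add("tx:commit");
            return r;
        };
        Invocation invocation = new Invocation(Arrays.asList(pre, tx), () -> {
            log.add("target");
            return "ok";
        });
        log.add("result:" + invocation.proceed());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The log shows the nesting: the earlier interceptor wraps the transaction interceptor, which in turn wraps the target call.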

// TransactionInterceptor#invoke
public Object invoke(MethodInvocation invocation) throws Throwable {
    // Work out the target class: may be {@code null}.
    // The TransactionAttributeSource should be passed the target class
    // as well as the method, which may be from an interface.
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

    // <1> Adapt to TransactionAspectSupport's invokeWithinTransaction...
    return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

At <1> the last argument, the method reference invocation::proceed, creates an InvocationCallback object; when InvocationCallback#proceedWithInvocation is called, it in turn calls ReflectiveMethodInvocation#proceed.

The invokeWithinTransaction method
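How a method reference becomes a callback object can be shown with a small stand-alone sketch. The InvocationCallback interface below mirrors the shape of the one used above, but it and FakeInvocation are declared locally for illustration and are assumptions rather than Spring's classes.

```java
// Illustrative only: a locally declared callback interface and a fake invocation.
class InvocationCallbackSketch {

    @FunctionalInterface
    interface InvocationCallback {
        Object proceedWithInvocation() throws Throwable;
    }

    /** Stand-in for a method invocation; proceed() represents "run the rest of the chain". */
    static final class FakeInvocation {
        int proceedCalls = 0;

        Object proceed() throws Throwable {
            proceedCalls++;
            return "target result";
        }
    }

    /** Brackets the callback with pseudo transaction steps recorded in the log. */
    static Object invokeWithinTransaction(InvocationCallback invocation, StringBuilder log) throws Throwable {
        log.append("begin;");
        Object retVal = invocation.proceedWithInvocation();
        log.append("commit;");
        return retVal;
    }

    static String demo() {
        FakeInvocation invocation = new FakeInvocation();
        StringBuilder log = new StringBuilder();
        try {
            // invocation::proceed is converted to an InvocationCallback whose
            // proceedWithInvocation() delegates to invocation.proceed().
            Object result = invokeWithinTransaction(invocation::proceed, log);
            return log + "|" + result + "|" + invocation.proceedCalls;
        } catch (Throwable ex) {
            return "failed: " + ex;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the indirection is that the transaction code never needs to know what it is wrapping; it only needs something it can call once between "begin" and "commit".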

// TransactionAspectSupport#invokeWithinTransaction
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        final InvocationCallback invocation) throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // Look up the transaction attribute
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // Determine the transaction manager
    final TransactionManager tm = determineTransactionManager(txAttr);

    // <1> Reactive transaction managers
    if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
        ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
            if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
                throw new TransactionUsageException(
                        "Unsupported annotated transaction on suspending function detected: " + method +
                        ". Use TransactionalOperator.transactional extensions instead.");
            }
            ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
            if (adapter == null) {
                throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                        method.getReturnType());
            }
            return new ReactiveTransactionSupport(adapter);
        });
        return txSupport.invokeWithinTransaction(
                method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
    }

    // <2> Handling for PlatformTransactionManager.
    // A CallbackPreferringPlatformTransactionManager needs a callback
    // to create and commit the transaction.
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        // <3> Create the transaction; its attributes and status are stored in a TransactionInfo
        TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            // This is an around advice: invoke the next interceptor in the chain.
            // This normally ends up invoking the target method and yields its result.
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // The target threw: roll back or commit depending on the rollback rules
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            // Restore the previous TransactionInfo bound to the thread
            cleanupTransactionInfo(txInfo);
        }

        if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
            // Set rollback-only in case of Vavr failure matching our rollback rules...
            TransactionStatus status = txInfo.getTransactionStatus();
            if (status != null && txAttr != null) {
                retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
            }
        }

        // Commit the transaction
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }

    // Callback-based handling
    else {
        Object result;
        final ThrowableHolder throwableHolder = new ThrowableHolder();

        // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
        try {
            result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
                TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
                try {
                    Object retVal = invocation.proceedWithInvocation();
                    if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                        // Set rollback-only in case of Vavr failure matching our rollback rules...
                        retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                    }
                    return retVal;
                }
                catch (Throwable ex) {
                    // If the rollback rules apply to this exception, rethrow it so the transaction rolls back
                    if (txAttr.rollbackOn(ex)) {
                        // A RuntimeException: will lead to a rollback.
                        if (ex instanceof RuntimeException) {
                            throw (RuntimeException) ex;
                        }
                        else {
                            throw new ThrowableHolderException(ex);
                        }
                    }
                    // Otherwise return normally
                    else {
                        // A normal return value: will lead to a commit.
                        throwableHolder.throwable = ex;
                        return null;
                    }
                }
                finally {
                    cleanupTransactionInfo(txInfo);
                }
            });
        }
        catch (ThrowableHolderException ex) {
            throw ex.getCause();
        }
        catch (TransactionSystemException ex2) {
            if (throwableHolder.throwable != null) {
                logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                ex2.initApplicationException(throwableHolder.throwable);
            }
            throw ex2;
        }
        catch (Throwable ex2) {
            if (throwableHolder.throwable != null) {
                logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
            }
            throw ex2;
        }

        // Check result state: It might indicate a Throwable to rethrow.
        if (throwableHolder.throwable != null) {
            throw throwableHolder.throwable;
        }
        return result;
    }
}

Creating the transaction

The transaction is created by createTransactionIfNecessary, which returns a TransactionInfo.

TransactionInfo records the information the transaction needs.

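// TransactionAspectSupport.TransactionInfo (fields only; constructor and methods omitted)
protected static final class TransactionInfo {

    // The transaction manager
    @Nullable
    private final PlatformTransactionManager transactionManager;

    // The configured transaction attributes
    @Nullable
    private final TransactionAttribute transactionAttribute;

    // Identifies the intercepted method
    private final String joinpointIdentification;

    // Status information about the transaction
    @Nullable
    private TransactionStatus transactionStatus;

    // The previous TransactionInfo bound to this thread
    @Nullable
    private TransactionInfo oldTransactionInfo;
}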

The createTransactionIfNecessary method

private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
        new NamedThreadLocal<>("Current aspect-driven transaction");

// TransactionAspectSupport#createTransactionIfNecessary
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

    // If no name is specified, use the method identification as the transaction name
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // <1> Create the TransactionStatus
            status = tm.getTransaction(txAttr);
        }
        else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                        "] because no transaction manager has been configured");
            }
        }
    }
    // <2> Wrap everything, including the TransactionStatus, in a TransactionInfo
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

prepareTransactionInfo binds the transaction's bookkeeping information to the thread that called the transactional method.

// TransactionAspectSupport#prepareTransactionInfo
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, String joinpointIdentification,
        @Nullable TransactionStatus status) {

    // Always create a new TransactionInfo, whether or not a new transaction was started
    TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
    if (txAttr != null) {
        // We need a transaction for this method...
        if (logger.isTraceEnabled()) {
            logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        // The transaction manager will flag an error if an incompatible tx already exists.
        txInfo.newTransactionStatus(status);
    }
    else {
        // The TransactionInfo.hasTransaction() method will return false. We created it only
        // to preserve the integrity of the ThreadLocal stack maintained in this class.
        if (logger.isTraceEnabled()) {
            logger.trace("No need to create transaction for [" + joinpointIdentification +
                    "]: This method is not transactional.");
        }
    }

    // We always bind the TransactionInfo to the thread, even if we didn't create
    // a new transaction here. This guarantees that the TransactionInfo stack
    // will be managed correctly even if no transaction was created by this aspect.
    // <3> Bind the TransactionInfo to the current thread (a new TransactionInfo is used
    // whether or not a new transaction was started)
    txInfo.bindToThread();
    return txInfo;
}

// TransactionAspectSupport.TransactionInfo#bindToThread
private void bindToThread() {
    // Expose current TransactionStatus, preserving any existing TransactionStatus
    // for restoration after this transaction is complete.
    // The TransactionInfo objects form a linked chain
    this.oldTransactionInfo = transactionInfoHolder.get();
    transactionInfoHolder.set(this);
}
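The linked chain built by bindToThread behaves like a per-thread stack: binding pushes, and restoring the old entry pops. The sketch below imitates this with a plain ThreadLocal. Info, HOLDER and current are hypothetical names, and restoreThreadLocalStatus is modeled here as the inverse of bindToThread (the cleanup step itself is not shown in the code above).

```java
// Illustrative per-thread stack of "transaction info" objects; names are hypothetical.
class TxInfoStackSketch {

    static final ThreadLocal<Info> HOLDER = new ThreadLocal<>();

    static final class Info {
        final String joinpoint;
        Info old;

        Info(String joinpoint) {
            this.joinpoint = joinpoint;
        }

        /** Push: remember whatever was bound before, then bind this object. */
        void bindToThread() {
            this.old = HOLDER.get();
            HOLDER.set(this);
        }

        /** Pop: put the previously bound object back. */
        void restoreThreadLocalStatus() {
            HOLDER.set(this.old);
        }
    }

    static String current() {
        Info info = HOLDER.get();
        return (info == null ? null : info.joinpoint);
    }

    static String demo() {
        StringBuilder seen = new StringBuilder();
        Info outer = new Info("outer");
        outer.bindToThread();
        seen.append(current()).append(',');

        Info inner = new Info("inner");   // nested transactional call on the same thread
        inner.bindToThread();
        seen.append(current()).append(',');

        inner.restoreThreadLocalStatus(); // inner call finished
        seen.append(current()).append(',');

        outer.restoreThreadLocalStatus(); // outer call finished
        seen.append(current());
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because every intercepted call pushes an entry, even a non-transactional one, the pop in the finally block always restores the right predecessor.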

The getTransaction method

getTransaction relies on the concrete TransactionManager implementation to obtain the corresponding TransactionStatus (the concrete representation of the transaction).

// AbstractPlatformTransactionManager#getTransaction
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {

    // Use the default definition if no transaction definition was given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    // <1> The transaction object is created by the concrete transaction manager
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // If the current thread already has a transaction, handle it according to the propagation behavior
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // No existing transaction: act according to the transaction definition
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // Related to suspension
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }
        try {
            // <2> Create the TransactionStatus
            return startTransaction(def, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error ex) {
            // <3> Recover if an exception occurs while starting the transaction
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + def);
        }
        // By default getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

An example of a transaction object: DataSourceTransactionObject, which holds a ConnectionHolder, a wrapper around a JDBC connection.

// DataSourceTransactionManager.DataSourceTransactionObject
private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {

    private boolean newConnectionHolder;

    private boolean mustRestoreAutoCommit;

    public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) {
        super.setConnectionHolder(connectionHolder);
        this.newConnectionHolder = newConnectionHolder;
    }

    public boolean isNewConnectionHolder() {
        return this.newConnectionHolder;
    }

    public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) {
        this.mustRestoreAutoCommit = mustRestoreAutoCommit;
    }

    public boolean isMustRestoreAutoCommit() {
        return this.mustRestoreAutoCommit;
    }

    public void setRollbackOnly() {
        getConnectionHolder().setRollbackOnly();
    }

    @Override
    public boolean isRollbackOnly() {
        return getConnectionHolder().isRollbackOnly();
    }

    @Override
    public void flush() {
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronizationUtils.triggerFlush();
        }
    }
}

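The handleExistingTransaction method

Whether a transaction already exists is checked through isExistingTransaction, a template method of AbstractPlatformTransactionManager. The implementation below is the one from DataSourceTransactionManager.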

// DataSourceTransactionManager#isExistingTransaction
protected boolean isExistingTransaction(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}

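The check is based on whether the DataSourceTransactionObject has a connectionHolder and whether that holder's transaction is active. The connectionHolder itself is looked up and set when the transaction object is created: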

// DataSourceTransactionManager#doGetTransaction
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    // Get the data source via obtainDataSource, then look up the connection holder bound for it
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

The connectionHolder is looked up by DataSource in a ThreadLocal<Map>.

// TransactionSynchronizationManager
private static final ThreadLocal<Map<Object, Object>> resources =
        new NamedThreadLocal<>("Transactional resources");

private static Object doGetResource(Object actualKey) {
    Map<Object, Object> map = resources.get();
    if (map == null) {
        return null;
    }
    Object value = map.get(actualKey);
    // Transparently remove ResourceHolder that was marked as void...
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
        map.remove(actualKey);
        // Remove entire ThreadLocal if empty...
        if (map.isEmpty()) {
            resources.remove();
        }
        value = null;
    }
    return value;
}

handleExistingTransaction handles each of the propagation behaviors.

// AbstractPlatformTransactionManager#handleExistingTransaction
private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {

    // PROPAGATION_NEVER does not allow a transaction: throw an exception
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }

    // PROPAGATION_NOT_SUPPORTED does not run in a transaction either: suspend the existing one
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    // PROPAGATION_REQUIRES_NEW suspends the existing transaction and then starts a new one
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                    definition.getName() + "]");
        }
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            return startTransaction(definition, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    // PROPAGATION_NESTED: nested transaction (savepoint)
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            return startTransaction(definition, transaction, debugEnabled, null);
        }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
    // Remaining cases: optionally validate against the existing transaction, then join it
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
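The branching in getTransaction and handleExistingTransaction can be condensed into a small decision function. The sketch below is a summary under simplifying assumptions, not Spring code: Propagation and Action are hypothetical enums, and NESTED with an existing transaction is assumed to take the savepoint branch.

```java
// Hypothetical summary of the propagation decisions described above.
class PropagationSketch {

    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    enum Action {
        JOIN_EXISTING, START_NEW, SUSPEND_AND_START_NEW, SUSPEND_AND_RUN_WITHOUT_TX,
        NESTED_SAVEPOINT, RUN_WITHOUT_TX, FAIL
    }

    static Action decide(Propagation propagation, boolean existingTransaction) {
        if (existingTransaction) {
            // Mirrors the order of checks in handleExistingTransaction.
            switch (propagation) {
                case NEVER:
                    return Action.FAIL;
                case NOT_SUPPORTED:
                    return Action.SUSPEND_AND_RUN_WITHOUT_TX;
                case REQUIRES_NEW:
                    return Action.SUSPEND_AND_START_NEW;
                case NESTED:
                    return Action.NESTED_SAVEPOINT; // assumes savepoints are used
                default:
                    return Action.JOIN_EXISTING;    // REQUIRED, SUPPORTS, MANDATORY
            }
        }
        // Mirrors the "no existing transaction" part of getTransaction.
        switch (propagation) {
            case MANDATORY:
                return Action.FAIL;
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return Action.START_NEW;
            default:
                return Action.RUN_WITHOUT_TX;       // SUPPORTS, NOT_SUPPORTED, NEVER
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + ": existing=" + decide(p, true) + ", none=" + decide(p, false));
        }
    }
}
```

Reading the two switch statements side by side makes it easier to see that only NEVER and MANDATORY can fail, and that they fail in opposite situations.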

Suspending a transaction

A transaction is suspended by AbstractPlatformTransactionManager#suspend.

// AbstractPlatformTransactionManager#suspend
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                // <1> The concrete transaction manager builds suspendedResources
                // (for a DataSource it is simply the ConnectionHolder)
                suspendedResources = doSuspend(transaction);
            }
            // <2> Reset the ThreadLocals that record transaction state on the current thread,
            // putting the thread into a "no transaction" state
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        }
        catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    }
    else if (transaction != null) {
        // Transaction active but no synchronization active.
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    }
    else {
        // Neither transaction nor synchronization active.
        return null;
    }
}

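What this method does is use doSuspend to save the current transaction's state in suspendedResources, then wrap suspendedResources together with some status values in a SuspendedResourcesHolder and return it. The returned SuspendedResourcesHolder is stored in the suspendedResources field of a newly created TransactionStatus (this object is created whether or not there is a transaction), which makes the later resume straightforward.

Committing the transaction

TransactionAspectSupport#commitTransactionAfterReturning commits the transaction.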
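The save-then-clear pattern of suspend, and its reversal on resume, can be illustrated with a few ThreadLocals. This is a reduced sketch under stated assumptions: only a resource, a name and an "active" flag are tracked, and SuspendedHolder, RESOURCE, NAME and ACTIVE are hypothetical names.

```java
// Reduced illustration of suspend/resume; all names are hypothetical.
class SuspendResumeSketch {

    static final ThreadLocal<Object> RESOURCE = new ThreadLocal<>();
    static final ThreadLocal<String> NAME = new ThreadLocal<>();
    static final ThreadLocal<Boolean> ACTIVE = ThreadLocal.withInitial(() -> false);

    /** Snapshot of the thread-bound state taken at suspension time. */
    static final class SuspendedHolder {
        final Object resource;
        final String name;
        final boolean wasActive;

        SuspendedHolder(Object resource, String name, boolean wasActive) {
            this.resource = resource;
            this.name = name;
            this.wasActive = wasActive;
        }
    }

    /** Saves the current state into a holder and leaves the thread in a "no transaction" state. */
    static SuspendedHolder suspend() {
        SuspendedHolder holder = new SuspendedHolder(RESOURCE.get(), NAME.get(), ACTIVE.get());
        RESOURCE.remove();
        NAME.remove();
        ACTIVE.set(false);
        return holder;
    }

    /** Puts the saved state back onto the thread. */
    static void resume(SuspendedHolder holder) {
        RESOURCE.set(holder.resource);
        NAME.set(holder.name);
        ACTIVE.set(holder.wasActive);
    }

    static String describe() {
        return NAME.get() + "/" + RESOURCE.get() + "/" + ACTIVE.get();
    }

    static String demo() {
        RESOURCE.set("conn-1");
        NAME.set("outerTx");
        ACTIVE.set(true);
        String before = describe();
        SuspendedHolder holder = suspend();
        String during = describe();
        resume(holder);
        String after = describe();
        // Clean up so the sketch leaves no state behind on this thread.
        RESOURCE.remove();
        NAME.remove();
        ACTIVE.remove();
        return before + " -> " + during + " -> " + after;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

While suspended, the thread looks as if no transaction had ever been started, which is what allows a new, independent transaction to be bound in its place.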

// TransactionAspectSupport#commitTransactionAfterReturning
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); // <1>
    }
}

As <1> shows, the actual commit is determined by the concrete TransactionManager implementation.

Rolling back the transaction

TransactionAspectSupport#completeTransactionAfterThrowing handles the rollback.

// TransactionAspectSupport#completeTransactionAfterThrowing
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                    "] after exception: " + ex);
        }
        if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); // <1>
            }
            catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            }
            catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                throw ex2;
            }
        }
        else {
            // We don't roll back on this exception.
            // Will still roll back if TransactionStatus.isRollbackOnly() is true.
            try {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); // <2>
            }
            catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            }
            catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                throw ex2;
            }
        }
    }
}

// DefaultTransactionAttribute#rollbackOn
public boolean rollbackOn(Throwable ex) {
    return (ex instanceof RuntimeException || ex instanceof Error);
}

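With the default rule (DefaultTransactionAttribute#rollbackOn), a RuntimeException or an Error leads to a rollback through the concrete TransactionManager's rollback method. For any other exception, that is, a checked exception, the rule does not apply and the transaction is committed instead.

Resuming a suspended transaction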
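The default rule is small enough to check directly. The sketch below copies the rollbackOn condition shown above into a stand-alone class; the complete helper and the class name are hypothetical additions for illustration.

```java
import java.io.IOException;

// Stand-alone copy of the default rollback rule, for experimentation only.
class RollbackRuleSketch {

    /** Same condition as the default rule: unchecked exceptions and errors roll back. */
    static boolean rollbackOn(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error);
    }

    /** Hypothetical helper naming the outcome the interceptor would choose. */
    static String complete(Throwable ex) {
        return rollbackOn(ex) ? "rollback" : "commit";
    }

    public static void main(String[] args) {
        System.out.println(complete(new IllegalStateException("unchecked")));
        System.out.println(complete(new StackOverflowError()));
        System.out.println(complete(new IOException("checked")));
    }
}
```

A checked exception such as IOException therefore still results in a commit unless the transaction attribute is configured with a different rollback rule.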

// AbstractPlatformTransactionManager#resume
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
        throws TransactionException {

    if (resourcesHolder != null) {
        Object suspendedResources = resourcesHolder.suspendedResources;
        if (suspendedResources != null) {
            doResume(transaction, suspendedResources);
        }
        List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
        if (suspendedSynchronizations != null) {
            TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
            TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
            doResumeSynchronization(suspendedSynchronizations);
        }
    }
}

In effect, this rebinds the suspended ConnectionHolder to the data source, and then restores the ThreadLocal values from the SuspendedResourcesHolder that saved them.

DataSourceTransactionManager

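Looking at how DataSourceTransactionManager implements these template methods clears up some of the earlier questions about how a transaction is created and how an existing one is detected.

Notes on doGetTransaction

The first time a transaction is created, doGetTransaction is called to create a DataSourceTransactionObject, which is the transaction object.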

// DataSourceTransactionManager#doGetTransaction
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    // Note: newConnectionHolder = false
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

// TransactionSynchronizationManager#getResource
public static Object getResource(Object key) {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Object value = doGetResource(actualKey);
    if (value != null && logger.isTraceEnabled()) {
        logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
                Thread.currentThread().getName() + "]");
    }
    return value;
}

// TransactionSynchronizationManager#doGetResource
private static final ThreadLocal<Map<Object, Object>> resources =
        new NamedThreadLocal<>("Transactional resources");

private static Object doGetResource(Object actualKey) {
    Map<Object, Object> map = resources.get(); // <1>
    if (map == null) {
        return null;
    }
    Object value = map.get(actualKey);
    // Transparently remove ResourceHolder that was marked as void...
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
        map.remove(actualKey);
        // Remove entire ThreadLocal if empty...
        if (map.isEmpty()) {
            resources.remove();
        }
        value = null;
    }
    return value;
}

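As <1> shows, at this point (first use on this thread, with nothing bound yet) the ConnectionHolder is in fact null.

Notes on doBegin

Next, AbstractPlatformTransactionManager#startTransaction creates the new transaction, which calls DataSourceTransactionManager#doBegin.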

// DataSourceTransactionManager#doBegin
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        if (!txObject.hasConnectionHolder() ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // <2> No ConnectionHolder yet: obtain a connection from the data source
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true); // <3>
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        txObject.setReadOnly(definition.isReadOnly());

        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don't want to do it unnecessarily (for example if we've explicitly
        // configured the connection pool to set it already).
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false); // <4> Turn off auto-commit for this connection
        }

        prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);

        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // <5> Bind the connection holder to the thread.
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    }

    catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}

// TransactionSynchronizationManager#bindResource
public static void bindResource(Object key, Object value) throws IllegalStateException {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Assert.notNull(value, "Value must not be null");
    Map<Object, Object> map = resources.get();
    // set ThreadLocal Map if none found
    if (map == null) {
        map = new HashMap<>();
        resources.set(map);
    }
    Object oldValue = map.put(actualKey, value);
    // Transparently suppress a ResourceHolder that was marked as void...
    if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
        oldValue = null;
    }
    if (oldValue != null) {
        throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
                actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    }
    if (logger.isTraceEnabled()) {
        logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
                Thread.currentThread().getName() + "]");
    }
}

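In short, doBegin obtains a connection from the data source, creates a new ConnectionHolder around it, and sets a number of properties. At <3>, newConnectionHolder is set to true because the connection was newly obtained from the data source.

At <5> the ConnectionHolder is bound to the thread (keyed by the data source, with the connection holder as the value). This is why doGetTransaction can fetch the existing connection for the data source from the ThreadLocal, and why that lookup can be used to decide whether the current thread already has a transaction.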
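The thread-bound resource map can be reproduced in miniature to see why the lookup doubles as an "is there a transaction on this thread?" check. The sketch is a simplification with hypothetical names: keys and values are plain objects, there is no void-holder handling, and unbinding a missing key returns null rather than failing.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified thread-bound resource registry; names and behavior are illustrative.
class ThreadBoundResourceSketch {

    private static final ThreadLocal<Map<Object, Object>> RESOURCES = new ThreadLocal<>();

    static Object getResource(Object key) {
        Map<Object, Object> map = RESOURCES.get();
        return (map == null ? null : map.get(key));
    }

    static void bindResource(Object key, Object value) {
        Map<Object, Object> map = RESOURCES.get();
        if (map == null) {
            map = new HashMap<>();
            RESOURCES.set(map);
        }
        Object oldValue = map.put(key, value);
        if (oldValue != null) {
            throw new IllegalStateException("Already value [" + oldValue + "] for key [" + key + "]");
        }
    }

    static Object unbindResource(Object key) {
        Map<Object, Object> map = RESOURCES.get();
        if (map == null) {
            return null;
        }
        Object value = map.remove(key);
        if (map.isEmpty()) {
            RESOURCES.remove();
        }
        return value;
    }

    /** A bound holder for the data source is taken to mean "a transaction exists on this thread". */
    static boolean hasExistingTransaction(Object dataSource) {
        return getResource(dataSource) != null;
    }

    static String demo() {
        Object dataSource = "dataSource";
        StringBuilder out = new StringBuilder();
        out.append(hasExistingTransaction(dataSource));              // nothing bound yet
        bindResource(dataSource, "connectionHolder-1");              // what doBegin does at <5>
        out.append(',').append(hasExistingTransaction(dataSource));

        // Another thread has its own map and does not see the binding.
        boolean[] seenElsewhere = new boolean[1];
        Thread other = new Thread(() -> seenElsewhere[0] = hasExistingTransaction(dataSource));
        other.start();
        try {
            other.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        out.append(',').append(seenElsewhere[0]);

        out.append(',').append(unbindResource(dataSource));          // what doSuspend relies on
        out.append(',').append(hasExistingTransaction(dataSource));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keying the map by data source is what lets several data sources take part in transactions on the same thread without interfering with each other.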

doSuspend

// DataSourceTransactionManager#doSuspend
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    // Unbinding returns the ConnectionHolder that was bound to the thread
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

doCommit

// DataSourceTransactionManager#doCommit
protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection [" + con + "]");
    }
    try {
        con.commit(); // The commit is simply delegated to the JDBC connection
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not commit JDBC transaction", ex);
    }
}

doRollback

// DataSourceTransactionManager#doRollback
protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
    }
    try {
        con.rollback();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
    }
}

doResume

// DataSourceTransactionManager#doResume
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
    TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}