spring-transaction源码分析(5)TransactionInterceptor事务拦截逻辑 ...

打印 上一主题 下一主题

主题 921|帖子 921|积分 2763

spring-tx的事务拦截逻辑在TransactionInterceptor类,本文将详细分析其实现方式。
事务拦截器TransactionInterceptor

spring-tx的事务拦截逻辑在TransactionInterceptor类,它实现了MethodInterceptor接口。
MethodInterceptor接口

MethodInterceptor接口的实现类封装aop切面拦截逻辑:
  1. public interface MethodInterceptor extends Interceptor {
  2.         /**
  3.          * Implement this method to perform extra treatments before and after the invocation.
  4.          */
  5.         Object invoke(MethodInvocation invocation) throws Throwable;
  6. }
复制代码
TransactionInterceptor类

TransactionInterceptor类封装了事务拦截逻辑:
  1. public class TransactionInterceptor
  2.     extends TransactionAspectSupport implements MethodInterceptor, Serializable {
  3.         // ...
  4.         @Override
  5.         public Object invoke(MethodInvocation invocation) throws Throwable {
  6.                 // Work out the target class: may be {@code null}.
  7.                 // The TransactionAttributeSource should be passed the target class
  8.                 // as well as the method, which may be from an interface.
  9.                 Class<?> targetClass =
  10.             (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
  11.                 // Adapt to TransactionAspectSupport's invokeWithinTransaction...
  12.                 return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
  13.         }
  14.         // ...
  15. }
复制代码
事务逻辑在父类TransactionAspectSupport的invokeWithinTransaction方法中。
invokeWithinTransaction方法
  1. protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
  2.                 final InvocationCallback invocation) throws Throwable {
  3.         // 如果方法没有被Transactional注解标注,则返回null
  4.         // 返回的是AnnotationTransactionAttributeSource对象
  5.         // 用于获取Transactional注解相关属性,
  6.         // 比如rollbackOn, propagationBehavior, isolationLevel等
  7.         TransactionAttributeSource tas = getTransactionAttributeSource();
  8.         TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
  9.         // 获取事务管理器
  10.         // 后面会转型成PlatformTransactionManager对象,可以开启事务、提交、回滚
  11.     TransactionManager tm = determineTransactionManager(txAttr);
  12.         // Reactive事务,略
  13.         // 转型成PlatformTransactionManager对象,可以开启事务、提交、回滚
  14.         PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
  15.         // 获取事务方法的唯一标识
  16.         // 格式为"类名.方法名"
  17.         final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  18.         if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
  19.                 // 创建事务
  20.                 TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
  21.                 Object retVal;
  22.                 try {
  23.                         // This is an around advice: Invoke the next interceptor in the chain.
  24.                         // This will normally result in a target object being invoked.
  25.                         retVal = invocation.proceedWithInvocation();
  26.                 } catch (Throwable ex) {
  27.                         // Handle a throwable, completing the transaction.
  28.                         // We may commit or roll back, depending on the configuration.
  29.                         completeTransactionAfterThrowing(txInfo, ex);
  30.                         throw ex;
  31.                 } finally {
  32.                         // Reset the TransactionInfo ThreadLocal.
  33.                         // Call this in all cases: exception or normal return!
  34.                         cleanupTransactionInfo(txInfo);
  35.                 }
  36.                 if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
  37.                         // Set rollback-only in case of Vavr failure matching our rollback rules...
  38.                         TransactionStatus status = txInfo.getTransactionStatus();
  39.                         if (status != null && txAttr != null) {
  40.                                 retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
  41.                         }
  42.                 }
  43.                 // 提交事务
  44.                 commitTransactionAfterReturning(txInfo);
  45.                 return retVal;
  46.         } else {
  47.                 // CallbackPreferringPlatformTransactionManager事务管理器逻辑,略
  48.         }
  49. }
复制代码
创建事务

Create a transaction if necessary based on the given TransactionAttribute. Allows callers to perform custom TransactionAttribute lookups through the TransactionAttributeSource.
  1. protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm,
  2.                 TransactionAttribute txAttr, final String joinpointIdentification) {
  3.         // If no name specified, apply method identification as transaction name.
  4.         if (txAttr != null && txAttr.getName() == null) {
  5.                 txAttr = new DelegatingTransactionAttribute(txAttr) {
  6.                         @Override
  7.                         public String getName() {
  8.                                 return joinpointIdentification;
  9.                         }
  10.                 };
  11.         }
  12.         TransactionStatus status = null;
  13.         if (txAttr != null) {
  14.                 if (tm != null) {
  15.                         // 创建新事务或者返回已存在事务, 这取决于传播级别。
  16.                         // 隔离级别或超时等参数只在新事务时生效,已存在事务会忽略这些参数。
  17.                         status = tm.getTransaction(txAttr);
  18.                 }
  19.         }
  20.         // 使用指定的事务属性和TransactionStatus创建TransactionInfo
  21.         return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  22. }
复制代码
getTransaction(txAttr)

这个方法创建新事务或者返回已存在事务,这取决于传播级别。隔离级别或超时等参数只在新事务时生效,已存在事务会忽略这些参数。
  1. public final TransactionStatus getTransaction(TransactionDefinition definition)
  2.                 throws TransactionException {
  3.         // Use defaults if no transaction definition given.
  4.         TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
  5.         // DataSourceTransactionManager实现类返回DataSourceTransactionObject对象
  6.         // DataSourceTransactionObject对象封装着数据库连接、previousIsolationLevel、readOnly、savepointAllowed等
  7.         Object transaction = doGetTransaction();
  8.         boolean debugEnabled = logger.isDebugEnabled();
  9.         if (isExistingTransaction(transaction)) {
  10.                 // Existing transaction found -> check propagation behavior to find out how to behave.
  11.                 return handleExistingTransaction(def, transaction, debugEnabled);
  12.         }
  13.         // No existing transaction found -> check propagation behavior to find out how to proceed.
  14.         if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
  15.                 // 传播级别设置为PROPAGATION_MANDATORY时,如果当前没有事务,则抛出异常
  16.                 throw new IllegalTransactionStateException(
  17.                                 "No existing transaction found for transaction marked with propagation 'mandatory'");
  18.         } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
  19.                         def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
  20.                         def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  21.                 SuspendedResourcesHolder suspendedResources = suspend(null);
  22.                 // Creating new transaction
  23.                 try {
  24.                         return startTransaction(def, transaction, debugEnabled, suspendedResources);
  25.                 } catch (RuntimeException | Error ex) {
  26.                         resume(null, suspendedResources);
  27.                         throw ex;
  28.                 }
  29.         } else {
  30.                 // Create "empty" transaction: no actual transaction, but potentially synchronization.
  31.                 if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
  32.                         logger.warn("Custom isolation level specified but no actual transaction initiated; " +
  33.                                         "isolation level will effectively be ignored: " + def);
  34.                 }
  35.                 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
  36.                 return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
  37.         }
  38. }
复制代码
doGetTransaction()
  1. protected Object doGetTransaction() {
  2.         // DataSourceTransactionObject封装着数据库连接、previousIsolationLevel、readOnly、savepointAllowed等
  3.         DataSourceTransactionObject txObject = new DataSourceTransactionObject();
  4.         // 是否允许设置保存点,NESTED传播级别时使用,DataSourceTransactionManager类型该属性为true
  5.         txObject.setSavepointAllowed(isNestedTransactionAllowed());
  6.         // 从ThreadLocal获取当前线程上绑定的ConnectionHolder
  7.         // ConnectionHolder对象保存着数据库连接
  8.         // 业务方法第一次执行时为null
  9.         ConnectionHolder conHolder =
  10.                         (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
  11.         txObject.setConnectionHolder(conHolder, false);
  12.         return txObject;
  13. }
复制代码
当前有事务

handleExistingTransaction方法
  1. if (isExistingTransaction(transaction)) {
  2.         // Existing transaction found -> check propagation behavior to find out how to behave.
  3.         return handleExistingTransaction(def, transaction, debugEnabled);
  4. }
复制代码
isExistingTransaction方法判断当前是否存在事务:
  1. protected boolean isExistingTransaction(Object transaction) {
  2.         DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  3.         // 判断存在数据库连接且开启了事务
  4.         return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
  5. }
复制代码
handleExistingTransaction方法:
  1. private TransactionStatus handleExistingTransaction(
  2.                 TransactionDefinition definition, Object transaction, boolean debugEnabled)
  3.                 throws TransactionException {
  4.         // 传播级别为NEVER
  5.         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
  6.                 // 传播级别设置为NEVER时,如果当前有事务,则抛出异常
  7.                 throw new IllegalTransactionStateException(
  8.                                 "Existing transaction found for transaction marked with propagation 'never'");
  9.         }
  10.         // 传播级别为NOT_SUPPORTED
  11.         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
  12.                 // 挂起当前事务
  13.                 Object suspendedResources = suspend(transaction);
  14.                 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
  15.                 return prepareTransactionStatus(
  16.                                 definition, null, false, newSynchronization, debugEnabled, suspendedResources);
  17.         }
  18.         // 传播级别为REQUIRES_NEW
  19.         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
  20.                 // 挂起当前事务,然后创建新事务
  21.                 SuspendedResourcesHolder suspendedResources = suspend(transaction);
  22.                 try {
  23.                         return startTransaction(definition, transaction, debugEnabled, suspendedResources);
  24.                 } catch (RuntimeException | Error beginEx) {
  25.                         resumeAfterBeginException(transaction, suspendedResources, beginEx);
  26.                         throw beginEx;
  27.                 }
  28.         }
  29.         // 传播级别为NESTED
  30.         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  31.                 if (!isNestedTransactionAllowed()) {
  32.                         throw new NestedTransactionNotSupportedException(
  33.                                         "Transaction manager does not allow nested transactions by default - " +
  34.                                         "specify 'nestedTransactionAllowed' property with value 'true'");
  35.                 }
  36.                 // Creating nested transaction
  37.                 if (useSavepointForNestedTransaction()) {
  38.                         // Create savepoint within existing Spring-managed transaction,
  39.                         // through the SavepointManager API implemented by TransactionStatus.
  40.                         // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
  41.                         DefaultTransactionStatus status =
  42.                                         prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
  43.                         status.createAndHoldSavepoint();
  44.                         return status;
  45.                 }
  46.                 else {
  47.                         // Nested transaction through nested begin and commit/rollback calls.
  48.                         // Usually only for JTA: Spring synchronization might get activated here
  49.                         // in case of a pre-existing JTA transaction.
  50.                         return startTransaction(definition, transaction, debugEnabled, null);
  51.                 }
  52.         }
  53.         // 传播级别为SUPPORTS/REQUIRED
  54.         if (isValidateExistingTransaction()) {
  55.                 if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
  56.                         Integer currentIsolationLevel =
  57.                 TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
  58.                         if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
  59.                                 Constants isoConstants = DefaultTransactionDefinition.constants;
  60.                                 throw new IllegalTransactionStateException(
  61.                     "Participating transaction with definition [" + definition +
  62.                     "] specifies isolation level which is incompatible with existing transaction: " +
  63.                                                 (currentIsolationLevel != null ?
  64.                                                                 isoConstants.toCode(currentIsolationLevel,
  65.                                                     DefaultTransactionDefinition.PREFIX_ISOLATION) :
  66.                                                                 "(unknown)"));
  67.                         }
  68.                 }
  69.                 if (!definition.isReadOnly()) {
  70.                         if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
  71.                                 throw new IllegalTransactionStateException("Participating transaction with definition [" +
  72.                                                 definition + "] is not marked as read-only but existing transaction is");
  73.                         }
  74.                 }
  75.         }
  76.         boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  77.         return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
  78. }
复制代码
传播级别为NEVER

传播级别设置为NEVER时,如果当前有事务,抛出异常:
  1. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
  2.         throw new IllegalTransactionStateException(
  3.                         "Existing transaction found for transaction marked with propagation 'never'");
  4. }
复制代码
传播级别为NOT_SUPPORTED

挂起当前事务,业务方法以无事务方式执行:
  1. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
  2.         // 挂起当前事务
  3.         Object suspendedResources = suspend(transaction);
  4.         boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
  5.         return prepareTransactionStatus(
  6.                         definition, null, false, newSynchronization, debugEnabled, suspendedResources);
  7. }
复制代码
prepareTransactionStatus方法:

  • 创建DefaultTransactionStatus对象,把SuspendedResources封装进去,以便后续恢复旧事务
  • 使用TransactionSynchronizationManager将事务属性绑定到当前线程
  • 初始化当前线程TransactionSynchronization集
由于传播级别为NOT_SUPPORTED所以此处不会开启事务。
传播级别为REQUIRES_NEW

挂起当前事务,创建新事务:
  1. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
  2.         // 挂起当前事务
  3.         SuspendedResourcesHolder suspendedResources = suspend(transaction);
  4.         try {
  5.                 // 开启新事务
  6.                 return startTransaction(definition, transaction, debugEnabled, suspendedResources);
  7.         } catch (RuntimeException | Error beginEx) {
  8.                 resumeAfterBeginException(transaction, suspendedResources, beginEx);
  9.                 throw beginEx;
  10.         }
  11. }
复制代码
开启新事务:
  1. private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
  2.                 boolean debugEnabled, SuspendedResourcesHolder suspendedResources) {
  3.         // 值为true
  4.         boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  5.         DefaultTransactionStatus status = newTransactionStatus(
  6.                         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  7.         // 开启新事务
  8.         doBegin(transaction, definition);
  9.         // 初始化当前线程TransactionSynchronization集
  10.         prepareSynchronization(status, definition);
  11.         return status;
  12. }
  13. protected void doBegin(Object transaction, TransactionDefinition definition) {
  14.         DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  15.         Connection con = null;
  16.         try {
  17.                 if (!txObject.hasConnectionHolder() ||
  18.                                 txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
  19.                         // 打开一个新连接
  20.                         Connection newCon = obtainDataSource().getConnection();
  21.                         txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
  22.                 }
  23.                 txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
  24.                 con = txObject.getConnectionHolder().getConnection();
  25.                 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
  26.                 txObject.setPreviousIsolationLevel(previousIsolationLevel);
  27.                 txObject.setReadOnly(definition.isReadOnly());
  28.                 // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
  29.                 // so we don't want to do it unnecessarily (for example if we've explicitly
  30.                 // configured the connection pool to set it already).
  31.                 if (con.getAutoCommit()) {
  32.                         txObject.setMustRestoreAutoCommit(true);
  33.                         // 设置手动提交
  34.                         con.setAutoCommit(false);
  35.                 }
  36.                 // The default implementation executes a "SET TRANSACTION READ ONLY" statement
  37.                 // if the "enforceReadOnly" flag is set to true and the transaction definition
  38.                 // indicates a read-only transaction.
  39.                 prepareTransactionalConnection(con, definition);
  40.                 txObject.getConnectionHolder().setTransactionActive(true);
  41.                 int timeout = determineTimeout(definition);
  42.                 if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
  43.                         txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
  44.                 }
  45.                 // Bind the connection holder to the thread.
  46.                 if (txObject.isNewConnectionHolder()) {
  47.                         TransactionSynchronizationManager
  48.                 .bindResource(obtainDataSource(), txObject.getConnectionHolder());
  49.                 }
  50.         } catch (Throwable ex) {
  51.                 if (txObject.isNewConnectionHolder()) {
  52.                         DataSourceUtils.releaseConnection(con, obtainDataSource());
  53.                         txObject.setConnectionHolder(null, false);
  54.                 }
  55.                 throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
  56.         }
  57. }
复制代码
传播级别为NESTED

为当前连接设置保存点,如果业务方法出现异常,会回滚到该保存点位置:
  1. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  2.         if (!isNestedTransactionAllowed()) {
  3.                 throw new NestedTransactionNotSupportedException(
  4.                                 "Transaction manager does not allow nested transactions by default - " +
  5.                                 "specify 'nestedTransactionAllowed' property with value 'true'");
  6.         }
  7.         // Creating nested transaction
  8.         // 默认就是true
  9.         if (useSavepointForNestedTransaction()) {
  10.                 // Create savepoint within existing Spring-managed transaction,
  11.                 // through the SavepointManager API implemented by TransactionStatus.
  12.                 // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
  13.                 DefaultTransactionStatus status =
  14.                                 prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
  15.                 // 设置保存点
  16.                 status.createAndHoldSavepoint();
  17.                 return status;
  18.         } else {
  19.                 // Nested transaction through nested begin and commit/rollback calls.
  20.                 // Usually only for JTA: Spring synchronization might get activated here
  21.                 // in case of a pre-existing JTA transaction.
  22.                 return startTransaction(definition, transaction, debugEnabled, null);
  23.         }
  24. }
复制代码
设置保存点:
  1. public void createAndHoldSavepoint() throws TransactionException {
  2.         setSavepoint(getSavepointManager().createSavepoint());
  3. }
  4. // JdbcTransactionObjectSupport#createSavepoint
  5. public Object createSavepoint() throws TransactionException {
  6.         ConnectionHolder conHolder = getConnectionHolderForSavepoint();
  7.         try {
  8.                 if (!conHolder.supportsSavepoints()) {
  9.                         throw new NestedTransactionNotSupportedException("不支持");
  10.                 }
  11.                 if (conHolder.isRollbackOnly()) {
  12.                         throw new CannotCreateTransactionException("只读");
  13.                 }
  14.                 // 使用jdbc设置保存点
  15.                 return conHolder.createSavepoint();
  16.         } catch (SQLException ex) {
  17.                 throw new CannotCreateTransactionException("Could not create JDBC savepoint", ex);
  18.         }
  19. }
复制代码
传播级别为SUPPORTS/REQUIRED/MANDATORY
  1. // 默认false
  2. if (isValidateExistingTransaction()) {
  3.         if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
  4.                 Integer currentIsolationLevel =
  5.             TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
  6.                 if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
  7.                         Constants isoConstants = DefaultTransactionDefinition.constants;
  8.                         throw new IllegalTransactionStateException(
  9.                 "Participating transaction with definition [" + definition +
  10.                 "] specifies isolation level which is incompatible with existing transaction: " +
  11.                                         (currentIsolationLevel != null ?
  12.                                                         isoConstants.toCode(currentIsolationLevel,
  13.                                                 DefaultTransactionDefinition.PREFIX_ISOLATION) :
  14.                                                         "(unknown)"));
  15.                 }
  16.         }
  17.         if (!definition.isReadOnly()) {
  18.                 if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
  19.                         throw new IllegalTransactionStateException("Participating transaction with definition [" +
  20.                                         definition + "] is not marked as read-only but existing transaction is");
  21.                 }
  22.         }
  23. }
  24. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  25. return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
复制代码
事务挂起

把当前线程上绑定的资源、事务配置信息移除封装到SuspendedResourcesHolder对象,传递给新创建的TransactionStatus对象,以便在业务方法执行结束后恢复旧事务:
  1. protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
  2.         // 判断当前线程的Set<TransactionSynchronization>已经存在
  3.         // TransactionSynchronization: 事务回调同步器,定义了事务挂起、恢复等方法
  4.         // 例如mybatis-spring中有SqlSessionSynchronization实现类
  5.         if (TransactionSynchronizationManager.isSynchronizationActive()) {
  6.                 List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
  7.                 try {
  8.                         Object suspendedResources = null;
  9.                         if (transaction != null) {
  10.                                 // 挂起事务
  11.                                 suspendedResources = doSuspend(transaction);
  12.                         }
  13.                         // 清除当前线程事务配置参数:事务名、只读属性、隔离级别等
  14.                         String name = TransactionSynchronizationManager.getCurrentTransactionName();
  15.                         TransactionSynchronizationManager.setCurrentTransactionName(null);
  16.                         boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
  17.                         TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
  18.                         Integer isolationLevel =
  19.                 TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
  20.                         TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
  21.                         boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
  22.                         TransactionSynchronizationManager.setActualTransactionActive(false);
  23.                         // 把当前线程的事务相关信息封装起来以便后续恢复
  24.                         return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations,
  25.                                                 name, readOnly, isolationLevel, wasActive);
  26.                 } catch (RuntimeException | Error ex) {
  27.                         // doSuspend failed - original transaction is still active...
  28.                         doResumeSynchronization(suspendedSynchronizations);
  29.                         throw ex;
  30.                 }
  31.         } else if (transaction != null) {
  32.                 // Transaction active but no synchronization active.
  33.                 Object suspendedResources = doSuspend(transaction);
  34.                 return new SuspendedResourcesHolder(suspendedResources);
  35.         } else {
  36.                 // Neither transaction nor synchronization active.
  37.                 return null;
  38.         }
  39. }
  40. private List<TransactionSynchronization> doSuspendSynchronization() {
  41.         List<TransactionSynchronization> suspendedSynchronizations =
  42.                         TransactionSynchronizationManager.getSynchronizations();
  43.         // 挂起所有的TransactionSynchronization
  44.         // 比如SqlSessionSynchronization实现类会清除当前线程的SessionFactory
  45.         for (TransactionSynchronization synchronization : suspendedSynchronizations) {
  46.                 synchronization.suspend();
  47.         }
  48.         // 清除线程上的TransactionSynchronization集
  49.         TransactionSynchronizationManager.clearSynchronization();
  50.         return suspendedSynchronizations;
  51. }
  52. protected Object doSuspend(Object transaction) {
  53.         DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  54.         txObject.setConnectionHolder(null);
  55.         // ConnectionHolder对象
  56.         return TransactionSynchronizationManager.unbindResource(obtainDataSource());
  57. }
复制代码
事务恢复
  1. protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder)
  2.                 throws TransactionException {
  3.         if (resourcesHolder != null) {
  4.                 Object suspendedResources = resourcesHolder.suspendedResources;
  5.                 if (suspendedResources != null) {
  6.                         // 恢复之前挂起的是ConnectionHolder对象
  7.                         doResume(transaction, suspendedResources);
  8.                 }
  9.                 List<TransactionSynchronization> suspendedSynchronizations =
  10.             resourcesHolder.suspendedSynchronizations;
  11.                 if (suspendedSynchronizations != null) {
  12.                         TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
  13.                         TransactionSynchronizationManager
  14.                 .setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
  15.                         TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
  16.                         TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
  17.                         doResumeSynchronization(suspendedSynchronizations);
  18.                 }
  19.         }
  20. }
  21. protected void doResume(Object transaction, Object suspendedResources) {
  22.         // 恢复之前挂起的是ConnectionHolder对象
  23.         TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
  24. }
复制代码
当前无事务
  1. // No existing transaction found -> check propagation behavior to find out how to proceed.
  2. if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
  3.         // 传播级别设置为PROPAGATION_MANDATORY时,如果当前没有事务,则抛出异常
  4.         throw new IllegalTransactionStateException(
  5.                         "No existing transaction found for transaction marked with propagation 'mandatory'");
  6. } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
  7.                 def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
  8.                 def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  9.         SuspendedResourcesHolder suspendedResources = suspend(null);
  10.         // Creating new transaction
  11.         try {
  12.                 return startTransaction(def, transaction, debugEnabled, suspendedResources);
  13.         } catch (RuntimeException | Error ex) {
  14.                 resume(null, suspendedResources);
  15.                 throw ex;
  16.         }
  17. } else {
  18.         // Create "empty" transaction: no actual transaction, but potentially synchronization.
  19.         if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
  20.                 logger.warn("Custom isolation level specified but no actual transaction initiated; " +
  21.                                 "isolation level will effectively be ignored: " + def);
  22.         }
  23.         boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
  24.         return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
  25. }
复制代码
创建TransactionInfo
  1. protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
  2.                 TransactionAttribute txAttr, String joinpointIdentification,
  3.                 TransactionStatus status) {
  4.         TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
  5.         if (txAttr != null) {
  6.                 // The transaction manager will flag an error if an incompatible tx already exists.
  7.                 txInfo.newTransactionStatus(status);
  8.         } else {
  9.                 // The TransactionInfo.hasTransaction() method will return false. We created it only
  10.                 // to preserve the integrity of the ThreadLocal stack maintained in this class.
  11.         }
  12.         // We always bind the TransactionInfo to the thread, even if we didn't create
  13.         // a new transaction here. This guarantees that the TransactionInfo stack
  14.         // will be managed correctly even if no transaction was created by this aspect.
  15.         txInfo.bindToThread();
  16.         return txInfo;
  17. }
复制代码
异常处理
  1. protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
  2.         if (txInfo != null && txInfo.getTransactionStatus() != null) {
  3.                 if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
  4.                         try {
  5.                                 txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
  6.                         } catch (TransactionSystemException ex2) {
  7.                                 logger.error("Application exception overridden by rollback exception", ex);
  8.                                 ex2.initApplicationException(ex);
  9.                                 throw ex2;
  10.                         } catch (RuntimeException | Error ex2) {
  11.                                 logger.error("Application exception overridden by rollback exception", ex);
  12.                                 throw ex2;
  13.                         }
  14.                 } else {
  15.                         // We don't roll back on this exception.
  16.                         // Will still roll back if TransactionStatus.isRollbackOnly() is true.
  17.                         try {
  18.                                 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  19.                         } catch (TransactionSystemException ex2) {
  20.                                 logger.error("Application exception overridden by commit exception", ex);
  21.                                 ex2.initApplicationException(ex);
  22.                                 throw ex2;
  23.                         } catch (RuntimeException | Error ex2) {
  24.                                 logger.error("Application exception overridden by commit exception", ex);
  25.                                 throw ex2;
  26.                         }
  27.                 }
  28.         }
  29. }
复制代码
回滚:
  1. public final void rollback(TransactionStatus status) throws TransactionException {
  2.         if (status.isCompleted()) {
  3.                 throw new IllegalTransactionStateException("Transaction is already completed");
  4.         }
  5.         DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
  6.         processRollback(defStatus, false);
  7. }
  8. private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
  9.         try {
  10.                 boolean unexpectedRollback = unexpected;
  11.                 try {
  12.                         triggerBeforeCompletion(status);
  13.                         if (status.hasSavepoint()) {
  14.                                 // 回滚到指定保存点
  15.                                 status.rollbackToHeldSavepoint();
  16.                         } else if (status.isNewTransaction()) {
  17.                                 // 事务回滚
  18.                                 doRollback(status);
  19.                         } else {
  20.                                 // Participating in larger transaction
  21.                                 if (status.hasTransaction()) {
  22.                                         if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
  23.                                                 // 设置rollback-only
  24.                                                 doSetRollbackOnly(status);
  25.                                         }
  26.                                 }
  27.                                 // Unexpected rollback only matters here if we're asked to fail early
  28.                                 if (!isFailEarlyOnGlobalRollbackOnly()) {
  29.                                         unexpectedRollback = false;
  30.                                 }
  31.                         }
  32.                 } catch (RuntimeException | Error ex) {
  33.                         triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
  34.                         throw ex;
  35.                 }
  36.                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
  37.                 // Raise UnexpectedRollbackException if we had a global rollback-only marker
  38.                 if (unexpectedRollback) {
  39.                         throw new UnexpectedRollbackException(
  40.                                         "Transaction rolled back because it has been marked as rollback-only");
  41.                 }
  42.         } finally {
  43.                 // 这里面有恢复挂起事务的逻辑
  44.                 cleanupAfterCompletion(status);
  45.         }
  46. }
  47. // 回滚到指定保存点
  48. public void rollbackToSavepoint(Object savepoint) throws TransactionException {
  49.         ConnectionHolder conHolder = getConnectionHolderForSavepoint();
  50.         try {
  51.                 conHolder.getConnection().rollback((Savepoint) savepoint);
  52.                 conHolder.resetRollbackOnly();
  53.         } catch (Throwable ex) {
  54.                 throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
  55.         }
  56. }
  57. // 释放保存点
  58. public void releaseSavepoint(Object savepoint) throws TransactionException {
  59.         ConnectionHolder conHolder = getConnectionHolderForSavepoint();
  60.         try {
  61.                 conHolder.getConnection().releaseSavepoint((Savepoint) savepoint);
  62.         } catch (Throwable ex) {
  63.                 logger.debug("Could not explicitly release JDBC savepoint", ex);
  64.         }
  65. }
  66. // 事务回滚
  67. protected void doRollback(DefaultTransactionStatus status) {
  68.         DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
  69.         Connection con = txObject.getConnectionHolder().getConnection();
  70.         try {
  71.                 con.rollback();
  72.         } catch (SQLException ex) {
  73.                 throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
  74.         }
  75. }
  76. private void cleanupAfterCompletion(DefaultTransactionStatus status) {
  77.         status.setCompleted();
  78.         if (status.isNewSynchronization()) {
  79.                 TransactionSynchronizationManager.clear();
  80.         }
  81.         if (status.isNewTransaction()) {
  82.                 // 恢复连接的事务属性,比如自动提交方式、隔离级别、只读属性等
  83.                 // 将连接归还给数据源,清除ConnectionHolder的conn
  84.                 doCleanupAfterCompletion(status.getTransaction());
  85.         }
  86.         if (status.getSuspendedResources() != null) {
  87.                 // 恢复之前挂起的事务
  88.                 Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
  89.                 resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
  90.         }
  91. }
复制代码
事务清理
  1. private void restoreThreadLocalStatus() {
  2.         // Use stack to restore old transaction TransactionInfo.
  3.         // Will be null if none was set.
  4.         transactionInfoHolder.set(this.oldTransactionInfo);
  5. }
复制代码
事务提交
  1. protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
  2.         if (txInfo != null && txInfo.getTransactionStatus() != null) {
  3.                 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  4.         }
  5. }
复制代码
提交:
  1. public final void commit(TransactionStatus status) throws TransactionException {
  2.         if (status.isCompleted()) {
  3.                 throw new IllegalTransactionStateException("Transaction is already completed");
  4.         }
  5.         DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
  6.         if (defStatus.isLocalRollbackOnly()) {
  7.                 // Transactional code has requested rollback
  8.                 // 回滚
  9.                 processRollback(defStatus, false);
  10.                 return;
  11.         }
  12.         if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
  13.                 // Global transaction is marked as rollback-only but transactional code requested commit
  14.                 // 回滚
  15.                 processRollback(defStatus, true);
  16.                 return;
  17.         }
  18.         // 提交事务
  19.         processCommit(defStatus);
  20. }
  21. // 提交事务
  22. private void processCommit(DefaultTransactionStatus status) throws TransactionException {
  23.         try {
  24.                 boolean beforeCompletionInvoked = false;
  25.                 try {
  26.                         boolean unexpectedRollback = false;
  27.                         prepareForCommit(status);
  28.                         triggerBeforeCommit(status);
  29.                         triggerBeforeCompletion(status);
  30.                         beforeCompletionInvoked = true;
  31.                         if (status.hasSavepoint()) {
  32.                                 unexpectedRollback = status.isGlobalRollbackOnly();
  33.                                 // 释放保存点
  34.                                 status.releaseHeldSavepoint();
  35.                         } else if (status.isNewTransaction()) {
  36.                                 unexpectedRollback = status.isGlobalRollbackOnly();
  37.                                 // 提交事务
  38.                                 doCommit(status);
  39.                         } else if (isFailEarlyOnGlobalRollbackOnly()) {
  40.                                 unexpectedRollback = status.isGlobalRollbackOnly();
  41.                         }
  42.                         // Throw UnexpectedRollbackException if we have a global rollback-only
  43.                         // marker but still didn't get a corresponding exception from commit.
  44.                         if (unexpectedRollback) {
  45.                                 throw new UnexpectedRollbackException(
  46.                                                 "Transaction silently rolled back because it has been marked as rollback-only");
  47.                         }
  48.                 } catch (UnexpectedRollbackException ex) {
  49.                         // can only be caused by doCommit
  50.                         triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
  51.                         throw ex;
  52.                 } catch (TransactionException ex) {
  53.                         // can only be caused by doCommit
  54.                         if (isRollbackOnCommitFailure()) {
  55.                                 doRollbackOnCommitException(status, ex);
  56.                         } else {
  57.                                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
  58.                         }
  59.                         throw ex;
  60.                 } catch (RuntimeException | Error ex) {
  61.                         if (!beforeCompletionInvoked) {
  62.                                 triggerBeforeCompletion(status);
  63.                         }
  64.                         doRollbackOnCommitException(status, ex);
  65.                         throw ex;
  66.                 }
  67.                 // Trigger afterCommit callbacks, with an exception thrown there
  68.                 // propagated to callers but the transaction still considered as committed.
  69.                 try {
  70.                         triggerAfterCommit(status);
  71.                 } finally {
  72.                         triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
  73.                 }
  74.         } finally {
  75.                 cleanupAfterCompletion(status);
  76.         }
  77. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

乌市泽哥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表