立山 发表于 2023-5-25 08:15:28

spring-transaction源码分析(5)TransactionInterceptor事务拦截逻辑

spring-tx的事务拦截逻辑在TransactionInterceptor类,本文将详细分析其实现方式。
事务拦截器TransactionInterceptor

spring-tx的事务拦截逻辑在TransactionInterceptor类,它实现了MethodInterceptor接口。
MethodInterceptor接口

MethodInterceptor接口的实现类封装aop切面拦截逻辑:
public interface MethodInterceptor extends Interceptor {

        /**
       * Implement this method to perform extra treatments before and after the invocation.
       */
        Object invoke(MethodInvocation invocation) throws Throwable;
}TransactionInterceptor类

TransactionInterceptor类封装了事务拦截逻辑:
public class TransactionInterceptor
    extends TransactionAspectSupport implements MethodInterceptor, Serializable {

        // ...

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
                // Work out the target class: may be {@code null}.
                // The TransactionAttributeSource should be passed the target class
                // as well as the method, which may be from an interface.
                Class<?> targetClass =
            (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

                // Adapt to TransactionAspectSupport's invokeWithinTransaction...
                return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
        }

        // ...
}事务逻辑在父类TransactionAspectSupport的invokeWithinTransaction方法中。
invokeWithinTransaction方法

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
                final InvocationCallback invocation) throws Throwable {

        // 如果方法没有被Transactional注解标注,则返回null
        // 返回的是AnnotationTransactionAttributeSource对象
        // 用于获取Transactional注解相关属性,
        // 比如rollbackOn, propagationBehavior, isolationLevel等
        TransactionAttributeSource tas = getTransactionAttributeSource();
        TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);

        // 获取事务管理器
        // 后面会转型成PlatformTransactionManager对象,可以开启事务、提交、回滚
    TransactionManager tm = determineTransactionManager(txAttr);

        // Reactive事务,略

        // 转型成PlatformTransactionManager对象,可以开启事务、提交、回滚
        PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
        // 获取事务方法的唯一标识
        // 格式为"类名.方法名"
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
                // 创建事务
                TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

                Object retVal;
                try {
                        // This is an around advice: Invoke the next interceptor in the chain.
                        // This will normally result in a target object being invoked.
                        retVal = invocation.proceedWithInvocation();
                } catch (Throwable ex) {
                        // Handle a throwable, completing the transaction.
                        // We may commit or roll back, depending on the configuration.
                        completeTransactionAfterThrowing(txInfo, ex);
                        throw ex;
                } finally {
                        // Reset the TransactionInfo ThreadLocal.
                        // Call this in all cases: exception or normal return!
                        cleanupTransactionInfo(txInfo);
                }

                if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                        // Set rollback-only in case of Vavr failure matching our rollback rules...
                        TransactionStatus status = txInfo.getTransactionStatus();
                        if (status != null && txAttr != null) {
                                retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                        }
                }

                // 提交事务
                commitTransactionAfterReturning(txInfo);

                return retVal;
        } else {
                // CallbackPreferringPlatformTransactionManager事务管理器逻辑,略
        }
}创建事务

Create a transaction if necessary based on the given TransactionAttribute. Allows callers to perform custom TransactionAttribute lookups through the TransactionAttributeSource.
protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm,
                TransactionAttribute txAttr, final String joinpointIdentification) {

        // If no name specified, apply method identification as transaction name.
        if (txAttr != null && txAttr.getName() == null) {
                txAttr = new DelegatingTransactionAttribute(txAttr) {
                        @Override
                        public String getName() {
                                return joinpointIdentification;
                        }
                };
        }

        TransactionStatus status = null;
        if (txAttr != null) {
                if (tm != null) {
                        // 创建新事务或者返回已存在事务, 这取决于传播级别。
                        // 隔离级别或超时等参数只在新事务时生效,已存在事务会忽略这些参数。
                        status = tm.getTransaction(txAttr);
                }
        }

        // 使用指定的事务属性和TransactionStatus创建TransactionInfo
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}getTransaction(txAttr)

这个方法创建新事务或者返回已存在事务,这取决于传播级别。隔离级别或超时等参数只在新事务时生效,已存在事务会忽略这些参数。
public final TransactionStatus getTransaction(TransactionDefinition definition)
                throws TransactionException {

        // Use defaults if no transaction definition given.
        TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

        // DataSourceTransactionManager实现类返回DataSourceTransactionObject对象
        // DataSourceTransactionObject对象封装着数据库连接、previousIsolationLevel、readOnly、savepointAllowed等
        Object transaction = doGetTransaction();
        boolean debugEnabled = logger.isDebugEnabled();

        if (isExistingTransaction(transaction)) {
                // Existing transaction found -> check propagation behavior to find out how to behave.
                return handleExistingTransaction(def, transaction, debugEnabled);
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
                // 传播级别设置为PROPAGATION_MANDATORY时,如果当前没有事务,则抛出异常
                throw new IllegalTransactionStateException(
                                "No existing transaction found for transaction marked with propagation 'mandatory'");
        } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                SuspendedResourcesHolder suspendedResources = suspend(null);
                // Creating new transaction
                try {
                        return startTransaction(def, transaction, debugEnabled, suspendedResources);
                } catch (RuntimeException | Error ex) {
                        resume(null, suspendedResources);
                        throw ex;
                }
        } else {
                // Create "empty" transaction: no actual transaction, but potentially synchronization.
                if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                        logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                                        "isolation level will effectively be ignored: " + def);
                }
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
        }
}doGetTransaction()

protected Object doGetTransaction() {
        // DataSourceTransactionObject封装着数据库连接、previousIsolationLevel、readOnly、savepointAllowed等
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        // 是否允许设置保存点,NESTED传播级别时使用,DataSourceTransactionManager类型该属性为true
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        // 从ThreadLocal获取当前线程上绑定的ConnectionHolder
        // ConnectionHolder对象保存着数据库连接
        // 业务方法第一次执行时为null
        ConnectionHolder conHolder =
                        (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
}当前有事务

handleExistingTransaction方法

if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
}isExistingTransaction方法判断当前是否存在事务:
protected boolean isExistingTransaction(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        // 判断存在数据库连接且开启了事务
        return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}handleExistingTransaction方法:
private TransactionStatus handleExistingTransaction(
                TransactionDefinition definition, Object transaction, boolean debugEnabled)
                throws TransactionException {

        // 传播级别为NEVER
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
                // 传播级别设置为NEVER时,如果当前有事务,则抛出异常
                throw new IllegalTransactionStateException(
                                "Existing transaction found for transaction marked with propagation 'never'");
        }

        // 传播级别为NOT_SUPPORTED
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
                // 挂起当前事务
                Object suspendedResources = suspend(transaction);
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                return prepareTransactionStatus(
                                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }

        // 传播级别为REQUIRES_NEW
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
                // 挂起当前事务,然后创建新事务
                SuspendedResourcesHolder suspendedResources = suspend(transaction);
                try {
                        return startTransaction(definition, transaction, debugEnabled, suspendedResources);
                } catch (RuntimeException | Error beginEx) {
                        resumeAfterBeginException(transaction, suspendedResources, beginEx);
                        throw beginEx;
                }
        }

        // 传播级别为NESTED
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                if (!isNestedTransactionAllowed()) {
                        throw new NestedTransactionNotSupportedException(
                                        "Transaction manager does not allow nested transactions by default - " +
                                        "specify 'nestedTransactionAllowed' property with value 'true'");
                }
                // Creating nested transaction
                if (useSavepointForNestedTransaction()) {
                        // Create savepoint within existing Spring-managed transaction,
                        // through the SavepointManager API implemented by TransactionStatus.
                        // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                        DefaultTransactionStatus status =
                                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                        status.createAndHoldSavepoint();
                        return status;
                }
                else {
                        // Nested transaction through nested begin and commit/rollback calls.
                        // Usually only for JTA: Spring synchronization might get activated here
                        // in case of a pre-existing JTA transaction.
                        return startTransaction(definition, transaction, debugEnabled, null);
                }
        }

        // 传播级别为SUPPORTS/REQUIRED
        if (isValidateExistingTransaction()) {
                if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                        Integer currentIsolationLevel =
                TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                        if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                                Constants isoConstants = DefaultTransactionDefinition.constants;
                                throw new IllegalTransactionStateException(
                  "Participating transaction with definition [" + definition +
                  "] specifies isolation level which is incompatible with existing transaction: " +
                                                (currentIsolationLevel != null ?
                                                                isoConstants.toCode(currentIsolationLevel,
                                                    DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                                                "(unknown)"));
                        }
                }
                if (!definition.isReadOnly()) {
                        if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                                definition + "] is not marked as read-only but existing transaction is");
                        }
                }
        }

        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}传播级别为NEVER

传播级别设置为NEVER时,如果当前有事务,抛出异常:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                        "Existing transaction found for transaction marked with propagation 'never'");
}传播级别为NOT_SUPPORTED

挂起当前事务,业务方法以无事务方式执行:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        // 挂起当前事务
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                        definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}prepareTransactionStatus方法:

[*]创建DefaultTransactionStatus对象,把SuspendedResources封装进去,以便后续恢复旧事务
[*]使用TransactionSynchronizationManager将事务属性绑定到当前线程
[*]初始化当前线程TransactionSynchronization集
由于传播级别为NOT_SUPPORTED所以此处不会开启事务。
传播级别为REQUIRES_NEW

挂起当前事务,创建新事务:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // 挂起当前事务
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
                // 开启新事务
                return startTransaction(definition, transaction, debugEnabled, suspendedResources);
        } catch (RuntimeException | Error beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
        }
}开启新事务:
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
                boolean debugEnabled, SuspendedResourcesHolder suspendedResources) {

        // 值为true
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        // 开启新事务
        doBegin(transaction, definition);
        // 初始化当前线程TransactionSynchronization集
        prepareSynchronization(status, definition);
        return status;
}

protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
                if (!txObject.hasConnectionHolder() ||
                                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                        // 打开一个新连接
                        Connection newCon = obtainDataSource().getConnection();
                        txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
                }

                txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
                con = txObject.getConnectionHolder().getConnection();

                Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
                txObject.setPreviousIsolationLevel(previousIsolationLevel);
                txObject.setReadOnly(definition.isReadOnly());

                // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
                // so we don't want to do it unnecessarily (for example if we've explicitly
                // configured the connection pool to set it already).
                if (con.getAutoCommit()) {
                        txObject.setMustRestoreAutoCommit(true);
                        // 设置手动提交
                        con.setAutoCommit(false);
                }

                // The default implementation executes a "SET TRANSACTION READ ONLY" statement
                // if the "enforceReadOnly" flag is set to true and the transaction definition
                // indicates a read-only transaction.
                prepareTransactionalConnection(con, definition);
                txObject.getConnectionHolder().setTransactionActive(true);

                int timeout = determineTimeout(definition);
                if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                        txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
                }

                // Bind the connection holder to the thread.
                if (txObject.isNewConnectionHolder()) {
                        TransactionSynchronizationManager
                .bindResource(obtainDataSource(), txObject.getConnectionHolder());
                }
        } catch (Throwable ex) {
                if (txObject.isNewConnectionHolder()) {
                        DataSourceUtils.releaseConnection(con, obtainDataSource());
                        txObject.setConnectionHolder(null, false);
                }
                throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
}传播级别为NESTED

为当前连接设置保存点,如果业务方法出现异常,会回滚到该保存点位置:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                                "Transaction manager does not allow nested transactions by default - " +
                                "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        // Creating nested transaction
        // 默认就是true
        if (useSavepointForNestedTransaction()) {
                // Create savepoint within existing Spring-managed transaction,
                // through the SavepointManager API implemented by TransactionStatus.
                // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                DefaultTransactionStatus status =
                                prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                // 设置保存点
                status.createAndHoldSavepoint();
                return status;
        } else {
                // Nested transaction through nested begin and commit/rollback calls.
                // Usually only for JTA: Spring synchronization might get activated here
                // in case of a pre-existing JTA transaction.
                return startTransaction(definition, transaction, debugEnabled, null);
        }
}设置保存点:
public void createAndHoldSavepoint() throws TransactionException {
        setSavepoint(getSavepointManager().createSavepoint());
}

// JdbcTransactionObjectSupport#createSavepoint
public Object createSavepoint() throws TransactionException {
        ConnectionHolder conHolder = getConnectionHolderForSavepoint();
        try {
                if (!conHolder.supportsSavepoints()) {
                        throw new NestedTransactionNotSupportedException("不支持");
                }
                if (conHolder.isRollbackOnly()) {
                        throw new CannotCreateTransactionException("只读");
                }
                // 使用jdbc设置保存点
                return conHolder.createSavepoint();
        } catch (SQLException ex) {
                throw new CannotCreateTransactionException("Could not create JDBC savepoint", ex);
        }
}传播级别为SUPPORTS/REQUIRED/MANDATORY

// 默认false
if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel =
            TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                        Constants isoConstants = DefaultTransactionDefinition.constants;
                        throw new IllegalTransactionStateException(
                "Participating transaction with definition [" + definition +
                "] specifies isolation level which is incompatible with existing transaction: " +
                                        (currentIsolationLevel != null ?
                                                        isoConstants.toCode(currentIsolationLevel,
                                                DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                                        "(unknown)"));
                }
        }
        if (!definition.isReadOnly()) {
                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                        throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                        definition + "] is not marked as read-only but existing transaction is");
                }
        }
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);事务挂起

把当前线程上绑定的资源、事务配置信息移除封装到SuspendedResourcesHolder对象,传递给新创建的TransactionStatus对象,以便在业务方法执行结束后恢复旧事务:
protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
        // 判断当前线程的Set<TransactionSynchronization>已经存在
        // TransactionSynchronization: 事务回调同步器,定义了事务挂起、恢复等方法
        // 例如mybatis-spring中有SqlSessionSynchronization实现类
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
                List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
                try {
                        Object suspendedResources = null;
                        if (transaction != null) {
                                // 挂起事务
                                suspendedResources = doSuspend(transaction);
                        }

                        // 清除当前线程事务配置参数:事务名、只读属性、隔离级别等
                        String name = TransactionSynchronizationManager.getCurrentTransactionName();
                        TransactionSynchronizationManager.setCurrentTransactionName(null);
                        boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
                        TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
                        Integer isolationLevel =
                TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
                        boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
                        TransactionSynchronizationManager.setActualTransactionActive(false);

                        // 把当前线程的事务相关信息封装起来以便后续恢复
                        return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations,
                                                name, readOnly, isolationLevel, wasActive);
                } catch (RuntimeException | Error ex) {
                        // doSuspend failed - original transaction is still active...
                        doResumeSynchronization(suspendedSynchronizations);
                        throw ex;
                }
        } else if (transaction != null) {
                // Transaction active but no synchronization active.
                Object suspendedResources = doSuspend(transaction);
                return new SuspendedResourcesHolder(suspendedResources);
        } else {
                // Neither transaction nor synchronization active.
                return null;
        }
}

private List<TransactionSynchronization> doSuspendSynchronization() {
        List<TransactionSynchronization> suspendedSynchronizations =
                        TransactionSynchronizationManager.getSynchronizations();
        // 挂起所有的TransactionSynchronization
        // 比如SqlSessionSynchronization实现类会清除当前线程的SessionFactory
        for (TransactionSynchronization synchronization : suspendedSynchronizations) {
                synchronization.suspend();
        }
        // 清除线程上的TransactionSynchronization集
        TransactionSynchronizationManager.clearSynchronization();
        return suspendedSynchronizations;
}

protected Object doSuspend(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        txObject.setConnectionHolder(null);
        // ConnectionHolder对象
        return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}事务恢复

protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder)
                throws TransactionException {

        if (resourcesHolder != null) {
                Object suspendedResources = resourcesHolder.suspendedResources;
                if (suspendedResources != null) {
                        // 恢复之前挂起的是ConnectionHolder对象
                        doResume(transaction, suspendedResources);
                }
                List<TransactionSynchronization> suspendedSynchronizations =
            resourcesHolder.suspendedSynchronizations;
                if (suspendedSynchronizations != null) {
                        TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
                        TransactionSynchronizationManager
                .setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
                        TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
                        TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
                        doResumeSynchronization(suspendedSynchronizations);
                }
        }
}

protected void doResume(Object transaction, Object suspendedResources) {
        // 恢复之前挂起的是ConnectionHolder对象
        TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}当前无事务

// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        // 传播级别设置为PROPAGATION_MANDATORY时,如果当前没有事务,则抛出异常
        throw new IllegalTransactionStateException(
                        "No existing transaction found for transaction marked with propagation 'mandatory'");
} else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        // Creating new transaction
        try {
                return startTransaction(def, transaction, debugEnabled, suspendedResources);
        } catch (RuntimeException | Error ex) {
                resume(null, suspendedResources);
                throw ex;
        }
} else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                                "isolation level will effectively be ignored: " + def);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}创建TransactionInfo

protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
                TransactionAttribute txAttr, String joinpointIdentification,
                TransactionStatus status) {

        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
        if (txAttr != null) {
                // The transaction manager will flag an error if an incompatible tx already exists.
                txInfo.newTransactionStatus(status);
        } else {
                // The TransactionInfo.hasTransaction() method will return false. We created it only
                // to preserve the integrity of the ThreadLocal stack maintained in this class.
        }

        // We always bind the TransactionInfo to the thread, even if we didn't create
        // a new transaction here. This guarantees that the TransactionInfo stack
        // will be managed correctly even if no transaction was created by this aspect.
        txInfo.bindToThread();
        return txInfo;
}异常处理

protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
                if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                        try {
                                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                        } catch (TransactionSystemException ex2) {
                                logger.error("Application exception overridden by rollback exception", ex);
                                ex2.initApplicationException(ex);
                                throw ex2;
                        } catch (RuntimeException | Error ex2) {
                                logger.error("Application exception overridden by rollback exception", ex);
                                throw ex2;
                        }
                } else {
                        // We don't roll back on this exception.
                        // Will still roll back if TransactionStatus.isRollbackOnly() is true.
                        try {
                                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                        } catch (TransactionSystemException ex2) {
                                logger.error("Application exception overridden by commit exception", ex);
                                ex2.initApplicationException(ex);
                                throw ex2;
                        } catch (RuntimeException | Error ex2) {
                                logger.error("Application exception overridden by commit exception", ex);
                                throw ex2;
                        }
                }
        }
}回滚:
public final void rollback(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) {
                throw new IllegalTransactionStateException("Transaction is already completed");
        }

        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        processRollback(defStatus, false);
}

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
        try {
                boolean unexpectedRollback = unexpected;

                try {
                        triggerBeforeCompletion(status);

                        if (status.hasSavepoint()) {
                                // 回滚到指定保存点
                                status.rollbackToHeldSavepoint();
                        } else if (status.isNewTransaction()) {
                                // 事务回滚
                                doRollback(status);
                        } else {
                                // Participating in larger transaction
                                if (status.hasTransaction()) {
                                        if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                                                // 设置rollback-only
                                                doSetRollbackOnly(status);
                                        }
                                }
                                // Unexpected rollback only matters here if we're asked to fail early
                                if (!isFailEarlyOnGlobalRollbackOnly()) {
                                        unexpectedRollback = false;
                                }
                        }
                } catch (RuntimeException | Error ex) {
                        triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                        throw ex;
                }

                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

                // Raise UnexpectedRollbackException if we had a global rollback-only marker
                if (unexpectedRollback) {
                        throw new UnexpectedRollbackException(
                                        "Transaction rolled back because it has been marked as rollback-only");
                }
        } finally {
                // 这里面有恢复挂起事务的逻辑
                cleanupAfterCompletion(status);
        }
}

// 回滚到指定保存点
public void rollbackToSavepoint(Object savepoint) throws TransactionException {
        ConnectionHolder conHolder = getConnectionHolderForSavepoint();
        try {
                conHolder.getConnection().rollback((Savepoint) savepoint);
                conHolder.resetRollbackOnly();
        } catch (Throwable ex) {
                throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
        }
}
// 释放保存点
public void releaseSavepoint(Object savepoint) throws TransactionException {
        ConnectionHolder conHolder = getConnectionHolderForSavepoint();
        try {
                conHolder.getConnection().releaseSavepoint((Savepoint) savepoint);
        } catch (Throwable ex) {
                logger.debug("Could not explicitly release JDBC savepoint", ex);
        }
}

// 事务回滚
protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
                con.rollback();
        } catch (SQLException ex) {
                throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
        }
}

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
        status.setCompleted();
        if (status.isNewSynchronization()) {
                TransactionSynchronizationManager.clear();
        }
        if (status.isNewTransaction()) {
                // 恢复连接的事务属性,比如自动提交方式、隔离级别、只读属性等
                // 将连接归还给数据源,清除ConnectionHolder的conn
                doCleanupAfterCompletion(status.getTransaction());
        }
        if (status.getSuspendedResources() != null) {
                // 恢复之前挂起的事务
                Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
                resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
        }
}事务清理

private void restoreThreadLocalStatus() {
        // Use stack to restore old transaction TransactionInfo.
        // Will be null if none was set.
        transactionInfoHolder.set(this.oldTransactionInfo);
}事务提交

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
}提交:
public final void commit(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) {
                throw new IllegalTransactionStateException("Transaction is already completed");
        }

        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        if (defStatus.isLocalRollbackOnly()) {
                // Transactional code has requested rollback
                // 回滚
                processRollback(defStatus, false);
                return;
        }

        if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
                // Global transaction is marked as rollback-only but transactional code requested commit
                // 回滚
                processRollback(defStatus, true);
                return;
        }

        // 提交事务
        processCommit(defStatus);
}

// 提交事务
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
                boolean beforeCompletionInvoked = false;

                try {
                        boolean unexpectedRollback = false;
                        prepareForCommit(status);
                        triggerBeforeCommit(status);
                        triggerBeforeCompletion(status);
                        beforeCompletionInvoked = true;

                        if (status.hasSavepoint()) {
                                unexpectedRollback = status.isGlobalRollbackOnly();
                                // 释放保存点
                                status.releaseHeldSavepoint();
                        } else if (status.isNewTransaction()) {
                                unexpectedRollback = status.isGlobalRollbackOnly();
                                // 提交事务
                                doCommit(status);
                        } else if (isFailEarlyOnGlobalRollbackOnly()) {
                                unexpectedRollback = status.isGlobalRollbackOnly();
                        }

                        // Throw UnexpectedRollbackException if we have a global rollback-only
                        // marker but still didn't get a corresponding exception from commit.
                        if (unexpectedRollback) {
                                throw new UnexpectedRollbackException(
                                                "Transaction silently rolled back because it has been marked as rollback-only");
                        }
                } catch (UnexpectedRollbackException ex) {
                        // can only be caused by doCommit
                        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                        throw ex;
                } catch (TransactionException ex) {
                        // can only be caused by doCommit
                        if (isRollbackOnCommitFailure()) {
                                doRollbackOnCommitException(status, ex);
                        } else {
                                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                        }
                        throw ex;
                } catch (RuntimeException | Error ex) {
                        if (!beforeCompletionInvoked) {
                                triggerBeforeCompletion(status);
                        }
                        doRollbackOnCommitException(status, ex);
                        throw ex;
                }

                // Trigger afterCommit callbacks, with an exception thrown there
                // propagated to callers but the transaction still considered as committed.
                try {
                        triggerAfterCommit(status);
                } finally {
                        triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
                }

        } finally {
                cleanupAfterCompletion(status);
        }
}
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: spring-transaction源码分析(5)TransactionInterceptor事务拦截逻辑