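The transaction interception logic of spring-tx lives in the TransactionInterceptor class. This article walks through its implementation in detail.
The transaction interceptor: TransactionInterceptor
TransactionInterceptor implements the MethodInterceptor interface.
The MethodInterceptor interface
Implementations of MethodInterceptor encapsulate the interception logic of an AOP aspect:
- public interface MethodInterceptor extends Interceptor {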
- /**
- * Implement this method to perform extra treatments before and after the invocation.
- */
- Object invoke(MethodInvocation invocation) throws Throwable;
- }
The TransactionInterceptor class
TransactionInterceptor encapsulates the transaction interception logic:
- public class TransactionInterceptor
- extends TransactionAspectSupport implements MethodInterceptor, Serializable {
- // ...
- @Override
- public Object invoke(MethodInvocation invocation) throws Throwable {
- // Work out the target class: may be {@code null}.
- // The TransactionAttributeSource should be passed the target class
- // as well as the method, which may be from an interface.
- Class<?> targetClass =
- (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
- // Adapt to TransactionAspectSupport's invokeWithinTransaction...
- return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
- }
- // ...
- }
The transaction logic itself is in the invokeWithinTransaction method of the parent class, TransactionAspectSupport.
The invokeWithinTransaction method
- protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
- final InvocationCallback invocation) throws Throwable {
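- // With annotation-driven configuration, tas is an AnnotationTransactionAttributeSource.
- // It resolves the @Transactional attributes of a method,
- // such as rollbackOn, propagationBehavior and isolationLevel.
- // txAttr is null if the method is not annotated with @Transactional.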
- TransactionAttributeSource tas = getTransactionAttributeSource();
- TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
- // Determine the transaction manager.
- // It is cast to PlatformTransactionManager below.
- TransactionManager tm = determineTransactionManager(txAttr);
- // Reactive transaction handling omitted.
- // Cast to PlatformTransactionManager, which can begin, commit and roll back transactions.
- PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
- // Build the identifier of the transactional method,
- // in the form "fully.qualified.ClassName.methodName".
- final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
- if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
- // Create a transaction if necessary
- TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
- Object retVal;
- try {
- // This is an around advice: Invoke the next interceptor in the chain.
- // This will normally result in a target object being invoked.
- retVal = invocation.proceedWithInvocation();
- } catch (Throwable ex) {
- // Handle a throwable, completing the transaction.
- // We may commit or roll back, depending on the configuration.
- completeTransactionAfterThrowing(txInfo, ex);
- throw ex;
- } finally {
- // Reset the TransactionInfo ThreadLocal.
- // Call this in all cases: exception or normal return!
- cleanupTransactionInfo(txInfo);
- }
- if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
- // Set rollback-only in case of Vavr failure matching our rollback rules...
- TransactionStatus status = txInfo.getTransactionStatus();
- if (status != null && txAttr != null) {
- retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
- }
- }
- // Commit the transaction
- commitTransactionAfterReturning(txInfo);
- return retVal;
- } else {
- // CallbackPreferringPlatformTransactionManager handling omitted.
- }
- }
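The control flow of invokeWithinTransaction can be reduced to a small template: begin, proceed, roll back on failure, always clean up, commit on success. The following is only a simplified model written against the plain JDK, not Spring's code. RecordingTxManager and TxTemplateSketch are hypothetical names, and the sketch always rolls back on an exception, whereas the real method consults the rollback rules first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for a transaction manager; it only records what happened. */
class RecordingTxManager {
    final List<String> events = new ArrayList<>();

    void begin()    { events.add("begin"); }
    void commit()   { events.add("commit"); }
    void rollback() { events.add("rollback"); }
    void cleanup()  { events.add("cleanup"); }
}

/** Simplified around advice: begin, proceed, then commit or roll back. */
class TxTemplateSketch {
    static <T> T invokeWithinTransaction(RecordingTxManager tm, Supplier<T> invocation) {
        tm.begin();
        T result;
        try {
            // Proceed with the intercepted call (the business method).
            result = invocation.get();
        } catch (RuntimeException | Error ex) {
            // Simplification: always roll back, then rethrow to the caller.
            tm.rollback();
            throw ex;
        } finally {
            // Runs on both paths, before the commit on the success path.
            tm.cleanup();
        }
        tm.commit();
        return result;
    }
}
```

On success the recorded order is begin, cleanup, commit. On failure it is begin, rollback, cleanup, which matches the ordering of the try/catch/finally block above.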
Creating the transaction
createTransactionIfNecessary creates a transaction if necessary, based on the given TransactionAttribute. It allows callers to perform custom TransactionAttribute lookups through the TransactionAttributeSource.
- protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm,
- TransactionAttribute txAttr, final String joinpointIdentification) {
- // If no name specified, apply method identification as transaction name.
- if (txAttr != null && txAttr.getName() == null) {
- txAttr = new DelegatingTransactionAttribute(txAttr) {
- @Override
- public String getName() {
- return joinpointIdentification;
- }
- };
- }
- TransactionStatus status = null;
- if (txAttr != null) {
- if (tm != null) {
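- // Create a new transaction or return the existing one, depending on the propagation behavior.
- // Isolation level, timeout and similar settings only apply to a new transaction; they are ignored when joining an existing one.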
- status = tm.getTransaction(txAttr);
- }
- }
- // Build a TransactionInfo from the given transaction attribute and TransactionStatus
- return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
- }
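getTransaction(txAttr)
This method creates a new transaction or returns the existing one, depending on the propagation behavior. Isolation level, timeout and similar settings only apply to a new transaction and are ignored when participating in an existing one.
- public final TransactionStatus getTransaction(TransactionDefinition definition)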
- throws TransactionException {
- // Use defaults if no transaction definition given.
- TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
- // The DataSourceTransactionManager implementation returns a DataSourceTransactionObject,
- // which holds the database connection, previousIsolationLevel, readOnly, savepointAllowed, etc.
- Object transaction = doGetTransaction();
- boolean debugEnabled = logger.isDebugEnabled();
- if (isExistingTransaction(transaction)) {
- // Existing transaction found -> check propagation behavior to find out how to behave.
- return handleExistingTransaction(def, transaction, debugEnabled);
- }
- // No existing transaction found -> check propagation behavior to find out how to proceed.
- if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
- // With PROPAGATION_MANDATORY, throw an exception if there is no current transaction
- throw new IllegalTransactionStateException(
- "No existing transaction found for transaction marked with propagation 'mandatory'");
- } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
- def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
- def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
- SuspendedResourcesHolder suspendedResources = suspend(null);
- // Creating new transaction
- try {
- return startTransaction(def, transaction, debugEnabled, suspendedResources);
- } catch (RuntimeException | Error ex) {
- resume(null, suspendedResources);
- throw ex;
- }
- } else {
- // Create "empty" transaction: no actual transaction, but potentially synchronization.
- if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
- logger.warn("Custom isolation level specified but no actual transaction initiated; " +
- "isolation level will effectively be ignored: " + def);
- }
- boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
- return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
- }
- }
doGetTransaction()
- protected Object doGetTransaction() {
- // DataSourceTransactionObject holds the database connection, previousIsolationLevel, readOnly, savepointAllowed, etc.
- DataSourceTransactionObject txObject = new DataSourceTransactionObject();
- // Whether savepoints may be created (used for NESTED propagation); true for DataSourceTransactionManager
- txObject.setSavepointAllowed(isNestedTransactionAllowed());
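- // Look up the ConnectionHolder bound to the current thread (stored in a ThreadLocal).
- // The ConnectionHolder wraps the database connection.
- // It is null the first time a business method runs, before any connection has been bound.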
- ConnectionHolder conHolder =
- (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
- txObject.setConnectionHolder(conHolder, false);
- return txObject;
- }
When a transaction already exists
The handleExistingTransaction method
- if (isExistingTransaction(transaction)) {
- // Existing transaction found -> check propagation behavior to find out how to behave.
- return handleExistingTransaction(def, transaction, debugEnabled);
- }
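The isExistingTransaction method checks whether a transaction currently exists:
- protected boolean isExistingTransaction(Object transaction) {
- DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
- // A transaction exists if a connection holder is present and its transaction is active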
- return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
- }
The handleExistingTransaction method:
- private TransactionStatus handleExistingTransaction(
- TransactionDefinition definition, Object transaction, boolean debugEnabled)
- throws TransactionException {
- // Propagation NEVER
- if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
- // With NEVER, throw an exception if a transaction already exists
- throw new IllegalTransactionStateException(
- "Existing transaction found for transaction marked with propagation 'never'");
- }
- // Propagation NOT_SUPPORTED
- if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
- // Suspend the current transaction
- Object suspendedResources = suspend(transaction);
- boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
- return prepareTransactionStatus(
- definition, null, false, newSynchronization, debugEnabled, suspendedResources);
- }
- // Propagation REQUIRES_NEW
- if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
- // Suspend the current transaction, then create a new one
- SuspendedResourcesHolder suspendedResources = suspend(transaction);
- try {
- return startTransaction(definition, transaction, debugEnabled, suspendedResources);
- } catch (RuntimeException | Error beginEx) {
- resumeAfterBeginException(transaction, suspendedResources, beginEx);
- throw beginEx;
- }
- }
- // Propagation NESTED
- if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
- if (!isNestedTransactionAllowed()) {
- throw new NestedTransactionNotSupportedException(
- "Transaction manager does not allow nested transactions by default - " +
- "specify 'nestedTransactionAllowed' property with value 'true'");
- }
- // Creating nested transaction
- if (useSavepointForNestedTransaction()) {
- // Create savepoint within existing Spring-managed transaction,
- // through the SavepointManager API implemented by TransactionStatus.
- // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
- DefaultTransactionStatus status =
- prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
- status.createAndHoldSavepoint();
- return status;
- }
- else {
- // Nested transaction through nested begin and commit/rollback calls.
- // Usually only for JTA: Spring synchronization might get activated here
- // in case of a pre-existing JTA transaction.
- return startTransaction(definition, transaction, debugEnabled, null);
- }
- }
- // Propagation SUPPORTS/REQUIRED/MANDATORY: participate in the existing transaction
- if (isValidateExistingTransaction()) {
- if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
- Integer currentIsolationLevel =
- TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
- if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
- Constants isoConstants = DefaultTransactionDefinition.constants;
- throw new IllegalTransactionStateException(
- "Participating transaction with definition [" + definition +
- "] specifies isolation level which is incompatible with existing transaction: " +
- (currentIsolationLevel != null ?
- isoConstants.toCode(currentIsolationLevel,
- DefaultTransactionDefinition.PREFIX_ISOLATION) :
- "(unknown)"));
- }
- }
- if (!definition.isReadOnly()) {
- if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
- throw new IllegalTransactionStateException("Participating transaction with definition [" +
- definition + "] is not marked as read-only but existing transaction is");
- }
- }
- }
- boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
- return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
- }
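Taken together, the two branches of getTransaction (no existing transaction, and handleExistingTransaction) amount to a decision table over the propagation behavior. The sketch below restates that table as a pure function. It is a simplified model, not Spring's code: the Propagation and Action enums and PropagationSketch are hypothetical names, and the NESTED case assumes a savepoint-based manager with nested transactions allowed, as with DataSourceTransactionManager.

```java
enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

enum Action {
    THROW, START_NEW, SUSPEND_AND_START_NEW, SUSPEND_AND_RUN_WITHOUT_TX,
    CREATE_SAVEPOINT, JOIN_EXISTING, RUN_WITHOUT_TX
}

class PropagationSketch {
    /** Maps (propagation, is a transaction already active?) to what happens next. */
    static Action decide(Propagation propagation, boolean existingTransaction) {
        if (existingTransaction) {
            switch (propagation) {
                case NEVER:         return Action.THROW;
                case NOT_SUPPORTED: return Action.SUSPEND_AND_RUN_WITHOUT_TX;
                case REQUIRES_NEW:  return Action.SUSPEND_AND_START_NEW;
                case NESTED:        return Action.CREATE_SAVEPOINT; // assumes savepoints are used
                default:            return Action.JOIN_EXISTING;    // REQUIRED, SUPPORTS, MANDATORY
            }
        }
        switch (propagation) {
            case MANDATORY:
                return Action.THROW;
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return Action.START_NEW;
            default:
                return Action.RUN_WITHOUT_TX; // SUPPORTS, NOT_SUPPORTED, NEVER
        }
    }
}
```

For example, decide(Propagation.REQUIRES_NEW, true) yields SUSPEND_AND_START_NEW, while decide(Propagation.REQUIRES_NEW, false) simply yields START_NEW because there is nothing to suspend.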
Propagation NEVER
With propagation NEVER, an exception is thrown if a transaction already exists:
- if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
- throw new IllegalTransactionStateException(
- "Existing transaction found for transaction marked with propagation 'never'");
- }
Propagation NOT_SUPPORTED
The current transaction is suspended and the business method runs without a transaction:
- if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
- // Suspend the current transaction
- Object suspendedResources = suspend(transaction);
- boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
- return prepareTransactionStatus(
- definition, null, false, newSynchronization, debugEnabled, suspendedResources);
- }
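The prepareTransactionStatus method:
- Creates a DefaultTransactionStatus and stores the suspended resources in it, so that the old transaction can be resumed later
- Binds the transaction attributes to the current thread through TransactionSynchronizationManager
- Initializes the TransactionSynchronization set of the current thread
Because the propagation is NOT_SUPPORTED, no transaction is started here.
Propagation REQUIRES_NEW
The current transaction is suspended and a new one is created:
- if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
- // Suspend the current transaction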
- SuspendedResourcesHolder suspendedResources = suspend(transaction);
- try {
- // Begin a new transaction
- return startTransaction(definition, transaction, debugEnabled, suspendedResources);
- } catch (RuntimeException | Error beginEx) {
- resumeAfterBeginException(transaction, suspendedResources, beginEx);
- throw beginEx;
- }
- }
Beginning a new transaction:
- private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
- boolean debugEnabled, SuspendedResourcesHolder suspendedResources) {
- // true by default, since the default synchronization mode is SYNCHRONIZATION_ALWAYS
- boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
- DefaultTransactionStatus status = newTransactionStatus(
- definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
- // Begin the new transaction
- doBegin(transaction, definition);
- // Initialize the TransactionSynchronization set of the current thread
- prepareSynchronization(status, definition);
- return status;
- }
- protected void doBegin(Object transaction, TransactionDefinition definition) {
- DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
- Connection con = null;
- try {
- if (!txObject.hasConnectionHolder() ||
- txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
- // Obtain a new connection from the DataSource
- Connection newCon = obtainDataSource().getConnection();
- txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
- }
- txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
- con = txObject.getConnectionHolder().getConnection();
- Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
- txObject.setPreviousIsolationLevel(previousIsolationLevel);
- txObject.setReadOnly(definition.isReadOnly());
- // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
- // so we don't want to do it unnecessarily (for example if we've explicitly
- // configured the connection pool to set it already).
- if (con.getAutoCommit()) {
- txObject.setMustRestoreAutoCommit(true);
- // Switch to manual commit
- con.setAutoCommit(false);
- }
- // The default implementation executes a "SET TRANSACTION READ ONLY" statement
- // if the "enforceReadOnly" flag is set to true and the transaction definition
- // indicates a read-only transaction.
- prepareTransactionalConnection(con, definition);
- txObject.getConnectionHolder().setTransactionActive(true);
- int timeout = determineTimeout(definition);
- if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
- txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
- }
- // Bind the connection holder to the thread.
- if (txObject.isNewConnectionHolder()) {
- TransactionSynchronizationManager
- .bindResource(obtainDataSource(), txObject.getConnectionHolder());
- }
- } catch (Throwable ex) {
- if (txObject.isNewConnectionHolder()) {
- DataSourceUtils.releaseConnection(con, obtainDataSource());
- txObject.setConnectionHolder(null, false);
- }
- throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
- }
- }
Propagation NESTED
A savepoint is set on the current connection. If the business method throws, the transaction is rolled back to that savepoint:
- if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
- if (!isNestedTransactionAllowed()) {
- throw new NestedTransactionNotSupportedException(
- "Transaction manager does not allow nested transactions by default - " +
- "specify 'nestedTransactionAllowed' property with value 'true'");
- }
- // Creating nested transaction
- // true by default
- if (useSavepointForNestedTransaction()) {
- // Create savepoint within existing Spring-managed transaction,
- // through the SavepointManager API implemented by TransactionStatus.
- // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
- DefaultTransactionStatus status =
- prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
- // Create the savepoint
- status.createAndHoldSavepoint();
- return status;
- } else {
- // Nested transaction through nested begin and commit/rollback calls.
- // Usually only for JTA: Spring synchronization might get activated here
- // in case of a pre-existing JTA transaction.
- return startTransaction(definition, transaction, debugEnabled, null);
- }
- }
Creating the savepoint:
- public void createAndHoldSavepoint() throws TransactionException {
- setSavepoint(getSavepointManager().createSavepoint());
- }
- // JdbcTransactionObjectSupport#createSavepoint
- public Object createSavepoint() throws TransactionException {
- ConnectionHolder conHolder = getConnectionHolderForSavepoint();
- try {
- if (!conHolder.supportsSavepoints()) {
- throw new NestedTransactionNotSupportedException("Savepoints are not supported");
- }
- if (conHolder.isRollbackOnly()) {
- throw new CannotCreateTransactionException("Transaction is already marked as rollback-only");
- }
- // Create the savepoint through JDBC
- return conHolder.createSavepoint();
- } catch (SQLException ex) {
- throw new CannotCreateTransactionException("Could not create JDBC savepoint", ex);
- }
- }
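To see why a savepoint gives NESTED its partial-rollback behavior, it can help to model a transaction as a list of pending writes. The sketch below is a toy model using only the JDK, not JDBC or Spring code. SavepointSketch is a hypothetical name, and representing a savepoint as the current write count is an assumption made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: a transaction is a list of writes that have not been committed yet. */
class SavepointSketch {
    private final List<String> pendingWrites = new ArrayList<>();

    void write(String row) {
        pendingWrites.add(row);
    }

    /** A savepoint is modelled as the number of writes made so far. */
    int createSavepoint() {
        return pendingWrites.size();
    }

    /** Discards every write made after the savepoint; earlier writes survive. */
    void rollbackToSavepoint(int savepoint) {
        pendingWrites.subList(savepoint, pendingWrites.size()).clear();
    }

    /** Returns a copy of the writes that would be committed. */
    List<String> pending() {
        return new ArrayList<>(pendingWrites);
    }
}
```

If the outer method writes a row, the nested method writes two more and then fails, rolling back to the savepoint drops only the nested writes. The outer transaction stays active and can continue writing and eventually commit.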
Propagation SUPPORTS/REQUIRED/MANDATORY
- // false by default
- if (isValidateExistingTransaction()) {
- if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
- Integer currentIsolationLevel =
- TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
- if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
- Constants isoConstants = DefaultTransactionDefinition.constants;
- throw new IllegalTransactionStateException(
- "Participating transaction with definition [" + definition +
- "] specifies isolation level which is incompatible with existing transaction: " +
- (currentIsolationLevel != null ?
- isoConstants.toCode(currentIsolationLevel,
- DefaultTransactionDefinition.PREFIX_ISOLATION) :
- "(unknown)"));
- }
- }
- if (!definition.isReadOnly()) {
- if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
- throw new IllegalTransactionStateException("Participating transaction with definition [" +
- definition + "] is not marked as read-only but existing transaction is");
- }
- }
- }
- boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
- return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
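Suspending a transaction
Suspension removes the resources and transaction settings bound to the current thread, packs them into a SuspendedResourcesHolder, and hands that holder to the newly created TransactionStatus so that the old transaction can be resumed once the business method has finished:
- protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
- // Check whether a Set<TransactionSynchronization> is already active for the current thread.
- // TransactionSynchronization is the transaction callback interface; it defines suspend, resume and other hooks.
- // For example, mybatis-spring provides the SqlSessionSynchronization implementation.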
- if (TransactionSynchronizationManager.isSynchronizationActive()) {
- List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
- try {
- Object suspendedResources = null;
- if (transaction != null) {
- // Suspend the transaction
- suspendedResources = doSuspend(transaction);
- }
- // Clear the current thread's transaction settings: name, read-only flag, isolation level, etc.
- String name = TransactionSynchronizationManager.getCurrentTransactionName();
- TransactionSynchronizationManager.setCurrentTransactionName(null);
- boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
- TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
- Integer isolationLevel =
- TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
- TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
- boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
- TransactionSynchronizationManager.setActualTransactionActive(false);
- // Pack up the current thread's transaction state so that it can be restored later
- return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations,
- name, readOnly, isolationLevel, wasActive);
- } catch (RuntimeException | Error ex) {
- // doSuspend failed - original transaction is still active...
- doResumeSynchronization(suspendedSynchronizations);
- throw ex;
- }
- } else if (transaction != null) {
- // Transaction active but no synchronization active.
- Object suspendedResources = doSuspend(transaction);
- return new SuspendedResourcesHolder(suspendedResources);
- } else {
- // Neither transaction nor synchronization active.
- return null;
- }
- }
- private List<TransactionSynchronization> doSuspendSynchronization() {
- List<TransactionSynchronization> suspendedSynchronizations =
- TransactionSynchronizationManager.getSynchronizations();
- // Suspend every TransactionSynchronization.
- // For example, SqlSessionSynchronization unbinds the resource it holds for its SqlSessionFactory from the current thread.
- for (TransactionSynchronization synchronization : suspendedSynchronizations) {
- synchronization.suspend();
- }
- // Clear the TransactionSynchronization set of the current thread
- TransactionSynchronizationManager.clearSynchronization();
- return suspendedSynchronizations;
- }
- protected Object doSuspend(Object transaction) {
- DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
- txObject.setConnectionHolder(null);
- // Unbind and return the ConnectionHolder
- return TransactionSynchronizationManager.unbindResource(obtainDataSource());
- }
Resuming a transaction
- protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder)
- throws TransactionException {
- if (resourcesHolder != null) {
- Object suspendedResources = resourcesHolder.suspendedResources;
- if (suspendedResources != null) {
- // The suspended resource is the ConnectionHolder; bind it again
- doResume(transaction, suspendedResources);
- }
- List<TransactionSynchronization> suspendedSynchronizations =
- resourcesHolder.suspendedSynchronizations;
- if (suspendedSynchronizations != null) {
- TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
- TransactionSynchronizationManager
- .setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
- TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
- TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
- doResumeSynchronization(suspendedSynchronizations);
- }
- }
- }
- protected void doResume(Object transaction, Object suspendedResources) {
- // Re-bind the previously suspended ConnectionHolder to the current thread
- TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
- }
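At its core, suspend and resume come down to unbinding a resource from the current thread and binding it again later. The following sketch models that with a plain ThreadLocal map. It is a simplified illustration, not the TransactionSynchronizationManager implementation: ThreadBoundResources and SuspendResumeSketch are hypothetical names, and strings stand in for ConnectionHolder objects.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified per-thread resource registry, keyed for example by a DataSource. */
class ThreadBoundResources {
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bind(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound: " + key);
        }
    }

    static Object get(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbind(Object key) {
        Object value = RESOURCES.get().remove(key);
        if (value == null) {
            throw new IllegalStateException("Nothing bound: " + key);
        }
        return value;
    }
}

class SuspendResumeSketch {
    /** Suspend: take the holder off the thread and return it for safekeeping. */
    static Object suspend(Object dataSourceKey) {
        return ThreadBoundResources.unbind(dataSourceKey);
    }

    /** Resume: put the saved holder back on the thread. */
    static void resume(Object dataSourceKey, Object suspended) {
        ThreadBoundResources.bind(dataSourceKey, suspended);
    }
}
```

Between suspend and resume, the thread has no holder bound for that key, so an inner REQUIRES_NEW transaction can bind its own connection. Once the inner transaction has unbound its holder, the outer one is bound again.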
When no transaction exists
- // No existing transaction found -> check propagation behavior to find out how to proceed.
- if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
- // With PROPAGATION_MANDATORY, throw an exception if there is no current transaction
- throw new IllegalTransactionStateException(
- "No existing transaction found for transaction marked with propagation 'mandatory'");
- } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
- def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
- def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
- SuspendedResourcesHolder suspendedResources = suspend(null);
- // Creating new transaction
- try {
- return startTransaction(def, transaction, debugEnabled, suspendedResources);
- } catch (RuntimeException | Error ex) {
- resume(null, suspendedResources);
- throw ex;
- }
- } else {
- // Create "empty" transaction: no actual transaction, but potentially synchronization.
- if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
- logger.warn("Custom isolation level specified but no actual transaction initiated; " +
- "isolation level will effectively be ignored: " + def);
- }
- boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
- return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
- }
Creating the TransactionInfo
- protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
- TransactionAttribute txAttr, String joinpointIdentification,
- TransactionStatus status) {
- TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
- if (txAttr != null) {
- // The transaction manager will flag an error if an incompatible tx already exists.
- txInfo.newTransactionStatus(status);
- } else {
- // The TransactionInfo.hasTransaction() method will return false. We created it only
- // to preserve the integrity of the ThreadLocal stack maintained in this class.
- }
- // We always bind the TransactionInfo to the thread, even if we didn't create
- // a new transaction here. This guarantees that the TransactionInfo stack
- // will be managed correctly even if no transaction was created by this aspect.
- txInfo.bindToThread();
- return txInfo;
- }
Exception handling
- protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
- if (txInfo != null && txInfo.getTransactionStatus() != null) {
- if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
- try {
- txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
- } catch (TransactionSystemException ex2) {
- logger.error("Application exception overridden by rollback exception", ex);
- ex2.initApplicationException(ex);
- throw ex2;
- } catch (RuntimeException | Error ex2) {
- logger.error("Application exception overridden by rollback exception", ex);
- throw ex2;
- }
- } else {
- // We don't roll back on this exception.
- // Will still roll back if TransactionStatus.isRollbackOnly() is true.
- try {
- txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
- } catch (TransactionSystemException ex2) {
- logger.error("Application exception overridden by commit exception", ex);
- ex2.initApplicationException(ex);
- throw ex2;
- } catch (RuntimeException | Error ex2) {
- logger.error("Application exception overridden by commit exception", ex);
- throw ex2;
- }
- }
- }
- }
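Whether completeTransactionAfterThrowing rolls back or commits hinges on rollbackOn(ex). With the default transaction attribute, only unchecked exceptions and errors trigger a rollback, while a checked exception leads to a commit. The sketch below restates that default rule in isolation. RollbackRuleSketch is a hypothetical name, and rules configured through rollbackFor or noRollbackFor are deliberately left out.

```java
/** Simplified model of the default rollback rule; custom rollback rules are not modelled. */
class RollbackRuleSketch {
    /** Default rule: roll back on unchecked exceptions and errors only. */
    static boolean rollbackOn(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error);
    }

    /** Mirrors the if/else above: roll back when the rule matches, otherwise commit. */
    static String completeAfterThrowing(Throwable ex) {
        return rollbackOn(ex) ? "rollback" : "commit";
    }
}
```

This is why a business method that throws a checked exception, such as java.io.IOException, still has its transaction committed unless the rollback rules are widened.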
Rollback:
- public final void rollback(TransactionStatus status) throws TransactionException {
- if (status.isCompleted()) {
- throw new IllegalTransactionStateException("Transaction is already completed");
- }
- DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
- processRollback(defStatus, false);
- }
- private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
- try {
- boolean unexpectedRollback = unexpected;
- try {
- triggerBeforeCompletion(status);
- if (status.hasSavepoint()) {
- // Roll back to the held savepoint
- status.rollbackToHeldSavepoint();
- } else if (status.isNewTransaction()) {
- // Roll back the whole transaction
- doRollback(status);
- } else {
- // Participating in larger transaction
- if (status.hasTransaction()) {
- if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
- // Mark the existing transaction as rollback-only
- doSetRollbackOnly(status);
- }
- }
- // Unexpected rollback only matters here if we're asked to fail early
- if (!isFailEarlyOnGlobalRollbackOnly()) {
- unexpectedRollback = false;
- }
- }
- } catch (RuntimeException | Error ex) {
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
- throw ex;
- }
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
- // Raise UnexpectedRollbackException if we had a global rollback-only marker
- if (unexpectedRollback) {
- throw new UnexpectedRollbackException(
- "Transaction rolled back because it has been marked as rollback-only");
- }
- } finally {
- // This is also where a suspended transaction gets resumed
- cleanupAfterCompletion(status);
- }
- }
- // Roll back to the given savepoint
- public void rollbackToSavepoint(Object savepoint) throws TransactionException {
- ConnectionHolder conHolder = getConnectionHolderForSavepoint();
- try {
- conHolder.getConnection().rollback((Savepoint) savepoint);
- conHolder.resetRollbackOnly();
- } catch (Throwable ex) {
- throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
- }
- }
- // Release the savepoint
- public void releaseSavepoint(Object savepoint) throws TransactionException {
- ConnectionHolder conHolder = getConnectionHolderForSavepoint();
- try {
- conHolder.getConnection().releaseSavepoint((Savepoint) savepoint);
- } catch (Throwable ex) {
- logger.debug("Could not explicitly release JDBC savepoint", ex);
- }
- }
- // Roll back the transaction
- protected void doRollback(DefaultTransactionStatus status) {
- DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
- Connection con = txObject.getConnectionHolder().getConnection();
- try {
- con.rollback();
- } catch (SQLException ex) {
- throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
- }
- }
- private void cleanupAfterCompletion(DefaultTransactionStatus status) {
- status.setCompleted();
- if (status.isNewSynchronization()) {
- TransactionSynchronizationManager.clear();
- }
- if (status.isNewTransaction()) {
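- // Restore the connection's settings, such as auto-commit, isolation level and read-only flag,
- // then return the connection to the DataSource and clear the ConnectionHolder.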
- doCleanupAfterCompletion(status.getTransaction());
- }
- if (status.getSuspendedResources() != null) {
- // Resume the previously suspended transaction
- Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
- resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
- }
- }
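Transaction cleanup
The cleanupTransactionInfo call in the finally block delegates to TransactionInfo.restoreThreadLocalStatus, which restores the previous TransactionInfo on the current thread: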
- private void restoreThreadLocalStatus() {
- // Use stack to restore old transaction TransactionInfo.
- // Will be null if none was set.
- transactionInfoHolder.set(this.oldTransactionInfo);
- }
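bindToThread and restoreThreadLocalStatus together behave like a stack threaded through a single ThreadLocal: each TransactionInfo remembers the one it replaced. The sketch below shows that pattern in isolation. It is a minimal model using only the JDK. TxInfoSketch and its previous field are hypothetical names that mirror TransactionInfo and oldTransactionInfo.

```java
/** Minimal model of a ThreadLocal-backed stack of transaction infos. */
class TxInfoSketch {
    private static final ThreadLocal<TxInfoSketch> HOLDER = new ThreadLocal<>();

    final String name;
    private TxInfoSketch previous;

    TxInfoSketch(String name) {
        this.name = name;
    }

    /** Push: remember whatever was current, then make this the current info. */
    void bindToThread() {
        this.previous = HOLDER.get();
        HOLDER.set(this);
    }

    /** Pop: restore the info that was current before bindToThread(). */
    void restoreThreadLocalStatus() {
        HOLDER.set(this.previous);
    }

    static TxInfoSketch current() {
        return HOLDER.get();
    }
}
```

When a transactional method calls another one, the inner info is pushed on entry and popped in the finally block, so the outer method sees its own info again once the inner call returns.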
Committing the transaction
- protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
- if (txInfo != null && txInfo.getTransactionStatus() != null) {
- txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
- }
- }
Commit:
- public final void commit(TransactionStatus status) throws TransactionException {
- if (status.isCompleted()) {
- throw new IllegalTransactionStateException("Transaction is already completed");
- }
- DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
- if (defStatus.isLocalRollbackOnly()) {
- // Transactional code has requested rollback
- // Roll back
- processRollback(defStatus, false);
- return;
- }
- if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
- // Global transaction is marked as rollback-only but transactional code requested commit
- // Roll back
- processRollback(defStatus, true);
- return;
- }
- // Commit the transaction
- processCommit(defStatus);
- }
- // Commit the transaction
- private void processCommit(DefaultTransactionStatus status) throws TransactionException {
- try {
- boolean beforeCompletionInvoked = false;
- try {
- boolean unexpectedRollback = false;
- prepareForCommit(status);
- triggerBeforeCommit(status);
- triggerBeforeCompletion(status);
- beforeCompletionInvoked = true;
- if (status.hasSavepoint()) {
- unexpectedRollback = status.isGlobalRollbackOnly();
- // Release the savepoint
- status.releaseHeldSavepoint();
- } else if (status.isNewTransaction()) {
- unexpectedRollback = status.isGlobalRollbackOnly();
- // Commit the transaction
- doCommit(status);
- } else if (isFailEarlyOnGlobalRollbackOnly()) {
- unexpectedRollback = status.isGlobalRollbackOnly();
- }
- // Throw UnexpectedRollbackException if we have a global rollback-only
- // marker but still didn't get a corresponding exception from commit.
- if (unexpectedRollback) {
- throw new UnexpectedRollbackException(
- "Transaction silently rolled back because it has been marked as rollback-only");
- }
- } catch (UnexpectedRollbackException ex) {
- // can only be caused by doCommit
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
- throw ex;
- } catch (TransactionException ex) {
- // can only be caused by doCommit
- if (isRollbackOnCommitFailure()) {
- doRollbackOnCommitException(status, ex);
- } else {
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
- }
- throw ex;
- } catch (RuntimeException | Error ex) {
- if (!beforeCompletionInvoked) {
- triggerBeforeCompletion(status);
- }
- doRollbackOnCommitException(status, ex);
- throw ex;
- }
- // Trigger afterCommit callbacks, with an exception thrown there
- // propagated to callers but the transaction still considered as committed.
- try {
- triggerAfterCommit(status);
- } finally {
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
- }
- } finally {
- cleanupAfterCompletion(status);
- }
- }