"Spring Source Code In-Depth Analysis": Spring JDBC (Transactions)

———— 5.1.3.RELEASE

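A transaction usually means a database transaction: a unit of execution made up of a series of operations that read or update data in the system. Its main purposes are to isolate concurrent operations so that they do not interfere with one another, to give the database a way to recover from a failure to a normal state, and to keep the data consistent even when something goes wrong.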

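These properties are summarized as the four characteristics of a transaction, known as ACID:

  • Atomicity: a transaction is an atomic operation; either all of its operations succeed or none of them takes effect;

  • Consistency: a transaction must not violate the integrity and consistency of the data. It moves the data from one consistent state to another, so the data is seen either as it was before the transaction or as it is after it, never in an intermediate state;

  • Isolation: under concurrency, transactions should be isolated from one another. When different transactions access the same data at the same time, each should work as if it had its own complete view of the data; the operations and data inside one transaction are hidden from the others. The SQL standard defines four isolation levels, and Spring provides a corresponding setting for each:

  1. Read uncommitted (READ_UNCOMMITTED): the lowest level; it allows dirty reads, meaning a transaction can read data that another transaction has not yet committed;
  2. Read committed (READ_COMMITTED): a transaction's changes become visible to other transactions only after it commits;
  3. Repeatable read (REPEATABLE_READ): reading the same data several times within one transaction returns the same value each time, even if other transactions commit changes in between;
  4. Serializable (SERIALIZABLE): the highest level; transactions behave as if they were executed one after another;
  • Durability: once a transaction commits, its changes to the data are permanent.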
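
The four levels above correspond to integer constants on java.sql.Connection, and Spring's TransactionDefinition.ISOLATION_* constants are declared with the same values. A minimal sketch that lists them using only the JDK (the class name IsolationLevelsDemo is made up for illustration):

```java
import java.sql.Connection;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative helper, not part of Spring: lists the JDBC isolation constants.
class IsolationLevelsDemo {

    // Maps each standard isolation level name to its java.sql.Connection constant.
    static Map<String, Integer> jdbcIsolationLevels() {
        Map<String, Integer> levels = new LinkedHashMap<>();
        levels.put("READ_UNCOMMITTED", Connection.TRANSACTION_READ_UNCOMMITTED);
        levels.put("READ_COMMITTED", Connection.TRANSACTION_READ_COMMITTED);
        levels.put("REPEATABLE_READ", Connection.TRANSACTION_REPEATABLE_READ);
        levels.put("SERIALIZABLE", Connection.TRANSACTION_SERIALIZABLE);
        return levels;
    }

    public static void main(String[] args) {
        jdbcIsolationLevels().forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

Whether a given level is actually honored depends on the database and its driver, so these constants are best read as requests passed down to the connection.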

Example: @Transactional

Here is a simple example that uses Spring's common declarative transaction annotation, @Transactional, against a single JDBC data source.

pom.xml
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>5.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>5.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version>
</dependency>
User.java
public class User {

private long id;

private String name;

public User(long id, String name){
this.id = id;
this.name = name;
}

// ...
}
UserService.java
public interface UserService {

void save(User user);
}

Add the @Transactional annotation to the target method:

UserServiceImpl.java
public class UserServiceImpl implements UserService {

private JdbcTemplate jdbcTemplate;

public void setDataSource(DataSource dataSource){
jdbcTemplate = new JdbcTemplate(dataSource);
}

@Override
@Transactional
public void save(User user) {
jdbcTemplate.update("insert into user(id, name)values(?, ?)", new Object[]{user.getId(), user.getName()});
}
}

Enable annotation-driven transactions in the configuration file with annotation-driven:

bean.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd">

<!-- Many DataSource implementations exist; this uses the simple DriverManagerDataSource, which does no connection pooling -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="com.mysql.cj.jdbc.Driver" />
<property name="url" value="jdbc:mysql://192.168.141.13:3306/demo" />
<property name="username" value="shanhm" />
<property name="password" value="shanhm" />
</bean>

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>

<tx:annotation-driven transaction-manager="transactionManager" />

<bean id="userService" class="test.springjdbc.UserServiceImpl">
<property name="dataSource" ref="dataSource" />
</bean>
</beans>
App.java
public class App {

public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("bean.xml");
UserService userService = context.getBean(UserService.class);

User user = new User(1, "shanhm1991");
userService.save(user);
}
}

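Implementation

Spring abstracts transaction management behind a single interface, PlatformTransactionManager, and provides several concrete implementations:

  • DataSourceTransactionManager: manages transactions for a single javax.sql.DataSource and is mainly used with Spring JDBC and MyBatis. This kind of transaction management is fairly simple, because the transaction operations can be delegated directly to the underlying Connection or SqlSession.

  • JtaTransactionManager: supports global transactions, such as a transaction that spans multiple data sources. Such scenarios usually involve several transactions; the walkthrough below shows how savepoints and rollback-only flags are used when transactions nest or join one another.

Spring also ships transaction managers for other persistence frameworks, such as HibernateTransactionManager, and third parties can supply their own implementations of the same interface.

To make the implementation below easier to follow, here are the two supporting interfaces in outline:

  • TransactionDefinition: the attributes that define a transaction
    String getName(): the transaction name
    int getIsolationLevel(): the isolation level
    int getPropagationBehavior(): the propagation behavior
    int getTimeout(): the timeout, in seconds
    boolean isReadOnly(): whether the transaction is read-only

  • TransactionStatus: the state of a transaction
    isNewTransaction(): whether the current transaction is new
    hasSavepoint(): whether the current transaction carries a savepoint
    setRollbackOnly(): marks the current transaction so that it can only be rolled back
    isRollbackOnly(): whether the current transaction has been marked rollback-only
    flush(): flushes pending changes in the underlying session to the database; this is meant for sessions such as Hibernate/JPA and may have no effect on plain JDBC transactions
    isCompleted(): whether the current transaction has completed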
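
To make the two interfaces more concrete, here is a rough model of them in plain Java. The classes SimpleTxDefinition, SimpleTxStatus and TxModelDemo are hypothetical stand-ins written for this note, not Spring types; only the constant values and the defaults are taken from Spring's TransactionDefinition and DefaultTransactionDefinition.

```java
// Hypothetical, simplified stand-in for TransactionDefinition; not Spring code.
class SimpleTxDefinition {
    // These values mirror the constants declared on Spring's TransactionDefinition.
    static final int PROPAGATION_REQUIRED = 0;
    static final int ISOLATION_DEFAULT = -1;
    static final int TIMEOUT_DEFAULT = -1;

    final String name;
    final int propagationBehavior;
    final int isolationLevel;
    final int timeoutSeconds;
    final boolean readOnly;

    // Same defaults as DefaultTransactionDefinition: REQUIRED, default isolation, no timeout, read-write.
    SimpleTxDefinition(String name) {
        this(name, PROPAGATION_REQUIRED, ISOLATION_DEFAULT, TIMEOUT_DEFAULT, false);
    }

    SimpleTxDefinition(String name, int propagationBehavior, int isolationLevel,
                       int timeoutSeconds, boolean readOnly) {
        this.name = name;
        this.propagationBehavior = propagationBehavior;
        this.isolationLevel = isolationLevel;
        this.timeoutSeconds = timeoutSeconds;
        this.readOnly = readOnly;
    }
}

// Hypothetical, simplified stand-in for TransactionStatus; not Spring code.
class SimpleTxStatus {
    private final boolean newTransaction;
    private boolean rollbackOnly;
    private boolean completed;

    SimpleTxStatus(boolean newTransaction) {
        this.newTransaction = newTransaction;
    }

    boolean isNewTransaction() { return newTransaction; }

    // Once set, the only valid outcome of the transaction is a rollback.
    void setRollbackOnly() { rollbackOnly = true; }

    boolean isRollbackOnly() { return rollbackOnly; }

    void markCompleted() { completed = true; }

    boolean isCompleted() { return completed; }
}

class TxModelDemo {
    public static void main(String[] args) {
        SimpleTxDefinition definition = new SimpleTxDefinition("UserServiceImpl.save");
        SimpleTxStatus status = new SimpleTxStatus(true);
        status.setRollbackOnly();
        System.out.println(definition.name + " rollbackOnly=" + status.isRollbackOnly());
    }
}
```

The split matters later in the walkthrough: the definition is fixed input that describes what the caller asked for, while the status is mutable state that the transaction manager consults when it decides between commit and rollback.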

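1. Parsing the transaction configuration

As in earlier notes, the analysis starts from the custom tag. The first stop is the parser AnnotationDrivenBeanDefinitionParser, which TxNamespaceHandler registers for annotation-driven: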

org.springframework.transaction.config.TxNamespaceHandler
public void init() {
registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}

AnnotationDrivenBeanDefinitionParser uses the mode attribute to decide whether transactions are woven in through proxies or through AspectJ; proxy mode is the default.

org.springframework.transaction.config.AnnotationDrivenBeanDefinitionParser
public BeanDefinition parse(Element element, ParserContext parserContext) {
registerTransactionalEventListenerFactory(parserContext);
String mode = element.getAttribute("mode");
if ("aspectj".equals(mode)) {
// mode="aspectj"
registerTransactionAspect(element, parserContext);
if (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader())) {
registerJtaTransactionAspect(element, parserContext);
}
}else {
// mode="proxy"
AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
}
return null;
}

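Proxy mode starts by checking for, and if necessary registering, several BeanDefinitions; the advisor is checked and registered under the fixed name TRANSACTION_ADVISOR_BEAN_NAME. The main registrations are InfrastructureAdvisorAutoProxyCreator and BeanFactoryTransactionAttributeSourceAdvisor, and the TransactionAttributeSource and TransactionInterceptor are wired into the BeanFactoryTransactionAttributeSourceAdvisor.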

org.springframework.transaction.config.AopAutoProxyConfigurer
public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {

// Register InfrastructureAdvisorAutoProxyCreator if it is not registered yet
AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);

// Check whether the transaction advisor is already registered; if not, register and assemble it
String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;
if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
Object eleSource = parserContext.extractSource(element);

// Register the TransactionAttributeSource
RootBeanDefinition sourceDef = new RootBeanDefinition(
"org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");
sourceDef.setSource(eleSource);
sourceDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
String sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef);

// Register the TransactionInterceptor
RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
interceptorDef.setSource(eleSource);
interceptorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
registerTransactionManager(element, interceptorDef);
interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);

// Register the advisor (BeanFactoryTransactionAttributeSourceAdvisor)
RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
advisorDef.setSource(eleSource);
advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);

// Inject the TransactionAttributeSource and the TransactionInterceptor (by bean name) into the advisor
advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
if (element.hasAttribute("order")) {
advisorDef.getPropertyValues().add("order", element.getAttribute("order"));
}
parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);

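// Group the three bean definitions into a composite component and register it; a later parse finds TRANSACTION_ADVISOR_BEAN_NAME already registered and skips this block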
CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
parserContext.registerComponent(compositeDef);
}
}
InfrastructureAdvisorAutoProxyCreator

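InfrastructureAdvisorAutoProxyCreator implements SmartInstantiationAwareBeanPostProcessor (an extension of InstantiationAwareBeanPostProcessor) through AbstractAutoProxyCreator, so it gets to process every bean the container creates. The relevant hook here is postProcessAfterInitialization, which runs after the bean's properties have been injected and its initialization callbacks have finished: if matching advisors are found, it creates a proxy and returns the proxy in place of the original bean.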

org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
if (bean != null) {
Object cacheKey = getCacheKey(bean.getClass(), beanName);
if (!this.earlyProxyReferences.contains(cacheKey)) {
return wrapIfNecessary(bean, beanName, cacheKey);
}
}
return bean;
}

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// Skip beans that have already been processed
if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
return bean;
}
if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
return bean;
}
// Skip infrastructure classes and beans that should not be proxied
if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}

// Find the advisors that apply to this bean and create the proxy
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
if (specificInterceptors != DO_NOT_PROXY) {
this.advisedBeans.put(cacheKey, Boolean.TRUE);
Object proxy = createProxy(
bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
this.proxyTypes.put(cacheKey, proxy.getClass());
return proxy;
}

this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}
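
The idea behind wrapIfNecessary can be sketched with the JDK alone: inspect the bean, and hand back a proxy only when something on it asks for interception. The sketch below is a loose model, not Spring's code. The @Tx annotation, the Greeter types and AutoProxySketch are all invented for illustration, and a JDK dynamic proxy stands in for Spring's ProxyFactory.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical marker annotation standing in for @Transactional.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Tx {}

interface Greeter {
    String greet(String name);
}

class PlainGreeter implements Greeter {
    public String greet(String name) { return "hi " + name; }
}

class TxGreeter implements Greeter {
    @Tx
    public String greet(String name) { return "hi " + name; }
}

class AutoProxySketch {

    // Returns a JDK proxy when any public method of the bean's class carries @Tx,
    // otherwise returns the bean unchanged.
    static Object wrapIfNecessary(Object bean) {
        boolean matches = false;
        for (Method method : bean.getClass().getMethods()) {
            if (method.isAnnotationPresent(Tx.class)) {
                matches = true;
                break;
            }
        }
        if (!matches) {
            return bean;
        }
        InvocationHandler handler = (proxy, method, args) -> {
            try {
                // A real interceptor would open and complete a transaction around this call.
                return method.invoke(bean, args);
            } catch (InvocationTargetException ex) {
                throw ex.getCause();
            }
        };
        return Proxy.newProxyInstance(
                bean.getClass().getClassLoader(), bean.getClass().getInterfaces(), handler);
    }

    public static void main(String[] args) {
        System.out.println(Proxy.isProxyClass(wrapIfNecessary(new PlainGreeter()).getClass()));
        System.out.println(Proxy.isProxyClass(wrapIfNecessary(new TxGreeter()).getClass()));
    }
}
```

Because the proxy replaces the bean in the container, callers that obtain the bean by its interface never see the difference, which is why the example earlier in this note looks the bean up as UserService.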
1.1. getAdvicesAndAdvisorsForBean

Finding the matching advisors takes two steps: first collect all candidate advisors, then filter them against the bean.

org.springframework.aop.framework.autoproxy.AbstractAdvisorAutoProxyCreator
protected Object[] getAdvicesAndAdvisorsForBean(
Class<?> beanClass, String beanName, @Nullable TargetSource targetSource) {

List<Advisor> advisors = findEligibleAdvisors(beanClass, beanName);
if (advisors.isEmpty()) {
return DO_NOT_PROXY;
}
return advisors.toArray();
}

protected List<Advisor> findEligibleAdvisors(Class<?> beanClass, String beanName) {
List<Advisor> candidateAdvisors = findCandidateAdvisors();
List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);
extendAdvisors(eligibleAdvisors);
if (!eligibleAdvisors.isEmpty()) {
eligibleAdvisors = sortAdvisors(eligibleAdvisors);
}
return eligibleAdvisors;
}
1.1.1. findCandidateAdvisors

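findCandidateAdvisors delegates to BeanFactoryAdvisorRetrievalHelper.findAdvisorBeans, which finds all Advisor bean definitions, instantiates them, and collects them: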

org.springframework.aop.framework.autoproxy.BeanFactoryAdvisorRetrievalHelper
public List<Advisor> findAdvisorBeans() {
// Determine list of advisor bean names, if not cached already.
String[] advisorNames = this.cachedAdvisorBeanNames;
if (advisorNames == null) {
// Find the bean names of all Advisors in the container
advisorNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(
this.beanFactory, Advisor.class, true, false);
this.cachedAdvisorBeanNames = advisorNames;
}
if (advisorNames.length == 0) {
return new ArrayList<>();
}

List<Advisor> advisors = new ArrayList<>();
for (String name : advisorNames) {
if (isEligibleBean(name)) {
if (this.beanFactory.isCurrentlyInCreation(name)) {
if (logger.isTraceEnabled()) {
logger.trace("Skipping currently created advisor '" + name + "'");
}
}else {
try {
advisors.add(this.beanFactory.getBean(name, Advisor.class));
}catch (BeanCreationException ex) {
Throwable rootCause = ex.getMostSpecificCause();
if (rootCause instanceof BeanCurrentlyInCreationException) {
BeanCreationException bce = (BeanCreationException) rootCause;
String bceBeanName = bce.getBeanName();
if (bceBeanName != null && this.beanFactory.isCurrentlyInCreation(bceBeanName)) {
if (logger.isTraceEnabled()) {
logger.trace("Skipping advisor '" + name +
"' with dependency on currently created bean: " + ex.getMessage());
}
// Ignore: indicates a reference back to the bean we're trying to advise.
// We want to find advisors other than the currently created bean itself.
continue;
}
}
throw ex;
}
}
}
}
return advisors;
}
1.1.2. findAdvisorsThatCanApply

Filter the candidates one by one, handling introduction advisors separately from the rest:

org.springframework.aop.support.AopUtils
public static List<Advisor> findAdvisorsThatCanApply(List<Advisor> candidateAdvisors, Class<?> clazz) {
if (candidateAdvisors.isEmpty()) {
return candidateAdvisors;
}
List<Advisor> eligibleAdvisors = new ArrayList<>();
// Handle introduction advisors first
for (Advisor candidate : candidateAdvisors) {
if (candidate instanceof IntroductionAdvisor && canApply(candidate, clazz)) {
eligibleAdvisors.add(candidate);
}
}
boolean hasIntroductions = !eligibleAdvisors.isEmpty();
for (Advisor candidate : candidateAdvisors) {
if (candidate instanceof IntroductionAdvisor) {
// Introduction advisors were handled above, so skip them
continue;
}
if (canApply(candidate, clazz, hasIntroductions)) {
eligibleAdvisors.add(candidate);
}
}
return eligibleAdvisors;
}

The PointcutAdvisor branch is the one that applies to the BeanFactoryTransactionAttributeSourceAdvisor registered above. Its getPointcut() returns an anonymous subclass of TransactionAttributeSourcePointcut whose getTransactionAttributeSource() returns the TransactionAttributeSource that was injected into the advisor.

org.springframework.aop.support.AopUtils
public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
if (advisor instanceof IntroductionAdvisor) {
return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
}else if (advisor instanceof PointcutAdvisor) {
// pca is the BeanFactoryTransactionAttributeSourceAdvisor registered above; getPointcut returns a TransactionAttributeSourcePointcut
PointcutAdvisor pca = (PointcutAdvisor) advisor;
return canApply(pca.getPointcut(), targetClass, hasIntroductions);
}else {
// It doesn't have a pointcut so we assume it applies.
return true;
}
}

// pc is the TransactionAttributeSourcePointcut
public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
Assert.notNull(pc, "Pointcut must not be null");
if (!pc.getClassFilter().matches(targetClass)) {
return false;
}

MethodMatcher methodMatcher = pc.getMethodMatcher(); // this pointcut returns itself
if (methodMatcher == MethodMatcher.TRUE) {
return true; // matches every method
}

IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null;
if (methodMatcher instanceof IntroductionAwareMethodMatcher) {
introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher;
}

Set<Class<?>> classes = new LinkedHashSet<>();
if (!Proxy.isProxyClass(targetClass)) { // not a JDK proxy: add the user class (for a CGLIB proxy that is its superclass, since CGLIB proxies by subclassing)
classes.add(ClassUtils.getUserClass(targetClass));
}

classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass)); // all interfaces implemented by the target class

// Walk every method of the target class and its interfaces; a single match is enough to return true
for (Class<?> clazz : classes) {
Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
for (Method method : methods) {
if (introductionAwareMethodMatcher != null ? // the matching call depends on whether the matcher is introduction-aware
introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) :
methodMatcher.matches(method, targetClass)) {
return true;
}
}
}
return false;
}

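In effect, deciding whether the target class matches the advisor comes down to whether a transaction attribute can be extracted from a public method of the target class, from the target class itself, from a method of an interface it implements, or from that interface.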

org.springframework.transaction.interceptor.TransactionAttributeSourcePointcut
public boolean matches(Method method, Class<?> targetClass) {
if (TransactionalProxy.class.isAssignableFrom(targetClass) ||
PlatformTransactionManager.class.isAssignableFrom(targetClass) ||
PersistenceExceptionTranslator.class.isAssignableFrom(targetClass)) {
return false;
}
TransactionAttributeSource tas = getTransactionAttributeSource();
return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}
org.springframework.transaction.interceptor.AbstractFallbackTransactionAttributeSource
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
if (method.getDeclaringClass() == Object.class) {
return null;
}

Object cacheKey = getCacheKey(method, targetClass);
TransactionAttribute cached = this.attributeCache.get(cacheKey);
if (cached != null) { // cache hit
if (cached == NULL_TRANSACTION_ATTRIBUTE) {
return null;
}else {
return cached;
}
}else { // cache miss: compute the attribute and cache the result
TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
if (txAttr == null) {
this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
}else {
String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
if (txAttr instanceof DefaultTransactionAttribute) {
((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
}
if (logger.isTraceEnabled()) {
logger.trace("Adding transactional method '"
+ methodIdentification + "' with attribute: " + txAttr);
}
this.attributeCache.put(cacheKey, txAttr);
}
return txAttr;
}
}

protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// In practice only public methods are handled: the default constructor of AnnotationTransactionAttributeSource sets publicMethodsOnly to true
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}

// specificMethod is the method as declared on the implementation class
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

// First look for a transaction attribute on the target method
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}

// Then look on the target class
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}

if (specificMethod != method) {
// Then fall back to the interface method
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
// Finally fall back to the interface itself
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
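
The four-step fallback in computeTransactionAttribute can be reproduced with plain reflection. The sketch below is a simplified model under stated assumptions: @TxAttr, OrderService, OrderServiceImpl and FallbackLookupSketch are invented for illustration, the lookup uses Method.getAnnotation rather than Spring's annotation utilities, and only no-argument methods are handled.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical annotation standing in for @Transactional; value() tells us where it was found.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
@interface TxAttr {
    String value();
}

@TxAttr("interface-type")
interface OrderService {
    void place();

    @TxAttr("interface-method")
    void cancel();

    void refund();
}

class OrderServiceImpl implements OrderService {
    @TxAttr("impl-method")
    public void place() {}

    public void cancel() {}

    public void refund() {}
}

class FallbackLookupSketch {

    // Resolves the attribute for a no-argument interface method, in the same order as above:
    // implementation method, implementation class, interface method, interface.
    static String resolve(Class<?> iface, String methodName, Class<?> targetClass) {
        try {
            Method interfaceMethod = iface.getMethod(methodName);
            Method specificMethod = targetClass.getMethod(methodName);

            TxAttr attr = specificMethod.getAnnotation(TxAttr.class);
            if (attr != null) {
                return attr.value();
            }
            attr = specificMethod.getDeclaringClass().getAnnotation(TxAttr.class);
            if (attr != null) {
                return attr.value();
            }
            attr = interfaceMethod.getAnnotation(TxAttr.class);
            if (attr != null) {
                return attr.value();
            }
            attr = interfaceMethod.getDeclaringClass().getAnnotation(TxAttr.class);
            return attr != null ? attr.value() : null;
        } catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException("No such method: " + methodName, ex);
        }
    }

    public static void main(String[] args) {
        for (String name : new String[] {"place", "cancel", "refund"}) {
            System.out.println(name + " -> " + resolve(OrderService.class, name, OrderServiceImpl.class));
        }
    }
}
```

The practical consequence is that an annotation on the implementation method always wins, and an annotation on the interface only takes effect when nothing more specific is present.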

The actual extraction of the transaction attributes is finally delegated to SpringTransactionAnnotationParser:

org.springframework.transaction.annotation.SpringTransactionAnnotationParser
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();

Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue());
rbta.setReadOnly(attributes.getBoolean("readOnly"));
rbta.setQualifier(attributes.getString("value"));

List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
rbta.setRollbackRules(rollbackRules);
return rbta;
}
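
The rollback rules collected above are consulted later, when an exception escapes the target method. The following sketch models that decision under simplifying assumptions: RollbackRulesSketch is a made-up class, and it matches rules by walking the superclass chain of the thrown exception, whereas Spring's RollbackRuleAttribute matches on exception class names. The "closest rule wins" idea and the default (roll back on RuntimeException and Error only) follow Spring's documented behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of rule-based rollback decisions; not Spring code.
class RollbackRulesSketch {

    // One rule: an exception type plus whether a match means rollback (true) or commit (false).
    static final class Rule {
        final Class<? extends Throwable> type;
        final boolean rollback;

        Rule(Class<? extends Throwable> type, boolean rollback) {
            this.type = type;
            this.rollback = rollback;
        }

        // Number of superclass steps from the thrown type up to the rule type, or -1 if unrelated.
        int depth(Class<?> thrown) {
            int steps = 0;
            for (Class<?> current = thrown; current != null; current = current.getSuperclass()) {
                if (current == type) {
                    return steps;
                }
                steps++;
            }
            return -1;
        }
    }

    private final List<Rule> rules = new ArrayList<>();

    RollbackRulesSketch rollbackFor(Class<? extends Throwable> type) {
        rules.add(new Rule(type, true));
        return this;
    }

    RollbackRulesSketch noRollbackFor(Class<? extends Throwable> type) {
        rules.add(new Rule(type, false));
        return this;
    }

    // The rule closest to the thrown type wins; with no matching rule the default applies.
    boolean rollbackOn(Throwable ex) {
        Rule winner = null;
        int best = Integer.MAX_VALUE;
        for (Rule rule : rules) {
            int depth = rule.depth(ex.getClass());
            if (depth >= 0 && depth < best) {
                best = depth;
                winner = rule;
            }
        }
        if (winner == null) {
            return ex instanceof RuntimeException || ex instanceof Error;
        }
        return winner.rollback;
    }

    public static void main(String[] args) {
        RollbackRulesSketch rules = new RollbackRulesSketch()
                .rollbackFor(Exception.class)
                .noRollbackFor(IllegalStateException.class);
        System.out.println(rules.rollbackOn(new java.io.IOException("disk")));
        System.out.println(rules.rollbackOn(new IllegalStateException("state")));
    }
}
```

The default is worth remembering when using @Transactional: a checked exception does not trigger a rollback unless a rollbackFor rule covers it.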
1.2. createProxy

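As covered in the earlier note [spring. Aop], the BeanFactoryTransactionAttributeSourceAdvisor registered above is an Advisor implementation. When the proxy is created, all applicable Advisors are collected, the Advice of each is adapted to a MethodInterceptor, and the interceptors are arranged into an invocation chain. Here getAdvice() returns the TransactionInterceptor injected above, which already is a MethodInterceptor.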

org.springframework.aop.support.AbstractBeanFactoryPointcutAdvisor
public Advice getAdvice() {
Advice advice = this.advice;
if (advice != null) {
return advice;
}

// adviceBeanName is the bean name of the TransactionInterceptor injected at the start
Assert.state(this.adviceBeanName != null, "'adviceBeanName' must be specified");
Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve 'adviceBeanName'");

if (this.beanFactory.isSingleton(this.adviceBeanName)) {
advice = this.beanFactory.getBean(this.adviceBeanName, Advice.class);
this.advice = advice;
return advice;
}else {
synchronized (this.adviceMonitor) {
advice = this.advice;
if (advice == null) {
advice = this.beanFactory.getBean(this.adviceBeanName, Advice.class);
this.advice = advice;
}
return advice;
}
}
}

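2. Executing the transaction: TransactionInterceptor

Declarative transaction handling consists of the following steps:

  1. Get the transaction attributes
  2. Get the TransactionManager
  3. Create the transaction
  4. Before the target method runs, obtain the transaction and collect the transaction information into a TransactionInfo
  5. Invoke the target method
  6. Handle exceptions, rolling back where required
  7. Clear the transaction information before committing
  8. Commit the transaction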
org.springframework.transaction.interceptor.TransactionAspectSupport
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {

// Get the transaction attributes
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);

// Get the TransactionManager
final PlatformTransactionManager tm = determineTransactionManager(txAttr);

// Build a unique identifier for the method
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

// Declarative transactions: demarcate with getTransaction and commit/rollback calls
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Create the TransactionInfo
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// Invoke the target method
retVal = invocation.proceedWithInvocation();
}catch (Throwable ex) {
// After an exception: roll back, or commit if the rollback rules say so
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}finally {
// Clear the transaction information
cleanupTransactionInfo(txInfo);
}
// Commit the transaction
commitTransactionAfterReturning(txInfo);
return retVal;
}else {
// Callback-based (programmatic) transactions, driven by a CallbackPreferringPlatformTransactionManager
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
}catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}else {
throw new ThrowableHolderException(ex);
}
}else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}finally {
cleanupTransactionInfo(txInfo);
}
});

// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}catch (ThrowableHolderException ex) {
throw ex.getCause();
}catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
}
}
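
Stripped of Spring's types, the declarative branch above is a try/catch/finally template. The sketch below is a toy model rather than the real implementation: TxTemplateSketch is invented for illustration, it records events in a list instead of talking to a transaction manager, and it treats every RuntimeException as a rollback trigger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy model of the control flow in invokeWithinTransaction; it records events instead of using a database.
class TxTemplateSketch {

    final List<String> events = new ArrayList<>();

    <T> T invokeWithinTransaction(Supplier<T> target) {
        events.add("begin");            // stands in for createTransactionIfNecessary
        T result;
        try {
            result = target.get();      // stands in for invocation.proceedWithInvocation()
        } catch (RuntimeException ex) {
            events.add("rollback");     // stands in for completeTransactionAfterThrowing
            throw ex;
        } finally {
            events.add("cleanup");      // stands in for cleanupTransactionInfo
        }
        events.add("commit");           // stands in for commitTransactionAfterReturning
        return result;
    }

    public static void main(String[] args) {
        TxTemplateSketch template = new TxTemplateSketch();
        String value = template.invokeWithinTransaction(() -> "saved");
        System.out.println(value + " " + template.events);
    }
}
```

Note the ordering on the success path: the finally block runs before the commit, which is the "clear the transaction information before committing" step in the list above.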
2.1. determineTransactionManager

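The transaction attributes were already resolved and cached during parsing, so they are not covered again; here is how the TransactionManager is obtained: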

org.springframework.transaction.interceptor.TransactionAspectSupport
//Determine the specific transaction manager to use for the given transaction.
@Nullable
protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
// Do not attempt to lookup tx manager if no tx attributes are set
if (txAttr == null || this.beanFactory == null) {
return getTransactionManager();
}

String qualifier = txAttr.getQualifier();
if (StringUtils.hasText(qualifier)) { // the TransactionManager named by the transaction itself, i.e. by @Transactional
return determineQualifiedTransactionManager(this.beanFactory, qualifier);
}else if (StringUtils.hasText(this.transactionManagerBeanName)) { // the one named on annotation-driven, as in the example
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
}else { // neither is specified: fall back to the default, looking it up by type and caching it if necessary
PlatformTransactionManager defaultTransactionManager = getTransactionManager();
if (defaultTransactionManager == null) {
defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
if (defaultTransactionManager == null) {
defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
this.transactionManagerCache.putIfAbsent(
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
}
}
return defaultTransactionManager;
}
}
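
The lookup order is easy to lose in the nested conditions, so here is a stripped-down model of it. Everything in the sketch is an assumption made for illustration: ManagerLookupSketch is not a Spring class, a Map of bean names to strings stands in for the BeanFactory, and "by type" is reduced to "the only entry in the map".

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the transaction manager lookup order; not Spring code.
class ManagerLookupSketch {

    private static final String DEFAULT_KEY = "default";

    private final Map<String, String> beans;   // hypothetical stand-in for the BeanFactory
    private final String configuredName;       // name given on annotation-driven, may be null
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    int typeLookups = 0;                        // counts how often the by-type lookup actually runs

    ManagerLookupSketch(Map<String, String> beans, String configuredName) {
        this.beans = beans;
        this.configuredName = configuredName;
    }

    String determine(String qualifier) {
        if (hasText(qualifier)) {
            return byName(qualifier);           // 1. qualifier from the annotation
        }
        if (hasText(configuredName)) {
            return byName(configuredName);      // 2. name configured globally
        }
        // 3. default manager, resolved once and cached afterwards
        return cache.computeIfAbsent(DEFAULT_KEY, key -> lookupSingleManager());
    }

    private String byName(String name) {
        String manager = beans.get(name);
        if (manager == null) {
            throw new IllegalArgumentException("No transaction manager named '" + name + "'");
        }
        return manager;
    }

    private String lookupSingleManager() {
        typeLookups++;
        if (beans.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one transaction manager but found " + beans.size());
        }
        return beans.values().iterator().next();
    }

    private static boolean hasText(String value) {
        return value != null && !value.trim().isEmpty();
    }

    public static void main(String[] args) {
        Map<String, String> beans = new ConcurrentHashMap<>();
        beans.put("transactionManager", "onlyManager");
        System.out.println(new ManagerLookupSketch(beans, null).determine(null));
    }
}
```

With more than one manager in the context and no name given, the by-type lookup fails, which is the usual reason to set a qualifier on @Transactional in multi-datasource setups.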
2.2. createTransactionIfNecessary

Creating the transaction has two main steps: obtaining the transaction and building the transaction information.

org.springframework.transaction.interceptor.TransactionAspectSupport
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}

TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);
}else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
2.2.1. getTransaction

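Obtaining the transaction itself breaks down into these steps:

  1. Get the transaction object
  2. If the current thread already has a transaction, switch to the handling for existing (nested) transactions
  3. Validate the timeout setting
  4. Validate the propagationBehavior setting
  5. Build the DefaultTransactionStatus
  6. Complete the transaction object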
org.springframework.transaction.support.AbstractPlatformTransactionManager
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {

Object transaction = doGetTransaction();

boolean debugEnabled = logger.isDebugEnabled();

if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}

// Is there already a transaction on this thread: a ConnectionHolder exists and its transactionActive flag is true
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(definition, transaction, debugEnabled);
}

// Validate the timeout
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}

// No existing transaction, but propagationBehavior is PROPAGATION_MANDATORY: throw
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED
|| definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW
|| definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW and PROPAGATION_NESTED all need a new transaction
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// Set up the transaction: ConnectionHolder, isolation level, timeout; a new connection is bound to the current thread
doBegin(transaction, definition);
// Set up transaction synchronization for the current thread
prepareSynchronization(status, definition);
return status;
}catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
2.2.1.1. doGetTransaction

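Obtaining the actual JDBC transaction object is delegated to DataSourceTransactionManager. It tries to fetch the ConnectionHolder for the current DataSource from a thread-bound cache, which is TransactionSynchronizationManager.resources, and wraps the result in a DataSourceTransactionObject that it returns as the transaction object.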

org.springframework.jdbc.datasource.DataSourceTransactionManager
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// Read the value from the ThreadLocal; null if nothing is bound
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
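
The thread-bound cache mentioned above is, at its core, a ThreadLocal holding a map from resource key (here the DataSource) to resource (the ConnectionHolder). A minimal sketch of that idea, assuming nothing beyond the JDK; ThreadBoundResources and its methods are illustrative names, not Spring's API, and getResourceInNewThread exists only to demonstrate the per-thread visibility:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of a per-thread resource registry; not Spring code.
class ThreadBoundResources {

    // Each thread gets its own map, so a resource bound on one thread is invisible to the others.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bindResource(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound for key: " + key);
        }
    }

    static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbindResource(Object key) {
        return RESOURCES.get().remove(key);
    }

    // Demonstration helper: reads the same key from a freshly started thread.
    static Object getResourceInNewThread(Object key) {
        Object[] seen = new Object[1];
        Thread reader = new Thread(() -> seen[0] = getResource(key));
        reader.start();
        try {
            reader.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        Object dataSource = new Object();
        bindResource(dataSource, "connection-1");
        System.out.println(getResource(dataSource));
        System.out.println(getResourceInNewThread(dataSource));
        unbindResource(dataSource);
    }
}
```

This is also why a Spring-managed JDBC transaction does not follow work that is handed off to another thread: the new thread finds nothing bound and would open its own connection.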
2.2.1.2. doBegin

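doBegin is where the transaction is really started:

  1. Get a Connection from the DataSource
  2. Set the isolation level and read-only flag on the Connection
  3. Switch the Connection to manual commit
  4. Mark the connection as having an active transaction
  5. Set the timeout
  6. Bind the connection to the current thread

If the transaction object that DataSourceTransactionManager obtained above has no ConnectionHolder, one is created here: a Connection is fetched from the DataSource by hand, wrapped in a ConnectionHolder, handed to the transaction object, and marked as newly created. The remaining steps are configuration.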

org.springframework.jdbc.datasource.DataSourceTransactionManager
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;

try {
if (!txObject.hasConnectionHolder() // nothing was found in the thread-bound cache above, so open a new connection
|| txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}

txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();

// Configure the connection: con.setReadOnly and con.setTransactionIsolation
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);

// Switch to manual commit
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}

prepareTransactionalConnection(con, definition); // 如果强制事务只读,则执行"SET TRANSACTION READ ONLY"
txObject.getConnectionHolder().setTransactionActive(true); // 标识当前连接已经激活事务

int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}

// 绑定当前线程
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
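
The lookup in doGetTransaction and the bind step at the end of doBegin both depend on a per-thread resource map. The following is a minimal plain-Java sketch of that idea, not Spring's actual TransactionSynchronizationManager. The class ThreadBoundResources and all of its methods are hypothetical names introduced only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for a per-thread resource registry. */
final class ThreadBoundResources {

    // One map per thread: key (e.g. a DataSource) -> resource (e.g. a connection holder).
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    private ThreadBoundResources() {
    }

    /** Returns the resource bound for the key on this thread, or null if none is bound. */
    static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    /** Binds a resource for the key on this thread; refuses to overwrite an existing binding. */
    static void bindResource(Object key, Object value) {
        Map<Object, Object> map = RESOURCES.get();
        if (map.containsKey(key)) {
            throw new IllegalStateException("Already a value bound for key [" + key + "]");
        }
        map.put(key, value);
    }

    /** Removes and returns the binding for the key on this thread. */
    static Object unbindResource(Object key) {
        Object value = RESOURCES.get().remove(key);
        if (value == null) {
            throw new IllegalStateException("No value bound for key [" + key + "]");
        }
        return value;
    }

    /** Looks the key up from a second thread, to show that bindings are not shared. */
    static Object getResourceFromOtherThread(Object key) {
        Object[] seen = new Object[1];
        Thread worker = new Thread(() -> seen[0] = getResource(key));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen[0];
    }
}
```

Because the map lives in a ThreadLocal, code running later on the same thread finds the same holder, while a different thread sees nothing. That is why a transaction started on one thread is not visible to work handed off to another.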
2.2.1.3. prepareSynchronization

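The transaction details are then recorded on the current thread, that is, in the various ThreadLocal fields of TransactionSynchronizationManager. Before this happens, the transaction object created earlier has already been wrapped in a TransactionStatus.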

org.springframework.transaction.support.AbstractPlatformTransactionManager
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                        definition.getIsolationLevel() : null);
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
        TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
        TransactionSynchronizationManager.initSynchronization();
    }
}
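
The last call above, initSynchronization, activates a per-thread list of callbacks that are fired when the transaction completes. The sketch below illustrates the pattern in plain Java. SyncRegistrySketch, its method bodies, and its two status constants are hypothetical simplifications rather than Spring's implementation, and callbacks are reduced to an IntConsumer that receives the completion status.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/** Hypothetical sketch of a thread-bound registry of completion callbacks. */
final class SyncRegistrySketch {

    // Illustrative status codes for this sketch only.
    static final int STATUS_COMMITTED = 0;
    static final int STATUS_ROLLED_BACK = 1;

    // null means "synchronization not active on this thread".
    private static final ThreadLocal<List<IntConsumer>> CALLBACKS = new ThreadLocal<>();

    private SyncRegistrySketch() {
    }

    static boolean isSynchronizationActive() {
        return CALLBACKS.get() != null;
    }

    /** Activates synchronization for the current thread with an empty callback list. */
    static void initSynchronization() {
        if (isSynchronizationActive()) {
            throw new IllegalStateException("Synchronization is already active");
        }
        CALLBACKS.set(new ArrayList<>());
    }

    /** Registers a callback; only allowed while synchronization is active. */
    static void registerSynchronization(IntConsumer afterCompletion) {
        if (!isSynchronizationActive()) {
            throw new IllegalStateException("Synchronization is not active");
        }
        CALLBACKS.get().add(afterCompletion);
    }

    /** Deactivates synchronization, then invokes every registered callback with the status. */
    static void triggerAfterCompletion(int status) {
        List<IntConsumer> callbacks = CALLBACKS.get();
        CALLBACKS.remove();
        if (callbacks != null) {
            for (IntConsumer callback : callbacks) {
                callback.accept(status);
            }
        }
    }
}
```

A caller would register a callback inside the transactional method, for example to evict a cache entry only once the outcome of the transaction is known.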
2.2.1.4. handleExistingTransaction

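If the current thread is already inside a transaction, the call is handled according to its propagationBehavior. The following propagation behaviors are defined:

  • REQUIRED: Support the current transaction; create a new one if none exists. This is the most common choice.
  • SUPPORTS: Support the current transaction; execute non-transactionally if none exists.
  • MANDATORY: Support the current transaction; throw an exception if none exists.
  • REQUIRES_NEW: Create a new transaction, suspending the current one if it exists.
  • NOT_SUPPORTED: Execute non-transactionally, suspending the current transaction if it exists.
  • NEVER: Execute non-transactionally; throw an exception if a transaction exists.
  • NESTED: Execute within a nested transaction if a current transaction exists; otherwise create a new transaction, as REQUIRED does.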
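
The propagation behaviors listed above can be summarized as a small decision table. The sketch below encodes that table in plain Java. The types Propagation, Outcome and PropagationRules are hypothetical and exist only to make the rules checkable; they are not Spring types.

```java
/** Hypothetical enum mirroring the propagation names in the list above. */
enum Propagation {
    REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED
}

/** Hypothetical description of what happens for a given call. */
enum Outcome {
    JOIN_EXISTING, START_NEW, SUSPEND_AND_START_NEW, SUSPEND_AND_RUN_WITHOUT_TX,
    RUN_WITHOUT_TX, START_NESTED, FAIL
}

final class PropagationRules {

    private PropagationRules() {
    }

    /** Maps (propagation, is a transaction already active?) to the resulting behavior. */
    static Outcome decide(Propagation propagation, boolean existingTransaction) {
        switch (propagation) {
            case REQUIRED:
                return existingTransaction ? Outcome.JOIN_EXISTING : Outcome.START_NEW;
            case SUPPORTS:
                return existingTransaction ? Outcome.JOIN_EXISTING : Outcome.RUN_WITHOUT_TX;
            case MANDATORY:
                return existingTransaction ? Outcome.JOIN_EXISTING : Outcome.FAIL;
            case REQUIRES_NEW:
                return existingTransaction ? Outcome.SUSPEND_AND_START_NEW : Outcome.START_NEW;
            case NOT_SUPPORTED:
                return existingTransaction ? Outcome.SUSPEND_AND_RUN_WITHOUT_TX : Outcome.RUN_WITHOUT_TX;
            case NEVER:
                return existingTransaction ? Outcome.FAIL : Outcome.RUN_WITHOUT_TX;
            case NESTED:
                return existingTransaction ? Outcome.START_NESTED : Outcome.START_NEW;
            default:
                throw new IllegalArgumentException("Unknown propagation: " + propagation);
        }
    }
}
```

The column for existingTransaction == true corresponds to handleExistingTransaction; the other column is handled by the caller before this method is ever reached.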
org.springframework.transaction.support.AbstractPlatformTransactionManager
private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {

    // NEVER: an existing transaction is not allowed, so throw immediately
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }

    // NOT_SUPPORTED: suspend the current transaction and run without one
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    // REQUIRES_NEW: suspend the current transaction and begin a new one
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                    definition.getName() + "]");
        }
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        } catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    // NESTED: create a nested transaction, using a savepoint as its rollback point where supported
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        } else {
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }

    // Validate the isolation level and read-only flag against the existing transaction
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

Suspending here means saving the transaction information recorded for the current thread and resetting the corresponding ThreadLocal values.

org.springframework.transaction.support.AbstractPlatformTransactionManager
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                suspendedResources = doSuspend(transaction);
            }

            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);

            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);

            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);

            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);

            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        } catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    } else if (transaction != null) {
        // Transaction active but no synchronization active.
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    } else {
        // Neither transaction nor synchronization active.
        return null;
    }
}
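
The snapshot-and-reset pattern used by suspend can be sketched in plain Java as follows. TxThreadState and its nested Suspended class are hypothetical names; the sketch tracks only three of the thread-bound values and leaves out the connection holder and synchronization list that the real method also handles.

```java
/** Hypothetical sketch of suspend/resume over thread-bound transaction state. */
final class TxThreadState {

    private static final ThreadLocal<String> NAME = new ThreadLocal<>();
    private static final ThreadLocal<Boolean> READ_ONLY = new ThreadLocal<>();
    private static final ThreadLocal<Boolean> ACTIVE = new ThreadLocal<>();

    /** Immutable snapshot of the values that were bound before suspension. */
    static final class Suspended {
        final String name;
        final boolean readOnly;
        final boolean wasActive;

        Suspended(String name, boolean readOnly, boolean wasActive) {
            this.name = name;
            this.readOnly = readOnly;
            this.wasActive = wasActive;
        }
    }

    private TxThreadState() {
    }

    /** Records a newly started transaction on the current thread. */
    static void begin(String name, boolean readOnly) {
        NAME.set(name);
        READ_ONLY.set(readOnly);
        ACTIVE.set(true);
    }

    static String currentName() {
        return NAME.get();
    }

    static boolean isReadOnly() {
        return Boolean.TRUE.equals(READ_ONLY.get());
    }

    static boolean isActive() {
        return Boolean.TRUE.equals(ACTIVE.get());
    }

    /** Resets every thread-bound value. */
    static void clear() {
        NAME.remove();
        READ_ONLY.remove();
        ACTIVE.remove();
    }

    /** Saves the current values into a snapshot, then resets the thread-bound state. */
    static Suspended suspend() {
        Suspended snapshot = new Suspended(currentName(), isReadOnly(), isActive());
        clear();
        return snapshot;
    }

    /** Restores the values captured by an earlier suspend() call. */
    static void resume(Suspended suspended) {
        NAME.set(suspended.name);
        READ_ONLY.set(suspended.readOnly);
        ACTIVE.set(suspended.wasActive);
    }
}
```

Between suspend() and resume() the thread looks as if no transaction exists, so an inner REQUIRES_NEW transaction can bind its own state without touching the outer one.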
2.2.2. prepareTransactionInfo

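Finally, the TransactionStatus is wrapped in a TransactionInfo. A TransactionInfo is created and bound to the thread whether or not the current method is transactional. If the transaction later fails, Spring uses the information in this TransactionInfo instance to roll back.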

org.springframework.transaction.interceptor.TransactionAspectSupport
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, String joinpointIdentification,
        @Nullable TransactionStatus status) {

    TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
    if (txAttr != null) {
        // We need a transaction for this method...
        if (logger.isTraceEnabled()) {
            logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        // The transaction manager will flag an error if an incompatible tx already exists.
        txInfo.newTransactionStatus(status);
    } else {
        // The TransactionInfo.hasTransaction() method will return false. We created it only
        // to preserve the integrity of the ThreadLocal stack maintained in this class.
        if (logger.isTraceEnabled()) {
            logger.trace("Don't need to create transaction for [" + joinpointIdentification +
                    "]: This method isn't transactional.");
        }
    }

    // We always bind the TransactionInfo to the thread, even if we didn't create
    // a new transaction here. This guarantees that the TransactionInfo stack
    // will be managed correctly even if no transaction was created by this aspect.
    txInfo.bindToThread();
    return txInfo;
}
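
The comments above mention a "ThreadLocal stack" of TransactionInfo objects. One way to keep such a stack, sketched here under the assumption that each info simply remembers the one it replaced, is a linked chain held in a single ThreadLocal. TxInfoSketch is a hypothetical class written for this illustration, not the real TransactionInfo.

```java
/** Hypothetical sketch of a thread-bound stack kept as a chain of linked infos. */
final class TxInfoSketch {

    private static final ThreadLocal<TxInfoSketch> CURRENT = new ThreadLocal<>();

    private final String joinpoint;       // identifier of the intercepted method
    private final boolean transactional;  // false when the method has no transaction attribute
    private TxInfoSketch previous;        // info that was current before this one was bound

    TxInfoSketch(String joinpoint, boolean transactional) {
        this.joinpoint = joinpoint;
        this.transactional = transactional;
    }

    /** Pushes this info: remember the current one, then take its place. */
    void bindToThread() {
        this.previous = CURRENT.get();
        CURRENT.set(this);
    }

    /** Pops this info: put back whatever was current before bindToThread(). */
    void restoreThreadLocalStatus() {
        if (this.previous == null) {
            CURRENT.remove();
        } else {
            CURRENT.set(this.previous);
        }
    }

    static TxInfoSketch current() {
        return CURRENT.get();
    }

    String joinpoint() {
        return this.joinpoint;
    }

    boolean hasTransaction() {
        return this.transactional;
    }
}
```

Binding an info even for a non-transactional method keeps push and pop symmetric, so an outer transactional caller always finds its own info again after the inner call returns.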
2.3. completeTransactionAfterThrowing

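When an exception is caught and a transaction exists, the transaction is rolled back only for certain exception types; for any other exception it is still committed.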

org.springframework.transaction.interceptor.TransactionAspectSupport
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    // First check whether a transaction exists
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                    "] after exception: " + ex);
        }
        // Roll back only for exceptions that the transaction attribute marks for rollback
        if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
            } catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            } catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                throw ex2;
            }
        } else {
            // We don't roll back on this exception.
            // Will still roll back if TransactionStatus.isRollbackOnly() is true.
            try {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            } catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            } catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                throw ex2;
            }
        }
    }
}

By default, only RuntimeException and Error trigger a rollback.

org.springframework.transaction.interceptor.DefaultTransactionAttribute
public boolean rollbackOn(Throwable ex) {
    return (ex instanceof RuntimeException || ex instanceof Error);
}
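
A practical consequence of this default is that a checked exception such as IOException does not trigger a rollback unless a rule says otherwise. The sketch below copies the default predicate into a hypothetical RollbackRuleSketch class and adds a second, illustrative overload that also rolls back for one extra exception type. That overload only approximates the idea behind the rollbackFor attribute of @Transactional; it is not how Spring implements rollback rules.

```java
import java.io.IOException;

/** Hypothetical helper that reproduces the default rule and one illustrative extension. */
final class RollbackRuleSketch {

    private RollbackRuleSketch() {
    }

    /** Default rule: roll back on unchecked exceptions and errors only. */
    static boolean rollbackOn(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error);
    }

    /** Illustrative extension: additionally roll back for one configured exception type. */
    static boolean rollbackOn(Throwable ex, Class<? extends Throwable> alsoRollbackFor) {
        return rollbackOn(ex) || alsoRollbackFor.isInstance(ex);
    }

    /** Small demonstration that can be run from any main method. */
    static void demo() {
        System.out.println(rollbackOn(new IllegalStateException("unchecked")));
        System.out.println(rollbackOn(new IOException("checked")));
        System.out.println(rollbackOn(new IOException("checked"), Exception.class));
    }
}
```

With the default rule, a method that throws a checked exception goes through the commit branch of completeTransactionAfterThrowing, which is a common source of surprise.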
2.3.1. rollback

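If a savepoint exists, the transaction is rolled back to that savepoint. However the call is wrapped, the final operation is delegated to the Connection, that is, to the implementation provided by the database driver. Spring also provides a callback registry: TransactionSynchronizationManager.registerSynchronization registers actions to run when a rollback or commit occurs.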

org.springframework.transaction.support.AbstractPlatformTransactionManager
public final void rollback(TransactionStatus status) throws TransactionException {
    // Rolling back a transaction that has already completed is an error
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    processRollback(defStatus, false);
}

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;

        try {
            // Fire the beforeCompletion callbacks
            triggerBeforeCompletion(status);
            if (status.hasSavepoint()) {
                // A savepoint exists: roll back to it
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                status.rollbackToHeldSavepoint();
            } else if (status.isNewTransaction()) {
                // A standalone new transaction: roll back directly
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                doRollback(status);
            } else {
                // Participating in a larger transaction: only mark the status here and let
                // the whole chain roll back together once it finishes
                if (status.hasTransaction()) {
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                        }
                        // Set rollbackOnly = true so that the transaction is rolled back unconditionally later
                        doSetRollbackOnly(status);
                    } else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                        }
                    }
                } else {
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }

                // Unexpected rollback only matters here if we're asked to fail early
                if (!isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = false;
                }
            }
        } catch (RuntimeException | Error ex) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        // Fire the afterCompletion callbacks
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

        // Raise UnexpectedRollbackException if the rollback was caused by a global rollback-only marker
        if (unexpectedRollback) {
            throw new UnexpectedRollbackException(
                    "Transaction rolled back because it has been marked as rollback-only");
        }
    } finally {
        // Clear the recorded resources and resume any suspended ones
        cleanupAfterCompletion(status);
    }
}
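
The three rollback paths in processRollback (roll back to a savepoint, roll back the whole transaction, or only mark it rollback-only) can be illustrated with an in-memory model. RollbackPathsSketch below is a hypothetical toy: pending writes are a list, a savepoint is just the list size at the time it was taken, and no database or JDBC Savepoint is involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory "transaction" used only to illustrate the rollback paths. */
final class RollbackPathsSketch {

    private final List<String> pendingWrites = new ArrayList<>();
    private boolean rollbackOnly;

    /** Records a write that becomes visible only if the transaction commits. */
    void write(String row) {
        pendingWrites.add(row);
    }

    /** A savepoint in this sketch is simply the number of writes made so far. */
    int createSavepoint() {
        return pendingWrites.size();
    }

    /** Nested scope failed: undo only the writes made after the savepoint. */
    void rollbackToSavepoint(int savepoint) {
        pendingWrites.subList(savepoint, pendingWrites.size()).clear();
    }

    /** Participating scope failed: it cannot undo anything itself, so it only sets a flag. */
    void setRollbackOnly() {
        this.rollbackOnly = true;
    }

    /**
     * Outermost scope completes: everything is discarded if the flag was set,
     * otherwise the remaining writes are committed. Returns the committed rows.
     */
    List<String> complete() {
        if (this.rollbackOnly) {
            pendingWrites.clear();
        }
        return new ArrayList<>(pendingWrites);
    }
}
```

In this model a failed nested scope loses only its own writes, whereas a failed participating scope dooms every write in the transaction, which matches the difference between NESTED and REQUIRED described in this article.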
  • cleanupAfterCompletion

After the rollback, the following cleanup is performed:

org.springframework.transaction.support.AbstractPlatformTransactionManager
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    // Mark the status as completed
    status.setCompleted();

    // For a new synchronization, clear the transaction information bound to the current thread
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.clear();
    }

    // For a new transaction, clean up: unbind the holder keyed by the DataSource, release the connection, etc.
    if (status.isNewTransaction()) {
        doCleanupAfterCompletion(status.getTransaction());
    }

    // Resume the previously suspended transaction, if any
    if (status.getSuspendedResources() != null) {
        if (status.isDebug()) {
            logger.debug("Resuming suspended transaction after completion of inner transaction");
        }
        Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
        resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}
2.4. commitTransactionAfterReturning
org.springframework.transaction.interceptor.TransactionAspectSupport
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}

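As seen in the rollback analysis above, a transaction that has no savepoint and is not a new transaction is merely flagged as rollbackOnly. In other words, for a participating (inner) transaction that cannot use a savepoint, Spring sets a rollback flag to prevent a commit; when the outer transaction later tries to commit and detects the flag, it rolls back the transaction as a whole.

For a nested transaction, Spring sets a savepoint before the nested transaction begins. If the nested transaction throws, it is rolled back to that savepoint; if it does not, it is still not committed on its own, because the commit for the whole chain is left to the outermost transaction. So a transaction that holds a savepoint is, by definition, not the outermost one.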

  • commit
org.springframework.transaction.support.AbstractPlatformTransactionManager
public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    // If rollback-only was set anywhere in the transaction chain, skip the commit and roll back instead
    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus, false);
        return;
    }

    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(defStatus, true);
        return;
    }

    processCommit(defStatus);
}
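
The order of checks in commit can be condensed into a small sketch. It assumes the caller is the outermost transaction and that shouldCommitOnGlobalRollbackOnly() returns false. CommitDecisionSketch, its Result enum and its UnexpectedRollback exception are hypothetical stand-ins, not Spring classes.

```java
/** Hypothetical condensation of the checks commit() performs before committing. */
final class CommitDecisionSketch {

    enum Result { COMMITTED, ROLLED_BACK }

    /** Stand-in for the exception raised when a commit silently turns into a rollback. */
    static final class UnexpectedRollback extends RuntimeException {
        UnexpectedRollback(String message) {
            super(message);
        }
    }

    private CommitDecisionSketch() {
    }

    /**
     * @param localRollbackOnly  this scope asked for rollback itself (an expected rollback)
     * @param globalRollbackOnly some participating scope marked the shared transaction
     */
    static Result commit(boolean localRollbackOnly, boolean globalRollbackOnly) {
        if (localRollbackOnly) {
            // The code that owns the transaction requested it: roll back quietly.
            return Result.ROLLED_BACK;
        }
        if (globalRollbackOnly) {
            // The caller expected a commit, so the rollback is reported as an error.
            throw new UnexpectedRollback(
                    "Transaction rolled back because it has been marked as rollback-only");
        }
        return Result.COMMITTED;
    }
}
```

This is why catching an exception from an inner REQUIRED method in the outer method does not save the transaction: the inner failure has already set the global marker, and the outer commit ends in an unexpected rollback.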

