多线程事务控制

在某些环境下面,我们需要使用一次性向数据库中插入大量数据,如果采用单线程,可能速度较慢,此时大部分同学第一时间就会想到采用多线程来进行操作。

但是多线程如果无法保证事务的一致性,会造成数据库中出现大量的脏数据。

这里就谈谈多线程事务如何控制,这里的多线程控制基于分布式事务2PC提交思想,但是要注意的是多线程事务控制破环了事务的隔离性,即有一个线程发生异常,其他线程得跟着回滚,并且多线程事务会占用大量数据库连接,所以在大部分场景下是不推荐使用的,弊端很多。


创建SpringBoot项目

multi-thread-transaction

Pom文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.8</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.itjing</groupId>
<artifactId>multi-thread-transaction</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>multi-thread-transaction</name>
<description>multi-thread-transaction</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>

<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>

<!-- mybatis -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>

<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>

<!-- web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<!-- test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>

YML配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14

server:
port: 33333
spring:
application:
name: multi-thread-transaction
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/springboot?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true&allowMultiQueries=true&rewriteBatchedStatements=true
username: root
password: root

mybatis:
mapper-locations: classpath:mapper/**/*.xml

线程池配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.itjing.transaction.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

/**
* @description: 线程池配置类
* @author: lijing
* @date: 2023-01-29 21:38
*/
@EnableAsync
@Configuration
public class TaskPoolConfig {

@Bean("taskExecutor")
public Executor taskExecutor() {
// 返回可用处理器的Java虚拟机的数量
// int i = Runtime.getRuntime().availableProcessors();
// System.out.println("系统最大线程数 : " + i);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程池大小,视机器情况而定
executor.setCorePoolSize(16);
// 最大线程数
executor.setMaxPoolSize(20);
// 配置队列容量,默认值为Integer.MAX_VALUE
executor.setQueueCapacity(99999);
// 活跃时间
executor.setKeepAliveSeconds(60);
// 线程名字前缀
executor.setThreadNamePrefix("asyncServiceExecutor-");
// 设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行
executor.setAwaitTerminationSeconds(60);
// 等待所有的任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 必须初始化
executor.initialize();
return executor;
}

}

用户实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.itjing.transaction.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* @description: 用户实体类
* @author: lijing
* @date: 2023-01-29 21:20
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {

private String name;

private Integer age;

private String email;

private String phone;

}

用户持久层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.itjing.transaction.dao;

import com.itjing.transaction.entity.User;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
* @description: 用户持久层接口
* @author: lijing
* @date: 2023-01-29 21:22
*/
@Mapper
public interface UserMapper {

int batchInsert(@Param("userList") List<User> userList);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.itjing.transaction.dao.UserMapper">

<resultMap id="BaseMap" type="com.itjing.transaction.entity.User">
<result column="name" property="name"/>
<result column="age" property="age"/>
<result column="email" property="email"/>
<result column="phone" property="phone"/>
</resultMap>

<sql id="BaseColumns">
name,age,email,phone
</sql>

<insert id="batchInsert">
insert into user(name,age,email,phone)
values
<foreach item="item" collection="userList" separator=",">
(
#{item.name},
#{item.age},
#{item.email},
#{item.phone}
)
</foreach>
</insert>
</mapper>

事务工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.itjing.transaction.util;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;

/**
* @description: 事务工具类
* @author: lijing
* @date: 2023-01-29 21:34
*/
@Component
public class TransactionalUtil {

@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;

/**
* 开启事务
*
* @return
*/
public TransactionStatus begin() {
TransactionStatus transaction = dataSourceTransactionManager.getTransaction(new DefaultTransactionAttribute());
return transaction;
}

/**
* 提交事务
*
* @return
*/
public void commit(TransactionStatus transactionStatus) {
dataSourceTransactionManager.commit(transactionStatus);
}

/**
* 回滚事务
*
* @return
*/
public void rollback(TransactionStatus transactionStatus) {
dataSourceTransactionManager.rollback(transactionStatus);
}

}

用户业务层

其中使用两种方式实现线程事务控制,一个是CyclicBarrier,一个是CountDownLatch。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.itjing.transaction.service;

import com.itjing.transaction.entity.User;

import java.util.List;

/**
* @description: 用户业务接口
* @author: lijing
* @date: 2023-01-29 21:21
*/
public interface UserService {

void batchInsert1(List<User> userList,int threads);

void batchInsert2(List<User> userList,int threads);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package com.itjing.transaction.service.impl;

import com.google.common.collect.Lists;
import com.itjing.transaction.dao.UserMapper;
import com.itjing.transaction.entity.User;
import com.itjing.transaction.service.UserService;
import com.itjing.transaction.util.TransactionalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

/**
* @description:
* @author: lijing
* @date: 2023-01-29 21:22
*/
@Service
@Slf4j
public class UserServiceImpl implements UserService {

@Autowired
private UserMapper userMapper;

@Resource
private ThreadPoolTaskExecutor taskExecutor;

@Autowired
private TransactionalUtil transactionalUtil;

@Override
public void batchInsert1(List<User> userList, int threads) {
if (userList == null || userList.isEmpty()) {
return;
}
int size = userList.size();
int subSize = size / threads;
List<List<User>> partitions = Lists.partition(userList, subSize);
CyclicBarrier cyclicBarrier = new CyclicBarrier(partitions.size());
AtomicReference<Boolean> rollback = new AtomicReference<>(false);
for (int i = 0; i < partitions.size(); i++) {
List<User> importList = partitions.get(i);
int finalI = i;
taskExecutor.execute(() -> {
TransactionStatus transaction = transactionalUtil.begin();
try {
if (userMapper.batchInsert(importList) < 1) {
throw new RuntimeException("插入数据失败");
}
} catch (Exception e) {
// 如果当前线程异常 则设置回滚标志
rollback.set(true);
log.error("插入数据失败");
}
// 等待所有线程的事务结果
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
if (rollback.get()) {
transactionalUtil.rollback(transaction);
log.error("线程出现异常,线程{}事务回滚", Thread.currentThread().getName());
return;
}
transactionalUtil.commit(transaction);
log.info("线程正常执行,线程{}事务提交", Thread.currentThread().getName());
});
}
}

private volatile boolean IS_OK = true;

public void batchInsert2(List<User> userList, int threads) {
if (userList == null || userList.isEmpty()) {
return;
}
int size = userList.size();
int subSize = size / threads;
List<List<User>> partitions = Lists.partition(userList, subSize);
CountDownLatch childMonitor = new CountDownLatch(partitions.size());
// 主线程收集子线程运行的最终结果
List<Boolean> childResponse = Collections.synchronizedList(new ArrayList<>());
// 子线程在该对象上等待主线程的通知
CountDownLatch mainMonitor = new CountDownLatch(1);
for (int i = 0; i < partitions.size(); i++) {
List<User> importList = partitions.get(i);
taskExecutor.execute(() -> {
// 开启事务
TransactionStatus transaction = transactionalUtil.begin();
try {
if (userMapper.batchInsert(importList) < 1) {
throw new RuntimeException("插入数据失败");
}
childResponse.add(Boolean.TRUE);
childMonitor.countDown();
log.info("线程{}正常执行完成,等待其他线程执行结果", Thread.currentThread().getName());
mainMonitor.await();
// 如果其他线程有失败的,则回滚
if (IS_OK) {
// 事务提交
log.info("线程正常执行,线程{}事务提交", Thread.currentThread().getName());
transactionalUtil.commit(transaction);
} else {
// 事务回滚
log.error("线程出现异常,线程{}事务回滚", Thread.currentThread().getName());
transactionalUtil.rollback(transaction);
}
} catch (Exception e) {
childResponse.add(Boolean.FALSE);
childMonitor.countDown();
log.error("线程{}出现异常,开始事务回滚", Thread.currentThread().getName());
transactionalUtil.rollback(transaction);
}
});
}
try {
// 主线程等待所有子线程执行结果
childMonitor.await();
for (Boolean resp : childResponse) {
if (!resp) {
// 如果有一个子线程执行失败了,改变标志位,让所有线程回滚
log.error("有线程执行失败,标志位置为false");
IS_OK = false;
break;
}
}
// 主线程获取结果成功,让子线程根据主线程结果执行(提交或回滚)
mainMonitor.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.itjing.transaction;

import com.itjing.transaction.entity.User;
import com.itjing.transaction.service.UserService;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.List;

@SpringBootTest
class MultiThreadTransactionApplicationTests {

@Autowired
private UserService userService;

@Test
void contextLoads() {
List<User> userList = Lists.newArrayList();
for (int i = 0; i < 2000; i++) {
userList.add(new User("lijing" + i, 18, "2427259171@qq.com", "17798832262"));
}
userService.batchInsert2(userList, 5);
}

}

最后

多线程事务控制代码自己写玩玩就行了,大部分场景不推荐。欢迎关注本人原创公众号:程序员阿晶

本文标题:多线程事务控制

文章作者:LiJing

发布时间:2023年01月29日 - 20:54:37

最后更新:2023年06月03日 - 09:57:16

原始链接:https://blog-next.xiaojingge.com/posts/441442635.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

-------------------本文结束 感谢您的阅读-------------------