背景

Springboot项目,有个需求,需要提供接口,接口调用方每一次调用时,会保存或者更新大量数据,接口需要满足以下要求:

  • 数据保存要保证数据原子性:要么全部保存成功,要么全部不保存。
  • 保证接口性能。

实践发现,即使使用批量保存,接口耗时也久,所以需要开启多线程来保存。现在的问题是,在开启多线程保存的情况下,如何保证数据的原子性。

使用声明式事务出现的问题

具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Transactional(rollbackFor = Exception.class)
public boolean saveUser() {
Long userId = 1L;
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
User user = new User();
user.setId(userId);
user.setName("wls");
user.setEmail("1396523950@qq.com");
save(user);
}, threadPoolExecutor);
CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> {

UserLog userLog = new UserLog();
userLog.setUserId(userId);
userLogService.save(userLog);
int i = 1 / 0;
}, threadPoolExecutor);
//等待所有执行完
CompletableFuture.allOf(f1, f2).join();
return true;
}

此时可以发现 user保存成功,userLog未保存成功,说明整个方法的事务并未保证原子性。

解决思路

  • 开启多线程,每个线程都是使用独立的DB连接。否则由于数据库是串行阻塞操作,最终还是会变成排队操作数据库。
  • 依赖spring事务异常回滚机制。
  • 有个统一的标识来标识“是否有线程操作失败”。
  • 线程如果出现异常:先捕获异常,将标识设置为失败,然后继续抛出异常。
  • 线程如果没有异常,在执行的最后,判断标识是失败,也就是“有其他线程有执行失败”,就自定义抛出异常来回滚。
  • 通过锁来保证:所有的线程都操作完之后,一起判断标识是否成功;确保不会出现“还有线程的业务未执行完成,其他线程就已经结束工作”。

最终代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@Slf4j
@Service
public class UserService extends ServiceImpl<UserMapper, User> {

@Resource
private ThreadPoolExecutor threadPoolExecutor;


@Resource
private DataSourceTransactionManager dataSourceTransactionManager;

@Resource
private UserLogService userLogService;



public boolean saveUser() {
CyclicBarrier cb = new CyclicBarrier(2);
AtomicBoolean flag = new AtomicBoolean(false);
Long userId = 1L;
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
TransactionStatus transaction = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition);
try {
User user = new User();
user.setId(userId);
user.setName("wls");
user.setEmail("1396523950@qq.com");
save(user);
// 等待所有线程的事务结果
cb.await();
log.info("f1: "+flag.get());
// 如果标志需要回滚,则回滚
if (flag.get()) {
dataSourceTransactionManager.rollback(transaction);
return;
}
dataSourceTransactionManager.commit(transaction);
} catch (Exception e) {
flag.set(true);
dataSourceTransactionManager.rollback(transaction);
try {
cb.await();
} catch (InterruptedException | BrokenBarrierException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}

}, threadPoolExecutor);
CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> {
DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
TransactionStatus transaction = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition);
try {
UserLog userLog = new UserLog();
userLog.setUserId(userId);
userLogService.save(userLog);
int i = 1 / 0;
// 等待所有线程的事务结果
cb.await();
log.info("f2: "+flag.get());
// 如果标志需要回滚,则回滚
if (flag.get()) {
dataSourceTransactionManager.rollback(transaction);
return;
}
dataSourceTransactionManager.commit(transaction);
} catch (Exception e) {
flag.set(true);
dataSourceTransactionManager.rollback(transaction);
try {
cb.await();
} catch (InterruptedException | BrokenBarrierException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}, threadPoolExecutor);
CompletableFuture.allOf(f1, f2).join();
return true;
}
}

注意:

使用CyclicBarrier 和手动事务时需要控制任务的超时时间。