男人的盟誓是女人的陷阱——莎士比亚

使用CompletableFuture进行异步任务编排时,可能需要事务支持,这里可以使用我之前写过的手动提交、回滚事务的方式进行处理

代码案例如下,所用框架为stream-query

首先是不加事务的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package io.github.vampireachao.stream.plugin.mybatisplus;

import io.github.vampireachao.stream.plugin.mybatisplus.pojo.po.UserInfo;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.annotation.Transactional;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
 * Baseline version of the async test: the asynchronous task performs its
 * lookup without opening any transaction of its own.
 *
 * <p>As written, this version is expected to fail with the message
 * {@code "user is null"}: the row saved by the test method is not visible to
 * the lookup executed inside the asynchronous task.
 *
 * @author VampireAchao
 * @since 2022/12/6
 */
@SpringBootTest
class AsyncTest {

    /**
     * Saves a user on the calling thread, then looks it up again from a task
     * submitted to the given executor and waits for that task to finish.
     *
     * @param executor thread pool on which the lookup task is run
     */
    @Test
    @Transactional
    void test(@Autowired ThreadPoolTaskExecutor executor) {
        UserInfo saved = new UserInfo();
        saved.setName("ruben");
        Database.save(saved);

        // The lookup runs on a pool thread rather than on the test thread.
        Runnable lookup = () -> {
            Assertions.assertNotNull(saved.getId(), "id is null");
            Assertions.assertNotNull(Database.getById(saved.getId(), saved.getClass()), "user is null");
        };

        CompletableFuture<Void> pending = CompletableFuture.runAsync(lookup, executor);
        // join() blocks until the task completes and surfaces any failure from it.
        pending.join();
    }
}

此处assertNotNull断言会失败并抛出异常,提示user is null

因为外部事务尚未提交,而异步任务运行在另一个线程中,不属于外部事务,其默认的事务隔离级别也不是读未提交,所以查不到这条尚未提交的数据

修改后支持事务的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package io.github.vampireachao.stream.plugin.mybatisplus;

import io.github.vampireachao.stream.plugin.mybatisplus.pojo.po.UserInfo;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
 * Async test in which the asynchronous task manages its own transaction by
 * hand, using isolation level {@code READ_UNCOMMITTED} so that it can read
 * the row saved (but not yet committed) by the test method.
 *
 * @author VampireAchao
 * @since 2022/12/6
 */
@SpringBootTest
class AsyncTest {

    /**
     * Saves a user on the calling thread, then looks it up from a task
     * submitted to the given executor. The task begins a transaction
     * manually, rolls it back if the assertions fail, and commits otherwise.
     *
     * @param transactionTemplate source of the {@link PlatformTransactionManager}
     *                            used for the manual transaction
     * @param executor            thread pool on which the lookup task is run
     */
    @Test
    @Transactional
    void test(
            @Autowired TransactionTemplate transactionTemplate,
            @Autowired ThreadPoolTaskExecutor executor) {
        UserInfo userInfo = new UserInfo();
        userInfo.setName("ruben");
        Database.save(userInfo);
        CompletableFuture.runAsync(() -> {
            PlatformTransactionManager transactionManager = Objects.requireNonNull(
                    transactionTemplate.getTransactionManager(), "transactionManager");
            DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
            transactionDefinition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_UNCOMMITTED);
            TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);
            try {
                Assertions.assertNotNull(userInfo.getId(), "id is null");
                Assertions.assertNotNull(Database.getById(userInfo.getId(), userInfo.getClass()), "user is null");
            } catch (RuntimeException | Error e) {
                // Only the work itself is guarded: roll back and propagate the failure.
                transactionManager.rollback(transactionStatus);
                throw e;
            }
            // Commit outside the try/catch: if commit throws, the transaction is
            // already completed, and calling rollback afterwards would raise an
            // IllegalTransactionStateException that hides the real commit failure.
            transactionManager.commit(transactionStatus);
        }, executor).join();
    }
}

成功执行