阿超
Transaction Handling with CompletableFuture
"Men's vows are women's traitors." (Shakespeare)
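When orchestrating asynchronous tasks with CompletableFuture, you may need transaction support. Here we can handle that with the manual transaction commit and rollback approach I wrote about earlier.
The code example follows; it uses the stream-query framework:
First, the version with no transaction handling inside the async task: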
    package io.github.vampireachao.stream.plugin.mybatisplus;

    import io.github.vampireachao.stream.plugin.mybatisplus.pojo.po.UserInfo;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.transaction.annotation.Transactional;

    import java.util.concurrent.CompletableFuture;

    @SpringBootTest
    class AsyncTest {

        @Test
        @Transactional
        void test(@Autowired ThreadPoolTaskExecutor executor) {
            // Saved inside the test's outer transaction, which is not committed yet
            UserInfo userInfo = new UserInfo();
            userInfo.setName("ruben");
            Database.save(userInfo);
            // The task runs on an executor thread, outside the outer transaction
            CompletableFuture.runAsync(() -> {
                Assertions.assertNotNull(userInfo.getId(), "id is null");
                Assertions.assertNotNull(Database.getById(userInfo.getId(), userInfo.getClass()), "user is null");
            }, executor).join();
        }
    }
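Here the second assertNotNull fails with the message "user is null" (join() surfaces the failure wrapped in a CompletionException). The outer transaction has not committed yet, and the async task runs on a different thread, outside that transaction. Its default transaction isolation level is not read uncommitted, so it cannot see the row that was just saved.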
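The thread-bound part of this can be illustrated without Spring. Spring keeps the current transaction's resources in thread-local storage, so a task handed to another thread does not inherit them. The following is a minimal sketch only: it uses a plain ThreadLocal as a stand-in, and the class, field, and method names are hypothetical, not taken from Spring or stream-query.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadBoundContextDemo {

    // Hypothetical stand-in for the thread-bound transaction state
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Value visible on the calling thread while a "transaction" is bound to it. */
    public static String seenByCaller() {
        CURRENT_TX.set("outer-tx");
        try {
            return CURRENT_TX.get();
        } finally {
            CURRENT_TX.remove();
        }
    }

    /** Value visible on an executor thread while the caller has a "transaction" bound. */
    public static String seenByWorker() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CURRENT_TX.set("outer-tx");
        try {
            // The supplier runs on the executor thread, which has its own ThreadLocal slot
            return CompletableFuture.supplyAsync(CURRENT_TX::get, executor).join();
        } finally {
            CURRENT_TX.remove();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller sees: " + seenByCaller()); // caller sees: outer-tx
        System.out.println("worker sees: " + seenByWorker()); // worker sees: null
    }
}
```

The worker thread sees nothing bound, which mirrors why the async task above does not take part in the test's outer transaction.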
The modified code, with transaction support:
    package io.github.vampireachao.stream.plugin.mybatisplus;

    import io.github.vampireachao.stream.plugin.mybatisplus.pojo.po.UserInfo;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.TransactionDefinition;
    import org.springframework.transaction.TransactionStatus;
    import org.springframework.transaction.annotation.Transactional;
    import org.springframework.transaction.support.DefaultTransactionDefinition;
    import org.springframework.transaction.support.TransactionTemplate;

    import java.util.Objects;
    import java.util.concurrent.CompletableFuture;

    @SpringBootTest
    class AsyncTest {

        @Test
        @Transactional
        void test(
                @Autowired TransactionTemplate transactionTemplate,
                @Autowired ThreadPoolTaskExecutor executor) {
            UserInfo userInfo = new UserInfo();
            userInfo.setName("ruben");
            Database.save(userInfo);
            CompletableFuture.runAsync(() -> {
                // Open a separate transaction on the executor thread
                PlatformTransactionManager transactionManager =
                        Objects.requireNonNull(transactionTemplate.getTransactionManager());
                DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
                // READ_UNCOMMITTED lets this transaction see the outer transaction's uncommitted insert
                transactionDefinition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_UNCOMMITTED);
                TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);
                try {
                    Assertions.assertNotNull(userInfo.getId(), "id is null");
                    Assertions.assertNotNull(Database.getById(userInfo.getId(), userInfo.getClass()), "user is null");
                    transactionManager.commit(transactionStatus);
                } catch (Throwable e) {
                    // Roll back on any failure, then rethrow so join() reports it
                    transactionManager.rollback(transactionStatus);
                    throw e;
                }
            }, executor).join();
        }
    }
This version runs successfully.
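The commit-or-rollback shape inside the lambda can also be tried in isolation. The sketch below is plain Java and makes some assumptions: RecordingTx is a hypothetical stand-in for the transaction manager (not a Spring or stream-query type) that only records which method was called, and the failing task is invented for illustration. It shows that the rollback runs, and that the rethrown exception reaches the caller of join() as the cause of a CompletionException.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;

public class RollbackRethrowDemo {

    /** Hypothetical stand-in for a transaction manager; it only records calls. */
    public static final class RecordingTx {
        public final List<String> calls = new CopyOnWriteArrayList<>();

        public void commit() {
            calls.add("commit");
        }

        public void rollback() {
            calls.add("rollback");
        }
    }

    /** Wraps a task in a commit-on-success, rollback-on-failure block. */
    public static Runnable transactional(RecordingTx tx, Runnable body) {
        return () -> {
            try {
                body.run();
                tx.commit();
            } catch (Throwable e) {
                tx.rollback();
                // Compiles because the try block can only throw unchecked exceptions
                throw e;
            }
        };
    }

    /** Runs a failing task asynchronously and returns the cause the caller observes. */
    public static Throwable failureSeenByCaller(RecordingTx tx) {
        try {
            CompletableFuture.runAsync(transactional(tx, () -> {
                throw new IllegalStateException("user is null");
            })).join();
            return null;
        } catch (CompletionException e) {
            // join() wraps the task's exception; the original is the cause
            return e.getCause();
        }
    }
}
```

Calling failureSeenByCaller with a fresh RecordingTx returns the IllegalStateException and leaves a single "rollback" entry in calls, so a failure in the task is both rolled back and visible to the code waiting on join().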