要有最朴素的梦想,即使明天天寒地冻,路远马亡。——海子
前段时间遇到的TTL(TransmittableThreadLocal)
在异步编程中的上下文丢失问题,我是采用了直接更换线程池的方式
但今天抽空看了下官方文档,发现了:
所有TTL值的抓取、回放和恢复方法(即CRR操作)
CRR
:capture(快照)
、replay(回放)
、restore(备份)
自己简单写了个测试用例,用于在CompletableFuture
和并行流场景下解决ThreadLocal
的上下文丢失问题
大伙一定要复制到本地跑一下,需要的GAV
是这个:
1 2 3 4 5
| <dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.12.4</version> </dependency>
|
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
| import com.alibaba.ttl.TransmittableThreadLocal; import lombok.SneakyThrows; import org.junit.Ignore; import org.junit.jupiter.api.Assertions; import org.junit.Test;
import java.util.concurrent.CompletableFuture; import java.util.stream.Stream;
public class TtlTest {
@Test @SneakyThrows public void testCompletableFuture() { ThreadLocal<Integer> threadLocal = new InheritableThreadLocal<>(); Stream.iterate(0, i -> ++i).limit(2).forEach(i -> { threadLocal.set(i); CompletableFuture.runAsync(() -> Assertions.assertEquals(i, threadLocal.get())).join(); threadLocal.remove(); }); }
@Test @SneakyThrows public void testCompletableFutureReplayRestore() { ThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>(); Stream.iterate(0, i -> ++i).limit(2).forEach(i -> { threadLocal.set(i); final Object captured = TransmittableThreadLocal.Transmitter.capture(); CompletableFuture.runAsync(() -> { final Object backup = TransmittableThreadLocal.Transmitter.replay(captured); try { Assertions.assertEquals(i, threadLocal.get()); } finally { TransmittableThreadLocal.Transmitter.restore(backup); } }).join(); threadLocal.remove(); }); }
@Test @SneakyThrows public void testCompletableFutureTransmitter() { ThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>(); Stream.iterate(0, i -> ++i).limit(2).forEach(i -> { threadLocal.set(i); final Object captured = TransmittableThreadLocal.Transmitter.capture(); CompletableFuture.runAsync(() -> TransmittableThreadLocal.Transmitter.runSupplierWithCaptured(captured, () -> { Assertions.assertEquals(i, threadLocal.get()); return null; })).join(); threadLocal.remove(); }); }
@Test public void testParallelStream() { ThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>(); Stream.iterate(0, i -> ++i).limit(1).peek(threadLocal::set) .flatMap(item -> Stream.of(item, item).parallel().peek(i -> Assertions.assertEquals(i, threadLocal.get()))) .peek(t -> threadLocal.remove()).forEach(System.out::println); }
@Test public void testParallelStreamReplayRestore() { ThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>(); Stream.iterate(0, i -> ++i).limit(1).peek(threadLocal::set) .flatMap(item -> { final Object captured = TransmittableThreadLocal.Transmitter.capture(); return Stream.of(item, item).parallel().peek(i -> { final Object backup = TransmittableThreadLocal.Transmitter.replay(captured); try { Assertions.assertEquals(i, threadLocal.get()); } finally { TransmittableThreadLocal.Transmitter.restore(backup); } }); }) .peek(t -> threadLocal.remove()).forEach(System.out::println); }
@Test public void testParallelStreamTransmitter() { ThreadLocal<Integer> threadLocal = new TransmittableThreadLocal<>(); Stream.iterate(0, i -> ++i).limit(1).peek(threadLocal::set) .flatMap(item -> { final Object captured = TransmittableThreadLocal.Transmitter.capture(); return Stream.of(item, item).parallel().peek(i -> TransmittableThreadLocal.Transmitter.runSupplierWithCaptured(captured, () -> { Assertions.assertEquals(i, threadLocal.get()); return null; })); }) .peek(t -> threadLocal.remove()).forEach(System.out::println); }
}
|
最后测试结果: