针对ThreadLocal的并发安全线程池

打印 上一主题 下一主题

主题 1762|帖子 1762|积分 5286

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
GIthub链接:
https://github.com/CodeDeveloperC/SafeThreadPoolExecutorForThreadLocal

  • 题目
当ThreadLocal和InheritableThreadLocal和Java原生的线程池使用时,就会出现并发安全题目,比方:
下面这段代码在线程池中获取的数据都为空
  1. public static void test2() throws Exception {
  2.     ThreadLocal<String> context = new ThreadLocal<>();
  3.     ThreadLocal<String> contextI = new InheritableThreadLocal<>();
  4.     ThreadLocal<String> contextT = new TransmittableThreadLocal<>();
  5.     ExecutorService executor = Executors.newFixedThreadPool(1);
  6.     long start = System.currentTimeMillis();
  7.     for (int i = 0; i < 10; i++) {
  8.         // context.set("ThreadLocal-from-main:" + i);
  9.         contextI.set("ThreadLocal-from-main:" + i);
  10.         // contextT.set("ThreadLocal-from-main:" + i);
  11.         Future<Integer> submit = executor.submit(() -> {
  12.             System.out.println("contextI = " + contextT.get());
  13.             return 1;
  14.         });
  15.         submit.get();
  16.     }
  17.     long end = System.currentTimeMillis();
  18.     System.out.println("cost:" + (end - start));
  19.     executor.shutdown();
  20. }
复制代码
执行结果如下:
  1. contextI = null
  2. contextI = null
  3. contextI = null
  4. contextI = null
  5. contextI = null
  6. contextI = null
  7. contextI = null
  8. contextI = null
  9. contextI = null
  10. contextI = null
  11. cost:21
复制代码

  • 业内解法
最常见的就是阿里的TransmittableThreadLocal,其使用方式是将ThreadLocal定义成TransmittableThreadLocal,同时将线程池包装成ExecutorServiceTtlWrapper,两者缺一不可。
比方,下面这段代码可以保证并发安全
  1. public static void test3() throws Exception {
  2.     ThreadLocal<String> context = new ThreadLocal<>();
  3.     ThreadLocal<String> contextI = new InheritableThreadLocal<>();
  4.     ThreadLocal<String> contextT = new TransmittableThreadLocal<>();
  5.     ExecutorService executor = Executors.newFixedThreadPool(1);
  6.     executor = TtlExecutors.getTtlExecutorService(executor);
  7.     long start = System.currentTimeMillis();
  8.     for (int i = 0; i < 10; i++) {
  9.         // context.set("ThreadLocal-from-main:" + i);
  10.         // contextI.set("ThreadLocal-from-main:" + i);
  11.         contextT.set("ThreadLocal-from-main:" + i);
  12.         Future<Integer> submit = executor.submit(() -> {
  13.             System.out.println("context value = " + contextT.get());
  14.             return 1;
  15.         });
  16.         submit.get();
  17.     }
  18.     long end = System.currentTimeMillis();
  19.     System.out.println("cost:" + (end - start));
  20.     executor.shutdown();
  21. }
复制代码
执行结果如下:
  1. context value = ThreadLocal-from-main:0
  2. context value = ThreadLocal-from-main:1
  3. context value = ThreadLocal-from-main:2
  4. context value = ThreadLocal-from-main:3
  5. context value = ThreadLocal-from-main:4
  6. context value = ThreadLocal-from-main:5
  7. context value = ThreadLocal-from-main:6
  8. context value = ThreadLocal-from-main:7
  9. context value = ThreadLocal-from-main:8
  10. context value = ThreadLocal-from-main:9
  11. cost:26
复制代码
而在我们项目中很多脚手架,以及一些优秀的开源框架比方:Spring、MyBatis等,底层使用的都是JDK原生的ThreadLocal,此时无法修改其定义,照旧会造成并发安全题目。
其核心头脑是封装了自定义的TTLRunable对象,在执行任务之前,先复制当火线程存储在holder中的值,holder中的值是在用户set的时候保存进去的,类似于Thread线程级别的map。然后在线程池中线程执行具体任务前,先将当火线程Holder中的数据备份,然后用之前复制的holder中的数据替换掉当火线程的Holder数据。执行完再通过备份的数据恢复当火线程的holder数据。
如许就可以在线程池中共享数据。解决线程池调用无法共享数据的题目。

  • 我们的解法
为相识决这个题目,我们自定义了SafeThreadPoolExecutorForThreadLocal线程池,确保针对ThreadLocal和InheritableThreadLocal都能保证并发安全。源码如下:
  1. import com.alibaba.ttl.TransmittableThreadLocal;
  2. import com.alibaba.ttl.threadpool.TtlExecutors;
  3. import javax.validation.constraints.NotNull;
  4. import java.lang.invoke.MethodHandle;
  5. import java.lang.invoke.MethodHandles;
  6. import java.lang.reflect.Field;
  7. import java.nio.file.Files;
  8. import java.nio.file.Paths;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. import java.util.concurrent.*;
  12. public class SafeThreadPoolExecutorForThreadLocal extends ThreadPoolExecutor {
  13.     private static final MethodHandle threadLocalsGetter;
  14.     private static final MethodHandle threadLocalsSetter;
  15.     private static final MethodHandle inheritableThreadLocalsGetter;
  16.     private static final MethodHandle inheritableThreadLocalsSetter;
  17.     static {
  18.         try {
  19.             Field threadLocalsField = Thread.class.getDeclaredField("threadLocals");
  20.             Field inheritableThreadLocalsField = Thread.class.getDeclaredField("inheritableThreadLocals");
  21.             threadLocalsField.setAccessible(true);
  22.             inheritableThreadLocalsField.setAccessible(true);
  23.             MethodHandles.Lookup lookup = MethodHandles.lookup();
  24.             threadLocalsGetter = lookup.unreflectGetter(threadLocalsField);
  25.             threadLocalsSetter = lookup.unreflectSetter(threadLocalsField);
  26.             inheritableThreadLocalsGetter = lookup.unreflectGetter(inheritableThreadLocalsField);
  27.             inheritableThreadLocalsSetter = lookup.unreflectSetter(inheritableThreadLocalsField);
  28.         } catch (Exception e) {
  29.             throw new RuntimeException("初始化 MethodHandle 失败", e);
  30.         }
  31.     }
  32.     public SafeThreadPoolExecutorForThreadLocal(int corePoolSize, int maxPoolSize, long keepAliveTime,
  33.                                                 TimeUnit unit, BlockingQueue<Runnable> workQueue) {
  34.         super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
  35.     }
  36.     @Override
  37.     public void execute(@NotNull Runnable command) {
  38.         try {
  39.             final Object parentThreadLocals = threadLocalsGetter.invoke(Thread.currentThread());
  40.             final Object parentInheritableThreadLocals = inheritableThreadLocalsGetter.invoke(Thread.currentThread());
  41.             Runnable wrapped = () -> {
  42.                 Thread t = Thread.currentThread();
  43.                 try {
  44.                     threadLocalsSetter.invoke(t, parentThreadLocals);
  45.                     inheritableThreadLocalsSetter.invoke(t, parentInheritableThreadLocals);
  46.                     command.run();
  47.                 } catch (Throwable e) {
  48.                     e.printStackTrace();
  49.                 } finally {
  50.                     // 任务执行后,清除 ThreadLocal 避免污染
  51.                     try {
  52.                         threadLocalsSetter.invoke(t, null);
  53.                         inheritableThreadLocalsSetter.invoke(t, null);
  54.                     } catch (Throwable e) {
  55.                         e.printStackTrace();
  56.                     }
  57.                 }
  58.             };
  59.             super.execute(wrapped);
  60.         } catch (Throwable e) {
  61.             throw new RuntimeException("任务包装失败", e);
  62.         }
  63.     }
  64.     public static SafeThreadPoolExecutorForThreadLocal newFixedThreadPool(int nThreads) {
  65.         return new SafeThreadPoolExecutorForThreadLocal(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
  66.     }
  67.     public <T> Future<T> submit(Callable<T> task) {
  68.         RunnableFuture<T> future = newTaskFor(task);
  69.         execute(future);
  70.         return future;
  71.     }
  72.     public Future<?> submit(Runnable task) {
  73.         RunnableFuture<?> future = newTaskFor(task, null);
  74.         execute(future);
  75.         return future;
  76.     }
  77.     public <T> Future<T> submit(Runnable task, T result) {
  78.         RunnableFuture<T> future = newTaskFor(task, result);
  79.         execute(future);
  80.         return future;
  81.     }
  82.     public static void main(String[] args) throws Exception {
  83.         // test1();
  84.         // test2();
  85.         test3();
  86.     }
  87.     public static void test1() throws Exception {
  88.         ThreadLocal<String> context = new ThreadLocal<>();
  89.         ThreadLocal<String> contextI = new InheritableThreadLocal<>();
  90.         ThreadLocal<String> contextT = new TransmittableThreadLocal<>();
  91.         ExecutorService executor = SafeThreadPoolExecutorForThreadLocal.newFixedThreadPool(1);
  92.         byte[] bytes = Files.readAllBytes(Paths.get("/Users/cheteng/chen/project/mycode/idea/mylearning/leetcode/src/main/java/com/example/leetcode/utils/SafeThreadPoolExecutorForThreadLocal.java"));
  93.         String content = new String(bytes);
  94.         for (int j = 0; j < 10000; j++) {
  95.             ThreadLocal<String> context2 = new ThreadLocal<>();
  96.             context2.set("ThreadLocal-from-main-inner:" + j);
  97.             ThreadLocal<Map<String, Object>> context3 = new ThreadLocal<>();
  98.             Map<String, Object> data = new HashMap<>();
  99.             data.put("field1", "123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123");
  100.             data.put("field2", "123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123123w");
  101.             data.put("field3", content);
  102.             context3.set(data);
  103.         }
  104.         long start = System.currentTimeMillis();
  105.         for (int i = 0; i < 10000; i++) {
  106.             // context.set("ThreadLocal-from-main:" + i);
  107.             // contextI.set("ThreadLocal-from-main:" + i);
  108.             contextT.set("ThreadLocal-from-main:" + i);
  109.             Future<Integer> submit = executor.submit(() -> {
  110.                 System.out.println("contextI = " + contextT.get());
  111.                 return 1;
  112.             });
  113.             submit.get();
  114.         }
  115.         long end = System.currentTimeMillis();
  116.         System.out.println("cost:" + (end - start));
  117.         executor.shutdown();
  118.     }
  119.     public static void test2() throws Exception {
  120.         ThreadLocal<String> context = new ThreadLocal<>();
  121.         ThreadLocal<String> contextI = new InheritableThreadLocal<>();
  122.         ThreadLocal<String> contextT = new TransmittableThreadLocal<>();
  123.         ExecutorService executor = Executors.newFixedThreadPool(1);
  124.         long start = System.currentTimeMillis();
  125.         for (int i = 0; i < 10; i++) {
  126.             // context.set("ThreadLocal-from-main:" + i);
  127.             contextI.set("ThreadLocal-from-main:" + i);
  128.             // contextT.set("ThreadLocal-from-main:" + i);
  129.             Future<Integer> submit = executor.submit(() -> {
  130.                 System.out.println("contextI = " + contextT.get());
  131.                 return 1;
  132.             });
  133.             submit.get();
  134.         }
  135.         long end = System.currentTimeMillis();
  136.         System.out.println("cost:" + (end - start));
  137.         executor.shutdown();
  138.     }
  139.     public static void test3() throws Exception {
  140.         ThreadLocal<String> context = new ThreadLocal<>();
  141.         ThreadLocal<String> contextI = new InheritableThreadLocal<>();
  142.         ThreadLocal<String> contextT = new TransmittableThreadLocal<>();
  143.         ExecutorService executor = Executors.newFixedThreadPool(1);
  144.         executor = TtlExecutors.getTtlExecutorService(executor);
  145.         long start = System.currentTimeMillis();
  146.         for (int i = 0; i < 10; i++) {
  147.             // context.set("ThreadLocal-from-main:" + i);
  148.             // contextI.set("ThreadLocal-from-main:" + i);
  149.             contextT.set("ThreadLocal-from-main:" + i);
  150.             Future<Integer> submit = executor.submit(() -> {
  151.                 System.out.println("context value = " + contextT.get());
  152.                 return 1;
  153.             });
  154.             submit.get();
  155.         }
  156.         long end = System.currentTimeMillis();
  157.         System.out.println("cost:" + (end - start));
  158.         executor.shutdown();
  159.     }
  160. }
复制代码
其核心头脑是在执行任务之前,先复制当火线程threadLocals和inheritableThreadLocals中的数据存储在暂时变量中,然后在线程池中线程执行具体任务前,用暂时变量数据覆盖当火线程的threadLocals和inheritableThreadLocals变量。执行完将threadLocals和inheritableThreadLocals数据全部删除(由于我们要确保线程池是无状态的,以是没有须要恢复,这里和阿里的TTL不同)。
如许就可以在线程池中共享数据。解决线程池调用无法共享数据的题目。

  • 性能对比
性能对比代码如下:
  1. public static void compare() throws Exception {
  2.     for (int j = 0; j < 10000; j++) {
  3.         ThreadLocal<String> context2 = new TransmittableThreadLocal<>();
  4.         context2.set("ThreadLocal-from-main-inner:" + j);
  5.         ThreadLocal<Map<String, Object>> context3 = new TransmittableThreadLocal<>();
  6.         Map<String, Object> data = new HashMap<>();
  7.         data.put("field1", new Object());
  8.         data.put("field2", new Object());
  9.         context3.set(data);
  10.     }
  11.     int rounds = 10000;
  12.     for (int i = 0; i < 10; i++) {
  13.         System.out.println("-------------" + i + "------------");
  14.         compareSafe(rounds);
  15.         compareTTL(rounds);
  16.     }
  17. }
  18. public static void compareTTL(int rounds) throws Exception {
  19.     ThreadLocal<String> contextT = new TransmittableThreadLocal<>();
  20.     ExecutorService executor = Executors.newFixedThreadPool(1);
  21.     executor = TtlExecutors.getTtlExecutorService(executor);
  22.     long start = System.currentTimeMillis();
  23.     for (int i = 0; i < rounds; i++) {
  24.         contextT.set("ThreadLocal-from-main:" + i);
  25.         Future<String> submit = executor.submit(() -> {
  26.             // System.out.println("context value = " + contextT.get());
  27.             return contextT.get();
  28.         });
  29.         submit.get();
  30.     }
  31.     long end = System.currentTimeMillis();
  32.     System.out.println("compareTTL cost:" + (end - start));
  33. }
  34. public static void compareSafe(int rounds) throws Exception {
  35.     ThreadLocal<String> context = new ThreadLocal<>();
  36.     ExecutorService executor = SafeThreadPoolExecutorForThreadLocal.newFixedThreadPool(1);
  37.     long start = System.currentTimeMillis();
  38.     for (int i = 0; i < rounds; i++) {
  39.         context.set("ThreadLocal-from-main:" + i);
  40.         Future<String> submit = executor.submit(() -> {
  41.             // System.out.println("contextI = " + context.get());
  42.             return context.get();
  43.         });
  44.         submit.get();
  45.     }
  46.     long end = System.currentTimeMillis();
  47.     System.out.println("compareSafe cost:" + (end - start));
  48. }
复制代码
10次运行对比的运行结果如下:
  1. -------------0------------
  2. compareSafe cost:55
  3. compareTTL cost:2541
  4. -------------1------------
  5. compareSafe cost:12
  6. compareTTL cost:2290
  7. -------------2------------
  8. compareSafe cost:7
  9. compareTTL cost:2242
  10. -------------3------------
  11. compareSafe cost:6
  12. compareTTL cost:2224
  13. -------------4------------
  14. compareSafe cost:5
  15. compareTTL cost:2254
  16. -------------5------------
  17. compareSafe cost:6
  18. compareTTL cost:2461
  19. -------------6------------
  20. compareSafe cost:7
  21. compareTTL cost:2579
  22. -------------7------------
  23. compareSafe cost:5
  24. compareTTL cost:2375
  25. -------------8------------
  26. compareSafe cost:5
  27. compareTTL cost:2314
  28. -------------9------------
  29. compareSafe cost:5
  30. compareTTL cost:2265
复制代码
SafeThreadPoolExecutorForThreadLocal 线程池的性能在ThreadLocal 越多的环境下,性能越高,在ThreadLocal量级为10000的场景下,性能比阿里的TransmittableThreadLocal快100多倍而且随着运行时间的累加,SafeThreadPoolExecutorForThreadLocal性能提升越来越高。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

万有斥力

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表