ToB企服应用市场:ToB评测及商务社交产业平台

标题: 线程本地存储 ThreadLocal [打印本页]

作者: 自由的羽毛    时间: 2022-9-16 17:15
标题: 线程本地存储 ThreadLocal
线程本地存储 · 语雀 (yuque.com)
线程本地存储提供了线程内存储变量的能力,这些变量是线程私有的。
线程本地存储一般用在跨类、跨方法的传递一些值。
线程本地存储也是解决特定场景下线程安全问题的思路之一(每个线程都访问本线程自己的变量)。
Java 语言提供了线程本地存储,ThreadLocal 类。

ThreadLocal 的使用及注意事项
  1. public class TestClass {
  2.     public static ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
  3.     public static void main(String[] args) {
  4.         // 设置值
  5.         threadLocal.set(1);
  6.         test();
  7.     }
  8.     private static void test() {
  9.         // 获取值,返回 1
  10.         threadLocal.get();
  11.         // 防止内存泄漏
  12.         threadLocal.remove();
  13.     }
  14. }
复制代码
static 修饰的变量是在类在加载时就分配地址了,在类卸载才会被回收,因此使用 static 的 ThreadLocal,延长了 ThreadLocal 的生命周期,可能会导致内存泄漏。
分配使用了 ThreadLocal,又不调用 get()、set()、remove() 方法,并且当前线程迟迟不结束的话,那么就会导致内存泄漏。
ThreadLocal 的 set() 过程


每一个 Thread 实例对象中,都会有一个 ThreadLocalMap 实例对象;
ThreadLocalMap 是一个 Map 类型,底层数据结构是 Entry 数组;
一个 Entry 对象中又包含一个 key 和 一个 value
  1. static class Entry extends WeakReference<ThreadLocal<?>> {
  2.     /**
  3.      * The value associated with this ThreadLocal.
  4.      */
  5.     Object value;
  6.     Entry(ThreadLocal<?> k, Object v) {
  7.         super(k);
  8.         value = v;
  9.     }
  10. }
复制代码
ThreadLocalMap 的哈希冲突

ThreadLocalMap 里处理 hash 冲突的机制不是像 HashMap 一样使用链表(拉链法)。
它采用的是另一种经典的处理方式,沿着冲突的索引向后查找空闲的位置(开放寻址法中的线性探测法)。
下面我们通过 ThreadLocal 的 set()、get() 方法源码,分析 ThreadLocalMap 的哈希冲突解决方案。
  1. public void set(T value) {
  2.     // 获取当前线程
  3.     Thread t = Thread.currentThread();
  4.     // 获取当前线程的 ThreadLocalMap
  5.     ThreadLocal.ThreadLocalMap map = getMap(t);
  6.     if (map != null) {
  7.         // 将存储的值设置到 ThreadLocalMap
  8.         map.set(this, value);
  9.     } else {
  10.         // 首次设置存储的值,需要创建 ThreadLocalMap
  11.         createMap(t, value);
  12.     }
  13. }
复制代码
上面源码我们注意几点:
  1. /**
  2. * Remove the entry for key.
  3. */
  4. private void remove(ThreadLocal<?> key) {
  5.     ThreadLocal.ThreadLocalMap.Entry[] tab = table;
  6.     int len = tab.length;
  7.     int i = key.threadLocalHashCode & (len-1);
  8.     for (ThreadLocal.ThreadLocalMap.Entry e = tab[i];
  9.          e != null;
  10.          e = tab[i = nextIndex(i, len)]) {
  11.         if (e.get() == key) {
  12.             e.clear();
  13.             expungeStaleEntry(i);
  14.             return;
  15.         }
  16.     }
  17. }
复制代码
ThreadLocalMap 的扩容策略
  1. // 入参 staleSlot 是当前被删除对象在 Entry 数组中的位置
  2. private int expungeStaleEntry(int staleSlot) {
  3.     ThreadLocal.ThreadLocalMap.Entry[] tab = table;
  4.     int len = tab.length;
  5.     // 删除 staleSlot 位置的 value,key 已经在进入该方法前删除了 / 已经被回收
  6.     // expunge entry at staleSlot
  7.     tab[staleSlot].value = null;
  8.     // 将 Entry 对象赋值为 null,断开 Entry 实例对象的强引用
  9.     tab[staleSlot] = null;
  10.     // Entry 数组大小 - 1
  11.     size--;
  12.     // Rehash until we encounter null
  13.     ThreadLocal.ThreadLocalMap.Entry e;
  14.     int i;
  15.     // for 循环的作用是从当前位置开始向后循环处理 Entry 中的 ThreadLocal 对象
  16.     // 将从指定位置开始,遇到 null 之前的所有 ThreadLocal 对象 rehash
  17.     for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
  18.         // 获取 ThreadLocal 的虚引用引用的实例对象
  19.         ThreadLocal<?> k = e.get();
  20.         if (k == null) {
  21.             // 虚引用引用的实例对象为 null,说明 ThreadLocal 已经被回收了
  22.             // 则删除 value 和 Entry,让虚拟机能够回收
  23.             e.value = null;
  24.             tab[i] = null;
  25.             size--;
  26.         } else {
  27.             // rehash
  28.             int h = k.threadLocalHashCode & (len - 1);
  29.             if (h != i) {
  30.                 tab[i] = null;
  31.                 // 从当前 h 的位置向后找,找到一个 null 的位置将 e 填入
  32.                 // Unlike Knuth 6.4 Algorithm R, we must scan until
  33.                 // null because multiple entries could have been stale.
  34.                 while (tab[h] != null) {
  35.                     h = nextIndex(h, len);
  36.                 }
  37.                 tab[h] = e;
  38.             }
  39.         }
  40.     }
  41.     return i;
  42. }
复制代码
由上面源码我们可以看出,ThreadLocalMap 扩容的时机是,ThreadLocalMap 中的 ThreadLocal 的个数超过阈值,并且 cleanSomeSlots() 返回 false(启发式清理),然后尝试清理所有 key 为 null 的 Entry,清理完之后 ThreadLocal 的个数仍然大于阈值的四分之三,ThreadLocalMap 就要开始扩容了, 我们一起来看下扩容的逻辑:
  1. // set() 的关键方法,被 set(Object value) 调用
  2. private void set(ThreadLocal<?> key, Object value) {
  3.     // We don't use a fast path as with get() because it is at
  4.     // least as common to use set() to create new entries as
  5.     // it is to replace existing ones, in which case, a fast
  6.     // path would fail more often than not.
  7.     ThreadLocal.ThreadLocalMap.Entry[] tab = table;
  8.     int len = tab.length;
  9.     // 计算 key 在数组中的下标,其实就是 ThreadLocal 的 hashCode 和 数组大小-1 取余
  10.     int i = key.threadLocalHashCode & (len - 1);
  11.     // 整体策略:查看 i 索引位置有没有值,有值的话,索引位置 + 1,直到找到没有值的位置
  12.     // 这种解决 hash 冲突的策略,也导致了其在 get 时查找策略有所不同,体现在 getEntryAfterMis
  13.     // nextIndex() 就是让在不超过数组长度的基础上,把数组的索引位置 + 1
  14.     for (ThreadLocal.ThreadLocalMap.Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
  15.         ThreadLocal<?> k = e.get();
  16.         // 找到内存地址一样的 ThreadLocal,直接替换
  17.         // 即,修改线程本地变量
  18.         if (k == key) {
  19.             e.value = value;
  20.             return;
  21.         }
  22.         // 当前 key 是 null,说明 ThreadLocal 被清理了,直接替换掉并返回
  23.         if (k == null) {
  24.             replaceStaleEntry(key, value, i);
  25.             return;
  26.         }
  27.     }
  28.     // 当前 i 位置是无值的,可以被当前 thradLocal 使用
  29.     tab[i] = new ThreadLocal.ThreadLocalMap.Entry(key, value);
  30.     int sz = ++size;
  31.     // 当数组大小大于等于扩容阈值(数组大小的三分之二)时,进行扩容
  32.     if (!cleanSomeSlots(i, sz) && sz >= threshold) {
  33.         rehash();
  34.     }
  35. }
复制代码
源码注解也比较清晰,我们注意两点:
ThreadLocalMap 扩容策略的语言描述:
在 ThreadLocalMap.set() 方法的最后,如果执行完启发式清理工作后,未清理到任何 Entry,且当前数组中 Entry 的数量已经达到了扩容阈值(数组长度的三分之二),就开始执行 rehash() 逻辑。
rehash() 首先是会进行探测式清理工作,从数组的起始位置开始遍历,查找 key 为 null 的 Entry 并清理。清理完成之后如果 ThreadLocal 的个数仍然大于等于扩容阈值的四分之三,那么就进行扩容操作,扩容为原来数组长度的两倍,并且设置下一次的扩容阈值为新数组长度的三分之二。
InheritableThreadLocal 与继承性

通过 ThreadLocal 创建的线程变量,其子线程是无法继承的。
也就是说你在线程中通过 ThreadLocal 创建了线程变量 V,而后该线程创建了子线程,你在子线程中是无法通过 ThreadLocal 来访问父线程的线程变量 V 的。
  1. // get 的关键方法,被 get() 方法调用
  2. // 得到当前 thradLocal 对应的值,值的类型是由 thradLocal 的泛型决定的
  3. // 首先尝试根据 hashcode 取模 数组大小-1 = 索引位置 i 寻找,找不到的话,自旋把 i+1,直到找到
  4. private ThreadLocal.ThreadLocalMap.Entry getEntry(ThreadLocal<?> key) {
  5.     int i = key.threadLocalHashCode & (table.length - 1);
  6.     ThreadLocal.ThreadLocalMap.Entry e = table[i];
  7.     // e 不为空,并且 e 的 ThreadLocal 的内存地址和 key 相同,直接返回,否则就是没有找到,继续寻找
  8.     if (e != null && e.get() == key) {
  9.         return e;
  10.     } else {
  11.         // 这个取数据的逻辑,是因为 set 时数组索引位置冲突造成的
  12.         return getEntryAfterMiss(key, i, e);
  13.     }
  14. }
  15. // 自旋 i+1,直到找到为止
  16. private ThreadLocal.ThreadLocalMap.Entry getEntryAfterMiss(ThreadLocal<?> key, int i, ThreadLocal.ThreadLocalMap.Entry e) {
  17.     ThreadLocal.ThreadLocalMap.Entry[] tab = table;
  18.     int len = tab.length;
  19.     while (e != null) {
  20.         ThreadLocal<?> k = e.get();
  21.         // 内存地址一样,表示找到了
  22.         if (k == key) {
  23.             return e;
  24.         }
  25.         // 删除不再使用的 Entry,避免内存泄漏
  26.         if (k == null) {
  27.             expungeStaleEntry(i);
  28.         } else {
  29.             // 继续使索引位置 + 1
  30.             i = nextIndex(i, len);
  31.         }
  32.         e = tab[i];
  33.     }
  34.     return null;
  35. }
复制代码
如果你需要子线程继承父线程的线程变量,那该怎么办呢?
JDK 的 InheritableThreadLocal 类可以完成父线程到子线程的值传递。
InheritableThreadLocal 是 ThreadLocal 子类,所以用法和 ThreadLocal 相同。
使用时,改为 ThreadLocal threadLocal = new InheritableThreadLocal(); 即可。
InheritableThreadLocal 在创建子线程的时候(初始化线程时),在 Thread#init() 方法中拷贝父线程中本地变量的值到子线程的本地变量中,子线程就拥有了和父线程一样的本地变量。
下面是 Thread#init() 中,和 ThreadLocal 相关的代码,我们一起来看下这个功能是怎么实现的
  1. // set() 的部分源码
  2. if (!cleanSomeSlots(i, sz) && sz >= threshold){
  3.     rehash();
  4. }
  5. // 称为启发式清理,从指定下标开始遍历
  6. private boolean cleanSomeSlots(int i, int n) {
  7.     boolean removed = false;
  8.     ThreadLocal.ThreadLocalMap.Entry[] tab = table;
  9.     int len = tab.length;
  10.     do {
  11.         i = nextIndex(i, len);
  12.         ThreadLocal.ThreadLocalMap.Entry e = tab[i];
  13.         if (e != null && e.get() == null) {
  14.             n = len;
  15.             removed = true;
  16.             i = expungeStaleEntry(i);
  17.         }
  18.     } while ( (n >>>= 1) != 0);
  19.     return removed;
  20. }
  21. private void rehash() {
  22.     // 探测式清理,从数组的下标为 0 处开始遍历,清理所有无用的 Entry
  23.     expungeStaleEntries();
  24.     // 扩容使用较低的阈值,以避免迟滞
  25.     // Use lower threshold for doubling to avoid hysteresis
  26.     if (size >= threshold - threshold / 4)
  27.         resize();
  28. }
复制代码
不过,完全不建议你在线程池中使用 InheritableThreadLocal,不仅仅是因为它具有 ThreadLocal 相同的缺点:可能导致内存泄露,更重要的原因是:线程池中线程的创建是动态的,很容易导致继承关系错乱,如果你的业务逻辑依赖 InheritableThreadLocal,那么很可能导致业务逻辑计算错误,而这个错误往往比内存泄露更要命。
同时,如果父线程的本地变量是引用数据类型的话,父子线程共享相同的数据,存在线程安全问题,甚至导致业务逻辑计算错误。要想做到父子线程的本地变量互不影响,则需要继承 InheritableThreadLocal 并重写 childValue() 方法实现对象的深拷贝 。
并且对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的 ThreadLocal 值传递已经没有意义,应用需要的实际上是把任务提交给线程池时的ThreadLocal 值传递到任务执行时。阿里开源的 TransmittableThreadLocal 类继承并加强 InheritableThreadLocal 类,解决上述的问题。
TransmittableThreadLocal

TransmittableThreadLocal 的 GitHub:https://github.com/alibaba/transmittable-thread-local
TransmittableThreadLocal 的 API 文档:https://alibaba.github.io/transmittable-thread-local
TransmittableThreadLocal 是阿里开源的一个增强 InheritableThreadLocal 的库。
TransmittableThreadLocal 的功能:在使用线程池等会池化复用线程的执行组件情况下,提供 ThreadLocal 值的传递功能,解决异步执行时上下文传递的问题。
TTL 的使用及注意事项

TTL 的 User Guide:https://github.com/alibaba/transmittable-thread-local#-user-guide
TransmittableThreadLocal 有三种使用方式(具体使用见 GitHub 的 README):
注意事项:
使用 TtlRunnable 和 TtlCallable 来修饰传入线程池的 Runnable 和 Callable 时,即使是同一个 Runnable 任务多次提交到线程池时,每次提交时都需要通过修饰操作(即TtlRunnable.get(task))以抓取这次提交时的 TransmittableThreadLocal 上下文的值;即如果同一个任务下一次提交时不执行修饰而仍然使用上一次的 TtlRunnable,则提交的任务运行时会是之前修饰操作所抓取的上下文。
修饰线程池其实本质上也是修饰 Runnable,只是将这个逻辑移到了 ExecutorServiceTtlWrapper.submit() 方法内,对所有提交的 Runnable 进行修饰。
  1. // 扩容
  2. private void resize() {
  3.     // 拿出旧的数组
  4.     ThreadLocal.ThreadLocalMap.Entry[] oldTab = table;
  5.     int oldLen = oldTab.length;
  6.     // 新数组的大小为老数组的两倍
  7.     int newLen = oldLen * 2;
  8.     // 初始化新数组
  9.     ThreadLocal.ThreadLocalMap.Entry[] newTab = new ThreadLocal.ThreadLocalMap.Entry[newLen];
  10.     int count = 0;
  11.     // 老数组的值拷贝到新数组上
  12.     for (int j = 0; j < oldLen; ++j) {
  13.         ThreadLocal.ThreadLocalMap.Entry e = oldTab[j];
  14.         if (e != null) {
  15.             ThreadLocal<?> k = e.get();
  16.             if (k == null) {
  17.                 e.value = null; // Help the GC
  18.             } else {
  19.                 // 计算 ThreadLocal 在新数组中的位置
  20.                 int h = k.threadLocalHashCode & (newLen - 1);
  21.                 // 如果索引 h 的位置值不为空,往后+1,直到找到值为空的索引位置
  22.                 while (newTab[h] != null)
  23.                     h = nextIndex(h, newLen);
  24.                 // 给新数组赋值
  25.                 newTab[h] = e;
  26.                 count++;
  27.             }
  28.         }
  29.     }
  30.     // 给新数组初始化下次扩容阈值,为数组长度的三分之二
  31.     setThreshold(newLen);
  32.     size = count;
  33.     table = newTab;
  34. }
复制代码
TTL 的原理


TTL 做的是,使用装饰器模式装饰 Runnable 等任务,将原本与 Thread 绑定的线程变量,缓存一份到 TtlRunnable 对象中,每次调用任务的 run() 前后进行 set() 和还原数据。
TTL 的需求场景

需求场景说明
总结

使用 ThreadLocal 库友好地解决了线程本地存储的问题,但是它还存在父子线程值传递丢失的问题,于是 JDK 又引入了 InheritableThreadLocal 对象。
InheritableThreadLocal 的出现又引出了下一个问题,那就是涉及到线程池等复用线程场景时,还是会存在变量复制混乱的缺陷。阿里巴巴提供了解决方案,用 TransmittableThreadLocal  来增强 InheritableThreadLocal 对象。
参考资料

30 | 线程本地存储模式:没有共享,就没有伤害-极客时间 (geekbang.org)
ThreadLocal原理分析及内存泄漏演示-极客时间 (geekbang.org)
ThreadLocal如何在父子线程及线程池中传递?-极客时间 (geekbang.org)
https://github.com/alibaba/transmittable-thread-local

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4