ThreadLocal

ThreadLocal 提供了一种方式,在多线程环境下,每个线程都可以拥有自己独特的数据,并且可以在整个线程执行过程中,从上而下的传递。

用法

public class Main {
    static final ThreadLocal<String> context = new ThreadLocal<>();
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            context.set("线程1");
            System.out.println("thread1 => " + context.get());
        }).start();
        new Thread(() -> {
            context.set("线程2");
            System.out.println("thread2 => " + context.get());
        }).start();
        Thread.sleep(2000);
        System.out.println("main => " + context.get());
    }
}

ThreadLocal

ThreadLocal有一个ThreadLocalMap内部类,数据都是存储在ThreadLocalMap中。

public class ThreadLocal<T> {

    // ThreadLocal 
    static class ThreadLocalMap {
        // 数组中的每个节点值
        static class Entry extends WeakReference<ThreadLocal<?>> {
            // 当前 ThreadLocal 关联的值
            Object value;
            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }
        // 数组的初始化大小
        private static final int INITIAL_CAPACITY = 16;
        // 存储 ThreadLocal 的数组
        private Entry[] table;
        // 当前数组容量
        private int size = 0;
        // 扩容的阈值,默认是数组大小的三分之二
        private int threshold; // Default to 0
    }
}

查看Thread类可以发现,Thread类里面有一个ThreadLocal.ThreadLocalMap的变量

public class Thread {
    /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;

    /*
     * InheritableThreadLocal values pertaining to this thread. This map is
     * maintained by the InheritableThreadLocal class.
     */
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}

每个线程的本地变量并不是存放在 ThreadLocal里面,而是存放在调用线程的threadLocals变量里,也就是说 ThreadLocal类型的本地变量存放在具体的线程内存空间中, ThreadLocal就是一个工具壳,它通过 set 方法把 value 值放入调用线程的threadLocals里面并存放起来,当调用线程调用它的 get 时,再从线程的threadLocals里面将其拿出来使用。

源码分析

内部类

ThreadLocalMap本身就是一个简单的 Map 结构,key 是 ThreadLocal,value 是 ThreadLocal 保存的值,因为可能存在多个ThreadLocal,所以底层是采用数组的数据结构。

static class ThreadLocalMap {
    /**    
     *
     * 自定义一个Entry类,并继承自弱引用
     * 用来保存ThreadLocal和Value之间的对应关系
     *
     * 之所以用弱引用,是为了解决线程与ThreadLocal之间的强绑定关系
     * 会导致如果线程没有被回收,则GC便一直无法回收这部分内容
     */
    static class Entry extends WeakReference<ThreadLocal<?>> {
        // 当前 ThreadLocal 关联的值
        Object value;
        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
    // 数组的初始化大小
    private static final int INITIAL_CAPACITY = 16;
    // 存储 ThreadLocal 的数组
    private Entry[] table;

    private int size = 0;
    // 扩容的阈值,默认是数组大小的三分之二
    private int threshold; // Default to 0
}

ThreadLocalMap 其实就是一个简单的 Map 结构,底层是数组,有初始化大小,也有扩容阈值大小,数组的元素是 Entry,Entry 的 key 就是 ThreadLocal 的引用,value 是 ThreadLocal 的值。

set

public void set(T value) {
    // 获取当前线程
    Thread t = Thread.currentThread();
    // 1. 通过当前线程获取 ThreadLocalMap 
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value); // 2. 往ThreadLocalMap中set数据
    else
        createMap(t, value); //  3. 创建ThreadLocalMap
}

getMap(t)

// 1. 通过当前线程获取 ThreadLocalMap 
// Thread类里面有一个ThreadLocalMap类型的属性
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

createMap(t, value)

// 2. 
// ThreadLocal.createMap(Thread t, T firstValue)
void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}
// ThreadLocalMap
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    // 初始化数组
    table = new Entry[INITIAL_CAPACITY];
    // 计算数组索引下标
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    // 创建Entry节点
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    setThreshold(INITIAL_CAPACITY);
}

map.set(this, value)

// 2.
// ThreadLocalMap.
private void set(ThreadLocal<?> key, Object value) {
    Entry[] tab = table;
    int len = tab.length;
    // 获取对应ThreadLocal在table当中的下标
    // 计算 key 在数组中的下标,其实就是 ThreadLocal 的 hashCode 和数组大小-1取余
    int i = key.threadLocalHashCode & (len-1);

    // 整体策略
    // 1、如遇相同 key,则直接替换 value 
    // 2、如果 key 为 null,说明 key 被回收了,替换被回收的 key
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get(); 
        // 找到内存地址一样的 ThreadLocal,直接替换
        if (k == key) {
            e.value = value;
            return;
        }
        // 当前 key 是 null,说明 ThreadLocal 被清理了
        if (k == null) {
            // 替换当前失效的key所在Entry节点
            replaceStaleEntry(key, value, i);
            return;
        }
    }
    // 如果没有在table当中找到该key,也没有找到一个key为null的Entry,则直接在当前位置new一个Entry
    tab[i] = new Entry(key, value);
    int sz = ++size;
    // 当数组大小大于等于扩容阈值(数组大小的三分之二)时,进行扩容
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash(); // 扩容,容量扩大了一倍
}

replaceStaleEntry

// 替换 key 为 null 的 Entry
private void replaceStaleEntry(ThreadLocal<?> key, Object value,int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    Entry e;

    // Back up to check for prior stale entry in current run.
    // We clean out whole runs at a time to avoid continual
    // incremental rehashing due to garbage collector freeing
    // up refs in bunches (i.e., whenever the collector runs).
    // 记录当前失效节点的下标
    int slotToExpunge = staleSlot;
    /**
     * 这个for循坏的功能:由staleSlot开始向前搜索,记录key为null的下标
     */
    for (int i = prevIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = prevIndex(i, len))
        if (e.get() == null)
            slotToExpunge = i;

    // Find either the key or trailing null slot of run, whichever
    // occurs first
    /**
     * 这个for循坏的功能:由staleSlot开始向后搜索
     */
    for (int i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();

        // If we find key, then we need to swap it
        // with the stale entry to maintain hash table order.
        // The newly stale slot, or any other stale slot
        // encountered above it, can then be sent to expungeStaleEntry
        // to remove or rehash all of the other entries in run.
        if (k == key) {
            // 如果 key 相等,则先将值设置到这个节点
            e.value = value;

            // 将下标为 staleSlot 与下标为 i 的数据调换位置
            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;

            // 
            // Start expunge at preceding stale entry if it exists
            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            // expungeStaleEntry 清理下标为 staleSlot 的 Entry
            // 将下标为 staleSlot 的 Entry 设置为 null
            // 返回值便是 table[] 为 null 的下标
            // cleanSomeSlots 继续调用 expungeStaleEntry 来清理被回收的key
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }

        // If we didn't find stale entry on backward scan, the
        // first stale entry seen while scanning for key is the
        // first still present in the run.
        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    }

    // If key not found, put new entry in stale slot
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);

    // If there are any other stale entries in run, expunge them
    if (slotToExpunge != staleSlot)
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

expungeStaleEntry

/**        
 * 这个函数可以看做是ThreadLocal里的核心清理函数,它主要做的事情就是
 * 1、将下标为 staleSlot 的 Entry 设置为 null
 * 2、从staleSlot开始,向后遍历将被回收对象所对应的Entry节点的value和Entry节点本身设置null
 * 方便GC,并且size自减1
 * 3、对非null的Entry节点进行rehash,只要不是在当前位置,就会将Entry挪到下一个为null的位置上
 * 所以实际上是对从staleSlot开始做一个连续段的清理和rehash操作
 */
private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    // 将下标为 staleSlot 的 Entry 设置为 null
    // expunge entry at staleSlot
    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    size--;


    // Rehash until we encounter null
    Entry e;
    int i;
    for (i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
        // 如果ThreadLocal 为 null,则将 value 以及数组下标所在位置设置 null ,方便GC
        // 并且size - 1
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        } else {
            // 如果不为null
            // 重新计算key的下标
            int h = k.threadLocalHashCode & (len - 1);

            // 如果 h == i,说明rehash的位置为当前位置,则继续遍历下一个

            if (h != i) {
                // 如果 h != i,则重新从h开始找到下一个为null的坐标进行赋值

                tab[i] = null;

                // Unlike Knuth 6.4 Algorithm R, we must scan until
                // null because multiple entries could have been stale.
                while (tab[h] != null)
                    h = nextIndex(h, len);
                tab[h] = e;
            }
        }
    }
    // 此时 tab[i] = null
    return i;
}

cleanSomeSlots

/**     
 * i 对应的 Entry 是非无效的,有可能是失效被回收了,也有可能是null
 * 会有两个地方调用到这个方法
 * 1、set方法,在判断是否需要resize之前,会清理并rehash一遍
 * 2、替换失效的节点时候,也会进行一次清理
 */
private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
        i = nextIndex(i, len);
        Entry e = tab[i];
        // Entry对象不为空,但是Entry对应的key已经为null
        if (e != null && e.get() == null) {
            n = len;
            removed = true;
            // 调用 expungeStaleEntry 回收
            i = expungeStaleEntry(i);
        }
    } while ( (n >>>= 1) != 0);
    return removed;
}

get

public T get() {
    // 因为 threadLocal 属于线程的属性,所以需要先把当前线程拿出来
    Thread t = Thread.currentThread();
    // 从线程中拿到 ThreadLocalMap
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        // 从 map 中拿到 entry
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    // 否则给当前线程的 ThreadLocal 初始化,并返回初始值 null
    return setInitialValue();
}

getEntry

// ThreadLocalMap.getEntry(ThreadLocal<?> key)
private Entry getEntry(ThreadLocal<?> key) {
    // 计算索引位置
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    if (e != null && e.get() == key)
        return e;
    else
        return getEntryAfterMiss(key, i, e);
}

getEntryAfterMiss

// 自旋 i+1,直到找到为止
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;

    while (e != null) {
        ThreadLocal<?> k = e.get();
        if (k == key)
            return e;
        if (k == null)
            expungeStaleEntry(i); // 删除 key 为 null 的 Entry
        else
            i = nextIndex(i, len); // 继续使索引位置 + 1
        e = tab[i];
    }
    return null;
}

remove

// 将ThreadLocal对象对应的Entry节点从table当中删除
public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}

// ThreadLocalMap.remove(ThreadLocal<?> key)
private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        if (e.get() == key) {
            // 将引用设置null,方便GC
            e.clear();
            // 从该位置开始进行一次连续清理
            expungeStaleEntry(i);
            return;
        }
    }
}

ThreadLocal副作用

ThreadLocal 的主要问题是会产生脏数据和内存泄漏。这两个问题通常是在线程池的线程中使用 ThreadLocal 引发的,因为线程池有线程复用和内存常驻两个特点。

  1. 脏数据。线程复用会产生脏数据。由于结程池会重用 Thread 对象 ,那么与 Thread 绑定的类的静态属性 ThreadLocal 变量也会被重用。如果在实现的线程 run()方法体中不显式地调用 remove() 清理与线程相关的 ThreadLocal 信息,那么倘若下一个结程不调用set()设置初始值,就可能 get()到重用的线程信息,包括 ThreadLocal 所关联的线程对象的 value 值。
  2. 内存泄露。在源码注释中提示使用 static 键字来修饰 ThreadLocal 。在此场景下 ,寄希望于ThreadLocal 对象失去引用后触发弱引用机制来回收 Entry Value 就不现实了。每次用完 ThreadLocal 时, 必须要及时调用 remove()方法清理。

内存泄露

ThreadLocalMap使用ThreadLocal的弱引用作为key,如果一个ThreadLocal没有外部强引用来引用它,那么系统 GC 的时候,这个ThreadLocal势必会被回收。

这样一来,ThreadLocalMap中就会出现key为null的Entry,就没有办法访问这些key为null的Entry的value,如果当前线程再迟迟不结束的话,这些key为null的Entry的value就会一直存在一条强引用链:

Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value永远无法回收,造成内存泄漏。

其实,ThreadLocalMap的设计中已经考虑到这种情况,也加上了一些防护措施:在ThreadLocal的get(),set(),remove()的时候都会清除线程ThreadLocalMap里所有key为null的value。

但是这些被动的预防措施并不能保证不会内存泄漏:

  1. 使用static的ThreadLocal,延长了ThreadLocal的生命周期,可能导致的内存泄漏。
  2. 分配使用了ThreadLocal又不再调用get(),set(),remove()方法,那么就会导致内存泄漏。

为什么使用弱引用

下面我们分两种情况讨论:

  1. key 使用强引用:引用的ThreadLocal的对象被回收了,但是ThreadLocalMap还持有ThreadLocal的强引用,如果没有手动删除,ThreadLocal不会被回收,导致Entry内存泄漏。
  2. key 使用弱引用:引用的ThreadLocal的对象被回收了,由于ThreadLocalMap持有ThreadLocal的弱引用,即使没有手动删除,ThreadLocal也会被回收。value在下一次ThreadLocalMap调用set,get,remove的时候会被清除。

比较两种情况,我们可以发现:

由于ThreadLocalMap的生命周期跟Thread一样长,如果都没有手动删除对应key,都会导致内存泄漏,但是使用弱引用可以多一层保障:弱引用ThreadLocal不会内存泄漏,对应的value在下一次ThreadLocalMap调用set,get,remove的时候会被清除。

因此,ThreadLocal内存泄漏的根源是:由于ThreadLocalMap的生命周期跟Thread一样长,如果没有手动删除对应key就会导致内存泄漏,而不是因为弱引用。

ThreadLocal 实践

综合上面的分析,我们可以理解ThreadLocal内存泄漏的前因后果,那么怎么避免内存泄漏呢?

每次使用完ThreadLocal,都调用它的remove()方法,清除数据。

在使用线程池的情况下,没有及时清理ThreadLocal,不仅是内存泄漏的问题,更严重的是可能导致业务逻辑出现问题。所以,使用ThreadLocal就跟加锁完要解锁一样,用完就清理。

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!