当前位置 : 主页 > 编程语言 > java >

ThreadLocal开发中遇到的问题

来源:互联网 收集:自由互联 发布时间:2023-09-03
背景 项目主体架构Spring Cloud。使用ThreadLocal保存线程数据,但是发现保存的数据不会随着线程的结束而消失。 解决 线程结束后ThreadLocal保存的数据要调用remove()清除数据。 原理 因为每

背景

项目主体架构Spring Cloud。使用ThreadLocal保存线程数据,但是发现保存的数据不会随着线程的结束而消失。

解决

线程结束后ThreadLocal保存的数据要调用remove()清除数据。

原理

因为每个线程是从SpringBoot线程池里拿的,如果拿到同一个线程并且该线程的ThreadLocal没有remove数据,那么就会存在上述背景问题。以下是测试代码。

private static ThreadLocal<Long> threadLocal = new ThreadLocal<>();
/**
 * ThreadLocal 在保存值后,如果不remove,保存的值是不会清除的
 * 输出:
 * 第一次请求
 * threadId: 51
 * Value is null, and set value
 * 第二次请求
 * threadId: 51
 * value: 51
 */
@GetMapping("testThreadLocalNoRm")
public String testThreadLocalNoRm() {
    Thread t = Thread.currentThread();
    long tId = t.getId();
    System.out.println("threadId: " + tId);
    Long value = threadLocal.get();
    if (value != null) {
        // 第二次请求,如果没有remove值tId,则打印值 
        System.out.println("value: " + value);
    }
    else {
        // 第一次请求 ThreadLocal没值,会先保存值tId
        System.out.println("Value is null, and set value");
        threadLocal.set(tId);
    }
    // 请求完成remove清除值
    // threadLocal.remove();
    return "success";
}

第一次请求testThreadLocalNoRm,如线程id: 51,value为空,把线程id存入ThreadLocal。

第二次请求testThreadLocalNoRm,同一个线程51,value不为空,取到第一次请求存入的值51。

进一步研究

怀着对ThreadLocal的好奇,进一步研究ThreadLocal,发现它也是一种抽象数据类型 (abstract data type,ADT,带有一组操作的一些对象的集合)

ThreadLocal里有个静态内部类ThreadLocalMap,是一个自定义的Hash Map,是一个散列表,ThreadLocal存储值是以键值对保存在ThreadLocalMap.Entry里,Entry继承了WeakReference,其中key是当前ThreadLocal实例。Key是弱引用。后面remove()函数就是利用这个弱引用机制。

static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    table = new Entry[INITIAL_CAPACITY];
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    setThreshold(INITIAL_CAPACITY);
}

ThreadLocal.ThreadLocalMap是Thread的实例变量threadLocals,该实例变量是包访问权限,所以说ThreadLocal是线程的内部数据,或者说线程本地变量。

Thread.threadLocals

/* ThreadLocal values pertaining to this thread. This map is maintained
 * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

ThreadLocal.ThreadLocalMap.createMap

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}


ThreadLocal如何保存数据

ThreadLocal.set()源码

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

根据当前线程找到ThreadLocalMap,不为null以当前ThreadLocal实例为key保存value,否则创建ThreadLocalMap。

ThreadLocalMap.set()源码

private void set(ThreadLocal<?> key, Object value) {

    // We don't use a fast path as with get() because it is at
    // least as common to use set() to create new entries as
    // it is to replace existing ones, in which case, a fast
    // path would fail more often than not.

    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);

    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();

        if (k == key) {
            e.value = value;
            return;
        }

        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }

    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

根据当前ThreadLocal实例的threadLocalHashCode,和(len - 1)与操作,计算Entry数组的下标。len是该数组长度。初始长度16,且必须是2的幂。

如果定位到下标的槽位不为空,即槽位存在Entry,key与槽位Entry的k一样,替换槽位Entry的value,如果槽位Entry的k是null值,替换旧Entry。

如果下标槽位空的,则在该槽位创建一个Entry(key, value);

如果有过时Entry清除掉,同时达到Entry数组的扩容阀值,该阀值是2/3的数组长度取整。默认是10,即16 * 2/3。

如果遇到槽位有值,且k != key,k不为空,这就产生冲突了,这里解决冲突用线性探测法,nextIndex(i, len),就是往下一个槽位探测,找到一个空的槽位插入,nextIndex函数是以环形的方式探测下一槽位的,探测到最后一个槽位后又从头开始探测。

private static int nextIndex(int i, int len) {
    return ((i + 1 < len) ? i + 1 : 0);
}

rehash()如何扩容的

/**
 * Re-pack and/or re-size the table. First scan the entire
 * table removing stale entries. If this doesn't sufficiently
 * shrink the size of the table, double the table size.
 */
private void rehash() {
    expungeStaleEntries();

    // Use lower threshold for doubling to avoid hysteresis
    if (size >= threshold - threshold / 4)
        resize();
}

删除过期Entry,删除过期Entry之后的size,就是还剩下多少个Entry,这个size >= 8则进行扩容。默认threshold是10。

threshold = len * 2/3,扩容阀值

threshold - threshold / 4 = threshold * (1 – 1/4) = threshold * 3/4 = len * 2/3 * 3/4 = len * 1/2; 实际扩容阀值

所以,大于阀值只是进入扩容准备阶段,准备阶段清除过期Entry后,还剩的Entry数量size,大于或者等于len * 1/2,才会进入实际扩容。

扩容会重新创建一个是原来两倍长度的Entry数组,再将原数组的Entry重新哈希定位下标存入新数组。过程中遇到某个Entry不为空的key == null的,把value也置null,方便GC回收。第一次扩容之后Entry数组长度32,阀值21,实际扩容阀值16。

resize()源码

/**
 * Double the capacity of the table.
 */
private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2;
    Entry[] newTab = new Entry[newLen];
    int count = 0;

    for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        if (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == null) {
                e.value = null; // Help the GC
            } else {
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null)
                    h = nextIndex(h, newLen);
                newTab[h] = e;
                count++;
            }
        }
    }

    setThreshold(newLen);
    size = count;
    table = newTab;
}

ThreadLocal如何查询数据

ThreadLocal的get()函数获取set(key, value)存入的value,key是当前ThreadLocal实例。

get()源码

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

根据当前线程取得ThreadLocalMap,即Thread里的实例变量threadLocals。

ThreadLocalMap不为空,根据当前ThreadLocal实例获取Entry。

通过上面的介绍Entry是一个键值对,不为空返回里面的value。

如果ThreadLocalMap为空,为其设一个初始值,类似上面ThreadLocal.set(),只是value是null。

ThreadLocal如何删除数据

ThreadLocal.remove()源码

public void remove() {
     ThreadLocalMap m = getMap(Thread.currentThread());
     if (m != null)
         m.remove(this);
}

ThreadLocalMap.remove()源码

private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        if (e.get() == key) {
            e.clear();
            expungeStaleEntry(i);
            return;
        }
    }
}

ThreadLocal通过remove()函数删除set入的数据。取得当前线程的ThreadLocalMap,删除当前ThreadLocal实例的Entry。先是把Entry的弱引用key置空,该引用的那块内存会被GC回收,以此上面那些key==null的Entry都是过期的,待GC回收的ThreadLocal实例。e.clear()就是将key引用置空。删除过期Entry则将value置空,和该Entry槽位置空。

关于4种引用,分别是,强引用、软引用、弱引用、虚引用。

  • 强引用对象不会被垃圾回收,甚至OOM也不会被回收。
  • 软引用SoftReference,堆内存紧张时 GC回收,否则不回收,所以软引用不会引起OOM。
  • 弱引用WeakReference,GC时,无论是否堆内存是否紧张都会回收。
  • 虚引用PhantomReference,用于跟踪对象回收过程,虚引用通过get()是拿不到引用对象的。

测试ThreadLocal的GC

public class ThreadLocalGC {

    public static class User {

        private String name;

        public User(String name) {
            this.name = name;
        }

        // 监控一下User垃圾回收
        @Override
        protected void finalize() {
            System.out.println("User GC");
        }

        @Override
        public String toString() {
            return "User{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }

    public static class UserRun implements Runnable {

        private long sleepTime;

        public UserRun(long sleepTime) {
            this.sleepTime = sleepTime;
        }

        private static ThreadLocal<User> TL = new ThreadLocal<User>() {
            // 监控ThreadLocal垃圾回收
            @Override
            protected void finalize() throws Throwable {
                super.finalize();
                System.out.println(this + ": TL GC");
            }
        };

        @Override
        public void run() {
            User user = new User("Joy Boy");
            TL.set(user);
            // user置为null,释放User,System.gc()会回收,不置null,System.gc()不会回收
            // 如果内存紧张不管是否释放都会回收User
            // user = null;

            System.out.println(Thread.currentThread().getId() + ": " + TL.get());
            System.out.println("TL1: " + TL);
            TL.remove();
            // -Xmx1m最大堆1m 申请一个大内存触发GC,制造内存紧张
            byte[] b = new byte[1024 * 500];
            // 手动GC,内存紧张的时候不用手动GC也会触发GC
            // System.gc();

            // 这里重新new一个TL会释放上面的TL
            TL = new ThreadLocal<User>() {
                @Override
                protected void finalize() {
                    System.out.println(this + ": TL GC");
                }
            };
            System.out.println(Thread.currentThread().getId() + ": " + TL.get());
            System.out.println("TL2: " + TL);
            try {
                // 休眠一下可以看到GC,让垃圾回收线程执行完,以此监测GC
                Thread.sleep(sleepTime);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 设置JVM参数方便测试
     * -Xmx1m 设置堆大小1m
     * -XX:+PrintGCDetails 打印GC详情     *
     * 打印信息
     * [GC (Allocation Failure)  1099K->676K(1536K), 0.0007083 secs]
     * [Full GC (Ergonomics)  804K->304K(1536K), 0.0060029 secs]
     * 12: User{name='Joy Boy'}
     * TL1: com.zx.threads.thread.local.ThreadLocalGC$UserRun$1@21a66f33
     * 12: null
     * TL2: com.zx.threads.thread.local.ThreadLocalGC$UserRun$2@2f618577
     * com.zx.threads.thread.local.ThreadLocalGC$UserRun$1@21a66f33: TL GC
     * User GC
     */
    public static void main(String[] args) throws InterruptedException {
        new Thread(new UserRun(1000)).start();
    }
}

为了方便测试运行的堆内存设置1M,Entry的value是User,key是TL。User和TL都重写了finalize(),用于跟踪GC。启个UserRun线程,测试垃圾回收过程。对重点代码作了解释。TL.remove()之后,在Entry释放了TL,释放User。TL重新new了,所以TL1对象被GC了,User也被GC了,如果TL还有引用是不会被GC的,重新new了,TL1没有被引用,才会被GC。一般生产环境中,不会重新new ThreadLocal(),所以就算Entry释放了TL,ThreadLocal也不会被GC。只有value被释放了之后会被GC。 最终Entry也会被GC。因此如果不remove()释放掉Entry,可能会导致OOM。

同一线程每new一个ThreadLocal,生成一个Entry,实现原理

通过阅读ThreadLocal源码,它维护着一个ThreadLocalMap,该Map维护着一个Entry数组。在同一线程中,每new一个ThreadLocal就是一个Entry,根据threadLocalHashCode和数组长度减一,与运算,即threadLocalHashCode & (len - 1)计算出数组下标,插入数组。如果同一线程中创建了多个ThreadLocal,那么生成的下标肯定是不一样的,而数组长度是一定的,默认长度是16,所以threadLocalHashCode肯定是会变化的,才会算出不一样下标。问题是threadLocalHashCode是如何变化的?

为搞清楚这个问题,先看看ThreadLocal的threadLocalHashCode相关源码。

private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
    return nextHashCode.getAndAdd(HASH_INCREMENT);
}

在ThreadLocal中threadLocalHashCode被声明为,私有的final整型基本数据类型,且调用静态方法nextHashCode()赋值。这个final关键字和调用nextHashCode()为其实现了同一线程中,每new一个ThreadLocal就会生成新threadLocalHashCode,final定义的字段(没有static),构造函数完成之前,不能初始化该字段,即构造函数完成之前,该字段对其他线程不可见。所以每次new ThreadLocal()都会初始化一个threadLocalHashCode,这个初始化就是调用nextHashCode();

通过相关源码的字节码指令也可以看出

/**
 * public <init>()V
 *    L0
 *     LINENUMBER 5 L0
 *     ALOAD 0
 *     INVOKESPECIAL java/lang/Object.<init> ()V
 *    L1
 *     LINENUMBER 14 L1
 *     ALOAD 0
 *     INVOKESTATIC com/zx/finals/FinalByMethod.nextHashCode ()I
 *     PUTFIELD com/zx/finals/FinalByMethod.threadLocalHashCode : I
 *     RETURN
 *    L2
 *     LOCALVARIABLE this Lcom/zx/finals/FinalByMethod; L0 L2 0
 *     MAXSTACK = 2
 *     MAXLOCALS = 1
 */

public <init>()V,就是new ThreadLocal()执行的,

INVOKESTATIC com/zx/finals/FinalByMethod.nextHashCode ()I,

就是执行静态方法nextHashCode ()。

PUTFIELD com/zx/finals/FinalByMethod.threadLocalHashCode : I,

赋值给threadLocalHashCode

0x61c88647猜想

ThreadLocal.ThreadLocalMap就是一个散列表,其散列函数通过ThreadLocal的散列码跟散列表长度取模实现,ThreadLocal的散列码是0x61c88647的叠加得出。

ThreadLocal散列叠加数0x61c88647,关于这个数的猜想

在说这个散列叠加数之前,先了解一下黄金分割

数学定义:把一条线段分割为两部分,使较大部分与全长的比值等于较小部分与较大的比值,则这个比值即为黄金分割。其比值是 2 / (√5 + 1) = (√5 - 1) / 2,近似值为0.618,通常用希腊字母 Ф [fi:] 表示这个值。有些文献用 φ (Ф的小写) 表示斐波那契黄金分割数 (√5 - 1) / 2 ≈ 0.618,用 Ф 表示黄金分区率的倒数 1 / φ,(√5 + 1) / 2 ≈ 1.618

A————————C——————B,设一条线段AB的长度为a,C点在靠近B点的黄金分割点上,且AC为b,则b与a的比叫作黄金比

ThreadLocal开发中遇到的问题_ThreadLocal

ThreadLocal开发中遇到的问题_源码_02

ThreadLocal开发中遇到的问题_源码_03

ThreadLocal开发中遇到的问题_ThreadLocal_04

ThreadLocal开发中遇到的问题_0x61c88647_05

ThreadLocal开发中遇到的问题_源码_06

ThreadLocal开发中遇到的问题_ThreadLocal_07

ThreadLocal开发中遇到的问题_0x61c88647_08

ThreadLocal开发中遇到的问题_源码_09

0x61c88647的十进制表示1640531527,假设这个数是a的黄金分割点,也就是C点,以下是关于这个假设的推理:

1.当这个数是AC,求AB

ThreadLocal开发中遇到的问题_ThreadLocal_10

ThreadLocal开发中遇到的问题_ThreadLocal_11

ThreadLocal开发中遇到的问题_源码_12

2.当这个数是BC,求AB

ThreadLocal开发中遇到的问题_ThreadLocal_13

ThreadLocal开发中遇到的问题_0x61c88647_14

ThreadLocal开发中遇到的问题_0x61c88647_15


第一种假设,AB:2654435770,其二进制表示10011110001101110111100110111010

第二种假设,AB:4294967297,其二进制表示100000000000000000000000000000001,

在计算机里更倾向于第二种假设,因其Java二进制位移表示(1L << 32) + 1,方便计算加一可以忽略表示为

Integer.MAX_VALUE = 1 << 31 = 2³¹ = -2147483648,转成Long表示,1L << 31 = 2147483648

1L << 32 = 2147483648 * 2 = 4294967296。

逆向证明过程

AB * ((√5 - 1) / 2),其Java代码表示

0x61c88647 = (1L << 32) - 2654435769 = 1640531527;

2654435769其实就是较长的那段AC,1640531527就是较短那段BC。

ThreadLocal的threadLocalHashCode,这个线程本地变量散列码,是通过0x61c88647叠加来起到减少冲突的作用

ThreadLocal的nextHashCode() 就是叠加函数,准确说是扰乱函数,通过叠加0x61c88647来扰乱散列码,以此减少冲突。

ThreadLocalMap散列表通过 threadLocalHashCode & (len - 1) 散列函数寻址,len是散列表长度

threadLocalHashCode & (len - 1) 等同于 threadLocalHashCode % len (threadLocalHashCode mod len)

线程本地变量散列码和长度减一与运算,求余数,

(len - 1),len是2的幂,默认是(2⁴ - 1),二进制表示1111,(32 - 4)位高位补0,所以与运用算只算最低4位,threadLocalHashCode最低4位就是余数

threadLocalHashCode的散列是运用乘法散列方案,设 k = 0x61c88647,则散列函数 h(K) = k, 2k, 3k...nk,

ThreadLocalMap是运用除法散列方案 h(K) = nk mod len


关于黄金分割使得散列均匀分布早有证明,这个证明中斐波那契数列起着重要作用,参考Donald E.Knuth The Art of Computer Programming 6.4

如果非要说0x61c88647跟黄金分割有关系,刚好是分割了Integer.MAX_VALUE的两倍的黄金分割点,

测试用AC段2654435769作为这个分割数也能均匀分布散列码。

最终猜测只要这个分割数足够大,那么每次叠加都是唯一散列码,用这个唯一码再去取模散列表长度时,出现冲突概率会非常小,加上散列表的扩容机制,这个冲突概率就更小了。

【文章原创作者:阿里云代理 http://www.558idc.com/aliyun.html处的文章,转载请说明出处】
上一篇:如何使用CORS来允许设置Cookie
下一篇:没有了
网友评论