ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

浅谈Netty中的FastThreadLocal的优势和实现

2021-10-30 17:01:29  阅读:140  来源: 互联网

标签:Netty FastThreadLocal 浅谈 index InternalThreadLocalMap value ThreadLocal 数组


目录

FastThreadLocal为什么比ThreadLocal快

ThreadLocal

ThreadLocalMap

线性探测法

FastThreadLocal

FastThreadLocalThread

InternalThreadLocalMap

 FastThreadLocal.set方法

FastThreadLocal.get方法 


 

基于FastThreadLocalThread去使用FastThreadLocal时的效率要高于JDK的Thread使用ThreadLocal。

FastThreadLocal为什么比ThreadLocal快

FastThreadLocal与ThreadLocal内部存储结构不同,FastThreadLocal基于数据的存储形式,相对ThreadLocal来说站空间多,但是却使得查找效率更高,而ThreadLocal基于hash表,当存在Hash冲突时基于线性表的查找效率显然不如数组索引查找。

ThreadLocal

首先来回顾一下ThreadLocal。在JDK中提供了ThreadLocal用于存储线程私有数据。为了避免加锁,实现上以 Thread 入手,在 Thread 中维护一个 Map,记录 ThreadLocal 与实例之间的映射关系,这样在同一个线程内,Map 就不需要加锁了。

ThreadLocalMap

ThreadLocal内部存储是基于ThreadLocalMap的,ThreadLocalMap 其实与 HashMap 的数据结构类似,是一种使用线性探测法实现的哈希表,底层采用数组存储数据。ThreadLocalMap 会初始化一个长度为 16 的 Entry 数组,每个 Entry 对象用于保存 key-value 键值对。与 HashMap 不同的是,Entry 的 key 就是 ThreadLocal 对象本身,value 就是用户具体需要存储的值。

当调用 ThreadLocal.set() 添加 Entry 对象时,是如何解决 Hash 冲突的呢?下面是答案

线性探测法

每个 ThreadLocal 在初始化时都会有一个 Hash 值为 threadLocalHashCode,每增加一个 ThreadLocal, Hash 值就会固定增加一个魔术 HASH_INCREMENT = 0x61c88647。为什么取 0x61c88647 这个魔数呢?实验证明,通过 0x61c88647 累加生成的 threadLocalHashCode 与 2 的幂取模,得到的结果可以较为均匀地分布在长度为 2 的幂大小的数组中。

ThreadLocal.get() 的过程也是类似的,也是根据 threadLocalHashCode 的值定位到数组下标,然后判断当前位置 Entry 对象与待查询 Entry 对象的 key 是否相同,如果不同,继续向下查找。由此可见,ThreadLocal.set()/get() 方法在数据密集时很容易出现 Hash 冲突,需要 O(n) 时间复杂度解决冲突问题,效率较低。

FastThreadLocal

FastThreadLocal 的实现与 ThreadLocal 非常类似,Netty 为 FastThreadLocal 量身打造了 FastThreadLocalThread 和 InternalThreadLocalMap 两个重要的类。

FastThreadLocalThread

FastThreadLocalThread 是对 Thread 类的一层包装,每个线程对应一个 InternalThreadLocalMap 实例。只有 FastThreadLocal 和 FastThreadLocalThread 组合使用时,才能发挥 FastThreadLocal 的性能优势。FastThreadLocalThread 主要扩展了 InternalThreadLocalMap 字段,我们可以猜测到 FastThreadLocalThread 主要使用 InternalThreadLocalMap 存储数据,而不再是使用 Thread 中的 ThreadLocalMap。

InternalThreadLocalMap

从 InternalThreadLocalMap 内部实现来看,与 ThreadLocalMap 一样都是采用数组的存储方式。但是 InternalThreadLocalMap 并没有使用线性探测法来解决 Hash 冲突,而是在 FastThreadLocal 初始化的时候分配一个数组索引 index,index 的值采用原子类 AtomicInteger 保证顺序递增,通过调用 InternalThreadLocalMap.nextVariableIndex() 方法获得。然后在读写数据的时候通过数组下标 index 直接定位到 FastThreadLocal 的位置,时间复杂度为 O(1)。如果数组下标递增到非常大,那么数组也会比较大,所以 FastThreadLocal 是通过空间换时间的思想提升读写性能。

 FastThreadLocal.set方法

public final void set(V value) {
    if (value != InternalThreadLocalMap.UNSET) { // 1. value 是否为缺省值
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 2. 获取当前线程的 InternalThreadLocalMap
        setKnownNotUnset(threadLocalMap, value); // 3. 将 InternalThreadLocalMap 中数据替换为新的 value
    } else {
        remove();

    }
}

FastThreadLocal.set() 方法虽然入口只有几行代码,但是内部逻辑是相当复杂的。我们首先还是抓住代码主干,一步步进行拆解分析。set() 的过程主要分为三步:

  1. 判断 value 是否为缺省值,如果等于缺省值,那么直接调用 remove() 方法。这里我们还不知道缺省值和 remove() 之间的联系是什么,我们暂且把 remove() 放在最后分析。

  2. 如果 value 不等于缺省值,接下来会获取当前线程的 InternalThreadLocalMap。

  3. 然后将 InternalThreadLocalMap 中对应数据替换为新的 value。 

首先我们看下 InternalThreadLocalMap.get() 方法.

  1. 如果当前线程是 FastThreadLocalThread 类型,那么直接通过 fastGet() 方法获取 FastThreadLocalThread 的 threadLocalMap 属性即可。如果此时 InternalThreadLocalMap 不存在,直接创建一个返回。
  2. 如果当前线程不是 FastThreadLocalThread,内部是没有 InternalThreadLocalMap 属性的,Netty 在 UnpaddedInternalThreadLocalMap 中保存了一个 JDK 原生的 ThreadLocal,ThreadLocal 中存放着 InternalThreadLocalMap,此时获取 InternalThreadLocalMap 就退化成 JDK 原生的 ThreadLocal 获取。
public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) { // 当前线程是否为 FastThreadLocalThread 类型
        return fastGet((FastThreadLocalThread) thread);
    } else {
        return slowGet();
    }
}

private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); // 获取 FastThreadLocalThread 的 threadLocalMap 属性
    if (threadLocalMap == null) {
        thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }
    return threadLocalMap;
}
private static InternalThreadLocalMap slowGet() {
    ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; 
    InternalThreadLocalMap ret = slowThreadLocalMap.get(); // 从 JDK 原生 ThreadLocal 中获取 InternalThreadLocalMap
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}

 下面使用一幅图描述 InternalThreadLocalMap 的获取方式

下面看下 setKnownNotUnset() 如何将数据添加到 InternalThreadLocalMap 的。

setKnownNotUnset() 主要做了两件事:

  1. 找到数组下标 index 位置,设置新的 value。

  2. 将 FastThreadLocal 对象保存到待清理的 Set 中。

private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    // 1. 找到数组下标 index 位置,设置新的 value
    if (threadLocalMap.setIndexedVariable(index, value)) { 
    // 2. 将 FastThreadLocal 对象保存到待清理的 Set 中
        addToVariablesToRemove(threadLocalMap, this); 
    }
}

首先我们看下第一步 threadLocalMap.setIndexedVariable() 的源码 

public boolean setIndexedVariable(int index, Object value) {
    Object[] lookup = indexedVariables;
    if (index < lookup.length) {
        Object oldValue = lookup[index]; 
        lookup[index] = value; // 直接将数组 index 位置设置为 value,时间复杂度为 O(1)
        return oldValue == UNSET;
    } else {
        expandIndexedVariableTableAndSet(index, value); // 容量不够,先扩容再设置值
        return true;
    }
}

indexedVariables 就是 InternalThreadLocalMap 中用于存放数据的数组,如果数组容量大于 FastThreadLocal 的 index 索引,那么直接找到数组下标 index 位置将新 value 设置进去,事件复杂度为 O(1)。在设置新的 value 之前,会将之前 index 位置的元素取出,如果旧的元素还是 UNSET 缺省对象,那么返回成功。

如果数组容量不够了怎么办呢?InternalThreadLocalMap 会自动扩容,然后再设置 value。接下来看看 expandIndexedVariableTableAndSet() 的扩容逻辑.

如果看过HashMap 中扩容的源码I就会发现,nternalThreadLocalMap 实现数组扩容几乎和 HashMap 完全是一模一样的 。InternalThreadLocalMap 以 index 为基准进行扩容,将数组扩容后的容量向上取整为 2 的次幂。然后将原数组内容拷贝到新的数组中,空余部分填充缺省对象 UNSET,最终把新数组赋值给 indexedVariables。为什么 InternalThreadLocalMap 以 index 为基准进行扩容,而不是原数组长度呢?假设现在初始化了 70 个 FastThreadLocal,但是这些 FastThreadLocal 从来没有调用过 set() 方法,此时数组还是默认长度 32。当第 index = 70 的 FastThreadLocal 调用 set() 方法时,如果按原数组容量 32 进行扩容 2 倍后,还是无法填充 index = 70 的数据。所以使用 index 为基准进行扩容可以解决这个问题,但是如果 FastThreadLocal 特别多,数组的长度也是非常大的。

private void expandIndexedVariableTableAndSet(int index, Object value) {
    Object[] oldArray = indexedVariables;
    final int oldCapacity = oldArray.length;
    int newCapacity = index;
    newCapacity |= newCapacity >>>  1;
    newCapacity |= newCapacity >>>  2;
    newCapacity |= newCapacity >>>  4;
    newCapacity |= newCapacity >>>  8;
    newCapacity |= newCapacity >>> 16;
    newCapacity ++;
    Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
    Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
    newArray[index] = value;
    indexedVariables = newArray;
}

向 InternalThreadLocalMap 添加完数据之后,接下就是将 FastThreadLocal 对象保存到待清理的 Set 中。我们继续看下 addToVariablesToRemove() 是如何实现的。 

private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    // 获取数组下标为 0 的元素
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); 
    Set<FastThreadLocal<?>> variablesToRemove;
    if (v == InternalThreadLocalMap.UNSET || v == null) {
    // 创建 FastThreadLocal 类型的 Set 集合
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); 
    // 将 Set 集合填充到数组下标 0 的位置
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); 
    } else {
    // 如果不是 UNSET,Set 集合已存在,直接强转获得 Set 集合
        variablesToRemove = (Set<FastThreadLocal<?>>) v; 
    }
    // 将 FastThreadLocal 添加到 Set 集合中
    variablesToRemove.add(variable); 
}

variablesToRemoveIndex 是采用 static final 修饰的变量,在 FastThreadLocal 初始化时 variablesToRemoveIndex 被赋值为 0。InternalThreadLocalMap 首先会找到数组下标为 0 的元素,如果该元素是缺省对象 UNSET 或者不存在,那么会创建一个 FastThreadLocal 类型的 Set 集合,然后把 Set 集合填充到数组下标 0 的位置。如果数组第一个元素不是缺省对象 UNSET,说明 Set 集合已经被填充,直接强转获得 Set 集合即可。这就解释了 InternalThreadLocalMap 的 value 数据为什么是从下标为 1 的位置开始存储了,因为 0 的位置已经被 Set 集合占用了。 

FastThreadLocal.get方法 

public final V get() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    // 从数组中取出 index 位置的元素
    Object v = threadLocalMap.indexedVariable(index); 
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
    // 如果获取到的数组元素是缺省对象,执行初始化操作
    return initialize(threadLocalMap); 
}

public Object indexedVariable(int index) {
    Object[] lookup = indexedVariables;
    return index < lookup.length? lookup[index] : UNSET;
}

private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
        v = initialValue();
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }
    threadLocalMap.setIndexedVariable(index, v);
    addToVariablesToRemove(threadLocalMap, this);
    return v;
}

首先根据当前线程是否是 FastThreadLocalThread 类型找到 InternalThreadLocalMap,然后取出从数组下标 index 的元素,如果 index 位置的元素不是缺省对象 UNSET,说明该位置已经填充过数据,直接取出返回即可。如果 index 位置的元素是缺省对象 UNSET,那么需要执行初始化操作。可以看到,initialize() 方法会调用用户重写的 initialValue 方法构造需要存储的对象数据,如下所示。

private final FastThreadLocal<String> threadLocal = new FastThreadLocal<String>() {
    @Override
    protected String initialValue() {
        return "hello world";
    }
};

构造完用户对象数据之后,接下来就会将它填充到数组 index 的位置,然后再把当前 FastThreadLocal 对象保存到待清理的 Set 中。整个过程我们在分析 FastThreadLocal.set() 时都已经介绍过,就不再赘述了。 

标签:Netty,FastThreadLocal,浅谈,index,InternalThreadLocalMap,value,ThreadLocal,数组
来源: https://blog.csdn.net/qq_29569183/article/details/121052562

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有