0%

深入理解FastThreadLocal

FastThreadLocalThreadLocal 的一个变体,使用 FastThreadLocal 可获得更高的访问性能。

FastThreadLocal 需使用 FastThreadLocalThread 或其子类线程操作否则会更慢。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class FastThreadLocalTest {

private static FastThreadLocal<Integer> fastThreadLocal = new FastThreadLocal<>();

public static void main(String[] args) {
new FastThreadLocalThread(() -> {
for (int i = 0; i < 10; i++) {
fastThreadLocal.set(i);
System.out.println(Thread.currentThread().getName() + "====" + fastThreadLocal.get());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "fastThreadLocal1").start();


new FastThreadLocalThread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + "====" + fastThreadLocal.get());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "fastThreadLocal2").start();
}
}

输出:

1
2
3
4
5
6
fastThreadLocal1====0
fastThreadLocal2====null
fastThreadLocal2====null
fastThreadLocal1====1
fastThreadLocal2====null
fastThreadLocal1====2

源码分析

FastThreadLocal 使用了数组中的常量索引代替了 ThreadLocal 类中的 哈希和哈希表 实现,在请求频繁时这个改动效果会比较明显。使用的线程必须是 FastThreadLocalThread 或其子类。

1
2
3
4
5
6
7
8
9
10
11
public class FastThreadLocal<V> {

private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();

private final int index;

public FastThreadLocal() {
// 这里其实就是 AtomicInteger.getAndIncrement();
index = InternalThreadLocalMap.nextVariableIndex();
}
}

创建第一个 FastThreadLocal 对象时,nextIndex为0;

创建第二个 FastThreadLocal 对象时,nextIndex为1;

get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// FastThreadLocal的get()方法
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
Object v = threadLocalMap.indexedVariable(index);
// UNSET 是一个常量 public static final Object UNSET = new Object();
if (v != InternalThreadLocalMap.UNSET) { // 不为空
return (V) v;
}

return initialize(threadLocalMap);
}
// InternalThreadLocalMap的get()方法
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
// 如果是FastThreadLocalThread则直接获取该thread的内部变量threadLocalMap
return fastGet((FastThreadLocalThread) thread);
} else {
// 否则使用一个普通Thread Local
return slowGet();
}
}
// InternalThreadLocalMap的fastGet()方法
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
// 如果当前thread的threadLocalMap还是空的,就创建一个
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
// FastThreadLocalThread的threadLocalMap()方法
public final InternalThreadLocalMap threadLocalMap() {
return threadLocalMap; // FastThreadLocalThread的threadLocalMap属性
}
// InternalThreadLocalMap的构造方法
private InternalThreadLocalMap() {
super(newIndexedVariableTable());
}
// InternalThreadLocalMap的newIndexedVariableTable()方法
private static Object[] newIndexedVariableTable() {
Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE]; // 数组长度默认32
Arrays.fill(array, UNSET); // 使用 UNSET 常量填充数组
return array;
}
// UnpaddedInternalThreadLocalMap的构造方法
// InternalThreadLocalMap初始化时会调用该方法
UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
// indexedVariables 实际上是一个 Object[],用于保存变量
this.indexedVariables = indexedVariables;
}
// InternalThreadLocalMap的slowGet()方法
// UnpaddedInternalThreadLocalMap.slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret); // JDK的ThreadLocal
}
return ret;
}
// InternalThreadLocalMap的indexedVariable()方法
// 获取 indexedVariables[index] 处存储的元素
public Object indexedVariable(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length? lookup[index] : UNSET;
}
// FastThreadLocal的initialize()方法
private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
v = initialValue();
} catch (Exception e) {
PlatformDependent.throwException(e);
}

threadLocalMap.setIndexedVariable(index, v);
addToVariablesToRemove(threadLocalMap, this);
return v;
}

如果使用的是 FastThreadLocalThread 或其子类,才能获取到线程中的 threadLocalMap 属性

如果使用的是普通线程,则会使用普通的 ThreadLocal

set方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// FastThreadLocal的set()方法
public final void set(V value) {
// 如果不是缺省值,则获取当前线程所绑定的InternalThreadLocalMap
if (value != InternalThreadLocalMap.UNSET) {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
setKnownNotUnset(threadLocalMap, value);
} else {
remove();
}
}
// FastThreadLocal的setKnownNotUnset()方法
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
// 设置新的value,位置就在该threadLocal所存储的索引上,
// 添加到一个原来是空的位置上就返回true
if (threadLocalMap.setIndexedVariable(index, value)) {
// 保存旧值方便后续清理
addToVariablesToRemove(threadLocalMap, this);
}
}
// InternalThreadLocalMap的setIndexedVariable()方法
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables; // indexedVariables中存储着当前线程中所有的数据
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET; // 表明是否是第一次插入,数组中初始化的元素是UNSET
} else {
// 数组扩容,扩容后的大小是比 index 大的最小的2的幂
expandIndexedVariableTableAndSet(index, value);
return true;
}
}
// FastThreadLocalThread的addToVariablesToRemove()方法
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
// 保存 FastThreadLocal 对象的集合存在数组的 index 0 位置,variablesToRemoveIndex 恒等于 0
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
if (v == InternalThreadLocalMap.UNSET || v == null) {
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}

variablesToRemove.add(variable);
}

remove方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// FastThreadLocal的remove()方法
public final void remove() {
remove(InternalThreadLocalMap.getIfSet());
}
// InternalThreadLocalMap的getIfSet()方法
public static InternalThreadLocalMap getIfSet() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
// 如果是FastThreadLocalThread则直接获取该thread的内部变量threadLocalMap
return ((FastThreadLocalThread) thread).threadLocalMap();
}
// 否则使用一个普通Thread Local
return slowThreadLocalMap.get();
}
// FastThreadLocal的remove()方法
public final void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}

Object v = threadLocalMap.removeIndexedVariable(index);
// 将该 FastThreadLocal 从待删除结合(index0 保存的集合)中删除
removeFromVariablesToRemove(threadLocalMap, this);

if (v != InternalThreadLocalMap.UNSET) {
try {
onRemoval((V) v);
} catch (Exception e) {
PlatformDependent.throwException(e);
}
}
}
// InternalThreadLocalMap的removeIndexedVariable()方法
// 将index处的元素设置为缺省值
public Object removeIndexedVariable(int index) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object v = lookup[index];
lookup[index] = UNSET;
return v;
} else {
return UNSET;
}
}

清除数据

根据 FastThreadLocal 使用示例可以看出,我们在使用完 FastThreadLocal 并没有手动 remove 数据。这是因为 FastThreadLocalThread 会自动清理数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public FastThreadLocalThread(Runnable target, String name) {
super(FastThreadLocalRunnable.wrap(target), name);
cleanupFastThreadLocals = true;
}
final class FastThreadLocalRunnable implements Runnable {
private final Runnable runnable;

private FastThreadLocalRunnable(Runnable runnable) {
this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");
}

@Override
public void run() {
try {
runnable.run();
} finally {
// 执行完后清除数据
FastThreadLocal.removeAll();
}
}

static Runnable wrap(Runnable runnable) {
return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
}
}

但如果没有使用 FastThreadLocalThread ,还是需要手动执行 removeAll() 方法,避免内存泄漏。

伪共享(False Sharing)

缓存行(通常是64个字节)是CPU同步的基本单位,缓存行隔离会比伪共享(False Sharing)效率要高

如果多个核的线程在操作同一个缓存行中的不同变量数据,那么就会出现频繁的缓存失效,即使在代码层面看这两个线程操作的数据之间完全没有关系。

这种不合理的资源竞争情况学名伪共享(False Sharing),会严重影响机器的并发执行效率。

InternalThreadLocalMap 为了进一步的提高效率在成员变量中添加了几个填充字段。这是为了防止伪共享:

1
2
3
// Cache line padding (must be public)
// With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;

不过这里填充了9个long值,有点无法理解,github上也有人提出了 issues

性能测试

Netty 提供了相关的测试代码:地址

  • FastThreadLocalFastPathBenchmark
  • FastThreadLocalSlowPathBenchmark

使用 FastThreadLocalThread 操作 FastThreadLocal 吞吐量是 ThreadLocal 的3倍左右。

使用普通线程操作 FastThreadLocal 吞吐量比 ThreadLocal 还低。

小结

  1. FastThreadLocal 使用常量下标来替代 ThreadLocal 通过哈希和哈希表操作元素。
  2. FastThreadLocal 利用了缓存行的特性并使用缓存行填充解决了伪共享问题。
  3. 使用 FastThreadLocalThread 线程操作 FastThreadLocal 才会快,普通线程会更慢。
  4. 使用 FastThreadLocalThread 线程操作 FastThreadLocal 时,不需要手动执行 removeAll() 方法。
  5. 使用普通线程操作 FastThreadLocal 时,最后需要手动执行 removeAll() 方法。
坚持原创技术分享,您的支持将鼓励我继续创作!