0%

深入理解Recycler

Recycler 是 Netty 实现的一个轻量级对象回收站,它的作用是保证对象的循环利用。对象使用完可以通过Recycler回收,需要再次使用则从对象池中取出,不用每次都创建新对象,从而减少对系统资源的占用,同时也减轻了GC的压力。

源码中的描述是:Recycler 是基于线程本地堆栈的轻量级对象池。

使用示例

回收再利用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RecyclerTest {

private static final Recycler<User> RECYCLER = new Recycler<User>() {
@Override
protected User newObject(Handle<User> handle) {
return new User(handle);
}
};

static class User {
private final Recycler.Handle<User> handle;

public User(Recycler.Handle<User> handle) {
this.handle = handle;
}

public void recycle() {
handle.recycle(this);
}
}

public static void main(String[] args) {
User user1 = RECYCLER.get();
user1.recycle();
User user2 = RECYCLER.get();
user2.recycle();
System.out.println(user1 == user2);
}
}

输出:true

源码分析

在 Recycler 类的源码中,我们会看到这样一段逻辑:

1
2
3
4
5
6
7
8
9
10
11
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this,
Thread.currentThread(),
maxCapacityPerThread,
maxSharedCapacityFactor,
interval,
maxDelayedQueuesPerThread,
delayedQueueInterval);
}

这段逻辑用于保存线程共享对象,而这里的共享对象,就是一个 Stack 类型的对象。每个 Stack 中都维护着一个DefaultHandle 类型的数组,用于盛放回收的对象。Stack 和线程的关系如下图所示:

在每个 Recycler 中,都维护着一个线程共享的栈,用于对一类对象的回收。

Stack 的构造方法和相关属性如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private static final class Stack<T> {

final Recycler<T> parent;

final WeakReference<Thread> threadRef;
final AtomicInteger availableSharedCapacity;
private final int maxDelayedQueues;

private final int maxCapacity;
private final int interval;
private final int delayedQueueInterval;
DefaultHandle<?>[] elements;
int size;
private int handleRecycleCount;
private WeakOrderQueue cursor, prev;
// 非threadRef指向的线程回收的对象
private volatile WeakOrderQueue head;

Stack(Recycler<T> parent,
Thread thread, int maxCapacity, int maxSharedCapacityFactor,
int interval, int maxDelayedQueues, int delayedQueueInterval) {
this.parent = parent;
threadRef = new WeakReference<Thread>(thread);
this.maxCapacity = maxCapacity;
availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
this.interval = interval;
this.delayedQueueInterval = delayedQueueInterval;
handleRecycleCount = interval;
this.maxDelayedQueues = maxDelayedQueues;
}
// ...
}
  • parent:表示 Recycler 对象自身
  • threadRef:表示当前 Stack 绑定的线程
  • availableSharedCapacity:表示在线程A中创建的对象,在其他线程中缓存的最大个数。默认值是32768 / 2,也就是16384
  • maxDelayedQueues:表示该线程能回收的线程对象的最大值。默认值为CPU核数×2。
  • maxCapacity:表示当前 Stack 的最大容量,默认值为32768。
  • interval:用来控制对象回收的频率,也就是说每次通过Recycler回收对象的时候,不是每次都会进行回收,而是通过该参数控制回收频率。默认值为7。
  • delayedQueueInterval
  • elements:表示 Stack 中存储的对象,类型为 DefaultHandle,可以被外部对象引用,从而实现回收。
  • size: elements的长度
  • handleRecycleCount:表示当前 Stack 回收了多少次对象
  • cursor、prev、head:指向 WeakOrderQueue,用于存放其他线程的对象

获取对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public final T get() {
if (maxCapacityPerThread == 0) { // 判断Stack最大能缓存的对象个数是否为0
return newObject((Handle<T>) NOOP_HANDLE); // NOOP_HANDLE 是一个常量
}
Stack<T> stack = threadLocal.get();
DefaultHandle<T> handle = stack.pop(); // 从Stack中pop出一个Handle元素
if (handle == null) { // 第一次执行到这里则为空
handle = stack.newHandle(); // 新建一个handle对象
handle.value = newObject(handle); // 给handle对象赋值
}
return (T) handle.value;
}
// stack的pop方法
DefaultHandle<T> pop() {
int size = this.size; // stack 的对象数
if (size == 0) {
if (!scavenge()) { // 异线程回收对象
return null;
}
size = this.size;
if (size <= 0) {
// double check, avoid races
return null;
}
}
size --;
DefaultHandle ret = elements[size];
elements[size] = null;
// As we already set the element[size] to null we also need to store the updated size before we do
// any validation. Otherwise we may see a null value when later try to pop again without a new element
// added before.
this.size = size;

if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
ret.recycleId = 0;
ret.lastRecycledId = 0;
return ret;
}
// stack的newHandle方法
DefaultHandle<T> newHandle() {
return new DefaultHandle<T>(this);
}
private static final class DefaultHandle<T> implements Handle<T> {
int lastRecycledId;
int recycleId;

boolean hasBeenRecycled;

Stack<?> stack;
Object value;

DefaultHandle(Stack<?> stack) {
this.stack = stack;
}
}

相同线程内的对象回收

按示例的代码回收的流程分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public void recycle() {
handle.recycle(this);
}
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}

Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
}

stack.push(this); // 将handle push到Stack中
}
void push(DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
if (threadRef.get() == currentThread) {
// 如果当前线程和创建Stack的时候保存的线程是否是同一线程
// 则执行pushNow()方法将对象放入Stack中
pushNow(item);
} else {
// The current Thread is not the one that belongs to the Stack
// (or the Thread that belonged to the Stack was collected already), we need to signal that the push
// happens later.
// 不同线程回收则执行 pushLaster方法
pushLater(item, currentThread);
}
}
private void pushNow(DefaultHandle<?> item) {
// 第一次回收 recycleId 和 lastRecycledId 都为0
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
}
// recycleId和lastRecycledId赋值为OWN_THREAD_ID
// OWN_THREAD_ID在每一个recycle中都是唯一固定的(本质上就是一个自增的AtomicInteger)
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

int size = this.size;
// 判断size是否超过上限 或者 dropHandle返回为true
if (size >= maxCapacity || dropHandle(item)) {
// Hit the maximum capacity or should drop - drop the possibly youngest object.
return;
}
if (size == elements.length) { // 数组扩容并且迁移数据
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}

elements[size] = item;
this.size = size + 1;
}
boolean dropHandle(DefaultHandle<?> handle) {
if (!handle.hasBeenRecycled) { // 表示当前对象之前没有回收过
// handleRecycleCount 表示当前位置Stack回收了多少次对象(不代表回收了多少个对象)
if (handleRecycleCount < interval) {
handleRecycleCount++;
// Drop the object.
return true;
}
handleRecycleCount = 0;
handle.hasBeenRecycled = true;
}
return false;
}

不同线程间的对象回收

创建对象和回收对象不在同一条线程的情况下对象回收的逻辑不会放在当前线程的Stack中的,而是放在一个WeakOrderQueue 的数据结构。

上面分析相同线程内的对象回收时执行 pushNow 方法,不同线程回收则执行 pushLaster 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private void pushLater(DefaultHandle<?> item, Thread thread) {
if (maxDelayedQueues == 0) {
// We don't support recycling across threads and should just drop the item on the floor.
return;
}

// 每个线程都维护了一个WeakHashMap对象,不同的Stack对应不同的WeakOrderQueue
// 比如线程1创建了一个对象,在线程3进行了回收;线程2创建了一个对象,同样也在线程3进行了回收,那么线程3对应的WeakHashMap中保存了两组关系:线程1对应的Stack和WeakOrderQueue,以及线程2对应的Stack和WeakOrderQueue。
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) { // 为空则说明当前线程没有回收过该Stack所属线程的对象
// delayedRecycled.size() 表示当前线程回收其他创建对象的线程的个数
if (delayedRecycled.size() >= maxDelayedQueues) {
// 当前线程不能再回收其他线程的对象了, DUMMY为缺省值,可以理解为不可用
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// 创建WeakOrderQueue,创建对象的线程对应的Stack
if ((queue = newWeakOrderQueue(thread)) == null) {
// drop object
return;
}
delayedRecycled.put(this, queue); // 将Stack和WeakOrderQueue进行关联
} else if (queue == WeakOrderQueue.DUMMY) {
// drop object
return;
}

queue.add(item);
}
static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
// We allocated a Link so reserve the space
if (!Head.reserveSpaceForLink(stack.availableSharedCapacity)) {
return null;
}
final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
// Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
// may be accessed while its still constructed.
stack.setHead(queue);

return queue;
}
// 判断线程的Stack还能不能分配LINK_CAPACITY个元素
static boolean reserveSpaceForLink(AtomicInteger availableSharedCapacity) {
for (;;) {
int available = availableSharedCapacity.get();
if (available < LINK_CAPACITY) { // LINK_CAPACITY 默认为16
return false;
}
if (availableSharedCapacity.compareAndSet(available, available - LINK_CAPACITY)) {
return true;
}
}
}
坚持原创技术分享,您的支持将鼓励我继续创作!