0%

Netty池化内存机制

Netty作为一款高性能网络应用程序框架,实现了一套高性能内存管理机制。

池化内存

整体原理

Netty 向系统申请一整块连续内存,称为 chunk,默认大小 chunkSize = 16MB,通过 PoolChunk 对象包装。为了更细粒度的管理,Netty 将 chunk进一步拆分为 page,默认每个 chunk 包含 2048 个 page,pageSize = 8KB。

在同一个chunk中,Netty 将 page 按照不同粒度进行多层分组管理

  • 第1层,分组大小size = 1*pageSize,一共有2048个组
  • 第2层,分组大小size = 2*pageSize,一共有1024个组
  • 第3层,分组大小size = 4*pageSize,一共有512个组
  • 第12层,分组大小size = 2^(12-1) * pageSize = 2048 * pageSize,一共有2048个组

当请求分配内存时,将请求分配的内存数向上取值到最接近的分组大小,在该分组大小的相应层级中从左至右寻找空闲分组。例如请求分配内存对象为1.5 * pageSize,向上取值到分组大小 2 * pageSize,在该层分组中找到完全空闲的一组内存进行分配,如下图:

当分组大小 2 *pageSize 的内存分配出去后,为了方便下次内存分配,分组被标记为全部已使用(图中红色标记),向上更粗粒度的内存分组被标记为部分已使用(图中黄色标记)。

算法结构

Netty 基于平衡二叉树实现不同粒度的多层分组管理

为了方便快速查找 chunk 中能容纳请求内存的位置,算法构建一个基于byte数组(memoryMap)存储的完全平衡树,该平衡树的多个层级深度,就是前面介绍的按照不同粒度对 chunk进行多层分组:

树的深度depth从0开始计算,各层节点数,每个节点对应的内存大小如下:

1
2
3
4
5
6
depth = 0, 1 node,nodeSize = chunkSize
depth = 1, 2 nodes,nodeSize = chunkSize/2
...
depth = d, 2^d nodes, nodeSize = chunkSize/(2^d)
...
depth = maxOrder, 2^maxOrder nodes, nodeSize = chunkSize/2^{maxOrder} = pageSize

具有n个结点的完全二叉树的深度为:

树的最大深度为 maxOrder = 11(注意 depth从0开始),通过这棵树,算法在chunk中的查找就可以转换为:

当申请分配大小为 chunkSize/2^k 的内存,在平衡树高度为k的层级中,从左到右搜索第一个空闲节点。

数组的使用域从index = 1开始,将平衡树按照层次顺序依次存储在数组中:

1
2
3
depth = n 的第1个节点保存在 memoryMap[2^n] 中
depth = n 的第2个节点保存在 memoryMap[2^n+1] 中
...

下图代表已分配 chunkSize/2:

内存管理

正常对象

当申请分配大小在 (pageSize / 2, chunkSize] 区间的内存时,会首先将请求分配的内存大小归一化(向上取整为page的整数倍)。例如 8000byte 归一化为8192byte(即8KB),8193byte 归一化为16384byte(16KB)。

当分配已归一化处理后大小为 chunkSize/2^d 的内存,即需要在 depth = d 的层级中找到第一块空闲内存,将其修改为已使用后,再更新其祖先节点为部分已使用。

小对象

当申请分配内存,会首先将请求分配的内存大小归一化:

  • 当请求对象的大小为 (0, 496] ,归一化计算后方式是向上取最近的16的倍数。例如 15byte 归一化为16byte,490byte 归一化为496byte。
  • 当请求对象的大小为 (496, pageSize / 2] ,归一化的计算方式是向上取整为最接近 512,1024,2048,4096的值。例如 520byte 归一化为 1024byte。

给这些小对象直接分配一个page会造成浪费,在page中进行平衡树的标记又额外消耗更多空间,因此Netty的实现是:先 PoolChunk 中申请空闲page,同一个 page 分为相同大小规格的小内存进行存储。通过 long[] 类型的bitmap 来记录内存是否已使用。

巨型对象

当申请内存大于 chunkSize 时,Netty采用的是非池化管理策略。

弹性伸缩

PoolChunk

为了解决单个PoolChunk容量有限的问题,Netty将多个PoolChunk组成链表一起管理。

将所有PoolChunk组成一个链表的话,进行遍历查找管理效率较低,因此 Netty 设计了PoolArena实现对多个PoolChunkList管理。

Arena 内部持有6个PoolChunkList,各个PoolChunkList持有的PoolChunk的使用率区间不同:

PoolChunkList的额定使用率区间存在交叉,这样设计是因为如果基于一个临界值的话,当PoolChunk内存申请释放后的内存使用率在临界值上下徘徊的话,会导致在PoolChunkList链表前后来回移动:

PoolSubpage

PoolArena内部持有2个PoolSubpage数组,分别存储 tiny 和 small 规格类型的PoolSubpage。相同规格大小的 PoolSubpage组成链表,不同规格的PoolSubpage链表的 head 节点则分别保存在 tinySubpagePools 或者 smallSubpagePools数组中:

PoolSubpage链表中存储的 PoolSubpage都是只分配部分内存,当内存全部分配完或者内存全部释放完的 PoolSubpage会移出链表,减少不必要的链表节点;当PoolSubpage内存全部分配完后再释放部分内存,会重新将加入链表。

小结

PoolArena内存池弹性伸缩可用下图总结:

并发设计

为了减少线程间的竞争,Netty会提前创建多个 PoolArena(默认生成数量 = 2 * CPU核心数),当线程首次请求池化内存分配,会找被最少线程持有的 Arena,并保存线程局部变量 PoolThreadCache 中,实现线程与 Arena 的关联绑定。(线程局部变量使用的是 FastThreadLocal,性能高于 ThreadLocal)

Netty 还设计了缓存机制提升并发性能,当请求对象内存释放,PoolArena 并没有马上释放,而是先尝试将该内存关联的 PoolChunk 和 它的偏移位置等信息存入 PoolThreadLocalCache 中的固定大小缓存队列中(如果缓存队列满了则马上释放内存);当请求内存分配,PoolArena 会优先访问 PoolThreadLocalCache 的缓存队列中是否有缓存内存可用,如果有,则直接分配,提高分配效率。

源码分析

Netty 版本 4.1.52.Final

ByteBuf

Java NIO提供了 ByteBuffer 作为它的字节容器,但是这个类使用起来过于复杂和繁琐。Netty用 ByteBuf 替代了ByteBuffer,实现了自动扩容,也更易使用。

基本结构

ByteBuf 源码中有段注释画出了结构:

1
2
3
4
5
6
7
8
* <pre>
* +-------------------+------------------+------------------+
* | discardable bytes | readable bytes | writable bytes |
* | | (CONTENT) | |
* +-------------------+------------------+------------------+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity
* </pre>
  • readerIndex:记录读指针的开始位置
  • writerIndex:记录读指针的开始位置
  • capacity:缓冲区的总长度

除了这三个指针,ByteBuf里面其实还有一个指针 maxCapacity,它相当于ByteBuf扩容的最大阈值:

1
2
3
4
5
/**
* Returns the maximum allowed capacity of this buffer. This value provides an upper
* bound on {@link #capacity()}.
*/
public abstract int maxCapacity();

这些指针将 ByteBuf 分为以下几个区域:

  • discardable bytes:从 0 到 readerIndex 为 discardable bytes,表示是无效的
  • readable bytes:从 readerIndex 到 writerIndex 为 readable bytes,表示可读数据区
  • writable bytes:从 writerIndex 到 capacity 为 writable bytes,表示这段区间空闲,可以往里面写数据

基本分类

ByteBuf 有众多子类,大致可以从以下维度来进行分类:

  • Pooled和UnPooled
    • Pooled:每次申请内存都是从预先分配好的内存空间中提取一段连续内存
    • Unpooled:每次申请内存都是新的一次申请
  • unsafe和非unsafe
    • unsafe:调用 native 方法底层直接操作内存
    • 非unsafe:通过 JDK 的 API 间接操作内存
  • Heap和Direct
    • Heap:指 JVM的堆内存
    • Direct:堆外内存,直接调用 JDK 的底层 API 进行物理内存分配,不在 JVM的堆内存中,需要手动释放

ByteBuf 最基本的读写 API 操作在 AbstractByteBuf 中已经实现了,其众多子类采用不同的策略来分配内存空间。下面是对重要的几个子类的总结:

  • PooledHeapByteBuf:池化的堆内缓冲区
  • PooledUnsafeHeapByteBuf:池化的Unsafe堆内缓冲区
  • PooledDirectByteBuf:池化的堆外缓冲区
  • PooledUnsafeDirectByteBuf:池化的Unsafe堆外缓冲区
  • UnpooledHeapByteBuf:非池化的堆内缓冲区
  • UnpooledUnsafeHeapByteBuf:非池化的Unsafe堆内缓冲区
  • UnpooledDirectByteBuf:非池化的堆外缓冲区
  • UnpooledUnsafeDirectByteBuf:非池化的Unsafe堆外缓冲区

基本操作

下面是 AbstractByteBuf 的 部分 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// 读取指定字节长度的数据
@Override
public ByteBuf readBytes(int length) {
// 检查ByteBuf是否可读取length字节的数据
checkReadableBytes(length);
if (length == 0) {
return Unpooled.EMPTY_BUFFER;
}

ByteBuf buf = alloc().buffer(length, maxCapacity);
buf.writeBytes(this, readerIndex, length);
// 移动readerIndex
readerIndex += length;
return buf;
}

/**
* Throws an {@link IndexOutOfBoundsException} if the current
* {@linkplain #readableBytes() readable bytes} of this buffer is less
* than the specified value.
*/
protected final void checkReadableBytes(int minimumReadableBytes) {
checkReadableBytes0(checkPositiveOrZero(minimumReadableBytes, "minimumReadableBytes"));
}

private void checkReadableBytes0(int minimumReadableBytes) {
ensureAccessible();// 检查是否可以访问
if (checkBounds && readerIndex > writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
readerIndex, minimumReadableBytes, writerIndex, this));
}
}

/**
* Should be called by every method that tries to access the buffers content to check
* if the buffer was released before.
*/
protected final void ensureAccessible() {
if (checkAccessible && !isAccessible()) {
throw new IllegalReferenceCountException(0);
}
}

/**
* Used internally by {@link AbstractByteBuf#ensureAccessible()} to try to guard
* against using the buffer after it was released (best-effort).
*/
boolean isAccessible() {
return refCnt() != 0; // 引用计数不等于0
}

// 将src数组中从下标 srcIndex 到 srcIndex + length - 1 的数据写入ByteBuf中
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
// 确保可以写入 length 个字节的数据
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}

@Override
public ByteBuf ensureWritable(int minWritableBytes) {
ensureWritable0(checkPositiveOrZero(minWritableBytes, "minWritableBytes"));
return this;
}

final void ensureWritable0(int minWritableBytes) {
final int writerIndex = writerIndex();
final int targetCapacity = writerIndex + minWritableBytes;
if (targetCapacity <= capacity()) {
// 如果写入之后的 writerIndex 小于容量 capacity
ensureAccessible();// 检查是否可以访问
return;
}
if (checkBounds && targetCapacity > maxCapacity) {
ensureAccessible(); // 如果写入之后的 writerIndex 超出最大容量 maxCapacity
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// 到这里说明容量不够,需要扩容,新容量会是2的n次方
// Normalize the target capacity to the power of 2.
final int fastWritable = maxFastWritableBytes(); // 当前可写的容量
// 这里 fastWritable >= minWritableBytes 不会成立
int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
: alloc().calculateNewCapacity(targetCapacity, maxCapacity);

// Adjust to the new capacity.
capacity(newCapacity); // 调整容量
}

// AbstractByteBufAllocator的calculateNewCapacity()方法
// 总感觉这里扩容实现的不是特别好,可能是我在第0层吧
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
checkPositiveOrZero(minNewCapacity, "minNewCapacity");
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

if (minNewCapacity == threshold) {
return threshold;
}

// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}

// Not over threshold. Double up to 4 MiB, starting from 64.
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}

return Math.min(newCapacity, maxCapacity);
}

ByteBufAllocator

Netty 中内存分配有一个顶层的抽象就是 ByteBufAllocator,负责分配所有 ByteBuf 类型的内存。

ByteBufAllocator 有如下几个重要的方法:

  • buffer():分配一块内存,自动判断是分配堆外内存还是堆内存
  • ioBuffer():尽可能分配一块堆外内存,如果系统不支持则分配堆内存
  • heapBuffer():分配一块堆内存
  • directBuffer():分配一块堆外内存
  • compositeBuffer():组合分配,把多个ByteBuf组合到一起变成一个整体

基本操作

AbstractByteBufAllocator 部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* Instance use heap buffers by default
*/
protected AbstractByteBufAllocator() {
this(false); // 默认使用堆内存
}

protected AbstractByteBufAllocator(boolean preferDirect) {
directByDefault = preferDirect && PlatformDependent.hasUnsafe();
emptyBuf = new EmptyByteBuf(this);
}

// 分配一个内存,并指定初始容量和最大容量(自动判断是分配堆外内存还是堆内存)
@Override
public ByteBuf buffer(int initialCapacity, int maxCapacity) {
if (directByDefault) {
return directBuffer(initialCapacity, maxCapacity);
}
return heapBuffer(initialCapacity, maxCapacity);
}

// 分配一个堆外内存,并指定初始容量和最大容量
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);
}

/**
* 抽象方法,由子类实现
* Create a direct {@link ByteBuf} with the given initialCapacity and maxCapacity.
*/
protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);

// 分配一个堆内存,并指定初始容量和最大容量
@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newHeapBuffer(initialCapacity, maxCapacity);
}

/**
* 抽象方法,由子类实现
* Create a heap {@link ByteBuf} with the given initialCapacity and maxCapacity.
*/
protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);

到这里知道了 directBuffer、heapBuffer 和 Pooled、Unpooled的分配规则,那么Unsafe和非Unsafe是如何判别的呢?其实是Netty自动判别的。如果操作系统底层支持 Unsafe 那就采用 Unsafe 读写,否则采用非 Unsafe 读写。

我们可以从UnpooledByteBufAllocator的源码中验证

非池化内存分配

UnpooledByteBufAllocator 实现了非池化内存分配策略

堆内存的分配

相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ?
new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

public final class PlatformDependent {
private static final Throwable UNSAFE_UNAVAILABILITY_CAUSE = unsafeUnavailabilityCause0();

public static boolean hasUnsafe() {
return UNSAFE_UNAVAILABILITY_CAUSE == null;
}

private static Throwable unsafeUnavailabilityCause0() {
if (isAndroid()) {
logger.debug("sun.misc.Unsafe: unavailable (Android)");
return new UnsupportedOperationException("sun.misc.Unsafe: unavailable (Android)");
}

if (isIkvmDotNet()) {
logger.debug("sun.misc.Unsafe: unavailable (IKVM.NET)");
return new UnsupportedOperationException("sun.misc.Unsafe: unavailable (IKVM.NET)");
}

Throwable cause = PlatformDependent0.getUnsafeUnavailabilityCause();
if (cause != null) {
return cause;
}

try {
boolean hasUnsafe = PlatformDependent0.hasUnsafe();
logger.debug("sun.misc.Unsafe: {}", hasUnsafe ? "available" : "unavailable");
return hasUnsafe ? null : PlatformDependent0.getUnsafeUnavailabilityCause();
} catch (Throwable t) {
logger.trace("Could not determine if Unsafe is available", t);
// Probably failed to initialize PlatformDependent0.
return new UnsupportedOperationException("Could not determine if Unsafe is available", t);
}
}
}

通过调用 PlatformDependent.hasUnsafe() 方法来判断操作系统是否支持 Unsafe

  • 如果支持Unsafe则创建 InstrumentedUnpooledUnsafeHeapByteBuf
  • 如果不支持Unsafe则创建 InstrumentedUnpooledHeapByteBuf
InstrumentedUnpooledUnsafeHeapByteBuf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private static final class InstrumentedUnpooledUnsafeHeapByteBuf extends UnpooledUnsafeHeapByteBuf {
InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}

@Override
protected byte[] allocateArray(int initialCapacity) {
byte[] bytes = super.allocateArray(initialCapacity);
// 记录新建的堆空间大小
((UnpooledByteBufAllocator) alloc()).incrementHeap(bytes.length);
return bytes;
}
}

public class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf {
public UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}

@Override
protected byte[] allocateArray(int initialCapacity) {
// 其实就是通过 unsafe 创建byte[]
return PlatformDependent.allocateUninitializedArray(initialCapacity);
}
}

public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
private final ByteBufAllocator alloc;
byte[] array;
private ByteBuffer tmpNioBuf;

public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);

if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}

this.alloc = checkNotNull(alloc, "alloc");
// 调用的是 InstrumentedUnpooledUnsafeHeapByteBuf 的 allocateArray 方法
setArray(allocateArray(initialCapacity));
setIndex(0, 0);
}
private void setArray(byte[] initialArray) {
array = initialArray;
tmpNioBuf = null;
}
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
if (checkBounds) { // 确保 0 <= readerIndex <= writerIndex <= capacity
checkIndexBounds(readerIndex, writerIndex, capacity());
}
// 调用AbstractByteBuf的方法设置 readerIndex 和 writerIndex
setIndex0(readerIndex, writerIndex);
return this;
}
}
InstrumentedUnpooledHeapByteBuf
1
2
3
4
5
6
7
8
9
10
private static final class InstrumentedUnpooledHeapByteBuf extends UnpooledHeapByteBuf {
InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
}
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
protected byte[] allocateArray(int initialCapacity) {
return new byte[initialCapacity];
}
}

虽然 InstrumentedUnpooledUnsafeHeapByteBufInstrumentedUnpooledUnsafeHeapByteBuf最终调用的都是 UnpooledHeapByteBuf 的构造方法,但前者是通过 unsafe 的方式创建字节数组,而后者是通过 new byte[Size] 的方式创建数组。其实不仅是创建数组,读写操作也是这样,前者是通过 unsafe 操作数据,后者则是直接通过数组下标操作数组。

小结

分配内存时,根据操作系统是否支持 Unsafe:

  • 如果支持 unsafe:则通过 unsafe 的方式创建字节数组(读写数组也是通过 unsafe的方式)
  • 如果不支持 unsafe:则使用 new byte[Size] 关键字创建字节数组(读写数组通过索引操作)

释放内存则依赖 JVM 自动释放

堆外内存的分配

相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
noCleaner = tryNoCleaner && PlatformDependent.hasUnsafe()
&& PlatformDependent.hasDirectBufferNoCleanerConstructor();
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
final ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}

通过调用 PlatformDependent.hasUnsafe() 方法来判断操作系统是否支持 Unsafe

  • 如果支持 Unsafe,判断 noCleaner
    • noCleaner = true,则创建 InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
    • noCleaner = false,则创建 InstrumentedUnpooledUnsafeDirectByteBuf
  • 如果不支持 Unsafe则创建 InstrumentedUnpooledDirectByteBuf
InstrumentedUnpooledDirectByteBuf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
private static final class InstrumentedUnpooledDirectByteBuf extends UnpooledDirectByteBuf {
InstrumentedUnpooledDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
// 最终就是 new DirectByteBuffer(capacity);
ByteBuffer buffer = super.allocateDirect(initialCapacity);
((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
return buffer;
}

@Override
protected void freeDirect(ByteBuffer buffer) {
int capacity = buffer.capacity();
// 最终就是 调用 DirectByteBuffer本身的cleaner去释放内存
// DirectByteBuffer的cleaner 本质上也是使用 unsafe.freeMemory(address);
super.freeDirect(buffer);
((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
}
}
public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
ByteBuffer buffer; // accessed by UnpooledUnsafeNoCleanerDirectByteBuf.reallocateDirect()
private ByteBuffer tmpNioBuf;

public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
ObjectUtil.checkNotNull(alloc, "alloc");
checkPositiveOrZero(initialCapacity, "initialCapacity");
checkPositiveOrZero(maxCapacity, "maxCapacity");
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}5

this.alloc = alloc;
// 调用的是 InstrumentedUnpooledDirectByteBuf 的 allocateArray 方法
setByteBuffer(allocateDirect(initialCapacity), false);
}
void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
if (tryFree) {
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
}
}
}

this.buffer = buffer; // buffer赋值
tmpNioBuf = null;
capacity = buffer.remaining();
}
}
InstrumentedUnpooledUnsafeDirectByteBuf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private static final class InstrumentedUnpooledUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
InstrumentedUnpooledUnsafeDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
// 最终就是 new DirectByteBuffer(capacity);
ByteBuffer buffer = super.allocateDirect(initialCapacity);
((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
return buffer;
}

@Override
protected void freeDirect(ByteBuffer buffer) {
int capacity = buffer.capacity();
// 最终就是 调用 DirectByteBuffer本身的cleaner去释放内存
// DirectByteBuffer的cleaner 本质上也是使用 unsafe.freeMemory(address);
super.freeDirect(buffer);
((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
}
}
public class UnpooledUnsafeDirectByteBuf extends UnpooledDirectByteBuf {
public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
}
public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
ObjectUtil.checkNotNull(alloc, "alloc");
checkPositiveOrZero(initialCapacity, "initialCapacity");
checkPositiveOrZero(maxCapacity, "maxCapacity");
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}

this.alloc = alloc;
// 调用的是 InstrumentedUnpooledUnsafeDirectByteBuf 的 allocateDirect 方法
setByteBuffer(allocateDirect(initialCapacity), false);
}
}

这里发现 InstrumentedUnpooledDirectByteBufInstrumentedUnpooledUnsafeDirectByteBuf都是通过new DirectByteBuffer(capacity) 的方式创建堆外内存,并且释放内存也是通过 DirectByteBuffer 的 cleaner 释放,那它们的区别是什么呢?其实就是读写数据的方式不同

InstrumentedUnpooledDirectByteBuf 的父类 UnpooledDirectByteBuf

1
2
3
4
5
6
7
8
9
10
@Override
protected byte _getByte(int index) {
return buffer.get(index);
}

class DirectByteBuffer extends MappedByteBuffer implements DirectBuffer{
public byte get(int i) {
return ((unsafe.getByte(ix(checkIndex(i)))));
}
}

UnpooledDirectByteBuf 通过 DirectByteBuffer 本身的方法来操作数据,不过 DirectByteBuffer 本身也是使用 unsafe 来操作数据的。

InstrumentedUnpooledUnsafeDirectByteBuf的父类 UnpooledUnsafeDirectByteBuf

1
2
3
4
5
6
7
8
9
10
@Override
public byte getByte(int index) {
checkIndex(index);
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
// 最终就是 UNSAFE.getByte(address);
return UnsafeByteBufUtil.getByte(addr(index));
}

UnpooledUnsafeDirectByteBuf 是直接使用的 unsafe 来操作数据的。

InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private static final class InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
extends UnpooledUnsafeNoCleanerDirectByteBuf {
InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}

@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
// 通过反射初始化一个 cleaner=null 的 DirectByteBuffer
ByteBuffer buffer = super.allocateDirect(initialCapacity);
((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
return buffer;
}

@Override
ByteBuffer reallocateDirect(ByteBuffer oldBuffer, int initialCapacity) {
int capacity = oldBuffer.capacity();
// 最终就是 调用 UNSAFE.freeMemory(address);
ByteBuffer buffer = super.reallocateDirect(oldBuffer, initialCapacity);
((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity() - capacity);
return buffer;
}
}
class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
// UnpooledUnsafeNoCleanerDirectByteBuf 继承了 UnpooledUnsafeDirectByteBuf
// 没有重写读写操作,都是使用的 UnpooledUnsafeDirectByteBuf 中的方法
}
小结

分配内存时

  • 如果操作系统不支持 unsafe 或 noCleaner = false,则通过 new DirectByteBuffer(capacity) 的方式创建带有 cleaner 的 DirectByteBuffer
  • 不过操作系统支持 unsafe 并且 noCleaner = true,则通过反射创建 cleaner 为空的 DirectByteBuffer

释放内存时

  • 如果创建的 DirectByteBuffer 带有 cleaner,使用 DirectByteBuffer本身的cleaner释放
  • 如果创建的 DirectByteBuffer 没有 cleaner,则使用 unsafe 释放内存

池化内存的分配

PooledByteBufAllocator 实现了池化内存分配策略

由于高版本 Netty 池化内存分配策略有所变更,这部分源码参考 Netty 版本 4.1.42.Final

PooledByteBufAllocator

构造方法的相关参数如下:

  • preferDirect:优先分配直接内存,默认值是 false。
  • nHeapArena:HeapArena的数量,默认值是 CPU核数 * 2。
  • nDirectArena:DirectArena的数量,默认值是 CPU核数 * 2。
  • pageSize:page 的个数,默认值是 8192。
  • maxOrder:二叉树的最大深度,默认值是 11。
  • tinyCacheSize:tiny 类型的缓存列表容量,默认值是512。
  • smallCacheSize:small 类型的缓存列表容量,默认值是 256。
  • normalCacheSize:normal 类型的缓存列表容量,默认值是 64。
  • useCacheForAllThreads:默认值是 true。
  • directMemoryCacheAlignment:默认值是 0。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 去掉了一些非核心的代码
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena,
int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize,
int normalCacheSize,
boolean useCacheForAllThreads,
int directMemoryCacheAlignment) {
super(preferDirect);
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
this.tinyCacheSize = tinyCacheSize; // 默认是512
this.smallCacheSize = smallCacheSize; // 默认是256
this.normalCacheSize = normalCacheSize; // 默认是64
// 校验并计算chunkSize的大小,默认是 8192 * (2 ^ 11) = 1677216 byte = 16MB
chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
// 校验并计算pageShifts的大小,默认是 13
int pageShifts = validateAndCalculatePageShifts(pageSize);
// 初始化HeapArenas
if (nHeapArena > 0) {
// 相当于 new PoolArena[nHeapArena]
heapArenas = newArenaArray(nHeapArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize,directMemoryCacheAlignment);
heapArenas[i] = arena;
metrics.add(arena);
}
heapArenaMetrics = Collections.unmodifiableList(metrics);
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}
// 初始化DirectArenas
if (nDirectArena > 0) {
// 相当于 new PoolArena[nDirectArena]
directArenas = newArenaArray(nDirectArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
directArenas[i] = arena;
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
metric = new PooledByteBufAllocatorMetric(this);
}

HeapArena 和 DirectArena 分别为 CPU核数 * 2,目的就是保证Netty中的每一个任务线程都可以有一个独享的Arena,保证在每个线程分配内存的时候不用加锁

PooledByteBufAllocator 实现分配内存的两个方法:newDirectBuffer() 方法和newHeapBuffer() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get(); // PoolThreadLocalCache threadCache;
PoolArena<byte[]> heapArena = cache.heapArena;

final ByteBuf buf;
if (heapArena != null) {
// PoolArena的allocate方法
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

return toLeakAwareBuffer(buf);
}

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get(); // PoolThreadLocalCache threadCache;
PoolArena<ByteBuffer> directArena = cache.directArena;

final ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}

return toLeakAwareBuffer(buf);
}

可以发现这两个方法大体结构是一样的,以 newDirectBuffer 为例:

  1. 通过 threadCache.get() 方法获取一个类型为 PoolThreadCache 的 cache 对象
  2. 通过 cache 获得 directArena 对象
  3. 调用 directArena.allocate() 方法分配 ByteBuf

PoolThreadLocalCache

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
private final boolean useCacheForAllThreads;

PoolThreadLocalCache(boolean useCacheForAllThreads) {
this.useCacheForAllThreads = useCacheForAllThreads; // 默认是true
}

@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

final Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
final PoolThreadCache cache = new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);

if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {
final EventExecutor executor = ThreadExecutorMap.currentExecutor();
if (executor != null) {
executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,
DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
}
}
return cache;
}
// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
// ...
}
// 从 arenas 中获取使用率最少的一个Areana
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
// final AtomicInteger numThreadCaches = new AtomicInteger();
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}
return minArena;
}
  1. 通过 leastUsedArena() 方法分别获得使用率最少的 HeapArena 和 DirectArena 对象
  2. 然后把 HeapArena 和 DirectArena 对象作为参数传递到 PoolThreadCache 的构造器中

PoolThreadCache除了可以在 HeapArena 和 DirectArena 上进行内存分配,还可以在它底层维护的 ByteBuf 缓存列表进行分配。

举个例子:我们通过 PooledByteBufAllocator 创建了一个1024字节的ByteBuf,当用完释放后,可能在其他地方会继续分配1024字节的ByteBuf。这时,其实不需要在 Arena 上进行内存分配,而是直接通过 PoolThreadCache中维护的 ByteBuf 的缓存列表直接拿过来返回。

PooledByteBufAllocator 维护着三种规格大小的缓存列表,分别是:tiny,small,normal,其容量分别为 512,256,64。

通过这种方式,Netty 预创建了固定规格的内存池,大大提高了内存分配的性能。

DirectArena内存分配流程

HeapArena 和 DirectArena 分配内存的基本流程有三个步骤。

  1. 从对象池里获得 PooledByteBuf 进行复用
  2. 从缓存中进行内存分配
  3. 从缓存中分配失败,则从内存堆里进行内存分配
对象池获取 ByteBuf

以 DirectBuffer 为例,首先来看从对象池里获得 PooledByteBuf 进行复用的情况。

PooledByteBufAllocator 的 newDirectBuffer() 的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;

final ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}

return toLeakAwareBuffer(buf);
}

PoolArena 的 allocate方法:

1
2
3
4
5
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity);
return buf;
}

DirectArena 的 newByteBuf方法:

1
2
3
4
5
6
7
8
9
10
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) { // 判断是否支持Unsafe
// 创建的是 PooledUnsafeDirectByteBuf
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
// 创建的是 PooledDirectByteBuf
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}

首先判断是否支持Unsafe:

  • 支持 Unsafe 创建的是 PooledUnsafeDirectByteBuf,它通过Unsafe方法操作数组。
  • 不支持 Unsafe 创建的是 PooledDirectByteBuf,它通过数组索引操作数组。

默认情况下一般是支持Unsafe的,PooledUnsafeDirectByteBuf 的 newInstance方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// PooledUnsafeDirectByteBuf的RECYCLER属性
private static final ObjectPool<PooledUnsafeDirectByteBuf> RECYCLER = ObjectPool.newPool(
new ObjectCreator<PooledUnsafeDirectByteBuf>() {
@Override
public PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
// 当Recycler里面没有可用的buf时就会创建一个新的buf
return new PooledUnsafeDirectByteBuf(handle, 0);
}
});
// PooledUnsafeDirectByteBuf的newInstance方法
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity); // 让所有的参数重新归为初始状态
return buf;
}
// PooledByteBuf的reuse方法
final void reuse(int maxCapacity) {
maxCapacity(maxCapacity);
resetRefCnt();
setIndex0(0, 0);
discardMarks();
}

首先通过 RECYCLER(内存回收站)对象的 get() 方法获得一个 ByteBuf。当回收站里面没有可用的buf时就会创建一个新的 ByteBuf。获得的 ByteBuf 可能是回收站里取出来的,所以复用前需要重置相关属性。

内存池的内存规格

Netty 底层还有一个内存单位的封装,为了更高效地管理内存,避免内存浪费,把每一个区间的内存规格又做了细分。Netty 内存池中主要设置了四种规格大小的内存:

  • tiny:0~512Byte
  • small:512Byte~8KB
  • normal:8KB~16MB
  • huge:16MB以上

那么为什么以这几个节点来进行分段呢,其实这里面是有原因的:

  • 首先为什么以16M作为分界点,16M在 Netty 中就是一个Chunk,Netty 中所有的内存申请都是以 Chunk 为单位向系统申请的,后续的所有内存分配都是在这个 Chunk 里的操作。
  • 然后为什么又以8k作为分界点的,8K在Netty中就是一个 Page,每个Chunk 中有 2^11 个 Page 这样就能更细粒度的分配内存。
  • 最后在 0 到 8K 的内存区间中有一个 SubPage 的对象来进行精确的分配内存。

缓存的分配

缓存的数据结构

在Netty中缓存的数据结构是一个叫做 MemoryRegionCache 的类,其结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private abstract static class MemoryRegionCache<T> {
private final int size;
private final Queue<Entry<T>> queue;
private final SizeClass sizeClass;
private int allocations;

MemoryRegionCache(int size, SizeClass sizeClass) {
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
}
static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
PoolChunk<T> chunk;
ByteBuffer nioBuffer;
long handle = -1;

Entry(Handle<Entry<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

void recycle() {
chunk = null;
nioBuffer = null;
handle = -1;
recyclerHandle.recycle(this);
}
}
enum SizeClass {
Tiny,
Small,
Normal
}

MemoryRegionCache 由三部分组成:

  • queue:队列中每个节点都是一个 Entry 实体
  • sizeClass:对应Netty中的内存规格,分别是 tiny、small、normal(huge规格不用缓存分配)
  • size:队列的容量,可能是 512,256,64(对应tinyCacheSize,smallCacheSize,normalCacheSize)
  • tiny:共32种规格,均是16的整数倍,0Byte、16Byte、32Byte、48Byte、……496Byte。
  • small:4种规格,512Byte、1KB、2KB、4KB。
  • normal:3种规格,8KB、16KB、32KB。

PoolThreadCache中维护了六个缓存数组(HeapCaches省略):

  • tinySubPageDirectCaches:tiny 类型的缓存列表
  • smallSubPageDirectCaches:small 类型的缓存列表
  • normalDirectCaches:normal 类型的缓存数组

代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
final class PoolThreadCache {
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;

// Hold the caches for the different size classes, which are tiny, small and normal.
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
// ...
}

在 PoolThreadCache 的构造方法中进行了初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 去掉了一些非核心的代码
PoolThreadCache(PoolArena<byte[]> heapArena, // 使用率最少的HeapArena
PoolArena<ByteBuffer> directArena,// 使用率最少的DirectArena
int tinyCacheSize, // 默认为512
int smallCacheSize, // 默认为216
int normalCacheSize, // 默认为64
int maxCachedBufferCapacity,
int freeSweepAllocationThreshold) {

this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
// PoolArena.numTinySubpagePools = 512 >>> 4 = 32
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
// directArena.numSmallSubpagePools = pageShifts - 9 = 13 - 9 = 4
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);

directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if (heapArena != null) {
// Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);

heapArena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}

// Only check if there are caches in use.
if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
|| tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
&& freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
}
}

以 tiny 类型为例,具体分析一下SubPage的缓存结构,实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0 && numCaches > 0) {
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
}

其实就是创建了一个缓存数组,这个缓存数组的长度是 numCaches。相当于:

  • 创建了 tinySubPageDirectCaches[32],smallSubPageDirectCaches[4],normalDirectCaches[3] 三个数组
  • tinySubPageDirectCaches 中的每个元素 SubPageMemoryRegionCache 中的队列queue容量为 512
  • smallSubPageDirectCaches 中的每个元素 SubPageMemoryRegionCache 中的队列queue容量为 256
  • normalDirectCaches 中的每个元素 MemoryRegionCache 中的队列queue容量为 64
缓存的分配流程

在基本了解缓存数组的数据结构之后,继续剖析在缓存中分配内存的逻辑,回到 PoolArena 的 allocate() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
// 规格化,确保ByteBuf容量统一,向上取与MemoryRegionCache中的内存规格中最接近的值
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // 判断 normCapacity 是否小于 pageSize
int tableIdx;
PoolSubpage<T>[] table;
// 判断是不是 tiny 类型 即:normCapacity < 512
boolean tiny = isTiny(normCapacity);
if (tiny) { // tiny 类型
// 缓存分配
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// 能够从缓存中分配
return;
}
// 获取 tableIdx,即 tableIdx = normCapacity / (2 ^ 4)
tableIdx = tinyIdx(normCapacity);
// subPage 数组
table = tinySubpagePools;
} else { // small 类型
// 缓存分配
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// 能够从缓存中分配
return;
}
// 获取 tableIdx,即 2 ^ tableIdx = normCapacity / (2 ^ 10)
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
// 获取对应的节点
final PoolSubpage<T> head = table[tableIdx];

synchronized (head) {
final PoolSubpage<T> s = head.next;
// 默认情况下,head.next == head
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}

incTinySmallAllocation(tiny);
return;
}
if (normCapacity <= chunkSize) {
// 缓存分配
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
// 分配不成功,进行实际的内存分配
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// huge 则直接分配
allocateHuge(buf, reqCapacity);
}
}

PoolThreadCache 缓存分配:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, 
int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
// 根据normCapacity找到tiny类型缓存数组中的一个缓存对象
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
// 找到tiny类型缓存数组的下标, idx = normCapacity / (2 ^ 4)
// 例如 normCapacity = 16,idx = 1; normCapacity = 32,idx = 2
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) { // 是否堆外内存
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
if (cache == null || idx > cache.length - 1) {
return null;
}
return cache[idx];
}
private boolean allocate(MemoryRegionCache<?> cache,
PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity);
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}
// MemoryRegionCache的allocate方法
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
// MemoryRegionCache内部维护着一个队列queue
Entry<T> entry = queue.poll();
if (entry == null) { // 为空则说明缓存分配失败
return false;
}
initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
entry.recycle();

// allocations is not thread-safe which is fine as this is only called from the same thread all time.
++ allocations;
return true;
}

弹出 Entry 之后,通过 initBuf() 方法初始化 ByteBuf,这里参数传入Entry的Chunk和Handle。

Entry类的代码:

1
2
3
4
5
6
static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
PoolChunk<T> chunk;
ByteBuffer nioBuffer;
long handle = -1;
}
  • chunk:代表一块连续的内存
  • handle:相当于一个指针,可以唯一定位Chunk里的一块连续内存

通过 Chunk 和 Handle 就可以定位 ByteBuf 中指定的一块连续内存,有关ByteBuf相关的读写操作,都会在这块内存中进行。

SubPageMemoryRegionCache 类的 initBuf(entry.chunk,entry.handle,buf,reqCapacity) 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
protected void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, 
long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
}
// PoolChunk的initBufWithSubpage方法
void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
long handle, int reqCapacity) {
initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx(handle), reqCapacity);
}
private static int bitmapIdx(long handle) {
return (int) (handle >>> Integer.SIZE); // Integer.SIZE 为32
}
// PoolChunk的initBufWithSubpage方法
private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
long handle, int bitmapIdx, int reqCapacity) {

int memoryMapIdx = memoryMapIdx(handle);
// private final PoolSubpage<T>[] subpages;
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage.doNotDestroy;
assert reqCapacity <= subpage.elemSize;

buf.init(
this, nioBuffer, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
reqCapacity, subpage.elemSize, arena.parent.threadCache());
}
void init(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length,
int maxLength, PoolThreadCache cache) {
init0(chunk, nioBuffer, handle, offset, length, maxLength, cache);
}
private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length,
int maxLength, PoolThreadCache cache) {
// 指定bytebuf分配的内存块
this.chunk = chunk;
// 指定当前bytebuf连续内存指向的位置
memory = chunk.memory;
tmpNioBuf = nioBuffer;
allocator = chunk.arena.parent;
this.cache = cache;
this.handle = handle;
this.offset = offset;//相对memory中的偏移量
this.length = length;
this.maxLength = maxLength;
}

上面的代码将 PooledUnsafeDirectByteBuf 的各个属性进行了初始化。通过缓存分配 ByteBuf,只要通过一个chunk 和 handle 就可以确定一块内存。以上就是通过缓存分配ByteBuf对象的全过程。

初始化ByteBuf 之后调用 entry.recycle() 将Entry对象回收,因为Entry对象弹出之后没有再被引用,所以可能GC会将Entry对象回收。Netty为了将对象循环利用,将其放在对象回收站进行回收。

1
2
3
4
5
6
void recycle() {
chunk = null;
nioBuffer = null;
handle = -1;
recyclerHandle.recycle(this);
}

chunk = null 和 handle = -1 表示当前Entry不指向任何一块内存。recyclerHandle.recycle(this) 将当前 Entry 回收。

以上就是命中缓存的流程。

Page级别的内存分配

Netty 内存分配的单位是Chunk,一个 Chunk 的大小是16MB,每个 Chunk 都以双向链表的形式保存在一个ChunkList 中。多个 ChunkList 也是以双向链表的形式进行关联的,大概结构如下图所示:

在 ChunkList 中,根据 Chunk 的内存使用率划分 ChunkList。这样在内存分配时,会根据百分比找到相应的ChunkList,在ChunkList 中选择一个 Chunk 进行内存分配。PoolArena 中 ChunkList 的成员变量:

1
2
3
4
5
6
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;

这里总共定义了6个 ChunkList,并在构造方法中将其初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);

q100.prevList(q075);
q075.prevList(q050);
q050.prevList(q025);
q025.prevList(q000);
q000.prevList(null);
qInit.prevList(qInit);

q050 = new PoolChunkList(this, q075, 50, 100, chunkSize) 表示:

  • q075是q050的下一个节点。
  • 当前 ChunkList 中存储的 Chunk 的内存使用率都在50%~100%。
  • ChunkSize 为其设置大小

ChunkList 的节点关系如下图所示:

Netty中,Chunk 又包含了多个Page,每个Page的大小为8KB,如果要分配16KB的内存,则在Chunk中找到连续的两个Page就可以。

在很多场景下,为缓冲区分配8KB的内存也是一种浪费,比如只需要分配2KB的缓冲区,如果使用8KB会造成6KB的浪费。这种情况下,Netty又会将Page切分成多个SubPage,每个SubPage大小要根据分配的缓冲区大小而定,比如要分配2KB的内存,就会将一个Page切分成4个SubPage,每个SubPage的大小为2KB:

PoolSubpage 的基本结构的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final class PoolSubpage<T> implements PoolSubpageMetric {

final PoolChunk<T> chunk;
private final int memoryMapIdx;
private final int runOffset;
private final int pageSize;
private final long[] bitmap;

PoolSubpage<T> prev;
PoolSubpage<T> next;

boolean doNotDestroy;
int elemSize;
private int maxNumElems;
private int bitmapLength;
private int nextAvail;
private int numAvail;
// ...
}
  • chunk代表其子页属于哪个Chunk
  • bitmap用于记录子页的内存分配情况
  • prev和next代表子页是按照双向链表进行关联的,分别指向上一个节点和下一个节点
  • elemSize属性代表的是子页是按照多大内存进行划分的,如果按照1KB划分,则可以划分出8个子页

PoolArena 的 allocate() 方法中 allocateNormal(buf,reqCapacity,normCapacity) 实际上就是在Page级别上进行分配,分配一个或者多个Page的空间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Method must be called inside synchronized(this) { ... } block
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
// 在原有的Chunk中进行内存分配
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
return;
}

// 如果是首次分配,创建Chunk进行内存分配
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
boolean success = c.allocate(buf, reqCapacity, normCapacity);
assert success;
qInit.add(c);
}
  1. 优先在原有的Chunk中进行内存分配
  2. 如果是首次分配,那就创建Chunk进行内存分配
  3. 最后初始化ByteBuf

以 q050 为例进行分析,q050.allocate(buf,reqCapacity,normCapacity)方法的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (normCapacity > maxCapacity) {
// Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
// be handled by the PoolChunks that are contained in this PoolChunkList.
return false;
}
// 从Head节点向下遍历
for (PoolChunk<T> cur = head; cur != null; cur = cur.next) {
// 尝试分配
if (cur.allocate(buf, reqCapacity, normCapacity)) {
// 当前Chunk的内存使用率大于其最大使用率
if (cur.usage() >= maxUsage) {
// 从当前的ChunkList中移除,再添加到下一个ChunkList中
remove(cur);
nextList.add(cur);
}
return true;
}
}
return false;
}

从Head节点往下遍历,对每个Chunk都尝试去分配。

再回到 allocateNormal() 方法,查看首次分配:newChunk(pageSize, maxOrder, pageShifts, chunkSize)

其中参数pageSize是8192,也就是8KB;maxOrder为11;pageShifts为13,2的13次方正好是8192,也就是8KB;chunkSize为16777216,也就是16MB。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder,
int pageShifts, int chunkSize) {
if (directMemoryCacheAlignment == 0) {
return new PoolChunk<ByteBuffer>(this,
allocateDirect(chunkSize), pageSize, maxOrder,
pageShifts, chunkSize, 0);
}
// 申请了一块直接内存
final ByteBuffer memory = allocateDirect(chunkSize
+ directMemoryCacheAlignment);
// 通过构造函数创建了一个Chunk
return new PoolChunk<ByteBuffer>(this, memory, pageSize,
maxOrder, pageShifts, chunkSize,
offsetCacheLine(memory));
}

PoolChunk 的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
PoolChunk(PoolArena<T> arena, T memory, int pageSize, 
int maxOrder, int pageShifts, int chunkSize, int offset) {
unpooled = false;
this.arena = arena;
this.memory = memory;
this.pageSize = pageSize; // 8K
this.pageShifts = pageShifts; // 13
this.maxOrder = maxOrder; // 11
this.chunkSize = chunkSize;
this.offset = offset;
unusable = (byte) (maxOrder + 1);
log2ChunkSize = log2(chunkSize);
subpageOverflowMask = ~(pageSize - 1);
freeBytes = chunkSize;

maxSubpageAllocs = 1 << maxOrder;

// Generate the memory map.
memoryMap = new byte[maxSubpageAllocs << 1]; // 节点个数4096
depthMap = new byte[memoryMap.length];
int memoryMapIndex = 1;
for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
int depth = 1 << d;
for (int p = 0; p < depth; ++ p) {
memoryMap[memoryMapIndex] = (byte) d; // d代表当前节点的深度
depthMap[memoryMapIndex] = (byte) d;
memoryMapIndex ++;
}
}

subpages = newSubpageArray(maxSubpageAllocs);
cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
}

上图是一个二叉树的结构,左侧的数字代表层级,右侧代表一块连续的内存,每个父节点下又拆分成多个子节点,顶层表示的内存范围为0~16MB,其下又分为两层,范围为0~8MB、8~16MB,依此类推,最后到11层,以8KB的大小划分,也就是一个Page的大小。

如果我们分配一个8MB的缓冲区,则会将第二层的第一个节点,也就是0~8MB这个连续的内存进行分配。分配完成之后,会将这个节点设置为不可用。结合上面的图,我们再看构造方法中的for循环代码:

1
2
3
4
5
6
7
8
for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
int depth = 1 << d;
for (int p = 0; p < depth; ++ p) {
memoryMap[memoryMapIndex] = (byte) d; // d代表当前节点的深度
depthMap[memoryMapIndex] = (byte) d;
memoryMapIndex ++;
}
}

实际上,这个for循环就是将上面的结构包装成一个字节数组memoryMap,外层循环用于控制层数,内层循环用于控制里面每层的节点。经过循环之后,memoryMap和depthMap内容为以下表现形式:

需要注意的是,因为程序中数组的下标是从1开始设置的,所以第0个节点元素为默认值0。这里数字代表层级,同时也代表了当前层级的节点,相同的数字个数就是这一层级的节点数。其中0为2个(因为分配时下标是从1开始的,所以第0个位置是默认值0,实际上第0层元素只有一个,就是头节点),1为2个,2为4个,3为8个,4为16个,n为2的n次方个,直到11,也就是11有2的11次方个。

继续剖析 cur.allocate(buf, reqCapacity, normCapacity):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// PoolChunk的allocate方法
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
final long handle;
if ((normCapacity & subpageOverflowMask) != 0) { // 大于一个Page
handle = allocateRun(normCapacity);
} else {
handle = allocateSubpage(normCapacity);
}

if (handle < 0) { // handle小于0说明分配失败
return false;
}
ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
initBuf(buf, nioBuffer, handle, reqCapacity);
return true;
}
private long allocateRun(int normCapacity) {
// 表示根据normCapacity计算出第几层
int d = maxOrder - (log2(normCapacity) - pageShifts);
// 根据层级关系去分配一个节点,其中id代表memoryMap中的下标
int id = allocateNode(d);
if (id < 0) {
return id;
}
freeBytes -= runLength(id);
return id;
}

allocateNode()方法的具体实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private int allocateNode(int d) {
// 下标初始值为1
int id = 1;
// 代表当前层级第一个节点的初始下标
int initial = - (1 << d); // has last d bits = 0 and rest all = 1
// 获取第一个节点的值
byte val = value(id);
if (val > d) { // 如果值大于层级,说明Chunk不可用
return -1;
}
// 当前下标对应的节点值如果小于层级,或当前下标小于层级的初始下标
while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
// 当前下标乘2,代表当前节点的子节点的起始位置
id <<= 1;
// 获取id位置的值
val = value(id);
if (val > d) { // 如果值大于层级,说明节点不可用
id ^= 1; // id为偶数则+1,id为奇数则-1,用的是兄弟节点
val = value(id); // 获取id位置的值
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
// 将找到的节点设置为不可用
setValue(id, unusable); // unusable = maxOrder + 1 = 12
// 逐层网上标记为被使用
updateParentsAlloc(id);
return id;
}

上述代码实际上是从第一个节点往下找的,找到层级为d、未被使用的节点。找到相关节点后通过 setValue 将当前节点设置为不可用,其中id是当前节点的下标,unusable代表一个不可用的值,这里是12,因为我们的层级只有12层,所以设置为12之后就相当于标记不可用。设置为不可用之后,通过updateParentsAlloc(id)逐层设置为缓存被使用的状态。updateParentsAlloc()方法的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void updateParentsAlloc(int id) {
while (id > 1) {
// 获取当前节点的父节点的id
int parentId = id >>> 1;
// 获取当前节点的值
byte val1 = value(id);
// 找到当前节点的兄弟节点
byte val2 = value(id ^ 1);
// 如果当前节点值小于兄弟节点,则保存当前节点值到val,否则保存兄弟节点值到val
byte val = val1 < val2 ? val1 : val2;
// 将val的值设置为父节点下标对应的值
setValue(parentId, val);
// id设置为父节点id,继续循环
id = parentId;
}
}

这里其实是将循环兄弟节点的值替换成父节点的值,可以通过注释仔细地进行逻辑分析。简单起见,这里只设置三层,如下图所示:

我们模拟其分配场景,假设只有三层,其中index代表数组memoryMap的下标,value代表其值,memoryMap中的值就为[0,0,1,1,2,2,2,2]。我们要分配一个4MB的ByteBuf,在调用allocateNode(int d)中传入的d是2,也就是第二层。根据上面分析的逻辑,这里会找到第二层的第一个节点,也就是0~4MB这个节点,找到之后将其设置为不可用,这样memoryMap中的值就为[0,0,1,1,12,2,2,2],二叉树的结构就会如下图所示:

注意深色节点部分,将index为4的节点设置为不可用,之后则将向上设置不可用,循环将兄弟节点数值较小的节点替换到父节点,也就是将index为2的节点的值替换成了index为5的节点的值,这样数组的值就会变为[0,1,2,1,12,2,2,2],二叉树的结构如下图所示:

注意:这里深色节点仅仅代表节点变化,并不是当前节点为不可用状态,不可用状态的真正判断依据是value值为12。

这样,如果再次分配一个4MB内存的ByteBuf,根据其逻辑,则会找到第二层的第二个节点,也就是4~8MB。再根据我们的逻辑,通过向上设置不可用,index为2就会设置成不可用状态,将value的值设置为12,数组的值变为[0,1,12,1,12,12,2,2],二叉树的结构如下图所示:

可以看到,分配两个4MB的ByteBuf之后,当前节点和其父节点都会设置成不可用状态,当index=2的节点设置为不可用之后,将不会再找这个节点下的子节点。依此类推,直到所有的内存分配完毕,index为1的节点也会变成不可用状态,这样所有的Page就都分配完毕,Chunk中再无可用节点。

根据以上分析可知,PoolChunk的allocate方法中的 handle = allocateRun(normCapacity);handle = allocateSubpage(normCapacity); ,返回的就是memoryMap的一个下标。通过这个下标,我们能唯一地定位一块内存。继续往下跟,通过 initBuf(buf, nioBuffer, handle, reqCapacity) 初始化ByteBuf后,再通过 qInit.add(c)将新创建的Chunk添加到ChunkList中,我们看 initBuf() 方法的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) {
int memoryMapIdx = memoryMapIdx(handle); // 其实就是返回handler本身,memoryMapIdx=handle
// bitmapIdx(handle)是有关SubPage中使用到的逻辑,如果是Page级别的分配,只返回0
int bitmapIdx = bitmapIdx(handle);// handle >>> Integer.SIZE
if (bitmapIdx == 0) { // bitmapIdx == 0 说明是Page级别的分配
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);// 判断当前节点是不是不可用

buf.init(this, nioBuffer, handle, runOffset(memoryMapIdx) + offset,
reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache());
} else {
initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx, reqCapacity);
}
}
private int runOffset(int id) {
// represents the 0-based offset in #bytes from start of the byte-array chunk
int shift = id ^ 1 << depth(id);
return shift * runLength(id);
}
private int runLength(int id) {
// represents the size in #bytes supported by node 'id' in the tree
return 1 << log2ChunkSize - depth(id);
}

runOffset(memoryMapIdx) + offset 表示偏移量,偏移量相当于分配给缓冲区的这块内存相对于Chunk中申请的内存的首地址偏移了多少。参数runLength(memoryMapIdx)表示根据下标获取可分配的最大长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void init(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
init0(chunk, nioBuffer, handle, offset, length, maxLength, cache);
}
private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length,
int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;

this.chunk = chunk; // 在哪一块内存进行分配
memory = chunk.memory; // 这这一块内存的哪一块连续内存
tmpNioBuf = nioBuffer;
allocator = chunk.arena.parent;
this.cache = cache;
this.handle = handle;
this.offset = offset;
this.length = length;
this.maxLength = maxLength;
}

以上就是 PooledByteBuf 在Page级别分配的完整流程。

SubPage级别的内存分配

如果分配一个缓冲区大小远小于Page,直接在一个Page上进行分配会造成内存浪费,所以需要将Page继续切分成多个子块进行分配,子块分配的个数根据要分配的缓冲区大小而定,比如只需要分配1KB的内存,就将一个Page分成8等分。

简单起见,仅以16字节为例,讲解其分配逻辑。在分析其逻辑前,首先看PoolArena的一个属性:

1
private final PoolSubpage<T>[] tinySubpagePools;

这个属性是一个PoolSubpage的数组,有点类似于一个SubPage的缓存。我们创建一个SubPage之后,会将创建好的SubPage与PoolArena中tinySubpagePools数组的每一个元素进行关联,下次再分配的时候可以直接通过tinySubpagePools数组元素去找关联的SubPage。而tinySubpagePools是在PoolArena的构造方法中初始化的,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected PoolArena(PooledByteBufAllocator parent, int pageSize,
int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
// ...
// numTinySubpagePools = 512 >>> 4 = 32
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize);
}
}
private PoolSubpage<T>[] newSubpagePoolArray(int size) {
return new PoolSubpage[size];
}
private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
head.prev = head;
head.next = head;
return head;
}

由上面代码知道,SubPage其实也是一个双向链表,这里将Head的上一个节点和下一个节点都设置为自身,有关PoolSubpage的关联关系,我们稍后分析。通过循环创建PoolSubpage,总共创建32个SubPage,每个SubPage实际代表一块内存大小,如下图所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private long allocateSubpage(int normCapacity) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
synchronized (head) {
// 表示在第11层分配节点
int id = allocateNode(d);
if (id < 0) {
return id;
}
// 获取初始化的SubPage
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;

freeBytes -= pageSize;
// 表示第几个subPageIdx
int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
// 如果SubPage为空
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
// 将当前的下标赋值给Subpage
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
// 抽出一个SubPage
return subpage.allocate();
}
}

通过 PoolSubpagehead=arena.findSubpagePoolHead(normCapacity) 这种方式找到Head节点,实际上这里Head就是之前分析的tinySubpagePools属性的第一个节点,也就是对应16Byte的那个节点。

int d=maxOrder 是将11赋值给d,也就是在内存树的第11层取节点,这部分上一小节已经剖析过。int id=allocateNode(d) 获取的是上一小节分析过的字节数组 memoryMap 的下标,这里指向一个Page,如果是第一次分配,指向的是0~8kB的那个Page。

final PoolSubpage[]subpages=this.subpages 这一步是获得PoolChunk中成员变量SubPages的值,也是个PoolSubpage的数组,在PoolChunk进行初始化的时候,也会初始化该数组,长度为2048。也就是说每个Chunk都维护着一个SubPage的列表。如果每一个Page级别的内存都需要被切分成SubPage,则会将这个Page放入该列表中,专门用于分配SubPage。

所以这个列表中的SubPage其实就是一个用于切分的Page。SubPages如下图所示:

int subpageIdx=subpageIdx(id) 这一步是通过id获得PoolSubpage数组的下标,如果id对应的Page是0~8KB的节点,这里获得的下标就是0。在 if(subpage==null) 中,因为默认SubPages只是创建一个数组,并没有往数组中赋值,所以第一次执行到这里会返回true,跟到if块中,代码如下

1
2
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;

通过new PoolSubpage创建一个新的SubPage后,通过subpages[subpageIdx]=subpage将新创建的SubPage根据下标赋值到SubPages中的元素。在new PoolSubpage的构造方法中,传入Head,就是之前提到的tinySubpagePools属性中的节点,如果分配16字节的缓冲区,这里对应的就是第一个节点,PoolSubpage构造方法的代码如下:

1
2
3
4
5
6
7
8
9
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, 
int runOffset, int pageSize, int elemSize) {
this.chunk = chunk;
this.memoryMapIdx = memoryMapIdx;
this.runOffset = runOffset;
this.pageSize = pageSize;
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
init(head, elemSize);
}

这里重点关注bitmap属性,这是一个long类型的数组,初始大小为8,这只是初始化的大小,真正的大小要根据Page切分成多少块确定,这里将属性进行了赋值,我们看init()方法的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void init(PoolSubpage<T> head, int elemSize) {
doNotDestroy = true;
this.elemSize = elemSize;
if (elemSize != 0) {
maxNumElems = numAvail = pageSize / elemSize;
nextAvail = 0;
bitmapLength = maxNumElems >>> 6;
if ((maxNumElems & 63) != 0) {
bitmapLength ++;
}

for (int i = 0; i < bitmapLength; i ++) {
// bitmap用于标识哪个SubPage被分配,0表示未分配,1表示已分配
bitmap[i] = 0;
}
}
// 加到Arena里面
addToPool(head);
}

this.elemSize=elemSize 表示保存当前分配的缓冲区大小,因为以16字节为例,所以这里是16。maxNumElems=numAvail=pageSize/elemSize 初始化了两个属性maxNumElems和numAvail,值都为pageSize/elemSize,表示一个Page大小除以分配的缓冲区大小,也就是表示当前Page被划分了多少份。numAvail表示剩余可用的块数,由于第一次分配都是可用的,所以numAvail=maxNumElems。bitmapLength表示bitmap的实际大小,已经分析过,bitmap初始化大小为8,但实际上并不一定需要8个元素,元素个数要根据Page切分的子块而定,这里的大小是所切分的子块数除以64。再往下看,代码if((maxNumElems&63)!=0)用于判断maxNumElems也就是当前配置所切分的子块是不是64的倍数,如果不是,则bitmapLength加1,最后通过循环,将对应位置的子块标记为0。

这里详细分析一下bitmap,它是个long类型的数组,long数组中的每一个值,也都是long类型的数字,其中每一个比特位都标记着Page中每一个子块的内存是否已分配,如果比特位是1,表示该子块已分配;如果比特位是0,表示该子块未分配,标记顺序是其二进制数从低位到高位进行排列。我们应该知道为什么bitmap大小要设置为子块数量除以64,因为long类型的数字是64位,每一个元素都能记录64个子块的数量,这样就可以通过SubPage个数除以64的方式决定bitmap中元素的数量。如果子块不能整除64,则通过元素数量+1的方式,除以64之后剩余的子块通过long中比特位由低到高进行排列记录,其逻辑结构如下图所示:

Chunk级别的内存分配

当需要分配的内存大小大于 ChunkSize 时,Netty 采用非池化策略直接分配内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
PoolChunk<T> chunk = newUnpooledChunk(reqCapacity); // 采用非池化策略直接分配内存
activeBytesHuge.add(chunk.chunkSize());
buf.initUnpooled(chunk, reqCapacity);
allocationsHuge.increment();
}
@Override
protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
if (directMemoryCacheAlignment == 0) {
return new PoolChunk<ByteBuffer>(this,
allocateDirect(capacity), capacity, 0);
}
final ByteBuffer memory = allocateDirect(capacity
+ directMemoryCacheAlignment);
return new PoolChunk<ByteBuffer>(this, memory, capacity,
offsetCacheLine(memory));
}
private static ByteBuffer allocateDirect(int capacity) {
return PlatformDependent.useDirectBufferNoCleaner() ?
PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);
}

ByteBuf的内存回收

堆外内存是不受JVM垃圾回收机制控制的,分配一块堆外内存进行ByteBuf操作完后要对对象进行回收。

ByteBUf的回收就是调用release() 方法,不管是哪种类型的 ByteBuf 都会调用到 release0 这个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public boolean release(int decrement) {
return handleRelease(updater.release(this, decrement));
}
private boolean retryRelease0(T instance, int decrement) {
for (;;) {
int rawCnt = updater().get(instance), realCnt = toLiveRealRefCnt(rawCnt, decrement);
if (decrement == realCnt) {
if (tryFinalRelease0(instance, rawCnt)) {
return true;
}
} else if (decrement < realCnt) {
// all changes to the raw count are 2x the "real" change
if (updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {
return false;
}
} else {
throw new IllegalReferenceCountException(realCnt, -decrement);
}
Thread.yield(); // this benefits throughput under high contention
}
}
private boolean handleRelease(boolean result) {
if (result) {
deallocate();
}
return result;
}

主要是判断剩余引用数是否大于 decrement,如果大于则通过CAS修改引用数。然后通过deallocate()方法进行释放,以 UnpooledDirectByteBuf 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
// 表示当前的ByteBuf不在指向任何一块内存
this.handle = -1;
memory = null;
// 将ByteBuf内存进行释放
chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
tmpNioBuf = null;
chunk = null;
recycle(); // 将对象放入对象池,循环利用
}
}
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else { //池化内存
SizeClass sizeClass = sizeClass(normCapacity);// 判断内存规格
// 加到缓存中
if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
// 将缓存对象标记为未使用
freeChunk(chunk, handle, sizeClass, nioBuffer, false);
}
}
坚持原创技术分享,您的支持将鼓励我继续创作!