Netty作为一款高性能网络应用程序框架,实现了一套高性能内存管理机制。
池化内存 整体原理 Netty 向系统申请一整块连续内存,称为 chunk,默认大小 chunkSize = 16MB,通过 PoolChunk 对象包装。为了更细粒度的管理,Netty 将 chunk进一步拆分为 page,默认每个 chunk 包含 2048 个 page,pageSize = 8KB。
在同一个chunk中,Netty 将 page 按照不同粒度进行多层分组管理
第1层,分组大小size = 1*pageSize,一共有2048个组
第2层,分组大小size = 2*pageSize,一共有1024个组
第3层,分组大小size = 4*pageSize,一共有512个组
…
第12层,分组大小size = 2^(12-1) * pageSize = 2048 * pageSize,一共有2048个组
当请求分配内存时,将请求分配的内存数向上取值到最接近的分组大小,在该分组大小的相应层级中从左至右寻找空闲分组。例如请求分配内存对象为1.5 * pageSize
,向上取值到分组大小 2 * pageSize
,在该层分组中找到完全空闲的一组内存进行分配,如下图:
当分组大小 2 *pageSize
的内存分配出去后,为了方便下次内存分配,分组被标记为全部已使用(图中红色标记),向上更粗粒度的内存分组被标记为部分已使用(图中黄色标记)。
算法结构 Netty 基于平衡二叉树实现不同粒度的多层分组管理
为了方便快速查找 chunk 中能容纳请求内存的位置,算法构建一个基于byte数组(memoryMap)存储的完全平衡树,该平衡树的多个层级深度,就是前面介绍的按照不同粒度对 chunk进行多层分组:
树的深度depth从0开始计算,各层节点数,每个节点对应的内存大小如下:
1 2 3 4 5 6 depth = 0, 1 node,nodeSize = chunkSize depth = 1, 2 nodes,nodeSize = chunkSize/2 ... depth = d, 2^d nodes, nodeSize = chunkSize/(2^d) ... depth = maxOrder, 2^maxOrder nodes, nodeSize = chunkSize/2^{maxOrder} = pageSize
具有n个结点的完全二叉树的深度为:
树的最大深度为 maxOrder = 11(注意 depth从0开始),通过这棵树,算法在chunk中的查找就可以转换为:
当申请分配大小为 chunkSize/2^k 的内存,在平衡树高度为k的层级中,从左到右搜索第一个空闲节点。
数组的使用域从index = 1开始,将平衡树按照层次顺序依次存储在数组中:
1 2 3 depth = n 的第1个节点保存在 memoryMap[2^n] 中 depth = n 的第2个节点保存在 memoryMap[2^n+1] 中 ...
下图代表已分配 chunkSize/2:
内存管理 正常对象 当申请分配大小在 (pageSize / 2, chunkSize] 区间的内存时,会首先将请求分配的内存大小归一化(向上取整为page的整数倍)。例如 8000byte 归一化为8192byte(即8KB),8193byte 归一化为16384byte(16KB)。
当分配已归一化处理后大小为 chunkSize/2^d
的内存,即需要在 depth = d 的层级中找到第一块空闲内存,将其修改为已使用后,再更新其祖先节点为部分已使用。
小对象 当申请分配内存,会首先将请求分配的内存大小归一化:
当请求对象的大小为 (0, 496] ,归一化计算后方式是向上取最近的16的倍数。例如 15byte 归一化为16byte,490byte 归一化为496byte。
当请求对象的大小为 (496, pageSize / 2] ,归一化的计算方式是向上取整为最接近 512,1024,2048,4096的值。例如 520byte 归一化为 1024byte。
给这些小对象直接分配一个page会造成浪费,在page中进行平衡树的标记又额外消耗更多空间,因此Netty的实现是:先 PoolChunk 中申请空闲page,同一个 page 分为相同大小规格的小内存进行存储。通过 long[] 类型的bitmap 来记录内存是否已使用。
巨型对象 当申请内存大于 chunkSize 时,Netty采用的是非池化管理策略。
弹性伸缩 PoolChunk 为了解决单个PoolChunk容量有限的问题,Netty将多个PoolChunk组成链表一起管理。
将所有PoolChunk组成一个链表的话,进行遍历查找管理效率较低,因此 Netty 设计了PoolArena实现对多个PoolChunkList管理。
Arena 内部持有6个PoolChunkList,各个PoolChunkList持有的PoolChunk的使用率区间不同:
PoolChunkList的额定使用率区间存在交叉,这样设计是因为如果基于一个临界值的话,当PoolChunk内存申请释放后的内存使用率在临界值上下徘徊的话,会导致在PoolChunkList链表前后来回移动:
PoolSubpage PoolArena内部持有2个PoolSubpage数组,分别存储 tiny 和 small 规格类型的PoolSubpage。相同规格大小的 PoolSubpage组成链表,不同规格的PoolSubpage链表的 head 节点则分别保存在 tinySubpagePools 或者 smallSubpagePools数组中:
PoolSubpage链表中存储的 PoolSubpage都是只分配部分内存,当内存全部分配完或者内存全部释放完的 PoolSubpage会移出链表,减少不必要的链表节点;当PoolSubpage内存全部分配完后再释放部分内存,会重新将加入链表。
小结 PoolArena内存池弹性伸缩可用下图总结:
并发设计 为了减少线程间的竞争,Netty会提前创建多个 PoolArena(默认生成数量 = 2 * CPU核心数),当线程首次请求池化内存分配,会找被最少线程持有的 Arena,并保存线程局部变量 PoolThreadCache 中,实现线程与 Arena 的关联绑定。(线程局部变量使用的是 FastThreadLocal,性能高于 ThreadLocal)
Netty 还设计了缓存机制提升并发性能,当请求对象内存释放,PoolArena 并没有马上释放,而是先尝试将该内存关联的 PoolChunk 和 它的偏移位置等信息存入 PoolThreadLocalCache 中的固定大小缓存队列中(如果缓存队列满了则马上释放内存);当请求内存分配,PoolArena 会优先访问 PoolThreadLocalCache 的缓存队列中是否有缓存内存可用,如果有,则直接分配,提高分配效率。
源码分析 Netty 版本 4.1.52.Final
ByteBuf Java NIO提供了 ByteBuffer 作为它的字节容器,但是这个类使用起来过于复杂和繁琐。Netty用 ByteBuf 替代了ByteBuffer,实现了自动扩容 ,也更易使用。
基本结构 ByteBuf 源码中有段注释画出了结构:
1 2 3 4 5 6 7 8 * <pre> * +-------------------+------------------+------------------+ * | discardable bytes | readable bytes | writable bytes | * | | (CONTENT) | | * +-------------------+------------------+------------------+ * | | | | * 0 <= readerIndex <= writerIndex <= capacity * </pre>
readerIndex:记录读指针的开始位置
writerIndex:记录读指针的开始位置
capacity:缓冲区的总长度
除了这三个指针,ByteBuf里面其实还有一个指针 maxCapacity,它相当于ByteBuf扩容的最大阈值:
1 2 3 4 5 public abstract int maxCapacity () ;
这些指针将 ByteBuf 分为以下几个区域:
discardable bytes:从 0 到 readerIndex 为 discardable bytes,表示是无效的
readable bytes:从 readerIndex 到 writerIndex 为 readable bytes,表示可读数据区
writable bytes:从 writerIndex 到 capacity 为 writable bytes,表示这段区间空闲,可以往里面写数据
基本分类 ByteBuf 有众多子类,大致可以从以下维度来进行分类:
Pooled和UnPooled :
Pooled :每次申请内存都是从预先分配好的内存空间中提取一段连续内存
Unpooled :每次申请内存都是新的一次申请
unsafe和非unsafe :
unsafe :调用 native 方法底层直接操作内存
非unsafe :通过 JDK 的 API 间接操作内存
Heap和Direct :
Heap :指 JVM的堆内存
Direct :堆外内存,直接调用 JDK 的底层 API 进行物理内存分配,不在 JVM的堆内存中,需要手动释放
ByteBuf 最基本的读写 API 操作在 AbstractByteBuf
中已经实现了,其众多子类采用不同的策略来分配内存空间。下面是对重要的几个子类的总结:
PooledHeapByteBuf :池化的堆内缓冲区
PooledUnsafeHeapByteBuf :池化的Unsafe堆内缓冲区
PooledDirectByteBuf :池化的堆外缓冲区
PooledUnsafeDirectByteBuf :池化的Unsafe堆外缓冲区
UnpooledHeapByteBuf :非池化的堆内缓冲区
UnpooledUnsafeHeapByteBuf :非池化的Unsafe堆内缓冲区
UnpooledDirectByteBuf :非池化的堆外缓冲区
UnpooledUnsafeDirectByteBuf :非池化的Unsafe堆外缓冲区
基本操作 下面是 AbstractByteBuf 的 部分 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 @Override public ByteBuf readBytes (int length) { checkReadableBytes(length); if (length == 0 ) { return Unpooled.EMPTY_BUFFER; } ByteBuf buf = alloc().buffer(length, maxCapacity); buf.writeBytes(this , readerIndex, length); readerIndex += length; return buf; } protected final void checkReadableBytes (int minimumReadableBytes) { checkReadableBytes0(checkPositiveOrZero(minimumReadableBytes, "minimumReadableBytes" )); } private void checkReadableBytes0 (int minimumReadableBytes) { ensureAccessible(); if (checkBounds && readerIndex > writerIndex - minimumReadableBytes) { throw new IndexOutOfBoundsException(String.format( "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s" , readerIndex, minimumReadableBytes, writerIndex, this )); } } protected final void ensureAccessible () { if (checkAccessible && !isAccessible()) { throw new IllegalReferenceCountException(0 ); } } boolean isAccessible () { return refCnt() != 0 ; } public ByteBuf writeBytes (byte [] src, int srcIndex, int length) { ensureWritable(length); setBytes(writerIndex, src, srcIndex, length); writerIndex += length; return this ; } @Override public ByteBuf ensureWritable (int minWritableBytes) { ensureWritable0(checkPositiveOrZero(minWritableBytes, "minWritableBytes" )); return this ; } final void ensureWritable0 (int minWritableBytes) { final int writerIndex = writerIndex(); final int targetCapacity = writerIndex + minWritableBytes; if (targetCapacity <= capacity()) { ensureAccessible(); return ; } if (checkBounds && targetCapacity > maxCapacity) { ensureAccessible(); throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s" , writerIndex, minWritableBytes, maxCapacity, this )); } final int fastWritable = maxFastWritableBytes(); int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable : alloc().calculateNewCapacity(targetCapacity, maxCapacity); capacity(newCapacity); } public int calculateNewCapacity (int minNewCapacity, int maxCapacity) { checkPositiveOrZero(minNewCapacity, "minNewCapacity" ); if (minNewCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "minNewCapacity: %d (expected: not greater than maxCapacity(%d)" , minNewCapacity, maxCapacity)); } final int threshold = CALCULATE_THRESHOLD; if (minNewCapacity == threshold) { return threshold; } if (minNewCapacity > threshold) { int newCapacity = minNewCapacity / threshold * threshold; if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; } int newCapacity = 64 ; while (newCapacity < minNewCapacity) { newCapacity <<= 1 ; } return Math.min(newCapacity, maxCapacity); }
ByteBufAllocator Netty 中内存分配有一个顶层的抽象就是 ByteBufAllocator,负责分配所有 ByteBuf 类型的内存。
ByteBufAllocator 有如下几个重要的方法:
buffer():分配一块内存,自动判断是分配堆外内存还是堆内存
ioBuffer():尽可能分配一块堆外内存,如果系统不支持则分配堆内存
heapBuffer():分配一块堆内存
directBuffer():分配一块堆外内存
compositeBuffer():组合分配,把多个ByteBuf组合到一起变成一个整体
基本操作 AbstractByteBufAllocator
部分代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 protected AbstractByteBufAllocator () { this (false ); } protected AbstractByteBufAllocator (boolean preferDirect) { directByDefault = preferDirect && PlatformDependent.hasUnsafe(); emptyBuf = new EmptyByteBuf(this ); } @Override public ByteBuf buffer (int initialCapacity, int maxCapacity) { if (directByDefault) { return directBuffer(initialCapacity, maxCapacity); } return heapBuffer(initialCapacity, maxCapacity); } @Override public ByteBuf directBuffer (int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0 ) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newDirectBuffer(initialCapacity, maxCapacity); } protected abstract ByteBuf newDirectBuffer (int initialCapacity, int maxCapacity) ;@Override public ByteBuf heapBuffer (int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0 ) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newHeapBuffer(initialCapacity, maxCapacity); } protected abstract ByteBuf newHeapBuffer (int initialCapacity, int maxCapacity) ;
到这里知道了 directBuffer、heapBuffer 和 Pooled、Unpooled的分配规则,那么Unsafe和非Unsafe是如何判别的呢?其实是Netty自动判别的。如果操作系统底层支持 Unsafe 那就采用 Unsafe 读写,否则采用非 Unsafe 读写。
我们可以从UnpooledByteBufAllocator的源码中验证
非池化内存分配 UnpooledByteBufAllocator
实现了非池化内存分配策略
堆内存的分配 相关代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Override protected ByteBuf newHeapBuffer (int initialCapacity, int maxCapacity) { return PlatformDependent.hasUnsafe() ? new InstrumentedUnpooledUnsafeHeapByteBuf(this , initialCapacity, maxCapacity) : new InstrumentedUnpooledHeapByteBuf(this , initialCapacity, maxCapacity); } public final class PlatformDependent { private static final Throwable UNSAFE_UNAVAILABILITY_CAUSE = unsafeUnavailabilityCause0(); public static boolean hasUnsafe () { return UNSAFE_UNAVAILABILITY_CAUSE == null ; } private static Throwable unsafeUnavailabilityCause0 () { if (isAndroid()) { logger.debug("sun.misc.Unsafe: unavailable (Android)" ); return new UnsupportedOperationException("sun.misc.Unsafe: unavailable (Android)" ); } if (isIkvmDotNet()) { logger.debug("sun.misc.Unsafe: unavailable (IKVM.NET)" ); return new UnsupportedOperationException("sun.misc.Unsafe: unavailable (IKVM.NET)" ); } Throwable cause = PlatformDependent0.getUnsafeUnavailabilityCause(); if (cause != null ) { return cause; } try { boolean hasUnsafe = PlatformDependent0.hasUnsafe(); logger.debug("sun.misc.Unsafe: {}" , hasUnsafe ? "available" : "unavailable" ); return hasUnsafe ? null : PlatformDependent0.getUnsafeUnavailabilityCause(); } catch (Throwable t) { logger.trace("Could not determine if Unsafe is available" , t); return new UnsupportedOperationException("Could not determine if Unsafe is available" , t); } } }
通过调用 PlatformDependent.hasUnsafe()
方法来判断操作系统是否支持 Unsafe
如果支持Unsafe则创建 InstrumentedUnpooledUnsafeHeapByteBuf
如果不支持Unsafe则创建 InstrumentedUnpooledHeapByteBuf
InstrumentedUnpooledUnsafeHeapByteBuf 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 private static final class InstrumentedUnpooledUnsafeHeapByteBuf extends UnpooledUnsafeHeapByteBuf { InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super (alloc, initialCapacity, maxCapacity); } @Override protected byte [] allocateArray(int initialCapacity) { byte [] bytes = super .allocateArray(initialCapacity); ((UnpooledByteBufAllocator) alloc()).incrementHeap(bytes.length); return bytes; } } public class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf { public UnpooledUnsafeHeapByteBuf (ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super (alloc, initialCapacity, maxCapacity); } @Override protected byte [] allocateArray(int initialCapacity) { return PlatformDependent.allocateUninitializedArray(initialCapacity); } } public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf { private final ByteBufAllocator alloc; byte [] array; private ByteBuffer tmpNioBuf; public UnpooledHeapByteBuf (ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super (maxCapacity); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)" , initialCapacity, maxCapacity)); } this .alloc = checkNotNull(alloc, "alloc" ); setArray(allocateArray(initialCapacity)); setIndex(0 , 0 ); } private void setArray (byte [] initialArray) { array = initialArray; tmpNioBuf = null ; } @Override public ByteBuf setIndex (int readerIndex, int writerIndex) { if (checkBounds) { checkIndexBounds(readerIndex, writerIndex, capacity()); } setIndex0(readerIndex, writerIndex); return this ; } }
InstrumentedUnpooledHeapByteBuf 1 2 3 4 5 6 7 8 9 10 private static final class InstrumentedUnpooledHeapByteBuf extends UnpooledHeapByteBuf { InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super (alloc, initialCapacity, maxCapacity); } } public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf { protected byte [] allocateArray(int initialCapacity) { return new byte [initialCapacity]; } }
虽然 InstrumentedUnpooledUnsafeHeapByteBuf
和 InstrumentedUnpooledUnsafeHeapByteBuf
最终调用的都是 UnpooledHeapByteBuf
的构造方法,但前者是通过 unsafe
的方式创建字节数组,而后者是通过 new byte[Size]
的方式创建数组。其实不仅是创建数组,读写操作也是这样,前者是通过 unsafe
操作数据,后者则是直接通过数组下标操作数组。
小结 分配内存时,根据操作系统是否支持 Unsafe:
如果支持 unsafe:则通过 unsafe 的方式创建字节数组(读写数组也是通过 unsafe的方式)
如果不支持 unsafe:则使用 new byte[Size]
关键字创建字节数组(读写数组通过索引操作)
释放内存则依赖 JVM 自动释放
堆外内存的分配 相关代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 noCleaner = tryNoCleaner && PlatformDependent.hasUnsafe() && PlatformDependent.hasDirectBufferNoCleanerConstructor(); @Override protected ByteBuf newDirectBuffer (int initialCapacity, int maxCapacity) { final ByteBuf buf; if (PlatformDependent.hasUnsafe()) { buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this , initialCapacity, maxCapacity) : new InstrumentedUnpooledUnsafeDirectByteBuf(this , initialCapacity, maxCapacity); } else { buf = new InstrumentedUnpooledDirectByteBuf(this , initialCapacity, maxCapacity); } return disableLeakDetector ? buf : toLeakAwareBuffer(buf); }
通过调用 PlatformDependent.hasUnsafe()
方法来判断操作系统是否支持 Unsafe
如果支持 Unsafe,判断 noCleaner
noCleaner = true,则创建 InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
noCleaner = false,则创建 InstrumentedUnpooledUnsafeDirectByteBuf
如果不支持 Unsafe则创建 InstrumentedUnpooledDirectByteBuf
InstrumentedUnpooledDirectByteBuf 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 private static final class InstrumentedUnpooledDirectByteBuf extends UnpooledDirectByteBuf { InstrumentedUnpooledDirectByteBuf( UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super (alloc, initialCapacity, maxCapacity); } @Override protected ByteBuffer allocateDirect (int initialCapacity) { ByteBuffer buffer = super .allocateDirect(initialCapacity); ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity()); return buffer; } @Override protected void freeDirect (ByteBuffer buffer) { int capacity = buffer.capacity(); super .freeDirect(buffer); ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity); } } public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf { ByteBuffer buffer; private ByteBuffer tmpNioBuf; public UnpooledDirectByteBuf (ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super (maxCapacity); ObjectUtil.checkNotNull(alloc, "alloc" ); checkPositiveOrZero(initialCapacity, "initialCapacity" ); checkPositiveOrZero(maxCapacity, "maxCapacity" ); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)" , initialCapacity, maxCapacity)); }5 this .alloc = alloc; setByteBuffer(allocateDirect(initialCapacity), false ); } void setByteBuffer (ByteBuffer buffer, boolean tryFree) { if (tryFree) { ByteBuffer oldBuffer = this .buffer; if (oldBuffer != null ) { if (doNotFree) { doNotFree = false ; } else { freeDirect(oldBuffer); } } } this .buffer = buffer; tmpNioBuf = null ; capacity = buffer.remaining(); } }
InstrumentedUnpooledUnsafeDirectByteBuf 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 private static final class InstrumentedUnpooledUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf { InstrumentedUnpooledUnsafeDirectByteBuf( UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super (alloc, initialCapacity, maxCapacity); } @Override protected ByteBuffer allocateDirect (int initialCapacity) { ByteBuffer buffer = super .allocateDirect(initialCapacity); ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity()); return buffer; } @Override protected void freeDirect (ByteBuffer buffer) { int capacity = buffer.capacity(); super .freeDirect(buffer); ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity); } } public class UnpooledUnsafeDirectByteBuf extends UnpooledDirectByteBuf { public UnpooledUnsafeDirectByteBuf (ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super (alloc, initialCapacity, maxCapacity); } } public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf { public UnpooledDirectByteBuf (ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super (maxCapacity); ObjectUtil.checkNotNull(alloc, "alloc" ); checkPositiveOrZero(initialCapacity, "initialCapacity" ); checkPositiveOrZero(maxCapacity, "maxCapacity" ); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)" , initialCapacity, maxCapacity)); } this .alloc = alloc; setByteBuffer(allocateDirect(initialCapacity), false ); } }
这里发现 InstrumentedUnpooledDirectByteBuf
和 InstrumentedUnpooledUnsafeDirectByteBuf
都是通过new DirectByteBuffer(capacity)
的方式创建堆外内存,并且释放内存也是通过 DirectByteBuffer
的 cleaner 释放,那它们的区别是什么呢?其实就是读写数据的方式不同 。
InstrumentedUnpooledDirectByteBuf
的父类 UnpooledDirectByteBuf
1 2 3 4 5 6 7 8 9 10 @Override protected byte _getByte (int index) { return buffer.get(index); } class DirectByteBuffer extends MappedByteBuffer implements DirectBuffer { public byte get (int i) { return ((unsafe.getByte(ix(checkIndex(i))))); } }
UnpooledDirectByteBuf
通过 DirectByteBuffer
本身的方法来操作数据,不过 DirectByteBuffer
本身也是使用 unsafe 来操作数据的。
InstrumentedUnpooledUnsafeDirectByteBuf
的父类 UnpooledUnsafeDirectByteBuf
1 2 3 4 5 6 7 8 9 10 @Override public byte getByte (int index) { checkIndex(index); return _getByte(index); } @Override protected byte _getByte (int index) { return UnsafeByteBufUtil.getByte(addr(index)); }
UnpooledUnsafeDirectByteBuf
是直接使用的 unsafe 来操作数据的。
InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private static final class InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeNoCleanerDirectByteBuf { InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf( UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super (alloc, initialCapacity, maxCapacity); } @Override protected ByteBuffer allocateDirect (int initialCapacity) { ByteBuffer buffer = super .allocateDirect(initialCapacity); ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity()); return buffer; } @Override ByteBuffer reallocateDirect (ByteBuffer oldBuffer, int initialCapacity) { int capacity = oldBuffer.capacity(); ByteBuffer buffer = super .reallocateDirect(oldBuffer, initialCapacity); ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity() - capacity); return buffer; } } class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf { }
小结 分配内存时
如果操作系统不支持 unsafe 或 noCleaner = false,则通过 new DirectByteBuffer(capacity)
的方式创建带有 cleaner 的 DirectByteBuffer
。
不过操作系统支持 unsafe 并且 noCleaner = true,则通过反射创建 cleaner 为空的 DirectByteBuffer
。
释放内存时
如果创建的 DirectByteBuffer
带有 cleaner,使用 DirectByteBuffer本身的cleaner释放
如果创建的 DirectByteBuffer
没有 cleaner,则使用 unsafe 释放内存
池化内存的分配 PooledByteBufAllocator
实现了池化内存分配策略
由于高版本 Netty 池化内存分配策略有所变更,这部分源码参考 Netty 版本 4.1.42.Final
PooledByteBufAllocator 构造方法的相关参数如下:
preferDirect:优先分配直接内存,默认值是 false。
nHeapArena:HeapArena的数量,默认值是 CPU核数 * 2。
nDirectArena:DirectArena的数量,默认值是 CPU核数 * 2。
pageSize:page 的个数,默认值是 8192。
maxOrder:二叉树的最大深度,默认值是 11。
tinyCacheSize:tiny 类型的缓存列表容量,默认值是512。
smallCacheSize:small 类型的缓存列表容量,默认值是 256。
normalCacheSize:normal 类型的缓存列表容量,默认值是 64。
useCacheForAllThreads:默认值是 true。
directMemoryCacheAlignment:默认值是 0。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public PooledByteBufAllocator (boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize, boolean useCacheForAllThreads, int directMemoryCacheAlignment) { super (preferDirect); threadCache = new PoolThreadLocalCache(useCacheForAllThreads); this .tinyCacheSize = tinyCacheSize; this .smallCacheSize = smallCacheSize; this .normalCacheSize = normalCacheSize; chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder); int pageShifts = validateAndCalculatePageShifts(pageSize); if (nHeapArena > 0 ) { heapArenas = newArenaArray(nHeapArena); List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length); for (int i = 0 ; i < heapArenas.length; i ++) { PoolArena.HeapArena arena = new PoolArena.HeapArena(this , pageSize, maxOrder, pageShifts, chunkSize,directMemoryCacheAlignment); heapArenas[i] = arena; metrics.add(arena); } heapArenaMetrics = Collections.unmodifiableList(metrics); } else { heapArenas = null ; heapArenaMetrics = Collections.emptyList(); } if (nDirectArena > 0 ) { directArenas = newArenaArray(nDirectArena); List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length); for (int i = 0 ; i < directArenas.length; i ++) { PoolArena.DirectArena arena = new PoolArena.DirectArena( this , pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); directArenas[i] = arena; metrics.add(arena); } directArenaMetrics = Collections.unmodifiableList(metrics); } else { directArenas = null ; directArenaMetrics = Collections.emptyList(); } metric = new PooledByteBufAllocatorMetric(this ); }
HeapArena 和 DirectArena 分别为 CPU核数 * 2,目的就是保证Netty中的每一个任务线程都可以有一个独享的Arena,保证在每个线程分配内存的时候不用加锁 。
PooledByteBufAllocator
实现分配内存的两个方法:newDirectBuffer() 方法和newHeapBuffer() 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Override protected ByteBuf newHeapBuffer (int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<byte []> heapArena = cache.heapArena; final ByteBuf buf; if (heapArena != null ) { buf = heapArena.allocate(cache, initialCapacity, maxCapacity); } else { buf = PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this , initialCapacity, maxCapacity) : new UnpooledHeapByteBuf(this , initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); } @Override protected ByteBuf newDirectBuffer (int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; final ByteBuf buf; if (directArena != null ) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this , initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this , initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); }
可以发现这两个方法大体结构是一样的,以 newDirectBuffer 为例:
通过 threadCache.get() 方法获取一个类型为 PoolThreadCache 的 cache 对象
通过 cache 获得 directArena 对象
调用 directArena.allocate() 方法分配 ByteBuf
PoolThreadLocalCache 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 final class PoolThreadLocalCache extends FastThreadLocal <PoolThreadCache > { private final boolean useCacheForAllThreads; PoolThreadLocalCache(boolean useCacheForAllThreads) { this .useCacheForAllThreads = useCacheForAllThreads; } @Override protected synchronized PoolThreadCache initialValue () { final PoolArena<byte []> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); final Thread current = Thread.currentThread(); if (useCacheForAllThreads || current instanceof FastThreadLocalThread) { final PoolThreadCache cache = new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0 ) { final EventExecutor executor = ThreadExecutorMap.currentExecutor(); if (executor != null ) { executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); } } return cache; } return new PoolThreadCache(heapArena, directArena, 0 , 0 , 0 , 0 , 0 ); } } private <T> PoolArena<T> leastUsedArena (PoolArena<T>[] arenas) { if (arenas == null || arenas.length == 0 ) { return null ; } PoolArena<T> minArena = arenas[0 ]; for (int i = 1 ; i < arenas.length; i++) { PoolArena<T> arena = arenas[i]; if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) { minArena = arena; } } return minArena; }
通过 leastUsedArena() 方法分别获得使用率最少的 HeapArena 和 DirectArena 对象
然后把 HeapArena 和 DirectArena 对象作为参数传递到 PoolThreadCache 的构造器中
PoolThreadCache除了可以在 HeapArena 和 DirectArena 上进行内存分配,还可以在它底层维护的 ByteBuf 缓存列表进行分配。
举个例子:我们通过 PooledByteBufAllocator 创建了一个1024字节的ByteBuf,当用完释放后,可能在其他地方会继续分配1024字节的ByteBuf。这时,其实不需要在 Arena 上进行内存分配,而是直接通过 PoolThreadCache中维护的 ByteBuf 的缓存列表直接拿过来返回。
PooledByteBufAllocator 维护着三种规格大小的缓存列表,分别是:tiny,small,normal,其容量分别为 512,256,64。
通过这种方式,Netty 预创建了固定规格的内存池,大大提高了内存分配的性能。
DirectArena内存分配流程 HeapArena 和 DirectArena 分配内存的基本流程有三个步骤。
从对象池里获得 PooledByteBuf 进行复用
从缓存中进行内存分配
从缓存中分配失败,则从内存堆里进行内存分配
对象池获取 ByteBuf 以 DirectBuffer 为例,首先来看从对象池里获得 PooledByteBuf 进行复用的情况。
PooledByteBufAllocator 的 newDirectBuffer() 的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override protected ByteBuf newDirectBuffer (int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; final ByteBuf buf; if (directArena != null ) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this , initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this , initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); }
PoolArena 的 allocate方法:
1 2 3 4 5 PooledByteBuf<T> allocate (PoolThreadCache cache, int reqCapacity, int maxCapacity) { PooledByteBuf<T> buf = newByteBuf(maxCapacity); allocate(cache, buf, reqCapacity); return buf; }
DirectArena 的 newByteBuf方法:
1 2 3 4 5 6 7 8 9 10 @Override protected PooledByteBuf<ByteBuffer> newByteBuf (int maxCapacity) { if (HAS_UNSAFE) { return PooledUnsafeDirectByteBuf.newInstance(maxCapacity); } else { return PooledDirectByteBuf.newInstance(maxCapacity); } }
首先判断是否支持Unsafe:
支持 Unsafe 创建的是 PooledUnsafeDirectByteBuf,它通过Unsafe方法操作数组。
不支持 Unsafe 创建的是 PooledDirectByteBuf,它通过数组索引操作数组。
默认情况下一般是支持Unsafe的,PooledUnsafeDirectByteBuf 的 newInstance方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private static final ObjectPool<PooledUnsafeDirectByteBuf> RECYCLER = ObjectPool.newPool( new ObjectCreator<PooledUnsafeDirectByteBuf>() { @Override public PooledUnsafeDirectByteBuf newObject (Handle<PooledUnsafeDirectByteBuf> handle) { return new PooledUnsafeDirectByteBuf(handle, 0 ); } }); static PooledUnsafeDirectByteBuf newInstance (int maxCapacity) { PooledUnsafeDirectByteBuf buf = RECYCLER.get(); buf.reuse(maxCapacity); return buf; } final void reuse (int maxCapacity) { maxCapacity(maxCapacity); resetRefCnt(); setIndex0(0 , 0 ); discardMarks(); }
首先通过 RECYCLER (内存回收站)对象的 get() 方法获得一个 ByteBuf。当回收站里面没有可用的buf时就会创建一个新的 ByteBuf。获得的 ByteBuf 可能是回收站里取出来的,所以复用前需要重置相关属性。
内存池的内存规格 Netty 底层还有一个内存单位的封装,为了更高效地管理内存,避免内存浪费,把每一个区间的内存规格又做了细分。Netty 内存池中主要设置了四种规格大小的内存:
tiny :0~512Byte
small :512Byte~8KB
normal :8KB~16MB
huge :16MB以上
那么为什么以这几个节点来进行分段呢,其实这里面是有原因的:
首先为什么以16M 作为分界点,16M在 Netty 中就是一个Chunk ,Netty 中所有的内存申请都是以 Chunk 为单位向系统申请的,后续的所有内存分配都是在这个 Chunk 里的操作。
然后为什么又以8k 作为分界点的,8K在Netty中就是一个 Page ,每个Chunk 中有 2^11 个 Page 这样就能更细粒度的分配内存。
最后在 0 到 8K 的内存区间中有一个 SubPage 的对象来进行精确的分配内存。
缓存的分配 缓存的数据结构 在Netty中缓存的数据结构是一个叫做 MemoryRegionCache 的类,其结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private abstract static class MemoryRegionCache <T > { private final int size; private final Queue<Entry<T>> queue; private final SizeClass sizeClass; private int allocations; MemoryRegionCache(int size, SizeClass sizeClass) { this .size = MathUtil.safeFindNextPositivePowerOfTwo(size); queue = PlatformDependent.newFixedMpscQueue(this .size); this .sizeClass = sizeClass; } } static final class Entry <T > { final Handle<Entry<?>> recyclerHandle; PoolChunk<T> chunk; ByteBuffer nioBuffer; long handle = -1 ; Entry(Handle<Entry<?>> recyclerHandle) { this .recyclerHandle = recyclerHandle; } void recycle () { chunk = null ; nioBuffer = null ; handle = -1 ; recyclerHandle.recycle(this ); } } enum SizeClass { Tiny, Small, Normal }
MemoryRegionCache 由三部分组成:
queue:队列中每个节点都是一个 Entry 实体
sizeClass:对应Netty中的内存规格,分别是 tiny、small、normal(huge规格不用缓存分配)
size:队列的容量,可能是 512,256,64(对应tinyCacheSize,smallCacheSize,normalCacheSize)
tiny:共32种规格,均是16的整数倍,0Byte、16Byte、32Byte、48Byte、……496Byte。
small:4种规格,512Byte、1KB、2KB、4KB。
normal:3种规格,8KB、16KB、32KB。
PoolThreadCache中维护了六个缓存数组(HeapCaches省略):
tinySubPageDirectCaches:tiny 类型的缓存列表
smallSubPageDirectCaches:small 类型的缓存列表
normalDirectCaches:normal 类型的缓存数组
代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 final class PoolThreadCache { final PoolArena<byte []> heapArena; final PoolArena<ByteBuffer> directArena; private final MemoryRegionCache<byte []>[] tinySubPageHeapCaches; private final MemoryRegionCache<byte []>[] smallSubPageHeapCaches; private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches; private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches; private final MemoryRegionCache<byte []>[] normalHeapCaches; private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches; }
在 PoolThreadCache 的构造方法中进行了初始化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 PoolThreadCache(PoolArena<byte []> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { this .freeSweepAllocationThreshold = freeSweepAllocationThreshold; this .heapArena = heapArena; this .directArena = directArena; if (directArena != null ) { tinySubPageDirectCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageDirectCaches = createSubPageCaches( smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalDirect = log2(directArena.pageSize); normalDirectCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, directArena); directArena.numThreadCaches.getAndIncrement(); } else { tinySubPageDirectCaches = null ; smallSubPageDirectCaches = null ; normalDirectCaches = null ; numShiftsNormalDirect = -1 ; } if (heapArena != null ) { tinySubPageHeapCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageHeapCaches = createSubPageCaches( smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalHeap = log2(heapArena.pageSize); normalHeapCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, heapArena); heapArena.numThreadCaches.getAndIncrement(); } else { tinySubPageHeapCaches = null ; smallSubPageHeapCaches = null ; normalHeapCaches = null ; numShiftsNormalHeap = -1 ; } if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null ) && freeSweepAllocationThreshold < 1 ) { throw new IllegalArgumentException("freeSweepAllocationThreshold: " + freeSweepAllocationThreshold + " (expected: > 0)" ); } }
以 tiny 类型为例,具体分析一下SubPage的缓存结构,实现代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private static <T> MemoryRegionCache<T>[] createSubPageCaches( int cacheSize, int numCaches, SizeClass sizeClass) { if (cacheSize > 0 && numCaches > 0 ) { @SuppressWarnings ("unchecked" ) MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches]; for (int i = 0 ; i < cache.length; i++) { cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass); } return cache; } else { return null ; } }
其实就是创建了一个缓存数组,这个缓存数组的长度是 numCaches。相当于:
创建了 tinySubPageDirectCaches[32],smallSubPageDirectCaches[4],normalDirectCaches[3] 三个数组
tinySubPageDirectCaches 中的每个元素 SubPageMemoryRegionCache 中的队列queue容量为 512
smallSubPageDirectCaches 中的每个元素 SubPageMemoryRegionCache 中的队列queue容量为 256
normalDirectCaches 中的每个元素 MemoryRegionCache 中的队列queue容量为 64
缓存的分配流程 在基本了解缓存数组的数据结构之后,继续剖析在缓存中分配内存的逻辑,回到 PoolArena 的 allocate() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 private void allocate (PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { final int normCapacity = normalizeCapacity(reqCapacity); if (isTinyOrSmall(normCapacity)) { int tableIdx; PoolSubpage<T>[] table; boolean tiny = isTiny(normCapacity); if (tiny) { if (cache.allocateTiny(this , buf, reqCapacity, normCapacity)) { return ; } tableIdx = tinyIdx(normCapacity); table = tinySubpagePools; } else { if (cache.allocateSmall(this , buf, reqCapacity, normCapacity)) { return ; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } final PoolSubpage<T> head = table[tableIdx]; synchronized (head) { final PoolSubpage<T> s = head.next; if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; long handle = s.allocate(); assert handle >= 0 ; s.chunk.initBufWithSubpage(buf, null , handle, reqCapacity); incTinySmallAllocation(tiny); return ; } } synchronized (this ) { allocateNormal(buf, reqCapacity, normCapacity); } incTinySmallAllocation(tiny); return ; } if (normCapacity <= chunkSize) { if (cache.allocateNormal(this , buf, reqCapacity, normCapacity)) { return ; } synchronized (this ) { allocateNormal(buf, reqCapacity, normCapacity); ++allocationsNormal; } } else { allocateHuge(buf, reqCapacity); } }
PoolThreadCache 缓存分配:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 boolean allocateTiny (PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity); } private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) { int idx = PoolArena.tinyIdx(normCapacity); if (area.isDirect()) { return cache(tinySubPageDirectCaches, idx); } return cache(tinySubPageHeapCaches, idx); } private static <T> MemoryRegionCache<T> cache (MemoryRegionCache<T>[] cache, int idx) { if (cache == null || idx > cache.length - 1 ) { return null ; } return cache[idx]; } private boolean allocate (MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) { if (cache == null ) { return false ; } boolean allocated = cache.allocate(buf, reqCapacity); if (++ allocations >= freeSweepAllocationThreshold) { allocations = 0 ; trim(); } return allocated; } public final boolean allocate (PooledByteBuf<T> buf, int reqCapacity) { Entry<T> entry = queue.poll(); if (entry == null ) { return false ; } initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity); entry.recycle(); ++ allocations; return true ; }
弹出 Entry 之后,通过 initBuf() 方法初始化 ByteBuf,这里参数传入Entry的Chunk和Handle。
Entry类的代码:
1 2 3 4 5 6 static final class Entry <T > { final Handle<Entry<?>> recyclerHandle; PoolChunk<T> chunk; ByteBuffer nioBuffer; long handle = -1 ; }
chunk:代表一块连续的内存
handle:相当于一个指针,可以唯一定位Chunk里的一块连续内存
通过 Chunk 和 Handle 就可以定位 ByteBuf 中指定的一块连续内存,有关ByteBuf相关的读写操作,都会在这块内存中进行。
SubPageMemoryRegionCache 类的 initBuf(entry.chunk,entry.handle,buf,reqCapacity) 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 protected void initBuf (PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) { chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity); } void initBufWithSubpage (PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) { initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx(handle), reqCapacity); } private static int bitmapIdx (long handle) { return (int ) (handle >>> Integer.SIZE); } private void initBufWithSubpage (PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int bitmapIdx, int reqCapacity) { int memoryMapIdx = memoryMapIdx(handle); PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)]; assert subpage.doNotDestroy; assert reqCapacity <= subpage.elemSize; buf.init( this , nioBuffer, handle, runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF ) * subpage.elemSize + offset, reqCapacity, subpage.elemSize, arena.parent.threadCache()); } void init (PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { init0(chunk, nioBuffer, handle, offset, length, maxLength, cache); } private void init0 (PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { this .chunk = chunk; memory = chunk.memory; tmpNioBuf = nioBuffer; allocator = chunk.arena.parent; this .cache = cache; this .handle = handle; this .offset = offset; this .length = length; this .maxLength = maxLength; }
上面的代码将 PooledUnsafeDirectByteBuf 的各个属性进行了初始化。通过缓存分配 ByteBuf,只要通过一个chunk 和 handle 就可以确定一块内存。以上就是通过缓存分配ByteBuf对象的全过程。
初始化ByteBuf 之后调用 entry.recycle() 将Entry对象回收,因为Entry对象弹出之后没有再被引用,所以可能GC会将Entry对象回收。Netty为了将对象循环利用,将其放在对象回收站进行回收。
1 2 3 4 5 6 void recycle () { chunk = null ; nioBuffer = null ; handle = -1 ; recyclerHandle.recycle(this ); }
chunk = null 和 handle = -1 表示当前Entry不指向任何一块内存。recyclerHandle.recycle(this) 将当前 Entry 回收。
以上就是命中缓存的流程。
Page级别的内存分配 Netty 内存分配的单位是Chunk,一个 Chunk 的大小是16MB,每个 Chunk 都以双向链表的形式保存在一个ChunkList 中。多个 ChunkList 也是以双向链表的形式进行关联的,大概结构如下图所示:
在 ChunkList 中,根据 Chunk 的内存使用率划分 ChunkList。这样在内存分配时,会根据百分比找到相应的ChunkList,在ChunkList 中选择一个 Chunk 进行内存分配。PoolArena 中 ChunkList 的成员变量:
1 2 3 4 5 6 private final PoolChunkList<T> q050;private final PoolChunkList<T> q025;private final PoolChunkList<T> q000;private final PoolChunkList<T> qInit;private final PoolChunkList<T> q075;private final PoolChunkList<T> q100;
这里总共定义了6个 ChunkList,并在构造方法中将其初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 q100 = new PoolChunkList<T>(this , null , 100 , Integer.MAX_VALUE, chunkSize); q075 = new PoolChunkList<T>(this , q100, 75 , 100 , chunkSize); q050 = new PoolChunkList<T>(this , q075, 50 , 100 , chunkSize); q025 = new PoolChunkList<T>(this , q050, 25 , 75 , chunkSize); q000 = new PoolChunkList<T>(this , q025, 1 , 50 , chunkSize); qInit = new PoolChunkList<T>(this , q000, Integer.MIN_VALUE, 25 , chunkSize); q100.prevList(q075); q075.prevList(q050); q050.prevList(q025); q025.prevList(q000); q000.prevList(null ); qInit.prevList(qInit);
q050 = new PoolChunkList(this, q075, 50, 100, chunkSize) 表示:
q075是q050的下一个节点。
当前 ChunkList 中存储的 Chunk 的内存使用率都在50%~100%。
ChunkSize 为其设置大小
ChunkList 的节点关系如下图所示:
Netty中,Chunk 又包含了多个Page,每个Page的大小为8KB,如果要分配16KB的内存,则在Chunk中找到连续的两个Page就可以。
在很多场景下,为缓冲区分配8KB的内存也是一种浪费,比如只需要分配2KB的缓冲区,如果使用8KB会造成6KB的浪费。这种情况下,Netty又会将Page切分成多个SubPage,每个SubPage大小要根据分配的缓冲区大小而定,比如要分配2KB的内存,就会将一个Page切分成4个SubPage,每个SubPage的大小为2KB:
PoolSubpage 的基本结构的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 final class PoolSubpage <T > implements PoolSubpageMetric { final PoolChunk<T> chunk; private final int memoryMapIdx; private final int runOffset; private final int pageSize; private final long [] bitmap; PoolSubpage<T> prev; PoolSubpage<T> next; boolean doNotDestroy; int elemSize; private int maxNumElems; private int bitmapLength; private int nextAvail; private int numAvail; }
chunk代表其子页属于哪个Chunk
bitmap用于记录子页的内存分配情况
prev和next代表子页是按照双向链表进行关联的,分别指向上一个节点和下一个节点
elemSize属性代表的是子页是按照多大内存进行划分的,如果按照1KB划分,则可以划分出8个子页
PoolArena 的 allocate() 方法中 allocateNormal(buf,reqCapacity,normCapacity) 实际上就是在Page级别上进行分配,分配一个或者多个Page的空间:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private void allocateNormal (PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { return ; } PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); boolean success = c.allocate(buf, reqCapacity, normCapacity); assert success; qInit.add(c); }
优先在原有的Chunk中进行内存分配
如果是首次分配,那就创建Chunk进行内存分配
最后初始化ByteBuf
以 q050 为例进行分析,q050.allocate(buf,reqCapacity,normCapacity)方法的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 boolean allocate (PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { if (normCapacity > maxCapacity) { return false ; } for (PoolChunk<T> cur = head; cur != null ; cur = cur.next) { if (cur.allocate(buf, reqCapacity, normCapacity)) { if (cur.usage() >= maxUsage) { remove(cur); nextList.add(cur); } return true ; } } return false ; }
从Head节点往下遍历,对每个Chunk都尝试去分配。
再回到 allocateNormal() 方法,查看首次分配:newChunk(pageSize, maxOrder, pageShifts, chunkSize)
其中参数pageSize是8192,也就是8KB;maxOrder为11;pageShifts为13,2的13次方正好是8192,也就是8KB;chunkSize为16777216,也就是16MB。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override protected PoolChunk<ByteBuffer> newChunk (int pageSize, int maxOrder, int pageShifts, int chunkSize) { if (directMemoryCacheAlignment == 0 ) { return new PoolChunk<ByteBuffer>(this , allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize, 0 ); } final ByteBuffer memory = allocateDirect(chunkSize + directMemoryCacheAlignment); return new PoolChunk<ByteBuffer>(this , memory, pageSize, maxOrder, pageShifts, chunkSize, offsetCacheLine(memory)); }
PoolChunk 的构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) { unpooled = false ; this .arena = arena; this .memory = memory; this .pageSize = pageSize; this .pageShifts = pageShifts; this .maxOrder = maxOrder; this .chunkSize = chunkSize; this .offset = offset; unusable = (byte ) (maxOrder + 1 ); log2ChunkSize = log2(chunkSize); subpageOverflowMask = ~(pageSize - 1 ); freeBytes = chunkSize; maxSubpageAllocs = 1 << maxOrder; memoryMap = new byte [maxSubpageAllocs << 1 ]; depthMap = new byte [memoryMap.length]; int memoryMapIndex = 1 ; for (int d = 0 ; d <= maxOrder; ++ d) { int depth = 1 << d; for (int p = 0 ; p < depth; ++ p) { memoryMap[memoryMapIndex] = (byte ) d; depthMap[memoryMapIndex] = (byte ) d; memoryMapIndex ++; } } subpages = newSubpageArray(maxSubpageAllocs); cachedNioBuffers = new ArrayDeque<ByteBuffer>(8 ); }
上图是一个二叉树的结构,左侧的数字代表层级,右侧代表一块连续的内存,每个父节点下又拆分成多个子节点,顶层表示的内存范围为0~16MB,其下又分为两层,范围为0~8MB、8~16MB,依此类推,最后到11层,以8KB的大小划分,也就是一个Page的大小。
如果我们分配一个8MB的缓冲区,则会将第二层的第一个节点,也就是0~8MB这个连续的内存进行分配。分配完成之后,会将这个节点设置为不可用。结合上面的图,我们再看构造方法中的for循环代码:
1 2 3 4 5 6 7 8 for (int d = 0 ; d <= maxOrder; ++ d) { int depth = 1 << d; for (int p = 0 ; p < depth; ++ p) { memoryMap[memoryMapIndex] = (byte ) d; depthMap[memoryMapIndex] = (byte ) d; memoryMapIndex ++; } }
实际上,这个for循环就是将上面的结构包装成一个字节数组memoryMap,外层循环用于控制层数,内层循环用于控制里面每层的节点。经过循环之后,memoryMap和depthMap内容为以下表现形式:
需要注意的是,因为程序中数组的下标是从1开始设置的,所以第0个节点元素为默认值0。这里数字代表层级,同时也代表了当前层级的节点,相同的数字个数就是这一层级的节点数。其中0为2个(因为分配时下标是从1开始的,所以第0个位置是默认值0,实际上第0层元素只有一个,就是头节点),1为2个,2为4个,3为8个,4为16个,n为2的n次方个,直到11,也就是11有2的11次方个。
继续剖析 cur.allocate(buf, reqCapacity, normCapacity):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 boolean allocate (PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { final long handle; if ((normCapacity & subpageOverflowMask) != 0 ) { handle = allocateRun(normCapacity); } else { handle = allocateSubpage(normCapacity); } if (handle < 0 ) { return false ; } ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null ; initBuf(buf, nioBuffer, handle, reqCapacity); return true ; } private long allocateRun (int normCapacity) { int d = maxOrder - (log2(normCapacity) - pageShifts); int id = allocateNode(d); if (id < 0 ) { return id; } freeBytes -= runLength(id); return id; }
allocateNode()方法的具体实现代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private int allocateNode (int d) { int id = 1 ; int initial = - (1 << d); byte val = value(id); if (val > d) { return -1 ; } while (val < d || (id & initial) == 0 ) { id <<= 1 ; val = value(id); if (val > d) { id ^= 1 ; val = value(id); } } byte value = value(id); assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d" , value, id & initial, d); setValue(id, unusable); updateParentsAlloc(id); return id; }
上述代码实际上是从第一个节点往下找的,找到层级为d、未被使用的节点。找到相关节点后通过 setValue 将当前节点设置为不可用,其中id是当前节点的下标,unusable代表一个不可用的值,这里是12,因为我们的层级只有12层,所以设置为12之后就相当于标记不可用。设置为不可用之后,通过updateParentsAlloc(id)逐层设置为缓存被使用的状态。updateParentsAlloc()方法的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private void updateParentsAlloc (int id) { while (id > 1 ) { int parentId = id >>> 1 ; byte val1 = value(id); byte val2 = value(id ^ 1 ); byte val = val1 < val2 ? val1 : val2; setValue(parentId, val); id = parentId; } }
这里其实是将循环兄弟节点的值替换成父节点的值,可以通过注释仔细地进行逻辑分析。简单起见,这里只设置三层,如下图所示:
我们模拟其分配场景,假设只有三层,其中index代表数组memoryMap的下标,value代表其值,memoryMap中的值就为[0,0,1,1,2,2,2,2]。我们要分配一个4MB的ByteBuf,在调用allocateNode(int d)中传入的d是2,也就是第二层。根据上面分析的逻辑,这里会找到第二层的第一个节点,也就是0~4MB这个节点,找到之后将其设置为不可用,这样memoryMap中的值就为[0,0,1,1,12,2,2,2],二叉树的结构就会如下图所示:
注意深色节点部分,将index为4的节点设置为不可用,之后则将向上设置不可用,循环将兄弟节点数值较小的节点替换到父节点,也就是将index为2的节点的值替换成了index为5的节点的值,这样数组的值就会变为[0,1,2,1,12,2,2,2],二叉树的结构如下图所示:
注意:这里深色节点仅仅代表节点变化,并不是当前节点为不可用状态,不可用状态的真正判断依据是value值为12。
这样,如果再次分配一个4MB内存的ByteBuf,根据其逻辑,则会找到第二层的第二个节点,也就是4~8MB。再根据我们的逻辑,通过向上设置不可用,index为2就会设置成不可用状态,将value的值设置为12,数组的值变为[0,1,12,1,12,12,2,2],二叉树的结构如下图所示:
可以看到,分配两个4MB的ByteBuf之后,当前节点和其父节点都会设置成不可用状态,当index=2的节点设置为不可用之后,将不会再找这个节点下的子节点。依此类推,直到所有的内存分配完毕,index为1的节点也会变成不可用状态,这样所有的Page就都分配完毕,Chunk中再无可用节点。
根据以上分析可知,PoolChunk的allocate方法中的 handle = allocateRun(normCapacity);
或 handle = allocateSubpage(normCapacity);
,返回的就是memoryMap的一个下标。通过这个下标,我们能唯一地定位一块内存。继续往下跟,通过 initBuf(buf, nioBuffer, handle, reqCapacity) 初始化ByteBuf后,再通过 qInit.add(c)将新创建的Chunk添加到ChunkList中,我们看 initBuf() 方法的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void initBuf (PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) { int memoryMapIdx = memoryMapIdx(handle); int bitmapIdx = bitmapIdx(handle); if (bitmapIdx == 0 ) { byte val = value(memoryMapIdx); assert val == unusable : String.valueOf(val); buf.init(this , nioBuffer, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache()); } else { initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx, reqCapacity); } } private int runOffset (int id) { int shift = id ^ 1 << depth(id); return shift * runLength(id); } private int runLength (int id) { return 1 << log2ChunkSize - depth(id); }
runOffset(memoryMapIdx) + offset 表示偏移量,偏移量相当于分配给缓冲区的这块内存相对于Chunk中申请的内存的首地址偏移了多少。参数runLength(memoryMapIdx)表示根据下标获取可分配的最大长度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void init (PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { init0(chunk, nioBuffer, handle, offset, length, maxLength, cache); } private void init0 (PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { assert handle >= 0 ; assert chunk != null ; this .chunk = chunk; memory = chunk.memory; tmpNioBuf = nioBuffer; allocator = chunk.arena.parent; this .cache = cache; this .handle = handle; this .offset = offset; this .length = length; this .maxLength = maxLength; }
以上就是 PooledByteBuf 在Page级别分配的完整流程。
SubPage级别的内存分配 如果分配一个缓冲区大小远小于Page,直接在一个Page上进行分配会造成内存浪费,所以需要将Page继续切分成多个子块进行分配,子块分配的个数根据要分配的缓冲区大小而定,比如只需要分配1KB的内存,就将一个Page分成8等分。
简单起见,仅以16字节为例,讲解其分配逻辑。在分析其逻辑前,首先看PoolArena的一个属性:
1 private final PoolSubpage<T>[] tinySubpagePools;
这个属性是一个PoolSubpage的数组,有点类似于一个SubPage的缓存。我们创建一个SubPage之后,会将创建好的SubPage与PoolArena中tinySubpagePools数组的每一个元素进行关联,下次再分配的时候可以直接通过tinySubpagePools数组元素去找关联的SubPage。而tinySubpagePools是在PoolArena的构造方法中初始化的,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 protected PoolArena (PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) { tinySubpagePools = newSubpagePoolArray(numTinySubpagePools); for (int i = 0 ; i < tinySubpagePools.length; i ++) { tinySubpagePools[i] = newSubpagePoolHead(pageSize); } } private PoolSubpage<T>[] newSubpagePoolArray(int size) { return new PoolSubpage[size]; } private PoolSubpage<T> newSubpagePoolHead (int pageSize) { PoolSubpage<T> head = new PoolSubpage<T>(pageSize); head.prev = head; head.next = head; return head; }
由上面代码知道,SubPage其实也是一个双向链表,这里将Head的上一个节点和下一个节点都设置为自身,有关PoolSubpage的关联关系,我们稍后分析。通过循环创建PoolSubpage,总共创建32个SubPage,每个SubPage实际代表一块内存大小,如下图所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private long allocateSubpage (int normCapacity) { PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity); int d = maxOrder; synchronized (head) { int id = allocateNode(d); if (id < 0 ) { return id; } final PoolSubpage<T>[] subpages = this .subpages; final int pageSize = this .pageSize; freeBytes -= pageSize; int subpageIdx = subpageIdx(id); PoolSubpage<T> subpage = subpages[subpageIdx]; if (subpage == null ) { subpage = new PoolSubpage<T>(head, this , id, runOffset(id), pageSize, normCapacity); subpages[subpageIdx] = subpage; } else { subpage.init(head, normCapacity); } return subpage.allocate(); } }
通过 PoolSubpagehead=arena.findSubpagePoolHead(normCapacity) 这种方式找到Head节点,实际上这里Head就是之前分析的tinySubpagePools属性的第一个节点,也就是对应16Byte的那个节点。
int d=maxOrder 是将11赋值给d,也就是在内存树的第11层取节点,这部分上一小节已经剖析过。int id=allocateNode(d) 获取的是上一小节分析过的字节数组 memoryMap 的下标,这里指向一个Page,如果是第一次分配,指向的是0~8kB的那个Page。
final PoolSubpage[]subpages=this.subpages 这一步是获得PoolChunk中成员变量SubPages的值,也是个PoolSubpage的数组,在PoolChunk进行初始化的时候,也会初始化该数组,长度为2048。也就是说每个Chunk都维护着一个SubPage的列表。如果每一个Page级别的内存都需要被切分成SubPage,则会将这个Page放入该列表中,专门用于分配SubPage。
所以这个列表中的SubPage其实就是一个用于切分的Page。SubPages如下图所示:
int subpageIdx=subpageIdx(id) 这一步是通过id获得PoolSubpage数组的下标,如果id对应的Page是0~8KB的节点,这里获得的下标就是0。在 if(subpage==null) 中,因为默认SubPages只是创建一个数组,并没有往数组中赋值,所以第一次执行到这里会返回true,跟到if块中,代码如下
1 2 subpage = new PoolSubpage<T>(head, this , id, runOffset(id), pageSize, normCapacity); subpages[subpageIdx] = subpage;
通过new PoolSubpage创建一个新的SubPage后,通过subpages[subpageIdx]=subpage将新创建的SubPage根据下标赋值到SubPages中的元素。在new PoolSubpage的构造方法中,传入Head,就是之前提到的tinySubpagePools属性中的节点,如果分配16字节的缓冲区,这里对应的就是第一个节点,PoolSubpage构造方法的代码如下:
1 2 3 4 5 6 7 8 9 PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) { this .chunk = chunk; this .memoryMapIdx = memoryMapIdx; this .runOffset = runOffset; this .pageSize = pageSize; bitmap = new long [pageSize >>> 10 ]; init(head, elemSize); }
这里重点关注bitmap属性,这是一个long类型的数组,初始大小为8,这只是初始化的大小,真正的大小要根据Page切分成多少块确定,这里将属性进行了赋值,我们看init()方法的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void init (PoolSubpage<T> head, int elemSize) { doNotDestroy = true ; this .elemSize = elemSize; if (elemSize != 0 ) { maxNumElems = numAvail = pageSize / elemSize; nextAvail = 0 ; bitmapLength = maxNumElems >>> 6 ; if ((maxNumElems & 63 ) != 0 ) { bitmapLength ++; } for (int i = 0 ; i < bitmapLength; i ++) { bitmap[i] = 0 ; } } addToPool(head); }
this.elemSize=elemSize 表示保存当前分配的缓冲区大小,因为以16字节为例,所以这里是16。maxNumElems=numAvail=pageSize/elemSize 初始化了两个属性maxNumElems和numAvail,值都为pageSize/elemSize,表示一个Page大小除以分配的缓冲区大小,也就是表示当前Page被划分了多少份。numAvail表示剩余可用的块数,由于第一次分配都是可用的,所以numAvail=maxNumElems。bitmapLength表示bitmap的实际大小,已经分析过,bitmap初始化大小为8,但实际上并不一定需要8个元素,元素个数要根据Page切分的子块而定,这里的大小是所切分的子块数除以64。再往下看,代码if((maxNumElems&63)!=0)用于判断maxNumElems也就是当前配置所切分的子块是不是64的倍数,如果不是,则bitmapLength加1,最后通过循环,将对应位置的子块标记为0。
这里详细分析一下bitmap,它是个long类型的数组,long数组中的每一个值,也都是long类型的数字,其中每一个比特位都标记着Page中每一个子块的内存是否已分配,如果比特位是1,表示该子块已分配;如果比特位是0,表示该子块未分配,标记顺序是其二进制数从低位到高位进行排列。我们应该知道为什么bitmap大小要设置为子块数量除以64,因为long类型的数字是64位,每一个元素都能记录64个子块的数量,这样就可以通过SubPage个数除以64的方式决定bitmap中元素的数量。如果子块不能整除64,则通过元素数量+1的方式,除以64之后剩余的子块通过long中比特位由低到高进行排列记录,其逻辑结构如下图所示:
Chunk级别的内存分配 当需要分配的内存大小大于 ChunkSize 时,Netty 采用非池化策略直接分配内存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void allocateHuge (PooledByteBuf<T> buf, int reqCapacity) { PoolChunk<T> chunk = newUnpooledChunk(reqCapacity); activeBytesHuge.add(chunk.chunkSize()); buf.initUnpooled(chunk, reqCapacity); allocationsHuge.increment(); } @Override protected PoolChunk<ByteBuffer> newUnpooledChunk (int capacity) { if (directMemoryCacheAlignment == 0 ) { return new PoolChunk<ByteBuffer>(this , allocateDirect(capacity), capacity, 0 ); } final ByteBuffer memory = allocateDirect(capacity + directMemoryCacheAlignment); return new PoolChunk<ByteBuffer>(this , memory, capacity, offsetCacheLine(memory)); } private static ByteBuffer allocateDirect (int capacity) { return PlatformDependent.useDirectBufferNoCleaner() ? PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity); }
ByteBuf的内存回收 堆外内存是不受JVM垃圾回收机制控制的,分配一块堆外内存进行ByteBuf操作完后要对对象进行回收。
ByteBUf的回收就是调用release() 方法,不管是哪种类型的 ByteBuf 都会调用到 release0 这个方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public boolean release (int decrement) { return handleRelease(updater.release(this , decrement)); } private boolean retryRelease0 (T instance, int decrement) { for (;;) { int rawCnt = updater().get(instance), realCnt = toLiveRealRefCnt(rawCnt, decrement); if (decrement == realCnt) { if (tryFinalRelease0(instance, rawCnt)) { return true ; } } else if (decrement < realCnt) { if (updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1 ))) { return false ; } } else { throw new IllegalReferenceCountException(realCnt, -decrement); } Thread.yield(); } } private boolean handleRelease (boolean result) { if (result) { deallocate(); } return result; }
主要是判断剩余引用数是否大于 decrement,如果大于则通过CAS修改引用数。然后通过deallocate()方法进行释放,以 UnpooledDirectByteBuf 为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Override protected final void deallocate () { if (handle >= 0 ) { final long handle = this .handle; this .handle = -1 ; memory = null ; chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache); tmpNioBuf = null ; chunk = null ; recycle(); } } void free (PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) { if (chunk.unpooled) { int size = chunk.chunkSize(); destroyChunk(chunk); activeBytesHuge.add(-size); deallocationsHuge.increment(); } else { SizeClass sizeClass = sizeClass(normCapacity); if (cache != null && cache.add(this , chunk, nioBuffer, handle, normCapacity, sizeClass)) { return ; } freeChunk(chunk, handle, sizeClass, nioBuffer, false ); } }