聊一聊 Netty 数据搬运工 ByteBuf 体系的设计与实现
本文基于 Netty 4.1.56.Final 版本进行讨论韶光芿苒,岁月如梭,好久没有给大家更新 Netty 相干的文章了,在断更 Netty 的这段日子里,笔者一直在持续更新Linux 内存管理相干的文章 ,如今为止,算是将 Linux 内存管理子系统相干的主干源码较为完整的给大家呈现了出来,同时也结识了很多喜好内核的读者,经常在背景留言讨论一些代码的设计细节,在这个过程中,我们相互分享,相互学习,浓浓的感受到了大家对技术那份纯粹的热爱,对于我自己来说,也是一种激励,学习,提高的机会。
之前系列文章的视角一直是停顿在内核态,笔者试图从 Linux 内核的角度来为大家揭秘内存管理的本质,那么从本日开始,我们把视角在往上挪一挪,从内核态转换到用户态,继续沿着内存管理这条主线,来看一看用户态的内存管理是怎样进行的。
接下来笔者计划用三篇文章的篇幅为大家剖析一下 Netty 的内存管理模块,本文是第一篇,主要是围绕 Netty 内存管理的外围介绍一下ByteBuf 的总体设计。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814100227882-1209248984.png
别看 ByteBuf 体系涉及到的类比较多,一眼望过去比较头大,但是我们按照差别的视角,将它们逐一分类,整个体系脉络就变得很清晰了:
[*]从 JVM 内存区域布局的角度来看,Netty 的 ByteBuf 主要分为 HeapByteBuf(堆内) 和 DirectByteBuf(堆外)这两种类型。
[*]从内存管理的角度来看,Netty 的 ByteBuf又分为 PooledByteBuf (池化)和 UnpooledByteBuf(非池化)两种子类型。一种是被内存池同一管理,另一种则宁静凡的 ByteBuf 一样,用的时候临时创建,不用的时候开释。
[*]从内存访问的角度来看,Netty 又将 ByteBuf 分为了 UnsafeByteBuf 宁静凡的 ByteBuf。UnsafeByteBuf 主要是依赖 Unsafe 类提供的底层 API 来直接对内存地点进行操作。而平凡 ByteBuf 对内存的操作主要是依赖 NIO 中的 ByteBuffer。
[*]从内存回收的角度来看,ByteBuf 又分为了带 Cleaner 的 ByteBuf 以及不带 Cleaner 的 NoCleanerByteBuf,Cleaner 在 JDK 中是用来开释 NIO ByteBuffer 背后所引用的 Native Memory 的,内存的开释由 JVM 同一管理。而 NoCleanerByteBuf 背后的 Native Memory 则需要我们进行手动开释。
[*]从内存占用统计的角度来说,Netty 又近一步将 ByteBuf 分为了 InstrumentedByteBuf 宁静凡的 ByteBuf,此中 InstrumentedByteBuf 会带有内存占用相干 Metrics 的统计供我们进行监控,而平凡的 ByteBuf 则不带有热任何 Metrics。
[*]从零拷贝的角度来看,Netty 又引入了 CompositeByteBuf,目的是为多个 ByteBuf 在聚合的时候提供一个同一的逻辑视图,将多个 ByteBuf 聚合成一个逻辑上的 CompositeByteBuf,而传统的聚合操作则是首先要分配一个大的 ByteBuf,然后将需要聚合的多个 ByteBuf 中的内容在拷贝到新的 ByteBuf 中。CompositeByteBuf 避免了分配大段内存以及内存拷贝的开销。注意这里的零拷贝指的是 Netty 在用户态层面自己实现的避免内存拷贝的设计,而不是 OS 层面上的零拷贝。
[*]别的 Netty 的 ByteBuf 支持引用计数以及自动地内存泄露探测,如果有内存泄露的情况,Netty 会将详细发生泄露的位置陈诉出来。
[*]Netty 的 ByteBuf 支持扩容,而 NIO 的 ByteBuffer 则不支持扩容,
在将 Netty 的 ByteBuf 设计体系梳理完整之后,我们就会发现,Netty 的 ByteBuf 其实是对 JDKByteBuffer 的一种扩展和完善,所以下面笔者的行文思路是与 JDKByteBuffer 对比着进行介绍 Netty 的 ByteBuf ,有了对比,我们才能更加深刻的体会到 Netty 设计的精妙。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814100248603-1140014078.png
1. JDK 中的 ByteBuffer 设计有何不妥
笔者曾在 《一步一图带你深入剖析 JDK NIO ByteBuffer 在差别字节序下的设计与实现》 一文中完整的介绍过 JDK ByteBuffer 的整个设计体系,下面我们来简短回忆一下 ByteBuffer 的几个核心要素。
public abstract class Buffer {
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
}https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814100306862-468854608.png
[*]capacity 规定了整个 Buffer 的容量,详细可以容纳多少个元素。capacity 之前的元素均是 Buffer 可操作的空间,JDK 中的 ByteBuffer 是不可扩容的。
[*]position 用于指向 Buffer 中下一个可操作性的元素,初始值为 0。对于 Buffer 的读写操作全部都共用这一个 position 指针,在 Buffer 的写模式下,position 指针用于指向下一个可写位置。在读模式下,position 指针指向下一个可读位置。
[*]limit 用于限定 Buffer 可操作元素的上限,position 指针不能超过 limit。
由于 JDK ByteBuffer 只设计了一个 position 指针,所以我们在读写 ByteBuffer 的时候需要不断的调解 position 的位置。比如,利用 flip() ,rewind(),compact(),clear() 等方法不断的进行读写模式的切换。
一些详细的场景体现就是,当我们对一个 ByteBuffer 进行写入的时候,随着数据不断的向 ByteBuffer 写入,position 指针会不断的向后移动。在写入操作完成之后,如果我们想要从 ByteBuffer 读取刚刚写入的数据就麻烦了。
由于 JDK 在对 ByteBuffer 的设计中读写操作都是混用一个 position 指针,所以在读取 ByteBuffer 之前,我们还需要通过flip() 调解 position 的位置,进行读模式的切换。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814100402210-553083879.png
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}当我们将 ByteBuffer 中的数据全部读取完之后,如果再次向 ByteBuffer 写入数据,那么还需要重新调解 position 的位置,通过 clear()来进行写模式的切换。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814100419177-2078325530.png
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}如果我们只是部分读取了 ByteBuffer 中的数据而不是全部读取,那么在写入的时候,为了避免未被读取的部分被接下来的写入操作覆盖,我们则需要通过 compact() 方法来切换写模式。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814100441508-1553946306.png
class HeapByteBuffer extends ByteBuffer {
//HeapBuffer中底层负责存储数据的数组
final byte[] hb;
public ByteBuffer compact() {
System.arraycopy(hb, ix(position()), hb, ix(0), remaining());
position(remaining());
limit(capacity());
discardMark();
return this;
}
public final int remaining() {
return limit - position;
}
final void discardMark() {
mark = -1;
}
}从上面列举的这些读写 ByteBuffer 场景可以看出,当我们在操作 ByteBuffer 的时候,需要时刻保持头脑清醒,对 ByteBuffer 中哪些部分是可读的,哪些部分是可写的要有一个清醒的认识,稍不留心就会堕落。在复杂的编解码逻辑中,如果使用 ByteBuffer 的话,就需要不断的进行读写模式的切换,切的切的人就傻了。
除了对 ByteBuffer 的相干操作比较麻烦之外,JDK 对于 ByteBuffer 没有设计池化管理机制,而面临大量需要使用堆外内存的场景,我们就需要不断的创建 DirectBuffer,DirectBuffer 在使用完之后,回收又是个问题。
JDK 自身对于 DirectBuffer 的回收是有延迟的,我们需要比及一次 FullGc ,这些 DirectBuffer 背后引用的 Native Memory 才能被 JVM 自动回收。所以为了及时回收这些 Native Memory ,我们又需要操心 DirectBuffer 的手动开释。
JDK 的 ByteBuffer 不支持引用计数,没有引用计数的设计,我们就无从得知一个 DirectBuffer 被引用了多少次,又被开释了多少次,面临 DirectBuffer 引起的内存泄露问题,也就无法进行自动探测。
别的 JDK 的 ByteBuffer 不支持动态按需自适应扩容,当一个 ByteBuffer 被创建出来之后,它的容量就固定了。但实际上,我们很难在一开始就能正确的评估出到底需要多大的 ByteBuffer。分配的容量大了,会造成浪费。分配的容量小了,我们又需要每次在写入的时候判断剩余容量是否充足,如果不敷,又需要手动去申请一个更大的 ByteBuffer,然后在将原有 ByteBuffer 中的数据迁移到新的 ByteBuffer 中,想想都麻烦。
还有就是当多个 JDK 的 ByteBuffer 在面临归并聚合的场景,总是要先创建一个更大的 ByteBuffer,然后将原有的多个 ByteBuffer 中的内容在拷贝到新的 ByteBuffer 中。这就涉及到了内存分配和拷贝的开销。
那为什么不能利用原有的这些 ByteBuffer 所占用的内存空间,在此底子上只创建一个逻辑上的视图 ByteBuffer,将对视图 ByteBuffer 的逻辑操作全部转移到原有的内存空间上,如许一来不就可以省去重新分配内存以及内存拷贝的开销了么 ?
下面我们就来一起看下,Netty 中的 ByteBuf 是怎样解决并完善上述问题的~~~
2. Netty对于 ByteBuf 的设计与实现
在之前介绍 JDK ByteBuffer 团体设计的时候,笔者是以 HeapByteBuffer 为例将 ByteBuffer 的整个设计体系串联起来的,那么本文笔者将会用 DirectByteBuf 为大家串联 Netty ByteBuf 的设计体系。
2.1 ByteBuf 的基本结构
public abstract class AbstractByteBuf extends ByteBuf {
int readerIndex;
int writerIndex;
private int markedReaderIndex;
private int markedWriterIndex;
private int maxCapacity;
}
public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
private int capacity;
}https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814100504494-1924105090.png
为了避免 JDK ByteBuffer 在读写模式下共用一个 position 指针所引起的繁琐操作,Netty 为 ByteBuf 引入了两个指针,readerIndex 用于指向 ByteBuf 中第一个可读字节位置,writerIndex 用于指向 ByteBuf 中第一个可写的字节位置。有了这两个独立的指针之后,我们在对 Netty ByteBuf 进行读写操作的时候,就不需要进行繁琐的读写模式切换了。与之对应的 markedReaderIndex,markedWriterIndex 用于支持 ByteBuf 相干的 mark 和 reset 操作,这一点和 JDK中的设计保持同等。
@Override
public ByteBuf markReaderIndex() {
markedReaderIndex = readerIndex;
return this;
}
@Override
public ByteBuf resetReaderIndex() {
readerIndex(markedReaderIndex);
return this;
}
@Override
public ByteBuf markWriterIndex() {
markedWriterIndex = writerIndex;
return this;
}
@Override
public ByteBuf resetWriterIndex() {
writerIndex(markedWriterIndex);
return this;
}由于 JDK ByteBuffer 在设计上不支持扩容机制,所以 Netty 为 ByteBuf 额外引入了一个新的字段 maxCapacity,用于表现 ByteBuf 容量最多只能扩容至 maxCapacity。
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
}Netty ByteBuf 的 capacity 与 JDK ByteBuffer 中的 capacity 含义保持同等,用于表现 ByteBuf 的初始容量大小,也就是下面在创建 UnpooledDirectByteBuf 的时候传入的 initialCapacity 参数。
public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
// Netty ByteBuf 底层依赖的 JDK ByteBuffer
ByteBuffer buffer;
// ByteBuf 初始的容量,也是真正的内存占用
private int capacity;
public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// 设置最大可扩容的容量
super(maxCapacity);
this.alloc = alloc;
// 按照 initialCapacity 指定的初始容量,创建 JDK ByteBuffer
setByteBuffer(allocateDirect(initialCapacity), false);
}
void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
// UnpooledDirectByteBuf 底层会依赖一个 JDK 的 ByteBuffer
// 后续对 UnpooledDirectByteBuf 的操作, Netty 全部会代理到 JDK ByteBuffer 中
this.buffer = buffer;
// 初始指定的 ByteBuf 容量 initialCapacity
capacity = buffer.remaining();
}
}由此一来,Netty 中的 ByteBuf 就会被 readerIndex,writerIndex,capacity,maxCapacity 这四个指针分割成四个部分,上图中笔者以按照差别的颜色进行了区分。
[*]此中 [0 , capacity) 这部分是创建 ByteBuf 的时候分配的初始容量,这部分是真正占用内存的,而 [capacity , maxCapacity)这部分表现 ByteBuf 可扩容的容量,这部分还未分配内存。
[*][0 , readerIndex) 这部分字节是已经被读取过的字节,是可以被丢弃的范围。
[*][readerIndex , writerIndex) 这部分字节表现 ByteBuf 中可以被读取的字节。
[*][writerIndex , capacity) 这部分表现 ByteBuf 的剩余容量,也就是可以写入的字节范围。
这四个指针他们之间的关系为 :0writerIndex; } @Override public int writableBytes() { return capacity() - writerIndex; }当 ByteBuf 的容量已经被写满,变为不可写的时候,如果继续对 ByteBuf 进行写入,那么就需要扩容了,但扩容后的 capacity 最大不能超过 maxCapacity。
private static void checkIndexBounds(final int readerIndex, final int writerIndex, final int capacity) {
if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
readerIndex, writerIndex, capacity));
}
}2.2 ByteBuf 的读取操作
明白了 ByteBuf 基本结构之后,我们来看一下针对 ByteBuf 的读写等基本操作是怎样进行的。Netty 支持以多种基本类型为粒度对 ByteBuf 进行读写,除此之外还支持 Unsigned 基本类型的转换以及大小端的转换。下面笔者以 Byte 和 Int 这两种基本类型为例对 ByteBuf 的读取操作进行说明。
ByteBuf 中的 get 方法只是单纯地从 ByteBuf 中读取数据,并不改变其 readerIndex 的位置,我们可以通过 getByte 从 ByteBuf 中的指定位置 index 读取一个 Byte 出来,也可以通过 getUnsignedByte 从 ByteBuf 读取一个 Byte 并转换成 UnsignedByte 。
@Override
public boolean isReadable() {
return writerIndex > readerIndex;
}
@Override
public int readableBytes() {
return writerIndex - readerIndex;
}其底层依赖的是一个抽象方法 _getByte,由 AbstractByteBuf 详细的子类负责实现。比如,在 UnpooledDirectByteBuf 类的实现中,直接将 _getByte 操作代理给其底层依赖的 JDK DirectByteBuffer。
@Override
public boolean isWritable() {
return capacity() > writerIndex;
}
@Override
public int writableBytes() {
return capacity() - writerIndex;
}而在 UnpooledUnsafeDirectByteBuf 类的实现中,则是通过 sun.misc.Unsafe 直接从对应的内存地点中读取。
final void ensureWritable0(int minWritableBytes) {
// minWritableBytes 表示本次要写入的字节数
// 获取当前 writerIndex 的位置
final int writerIndex = writerIndex();
// 为满足本次的写入操作,预期的 ByteBuf 容量大小
final int targetCapacity = writerIndex + minWritableBytes;
// 如果 targetCapacity 在(capacity , maxCapacity] 之间,则进行扩容
if (targetCapacity >= 0 & targetCapacity <= capacity()) {
// targetCapacity 在 之间,则无需扩容,本来就可以满足
return;
}
// 扩容后的 capacity 最大不能超过 maxCapacity
if (checkBounds && (targetCapacity < 0 || targetCapacity > maxCapacity)) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
..... 扩容 ByteBuf ......
}Netty 别的还提供了批量读取 Bytes 的操作,比如我们可以通过 getBytes 方法将 ByteBuf 中的数据读取到一个字节数组 byte[] 中,也可以读取到另一个 ByteBuf 中。
public abstract class AbstractByteBuf extends ByteBuf {
@Override
public byte getByte(int index) {
// 检查 index 的边界,index 不能超过 capacity(index < capacity)
checkIndex(index);
return _getByte(index);
}
@Override
public short getUnsignedByte(int index) {
// 将获取到的 Byte 转换为 UnsignedByte
return (short) (getByte(index) & 0xFF);
}
protected abstract byte _getByte(int index);
}通过 getBytes 方法将原来 ByteBuf 的数据读取到目的 ByteBuf 之后,原来 ByteBuf 的 readerIndex 不会发生变化,但是目的 ByteBuf 的 writerIndex 会重新调解。
对于 UnpooledDirectByteBuf 类的详细实现来说自然是将 getBytes 的操作直接代理给其底层依赖的 JDK DirectByteBuffer。对于 UnpooledUnsafeDirectByteBuf 类的详细实现来说,则是通过 UNSAFE.copyMemory 直接根据内存地点进行拷贝。
而 ByteBuf 中的 read 方法则不仅会从 ByteBuf 中读取数据,而且会改变其 readerIndex 的位置。比如,readByte 方法首先会通过前面介绍的 _getByte 从 ByteBuf 中读取一个字节,然后将 readerIndex 向后移动一位。
public class UnpooledDirectByteBuf{
// 底层依赖 JDK 的 DirectByteBuffer
ByteBuffer buffer;
@Override
protected byte _getByte(int index) {
return buffer.get(index);
}
}同样 Netty 也提供了从 ByteBuf 中批量读取数据的方法 readBytes,我们可以将一个 ByteBuf 中的数据通过 readBytes 方法读取到另一个 ByteBuf 中。但是这里,Netty 将会改变原来 ByteBuf 的 readerIndex 以及目的 ByteBuf 的 writerIndex。
public class UnpooledUnsafeDirectByteBuf {
// 直接操作 OS 的内存地址
long memoryAddress;
@Override
protected byte _getByte(int index) {
// 底层依赖 PlatformDependent0,直接通过内存地址读取 byte
return UnsafeByteBufUtil.getByte(addr(index));
}
final long addr(int index) {
// 获取偏移 index 对应的内存地址
return memoryAddress + index;
}
}
final class PlatformDependent0 {
// sun.misc.Unsafe
static final Unsafe UNSAFE;
static byte getByte(long address) {
return UNSAFE.getByte(address);
}
}别的我们还可以明确指定 dstIndex,使得我们可以从目的 ByteBuf 中的某一个位置处开始拷贝原来 ByteBuf 中的数据,但这里只会改变原来 ByteBuf 的 readerIndex,并不会改变目的 ByteBuf 的 writerIndex。这也很好理解,由于我们在写入目的 ByteBuf的时候已经明确指定了 writerIndex(dstIndex),自然在写入完成之后,writerIndex 的位置并不需要改变。
@Override
public ByteBuf getBytes(int index, byte[] dst) {
getBytes(index, dst, 0, dst.length);
return this;
}
public abstract ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length);
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int length) {
getBytes(index, dst, dst.writerIndex(), length);
// 调整 dst 的writerIndex
dst.writerIndex(dst.writerIndex() + length);
return this;
}
// 注意这里的 getBytes 方法既不会改变原来 ByteBuf 的 readerIndex 和 writerIndex
// 也不会改变目的 ByteBuf 的 readerIndex 和 writerIndex
public abstract ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length);除此之外,Netty 还支持将 ByteBuf 中的数据读取到差别的目的地,比如,读取到 JDK ByteBuffer 中,读取到 FileChannel 中,读取到 OutputStream 中,以及读取到 GatheringByteChannel 中。
@Override
public byte readByte() {
checkReadableBytes0(1);
int i = readerIndex;
byte b = _getByte(i);
readerIndex = i + 1;
return b;
}Netty 除了支持以Byte 为粒度对 ByteBuf 进行读写之外,还同时支持以多种基本类型对 ByteBuf 进行读写,这里笔者以 Int 类型为例进行说明。
我们可以通过 readInt() 从 ByteBuf 中读取一个 Int 类型的数据出来,随后 ByteBuf 的 readerIndex 向后移动 4 个位置。
@Override
public ByteBuf readBytes(ByteBuf dst, int length) {
readBytes(dst, dst.writerIndex(), length);
// 改变 dst 的 writerIndex
dst.writerIndex(dst.writerIndex() + length);
return this;
}同理,真正负责读取数据的方法 _getInt 方法需要由 AbstractByteBuf 详细的子类实现,但这里和 _getByte 差别的是,_getInt 需要考虑字节序的问题,由于网络协议接纳的是大端字节序传输,所以 Netty 的 ByteBuf 默认也是大端字节序。
在 UnpooledDirectByteBuf 的实现中,同样也是将 getInt 的操作直接代理给其底层依赖的 JDK DirectByteBuffer。
@Override
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
checkReadableBytes(length);
getBytes(readerIndex, dst, dstIndex, length);
// 改变原来 ByteBuf 的 readerIndex
readerIndex += length;
return this;
}在 UnpooledUnsafeDirectByteBuf 的实现中,由于是通过 sun.misc.Unsafe 直接对内存地点进行操作,所以需要考虑字节序转换的细节。Netty 的 ByteBuf 默认是大端字节序,所以这里直接依次将低地点的字节放到 Int 数据的高位就可以了。
public abstract ByteBuf readBytes(ByteBuffer dst);
public abstract ByteBuf readBytes(OutputStream out, int length) throws IOException;
public abstract int readBytes(GatheringByteChannel out, int length) throws IOException;
public abstract int readBytes(FileChannel out, long position, int length) throws IOException;Netty 设计的这个丢弃字节的方法在解码的场景非常有用,由于 TCP 是一个面向流的网络协议,它只会根据滑动窗口的大小进行字节省的发送,所以我们在应用层接收到的数据可能是一个半包也可能是一个粘包,反正不会是一个完整的数据包。
这就要求我们在解码的时候,首先要判断 ByteBuf 中的数据是否构成一个完成的数据包,如果构成一个数据包,才会去读取 ByteBuf 中的字节,然后解码,随后 readerIndex 向后移动。
如果不够一个数据包,那就需要将 ByteBuf 累积缓存起来,一直比及一个完整的数据包到来。一种极度的情况是,即使我们已经解码很多次了,但是缓存的 ByteBuf 中仍然还有半包,由于不断的会有粘包过来,这就导致 ByteBuf 会越来越大。由于已经解码了很多次,所以 ByteBuf 中可以被丢弃的字节占据了很大的内存空间,如果半包情况持续存在,将会导致 OutOfMemory。
所以 Netty 规定,如果已经解码了 16 次之后,ByteBuf 中仍然有半包的情况,那么就会调用这里的 discardSomeReadBytes() 将已经解码过的字节全部丢弃,节省不须要的内存开销。
2.4 ByteBuf 的写入操作
ByteBuf 的写入操作与读取操作互为相反的操作,每一个读取方法 getBytes , readBytes , readInt 等都有一个对应的 setBytes , writeBytes , writeInt 等底子类型的写入操作。
和 get 方法一样,set 相干的方法也只是单纯的向 ByteBuf 中写入数据,并不会改变其 writerIndex 的位置,我们可以通过 setByte 向 ByteBuf 中的某一个指定位置 index 写入数据 value。
@Override
public int readInt() {
checkReadableBytes0(4);
int v = _getInt(readerIndex);
readerIndex += 4;
return v;
}
protected abstract int _getInt(int index);实行详细的写入操作同样也是一个抽象方法,其详细的实现由 AbstractByteBuf 详细的子类负责。对于 UnpooledDirectByteBuf 的实现来说,_setByte 操作直接会代理给其底层依赖的 JDKDirectByteBuffer。
public class UnpooledDirectByteBuf{
@Override
protected int _getInt(int index) {
// 代理给其底层依赖的 JDK DirectByteBuffer
return buffer.getInt(index);
}
}对于 UnpooledUnsafeDirectByteBuf 的实现来说,则是直接通过 sun.misc.Unsafe 向对应的内存地点(memoryAddress + index)写入 Byte。
public class UnpooledUnsafeDirectByteBuf {
@Override
protected int _getInt(int index) {
return UnsafeByteBufUtil.getInt(addr(index));
}
}
final class UnsafeByteBufUtil {
static int getInt(long address) {
return PlatformDependent.getByte(address) << 24 |
(PlatformDependent.getByte(address + 1) & 0xff) << 16 |
(PlatformDependent.getByte(address + 2) & 0xff) <<8 |
PlatformDependent.getByte(address + 3)& 0xff;
}
}Netty 别的也提供了向 ByteBuf 批量写入 Bytes 的操作,setBytes 方法用于向 ByteBuf 的指定位置 index 批量写入一个字节数组 byte[] 中的数据。
@Override
public int readIntLE() {
checkReadableBytes0(4);
int v = _getIntLE(readerIndex);
readerIndex += 4;
return v;
}
protected abstract int _getIntLE(int index);对于 UnpooledDirectByteBuf 的实现来说,同样也是将 setBytes 的操作直接代理给 JDK DirectByteBuffer,将字节数组 byte[] 中的字节直接写入 DirectByteBuffer 中。
对于 UnpooledUnsafeDirectByteBuf的实现来说,则是直接操作字节数组和 ByteBuf 的内存地点,通过 UNSAFE.copyMemory 将字节数组对应内存地点中的数据拷贝到 ByteBuf 相应的内存地点上。
我们还可以通过 setBytes 方法将其他 ByteBuf 中的字节数据写入到 ByteBuf 中。
public class UnpooledDirectByteBuf{
@Override
protected int _getIntLE(int index) {
// 切换字节序,从大端变小端
return ByteBufUtil.swapInt(buffer.getInt(index));
}
}这里需要注意的是被写入 ByteBuf 的 writerIndex 并不会改变,但是原来 ByteBuf 的 readerIndex 会重新调解。
ByteBuf 中的 write 方法底层依赖的是相干的 set 方法,差别的是 write 方法会改变 ByteBuf 中 writerIndex 的位置。比如,我们通过 writeByte 方法向 ByteBuf 中写入一个字节之后,writerIndex 就会向后移动一位。
public class UnpooledUnsafeDirectByteBuf {
@Override
protected int _getIntLE(int index) {
return UnsafeByteBufUtil.getIntLE(addr(index));
}
}
final class UnsafeByteBufUtil {
static int getIntLE(long address) {
return PlatformDependent.getByte(address) & 0xff |
(PlatformDependent.getByte(address + 1) & 0xff) <<8 |
(PlatformDependent.getByte(address + 2) & 0xff) << 16 |
PlatformDependent.getByte(address + 3) << 24;
}
}我们也可以通过 writeBytes 向 ByteBuf 中批量写入数据,将一个字节数组中的数据或者另一个 ByteBuf 中的数据写入到 ByteBuf 中,但是这里,Netty 将会改变被写入 ByteBuf 的 writerIndex 以及数据来源 ByteBuf的 readerIndex。
@Override
public long readUnsignedInt() {
return readInt() & 0xFFFFFFFFL;
}
@Override
public long readUnsignedIntLE() {
return readIntLE() & 0xFFFFFFFFL;
}如果我们明确指定了从数据来源 ByteBuf 中的哪一个位置(srcIndex)开始读取数据,那么数据来源 ByteBuf 中的 readerIndex 将不会被改变,只会改变被写入 ByteBuf 的 writerIndex。
@Override
public int writableBytes() {
return capacity() - writerIndex;
}除此之外,Netty 还支持从差别的数据来源向 ByteBuf 批量写入数据,比如,从 JDK ByteBuffer ,从 FileChannel ,从 InputStream ,以及从 ScatteringByteChannel 中。
public abstract class AbstractByteBuf extends ByteBuf {
@Override
public ByteBuf discardReadBytes() {
// readerIndex 为 0 表示没有可以丢弃的字节
if (readerIndex == 0) {
return this;
}
if (readerIndex != writerIndex) {
// 将 [readerIndex, writerIndex) 这段字节范围移动到 ByteBuf 的开头
// 也就是丢弃 readerIndex 之前的字节
setBytes(0, this, readerIndex, writerIndex - readerIndex);
// writerIndex 和 readerIndex 都向前移动 readerIndex 大小
writerIndex -= readerIndex;
// 重新调整 markedReaderIndex 和 markedWriterIndex 的位置
// 都对应向前移动 readerIndex 大小。
adjustMarkers(readerIndex);
readerIndex = 0;
} else {
// readerIndex = writerIndex 表示当前 ByteBuf 已经不可读了
// 将 readerIndex 之前的字节全部丢弃,ByteBuf 恢复到最初的状态
// 整个 ByteBuf 的容量都可以被写入
ensureAccessible();
adjustMarkers(readerIndex);
writerIndex = readerIndex = 0;
}
return this;
}
}Netty 除了支持以 Byte 为粒度向 ByteBuf 中写入数据之外,还同时支持以多种基本类型为粒度向写入 ByteBuf ,这里笔者以 Int 类型为例进行说明。
我们可以通过 writeInt() 向 ByteBuf 写入一个 Int 类型的数据,随后 ByteBuf 的 writerIndex 向后移动 4 个位置。
@Override
public ByteBuf discardSomeReadBytes() {
if (readerIndex > 0) {
// 当 ByteBuf 已经不可读了,则无条件丢弃已读字节
if (readerIndex == writerIndex) {
adjustMarkers(readerIndex);
writerIndex = readerIndex = 0;
return this;
}
// 当已读的字节数超过整个 ByteBuf 的一半容量时才会丢弃已读字节
if (readerIndex >= capacity() >>> 1) {
setBytes(0, this, readerIndex, writerIndex - readerIndex);
writerIndex -= readerIndex;
adjustMarkers(readerIndex);
readerIndex = 0;
return this;
}
}
return this;
}和写入 Byte 数据差别的是,这里需要考虑字节序,NettyByteBuf 默认是大端字节序,和网络协议传输使用的字节序保持同等。这里我们需要将待写入数据 value 的高位依次放入到 ByteBuf 的低地点上。
@Override
public ByteBuf setByte(int index, int value) {
checkIndex(index);
_setByte(index, value);
return this;
}
protected abstract void _setByte(int index, int value);同时 Netty 也支持以小端字节序向 ByteBuf 写入数据。
public class UnpooledDirectByteBuf{
// 底层依赖 JDK 的 DirectByteBuffer
ByteBuffer buffer;
@Override
protected void _setByte(int index, int value) {
buffer.put(index, (byte) value);
}
}这里需要将待写入数据 value 的低位依次放到 ByteBuf 的低地点上。
public class UnpooledUnsafeDirectByteBuf {
// 直接操作 OS 的内存地址,不依赖 JDK 的 buffer
long memoryAddress;
@Override
protected void _setByte(int index, int value) {
// 底层依赖 PlatformDependent0,直接向内存地址写入 byte
UnsafeByteBufUtil.setByte(addr(index), value);
}
final long addr(int index) {
// 获取偏移 index 对应的内存地址
return memoryAddress + index;
}
}
final class PlatformDependent0 {
// sun.misc.Unsafe
static final Unsafe UNSAFE;
static void putByte(long address, byte value) {
UNSAFE.putByte(address, value);
}
}2.5 ByteBuf 的扩容机制
在每次向 ByteBuf 写入数据的时候,Netty 都会调用 ensureWritable0 方法来判断当前 ByteBuf 剩余可写容量(capacity - writerIndex)是否能够满足本次需要写入的数据大小 minWritableBytes。如果剩余容量不敷,那么就需要对 ByteBuf 进行扩容,但扩容后的容量不能超过 maxCapacity 的大小。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814100749014-697724045.png
@Override
public ByteBuf setBytes(int index, byte[] src) {
setBytes(index, src, 0, src.length);
return this;
}
public abstract ByteBuf setBytes(int index, byte[] src, int srcIndex, int length);2.5.1 newCapacity 的计算逻辑
ByteBuf 的初始默认 capacity 为 256 个字节,初始默认 maxCapacity 为 Integer.MAX_VALUE 也就是 2G 大小。
@Override
public ByteBuf setBytes(int index, ByteBuf src, int length) {
setBytes(index, src, src.readerIndex(), length);
// 调整 src 的readerIndex
src.readerIndex(src.readerIndex() + length);
return this;
}
// 注意这里的 setBytes 方法既不会改变原来 ByteBuf 的 readerIndex 和 writerIndex
// 也不会改变目的 ByteBuf 的 readerIndex 和 writerIndex
public abstract ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length);为满足本次写入操作,对 ByteBuf 的最小容量要求为 minNewCapacity,它的值就是在 ensureWritable0 方法中计算出来的 targetCapacity , 计算方式为: minNewCapacity = writerIndex + minWritableBytes(本次将要写入的字节数)。
在 ByteBuf 的扩容逻辑中,Netty 设置了一个重要的阈值 CALCULATE_THRESHOLD, 大小为 4M,它决定了 ByteBuf 扩容的尺度。
@Override
public ByteBuf writeByte(int value) {
ensureWritable0(1);
_setByte(writerIndex++, value);
return this;
}如果 minNewCapacity 恰好等于 CALCULATE_THRESHOLD,那么扩容后的容量 newCapacity 就是 4M。
如果 minNewCapacity 大于 CALCULATE_THRESHOLD,那么 newCapacity 就会按照 4M 的尺度进行扩容,详细的扩容逻辑如下:
首先通过 minNewCapacity / threshold * threshold 计算出一个准备扩容之前的基准线,后面就会以此基准线为底子,按照 CALCULATE_THRESHOLD 的粒度进行扩容。
该基准线的要求必须是 CALCULATE_THRESHOLD 的最小倍数,而且必须要小于等于 minNewCapacity。
什么意思呢 ? 假设 minNewCapacity 为 5M,那么它的扩容基准线就是 4M , 这种情况下扩容之后的容量 newCapacity = 4M + CALCULATE_THRESHOLD = 8M 。
如果计算出来的基准线超过了 maxCapacity - 4M , 那么 newCapacity 直接就扩容到 maxCapacity 。
如果 minNewCapacity 小于 CALCULATE_THRESHOLD,那么 newCapacity 就会从 64 开始,一直循环 double , 也就是按照 64 的倍数进行扩容。直到 newCapacity 大于等于 minNewCapacity。
@Override
public ByteBuf writeBytes(ByteBuf src, int length) {
writeBytes(src, src.readerIndex(), length);
// 调整数据来源 ByteBuf 的 readerIndex
src.readerIndex(src.readerIndex() + length);
return this;
}2.5.4 自适应动态扩容
Netty 在接收网络数据的过程中,其实一开始是很难确定出该用多大容量的 ByteBuf 去接收的,所以 Netty 在一开始会首先预估一个初始容量 DEFAULT_INITIAL (2048)。
@Override
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
// 调整被写入 ByteBuf 的 writerIndex
writerIndex += length;
return this;
}用初始容量为 2048 大小的 ByteBuf 去读取 socket 中的数据,在每一次读取完 socket 之后,Netty 都会评估 ByteBuf 的容量大小是否合适。如果每一次都能把 ByteBuf 装满,那说明我们预估的容量太小了,socket 中还有更多的数据,那么就需要对 ByteBuf 进行扩容,下一次读取 socket 的时候就换一个容量更大的 ByteBuf。
public ByteBuf writeBytes(ByteBuffer src)
public int writeBytes(InputStream in, int length)
public int writeBytes(ScatteringByteChannel in, int length) throws IOException
public int writeBytes(FileChannel in, long position, int length) throws IOExceptionNetty 会在一个 read loop 中不停的读取 socket 中的数据直到数据被读取完毕或者读满 16 次,竣事 read loop 停止读取。ByteBuf 越大那么 Netty 读取的次数就越少,ByteBuf 越小那么 Netty 读取的次数就越多,所以需要一种机制将 ByteBuf 的容量控制在一个公道的范围内。
Netty 会统计每一轮 read loop 总共读取了多少数据 —— totalBytesRead。
@Override
public ByteBuf writeInt(int value) {
ensureWritable0(4);
_setInt(writerIndex, value);
writerIndex += 4;
return this;
}
protected abstract void _setInt(int index, int value);在每一轮的 read loop 竣事之后,Netty 都会根据这个 totalBytesRead 来判断是否应该对 ByteBuf 进行扩容或者缩容,如许在下一轮 read loop 开始的时候,Netty 就可以用一个相对公道的容量去接收 socket 中的数据,尽量减少读取 socket 的次数。
public class UnpooledUnsafeDirectByteBuf {
@Override
protected void _setInt(int index, int value) {
// 以大端字节序写入 ByteBuf
UnsafeByteBufUtil.setInt(addr(index), value);
}
}
final class UnsafeByteBufUtil {
static void setInt(long address, int value) {
PlatformDependent.putByte(address, (byte) (value >>> 24));
PlatformDependent.putByte(address + 1, (byte) (value >>> 16));
PlatformDependent.putByte(address + 2, (byte) (value >>> 8));
PlatformDependent.putByte(address + 3, (byte) value);
}
}那么在什么情况下需要对 ByteBuf 扩容,每次扩容多少 ? 什么情况下需要对 ByteBuf 进行缩容,每次缩容多少呢 ?
这就用到了一个重要的容量索引结构 ——SIZE_TABLE,它里边定义索引了 ByteBuf 的每一种容量大小。相当于是扩缩容的容量索引表。每次扩容多少,缩容多少全部记录在这个容量索引表中。
@Override
public ByteBuf writeIntLE(int value) {
ensureWritable0(4);
_setIntLE(writerIndex, value);
writerIndex += 4;
return this;
}
protected abstract void _setIntLE(int index, int value);当索引容量小于 512 时,SIZE_TABLE 中定义的容量是从 16 开始按照 16 递增。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814100852341-897765222.png
当索引容量大于 512 时,SIZE_TABLE 中定义的容量是按前一个索引容量的 2 倍递增。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814100902873-956918241.png
那么当前 ByteBuf 的初始容量为 2048 , 它在 SIZE_TABLE 中的 index 为 33 。当一轮 read loop 读取完毕之后,如果发现 totalBytesRead 在SIZE_TABLE 与 SIZE_TABLE 之间的话,也就是如果本轮 read loop 竣事之后总共读取的字节数在 之间。说明此时分配的 ByteBuf 容量正好,不需要进行缩容也不需要进行扩容。比如本次 totalBytesRead = 2000,正利益在 1024 与 2048 之间。说明 2048 的容量正好。
如果 totalBytesRead 小于等于 SIZE_TABLE,也就是如果本轮 read loop 竣事之后总共读取的字节数小于等于1024。表现本次读取到的字节数比当前 ByteBuf 容量的下一级容量还要小,说明当前 ByteBuf 的容量分配的有些大了,设置缩容标识decreaseNow = true。当下次 read loop 的时候如果继续满足缩容条件,那么就开始进行缩容。缩容后的容量为 SIZE_TABLE,但不能小于SIZE_TABLE(16)。
注意,这里需要满足两次缩容条件才会进行缩容,且缩容步长为 1 (INDEX_DECREMENT),缩容比较谨慎。
如果 totalBytesRead大于等于当前 ByteBuf 容量—— nextReceiveBufferSize 时,说明 ByteBuf 的容量有点小了,需要进行扩容。扩容后的容量为 SIZE_TABLE,但不能超过 SIZE_TABLE(65535)。
满足一次扩容条件就进行扩容,而且扩容步长为 4 (INDEX_INCREMENT), 扩容比较奔放。
public class UnpooledUnsafeDirectByteBuf {
@Override
protected void _setIntLE(int index, int value) {
// // 以小端字节序写入 ByteBuf
UnsafeByteBufUtil.setIntLE(addr(index), value);
}
}
final class UnsafeByteBufUtil {
static void setIntLE(long address, int value) {
PlatformDependent.putByte(address, (byte) value);
PlatformDependent.putByte(address + 1, (byte) (value >>> 8));
PlatformDependent.putByte(address + 2, (byte) (value >>> 16));
PlatformDependent.putByte(address + 3, (byte) (value >>> 24));
}
}2.6 ByteBuf 的引用计数设计
Netty 为 ByteBuf 引入了引用计数的机制,在 ByteBuf 的整个设计体系中,所有的 ByteBuf 都会继承一个抽象类 AbstractReferenceCountedByteBuf , 它是对接口 ReferenceCounted 的实现。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814100924919-806499796.png
final void ensureWritable0(int minWritableBytes) {
final int writerIndex = writerIndex();
// 为满足本次的写入操作,预期的 ByteBuf 容量大小
final int targetCapacity = writerIndex + minWritableBytes;
// 剩余容量可以满足本次写入要求,直接返回,不需要扩容
if (targetCapacity >= 0 & targetCapacity <= capacity()) {
return;
}
// 扩容后的容量不能超过 maxCapacity
if (checkBounds && (targetCapacity < 0 || targetCapacity > maxCapacity)) {
ensureAccessible();
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// 如果 targetCapacity 在(capacity , maxCapacity] 之间,则进行扩容
// fastWritable 表示在不涉及到 memory reallocation or data-copy 的情况下,当前 ByteBuf 可以直接写入的容量
// 对于 UnpooledDirectBuffer 这里的 fastWritable = capacity - writerIndex
// PooledDirectBuffer 有另外的实现,这里先暂时不需要关注
final int fastWritable = maxFastWritableBytes();
// 计算扩容后的容量 newCapacity
// 对于 UnpooledDirectBuffer 来说这里直接通过 calculateNewCapacity 计算扩容后的容量。
int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
: alloc().calculateNewCapacity(targetCapacity, maxCapacity);
// 根据 new capacity 对 ByteBuf 进行扩容
capacity(newCapacity);
}每个 ByteBuf 的内部都维护了一个叫做 refCnt 的引用计数,我们可以通过 refCnt() 方法来获取 ByteBuf 当前的引用计数 refCnt。当 ByteBuf 在其他上下文中被引用的时候,我们需要通过 retain() 方法将 ByteBuf 的引用计数加 1。别的我们也可以通过 retain(int increment) 方法来指定 refCnt 增加的大小(increment)。
有对 ByteBuf 的引用那么就有对 ByteBuf 的开释,每当我们使用完 ByteBuf 的时候就需要手动调用 release() 方法将 ByteBuf 的引用计数减 1 。当引用计数 refCnt 变成 0 的时候,Netty 就会通过 deallocate 方法来开释 ByteBuf 所引用的内存资源。这时 release() 方法会返回 true , 如果 refCnt 还不为 0 ,那么就返回 false 。同样我们也可以通过 release(int decrement) 方法来指定 refCnt 减少多少(decrement)。
2.6.1 为什么要引入引用计数
”在其他上下文中引用 ByteBuf “ 是什么意思呢 ? 比如我们在线程 1 中创建了一个ByteBuf,然后将这个 ByteBuf 丢给线程 2 进行处理,线程 2 又可能丢给线程 3, 而每个线程都有自己的上下文处理逻辑,比如对 ByteBuf 的处理,开释等操作。如许就使得 ByteBuf 在事实上形成了在多个线程上下文中被共享的情况。
面临这种情况我们就很难在一个单独的线程上下文中判断一个 ByteBuf 该不该被开释,比如线程1 准备开释 ByteBuf 了,但是它可能正在被其他线程使用。所以这也是 Netty 为 ByteBuf 引入引用计数的重要缘故原由,每当引用一次 ByteBuf 的时候就需要通过 retain() 方法将引用计数加 1, release() 开释的时候将引用计数减 1 ,当引用计数为 0 了,说明已经没有其他上下文引用 ByteBuf 了,这时 Netty 就可以开释它了。
别的相比于 JDK DirectByteBuffer 需要依赖 GC 机制来开释其背后引用的 Native Memory , Netty 更倾向于手动及时开释 DirectByteBuf 。由于 JDK DirectByteBuffer 的开释需要比及 GC 发生,由于 DirectByteBuffer 的对象实例所占的 JVM 堆内存太小了,所以一时很难触发 GC , 这就导致被引用的 Native Memory 的开释有了肯定的延迟,严重的情况会越积越多,导致 OOM 。而且也会导致进程中对 DirectByteBuffer 的申请操作有非常大的延迟。
而 Netty 为了避免这些情况的出现,选择在每次使用完毕之后手动开释 Native Memory ,但是不依赖 JVM 的话,总会有内存泄露的情况,比如在使用完了 ByteBuf 却忘记调用 release() 方法来开释。
所以为了检测内存泄露的发生,这也是 Netty 为 ByteBuf 引入了引用计数的另一个缘故原由,当 ByteBuf 不再被引用的时候,也就是没有任何强引用或者软引用的时候,如果此时发生 GC , 那么这个 ByteBuf 实例(位于 JVM 堆中)就需要被回收了,这时 Netty 就会检查这个 ByteBuf 的引用计数是否为 0 , 如果不为 0 ,说明我们忘记调用 release() 开释了,近而判断出这个 ByteBuf 发生了内存泄露。
在探测到内存泄露发生之后,后续 Netty 就会通过 reportLeak() 将内存泄露的相干信息以 error 的日志级别输出到日志中。
看到这里,大家可能不禁要问,不就是引入了一个小小的引用计数嘛,这有何难 ? 值得这里大书特书吗 ? 不就是在创建 ByteBuf 的时候将引用计数 refCnt 初始化为 1 , 每次在其他上下文引用的时候将 refCnt 加 1, 每次开释的时候再将 refCnt 减 1 吗 ?减到 0 的时候就开释 Native Memory,太简朴了吧~~
事实上 Netty 对引用计数的设计非常讲究,绝非如此简朴,甚至有些复杂,其背后隐蔽着大大的性能考究以及对复杂并发问题的全面考虑,在性能与线程安全问题之间的反复衡量。
2.6.2 引用计数的最初设计
所以为了理清关于引用计数的整个设计脉络,我们需要将版本回退到最初的起点 —— 4.1.16.Final 版本,来看一下原始的设计。
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
// ByteBuf 的初始默认 CAPACITY
static final int DEFAULT_INITIAL_CAPACITY = 256;
// ByteBuf 的初始默认 MAX_CAPACITY
static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;
@Override
public ByteBuf directBuffer() {
return directBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
}
}首先新版本的 retain0 方法仍然保留了 4.1.17.Final 版本引入的XADD 指令带来的性能优势,大致的处理逻辑也是雷同的,一上来先通过 getAndAdd 方法将 refCnt 增加 rawIncrement,对于 retain(T instance) 来说这里直接加 2 。
然后判断原来的引用计数 oldRef 是否是一个奇数,如果是一个奇数,那么就表现 ByteBuf 已经没有任何引用了,逻辑引用计数早已经为 0 了,那么就抛出 IllegalReferenceCountException。
在引用计数为奇数的情况下,无论多线程怎么对 refCnt 并发加 2 ,refCnt 始终是一个奇数,终极都会抛出非常。解决并发安全问题的要点就在这里,肯定要保证 retain 方法的并发实行不能改变原来的语义。
最后会判断一下 refCnt 字段是否发生溢出,如果溢出,则进行回退,并抛出非常。下面我们仍然以之前的并发场景为例,用一个详细的例子,往返味一下奇偶设计的精妙之处。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814101135211-1812742852.png
如今线程 1 对一个 refCnt 为 2 的 ByteBuf 实行 release 方法,这时 ByteBuf 的逻辑引用计数就为 0 了,对于一个没有任何引用的 ByteBuf 来说,新版的设计中它的 refCnt 只能是一个奇数,不能为 0 ,所以这里 Netty 会将 refCnt 设置为 1 。然后在步骤 2 中调用 deallocate 方法开释 Native Memory。
线程 2 在步骤 1 和步骤 2 之间插入进来对ByteBuf 并发实行 retain 方法,这时线程 2 看到的 refCnt 是 1,然后通过 getAndAdd 将 refCnt 加到了 3 ,仍然是一个奇数,随后抛出 IllegalReferenceCountException 非常。
线程 3 在步骤 1.1 和步骤 1.2 之间插入进来再次对 ByteBuf 并发实行 retain 方法,这时线程 3 看到的 refCnt 是 3,然后通过 getAndAdd 将 refCnt 加到了 5 ,还是一个奇数,随后抛出 IllegalReferenceCountException 非常。
如许一来就保证了引用计数的并发语义 —— 只要一个 ByteBuf 没有任何引用的时候(refCnt = 1),其他线程无论怎么并发实行retain 方法都会得到一个非常。
但是引用计数并发语义的保证不能单单只靠 retain 方法,它还需要与 release 方法相互配合协作才可以,所以为了并发语义的保证 , release 方法的设计就不能使用性能更高的 XADD 指令,而是要回退到CMPXCHG 指令来实现。
为什么这么说呢 ?由于新版引用计数的设计接纳的是奇偶实现,refCnt 为偶数表现 ByteBuf 还有引用,refCnt 为奇数表现 ByteBuf 已经没有任何引用了,可以安全开释 Native Memory 。对于一个 refCnt 已经为奇数的 ByteBuf 来说,无论多线程怎么并发实行 retain 方法,得到的 refCnt 仍然是一个奇数,终极都会抛出 IllegalReferenceCountException,这就是引用计数的并发语义 。
为了保证这一点,就需要在每次调用 retain ,release 方法的时候,以偶数步长来更新 refCnt,比如每一次调用 retain 方法就对 refCnt 加 2 ,每一次调用 release 方法就对 refCnt 减 2 。
但总有一个时刻,refCnt 会被减到 0 的对吧,在新版的奇偶设计中,refCnt 是不答应为 0 的,由于一旦 refCnt 被减到了 0 ,多线程并发实行 retain 之后,就会将 refCnt 再次加成了偶数,这又会出现并发问题。
而每一次调用 release 方法是对 refCnt 减 2 ,如果我们接纳 XADD 指令实现 release 的话,回想一下 4.1.17.Final 版本中的设计,它首先进来是通过 getAndAdd 方法对 refCnt 减 2 ,如许一来,refCnt 就变成 0 了,就有并发安全问题了。所以我们需要通过 CMPXCHG 指令将 refCnt 更新为 1。
这里有的同学可能要问了,那可不可以先进行一下 if 判断,如果 refCnt 减 2 之后变为 0 了,我们在通过 getAndAdd 方法将 refCnt 更新为 1 (减一个奇数),如许一来不也可以利用上 XADD 指令的性能优势吗 ?
答案是不行的,由于 if 判断与 getAndAdd 更新这两个操作之间仍然不是原子的,多线程可以在这个间隙仍然有并发实行 retain 方法的可能,如下图所示:
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814101158906-416747436.png
在线程 1 实行 if 判断和 getAndAdd 更新这两个操作之间,线程 2 看到的 refCnt 其实 2 ,然后线程 2 会将 refCnt 加到 4 ,线程 3 紧接着会将 refCnt 增加到 6 ,在线程 2 和线程 3 看来这个 ByteBuf 完满是正常的,但是线程 1 立刻就会开释 Native Memory 了。
而且接纳这种设计的话,一会通过 getAndAdd 对 refCnt 减一个奇数,一会通过 getAndAdd 对 refCnt 加一个偶数,如许就把原本的奇偶设计搞乱掉了。
所以我们的设计目的是肯定要保证在 ByteBuf 没有任何引用计数的时候,release 方法需要原子性的将 refCnt 更新为 1 。 因此必须接纳 CMPXCHG 指令来实现而不能使用 XADD 指令。
再者说, CMPXCHG 指令是可以原子性的判断当前是否有并发情况的,如果有并发情况出现,CAS就会失败,我们可以继续重试。但 XADD 指令却无法原子性的判断是否有并发情况,由于它每次都是先更新,后判断并发,这就不是原子的了。这一点,在下面的源码实现中会体现的特别明显。
2.6.7 尽量避免内存屏障的开销
// 扩容的尺度
static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page这里有一个小的细节再次体现出 Netty 对于性能的极致追求,refCnt 字段在 ByteBuf 中被 Netty 说明为一个 volatile 字段。
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}我们对 refCnt 的平凡读写都是要走内存屏障的,但 Netty 在 release 方法中首次读取 refCnt 的值是接纳 nonVolatile 的方式,不走内存屏障,直接读取 cache line,避免了屏障开销。
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
// 满足本次写入操作的最小容量 minNewCapacity 不能超过 maxCapacity
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
// 用于决定扩容的尺度
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
// 计算扩容基准线。
// 要求必须是 CALCULATE_THRESHOLD 的最小倍数,而且必须要小于等于 minNewCapacity
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
// 按照 threshold (4M)扩容
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
// 按照 64 的倍数进行扩容。但 newCapacity 需要大于等于 minNewCapacity。
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
}那有的同学可能要问了,如果读取 refCnt 的时候不走内存屏障的话,读取到的 refCnt 不就可能是一个错误的值吗 ?
事实上确实是如许的,但 Netty 不 care , 读到一个错误的值也无所谓,由于这里的引用计数接纳了奇偶设计,我们在第一次读取引用计数的时候并不需要读取到一个精确的值,既然如许我们可以直接通过 UnSafe 来读取,还能剩下一笔内存屏障的开销。
那为什么不需要一个精确的值呢 ?由于如果原来的 refCnt 是一个奇数,那无论多线程怎么并发 retain ,终极得到的还是一个奇数,我们这里只需要知道 refCnt 是一个奇数就可以直接抛 IllegalReferenceCountException 了。详细读到的是一个 3 还是一个 5 其实都无所谓。
那如果原来的 refCnt 是一个偶数呢 ?其实也无所谓,我们可能读到一个正确的值也可能读到一个错误的值,如果恰好读到一个正确的值,那更好。如果读取到一个错误的值,也无所谓,由于我们后面是用 CAS 进行更新,如许的话 CAS 就会更新失败,我们只需要在一下轮 for 循环中更新正确就可以了。
如果读取到的 refCnt 恰好是 2 ,那就意味着本次 release 之后,ByteBuf 的逻辑引用计数就为 0 了,Netty 会通过 CAS 将 refCnt 更新为 1 。
public class UnpooledDirectByteBuf{
// 底层依赖 JDK 的 DirectByteBuffer
ByteBuffer buffer;
}如果 CAS 更新失败,则表现此时有多线程可能并发对 ByteBuf 实行 retain 方法,逻辑引用计数此时可能就不为 0 了,针对这种并发情况,Netty 会在 retryRelease0 方法中进行重试,将 refCnt 减 2 。
public class UnpooledDirectByteBuf{
void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
if (tryFree) {
ByteBuffer oldBuffer = this.buffer;
// 释放原来的 buffer
freeDirect(oldBuffer);
}
// 重新设置新的 buffer
this.buffer = buffer;
capacity = buffer.remaining();
}
}2.8.2 CompositeByteBuf 的创建
好了,如今我们已经熟悉了 CompositeByteBuf 的总体架构,那么接下来我们就来看一下 Netty 是怎样将多个 ByteBuf 逻辑聚合成一个 CompositeByteBuf 的。
public class UnpooledUnsafeDirectByteBuf extends UnpooledDirectByteBuf {
// ByteBuf 的内存地址
long memoryAddress;
@Override
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
super.setByteBuffer(buffer, tryFree);
// 设置成新 buffer 的内存地址
memoryAddress = PlatformDependent.directBufferAddress(buffer);
}
}CompositeByteBuf 的初始 maxNumComponents 为 buffers 数组的长度,如果我们只是传入一个 ByteBuf 的话,那么就无需创建 CompositeByteBuf,而是直接返回该 ByteBuf 的 slice 视图。
如果我们传入的是多个 ByteBuf 的话,则将这多个 ByteBuf 包装成 CompositeByteBuf 返回。
public class UnpooledDirectByteBuf{
// 底层依赖 JDK 的 DirectByteBuffer
ByteBuffer buffer;
@Override
public ByteBuf capacity(int newCapacity) {
// newCapacity 不能超过 maxCapacity
checkNewCapacity(newCapacity);
int oldCapacity = capacity;
if (newCapacity == oldCapacity) {
return this;
}
// 计算扩容之后需要拷贝的字节数
int bytesToCopy;
if (newCapacity > oldCapacity) {
bytesToCopy = oldCapacity;
} else {
........ 缩容 .......
}
ByteBuffer oldBuffer = buffer;
// 根据 newCapacity 分配一个新的 ByteBuffer(JDK)
ByteBuffer newBuffer = allocateDirect(newCapacity);
oldBuffer.position(0).limit(bytesToCopy);
newBuffer.position(0).limit(bytesToCopy);
// 将原来 oldBuffer 中的数据拷贝到 newBuffer 中
newBuffer.put(oldBuffer).clear();
// 释放 oldBuffer,设置 newBuffer
// 对于 UnpooledUnsafeDirectByteBuf 来说就是将 newBuffer 的地址设置到 memoryAddress 中
setByteBuffer(newBuffer, true);
return this;
}
}在进入 CompositeByteBuf 的创建流程之后,首先是创建出一个空的 CompositeByteBuf,也就是先把 CompositeByteBuf 的骨架搭建起来,这时它的 initSize 为 buffers.length - offset 。
注意 initSize 表现的并不是 CompositeByteBuf 初始包含的字节个数,而是表现初始 Component 的个数。offset 则表现从 buffers 数组中的哪一个索引开始创建 CompositeByteBuf,就是上面 CompositeByteBuf 构造函数中最后一个参数 i 。
随后通过 addComponents0 方法为 buffers 数组中的每一个 ByteBuf 创建初始化 Component 实例,并将他们有序的添加到 CompositeByteBuf 的 components 数组中。
但这时 Component 实例的个数可能已经超过 maxNumComponents 限制的个数,那么接下来就会在 consolidateIfNeeded() 方法中将当前 CompositeByteBuf 中的所有 Components 归并成一个更大的 Component。CompositeByteBuf 中的 components 数组长度是不可以超过 maxNumComponents 限制的,如果超过就需要在这里归并。
最后设置当前 CompositeByteBuf 的 readerIndex 和 writerIndex,在初始状态下 CompositeByteBuf 的 readerIndex 会被设置为 0 ,writerIndex 会被设置为最后一个 Component 的 endOffset 。
public ByteBuf ensureWritable(int minWritableBytes) 2.8.3 shiftComps 为新的 ByteBuf 腾挪空间
在整个 CompositeByteBuf 的构造过程中,最核心也是最复杂的步骤其实就是 addComponents0 方法,将多个 ByteBuf有序的添加到 CompositeByteBuf 的 components 数组中看似简朴,其实还有很多种复杂的情况需要考虑。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814101610524-1773168778.png
复杂之处在于这些 ByteBuf 需要插在 components 数组的哪个位置上 ? 比较简朴直观的情况是我们直接在 components 数组的末尾插入,也就是说要插入的位置索引 cIndex 等于 componentCount。这里分为两种情况:
[*]cIndex = componentCount = 0 ,这种情况表现我们在向一个空的 CompositeByteBuf 插入 ByteBufs , 很简朴,直接插入即可。
[*]cIndex = componentCount > 0 , 这种情况表现我们再向一个非空的 CompositeByteBuf 插入 ByteBufs,正如上图所示。同样也很简朴,直接在 componentCount 的位置处插入即可。
轻微复杂一点的情况是我们在 components 数组的中间位置进行插入而不是在末尾,也就是 cIndex < componentCount 的情况。如下如图所示,假设我们如今需要在 cIndex = 3的位置处插入两个 ByteBuf 进来,但如今 components 以及 components 的位置已经被占用了。所以我们需要将这两个位置上的原有 component 向后移动两个位置,将 components 和 components 的位置腾出来。
public int ensureWritable(int minWritableBytes, boolean force) https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814101630389-1475469389.png
在复杂一点的情况就是 components 数组需要扩容,当一个 CompositeByteBuf 刚刚被初始化出来的时候,它的 components 数组长度等于 maxNumComponents。
如果当前 components 数组中包含的 component 个数 —— componentCount 加上本次需要添加的 ByteBuf 个数 —— count 已经超过了 maxNumComponents 的时候,就需要对 components 数组进行扩容。
@Override
public int ensureWritable(int minWritableBytes, boolean force) {
// 如果剩余容量可以满足本次写入操作,则不会扩容,直接返回
if (minWritableBytes <= writableBytes()) {
return 0;
}
final int maxCapacity = maxCapacity();
final int writerIndex = writerIndex();
// 如果本次写入的数据大小已经超过了 ByteBuf 的最大可写容量 maxCapacity - writerIndex
if (minWritableBytes > maxCapacity - writerIndex) {
// force = false , 那么停止扩容,直接返回
// force = true, 直接扩容到 maxCapacity,如果当前 capacity 已经等于 maxCapacity 了则停止扩容
if (!force || capacity() == maxCapacity) {
return 1;
}
// 虽然扩容之后还是无法满足写入需求,但还是强制扩容至 maxCapacity
capacity(maxCapacity);
return 3;
}
// 下面就是普通的扩容逻辑
int fastWritable = maxFastWritableBytes();
int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
: alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
// Adjust to the new capacity.
capacity(newCapacity);
return 2;
}扩容之后的 components 数组长度是在 newSize 与原来长度的 3 / 2之间取一个最大值。
public class AdaptiveRecvByteBufAllocator {
static final int DEFAULT_INITIAL = 2048;
}如果我们原来恰好是盼望在 components 数组的末尾插入,也就是 cIndex= componentCount 的情况,那么就需要通过 Arrays.copyOf 首先申请一段长度为 newArrSize 的数组,然后将原来的 components 数组中的内容原样拷贝过去。
private final class HandleImpl extends MaxMessageHandle {
@Override
public void lastBytesRead(int bytes) {
// bytes 为本次从 socket 中真实读取的数据大小
// attemptedBytesRead 为 ByteBuf 可写的容量大小,初始为 2048
if (bytes == attemptedBytesRead()) {
// 如果本次读取 socket 中的数据将 ByteBuf 装满了
// 那么就对 ByteBuf 进行扩容,在下一次读取的时候用更大的 ByteBuf 去读
record(bytes);
}
// 记录本次从 socket 中读取的数据大小
super.lastBytesRead(bytes);
}
}如许新的 components 数组就有位置可以容纳本次需要参加的 ByteBuf 了。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814101655924-659940622.png
如果我们盼望在原来 components 数组的中间插入,也就是 cIndex < componentCount 的情况,如下图所示:
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814101707962-582044574.png
这种情况在扩容的时候就不能原样拷贝原 components 数组了,而是首先通过 System.arraycopy 将 [0 , cIndex) 这段范围的内容拷贝过去,在将 [cIndex , componentCount) 这段范围的内容拷贝到新数组的 cIndex + count 位置处。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814101721102-1247792752.png
如许一来,就在新 components 数组的 cIndex 索引处,空出了两个位置出来用来添加本次这两个 ByteBuf。最后更新 componentCount 的值。以上腾挪空间的逻辑封装在 shiftComps 方法中:
private void shiftComps(int i, int count) { // 初始为 0,当前 CompositeByteBuf 中包含的 component 个数 final int size = componentCount, // 本次 addComponents0 操作之后,新的 component 个数 newSize = size + count; // newSize 超过了 max components(16) 则对 components 数组进行扩容 if (newSize > components.length) { // grow the array,扩容到原来的 3 / 2 public class AdaptiveRecvByteBufAllocator {
static final int DEFAULT_INITIAL = 2048;
} Component[] newArr; if (i == size) { // 在 Component[] 数组的末尾进行插入 // 初始状态 i = size = 0 // size - 1 是 Component[] 数组的最后一个元素,指定的 i 恰好越界 // 原来 Component[] 数组中的内容全部拷贝到 newArr 中 private final class HandleImpl extends MaxMessageHandle {
@Override
public void lastBytesRead(int bytes) {
// bytes 为本次从 socket 中真实读取的数据大小
// attemptedBytesRead 为 ByteBuf 可写的容量大小,初始为 2048
if (bytes == attemptedBytesRead()) {
// 如果本次读取 socket 中的数据将 ByteBuf 装满了
// 那么就对 ByteBuf 进行扩容,在下一次读取的时候用更大的 ByteBuf 去读
record(bytes);
}
// 记录本次从 socket 中读取的数据大小
super.lastBytesRead(bytes);
}
} } else { // 在 Component[] 数组的中间进行插入 newArr = new Component; if (i > 0) { // 2.8.4 Component 怎样封装 ByteBuf
经过上一小节 shiftComps 方法的辗转腾挪之后,如今 CompositeByteBuf 中的 components 数组终于有位置可以容纳本次需要添加的 ByteBuf 了。接下来就需要为每一个 ByteBuf 创建初始化一个 Component 实例,最后将这些 Component 实例放到 components 数组对应的位置上。
private final class HandleImpl extends MaxMessageHandle {
@Override
public void readComplete() {
// 是否对 ByteBuf 进行扩容或者缩容
record(totalBytesRead());
}
}https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814101743867-346106571.png
我们首先需要初始化 Component 实例的 offset , endOffset 属性,前面我们已经介绍了,一个 Component 在 CompositeByteBuf 的视角中所能表现的数据逻辑范围是 [offset , endOffset)。在 components 数组中,一样平常前一个 Component 的 endOffset 往往是后一个 Component 的 offset。
如果我们盼望从 components 数组的第一个位置处开始插入(cIndex = 0),那么第一个 Component 的 offset 自然是 0 。
如果 cIndex > 0 , 那么我们就需要找到它上一个 Component —— components , 上一个 Component 的 endOffset 恰好就是当前 Component 的 offset。
然后通过 newComponent 方法利用 ByteBuf 相干属性以及 offset 来初始化 Component 实例。随后将创建出来的 Component 实例放置在对应的位置上 —— components 。
public class AdaptiveRecvByteBufAllocator {
// 扩容步长
private static final int INDEX_INCREMENT = 4;
// 缩容步长
private static final int INDEX_DECREMENT = 1;
// ByteBuf分配容量表(扩缩容索引表)按照表中记录的容量大小进行扩缩容
private static final int[] SIZE_TABLE;
}假设如今有一个空的 CompositeByteBuf,我们需要将一个数据范围为 , readerIndex = 1 的 srcBuf , 插入到 CompositeByteBuf 的 components 数组中。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814101801704-167478668.png
但是如果该 srcBuf 是一个视图 ByteBuf 的话,比如:SlicedByteBuf , DuplicatedByteBuf。或者是一个被包装过的 ByteBuf ,比如:WrappedByteBuf , SwappedByteBuf。
那么我们就需要对 srcBuf 不断的实行 unwrap(), 将其最底层的原生 ByteBuf 提取出来,如上图所示,原生 buf 的数据范围为 , srcBuf 与 buf 之间相干 index 的偏移 adjustment 等于 3, 原生 buf 的 readerIndex = 4 。
最后我们会根据 srcBuf , srcIndex(srcBuf 的 readerIndex),原生 buf ,unwrappedIndex(buf 的 readerIndex),offset , len (srcBuf 中的可读字节数)来初始化 Component 实例。
private void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE) {
// 缩容条件触发两次之后就进行缩容
if (decreaseNow) {
index = max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE;
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
// 扩容条件满足一次之后就进行扩容
index = min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE;
decreaseNow = false;
}
}由于当前的 CompositeByteBuf 还是空的,里面没有包含任何逻辑数据,当长度为 4 的 srcBuf 参加之后,CompositeByteBuf 就产生了 这段逻辑数据范围,所以 srcBuf 所属 Component 的 offset = 0 , endOffset = 4 ,srcAdjustment = 1 ,adjustment = 4。
https://img2024.cnblogs.com/blog/2907560/202408/2907560-20240814101821619-432558438.png
public interface ReferenceCounted {
int refCnt();
ReferenceCounted retain();
ReferenceCounted retain(int increment);
boolean release();
boolean release(int decrement);
}当我们继续初始化下一个 Component 的时候,它的 Offset 其实就是这个 Component 的 endOffset 。后面的流程都是一样的了。
2.8.5 addComponents0
在我们清晰了以上配景知识之后,在看 addComponents0 方法的逻辑就很清晰了:
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
// 原子更新 refCnt 的 Updater
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
// 引用计数,初始化为 1
private volatile int refCnt;
protected AbstractReferenceCountedByteBuf(int maxCapacity) {
super(maxCapacity);
// 引用计数初始化为 1
refCntUpdater.set(this, 1);
}
// 引用计数增加 increment
private ByteBuf retain0(int increment) {
for (;;) {
int refCnt = this.refCnt;
// 每次 retain 的时候对引用计数加 1
final int nextCnt = refCnt + increment;
// Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow.
if (nextCnt <= increment) {
// 如果 refCnt 已经为 0 或者发生溢出,则抛异常
throw new IllegalReferenceCountException(refCnt, increment);
}
// CAS 更新 refCnt
if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
break;
}
}
return this;
}
// 引用计数减少 decrement
private boolean release0(int decrement) {
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
// 引用的次数必须和释放的次数相等对应
throw new IllegalReferenceCountException(refCnt, -decrement);
}
// 每次 release 引用计数减 1
// CAS 更新 refCnt
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
// 如果引用计数为 0 ,则释放 Native Memory,并返回 true
deallocate();
return true;
}
// 引用计数不为 0 ,返回 false
return false;
}
}
}
}在这里,Netty 会将当前 CompositeByteBuf 中包含的所有 Component 归并成一个更大的 Component。归并之后 ,CompositeByteBuf 中就只包含一个 Component 了。归并的核心逻辑如下:
[*]根据当前CompositeByteBuf 的 capacity 重新申请一个更大的 ByteBuf ,该ByteBuf 需要容纳下 CompositeByteBuf 所能表现的所有字节。
[*]将所有 Component 底层的 buf 中存储的内容全部转移到新的 ByteBuf 中,并开释原有 buf 的内存。
[*]删除 Component 数组中所有的 Component。
[*]根据新的 ByteBuf 创建一个新的 Component 实例,并放置在 components 数组的第一个位置上。
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
private volatile int refCnt;
protected AbstractReferenceCountedByteBuf(int maxCapacity) {
super(maxCapacity);
// 引用计数在初始的时候还是为 1
refCntUpdater.set(this, 1);
}
private ByteBuf retain0(final int increment) {
// 相比于 compareAndSet 的实现,这里将 for 循环去掉
// 并且每次是先对 refCnt 增加计数 increment
int oldRef = refCntUpdater.getAndAdd(this, increment);
// 增加完 refCnt 计数之后才去判断异常情况
if (oldRef <= 0 || oldRef + increment < oldRef) {
// Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
// 如果原来的 refCnt 已经为 0 或者 refCnt 溢出,则对 refCnt 进行回退,并抛出异常
refCntUpdater.getAndAdd(this, -increment);
throw new IllegalReferenceCountException(oldRef, increment);
}
return this;
}
private boolean release0(int decrement) {
// 先对 refCnt 减少计数 decrement
int oldRef = refCntUpdater.getAndAdd(this, -decrement);
// 如果 refCnt 已经为 0 则进行 Native Memory 的释放
if (oldRef == decrement) {
deallocate();
return true;
} else if (oldRef < decrement || oldRef - decrement > oldRef) {
// 如果释放次数大于 retain 次数 或者 refCnt 出现下溢
// 则对 refCnt 进行回退,并抛出异常
refCntUpdater.getAndAdd(this, decrement);
throw new IllegalReferenceCountException(oldRef, decrement);
}
return false;
}
}如果是 JDK 9 以上的版本,Netty 会检查是否可以通过sun.misc.Unsafe 的 invokeCleaner 方法正确实行 DirectBuffer 的 Cleaner,如果实行过程中发生非常,那么 CLEANER 就为 NOOP,Netty 在默认情况下就会走 Heap Memory。
public final int initialValue() {
return 2;
}如果是 JDK 9 以下的版本,Netty 就会通过反射的方式先去获取 DirectByteBuffer 的 cleaner 字段,如果 cleaner 为 null 或者在实行 clean 方法的过程中出现了非常,那么 CLEANER 就为 NOOP,Netty 在默认情况下就会走 Heap Memory。
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
// 获取 refCnt 字段在 ByteBuf 对象内存中的偏移
// 后续通过 Unsafe 对 refCnt 进行操作
private static final long REFCNT_FIELD_OFFSET =
ReferenceCountUpdater.getUnsafeOffset(AbstractReferenceCountedByteBuf.class, "refCnt");
// 获取 refCnt 字段 的 AtomicFieldUpdater
// 后续通过 AtomicFieldUpdater 来操作 refCnt 字段
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> AIF_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
// 创建 ReferenceCountUpdater,对于引用计数的所有操作最终都会代理到这个类中
private static final ReferenceCountUpdater<AbstractReferenceCountedByteBuf> updater =
new ReferenceCountUpdater<AbstractReferenceCountedByteBuf>() {
@Override
protected AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater() {
// 通过 AtomicIntegerFieldUpdater 操作 refCnt 字段
return AIF_UPDATER;
}
@Override
protected long unsafeOffset() {
// 通过 Unsafe 操作 refCnt 字段
return REFCNT_FIELD_OFFSET;
}
};
// ByteBuf 中的引用计数,初始为 2 (偶数)
private volatile int refCnt = updater.initialValue();
}如果 PlatformDependent.directBufferPreferred() 方法返回 true ,那么 ByteBufAllocator 接下来在分配内存的时候,默认情况下就会分配directBuffer。
public abstract class ReferenceCountUpdater<T extends ReferenceCounted> {
public final int initialValue() {
// ByteBuf 引用计数初始化为 2
return 2;
}
public final int refCnt(T instance) {
// 通过 updater 获取 refCnt
// 根据 refCnt 在realRefCnt 中获取真实的引用计数
return realRefCnt(updater().get(instance));
}
// 获取 ByteBuf 的逻辑引用计数
private static int realRefCnt(int rawCnt) {
// 奇偶判断
return rawCnt != 2 && rawCnt != 4 && (rawCnt & 1) != 0 ? 0 : rawCnt >>> 1;
}
}一样平常情况下,JDK 都会包含有效的 CLEANER 机制,所以我们完全可以仅是通过 -Dio.netty.noPreferDirect (默认 false)来控制 Netty 默认情况下走Direct Memory。
但如果是安卓平台,那么无论-Dio.netty.noPreferDirect怎样设置,Netty 默认情况下都会走Heap Memory 。
4. Cleaner or NoCleaner
站在内存回收的角度,Netty 将 ByteBuf 分为了带有 Cleaner 的 DirectByteBuf 和没有 Cleaner 的 DirectByteBuf 两个大类。在之前的文章《以 ZGC 为例,谈一谈 JVM 是怎样实现 Reference 语义的》 中的第三小节,笔者详细的介绍过,JVM怎样利用 Cleaner 机制往返收 DirectByteBuffer 背后的 Native Memory 。
而 Cleaner 回收 DirectByteBuffer 的 Native Memory 需要依赖 GC 的发生,当一个 DirectByteBuffer 没有任何强引用或者软引用的时候,如果此时发生 GC , Cleaner 才会去回收 Native Memory。如果很久都没发生 GC ,那么这些 DirectByteBuffer 所引用的 Native Memory 将一直不会开释。
所以仅仅是依赖 Cleaner 来开释 Native Memory 是有肯定延迟的,极度情况下,如果一直等不来 GC ,很有可能就会发生 OOM 。
而 Netty 的 ByteBuf 设计相当于是对 NIO ByteBuffer 的一种完善扩展,其底层其实都会依赖一个 JDK 的 ByteBuffer。比如,前面介绍的 UnpooledDirectByteBuf , UnpooledUnsafeDirectByteBuf 其底层依赖的就是 JDKDirectByteBuffer , 而这个 DirectByteBuffer 就是带有 Cleaner 的 ByteBuf 。
public final void setRefCnt(T instance, int refCnt) {
updater().set(instance, refCnt > 0 ? refCnt << 1 : 1); // overflow OK here
} public final T retain(T instance) {
// 引用计数逻辑上是加 1 ,但实际上是加 2 (实现角度)
return retain0(instance, 1, 2);
}
public final T retain(T instance, int increment) {
// all changes to the raw count are 2x the "real" change - overflow is OK
// rawIncrement 始终是逻辑计数 increment 的两倍
int rawIncrement = checkPositive(increment, "increment") << 1;
// 将 rawIncrement 设置到 ByteBuf 的 refCnt 字段中
return retain0(instance, increment, rawIncrement);
}
// rawIncrement = increment << 1
// increment 表示引用计数的逻辑增长步长
// rawIncrement 表示引用计数的实际增长步长
private T retain0(T instance, final int increment, final int rawIncrement) {
// 先通过 XADD 指令将refCnt 的值加起来
int oldRef = updater().getAndAdd(instance, rawIncrement);
// 如果 oldRef 是一个奇数,也就是 ByteBuf 已经没有引用了,抛出异常
if (oldRef != 2 && oldRef != 4 && (oldRef & 1) != 0) {
// 如果 oldRef 已经是一个奇数了,无论多线程在这里怎么并发 retain ,都是一个奇数,这里都会抛出异常
throw new IllegalReferenceCountException(0, increment);
}
// don't pass 0!
// refCnt 不可能为 0 ,只能是 1
if ((oldRef <= 0 && oldRef + rawIncrement >= 0)
|| (oldRef >= 0 && oldRef + rawIncrement < oldRef)) {
// 如果 refCnt 字段已经溢出,则进行回退,并抛异常
updater().getAndAdd(instance, -rawIncrement);
throw new IllegalReferenceCountException(realRefCnt(oldRef), increment);
}
return instance;
}在 JDK NIO中,凡是通过 ByteBuffer.allocateDirect 方法申请到 DirectByteBuffer 都是带有 Cleaer 的。
public final boolean release(T instance) {
// 第一次尝试采用 unSafe nonVolatile 的方式读取 refCnf 的值
int rawCnt = nonVolatileRawCnt(instance);
// 如果逻辑引用计数被减到 0 了,那么就通过 tryFinalRelease0 使用 CAS 将 refCnf 更新为 1
// CAS 失败的话,则通过 retryRelease0 进行重试
// 如果逻辑引用计数不为 0 ,则通过 nonFinalRelease0 将 refCnf 减 2
return rawCnt == 2 ? tryFinalRelease0(instance, 2) || retryRelease0(instance, 1)
: nonFinalRelease0(instance, 1, rawCnt, toLiveRealRefCnt(rawCnt, 1));
}而带有 Cleaner 的 DirectByteBuffer 背后所能引用的 Direct Memory 是受到 -XX:MaxDirectMemorySize JVM 参数限制的。由于 UnpooledDirectByteBuf 以及 UnpooledUnsafeDirectByteBuf 都带有 Cleaner,所以当他们在系统中没有任何强引用或者软引用的时候,如果发生 GC , Cleaner 就会开释他们的 Direct Memory 。
由于 Cleaner 实行会依赖 GC , 而 GC 的发生往往不那么及时,会有肯定的延时,所以 Netty 为了可以及时的开释Direct Memory ,往往选择不依赖 JDK 的 Cleaner 机制,手动进行开释。所以就有了 NoCleaner 类型的 DirectByteBuf —— UnpooledUnsafeNoCleanerDirectByteBuf 。
private volatile int refCnt = updater.initialValue();UnpooledUnsafeNoCleanerDirectByteBuf 的底层同样也会依赖一个 JDKDirectByteBuffer , 但和之前差别的是,这里的 DirectByteBuffer 是不带有 cleaner 的。
我们通过 JNI 来调用 DirectByteBuffer(long addr, int cap) 构造函数创建出来的 JDKDirectByteBuffer 都是没有 cleaner 的。但通过这种方式创建出来的 DirectByteBuffer 背后引用的 Native Memory 是不会受到 -XX:MaxDirectMemorySize JVM 参数限制的。
private int nonVolatileRawCnt(T instance) {
// 获取 REFCNT_FIELD_OFFSET
final long offset = unsafeOffset();
// 通过 UnSafe 的方式来访问 refCnt , 避免内存屏障的开销
return offset != -1 ? PlatformDependent.getInt(instance, offset) : updater().get(instance);
}既然没有了 cleaner , 所以 Netty 就无法依赖 GC 来开释 Direct Memory 了,这就要求 Netty 必须手动调用 freeDirect方法及时地开释 Direct Memory。
事实上,无论 Netty 中的 DirectByteBuf 有没有 Cleaner, Netty 都会选择手动的进行开释,目的就是为了避免 GC 的延迟 , 从而及时的开释 Direct Memory。
那么 Netty 中的 DirectByteBuf 在什么情况下带有 Cleaner,又在什么情况下不带 Cleaner 呢 ?我们可以通过 PlatformDependent.useDirectBufferNoCleaner 方法的返回值进行判断:
private boolean tryFinalRelease0(T instance, int expectRawCnt) {
return updater().compareAndSet(instance, expectRawCnt, 1); // any odd number will work
}
[*]USE_DIRECT_BUFFER_NO_CLEANER = TRUE 表现 Netty 创建出来的 DirectByteBuf 不带有 Cleaner 。Direct Memory 的用量不会受到 JVM 参数 -XX:MaxDirectMemorySize 的限制。
[*]USE_DIRECT_BUFFER_NO_CLEANER = FALSE 表现 Netty 创建出来的 DirectByteBuf 带有 Cleaner 。Direct Memory 的用量会受到 JVM 参数 -XX:MaxDirectMemorySize 的限制。
我们可以通过 -Dio.netty.maxDirectMemory 来设置 USE_DIRECT_BUFFER_NO_CLEANER 的值,除此之外,该参数还可以指定在 Netty 层面上可以使用的最大 DirectMemory 用量。
io.netty.maxDirectMemory = 0 那么 USE_DIRECT_BUFFER_NO_CLEANER 就为 FALSE , 表现在 Netty 层面创建出来的 DirectByteBuf 都是带有 Cleaner 的,这种情况下 Netty 并不会限制 maxDirectMemory 的用量,由于限制了也没用,详细能用多少 maxDirectMemory,还是由 JVM 参数 -XX:MaxDirectMemorySize 决定的。
io.netty.maxDirectMemory < 0 ,默认为 -1,也就是在默认情况下 USE_DIRECT_BUFFER_NO_CLEANER 为 TRUE , 创建出来的 DirectByteBuf 都是不带 Cleaner 的。由于在这种情况下 maxDirectMemory 的用量并不会受到 JVM 参数 -XX:MaxDirectMemorySize 的限制,所以在 Netty 层面上必须限制 maxDirectMemory 的用量,默认值就是-XX:MaxDirectMemorySize指定的值。
这里需要特别注意的是,Netty 层面临于 maxDirectMemory 的容量限制和 JVM 层面临于 maxDirectMemory 的容量限制是单独分别计算的,互不影响。因此站在 JVM 进程的角度来说,总体 maxDirectMemory 的用量是 -XX:MaxDirectMemorySize 的两倍。
io.netty.maxDirectMemory > 0 的情况和小于 0 的情况一样,唯一差别的是 Netty 层面的 maxDirectMemory 用量是专门由-Dio.netty.maxDirectMemory 参数指定,仍然独立于 JVM 层面的 maxDirectMemory 限制之外单独计算。
所以从这个层面来说,Netty 设计 NoCleaner 类型的 DirectByteBuf 的别的一个目的就是为了突破 JVM 对于 maxDirectMemory 用量的限制。
private boolean retryRelease0(T instance, int decrement) {
for (;;) {
// 采用 Volatile 的方式读取 refCnt
int rawCnt = updater().get(instance),
// 获取逻辑引用计数,如果 refCnt 已经变为奇数,则抛出异常
realCnt = toLiveRealRefCnt(rawCnt, decrement);
// 如果执行完本次 release , 逻辑引用计数为 0
if (decrement == realCnt) {
// CAS 将 refCnt 更新为 1
if (tryFinalRelease0(instance, rawCnt)) {
return true;
}
} else if (decrement < realCnt) {
// 原来的逻辑引用计数 realCnt 大于 1(decrement)
// 则通过 CAS 将 refCnt 减 2
if (updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {
return false;
}
} else {
// refCnt 字段如果发生溢出,则抛出异常
throw new IllegalReferenceCountException(realCnt, -decrement);
}
// CAS 失败之后调用 yield
// 减少无畏的竞争,否则所有线程在高并发情况下都在这里 CAS 失败
Thread.yield();
}
}当 Netty 层面的 direct memory 用量超过了 -Dio.netty.maxDirectMemory 参数指定的值时,那么就会抛出 OutOfDirectMemoryError ,分配 DirectByteBuf 将会失败。
private boolean nonFinalRelease0(T instance, int decrement, int rawCnt, int realCnt) {
if (decrement < realCnt
&& updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {
// ByteBuf 的 rawCnt 减少 2 * decrement
return false;
}
// CAS失败则一直重试,如果引用计数已经为 0 ,那么抛出异常,不能再次 release
return retryRelease0(instance, decrement);
}5. Unsafe or NoUnsafe
站在内存访问方式的角度上来说 , Netty 又会将 ByteBuf 分为了 Unsafe 和 NoUnsafe 两个大类,此中 NoUnsafe 的内存访问方式是依赖底层的 JDK ByteBuffer,对于 Netty ByteBuf 的任何操作终极都是会代理给底层 JDK 的 ByteBuffer。
@Override
public int readableBytes() {
// 原生 ByteBuf
return writerIndex - readerIndex;
}而 Unsafe 的内存访问方式则是通过 sun.misc.Unsafe 类中提供的浩繁 low-level direct buffer access API 来对内存地点直接进行访问,由于是脱离 JVM 相干规范直接对内存地点进行访问,所以我们在调用 Unsafe 相干方法的时候需要考虑 JVM 以及 OS 的各种细节,一不鉴戒就会踩坑堕落,所以它是一种不安全的访问方式,但是充足灵活,高效。
public abstract class AbstractByteBuf extends ByteBuf {
@Override
public ByteBuf slice() {
return slice(readerIndex, readableBytes());
}
@Override
public ByteBuf slice(int index, int length) {
// 确保 ByteBuf 的引用计数不为 0
ensureAccessible();
return new UnpooledSlicedByteBuf(this, index, length);
}
}Netty 提供了 -Dio.netty.noUnsafe 参数来让我们决定是否接纳 Unsafe 的内存访问方式,默认值是 false , 表现 Netty 默认开启 Unsafe 访问方式。
class UnpooledSlicedByteBuf extends AbstractUnpooledSlicedByteBuf {
UnpooledSlicedByteBuf(AbstractByteBuf buffer, int index, int length) {
// index = readerIndex
// length = readableBytes()
super(buffer, index, length);
}
@Override
public int capacity() {
// 视图 ByteBuf 的 capacity 和 maxCapacity 相等
// 均为原生 ByteBuf 的 readableBytes()
return maxCapacity();
}
}在确认开启了 Unsafe 方式之后,我们就需要近一步确认在当前 JRE 的 classpath 下是否存在 sun.misc.Unsafe 类,是否能通过反射的方式获取到 Unsafe 实例 —— theUnsafe 。
@Override
protected byte _getByte(int index) {
// 底层其实是对原生 ByteBuf 的访问
return unwrap()._getByte(idx(index));
}
@Override
protected void _setByte(int index, int value) {
unwrap()._setByte(idx(index), value);
}
/**
* Returns the index with the needed adjustment.
*/
final int idx(int index) {
// 转换为原生 ByteBuf 的 readerIndex 或者 writerIndex
return index + adjustment;
}abstract class AbstractUnpooledSlicedByteBuf extends AbstractDerivedByteBuf {
// 原生 ByteBuf
private final ByteBuf buffer;
// 视图 ByteBuf 相对于原生 ByteBuf的数据区域偏移
private final int adjustment;
AbstractUnpooledSlicedByteBuf(ByteBuf buffer, int index, int length) {
// 设置视图 ByteBuf 的 maxCapacity,readerIndex 为 0
super(length);
// 原生 ByteBuf
this.buffer = buffer;
// 数据偏移为原生 ByteBuf 的 readerIndex
adjustment = index;
// 设置视图 ByteBuf 的 writerIndex
writerIndex(length);
}
}在获取到 Unsafe 实例之后,我们还需要检查 Unsafe 中是否包含所有 Netty 用到的 low-level direct buffer access API ,确保这些 API 可以正常有效的运行。比如,是否包含 copyMemory 方法。
@Override
public ByteBuf retainedSlice() {
// 原生 ByteBuf 的引用计数加 1
return slice().retain();
}是否可以通过 Unsafe 访问到 NIO Buffer 的 address 字段,由于后续我们需要直接操作内存地点。
@Override
public ByteBuf duplicate() {
// 确保 ByteBuf 的引用计数不为 0
ensureAccessible();
return new UnpooledDuplicatedByteBuf(this);
}在整个过程中如果发生任何非常,则表现在当前 classpath 下,不存在 sun.misc.Unsafe 类或者是由于差别版本 JDK 的设计,Unsafe 中没有 Netty 所需要的一些须要的访存 API 。如许一来我们就无法使用 Unsafe,内存的访问方式就需要回退到 NoUnsafe。
public class DuplicatedByteBuf extends AbstractDerivedByteBuf {
// 原生 ByteBuf
private final ByteBuf buffer;
public DuplicatedByteBuf(ByteBuf buffer) {
this(buffer, buffer.readerIndex(), buffer.writerIndex());
}
DuplicatedByteBuf(ByteBuf buffer, int readerIndex, int writerIndex) {
// 初始化视图 ByteBuf 的 maxCapacity 与原生的相同
super(buffer.maxCapacity());
// 原生 ByteBuf
this.buffer = buffer;
// 视图 ByteBuf 的 readerIndex , writerIndex 也与原生相同
setIndex(readerIndex, writerIndex);
markReaderIndex();
markWriterIndex();
}
@Override
public int capacity() {
// 视图 ByteBuf 的 capacity 也与原生相同
return unwrap().capacity();
}
}如果在整个过程中没有发生任何非常,我们获取到了一个有效的 UNSAFE 实例,那么后续将正式开启 Unsafe 的内存访问方式。
@Override
public ByteBuf retainedDuplicate() {
return duplicate().retain();
}完整的 hasUnsafe() 判断逻辑如下:
[*]如果当前平台是安卓或者 .NET ,则不能开启 Unsafe,由于这些平台并不包含 sun.misc.Unsafe 类。
[*]-Dio.netty.noUnsafe 参数需要设置为 false (默认开启)。
3.. 当前 classpath 下是否包含有效的 sun.misc.Unsafe 类。
[*]Unsafe 实例需要包含须要的访存 API 。
public abstract class AbstractByteBuf extends ByteBuf {
@Override
public ByteBuf copy() {
// 从原生 ByteBuf 中的 readerIndex 开始,拷贝 readableBytes 个字节到新的 ByteBuf 中
return copy(readerIndex, readableBytes());
}
}如果 PlatformDependent.hasUnsafe() 方法返回 true , 那么后续 Netty 都会创建 Unsafe 类型的 ByteBuf。
6. Pooled or Unpooled
站在内存管理的角度上来讲,Netty 将 ByteBuf 分为了 池化(Pooled) 和 非池化(Unpooled)两个大类,此中 Unpooled 类型的 ByteBuf是用到的时候才去临时创建,使用完的时候再去开释。
而 Direct Memory 的申请和开释开销相较于 Heap Memory 会大很多,Netty 在面临高并发网络通讯的场景下,Direct Memory 的申请和开释是一个非常频繁的操作,这种大量频繁地内存申请开释操尴尬刁难程序的性能影响是巨大的,因此 Netty 引入了内存池将这些 Direct Memory 同一池化管理起来。
Netty 提供了 -Dio.netty.allocator.type 参数来让我们决定是否接纳内存池来管理 ByteBuf , 默认值是 pooled , 也就是说 Netty 默认是接纳池化的方式来管理 PooledByteBuf 。如果是安卓平台,那么默认是使用非池化的 ByteBuf (unpooled)。
[*]当参数 io.netty.allocator.type 的值为 pooled 时,Netty 的默认 ByteBufAllocator 是 PooledByteBufAllocator.DEFAULT 。
[*]当参数 io.netty.allocator.type 的值为 unpooled 时,Netty 的默认 ByteBufAllocator 是 UnpooledByteBufAllocator.DEFAULT 。
public class UnpooledDirectByteBuf{
@Override
public ByteBuf copy(int index, int length) {
ensureAccessible();
ByteBuffer src;
try {
// 将原生 ByteBuf 中 [index , index + lengh) 这段范围的数据拷贝到新的 ByteBuf 中
src = (ByteBuffer) buffer.duplicate().clear().position(index).limit(index + length);
} catch (IllegalArgumentException ignored) {
throw new IndexOutOfBoundsException("Too many bytes to read - Need " + (index + length));
}
// 首先新申请一段 native memory , 新的 ByteBuf 初始容量为 length (真实容量),最大容量与原生 ByteBuf 的 maxCapacity 相等
// readerIndex = 0 , writerIndex = length
return alloc().directBuffer(length, maxCapacity()).writeBytes(src);
}
}后续 Netty 在创建 SocketChannel 的时候,在 SocketChannelConfig 中指定的 ByteBufAllocator 就是这里的 ByteBufUtil.DEFAULT_ALLOCATOR,默认情况下为 PooledByteBufAllocator。
public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> {
// 内部 ByteBuf 的分配器,用于后续扩容,copy , 合并等操作
private final ByteBufAllocator alloc;
// compositeDirectBuffer 还是 compositeHeapBuffer ?
private final boolean direct;
// 最大的 components 数组容量(16)
private final int maxNumComponents;
// 当前 CompositeByteBuf 中包含的 components 个数
private int componentCount;
// 存储 component 的数组
private Component[] components; // resized when needed
}当 Netty 读取 Socket 中的网络数据时,首先会从 DefaultChannelConfig 中将 ByteBufAllocator 获取到,然后利用 ByteBufAllocator 从内存池中获取一个 DirectByteBuf ,最后将 Socket 中的数据读取到 DirectByteBuf 中,随后沿着 pipeline 向后传播,进行 IO 处理。
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
static final int DEFAULT_MAX_COMPONENTS = 16;
}除此之外,Netty 还提供了 ChannelOption.ALLOCATOR 选项,让我们可以在配置 ServerBootstrap 的时候为 SocketChannel 灵活指定自定义的 ByteBufAllocator 。
private CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents, int initSize) {
// 设置 maxCapacity
super(AbstractByteBufAllocator.DEFAULT_MAX_CAPACITY);
this.alloc = ObjectUtil.checkNotNull(alloc, "alloc");
this.direct = direct;
this.maxNumComponents = maxNumComponents;
// 初始 Component 数组的容量为 maxNumComponents
components = newCompArray(initSize, maxNumComponents);
}这里通过 ChannelOption 来配置 Socket 相干的属性是最高优先级的,它会覆盖掉统统默认配置。
7. Metric
在第四小节中,我们介绍了 Cleaner 和 NoCleaner 这两种 DirectByteBuf,此中 CleanerDirectByteBuf的团体 Direct Memory 的用量是受到 JVM 参数 -XX:MaxDirectMemorySize 限制的,而 NoCleanerDirectByteBuf 的团体 Direct Memory可以突破该参数的限制,JVM 并不会统计这块 Direct Memory 的用量。
Netty 为了及时地开释这些 Direct Memory,通常默认选择 NoCleanerDirectByteBuf,这就要求 Netty 需要对这部分 Direct Memory 的用量进行自行统计限制。NoCleanerDirectByteBuf 的最大可用 Direct Memory 我们可以通过 -Dio.netty.maxDirectMemory 来指定,默认情况下等于 -XX:MaxDirectMemorySize 设置的值。
PlatformDependent 类中的 DIRECT_MEMORY_COUNTER 字段用于统计在 Netty 层面上,所有 NoCleanerDirectByteBuf 占用的 Direct Memory 大小。注意这里并不会统计 CleanerDirectByteBuf 的 Direct Memory 占用,这部分统计由 JVM 负责。
private static Component[] newCompArray(int initComponents, int maxNumComponents) {
// MAX_COMPONENT
int capacityGuess = Math.min(AbstractByteBufAllocator.DEFAULT_MAX_COMPONENTS, maxNumComponents);
// 初始 Component 数组的容量为 maxNumComponents
return new Component;
}PlatformDependent 类是 Netty 最底层的一个类,所有内存的分配,开释动作终极都是在该类中实行,因此 DIRECT_MEMORY_COUNTER 字段统计的是全局的 Direct Memory 大小(Netty 层面)。
每一次的内存申请 —— allocateDirectNoCleaner , 都会增加 DIRECT_MEMORY_COUNTER 计数,每一次的内存开释 —— freeDirectNoCleaner,都会减少 DIRECT_MEMORY_COUNTER 计数。
我们可以通过 PlatformDependent.usedDirectMemory()方法来获取 Netty 当前所占用的 Direct Memory 大小。但如果我们特殊指定了需要使用 CleanerDirectByteBuf , 比如,将 -Dio.netty.maxDirectMemory 参数设置为 0 , 那么这里将会返回-1 。
private static final class Component {
// 原生 ByteBuf
final ByteBuf srcBuf;
// CompositeByteBuf 的 index 加上 srcAdjustment 就得到了srcBuf 的相关 index
int srcAdjustment;
// srcBuf 可能是一个被包装过的 ByteBuf,比如 SlicedByteBuf , DuplicatedByteBuf
// 被 srcBuf 包装的最底层的 ByteBuf 就存放在 buf 字段中
final ByteBuf buf;
// CompositeByteBuf 的 index 加上 adjustment 就得到了 buf 的相关 index
int adjustment;
// 该 Component 在 CompositeByteBuf 视角中表示的数据范围 [offset , endOffset)
int offset;
int endOffset;
}除了 PlatformDependent 这里的全局统计之外,Netty 还提供了以 ByteBufAllocator 为粒度的内存占用统计,统计的维度包括 Heap Memory 的占用和 Direct Memory 的占用。
int srcIdx(int index) {
// CompositeByteBuf 相关的 index 转换成 srcBuf 的相关 index
return index + srcAdjustment;
}Netty 定义的每一个ByteBufAllocator 中,都会有一个 ByteBufAllocatorMetric 类型的字段,该类定义两个计数字段:directCounter,heapCounter。 分别用于统计 Direct Memory和 Heap Memory 的占用。
abstract class AbstractUnpooledSlicedByteBuf {
// 原生 ByteBuf
private final ByteBuf buffer;
}因此从内存占用统计的角度上来说,Netty 又会将整个 ByteBuf 体系分为 Instrumented 和 NoInstrumented 两大类,带有 Instrumented 前缀的 ByteBuf ,无论你是 Heap or Direct , Cleaner or NoCleaner,Unsafe or NoUnsafe 类型的 ByteBuf ,Netty 都会统计这部分内存占用。
@Override
public byte getByte(int index) {
// 通过 CompositeByteBuf 的 index , 找到数据所属的 component
Component c = findComponent(index);
// 首先通过 idx 转换为 buf 相关的 index
// 将对 CompositeByteBuf 的读写操作转换为 buf 的读写操作
return c.buf.getByte(c.idx(index));
}
int idx(int index) {
// 将 CompositeByteBuf 的相关 index 转换为 buf 的相关 index
return index + adjustment;
} // 缓存最近一次查找到的 Component
private Component lastAccessed;
private Component findComponent(int offset) {
Component la = lastAccessed;
// 首先查找 offset 是否恰好落在 lastAccessed 的区间中
if (la != null && offset >= la.offset && offset < la.endOffset) {
return la;
}
// 在所有 Components 中进行二分查找
return findIt(offset);
}8. ByteBufAllocator
在 Netty 中,ByteBuf 的创建必须通过 ByteBufAllocator 进行,不能直接显示地调用 ByteBuf 相干的构造函数自行创建。Netty 定义了两种类型的 ByteBufAllocator:
[*]PooledByteBufAllocator 负责池化 ByteBuf,这里正是 Netty 内存管理的核心,在下一篇文章中,笔者会详细的和大家介绍它。
[*]UnpooledByteBufAllocator 负责分配非池化的 ByteBuf,创建 ByteBuf 的时候临时向 OS 申请 Native Memory ,使用完之后,需要及时的手动调用 release 将 Native Memory 开释给 OS 。
-Dio.netty.allocator.type 参数可以让我们自行选择 ByteBufAllocator 的类型,默认值为 pooled, Netty 默认是接纳池化的方式来管理 ByteBuf 。
private Component findIt(int offset) {
for (int low = 0, high = componentCount; low <= high;) {
int mid = low + high >>> 1;
Component c = components;
if (offset >= c.endOffset) {
low = mid + 1;
} else if (offset < c.offset) {
high = mid - 1;
} else {
lastAccessed = c;
return c;
}
}
throw new Error("should not reach here");
}除了以上两种官方定义的 ByteBufAllocator 之外,我们还可以根据自己实际业务场景来自行定制 ByteBufAllocator , 然后通过第六小节中介绍的 ChannelOption.ALLOCATOR 选项,将 ByteBufAllocator 灵活指定为我们自行定制的实现。
对于 UnpooledByteBuf 来说,Netty 还专门提供了一个工具类 Unpooled,这里定义实现了很多针对 ByteBuf 的实用操作,比如,allocate,wrapped,copied 等。这里笔者以 DirectByteBuf 的创建为例进行说明:
public final class Unpooled {
public static ByteBuf wrappedBuffer(ByteBuf... buffers) {
return wrappedBuffer(buffers.length, buffers);
}
}Unpooled 底层依赖了 UnpooledByteBufAllocator , 所有对 ByteBuf 的创建动作终极都会代理给这个 Allocator 。在 DirectBuffer 的创建过程中,我们可以看到前面介绍的所有类型的 ByteBuf。
public final class Unpooled {
public static ByteBuf wrappedBuffer(int maxNumComponents, ByteBuf... buffers) {
switch (buffers.length) {
case 0:
break;
case 1:
ByteBuf buffer = buffers;
if (buffer.isReadable()) {
// 直接返回 buffer.slice() 视图
return wrappedBuffer(buffer.order(BIG_ENDIAN));
} else {
buffer.release();
}
break;
default:
for (int i = 0; i < buffers.length; i++) {
ByteBuf buf = buffers;
if (buf.isReadable()) {
// 从第一个可读的 ByteBuf —— buffers 开始创建 CompositeByteBuf
return new CompositeByteBuf(ALLOC, false, maxNumComponents, buffers, i);
}
// buf 不可读则 release
buf.release();
}
break;
}
return EMPTY_BUFFER;
}
}
[*]首先 Netty 创建出来的所有 ByteBuf 都是带有 Metric 统计的,详细的 ByteBuf 类型都会带有 Instrumented 前缀。
[*]如果当前 JRE 环境支持 Unsafe , 那么后续就会通过 Unsafe 的方式来对 ByteBuf 进行相干操作(默认),详细的 ByteBuf 类型都会带有 Unsafe 前缀。
[*]如果我们明确指定了 NoCleaner 类型的 DirectByteBuf(默认),那么创建出来的 ByteBuf 类型就会带有 NoCleaner 前缀,由于没有 Cleaner ,这就要求我们使用完 ByteBuf 的时候必须及时地手动进行开释。
[*]如果我们开启了内存泄露探测,那么创建流程的最后,Netty 会用一个LeakAwareByteBuf 去包装新创建出来的 ByteBuf,当这个 ByteBuf 被 GC 的时候,Netty 会通过相干引用计数来判断是否存在忘记 release 的情况,从而确定出是否发生内存泄露。
总结
本文笔者从八个角度为大家详细的剖析了 ByteBuf 的团体设计,这八个角度分别是:内存区域分布的角度,内存管理的角度,内存访问的角度,内存回收的角度,内存统计 Metric 的角度,零拷贝的角度,引用计数的角度,扩容的角度。
到如今为止,我们只是扫清了 Netty 内存管理外围的一些障碍,那么下一篇文章,笔者将带大家深入到内存管理的核心,彻底让大家弄懂 Netty 的内存管理机制。好了,本文的内容就到这里,我们下篇文章见~~~
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]