Netty-ByteBuf
参考文献
ByteBuffer
1 | +--------------------------------------------------------------------------------+ |
mark
: 为某个读取过的关键位置做标记,方便回退到该位置position
: 当前读取的位置- 可读区 (
readable
): 从position
到limit
之间的数据区域 limit
: buffer中有效的数据长度capacity
: 初始化时的空间容量mark <= position <= limit <= capactiy
ByteBuffer
的缺点
- 第一,ByteBuffer 分配的长度是固定的,无法动态扩缩容,所以很难控制需要分配多大的容量。如果分配太大容量,容易造成内存浪费;如果分配太小,存放太大的数据会抛出 BufferOverflowException 异常。在使用 ByteBuffer 时,为了避免容量不足问题,你必须每次在存放数据的时候对容量大小做校验,如果超出 ByteBuffer 最大容量,那么需要重新开辟一个更大容量的 ByteBuffer,将已有的数据迁移过去。整个过程相对烦琐,对开发者而言是非常不友好的。
- 第二,ByteBuffer 只能通过 position 获取当前可操作的位置,因为读写共用的 position 指针,所以需要频繁调用 flip、rewind 方法切换读写状态,开发者必须很小心处理 ByteBuffer 的数据读写,稍不留意就会出错。
ByteBuf
-
ByteBuf
提供了两个指针变量来支持顺序的读写操作——readerIndex
用于读操作,writerIndex
用于写操作.下图给出了两个指针如何将缓冲区分割为3个区域:1
2
3
4
5
6+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
Discardable bytes
(可废弃的字节)
-
该段包含已经通过读取操作读取的字节.最初,该段的长度为0,但随着读操作的执行,其长度会增加到
writerIndex
.读取的字节可以通过调用discardReadBytes()
来回收未使用的区域来丢弃,如下图所示:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17BEFORE discardReadBytes()
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
AFTER discardReadBytes()
+------------------+--------------------------------------+
| readable bytes | writable bytes (got more space) |
+------------------+--------------------------------------+
| | |
readerIndex (0) <= writerIndex (decreased) <= capacity
Clearing the buffer indexes
(清除缓冲区索引)
-
可以通过调用
clear()
将readerIndex
和writerIndex
都设置为0.它不清空缓冲区内容(例如填充0),只清空两个指针.请注意,这个操作的语义与Buffer.clear()
不同.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16BEFORE clear()
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
AFTER clear()
+---------------------------------------------------------+
| writable bytes (got more space) |
+---------------------------------------------------------+
| |
0 = readerIndex = writerIndex <= capacity
ByteBuf API
的优点
- 它可以被用户自定义的缓冲区类型扩展;
- 通过内置的复合缓冲区类型实现了透明的零拷贝;
- 容量可以按需增长(类似于
JDK
的StringBuilder
); - 在读和写这两种模式之间切换不需要调用
ByteBuffer
的flip()
方法; - 读和写使用了不同的索引;
- 支持方法的链式调用;
- 支持引用计数;
- 支持池化
ByteBuf
分类
ByteBuf
可以划分为三个不同的纬度:Heap/Direct
Pooled/Unpooled
Unsafe/非Unsafe
Heap/Direct
就是堆内和堆外内存。Heap 指的是在 JVM 堆内分配,底层依赖的是字节数据;Direct 则是堆外内存,不受 JVM 限制,分配方式依赖 JDK 底层的 ByteBuffer。Pooled/Unpooled
表示池化还是非池化内存。Pooled 是从预先分配好的内存中取出,使用完可以放回 ByteBuf 内存池,等待下一次分配。而 Unpooled 是直接调用系统 API 去申请内存,确保能够被 JVM GC 管理回收。Unsafe/非 Unsafe
的区别在于操作方式是否安全。 Unsafe 表示每次调用 JDK 的 Unsafe 对象操作物理内存,依赖 offset + index 的方式操作数据。非 Unsafe 则不需要依赖 JDK 的 Unsafe 对象,直接通过数组下标的方式操作数据。
ByteBuf
的使用模式
- 堆缓冲区
- 直接缓冲区
- 复合缓冲区
CompositeByteBuf
复合缓冲区
-
使用
ByteBuffer
的复合缓冲区模式1
2
3
4
5
6
7
8// Use an array to hold the message parts
ByteBuffer[] message = new ByteBuffer[] { header, body };
// Create a new ByteBuffer and use copy to merge the header and body
ByteBuffer message2 =
ByteBuffer.allocate(header.remaining() + body.remaining());
message2.put(header);
message2.put(body);
message2.flip(); -
使用
CompositeByteBuf
的复合缓冲区模式1
2
3
4
5
6
7
8
9CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = ...; // can be backing or direct
ByteBuf bodyBuf = ...; // can be backing or direct
messageBuf.addComponents(headerBuf, bodyBuf);
.....
messageBuf.removeComponent(0); // remove the header
for (ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
} -
访问
CompositeByteBuf
中的数据1
2
3
4
5CompositeByteBuf compBuf = Unpooled.compositeBuffer();
int length = compBuf.readableBytes();
byte[] array = new byte[length];
compBuf.getBytes(compBuf.readerIndex(), array);
handleArray(array, 0, array.length);
派生缓冲区
- 派生缓冲区为 ByteBuf提供了以专门的方式来呈现其内容的视图.这类视图是通过以下方法被创建的
duplicate()
slice()
slice(int, int)
Unpooled.unmodifiableBuffer(…)
order(ByteOrder)
readSlice(int)
- 每个这些方法都将返回一个新的 ByteBuf 实例,它具有自己的读索引、写索引和标记索引.其内部存储和 JDK 的
ByteBuffer
一样也是共享的.这使得派生缓冲区的创建成本是很低廉的,但是这也意味着,如果你修改了它的内容,也同时修改了其对应的源实例,所以要小心. - ByteBuf 复制 如果需要一个现有缓冲区的真实副本,请使用
copy()
或者copy(int, int)
方法.不同于派生缓冲区,由这个调用所返回的ByteBuf
拥有独立的数据副本
对ByteBuf
进行切片
1 | // 创建一个用于保存给定字符串的字节的 ByteBuf |
复制一个ByteBuf
1 | Charset utf8 = Charset.forName("UTF-8"); |
ByteBuf
的分配
按需分配:ByteBufAllocator
接口
-
ByteBufAllocator
的方法名称 描述 buffer()
返回一个基于堆或者直接内存存储的 ByteBuf
buffer(int initialCapacity)
/ buffer(int initialCapacity, int maxCapacity)
/ heapBuffer()
返回一个基于堆内存存储的 ByteBuf
heapBuffer(int initialCapacity)
/ heapBuffer(int initialCapacity, int maxCapacity)
/ directBuffer()
返回一个基于直接内存存储的 ByteBuf
directBuffer(int initialCapacity)
/ directBuffer(int initialCapacity, int maxCapacity)
/ compositeBuffer()
返回一个可以通过添加最大到指定数目的基于堆的或者直接内存存储的缓冲区来扩展的 CompositeByteBuf
compositeBuffer(int maxNumComponents)
/ compositeDirectBuffer()
/ compositeDirectBuffer(int maxNumComponents)
/ compositeHeapBuffer()
/ compositeHeapBuffer(int maxNumComponents)
/ ioBuffer()
返回一个用于套接字的 I/O
操作的ByteBuf
1 | // 使用直接内存来创建 |
获取一个到ByteBufAllocator
的引用
1 | Channel channel = ...; |
Netty
提供了两种ByteBufAllocator
的实现:PooledByteBufAllocator
和UnpooledByteBufAllocator
。前者池化了ByteBuf
的实例以提高性能并最大限度地减少内存碎片。此实现使用了一种称为jemalloc
的已被大量现代操作系统所采用的高效方法来分配内存。后者的实现不池化ByteBuf
实例,并且在每次它被调用时都会返回一个新的实例。
Unpooled
缓冲区
-
可能某些情况下,未能获取一个到
ByteBufAllocator
的引用。对于这种情况,Netty
提供了一个简单的称为Unpooled
的工具类,它提供了静态的辅助方法来创建未池化的ByteBuf
实例。名称 描述 buffer()
返回一个基于堆或者直接内存存储的 ByteBuf
buffer(int initialCapacity)
/ buffer(int initialCapacity, int maxCapacity)
/ directBuffer()
返回一个基于直接内存存储的 ByteBuf
directBuffer(int initialCapacity)
/ directBuffer(int initialCapacity, int maxCapacity)
/ wrappedBuffer()
返回一个包装了给定数据的 ByteBuf
copiedBuffer()
返回一个复制了给定数据的 ByteBuf
String
转为ByteBuf
1 | // 第一种 |
ByteBuf
写入
方法签名 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) |
写入 boolean 值 | 用一字节 01|00 代表 true|false |
writeByte(int value) |
写入 byte 值 | |
writeShort(int value) |
写入 short 值 | |
writeInt(int value) |
写入 int 值 | Big Endian,即 0x250,写入后 00 00 02 50 |
writeIntLE(int value) |
写入 int 值 | Little Endian,即 0x250,写入后 50 02 00 00 |
writeLong(long value) |
写入 long 值 | |
writeChar(int value) |
写入 char 值 | |
writeFloat(float value) |
写入 float 值 | |
writeDouble(double value) |
写入 double 值 | |
writeBytes(ByteBuf src) |
写入 netty 的 ByteBuf | |
writeBytes(byte[] src) |
写入 byte[] | |
writeBytes(ByteBuffer src) |
写入 nio 的 ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) |
写入字符串 |
-
注意
- 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用
- 网络传输,默认习惯是
Big Endian
ByteBuf
扩容
ByteBuf
是一个可扩容的字节容器,可以方便地进行数据读写操作.当写入的数据超过了ByteBuf
的容量时,ByteBuf
会自动进行扩容,以支持更多的数据写入.- 当写入的数据大小超过了
ByteBuf
的可写字节数,即writerIndex - readerIndex >= writableBytes()
,会触发扩容操作. - 如果写入的数据大小小于 1024 字节,则将
ByteBuf
的容量扩大到原来的 4 倍或者下一个 16 的整数倍,取两者中的最大值.例如,如果ByteBuf
的容量为 64 字节,需要写入 100 字节的数据,则扩容后的容量为 256 字节. - 如果写入的数据大小大于等于 1024 字节,则将
ByteBuf
的容量扩大到原来的 2 倍或者下一个 2^n,取两者中的最大值.例如,如果ByteBuf
的容量为 1024 字节,需要写入 2048 字节的数据,则扩容后的容量为 4096 字节. - 扩容后的容量不能超过
maxCapacity
,否则会抛出IllegalStateException
异常.
- 当写入的数据大小超过了
ByteBuf
读取
1 | ByteBuf buffer = ...; |
1 | System.out.println(buffer.readByte()); |
-
读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分
-
如果需要重复读取 int 整数 5,怎么办?
- 可以在
read
前先做个标记mark
1
2buffer.markReaderIndex();
System.out.println(buffer.readInt()); - 可以在
-
这时要重复读取的话,重置到标记位置
reset
1
buffer.resetReaderIndex();
ByteBuf
其他操作
名称 | 描述 |
---|---|
isReadable() |
如果至少有一个字节可供读取,则返回true |
isWritable() |
如果至少有一个字节可被写入,则返回true |
readableBytes() |
返回可被读取的字节数 |
writableBytes() |
返回可被写入的字节数 |
capacity() |
返回ByteBuf 可容纳的字节数.在此之后,它会尝试再次扩展直到达到maxCapacity() |
maxCapacity() |
返回ByteBuf 可以容纳的最大字节数 |
hasArray() |
如果 hasArray() 返回 true ,那么可以通过 array() 获取对应的字节数组,通过 arrayOffset() 获取偏移量,通过 readableBytes() 获取可读字节数,然后可以直接操作数组而不必进行零拷贝.如果 hasArray() 返回 false ,说明 ByteBuf 不由一个字节数组支撑,可能是直接内存,此时操作可能需要进行零拷贝或者使用其他方式来访问数据. |
array() |
如果ByteBuf 由一个字节数组支撑则返回该数组;否则,它将抛出一个UnsupportedOperationException 异常 |
ByteBuf
内存回收
UnpooledHeapByteBuf
使用的是 JVM 内存,因此只需要等待 GC 回收内存即可.UnpooledDirectByteBuf
使用的是直接内存,需要手动释放内存,否则会导致内存泄漏.可以通过调用ByteBuf.release()
方法来释放UnpooledDirectByteBuf
的内存.PooledByteBuf
和它的子类,由于使用了池化机制,内存的释放规则会更加复杂.在使用PooledByteBuf
时,应该遵循以下规则:
- 在使用完毕后,调用
ByteBuf.release()
方法释放PooledByteBuf
的内存. - 不要在
PooledByteBuf
未被释放前,将它转换成其他类型的ByteBuf
对象,否则会导致内存泄漏. - 不要重复释放
PooledByteBuf
,否则会导致内存泄漏.
- 需要注意的是,
PooledByteBuf
的内存池是在ByteBufAllocator
中实现的,因此在使用PooledByteBuf
时,应该使用相同的ByteBufAllocator
对象来创建和释放PooledByteBuf
,避免出现内存池不一致的情况.
采用引用计数法来控制回收内存
-
在 Netty 中,每个
ByteBuf
对象都实现了ReferenceCounted
接口,采用引用计数法来控制内存的释放.- 每个
ByteBuf
对象的初始计数为 1; - 每次调用
release()
方法会将计数减 1,当计数为 0 时,ByteBuf
对应的内存会被回收. - 调用
retain()
方法会将计数加 1,表示该ByteBuf
对象仍然被使用,即使其他handler
调用了release()
方法也不会造成内存回收.
- 每个
-
需要注意的是,当计数为 0 时,底层内存会被回收,此时即使
ByteBuf
对象还存在,其各个方法均无法正常使用.因此,在使用ByteBuf
时,应该避免在计数为 0 时继续使用该对象,避免出现内存泄漏或者访问非法内存的情况. -
另外,由于
ByteBuf
的引用计数机制是线程安全的,因此可以在多线程环境下使用ByteBuf
对象,不需要额外的同步措施.但是,需要注意的是,在使用ByteBuf
时,应该遵循引用计数的规则,避免出现内存泄漏或者访问非法内存的情况.
谁负责release
?
-
因为
Pipeline
的存在,一般需要将 ByteBuf 传递给下一个ChannelHandler
,如果在 finally 中 release 了,就失去了传递性(当然,如果在这个ChannelHandler
内这个 ByteBuf 已完成了它的使命,那么便无须再传递) -
基本规则是,谁是最后使用者,谁负责 release,详细分析如下
-
起点,对于 NIO 实现来讲,在
io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
方法中首次创建ByteBuf
放入Pipeline
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// 此处放入
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
} -
入站ByteBuf处理原则
- 对原始ByteBuf不做处理,调用
ctx.fireChannelRead(msg)
向后传递,这时无须release - 将原始ByteBuf转换为其它类型的Java对象,这时ByteBuf就没用了,必须release
- 如果不调用
ctx.fireChannelRead(msg)
向后传递,那么也必须release - 注意各种异常,如果ByteBuf没有成功传递到下一个
ChannelHandler
,必须release - 假设消息一直向后传,那么
TailContext
会负责释放未处理消息(原始的ByteBuf)
- 对原始ByteBuf不做处理,调用
-
出站ByteBuf处理原则
- 出站消息最终都会转为ByteBuf输出,一直向前传,由
HeadContext#flush
后release
- 出站消息最终都会转为ByteBuf输出,一直向前传,由
-
异常处理原则
- 有时候不清楚ByteBuf被引用了多少次,但又必须彻底释放,可以循环调用release直到返回true
-