参考文献

ByteBuffer

1
2
3
4
5
6
7
8
9
10
+--------------------------------------------------------------------------------+
| ByteBuffer |
+-------------------+-----------------------------------+-------+----------------+
| | 可读区 (readable) | | |
+--------+----------+-----------------------------------+-------+-------+--------+
| | | [数据, 数据, 数据, 数据, 数据] | | | |
+--------+----------+-----------------------------------+-------+-------+--------+
↑ ↑ ↑ ↑
| | | |
(标记mark) (当前位置 position) (可读区的结束limit) (缓冲区容量capacity)
  • mark: 为某个读取过的关键位置做标记,方便回退到该位置
  • position: 当前读取的位置
  • 可读区 (readable): 从positionlimit之间的数据区域
  • limit: buffer中有效的数据长度
  • capacity: 初始化时的空间容量
  • mark <= position <= limit <= capactiy

ByteBuffer 的缺点

  • 第一,ByteBuffer 分配的长度是固定的,无法动态扩缩容,所以很难控制需要分配多大的容量。如果分配太大容量,容易造成内存浪费;如果分配太小,存放太大的数据会抛出 BufferOverflowException 异常。在使用 ByteBuffer 时,为了避免容量不足问题,你必须每次在存放数据的时候对容量大小做校验,如果超出 ByteBuffer 最大容量,那么需要重新开辟一个更大容量的 ByteBuffer,将已有的数据迁移过去。整个过程相对烦琐,对开发者而言是非常不友好的。
  • 第二,ByteBuffer 只能通过 position 获取当前可操作的位置,因为读写共用的 position 指针,所以需要频繁调用 flip、rewind 方法切换读写状态,开发者必须很小心处理 ByteBuffer 的数据读写,稍不留意就会出错。

ByteBuf

  • ByteBuf提供了两个指针变量来支持顺序的读写操作——readerIndex用于读操作,writerIndex用于写操作.下图给出了两个指针如何将缓冲区分割为3个区域:

    1
    2
    3
    4
    5
    6
    +-------------------+------------------+------------------+
    | discardable bytes | readable bytes | writable bytes |
    | | (CONTENT) | |
    +-------------------+------------------+------------------+
    | | | |
    0 <= readerIndex <= writerIndex <= capacity

Discardable bytes(可废弃的字节)

  • 该段包含已经通过读取操作读取的字节.最初,该段的长度为0,但随着读操作的执行,其长度会增加到writerIndex.读取的字节可以通过调用discardReadBytes()来回收未使用的区域来丢弃,如下图所示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
     BEFORE discardReadBytes()

    +-------------------+------------------+------------------+
    | discardable bytes | readable bytes | writable bytes |
    +-------------------+------------------+------------------+
    | | | |
    0 <= readerIndex <= writerIndex <= capacity


    AFTER discardReadBytes()

    +------------------+--------------------------------------+
    | readable bytes | writable bytes (got more space) |
    +------------------+--------------------------------------+
    | | |
    readerIndex (0) <= writerIndex (decreased) <= capacity

Clearing the buffer indexes(清除缓冲区索引)

  • 可以通过调用clear()readerIndexwriterIndex都设置为0.它不清空缓冲区内容(例如填充0),只清空两个指针.请注意,这个操作的语义与Buffer.clear()不同.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    BEFORE clear()

    +-------------------+------------------+------------------+
    | discardable bytes | readable bytes | writable bytes |
    +-------------------+------------------+------------------+
    | | | |
    0 <= readerIndex <= writerIndex <= capacity


    AFTER clear()

    +---------------------------------------------------------+
    | writable bytes (got more space) |
    +---------------------------------------------------------+
    | |
    0 = readerIndex = writerIndex <= capacity

ByteBuf API的优点

  • 它可以被用户自定义的缓冲区类型扩展;
  • 通过内置的复合缓冲区类型实现了透明的零拷贝;
  • 容量可以按需增长(类似于JDKStringBuilder);
  • 在读和写这两种模式之间切换不需要调用ByteBufferflip()方法;
  • 读和写使用了不同的索引;
  • 支持方法的链式调用;
  • 支持引用计数;
  • 支持池化

ByteBuf分类

  • ByteBuf可以划分为三个不同的纬度:
    • Heap/Direct
    • Pooled/Unpooled
    • Unsafe/非Unsafe

img

  • Heap/Direct 就是堆内和堆外内存。Heap 指的是在 JVM 堆内分配,底层依赖的是字节数据;Direct 则是堆外内存,不受 JVM 限制,分配方式依赖 JDK 底层的 ByteBuffer。
  • Pooled/Unpooled 表示池化还是非池化内存。Pooled 是从预先分配好的内存中取出,使用完可以放回 ByteBuf 内存池,等待下一次分配。而 Unpooled 是直接调用系统 API 去申请内存,确保能够被 JVM GC 管理回收。
  • Unsafe/非 Unsafe 的区别在于操作方式是否安全。 Unsafe 表示每次调用 JDK 的 Unsafe 对象操作物理内存,依赖 offset + index 的方式操作数据。非 Unsafe 则不需要依赖 JDK 的 Unsafe 对象,直接通过数组下标的方式操作数据。

ByteBuf的使用模式

  • 堆缓冲区
  • 直接缓冲区
  • 复合缓冲区CompositeByteBuf

复合缓冲区

  • 使用ByteBuffer的复合缓冲区模式

    1
    2
    3
    4
    5
    6
    7
    8
    // Use an array to hold the message parts
    ByteBuffer[] message = new ByteBuffer[] { header, body };
    // Create a new ByteBuffer and use copy to merge the header and body
    ByteBuffer message2 =
    ByteBuffer.allocate(header.remaining() + body.remaining());
    message2.put(header);
    message2.put(body);
    message2.flip();
  • 使用CompositeByteBuf的复合缓冲区模式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    CompositeByteBuf messageBuf = Unpooled.compositeBuffer(); 
    ByteBuf headerBuf = ...; // can be backing or direct
    ByteBuf bodyBuf = ...; // can be backing or direct
    messageBuf.addComponents(headerBuf, bodyBuf);
    .....
    messageBuf.removeComponent(0); // remove the header
    for (ByteBuf buf : messageBuf) {
    System.out.println(buf.toString());
    }
  • 访问CompositeByteBuf中的数据

    1
    2
    3
    4
    5
    CompositeByteBuf compBuf = Unpooled.compositeBuffer(); 
    int length = compBuf.readableBytes();
    byte[] array = new byte[length];
    compBuf.getBytes(compBuf.readerIndex(), array);
    handleArray(array, 0, array.length);

派生缓冲区

  • 派生缓冲区为 ByteBuf提供了以专门的方式来呈现其内容的视图.这类视图是通过以下方法被创建的
    • duplicate()
    • slice()
    • slice(int, int)
    • Unpooled.unmodifiableBuffer(…)
    • order(ByteOrder)
    • readSlice(int)
  • 每个这些方法都将返回一个新的 ByteBuf 实例,它具有自己的读索引、写索引和标记索引.其内部存储和 JDK 的ByteBuffer一样也是共享的.这使得派生缓冲区的创建成本是很低廉的,但是这也意味着,如果你修改了它的内容,也同时修改了其对应的源实例,所以要小心.
  • ByteBuf 复制 如果需要一个现有缓冲区的真实副本,请使用copy()或者copy(int, int)方法.不同于派生缓冲区,由这个调用所返回的ByteBuf拥有独立的数据副本
ByteBuf进行切片
1
2
3
4
5
6
7
8
9
10
// 创建一个用于保存给定字符串的字节的 ByteBuf
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
// 创建该 ByteBuf 从索引0开始到索引15结束的一个新切片
ByteBuf sliced = buf.slice(0, 15);
System.out.println(sliced.toString(utf8));
// 更新索引 0 处的字节
buf.setByte(0, (byte)'J');
// 将会成功,因为数据是共享的,对其中 一个所做的更改对另外一个也是可见的
assert buf.getByte(0) == sliced.getByte(0);
复制一个ByteBuf
1
2
3
4
5
6
7
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
ByteBuf copy = buf.copy(0, 15);
System.out.println(copy.toString(utf8));
buf.setByte(0, (byte) 'J');
// 将会成功,因为数据不是共享的
assert buf.getByte(0) != copy.getByte(0);

ByteBuf的分配

按需分配:ByteBufAllocator接口

  • ByteBufAllocator的方法

    名称 描述
    buffer() 返回一个基于堆或者直接内存存储的ByteBuf
    buffer(int initialCapacity) /
    buffer(int initialCapacity, int maxCapacity) /
    heapBuffer() 返回一个基于堆内存存储的ByteBuf
    heapBuffer(int initialCapacity) /
    heapBuffer(int initialCapacity, int maxCapacity) /
    directBuffer() 返回一个基于直接内存存储的ByteBuf
    directBuffer(int initialCapacity) /
    directBuffer(int initialCapacity, int maxCapacity) /
    compositeBuffer() 返回一个可以通过添加最大到指定数目的基于堆的或者直接内存存储的缓冲区来扩展的CompositeByteBuf
    compositeBuffer(int maxNumComponents) /
    compositeDirectBuffer() /
    compositeDirectBuffer(int maxNumComponents) /
    compositeHeapBuffer() /
    compositeHeapBuffer(int maxNumComponents) /
    ioBuffer() 返回一个用于套接字的I/O操作的ByteBuf
1
2
3
4
5
6
// 使用直接内存来创建
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
// 使用堆内存来创建
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
// 使用直接内存来创建
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
获取一个到ByteBufAllocator的引用
1
2
3
4
5
6
7
8
Channel channel = ...;
// 从Channel获取一个到ByteBufAllocator的引用
ByteBufAllocator allocator = channel.alloc();
....
ChannelHandlerContext ctx = ...;
// 从ChannelHandlerContext 获取一个到ByteBufAllocator的引用
ByteBufAllocator allocator2 = ctx.alloc();
...
  • Netty提供了两种ByteBufAllocator的实现:PooledByteBufAllocatorUnpooledByteBufAllocator。前者池化了ByteBuf的实例以提高性能并最大限度地减少内存碎片。此实现使用了一种称为jemalloc的已被大量现代操作系统所采用的高效方法来分配内存。后者的实现不池化ByteBuf实例,并且在每次它被调用时都会返回一个新的实例。

Unpooled缓冲区

  • 可能某些情况下,未能获取一个到ByteBufAllocator的引用。对于这种情况,Netty 提供了一个简单的称为Unpooled的工具类,它提供了静态的辅助方法来创建未池化的ByteBuf实例。

    名称 描述
    buffer() 返回一个基于堆或者直接内存存储的ByteBuf
    buffer(int initialCapacity) /
    buffer(int initialCapacity, int maxCapacity) /
    directBuffer() 返回一个基于直接内存存储的ByteBuf
    directBuffer(int initialCapacity) /
    directBuffer(int initialCapacity, int maxCapacity) /
    wrappedBuffer() 返回一个包装了给定数据的ByteBuf
    copiedBuffer() 返回一个复制了给定数据的ByteBuf

String转为ByteBuf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 第一种
String message = "Hello, Netty!";
ByteBuf byteBuf = Unpooled.copiedBuffer(message, CharsetUtil.UTF_8);

// 第二种
String str = "Hello, Netty!";
ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
ByteBuf byteBuf = allocator.buffer(str.length());
byteBuf.writeCharSequence(str, CharsetUtil.UTF_8);

// 第三种
String str = "Hello, Netty!";
ChannelHandlerContext ctx = ...;
ByteBuf byteBuf = ctx.alloc().buffer(str.length());
byteBuf.writeCharSequence(str, CharsetUtil.UTF_8);

ByteBuf写入

方法签名 含义 备注
writeBoolean(boolean value) 写入 boolean 值 用一字节 01|00 代表 true|false
writeByte(int value) 写入 byte 值
writeShort(int value) 写入 short 值
writeInt(int value) 写入 int 值 Big Endian,即 0x250,写入后 00 00 02 50
writeIntLE(int value) 写入 int 值 Little Endian,即 0x250,写入后 50 02 00 00
writeLong(long value) 写入 long 值
writeChar(int value) 写入 char 值
writeFloat(float value) 写入 float 值
writeDouble(double value) 写入 double 值
writeBytes(ByteBuf src) 写入 netty 的 ByteBuf
writeBytes(byte[] src) 写入 byte[]
writeBytes(ByteBuffer src) 写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串
  • 注意

    • 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用
    • 网络传输,默认习惯是Big Endian

ByteBuf扩容

  • ByteBuf 是一个可扩容的字节容器,可以方便地进行数据读写操作.当写入的数据超过了 ByteBuf 的容量时,ByteBuf 会自动进行扩容,以支持更多的数据写入.
    • 当写入的数据大小超过了 ByteBuf 的可写字节数,即 writerIndex - readerIndex >= writableBytes(),会触发扩容操作.
    • 如果写入的数据大小小于 1024 字节,则将 ByteBuf 的容量扩大到原来的 4 倍或者下一个 16 的整数倍,取两者中的最大值.例如,如果 ByteBuf 的容量为 64 字节,需要写入 100 字节的数据,则扩容后的容量为 256 字节.
    • 如果写入的数据大小大于等于 1024 字节,则将 ByteBuf 的容量扩大到原来的 2 倍或者下一个 2^n,取两者中的最大值.例如,如果 ByteBuf 的容量为 1024 字节,需要写入 2048 字节的数据,则扩容后的容量为 4096 字节.
    • 扩容后的容量不能超过 maxCapacity,否则会抛出 IllegalStateException 异常.

ByteBuf读取

1
2
3
4
5
ByteBuf buffer = ...;
for (int i = 0; i < buffer.capacity(); i++) {
byte b = buffer.getByte(i);
System.out.println((char)b);
}
1
2
3
4
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
  • 读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分

  • 如果需要重复读取 int 整数 5,怎么办?

    • 可以在read前先做个标记mark
    1
    2
    buffer.markReaderIndex();
    System.out.println(buffer.readInt());
  • 这时要重复读取的话,重置到标记位置reset

    1
    buffer.resetReaderIndex();

ByteBuf其他操作

名称 描述
isReadable() 如果至少有一个字节可供读取,则返回true
isWritable() 如果至少有一个字节可被写入,则返回true
readableBytes() 返回可被读取的字节数
writableBytes() 返回可被写入的字节数
capacity() 返回ByteBuf可容纳的字节数.在此之后,它会尝试再次扩展直到达到maxCapacity()
maxCapacity() 返回ByteBuf可以容纳的最大字节数
hasArray() 如果 hasArray() 返回 true,那么可以通过 array() 获取对应的字节数组,通过 arrayOffset() 获取偏移量,通过 readableBytes() 获取可读字节数,然后可以直接操作数组而不必进行零拷贝.
如果 hasArray() 返回 false,说明 ByteBuf 不由一个字节数组支撑,可能是直接内存,此时操作可能需要进行零拷贝或者使用其他方式来访问数据.
array() 如果ByteBuf由一个字节数组支撑则返回该数组;否则,它将抛出一个UnsupportedOperationException异常

ByteBuf内存回收

  • UnpooledHeapByteBuf 使用的是 JVM 内存,因此只需要等待 GC 回收内存即可.
  • UnpooledDirectByteBuf 使用的是直接内存,需要手动释放内存,否则会导致内存泄漏.可以通过调用 ByteBuf.release() 方法来释放 UnpooledDirectByteBuf 的内存.
  • PooledByteBuf 和它的子类,由于使用了池化机制,内存的释放规则会更加复杂.在使用 PooledByteBuf 时,应该遵循以下规则:
  1. 在使用完毕后,调用 ByteBuf.release() 方法释放 PooledByteBuf 的内存.
  2. 不要在 PooledByteBuf 未被释放前,将它转换成其他类型的 ByteBuf 对象,否则会导致内存泄漏.
  3. 不要重复释放 PooledByteBuf,否则会导致内存泄漏.
  • 需要注意的是,PooledByteBuf 的内存池是在 ByteBufAllocator 中实现的,因此在使用 PooledByteBuf 时,应该使用相同的 ByteBufAllocator 对象来创建和释放 PooledByteBuf,避免出现内存池不一致的情况.

采用引用计数法来控制回收内存

  • 在 Netty 中,每个 ByteBuf 对象都实现了 ReferenceCounted 接口,采用引用计数法来控制内存的释放.

    • 每个 ByteBuf 对象的初始计数为 1;
    • 每次调用 release() 方法会将计数减 1,当计数为 0 时,ByteBuf 对应的内存会被回收.
    • 调用 retain() 方法会将计数加 1,表示该 ByteBuf 对象仍然被使用,即使其他 handler 调用了 release() 方法也不会造成内存回收.
  • 需要注意的是,当计数为 0 时,底层内存会被回收,此时即使 ByteBuf 对象还存在,其各个方法均无法正常使用.因此,在使用 ByteBuf 时,应该避免在计数为 0 时继续使用该对象,避免出现内存泄漏或者访问非法内存的情况.

  • 另外,由于 ByteBuf 的引用计数机制是线程安全的,因此可以在多线程环境下使用 ByteBuf 对象,不需要额外的同步措施.但是,需要注意的是,在使用 ByteBuf 时,应该遵循引用计数的规则,避免出现内存泄漏或者访问非法内存的情况.

谁负责release?

  • 因为Pipeline的存在,一般需要将 ByteBuf 传递给下一个ChannelHandler,如果在 finally 中 release 了,就失去了传递性(当然,如果在这个ChannelHandler内这个 ByteBuf 已完成了它的使命,那么便无须再传递)

  • 基本规则是,谁是最后使用者,谁负责 release,详细分析如下

    • 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read方法中首次创建ByteBuf 放入Pipeline

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
          @Override
      public final void read() {
      final ChannelConfig config = config();
      if (shouldBreakReadReady(config)) {
      clearReadPending();
      return;
      }
      final ChannelPipeline pipeline = pipeline();
      final ByteBufAllocator allocator = config.getAllocator();
      final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
      allocHandle.reset(config);

      ByteBuf byteBuf = null;
      boolean close = false;
      try {
      do {
      byteBuf = allocHandle.allocate(allocator);
      allocHandle.lastBytesRead(doReadBytes(byteBuf));
      if (allocHandle.lastBytesRead() <= 0) {
      // nothing was read. release the buffer.
      byteBuf.release();
      byteBuf = null;
      close = allocHandle.lastBytesRead() < 0;
      if (close) {
      // There is nothing left to read as we received an EOF.
      readPending = false;
      }
      break;
      }

      allocHandle.incMessagesRead(1);
      readPending = false;
      // 此处放入
      pipeline.fireChannelRead(byteBuf);
      byteBuf = null;
      } while (allocHandle.continueReading());

      allocHandle.readComplete();
      pipeline.fireChannelReadComplete();

      if (close) {
      closeOnRead(pipeline);
      }
      } catch (Throwable t) {
      handleReadException(pipeline, byteBuf, t, close, allocHandle);
      } finally {
      // Check if there is a readPending which was not processed yet.
      // This could be for two reasons:
      // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
      // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
      //
      // See https://github.com/netty/netty/issues/2254
      if (!readPending && !config.isAutoRead()) {
      removeReadOp();
      }
      }
      }
      }
    • 入站ByteBuf处理原则

      • 对原始ByteBuf不做处理,调用ctx.fireChannelRead(msg)向后传递,这时无须release
      • 将原始ByteBuf转换为其它类型的Java对象,这时ByteBuf就没用了,必须release
      • 如果不调用ctx.fireChannelRead(msg)向后传递,那么也必须release
      • 注意各种异常,如果ByteBuf没有成功传递到下一个ChannelHandler,必须release
      • 假设消息一直向后传,那么TailContext会负责释放未处理消息(原始的ByteBuf)
    • 出站ByteBuf处理原则

      • 出站消息最终都会转为ByteBuf输出,一直向前传,由HeadContext#flush后release
    • 异常处理原则

      • 有时候不清楚ByteBuf被引用了多少次,但又必须彻底释放,可以循环调用release直到返回true

Netty提供的工具类ByteBufUtil