参考文献

  • 黑马Netty

Buffer(缓冲区)

  • Buffer是一个对象,它包含一些要写入或者要读出的数据.在面向流的I/O中,可以将数据直接写入或者将数据直接读到Stream对象中.

  • 在NIO库中,所有数据都是用缓冲区处理的.在读取数据时,它是直接读到缓冲区中的;在写入数据时,写入到缓冲区中.任何时候访问NIO中的数据,都是通过缓冲区进行操作.

  • 缓冲区实质上是一个数组.通常它是一个字节数组(ByteBuffer),也可以使用其他种类的数组.但是缓冲区不仅仅是一个数组,缓冲区提供了对数据的结构化访问以及维护读写位置(limit)等信息.

  • 最常用的缓冲区是ByteBuffer,一个ByteBuffer提供了一组功能用于操作byte数组.除了ByteBuffer,还有其他的一些缓冲区,事实上,每一种Java基本类型(除了Boolean类型),都有与之对应的缓冲区:

    类型 说明
    ByteBuffer 字节缓冲区
    CharBuffer 字符缓冲区
    ShortBuffer 短整形缓冲区
    IntBuffer 整形缓冲区
    LongBuffer 长整型缓冲区
    FloatBuffer 浮点型缓冲区
    DoubleBuffer 双精度浮点型缓冲区

ByteBuffer结构

  • ByteBufferJava NIO中用于处理二进制数据的缓冲区类,具有以下重要属性:
    • capacity:表示ByteBuffer的容量,即ByteBuffer可以存储的最大字节数.
      • 在创建ByteBuffer时,需要指定其容量.
    • position:表示ByteBuffer的当前位置,即下一个要读写的字节的位置.
      • 初始值为0,可以通过调用ByteBufferposition方法来设置新的位置.
    • limit:表示ByteBuffer的限制位置,即可读写的边界.
      • 初始值为capacity,可以通过调用ByteBufferlimit方法来设置新的限制位置.
      • 在读取数据时,不能超过limit;
      • 在写入数据时,不能超过capacity.

img

  • ByteBuffer中,可读区间和可写区间是通过positionlimit属性来定义的.

    • 可读区间是指从当前位置(position)到限制位置(limit)之间的字节序列,即[position, limit)区间内的字节可以被读取.

      • 在读取数据时,可以通过调用ByteBuffer的get()方法来获取数据,每次读取一个字节,并将position属性增加1.

      img

    • 可写区间是指从当前位置(position)到容量位置(capacity)之间的字节序列,即[position, capacity)区间内的字节可以被写入.

      • 在写入数据时,可以通过调用ByteBufferput()方法来写入数据,每次写入一个字节,并将position属性增加1.

      img

执行ByteBuffer API三个属性的变化过程

  • 一开始

img

  • 写模式下,position 是写入位置,limit等于容量,下图表示写入了 4 个字节后的状态

img

  • flip 动作发生后,position 切换为读取位置,limit 切换为读取限制

img

  • 读取 4 个字节后,状态

img

  • clear 动作发生后,状态

img

  • compact 方法,是把未读完的部分向前压缩,然后切换至写模式
    • compact()是一个ByteBuffer实例的方法,它的作用是将当前位置(position)到限制值(limit)之间的数据拷贝到缓冲区的起始位置,并将position设置为复制后数据的长度.同时,将缓冲区的限制值设置为容量值(capacity)

img

ByteBuffer的创建方式

  • 使用ByteBuffer的静态方法allocate()创建:

    1
    2
    // java.nio.HeapByteBuffer java的堆内存
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    • 这个方法会创建一个容量为1024字节的ByteBuffer实例,初始时所有字节都是0.
  • 使用ByteBuffer的静态方法wrap()创建:

    1
    2
    byte[] byteArray = new byte[1024];
    ByteBuffer buffer = ByteBuffer.wrap(byteArray);
    • 这个方法会将一个已经存在的字节数组包装成ByteBuffer实例,这个ByteBuffer实例的容量是该字节数组的长度.
  • 使用ByteBuffer的静态方法allocateDirect()创建:

    1
    2
    // java.nio.DirectByteBuffer 直接内存
    ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
    • 这个方法会创建一个容量为1024字节的直接ByteBuffer实例,直接ByteBuffer是一种特殊类型的ByteBuffer,它的内存空间是直接分配在系统内存中,而不是在JVM的堆内存中,因此读写效率更高.但是,直接ByteBuffer的创建和销毁比较耗费系统资源,因此一般只在需要处理大量数据时才使用.

Buffer的应用固定逻辑

  • 写操作顺序

    • 调用clear()方法,清空缓冲区并准备写入数据;
    • 调用 put() 方法,将要写入的数据写入缓冲区;
    • 调用 flip() 方法,重置缓冲区游标,将写模式切换为读模式;
    • 调用SocketChannel.write(buffer)方法,将缓冲区中的数据发送到网络的另一端;
    • 调用 clear() 方法,清空缓冲区,准备下一次写操作.
  • 读操作顺序

    • 调用 clear() 方法,清空缓冲区并准备读取数据;
    • 调用 SocketChannel.read(buffer) 方法,从网络中读取数据到缓冲区;
    • 调用 flip() 方法,重置缓冲区游标,将读模式切换为写模式;
    • 调用 get() 方法,从缓冲区中读取数据;
    • 调用 clear() 方法,清空缓冲区,准备下一次读操作.

Channel中的读写操作

  • 读取数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 读取通道中的数据,并写入到 buf 中
    while (channel.read(buf) != -1){
    // 缓存区切换到读模式
    buf.flip();
    // 读取 buf 中的数据
    while (buf.position() < buf.limit()){
    text.append((char)buf.get());
    }
    // 清空 buffer,缓存区切换到写模式
    buf.clear();
    }
  • 写入数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    for (int i = 0; i < str.length(); i++) {
    buf.put((byte)str.charAt(i));
    // 缓存区已满或者已经遍历到最后一个字符
    if (buf.position() == buf.limit() || i == str.length() - 1) {
    // 将缓冲区由写模式置为读模式
    buf.flip();
    // 将缓冲区的数据写到通道
    channel.write(buf);
    // 清空缓存区,将缓冲区置为写模式,下次才能使用
    buf.clear();
    }
    }
  • 将数据刷出到物理磁盘

    1
    channel.force(false);
    • 该方法的参数为一个 boolean 类型的值。当参数为 false 时,表示只需要强制刷新缓冲区的数据至磁盘,而无需等待数据同步完成;当参数为 true 时,则需要等待数据同步完成才能返回。这里需要注意的是,即使使用了 channel.force(true) 方法,同步操作也不能百分之百地保证数据不会丢失,因为文件系统和磁盘等因素也可能导致数据丢失。
    • 一般来说,在进行文件写入操作时,可以先将数据写入缓冲区,待缓冲区满了或者达到一定数量时,再将缓冲区中的数据调用 channel.force() 方法刷入磁盘。这种做法可以减少频繁的磁盘 I/O 操作,提高程序的性能。但是在要求数据的可靠性比较高的场合,建议采用channel.force(true)方法,保证数据的安全性。
  • 关闭通道

    1
    channel.close();

ByteBffer的写入和读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    @Test
public void testWriteInByteBuffer() {
final ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put((byte) 0x61);
debugAll(buffer);
buffer.put(new byte[]{0x62, 0x63, 0x64});
// 还可以通过channel.read(buffer) 从channel中将数据读取到buffer中
debugAll(buffer);
// 在写模式下直接获取,是获取不到数据的
// System.out.println(buffer.get());

// 切换到读模式
System.out.println("切换到读模式");
// 还可以使用channel.write(buffer); 即从channel中将数据写入到buffer中
buffer.flip();
System.out.println(buffer.get());
debugAll(buffer);

buffer.compact();
debugAll(buffer);
buffer.put((byte) 0x61);
debugAll(buffer);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
+--------+-------------------- all ------------------------+----------------+
position: [1], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 00 00 00 00 00 00 00 00 00 |a......... |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [4], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 00 00 00 00 00 00 |abcd...... |
+--------+-------------------------------------------------+----------------+
切换到读模式
97
+--------+-------------------- all ------------------------+----------------+
position: [1], limit: [4]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 00 00 00 00 00 00 |abcd...... |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [3], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 63 64 64 00 00 00 00 00 00 |bcdd...... |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [4], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 63 64 61 00 00 00 00 00 00 |bcda...... |
+--------+-------------------------------------------------+----------------+
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void testReadFromByteBuffer() {
final ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put(new byte[]{'a', 'b', 'c', 'd'});
buffer.flip();
buffer.get(new byte[4]);
debugAll(buffer);

// rewind
buffer.rewind();
System.out.println((char)buffer.get());

// mark & reset
// mark 做一个标记,记录position位置
// reset是将position重置mark的位置
System.out.println((char)buffer.get());
// 在c这个位置做个标记
buffer.mark();
System.out.println((char)buffer.get());
System.out.println((char)buffer.get());
// 返回c这个位置继续读
buffer.reset();
System.out.println((char)buffer.get());
System.out.println((char)buffer.get());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
+--------+-------------------- all ------------------------+----------------+
position: [4], limit: [4]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 00 00 00 00 00 00 |abcd...... |
+--------+-------------------------------------------------+----------------+
a
b
c
d
c
d

字符串与ByteBuffer之间进行转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void testByteBufferToString() {
// 字符串==>byte[]==>byteBuffer
final ByteBuffer buffer = ByteBuffer.allocate(16);
final byte[] bytes = "holelin".getBytes();
buffer.put(bytes);
debugAll(buffer);

// Charset并切换到读模式
final ByteBuffer newBuffer = StandardCharsets.UTF_8.encode("holelin");
debugAll(newBuffer);


// wrap并切换到读模式
final ByteBuffer wrapBuffer = ByteBuffer.wrap("holelin".getBytes());
debugAll(wrapBuffer);

// bytebuffer转字符串
final CharBuffer decode = StandardCharsets.UTF_8.decode(newBuffer);
System.out.println(decode.toString());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

+--------+-------------------- all ------------------------+----------------+
position: [7], limit: [16]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 6f 6c 65 6c 69 6e 00 00 00 00 00 00 00 00 00 |holelin.........|
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [7]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 6f 6c 65 6c 69 6e |holelin |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [7]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 6f 6c 65 6c 69 6e |holelin |
+--------+-------------------------------------------------+----------------+
holelin

Scattering Reads

  • Scattering Reads指的是将一个通道中的数据分散读取到多个缓冲区中.使用Scattering Reads可以将一个大的数据块分散到多个小的缓冲区中,从而方便数据的处理和管理.Scattering Reads的典型使用场景是在一个通道中读取多个不同类型的数据,例如在一个TCP连接中读取协议头和数据体.

  • 分散读取,有一个文本文件 3parts.txt

1
onetwothree
  • 使用如下方式读取,可以将数据填充至多个 buffer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer a = ByteBuffer.allocate(3);
ByteBuffer b = ByteBuffer.allocate(3);
ByteBuffer c = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{a, b, c});
a.flip();
b.flip();
c.flip();
debug(a);
debug(b);
debug(c);
} catch (IOException e) {
e.printStackTrace();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 6f 6e 65 |one |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 77 6f |two |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 68 72 65 65 |three |
+--------+-------------------------------------------------+----------------+

Gathering Writes

  • Gathering Writes指的是将多个缓冲区中的数据聚合写入到一个通道中.使用Gathering Writes可以将多个小的数据块聚合成一个大的数据块,从而提高数据的传输效率.Gathering Writes的典型使用场景是在一个TCP连接中发送多个不同类型的数据,例如在一个HTTP响应中发送响应头和响应体.

  • 使用如下方式写入,可以将多个 buffer 的数据填充至 channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer d = ByteBuffer.allocate(4);
ByteBuffer e = ByteBuffer.allocate(4);
channel.position(11);

d.put(new byte[]{'f', 'o', 'u', 'r'});
e.put(new byte[]{'f', 'i', 'v', 'e'});
d.flip();
e.flip();
debug(d);
debug(e);
channel.write(new ByteBuffer[]{d, e});
} catch (IOException e) {
e.printStackTrace();
}
1
2
3
4
5
6
7
8
9
10
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 6f 75 72 |four |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 69 76 65 |five |
+--------+-------------------------------------------------+----------------+
  • 文件内容
1
onetwothreefourfive
  • Scattering Reads和Gathering Writes的主要作用是提高数据的处理和传输效率,减少数据的拷贝和传输次数,从而提高系统的性能和效率.同时,使用这两种操作还可以提高代码的简洁性和可读性,方便代码的维护和扩展.
  • 需要注意的是,Scattering Reads和Gathering Writes只能在非阻塞的通道上进行操作,而且通常需要配合使用Buffer类和Selector类等其他Java NIO中的类来完成.因此,在使用这两种操作时需要对Java NIO的相关知识有一定的了解和掌握.
  • Scattering Reads和Gathering Writes只能在非阻塞的通道上进行操作,这是因为这两种操作需要读写多个缓冲区,需要在缓冲区和通道之间进行多次切换和交换,因此必须使用非阻塞的通道才能保证操作的正常进行.
  • 在阻塞式通道中,读写操作是同步的,即在读写数据时,如果数据还没有到达或者还没有完全写入,那么线程会被阻塞,直到数据到达或者写入完成.因此,如果在阻塞式通道中使用Scattering Reads或Gathering Writes,如果任何一个缓冲区没有得到完全填充或读取,那么整个操作都会被阻塞,因此不适合使用.

粘包和半包现象

  • 粘包现象指的是接收方收到多个数据包后,将它们粘合在一起,导致无法正确解析出每个数据包的边界和内容.
    • 这种现象通常发生在发送方连续发送多个小数据包时,接收方缓冲区大小不够大,导致数据包被合并在一起.
  • 半包现象则是指接收方只接收到了数据包的部分内容,无法完整解析出发送方发送的完整数据包.
    • 这种现象通常发生在发送方连续发送多个大数据包时,接收方缓冲区大小不够大,导致数据包被截断.

解决方案

  1. 固定长度的数据包:发送方在发送数据时,将数据按照固定长度的数据包进行拆分和发送,接收方在接收数据时,每次接收固定长度的数据包,从而避免粘包和半包现象.
  2. 添加特殊字符或标识符:发送方在发送数据时,在每个数据包的结尾添加一个特殊字符或标识符,接收方在接收数据时,根据特殊字符或标识符来判断数据包的边界和内容,从而避免粘包和半包现象.
  3. 使用消息头:发送方在发送数据时,在每个数据包的开头添加一个消息头,其中包含数据包的长度信息,接收方在接收数据时,根据消息头中的长度信息来正确解析出每个数据包的边界和内容,从而避免粘包和半包现象.

常用ByteBuffer API说明

读写模式切换相关

  • flip()

    • 写模式<==>读模式
    • 在写模式下,limit 表示缓冲区的容量,position 表示当前写入的位置。
    • 在读模式下,limit 表示已写入的数据大小,而 position 则重置为 0,这样 Buffer 可以被用于读取之前写入的数据
  • rewind()

    • 将 Buffer 重置为读模式

    • 在读模式下,limit 表示缓冲区中剩余可读数据的数量,position 表示当前读取的位置。

    • rewind() 方法不仅将 position 重置为 0,还会保持 limit 不变,并清空标记,因此可以认为它是将 Buffer 的读位置重置为 0

  • clear()

    • 将 Buffer 重置为写模式,同时清空缓冲区
    • 在写模式下,limit 表示缓冲区的容量,position 表示当前写入的位置。clear() 方法将 limit 和 position 都重置为初始状态,而且会清空缓冲区的数据,让 Buffer 可以被重新写入。

常用API

  • put 存入数据到缓冲区

    • put(byte b):将给定单个字节写入缓冲区的当前位置
    • put(byte[] src):将 src 中的字节写入缓冲区的当前位置
    • put(int index, byte b):将指定字节写入缓冲区的索引位置(不会移动 position)
  • get 获取缓冲区的数据

    • get() :读取单个字节
    • get(byte[] dst):批量读取多个字节到 dst 中
    • get(int index):读取指定索引位置的字节(不会移动 position)
  • compact()

    • compact() 方法只能在写模式下使用
    • compact() 方法之后仍然处于写模式
    • 它将尚未读取的数据移动到缓冲区的起始位置,并将 position 设为移动后的数据长度。这使得 Buffer 可以继续接收新的数据,而不会覆盖未读取的数据。
  • mark() 和 reset()

    • mark() 方法是一个标记方法,用于在缓冲区中设置一个标记,以便后续 reset() 方法可以恢复到该标记处。
    • mark() 方法在读模式和写模式下都可以使用。
    • reset() 方法将 position 重置为最近的标记位置,它只能在读模式下使用。如果在没有设置标记时调用 reset() 方法,则会抛出 InvalidMarkException 异常。
  • hasRemaining()

    • hasRemaining() 方法用于检查缓冲区是否还有剩余数据可供读取。
    • 在读模式下,它返回 (limit - position) > 0,即缓冲区剩余的数据大小;
    • 在写模式下,它返回 (capacity - position) > 0,即缓冲区还可以写入的数据大小。
  • remaining()

    • remaining() 方法用于获取缓冲区中剩余可读或可写的数据大小。
    • 在读模式下,它返回 limit - position;
    • 在写模式下,它返回 capacity - position。
  • isReadOnly()

    • isReadOnly() 方法用于判断当前缓冲区是否为只读模式。
    • 只读缓冲区不能修改数据,它们可以通过 ByteBuffer 的静态方法 allocateDirect(int capacity) 来创建。
    • 只读缓冲区的 limit 和 capacity 属性相等,position 属性则表示已经读取了多少数据。