参考文献

非阻塞 vs 阻塞

阻塞

  • 阻塞模式下,相关方法都会导致线程暂停
    • ServerSocketChannel.accept 会在没有连接建立时让线程暂停
    • SocketChannel.read 会在没有数据可读时让线程暂停
    • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置
  • 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
  • 但多线程下,有新的问题,体现在以下方面
    • 32 位 jvm一个线程 320k,64 位 jvm 一个线程1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
    • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接.

服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 使用 nio 来理解阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();

// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));

// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
log.debug("connecting...");
SocketChannel sc = ssc.accept(); // 阻塞方法,线程停止运行
log.debug("connected... {}", sc);
channels.add(sc);
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
log.debug("before read... {}", channel);
channel.read(buffer); // 阻塞方法,线程停止运行
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read...{}", channel);
}
}

客户端

1
2
3
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting...");

非阻塞

  • 非阻塞模式下,相关方法都会不会让线程暂停
    • ServerSocketChannel.accept 在没有连接建立时,会返回 null,继续运行
    • SocketChannel.read在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannelread 或是去执行 ServerSocketChannel.accept
    • 写数据时,线程只是等待数据写入 Channel 即可,无需等Channel通过网络把数据发送出去
  • 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了cpu
  • 数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)

服务器端,客户端代码不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 使用 nio 来理解非阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,但sc是null
if (sc != null) {
log.debug("connected... {}", sc);
sc.configureBlocking(false); // 非阻塞模式
channels.add(sc);
}
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,read 返回 0
if (read > 0) {
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read...{}", channel);
}
}
}

多路复用Selector

1
2
3
4
5
6
7
graph TD
subgraph selector 版
thread --> selector
selector --> b1(buffer) --> c1(channel) --> b4(buffer)
selector --> b2(buffer) --> c2(channel) --> b5(buffer)
selector --> b3(buffer) --> c3(channel) --> b6(buffer)
end
  • 单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

    • 多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用

    • 如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证

      • 有可连接事件时才去连接
      • 有可读事件才去读取
      • 有可写事件才去写入
        • 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件
  • 好处

    • 一个线程配合Selector 就可以监控多个Channel 的事件,事件发生线程才去处理.避免非阻塞模式下所做无用功
    • 让这个线程能够被充分利用
    • 节约了线程的数量
    • 减少了线程上下文切换

创建

1
Selector selector = Selector.open();

绑定Channel事件

  • 也称之为注册事件,绑定的事件 selector 才会关心
1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件);
  • Channel必须工作在非阻塞模式
  • FileChannel没有非阻塞模式,因此不能配合Selector一起使用
  • 绑定的事件类型可以有
    • connect - 客户端连接成功时触发
    • accept - 服务器端成功接受连接时触发
    • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
    • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
1
2
3
4
5
6
7
8
9
// -- Operation bits and bit-testing convenience methods --
// 1
public static final int OP_READ = 1 << 0;
// 4
public static final int OP_WRITE = 1 << 2;
// 8
public static final int OP_CONNECT = 1 << 3;
// 16
public static final int OP_ACCEPT = 1 << 4;

监听Channel事件

  • 可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件

  • 方法1: 阻塞直到绑定事件发生

    1
    int count = selector.select();
  • 方法2: 阻塞直到绑定事件发生,或是超时(时间单位为 ms)

    1
    int count = selector.select(long timeout);
  • 方法3: 不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件

    1
    int count = selector.selectNow();

select何时不阻塞

  • 事件发生时

    • 客户端发起连接请求,会触发 accept 事件
    • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
    • channel 可写,会触发 write 事件
    • 在 linux 下nio bug 发生时
  • 调用 selector.wakeup()

  • 调用 selector.close()

  • selector 所在线程 interrupt

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class NoBlockServer {

public static void main(String[] args) throws I/OExceptI/On {

// 1.获取通道
ServerSocketChannel server = ServerSocketChannel.open();

// 2.切换成非阻塞模式
server.configureBlocking(false);

// 3. 绑定连接
server.bind(new InetSocketAddress(6666));

// 4. 创建多路复用器
Selector selector = Selector.open();

// 4.1将通道注册到选择器上,指定接收“监听通道”事件
server.register(selector, SelectionKey.OP_ACCEPT);

// 5. 轮询地获取选择器上已“就绪”的事件--->只要select()>0,说明已就绪
while (selector.select() > 0) {
// 6. 获取当前选择器所有注册的“选择键”(已就绪的监听事件)
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

// 7. 获取已“就绪”的事件,(不同的事件做不同的事)
while (iterator.hasNext()) {

SelectionKey SelectionKey = iterator.next();

// 接收事件就绪
if (SelectionKey.isAcceptable()) {

// 8. 获取客户端的链接
SocketChannel client = server.accept();

// 8.1 切换成非阻塞状态
client.configureBlocking(false);

// 8.2 注册到选择器上-->拿到客户端的连接为了读取通道的数据(监听读就绪事件)
client.register(selector, SelectionKey.OP_READ);

} else if (SelectionKey.isReadable()) { // 读事件就绪

// 9. 获取当前选择器读就绪状态的通道
SocketChannel client = (SocketChannel) SelectionKey.channel();

// 9.1读取数据
ByteBuffer buffer = ByteBuffer.allocate(1024);

// 9.2得到文件通道,将客户端传递过来的图片写到本地项目下(写模式、没有则创建)
FileChannel outChannel = FileChannel.open(Paths.get("2.png"), StandardOpenOptI/On.WRITE, StandardOpenOptI/On.CREATE);

while (client.read(buffer) > 0) {
// 在读之前都要切换成读模式
buffer.flip();

outChannel.write(buffer);

// 读完切换成写模式,能让管道继续读取文件的数据
buffer.clear();
}
}
// 10. 取消选择键(已经处理过的事件,就应该取消掉了)
iterator.remove();
}
}
}
}

为什么要 iterator.remove()

  • 在使用Java NIOSelector进行事件驱动编程时,当一个Channel上的事件被触发时,Selector会将该事件加入到SelectedKeys集合中,以通知应用程序处理.但是,应用程序在处理完selectedKeys集合中的事件后,需要将其从集合中删除,否则下一次select()方法调用时,这个事件仍然会被检测到并通知给应用程序,这会导致重复处理同一个事件的问题.
  • 第一次触发了SelectedKey上的accept 事件,没有移除selectedKey
  • 第二次触发了SelectedKey上的read 事件,但这时selectedKeys 中还有上次的 ssckey,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class NoBlockClient {

public static void main(String[] args) throws I/OExceptI/On {

// 1. 获取通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 6666));

// 1.1切换成非阻塞模式
socketChannel.configureBlocking(false);

// 1.2获取选择器
Selector selector = Selector.open();

// 1.3将通道注册到选择器中,获取服务端返回的数据
socketChannel.register(selector, SelectionKey.OP_READ);

// 2. 发送一张图片给服务端吧
FileChannel fileChannel = FileChannel.open(Paths.get("X:\\Users\\1.png"), StandardOpenOptI/On.READ);

// 3.要使用NI/O,有了Channel,就必然要有Buffer,Buffer是与数据打交道的呢
ByteBuffer buffer = ByteBuffer.allocate(1024);

// 4.读取本地文件(图片),发送到服务器
while (fileChannel.read(buffer) != -1) {

// 在读之前都要切换成读模式
buffer.flip();

socketChannel.write(buffer);

// 读完切换成写模式,能让管道继续读取文件的数据
buffer.clear();
}

// 5. 轮训地获取选择器上已“就绪”的事件--->只要select()>0,说明已就绪
while (selector.select() > 0) {
// 6. 获取当前选择器所有注册的“选择键”(已就绪的监听事件)
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

// 7. 获取已“就绪”的事件,(不同的事件做不同的事)
while (iterator.hasNext()) {

SelectionKey SelectionKey = iterator.next();

// 8. 读事件就绪
if (SelectionKey.isReadable()) {

// 8.1得到对应的通道
SocketChannel channel = (SocketChannel) SelectionKey.channel();

ByteBuffer responseBuffer = ByteBuffer.allocate(1024);

// 9. 知道服务端要返回响应的数据给客户端,客户端在这里接收
int readBytes = channel.read(responseBuffer);

if (readBytes > 0) {
// 切换读模式
responseBuffer.flip();
System.out.println(new String(responseBuffer.array(), 0, readBytes));
}
}

// 10. 取消选择键(已经处理过的事件,就应该取消掉了)
iterator.remove();
}
}
}

}

处理消息的边界

img

  • 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
  • 另一种思路是按分隔符拆分,缺点是效率低
  • TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
    • Http 1.1 是 TLV(ype-Length-Value) 格式
    • Http 2.0 是 LTV(ength-Type-Value) 格式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
private static void split(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
// 找到一条完整消息
if (source.get(i) == '\n') {
int length = i + 1 - source.position();
// 把这条完整消息存入新的 ByteBuffer
ByteBuffer target = ByteBuffer.allocate(length);
// 从 source 读,向 target 写
for (int j = 0; j < length; j++) {
target.put(source.get());
}
debugAll(target);
}
}
source.compact(); // 0123456789abcdef position 16 limit 16
}

public static void main(String[] args) throws IOException {
// 1. 创建 selector, 管理多个 channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 2. 建立 selector 和 channel 的联系(注册)
// SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("sscKey:{}", sscKey);
ssc.bind(new InetSocketAddress(8080));
while (true) {
// 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
// select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
selector.select();
// 4. 处理事件, selectedKeys 内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
iter.remove();
log.debug("key: {}", key);
// 5. 区分事件类型
if (key.isAcceptable()) { // 如果是 accept
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
// 将一个 byteBuffer 作为附件关联到 selectionKey 上
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
log.debug("scKey:{}", scKey);
} else if (key.isReadable()) { // 如果是 read
try {
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
// 获取 selectionKey 上关联的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
if(read == -1) {
key.cancel();
} else {
split(buffer);
// 需要扩容
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer); // 0123456789abcdef3333\n
key.attach(newBuffer);
}
}

} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
}
}
}
}
}
1
2
3
4
5
6
7
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress address = sc.getLocalAddress();
// sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
System.in.read();

处理write事件

一次无法写完例子

  • 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)
  • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略
    • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
    • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
    • 如果不取消,会每次可写均会触发 write 事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class WriteServer {

public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));

Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);

while(true) {
selector.select();

Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
// 1. 向客户端发送内容
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
int write = sc.write(buffer);
// 3. write 表示实际写了多少字节
System.out.println("实际写入字节:" + write);
// 4. 如果有剩余未读字节,才需要关注写事件
if (buffer.hasRemaining()) {
// read 1 write 4
// 在原有关注事件的基础上,多关注 写事件
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
// 把 buffer 作为附件加入 sckey
sckey.attach(buffer);
}
} else if (key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println("实际写入字节:" + write);
if (!buffer.hasRemaining()) { // 写完了
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class WriteClient {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
sc.connect(new InetSocketAddress("localhost", 8080));
int count = 0;
while (true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isConnectable()) {
System.out.println(sc.finishConnect());
} else if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
buffer.clear();
System.out.println(count);
}
}
}
}
}

write为何要取消

  • 只要向 channel 发送数据时,socket 缓冲可写,这个事件会频繁触发,因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注

利用多线程优化

  • 分两组选择器

    • 单线程配一个选择器,专门处理 accept 事件
    • 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
public class ChannelDemo7 {
public static void main(String[] args) throws IOException {
new BossEventLoop().register();
}


@Slf4j
static class BossEventLoop implements Runnable {
private Selector boss;
private WorkerEventLoop[] workers;
private volatile boolean start = false;
AtomicInteger index = new AtomicInteger();

public void register() throws IOException {
if (!start) {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);
boss = Selector.open();
SelectionKey ssckey = ssc.register(boss, 0, null);
ssckey.interestOps(SelectionKey.OP_ACCEPT);
workers = initEventLoops();
new Thread(this, "boss").start();
log.debug("boss start...");
start = true;
}
}

public WorkerEventLoop[] initEventLoops() {
// EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];
WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];
for (int i = 0; i < workerEventLoops.length; i++) {
workerEventLoops[i] = new WorkerEventLoop(i);
}
return workerEventLoops;
}

@Override
public void run() {
while (true) {
try {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
SocketChannel sc = c.accept();
sc.configureBlocking(false);
log.debug("{} connected", sc.getRemoteAddress());
workers[index.getAndIncrement() % workers.length].register(sc);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

@Slf4j
static class WorkerEventLoop implements Runnable {
private Selector worker;
private volatile boolean start = false;
private int index;

private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();

public WorkerEventLoop(int index) {
this.index = index;
}

public void register(SocketChannel sc) throws IOException {
if (!start) {
worker = Selector.open();
new Thread(this, "worker-" + index).start();
start = true;
}
tasks.add(() -> {
try {
SelectionKey sckey = sc.register(worker, 0, null);
sckey.interestOps(SelectionKey.OP_READ);
worker.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
});
worker.wakeup();
}

@Override
public void run() {
while (true) {
try {
worker.select();
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
Set<SelectionKey> keys = worker.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
try {
int read = sc.read(buffer);
if (read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
log.debug("{} message:", sc.getRemoteAddress());
debugAll(buffer);
}
} catch (IOException e) {
e.printStackTrace();
key.cancel();
sc.close();
}
}
iter.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

实现UDP

  • UDP 是无连接的,client 发送数据不会管 server 是否开启
  • server 这边的 receive 方法会将接收到的数据存入 byte buffer,但如果数据报文超过 buffer 大小,多出来的数据会被默默抛弃

服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class UdpServer {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
channel.socket().bind(new InetSocketAddress(9999));
System.out.println("waiting...");
ByteBuffer buffer = ByteBuffer.allocate(32);
channel.receive(buffer);
buffer.flip();
debug(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
public class UdpClient {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
InetSocketAddress address = new InetSocketAddress("localhost", 9999);
channel.send(buffer, address);
} catch (Exception e) {
e.printStackTrace();
}
}
}