Netty-EventLoop
参考文献
Channel、EventLoop
和 EventLoopGroup
- 一个
EventLoopGroup
包含一个或者多个EventLoop
; - 一个
EventLoop
在它的生命周期内只和一个Thread
绑定; - 所有由
EventLoop
处理的I/O 事件都将在它专有的Thread
上被处理; - 一个
Channel
在它的生命周期内只注册于一个EventLoop
; - 一个
EventLoop
可能会被分配给一个或多个Channel
.
EventLoop
事件循环对象
EventLoop
是一种事件等待和处理的程序模型,可以解决多线程资源消耗高的问题.EventLoop
通用运行模式为,当事件发生时,应用程序都会将产生的事件放入事件队列中,然后EventLoop
会轮询从队列中取出事件执行或者将事件分发给相应的事件监听者执行.事件执行的方式通常分为立即执行,延后执行,定期执行.
-
EventLoop
本质是一个单线程执行器(同时维护了一个Selector
),里面有run
方法处理Channel
上源源不断的I/O
事件. -
它的继承关系比较复杂
- 一条线是继承自
j.u.c.ScheduledExecutorService
因此包含了线程池中所有的方法 - 另一条线是继承自
Netty
自己的OrderedEventExecutor
- 提供了
boolean inEventLoop(Thread thread)
方法判断一个线程是否属于此EventLoop
- 提供了
parent
方法来看看自己属于哪个EventLoopGroup
- 提供了
- 一条线是继承自
EventLoopGroup
事件循环组
EventLoopGroup
是一组EventLoop
,Channel
一般会调用EventLoopGroup
的register
方法来绑定其中一个EventLoop
,后续这个Channel
上的 I/O 事件都由此EventLoop
来处理(保证了 I/O 事件处理时的线程安全)- 继承自
Netty
自己的EventExecutorGroup
- 实现了
Iterable
接口提供遍历EventLoop
的能力 - 另有
next
方法获取集合中下一个EventLoop
- 实现了
EventLoopGroup
的创建方式
1 | EventLoopGroup group = new DefaultEventLoopGroup(); |
实现 | 使用场景 | 作用 | 注意点 |
---|---|---|---|
DefaultEventLoopGroup |
适用于不需要高并发性能的场景,例如处理定时任务等 | 是一个单线程的 EventLoopGroup 实现 | 由于是单线程的,因此并发性能较差,不适用于高并发场景 |
NioEventLoopGroup |
适用于大部分场景,特别是在处理大量并发连接的情况下 | 使用 Java NIO 实现的 EventLoopGroup | 需要根据实际情况调整线程数量,一般建议线程数量为 CPU 核心数的两倍 |
EpollEventLoopGroup |
适用于 Linux 平台上的高并发场景 | 使用 Linux epoll 实现的 EventLoopGroup | 需要在 Linux 平台上运行,否则无法正常工作 |
KqueueEventLoopGroup |
适用于 BSD 平台上的高并发场景 | 使用 BSD kqueue 实现的 EventLoopGroup | 需要在 BSD 平台上运行,否则无法正常工作 |
NioEventLoopGroup
的事件处理机制采用的是无锁串行化的设计思路.
NioEventLoop
处理普通任务
1 | // 创建一个 NioEventLoopGroup 实例 |
NioEventLoop
处理定时任务
1 | // 创建一个 NioEventLoopGroup 实例 |
NioEventLoop
源码分析
创建NioEventLoop
-
NioEventLoop
是由NioEventLoopGroup
来创建的 -
调用链
1
2
3
4
5
6
7
8
9// 先调用NioEventLoopGroup的无参构造函数
io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()
-->io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
-->io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
-->io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.ThreadFactory, java.nio.channels.spi.SelectorProvider)
-->io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.ThreadFactory, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)
// 最终调用父类MultithreadEventLoopGroup的构造函数
io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.ThreadFactory, java.lang.Object...)
1 | private static final int DEFAULT_EVENT_LOOP_THREADS; |
1 | /** |
1 |
|
-
创建
DefaultEventLoop
调用链1
2
3
4io.netty.channel.DefaultEventLoop#DefaultEventLoop(io.netty.channel.EventLoopGroup, java.util.concurrent.Executor)
-->io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop(io.netty.channel.EventLoopGroup, java.util.concurrent.Executor, boolean)
-->io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop(io.netty.channel.EventLoopGroup, java.util.concurrent.Executor, boolean, int, io.netty.util.concurrent.RejectedExecutionHandler)
-->io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor(io.netty.util.concurrent.EventExecutorGroup, java.util.concurrent.Executor, boolean, int, io.netty.util.concurrent.RejectedExecutionHandler)
Selector
的创建
1 | NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, |
-
在
NioEventLoop
中有两个Selector
1
2
3
4
5/**
* The NIO {@link Selector}.
*/
private Selector selector;
private Selector unwrappedSelector;-
selector
: 为Netty
优化过的的Selector
-
unwrappedSelector
: 为Java NIO
原生的Selector
-
两者的区别在于:
selector
在获取SelectedSelectionKey
时采用数组的方式来获取的,可以避免遍历集合,从而提高了性能.1
2
3
4
5
6final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
// ...略
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public abstract class SelectorImpl
extends AbstractSelector
{
// 表示当前注册在该选择器上的所有SelectionKey对象的集合.
// 当注册一个通道到该选择器上时,将会创建一个SelectionKey对象,并添加到keys集合中.
// 当通道关闭或取消注册时,对应的SelectionKey对象将从keys集合中删除
// The set of keys registered with this Selector
private final Set<SelectionKey> keys;
// 表示当前有数据可以读取或写入的SelectionKey对象的集合.
// 当调用Selector.select()方法时,选择器会检查注册在该选择器上的所有通道,将所有有数据可以读取或写入的通道对应的SelectionKey对象添加到selectedKeys集合中.
// 在处理完selectedKeys集合中的所有SelectionKey对象后,应该手动清空该集合
// The set of keys with data ready for an operation
private final Set<SelectionKey> selectedKeys;
// 表示keys集合的不可变视图.这个视图不允许添加或删除SelectionKey对象,只能获取keys集合中的元素
// Public views of the key sets
private final Set<SelectionKey> publicKeys; // Immutable
// 表示selectedKeys集合的不可变视图.这个视图不允许添加或删除SelectionKey对象,只能获取selectedKeys集合中的元素
private final Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
}
-
-
具体实现
1
2
3
4
5
6
7private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
执行任务
- 入口
io.netty.util.concurrent.SingleThreadEventExecutor#execute
1 |
|
1 | private void execute(Runnable task, boolean immediate) { |
启动线程
1 | private void startThread() { |
1 | private void doStartThread() { |
1 |
|
1 |
|
处理事件
1 | private void processSelectedKeys() { |
1 | private void processSelectedKeysOptimized() { |
1 | private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { |
1 | private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { |
OP_ACCEPT
事件处理流程
-
大致流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20//1 阻塞直到事件发生
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
//2 拿到一个事件
SelectionKey key = iter.next();
//3 如果是 accept 事件
if (key.isAcceptable()) {
//4 执行 accept
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
//5 关注 read 事件
channel.register(selector, SelectionKey.OP_READ);
}
// ...
} -
具体源码:
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// doReadMessages 中执行了 accept 并创建 NioSocketChannel 作为消息放入 readBuf
// readBuf 是一个 ArrayList 用来缓存消息
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// localRead 为 1,就一条消息,即接收一个客户端连接
allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 触发 read 事件,让 pipeline 上的 handler 处理,这时是处理
// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这时的 msg 是 NioSocketChannel
final Channel child = (Channel) msg;
// NioSocketChannel 添加 childHandler 即初始化器
child.pipeline().addLast(childHandler);
// 设置选项
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// 注册 NioSocketChannel 到 nio worker 线程,接下来的处理也移交至 nio worker 线程
childGroup.register(child).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
} -
最终走到
io.netty.channel.nio.AbstractNioChannel#doBeginRead
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// 处理accept事件时:
// readInterestOp 取值是 16,在 NioServerSocketChannel 创建时初始化好,代表关注 accept 事件
// 处理read事件时:
// readInterestOp 取值是 1,在 NioSocketChannel 创建时初始化好,代表关注 read 事件
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
OP_READ
事件处理流程
-
入口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// io.netty.allocator.type 决定 allocator 的实现
final ByteBufAllocator allocator = config.getAllocator();
// 用来分配 byteBuf,确定单次读取大小
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
// 读取数据
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// 触发 read 事件,让 pipeline 上的 handler 处理,这时是处理 NioSocketChannel 上的 handler
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
// 是否要继续循环
} while (allocHandle.continueReading());
allocHandle.readComplete();
// 数据读取完毕,触发 read complete 事件
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
} -
判断是否需要继续读取
1 |
|
writeAndFlush
源码流程
1 | io.netty.channel.ChannelOutboundInvoker#writeAndFlush(java.lang.Object) |
-
逐个调用
handler
的write()
-
逐个调用
handler
的flush()
-
write
- 将
ByteBuf
转换成DirctBuffer
- 将消息(
DirctBuffer
)封装进entry
插入写队列 - 设置写状态
- 将
flush
- 刷新标志,设置写状态
- 变量
buffer
队列,过滤Buffer
- 调用
JDK
底层的API
,把ByteBuf
写入JDK
原生的ByteBuffer
EventLoop
最佳实践
- 网络连接建立过程中三次握手,安全认证的过程消耗不少时间.建立采用
Boos
和Worker
两个EventLoopGroup
,有助于分担Reactor
线程的压力. - 由于
Reactor
线程模型适合处理耗时短的任务场景,对于耗时较长的ChannelHandler
可以考虑维护一个业务线程池,将编解码后的数据封装成Task
进行异步处理,避免ChannelHandler
阻塞而造成EventLoop
不可用. - 如果业务逻辑执行时间较短,建议直接在
ChannelHandler
中执行.例如编解码操作,这样可以避免过度设计而造成架构的复杂性. - 不宜设计过多的
ChannelHandler
.对于系统性能和可维护性都会存在问题,在设计业务架构的时候,需要明确业务分层和Netty
层之间的界限.不要一昧地将业务逻辑都添加到ChannelHandler
中
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 HoleLin's Blog!