Netty-概览
参考文献
- 新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析
- Netty实战
Netty
架构图
Netty
源码结构
1 | Transport Services Protocol Support |
netty-common
模块是Netty
的核心基础包,提供了丰富的工具类,其他模块都需要依赖它,在common
模块中,常用的包括通用工具类和自定义并发包.- 通用工具类: 比如定时器工具
TimeTask
,时间轮HashedWheelTimer
等 - 自定义并发包: 比如异步模型
Future&Promise
,相比JDK
增强的FastThreadLocal
等
- 通用工具类: 比如定时器工具
netty-buffer
模块中Netty
自己实现了一个更加完备的ByteBuf
工具类netty-resover
模块主要提供了一些有关基础设置的解析工具,包括IP Address
,Hostname
,DNS
等netty-codec
模块主要负责编解码工作,通过编解码实现原始字节数据与业务实体对象之间的相互转化.netty-handler
模块主要负责数据处理工作,在Netty
中有关数据处理的部分,本质上是一串有序的handler
的集合netty-transport
模块是Netty
提供数据处理和传输的核心模块.
Netty
的优势
- Netty 是一款用于高效开发网络应用的NIO网络框架
- I/O 模型、线程模型和事件处理机制
- 易用性 API 接口
- 对数据协议、序列化的支持
Netty
的零拷贝
- 零拷贝的文件传输:Netty提供了FileRegion接口,通过该接口可以将文件的内容直接传输到网络中,而不需要将文件内容先复制到内存中.这种方式可以避免不必要的内存拷贝,提高传输效率.
- 零拷贝的内存操作:Netty的ByteBuf实现了零拷贝的内存操作,它使用了内存池技术,将内存的分配和释放从应用程序中转移到了Netty内部.在使用ByteBuf时,可以通过slice和duplicate方法创建一个新的ByteBuf对象,它们共享同一块内存,避免了内存拷贝的开销.
- 零拷贝的消息传递:Netty的ChannelPipeline中使用了基于事件的模型,每个事件都被封装为一个消息对象,通过ChannelPipeline中的Context对象进行传递.在传递消息时,Netty使用了引用计数技术,避免了消息的复制和内存拷贝,提高了传输效率.
- 堆外内存,避免 JVM 堆内存到堆外内存的数据拷贝。
- CompositeByteBuf 类,可以组合多个 Buffer 对象合并成一个逻辑上的对象,避免通过传统内存拷贝的方式将几个 Buffer 合并成一个大的 Buffer。
- 通过 Unpooled.wrappedBuffer 可以将 byte 数组包装成 ByteBuf 对象,包装过程中不会产生内存拷贝。
- ByteBuf.slice 操作与 Unpooled.wrappedBuffer 相反,slice 操作可以将一个 ByteBuf 对象切分成多个 ByteBuf 对象,切分过程中不会产生内存拷贝,底层共享一个 byte 数组的存储空间。
- Netty 使用 FileRegion 实现文件传输,FileRegion 底层封装了 FileChannel#transferTo() 方法,可以将文件缓冲区的数据直接传输到目标 Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝,这属于操作系统级别的零拷贝。
Netty
核心组件
-
Bootstrap:客户端启动器,用于创建客户端连接.可以设置连接参数、处理器、编解码器等.
-
ServerBootstrap:服务器启动器,用于创建服务器.可以设置监听端口、处理器、编解码器等.
-
Channel:Netty的基础组件,代表一个连接.通过Channel,可以读写数据、绑定监听端口、注册事件等.
-
EventLoop:事件循环组件,负责监听和处理事件.Netty采用单线程模型,每个Channel都由一个EventLoop负责处理.EventLoop采用类似于生产者-消费者模型,不断地从任务队列中取出任务并执行.
-
ChannelHandler:事件处理器,负责处理Channel的各种事件,例如连接建立、数据读写等.通过重写ChannelHandler中的方法,可以实现自定义的事件处理逻辑.
-
ChannelPipeline:事件处理链,将ChannelHandler按照一定的顺序组成一个链表.当有事件到达时,Netty会从链表头开始,依次将事件传递给每个ChannelHandler进行处理.
-
ByteBuf:字节缓冲区,Netty中的数据读写都是通过ByteBuf来实现的.ByteBuf实现了可读、可写、可扩容等功能,可以大大提高数据读写的效率.
-
Codec:编解码器,用于将Java对象和二进制数据之间进行转换.Netty中提供了多种编解码器,例如字符串编码器、对象编解码器等.
-
EventLoopGroup:事件循环组,包括BossGroup和WorkerGroup两个子组.BossGroup负责监听连接请求,WorkerGroup负责处理连接的读写事件.
-
ChannelFuture:异步操作结果,用于获取异步操作的结果或状态
Bootstrap、ServerBootstrap
Bootstrap
意思是引导,一个 Netty 应用通常由一个Bootstrap
开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中Bootstrap
类是客户端程序的启动引导类,ServerBootstrap
是服务端启动引导类
ChannelFuture
- 在 Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理,但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件.
- 可以将
ChannelFuture
看作将来要执行的操作的结果的占位符.它究竟什么时候被执行则可能取决于若干的因素,因此不可能准确的预测,但是可以肯定的是它将会被执行.此外,所有属于同一个 Channel的操作都被保证其将以它们被调用的顺序被执行.
Channel
-
与
Socket
类似 -
Netty 网络通信的组件,能够用于执行网络 I/O 操作.Channel 为用户提供
- 当前网络连接的通道的状态(例如是否打开?是否已连接?
- 网络连接的配置参数 (例如接收缓冲区大小)
- 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成.
- 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方.
- 支持关联 I/O 操作与对应的处理程序.
-
不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,下面是一些常用的 Channel 类型:
NioSocketChannel
: 异步的客户端 TCP Socket 连接.NioServerSocketChannel
: 异步的服务器端 TCP Socket 连接.NioDatagramChannel
: 异步的 UDP 连接.NioSctpChannel
: 异步的客户端 Sctp 连接.NioSctpServerChannel
: 异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO.
EventLoop
EventLoop
定义了Netty的核心抽象,用于处理连接的生命周期中发生的事件.
NioEventLoop
NioEventLoop
中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop
的 run 方法,执行 I/O 任务和非 I/O 任务- I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发.
- 非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发
- 两种任务的执行时间比由变量 ioRatio 控制,默认为 50,则表示允许非 IO 任务执行的时间与 IO 任务的执行时间相等
NioEventLoopGroup
NioEventLoopGroup
,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop
)负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程.
ChannelHandler
- 它充当了所有处理入站和出站数据的应用程序逻辑的容器.
ChannelHandler
是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其ChannelPipeline
(业务处理链)中的下一个处理程序
ChannelPipline
- 保存
ChannelHandler
的 List,用于处理或拦截Channel
的入站事件和出站操作**,它们的执行顺序是由它们被添加的顺序所决定的**. ChannelPipeline
实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的ChannelHandler
如何相互交互.
Netty
服务端
实现步骤
- 创建Bootstrap和EventLoopGroup对象:首先,需要创建Bootstrap和EventLoopGroup对象.Bootstrap用于创建客户端连接,EventLoopGroup用于管理处理器线程.
- 配置Bootstrap和Channel:其次,需要配置Bootstrap和Channel.可以设置连接参数、处理器、编解码器等.例如,可以设置连接超时时间、添加ChannelHandler、设置编解码器等.
- 连接到服务器:然后,调用Bootstrap的connect()方法连接到服务器.此时,Netty会创建一个新的Channel,并将其注册到EventLoopGroup中.
- 发送请求数据:接下来,可以通过Channel向服务器发送请求数据.可以使用write()方法发送数据,也可以使用flush()方法刷新缓冲区.
- 处理响应数据:当服务器返回响应数据时,Netty会触发ChannelHandler的channelRead()方法.在channelRead()方法中,可以读取响应数据,并进行相应的处理.
- 关闭连接:处理完响应数据后,需要关闭连接.可以调用Channel的close()方法关闭连接,也可以使用ChannelFuture的addListener()方法添加监听器,在连接关闭后执行相应的操作.
具体代码
1 | package cn.holelin.netty; |
- 在Netty中,每个Boss NioEventLoop和每个Worker NioEventLoop的循环执行的任务包括以下三个步骤:
- 轮询事件:Boss NioEventLoop轮询的是accept事件,Worker NioEventLoop轮询的是read和write事件。
- 处理I/O事件:Boss NioEventLoop处理accept事件,与Client建立连接,生成NioSocketChannel,并将NioSocketChannel注册到某个Worker NioEventLoop的Selector上;Worker NioEventLoop处理I/O事件,即read、write事件,在NioSocketChannel可读、可写事件发生时进行处理。
- 处理任务队列中的任务:每个NioEventLoop维护着一个任务队列,其中包含了需要在事件处理完成后执行的任务。每次循环时,NioEventLoop会从任务队列中取出一定数量的任务,并按照顺序执行。这些任务可以是用户调用eventloop.execute或schedule方法提交的任务,也可以是其它线程提交到该NioEventLoop的任务。
- 需要注意的是,Boss NioEventLoop和Worker NioEventLoop的执行逻辑是不同的。Boss NioEventLoop只负责接受连接请求并将连接分配给Worker NioEventLoop,而Worker NioEventLoop才是真正负责处理数据传输的地方。此外,每个NioEventLoop都是单线程执行的,因此不存在线程安全的问题。
Channel
和NioEventLoop
是如何绑定的?
-
Netty中通过
ServerBootstrapAcceptor
这个Handler来处理新建立的连接,会将新建立的Channel绑定到一个NioEventLoop
中 -
源码路径
1
2
3
4
5
6
7
8
9
10# 入口在绑定端口
io.netty.bootstrap.AbstractBootstrap#bind(int) -->
# 真正执行绑定逻辑
io.netty.bootstrap.AbstractBootstrap#doBind(SocketAddress) -->
# 进行初始化和注册操作
io.netty.bootstrap.AbstractBootstrap#initAndRegister() -->
# 初始化Channel
io.netty.bootstrap.ServerBootstrap#init(Channel)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
p.addLast(new ChannelInitializer<Channel>() {
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
public void run() {
// 核心
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead(ChannelHandlerContext ctx, Object msg)
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}1
2
3
4
5// io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
public ChannelFuture register(Channel channel) {
return next().register(channel);
}- 通过
next
方法选取到NioEventLoop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
// Use a 'long' counter to avoid non-round-robin behaviour at the 32-bit overflow boundary.
// The 64-bit long solves this by placing the overflow so far into the future, that no system
// will encounter this in practice.
private final AtomicLong idx = new AtomicLong();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
}- 选取完成后,进行注册
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 将当前Channel的eventLoop属性传递进来的NioEventLoop进行了绑定,后续这个Channel上的任务执行都是调用其eventLoop 来执行,将任务添加到eventLoop 的任务队列中,
// 而eventLoop 最终会调用上面ThreadPerTaskExecutor生成的线程去执行(该线程一直在for循环,从任务对队列里面取出任务执行)
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
} - 通过
Selector BUG
- Selector BUG:若Selector的轮询结果为空,也没有wakeup或新消息处理,则发生空轮询,CPU使用率100%.
- Netty的解决办法
- 对Selector的select操作周期进行统计,每完成一次空的select操作进行一次计数,若在某个周期内连续发生N次空轮询,则触发了epoll死循环bug.重建Selector,判断是否是其他线程发起的重建请求,若不是则将原SocketChannel从旧的Selector上去除注册,重新注册到新的Selector上,并将原来的Selector关闭.
one event loop per channel
-
在Netty中,每个Channel都会被分配到一个EventLoop上,一个EventLoop可以处理多个Channel的事件,但每个Channel只会被一个EventLoop处理.这个机制称为"one event loop per channel".
-
具体地说,当一个Channel被创建时,会被分配到一个EventLoopGroup中的一个EventLoop上.在后续的事件处理中,该Channel所有的事件都会被分配到同一个EventLoop上处理.在EventLoop内部,事件处理是单线程的,即同一个Channel的所有事件都会在同一个线程上处理.
-
此外,Netty还提供了ChannelHandlerContext的概念,用于在ChannelHandler之间传递事件和状态.在同一个ChannelPipeline中,每个ChannelHandler都有一个对应的ChannelHandlerContext,通过该上下文,ChannelHandler可以向前或向后传递事件和状态.在传递事件和状态时,Netty会根据事件类型和ChannelHandlerContext的位置,确定下一个处理该事件的ChannelHandler和对应的EventLoop.
-
综上所述,通过"one event loop per channel"机制和ChannelHandlerContext的传递,Netty可以保证同一个Channel的所有事件都在同一个线程上处理.这种机制可以避免线程切换和同步等开销,提高系统的并发性和性能.同时,也要注意避免在EventLoop中进行耗时的操作,以避免影响其他Channel的事件处理和系统的稳定性.