参考文献

Netty架构图

img

Netty源码结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
                               Transport Services                         Protocol Support
+----------------------------------------------+---------------------------------------------------------+
| io.netty.transport | io.netty.codec |
| io.netty.transport.epoll | io.netty.codec.dns |
| io.netty.transport.kqueue | io.netty.codec.haproxy |
| io.netty.transport.unix.common | io.netty.codec.http |
| io.netty.transport.sctp | io.netty.codec.http2 |
| io.netty.transport.rxtx | io.netty.codec.memcache |
| io.netty.transport.udt | io.netty.codec.mqtt |
| | io.netty.codec.redis |
| | io.netty.codec.smtp |
| | io.netty.codec.socks |
| | io.netty.codec.stomp |
| | io.netty.codec.xml |
| +---------------------------------------------------------+
| | io.netty.handler |
| | io.netty.handler.proxy |
+----------------------------------------------+---------------------------------------------------------+
| Core |
+--------------------------------------------------------------------------------------------------------+
| io.netty.buffer |
| io.netty.common |
| io.netty.resolver |
| io.netty.resolver.dns |
+--------------------------------------------------------------------------------------------------------+
  • netty-common模块是Netty的核心基础包,提供了丰富的工具类,其他模块都需要依赖它,在common模块中,常用的包括通用工具类和自定义并发包.
    • 通用工具类: 比如定时器工具TimeTask,时间轮HashedWheelTimer
    • 自定义并发包: 比如异步模型Future&Promise,相比JDK增强的FastThreadLocal
  • netty-buffer模块中Netty自己实现了一个更加完备的ByteBuf工具类
  • netty-resover模块主要提供了一些有关基础设置的解析工具,包括IP Address,Hostname,DNS
  • netty-codec模块主要负责编解码工作,通过编解码实现原始字节数据与业务实体对象之间的相互转化.
  • netty-handler模块主要负责数据处理工作,在Netty中有关数据处理的部分,本质上是一串有序的handler的集合
  • netty-transport模块是Netty提供数据处理和传输的核心模块.

Netty的优势

  • Netty 是一款用于高效开发网络应用的NIO网络框架
    • I/O 模型、线程模型和事件处理机制
    • 易用性 API 接口
    • 对数据协议、序列化的支持

Netty的零拷贝

  • 零拷贝的文件传输:Netty提供了FileRegion接口,通过该接口可以将文件的内容直接传输到网络中,而不需要将文件内容先复制到内存中.这种方式可以避免不必要的内存拷贝,提高传输效率.
  • 零拷贝的内存操作:Netty的ByteBuf实现了零拷贝的内存操作,它使用了内存池技术,将内存的分配和释放从应用程序中转移到了Netty内部.在使用ByteBuf时,可以通过slice和duplicate方法创建一个新的ByteBuf对象,它们共享同一块内存,避免了内存拷贝的开销.
  • 零拷贝的消息传递:Netty的ChannelPipeline中使用了基于事件的模型,每个事件都被封装为一个消息对象,通过ChannelPipeline中的Context对象进行传递.在传递消息时,Netty使用了引用计数技术,避免了消息的复制和内存拷贝,提高了传输效率.
  • 堆外内存,避免 JVM 堆内存到堆外内存的数据拷贝。
  • CompositeByteBuf 类,可以组合多个 Buffer 对象合并成一个逻辑上的对象,避免通过传统内存拷贝的方式将几个 Buffer 合并成一个大的 Buffer。
  • 通过 Unpooled.wrappedBuffer 可以将 byte 数组包装成 ByteBuf 对象,包装过程中不会产生内存拷贝。
  • ByteBuf.slice 操作与 Unpooled.wrappedBuffer 相反,slice 操作可以将一个 ByteBuf 对象切分成多个 ByteBuf 对象,切分过程中不会产生内存拷贝,底层共享一个 byte 数组的存储空间。
  • Netty 使用 FileRegion 实现文件传输,FileRegion 底层封装了 FileChannel#transferTo() 方法,可以将文件缓冲区的数据直接传输到目标 Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝,这属于操作系统级别的零拷贝。

Netty核心组件

  • Bootstrap:客户端启动器,用于创建客户端连接.可以设置连接参数、处理器、编解码器等.

  • ServerBootstrap:服务器启动器,用于创建服务器.可以设置监听端口、处理器、编解码器等.

  • Channel:Netty的基础组件,代表一个连接.通过Channel,可以读写数据、绑定监听端口、注册事件等.

  • EventLoop:事件循环组件,负责监听和处理事件.Netty采用单线程模型,每个Channel都由一个EventLoop负责处理.EventLoop采用类似于生产者-消费者模型,不断地从任务队列中取出任务并执行.

  • ChannelHandler:事件处理器,负责处理Channel的各种事件,例如连接建立、数据读写等.通过重写ChannelHandler中的方法,可以实现自定义的事件处理逻辑.

  • ChannelPipeline:事件处理链,将ChannelHandler按照一定的顺序组成一个链表.当有事件到达时,Netty会从链表头开始,依次将事件传递给每个ChannelHandler进行处理.

  • ByteBuf:字节缓冲区,Netty中的数据读写都是通过ByteBuf来实现的.ByteBuf实现了可读、可写、可扩容等功能,可以大大提高数据读写的效率.

  • Codec:编解码器,用于将Java对象和二进制数据之间进行转换.Netty中提供了多种编解码器,例如字符串编码器、对象编解码器等.

  • EventLoopGroup:事件循环组,包括BossGroup和WorkerGroup两个子组.BossGroup负责监听连接请求,WorkerGroup负责处理连接的读写事件.

  • ChannelFuture:异步操作结果,用于获取异步操作的结果或状态

Bootstrap、ServerBootstrap

  • Bootstrap意思是引导,一个 Netty 应用通常由一个 Bootstrap开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类

ChannelFuture

  • 在 Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理,但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件.
  • 可以将ChannelFuture看作将来要执行的操作的结果的占位符.它究竟什么时候被执行则可能取决于若干的因素,因此不可能准确的预测,但是可以肯定的是它将会被执行.此外,所有属于同一个 Channel的操作都被保证其将以它们被调用的顺序被执行.

Channel

  • Socket类似

  • Netty 网络通信的组件,能够用于执行网络 I/O 操作.Channel 为用户提供

    • 当前网络连接的通道的状态(例如是否打开?是否已连接?
    • 网络连接的配置参数 (例如接收缓冲区大小)
    • 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成.
    • 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方.
    • 支持关联 I/O 操作与对应的处理程序.
  • 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,下面是一些常用的 Channel 类型:

    • NioSocketChannel: 异步的客户端 TCP Socket 连接.
    • NioServerSocketChannel: 异步的服务器端 TCP Socket 连接.
    • NioDatagramChannel: 异步的 UDP 连接.
    • NioSctpChannel: 异步的客户端 Sctp 连接.
    • NioSctpServerChannel: 异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO.

EventLoop

  • EventLoop 定义了Netty的核心抽象,用于处理连接的生命周期中发生的事件.

NioEventLoop

  • NioEventLoop中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务
    • I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发.
    • 非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发
  • 两种任务的执行时间比由变量 ioRatio 控制,默认为 50,则表示允许非 IO 任务执行的时间与 IO 任务的执行时间相等

NioEventLoopGroup

  • NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程.

ChannelHandler

  • 它充当了所有处理入站和出站数据的应用程序逻辑的容器.
  • ChannelHandler是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline (业务处理链)中的下一个处理程序

ChannelPipline

  • 保存 ChannelHandler的 List,用于处理或拦截Channel的入站事件和出站操作**,它们的执行顺序是由它们被添加的顺序所决定的**.
  • ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互.

Netty服务端

实现步骤

  1. 创建Bootstrap和EventLoopGroup对象:首先,需要创建Bootstrap和EventLoopGroup对象.Bootstrap用于创建客户端连接,EventLoopGroup用于管理处理器线程.
  2. 配置Bootstrap和Channel:其次,需要配置Bootstrap和Channel.可以设置连接参数、处理器、编解码器等.例如,可以设置连接超时时间、添加ChannelHandler、设置编解码器等.
  3. 连接到服务器:然后,调用Bootstrap的connect()方法连接到服务器.此时,Netty会创建一个新的Channel,并将其注册到EventLoopGroup中.
  4. 发送请求数据:接下来,可以通过Channel向服务器发送请求数据.可以使用write()方法发送数据,也可以使用flush()方法刷新缓冲区.
  5. 处理响应数据:当服务器返回响应数据时,Netty会触发ChannelHandler的channelRead()方法.在channelRead()方法中,可以读取响应数据,并进行相应的处理.
  6. 关闭连接:处理完响应数据后,需要关闭连接.可以调用Channel的close()方法关闭连接,也可以使用ChannelFuture的addListener()方法添加监听器,在连接关闭后执行相应的操作.

具体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package cn.holelin.netty;

import cn.holelin.netty.decoder.MessageDecoder;
import cn.holelin.netty.encoder.MessageEncoder;
import cn.holelin.netty.handlers.ServerChatHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

/**
* @Description:
* @Author: HoleLin
* @CreateDate: 2022/12/9 15:01
* @UpdateUser: HoleLin
* @UpdateDate: 2022/12/9 15:01
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
public class Server {
/**
* 服务器监听端口
*/
private Integer port;

public Server(Integer port) {
this.port = port;
}

public void run() {
// bossGroup 线程池则只是在 Bind 某个端口后,获得其中一个线程作为 MainReactor,
// 专门处理端口的 Accept 事件,每个端口对应一个 Boss 线程
final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// workerGroup 线程池会被各个SubReactor和Worker线程充分利用
final NioEventLoopGroup workerGroup = new NioEventLoopGroup();

try {
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
// 组装NioEventLoopGroup
.group(bossGroup, workerGroup)
// 设置Channel类型为NIO类型
.channel(NioServerSocketChannel.class)
// 使用匿名内部类的形式初始化通道对象
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 拆包器
//1.拆包器
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,5,4));
//2.自定义解码器
ch.pipeline().addLast(new MessageDecoder());
//3.业务Handler
ch.pipeline().addLast(new ServerChatHandler());
//4.自定义编码器
ch.pipeline().addLast(new MessageEncoder());
}
})
// 设置线程队列得到连接个数
.option(ChannelOption.SO_BACKLOG, 128)
// 设置保持活动连接状态
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口号,启动服务端
ChannelFuture f = serverBootstrap.bind(port).sync();

// 对关闭通道进行监听
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}

public static void main(String[] args) {
final Server server = new Server(6666);
server.run();
}
}
  • 在Netty中,每个Boss NioEventLoop和每个Worker NioEventLoop的循环执行的任务包括以下三个步骤:
    1. 轮询事件:Boss NioEventLoop轮询的是accept事件,Worker NioEventLoop轮询的是read和write事件。
    2. 处理I/O事件:Boss NioEventLoop处理accept事件,与Client建立连接,生成NioSocketChannel,并将NioSocketChannel注册到某个Worker NioEventLoop的Selector上;Worker NioEventLoop处理I/O事件,即read、write事件,在NioSocketChannel可读、可写事件发生时进行处理。
    3. 处理任务队列中的任务:每个NioEventLoop维护着一个任务队列,其中包含了需要在事件处理完成后执行的任务。每次循环时,NioEventLoop会从任务队列中取出一定数量的任务,并按照顺序执行。这些任务可以是用户调用eventloop.execute或schedule方法提交的任务,也可以是其它线程提交到该NioEventLoop的任务。
  • 需要注意的是,Boss NioEventLoop和Worker NioEventLoop的执行逻辑是不同的。Boss NioEventLoop只负责接受连接请求并将连接分配给Worker NioEventLoop,而Worker NioEventLoop才是真正负责处理数据传输的地方。此外,每个NioEventLoop都是单线程执行的,因此不存在线程安全的问题。

ChannelNioEventLoop是如何绑定的?

  • Netty中通过ServerBootstrapAcceptor这个Handler来处理新建立的连接,会将新建立的Channel绑定到一个NioEventLoop

  • 源码路径

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    # 入口在绑定端口
    io.netty.bootstrap.AbstractBootstrap#bind(int) -->
    # 真正执行绑定逻辑
    io.netty.bootstrap.AbstractBootstrap#doBind(SocketAddress) -->

    # 进行初始化和注册操作
    io.netty.bootstrap.AbstractBootstrap#initAndRegister() -->

    # 初始化Channel
    io.netty.bootstrap.ServerBootstrap#init(Channel)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    @Override
    void init(Channel channel) {
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, newAttributesArray());

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

    p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) {
    final ChannelPipeline pipeline = ch.pipeline();
    ChannelHandler handler = config.handler();
    if (handler != null) {
    pipeline.addLast(handler);
    }

    ch.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
    // 核心
    pipeline.addLast(new ServerBootstrapAcceptor(
    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    }
    });
    }
    });
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead(ChannelHandlerContext ctx, Object msg)		
    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
    childGroup.register(child).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
    if (!future.isSuccess()) {
    forceClose(child, future.cause());
    }
    }
    });
    } catch (Throwable t) {
    forceClose(child, t);
    }
    }
    1
    2
    3
    4
    5
    // io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
    @Override
    public ChannelFuture register(Channel channel) {
    return next().register(channel);
    }
    • 通过next方法选取到NioEventLoop
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
    this.executors = executors;
    }

    @Override
    public EventExecutor next() {
    return executors[idx.getAndIncrement() & executors.length - 1];
    }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    // Use a 'long' counter to avoid non-round-robin behaviour at the 32-bit overflow boundary.
    // The 64-bit long solves this by placing the overflow so far into the future, that no system
    // will encounter this in practice.
    private final AtomicLong idx = new AtomicLong();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
    this.executors = executors;
    }

    @Override
    public EventExecutor next() {
    return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }
    }
    • 选取完成后,进行注册
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
    promise.setFailure(new IllegalStateException("registered to an event loop already"));
    return;
    }
    if (!isCompatible(eventLoop)) {
    promise.setFailure(
    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    return;
    }
    // 将当前Channel的eventLoop属性传递进来的NioEventLoop进行了绑定,后续这个Channel上的任务执行都是调用其eventLoop 来执行,将任务添加到eventLoop 的任务队列中,
    // 而eventLoop 最终会调用上面ThreadPerTaskExecutor生成的线程去执行(该线程一直在for循环,从任务对队列里面取出任务执行)
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
    register0(promise);
    } else {
    try {
    eventLoop.execute(new Runnable() {
    @Override
    public void run() {
    register0(promise);
    }
    });
    } catch (Throwable t) {
    logger.warn(
    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
    AbstractChannel.this, t);
    closeForcibly();
    closeFuture.setClosed();
    safeSetFailure(promise, t);
    }
    }
    }

    private void register0(ChannelPromise promise) {
    try {
    // check if the channel is still open as it could be closed in the mean time when the register
    // call was outside of the eventLoop
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
    return;
    }
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;

    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
    // user may already fire events through the pipeline in the ChannelFutureListener.
    pipeline.invokeHandlerAddedIfNeeded();

    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    // Only fire a channelActive if the channel has never been registered. This prevents firing
    // multiple channel actives if the channel is deregistered and re-registered.
    if (isActive()) {
    if (firstRegistration) {
    pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
    // This channel was registered before and autoRead() is set. This means we need to begin read
    // again so that we process inbound data.
    //
    // See https://github.com/netty/netty/issues/4805
    beginRead();
    }
    }
    } catch (Throwable t) {
    // Close the channel directly to avoid FD leak.
    closeForcibly();
    closeFuture.setClosed();
    safeSetFailure(promise, t);
    }
    }

Selector BUG

  • Selector BUG:若Selector的轮询结果为空,也没有wakeup或新消息处理,则发生空轮询,CPU使用率100%.
  • Netty的解决办法
    • 对Selector的select操作周期进行统计,每完成一次空的select操作进行一次计数,若在某个周期内连续发生N次空轮询,则触发了epoll死循环bug.重建Selector,判断是否是其他线程发起的重建请求,若不是则将原SocketChannel从旧的Selector上去除注册,重新注册到新的Selector上,并将原来的Selector关闭.

one event loop per channel

  • 在Netty中,每个Channel都会被分配到一个EventLoop上,一个EventLoop可以处理多个Channel的事件,但每个Channel只会被一个EventLoop处理.这个机制称为"one event loop per channel".

  • 具体地说,当一个Channel被创建时,会被分配到一个EventLoopGroup中的一个EventLoop上.在后续的事件处理中,该Channel所有的事件都会被分配到同一个EventLoop上处理.在EventLoop内部,事件处理是单线程的,即同一个Channel的所有事件都会在同一个线程上处理.

  • 此外,Netty还提供了ChannelHandlerContext的概念,用于在ChannelHandler之间传递事件和状态.在同一个ChannelPipeline中,每个ChannelHandler都有一个对应的ChannelHandlerContext,通过该上下文,ChannelHandler可以向前或向后传递事件和状态.在传递事件和状态时,Netty会根据事件类型和ChannelHandlerContext的位置,确定下一个处理该事件的ChannelHandler和对应的EventLoop.

  • 综上所述,通过"one event loop per channel"机制和ChannelHandlerContext的传递,Netty可以保证同一个Channel的所有事件都在同一个线程上处理.这种机制可以避免线程切换和同步等开销,提高系统的并发性和性能.同时,也要注意避免在EventLoop中进行耗时的操作,以避免影响其他Channel的事件处理和系统的稳定性.