参考文献

ChannelPipline

  • 每一个新创建的Channel都将会被分配一个新的ChannelPipeline.这项关联是永久性的;Channel 既不能附加另外一个ChannelPipeline,也不能分离其当前的.在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预.

  • ChannelPipeline可以看作ChannelHandler的容器载体,它是由一组ChannelHandler实例组成的,内部通过双向链表将不同的ChannelHandler链接在一起.ChannelPipeline 的双向链表分别维护了HeadContextTailContext 的头尾节点。我们自定义的ChannelHandler会插入到HeadTail 之间.

  • 保存 ChannelHandlerList,用于处理或拦截Channel的入站事件和出站操作**,它们的执行顺序是由它们被添加的顺序所决定的**.

  • ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel中各个的 ChannelHandler 如何相互交互.

  • 下图引用NettyJavadoc 4.1ChannelPipeline 的说明,描述了 ChannelPipeline ChannelHandler 通常如何处理I/O事件.

    • I/O事件由 ChannelInboundHandlerChannelOutboundHandler 处理,并通过调用 ChannelHandlerContext 中定义的事件传播方法.例如:ChannelHandlerContext .fireChannelRead(Object)ChannelOutboundInvoker.write(Object)转发到其最近的处理程序.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    I/O Request
    via Channel or
    ChannelHandlerContext
    |
    +---------------------------------------------------+---------------+
    | ChannelPipeline | |
    | \|/ |
    | +---------------------+ +-----------+----------+ |
    | | Inbound Handler N | | Outbound Handler 1 | |
    | +----------+----------+ +-----------+----------+ |
    | /|\ | |
    | | \|/ |
    | +----------+----------+ +-----------+----------+ |
    | | Inbound Handler N-1 | | Outbound Handler 2 | |
    | +----------+----------+ +-----------+----------+ |
    | /|\ . |
    | . . |
    | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext .OUT_EVT()|
    | [ method call] [method call] |
    | . . |
    | . \|/ |
    | +----------+----------+ +-----------+----------+ |
    | | Inbound Handler 2 | | Outbound Handler M-1 | |
    | +----------+----------+ +-----------+----------+ |
    | /|\ | |
    | | \|/ |
    | +----------+----------+ +-----------+----------+ |
    | | Inbound Handler 1 | | Outbound Handler M | |
    | +----------+----------+ +-----------+----------+ |
    | /|\ | |
    +---------------+-----------------------------------+---------------+
    | \|/
    +---------------+-----------------------------------+---------------+
    | | | |
    | [ Socket.read() ] [ Socket.write() ] |
    | |
    | Netty Internal I/O Threads (Transport Implementation) |
    +-------------------------------------------------------------------+
  • HeadContext 既是 Inbound 处理器,也是 Outbound 处理器。它分别实现了 ChannelInboundHandler 和 ChannelOutboundHandler。网络数据写入操作的入口就是由 HeadContext 节点完成的。HeadContext 作为 Pipeline 的头结点负责读取数据并开始传递 InBound 事件,当数据处理完成后,数据会反方向经过Outbound 处理器,最终传递到 HeadContext,所以 HeadContext 又是处理 Outbound 事件的最后一站,此外 HeadContext 在传递事件之前,还会执行一些前置操作。

  • TailContext 只实现了 ChannelInboundHandler 接口。它会在 ChannelInboundHandler 调用链路的最后一步执行,主要用于终止 Inbound 事件传播,例如释放 Message 数据资源等。TailContext 节点作为 OutBound 事件传播的第一站,仅仅是将 OutBound 事件传递给上一个节点。

Inbound入站事件

  • 入站事件由自下而上方向的入站处理程序处理
    • Head-->Tail

Inbound事件在递归调用流程中何时结束

  • 用户自定义的Handler没有执行fireChannelRead()操作,则当前Handler终止Inbound事件传播
  • 如果用户自定义的Handler都执行了fireChannelRead()操作,Inbound事件传播最终会在TailContext节点终止

OutBound出站事件

  • 出站事件由上而下方向处理,出站 Handler 处理程序通常会生成或转换出站传输,例如 write 请求.
    • Tail-->Head

ChannelHandlerContext

  • 一个Channel包含了一个 ChannelPipeline ,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个ChannelHandler.

    • 入站事件和出站事件在一个双向链表中,
      • 入站事件会从链表head往后传递到最后一个入站的handler
      • 出站事件会从链表tail往前传递到最前一个出站的handler,两种类型的 handler 互不干扰.

img

  • ChannelHandler添加到ChannelPipeline中的过程如下所示:

    • 一个ChannelInitializer的实现被注册到了ServerBootstrap中.
    • ChannelInitializer.initChannel()方法被调用,ChannelInitializer将在ChannelPipeline中安装一组自定义的ChannelHandler;
    • ChannelInitializer将它自己从ChannelPipeline中移除.
  • Netty中,有两种发送消息的方式.你可以直接写到Channel中,也可以写到和ChannelHandler相关联的 ChannelHandlerContext对象中.

    • 使用 ctx.channel().write(msg) 的方式会导致消息从ChannelPipeline的尾端开始流动,即从最后一个出站ChannelHandler向前开始处理.

      • Channel.writeAndFlush(): 当你调用 ChannelwriteAndFlush 方法时,数据会从整个 ChannelPipeline 的尾部(tail)开始经过所有的 ChannelOutboundHandler,直到头部(head).这意味着数据将经过整个出站处理流程,包括所有配置的 ChannelOutboundHandler.
    • 使用 ctx.write(msg) 的方式则会导致消息从ChannelPipeline中的下一个ChannelHandler 开始流动,即从当前ChannelHandler向前找下一个出站ChannelHandler 开始处理消息.这是因为 ctx.write(msg) 方法是通过ChannelHandlerContext对象调用的,该对象表示当前ChannelHandlerChannelPipeline中的位置ctx.write(msg)后者将导致消息从ChannelPipeline中的下一个ChannelHandler开始流动

      • ChannelHandlerContext.writeAndFlush(): 当你调用 ChannelHandlerContextwriteAndFlush 方法时,数据将从关联到该 ChannelHandlerContext当前 ChannelOutboundHandler 开始,沿着 ChannelPipeline 传递到头部(head).这种方式可以跳过一些出站处理过程,因为数据从当前 ChannelHandlerContext 开始传播,而不是从尾部(tail).

    img

修改ChannelPipeline

类型 描述
AddFirst 将一个ChannelHandler添加到ChannelPipeline
addBefore /
addAfter /
addLast /
remove 将一个ChannelHandlerChannelPipeline中移除
replace ChannelPipeline中的一个ChannelHandler替换为另一个ChannelHandler

ChannelHandler

  • 它充当了所有处理入站和出站数据的应用程序逻辑的容器.

  • ChannelHandler是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline (业务处理链)中的下一个处理程序

  • ChannelHandler本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类

    • ChannelInboundHandler用于处理入站 I/O 事件.
    • ChannelOutboundHandler用于处理出站 I/O 操作.
  • 或者使用以下适配器类

    • ChannelInboundHandlerAdapter用于处理入站 I/O 事件.
    • ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作.
    • ChannelDuplexHandler 用于处理入站和出站事件.

ChannelHandler的生命周期

类型 描述
handlerAdded 当把ChannelHandler添加到ChannelPipeline中时被调用
handlerRemoved 当从ChannelPipeline中移除ChannelHandler时被调用
exceptionCaught 当从ChannelPipeline中移除ChannelHandler时被调用

ChannelInboundHandler接口

方法 描述
channelRegistered Channel已经注册到它的EventLoop并且能够处理I/O时被调用
channelUnregistered Channel从它的EventLoop注销并且无法处理任何I/O时被调用
channelActive Channel处于活动状态时被调用;Channel已经连接/绑定并且已经就绪
channelInactive Channel离开活动状态并且不再连接它的远程节点时被调用
channelReadComplete Channel上的一个读操作完成时被调用
channelRead 当从Channel读取数据时被调用
channelWritabilityChanged Channel的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生 OutOfMemoryError)或者可以在Channel变为再次可写时恢复写入。可以通过调用ChannelisWritable()方法来检测Channel的可写性。与可写性相关的阈值可以通过Channel.config(). setWriteHighWaterMark()Channel.config().setWriteLowWaterMark()方法来设置
userEventTriggered ChannelnboundHandler.fireUserEventTriggered()方法被调用时被调用,因为一个POJO被传经了ChannelPipeline
  • 当某个ChannelInboundHandler的实现重写channelRead()方法时,它将负责显式地释放与池化的ByteBuf实例相关的内存。

    1
    2
    3
    4
    5
    6
    7
    @Sharable
    public class DiscardHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ReferenceCountUtil.release(msg);
    }
    }
  • 使用SimpleChannelInboundHandler

    1
    2
    3
    4
    5
    6
    7
    @Sharable
    public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) {
    // No need to do anything special
    }
    }
    • SimpleChannelInboundHandler会自动释放资源

ChannelOutboundHandler接口

  • 出站操作和数据将由ChannelOutboundHandler处理。它的方法将被 Channel、ChannelPipeline以及ChannelHandlerContext调用。
  • ChannelOutboundHandler的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求.
类型 描述
bind(ChannelHandlerContext, SocketAddress,ChannelPromise) 当请求将 Channel绑定到本地地址时被调用
connect(ChannelHandlerContext, SocketAddress,SocketAddress,ChannelPromise) 当请求将 Channel连接到远程节点时被调用
disconnect(ChannelHandlerContext, ChannelPromise) 当请求将 Channel从远程节点断开时被调用
close(ChannelHandlerContext,ChannelPromise) 当请求关闭 Channel时被调用
deregister(ChannelHandlerContext, ChannelPromise) 当请求将 Channel从它的 EventLoop注销时被调用
read(ChannelHandlerContext) 当请求从 Channel读取更多的数据时被调用
flush(ChannelHandlerContext) 当请求通过 Channel将入队数据冲刷到远程节点时被调用
write(ChannelHandlerContext,Object, ChannelPromise) 当请求通过 Channel将数据写到远程节点时被调用

ChannelHandler适配器

  • 使用ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter类作为自己的ChannelHandler的起始点。这两个适配器分别提供了ChannelInboundHandlerChannelOutboundHandler的基本实现。通过扩展抽象类ChannelHandlerAdapter,它们获得了它们共同的超接口ChannelHandler的方法
  • ChannelHandlerAdapter还提供了实用方法isSharable()。如果其对应的实现被标注为Sharable,那么这个方法将返回true,表示它可以被添加到多个ChannelPipeline

异常处理

  • 如果用户没有对异常进行拦截处理,最后将由Tail节点统一处理.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    /**
    * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
    * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
    */
    protected void onUnhandledInboundException(Throwable cause) {
    try {
    logger.warn(
    "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
    "It usually means the last handler in the pipeline did not handle the exception.",
    cause);
    } finally {
    ReferenceCountUtil.release(cause);
    }
    }
  • 在 Netty 应用开发的过程中,良好的异常处理机制会让排查问题的过程事半功倍。所以推荐用户对异常进行统一拦截,然后根据实际业务场景实现更加完善的异常处理机制。通过异常传播机制的学习,我们应该可以想到最好的方法是在ChannelPipeline自定义处理器的末端添加统一的异常处理器.

  • 当异步事件在 pipeline 传播的过程中发生异常时,异步事件就会停止在 pipeline 中传播。所以我们在日常开发中,需要对写操作异常情况进行处理。

    • 其中 inbound 类异步事件发生异常时,会触发exceptionCaught事件传播。exceptionCaught 事件本身也是一种 inbound 事件,传播方向会从当前发生异常的 ChannelHandler 开始一直向后传播直到 TailContext。
    • 而 outbound 类异步事件发生异常时,则不会触发exceptionCaught事件传播。一般只是通知相关 ChannelFuture。但如果是 flush 事件在传播过程中发生异常,则会触发当前发生异常的 ChannelHandler 中 exceptionCaught 事件回调。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Slf4j
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 1. 通过 channel 拿到 pipeline
ChannelPipeline pipeline = ch.pipeline();
// 2. 添加处理器 head -> h1 -> h2 -> h4 -> h3 -> h5 -> h6 -> tail
pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
log.debug("2");
super.channelRead(ctx, name); // 将数据传递给下个 handler,如果不调用,调用链会断开 或者调用 ctx.fireChannelRead(student);
}
});

pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3");
ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
// ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
@Data
@AllArgsConstructor
static class Student {
private String name;
}
}