Netty-Handler&Pipeline
参考文献
ChannelPipline
-
每一个新创建的
Channel
都将会被分配一个新的ChannelPipeline
.这项关联是永久性的;Channel
既不能附加另外一个ChannelPipeline
,也不能分离其当前的.在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预. -
ChannelPipeline
可以看作ChannelHandler
的容器载体,它是由一组ChannelHandler
实例组成的,内部通过双向链表将不同的ChannelHandler
链接在一起.ChannelPipeline
的双向链表分别维护了HeadContext
和TailContext
的头尾节点。我们自定义的ChannelHandler
会插入到Head
和Tail
之间. -
保存
ChannelHandler
的List
,用于处理或拦截Channel
的入站事件和出站操作**,它们的执行顺序是由它们被添加的顺序所决定的**. -
ChannelPipeline
实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及Channel
中各个的ChannelHandler
如何相互交互. -
下图引用
Netty
的Javadoc 4.1
中ChannelPipeline
的说明,描述了ChannelPipeline
中ChannelHandler
通常如何处理I/O
事件.I/O
事件由ChannelInboundHandler
或ChannelOutboundHandler
处理,并通过调用ChannelHandlerContext
中定义的事件传播方法.例如:ChannelHandlerContext .fireChannelRead(Object)
和ChannelOutboundInvoker.write(Object)
转发到其最近的处理程序.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38I/O Request
via Channel or
ChannelHandlerContext
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext .OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+ -
HeadContext 既是 Inbound 处理器,也是 Outbound 处理器。它分别实现了 ChannelInboundHandler 和 ChannelOutboundHandler。网络数据写入操作的入口就是由 HeadContext 节点完成的。HeadContext 作为 Pipeline 的头结点负责读取数据并开始传递 InBound 事件,当数据处理完成后,数据会反方向经过Outbound 处理器,最终传递到 HeadContext,所以 HeadContext 又是处理 Outbound 事件的最后一站,此外 HeadContext 在传递事件之前,还会执行一些前置操作。
-
TailContext 只实现了 ChannelInboundHandler 接口。它会在 ChannelInboundHandler 调用链路的最后一步执行,主要用于终止 Inbound 事件传播,例如释放 Message 数据资源等。TailContext 节点作为 OutBound 事件传播的第一站,仅仅是将 OutBound 事件传递给上一个节点。
Inbound
入站事件
- 入站事件由自下而上方向的入站处理程序处理
Head-->Tail
Inbound
事件在递归调用流程中何时结束
- 用户自定义的
Handler
没有执行fireChannelRead()
操作,则当前Handler
终止Inbound
事件传播 - 如果用户自定义的
Handler
都执行了fireChannelRead()
操作,Inbound
事件传播最终会在TailContext
节点终止
OutBound
出站事件
- 出站事件由上而下方向处理,出站 Handler 处理程序通常会生成或转换出站传输,例如 write 请求.
Tail-->Head
ChannelHandlerContext
-
一个
Channel
包含了一个ChannelPipeline
,而ChannelPipeline
中又维护了一个由ChannelHandlerContext
组成的双向链表,并且每个ChannelHandlerContext
中又关联着一个ChannelHandler
.- 入站事件和出站事件在一个双向链表中,
- 入站事件会从链表head往后传递到最后一个入站的handler
- 出站事件会从链表tail往前传递到最前一个出站的handler,两种类型的 handler 互不干扰.
- 入站事件和出站事件在一个双向链表中,
-
ChannelHandler
添加到ChannelPipeline
中的过程如下所示:- 一个
ChannelInitializer
的实现被注册到了ServerBootstrap
中. - 当
ChannelInitializer.initChannel()
方法被调用,ChannelInitializer
将在ChannelPipeline
中安装一组自定义的ChannelHandler
; ChannelInitializer
将它自己从ChannelPipeline
中移除.
- 一个
-
在
Netty
中,有两种发送消息的方式.你可以直接写到Channel
中,也可以写到和ChannelHandler
相关联的ChannelHandlerContext
对象中.-
使用
ctx.channel().write(msg)
的方式会导致消息从ChannelPipeline
的尾端开始流动,即从最后一个出站ChannelHandler
向前开始处理.- Channel.writeAndFlush(): 当你调用
Channel
的writeAndFlush
方法时,数据会从整个 ChannelPipeline 的尾部(tail
)开始经过所有的ChannelOutboundHandler
,直到头部(head
).这意味着数据将经过整个出站处理流程,包括所有配置的ChannelOutboundHandler
.
- Channel.writeAndFlush(): 当你调用
-
使用
ctx.write(msg)
的方式则会导致消息从ChannelPipeline
中的下一个ChannelHandler
开始流动,即从当前ChannelHandler
向前找下一个出站ChannelHandler
开始处理消息.这是因为ctx.write(msg)
方法是通过ChannelHandlerContext
对象调用的,该对象表示当前ChannelHandler
在ChannelPipeline
中的位置ctx.write(msg)
后者将导致消息从ChannelPipeline
中的下一个ChannelHandler
开始流动- ChannelHandlerContext.writeAndFlush(): 当你调用
ChannelHandlerContext
的writeAndFlush
方法时,数据将从关联到该ChannelHandlerContext
的 当前ChannelOutboundHandler
开始,沿着 ChannelPipeline 传递到头部(head
).这种方式可以跳过一些出站处理过程,因为数据从当前ChannelHandlerContext
开始传播,而不是从尾部(tail
).
- ChannelHandlerContext.writeAndFlush(): 当你调用
-
修改ChannelPipeline
类型 | 描述 |
---|---|
AddFirst | 将一个ChannelHandler 添加到ChannelPipeline 中 |
addBefore | / |
addAfter | / |
addLast | / |
remove | 将一个ChannelHandler 从ChannelPipeline 中移除 |
replace | 将ChannelPipeline 中的一个ChannelHandler 替换为另一个ChannelHandler |
ChannelHandler
-
它充当了所有处理入站和出站数据的应用程序逻辑的容器.
-
ChannelHandler
是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其ChannelPipeline
(业务处理链)中的下一个处理程序 -
ChannelHandler
本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类ChannelInboundHandler
用于处理入站 I/O 事件.ChannelOutboundHandler
用于处理出站 I/O 操作.
-
或者使用以下适配器类
ChannelInboundHandlerAdapter
用于处理入站 I/O 事件.ChannelOutboundHandlerAdapter
用于处理出站 I/O 操作.ChannelDuplexHandler
用于处理入站和出站事件.
ChannelHandler
的生命周期
类型 | 描述 |
---|---|
handlerAdded |
当把ChannelHandler 添加到ChannelPipeline 中时被调用 |
handlerRemoved |
当从ChannelPipeline 中移除ChannelHandler 时被调用 |
exceptionCaught |
当从ChannelPipeline 中移除ChannelHandler 时被调用 |
ChannelInboundHandler
接口
方法 | 描述 |
---|---|
channelRegistered |
当Channel 已经注册到它的EventLoop 并且能够处理I/O 时被调用 |
channelUnregistered |
当Channel 从它的EventLoop 注销并且无法处理任何I/O 时被调用 |
channelActive |
当Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪 |
channelInactive |
当Channel 离开活动状态并且不再连接它的远程节点时被调用 |
channelReadComplete |
当Channel 上的一个读操作完成时被调用 |
channelRead |
当从Channel 读取数据时被调用 |
channelWritabilityChanged |
当Channel 的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生 OutOfMemoryError )或者可以在Channel 变为再次可写时恢复写入。可以通过调用Channel 的 isWritable() 方法来检测Channel 的可写性。与可写性相关的阈值可以通过Channel.config() . setWriteHighWaterMark() 和 Channel.config() .setWriteLowWaterMark() 方法来设置 |
userEventTriggered |
当ChannelnboundHandler.fireUserEventTriggered() 方法被调用时被调用,因为一个POJO 被传经了ChannelPipeline |
-
当某个
ChannelInboundHandler
的实现重写channelRead()
方法时,它将负责显式地释放与池化的ByteBuf
实例相关的内存。1
2
3
4
5
6
7
public class DiscardHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
} -
使用
SimpleChannelInboundHandler
1
2
3
4
5
6
7
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {
public void channelRead0(ChannelHandlerContext ctx, Object msg) {
// No need to do anything special
}
}SimpleChannelInboundHandler
会自动释放资源
ChannelOutboundHandler
接口
- 出站操作和数据将由
ChannelOutboundHandler
处理。它的方法将被Channel、ChannelPipeline
以及ChannelHandlerContext
调用。 ChannelOutboundHandler
的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求.
类型 | 描述 |
---|---|
bind(ChannelHandlerContext, SocketAddress,ChannelPromise) |
当请求将 Channel绑定到本地地址时被调用 |
connect(ChannelHandlerContext, SocketAddress,SocketAddress,ChannelPromise) |
当请求将 Channel连接到远程节点时被调用 |
disconnect(ChannelHandlerContext, ChannelPromise) |
当请求将 Channel从远程节点断开时被调用 |
close(ChannelHandlerContext,ChannelPromise) |
当请求关闭 Channel时被调用 |
deregister(ChannelHandlerContext, ChannelPromise) |
当请求将 Channel从它的 EventLoop注销时被调用 |
read(ChannelHandlerContext) |
当请求从 Channel读取更多的数据时被调用 |
flush(ChannelHandlerContext) |
当请求通过 Channel将入队数据冲刷到远程节点时被调用 |
write(ChannelHandlerContext,Object, ChannelPromise) |
当请求通过 Channel将数据写到远程节点时被调用 |
ChannelHandler
适配器
- 使用
ChannelInboundHandlerAdapter
和ChannelOutboundHandlerAdapter
类作为自己的ChannelHandler
的起始点。这两个适配器分别提供了ChannelInboundHandler
和ChannelOutboundHandler
的基本实现。通过扩展抽象类ChannelHandlerAdapter
,它们获得了它们共同的超接口ChannelHandler
的方法 ChannelHandlerAdapter
还提供了实用方法isSharable()
。如果其对应的实现被标注为Sharable
,那么这个方法将返回true
,表示它可以被添加到多个ChannelPipeline
中
异常处理
-
如果用户没有对异常进行拦截处理,最后将由
Tail
节点统一处理.1
2
3
4
5
6
7
8
9
10
11
12
13
14/**
* Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
* in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
*/
protected void onUnhandledInboundException(Throwable cause) {
try {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
ReferenceCountUtil.release(cause);
}
} -
在 Netty 应用开发的过程中,良好的异常处理机制会让排查问题的过程事半功倍。所以推荐用户对异常进行统一拦截,然后根据实际业务场景实现更加完善的异常处理机制。通过异常传播机制的学习,我们应该可以想到最好的方法是在
ChannelPipeline
自定义处理器的末端添加统一的异常处理器. -
当异步事件在 pipeline 传播的过程中发生异常时,异步事件就会停止在 pipeline 中传播。所以我们在日常开发中,需要对写操作异常情况进行处理。
- 其中 inbound 类异步事件发生异常时,会触发exceptionCaught事件传播。exceptionCaught 事件本身也是一种 inbound 事件,传播方向会从当前发生异常的 ChannelHandler 开始一直向后传播直到 TailContext。
- 而 outbound 类异步事件发生异常时,则不会触发exceptionCaught事件传播。一般只是通知相关 ChannelFuture。但如果是 flush 事件在传播过程中发生异常,则会触发当前发生异常的 ChannelHandler 中 exceptionCaught 事件回调。
示例
1 |
|