参考文献

Channel

  • Socket类似

  • Netty 网络通信的组件,能够用于执行网络 I/O 操作.Channel 为用户提供

    • 当前网络连接的通道的状态(例如是否打开?是否已连接?
    • 网络连接的配置参数 (例如接收缓冲区大小)
    • 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成.
    • 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方.
    • 支持关联 I/O 操作与对应的处理程序.
  • 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,下面是一些常用的 Channel 类型:

    • NioSocketChannel: 异步的客户端 TCP Socket 连接.
    • NioServerSocketChannel: 异步的服务器端 TCP Socket 连接.
    • NioDatagramChannel: 异步的 UDP 连接.
    • NioSctpChannel: 异步的客户端 Sctp 连接.
    • NioSctpServerChannel: 异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO.

Channel的生命周期

状态 描述
ChannelRegistered Channel已经被注册到了EventLoop
ChannelActive Channel处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
ChannelInactive Channel没有连接到远程节点
ChannelUnregistered Channel已经被创建,但还未注册到EventLoop

Channel主要方法

  • close()可以用来关闭channel
  • closeFuture()用来处理channel 的关闭
    • sync 方法作用是同步等待 channel 关闭
    • 而 addListener 方法是异步等待 channel 关闭
  • pipeline()方法添加处理器
  • write()方法将数据写入
  • writeAndFlush()方法将数据写入并刷出

EmbeddedChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Slf4j
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 模拟入站操作
// channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
// 模拟出站操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));

}

资源管理

  • Netty提供了ResourceLeakDetector它将对应用程序的缓冲区分配做大约 1%的采样来检测内存泄露。相关的开销是非常小的。

  • 如果检测到了内存泄露,将会产生类似于下面的日志消息:

    1
    2
    LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option
    '-Dio.netty.leakDetectionLevel=ADVANCED' or call ResourceLeakDetector.setLevel()
  • Netty目前定义了4种泄漏检测级别

    级别 描述
    DISABLED 禁用泄漏检测。只有在详尽的测试之后才应设置为这个值
    SIMPLE 使用1%的默认采样率检测并报告任何发现的泄露。这是默认级别,适合绝大部分的情况
    ADVANCED 使用默认的采样率,报告所发现的任何的泄露以及对应的消息被访问的位置
    PARANOID 类似于ADVANCED,但是其将会对每次(对消息的)访问都进行采样。这对性能将会有很大的影响,应该只在调试阶段使用
  • 泄露检测级别可以通过将Java 系统属性设置为表中的一个值来定义: java -Dio.netty.leakDetectionLevel=ADVANCED

写大型数据

  • 使用FileRegion传输文件的内容

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 创建一个FileInputStream
    FileInputStream in = new FileInputStream(file);
    // 以该文件的完整长度创建一个新的DefaultFileRegion
    FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
    // 发送该DefaultFileRegion,并注册一个ChannelFutureListener
    channel.writeAndFlush(region).addListener(
    new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
    if (!future.isSuccess()) {
    Throwable cause = future.cause();
    }
    }
    });
    • 这个示例只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。在需要将数据从文件系统复制到用户内存中时,可以使用ChunkedWriteHandler,它支持异步写大型数据流,而又不会导致大量的内存消耗.
  • ChunkedInput的实现

    名称 描述
    ChunkedFile 从文件中逐块获取数据,当平台不支持零拷贝或者你需要转换数据时使用
    ChunkedNioFile ChunkedFile类似,只是它使用了FileChannel
    ChunkedStream InputStream中逐块传输内容
    ChunkedNioStream ReadableByteChannel中逐块传输内容
  • 使用ChunkedStream传输文件内容

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> { 
    private final File file;
    private final SslContext sslCtx;

    public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
    this.file = file;
    this.sslCtx = sslCtx;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new SslHandler(sslCtx.newEngine(ch.alloc());
    pipeline.addLast(new ChunkedWriteHandler());
    pipeline.addLast(new WriteStreamHandler());
    }

    public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    ctx.writeAndFlush(
    new ChunkedStream(new FileInputStream(file)));
    }
    }
    }

处理WebSocket

帧类型 描述
BinaryWebSocketFrame 包含了二进制数据
TextWebSocketFrame 包含了文本数据
ContinuationWebSocketFrame 包含属于上一个BinaryWebSocketFrameTextWebSocketFrame的文本数据或者二进制数据
CloseWebSocketFrame 表示一个CLOSE请求,包含一个关闭的状态码和关闭的原因
PingWebSocketFrame 请求传输一个PongWebSocketFrame
PongWebSocketFrame 作为一个对于PingWebSocketFrame的响应被发送

如何进行加密

ChannelPipeline添加加密

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class SecureChatServerInitializer extends ChatServerInitializer {
private final SslContext context;
public SecureChatServerInitializer(ChannelGroup group,SslContext context) {
super(group);
this.context = context;
}

@Override
protected void initChannel(Channel ch) throws Exception {
super.initChannel(ch);
SSLEng.ine engine = context.newEngine(ch.alloc());
engine.setUseClientMode(false);
ch.pipeline().addFirst(new SslHandler(engine));
}
}

ChatServer添加加密

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class SecureChatServer extends ChatServer { 
private final SslContext context;
public SecureChatServer(SslContext context) {
this.context = context;
}
@Override
protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
return new SecureChatServerInitializer(group, context);
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Please give port as argument");
System.exit(1);
}
int port = Integer.parseInt(args[0]);
// 核心代码开始
SelfSignedCertificate cert = new SelfSignedCertificate();
SslContext context = SslContext.newServerContext(
cert.certificate(), cert.privateKey());
// 核心代码结束
final SecureChatServer endpoint = new SecureChatServer(context);
ChannelFuture future = endpoint.start(new InetSocketAddress(port));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
endpoint.destroy();
}
});
future.channel().closeFuture().syncUninterruptibly();
}
}

设置参数

1
2
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024);
  • ServerBootstrap设置Channel属性有optionchildOption两个方法,option主要负责设置Boss线程组,而childOption对应的事Worker线程组
参数 含义
SO_KEEPALIVE 设置为true代表启动了TCP SO_KEEPALIVE属性,TCP会主动探测连接状态,即连接保活
SO_BACKLOG 已完成三次握手的请求队列最大长度,同一时刻服务端可能会处理多个连接,在高并发海量连接的场景下,该参数应适当调大
TCP_NODELAY 默认为true,表示立即发送数据.如果设置为false表示启用Nagle算法,该算法会将TCP网络数据包累计到一定量才会发送,虽然可以减少报文发送的数量,但是会造成一定的数据延迟.Netty为了最小化数据传输的延迟,默认禁用了Nagle算法
SO_SNDBUF TCP数据发送缓冲区的大小
SO_RCVBUF TCP数据接收缓冲区的大小
SO_LINGER 设置延迟关闭的时间,等待缓冲区中的数据发送完成
CONNECT_TIMEOUT_MILLIS 建立连接的超时时间