
Channel, EventLoop, and EventLoopGroup

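  • An EventLoopGroup contains one or more EventLoops.
  • An EventLoop is bound to exactly one Thread for its entire lifetime.
  • All I/O events handled by an EventLoop are processed on its dedicated Thread.
  • A Channel is registered with exactly one EventLoop for its entire lifetime.
  • An EventLoop may be assigned to one or more Channels.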

EventLoop: the event loop object

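  • An EventLoop is a programming model for waiting on and handling events, which addresses the high resource consumption of a multithreaded design.
  • The general pattern: whenever an event occurs, the application places it in an event queue; the EventLoop then polls the queue and either executes each event itself or dispatches it to the matching event listener. Events are usually executed in one of three ways: immediately, after a delay, or periodically.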
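The queue-and-poll pattern described above can be sketched with nothing but the JDK. This is a simplified illustration, not Netty's implementation: the class name `MiniEventLoopSketch`, the `STOP` sentinel and the helper `runInOrder` are all hypothetical, and only immediate execution is shown (no delayed or periodic events).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical, simplified event loop: one thread draining one event queue. */
final class MiniEventLoopSketch {
    private static final Runnable STOP = () -> { };   // sentinel event that ends the loop
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread thread = new Thread(this::loop, "mini-event-loop");

    MiniEventLoopSketch() {
        thread.start();
    }

    /** Any thread may enqueue an event; only the loop thread executes it. */
    void submit(Runnable event) {
        queue.add(event);
    }

    /** Enqueues the stop sentinel and waits for the loop thread to exit. */
    void shutdown() {
        queue.add(STOP);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private void loop() {
        try {
            for (;;) {
                Runnable event = queue.take();   // block until an event is available
                if (event == STOP) {
                    return;
                }
                event.run();                     // events run one at a time, in queue order
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits n events and returns the order in which the loop executed them. */
    static List<Integer> runInOrder(int n) {
        MiniEventLoopSketch loop = new MiniEventLoopSketch();
        List<Integer> seen = new ArrayList<>();  // only touched by the loop thread
        for (int i = 0; i < n; i++) {
            final int id = i;
            loop.submit(() -> seen.add(id));
        }
        loop.shutdown();                         // join() makes the loop's writes visible
        return seen;
    }
}
```

Because a single thread takes events from the queue, they execute strictly in submission order, which is why the plain `ArrayList` in `runInOrder` needs no synchronization.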


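  • An EventLoop is essentially a single-threaded executor (which also maintains a Selector); its run method processes the continuous stream of I/O events arriving on its Channels.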

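  • Its inheritance hierarchy is fairly complex:

    • One branch extends j.u.c.ScheduledExecutorService, so it exposes all of the thread pool methods.
    • The other branch extends Netty's own OrderedEventExecutor, which
      • provides boolean inEventLoop(Thread thread) to check whether a thread belongs to this EventLoop;
      • provides parent() to find out which EventLoopGroup the EventLoop belongs to.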

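EventLoopGroup: the event loop group

  • An EventLoopGroup is a set of EventLoops. A Channel is normally bound to one of them through the EventLoopGroup's register method, and all later I/O events on that Channel are handled by that EventLoop (which keeps I/O event handling thread-safe).
  • It extends Netty's own EventExecutorGroup, which
    • implements Iterable, so the EventLoops can be iterated;
    • provides next() to obtain the next EventLoop in the group.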

Ways to create an EventLoopGroup

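// Four alternatives; an application normally creates only the one that matches its transport.
EventLoopGroup defaultGroup = new DefaultEventLoopGroup();
EventLoopGroup nioGroup = new NioEventLoopGroup();
EventLoopGroup epollGroup = new EpollEventLoopGroup();   // Linux only
EventLoopGroup kqueueGroup = new KQueueEventLoopGroup(); // BSD/macOS only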
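Implementation | Use case | What it is | Notes
DefaultEventLoopGroup | Work that needs no socket I/O, such as ordinary and scheduled tasks | An EventLoopGroup whose loops only run tasks and have no Selector | It does not process network I/O events, so it cannot replace the transport-specific groups below
NioEventLoopGroup | Most scenarios, especially handling a large number of concurrent connections | An EventLoopGroup built on Java NIO | Tune the thread count to the workload; the default is twice the number of CPU cores
EpollEventLoopGroup | High-concurrency scenarios on Linux | An EventLoopGroup built on Linux epoll | Must run on Linux, otherwise it does not work
KQueueEventLoopGroup | High-concurrency scenarios on BSD platforms | An EventLoopGroup built on BSD kqueue | Must run on a BSD platform, otherwise it does not work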
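  • NioEventLoopGroup handles events with a lock-free, serialized design: all events of a Channel are processed on the single thread of its EventLoop, so the handling code needs no locks.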
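A minimal sketch of what lock-free serialization buys, using a JDK single-thread executor as a stand-in for one EventLoop (an assumption made for illustration; `SerializedCounterSketch` and `countWith` are hypothetical names). Several producer threads submit increments, yet the counter is a plain `int` because only the loop thread ever touches it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical illustration: state confined to one loop thread needs no lock. */
final class SerializedCounterSketch {
    private int counter;   // deliberately NOT thread-safe: no lock, no atomic

    static int countWith(int producers, int perProducer) {
        SerializedCounterSketch state = new SerializedCounterSketch();
        // Stand-in for a single EventLoop: every task runs on the same thread.
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            Thread[] threads = new Thread[producers];
            for (int p = 0; p < producers; p++) {
                threads[p] = new Thread(() -> {
                    for (int i = 0; i < perProducer; i++) {
                        loop.execute(() -> state.counter++);   // mutation happens on the loop thread
                    }
                });
                threads[p].start();
            }
            for (Thread t : threads) {
                t.join();
            }
            // Read the result on the loop thread too, after all queued increments.
            return loop.submit(() -> state.counter).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }
}
```

Calling `SerializedCounterSketch.countWith(4, 1000)` loses no updates even though four threads submit concurrently, since the increments themselves are executed one after another.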

Running an ordinary task on a NioEventLoop

// Create a NioEventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();

// Take one NioEventLoop from the group
NioEventLoop loop = (NioEventLoop) group.next();

// Submit an ordinary task; it runs on the loop's thread
loop.execute(() -> {
    // Task logic
    System.out.println("Hello, world!");
});

// Shut the NioEventLoopGroup down and wait for it to finish
group.shutdownGracefully().sync();

Running a scheduled task on a NioEventLoop

// Create a NioEventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();

// Take one NioEventLoop from the group
NioEventLoop loop = (NioEventLoop) group.next();

// Schedule a task to run once after 1 second.
// ScheduledFuture here is io.netty.util.concurrent.ScheduledFuture, not the java.util.concurrent one.
ScheduledFuture<?> future = loop.schedule(() -> {
    // Task logic
    System.out.println("Hello, world!");
}, 1, TimeUnit.SECONDS);

// Shut the NioEventLoopGroup down once the task has completed
future.addListener(f -> group.shutdownGracefully());

// Wait for the task to complete
future.sync();

NioEventLoop source code analysis

Creating a NioEventLoop

  • A NioEventLoop is created by its NioEventLoopGroup.

  • Call chain

    // Start from NioEventLoopGroup's no-argument constructor
    io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()
    -->io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
    -->io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
    -->io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)
    -->io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)

    // Finally the superclass constructor in MultithreadEventLoopGroup is called
    io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
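The thread-count rule in the snippet above boils down to two small expressions. The sketch below restates them with JDK calls only; `EventLoopThreadsSketch`, `defaultThreads` and `resolve` are hypothetical names, and `Integer.getInteger` stands in for Netty's `SystemPropertyUtil.getInt`.

```java
/** Hypothetical restatement of the default-thread-count arithmetic shown above. */
final class EventLoopThreadsSketch {

    /** max(1, configured value, or 2 x CPU count when the property is unset). */
    static int defaultThreads(Integer configured, int availableProcessors) {
        int fallback = availableProcessors * 2;
        return Math.max(1, configured != null ? configured : fallback);
    }

    /** nThreads == 0 means "use the default"; any other value is taken as given. */
    static int resolve(int nThreads, int defaultThreads) {
        return nThreads == 0 ? defaultThreads : nThreads;
    }

    public static void main(String[] args) {
        // Integer.getInteger returns null when the system property is not set.
        Integer configured = Integer.getInteger("io.netty.eventLoopThreads");
        int def = defaultThreads(configured, Runtime.getRuntime().availableProcessors());
        System.out.println("no-arg constructor -> " + resolve(0, def) + " threads");
        System.out.println("explicit 3         -> " + resolve(3, def) + " threads");
    }
}
```

So on an 8-core machine with no system property set, the no-argument constructor ends up with 16 event loops, while passing an explicit positive count bypasses the default entirely.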
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
checkPositive(nThreads, "nThreads");

if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// Create the array of child executors, one slot per requested thread
children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// Actually create the child executor (an EventLoop)
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// Create the chooser that selects the next child executor
chooser = chooserFactory.newChooser(children);

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};

for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
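The chooser created near the end of this constructor decides which child `next()` returns. A minimal round-robin sketch follows; it is not Netty's class, and the names are hypothetical. To my understanding Netty's default chooser factory uses a similar bitmask shortcut when the number of children is a power of two, but treat that as an assumption rather than a quotation of the source.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin chooser over a fixed array of children. */
final class RoundRobinChooserSketch<T> {
    private final T[] children;
    private final AtomicInteger idx = new AtomicInteger();
    private final boolean powerOfTwo;

    RoundRobinChooserSketch(T[] children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("at least one child is required");
        }
        this.children = children.clone();
        // n is a power of two exactly when its lowest set bit equals n itself.
        this.powerOfTwo = (children.length & -children.length) == children.length;
    }

    /** Returns the children in rotation; safe to call from several threads. */
    T next() {
        int i = idx.getAndIncrement();
        if (powerOfTwo) {
            return children[i & (children.length - 1)];   // cheap mask instead of a modulo
        }
        return children[Math.floorMod(i, children.length)]; // stays non-negative after overflow
    }
}
```

With three children "a", "b", "c", successive `next()` calls yield a, b, c, a, and so on; this is how newly registered Channels get spread evenly across the EventLoops.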
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventLoop(this, executor);
}
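  • Call chain for creating a DefaultEventLoop (the newChild above is the one in DefaultEventLoopGroup; NioEventLoopGroup overrides newChild to build a NioEventLoop instead)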

    io.netty.channel.DefaultEventLoop#DefaultEventLoop(io.netty.channel.EventLoopGroup, java.util.concurrent.Executor)
    -->io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop(io.netty.channel.EventLoopGroup, java.util.concurrent.Executor, boolean)
    -->io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop(io.netty.channel.EventLoopGroup, java.util.concurrent.Executor, boolean, int, io.netty.util.concurrent.RejectedExecutionHandler)
    -->io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor(io.netty.util.concurrent.EventExecutorGroup, java.util.concurrent.Executor, boolean, int, io.netty.util.concurrent.RejectedExecutionHandler)

Creating the Selector

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// Create the Selector
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
  • A NioEventLoop holds two Selectors

    /**
    * The NIO {@link Selector}.
    */
    private Selector selector;
    private Selector unwrappedSelector;
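    • selector: the Selector optimized by Netty

    • unwrappedSelector: the native Java NIO Selector

    • The difference between the two: selector collects its selected SelectionKeys in an array (SelectedSelectionKeySet), so processing them avoids iterating over a collection, which improves performance.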

      final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

      SelectionKey[] keys;
      int size;
      // ... omitted
      }
      public abstract class SelectorImpl
          extends AbstractSelector
      {
          // All SelectionKeys currently registered with this selector.
          // Registering a channel creates a SelectionKey and adds it to keys;
          // closing or deregistering the channel removes its SelectionKey from keys.
          // The set of keys registered with this Selector
          private final Set<SelectionKey> keys;

          // The SelectionKeys whose channels are ready for at least one operation they registered interest in.
          // Selector.select() checks every registered channel and adds the keys of the ready ones to selectedKeys.
          // After processing the keys in selectedKeys, the caller must remove them from the set itself.
          // The set of keys with data ready for an operation
          private final Set<SelectionKey> selectedKeys;

          // Unmodifiable view of keys: elements can be read but neither added nor removed.
          // Public views of the key sets
          private final Set<SelectionKey> publicKeys; // Immutable
          // View of selectedKeys that allows removing elements but not adding them.
          private final Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
      }
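To make the array idea concrete, here is a small array-backed set in the spirit of SelectedSelectionKeySet. It is a sketch under assumptions, not the Netty class: the name `ArrayKeySetSketch`, the initial capacity of 4 and the generic element type are all chosen for illustration.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical array-backed "set": append-only, read back by index. */
final class ArrayKeySetSketch<K> extends AbstractSet<K> {
    Object[] keys = new Object[4];   // small initial capacity, for illustration only
    int size;

    /** Appends without hashing or duplicate checks; doubles the array when full. */
    @Override
    public boolean add(K key) {
        if (key == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1);
        }
        keys[size++] = key;
        return true;
    }

    /** Clears the used slots so the referenced objects can be garbage collected. */
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<K> iterator() {
        return new Iterator<K>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public K next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (K) keys[idx++];
            }
        };
    }
}
```

A consumer can walk `keys[0..size)` with a plain indexed `for` loop and then call `reset()`, which avoids allocating an iterator and the hashing cost of a HashSet on every select cycle.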
  • Implementation

    private void processSelectedKeys() {
    if (selectedKeys != null) {
    processSelectedKeysOptimized();
    } else {
    processSelectedKeysPlain(selector.selectedKeys());
    }
    }
    private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
    final SelectionKey k = selectedKeys.keys[i];
    // null out entry in the array to allow to have it GC'ed once the Channel close
    // See https://github.com/netty/netty/issues/2363
    selectedKeys.keys[i] = null;

    final Object a = k.attachment();

    if (a instanceof AbstractNioChannel) {
    processSelectedKey(k, (AbstractNioChannel) a);
    } else {
    @SuppressWarnings("unchecked")
    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    processSelectedKey(k, task);
    }

    if (needsToSelectAgain) {
    // null out entries in the array to allow to have it GC'ed once the Channel close
    // See https://github.com/netty/netty/issues/2363
    selectedKeys.reset(i + 1);

    selectAgain();
    i = -1;
    }
    }
    }
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
    return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
    final SelectionKey k = i.next();
    final Object a = k.attachment();
    i.remove();

    if (a instanceof AbstractNioChannel) {
    processSelectedKey(k, (AbstractNioChannel) a);
    } else {
    @SuppressWarnings("unchecked")
    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    processSelectedKey(k, task);
    }

    if (!i.hasNext()) {
    break;
    }

    if (needsToSelectAgain) {
    selectAgain();
    selectedKeys = selector.selectedKeys();

    // Create the iterator again to avoid ConcurrentModificationException
    if (selectedKeys.isEmpty()) {
    break;
    } else {
    i = selectedKeys.iterator();
    }
    }
    }
    }

Executing tasks

  • Entry point: io.netty.util.concurrent.SingleThreadEventExecutor#execute
@Override
public void execute(Runnable task) {
execute0(task);
}
private void execute0(@Schedule Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
// Add the task; the queue is the lock-free MPSC queue provided by JCTools
addTask(task);
if (!inEventLoop) {
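// inEventLoop == false means execute was called from a thread other than the loop's own.
// startThread() launches the loop thread on the first such call (see doStartThread below) and does nothing once it is running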
startThread();
if (isShutdown()) {
// Already shut down: reject the task
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}

if (!addTaskWakesUp && immediate) {
// If the loop thread is blocked in an I/O select, the thread that added the task must wake the NioEventLoop thread up
wakeup(inEventLoop);
}
}

Starting the thread

private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// Save the executor's current thread in a field for later use
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
// Call the outer SingleThreadEventExecutor's run method, which enters the endless loop
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// Cleanup, code omitted...
}
}
});
}
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
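// calculateStrategy works as follows:
// tasks pending: run selectNow() once without blocking and return its result (>= 0), which matches none of the cases below
// no tasks: return SelectStrategy.SELECT, and the SELECT case decides whether to block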
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
// Retry the I/O loop
continue;

case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}

selectCnt++;
// Number of channels deregistered (keys cancelled) so far
cancelledKeys = 0;
// Whether select must be run again
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Record how long the I/O events took
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
// Run non-I/O tasks; runAllTasks returns once the time budget is used up
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}

if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
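The time budget passed to runAllTasks in the loop above is plain arithmetic on ioRatio. A tiny sketch of that formula (the class and method names are hypothetical; the expression is the one that appears in the source):

```java
/** Hypothetical helper reproducing the task-budget formula from run(). */
final class IoRatioSketch {

    /**
     * Time allowed for non-I/O tasks, given the time just spent on I/O.
     * ioRatio is the percentage of loop time meant for I/O; 100 is handled
     * separately in run() (all tasks run with no budget), so it is rejected here.
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1_000_000L; // 1 ms spent processing selected keys
        for (int ratio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio=" + ratio + " -> task budget "
                    + taskBudgetNanos(ioTime, ratio) + " ns");
        }
    }
}
```

With ioRatio at 50, tasks get as much time as I/O just took; at 80 they get a quarter of it, and at 20 four times as much.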
@Override
protected void wakeup(boolean inEventLoop) {
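// Effect of wakeup:
// * called from a non-EventLoop thread: wakes the EventLoop thread if it is blocked in select;
//   getAndSet(AWAKE) ensures that only the first caller invokes selector.wakeup()
// * called from the EventLoop thread itself: does nothing, since that thread is already running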
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
selector.wakeup();
}
}
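The `getAndSet(AWAKE)` guard is what keeps repeated submissions from triggering repeated selector wakeups. The sketch below isolates that gate with an `AtomicLong`; it is a hypothetical model, the constant values for `AWAKE` and `NONE` are assumptions, and a counter stands in for the real `selector.wakeup()` call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of the wakeup gate built on nextWakeupNanos. */
final class WakeupGateSketch {
    static final long AWAKE = -1L;            // assumed marker: loop thread is not in select
    static final long NONE = Long.MAX_VALUE;  // assumed marker: asleep with no scheduled deadline

    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
    private final AtomicInteger selectorWakeups = new AtomicInteger();

    /** The loop thread publishes its deadline just before blocking in select. */
    void beforeSelect(long deadlineNanos) {
        nextWakeupNanos.set(deadlineNanos);
    }

    /** The loop thread marks itself awake after select returns. */
    void afterSelect() {
        nextWakeupNanos.lazySet(AWAKE);
    }

    /** Returns true only for the one caller that must really wake the selector. */
    boolean wakeup(boolean inEventLoop) {
        if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
            selectorWakeups.incrementAndGet();   // stand-in for selector.wakeup()
            return true;
        }
        return false;
    }

    int selectorWakeups() {
        return selectorWakeups.get();
    }
}
```

If ten threads submit tasks while the loop sleeps, only the first `wakeup(false)` swaps a non-AWAKE value out and pays for the wakeup; the other nine see AWAKE and return immediately.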

Processing events

private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
// Entry point
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);

selectAgain();
i = -1;
}
}
}
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}

Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();

if (a instanceof AbstractNioChannel) {
// Entry point
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (!i.hasNext()) {
break;
}

if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();

// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// A key becomes invalid once it is cancelled or its channel is closed
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}

try {
int readyOps = k.readyOps();
// Handle the connect event
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

// Handle the writable event
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
unsafe.forceFlush();
}

// Handle the readable and acceptable events
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// For an accept: io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
// For a read: io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
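The bit tests in processSelectedKey can be tried in isolation, because the `SelectionKey` constants are part of the JDK. The sketch below (hypothetical class and method names) returns, for a given readyOps value, the steps the method above would take and in which order.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper mirroring the readyOps checks of processSelectedKey. */
final class ReadyOpsSketch {

    /** Names the unsafe calls that would run, in the order the checks appear. */
    static List<String> dispatchOrder(int readyOps) {
        List<String> steps = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            steps.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            steps.add("forceFlush");
        }
        // readyOps == 0 is also treated as readable, as in the source above.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            steps.add("read");
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(dispatchOrder(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        System.out.println(dispatchOrder(SelectionKey.OP_ACCEPT));
    }
}
```

A key that is both readable and writable is flushed first and read second, matching the "Process OP_WRITE first" comment in the source.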

OP_ACCEPT event handling flow

  • Rough flow (in plain Java NIO terms)

    // 1. Block until an event occurs
    selector.select();

    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
    while (iter.hasNext()) {
        // 2. Take one event
        SelectionKey key = iter.next();
        // Remove the key, otherwise it stays in the selected-key set and is processed again
        iter.remove();

        // 3. If it is an accept event
        if (key.isAcceptable()) {

            // 4. Perform the accept
            SocketChannel channel = serverSocketChannel.accept();
            channel.configureBlocking(false);

            // 5. Register interest in read events
            channel.register(selector, SelectionKey.OP_READ);
        }
        // ...
    }
  • Actual source: io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read

        @Override
    public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
    try {
    do {
    // doReadMessages performs the accept and puts the new NioSocketChannel into readBuf as a message
    // readBuf is an ArrayList that buffers the messages
    int localRead = doReadMessages(readBuf);
    if (localRead == 0) {
    break;
    }
    if (localRead < 0) {
    closed = true;
    break;
    }
    // localRead is 1: a single message, i.e. one accepted client connection
    allocHandle.incMessagesRead(localRead);
    } while (continueReading(allocHandle));
    } catch (Throwable t) {
    exception = t;
    }

    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
    readPending = false;
    // Fire the read event so the handlers on the pipeline process it; here that is
    // io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
    pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();

    if (exception != null) {
    closed = closeOnReadError(exception);

    pipeline.fireExceptionCaught(exception);
    }

    if (closed) {
    inputShutdown = true;
    if (isOpen()) {
    close(voidPromise());
    }
    }
    } finally {
    // Check if there is a readPending which was not processed yet.
    // This could be for two reasons:
    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
    //
    // See https://github.com/netty/netty/issues/2254
    if (!readPending && !config.isAutoRead()) {
    removeReadOp();
    }
    }
    }
    }
    // io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
    if (ch != null) {
    buf.add(new NioSocketChannel(this, ch));
    return 1;
    }
    } catch (Throwable t) {
    logger.warn("Failed to create a new channel from an accepted socket.", t);

    try {
    ch.close();
    } catch (Throwable t2) {
    logger.warn("Failed to close a socket.", t2);
    }
    }

    return 0;
    }
    // io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // msg is the NioSocketChannel at this point
    final Channel child = (Channel) msg;

    // Add childHandler, i.e. the initializer, to the NioSocketChannel's pipeline
    child.pipeline().addLast(childHandler);

    // Apply the options and attributes
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
    // Register the NioSocketChannel with a NIO worker thread; all further processing moves to that thread
    childGroup.register(child).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
    if (!future.isSuccess()) {
    forceClose(child, future.cause());
    }
    }
    });
    } catch (Throwable t) {
    forceClose(child, t);
    }
    }
  • Execution finally reaches io.netty.channel.nio.AbstractNioChannel#doBeginRead

    @Override
    protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
    return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // When handling accept events:
    // readInterestOp is 16 (SelectionKey.OP_ACCEPT), set when the NioServerSocketChannel is created
    // When handling read events:
    // readInterestOp is 1 (SelectionKey.OP_READ), set when the NioSocketChannel is created
    if ((interestOps & readInterestOp) == 0) {
    selectionKey.interestOps(interestOps | readInterestOp);
    }
    }

OP_READ event handling flow

  • Entry point: io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read

       @Override
    public final void read() {
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
    clearReadPending();
    return;
    }
    final ChannelPipeline pipeline = pipeline();
    // The io.netty.allocator.type setting decides which allocator implementation is used
    final ByteBufAllocator allocator = config.getAllocator();
    // Allocates the ByteBuf and determines how much to read per call
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
    do {
    byteBuf = allocHandle.allocate(allocator);
    // Read the data
    allocHandle.lastBytesRead(doReadBytes(byteBuf));
    if (allocHandle.lastBytesRead() <= 0) {
    // nothing was read. release the buffer.
    byteBuf.release();
    byteBuf = null;
    close = allocHandle.lastBytesRead() < 0;
    if (close) {
    // There is nothing left to read as we received an EOF.
    readPending = false;
    }
    break;
    }

    allocHandle.incMessagesRead(1);
    readPending = false;
    // Fire the read event so the handlers on the pipeline process it; here these are the NioSocketChannel's handlers
    pipeline.fireChannelRead(byteBuf);
    byteBuf = null;
    // Whether to keep looping
    } while (allocHandle.continueReading());

    allocHandle.readComplete();
    // All data has been read; fire the read complete event
    pipeline.fireChannelReadComplete();

    if (close) {
    closeOnRead(pipeline);
    }
    } catch (Throwable t) {
    handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
    // Check if there is a readPending which was not processed yet.
    // This could be for two reasons:
    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
    //
    // See https://github.com/netty/netty/issues/2254
    if (!readPending && !config.isAutoRead()) {
    removeReadOp();
    }
    }
    }
    }
  • Deciding whether to continue reading

@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
    // Usually true
    return config.isAutoRead() &&
           // respectMaybeMoreData defaults to true
           // maybeMoreDataSupplier returns true when the bytes actually read equal the bytes attempted
           (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
           // Fewer messages than the maximum; maxMessagePerRead defaults to 16
           totalMessages < maxMessagePerRead &&
           // Data was actually read
           (ignoreBytesRead || totalBytesRead > 0);
}

writeAndFlush source flow

io.netty.channel.ChannelOutboundInvoker#writeAndFlush(java.lang.Object)
io.netty.channel.AbstractChannel#writeAndFlush(java.lang.Object)
io.netty.channel.DefaultChannelPipeline#writeAndFlush(java.lang.Object)
io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object)
io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object, io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannelHandlerContext#invokeWriteAndFlush
io.netty.channel.AbstractChannelHandlerContext#invokeWrite0
io.netty.channel.ChannelOutboundHandler#write
... // passes through the pipeline's outbound handlers
io.netty.channel.DefaultChannelPipeline.HeadContext#write
// NioSocketChannel$NioSocketChannelUnsafe
io.netty.channel.AbstractChannel.AbstractUnsafe#write
// NioSocketChannel#filterOutboundMessage
io.netty.channel.nio.AbstractNioByteChannel#filterOutboundMessage
// Add msg to the outbound buffer
io.netty.channel.ChannelOutboundBuffer#addMessage
io.netty.channel.AbstractChannelHandlerContext#invokeFlush0
// Call flush on each handler in turn
...
io.netty.channel.DefaultChannelPipeline.HeadContext#flush
io.netty.channel.AbstractChannel.AbstractUnsafe#flush
io.netty.channel.ChannelOutboundBuffer#addFlush
io.netty.channel.AbstractChannel.AbstractUnsafe#flush0
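  • Each handler's write() is invoked in turn.

  • Each handler's flush() is invoked in turn.

  • write

    • Convert the ByteBuf to a direct buffer
    • Wrap the message (the direct buffer) in an entry and insert it into the write queue
    • Set the write state
  • flush
    • Add the flush marker and set the write state
    • Traverse the buffer queue and filter the buffers
    • Call the underlying JDK API to write the ByteBuf out through the JDK's native ByteBuffer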
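The split between write and flush can be modelled with two queues. This is a deliberately simplified sketch and not ChannelOutboundBuffer: the class name is hypothetical, messages are strings instead of ByteBufs, and a list stands in for the socket.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical two-stage outbound buffer: write() queues, flush() sends. */
final class OutboundBufferSketch {
    private final Queue<String> unflushed = new ArrayDeque<>(); // written, not yet flushable
    private final Queue<String> flushed = new ArrayDeque<>();   // marked for sending
    private final List<String> socket = new ArrayList<>();      // stand-in for the real socket

    /** write: only enqueue the message; nothing reaches the socket yet. */
    void write(String msg) {
        unflushed.add(msg);
    }

    /** flush: mark everything written so far as flushable, then send it. */
    void flush() {
        String msg;
        while ((msg = unflushed.poll()) != null) {
            flushed.add(msg);          // corresponds to the "add flush marker" step
        }
        while ((msg = flushed.poll()) != null) {
            socket.add(msg);           // corresponds to the actual write to the JDK channel
        }
    }

    /** What has been sent so far. */
    List<String> sent() {
        return socket;
    }
}
```

Calling `write("a")` and `write("b")` leaves `sent()` empty; only a subsequent `flush()` delivers both messages, which is why writeAndFlush combines the two steps.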

EventLoop best practices

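  • Establishing a network connection involves the three-way handshake and security authentication, which take considerable time. It is advisable to use two EventLoopGroups, Boss and Worker, which helps spread the load on the Reactor threads.
  • The Reactor thread model suits short-running tasks. For a long-running ChannelHandler, consider maintaining a business thread pool and wrapping the decoded data in a Task for asynchronous processing, so that a blocked ChannelHandler does not make the EventLoop unavailable.
  • If the business logic runs quickly, execute it directly in the ChannelHandler, for example encoding and decoding. This avoids over-engineering and needless architectural complexity.
  • Do not design too many ChannelHandlers, which hurts both performance and maintainability. When designing the business architecture, draw a clear boundary between the business layer and the Netty layer, and do not blindly put all business logic into ChannelHandlers.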
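The advice about a business thread pool can be sketched with JDK executors alone. Everything here is an assumption made for illustration: a single-thread executor named "event-loop" plays the role of the EventLoop, a fixed pool named "business" plays the business pool, and upper-casing a string stands in for slow business logic.

```java
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of handing slow work from the loop to a business pool. */
final class OffloadSketch {

    /** Returns { result, thread that decoded, thread that ran the slow step }. */
    static String[] handle(String raw) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService business = Executors.newFixedThreadPool(2, r -> new Thread(r, "business"));
        try {
            return CompletableFuture
                    // Cheap decoding stays on the loop thread.
                    .supplyAsync(() -> new String[] {raw.trim(), Thread.currentThread().getName(), null},
                            eventLoop)
                    // The slow step is handed to the business pool, freeing the loop at once.
                    .thenApplyAsync(r -> {
                        r[0] = r[0].toUpperCase(Locale.ROOT);   // stand-in for slow business logic
                        r[2] = Thread.currentThread().getName();
                        return r;
                    }, business)
                    .get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdown();
            business.shutdown();
        }
    }
}
```

In Netty itself, one way to get a similar effect is to register a handler together with its own executor group through ChannelPipeline.addLast(EventExecutorGroup, ChannelHandler...), so that the handler's callbacks run off the I/O thread.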