Fork me on GitHub

ChannelPipeline、ChannelHandler、ChannelHandlerContext 源码剖析

目的

ChannelPipeline、ChannelHandler、ChannelHandlerContext 在 Netty 中是非常核心的概念,因此对此进行一次梳理。

三者关系

  1. 每当 ServerSocket 创建一个新的连接,就会创建一个 Socket,与目标客户端对应。
  2. 每一个新创建的 Socket 都会分配一个全新的 ChannelPipeline。
  3. 每一个 ChannelPipeline 内部都含有多个 ChannelHandlerContext。
  4. ChannelPipeline 和 ChannelHandlerContext 都是双向链表的结构,context 用于包装在调用 addLast 方法时添加的 ChannelHandler。

总之,一图胜千言。 pipleline,handler,ctx关系.png

  • ChannelSocket 与 ChannelPipeline 是一对一的关系,而 ChannelPipeline 内部的多个 ChannelHandlerContext 形成了链表, ChannelHandlerContext 是对 ChannelHandler 的封装,通过 ChannelHandlerContext 可以获取到相应的 ChannelHandler、ChannelPipeline以及Channel。
  • 请求会通过 Socket 进入相应的 pipeline,并经过 pipeline 上所有的 handler,典型的响应链设计模式。

ChannelPipeline

接口继承图: ChannelPipleline接口继承图.png 可以看出接口继承了ChannelInboundInvoker, ChannelOutboundInvoker, Iterable接口,表示它可以调用出站和入站的方法,同时能遍历内部链表,其内部的方法基本都是对 handler 链表的操作,同时也能获取到一个 channel 对象。

再看看官方接口api对pipeline的介绍:

/*
* <pre>
*                                                 I/O Request
*                                            via {@link Channel} or
*                                        {@link ChannelHandlerContext}
*                                                      |
*  +---------------------------------------------------+---------------+
*  |                           ChannelPipeline         |               |
*  |                                                  \|/              |
*  |    +---------------------+            +-----------+----------+    |
*  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  |               |                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  .               |
*  |               .                                   .               |
*  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
*  |        [ method call]                       [method call]         |
*  |               .                                   .               |
*  |               .                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  |               |                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  +---------------+-----------------------------------+---------------+
*                  |                                  \|/
*  +---------------+-----------------------------------+---------------+
*  |               |                                   |               |
*  |       [ Socket.read() ]                    [ Socket.write() ]     |
*  |                                                                   |
*  |  Netty Internal I/O Threads (Transport Implementation)            |
*  +-------------------------------------------------------------------+
* </pre>
*/
  • 这是一个 handler 的 list,handler 用于处理入站出站事件,pipeline 实现了过滤器的高级形式,以便用户控制事件如何处理 handler 在 pipeline 的执行顺序等。
  • 入站事件由入站程序自下而上的方向处理,入站处理程序通常处理由图底部的I/O线程生成的入站数据。入站数据通常从如SocketChannel.read(ByteBuffer)获取。
  • 这也展示了 handler 在 pipeline 中处理 I/O 事件的方式,I/ O事件分别由 Inbound Handler 和 Outbound Handler 处理。通过 ChannelHandlerContext.fireChannelRead 方法传递转发给下一个处理程序。可以看下源码:
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next; // 当前 ctx 的下一个ctx
    } while (!ctx.inbound); // 直到找到一个入站ctx为止
    return ctx;
}

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev; // 当前 ctx 的前一个ctx
    } while (!ctx.outbound); // 直到找到一个出站ctx为止
    return ctx;
}

实践:

  • 通常一个 pipeline 有多个 handler ,在实践中,通常会包含编码器、解码器、多个业务处理程序等 handler 。
  • 切记业务程序不能将线程阻塞,否则会影响 I/O 速度,从而影响整个Netty程序的性能。
  • 耗时操作需要异步执行
    • 可以在添加 handler 的时候添加一个线程池。比如:pipeline.addLast(group, “bizHandler”, new BizHandler()); // 这个任务不会阻塞IO线程,执行线程来自 group 线程池
    • 可以放到 netty 的任务队列 taskQueue 中或者 scheduledTaskQueue 中,比如:
      • ctx.channel().eventLoop().execute(() -> System.out.println(“do something”));
      • ctx.channel().eventLoop().schedule(() -> System.out.println(“do something”), 1, TimeUnit.HOURS);

ChannelHandler

public interface ChannelHandler {

    /**
     * handler 添加到 ctx 时被调用
     */
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    /**
     * handler 从 ctx 上被删除时被调用
     */
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    /**
     * handler 处理过程发生异常被调用
     */
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;


}

ChannelHandler 的作用是处理 I/O 事件或拦截 I/O 事件,并将其转发给下一个 ChannelHandler 处理程序。ChannelHandler 处理事件时分入站和出站的,两个方向的操作都是不同的,因此 Netty 定义了2个子接口继承 ChannelHandler:

  • ChannelInboundHandler : 增加了读取事件过程中发生相关状态时的处理接口。当接收到数据或者与之关联的 Channel 状态改变时调用。之前已经注意到了,这些方法与 Channel 的生命周期接近。
  • ChannelOutboundHandler : 增加了写出事件过程中发生相关状态时的处理接口。ChannelOutboundHandler 另个一个强大的方面是它具有在请求时延迟操作或者事件的能力。比如,当你在写数据到 remote peer 的过程中被意外暂停,你可以延迟执行刷新操作,然后在迟些时候继续。

Netty 也提供了即能处理入站事件也能处理出站事件的接口:

  • ChannelDuplexHandler

ChannelHandlerContext

接口继承图: ChannelHandlerContext接口继承图.png 说明:

  • ChannelHandlerContext 继承了AttributeMap, ChannelInboundInvoker,ChannelOutboundInvoker,这两个 Invoker 是针对入站和出站调用时提供的方法,因此 ChannelHandlerContext 可以处理入站出站操作。
  • ChannelHandlerContext 自身也定义了丰富的方法,它能够获取 context 上下文环境中对应的 channel,executor,handler,pipeline,内存分配器等有用的对象。
  • context 就是包装了 handler 的一切,以便其能在 pipeline 中方便的操作 handler。

ChannelPipeline、ChannelHandler、ChannelHandlerContext 创建过程

主要分为三部分来看创建过程:

  • 任何一个 Channel 创建的同时都会创建一个 pipeline。
  • 当用户或系统内部调用 pipeline 的 add方法添加 handler 时,都会创建一个包装这 handler 的 context。
  • context 在 pipeline 组成双向链表。

Channel 创建时创建 ChannelPipeline

protected AbstractChannel(Channel parent, ChannelId id) {
    this.parent = parent;
    this.id = id;
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

可以看到 Channel 创建的时候创建了 ChannelPipeline。

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
  • 将 channel 绑定到 pipeline 中。
  • 创建了一个 SucceededChannelFuture 和 VoidChannelPromise 用于异步回调使用。
  • 创建了一个 Inbound 的。 TailContext,创建了一个即是 Inbound 也是 Outbound 的 HeadContext。
  • 将 tail 和 head 结合起来形成双向链表。

pipeline 的 add方法时创建 context 并形成双向链表

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler); // 检查 handler 是否共享

        newCtx = newContext(group, filterName(name, handler), handler); // 创建一个 ctx ,管理它所关联的 handler 和 同一个 pipeline 中其他 handler 之间的交互

        addLast0(newCtx); // ctx 添加到链表末尾,tail 节点的前面

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true); // 添加到代办任务中稍后处理
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx); // 同步或异步或晚点异步添加 handler
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

说明:

  • Netty 为了防止多线程导致安全问题,同步了这段代码块。
  • 检查 handler 实例是否是共享。如果不是,且已经被别的pipeline 使用了,则抛出异常。
  • 创建一个新的 context 包装 handler。
  • 调用 addLast0 方法将 context 添加到链表中。
  • 如果这个通道还没有注册到 selector 上,就将这个 context 添加到这个 pipeline 的待办任务中。当注册好了之后,再调用 callHandlerAdded0 方法(此方法默认什么都没做,用户可实现当做钩子使用)

总结

ChannelPipeline、ChannelHandler、ChannelHandlerContext 三者关系主要还是看那幅图即可方便理解。

-------------本文结束,感谢您的阅读-------------
贵在坚持,如果您觉得本文还不错,不妨打赏一下~
0%