目的
ChannelPipeline、ChannelHandler、ChannelHandlerContext 在 Netty 中是非常核心的概念,因此对此进行一次梳理。
三者关系
- 每当 ServerSocket 创建一个新的连接,就会创建一个 Socket,与目标客户端对应。
- 每一个新创建的 Socket 都会分配一个全新的 ChannelPipeline。
- 每一个 ChannelPipeline 内部都含有多个 ChannelHandlerContext。
- ChannelPipeline 和 ChannelHandlerContext 都是双向链表的结构,context 用于包装在调用 addLast 方法时添加的 ChannelHandler。
总之,一图胜千言。
- ChannelSocket 与 ChannelPipeline 是一对一的关系,而 ChannelPipeline 内部的多个 ChannelHandlerContext 形成了链表, ChannelHandlerContext 是对 ChannelHandler 的封装,通过 ChannelHandlerContext 可以获取到相应的 ChannelHandler、ChannelPipeline以及Channel。
- 请求会通过 Socket 进入相应的 pipeline,并经过 pipeline 上所有的 handler,典型的响应链设计模式。
ChannelPipeline
接口继承图: 可以看出接口继承了ChannelInboundInvoker, ChannelOutboundInvoker, Iterable接口,表示它可以调用出站和入站的方法,同时能遍历内部链表,其内部的方法基本都是对 handler 链表的操作,同时也能获取到一个 channel 对象。
再看看官方接口api对pipeline的介绍:
/*
* <pre>
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
* </pre>
*/
- 这是一个 handler 的 list,handler 用于处理入站出站事件,pipeline 实现了过滤器的高级形式,以便用户控制事件如何处理 handler 在 pipeline 的执行顺序等。
- 入站事件由入站程序自下而上的方向处理,入站处理程序通常处理由图底部的I/O线程生成的入站数据。入站数据通常从如SocketChannel.read(ByteBuffer)获取。
- 这也展示了 handler 在 pipeline 中处理 I/O 事件的方式,I/ O事件分别由 Inbound Handler 和 Outbound Handler 处理。通过 ChannelHandlerContext.fireChannelRead 方法传递转发给下一个处理程序。可以看下源码:
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next; // 当前 ctx 的下一个ctx
} while (!ctx.inbound); // 直到找到一个入站ctx为止
return ctx;
}
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev; // 当前 ctx 的前一个ctx
} while (!ctx.outbound); // 直到找到一个出站ctx为止
return ctx;
}
实践:
- 通常一个 pipeline 有多个 handler ,在实践中,通常会包含编码器、解码器、多个业务处理程序等 handler 。
- 切记业务程序不能将线程阻塞,否则会影响 I/O 速度,从而影响整个Netty程序的性能。
- 耗时操作需要异步执行。
- 可以在添加 handler 的时候添加一个线程池。比如:pipeline.addLast(group, “bizHandler”, new BizHandler()); // 这个任务不会阻塞IO线程,执行线程来自 group 线程池
- 可以放到 netty 的任务队列 taskQueue 中或者 scheduledTaskQueue 中,比如:
- ctx.channel().eventLoop().execute(() -> System.out.println(“do something”));
- ctx.channel().eventLoop().schedule(() -> System.out.println(“do something”), 1, TimeUnit.HOURS);
ChannelHandler
public interface ChannelHandler {
/**
* handler 添加到 ctx 时被调用
*/
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
/**
* handler 从 ctx 上被删除时被调用
*/
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
/**
* handler 处理过程发生异常被调用
*/
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
ChannelHandler 的作用是处理 I/O 事件或拦截 I/O 事件,并将其转发给下一个 ChannelHandler 处理程序。ChannelHandler 处理事件时分入站和出站的,两个方向的操作都是不同的,因此 Netty 定义了2个子接口继承 ChannelHandler:
- ChannelInboundHandler : 增加了读取事件过程中发生相关状态时的处理接口。当接收到数据或者与之关联的 Channel 状态改变时调用。之前已经注意到了,这些方法与 Channel 的生命周期接近。
- ChannelOutboundHandler : 增加了写出事件过程中发生相关状态时的处理接口。ChannelOutboundHandler 另个一个强大的方面是它具有在请求时延迟操作或者事件的能力。比如,当你在写数据到 remote peer 的过程中被意外暂停,你可以延迟执行刷新操作,然后在迟些时候继续。
Netty 也提供了即能处理入站事件也能处理出站事件的接口:
- ChannelDuplexHandler
ChannelHandlerContext
接口继承图: 说明:
- ChannelHandlerContext 继承了AttributeMap, ChannelInboundInvoker,ChannelOutboundInvoker,这两个 Invoker 是针对入站和出站调用时提供的方法,因此 ChannelHandlerContext 可以处理入站出站操作。
- ChannelHandlerContext 自身也定义了丰富的方法,它能够获取 context 上下文环境中对应的 channel,executor,handler,pipeline,内存分配器等有用的对象。
- context 就是包装了 handler 的一切,以便其能在 pipeline 中方便的操作 handler。
ChannelPipeline、ChannelHandler、ChannelHandlerContext 创建过程
主要分为三部分来看创建过程:
- 任何一个 Channel 创建的同时都会创建一个 pipeline。
- 当用户或系统内部调用 pipeline 的 add方法添加 handler 时,都会创建一个包装这 handler 的 context。
- context 在 pipeline 组成双向链表。
Channel 创建时创建 ChannelPipeline
protected AbstractChannel(Channel parent, ChannelId id) {
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
可以看到 Channel 创建的时候创建了 ChannelPipeline。
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
- 将 channel 绑定到 pipeline 中。
- 创建了一个 SucceededChannelFuture 和 VoidChannelPromise 用于异步回调使用。
- 创建了一个 Inbound 的。 TailContext,创建了一个即是 Inbound 也是 Outbound 的 HeadContext。
- 将 tail 和 head 结合起来形成双向链表。
pipeline 的 add方法时创建 context 并形成双向链表
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler); // 检查 handler 是否共享
newCtx = newContext(group, filterName(name, handler), handler); // 创建一个 ctx ,管理它所关联的 handler 和 同一个 pipeline 中其他 handler 之间的交互
addLast0(newCtx); // ctx 添加到链表末尾,tail 节点的前面
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true); // 添加到代办任务中稍后处理
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx); // 同步或异步或晚点异步添加 handler
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
说明:
- Netty 为了防止多线程导致安全问题,同步了这段代码块。
- 检查 handler 实例是否是共享。如果不是,且已经被别的pipeline 使用了,则抛出异常。
- 创建一个新的 context 包装 handler。
- 调用 addLast0 方法将 context 添加到链表中。
- 如果这个通道还没有注册到 selector 上,就将这个 context 添加到这个 pipeline 的待办任务中。当注册好了之后,再调用 callHandlerAdded0 方法(此方法默认什么都没做,用户可实现当做钩子使用)
总结
ChannelPipeline、ChannelHandler、ChannelHandlerContext 三者关系主要还是看那幅图即可方便理解。