目的
服务端启动后,肯定是需要接受客户端的请求数据并进行业务处理,再返回给客户端结果。那么就需要通过源码分析,Netty 是如何接受客户端的请求?
源码分析
之前我们知道,Netty 通过 NioEventLoop 事件循环的 run() 方法轮询 select 事件。那么请求服务端 并debug进入 NioEventLoop.run(),然后直到 processSelectedKey(SelectionKey k, AbstractNioChannel ch)
,这里处理单个 SelectionKey 的方法。
接着代码会进入这里,这里 readyOps 的值等于 SelectionKey.OP_ACCEPT,说明是一个 accept 事件。
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
unsafe 对象的实现类实际是 AbstractNioMessageChannel$NioMessageUnsafe 类,调用其 read() 方法。
read 方法分析
- 检查该 eventLoop 线程是否是当前线程
- 调用 doReadMessages 方法,并传入 readBuf 对象,该对象是一个 ArrayList。 doReadMessages 方法是读取 boss线程中的 NioSeverSocketChannel 接受到的请求获取到 jdk 的SocketChannel,然后将其包装为 Netty 的 NioSocketChannel,并把 NioSocketChannel 放入到 readBuf 中。如下:
SocketChannel ch = SocketUtils.accept(javaChannel()); buf.add(new NioSocketChannel(this, ch));
循环容器中的每个数据,调用 pipeline.fireChannelRead(readBuf.get(i)) 方法。这里就开始执行管道 pipeline 中的 handler 的 ChannelRead 方法。代码如下,最终debug 到
AbstractChannelHandlerContext
的invokeChannelRead
方法。private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }
经过debug,会发现反复执行多个 handler 的 ChannelRead 方法,这是因为我们的 pipleline 中有4个 handler,分别是 HeadContext,LoggingHandler,ServerBootstrapAcceptor,TailContext。
ServerBootstrapAcceptor 是 ServerBootstrap 的 内部类,它的 ChannelRead 方法做了一件很重要的事情,就是将客户端的连接注册到 workerGroup 线程池中:
childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } });
register 方法
通过 childGroup.register(child) 注册了 Channel 。然后开始注册,通过调用父类 MultithreadEventLoopGroup 中的 register 方法注册。
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
说明:该方法通过 next() 方法从 childGroup 获取一个 EventLoop 来执行注册。该 EventLoop 实际是SingleThreadEventLoop
类的实现。
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
通过debug调用,发现实际上的调用是 AbstractChannel$AbstractUnsafe
的register(EventLoop eventLoop, final ChannelPromise promise)
方法。然后再通过层层调用,最终调到AbstractChannel
的doBeginRead
方法:
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
这里就是熟悉的 jdk 连接建立代码了,至此客户端连接建立,然后 workerGroup 可以轮询 read/write 事件并处理了。
总结
总体流程:
接受连接 —> 创建一个新的 NioSeverSocketChannel —> 注册到 workerGroup 中的其中一个 EventLoop 中 —> 注册 select,read 事件。
- 服务器轮询 accept 事件,获取事件后调用 unsafe 的 read 方法,此 unsafe 是 ServerSocket 的内部类,该方法内部分为 2 部分。
- doReadMessages 用于创建 NioSocketChannel 对象,其包装了 jdk 的 ServerSocketChannel,并添加入 readBuf 容器。
- 执行 pipeline.fireChannelRead 方法,将自己绑定到 chooser 选择器选择的一个 workerGroup 的 EventLoop 上,并且注册一个0,表示注册成功。