Fork me on GitHub

Netty接受请求过程源码分析

目的

服务端启动后,肯定是需要接受客户端的请求数据并进行业务处理,再返回给客户端结果。那么就需要通过源码分析,Netty 是如何接受客户端的请求?

源码分析

之前我们知道,Netty 通过 NioEventLoop 事件循环的 run() 方法轮询 select 事件。那么请求服务端 并debug进入 NioEventLoop.run(),然后直到 processSelectedKey(SelectionKey k, AbstractNioChannel ch),这里处理单个 SelectionKey 的方法。

接着代码会进入这里,这里 readyOps 的值等于 SelectionKey.OP_ACCEPT,说明是一个 accept 事件。

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

unsafe 对象的实现类实际是 AbstractNioMessageChannel$NioMessageUnsafe 类,调用其 read() 方法。

read 方法分析

  1. 检查该 eventLoop 线程是否是当前线程
  2. 调用 doReadMessages 方法,并传入 readBuf 对象,该对象是一个 ArrayList。 doReadMessages 方法是读取 boss线程中的 NioSeverSocketChannel 接受到的请求获取到 jdk 的SocketChannel,然后将其包装为 Netty 的 NioSocketChannel,并把 NioSocketChannel 放入到 readBuf 中。如下:
    SocketChannel ch = SocketUtils.accept(javaChannel());
    buf.add(new NioSocketChannel(this, ch));
    
  3. 循环容器中的每个数据,调用 pipeline.fireChannelRead(readBuf.get(i)) 方法。这里就开始执行管道 pipeline 中的 handler 的 ChannelRead 方法。代码如下,最终debug 到 AbstractChannelHandlerContextinvokeChannelRead方法。

    private void invokeChannelRead(Object msg) {
     if (invokeHandler()) {
         try {
             ((ChannelInboundHandler) handler()).channelRead(this, msg);
         } catch (Throwable t) {
             notifyHandlerException(t);
         }
     } else {
         fireChannelRead(msg);
     }
    }
    

    经过debug,会发现反复执行多个 handler 的 ChannelRead 方法,这是因为我们的 pipleline 中有4个 handler,分别是 HeadContext,LoggingHandler,ServerBootstrapAcceptor,TailContext。

  4. ServerBootstrapAcceptor 是 ServerBootstrap 的 内部类,它的 ChannelRead 方法做了一件很重要的事情,就是将客户端的连接注册到 workerGroup 线程池中:

    childGroup.register(child).addListener(new ChannelFutureListener() {
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
         if (!future.isSuccess()) {
             forceClose(child, future.cause());
         }
     }
    });
    

register 方法

通过 childGroup.register(child) 注册了 Channel 。然后开始注册,通过调用父类 MultithreadEventLoopGroup 中的 register 方法注册。

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

说明:该方法通过 next() 方法从 childGroup 获取一个 EventLoop 来执行注册。该 EventLoop 实际是SingleThreadEventLoop类的实现。

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

通过debug调用,发现实际上的调用是 AbstractChannel$AbstractUnsaferegister(EventLoop eventLoop, final ChannelPromise promise)方法。然后再通过层层调用,最终调到AbstractChanneldoBeginRead方法:

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

这里就是熟悉的 jdk 连接建立代码了,至此客户端连接建立,然后 workerGroup 可以轮询 read/write 事件并处理了。

总结

总体流程:

接受连接 —> 创建一个新的 NioSeverSocketChannel —> 注册到 workerGroup 中的其中一个 EventLoop 中 —> 注册 select,read 事件

  1. 服务器轮询 accept 事件,获取事件后调用 unsafe 的 read 方法,此 unsafe 是 ServerSocket 的内部类,该方法内部分为 2 部分。
  2. doReadMessages 用于创建 NioSocketChannel 对象,其包装了 jdk 的 ServerSocketChannel,并添加入 readBuf 容器。
  3. 执行 pipeline.fireChannelRead 方法,将自己绑定到 chooser 选择器选择的一个 workerGroup 的 EventLoop 上,并且注册一个0,表示注册成功。
-------------本文结束,感谢您的阅读-------------
贵在坚持,如果您觉得本文还不错,不妨打赏一下~
0%