Fork me on GitHub

Netty服务器启动过程源码分析

前言

Netty 是常用的网络框架,最近debug了源码,准备写一写 Netty 引导启动一个服务端的源码分析过程。

Netty服务启动调用分析

以下示例代码来自Netty源码4.1版本:

public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // 配置SSL
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // 1、 2、 3、4
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap(); // 5
            b.group(bossGroup, workerGroup) // 6
                    .channel(NioServerSocketChannel.class) // 7
                    .option(ChannelOption.SO_BACKLOG, 100) // 8
                    .handler(new LoggingHandler(LogLevel.INFO)) // 9
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 10
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            if (sslCtx != null) {
                                p.addLast(sslCtx.newHandler(ch.alloc()));
                            }
                            //p.addLast(new LoggingHandler(LogLevel.INFO));
                            p.addLast(serverHandler);
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync(); // 11

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync(); // 12
        } finally {
            // Shut down all event loops to terminate all threads.
            // 13
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

解释 :

  1. bossGroup, workerGroup 这俩 EventLoopGroup 对象是整个Netty核心对象,其分别对应Reactor模型中的 mainReactor 和 subReactor 模型。
    • bossGroup 用于接收 tcp 请求处理 acccept 事件,然后他将请求交给 workerGroup。
    • workerGroup 会获取到真正的连接,然后与连接通信,进行 read -> decode -> do somthing -> encode -> write 等操作.
  2. EventLoopGroup 是事件循环组(线程组),其含有多个 EventLoop ,可以注册 channel ,用于事件循环中进行选择器的选择事件。
  3. new NioEventLoopGroup(1); 指定了使用1个线程,如果使用默认构造器,则默认是使用 cpus * 2 个. 在其父类 MultithreadEventLoopGroup 中,
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
  4. 父类 MultithreadEventLoopGroup 构造器中回创建 大小为所配置的线程数的 EventExecutor 数组。
    children = new EventExecutor[nThreads];
    children[i] = newChild(executor, args); // newChild 由子类实现。
    
    同时,也会对每个 EventExecutor 做监听:
    for (EventExecutor e: children) {
     e.terminationFuture().addListener(terminationListener);
    }
    
  5. 创建服务端引导类 ServerBootstrap ,用于启动服务器和引导整个程序初始化。它和 ServerChannel 关联, ServerChannel 是 Channel 的子类。
  6. group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 是配置 bossGroup 和 workerGroup 到父类的 group 和 自己的childGroup 字段中,用于后期引导使用。
  7. channel(Class<? extends C> channelClass) 是通过 这个 Class 对象反射创建 channelFactory。 调用处是在 AbstractBootstrap.initAndRegister() 中 ,
    channel = channelFactory.newChannel(); // channelFactory 的实现类是 ReflectiveChannelFactory
    
  8. option(ChannelOption option, T value) 用来配置一些 tcp 相关的选项,并放在一个 LinkedHashMap 中
  9. handler(new LoggingHandler(LogLevel.INFO))添加一个服务器专属的日志处理器类, handler方法的 handler 是给 bossGroup 用的
  10. childHandler(ChannelHandler childHandler), 是给 workerGroup 用的,其添加了一个 SocketChannel (而不是 Server端的 ServerSocketChannel) 的 handler。
  11. 绑定端口,阻塞至连接成功。
  12. main 线程阻塞等待你关闭。
  13. 优雅的关闭 bossGroup 、 workerGroup 中的所有资源。

附:NioEventLoopGroup 创建源码:

/**
 * Create a new instance.
 *
 * @param nThreads          the number of threads that will be used by this instance.
 * @param executor          the Executor to use, or {@code null} if the default should be used.
 * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
 * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 创建指定线程数的执行器数组, EventExecutor 就是 EventLoop 的父类
    children = new EventExecutor[nThreads];

    // 初始化每个 EventExecutor
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // new NioEventLoop
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // 创建失败,就优雅关闭前面所有的 EventExecutor
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    // 为每个单例线程池添加一个关闭监听器
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children); // 用 LinkedHashSet 管理所有 EventExecutor
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

ServerSocketChannel 绑定分析

ChannelFuture f = b.bind(PORT).sync();方法进入 AbstractBootstrap 的 doBind 方法:

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister(); // 1. 初始化和注册
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise); // 2. 绑定 ServerSocketChannel
                }
            }
        });
        return promise;
    }
}

说明: 其核心功能封装为2个方法:

  • initAndRegister(); // 1. 初始化和注册
  • doBind0(regFuture, channel, localAddress, promise); // 2. 绑定 ServerSocketChannel

再分开看这两个核心方法。

initAndRegister()

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel(); // 1. 创建 NioServerSocketChannel
        init(channel);// 2 初始化 NioServerSocketChannel
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel); // 3. 配置的 EventLoopGroup 中注册 NioServerSocketChannel
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture; // 4. 返回 ChannelFuture 占位符
}

说明:

  1. 创建NioServerSocketChannel。channelFactory 反射创建一个 NioServerSocketChannel。

    public NioSocketChannel(Channel parent, SocketChannel socket) {
      super(parent, socket);
      config = new NioSocketChannelConfig(this, socket.socket());
    }
    
    protected AbstractChannel(Channel parent) {
      this.parent = parent;
      id = newId();
      unsafe = newUnsafe();
      pipeline = newChannelPipeline();
    }
    
  • 通过 NioServerSocketChannel 的 SelectorProvider 的 openServerSocketChannel 方法得到 JDK 的 channel。目的是使 Netty 再对 JDK 的 channel 进行包装。
  • 通过源码追踪到抽象父类 AbstractChannel 的创建时,
    • 创建了唯一的 ChannelId 对象 ;
    • 创建了 Unsafe 对象,其实现为 NioMessageUnsafe 。用于操作消息。
    • 创建了 ChannelPipeline 对象,其实现为 DefaultChannelPipeline。双向链表结构,过滤进出的消息.
  • 创建了一个 NioSocketChannelConfig 对象,用于对外展示配置信息。
  1. init 初始化。init(Channel channel)方法是个抽象方法, 其实现在 ServerBootstrap中。其内容也比较简单,其设置 NioSocketChannel 的TCP属性;最主要的功能是添加 ChannelInitializer 处理器到 ChannelPipeline 中。
  2. 配置的 EventLoopGroup 中注册 NioServerSocketChannel
  3. 返回 ChannelFuture 占位符

doBind0

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); // 开始绑定
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

说明:直接开始绑定,绑定的过程比较复杂,经过了一个拦截的过程,具体调用栈如下图: netty.doBind调用栈.png

其最终会调用到 NioServerSocketChannel 的 doBind 方法,然后交由 jdk 实现绑定。

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

绑定完成后会调用 AbstractChannelsafeSetSuccess 方法。待所有 handler 都绑定了 pipleline , 则绑定结束。

继续 debug 代码,将会一直 debug 到 NioEventLooprun() 方法中,run() 方法是一个死循环,不停的轮询 select 事件,调用 processSelectedKeys() 来处理相关事件,然后 runAllTasks() 来执行每个请求的任务…

后记

Netty服务器启动过程大致如此,之后的 NioEventLoop 是处理相关业务的代码待分析中。下次准备写一下Netty接受请求的源码剖析。

-------------本文结束,感谢您的阅读-------------
贵在坚持,如果您觉得本文还不错,不妨打赏一下~
0%