前言
Netty 是常用的网络框架,最近debug了源码,准备写一写 Netty 引导启动一个服务端的源码分析过程。
Netty服务启动调用分析
以下示例代码来自Netty源码4.1版本:
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// 配置SSL
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// 1、 2、 3、4
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap(); // 5
b.group(bossGroup, workerGroup) // 6
.channel(NioServerSocketChannel.class) // 7
.option(ChannelOption.SO_BACKLOG, 100) // 8
.handler(new LoggingHandler(LogLevel.INFO)) // 9
.childHandler(new ChannelInitializer<SocketChannel>() { // 10
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync(); // 11
// Wait until the server socket is closed.
f.channel().closeFuture().sync(); // 12
} finally {
// Shut down all event loops to terminate all threads.
// 13
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
解释 :
- bossGroup, workerGroup 这俩 EventLoopGroup 对象是整个Netty核心对象,其分别对应Reactor模型中的 mainReactor 和 subReactor 模型。
- bossGroup 用于接收 tcp 请求处理 acccept 事件,然后他将请求交给 workerGroup。
- workerGroup 会获取到真正的连接,然后与连接通信,进行 read -> decode -> do somthing -> encode -> write 等操作.
- EventLoopGroup 是事件循环组(线程组),其含有多个 EventLoop ,可以注册 channel ,用于事件循环中进行选择器的选择事件。
- new NioEventLoopGroup(1); 指定了使用1个线程,如果使用默认构造器,则默认是使用 cpus * 2 个. 在其父类
MultithreadEventLoopGroup
中,DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
- 父类 MultithreadEventLoopGroup 构造器中回创建 大小为所配置的线程数的 EventExecutor 数组。
同时,也会对每个 EventExecutor 做监听:children = new EventExecutor[nThreads]; children[i] = newChild(executor, args); // newChild 由子类实现。
for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); }
- 创建服务端引导类 ServerBootstrap ,用于启动服务器和引导整个程序初始化。它和 ServerChannel 关联, ServerChannel 是 Channel 的子类。
- group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 是配置 bossGroup 和 workerGroup 到父类的 group 和 自己的childGroup 字段中,用于后期引导使用。
- channel(Class<? extends C> channelClass) 是通过 这个 Class 对象反射创建 channelFactory。 调用处是在 AbstractBootstrap.initAndRegister() 中 ,
channel = channelFactory.newChannel(); // channelFactory 的实现类是 ReflectiveChannelFactory
- option(ChannelOption
option, T value) 用来配置一些 tcp 相关的选项,并放在一个 LinkedHashMap 中 - handler(new LoggingHandler(LogLevel.INFO))添加一个服务器专属的日志处理器类, handler方法的 handler 是给 bossGroup 用的
- childHandler(ChannelHandler childHandler), 是给 workerGroup 用的,其添加了一个 SocketChannel (而不是 Server端的 ServerSocketChannel) 的 handler。
- 绑定端口,阻塞至连接成功。
- main 线程阻塞等待你关闭。
- 优雅的关闭 bossGroup 、 workerGroup 中的所有资源。
附:NioEventLoopGroup 创建源码:
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 创建指定线程数的执行器数组, EventExecutor 就是 EventLoop 的父类
children = new EventExecutor[nThreads];
// 初始化每个 EventExecutor
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// new NioEventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 创建失败,就优雅关闭前面所有的 EventExecutor
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
// 为每个单例线程池添加一个关闭监听器
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children); // 用 LinkedHashSet 管理所有 EventExecutor
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
ServerSocketChannel 绑定分析
ChannelFuture f = b.bind(PORT).sync();
方法进入 AbstractBootstrap
的 doBind 方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister(); // 1. 初始化和注册
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise); // 2. 绑定 ServerSocketChannel
}
}
});
return promise;
}
}
说明: 其核心功能封装为2个方法:
- initAndRegister(); // 1. 初始化和注册
- doBind0(regFuture, channel, localAddress, promise); // 2. 绑定 ServerSocketChannel
再分开看这两个核心方法。
initAndRegister()
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel(); // 1. 创建 NioServerSocketChannel
init(channel);// 2 初始化 NioServerSocketChannel
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel); // 3. 配置的 EventLoopGroup 中注册 NioServerSocketChannel
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture; // 4. 返回 ChannelFuture 占位符
}
说明:
创建
NioServerSocketChannel
。channelFactory 反射创建一个 NioServerSocketChannel。public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); } protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
- 通过 NioServerSocketChannel 的 SelectorProvider 的 openServerSocketChannel 方法得到 JDK 的 channel。目的是使 Netty 再对 JDK 的 channel 进行包装。
- 通过源码追踪到抽象父类 AbstractChannel 的创建时,
- 创建了唯一的 ChannelId 对象 ;
- 创建了 Unsafe 对象,其实现为 NioMessageUnsafe 。用于操作消息。
- 创建了 ChannelPipeline 对象,其实现为 DefaultChannelPipeline。双向链表结构,过滤进出的消息.
- 创建了一个 NioSocketChannelConfig 对象,用于对外展示配置信息。
- init 初始化。
init(Channel channel)
方法是个抽象方法, 其实现在ServerBootstrap
中。其内容也比较简单,其设置 NioSocketChannel 的TCP属性;最主要的功能是添加 ChannelInitializer 处理器到 ChannelPipeline 中。 - 配置的 EventLoopGroup 中注册 NioServerSocketChannel
- 返回 ChannelFuture 占位符
doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); // 开始绑定
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
说明:直接开始绑定,绑定的过程比较复杂,经过了一个拦截的过程,具体调用栈如下图:
其最终会调用到 NioServerSocketChannel
的 doBind 方法,然后交由 jdk 实现绑定。
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
绑定完成后会调用 AbstractChannel
的 safeSetSuccess
方法。待所有 handler 都绑定了 pipleline , 则绑定结束。
继续 debug 代码,将会一直 debug 到 NioEventLoop
的 run()
方法中,run() 方法是一个死循环,不停的轮询 select 事件,调用 processSelectedKeys()
来处理相关事件,然后 runAllTasks()
来执行每个请求的任务…
后记
Netty服务器启动过程大致如此,之后的 NioEventLoop 是处理相关业务的代码待分析中。下次准备写一下Netty接受请求的源码剖析。