netty核心流程(一):服务端怎样创建连接

打印 上一主题 下一主题

主题 870|帖子 870|积分 2610

为了吸收连接请求, Netty 服务端应该做些什么事情?

根据Java NIO 的知识,服务端在预备吸收客户端连接之前做了下面几个工作,我们可以带着问题往下看。

  • 服务端对连接请求是怎样初始化的?
  • 怎样把用户界说的处置处罚逻辑 childHandler 加入到 Netty 的处置处罚流程里?
  • 怎样在 Socket 上绑定一个端口?
  • 怎样把请求连接的 OP_ACCEPT 事件(客户端请求连接的事件)注册到 Selector 上的?
  • Netty 线程是怎样轮询 ServerSocketChannel 的网络连接事件的?
  • 对于客户端发来的连接请求,服务端是如那边理的?
  • 连接成功创建后,怎样读客户端发来的数据?
我们会根据 Netty 底层的源码来解析上面这几个部分是怎样实现的,源码讲解时我们只讲解主线代码,只要可以大概理解 Netty 大体是怎样实现上述功能就可以了,否则会影响理解。
服务端对连接请求是怎样初始化的?

我们根据上节课 NettyServer 的代码讲起,首先我们先分析一下端口绑定的那行代码:

我们对 bind() 方法进行代码追踪,找到方法 doBind():

 
initAndRegister() 功能是初始化 ServerSocketChannel,然后把对应的网络事件注册到 Selector 的方法,我们具体看看:
  1. // 创建和初始化一个 ServerSocketChannel,并注册到 Selector 轮询复用组件上去。
  2. final ChannelFuture initAndRegister() {
  3.     Channel channel = null;
  4.     try {
  5.         // 通过工厂类获取 ServerSocketChannel
  6.         channel = channelFactory.newChannel();
  7.         // 初始化 ServerSocketChannel
  8.         init(channel);
  9.     } catch (Throwable t) {
  10.         if (channel != null) {
  11.             channel.unsafe().closeForcibly();
  12.             return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
  13.         }
  14.         return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
  15.     }
  16.     // 拿出来之前创建的 EventLoopGroup, 然后把 Channel 注册到 EventLoopGroup上,目的是轮询各种 channel 上的网络事件,
  17.     // 我们猜测是不是 让 EvetnLoopGroup 中的独立线程利用一个 Selector 来注册 Channel,并轮询网络事件。
  18.     ChannelFuture regFuture = config().group().register(channel);
  19.     if (regFuture.cause() != null) {
  20.         if (channel.isRegistered()) {
  21.             channel.close();
  22.         } else {
  23.             channel.unsafe().closeForcibly();
  24.         }
  25.     }
  26.     return regFuture;
  27. }
复制代码
可以看到,通过 channelFactory.newChannel() 获得一个 Channel。这个 channelFactory 的实现类很多,这里用的是类 ReflectiveChannelFactory 来实现的,顾名思义是通过反射获得的 Channel。我们再验证下 ReflectiveChannelFactory 是怎样给我们实例化 Channel 的。

 
可以看到果然是通过反射后得到的 ServerSocketChannel 的实例。
那么,工厂类是怎样知道反射哪个 Channel 呢?这就要看我们上一章写的服务端的代码了。

 
可以看到,服务端用来吸收请求的类是 NioServerSocketChannel。那么,这个类毕竟有没有封装 ServerSocketChannel 呢?我们看 NioServerSocketChannel 的构造方法就可以了:

在这里,我们就真的找到了创建 ServerSocketChannel 实例的代码。也就是说,NioServerSocketChannel 是对 ServerSocketChannel 的封装。到这里,ServerSocketChannel 的实例化就成功了,接着我们看看对 ServerSocketChannel 做了哪些初始化的工作?

init(channel) 方法是对 ServerSocketChannel 的初始化。由于是服务端代码,对应的实现类就是 ServerBootstrap,再看它对 init(channel) 的具体实现:
 java
  1. void init(Channel channel) {
  2.     // 对ServerSocketChannel 进行相关网络参数的指定。
  3.     setChannelOptions(channel, newOptionsArray(), logger);
  4.     // 初始化相关的一些属性
  5.     setAttributes(channel, newAttributesArray());
  6.     ChannelPipeline p = channel.pipeline();
  7.     final EventLoopGroup currentChildGroup = childGroup;
  8.     final ChannelHandler currentChildHandler = childHandler;
  9.     final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
  10.     final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
  11.     // 第一个拦截器:对网络请求处理链路中加入一个自己内置的一个处理逻辑。初始化了网络请求的处理链路
  12.     p.addLast(new ChannelInitializer<Channel>() {
  13.         @Override
  14.         public void initChannel(final Channel ch) {
  15.             final ChannelPipeline pipeline = ch.pipeline();
  16.             ChannelHandler handler = config.handler();
  17.             if (handler != null) {
  18.                 // 第二个拦截器:
  19.                 pipeline.addLast(handler);
  20.             }
  21.             // 循环执行下面的任务
  22.             ch.eventLoop().execute(new Runnable() {
  23.                 @Override
  24.                 public void run() {
  25.                     // 第三个拦截器:加入设定的拦截器
  26.                     pipeline.addLast(new ServerBootstrapAcceptor(
  27.                             ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
  28.                 }
  29.             });
  30.         }
  31.     });
  32. }
复制代码
可以看到,初始化了一些相关的网络参数,比如上节课 Netty Demo 中 Server 端代码的 SO_BACKLOG 属性。然后初始化一些相关属性。
最后一段紧张的代码是在 Pipeline(拦截器链) 上加入第一个内置的拦截器;第二个拦截器是内部设置的拦截器;然后,通过 eventLoop() 循环处置处罚连接请求,同时还实例化了第三个拦截器 ServerBootstrapAcceptor,然后把第三个拦截器放入拦截器链中,这个过程是启用了一个新的线程来完成的。ServerBootstrapAcceptor 的构造方法的参数包括用户设置的 channel(也就是 NioServerSocketChannel),以及用户设置的 childgroup 线程组,以及处置处罚客户端连接的逻辑 childHandler,最后是一些设置和属性的参数。
初始化就结束了。
怎样把用户界说的处置处罚连接请求的逻辑加入到 Netty 里?

然后,进入了 ServerBootstrapAcceptor:

这里会看到第四个拦截器。其中,这第四个拦截器才是真正的用户设置的拦截器。随后还会有个监听器,用于拦截器链中所有的拦截器都执行完了以后,再做一些收尾工作。
这段逻辑包涵了 4 个拦截器的创建,比较复杂:

现在,服务端连接的初始化已经完成了,但是还没有在 Selector 上注册相应的事件,接下来我们看看这个功能是怎样实现的。
怎样把请求连接的 OP_ACCEPT 事件(客户端请求连接的事件)注册到 Selector 上的?

我们看一下相应的源码:

 
注册网络担当连接请求的事件就在调用 register() 方法里了。

由于我们用的是线程组,这里的实现类是 MultithreadEventLoopGroup,其实这里面就是一组 EventLoop。EventLoop 本质上一个线程,每个 EventLoop 会对应一个 Selector。以是,EventLoop 是一个循序处置处罚网络事件的线程。这里通过调用 next() 返回一个线程组里面的一个线程。
接着,我们追踪到 AbstractNioChannel 类中的 doRegister()方法中:

在这里,我们就能看到已经把 ServerSocketChannel 以及它对应的 OP_ACCEPT(OP_ACCEPT=0) 事件注册到 Selector 上了。
至此,我们已经把连接事件注册到了对应线程的 Selector 上。
给 ServerSocketChannel 绑定端口

我们回到上一章的 Demo,从 bind() 方法开始看:


这里就已经把端口绑定到 socket 上了,就不继续深究内部的代码了。
至此,服务端创建连接的预备工作都分析完了。
Netty 线程是怎样轮询 ServerSocketChannel 的网络连接事件的?

首先,我们看看 NioEventLoopGroup 的父类 MultithreadEventLoopGroup 的构造方法:
  1. static {
  2.     // 默认的线程数为 CPU 核数的两倍
  3.     DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
  4.             "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
  5.     if (logger.isDebugEnabled()) {
  6.         logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
  7.     }
  8. }
  9. protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
  10.     // 设定线程数
  11.     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
  12. }
复制代码
MultithreadEventLoopGroup 是一个多线程集合,如果你不设置线程数的话。默认线程数是 CPU 的核数*2。
 

当要增加一个线程时,调用的是 newChild() 方法,我们再看看里面做了什么:

 
我们可以看到,线程组里面的元素是 NioEventLoop 类。

很明显,这是一个单线程的线程池。
然后,我们代码跟踪,可以看到往 Selector 里注册网络事件。
  1. private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
  2.     try {
  3.         // 注册网络事件
  4.         ch.register(unwrappedSelector, interestOps, task);
  5.     } catch (Exception e) {
  6.         throw new EventLoopException("failed to register a channel", e);
  7.     }
  8. }
复制代码
这里注册的是 OP_ACCEPT 网络事件。
那么,这个线程是在哪里轮询网络事件的呢?这就要找到线程的 run() 方法了
  1. protected void run() {
  2.     int selectCnt = 0;
  3.     for (;;) {
  4.         try {
  5.             int strategy;
  6.             try {
  7.                 strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
  8.                 switch (strategy) {
  9.                 case SelectStrategy.CONTINUE:
  10.                     continue;
  11.                 case SelectStrategy.BUSY_WAIT:
  12.                 // 通过轮询的方式不断尝试监听新的网络事件
  13.                 case SelectStrategy.SELECT:
  14.                     // 设定每次轮询的间隔时间
  15.                     long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
  16.                     if (curDeadlineNanos == -1L) {
  17.                         curDeadlineNanos = NONE; // nothing on the calendar
  18.                     }
  19.                     nextWakeupNanos.set(curDeadlineNanos);
  20.                     try {
  21.                         if (!hasTasks()) {
  22.                             // 真正的尝试轮询新的网络事件
  23.                             strategy = select(curDeadlineNanos);
  24.                         }
  25.                     } finally {
  26.                         nextWakeupNanos.lazySet(AWAKE);
  27.                     }
  28.                     // fall through
  29.                 default:
  30.                 }
复制代码
通过for(;;)死循环来不断实验发现新的网络事件,如果有就返回。但有个轮询隔断时间来控制执行频率:
[code]private int select(long deadlineNanos) throws IOException {    if (deadlineNanos == NONE) {        // 轮询网络事件        return selector.select();    }    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;    return timeoutMillis

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

宝塔山

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表