Golang网络模型netpoll源码解析

打印 上一主题 下一主题

主题 863|帖子 863|积分 2589

0、弁言

在学习完了Socket编程的基础知识、Linux系统提供的I/O多路复用的实现以及Golang的GMP调度模型之后,我们进而学习Golang的网络模型——netpoll。本文将从为什么必要使用netpoll模型,以及netpoll的具体流程实现两个主要角度来睁开学习。当前使用的Go的版本为1.22.4,Linux系统。
1、为什么要使用netpoll模型?

首先,什么是多路复用?
多路,指的是存在着多个必要服务的对象;复用,指的是重复利用一个单元来为上述的多个目标提供服务。
我们知道,Linux系统为用户提供了三个内核实现的IO多路复用技能的系统调用,用发展时间来排序分别为:select->poll->epoll。其中,epoll在当今使用的最为广泛,对比与select调用,它有以下的优势:

  • fd数量机动:可监听的fd数量上限机动,使用方可以在调用epoll_create操作时自行指定。
  • 更少的内核拷贝次数:在内核中,使用红黑树的结构来存储必要监听的fd,相比与调用select每次必要将所有的fd拷贝进内核,监听到变乱后再全部拷贝回用户态,epoll只必要将必要监听的fd添加到变乱表后,即可多次监听。
  • 返回结果明确:epoll运行将就绪变乱添加到就绪变乱列表中,当用户调用epoll_wait操作时,内核只返回就绪变乱,而select返回的是所有的变乱,必要用户再举行一次遍历,找到就绪变乱再处理。
必要注意的是,在不同的条件环境下,epoll的优势可能反而作用不明显。epoll只实用在监听fd基数较大且活跃度不高的场景,如此epoll变乱表的空间复用和epoll_wait操作的精准才气表现出其优势;而当处在fd基数较小且活跃度高的场景下,select反而更加简朴有用,构造epoll的红黑树结构的消耗会成为其累赘。
思量到场景的多样性,我们会选择使用epoll去完成内核变乱监听的操作,那么如何将golang和epoll结合起来呢?
在 Go 语言的并发模型中,GMP 框架实现了一种高效的协程调度机制,它屏蔽了操作系统线程的细节,用户可以通过轻量级的 Goroutine 来实现细粒度的并发操作。然而,底层的 IO 多路复用机制(如 Linux 的 epoll)调度的单元仍旧是线程(M)。为了将 IO 调度从线程层面提升到协程层面,充分发挥 Goroutine 的高并发优势,netpoll 应运而生。
接下来我们就来学习netpoll框架的实现。
2、netpoll实现原理

2.1、焦点结构

1、pollDesc

为了将IO调度从线程提升到协程层面,netpoll框架有个重要的焦点结构pollDesc,它有两个,一个为表层,含有指针指向了里层的pollDesc。本文中讲到的pollDesc都为里层pollDesc。
表层pollDesc定位在internel/poll/fd_poll_runtime.go文件中:
  1. type pollDesc struct {
  2.         runtimeCtx uintptr
  3. }
复制代码
使用一个runtimeCtx指针指向其底层实实际例。
里层的位于runtime/netpoll.go中。
  1. //网络poller描述符
  2. type pollDesc struct {
  3.     //next指针,指向在pollCache链表结构中,以下个pollDesc实例。
  4.         link  *pollDesc      
  5.     //指向fd
  6.         fd    uintptr
  7.        
  8.     //读事件状态标识器,状态有四种:
  9.     //1、pdReady:表示读操作已就绪,等待处理
  10.     //2、pdWait:表示g将要被阻塞等待读操作就绪,此时还未阻塞
  11.     //3、g:读操作的g已经被阻塞,rg指向阻塞的g实例
  12.     //4、pdNil:空
  13.         rg atomic.Uintptr
  14.         wg atomic.Uintptr
  15.     //...
  16. }
复制代码
pollDesc的焦点字段是读/写标识器rg/wg,它用于标识fd的io变乱状态,并且持有被阻塞的g实例。当后续必要唤醒这个g处理读写变乱的时候,可以通过pollDesc追溯得到g的实例举行操作。有了pollDesc这个数据结构,Golang就能将对处理socket的调度单元从线程Thread转换成协程G。
2、pollCache

pollCache缓冲池接纳了单向链表的方式存储多个pollDesc实例。
  1. type pollCache struct {
  2.         lock  mutex
  3.         first *pollDesc
  4. }
复制代码
其包含了两个焦点方法,分别是alloc()和free()
  1. //从pollCache中分配得到一个pollDesc实例
  2. func (c *pollCache) alloc() *pollDesc {
  3.         lock(&c.lock)
  4.     //如果链表为空,则进行初始化
  5.         if c.first == nil {
  6.         //pdSize = 248
  7.                 const pdSize = unsafe.Sizeof(pollDesc{})
  8.         //4096 / 248 = 16
  9.                 n := pollBlockSize / pdSize
  10.                 if n == 0 {
  11.                         n = 1
  12.                 }
  13.         //分配指定大小的内存空间
  14.                 mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
  15.         //完成指定数量的pollDesc创建
  16.                 for i := uintptr(0); i < n; i++ {
  17.                         pd := (*pollDesc)(add(mem, i*pdSize))
  18.                         pd.link = c.first
  19.                         c.first = pd
  20.                 }
  21.         }
  22.         pd := c.first
  23.         c.first = pd.link
  24.         lockInit(&pd.lock, lockRankPollDesc)
  25.         unlock(&c.lock)
  26.         return pd
  27. }
复制代码
  1. //free用于将一个pollDesc放回pollCache
  2. func (c *pollCache) free(pd *pollDesc) {
  3.         //...
  4.         lock(&c.lock)
  5.         pd.link = c.first
  6.         c.first = pd
  7.         unlock(&c.lock)
  8. }
复制代码
2.2、netpoll框架宏观流程


在宏观的角度下,netpoll框架主要涉及了以下的几个流程:

  • poll_init:底层调用epoll_create指令,在内核态中开辟epoll变乱表。
  • poll_open:先构造一个pollDesc实例,然后通过epoll_ctl(ADD)指令,向内核中添加要监听的socket,并将这一个fd绑定在pollDesc中。pollDesc含有状态标识器rg/wg,用于标识变乱状态以及存储阻塞的g。
  • poll_wait:当g依靠的变乱未就绪时,调用gopark方法,将g置为阻塞态存放在pollDesc中。
  • net_poll:GMP调度器会轮询netpoll流程,通常会用非阻塞的方式发起epoll_wait指令,取出就绪的pollDesc,提前出其内部陷入阻塞态的g然后将其重新添加到GMP的调度队列中。(以及在sysmon流程和gc流程都会触发netpoll)
3、流程源码实现

3.1、流程入口

我们参考以下的简易TCP服务器实现框架,走进netpoll框架的具体源码实现。
  1. // 启动 tcp server 代码示例
  2. func main() {
  3.     //创建TCP端口监听器,涉及以下事件:
  4.     //1:创建socket fd,调用bind和accept系统接口函数
  5.     //2:调用epoll_create,创建eventpool
  6.     //3:调用epoll_ctl(ADD),将socket fd注册到epoll事件表
  7.         l, _ := net.Listen("tcp", ":8080")
  8.         // eventloop reactor 模型
  9.         for {
  10.         //等待TCP连接到达,涉及以下事件:
  11.         //1:循环+非阻塞调用accept
  12.         //2:若未就绪,则调用gopark进行阻塞
  13.         //3:等待netpoller轮询唤醒
  14.         //4:获取到conn fd后注册到eventpool
  15.         //5:返回conn
  16.                 conn, _ := l.Accept()
  17.                 // goroutine per conn
  18.                 go serve(conn)
  19.         }
  20. }
  21. // 处理一笔到来的 tcp 连接
  22. func serve(conn net.Conn) {
  23.     //关闭conn,从eventpool中移除fd
  24.         defer conn.Close()
  25.         var buf []byte
  26.     //读取conn中的数据,涉及以下事件:
  27.     //1:循环+非阻塞调用recv(read)
  28.     //2:若未就绪,通过gopark阻塞,等待netpoll轮询唤醒
  29.         _, _ = conn.Read(buf)
  30.     //向conn中写入数据,涉及以下事件:
  31.     //1:循环+非阻塞调用writev (write)
  32.     //2:若未就绪,通过gopark阻塞,等待netpoll轮询唤醒
  33.         _, _ = conn.Write(buf)
  34. }
复制代码
3.2、Socket创建

以net.Listen方法为入口,举行创建socket fd,调用的方法栈如下:
方法文件net.Listen()net/dial.gonet.ListenConfig.Listen()net/dial.gonet.sysListener.listenTCP()net/tcpsock_posix.gonet.internetSocket()net/ipsock_posix.gonet.socket()net/sock_posix.go焦点的调用在net.socket()方法内,源码焦点流程如下:
  1. func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
  2.     //进行socket系统调用,创建一个socket
  3.         s, err := sysSocket(family, sotype, proto)
  4.     //绑定socket fd
  5.     fd, err = newFD(s, family, sotype, net);
  6.     //...
  7.    
  8.     //进行了以下事件:
  9.     //1、通过syscall bind指令绑定socket的监听地址
  10.     //2、通过syscall listen指令发起对socket的监听
  11.     //3、完成epollEvent表的创建(全局执行一次)
  12.     //4、将socket fd注册到epoll事件表中,监听读写就绪事件
  13.     err := fd.listenStream(ctx, laddr, listenerBacklog(), ctrlCtxFn);
  14. }
复制代码
首先先执行了sysSocket系统调用,创建一个socket,它是一个整数值,用于标识操作系统中打开的文件或网络套接字;接着调用newFD方法包装成netFD对象,以便实现更高效的异步 IO 和 Goroutine 调度。
3.3、poll_init

紧接3.2中的net.socket方法,在内部还调用了net.netFD.listenStream(),poll_init的调用栈如下:
方法文件net.netFD.listenStream()net/sock_posix.gonet.netFD.init()net/fd_unix.gopoll.FD.init()internal/poll/fd_unix.gopoll.pollDesc.init()internal/poll/fd_poll_runtime.goruntime.poll_runtime_pollServerInit()runtime/netpoll.goruntime.netpollinit()runtime/netpoll_epoll.gonet.netFD.listenStream()焦点步骤如下:
  1. func (fd *netFD) listenStream(ctx context.Context, laddr sockaddr, backlog int, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) error {
  2.         //....
  3.        
  4.     //通过Bind系统调用绑定监听地址
  5.         if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
  6.                 return os.NewSyscallError("bind", err)
  7.         }
  8.     //通过Listen系统调用对socket进行监听
  9.         if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
  10.                 return os.NewSyscallError("listen", err)
  11.         }
  12.     //fd.init()进行了以下操作:
  13.     //1、完成eventPool的创建
  14.     //2、将socket fd注册到epoll事件表中
  15.         if err = fd.init(); err != nil {
  16.                 return err
  17.         }
  18.         //...
  19.         return nil
  20. }
复制代码

  • 使用Bind系统调用绑定必要监听的所在
  • 使用Listen系统调用监听socket
  • 调用fd.init完成eventpool的创建以及fd的注册
net.netFD.init()方法在内部转而调用poll.FD.init()
  1. func (fd *netFD) init() error {
  2.         return fd.pfd.Init(fd.net, true)
  3. }
  4. func (fd *FD) Init(net string, pollable bool) error {
  5.         fd.SysFile.init()
  6.         // We don't actually care about the various network types.
  7.         if net == "file" {
  8.                 fd.isFile = true
  9.         }
  10.         if !pollable {
  11.                 fd.isBlocking = 1
  12.                 return nil
  13.         }
  14.         err := fd.pd.init(fd)
  15.         if err != nil {
  16.                 // If we could not initialize the runtime poller,
  17.                 // assume we are using blocking mode.
  18.                 fd.isBlocking = 1
  19.         }
  20.         return err
  21. }
复制代码
然后又转入到poll.pollDesc.init()的调用中。
  1. func (pd *pollDesc) init(fd *FD) error {
  2.     //通过sysOnce结构,完成epoll事件表的唯一一次创建
  3.         serverInit.Do(runtime_pollServerInit)
  4.     //完成init后,进行poll_open
  5.     ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
  6.         //...
  7.     //绑定里层的pollDesc实例
  8.     pd.runtimeCtx = ctx
  9.         return nil
  10. }
复制代码
这里的poll.pollDesc是表层pollDesc,表层pd的init是poll_init和poll_open流程的入口:

  • 执行serverInit.Do(runtime_pollServerInit),其中serverInit是名为sysOnce的特别结构,它会包管执行的方法在全局只会被执行一次,然后执行runtime_pollServerInit,完成poll_init操作
  • 完成poll_init后,调用runtime_pollOpen(uintptr(fd.Sysfd))将fd参加到eventpool中,完成poll_open操作
  • 绑定里层的pollDesc实例
我们先来关注serverInit.Do(runtime_pollServerInit)中,执行的runtime_pollServerInit方法,它定位在runtime/netpoll.go下:
  1. //go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
  2. func poll_runtime_pollServerInit() {
  3.         netpollGenericInit()
  4. }
  5. func netpollGenericInit() {
  6.         if netpollInited.Load() == 0 {
  7.                 lockInit(&netpollInitLock, lockRankNetpollInit)
  8.                 lock(&netpollInitLock)
  9.                 if netpollInited.Load() == 0 {
  10.             //进入netpollinit调用
  11.                         netpollinit()
  12.                         netpollInited.Store(1)
  13.                 }
  14.                 unlock(&netpollInitLock)
  15.         }
  16. }
复制代码
  1. func netpollinit() {
  2.         var errno uintptr
  3.     //进行epollcreate系统调用,创建epoll事件表
  4.         epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
  5.         //...
  6.     //创建pipe管道,接收信号,如程序终止:
  7.     //r:信号接收端,会注册对应的read事件到epoll事件表中
  8.     //w:信号发送端,有信号到达的时候,会往w发送信号,并对r产生读就绪事件
  9.         r, w, errpipe := nonblockingPipe()
  10.         //...
  11.     //在epollEvent中注册监听r的读就绪事件
  12.         ev := syscall.EpollEvent{
  13.                 Events: syscall.EPOLLIN,
  14.         }
  15.         *(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd
  16.         errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
  17.         //...
  18.     //使用全局变量缓存pipe的读写端
  19.         netpollBreakRd = uintptr(r)
  20.         netpollBreakWr = uintptr(w)
  21. }
复制代码
在netpollinit()方法内部,举行了以下操作:

  • 执行epoll_create指令创建了epoll变乱表,并返回epoll文件描述符epfd。
  • 创建了两个pipe管道,当向w端写入信号的时候,r端会发生读就绪变乱。
  • 注册监听r的读就绪变乱。
  • 缓存管道。
在这里,我们创建了两个管道r以及w,并且在eventpool中注册了r的读就绪变乱的监听,当我们向w管道写入数据的时候,r管道就会产生读就绪变乱,从而冲破阻塞的epoll_wait操作,进而执行其他的操作。
3.3、poll_open

方法文件net.netFD.listenStream()net/sock_posix.gonet.netFD.init()net/fd_unix.gopoll.FD.init()internal/poll/fd_unix.gopoll.pollDesc.init()internal/poll/fd_poll_runtime.goruntime.poll_runtime_pollOpen()runtime/netpoll.goruntime.netpollopenruntime/netpoll_epoll.go在poll.pollDesc.init()方法中,完成了poll_init流程后,就会进入到poll_open流程,执行runtime.poll_runtime_pollOpen()。
  1. //go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
  2. func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
  3.     //获取一个pollDesc实例
  4.         pd := pollcache.alloc()
  5.         lock(&pd.lock)
  6.         wg := pd.wg.Load()
  7.         if wg != pdNil && wg != pdReady {
  8.                 throw("runtime: blocked write on free polldesc")
  9.         }
  10.         rg := pd.rg.Load()
  11.         if rg != pdNil && rg != pdReady {
  12.                 throw("runtime: blocked read on free polldesc")
  13.         }
  14.     //绑定socket fd到pollDesc中
  15.         pd.fd = fd
  16.         //...
  17.     //初始化读写状态标识器为无状态
  18.         pd.rg.Store(pdNil)
  19.         pd.wg.Store(pdNil)
  20.         //...
  21.         unlock(&pd.lock)
  22.        
  23.     //将fd添加进epoll事件表中
  24.         errno := netpollopen(fd, pd)
  25.         //...
  26.     //返回pollDesc实例
  27.         return pd, 0
  28. }
复制代码
  1. func netpollopen(fd uintptr, pd *pollDesc) uintptr {
  2.         var ev syscall.EpollEvent
  3.     //通过epollctl操作,在EpollEvent中注册针对fd的监听事件
  4.     //操作类型宏指令:EPOLL_CTL_ADD——添加fd并注册监听事件
  5.     //事件类型:epollevent.events:
  6.     //1、EPOLLIN:监听读就绪事件
  7.     //2、EPOLLOUT:监听写就绪事件
  8.     //3、EPOLLRDHUP:监听中断事件
  9.     //4、EPOLLET:使用边缘触发模式
  10.         ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
  11.         tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
  12.         *(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
  13.         return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
  14. }
复制代码
不仅在net.Listen()流程中会触发poll open,在net.Listener.Accept流程中也会,当我们获取到了连接之后,也必要为这个连接封装成一个pollDesc实例,然后执行poll_open流程将其注册到epoll变乱表中。
  1. func (fd *netFD) accept()(netfd *netFD, err error){
  2.     // 通过 syscall accept 接收到来的 conn fd
  3.     d, rsa, errcall, err := fd.pfd.Accept()
  4.     // ...
  5.     // 封装到来的 conn fd
  6.     netfd, err = newFD(d, fd.family, fd.sotype, fd.net)
  7.     // 将 conn fd 注册到 epoll 事件表中
  8.     err = netfd.init()
  9.     // ...
  10.     return netfd,nil
  11. }
复制代码
3.4、poll_close

当连接conn必要关闭的时候,终极会进入到poll_close流程,执行epoll_ctl(DELETE)删除对应的fd。
方法文件net.conn.Closenet/net.gonet.netFD.Closenet/fd_posix.gopoll.FD.Closeinternal/poll/fd_unix.gopoll.FD.decrefinternal/poll/fd_mutex.gopoll.FD.destroyinternal/poll/fd_unix.gopoll.pollDesc.closeinternal/poll/fd_poll_runtime.gopoll.runtime_pollCloseinternal/poll/fd_poll_runtime.goruntime.poll_runtime_pollCloseruntime/netpoll.goruntime.netpollcloseruntime/netpoll_epoll.gosyscall.EpollCtlruntime/netpoll_epoll.go
  1. //go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
  2. func poll_runtime_pollClose(pd *pollDesc) {
  3.         if !pd.closing {
  4.                 throw("runtime: close polldesc w/o unblock")
  5.         }
  6.         wg := pd.wg.Load()
  7.         if wg != pdNil && wg != pdReady {
  8.                 throw("runtime: blocked write on closing polldesc")
  9.         }
  10.         rg := pd.rg.Load()
  11.         if rg != pdNil && rg != pdReady {
  12.                 throw("runtime: blocked read on closing polldesc")
  13.         }
  14.         netpollclose(pd.fd)
  15.         pollcache.free(pd)
  16. }
复制代码
  1. func netpollclose(fd uintptr) uintptr {
  2.         var ev syscall.EpollEvent
  3.         return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int32(fd), &ev)
  4. }
复制代码
3.5、poll_wait

poll_wait流程终极会执行gopark将g陷入到用户态阻塞
方法文件poll.pollDesc.waitinternal/poll/fd_poll_runtime.gopoll.runtime_pollWaitinternal/poll/fd_poll_runtime.goruntime.poll_runtime_pollWaitruntime/netpoll.goruntime.netpollblockruntime/netpoll.goruntime.goparkruntime/proc.goruntime.netpollblockcommitruntime/netpoll.go在表层pollDesc中,会通过其内部的里层pollDesc指针,调用到runtime下的netpollblock方法。
  1. /*
  2.     针对某个 pollDesc 实例,监听指定的mode 就绪事件
  3.         - 返回true——已就绪  返回false——因超时或者关闭导致中断
  4.         - 其他情况下,会通过 gopark 操作将当前g 阻塞在该方法中
  5. */
  6. func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
  7.     //针对mode事件,获取相应的状态
  8.         gpp := &pd.rg
  9.         if mode == 'w' {
  10.                 gpp = &pd.wg
  11.         }
  12.         for {
  13.                 //关心的io事件就绪,直接返回
  14.                 if gpp.CompareAndSwap(pdReady, pdNil) {
  15.                         return true
  16.                 }
  17.         //关心的io事件未就绪,则置为等待状态,G将要被阻塞
  18.                 if gpp.CompareAndSwap(pdNil, pdWait) {
  19.                         break
  20.                 }
  21.                 //...
  22.         }
  23.        
  24.         //...
  25.     //将G置为阻塞态
  26.                 gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
  27.     //当前g从阻塞态被唤醒,重置标识器
  28.     old := gpp.Swap(pdNil)
  29.         if old > pdWait {
  30.                 throw("runtime: corrupted polldesc")
  31.         }
  32.     //判断是否是因为所关心的事件触发而唤醒
  33.         return old == pdReady
  34. }
复制代码
在gopark方法中,会闭包调用netpollblockcommit方法,其中会根据g关心的变乱范例,将其实例存储到pollDesc的rg或wg容器中。
  1. // 将 gpp 状态标识器的值由 pdWait 修改为当前 g
  2. func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
  3.         r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
  4.         if r {
  5.                 //增加等待轮询器的例程计数。
  6.                 //调度器使用它来决定是否阻塞
  7.                 //如果没有其他事情可做,则等待轮询器。
  8.                 netpollAdjustWaiters(1)
  9.         }
  10.         return r
  11. }
复制代码
接着我们来关注何时会触发poll_wait流程。
首先是在listener.Accept流程中,如果当前尚未有连接到达,则执行poll wait将当前g阻塞挂载在该socket fd对应pollDesc的rg中。
  1. // Accept wraps the accept network call.
  2. func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
  3.         //...
  4.         for {
  5.         //以非阻塞模式发起一次accept,尝试接收conn
  6.                 s, rsa, errcall, err := accept(fd.Sysfd)
  7.                 if err == nil {
  8.                         return s, rsa, "", err
  9.                 }
  10.                 switch err {
  11.             //忽略中断类错误
  12.                 case syscall.EINTR:
  13.                         continue
  14.             //尚未有到达的conn
  15.                 case syscall.EAGAIN:
  16.             //进入poll_wait流程,监听fd的读就绪事件,当有conn到达表现为fd可读。
  17.                         if fd.pd.pollable() {
  18.                 //假如读操作未就绪,当前g会被阻塞在方法内部,直到因为超时或者就绪被netpoll ready唤醒。
  19.                                 if err = fd.pd.waitRead(fd.isFile); err == nil {
  20.                                         continue
  21.                                 }
  22.                         }
  23.                 //...
  24.         }
  25. }
复制代码
  1. // 指定 mode 为 r 标识等待的是读就绪事件,然后走入更底层的 poll_wait 流程
  2. func (pd *pollDesc) waitRead(isFile bool) error {
  3.     return pd.wait('r', isFile)
  4. }
复制代码
其次分别是在conn.Read/conn.Write流程中,假若conn fd下读操作未就绪(无数据到达)/写操作未就绪(缓冲区空间不足),则会执行poll wait将g阻塞并挂载在对应的pollDesc中的rg/wg中。
  1. func (fd *FD) Read(p []byte) (int, error) {
  2.         //...
  3.         for {
  4.         //非阻塞模式进行一次read调用
  5.                 n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
  6.                 if err != nil {
  7.                         n = 0
  8.             //进入poll_wait流程,并标识关心读就绪事件
  9.                         if err == syscall.EAGAIN && fd.pd.pollable() {
  10.                                 if err = fd.pd.waitRead(fd.isFile); err == nil {
  11.                                         continue
  12.                                 }
  13.                         }
  14.                 }
  15.                 err = fd.eofError(n, err)
  16.                 return n, err
  17.         }
  18. }
复制代码
  1. func (fd *FD)Write(p []byte)(int,error){
  2.     // ...
  3.     for{
  4.     // ...
  5.     // 以非阻塞模式执行一次syscall write操作
  6.         n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
  7.         if n >0{
  8.             nn += n
  9.         }
  10.         // 缓冲区内容都已写完,直接退出
  11.         if nn ==len(p){
  12.             return nn, err
  13.         }
  14.     // 走入 poll_wait 流程,并标识关心的是该 fd 的写就绪事件
  15.     if err == syscall.EAGAIN && fd.pd.pollable(){
  16.         // 倘若写操作未就绪,当前g 会 park 阻塞在该方法内部,直到因超时或者事件就绪而被 netpoll ready 唤醒
  17.         if err = fd.pd.waitWrite(fd.isFile); err ==nil{
  18.             continue
  19.         }
  20.     }
  21.     // ...  
  22.    
  23. }
复制代码
3.6、net_poll

netpoll流程至关重要,它会在底层调用系统的epoll_wait操作,找到触发变乱的fd,然后再逆向找到绑定fd的pollDesc实例,返回内部阻塞的g叫给上游处理唤醒。其调用栈如下:
方法文件runtime.netpollruntime/netpoll_epoll.goruntime.netpollreadyruntime/netpoll.goruntime.netpollunblockruntime/netpoll.gonetpoll具体的源码如下:
  1. //netpoll用于轮询检查是否有就绪的io事件
  2. //若发现了就绪的io事件,检查是否有pollDesc中的g关心其事件
  3. //若找到了关心其io事件就绪的g,添加到list返回给上游处理
  4. func netpoll(delay int64) (gList, int32) {
  5.         if epfd == -1 {
  6.                 return gList{}, 0
  7.         }
  8.         var waitms int32
  9.     //根据传入的delay参数,决定调用epoll_wait的模式:
  10.     //delay < 0:设为阻塞模式(在 gmp 调度流程中,如果某个 p 迟迟获取不到可执行的 g 时,会通过该模式,使得 thread 陷入阻塞态,但该情况全局最多仅有一例)
  11.     //delay = 0:设为非阻塞模式(通常情况下为此模式,包括 gmp 常规调度流程、gc 以及全局监控线程 sysmon 都是以此模式触发的 netpoll 流程)
  12.     //delay > 0:设为超时模式(在 gmp 调度流程中,如果某个 p 迟迟获取不到可执行的 g 时,并且通过 timer 启动了定时任务时,会令 thread 以超时模式执行 epoll_wait 操作)
  13.         if delay < 0 {
  14.                 waitms = -1
  15.         } else if delay == 0 {
  16.                 waitms = 0
  17.         } else if delay < 1e6 {
  18.                 waitms = 1
  19.         } else if delay < 1e15 {
  20.                 waitms = int32(delay / 1e6)
  21.         } else {
  22.                 waitms = 1e9
  23.         }
  24.     //最多接收128个io就绪事件
  25.         var events [128]syscall.EpollEvent
  26. retry:
  27.     //以指定模式调用epoll_wait
  28.         n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
  29.         //...
  30.     //存储关心io事件就绪的G实例
  31.         var toRun gList
  32.         delta := int32(0)
  33.     //遍历返回的就绪事件
  34.         for i := int32(0); i < n; i++ {
  35.                 ev := events[i]
  36.                 if ev.Events == 0 {
  37.                         continue
  38.                 }
  39.                 //pipe接收端的信号处理,检查是否需要退出netpoll
  40.                 if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
  41.                         if ev.Events != syscall.EPOLLIN {
  42.                                 println("runtime: netpoll: break fd ready for", ev.Events)
  43.                                 throw("runtime: netpoll: break fd ready for something unexpected")
  44.                         }
  45.                 //...
  46.                         continue
  47.                 }
  48.                 var mode int32
  49.         //记录io就绪事件的类型
  50.                 if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
  51.                         mode += 'r'
  52.                 }
  53.                 if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
  54.                         mode += 'w'
  55.                 }
  56.         // 根据 epollevent.data 获取到监听了该事件的 pollDesc 实例
  57.                 if mode != 0 {
  58.                         tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
  59.                         pd := (*pollDesc)(tp.pointer())
  60.                         //...
  61.             //检查是否为G所关心的事件
  62.                                 delta += netpollready(&toRun, pd, mode)
  63.                        
  64.                 }
  65.         }
  66.         return toRun, delta
  67. }
复制代码
  1. func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
  2.         delta := int32(0)
  3.         var rg, wg *g
  4.         if mode == 'r' || mode == 'r'+'w' {
  5.         //就绪事件包含读就绪,尝试唤醒pd内部的rg
  6.                 rg = netpollunblock(pd, 'r', true, &delta)
  7.         }
  8.         if mode == 'w' || mode == 'r'+'w' {
  9.         //就绪事件包含读就绪,尝试唤醒pd内部的wg
  10.                 wg = netpollunblock(pd, 'w', true, &delta)
  11.         }
  12.     //存在G实例,则加入list中
  13.         if rg != nil {
  14.                 toRun.push(rg)
  15.         }
  16.         if wg != nil {
  17.                 toRun.push(wg)
  18.         }
  19.         return delta
  20. }
复制代码
  1. func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
  2.     //获取存储的g实例
  3.         gpp := &pd.rg
  4.         if mode == 'w' {
  5.                 gpp = &pd.wg
  6.         }
  7.         for {
  8.                 old := gpp.Load()
  9.                 //...
  10.                 new := pdNil
  11.                 if ioready {
  12.                         new = pdReady
  13.                 }
  14.         //将gpp的值从g置换成pdReady
  15.                 if gpp.CompareAndSwap(old, new) {
  16.                         if old == pdWait {
  17.                                 old = pdNil
  18.                         } else if old != pdNil {
  19.                                 *delta -= 1
  20.                         }
  21.             //返回需要唤醒的g实例
  22.                         return (*g)(unsafe.Pointer(old))
  23.                 }
  24.         }
  25. }
复制代码
那么,我们也同样必要关注在哪个环节进入了net_poll流程。
首先,是在GMP调度器中的findRunnable方法中被调用,用于找到可执行的G实例。具体的实如今之前的GMP调度文章中有讲解,这里只关心涉及到net_poll方面的源码。
findRunnable方法定位在runtime/proc.go中
  1. func findRunnable()(gp *g, inheritTime, tryWakeP bool){
  2.     // ..
  3.     /*
  4.         同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程:
  5.             - epoll事件表初始化过
  6.             - 有 g 在等待io 就绪事件
  7.             - 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程
  8.     */
  9.     if netpollinited()&& atomic.Load(&netpollWaiters)>0&& atomic.Load64(&sched.lastpoll)!=0{
  10.         // 以非阻塞模式发起一轮 netpoll,如果有 g 需要唤醒,一一唤醒之,并返回首个 g 给上层进行调度
  11.         if list := netpoll(0);!list.empty(){// non-blocking
  12.             // 获取就绪 g 队列中的首个 g
  13.             gp := list.pop()
  14.             // 将就绪 g 队列中其余 g 一一置为就绪态,并添加到全局队列
  15.             injectglist(&list)
  16.             // 把首个g 也置为就绪态
  17.             casgstatus(gp,_Gwaiting,_Grunnable)
  18.             // ...   
  19.             //返回 g 给当前 p进行调度
  20.             return gp,false,false
  21.         }
  22.     }
  23.     // ...
  24.     /*
  25.         同时满足下述三个条件,发起一次【阻塞或超时模式】的 netpoll 流程:
  26.             - epoll事件表初始化过
  27.             - 有 g 在等待io 就绪事件
  28.             - 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程
  29.     */
  30.     if netpollinited()&&(atomic.Load(&netpollWaiters)>0|| pollUntil !=0)&& atomic.Xchg64(&sched.lastpoll,0)!=0{
  31.     // 默认为阻塞模式  
  32.         delay :=int64(-1)
  33.         // 存在定时时间,则设为超时模式
  34.         if pollUntil !=0{
  35.             delay = pollUntil - now
  36.         // ...   
  37.         }
  38.         // 以【阻塞或超时模式】发起一轮 netpoll
  39.         list := netpoll(delay)// block until new work is available
  40.     }
  41.     // ...   
  42. }
复制代码
其次,是位于同文件下的sysmon方法中,它会被一个全局监控者G执行,每隔10ms发一次非阻塞的net_poll流程。
  1. // The main goroutine.
  2. func main(){
  3. // ...
  4. // 新建一个 m,直接运行 sysmon 函数
  5.     systemstack(func(){
  6.         newm(sysmon,nil,-1)
  7.     })
  8.     // ...
  9. }
  10. // 全局唯一监控线程的执行函数
  11. func sysmon(){
  12. // ...
  13. for{
  14. // ...
  15. /*
  16.         同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程:
  17.             - epoll事件表初始化过
  18.             - 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程
  19.             - 距离上一次发起 netpoll 流程的时间间隔已超过 10 ms
  20.     */
  21.         lastpoll :=int64(atomic.Load64(&sched.lastpoll))
  22.         if netpollinited()&& lastpoll !=0&& lastpoll+10*1000*1000< now {
  23.             // 以非阻塞模式发起 netpoll
  24.             list := netpoll(0)// non-blocking - returns list of goroutines
  25.             // 获取到的  g 置为就绪态并添加到全局队列中
  26.             if!list.empty(){
  27.                 // ...
  28.                 injectglist(&list)
  29.                 // ...
  30.             }
  31.         }
  32.     // ...  
  33.     }
  34. }
复制代码
末了,还会发生在GC流程中。
  1. func pollWork() bool{
  2.     // ...
  3.     // 若全局队列或 p 的本地队列非空,则提前返回
  4.     /*
  5.         同时满足下述三个条件,发起一次【非阻塞模式】的 netpoll 流程:
  6.             - epoll事件表初始化过
  7.             - 有 g 在等待io 就绪事件
  8.             - 没有空闲 p 在以【阻塞或超时】模式发起 netpoll 流程
  9.     */
  10.     if netpollinited()&& atomic.Load(&netpollWaiters)>0&& sched.lastpoll !=0{
  11.     // 所有取得 g 更新为就绪态并添加到全局队列
  12.         if list := netpoll(0);!list.empty(){
  13.             injectglist(&list)
  14.             return true
  15.         }
  16.     }
  17.     // ...
  18. }
复制代码
4、参考博文

感谢观看,本篇博文参考了小徐先生的文章,非常保举大家去观看并且进入到源码中学习,链接如下:
万字解析 golang netpoll 底层原理

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

没腿的鸟

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表