Golang的GMP调度模型与源码剖析

打印 上一主题 下一主题

主题 844|帖子 844|积分 2532

0、引言

我们知道,这当代操作系统中,多线程和多进程模型被广泛的使用以进步系统的并发效率。随着互联网不断的发展,面对如今的高并发场景,为每个使命都创建一个线程是不现实的,使用线程则需要系统不断的在用户态和内核态之间不断的切换,引起不必要的损耗,于是引入了协程。协程存在于用户空间,是一种轻量级的并发执行单元,其创建和上下文的开销更小,如何管理数目众多的协程是一个重要的话题。此篇笔记用于分享笔者学习Go语言协程调度的GMP模型的理解,以及源码的实现。当前使用的Go语言版本为1.22.4。
本篇笔记参考了以下文章:
[Golang三关-典藏版] Golang 调度器 GMP 原理与调度全分析 | Go 技术论坛
Golang GMP 原理
Golang-gopark函数和goready函数原理分析
1、GMP模型拆解

Goroutine调度器的工作是将预备运行的goroutine分配到工作线程上,涉及到的主要概念如下:
1.1、G

G代表的是Goroutine,是Go语言对协程概念的抽象,其有以下的特点:

  • 是一个轻量级的线程
  • 拥有自己的栈、状态、以及执行的使命函数
  • 每一个G会被分配到一个可用的P,并且在M上运行
其结构界说位于runtime/runtime2.go中:
  1. type g struct {
  2.     // ...
  3.     m         *m      
  4.     // ...
  5.     sched     gobuf
  6.     // ...
  7. }
  8. type gobuf struct {
  9.     sp   uintptr
  10.     pc   uintptr
  11.     ret  uintptr
  12.     bp   uintptr // for framepointer-enabled architectures
  13. }
复制代码
在这里,我们焦点关注其内嵌了一个m和一个gobuf类型的sched。gobuf主要用于Gorutine的上下文切换,其保存了G执行过程中的CPU寄存器的状态,使得G在停息、调度和恢复运行时能够正确地恢复上下文。
G主要有以下几种状态:
  1. const (
  2.         _Gidle = iota // 0
  3.         _Grunnable // 1
  4.         _Grunning // 2
  5.         _Gsyscall // 3
  6.         _Gwaiting // 4
  7.     //...
  8.         _Gdead // 6
  9.     //...
  10.         _Gcopystack // 8
  11.     _Gpreempted // 9
  12.         //...
  13. )
复制代码

  • Gidle:表示这个G刚刚被分配,尚未初始化。
  • Grunnable:表示这个G在运行队列中,它当前不再执行用户代码,栈未被占用。
  • Grunning:表示这个G大概在执行用户代码,栈被这个G占用,它不在运行队列中,并且它被分配给了一个M和一个P(g.m和g.m.p是有效的)。
  • Gsyscall:表示这个G正在执行系统调用,它不在执行用户代码,栈被这个G占用。它不在运行队列中,并且它被分配给了一个M。
  • Gwaiting:表示这G被堵塞在运行时,它没有执行用户代码,也不在运行队列中,但是它应该被记录在某个地方,以便在必要时将其叫醒。(ready())gc、channel 通讯或者锁操作时经常会进入这种状态。
  • Gdead:表示这个G当前未使用,它大概是刚被初始化,也大概是已经被销毁。
  • Gcopystack:表示这个G的栈正在被移动。
  • Gpreempted:表示这个G因抢占而被挂起,且该G自行停止,等待进一步的恢复。它雷同于Gwaiting,但是Gpreempted还没有一个负责将其状态恢复的管理者,只有某个suspendG操作将该G的状态从Gpreempted转换为Gwaiting,这样调度器才会接管这个G。
在阅读有关调度逻辑的源码的时间,我们可以通过搜刮casgstatus方法去定位到使得G状态改变的函数,比方:casgstatus(gp, _Grunning, _Gsyscall)表示将该G的状态从Grunning变动到Gsyscall,就可以找到对应的函数学习了。

1.2、M

M是Machine,也是Worker Thread,代表的是操作系统的线程。Go运行时在需要时创建或者销毁M,将G安排到M上执行,充分使用多核CPU的能力。其具有以下的特点:

  • M是Go与操作系统之间的桥梁,它负责执行分配给它的G。
  • M的数目会根据系统资源进行调解。
  • M大概会被特定的G通过LockOSThread锁定,这种G和M的绑定确保了特定Goroutine可以持续使用同一个线程。
结构界说如下:
  1. type m struct{
  2.         g0      *g     // goroutine with scheduling stack
  3.         curg          *g       // current running goroutine
  4.         tls           [tlsSlots]uintptr // thread-local storage (for x86 extern register)
  5.         p             puintptr // attached p for executing go code (nil if not executing go code)
  6.         oldp          puintptr // the p that was attached before executing a syscall
  7.         //...
  8. }
复制代码
每一个M结构体都会有一个名为g0的G,它是一个特别的Goroutine,它并不复杂执行用户的代码,而是负责调度G。g0会分配G绑定到M中执行。tls表示的是“Local Thread Storage”,其存储了与当火线程相关的特定信息,而tls数组的第一个槽位通常用于存储g0的栈指针。
M存在一个状态,名为“自旋态”,处在自旋态的M会不断的往全局队列中寻找可运行的G去执行,并且排除自旋态。
1.3、P

P是Processor,代表逻辑处理器,是Goroutine调度的虚拟概念。每个P负责分配执行Goroutine的资源,其具有以下的特点:

  • P是G的执行上下文,它具有一个本地队列存储着G,以及对应的使命调度机制,负责在M上执行一个详细的G。
  • P的数目由情况变量GOMAXPROCS决定,如果其数目大于CPU的物理线程数目时就没有更多的意义了。
  • P是去执行Go代码所必备的资源,M必须绑定了一个P才能去执行Go代码。但是M可以在没有绑定P的情况下执行系统调用或者被阻塞。
  1. type p struct {
  2.         status      uint32
  3.         runqhead uint32
  4.         runqtail uint32
  5.         runq     [256]guintptr
  6.         m           muintptr
  7.         runnext guintptr
  8.         //...
  9. }
复制代码

  • runq存储了这个P具有的goroutine队列,最大长度为256
  • runqhead和runqtail分别指向队列的头部和尾部
  • runnext存储了下一个可执行的goroutine
P也含有几个状态,如下:
  1. const (
  2.         _Pidle = iota
  3.         _Prunning
  4.         _Psyscall
  5.         _Pgcstop
  6.         _Pdead
  7. )
复制代码

  • Pidle:表示P没有被运行用户代码或者调度器,通常这个P在空闲P列表中,供调度器使用,但它也大概在其他状态之间转换。P由空闲队列idle list或者其他转换其状态的对象拥有,它的runq是空的。
  • Prunning:表示P被M拥有,并且正在运行用户代码或者调度器。只有拥有此P的M被允许更改P的状态,M可以将P转换为Pidle(当没有工作的时间)、Psyscall(当进入一个系统调用时)、Pgcstop(安顿垃圾回收时)。M还可以将P的所有权交接给另一个M(比方调度一个locked的G)
  • Psyscall:表示P没有在运行用户代码,与在系统调用中的M相关但不被其拥有。处于Psyscall状态的P大概会被其他M抢走。将P转换给另一个M是轻量级的,并且P会保持和原始的M的关联性。
  • Pgcstop:表示P被停息以进行STW(Stop The World)(执行垃圾回收)。
  • Pdead:表示P不再被使用(GOMAXPROCS镌汰)。死去的P将会被剥夺资源,但是任然会保留少量的资源比方Trace Buffer,用于后续的跟踪分析需求。
1.4、Schedt

schedt是全局goroutine队列的封装
  1. type schedt struct {
  2.     // ...
  3.     lock mutex
  4.     // ...
  5.     runq     gQueue
  6.     runqsize int32![](https://img2024.cnblogs.com/blog/3542244/202411/3542244-20241117153220788-1594654379.png)
  7.     // ...
  8. }
复制代码

  • lock:是操作全局队列的锁
  • runq:存储G的队列
  • runqsize:全局G队列的容量
2、调度模型的工作流程

我们可以用下图来整体的表示该调度模型的流程:
在接下来的部分,我们将主要探讨GMP调度模型是怎么完成一轮调度的,即是如何完成g0到g再到g0的切换的,期间大致发生了什么。
2.1、G的状态转换

我们刚刚提及到,每一个M都有一个名为g0的Goroutine,去负责调度普通的g绑定到M上执行。g0和普通的g之间存在一个转换,当执行普通的g上的代码的时间,就会将执行权交给g,当g执行完代码或者因为原因需要被挂起、退出执行等,就会重新将执行权交给g0。
g0和P是一个协作的关系,P的队列决定了哪些goroutine可以在绑定P时被调用,而g0是执行调度逻辑的关键的goroutine,负责在必要时释放P的资源。
当g0需要将执行权交给g时,会调用一个名为gogo的方法,传入g的栈指针,去执行用户的代码。
  1. func gogo(buf *gobuf)
复制代码
当需要重新将执行权转交给g0时,都会执行一个名为mcall的方法。
  1. func mcall(fn func(*g))
复制代码
mcall在go需要进行协程调换时被调用,它传入一个回调函数fn,里面携带了当前正在运行的g的指针,它主要做了以下三点的工作:

  • 保存当前g的信息,即将PC/SP的信息存储到g->sched中,包管后续可以恢复g的执行现场。
  • 将当前M的堆栈从g切换到g0
  • 在g0的栈上执行新的函数fn,通常在fn中会进一步安排g的去向,并且调用schedule函数,让当前M去寻找另一个可以执行的G。

2.2、调度类型

我们如今知道了,g和g0是通过什么函数进行状态切换的。接下来我们就要来探讨,它们是什么情况下要进行切换,即调度计谋有什么。
GMP调度模型一共有4种调度计谋,分别为:主动调度被动调度正常调度抢占调度


  • 主动调度:提供给用户的方法,当用户调用了runtime.Gosched()方法时,此时当前的g会让出执行权,将g安排进使命队列等待下一次被调度。
  • 被动调度:当因不满足某种执行条件,通常为channel读写条件不满足时,会执行gopark()函数,此时的g将会被置为等待状态。
  • 正常调度:g正常的执行完毕,转接执行权。
  • 抢占调度:存在一个全局监控者moniter,它会每隔一段时间周期去查抄是否有G运行太长时间,若发现了,将会通知P去进行和M的解绑,让出P。这里需要全局监控者的存在是因为当G进入到系统调用的时间,这个线程M会陷入僵持,无法主动去查抄,需要外助辅助。
2.3、宏观调度流程

接下来我们来关注整体一轮的调度流程,对于g0和g的一轮调度,可以用下图来表示。

schedule作为每一轮调度的开始,它会寻找到可以执行的G,然后调用execute将该g绑定到一个线程M上,然后执行gogo方法去真正的运行一个goroutine。当需要转换时,goroutine会在底层执行mcall方法,保存栈信息,然后执行回调函数fn,即绿框内的方法之一,将执行权重新交给g0。
2.3.1、schedule()

schedule()方法定位于runtime/proc中,忽略非主流程部分,源码内容如下:
  1. //找到一个是就绪态的G去运行
  2. func schedule() {
  3.         mp := getg().m
  4.         //...
  5. top:
  6.         pp := mp.p.ptr()
  7.         pp.preempt = false
  8.         //如果该M在自旋,但是队列含有G,那么抛出异常。
  9.         if mp.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
  10.                 throw("schedule: spinning with local work")
  11.         }
  12.         gp, inheritTime, tryWakeP := findRunnable() //阻塞的寻找G
  13.        
  14.     //...
  15.         //当前M将要运转一个G,解除自旋状态
  16.         if mp.spinning {
  17.                 resetspinning()
  18.         }
  19.         //...
  20.         execute(gp, inheritTime)
  21. }
复制代码
该方法主要是寻找一个可以运行的G,交给该线程去运行。我们在一开始提到,线程会存在一种名为“自旋态”的状态,它会不断的自旋去寻找可以执行的G来执行,成功找到了就排除了自旋态。
这里存在一个点我们值得去注意,处在自旋态的线程它不是在空占用计算资源吗?那么不就是降低了系统的性能吗?
实在这是一个中和的计谋,如果每次当出现了一个新的Goroutine需要去执行的时间,我们才创建一个线程M去执行它,然后执行完了又删除掉不去复用,那么就会带来大量的创建销毁的资源斲丧。我们希望当有一个新的Goroutine来的时间,能立刻有一个M去执行它,就可以将空闲临时无使命处理的M去自己寻找Goroutine,镌汰了创建销毁的资源斲丧。但是我们也不能有太多的处于自旋态的线程,否则就造就另一个过多斲丧的地方了。
我们先跟进一下resetspinning(),看看其执行的计谋是什么。
1、resetspinning()
  1. func resetspinning() {
  2.         gp := getg()
  3.         //...
  4.         gp.m.spinning = false
  5.         nmspinning := sched.nmspinning.Add(-1)
  6.         //...
  7.         wakep()
  8. }
  9. //尝试添加一个P去执行G。该方法被调用当一个G状态为runnable时。
  10. func wakep() {
  11.     //如果自旋的M数量不为0则返回
  12.         if sched.nmspinning.Load() != 0 || !sched.nmspinning.CompareAndSwap(0, 1) {
  13.                 return
  14.         }
  15.         // 禁用抢占,直到 pp 的所有权转移到 startm 中的下一个 M,否则在这里的抢占将导致 pp 被卡在等待进入 _Pgcstop 状态。
  16.         mp := acquirem()
  17.         var pp *p
  18.         lock(&sched.lock)
  19.     //尝试从空闲P队列获取一个P
  20.         pp, _ = pidlegetSpinning(0)
  21.         if pp == nil {
  22.                 if sched.nmspinning.Add(-1) < 0 {
  23.                         throw("wakep: negative nmspinning")
  24.                 }
  25.                 unlock(&sched.lock)
  26.                 releasem(mp)
  27.                 return
  28.         }
  29.        
  30.         unlock(&sched.lock)
  31.         startm(pp, true, false)
  32.         releasem(mp)
  33. }
复制代码
在resetspinning中,我们先将当前M排除了自旋态,然后尝试去叫醒一个P,即进入到wakep()方法中。
  1. if sched.nmspinning.Load() != 0 || !sched.nmspinning.CompareAndSwap(0, 1) {
  2.                 return
  3.         }
复制代码
在wakep方法内,我们先查抄了当前处在自旋的M的数目,如果>0,则不再去叫醒一个新的P,这是为了防止同一时间内过多的自旋的M空运转斲丧CPU资源。
  1. pp, _ = pidlegetSpinning(0)
  2.         if pp == nil {
  3.                 if sched.nmspinning.Add(-1) < 0 {
  4.                         throw("wakep: negative nmspinning")
  5.                 }
  6.                 unlock(&sched.lock)
  7.                 releasem(mp)
  8.                 return
  9.         }
复制代码
接着会尝试从空闲P队列中获取一个P,如果没有空闲的P,那么此时会镌汰自旋线程的数目(这里只是镌汰了数目,但是详细这个处在自旋的线程接下来去做什么了我也没有明白)并且返回。
  1. startm(pp, true, false)
复制代码
如果获取了一个空闲的P,会为这一个P分配一个线程M。

2、findRunnable()

findRunnable是一轮调度流程中最焦点的方法,它用于找到一个可执行的G。
  1. func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
  2.         mp := getg().m
  3. top:
  4.     pp := mp.p.ptr()
  5.         //...
  6.        
  7.     //每61次调度周期就检查一次全局G队列,防止在特定情况只依赖于本地队列。
  8.         if pp.schedtick%61 == 0 && sched.runqsize > 0 {
  9.                 lock(&sched.lock)
  10.                 gp := globrunqget(pp, 1)
  11.                 unlock(&sched.lock)
  12.                 if gp != nil {
  13.                         return gp, false, false
  14.                 }
  15.         }
  16.     //...
  17.     // local runq
  18.         if gp, inheritTime := runqget(pp); gp != nil {
  19.                 return gp, inheritTime, false
  20.         }
  21.         // global runq
  22.         if sched.runqsize != 0 {
  23.                 lock(&sched.lock)
  24.                 gp := globrunqget(pp, 0)
  25.                 unlock(&sched.lock)
  26.                 if gp != nil {
  27.                         return gp, false, false
  28.                 }
  29.         }
  30.    
  31.     //在正式的去偷取G之前,用非阻塞的方式检查是否有就绪的网络协程,这是对netpoll的一个优化。
  32.         if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 {
  33.                 if list, delta := netpoll(0); !list.empty() { // non-blocking
  34.                         gp := list.pop()
  35.                         injectglist(&list)
  36.                         netpollAdjustWaiters(delta)
  37.                         trace := traceAcquire()
  38.                         casgstatus(gp, _Gwaiting, _Grunnable)
  39.                         if trace.ok() {
  40.                                 trace.GoUnpark(gp, 0)
  41.                                 traceRelease(trace)
  42.                         }
  43.                         return gp, false, false
  44.                 }
  45.         }
  46.    
  47.     //如果当前的M出于自旋状态,或者说处于自旋状态的M的数量小于活跃的P数量的一半时,则进行G窃取。(防止当系统的并行度较低时,自旋的M过多占用CPU资源)
  48.         if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
  49.                 if !mp.spinning {
  50.                         mp.becomeSpinning()
  51.                 }
  52.                 gp, inheritTime, tnow, w, newWork := stealWork(now)
  53.                 if gp != nil {
  54.                         // Successfully stole.
  55.                         return gp, inheritTime, false
  56.                 }
  57.                 if newWork {
  58.                         // There may be new timer or GC work; restart to
  59.                         // discover.
  60.                         goto top
  61.                 }
  62.                 now = tnow
  63.                 if w != 0 && (pollUntil == 0 || w < pollUntil) {
  64.                         // Earlier timer to wait for.
  65.                         pollUntil = w
  66.                 }
  67.         }
  68.    
  69.     //...
复制代码
其主要的执行步骤如下:
(一)第六十一次调度
  1. if pp.schedtick%61 == 0 && sched.runqsize > 0 {
  2.                 lock(&sched.lock)
  3.                 gp := globrunqget(pp, 1)
  4.                 unlock(&sched.lock)
  5.                 if gp != nil {
  6.                         return gp, false, false
  7.                 }
  8.         }
复制代码
首先查抄P的调度次数,如果这次是P的第61此次调度,并且全局的G队列长度>0,就会从全局队列获取一个G。这是为了防止在特定情况下,只运行本地队列的G,忽视了全局队列。
其内部调用的globrunqget方法主流程如下:
  1. //尝试从G的全局队列获取一批G
  2. func globrunqget(pp *p, max int32) *g {
  3.         assertLockHeld(&sched.lock)
  4.         //检查全局队列是否为空
  5.         if sched.runqsize == 0 {
  6.                 return nil
  7.         }
  8.     //计算需要获取的G的数量
  9.         n := sched.runqsize/gomaxprocs + 1
  10.         if n > sched.runqsize {
  11.                 n = sched.runqsize
  12.         }
  13.         if max > 0 && n > max {
  14.                 n = max
  15.         }
  16.     //确保从队列中获取的G数量不超过当前本地队列的G数量的一半,避免全局队列所有的G都转移到本地队列中导致负载不均衡
  17.         if n > int32(len(pp.runq))/2 {
  18.                 n = int32(len(pp.runq)) / 2
  19.         }
  20.         sched.runqsize -= n
  21.         gp := sched.runq.pop()
  22.         n--
  23.         for ; n > 0; n-- {
  24.                 gp1 := sched.runq.pop()
  25.                 runqput(pp, gp1, false)
  26.         }
  27.         return gp
  28. }
复制代码
  1. //计算需要获取的G的数量
  2.         n := sched.runqsize/gomaxprocs + 1
  3.         if n > sched.runqsize {
  4.                 n = sched.runqsize
  5.         }
  6.         if max > 0 && n > max {
  7.                 n = max
  8.         }
  9.         if n > int32(len(pp.runq))/2 {
  10.                 n = int32(len(pp.runq)) / 2
  11.         }
复制代码
n为要从全局G队列获取的G的数目,可以看到它会至少获取一个G,至多获取runqsize/gomaxprocs+1个G,它包管了一个P不过多的获取G从而影响负载均衡。并且不允许n一次获取全局G队列一半以上的G,包管负载均衡。
  1. gp := sched.runq.pop()
  2.         n--
  3.         for ; n > 0; n-- {
  4.                 gp1 := sched.runq.pop()
  5.                 runqput(pp, gp1, false)
  6.         }
复制代码
决定好获取多少个G后,第一个G会直接通过指针返回,剩余的则是将其添加到P的本地队列中。
在当前(一)的调用中,函数设置了max值为1,因此只会从全局队列获取1个G返回。
虽然在(一)中不会执行runqput,但是我们照旧来看看是怎么将G添加到P的本地队列的。
  1. // runqput尝试将G放到本地队列中
  2. //如果next是False,runqput会将G添加到本地队列的尾部
  3. //如果是True,runqput会将G添加到下一个将被调度的G的槽位
  4. //如果运行队列满了,那么将会把g放回全局队列
  5. func runqput(pp *p, gp *g, next bool) {
  6.     //
  7.         if randomizeScheduler && next && randn(2) == 0 {
  8.                 next = false
  9.         }
  10.         if next {
  11.         retryNext:
  12.                 oldnext := pp.runnext
  13.                 if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
  14.                         goto retryNext
  15.                 }
  16.                 if oldnext == 0 {
  17.                         return
  18.                 }
  19.                 // Kick the old runnext out to the regular run queue.
  20.                 gp = oldnext.ptr()
  21.         }
  22. retry:
  23.         h := atomic.LoadAcq(&pp.runqhead) //加载队列头的位置
  24.         t := pp.runqtail
  25.         if t-h < uint32(len(pp.runq)) { //检查本地队列是否已满
  26.                 pp.runq[t%uint32(len(pp.runq))].set(gp) //未满将gp插入runqtail的指定位置
  27.                 atomic.StoreRel(&pp.runqtail, t+1) //更新runtail,表示插入的G可供消费
  28.                 return
  29.         }
  30.         if runqputslow(pp, gp, h, t) { //如果本地队列已满,则尝试放回全局队列
  31.                 return
  32.         }
  33.         // the queue is not full, now the put above must succeed
  34.         goto retry
  35. }
复制代码
  1. if randomizeScheduler && next && randn(2) == 0 {
  2.                 next = false
  3.         }
复制代码
在第一步中,我们看到即使next被设置为true,即要求了该G应该被放置在本地P队列的runnext槽位中,也会有概率地将next置为false
  1. if next {
  2.         retryNext:
  3.                 oldnext := pp.runnext
  4.                 if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
  5.                         goto retryNext
  6.                 }
  7.                 if oldnext == 0 {
  8.                         return
  9.                 }
  10.                 // Kick the old runnext out to the regular run queue.
  11.                 gp = oldnext.ptr()
  12.         }
复制代码
如果next仍为true,此时先获取原本P调度器中,runnext槽位的G(oldnext),然后会不断地尝试将新的G替换掉旧的G直到成功为止。当成功之后,在下面的操作流程中会把旧的G放入到P的本地队列中。
  1. retry:
  2.         h := atomic.LoadAcq(&pp.runqhead) //加载队列头的位置
  3.         t := pp.runqtail
  4.         if t-h < uint32(len(pp.runq)) { //检查本地队列是否已满
  5.                 pp.runq[t%uint32(len(pp.runq))].set(gp) //未满将gp插入runqtail的指定位置
  6.                 atomic.StoreRel(&pp.runqtail, t+1) //更新runtail,表示插入的G可供消费
  7.                 return
  8.         }
  9.         if runqputslow(pp, gp, h, t) { //如果本地队列已满,则尝试放回全局队列
  10.                 return
  11.         }
  12.         // the queue is not full, now the put above must succeed
  13.         goto retry
  14. }
复制代码
在将G加入进P的本地队列的流程中,需要获取队列头部和尾部的坐标,用来判断本地队列是否已满,未满则将G插入进本地队列的尾部中。否则执行runqputslow方法,尝试放回全局队列。
接下来继承跟进runqputslow方法的执行流程。
  1. //将G和一批工作(本地队列的G)放入到全局队列
  2. func runqputslow(pp *p, gp *g, h, t uint32) bool {
  3.         var batch [len(pp.runq)/2 + 1]*g //本地队列一半的G
  4.         // First, grab a batch from local queue.
  5.         n := t - h
  6.         n = n / 2
  7.         if n != uint32(len(pp.runq)/2) {
  8.                 throw("runqputslow: queue is not full")
  9.         }
  10.         for i := uint32(0); i < n; i++ {
  11.                 batch[i] = pp.runq[(h+i)%uint32(len(pp.runq))].ptr()
  12.         }
  13.         if !atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release, commits consume
  14.                 return false
  15.         }
  16.         batch[n] = gp
  17.         if randomizeScheduler { //打乱顺序
  18.                 for i := uint32(1); i <= n; i++ {
  19.                         j := cheaprandn(i + 1)
  20.                         batch[i], batch[j] = batch[j], batch[i]
  21.                 }
  22.         }
  23.         // Link the goroutines.
  24.         for i := uint32(0); i < n; i++ {
  25.                 batch[i].schedlink.set(batch[i+1])
  26.         }
  27.         var q gQueue
  28.         q.head.set(batch[0])
  29.         q.tail.set(batch[n])
  30.         // Now put the batch on global queue.
  31.         lock(&sched.lock)
  32.         globrunqputbatch(&q, int32(n+1))
  33.         unlock(&sched.lock)
  34.         return true
  35. }
复制代码
从n=n-n/2我们可以得知,是获取一半数目的G。
通过stealWork->runqsteal->runqgrab的方法链路,完成了将其他P的本地队列G搬运到当前P的本地队列中的过程。
(六)总览

最后,我们用画图来整体回顾findRunnable的执行流程。

2.3.2、execute()

当我们成功的通过findRunnable()找到了可以被执行的G的时间,就会对当前的G调用execute()方法,开始去调用这个G。
  1. var batch [len(pp.runq)/2 + 1]*g //本地队列一半的G
复制代码
可以看到execute的主要使命就是将当前的G和M进行绑定,即把G分配给这个线程M,然后调解它的状态为执行态,最后调用gogo方法完成对用户方法的运行。
2.3.3、mcall()

从2.3.2小节中我们知道,执行的execute函数完成了g0和g的切换,将对M的执行权交给了g,然后调用了gogo方法运行g。当需要重新将M的执行权从g切换到g0的时间,需要执行mcall()方法,完成切换。mcall()方法的作用我们在2.1小节中提到过,该方法是通过汇编语言实现的,主要的作用是完成了对g的栈信息的保存、将当前堆栈从g切换到g0、在g0的栈上执行mcall方法中传入的fn回调函数。
什么时间调用mcall(),就涉及到我们在2.2小节讲到了调度类型了。接下来我们通过源码一一分析。
1、主动调度

主动调度是提供给用户的让权方法,执行的是runtime包下的Gosched方法。
  1. n := t - h
  2.         n = n / 2
  3.         if n != uint32(len(pp.runq)/2) {
  4.                 throw("runqputslow: queue is not full")
  5.         }
  6.         for i := uint32(0); i < n; i++ {
  7.                 batch[i] = pp.runq[(h+i)%uint32(len(pp.runq))].ptr()
  8.         }
复制代码
Gosched方法就调用了mcall,并且传入回调函数gosched_m。
  1. if randomizeScheduler { //打乱顺序
  2.                 for i := uint32(1); i <= n; i++ {
  3.                         j := cheaprandn(i + 1)
  4.                         batch[i], batch[j] = batch[j], batch[i]
  5.                 }
  6.         }
复制代码
gosched_m完成了对G的状态的转换,然后调用dropg将M和G解绑,再将G放回到全局队列里面,终极调用schedule进行新一轮的调度。
2、被动调度

当当前G需要被被动调用的时间,就会调用goprak(),将其置为阻塞态,等待别人的叫醒。
  1. // Link the goroutines.
  2.         for i := uint32(0); i < n; i++ {
  3.                 batch[i].schedlink.set(batch[i+1])
  4.         }
  5.         var q gQueue
  6.         q.head.set(batch[0])
  7.         q.tail.set(batch[n])
  8.         // Now put the batch on global queue.
  9.         lock(&sched.lock)
  10.         globrunqputbatch(&q, int32(n+1))
  11.         unlock(&sched.lock)
  12.         return true
复制代码
gopark内部调用了mcall(park_m),park_m将G的状态置为waiting,并且将M和G解绑,然后开启新一轮的调度。
进入等待的G需要被动的被其他事件叫醒,此时就会调用goready方法。
  1. // local runq
  2.         if gp, inheritTime := runqget(pp); gp != nil {
  3.                 return gp, inheritTime, false
  4.         }
复制代码
ready方法会将G的状态重新切换成运行态,并且将G放入到P的运行队列里面。从代码中我们可以看到,被叫醒的G并不会立刻执行,而是加入到本地队列中等待下一次被调度。
3、正常调度

如果G被正常的执行完毕,就会调用goexit1()方法完成g和g0的切换。
  1. // 从本地可运行队列中获取 g。
  2. func runqget(pp *p) (gp *g, inheritTime bool) {
  3.         // 如果有 runnext,则它是下一个要运行的 G。
  4.         next := pp.runnext
  5.     // 如果 runnext 非零且 CAS 操作失败,它只能被另一个 P 窃取,因为其他 P 可以竞争将 runnext 设置为零,但只有当前 P 可以将其设置为非零。
  6.         // 因此,如果 CAS 失败,则无需重试该操作。
  7.         if next != 0 && pp.runnext.cas(next, 0) {
  8.                 return next.ptr(), true
  9.         }
  10.         for {
  11.                 h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
  12.                 t := pp.runqtail
  13.                 if t == h {
  14.                         return nil, false
  15.                 }
  16.                 gp := pp.runq[h%uint32(len(pp.runq))].ptr()
  17.                 if atomic.CasRel(&pp.runqhead, h, h+1) { // cas-release, commits consume
  18.                         return gp, false
  19.                 }
  20.         }
  21. }
复制代码
终极,协程G被销毁,并且开启新一轮的调度。
4、抢占调度

抢占调度最为复杂,因为它需要全局监控者m去查抄所有的P是否被恒久阻塞,这需要花时间去检索,而不能直接锁定到哪个P需要被抢占。全局监控者会调用retake()方法去查抄,其流程如下:
  1. // global runq
  2.         if sched.runqsize != 0 {
  3.                 lock(&sched.lock)
  4.                 gp := globrunqget(pp, 0)
  5.                 unlock(&sched.lock)
  6.                 if gp != nil {
  7.                         return gp, false, false
  8.                 }
  9.         }
复制代码
  1.     //在正式的去偷取G之前,用非阻塞的方式检查是否有就绪的网络协程,这是对netpoll的一个优化。
  2.         if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 {
  3.                 if list, delta := netpoll(0); !list.empty() { // non-blocking
  4.                         gp := list.pop()
  5.                         injectglist(&list)
  6.                         netpollAdjustWaiters(delta)
  7.                         trace := traceAcquire()
  8.                         casgstatus(gp, _Gwaiting, _Grunnable)
  9.                         if trace.ok() {
  10.                                 trace.GoUnpark(gp, 0)
  11.                                 traceRelease(trace)
  12.                         }
  13.                         return gp, false, false
  14.                 }
  15.         }
复制代码
逐一的获取P,进行查抄。
  1.         if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
  2.                 if !mp.spinning {
  3.                         mp.becomeSpinning()
  4.                 }
  5.                 gp, inheritTime, tnow, w, newWork := stealWork(now)
  6.                 if gp != nil {
  7.                         // Successfully stole.
  8.                         return gp, inheritTime, false
  9.                 }
  10.                 //...
  11.         }
复制代码
当满足以下三个条件的时间,就会执行抢占调度:

  • p的本地队列有等待执行的G
  • 当前没有空闲的p和m
  • 执行系统调用的时间超过10ms
此时就会调用抢占调度,先将p的状态置为idle,表示可以被其他的M获取绑定,然后调用handoffp方法。
  1. if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
  2.                 if !mp.spinning {
  3.                         mp.becomeSpinning()
  4.                 }
复制代码
当我们满足以下情况之一的时间,就会为当前的P新分配一个M进行调度:

  • 全局队列不为空或者本地队列不为空,即有可以运行的G。
  • 需要有trace去执行。
  • 有垃圾回收的工作需要执行。
  • 当前时刻没有自旋的线程M并且没有空闲的P(表示当前时刻使命繁忙)。
  • 当前P是唯一在运行的P,并且有网络事件等待处理。
当满足五个条件之一的时间,都会进入到startm()方法中,为当前的P分配一个M。
  1. gp, inheritTime, tnow, w, newWork := stealWork(now)
  2.                 if gp != nil {
  3.                         // Successfully stole.
  4.                         return gp, inheritTime, false
  5.                 }
复制代码
  1. func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
  2.         pp := getg().m.p.ptr()
  3.         ranTimer := false
  4.     //最多从其他P窃取4次任务
  5.         const stealTries = 4
  6.         for i := 0; i < stealTries; i++ {
  7.         //在进行最后一次的遍历前,优先检查其他P的Timer队列
  8.                 stealTimersOrRunNextG := i == stealTries-1
  9.                 //随机生成遍历起点
  10.                 for enum := stealOrder.start(cheaprand()); !enum.done(); enum.next() {
  11.                         //...
  12.                         p2 := allp[enum.position()]
  13.                         if pp == p2 {
  14.                                 continue
  15.                         }
  16.                        
  17.                         //...
  18.                         //如果P是非空闲的,则尝试窃取
  19.                         if !idlepMask.read(enum.position()) {
  20.                                 if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
  21.                                         return gp, false, now, pollUntil, ranTimer
  22.                                 }
  23.                         }
  24.                 }
  25.         }
  26.         //如果在所有尝试中均未找到可运行的 Goroutine 或 Timer,则返回 nil,并返回 pollUntil(下一次轮询的时间)。
  27.         return nil, false, now, pollUntil, ranTimer
  28. }
复制代码
如果传入的pp是nil,那么会自动设置为空闲p队列中的第一个p,如果仍然为nil表示当前没有空闲的p,会退出方法。
  1. const stealTries = 4
  2.         for i := 0; i < stealTries; i++ {
复制代码
然后会尝试获取当前的空闲的m,如果不存在则新创建一个m。
至此,关于GMP模型的节选部分的讲解就完成了,大概有许多我理解的不对的地方接待大家讨论,谢谢观看。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

八卦阵

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表