从源码分析 Go 语言使用 cgo 导致的线程增长

打印 上一主题 下一主题

主题 915|帖子 915|积分 2745

TDengine Go 连接器 https://github.com/taosdata/driver-go 使用 cgo 调用 taos.so 中的 API,使用过程中发现线程数不断增长,本文从一个 cgo 调用开始解析 Go 源码,分析造成线程增长的原因。
转换 cgo 代码

对 driver-go/wrapper/taosc.go 进行转换
go tool cgo taosc.go
执行后生成 _obj 文件夹
go 代码分析

以 taosc.cgo1.go 中 TaosResetCurrentDB 为例来分析。
  1. // TaosResetCurrentDB void taos_reset_current_db(TAOS *taos);
  2. func TaosResetCurrentDB(taosConnect unsafe.Pointer) {
  3.     func() { _cgo0 := /*line :161:26*/taosConnect; _cgoCheckPointer(_cgo0, nil); _Cfunc_taos_reset_current_db(_cgo0); }()
  4. }
  5. //go:linkname _cgoCheckPointer runtime.cgoCheckPointer
  6. func _cgoCheckPointer(interface{}, interface{})
  7. //go:cgo_unsafe_args
  8. func _Cfunc_taos_reset_current_db(p0 unsafe.Pointer) (r1 _Ctype_void) {
  9.     _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))
  10.     if _Cgo_always_false {
  11.         _Cgo_use(p0)
  12.     }
  13.     return
  14. }
  15. //go:linkname _cgo_runtime_cgocall runtime.cgocall
  16. func _cgo_runtime_cgocall(unsafe.Pointer, uintptr) int32
  17. //go:cgo_import_static _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
  18. //go:linkname __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
  19. var __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db byte
  20. var _cgo_453a0cad50ef_Cfunc_taos_reset_current_db = unsafe.Pointer(&__cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db)
复制代码

  • TaosResetCurrentDB 首先调用 _cgoCheckPointer 检查传入参数是否为 nil。
  • //go:linkname _cgoCheckPointer runtime.cgoCheckPointer 表示 cgoCheckPointer 方法实现是 runtime.cgoCheckPointer,如果传入参数是 nil 程序将会 panic。
  • 接着调用 _Cfunc_taos_reset_current_db。
  • Cfunc_taos_reset_current_db 方法中 _Cgo_always_false 在运行时会是 false,所以只分析第一句 _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))。

    • _cgo_runtime_cgocall 实现是 runtime.cgocall 这个会重点分析。
    • _cgo_453a0cad50ef_Cfunc_taos_reset_current_db 由上方最后代码块可以看出是 taos_reset_current_db 方法指针。
    • uintptr(unsafe.Pointer(&p0)) 表示 p0 的指针地址。
    • 由上面可以看出这句意思是调用 runtime.cgocall,参数为方法指针和参数的指针地址。

分析 runtime.cgocall

基于 golang 1.20.4 分析该方法
  1. func cgocall(fn, arg unsafe.Pointer) int32 {
  2.     if !iscgo && GOOS != "solaris" && GOOS != "illumos" && GOOS != "windows" {
  3.         throw("cgocall unavailable")
  4.     }
  5.     if fn == nil {
  6.         throw("cgocall nil")
  7.     }
  8.     if raceenabled {
  9.         racereleasemerge(unsafe.Pointer(&racecgosync))
  10.     }
  11.     mp := getg().m // 获取当前 goroutine 的 M
  12.     mp.ncgocall++  // 总 cgo 计数 +1
  13.     mp.ncgo++      // 当前 cgo 计数 +1
  14.     mp.cgoCallers[0] = 0 // 重置追踪
  15.     entersyscall() // 进入系统调用,保存上下文, 标记当前 goroutine 独占 m, 跳过垃圾回收
  16.     osPreemptExtEnter(mp) // 标记异步抢占, 使异步抢占逻辑失效
  17.     mp.incgo = true // 修改状态
  18.     errno := asmcgocall(fn, arg) // 真正进行方法调用的地方
  19.     mp.incgo = false // 修改状态
  20.     mp.ncgo-- // 当前 cgo 调用-1
  21.     osPreemptExtExit(mp) // 恢复异步抢占
  22.     exitsyscall() // 退出系统调用,恢复调度器控制
  23.     if raceenabled {
  24.         raceacquire(unsafe.Pointer(&racecgosync))
  25.     }
  26.     // 避免 GC 过早回收
  27.     KeepAlive(fn)
  28.     KeepAlive(arg)
  29.     KeepAlive(mp)
  30.     return errno
  31. }
复制代码
其中两个主要的方法 entersyscall 和 asmcgocall,接下来对这两个方法进行着重分析。
分析 entersyscall
  1. func entersyscall() {
  2.     reentersyscall(getcallerpc(), getcallersp())
  3. }
复制代码
entersyscall 直接调用的 reentersyscall,关注下 reentersyscall 注释中的一段:
  1. // If the syscall does not block, that is it, we do not emit any other events.
  2. // If the syscall blocks (that is, P is retaken), retaker emits traceGoSysBlock;
复制代码
如果 syscall 调用没有阻塞则不会触发任何事件,如果被阻塞 retaker 会触发 traceGoSysBlock,那需要了解一下多长时间被认为是阻塞,先跟到 retaker 方法。
  1. func retake(now int64) uint32 {
  2.     n := 0
  3.     lock(&allpLock)
  4.     for i := 0; i < len(allp); i++ {
  5.         pp := allp[i]
  6.         if pp == nil {
  7.             continue
  8.         }
  9.         pd := &pp.sysmontick
  10.         s := pp.status
  11.         sysretake := false
  12.         if s == _Prunning || s == _Psyscall {
  13.             t := int64(pp.schedtick)
  14.             if int64(pd.schedtick) != t {
  15.                 pd.schedtick = uint32(t)
  16.                 pd.schedwhen = now
  17.             } else if pd.schedwhen+forcePreemptNS <= now {
  18.                 preemptone(pp)
  19.                 sysretake = true
  20.             }
  21.         }
  22.         // 从系统调用中抢占P
  23.         if s == _Psyscall {
  24.             // 如果已经超过了一个系统监控的 tick(20us),则从系统调用中抢占 P
  25.             t := int64(pp.syscalltick)
  26.             if !sysretake && int64(pd.syscalltick) != t {
  27.                 pd.syscalltick = uint32(t)
  28.                 pd.syscallwhen = now
  29.                 continue
  30.             }
  31.             if runqempty(pp) && sched.nmspinning.Load()+sched.npidle.Load() > 0 && pd.syscallwhen+10*1000*1000 > now {
  32.                 continue
  33.             }
  34.             unlock(&allpLock)
  35.             incidlelocked(-1)
  36.             if atomic.Cas(&pp.status, s, _Pidle) {
  37.                 if trace.enabled {
  38.                     traceGoSysBlock(pp)
  39.                     traceProcStop(pp)
  40.                 }
  41.                 n++
  42.                 pp.syscalltick++
  43.                 handoffp(pp)
  44.             }
  45.             incidlelocked(1)
  46.             lock(&allpLock)
  47.         }
  48.     }
  49.     unlock(&allpLock)
  50.     return uint32(n)
  51. }
复制代码
从上面可以看到系统调用阻塞 20 多微秒会被抢占 P,cgo 被迫 handoffp,接下来分析 handoffp 方法
  1. func handoffp(pp *p) {
  2.     // ...
  3.     // 没有任务且没有自旋和空闲的 M 则需要启动一个新的 M
  4.     if sched.nmspinning.Load()+sched.npidle.Load() == 0 && sched.nmspinning.CompareAndSwap(0, 1) {
  5.         sched.needspinning.Store(0)
  6.         startm(pp, true)
  7.         return
  8.     }
  9.     // ...
  10. }
复制代码
handoffp 方法会调用 startm 来启动一个新的 M,跟到 startm 方法。
  1. func startm(pp *p, spinning bool) {
  2.     // ...
  3.     nmp := mget()
  4.     if nmp == nil {
  5.         // 没有M可用,调用newm
  6.         id := mReserveID()
  7.         unlock(&sched.lock)
  8.         var fn func()
  9.         if spinning {
  10.             fn = mspinning
  11.         }
  12.         newm(fn, pp, id)
  13.         releasem(mp)
  14.         return
  15.     }
  16.     // ...
  17. }
复制代码
此时如果没有 M  startm 会调用 newm 创建一个新的 M,接下来分析 newm 方法。
  1. func newm(fn func(), pp *p, id int64) {
  2.     acquirem()
  3.     mp := allocm(pp, fn, id)
  4.     mp.nextp.set(pp)
  5.     mp.sigmask = initSigmask
  6.     if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {
  7.         lock(&newmHandoff.lock)
  8.         if newmHandoff.haveTemplateThread == 0 {
  9.             throw("on a locked thread with no template thread")
  10.         }
  11.         mp.schedlink = newmHandoff.newm
  12.         newmHandoff.newm.set(mp)
  13.         if newmHandoff.waiting {
  14.             newmHandoff.waiting = false
  15.             notewakeup(&newmHandoff.wake)
  16.         }
  17.         unlock(&newmHandoff.lock)
  18.         releasem(getg().m)
  19.         return
  20.     }
  21.     newm1(mp)
  22.     releasem(getg().m)
  23. }
  24. func newm1(mp *m) {
  25.     if iscgo {
  26.         var ts cgothreadstart
  27.         if _cgo_thread_start == nil {
  28.             throw("_cgo_thread_start missing")
  29.         }
  30.         ts.g.set(mp.g0)
  31.         ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
  32.         ts.fn = unsafe.Pointer(abi.FuncPCABI0(mstart))
  33.         if msanenabled {
  34.             msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
  35.         }
  36.         if asanenabled {
  37.             asanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
  38.         }
  39.         execLock.rlock()
  40.         // 创建新线程
  41.         asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
  42.         execLock.runlock()
  43.         return
  44.     }
  45.     execLock.rlock()
  46.     newosproc(mp)
  47.     execLock.runlock()
  48. }
复制代码
从 newm 看出如果线程都在阻塞中则调用 newm1,newm1 调用 _cgo_thread_start 创建新线程。
由以上分析得出当高并发调用 cgo 且执行时间超过 20 微秒时会创建新线程。
分析 asmcgocall

只分析 amd64
asm_amd64.s
  1. TEXT ·asmcgocall(SB),NOSPLIT,$0-20
  2.     MOVQ    fn+0(FP), AX
  3.     MOVQ    arg+8(FP), BX
  4.     MOVQ    SP, DX
  5.     // 考虑是否需要切换到 m.g0 栈
  6.     // 也用来调用创建新的 OS 线程,这些线程已经在 m.g0 栈中了
  7.     get_tls(CX)
  8.     MOVQ    g(CX), DI
  9.     CMPQ    DI, $0
  10.     JEQ nosave
  11.     MOVQ    g_m(DI), R8
  12.     MOVQ    m_gsignal(R8), SI
  13.     CMPQ    DI, SI
  14.     JEQ nosave
  15.     MOVQ    m_g0(R8), SI
  16.     CMPQ    DI, SI
  17.     JEQ nosave
  18.    
  19.     // 切换到系统栈
  20.     CALL    gosave_systemstack_switch<>(SB)
  21.     MOVQ    SI, g(CX)
  22.     MOVQ    (g_sched+gobuf_sp)(SI), SP
  23.     // 于调度栈中(pthread 新创建的栈)
  24.     // 确保有足够的空间给四个 stack-based fast-call 寄存器
  25.     // 为使得 windows amd64 调用服务
  26.     SUBQ    $64, SP
  27.     ANDQ    $~15, SP // 为 gcc ABI 对齐
  28.     MOVQ    DI, 48(SP) // 保存 g
  29.     MOVQ    (g_stack+stack_hi)(DI), DI
  30.     SUBQ    DX, DI
  31.     MOVQ    DI, 40(SP) // 保存栈深 (不能仅保存 SP,因为栈可能在回调时被复制)
  32.     MOVQ    BX, DI  // DI = AMD64 ABI 第一个参数
  33.     MOVQ    BX, CX  // CX = Win64 第一个参数
  34.     CALL    AX  // 调用 fn
  35.     // 恢复寄存器、 g、栈指针
  36.     get_tls(CX)
  37.     MOVQ    48(SP), DI
  38.     MOVQ    (g_stack+stack_hi)(DI), SI
  39.     SUBQ    40(SP), SI
  40.     MOVQ    DI, g(CX)
  41.     MOVQ    SI, SP
  42.     MOVL    AX, ret+16(FP)
  43.     RET
  44. nosave:
  45.     // 在系统栈上运行,可能没有 g
  46.     // 没有 g 的情况发生在线程创建中或线程结束中(比如 Solaris 平台上的 needm/dropm)
  47.     // 这段代码和上面类似,但没有保存和恢复 g,且没有考虑栈的移动问题(因为我们在系统栈上,而非 goroutine 栈)
  48.     // 如果已经在系统栈上,则上面的代码可被直接使用,在 Solaris 上会进入下面这段代码。
  49.     // 使用这段代码来为所有 "已经在系统栈" 的调用进行服务,从而保持正确性。
  50.     SUBQ    $64, SP
  51.     ANDQ    $~15, SP // ABI 对齐
  52.     MOVQ    $0, 48(SP) // 上面的代码保存了 g, 确保 debug 时可用
  53.     MOVQ    DX, 40(SP) // 保存原始的栈指针
  54.     MOVQ    BX, DI  // DI = AMD64 ABI 第一个参数
  55.     MOVQ    BX, CX  // CX = Win64 第一个参数
  56.     CALL    AX
  57.     MOVQ    40(SP), SI // 恢复原来的栈指针
  58.     MOVQ    SI, SP
  59.     MOVL    AX, ret+16(FP)
  60.     RET
复制代码
这段就是将当前栈移到系统栈去执行,因为 C 需要无穷大的栈,在 Go 的栈上执行 C 函数会导致栈溢出。
产生问题

cgo 调用会将当前栈移到系统栈,并且当 cgo 高并发调用且阻塞超过 20 微秒时会新建线程。而 Go 并不会销毁线程,由此造成线程增长。
解决方案

限制 Go 程序最大线程数,默认为 cpu 核数。
  1. runtime.GOMAXPROCS(runtime.NumCPU())
复制代码
使用 channel 限制 cgo 最大并发数为 cpu 核数
[code]package threadimport "runtime"var c chan struct{}func Lock() {    c
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

钜形不锈钢水箱

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表