TDengine Go 连接器 https://github.com/taosdata/driver-go 使用 cgo 调用 taos.so 中的 API,使用过程中发现线程数不断增长,本文从一个 cgo 调用开始解析 Go 源码,分析造成线程增长的原因。
转换 cgo 代码
对 driver-go/wrapper/taosc.go 进行转换
go tool cgo taosc.go
执行后生成 _obj 文件夹
go 代码分析
以 taosc.cgo1.go 中 TaosResetCurrentDB 为例来分析。- // TaosResetCurrentDB void taos_reset_current_db(TAOS *taos);
- func TaosResetCurrentDB(taosConnect unsafe.Pointer) {
- func() { _cgo0 := /*line :161:26*/taosConnect; _cgoCheckPointer(_cgo0, nil); _Cfunc_taos_reset_current_db(_cgo0); }()
- }
- //go:linkname _cgoCheckPointer runtime.cgoCheckPointer
- func _cgoCheckPointer(interface{}, interface{})
- //go:cgo_unsafe_args
- func _Cfunc_taos_reset_current_db(p0 unsafe.Pointer) (r1 _Ctype_void) {
- _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))
- if _Cgo_always_false {
- _Cgo_use(p0)
- }
- return
- }
- //go:linkname _cgo_runtime_cgocall runtime.cgocall
- func _cgo_runtime_cgocall(unsafe.Pointer, uintptr) int32
- //go:cgo_import_static _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
- //go:linkname __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
- var __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db byte
- var _cgo_453a0cad50ef_Cfunc_taos_reset_current_db = unsafe.Pointer(&__cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db)
复制代码
- TaosResetCurrentDB 首先调用 _cgoCheckPointer 检查传入参数是否为 nil。
- //go:linkname _cgoCheckPointer runtime.cgoCheckPointer 表示 cgoCheckPointer 方法实现是 runtime.cgoCheckPointer,如果传入参数是 nil 程序将会 panic。
- 接着调用 _Cfunc_taos_reset_current_db。
- Cfunc_taos_reset_current_db 方法中 _Cgo_always_false 在运行时会是 false,所以只分析第一句 _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))。
- _cgo_runtime_cgocall 实现是 runtime.cgocall 这个会重点分析。
- _cgo_453a0cad50ef_Cfunc_taos_reset_current_db 由上方最后代码块可以看出是 taos_reset_current_db 方法指针。
- uintptr(unsafe.Pointer(&p0)) 表示 p0 的指针地址。
- 由上面可以看出这句意思是调用 runtime.cgocall,参数为方法指针和参数的指针地址。
分析 runtime.cgocall
基于 golang 1.20.4 分析该方法- func cgocall(fn, arg unsafe.Pointer) int32 {
- if !iscgo && GOOS != "solaris" && GOOS != "illumos" && GOOS != "windows" {
- throw("cgocall unavailable")
- }
- if fn == nil {
- throw("cgocall nil")
- }
- if raceenabled {
- racereleasemerge(unsafe.Pointer(&racecgosync))
- }
- mp := getg().m // 获取当前 goroutine 的 M
- mp.ncgocall++ // 总 cgo 计数 +1
- mp.ncgo++ // 当前 cgo 计数 +1
- mp.cgoCallers[0] = 0 // 重置追踪
- entersyscall() // 进入系统调用,保存上下文, 标记当前 goroutine 独占 m, 跳过垃圾回收
- osPreemptExtEnter(mp) // 标记异步抢占, 使异步抢占逻辑失效
- mp.incgo = true // 修改状态
- errno := asmcgocall(fn, arg) // 真正进行方法调用的地方
- mp.incgo = false // 修改状态
- mp.ncgo-- // 当前 cgo 调用-1
- osPreemptExtExit(mp) // 恢复异步抢占
- exitsyscall() // 退出系统调用,恢复调度器控制
- if raceenabled {
- raceacquire(unsafe.Pointer(&racecgosync))
- }
- // 避免 GC 过早回收
- KeepAlive(fn)
- KeepAlive(arg)
- KeepAlive(mp)
- return errno
- }
复制代码 其中两个主要的方法 entersyscall 和 asmcgocall,接下来对这两个方法进行着重分析。
分析 entersyscall
- func entersyscall() {
- reentersyscall(getcallerpc(), getcallersp())
- }
复制代码 entersyscall 直接调用的 reentersyscall,关注下 reentersyscall 注释中的一段:- // If the syscall does not block, that is it, we do not emit any other events.
- // If the syscall blocks (that is, P is retaken), retaker emits traceGoSysBlock;
复制代码 如果 syscall 调用没有阻塞则不会触发任何事件,如果被阻塞 retaker 会触发 traceGoSysBlock,那需要了解一下多长时间被认为是阻塞,先跟到 retaker 方法。- func retake(now int64) uint32 {
- n := 0
- lock(&allpLock)
- for i := 0; i < len(allp); i++ {
- pp := allp[i]
- if pp == nil {
- continue
- }
- pd := &pp.sysmontick
- s := pp.status
- sysretake := false
- if s == _Prunning || s == _Psyscall {
- t := int64(pp.schedtick)
- if int64(pd.schedtick) != t {
- pd.schedtick = uint32(t)
- pd.schedwhen = now
- } else if pd.schedwhen+forcePreemptNS <= now {
- preemptone(pp)
- sysretake = true
- }
- }
- // 从系统调用中抢占P
- if s == _Psyscall {
- // 如果已经超过了一个系统监控的 tick(20us),则从系统调用中抢占 P
- t := int64(pp.syscalltick)
- if !sysretake && int64(pd.syscalltick) != t {
- pd.syscalltick = uint32(t)
- pd.syscallwhen = now
- continue
- }
- if runqempty(pp) && sched.nmspinning.Load()+sched.npidle.Load() > 0 && pd.syscallwhen+10*1000*1000 > now {
- continue
- }
- unlock(&allpLock)
- incidlelocked(-1)
- if atomic.Cas(&pp.status, s, _Pidle) {
- if trace.enabled {
- traceGoSysBlock(pp)
- traceProcStop(pp)
- }
- n++
- pp.syscalltick++
- handoffp(pp)
- }
- incidlelocked(1)
- lock(&allpLock)
- }
- }
- unlock(&allpLock)
- return uint32(n)
- }
复制代码 从上面可以看到系统调用阻塞 20 多微秒会被抢占 P,cgo 被迫 handoffp,接下来分析 handoffp 方法- func handoffp(pp *p) {
- // ...
- // 没有任务且没有自旋和空闲的 M 则需要启动一个新的 M
- if sched.nmspinning.Load()+sched.npidle.Load() == 0 && sched.nmspinning.CompareAndSwap(0, 1) {
- sched.needspinning.Store(0)
- startm(pp, true)
- return
- }
- // ...
- }
复制代码 handoffp 方法会调用 startm 来启动一个新的 M,跟到 startm 方法。- func startm(pp *p, spinning bool) {
- // ...
- nmp := mget()
- if nmp == nil {
- // 没有M可用,调用newm
- id := mReserveID()
- unlock(&sched.lock)
- var fn func()
- if spinning {
- fn = mspinning
- }
- newm(fn, pp, id)
- releasem(mp)
- return
- }
- // ...
- }
复制代码 此时如果没有 M startm 会调用 newm 创建一个新的 M,接下来分析 newm 方法。- func newm(fn func(), pp *p, id int64) {
- acquirem()
- mp := allocm(pp, fn, id)
- mp.nextp.set(pp)
- mp.sigmask = initSigmask
- if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {
- lock(&newmHandoff.lock)
- if newmHandoff.haveTemplateThread == 0 {
- throw("on a locked thread with no template thread")
- }
- mp.schedlink = newmHandoff.newm
- newmHandoff.newm.set(mp)
- if newmHandoff.waiting {
- newmHandoff.waiting = false
- notewakeup(&newmHandoff.wake)
- }
- unlock(&newmHandoff.lock)
- releasem(getg().m)
- return
- }
- newm1(mp)
- releasem(getg().m)
- }
- func newm1(mp *m) {
- if iscgo {
- var ts cgothreadstart
- if _cgo_thread_start == nil {
- throw("_cgo_thread_start missing")
- }
- ts.g.set(mp.g0)
- ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
- ts.fn = unsafe.Pointer(abi.FuncPCABI0(mstart))
- if msanenabled {
- msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
- }
- if asanenabled {
- asanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
- }
- execLock.rlock()
- // 创建新线程
- asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
- execLock.runlock()
- return
- }
- execLock.rlock()
- newosproc(mp)
- execLock.runlock()
- }
复制代码 从 newm 看出如果线程都在阻塞中则调用 newm1,newm1 调用 _cgo_thread_start 创建新线程。
由以上分析得出当高并发调用 cgo 且执行时间超过 20 微秒时会创建新线程。
分析 asmcgocall
只分析 amd64
asm_amd64.s- TEXT ·asmcgocall(SB),NOSPLIT,$0-20
- MOVQ fn+0(FP), AX
- MOVQ arg+8(FP), BX
- MOVQ SP, DX
- // 考虑是否需要切换到 m.g0 栈
- // 也用来调用创建新的 OS 线程,这些线程已经在 m.g0 栈中了
- get_tls(CX)
- MOVQ g(CX), DI
- CMPQ DI, $0
- JEQ nosave
- MOVQ g_m(DI), R8
- MOVQ m_gsignal(R8), SI
- CMPQ DI, SI
- JEQ nosave
- MOVQ m_g0(R8), SI
- CMPQ DI, SI
- JEQ nosave
-
- // 切换到系统栈
- CALL gosave_systemstack_switch<>(SB)
- MOVQ SI, g(CX)
- MOVQ (g_sched+gobuf_sp)(SI), SP
- // 于调度栈中(pthread 新创建的栈)
- // 确保有足够的空间给四个 stack-based fast-call 寄存器
- // 为使得 windows amd64 调用服务
- SUBQ $64, SP
- ANDQ $~15, SP // 为 gcc ABI 对齐
- MOVQ DI, 48(SP) // 保存 g
- MOVQ (g_stack+stack_hi)(DI), DI
- SUBQ DX, DI
- MOVQ DI, 40(SP) // 保存栈深 (不能仅保存 SP,因为栈可能在回调时被复制)
- MOVQ BX, DI // DI = AMD64 ABI 第一个参数
- MOVQ BX, CX // CX = Win64 第一个参数
- CALL AX // 调用 fn
- // 恢复寄存器、 g、栈指针
- get_tls(CX)
- MOVQ 48(SP), DI
- MOVQ (g_stack+stack_hi)(DI), SI
- SUBQ 40(SP), SI
- MOVQ DI, g(CX)
- MOVQ SI, SP
- MOVL AX, ret+16(FP)
- RET
- nosave:
- // 在系统栈上运行,可能没有 g
- // 没有 g 的情况发生在线程创建中或线程结束中(比如 Solaris 平台上的 needm/dropm)
- // 这段代码和上面类似,但没有保存和恢复 g,且没有考虑栈的移动问题(因为我们在系统栈上,而非 goroutine 栈)
- // 如果已经在系统栈上,则上面的代码可被直接使用,在 Solaris 上会进入下面这段代码。
- // 使用这段代码来为所有 "已经在系统栈" 的调用进行服务,从而保持正确性。
- SUBQ $64, SP
- ANDQ $~15, SP // ABI 对齐
- MOVQ $0, 48(SP) // 上面的代码保存了 g, 确保 debug 时可用
- MOVQ DX, 40(SP) // 保存原始的栈指针
- MOVQ BX, DI // DI = AMD64 ABI 第一个参数
- MOVQ BX, CX // CX = Win64 第一个参数
- CALL AX
- MOVQ 40(SP), SI // 恢复原来的栈指针
- MOVQ SI, SP
- MOVL AX, ret+16(FP)
- RET
复制代码 这段就是将当前栈移到系统栈去执行,因为 C 需要无穷大的栈,在 Go 的栈上执行 C 函数会导致栈溢出。
产生问题
cgo 调用会将当前栈移到系统栈,并且当 cgo 高并发调用且阻塞超过 20 微秒时会新建线程。而 Go 并不会销毁线程,由此造成线程增长。
解决方案
限制 Go 程序最大线程数,默认为 cpu 核数。- runtime.GOMAXPROCS(runtime.NumCPU())
复制代码 使用 channel 限制 cgo 最大并发数为 cpu 核数
[code]package threadimport "runtime"var c chan struct{}func Lock() { c |