atomic和sema是实现go中锁的基础,简单看下他们的实现原理。
atomic
`atomic 常用来作为保证原子性的操作。
当多个协程,同时一个数据进行操作时候,如果不加锁,最终的很难得到想要的结果。- var p int64 = 0
- func add() {
- p = p + 1
- }
- func main() {
- for i := 0; i < 1000; i++ {
- go add()
- }
- time.Sleep(time.Second * 5)
- fmt.Println(p) //982
- }
复制代码 这种情况下,最终打印的 都不会是1000,每次不固定。- 改成atomic 能解决
- var p int64 = 0
- func add() {
- atomic.AddInt64(&p, 1)
- }
- func main() {
- for i := 0; i < 1000; i++ {
- go add()
- }
- time.Sleep(time.Second * 5)
- fmt.Println(p)
- }
复制代码 atomic 为什么能做到?- TEXT sync∕atomic·AddInt64(SB), NOSPLIT, $0-24
- GO_ARGS
- MOVD $__tsan_go_atomic64_fetch_add(SB), R9
- BL racecallatomic<>(SB)
- MOVD add+8(FP), R0 // convert fetch_add to add_fetch
- MOVD ret+16(FP), R1
- ADD R0, R1, R0
- MOVD R0, ret+16(FP)
- RET
复制代码老的版本中是能见到,lock 这种操作系统级别的锁,新版的go已经改写了这块逻辑,但是能猜想到效果肯定一样。 如果有理清楚的,评论区可以交流下。
小结:- 原子操作是一种硬件层面加锁的机制
- 保证操作一个变量的时候,其他协程/线程无法访问
复制代码 sema
几乎在go的每个锁的定义都能看到sema的身影,理解了sema再看 互斥锁、读写锁就会很好理解。- 信号量锁/信号锁
- 核心是一个uint32值,含义是同时可并发的数量
- 每一个sema 锁都对应一个SemaRoot结构体
- SemaRoot中有一个平衡二叉树用于协程排队
复制代码 例如:- type Mutex struct {
- state int32
- sema uint32
- }
- sema的uint32中的 每一个数,背后都对应一个 semaRoot的结构体
- type semaRoot struct {
- lock mutex
- treap *sudog // root of balanced tree of unique waiters.
- nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
- }
- type sudog struct {
- g *g // 包含了 协程 g
- next *sudog // 下一个
- prev *sudog
- elem unsafe.Pointer // data element (may point to stack)
- }
复制代码 结构如下:
这里可以先讲下,当这个 sema uint32 值,初始化时候,大于0 比如 赋值5,就代表着,并发时候,有5个协程可以获取锁。其他协程需要等待前面5个释放了,才能进入。
sema 大于0
- // 获取sema锁。大于0的情况
- func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
- gp := getg()
- if gp != gp.m.curg {
- throw("semacquire not on the G stack")
- }
- // Easy case. // 容易的情况
- if cansemacquire(addr) {
- return
- }
- // 方法很长,先看简单的部分。
- }
-
- func cansemacquire(addr *uint32) bool {
- for {
- v := atomic.Load(addr) // 根据sema的地址,获取int32的值
- if v == 0 { // 如果未0了,就获取失败了
- return false
- }
- // 大于0 ,则把 sema的值减去1
- if atomic.Cas(addr, v, v-1) { // cas 就是 CompareAndSwapInt 的底层实现
- return true
- }
- }
- }
复制代码 到此,对sema为什么只是定义为一个 uint32的值有了大致理解,就是一个控制能有多少个协程同时获取锁的值。
看下释放:- func semrelease1(addr *uint32, handoff bool, skipframes int) {
- root := semtable.rootFor(addr)
- atomic.Xadd(addr, 1) // 给sema的值 加上1
- // Easy case: no waiters?
- // This check must happen after the xadd, to avoid a missed wakeup
- // (see loop in semacquire).
- if root.nwait.Load() == 0 { // 如果没有 nwait在等待,就直接结束。
- return
- }
- }
复制代码 nwait 就是等待协程的个数。
小结, 当sema的值大于0 :- 获取锁:uint32减1 ,获取成功
- 释放锁:uint32加1,释放成功
复制代码 sema值等于0
再看 semacquire1 - func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
- / / Harder case:
- // increment waiter count
- s := acquireSudog()
- root := semtable.rootFor(addr) // 根据sema的地址,获取了包含 sudog的队列
- for {
- root.queue(addr, s, lifo) // 将新的协程放入这个等待队列中
- goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
- // 主动调用协程的gopark,让它休眠,gopark的说明看 go GMP中有讲
- }
- releaseSudog(s)
- }
- //再看释放
- func semrelease1(addr *uint32, handoff bool, skipframes int) {
- root := semtable.rootFor(addr)
- atomic.Xadd(addr, 1)
- if root.nwait.Load() == 0 {
- return
- }
- // 如果等待队列不是0 ,就需要释放一个
- // Harder case: search for a waiter and wake it.
- lockWithRank(&root.lock, lockRankRoot)
-
- s, t0 := root.dequeue(addr) // 从全局的队列中,取出一个
- if s != nil {
- root.nwait.Add(-1) // 把等待的数量减一
- }
- unlock(&root.lock) //操作全局队列都需要加锁
- }
复制代码 小结,当sema的值等于0时候:- 获取锁:协程休眠,进入堆树等待
- 释放锁:从堆树中取出一个协程,唤醒
- sema 锁退化成一个专用休眠队列
复制代码 有没有可能sema的值,小于0 ?- 看看sema的定义 `uint32` 所以 不可能。
复制代码 总结下:- atomic原子操作是一种硬件层面的加锁机制。
- sema 背后是一整套锁的管理和等待的机制,开发者在使用时候,感知不到。
- sema的值就是能同时获取锁协程的个数。sema的地址作为了休眠等待队列(平衡树)的key。
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |