同步原语和锁
Golang作为一个原生支持用户态的语言,当提到并发进程,多线程的时候,是离不开锁的,锁是一种并发编程中的同步原语(Synchronization Primitives),它能保证多个 Goroutine 在访问同一片内存时不会出现竞争条件(Race condition)等题目。
基于原语
go语言在sync包中提供了用于同步的一些根本原语,包罗常见的sync.Mutex,sync.RWMutex,sync.WaitGroup,
sync.Once,sync.Cond.
这些根本原语提高了较为底子的同步功能,但是它们是一种相对原始的同步机制,在多数情况下,我们都应该使用抽象层级的更高的 Channel 实现同步。
Mutex
Mutex由两个字段:state,sema构成,此中state表现当前互斥锁的状态,而sema是用于控制锁的状态的信号量。上述两个加起来,只占用8个字节
- 状态
- 在默认的情况下,互斥锁的所有的状态都是0,int32中差异位分别表现了差异的状态
- mutexLocked — 表现互斥锁的锁定状态;
- mutexWoken — 表现从正常模式被从唤醒;
- mutexStarving — 当前的互斥锁进入饥饿状态;
- waitersCount — 当前互斥锁上等待的 Goroutine 个数;
- 正常模式和饥饿模
- 正常模式:锁的的等待者会按照新出的顺序获取锁,但是刚被唤起的goroutine和新创造的进程竞争的时候,大概率会获得锁,为了防止这种情况,一旦goroutine凌驾1ms没有获得锁,就会将当前的状态切换为饥饿模式,防止部分 Goroutine 被『饿死』。
- 在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。假如一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会被切换回正常模式。
- 加锁和解锁
- 互斥锁的加锁是靠 sync.Mutex.Lock 完成的,最新的 Go 语言源代码中已经将 sync.Mutex.Lock 方法举行了简化,方法的主干只保留最常见、简朴的情况 — 当锁的状态是 0 时,将 mutexLocked 位置成 1:假如互斥锁的状态不是0的时候就会调用sync.Mutex.lockSlow 尝试通过自旋(Spinnig)等方式等待锁的释放,该方法的主体是一个非常大 for 循环,这里将该方法分成几个部分先容获取锁的过程:
- 判断当前goroutine是否进入自旋转
- 通过自旋等待互斥锁的释放;
- 计算互斥锁的最新状态;
- 更新互斥锁的状态并获取锁
- 自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以制止 Goroutine 的切换,使用适当会对性能带来很大的增益,但是使用的不适当就会拖慢整个步伐,所以 Goroutine 进入自旋的条件非常苛刻:
- 互斥锁只有在普通模式下才会进入自旋
- sync.runtime_canSpin 需要返回 true:
- 运行在多 CPU 的呆板上;
- 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
- 当前呆板上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
- 假如没有通过CAS 获得锁,会调用 sync.runtime_SemacquireMutex 使用信号量保证资源不会被两个 Goroutine 获取。sync.runtime_SemacquireMutex 会在方法中不停调用尝试获取锁并休眠当前 Goroutine 等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回。
RWMutex
读写互斥锁sync.RWMutex,是细粒度的互斥锁,她并不限制资源的并发读,但是读写,写写操作无法并行执行。一个常见的服务对资源的读写比例会非常高,由于大多数的读请求之间不会相互影响,所以我们可以读写资源操作的分离,在雷同场景下提高服务的性能。
布局体
sync.RWMutex 中总共包含以下 5 个字段:
- type RWMUtex struct {
- w Mutex
- writerSem uint32
- readerSem uint32
- readerCount int32
- readerWait int32
- }
复制代码
- w 复用互斥锁提供的本领
- writerSem和readSem 分别用于写等待和读等待
- readerCount 存储了当前正在执行的读操作的数量
- readerWait 表现当写操作被阻塞时等待的读操作的个数
我们会依次分析获取写锁和读锁的实现本领,此中:
- 写操作使用 sync.RWMutex.Lock 和 sync.RWMutex.Unlock 方法;
- 读操作使用 sync.RWMutex.RLock 和 sync.RWMutex.RUnlock 方法;
写锁
- 当资源的使用者想要获取写锁时,需要调用 sync.RWMutex.Lock 方法
- 写锁的释放会调用 sync.RWMutex.Unlock 方法
与加锁的过程恰恰相反,写锁的释放分以下几个执行
- 调用atomic.AddInt32 函数将变回正数,释放读锁;
- 通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine
- 调用 sync.Mutex.Unlock 方法释放写锁
读锁
读锁的加锁方法 sync.RWMutex.RLock
- func (rw *RWMutex) RLock(){
- if atomic.AddInt32(&rw.readerCount,1) < 0 {
- runtime_SemacquireMutex(&rw.readerSem,false,0)
- }
- }
复制代码
- 假如该方法返回函数-其他 Goroutine 获得了写锁,当前 Goroutine 就会调用 sync.runtime_SemacquireMutex 陷入休眠等待锁的释放。
- 假如该方法的结果为非负数 — 没有 Goroutine 获得写锁,当火线法就会乐成返回.
当 Goroutine 想要释放读锁时,会调用如下所示的 sync.RWMutex.RUnlock 方法
- func (rw *RWMutex) RUnlock() {
- if r := atomic.AddInt32(&rw.readerCount,-1);r<0{
- rw.rUnlockSlow(r)
- }
- }
复制代码 WaitGroup
sync.WaitGroup 可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:
- reuqests := []*Requests{...}
- wg := &sync.WaitGroup()
- wg.Add(len(requests))
- for _,request := range requests {
- go func(r *Request){
- defer wg.Done()
- }(request)
- }
- wg.Wait()
复制代码 我们可以通过 sync.WaitGroup 将原来顺序执行的代码在多个 Goroutine 中并发执行,加速步伐处理的速率。
布局体
sync.WaitGroup 布局体中的成员变量非常简朴,此中只包含两个成员变量
- type WaitGroup struct {
- noCopy noCopy
- state1 [3]uint32
- }
复制代码
- noCopy 保证 sync.WaitGroup 不会被开发者通过再赋值的方式拷贝
- state1 存储着状态和信号量
接口
此中的 sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法传入了 -1,所以我们重点分析另外两个方法 sync.WaitGroup.Add 和 sync.WaitGroup.Wait
- func (wg *WaitGroup) Add(delta int){
- statep,semap := wg.state()
- state := atomic.AddUint64(statep,uint64(delta)<<32)
- v := int32(state >>32)
- w := uint32(state)
- if v < 0 {
- panic("sync: negative WaitGroup counter")
- }
- if v > 0 || w == 0{
- return
- }
- *statep = 0
- for ; w != 0; w-- {
- runtime_Semrelease(semap, false, 0)
- }
- }
复制代码 另一个方法 sync.WaitGroup.Wait
- func (wg *WaitGroup) Wait(){
- statep,semp := wg.state()
- for {
- state := atomic.LoadUint64(statep)
- v :=int32(state >> 32)
- if v == 0{
- return
- }
- if atomic.CompareAndSwapUint64(statep, state, state+1) {
- runtime_Semacquire(semap)
- if +statep != 0 {
- panic("sync: WaitGroup is reused before previous Wait has returned")
- }
- return
- }
- }
- }
复制代码 当 sync.WaitGroup 的计数器归零时,当陷入睡眠状态的 Goroutine 就被唤醒
Once
Go 语言标准库中 sync.Once 可以保证在 Go 步伐运行期间的某段代码只会执行一次。在运行如下所示的代码时,我们会看到如下所示的运行结果
- func main() {
- o := &sync.Once{}
- for i:=0;i<10;i++{
- o.Do(func(){
- fmt.Println("ddd)
- })
- }
- }
复制代码 布局体
每一个 sync.Once 布局体中都只包含一个用于标识代码块是否执行过的 done 以及一个互斥锁 sync.Mutex
- type Once struct {
- done uint32
- m Mutex
- }
复制代码 接口
sync.Once.Do 是 sync.Once 布局体对外唯一暴露的方法
- 假如传入的函数已经执行过了,就会直接返回
- 假如传入的函数没有执行过,就会调用sync.Once.doSlow执行传入函数
- func (o *Once) Do(f func()){
- if atomic.LoadUint32(&o.done) == 0 {
- o.doSlow(f)
- }
- }
- func (o *Once) doSlow(f func()){
- o.m.Lock()
- defer o.m.Unlock()
- if o.done == 0 {
- defer atomic.StoreUinit32(&o.done,1)
- f()
- }
- }
复制代码 Cond
Go标准库的中的sync.Cond是一个条件变量,它可以让一系列的goroutine都在满意特定条件下时候被唤醒,每一个 sync.Cond 布局体在初始化时都需要传入一个互斥锁,我们可以通过下面的例子了解它的使用方法
- func main() {
- c := sync.NewCond(&sync.Mutex{})
- for i :=0;i<10;i++{
- go listen(c)
- }
- time.Sleep(1*time.Second)
- go broadcast(c)
- ch := make(chan os.Signal,1)
- signal.Notify(ch, os.Interrupt)
- <-ch
- }
- func broadcast(c *sync.Cond){
- c.l.Lock()
- c.Broadcast()
- c.l.Unlock()
- }
- func listen(c *sync.Cond) {
- c.l.Lock()
- c.wait()
- fmt.Println("ddd")
- c.l,Unlock()
- }
复制代码 上述代码同时运行了 11 个 Goroutine,这 11 个 Goroutine 分别做了差异事情:
- 10 个 Goroutine 通过 sync.Cond.Wait 等待特定条件的满意;
- 1 个 Goroutine 会调用 sync.Cond.Broadcast 方法关照所有陷入等待的 Goroutine;
sync.Cond.Signal 和 sync.Cond.Broadcast 方法就是用来唤醒调用 sync.Cond.Wait 陷入休眠的 Goroutine,它们两个的实现有一些细微差异:
- sync.Cond.Signal 方法会唤醒队列最前面的 Goroutine;
- sync.Cond.Broadcast 方法会唤醒队列中全部的 Goroutine;
在一样平常情况下,我们都会先调用 sync.Cond.Wait 陷入休眠等待满意期望条件,当满意唤醒条件时,就可以选择使用 sync.Cond.Signal 或者 sync.Cond.Broadcast 唤醒一个或者全部的 Goroutine。
ErrGroup
x/sync/errgroup.Group 就为我们在一组 Goroutine 中提供了同步、错误流传以及上下文取消的功能,我们可以使用如下所示的方式并行获取网页的数据
- var g errgroup.Group
- var urls = []string{
- "http://www.golang.org"
- "http://www.baidu.com"
- }
- for i := range urls {
- url := urls[i]
- g.Go(func() error {
- resp,err := http.Get(url)
- if err == nil{
- resp.Body.Close()
- }
- return err
- })
- }
- if err := g.Wait();err == nil{
- fmt.Println("Successfully fetched all URLs.")
- }
复制代码 x/sync/errgroup.Group.Go 方法能够创建一个 Goroutine 并在此中执行传入的函数,而 x/sync/errgroup.Group.Wait 会等待所有 Goroutine 全部返回,该方法的差异返回结果也有差异的含义:
- 假如返回错误 — 这一组 Goroutine 最少返回一个错误;
- 假如返回空值 — 所有 Goroutine 都乐成执行
Semaphore
信号量是在并发编程中常见的一种同步机制,在需要控制访问资源的进程数量时就会用到信号量,它会保证持有的计数器在 0 到初始化的权重之间波动
- 每次获取资源时都会将信号量中的计数器减去对应的数值,在释放时重新加回来
- 当碰到计数器大于信号量大小时就会进入休眠等待其他线程释放信号
这个布局体对外也只暴露了四个方法:
- x/sync/semaphore.NewWeighted 用于创建新的信号量;
- x/sync/semaphore.Weighted.Acquire 阻塞地获取指定权重的资源,假如当前没有空闲资源,就会陷入休眠等待;
- x/sync/semaphore.Weighted.TryAcquire 非阻塞地获取指定权重的资源,假如当前没有空闲资源,就会直接返回 false;
- x/sync/semaphore.Weighted.Release 用于释放指定权重的资源;
在使用过程中需要注意以下几个题目
- x/sync/semaphore.Weighted.Acquire 和 x/sync/semaphore.Weighted.TryAcquire 方法都可以用于获取资源,前者会阻塞地获取信号量,后者会非阻塞地获取信号量;
- x/sync/semaphore.Weighted.Release 方法会按照 FIFO 的顺序唤醒可以被唤醒的 Goroutine;
- 假如一个 Goroutine 获取了较多地资源,由于 x/sync/semaphore.Weighted.Release 的释放计谋大概会等待比较长的时间
SingleFlight
这个是Go语言的扩展包中提供的另外一个信号量,它能够在一个服务中克制对下游的多次重复请求,比如在redis的缓存雪崩中,能够限制对同一个 Key 的多次重复请求,镌汰对下游的瞬时流量。
在资源获取非常昂贵的时候,就很适当使用x/sync/singleflight.Group
- type service struct {
- requestGroup singleflight.Group
- }
- func (s *service) handleRequest(ctx context.Context, request Request) (Response error){
- v,err,_ := requestGroup.Do(request.Hash(),func() (interface{},error) {
- rows, err := // select * from tables
- if err != nil {
- return nil, err
- }
- })
- if err != nil{
- return nil,err
- }
- return Response {
- rows:rows,
- },nil
- }
复制代码 由于请求的哈希在业务上一样平常表现相同的请求,所以上述代码使用它作为请求的键。固然,我们也可以选择其他的唯一字段作为 x/sync/singleflight.Group.Do 方法的第一个参数镌汰重复的请求。
最后编辑于:2024-08-19 21:07:58 © 著作权归作者所有,转载或内容合作请联系作者
喜好的朋侪记得点赞、收藏、关注哦!!!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |