Go语言中的并发编程

嚴華  论坛元老 | 2024-9-23 04:43:53 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1728|帖子 1728|积分 5184

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x

  • Go语言中的并发编程
  • Go语言中的并发编程主要依靠于两个核心概念:goroutine 和 channel。
  • 1. Goroutine
  • goroutine 的特点
  • 结束 goroutine
  • 2. Channel
  • 创建 Channel
  • 发送和接收数据
  • Channel 的类型
  • 使用 select 语句
  • 简朴的多个 goroutine
  • 使用 WaitGroup 等待所有 goroutine 完成
  • goroutine与线程

    • 可增长的栈

  • GPM

    • 1. G(Goroutine)
    • 2. M(Operating System Thread)
    • 3. P(Processor)

  • GPM 模型的工作流程
  • P与M一样平常也是一一对应的。他们关系是:
  • 运行队列
  • 协作式与抢占式
  • 在这个示例中,我们通过设置 GOMAXPROCS 来限定可以并发运行的 goroutine 数目。
  • channel
  • channel类型

    • 创建channel

  • channel操作
  • 发送
  • 接收
  • 关闭
  • 关闭后的通道有以下特点:

    • 无缓冲的通道

  • 有缓冲的通道
  • 特点
  • for range从通道循环取值
  • 多重赋值读通道
  • 单向通道
  • 1. 定义单向通道
  • 单向通道的优点
  • worker pool(goroutine池)
  • 实现步骤
  • select多路复用
  • 多路复用的主要概念
  • 并发安全和锁
  • 并发安全
  • 锁的类型
  • 互斥锁
  • 读写互斥锁
  • Cond(条件变量)
  • 主要概念
  • Once(一次性)
  • 主要概念:
  • .WaitGroup(等待组)
  • 主要概念
  • sync.Map
  • 主要特点
  • 常用方法
  • 原子操作
  • 原子操作的上风
  • atomic 包中的主要函数
  Go语言中的并发编程

Go语言中的并发编程主要依靠于两个核心概念:goroutine 和 channel。

1. Goroutine

Goroutine 是Go语言中实现并发的基本单位。
它是一种轻量级的线程,由Go运行时(runtime)管理,
而不是由操作体系直担当理。Goroutine的开销非常小,因此可以在一个步伐中创建成千上万个Goroutine。
goroutine的概念雷同于线程,但goroutine是由Go的运行时(runtime)调度和管理的。
Go步伐会智能地将 goroutine 中的任务公道地分配给每个CPU。
Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。
Go语言中使用goroutine非常简朴,只需要在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine。
一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。
  1. package main
  2. import (
  3.         "fmt"
  4.         "time"
  5. )
  6. func sayHello() {
  7.         for i := 0; i < 5; i++ {
  8.                 fmt.Println("Hello")
  9.                 time.Sleep(100 * time.Millisecond)
  10.         }
  11. }
  12. func main() {
  13.         go sayHello() // 启动一个新的Goroutine
  14.         for i := 0; i < 5; i++ {
  15.                 fmt.Println("World")
  16.                 time.Sleep(100 * time.Millisecond)
  17.         }
  18. }
  19. 在这个例子中,sayHello 函数在一个新的Goroutine中运行,
  20. 而 main 函数在另一个Goroutine中运行。两个Goroutine并发执行,因此你会看到 "Hello" 和 "World" 交替打印。
复制代码
  在步伐启动时,Go步伐就会为main()函数创建一个默认的goroutine。
  当main()函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束
  goroutine 的特点

轻量级:goroutine 使用非常少的内存,大约 2KB 的初始栈空间,且栈会根据需要动态增长和缩小。
并发执行:浩繁 goroutine 可以并发地执行,使得 Go 语言非常适当处理 I/O 麋集型任务。
调度:Go 运行时负责管理 goroutine 的调度,可以在多个操作体系线程之间动态地分配 goroutine。
结束 goroutine

goroutine 的执行并不是无限的,它会在函数返回后结束。如果你希望主步伐等待某个 goroutine 执行完毕,可以通过使用 Channel 来实现通信。
  1. package main
  2. import (
  3.     "fmt"
  4.     "time"
  5. )
  6. func main() {
  7.     done := make(chan bool)
  8.     go func() {
  9.         fmt.Println("Goroutine 正在执行...")
  10.         time.Sleep(2 * time.Second) // 模拟耗时任务
  11.         done <- true // 发送信号通知主程序
  12.     }()
  13.     <-done // 阻塞,等待 goroutine 完成
  14.     主程序会阻塞在这一行,直到从 done channel 接收到数据(即等待 goroutine 的通知)。这样可以确保在 goroutine 完成任务后,主程序才会继续执行。
  15.     fmt.Println("Goroutine 完成.")
  16. }
复制代码
2. Channel

Channel 是Go语言中用于在Goroutine之间通报数据的通信机制。
Channel可以看作是一个管道,一个Goroutine可以将数据发送到Channel,另一个Goroutine可以从Channel接收数据。
创建 Channel

  1. 可以使用 make 函数来创建一个 channel。 channel 的类型是在声明时指定的,表明可以传递的数据类型。
  2. ch := make(chan int) // 创建一个传递 int 类型数据的 channel
复制代码
发送和接收数据

一旦 channel 被创建,就可以使用 <- 操作符发送和接收数据。
发送数据: 使用 ch <- value 将数据发送到 channel 中。
接收数据: 使用 value := <-ch 从 channel 中接收数据。
  1. package main
  2. import (
  3.     "fmt"
  4. )
  5. func main() {
  6.     ch := make(chan string)
  7.     go func() {
  8.         ch <- "Hello, Channel!" // 发送数据到 channel
  9.     }()
  10.     msg := <-ch // 从 channel 接收数据
  11.     fmt.Println(msg) // 输出:Hello, Channel!
  12. }
复制代码
Channel 的类型

Channel 可以是无缓冲(unbuffered)或有缓冲(buffered):
无缓冲 Channel: 默认情况下,channel 是无缓冲的,发送和接收操作会阻塞,直到另一端预备好。
有缓冲 Channel: 可以创建一个有缓冲的 channel,指定缓冲区的大小。发送操作将在缓冲区满时阻塞,而接收操作将在缓冲区为空时阻塞。
  1. ch := make(chan int, 3) // 创建一个缓冲区大小为 3 的 channel
  2. ch <- 1 // 发送数据
  3. ch <- 2
  4. ch <- 3 // 发送数据不会阻塞,直到缓冲区满
  5. // ch <- 4 // 这将导致阻塞,因为缓冲区已满
  6. fmt.Println(<-ch) // 接收数据
复制代码
使用 select 语句

select 语句用于等待多个 channel 操作,它会监听多个 channel,并在其中恣意一个 channel 可用时执行对应的操作。
  1. package main
  2. import (
  3.     "fmt"
  4.     "time"
  5. )
  6. func main() {
  7.     ch1 := make(chan string)
  8.     ch2 := make(chan string)
  9.     go func() {
  10.         time.Sleep(1 * time.Second)
  11.         ch1 <- "来自通道 1"
  12.     }()
  13.     go func() {
  14.         time.Sleep(2 * time.Second)
  15.         ch2 <- "来自通道 2"
  16.     }()
  17.     select {
  18.     case msg1 := <-ch1:
  19.         fmt.Println(msg1)
  20.     case msg2 := <-ch2:
  21.         fmt.Println(msg2)
  22.     }
  23. }
  24. 在这个示例中,select 语句阻塞,直到从 ch1 或 ch2 接收到数据。一旦某个 channel 可用,select 将执行相应的代码块。
复制代码
简朴的多个 goroutine

  1. 并发地启动多个 goroutine,每个 goroutine 输出一个计数。
  2. package main
  3. import (
  4.     "fmt"
  5.     "time"
  6. )
  7. func count(id int) {
  8.     for i := 0; i < 5; i++ {
  9.         fmt.Printf("Goroutine %d: %d\n", id, i)
  10.         time.Sleep(100 * time.Millisecond) // 模拟耗时操作
  11.     }
  12. }
  13. func main() {
  14.     for i := 0; i < 3; i++ { // 启动 3 个 goroutine
  15.         go count(i) // 每个 goroutine 执行 count 函数
  16.     }
  17.     time.Sleep(1 * time.Second) // 等待 goroutine 完成
  18.     fmt.Println("主 goroutine 结束.")
  19. }
复制代码
使用 WaitGroup 等待所有 goroutine 完成

通常,直接使用 time.Sleep 来等待 goroutine 完成不是一个好的实践。
可以使用 sync.WaitGroup 来等待多个 goroutine 完成。
在这个示例中,使用 sync.WaitGroup 实现了更优雅的等待。
通过 wg.Add(1) 增长等待计数,
通过 wg.Done() 在每个 goroutine 完成时减少计数,
最后通过 wg.Wait() 等待所有计数归零,表示所有 goroutine 已完成。
  1. package main
  2. import (
  3.     "fmt"
  4.     "sync"
  5.     "time"
  6. )
  7. func count(id int, wg *sync.WaitGroup) {
  8.     defer wg.Done() // 在函数结束时调用 Done(),表示这个 goroutine 完成了
  9.     for i := 0; i < 5; i++ {
  10.         fmt.Printf("Goroutine %d: %d\n", id, i)
  11.         time.Sleep(100 * time.Millisecond) // 模拟耗时操作
  12.     }
  13. }
  14. func main() {
  15.     var wg sync.WaitGroup
  16.     for i := 0; i < 3; i++ {
  17.         wg.Add(1) // 增加等待计数
  18.         go count(i, &wg) // 启动 goroutine
  19.     }
  20.     wg.Wait() // 等待所有 goroutine 完成
  21.     fmt.Println("所有 goroutine 完成,主 goroutine 结束.")
  22. }
复制代码
goroutine与线程

可增长的栈

OS线程(操作体系线程)一样平常都有固定的栈内存(通常为2MB),
一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),
goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限定可以达到1GB,
虽然极少会用到这么大。所以在Go语言中一次创建十万左右的goroutine也是可以的。
GPM

GPM(Goroutine, OS Thread, and M:即 Goroutine、操作体系线程和调度器)是 Go 语言的调度模型,主要负责管理 goroutine 的执行。
1. G(Goroutine)

  1.    G 代表一个 goroutine,是 Go 程序中的一个轻量级线程。
  2.    每个 goroutine 被分配少量的内存(通常为 2KB 的初始栈空间),并且这个栈空间是动态增长的。
  3.    通过 go 关键字创建的 goroutine 是并发执行的,多个 goroutine 可以同时处于工作状态,使得程序可以有效利用 CPU 资源。
复制代码
2. M(Operating System Thread)

  1.    M 代表操作系统线程。 Go 运行时系统管理着多个 M,每个 M 都是一个操作系统线程。
  2.    调度器通过将 goroutine 分配到 M 上执行,使得 goroutine 的执行不受限于固定的操作系统线程数量。
  3.    M 的数量可以根据系统的实际情况进行控制,开发者可以使用 runtime.GOMAXPROCS(n) 来设置可运行的 M 的最大数量。
复制代码
3. P(Processor)

  1.    P 代表处理器,是 Go 运行时的一个概念,表示调度器用来管理 goroutine 的资源。
  2.    每个 P 有一个本地运行队列(local run queue),用于存放可以运行的 goroutine。
  3.    P 还维护与 M 的关联,使得每个 M 在执行时能访问一个 P 及其本地队列。
  4.    P 的数量通常与 GOMAXPROCS 的值相等,决定了可以并发执行的 goroutine 的数量。
复制代码
GPM 模型的工作流程

  1. 创建 goroutine:开发者通过 go 关键字启动一个新的 goroutine。Go 运行时会在内部创建一个相应的 G 实例。
  2. 调度和执行:
  3. 调度器会将 G 放入适当的 P 的本地队列中。
  4. M 会从 P 的本地队列中获取 G 并执行。每个 M 可以访问其绑定的 P 的 goroutine。
  5. 解决阻塞:
  6. 当 goroutine 进行 I/O 操作或其他阻塞操作时,调度器会将该 G 标记为阻塞状态,
  7. 同时将其他可运行的 G 移入 M 的任务队列,确保 CPU 资源的高效利用。
  8. 调度抢占:
  9. 时间片调度:调度器会为每个 goroutine 分配一个时间片(通常为 10 毫秒)。如果 goroutine 在这段时间内没有主动让出 CPU 时间,调度器会强制切换到其他 goroutine。
  10. 阻塞和唤醒:当 goroutine 执行阻塞操作(如 I/O 操作、channel 发送/接收等)时,调度器会将其挂起并将 CPU 资源分配给其他可运行的 goroutine。
  11. 回收和清理:
  12. 当 goroutine 完成其任务后,其相关的 G 实例将被清理,释放相关资源。
复制代码
P与M一样平常也是一一对应的。他们关系是:

P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G 挂载在新建的M上。当旧的G阻塞完成或者以为其已经死掉时 回收旧的M。
P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默以为物理线程数。 在并发量大的时候会增长一些P和M,但不会太多,切换太频仍的话得不偿失。
运行队列

Go 的调度器维护了多个运行队列(run queue),其中包括:
全局队列:所有的 goroutine 的全局运行队列。
本地队列:每个线程都有本身的本地队列,用于存储该线程上正在运行的 goroutine。
调度器会在这些队列中移动 goroutine,以确保每个线程能够尽快获取需要执行的 goroutine。
单从线程调度讲,Go语言相比起其他语言的上风在于OS线程是由OS内核来调度的,
goroutine则是由Go运行时(runtime)本身的调度器调度的,
这个调度器使用一个称为m:n调度的技能(复用/调度m个goroutine到n个OS线程)。
其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频仍切换,
包括内存的分配与开释,都是在用户态维护着一块大的内存池,
不直接调用体系的malloc函数(除非内存池需要改变),本钱比调度OS线程低很多。
Go 运行时维护着一块大的内存池,减少了对操作体系的频仍 malloc 和 free 调用,降低了内存分配与开释的本钱
另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上,
再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。
协作式与抢占式

虽然 Go 的调度器支持抢占式调度,但它也鼓励协作式调度。
开发者可以通过在 goroutine 中添加 runtime.Gosched() 来主动让出 CPU 控制权,
答应其他 goroutine 执行。这种方式可以资助进步 CPU 的利用率。
在这个示例中,我们通过设置 GOMAXPROCS 来限定可以并发运行的 goroutine 数目。

  1. package main
  2. import (
  3.     "fmt"
  4.     "runtime"
  5.     "sync"
  6.     "time"
  7. )
  8. func worker(id int, wg *sync.WaitGroup) {
  9.     defer wg.Done()
  10.     fmt.Printf("Worker %d is starting\n", id)
  11.     time.Sleep(2 * time.Second) // 模拟耗时工作
  12.     fmt.Printf("Worker %d is done\n", id)
  13. }
  14. func main() {
  15.     // 设置 GOMAXPROCS 为 2
  16.     runtime.GOMAXPROCS(2)
  17.     var wg sync.WaitGroup
  18.     for i := 0; i < 5; i++ {
  19.         wg.Add(1)
  20.         go worker(i, &wg)
  21.     }
  22.     wg.Wait() // 等待所有 goroutine 完成
  23.     fmt.Println("所有工作完成")
  24. }
复制代码
channel

单纯地将函数并发执行是没故意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。
虽然可以使用共享内存进行数据交换,但是共享内存在差异的goroutine中容易发生竞态标题。为了保证数据交换的准确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能标题。
Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信
如果说goroutine是Go步伐并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵照先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
channel类型

   var 变量 chan 元素类型
    var ch1 chan int // 声明一个通报整型的通道
  var ch2 chan bool // 声明一个通报布尔型的通道
  var ch3 chan []int // 声明一个通报int切片的通道
  创建channel

通道是引用类型,通道类型的空值是nil。
  1. var ch chan int
  2. fmt.Println(ch) // <nil>
复制代码
声明的通道后需要使用make函数初始化之后才能使用。
创建channel的格式如下:
  1. make(chan 元素类型, 容量(缓冲区大小 可选))
复制代码
channel操作

通道有发送(send)、接收(receive)和关闭(close)三种操作。
发送和接收都使用<-符号。
现在我们先使用以下语句定义一个通道:
   ch := make(chan int)
  发送

将一个值发送到通道中。
  1. ch <- 10 // 把10发送到ch中
复制代码
接收

从一个通道中接收值。
  1. x := <- ch // 从ch中接收值并赋值给变量x
  2. <-ch       // 从ch中接收值,忽略结果
复制代码
关闭

我们通过调用内置的close函数来关闭通道。
  1. close(ch)
复制代码
关于关闭通道需要留意的事故是,只有在关照接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。
通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。
关闭后的通道有以下特点:


  • 对一个关闭的通道再发送值就会导致panic。
  • 对一个关闭的通道进行接收会一直获取值直到通道为空。
  • 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  • 关闭一个已经关闭的通道会导致panic。
无缓冲的通道

无缓冲通道(也称为阻塞通道)是一种在发送和接收操作之间创建直接同步的机制。
无缓冲的通道只有在有人接收值的时候才能发送值。
  1. package main
  2. import (
  3.     "fmt"
  4. )
  5. func main() {
  6.     ch := make(chan string) // 创建一个无缓冲的通道
  7.     go func() {
  8.         fmt.Println("Goroutine 正在等待发送消息...")
  9.         ch <- "Hello, Channel!" // 发送消息,这里会阻塞,直到有接收者
  10.         fmt.Println("消息已发送")
  11.     }()
  12.     msg := <-ch // 在此阻塞,等待接收消息
  13.     fmt.Println("接收到的消息:", msg)
  14. }
复制代码
  特点
  阻塞特性:无缓冲通道的发送和接收操作都是阻塞的,在没有接收者时,发送操作会等待接收者。在接收到数据之后,发送操作才会继续,反之亦然。这使得发送和接收操作之间形成一种直接的同步机制。
  实用于协作:无缓冲通道非常适当用于 goroutine 之间的协作与同步,比方在任务执行完成后关照主步伐。
  有缓冲的通道

它与无缓冲通道的主要区别在于,有缓冲通道可以在不阻塞的情况下存储一定数目的消息,直到缓冲区满。这种机制使得 goroutine 的发送和接收操作更加灵活。
  1. 创建有缓冲的通道
  2. 可以在创建通道时指定缓冲区的大小,示例如下:
  3. ch := make(chan string, 3) // 创建一个容量为 3 的有缓冲通道
复制代码
  1. package mainimport (    "fmt"    "time")func main() {    ch := make(chan string, 3) // 创建一个容量为 3 的有缓冲通道    go func() {        for i := 0; i < 5; i++ {            msg := fmt.Sprintf("消息 %d", i)            fmt.Printf("发送: %s\n", msg)            ch <- msg // 发送消息            time.Sleep(100 * time.Millisecond) // 模拟耗时操作        }        close(ch)
  2. // 发送完毕后关闭通道    }()    // 接收消息    for msg := range ch { // 会一直接收直到通道关闭        fmt.Printf("接收到: %s\n", msg)    }}
复制代码
特点

非阻塞的发送:当有缓冲区空间时,发送操作不会阻塞。如果缓冲区已满,则发送操作会阻塞,直到有接收者取走数据。
进步了并发效率:通过有缓冲通道,可以进步并发步伐的效率,答应发送者和接收者在差异速度下处理数据。
可以在多个 goroutine 之间通报数据:多个 goroutine 可以同时向同一个有缓冲通道发送数据,进步了数据通报的灵活性。
for range从通道循环取值

当向通道中发送完数据时,我们可以通过close函数来关闭通道。
当通道被关闭时,再往该通道发送值会引发panic,从该通道取值的操作会先取完通道中的值,再然后取到的值一直都是对应类型的零值。
那怎样判断一个通道是否被关闭了
多重赋值读通道

使用多重赋值从通道读取数据时,可以检测到通道的关闭。具体方法是从通道读取两个值:一个是通道中的数据,另一个是一个布尔值,表示通道是否被关闭。
  1. package mainimport (    "fmt"    "time")func main() {    ch := make(chan string)    go func() {        messages := []string{"消息1", "消息2", "消息3"}        for _, msg := range messages {            ch <- msg // 发送消息            time.Sleep(100 * time.Millisecond)        }        close(ch)
  2. // 关闭通道    }()    // 从通道接收数据    for {        msg, ok := <-ch // 尝试从通道接收数据        if !ok { // ok 为 false 说明通道已关闭            fmt.Println("通道已关闭,退出循环")            break        }        fmt.Println("接收到:", msg)    }}
复制代码
单向通道

单向通道是 Go 语言中的一种通道形式,其限定某个通道只能用于发送或接收数据,而不能同时进行。这种特性有助于进步代码的可读性和安全性,避免不测的错误使用。
单向通道在函数参数中尤其常用,可以通过 chan<- 和 <-chan 来分别定义。
1. 定义单向通道

发送通道:可以使用 chan<- 来表示一个只用于发送数据的通道。
接收通道:可以使用 <-chan 来表示一个只用于接收数据的通道。
  1. package mainimport (    "fmt")// 定义一个函数,担当一个发送通道作为参数 在这个函数中,ch 被定义为一个发送通道,意味着这个函数只能向 ch 发送数据,无法从中接收数据func sendData(ch chan<- string) {    messages := []string{"消息1", "消息2", "消息3"}    for _, msg := range messages {        ch <- msg // 发送消息    }    close(ch)
  2. // 关闭通道}// 定义一个函数,担当一个接收通道作为参数 这个函数的参数是一个接收通道,只能从 ch 接收数据,无法发送数据。func receiveData(ch <-chan string) {    for msg := range ch {        fmt.Println("接收到:", msg) // 接收并打印消息    }}func main() {    ch := make(chan string) // 创建通道    go sendData(ch)   // 启动发送 goroutine    receiveData(ch)   // 启动接收 goroutine,阻塞等待接收数据}
复制代码
单向通道的优点

类型安全:通过限定通道的方向,可以防止函数之间的不测数据发送或接收错误,提升代码的安全性。
明白的计划:使用单向通道可以清楚地表达数据流向,使得步伐逻辑更易于明白。
worker pool(goroutine池)

Worker Pool(工作池或 goroutine 池)是一种计划模式,用于管理大量的 goroutine,
通过限定并发执行的 goroutine 数目来进步步伐的性能和资源使用效率。
这种模式特别实用于需要处理大量相似任务的场景,比如处理 HTTP 哀求、计算任务等。
Worker Pool 模式的核心 idea 是将任务分配给一组 worker(工作者 goroutine),这些 worker 会从一个共享的任务队列中获取任务并执行。
通过控制 worker 的数目,可以防止过多的 goroutine 同时运行,导致体系资源耗尽。
实现步骤

创建任务队列:使用通道来存放待处理的任务。
创建 Worker:Worker 从任务队列中担当任务并执行。
启动 Workers:预先启动一定数目的 worker,以处理任务。
发布任务:将任务发送到任务队列中,供 worker 处理
  1. package main
  2. import (
  3.     "fmt"
  4.     "sync"
  5.     "time"
  6. )
  7. // 1. 定义任务结构体
  8. type Task struct {
  9.     id int // 任务标识
  10. }
  11. // 2. 定义 Worker 函数,处理任务
  12. func worker(id int, tasks <-chan Task, wg *sync.WaitGroup) {
  13.     defer wg.Done() // 标记当前 worker 完成
  14.     for task := range tasks { // 从任务通道中接收任务
  15.         fmt.Printf("Worker %d 正在处理任务 %d\n", id, task.id)
  16.         time.Sleep(1 * time.Second) // 模拟耗时操作
  17.         fmt.Printf("Worker %d 完成任务 %d\n", id, task.id)
  18.     }
  19. }
  20. func main() {
  21.     // 3. 设置 worker 数量
  22.     const numWorkers = 3 // 设定工作池的 worker 数量
  23.     tasks := make(chan Task, 10) // 创建任务通道(容量为 10)
  24.     var wg sync.WaitGroup // 创建 WaitGroup 用于等待所有 worker 完成
  25.     // 4. 启动 workers
  26.     for i := 1; i <= numWorkers; i++ {
  27.         wg.Add(1) // 将 WaitGroup 的计数器加 1
  28.         go worker(i, tasks, &wg) // 启动一个 worker goroutine
  29.     }
  30.     // 5. 发布任务
  31.     for i := 1; i <= 10; i++ {
  32.         tasks <- Task{id: i} // 将任务发送到任务通道
  33.     }
  34.     close(tasks) // 关闭任务通道,表示不再添加任务
  35.     // 6. 等待所有 worker 完成
  36.     wg.Wait() // 阻塞直到所有 worker 完成
  37.     fmt.Println("所有任务完成.")
  38. }
复制代码
select多路复用

多路复用通常指的是通过 select 语句来同时监控多个通道(channels),以便在多个 goroutine 之间进行有效的通信。
这种机制答应步伐同时处理多个 I/O 操作或 goroutine,极大地进步了并发步伐的灵活性和效率。
多路复用的主要概念

select 语句: select 语句是 Go 提供的控制结构,可以在多个通道操作之间进行选择。它会阻塞,直到其中一个通道预备好发送或接收数据。
非阻塞操作: 通过使用 select,可以在多个通道之间进行非阻塞的选择,这样可以避免由于单个操作阻塞而导致整个步伐停滞的情况。
超时处理: 使用 select 可以很容易地实现超机遇制,从而在某个操作未完成时接纳相应措施。
  1. package main
  2. import (
  3.     "fmt"
  4.     "time"
  5. )
  6. func main() {
  7.     ch1 := make(chan string)
  8.     ch2 := make(chan string)
  9.     // 启动第一个 goroutine
  10.     go func() {
  11.         time.Sleep(2 * time.Second) // 模拟耗时操作
  12.         ch1 <- "来自通道 1"
  13.     }()
  14.     // 启动第二个 goroutine
  15.     go func() {
  16.         time.Sleep(1 * time.Second) // 模拟耗时操作
  17.         ch2 <- "来自通道 2"
  18.     }()
  19.     // 使用 select 语句处理多个通道
  20.     for i := 0; i < 2; i++ { // 等待两个结果
  21.         select {
  22.         case msg1 := <-ch1:
  23.             fmt.Println("接收到:", msg1)
  24.         case msg2 := <-ch2:
  25.             fmt.Println("接收到:", msg2)
  26.         }
  27.     }
  28.    
  29.     fmt.Println("所有消息处理完毕.")
  30. }
复制代码
并发安全和锁

在并发编程中,多个 goroutine 可能会同时访问共享数据,导致数据竞争和不同等的状态。
因此,确保并发安满是一个重要的思量。
在 Go 语言中,主要通过使用锁和其他同步机制来实现并发安全。
并发安全

并发安满是确保多个 goroutine 在并行执行时,访问共享资源(如变量、数据结构)时不会导致不同等或错误的状态。为了实现并发安全,通常需要对共享资源进行适当的同步。
锁的类型

互斥锁(Mutex):
Go 语言提供的 sync.Mutex 类型可以实现互斥锁,用于保护共享数据的访问。
当一个 goroutine 获取锁时,其他试图获取同一锁的 goroutine 将被阻塞,直到锁被开释。
读写锁(RWMutex):
sync.RWMutex 答应有多个读者或一个写者访问共享资源。
读者可以并发访问,当一个写者想要写数据时,所有的读者会被阻塞。
channel:
Go 语言的通道(channel)也可以用作同步机制,通过通道的发送和接收来实现数据的安全交换。
互斥锁

  1. package main
  2. import (
  3.     "fmt"
  4.     "sync"
  5.     "time"
  6. )
  7. var (
  8.     balance int           // 共享变量
  9.     mu      sync.Mutex   // 互斥锁
  10. )
  11. // 存款函数
  12. func deposit(amount int, wg *sync.WaitGroup) {
  13.     defer wg.Done() // 确保完成后减少 WaitGroup 的计数
  14.     mu.Lock() // 获取锁
  15.     fmt.Printf("正在存款: %d\n", amount)
  16.     balance += amount // 更新共享变量
  17.     mu.Unlock() // 释放锁
  18. }
  19. // 取款函数
  20. func withdraw(amount int, wg *sync.WaitGroup) {
  21.     defer wg.Done()
  22.     mu.Lock() // 获取锁
  23.     fmt.Printf("正在取款: %d\n", amount)
  24.     balance -= amount // 更新共享变量
  25.     mu.Unlock() // 释放锁
  26. }
  27. func main() {
  28.     var wg sync.WaitGroup
  29.     wg.Add(2) // 增加等待的 goroutine 数量
  30.     go deposit(100, &wg)
  31.     go withdraw(50, &wg)
  32.     wg.Wait() // 等待所有 goroutine 完成
  33.     fmt.Printf("账户余额: %d\n", balance) // 输出最终余额
  34. }
复制代码
读写互斥锁

读写互斥锁的重要概念
多个读者:RWMutex 答应多个 goroutine 同时读取共享资源,这在读操作远多于写操作的场景下,可以显著进步性能。
独占写者:当一个 goroutine 写数据时,其他所有的读者和写者都会被阻塞,直到写操作完成。这确保了写操作的安全性和数据的同等性。
性能上风:在读操作频仍的情况下,读写互斥锁通常比普通的互斥锁(sync.Mutex)性能更好,因为它减少了读操作的阻塞。
  1. package main
  2. import (
  3.     "fmt"
  4.     "sync"
  5.     "time"
  6. )
  7. // 定义一个共享数据结构
  8. type Data struct {
  9.     value int
  10.     mu    sync.RWMutex // 读写互斥锁
  11. }
  12. // 读取数据的函数
  13. func (d *Data) Read() int {
  14.     d.mu.RLock() // 获取读锁
  15.     defer d.mu.RUnlock() // 确保在函数结束时释放读锁
  16.     return d.value // 返回共享数据
  17. }
  18. // 写入数据的函数
  19. func (d *Data) Write(value int) {
  20.     d.mu.Lock() // 获取写锁
  21.     defer d.mu.Unlock() // 确保在函数结束时释放写锁
  22.     d.value = value // 更新共享数据
  23. }
  24. func main() {
  25.     data := Data{}
  26.     // 启动多个写操作
  27.     go func() {
  28.         for i := 0; i < 5; i++ {
  29.             data.Write(i) // 写入数据
  30.             fmt.Printf("写入: %d\n", i)
  31.             time.Sleep(500 * time.Millisecond)
  32.         }
  33.     }()
  34.     // 启动多个读操作
  35.     for i := 0; i < 5; i++ {
  36.         go func() {
  37.             time.Sleep(200 * time.Millisecond) // 确保读取时写入已经发生
  38.             value := data.Read() // 读取数据
  39.             fmt.Printf("读取: %d\n", value)
  40.         }()
  41.     }
  42.     // 睡眠一段时间以等待所有 goroutine 完成
  43.     time.Sleep(3 * time.Second)
  44. }
复制代码
Cond(条件变量)

sync.Cond 用于在多个 goroutine 之间进行复杂的同步。
当一个条件不满足时,可以阻塞当前 goroutine,
并在条件满足时关照等待的 goroutine 继续执行。
主要概念

阻塞和叫醒:使用条件变量,goroutine 可以在某个特定条件未满足时等待。当某些条件被更改(满足)时,可以通过条件变量叫醒等待的 goroutine。
互斥锁:条件变量通常和互斥锁(sync.Mutex 或 sync.RWMutex)结合使用,以确保在查抄条件和修改条件之间的原子性。
多生产者与消费者:条件变量经常用于多生产者和多消费者模式,以协调生产和消费过程。
  1. package main
  2. import (
  3.     "fmt"
  4.     "sync"
  5.     "time"
  6. )
  7. type BoundedQueue struct {
  8.     items    []int// 队列元素
  9.     capacity int// 队列容量
  10.     mu       sync.Mutex// 互斥锁
  11.     cond     *sync.Cond // 条件变量
  12. }
  13. // 创建一个新的 BoundedQueue
  14. func NewBoundedQueue(capacity int) *BoundedQueue {
  15.     q := &BoundedQueue{
  16.         items:    make([]int, 0, capacity),
  17.         capacity: capacity,
  18.     }
  19.     q.cond = sync.NewCond(&q.mu)
  20.     return q
  21. }
  22. // 向队列中添加一个元素
  23. func (q *BoundedQueue) Enqueue(item int) {
  24.     q.mu.Lock() // 加锁以保护共享数据
  25.     defer q.mu.Unlock()
  26.     for len(q.items) == q.capacity {
  27.         q.cond.Wait() // 队列满时,等待条件变量
  28.     }
  29.     // 添加元素
  30.     q.items = append(q.items, item)
  31.     fmt.Printf("生产者: 生产 %d\n", item)
  32.     q.cond.Signal() // 唤醒等待的消费者
  33. }
  34. // 从队列中取出一个元素
  35. func (q *BoundedQueue) Dequeue() int {
  36.     q.mu.Lock() // 加锁以保护共享数据
  37.     defer q.mu.Unlock()
  38.     for len(q.items) == 0 {
  39.         q.cond.Wait() // 队列空时,等待条件变量
  40.     }
  41.     // 删除并返回第一个元素
  42.     item := q.items[0]
  43.     q.items = q.items[1:]
  44.     fmt.Printf("消费者: 消费 %d\n", item)
  45.     q.cond.Signal() // 唤醒等待的生产者
  46.     return item
  47. }
  48. func main() {
  49.     queue := NewBoundedQueue(5) // 创建一个容量为 5 的队列
  50.     var wg sync.WaitGroup
  51.     // 启动多个生产者
  52.     for i := 1; i <= 3; i++ {
  53.         wg.Add(1)
  54.         go func(id int) {
  55.             defer wg.Done()
  56.             for j := 0; j < 5; j++ {
  57.                 queue.Enqueue(j + id*100) // 生产不同的产品
  58.                 time.Sleep(time.Second) // 模拟生产时间
  59.             }
  60.         }(i)
  61.     }
  62.     // 启动多个消费者
  63.     for i := 1; i <= 3; i++ {
  64.         wg.Add(1)
  65.         go func() {
  66.             defer wg.Done()
  67.             for j := 0; j < 5; j++ {
  68.                 queue.Dequeue() // 消费产品
  69.                 time.Sleep(2 * time.Second) // 模拟消费时间
  70.             }
  71.         }()
  72.     }
  73.     wg.Wait() // 等待所有 goroutine 完成
  74.     fmt.Println("所有生产和消费任务完成.")
  75. }
复制代码
Once(一次性)

在 Go 语言中,sync.Once 是一个用于确保某个操作只被执行一次的同步机制。
它用于对资源进行单次初始化或设置,避免多次初始化可能导致的数据不同等和资源浪费。
常见的使用场景包括单例模式和全局变量的初始化。
主要概念:

确保执行一次:使用 sync.Once 的 Do 方法可以确保给定的函数由于多次调用而只执行一次,
无论有多少 goroutine 并发调用它。
线程安全:Once 是线程安全的,适当在多个 goroutine 中使用,比方在初始化全局变量时。
  1. package main
  2. import (
  3.     "fmt"
  4.     "sync"
  5. )
  6. // 1. 定义全局变量
  7. var (
  8.     instance *Singleton     // 存储单例实例的指针
  9.     once     sync.Once      // 用于确保操作只执行一次
  10. )
  11. // 2. 定义单例结构体
  12. type Singleton struct {
  13.     value int // 单例的属性,存储的值
  14. }
  15. // 3. 获取单例实例的函数
  16. func GetInstance() *Singleton {
  17.     // 4. 确保传入的函数只会执行一次
  18.     once.Do(func() {
  19.         fmt.Println("初始化单例...") // 仅在第一次被调用时打印
  20.         instance = &Singleton{value: 42} // 实例化单例,赋予 value 值为 42
  21.     })
  22.     return instance // 返回单例实例
  23. }
  24. func main() {
  25.     var wg sync.WaitGroup // 创建一个 WaitGroup,用于等待 goroutine 完成
  26.     // 5. 启动多个 goroutine
  27.     for i := 0; i < 5; i++ {
  28.         wg.Add(1) // 增加 WaitGroup 的计数
  29.         go func(id int) {
  30.             defer wg.Done() // 在 goroutine 结束时减少 WaitGroup 的计数
  31.             inst := GetInstance() // 获取单例实例
  32.             fmt.Printf("Goroutine %d: 单例值为 %d\n", id, inst.value) // 输出 goroutine ID 及单例的值
  33.         }(i) // 向 goroutine 传递当前 ID
  34.     }
  35.     wg.Wait() // 等待所有 goroutine 完成
  36. }
复制代码
.WaitGroup(等待组)

主要概念

计数器:WaitGroup 维护一个计数器,表示正在运行的 goroutine 数目。
Add:通过 Add(n) 方法增长 goroutine 数目,n 通常为 1,表示启动一个新的 goroutine。
Done:在 goroutine 执行完毕后,需要调用 Done() 方法来减少计数器的值。
Wait:主步伐调用 Wait() 方法,该方法会阻塞,直到计数器的值变为 0,表示所有的 goroutine 都已完成。
sync.Map

sync.Map 是 Go 1.9 版本引入的一种并发安全的、无锁的、键值对映射。
sync.Map 是一种并发安全的映射类型,旨在提供高效的多 goroutine 读写操作。
它是标准库 sync 包中的一部分,特别计划用于替代传统的 map 类型,
以办理在并发情况下通常需要手动管理的同步标题。
主要特点

并发安全:sync.Map 在并发情况中安全,不需要额外的锁来管理对它的读写操作。
读优化:它采用了读优化策略,适当读操作远多于写操作的场景。
结合原生 map 使用:针对不常变更的值,可以直接使用原生的 map,而对于需要并发读写的场景使用 sync.Map。
内置接口:sync.Map 提供了一些内置方法,简化了对映射的操作。
常用方法

Store(key, value):将键-值对存储到 sync.Map 中。
Load(key):获取指定键的值,如果存在返回值和 true,否则返回 nil 和 false。
LoadOrStore(key, value):如果指定键存在,返回该键的值;否则,存储键-值对并返回新值。
Delete(key):删除指定键及其对应的值。
Range(f func(key, value interface{}) bool):遍历 sync.Map 中的所有键值对。
  1. package main
  2. import (
  3.     "fmt"
  4.     "sync"
  5. )
  6. func main() {
  7.     var m sync.Map // 创建一个 sync.Map
  8.     // 存储键值对
  9.     m.Store("a", 1)
  10.     m.Store("b", 2)
  11.     // 启动多个 goroutine 进行并发读写
  12.     var wg sync.WaitGroup
  13.     wg.Add(3)
  14.     // 读取数据
  15.     go func() {
  16.         defer wg.Done()
  17.         if value, ok := m.Load("a"); ok {
  18.             fmt.Printf("读取键 'a': %v\n", value)
  19.         }
  20.     }()
  21.     // 读取数据
  22.     go func() {
  23.         defer wg.Done()
  24.         if value, ok := m.Load("b"); ok {
  25.             fmt.Printf("读取键 'b': %v\n", value)
  26.         }
  27.     }()
  28.     // 存储新值
  29.     go func() {
  30.         defer wg.Done()
  31.         m.Store("c", 3)
  32.         fmt.Println("存储键 'c': 3")
  33.     }()
  34.     wg.Wait() // 等待所有 goroutine 完成
  35.     // 遍历 Map
  36.     m.Range(func(key, value interface{}) bool {
  37.         fmt.Printf("遍历 - 键: %v, 值: %v\n", key, value)
  38.         return true // 返回 true 继续遍历
  39.     })
  40. }
复制代码
原子操作

在 Go 语言中,原子操作是指在执行某个操作时不会被其他 goroutine 停止或干扰的一种操作。
原子操作通常用于处理共享变量的并发访问,以确保数据的同等性和完备性。
Go 提供了一些原子操作的支持,主要集中在 sync/atomic 包中。
原子操作在 Go 语言中是处理并发共享数据的一种高效方法,
通过 sync/atomic 包提供的工具,可以在不使用显式锁的情况下安全地更新和读取共享变量。
这特别实用于需要频仍更新的场景,如计数器、状态标志等。
原子操作的上风

高效性:原子操作避免了使用 mutex(互斥锁)带来的额外开销,通常速度更快。
避免死锁:由于不涉及锁的获取和开释,原子操作可以减少死锁的风险。
简朴性:可以在多 goroutine 中访问同一变量而不需要进行显式的锁机制。
atomic 包中的主要函数

sync/atomic 包提供了针对整数和指针类型的原子操作,常用的函数包括:
AddInt32 / AddInt64:对 32 位或 64 位整数执行加算操作。
LoadInt32 / LoadInt64:加载 32 位或 64 位整数的当前值。
StoreInt32 / StoreInt64:将 32 位或 64 位整数的值存储为一个新的值。
CompareAndSwapInt32 / CompareAndSwapInt64:比较并交换操作,如果当前值与预期值相等,则将其替换为新值。
  1. package main
  2. import (
  3.     "fmt"
  4.     "sync"
  5.     "sync/atomic"
  6.     "time"
  7. )
  8. func main() {
  9.     var counter int64 // 定义一个共享变量
  10.     var wg sync.WaitGroup // 用于等待所有 goroutine 完成
  11.     // 启动多个 goroutine 进行并发增加
  12.     for i := 0; i < 5; i++ {
  13.         wg.Add(1) // 增加 WaitGroup 的计数
  14.         go func() {
  15.             defer wg.Done() // 在 goroutine 完成时减少计数
  16.             for j := 0; j < 1000; j++ {
  17.                 atomic.AddInt64(&counter, 1) // 原子操作增加计数
  18.             }
  19.         }()
  20.     }
  21.     wg.Wait() // 等待所有 goroutine 完成
  22.     fmt.Printf("最终计数: %d\n", atomic.LoadInt64(&counter)) // 获取最终计数并输出
  23. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

嚴華

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表