ToB企服应用市场:ToB评测及商务社交产业平台

标题: GO语言实战之并发和 goroutine [打印本页]

作者: 嚴華    时间: 2024-7-23 03:26
标题: GO语言实战之并发和 goroutine
写在前面



对每个人而言,真正的职责只有一个:找到自我。然后在心中服从其一生,经心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惊 ——赫尔曼·黑塞《德米安》

内容摘要:

并发

编码中,并行实行多个任务会有更大的好处。一个例子是:
Web 服务必要在各自独立的套接字(socket)上同时接收多个数据请求。每个套接字请求都是独立的,可以完全独立于其他套接字举行处理惩罚。具有并行实行多个请求的本领可以显著进步这类体系的性能。
考虑到这一点,Go 语言的语法和运行时直接内置了对并发的支持。
Go 语言里的并发指的是能让某个函数独立于其他函数运行的本领。当一个函数创建为 goroutine 时,Go 会将其视为一个独立的工作单元。这个单元会被调治到可用的逻辑处理惩罚器上实行
Go 语言 运行时的调治器是一个复杂的软件,能管理被创建的所有 goroutine 并为其分配实行时间。这个调治器在操纵体系之上,将操纵体系的线程与语言运行时的逻辑处理惩罚器绑定,并在逻辑处理惩罚器上运行 goroutine。
调治器在任何给定的时间,都会全面控制哪个 goroutine 要在哪个逻辑处理惩罚器上运行
Go 语言的并发同步模型来自一个叫作通讯序次进程(Communicating Sequential Processes,CSP)的范型(paradigm)。
CSP 是一种消息通报模型,通过在 goroutine 之间通报数据来通报消息,而不是对数据举行加锁来实现同步访问。用于在 goroutine 之间同步和通报数据的关键数据类型叫作通道(channel)。
使用通道可以使编写并发程序更容易,也能够让并发程序出错更少
并发与并行

操纵体系会在物理处理惩罚器上调治线程来运行,而 Go 语言的运行时会在逻辑处理惩罚器上调治 goroutine 来运行。每个逻辑处理惩罚器都分别绑定到单个操纵体系线程
在 1.5 版本 Go语言的运行时默认会为每个可用的物理处理惩罚器分配一个逻辑处理惩罚器。
在 1.5 版本之前的版本中,默认给整个应用程序只分配一个逻辑处理惩罚器。这些逻辑处理惩罚器会用于实行所有被创建的goroutine。即便只有一个逻辑处理惩罚器,Go也可以以神奇的效率和性能,并发调治无数个goroutine。

在图 6-2 中,可以看到操纵体系线程、逻辑处理惩罚器和当地运行队列之间的关系。
假如创建一个 goroutine 并准备运行,这个 goroutine 就会被放到调治器的全局运行队列中。之后,调治器就将这些队列中的 goroutine 分配给一个逻辑处理惩罚器,并放到这个逻辑处理惩罚器对应的当地运行队列中。当地运行队列中的 goroutine 会不停等待直到本身被分配的逻辑处理惩罚器实行.
正在运行的 goroutine 必要实行一个阻塞的体系调用,如打开一个文件。当这类调用发生时,线程和 goroutine 会从逻辑处理惩罚器上分离,该线程会继续阻塞,等待体系调用的返回。
与此同时,这个逻辑处理惩罚器就失去了用来运行的线程。以是,调治器会创建一个新线程,并将其绑定到该逻辑处理惩罚器上。之后,调治器会从当地运行队列里选择另一个 goroutine 来运行。
一旦被阻塞的体系调用实行完成并返回,对应的 goroutine 会放回到当地运行队列,而之前的线程会保存好,以便之后可以继续使用。
在很多情况下,并发的结果比并行好,因为操纵体系和硬件的总资源一般很少,但能支持体系同时做很多事情。这种“使用较少的资源做更多的事情”的哲学,也是指导 Go 语言计划的哲学
假如盼望让 goroutine 并行,必须使用多于一个逻辑处理惩罚器。当有多个逻辑处理惩罚器时,调治器会将 goroutine 平等分配到每个逻辑处理惩罚器上。这会让 goroutine 在不同的线程上运行。
不外要想真的实现并行的结果,用户必要让本身的程序运行在有多个物理处理惩罚器的机器上。否则,哪怕 Go 语言运行时使用多个线程,goroutine 依然会在同一个物理处理惩罚器上并发运行,达不到并行的结果

goroutine

创建两个 goroutine,以并发的情势分别显示大写和小写的英文字母
  1. package main
  2. import (
  3. "fmt"
  4. "runtime"
  5. "sync"
  6. )
  7. // main 是所有 Go 程序的入口
  8. func main() {
  9. // 分配一个逻辑处理器给调度器使用
  10. runtime.GOMAXPROCS(1)
  11. // wg 用来等待程序完成
  12. // 计数加 2,表示要等待两个 goroutine
  13. var wg sync.WaitGroup
  14. wg.Add(2)
  15. fmt.Println("Start Goroutines")
  16. // 声明一个匿名函数,并创建一个 goroutine
  17. go func() {
  18.   // 在函数退出时调用 Done 来通知 main 函数工作已经完成
  19.   defer wg.Done()
  20.   // 显示字母表 3 次
  21.   for count := 0; count < 3; count++ {
  22.    for char := 'a'; char < 'a'+26; char++ {
  23.     fmt.Printf("%c ", char)
  24.    }
  25.   }
  26. }()
  27. // 声明一个匿名函数,并创建一个 goroutine
  28. go func() {
  29.   // 在函数退出时调用 Done 来通知 main 函数工作已经完成
  30.   defer wg.Done()
  31.   // 显示字母表 3 次
  32.   for count := 0; count < 3; count++ {
  33.    for char := 'A'; char < 'A'+26; char++ {
  34.     fmt.Printf("%c ", char)
  35.    }
  36.   }
  37. }()
  38. // 等待 goroutine 结束
  39. fmt.Println("等待 goroutine 结束")
  40. wg.Wait()
  41. fmt.Println("\nTerminating Program")
  42. }
复制代码
调用了 runtime 包的 GOMAXPROCS 函数。这个函数允许程序更改调治器可以使用的逻辑处理惩罚器的数量。假如不想在代码里做这个调用,也可以通过修改和这 个函数名字一样的情况变量的值来更改逻辑处理惩罚器的数量。给这个函数传入 1,是关照调治器只能为该程序使用一个逻辑处理惩罚器。
两个函数分别通过关键字 go 创建 goroutine 来实行
  1. Start Goroutines
  2. 等待 goroutine 结束
  3. A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z
  4. Terminating Program
复制代码
第一个 goroutine 完成所有显示必要花时间太短了,以至于在调治器切换到第二个 goroutine 之前,就完成了所有任务。这也是为什么会看到先输出了所有的大写字母
main 函数通过 WaitGroup,等待两个 goroutine 完成它们的工作, WaitGroup 是一个计数信号量,可以用来记录并维护运行的 goroutine
假如 WaitGroup 的值大于 0,Wait 方法就会阻塞。创建了一个 WaitGroup 类型的变量,之后在将这个 WaitGroup 的值设置为 2,表现有两个正在运行的 goroutine。
为了减小 WaitGroup 的值并最终释放 main 函数,使用 defer 声明在函数退出时调用 Done 方法, 关键字 defer 会修改函数调用时机,在正在实行的函数返回时才真正调用 defer 声明的函数。
在 Go 语言中,sync.WaitGroup 用于协调并发任务的完成``。WaitGroup 提供了三个方法:Add()、Done() 和 Wait()。

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. func main() {
  7. var wg sync.WaitGroup
  8. // 启动两个 Goroutine
  9. wg.Add(2)
  10. go worker(1, &wg)
  11. go worker(2, &wg)
  12. // 等待所有 Goroutine 完成
  13. wg.Wait()
  14. fmt.Println("所有 Goroutine 已完成")
  15. }
  16. func worker(id int, wg *sync.WaitGroup) {
  17. // 在函数结束时通知 WaitGroup 完成
  18. defer wg.Done()
  19. fmt.Printf("Worker %d 正在执行\n", id)
  20. }
复制代码
基于调治器的内部算法,一个正运行的 goroutine 在工作竣事前,可以被停止并重新调治
调治器这样做的目的是防止某个 goroutine 长时间占用逻辑处理惩罚器。当 goroutine 占用时间过长时,调治器会停止当前正运行的 goroutine,并给其他可运行的 goroutine 运行的机会。

以给每个可用的物理处理惩罚器在运行的时间分配一个逻辑处理惩罚器 runtime.GOMAXPROCS(runtime.NumCPU())
  1. package main
  2. import (
  3. "fmt"
  4. "runtime"
  5. "sync"
  6. )
  7. // 创建一个协调器
  8. var wg sync.WaitGroup
  9. // main is the entry point for all Go programs.
  10. func main() {
  11. // 分配一个逻辑处理器处理
  12. runtime.GOMAXPROCS(runtime.NumCPU())
  13. // 有两个 goroutine 要运行等待
  14. wg.Add(2)
  15. // 创建两个 goroutine
  16. fmt.Println("Create Goroutines")
  17. go printPrime("A")
  18. go printPrime("B")
  19. // 等待 goroutine 结束
  20. fmt.Println("Waiting To Finish")
  21. wg.Wait()
  22. fmt.Println("Terminating Program")
  23. }
  24. // printPrime 显示 5000 以内的素数值
  25. func printPrime(prefix string) {
  26. // 在函数退出时调用 Done 来通知 main 函数工作已经完成
  27. defer wg.Done()
  28. next:
  29. for outer := 2; outer < 5000; outer++ {
  30.   for inner := 2; inner < outer; inner++ {
  31.    if outer%inner == 0 {
  32.     continue next
  33.    }
  34.   }
  35.   //time.Sleep(1 * time.Second) // 睡眠2秒
  36.   fmt.Printf("%s:%d\n", prefix, outer)
  37. }
  38. fmt.Println("Completed", prefix)
  39. }
复制代码
必要夸大的是,使用多个逻辑处理惩罚器并不意味着性能更好。在修改任何语言运行时设置参数的时间,都必要配合基准测试来评估程序的运行结果
只有在有多个逻辑处理惩罚器且可以同时让每个 goroutine 运行在一个可用的物理处理惩罚器上的时间,goroutine 才会并行运行
竞争状态

假如两个大概多个 goroutine 在没有相互同步的情况下,访问某个共享的资源,并试图同时读和写这个资源,就处于相互竞争的状态,这种情况被称作竞争状态(race candition)
对一个共享资源的读和写操纵必须是原子化的,换句话说,同一时候只能有一个 goroutine 对共享资源举行读和写操纵
  1. package main
  2. import (
  3. "fmt"
  4. "runtime"
  5. "sync"
  6. )
  7. var (
  8. // counter 是所有 goroutine 都要增加其值的变量
  9. counter int
  10. wg sync.WaitGroup
  11. )
  12. func main() {
  13. wg.Add(2)
  14. go incCounter(1)
  15. go incCounter(2)
  16. wg.Wait()
  17. fmt.Println("Final Counter:", counter)
  18. }
  19. // incCounter 增加包里 counter 变量的值
  20. func incCounter(id int) {
  21. defer wg.Done()
  22. for count := 0; count < 2; count++ {
  23.   value := counter
  24.   // 当前 goroutine 从线程退出,并放回到队列
  25.   runtime.Gosched()
  26.   value++
  27.   // 将该值保存回 counter
  28.   counter = value
  29. }
  30. }
复制代码
每个 goroutine 都会覆盖另一个 goroutine 的工作。这种覆盖发生在 goroutine 切换的时间。每个 goroutine 创造了一个 counter 变量的副本,之后就切换到另一个 goroutine。当这个 goroutine 再次运行的时间,counter 变量的值已经改变了,但是 goroutine 并没有更新本身的谁人副本的值,而是继续使用这个副本的值,用这个值递增,并存回 counter 变量,结果覆盖了另一个 goroutine 完成的工作。

Go调治器会在 goroutine 发生阻塞、IO操纵、函数调用等情况下主动举行调治切换,而无需显式调用 runtime.Gosched() 。
Go 语言有一个特别的工具,可以在代码里检测竞争状态。在查找这类错误的时间,这个工具非常好用,尤其是在竞争状态并不像这个例子里这么显着的时间。让我们用这个竞争检测器来检测一下我们的例子代码,
  1. go build -race // 用竞争检测器标志来编译程序
复制代码
  1. counter = value
  2. value := counter
  3. go incCounter(1)
  4. go incCounter(2)
复制代码
这几行代码分别是对 counter 变量的读和写操纵
一种修正代码、消除竞争状态的办法是,使用Go 语言提供的锁机制,来锁住共享资源,从而保证 goroutine 的同步状态。
锁住共享资源

Go 语言提供了传统的同步 goroutine 的机制,就是对共享资源加锁。假如必要序次访问一个,整型变量大概一段代码,atomic 和 sync包里的函数提供了很好的解决方案。 atomic 包里的几个函数以及 sync 包里的 mutex 类型。
原子函数

  1. // 这个示例程序展示如何使用 atomic 包来提供
  2. package main
  3. import (
  4. "fmt"
  5. "runtime"
  6. "sync"
  7. "sync/atomic"
  8. )
  9. var (
  10. // counter 是所有 goroutine 都要增加其值的变量
  11. counter int64
  12. // wg 用来等待程序结束
  13. wg sync.WaitGroup
  14. )
  15. // main 是所有 Go 程序的入口
  16. func main() {
  17. wg.Add(2)
  18. go incCounter(1)
  19. go incCounter(2)
  20. wg.Wait()
  21. fmt.Println("Final Counter:", counter)
  22. }
  23. func incCounter(id int) {
  24. defer wg.Done()
  25. for count := 0; count < 2; count++ {
  26.   // 安全地对 counter 加 1
  27.   atomic.AddInt64(&counter, 1)
  28.   // 当前 goroutine 从线程退出,并放回到队列
  29.   runtime.Gosched()
  30. }
  31. }
复制代码
使用了 atmoic 包的 AddInt64 函数。这个函数会同步整型值的加法,方法是强制同一时候只能有一个 goroutine 运行并完成这个加法操纵。
当 goroutine 试图去调用任何原子函数时,这些 goroutine 都会主动根据所引用的变量做同步处理惩罚。
另外两个有用的原子函数是 LoadInt64 和 StoreInt64。这两个函数提供了一种安全地读和写一个整型值的方式
  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
  8. var (
  9. shutdown int64
  10. wg       sync.WaitGroup
  11. )
  12. func main() {
  13. wg.Add(2)
  14. go doWork("A")
  15. go doWork("B")
  16. time.Sleep(1 * time.Second)
  17. fmt.Println("Shutdown Now")
  18. // 该停止工作了,安全地设置 shutdown 标志
  19. atomic.StoreInt64(&shutdown, 1)
  20. wg.Wait()
  21. }
  22. func doWork(name string) {
  23. // Schedule the call to Done to tell main we are done.
  24. defer wg.Done()
  25. for {
  26.   fmt.Printf("Doing %s Work\n", name)
  27.   //给定 goroutine 执行的时间
  28.   time.Sleep(250 * time.Millisecond)
  29.   // 读取工作标识,判断当前工作是否完成,要停止工作了吗?
  30.   if atomic.LoadInt64(&shutdown) == 1 {
  31.    fmt.Printf("Shutting %s Down\n", name)
  32.    break
  33.   }
  34. }
  35. }
复制代码
goroutine 会使用 LoadInt64 来检查 shutdown 变量的值。这个函数会安全地返回 shutdown 变量的一个副本。假如这个副本的值为 1,goroutine 就会跳出循环并终止。
main 函数使用 StoreInt64 函数来安全地修改 shutdown 变量的值。假如哪个 doWork goroutine 试图在 main 函数调用 StoreInt64 的同时调用 LoadInt64 函数,那么原子函数会将这些调用相互同步,保证这些操纵都是安全的,不会进入竞争状态
雷同 Java 的原子基本类型,这里可以联公道解,AtomicBoolean,AtomicInteger,AtomicLong
互斥锁

另一种同步访问共享资源的方式是使用互斥锁(mutex)。互斥锁这个名字来自互斥(mutualexclusion)的概念。互斥锁用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以实行这个临界区代码。
  1. package main
  2. import (
  3. "fmt"
  4. "runtime"
  5. "sync"
  6. )
  7. var (
  8. counter int
  9. wg      sync.WaitGroup
  10. // mutex 用来定义一段代码临界区
  11. mutex sync.Mutex
  12. )
  13. func main() {
  14. wg.Add(2)
  15. go incCounter(1)
  16. go incCounter(2)
  17. // Wait for the goroutines to finish.
  18. wg.Wait()
  19. fmt.Printf("Final Counter: %d\n", counter)
  20. }
  21. // incCounter 使用互斥锁来同步并保证安全访问,
  22. // 增加包里 counter 变量的值
  23. func incCounter(id int) {
  24. defer wg.Done()
  25. for count := 0; count < 2; count++ {
  26.   mutex.Lock()
  27.   {
  28.    value := counter
  29.    runtime.Gosched()
  30.    value++
  31.    counter = value
  32.   }
  33.   mutex.Unlock()
  34. }
  35. }
复制代码
Lock()和 Unlock() 函数调用定义的临界区里被掩护起来。使用大括号只是为了让临界区看起来更清晰,并不是必须的。同一时候只有一个 goroutine 可以进入临界区。之后,直到调用 Unlock()函数之后,其他 goroutine 才气进入临界区。 可以联合 Java 的 ReentrantLock 来理解。都属于互斥的可重入锁.
通道

在 Go 语言里,你不但可以使用原子函数和互斥锁来保证对共享资源的安全访问以及消除竞争状态,还可以使用通道,通过发送和接收必要共享的资源,在 goroutine 之间做同步。
当一个资源必要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。声明通道时,必要指定将要被共享的数据的类型。可以通过通道共享内置类型、命名类型、布局类型和引用类型的值大概指针。
在 Go 语言中必要使用内置函数 make 来创建一个通道
  1. // 无缓冲的整型通道
  2. unbuffered := make(chan int)
  3. // 有缓冲的字符串通道
  4. buffered := make(chan string, 10)
复制代码
make 的第一个参数必要是关键字 chan,之后跟着允许通道交换的数据的类型。假如创建的是一个有缓冲的通道,之后还必要在第二个参数指定这个通道的缓冲区的巨细。
向通道发送值大概指针必要用到<-操纵符
  1. // 有缓冲的字符串通道
  2. buffered := make(chan string, 10)
  3. // 通过通道发送一个字符串
  4. buffered <- "Gopher"
复制代码
创建了一个有缓冲的通道,数据类型是字符串,包含一个 10 个值的缓冲区。之后我们通过通道发送字符串"Gopher"。
为了让另一个 goroutine 可以从该通道里接收到这个字符串,我们依旧使用<-操纵符,但这次是一元运算符,当从通道里接收一个值大概指针时,<-运算符在要操纵的通道变量的左侧
  1. // 从通道接收一个字符串
  2. value := <- buffered
复制代码
通道是否带有缓冲,其行为会有一些不同。理解这个差异对决定到底应该使用还是不使用缓冲很有资助。下面我们分别介绍一下这两种类型。
无缓冲的通道

无缓冲的通道(unbuffered channel)是指在接收前没有本领保存任何值的通道。
这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才气完成发送和接收操纵。假如两个 goroutine 没有同时准备好,通道会导致先实行发送或接收操纵的 goroutine 阻塞等待。这种对通道举行发送和接收的交互行为本身就是同步的。此中任意一个操纵都无法离开另一个操纵单独存在。

在网球角逐中,两位选手会把球在两个人之间往返通报。选手总是处在以下两种状态之一:要么在等待接球,要么将球打向对方。可以使用两个 goroutine 来模拟网球角逐,并使用无缓冲的通道来模拟球的往返.
  1. package main
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "sync"
  6. "time"
  7. )
  8. var wg sync.WaitGroup
  9. func init() {
  10. rand.Seed(time.Now().UnixNano())
  11. }
  12. // main is the entry point for all Go programs.
  13. func main() {
  14. // 创建一个无缓冲的通道
  15. court := make(chan int)
  16. // Add a count of two, one for each goroutine.
  17. wg.Add(2)
  18. // 启动两个选手
  19. go player("Nadal", court)
  20. go player("Djokovic", court)
  21. // 发球
  22. court <- 1
  23. // Wait for the game to finish.
  24. wg.Wait()
  25. }
  26. // player 模拟一个选手在打网球
  27. func player(name string, court chan int) {
  28. // 在函数退出时调用 Done 来通知 main 函数工作已经完成
  29. defer wg.Done()
  30. for {
  31.   // 等待球被击打过来
  32.   ball, ok := <-court
  33.   if !ok {
  34.    // 如果通道被关闭,我们就赢了
  35.    fmt.Printf("Player %s Won\n", name)
  36.    return
  37.   }
  38.   // 选随机数,然后用这个数来判断我们是否丢球
  39.   n := rand.Intn(100)
  40.   if n%13 == 0 {
  41.    fmt.Printf("Player %s Missed\n", name)
  42.    // close()是一个内建函数,用于关闭一个通道
  43.    close(court)
  44.    return
  45.   }
  46.   // / 显示击球数,并将击球数加 1
  47.   fmt.Printf("Player %s Hit %d\n", name, ball)
  48.   ball++
  49.   // / 将球打向对手
  50.   court <- ball
  51. }
  52. }
  53. ==========
  54. Player Djokovic Hit 1
  55. Player Nadal Hit 2
  56. Player Djokovic Hit 3
  57. Player Nadal Hit 4
  58. Player Djokovic Hit 5
  59. Player Nadal Hit 6
  60. Player Djokovic Hit 7
  61. Player Nadal Hit 8
  62. Player Djokovic Missed
  63. Player Nadal Won
  64. Process finished with the exit code 0
复制代码
4 个跑步者围绕赛道轮流跑,第二个、第三个和第四个跑步者要接到前一位跑步者的接力棒后才气起跑。角逐中最重要的部门是要通报接力棒,要求同步通报。在同步接力棒的时间,参与接力的两个跑步者必须在同一时候准备好交接.
非常不错的一个 Demo
  1. // 4 个 goroutine 间的接力比赛
  2. package main
  3. import (
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. // wg 用来等待程序结束
  9. var wg sync.WaitGroup
  10. // / main 是所有 Go 程序的入口
  11. func main() {
  12. // 创建一个无缓冲的通道
  13. baton := make(chan int)
  14. // 为最后一位跑步者将计数加 1
  15. wg.Add(1)
  16. // 表示第一位跑步者来到跑道
  17. go Runner(baton)
  18. // 开始比赛,为第一位跑步者提供接力棒
  19. baton <- 1
  20. // 等待接力结束
  21. wg.Wait()
  22. }
  23. // Runner 模拟接力比赛中的一位跑步者
  24. func Runner(baton chan int) {
  25. var newRunner int
  26. // 等待接力棒
  27. runner := <-baton
  28. // 开始绕着跑道跑步
  29. fmt.Printf("当前接力者 %d  开始跑步\n", runner)
  30. // 创建下一位跑步者,这里是一个递归,实际上模拟其他选手到达指定接力位置
  31. if runner != 4 {
  32.   newRunner = runner + 1
  33.   fmt.Printf("当前选手 %d 开始接力\n", newRunner)
  34.   go Runner(baton)
  35. }
  36. // 所有选手到达指定位置
  37. // 围绕跑道跑
  38. time.Sleep(100 * time.Millisecond)
  39. // 当所有人跑完结束比赛
  40. if runner == 4 {
  41.   fmt.Printf("最后一位 %d 跑完了, Race Over\n", runner)
  42.   wg.Done()
  43.   return
  44. }
  45. // 跑完之后交接接力棒
  46. // 将接力棒交给下一位跑步者
  47. fmt.Printf("接力棒由 %d 交给了 %d\n",
  48.   runner,
  49.   newRunner)
  50. baton <- newRunner
  51. }
  52. =========
  53. C:\Users\liruilong\AppData\Local\JetBrains\GoLand2023.2\tmp\GoLand\___go_build_example_com_m_chapter6_listing22.exe
  54. 当前接力者 1  开始跑步
  55. 当前选手 2 开始接力
  56. ====== 接力棒由 1 交给了 2
  57. 当前接力者 2  开始跑步
  58. 当前选手 3 开始接力
  59. ====== 接力棒由 2 交给了 3
  60. 当前接力者 3  开始跑步
  61. 当前选手 4 开始接力
  62. ====== 接力棒由 3 交给了 4
  63. 当前接力者 4  开始跑步
  64. 最后一位 4 跑完了, Race Over
  65. Process finished with the exit code 0
复制代码
这里有个误区,第一眼看这个代码,大概会把 go Runner(baton) 当作一个递归方法来理解,这里使用了, 以是本质上是不同的 goroutine , goroutine 导致当前住进程并不会直接进入被声明函数栈内里。而是会序次实行。 直到发生了通道操纵,然后会进入到其他的 goroutine,也就是说,在第一个人举行跑步时,其他的通道不停时阻塞状态。
有缓冲的通道

有缓冲的通道(buffered channel) 是一种在被接收前能存储一个大概多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也会不同。

这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同:无缓冲的通道保证举行发送和接收的 goroutine 会在同一时间举行数据交换;有缓冲的通道没有这种保证。

可以看到有缓存的通道情况下,两个操纵既不是同步的,也不会相互阻塞,即通道两侧的读写发生是没有直接关系的。
使用有缓冲的通道的例子,这个例子管理一组 goroutine 来接收并完成工作。有缓冲的通道提供了一种清晰而直观的方式来实现这个功能
  1. // 有缓冲的通道和固定数目的 goroutine 来处理一堆工作
  2. package main
  3. import (
  4. "fmt"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. )
  9. const (
  10. numberGoroutines = 4  // 要使用的 goroutine 的数量
  11. taskLoad         = 10 // 要处理的工作的数量
  12. )
  13. // wg 用来等待程序完成
  14. var wg sync.WaitGroup
  15. // init is called to initialize the package by the
  16. // Go runtime prior to any other code being executed.
  17. func init() {
  18. // 初始化随机数种子
  19. rand.Seed(time.Now().Unix())
  20. }
  21. // main 是所有 Go 程序的入口
  22. func main() {
  23. // 创建一个有缓冲的通道来管理工作
  24. tasks := make(chan string, taskLoad)
  25. //使用这么多  goroutine  来处理
  26. wg.Add(numberGoroutines)
  27. for gr := 1; gr <= numberGoroutines; gr++ {
  28.   //  启动每个 goroutine 来处理工作
  29.   go worker(tasks, gr)
  30. }
  31. // 增加一组要完成的工作,这里一次性放入了所有的通道数据
  32. for post := 1; post <= taskLoad; post++ {
  33.   tasks <- fmt.Sprintf("Task : %d", post)
  34. }
  35. // 当所有工作都处理完时关闭通道
  36. // 以便所有 goroutine 退出
  37. close(tasks)
  38. // 等待所有工作完成
  39. wg.Wait()
  40. }
  41. // worker 作为 goroutine 启动来处理
  42. // 从有缓冲的通道传入的工作
  43. func worker(tasks chan string, worker int) {
  44. // 通知函数已经返回
  45. defer wg.Done()
  46. for {
  47.   // 等待分配工作
  48.   task, ok := <-tasks
  49.   if !ok {
  50.    // 这意味着通道已经空了,并且已被关闭
  51.    fmt.Printf("员工: %d : 结束了工作\n", worker)
  52.    return
  53.   }
  54.   // 分配工作显示我们开始工作了
  55.   fmt.Printf("员工: %d : 开始执行任务 %s\n", worker, task)
  56.   // 随机等一段时间来模拟工作
  57.   sleep := rand.Int63n(100)
  58.   time.Sleep(time.Duration(sleep) * time.Millisecond)
  59.   // 显示我们完成了工作
  60.   fmt.Printf("员工: %d : 完成了任务 %s\n", worker, task)
  61. }
  62. }
复制代码
这里雷同队列一样,天生四个 goroutine 在有缓存通道内里拿到数据处理惩罚数据。
  1. C:\Users\liruilong\AppData\Local\JetBrains\GoLand2023.2\tmp\GoLand\___go_build_example_com_m_chapter6_listing24.exe
  2. 员工: 1 : 开始执行任务 Task : 3
  3. 员工: 3 : 开始执行任务 Task : 4
  4. 员工: 2 : 开始执行任务 Task : 1
  5. 员工: 4 : 开始执行任务 Task : 2
  6. 员工: 1 : 完成了任务 Task : 3
  7. 员工: 1 : 开始执行任务 Task : 5
  8. 员工: 2 : 完成了任务 Task : 1
  9. 员工: 2 : 开始执行任务 Task : 6
  10. 员工: 2 : 完成了任务 Task : 6
  11. 员工: 2 : 开始执行任务 Task : 7
  12. 员工: 3 : 完成了任务 Task : 4
  13. 员工: 3 : 开始执行任务 Task : 8
  14. 员工: 4 : 完成了任务 Task : 2
  15. 员工: 4 : 开始执行任务 Task : 9
  16. 员工: 1 : 完成了任务 Task : 5
  17. 员工: 1 : 开始执行任务 Task : 10
  18. 员工: 1 : 完成了任务 Task : 10
  19. 员工: 4 : 完成了任务 Task : 9
  20. 员工: 4 : 结束了工作
  21. 员工: 1 : 结束了工作
  22. 员工: 3 : 完成了任务 Task : 8
  23. 员工: 3 : 结束了工作
  24. 员工: 2 : 完成了任务 Task : 7
  25. 员工: 2 : 结束了工作
  26. Process finished with the exit code 0
复制代码
博文部门内容参考

© 文中涉及参考链接内容版权归原作者所有,如有侵权请告知,这是一个开源项目,假如你承认它,不要吝啬星星哦




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4