40分钟学 Go 语言高并发:Go Channel使用与实践教程

打印 上一主题 下一主题

主题 790|帖子 790|积分 2370

Go Channel使用与实践教程

一、Channel基础概念与类型

Channel是Go语言中的一个核心类型,是一种通讯机制,用于在不同的goroutine之间通报数据。它是Go语言实现CSP(Communicating Sequential Processes)并发模子的基础。
1.1 Channel类型和特性表

特性阐明定向性双向channel、只读channel、只写channel缓冲性无缓冲channel、有缓冲channel阻塞性阻塞式操作、非阻塞式操作(select)状态开启、关闭 Channel工作流程图


1.2 Channel根本操作

  1. // 创建channel
  2. ch := make(chan int)        // 无缓冲
  3. ch := make(chan int, 5)     // 有缓冲
  4. // 发送数据
  5. ch <- value
  6. // 接收数据
  7. value := <-ch
  8. value, ok := <-ch          // 检查channel是否关闭
  9. // 关闭channel
  10. close(ch)
  11. // 循环读取
  12. for v := range ch {
  13.     // 处理数据
  14. }
复制代码
二、有缓冲vs无缓冲Channel

2.1 对比表

特性无缓冲Channel有缓冲Channel容量0用户指定巨细发送操作阻塞直到有接收者缓冲区未满时不阻塞接收操作阻塞直到有发送者缓冲区非空时不阻塞同步性强同步异步实用场景需要即时相应的场景处理惩罚突发请求、削峰填谷 2.2 无缓冲Channel示例

  1. package main
  2. import (
  3.     "fmt"
  4.     "time"
  5. )
  6. func main() {
  7.     // 创建无缓冲channel
  8.     ch := make(chan int)
  9.    
  10.     // 发送者goroutine
  11.     go func() {
  12.         fmt.Println("发送数据前...")
  13.         ch <- 1
  14.         fmt.Println("发送数据后...")
  15.     }()
  16.    
  17.     // 让发送者goroutine有时间先执行
  18.     time.Sleep(time.Second)
  19.    
  20.     fmt.Println("准备接收数据...")
  21.     value := <-ch
  22.     fmt.Printf("接收到数据: %d\n", value)
  23. }
复制代码
2.3 有缓冲Channel示例

  1. package main
  2. import (
  3.     "fmt"
  4.     "time"
  5. )
  6. func main() {
  7.     // 创建带缓冲的channel
  8.     ch := make(chan int, 2)
  9.    
  10.     // 发送者goroutine
  11.     go func() {
  12.         for i := 1; i <= 3; i++ {
  13.             fmt.Printf("发送数据: %d\n", i)
  14.             ch <- i
  15.             fmt.Printf("成功发送: %d\n", i)
  16.         }
  17.     }()
  18.    
  19.     // 等待数据发送
  20.     time.Sleep(time.Second)
  21.    
  22.     // 接收数据
  23.     for i := 1; i <= 3; i++ {
  24.         value := <-ch
  25.         fmt.Printf("接收到数据: %d\n", value)
  26.     }
  27. }
复制代码
三、常见Channel模式

3.1 生产者-消费者模式

  1. package main
  2. import (
  3.     "fmt"
  4.     "math/rand"
  5.     "time"
  6. )
  7. // 生产者函数
  8. func producer(ch chan<- int) {
  9.     for i := 0; ; i++ {
  10.         // 生产数据
  11.         value := rand.Intn(100)
  12.         ch <- value
  13.         fmt.Printf("生产者生产数据: %d\n", value)
  14.         time.Sleep(time.Millisecond * 500)
  15.     }
  16. }
  17. // 消费者函数
  18. func consumer(id int, ch <-chan int) {
  19.     for value := range ch {
  20.         fmt.Printf("消费者%d消费数据: %d\n", id, value)
  21.         time.Sleep(time.Millisecond * 800)
  22.     }
  23. }
  24. func main() {
  25.     ch := make(chan int, 5)
  26.    
  27.     // 启动生产者
  28.     go producer(ch)
  29.    
  30.     // 启动多个消费者
  31.     for i := 0; i < 3; i++ {
  32.         go consumer(i, ch)
  33.     }
  34.    
  35.     // 运行一段时间后退出
  36.     time.Sleep(time.Second * 5)
  37. }
复制代码
3.2 Pipeline模式

  1. package main
  2. import (
  3.     "fmt"
  4. )
  5. // 生成数字的阶段
  6. func generate(nums ...int) <-chan int {
  7.     out := make(chan int)
  8.     go func() {
  9.         defer close(out)
  10.         for _, n := range nums {
  11.             out <- n
  12.         }
  13.     }()
  14.     return out
  15. }
  16. // 平方计算阶段
  17. func square(in <-chan int) <-chan int {
  18.     out := make(chan int)
  19.     go func() {
  20.         defer close(out)
  21.         for n := range in {
  22.             out <- n * n
  23.         }
  24.     }()
  25.     return out
  26. }
  27. // 过滤阶段:只保留大于100的数
  28. func filter(in <-chan int) <-chan int {
  29.     out := make(chan int)
  30.     go func() {
  31.         defer close(out)
  32.         for n := range in {
  33.             if n > 100 {
  34.                 out <- n
  35.             }
  36.         }
  37.     }()
  38.     return out
  39. }
  40. func main() {
  41.     // 构建pipeline
  42.     numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
  43.     stage1 := generate(numbers...)
  44.     stage2 := square(stage1)
  45.     stage3 := filter(stage2)
  46.    
  47.     // 打印最终结果
  48.     for result := range stage3 {
  49.         fmt.Printf("大于100的平方数: %d\n", result)
  50.     }
  51. }
复制代码
3.3 扇出扇入模式

  1. package main
  2. import (
  3.     "fmt"
  4.     "sync"
  5. )
  6. // 扇出:多个goroutine从一个channel读取数据
  7. func split(ch <-chan int, n int) []<-chan int {
  8.     outputs := make([]<-chan int, n)
  9.     for i := 0; i < n; i++ {
  10.         outputs[i] = processData(ch)
  11.     }
  12.     return outputs
  13. }
  14. // 处理数据的worker
  15. func processData(in <-chan int) <-chan int {
  16.     out := make(chan int)
  17.     go func() {
  18.         defer close(out)
  19.         for n := range in {
  20.             out <- n * n // 假设这是一个耗时的处理过程
  21.         }
  22.     }()
  23.     return out
  24. }
  25. // 扇入:将多个channel合并为一个channel
  26. func merge(channels ...<-chan int) <-chan int {
  27.     var wg sync.WaitGroup
  28.     out := make(chan int)
  29.    
  30.     // 为每个输入channel启动一个goroutine
  31.     wg.Add(len(channels))
  32.     for _, c := range channels {
  33.         go func(ch <-chan int) {
  34.             defer wg.Done()
  35.             for n := range ch {
  36.                 out <- n
  37.             }
  38.         }(c)
  39.     }
  40.    
  41.     // 等待所有输入处理完成后关闭输出channel
  42.     go func() {
  43.         wg.Wait()
  44.         close(out)
  45.     }()
  46.    
  47.     return out
  48. }
  49. func main() {
  50.     // 创建输入channel
  51.     input := make(chan int)
  52.     go func() {
  53.         defer close(input)
  54.         for i := 1; i <= 10; i++ {
  55.             input <- i
  56.         }
  57.     }()
  58.    
  59.     // 扇出:启动3个处理goroutine
  60.     channels := split(input, 3)
  61.    
  62.     // 扇入:合并结果
  63.     results := merge(channels...)
  64.    
  65.     // 打印结果
  66.     for result := range results {
  67.         fmt.Printf("处理结果: %d\n", result)
  68.     }
  69. }
复制代码
四、死锁防备

4.1 常见死锁场景表

场景形貌办理方案同一goroutine中同时发送和接收无缓冲channel在同一goroutine中既要发送又要接收使用不同的goroutine处理惩罚发送和接收channel未初始化使用未make的channel使用make初始化channel循环等待多个goroutine相互等待对方的channel使用select避免永世阻塞channel泄露goroutine泄露导致channel永世无法关闭正确管理goroutine生命周期 4.2 死锁防备示例

  1. package main
  2. import (
  3.     "fmt"
  4.     "time"
  5. )
  6. // 使用select避免死锁
  7. func safeOperation(ch chan int, value int) bool {
  8.     select {
  9.     case ch <- value:
  10.         return true
  11.     case <-time.After(time.Second):
  12.         fmt.Println("操作超时")
  13.         return false
  14.     }
  15. }
  16. // 使用缓冲避免死锁
  17. func bufferExample() {
  18.     ch := make(chan int, 1) // 使用缓冲区
  19.     ch <- 1                 // 不会阻塞
  20.     fmt.Println(<-ch)       // 正常接收
  21. }
  22. // 正确的goroutine管理
  23. func properGoroutineManagement() {
  24.     done := make(chan bool)
  25.     ch := make(chan int)
  26.    
  27.     // 启动工作goroutine
  28.     go func() {
  29.         defer close(ch)
  30.         for i := 0; i < 5; i++ {
  31.             ch <- i
  32.         }
  33.         done <- true
  34.     }()
  35.    
  36.     // 接收数据
  37.     go func() {
  38.         for value := range ch {
  39.             fmt.Printf("接收到数据: %d\n", value)
  40.         }
  41.     }()
  42.    
  43.     <-done // 等待工作完成
  44. }
  45. func main() {
  46.     // 测试安全操作
  47.     ch := make(chan int)
  48.     go func() {
  49.         time.Sleep(time.Second * 2)
  50.         <-ch
  51.     }()
  52.    
  53.     success := safeOperation(ch, 1)
  54.     fmt.Printf("操作是否成功: %v\n", success)
  55.    
  56.     // 测试缓冲示例
  57.     bufferExample()
  58.    
  59.     // 测试goroutine管理
  60.     properGoroutineManagement()
  61. }
复制代码
五、Channel筹划建议

Channel状态转换图



  • 得当使用缓冲

    • 当生产和消费速率不匹配时
    • 需要削峰填谷时
    • 但不要使用过大的缓冲区

  • 正确处理惩罚channel关闭

    • 由发送方负责关闭channel
    • 不要关闭已关闭的channel
    • 不要向已关闭的channel发送数据

  • 合理使用select

    • 处理惩罚多个channel
    • 实现超时控制
    • 提供默认分支避免阻塞

  • 资源管理

    • 确保goroutine正确退出
    • 防止goroutine泄露
    • 得当清理资源

  • Channel性能测试的示例代码:
  1. package channel_test
  2. import (
  3.     "testing"
  4.     "time"
  5. )
  6. // 测试无缓冲channel的性能
  7. func BenchmarkUnbufferedChannel(b *testing.B) {
  8.     ch := make(chan int)
  9.     done := make(chan bool)
  10.     // 接收者goroutine
  11.     go func() {
  12.         for i := 0; i < b.N; i++ {
  13.             <-ch
  14.         }
  15.         done <- true
  16.     }()
  17.     b.ResetTimer()
  18.     // 发送数据
  19.     for i := 0; i < b.N; i++ {
  20.         ch <- i
  21.     }
  22.     <-done
  23. }
  24. // 测试有缓冲channel的性能
  25. func BenchmarkBufferedChannel(b *testing.B) {
  26.     ch := make(chan int, 1000)
  27.     done := make(chan bool)
  28.     // 接收者goroutine
  29.     go func() {
  30.         for i := 0; i < b.N; i++ {
  31.             <-ch
  32.         }
  33.         done <- true
  34.     }()
  35.     b.ResetTimer()
  36.     // 发送数据
  37.     for i := 0; i < b.N; i++ {
  38.         ch <- i
  39.     }
  40.     <-done
  41. }
  42. // 测试channel在高并发下的性能
  43. func BenchmarkChannelHighConcurrency(b *testing.B) {
  44.     ch := make(chan int, 1000)
  45.     done := make(chan bool)
  46.     concurrency := 100
  47.     // 启动多个接收者
  48.     for i := 0; i < concurrency; i++ {
  49.         go func() {
  50.             for {
  51.                 select {
  52.                 case <-ch:
  53.                 case <-done:
  54.                     return
  55.                 }
  56.             }
  57.         }()
  58.     }
  59.     b.ResetTimer()
  60.     // 发送数据
  61.     for i := 0; i < b.N; i++ {
  62.         ch <- i
  63.     }
  64.     close(done)
  65. }
  66. // 测试select语句的性能
  67. func BenchmarkSelectStatement(b *testing.B) {
  68.     ch1 := make(chan int)
  69.     ch2 := make(chan int)
  70.     done := make(chan bool)
  71.     // 接收者goroutine
  72.     go func() {
  73.         for i := 0; i < b.N; i++ {
  74.             select {
  75.             case <-ch1:
  76.             case <-ch2:
  77.             }
  78.         }
  79.         done <- true
  80.     }()
  81.     b.ResetTimer()
  82.     // 交替发送数据到两个channel
  83.     for i := 0; i < b.N; i++ {
  84.         if i%2 == 0 {
  85.             ch1 <- i
  86.         } else {
  87.             ch2 <- i
  88.         }
  89.     }
  90.     <-done
  91. }
  92. // 测试channel的关闭性能
  93. func BenchmarkChannelClose(b *testing.B) {
  94.     for i := 0; i < b.N; i++ {
  95.         ch := make(chan int, 100)
  96.         for j := 0; j < 100; j++ {
  97.             ch <- j
  98.         }
  99.         close(ch)
  100.         // 读取所有数据直到channel关闭
  101.         for range ch {
  102.         }
  103.     }
  104. }
  105. // 测试超时控制的性能
  106. func BenchmarkChannelTimeout(b *testing.B) {
  107.     ch := make(chan int)
  108.     timeout := time.After(time.Millisecond)
  109.     b.ResetTimer()
  110.     for i := 0; i < b.N; i++ {
  111.         select {
  112.         case ch <- i:
  113.         case <-timeout:
  114.         default:
  115.         }
  116.     }
  117. }
复制代码
六、性能考虑


  • 缓冲区巨细

    • 根据实际需求设置
    • 考虑内存使用
    • 避免过大缓冲

  • goroutine数目

    • 控制并发级别
    • 避免过多goroutine
    • 考虑资源消耗

  • channel数目

    • 合理筹划channel数目
    • 避免过多channel
    • 得当复用channel


怎么样本日的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。末了,祝您早日实现财务自由,还请给个赞,谢谢!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

tsx81428

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表