浅谈errgroup的使用以及源码分析

打印 上一主题 下一主题

主题 893|帖子 893|积分 2679

本文讲解的是golang.org/x/sync这个包中的errgroup
1、errgroup 的基础介绍

学习过 Go 的朋友都知道 Go 实现并发编程是比较容易的事情,只需要使用go关键字就可以开启一个 goroutine。那对于并发场景中,如何实现goroutine的协调控制呢?常见的一种方式是使用sync.WaitGroup 来进行协调控制。
使用过sync.WaitGroup 的朋友知道,sync.WaitGroup 虽然可以实现协调控制,但是不能传递错误,那该如何解决呢?聪明的你可能马上想到使用 chan 或者是 context来传递错误,确实是可以的。那接下来,我们一起看看官方是怎么实现上面的需求的呢?
1.1 errgroup的安装

安装命令:
  1. go get golang.org/x/sync
  2. //下面的案例是基于v0.1.0 演示的
  3. go get golang.org/x/sync@v0.1.0
复制代码
1.2 errgroup的基础例子

这里我们需要请求3个url来获取数据,假设请求url2时报错,url3耗时比较久,需要等一秒。
  1. package main
  2. import (
  3.         "errors"
  4.         "fmt"
  5.         "golang.org/x/sync/errgroup"
  6.         "strings"
  7.         "time"
  8. )
  9. func main()  {
  10.         queryUrls := map[string]string{
  11.                 "url1": "http://localhost/url1",
  12.                 "url2": "http://localhost/url2",
  13.                 "url3": "http://localhost/url3",
  14.         }
  15.         var eg errgroup.Group
  16.         var results []string
  17.         for _, url := range queryUrls {
  18.                 url := url
  19.                 eg.Go(func() error {
  20.                         result, err := query(url)
  21.                         if err != nil {
  22.                                 return err
  23.                         }
  24.                         results = append(results, fmt.Sprintf("url:%s -- ret: %v", url, result))
  25.                         return nil
  26.                 })
  27.         }
  28.        
  29.   // group 的wait方法,等待上面的 eg.Go 的协程执行完成,并且可以接受错误
  30.         err := eg.Wait()
  31.         if err != nil {
  32.                 fmt.Println("eg.Wait error:", err)
  33.                 return
  34.         }
  35.         for k, v := range results {
  36.                 fmt.Printf("%v ---> %v\n", k, v)
  37.         }
  38. }
  39. func query(url string) (ret string, err error) {
  40.         // 假设这里是发送请求,获取数据
  41.         if strings.Contains(url, "url2") {
  42.                 // 假设请求 url2 时出现错误
  43.                 fmt.Printf("请求 %s 中....\n", url)
  44.                 return "", errors.New("请求超时")
  45.         } else if strings.Contains(url, "url3") {
  46.                 // 假设 请求 url3 需要1秒
  47.                 time.Sleep(time.Second*1)
  48.         }
  49.         fmt.Printf("请求 %s 中....\n", url)
  50.         return "success", nil
  51. }
复制代码
执行结果:
  1. 请求 http://localhost/url2 中....
  2. 请求 http://localhost/url1 中....
  3. 请求 http://localhost/url3 中....
  4. eg.Wait error: 请求超时
复制代码
果然,当其中一个goroutine出现错误时,会把goroutine中的错误传递出来。
我们自己运行一下上面的代码就会发现这样一个问题,请求 url2 出错了,但是依旧在请求 url3 。因为我们需要聚合 url1、url2、url3 的结果,所以当其中一个出现问题时,我们是可以做一个优化的,就是当其中一个出现错误时,取消还在执行的任务,直接返回结果,不用等待任务执行结果。
那应该如何做呢?
这里假设 url1 执行1秒,url2 执行报错,url3执行3秒。所以当url2报错后,就不用等url3执行结束就可以返回了。
[code]package mainimport (        "context"        "errors"        "fmt"        "golang.org/x/sync/errgroup"        "strings"        "time")func main()  {        queryUrls := map[string]string{                "url1": "http://localhost/url1",                "url2": "http://localhost/url2",                "url3": "http://localhost/url3",        }        var results []string        ctx, cancel := context.WithCancel(context.Background())        eg, errCtx := errgroup.WithContext(ctx)        for _, url := range queryUrls {                url := url                eg.Go(func() error {                        result, err := query(errCtx, url)                        if err != nil {        //其实这里不用手动取消,看完源码就知道为啥了                                cancel()                                return err                        }                        results = append(results, fmt.Sprintf("url:%s -- ret: %v", url, result))                        return nil                })        }        err := eg.Wait()        if err != nil {                fmt.Println("eg.Wait error:", err)                return        }        for k, v := range results {                fmt.Printf("%v ---> %v\n", k, v)        }}func query(errCtx context.Context, url string) (ret string, err error) {        fmt.Printf("请求 %s 开始....\n", url)        // 假设这里是发送请求,获取数据        if strings.Contains(url, "url2") {                // 假设请求 url2 时出现错误                time.Sleep(time.Second*2)                return "", errors.New("请求出错")        } else if strings.Contains(url, "url3") {                // 假设 请求 url3 需要1秒                select {                case
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我爱普洱茶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表