如何使用Go中的Weighted实现资源管理

打印 上一主题 下一主题

主题 911|帖子 911|积分 2733

1. 简介

本文将介绍 Go 语言中的 Weighted 并发原语,包括 Weighted 的基本使用方法、实现原理、使用注意事项等内容。能够更好地理解和应用 Weighted 来实现资源的管理,从而提高程序的稳定性。
2. 问题引入

在微服务架构中,我们的服务节点负责接收其他节点的请求,并提供相应的功能和数据。比如账户服务,其他服务需要获取账户信息,都会通过rpc请求向账户服务发起请求。
这些服务节点通常以集群的方式部署在服务器上,用于处理大量的并发请求。每个服务器都有其处理能力的上限,超过该上限可能导致性能下降甚至崩溃。
在部署服务时,通常会评估服务的并发量,并为其分配适当的资源以处理预期的请求负载。然而,在微服务架构中,存在着上游服务请求下游服务的场景。如果上游服务在某些情况下没有正确考虑并发量,或者由于某些异常情况导致大量请求发送给下游服务,那么下游服务可能面临超过其处理能力的问题。这可能导致下游服务的响应时间增加,甚至无法正常处理请求,进而影响整个系统的稳定性和可用性。下面用一个简单的代码来说明一下:
  1. package main
  2. import (
  3.         "fmt"
  4.         "net/http"
  5.         "sync"
  6. )
  7. func main() {
  8.         // 启动下游服务,用于处理请求
  9.         http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  10.                 // 模拟下游服务的处理逻辑
  11.                 // ...
  12.                 // 完成请求处理后,从等待组中删除一个等待
  13.                 wg.Done()
  14.         })
  15.         // 启动下游服务的 HTTP 服务器
  16.         http.ListenAndServe(":8080", nil)
  17. }
复制代码
这里启动一个简单的HTTP服务器,由其来模拟下游服务,来接收上游服务的请求。下面我们启动一个简单的程序,由其来模拟上游服务发送请求:
  1. func main() {
  2.         // 创建一个等待组,用于等待所有请求完成
  3.         var wg sync.WaitGroup
  4.         // 模拟上游服务发送大量请求给下游服务
  5.         go func() {
  6.                 for i := 0; i < 1000000; i++ {
  7.                         wg.Add(1)
  8.                         go sendRequest(&wg)
  9.                 }
  10.         }()
  11.         // 等待所有请求完成
  12.         wg.Wait()
  13. }
  14. func sendRequest(wg *sync.WaitGroup) {
  15.         // 模拟上游服务发送请求给下游服务
  16.         resp, err := http.Get("http://localhost:8080/")
  17.         if err != nil {
  18.                 fmt.Println("请求失败:", err)
  19.         } else {
  20.                 fmt.Println("请求成功:", resp.Status)
  21.         }
  22.         // 请求完成后,通知等待组
  23.         wg.Done()
  24. }
复制代码
这里,我们同时启动了1000000个协程同时往HTTP服务器发送请求,如果服务器配置不够高,亦或者是请求量更多的情况下,已经超过了服务器的处理上限,服务器没有主够的资源去处理这些请求,此时将有可能直接将服务器打挂掉,服务直接不可用。在这种情况下,如果由于上游服务的问题,导致下游服务,甚至整个链路的系统都直接崩溃,这个是不合理的,此时需要有一些手段保护下游服务由于异常流量导致整个系统的崩溃。
这里对上面的场景进行分析,可以发现,此时是由于上游服务大量请求的过来,而当前服务并没有足够的资源去处理这些请求,但是并没有对其加以限制,而是继续处理,最终导致了整个系统的不可用。那么此时就应该进行限流,对并发请求量进行控制,对服务器能够处理的并发数进行合理评估,当并发请求数超过了限制,此时应该直接拒绝其访问,避免整个系统的不可用。
那问题来了,go语言中,有什么方法能够实现资源的管理,如果没有足够的资源,此时将直接返回,不对请求进行处理呢?其实go语言中有Weighted类型,在这种场景还挺合适的。下面我们将对其进行介绍。
3. 基本使用

3.1 基本介绍

Weighted 是 Go 语言中 golang.org/x/sync包中的一种类型,用于限制并发访问某个资源的数量。它提供了一种机制,允许调用者以不同的权重请求访问资源,并在资源可用时进行授予。
Weighted的定义如下,提供了Acquire,TryAcquire,Release三个方法:
  1. type Weighted struct {
  2.    size    int64
  3.    cur     int64
  4.    mu      sync.Mutex
  5.    waiters list.List
  6. }
  7. func (s *Weighted) Acquire(ctx context.Context, n int64) error{}
  8. func (s *Weighted) TryAcquire(n int64) bool{}
  9. func (s *Weighted) Release(n int64) {}
复制代码

  • Acquire: 以权重 n 请求获取资源,阻塞直到资源可用或上下文 ctx 结束。
  • TryAcquire: 尝试以权重 n 获取信号量,如果成功则返回 true,否则返回 false,并保持信号量不变。
  • Release:释放具有权重 n 的信号量。
3.2 权重说明

有时候,不同请求对资源的消耗是不同的。通过设置权重,你可以更好地控制不同请求对资源的使用情况。例如,某些请求可能需要更多的计算资源或更长的处理时间,你可以设置较高的权重来确保它们能够获取到足够的资源。
其次就是权重大只是代表着请求需要使用到的资源多,对于优先级并不会有作用。在Weighted 中,资源的许可是以先进先出(FIFO)的顺序分配的,而不是根据权重来决定获取的优先级。当有多个请求同时等待获取资源时,它们会按照先后顺序依次获取资源的许可。
假设先请求权重为 1 的资源,然后再请求权重为 2 的资源。如果当前可用的资源许可足够满足两个请求的总权重,那么先请求的权重为 1 的资源会先获取到许可,然后是后续请求的权重为 2 的资源。
  1. w.Acquire(context.Background(), 1) // 权重为 1 的请求先获取到资源许可
  2. w.Acquire(context.Background(), 2) // 权重为 2 的请求在权重为 1 的请求之后获取到资源许可
复制代码
3.3 基本使用

当使用Weighted来控制资源的并发访问时,通常需要以下几个步骤:

  • 创建Weighted实例,定义好最大资源数
  • 当需要资源时,调用Acquire方法占据资源
  • 当处理完成之后,调用Release方法释放资源
下面是一个简单的代码的示例,展示了如何使用Weighted实现资源控制:
  1. func main() {
  2.    // 1. 创建一个信号量实例,设置最大并发数
  3.    sem := semaphore.NewWeighted(10)
  4.    // 具体处理请求的函数
  5.    handleRequest := func(id int) {
  6.       // 2. 调用Acquire尝试获取资源
  7.       err := sem.Acquire(context.Background(), 1)
  8.       if err != nil {
  9.          fmt.Printf("Goroutine %d failed to acquire resource\n", id)
  10.       }
  11.       // 3. 成功获取资源,使用defer,在任务执行完之后,自动释放资源
  12.       defer sem.Release(1)
  13.       // 执行业务逻辑
  14.       return
  15.    }
  16.    // 模拟并发请求
  17.    for i := 0; i < 20; i++ {
  18.       go handleRequest(i)
  19.    }
  20.    time.Sleep(20 * time.Second)
  21. }
复制代码
首先,调用NewWeighted方法创建一个信号量实例,设置最大并发数为10。然后在每次请求处理前调用Acquire方法尝试获取资源,成功获取资源后,使用defer关键字,在任务执行完后自动释放资源,调用Release方法释放一个资源。
保证最多同时有10个协程获取资源。如果有更多的协程尝试获取资源,它们会等待其他协程释放资源后再进行获取。
4. 实现原理

4.1 设计初衷

Weighted类型的设计初衷是为了在并发环境中实现对资源的控制和限制。它提供了一种简单而有效的机制,允许在同一时间内只有一定数量的并发操作可以访问或使用特定的资源。
4.2 基本原理

Weighted类型的基本实现原理是基于计数信号量的概念。计数信号量是一种用于控制并发访问的同步原语,它维护一个可用资源的计数器。在Weighted中,该计数器表示可用的资源数量。
当一个任务需要获取资源时,它会调用Acquire方法。该方法首先会检查当前可用资源的数量,如果大于零,则表示有可用资源,并将计数器减一,任务获取到资源,并继续执行。如果当前可用资源的数量为零,则任务会被阻塞,直到有其他任务释放资源。
当一个任务完成对资源的使用后,它会调用Release方法来释放资源。该方法会将计数器加一,表示资源已经可用,其他被阻塞的任务可以继续获取资源并执行。
通过这种方式,Weighted实现了对资源的限制和控制。它确保在同一时间内只有一定数量的并发任务可以访问资源,超过限制的任务会被阻塞,直到有其他任务释放资源。这样可以有效地避免资源过度使用和竞争,保证系统的稳定性和性能。
4.3 代码实现

4.3.1 结构体定义

Weighted的结构体定义如下:
  1. type Weighted struct {
  2.    size    int64
  3.    cur     int64
  4.    mu      sync.Mutex
  5.    waiters list.List
  6. }
复制代码

  • size:表示资源的总数量,即可以同时获取的最大资源数量。
  • cur:表示当前已经被获取的资源数量。
  • mu:用于保护Weighted类型的互斥锁,确保并发安全性。
  • waiters:使用双向链表来存储等待获取资源的任务。
4.3.2 Acquire方法

Acquire方法将获取指定数量的资源。如果当前可用资源数量不足,调用此方法的任务将被阻塞,并加入到等待队列中。
[code]func (s *Weighted) Acquire(ctx context.Context, n int64) error {   // 1. 使用互斥锁s.mu对Weighted类型进行加锁,确保并发安全性。   s.mu.Lock()   // size - cur 代表剩余可用资源数,如果大于请求资源数n, 此时代表剩余可用资源 大于 需要的资源数   // 其次,Weighted资源分配的顺序是FIFO,如果等待队列不为空,当前请求就需要自动放到队列最后面   if s.size-s.cur >= n && s.waiters.Len() == 0 {      s.cur += n      s.mu.Unlock()      return nil   }    // s.size 代表最大资源数,如果需要的资源数 大于 最大资源数,此时直接返回错误   if n > s.size {      // Don't make other Acquire calls block on one that's doomed to fail.      s.mu.Unlock()
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

尚未崩坏

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表