ToB企服应用市场:ToB评测及商务社交产业平台

标题: LRU go cache的实现 [打印本页]

作者: 河曲智叟    时间: 2024-9-13 23:56
标题: LRU go cache的实现
LRU算法

LRU(Least Recently Used,近来最少使用)是一种缓存镌汰策略,用于决定当缓存满时,应该镌汰哪个缓存项。其根本思想是优先镌汰那些近来最少被访问的数据,假设近来使用的数据在未来一段时间内仍旧可能被再次使用。
LRU原理

LRU思想遵循以下逻辑:
假设缓存的容量为3,并且按照如下次序访问数据:
  1. A -> B -> C -> A -> D
复制代码
在每一步中,LRU缓存的状态如下:
可以看到,每次新项进入缓存时,最久未被使用的项会被镌汰。
LRU实现


LRU常通过一个双向链表和哈希表来实现:

当数据被访问时,将其移动到链表头部;当缓存满时,删除链表尾部的数据。
LRU的思想非常适用于缓存管理,因为它假设“近来被使用的数据未来可能还会被再次使用”,这是一个较为普遍的规律。
Redis LRU算法实现

在Redis中,LRU(Least Recently Used)缓存镌汰策略是一个重要的概念,特殊是当Redis运行在内存有限的环境下时。当Redis到达内存限制时,它需要根据某种策略来删除一些旧的数据,从而为新的数据腾出空间。在这种环境下,Redis提供了多种缓存镌汰策略,其中就包罗LRU策略。
Redis中的LRU策略主要体如今以下几个方面:
1. 内存镌汰策略

Redis答应你设置不同的内存镌汰策略,包罗但不限于LRU策略。可以通过设置参数 maxmemory-policy 来选择具体的策略。常见的LRU相关策略有:

Redis中其他与LRU相关的策略还包罗:

2. LRU算法的实现

Redis中的LRU算法并不是严格的LRU,而是近似LRU。原因在于Redis为了性能考虑,并没有对每个键都维护精确的访问时间,而是使用了采样算法。即每当需要镌汰键时,Redis并不会遍历全部键,而是随机选择多少个键,并从中选择近来最少使用的键进行镌汰。这个采样的数量可以通过设置 maxmemory-samples 参数来控制,通常默认为5。

3. LRU vs LFU

在Redis 4.0引入了LFU(Least Frequently Used)策略,这是一种更换LRU的新策略。与LRU不同,LFU是基于访问频率来镌汰数据的策略,可以通过设置 allkeys-lfu 或 volatile-lfu 来启用。
Redis中的LRU使用场景

如何启用LRU策略:
可以通过设置文件或命令行启动参数来设置Redis的LRU策略。例如:
  1. maxmemory 100mb
  2. maxmemory-policy allkeys-lru
复制代码
如许,当Redis到达100MB内存上限时,它会使用allkeys-lru策略来镌汰近来最少使用的键。
Redis的LRU策略通过这种简朴而高效的方式,确保了在内存受限的环境下,体系能够继续稳定地提供服务,并且在实际生产中广泛应用于各种缓存场景。
基于LRU的缓存库

go-cache

go-cache is an in-memory key:value store/cache similar to memcached that is suitable for applications running on a single machine. Its major advantage is that, being essentially a thread-safe map[string]interface{} with expiration times, it doesn’t need to serialize or transmit its contents over the network.
Any object can be stored, for a given duration or forever, and the cache can be safely used by multiple goroutines.
Although go-cache isn’t meant to be used as a persistent datastore, the entire cache can be saved to and loaded from a file (using c.Items() to retrieve the items map to serialize, and NewFrom() to create a cache from a deserialized one) to recover from downtime quickly. (See the docs for NewFrom() for caveats.)
Go-cache 是一个类似于 memcached 的内存键值存储/缓存,适用于运行在单机上的应用步调。它的主要优势在于,它本质上是一个支持线程安全的 map[string]interface{},并具有过期时间,因此不需要序列化或通过网络传输其内容。
任何对象都可以存储,可以设置存储时长或永久存储,并且缓存可以被多个 goroutine 安全使用。
只管 go-cache 并非计划为持久化数据存储,但整个缓存可以保存到文件中或从文件中加载(使用 c.Items() 检索要序列化的项映射,并使用 NewFrom() 从反序列化的缓存创建缓存),以便快速从停机状态规复。(有关 NewFrom() 的留意事项,请拜见文档。)
安装使用

Installation
go get github.com/patrickmn/go-cache
Usage
  1. import (
  2.         "fmt"
  3.         "github.com/patrickmn/go-cache"
  4.         "time"
  5. )
  6. func main() {
  7.         // Create a cache with a default expiration time of 5 minutes, and which
  8.         // purges expired items every 10 minutes
  9.         c := cache.New(5*time.Minute, 10*time.Minute)
  10.         // Set the value of the key "foo" to "bar", with the default expiration time
  11.         c.Set("foo", "bar", cache.DefaultExpiration)
  12.         // Set the value of the key "baz" to 42, with no expiration time
  13.         // (the item won't be removed until it is re-set, or removed using
  14.         // c.Delete("baz")
  15.         c.Set("baz", 42, cache.NoExpiration)
  16.         // Get the string associated with the key "foo" from the cache
  17.         foo, found := c.Get("foo")
  18.         if found {
  19.                 fmt.Println(foo)
  20.         }
  21.         // Since Go is statically typed, and cache values can be anything, type
  22.         // assertion is needed when values are being passed to functions that don't
  23.         // take arbitrary types, (i.e. interface{}). The simplest way to do this for
  24.         // values which will only be used once--e.g. for passing to another
  25.         // function--is:
  26.         foo, found := c.Get("foo")
  27.         if found {
  28.                 MyFunction(foo.(string))
  29.         }
  30.         // This gets tedious if the value is used several times in the same function.
  31.         // You might do either of the following instead:
  32.         if x, found := c.Get("foo"); found {
  33.                 foo := x.(string)
  34.                 // ...
  35.         }
  36.         // or
  37.         var foo string
  38.         if x, found := c.Get("foo"); found {
  39.                 foo = x.(string)
  40.         }
  41.         // ...
  42.         // foo can then be passed around freely as a string
  43.         // Want performance? Store pointers!
  44.         c.Set("foo", &MyStruct, cache.DefaultExpiration)
  45.         if x, found := c.Get("foo"); found {
  46.                 foo := x.(*MyStruct)
  47.                         // ...
  48.         }
  49. }
复制代码
代码剖析

go-cache源码只有两个go文件:

cache核心结构:
  1. // 值结构
  2. type Item struct {
  3.         Object     interface{}
  4.         Expiration int64
  5. }
  6. // 缓存核心结构
  7. type cache struct {
  8.     // 默认超时时间
  9.         defaultExpiration time.Duration
  10.         // 内存缓存
  11.         items             map[string]Item
  12.         // 读写锁
  13.         mu                sync.RWMutex
  14.         // 删除回调函数
  15.         onEvicted         func(string, interface{})
  16.         //
  17.         janitor           *janitor
  18. }
复制代码
起首来看入口的New函数,New函数内初始化了一个缓存存储items,并且通报给newCacheWithJanitor(带有GC的Cache),并且在newCache中设置默认超时时间,如果为0则不过期,最后如果GC隔断大于0,则启动GC并且向runtime注册一个类似析构函数的handler用于停止GC协程。
  1. func New(defaultExpiration, cleanupInterval time.Duration) *Cache {
  2.         items := make(map[string]Item)
  3.         // 传递超时时间,GC间隔...
  4.         return newCacheWithJanitor(defaultExpiration, cleanupInterval, items)
  5. }
  6. func newCacheWithJanitor(de time.Duration, ci time.Duration, m map[string]Item) *Cache {
  7.         c := newCache(de, m)
  8.         C := &Cache{c}
  9.         if ci > 0 {
  10.                 runJanitor(c, ci)
  11.                 runtime.SetFinalizer(C, stopJanitor)
  12.         }
  13.         return C
  14. }
  15. func newCache(de time.Duration, m map[string]Item) *cache {
  16.         if de == 0 {
  17.                 de = -1
  18.         }
  19.         c := &cache{
  20.                 defaultExpiration: de,
  21.                 items:             m,
  22.         }
  23.         return c
  24. }
复制代码
跳转到GC协程,runJanitor启动一个GC协程,GC协程隔断触发GC过期查抄并且删除过期的Key,当吸收到stop信号后,关闭计时器,退出GC协程。
  1. func (j *janitor) Run(c *cache) {
  2.         ticker := time.NewTicker(j.Interval)
  3.         for {
  4.                 select {
  5.                 case <-ticker.C:
  6.                         c.DeleteExpired()
  7.                 case <-j.stop:
  8.                         ticker.Stop()
  9.                         return
  10.                 }
  11.         }
  12. }
  13. func stopJanitor(c *Cache) {
  14.         c.janitor.stop <- true
  15. }
  16. // GC入口
  17. func runJanitor(c *cache, ci time.Duration) {
  18.         j := &janitor{
  19.                 Interval: ci,
  20.                 stop:     make(chan bool),
  21.         }
  22.         c.janitor = j
  23.         go j.Run(c)
  24. }
复制代码
然后就是key-val的设置,key为string类型,val支持恣意类型,并且答应用户自界说超时时间,通过写锁来保证协程安全数据同等,并且Set会覆盖汗青数据,而Add操作会先判定是否已经存在该Key,
  1. func (c *cache) Set(k string, x interface{}, d time.Duration) {
  2.         // "Inlining" of set
  3.         var e int64
  4.         if d == DefaultExpiration {
  5.                 d = c.defaultExpiration
  6.         }
  7.         if d > 0 {
  8.                 e = time.Now().Add(d).UnixNano()
  9.         }
  10.         // 加写锁保证协程安全
  11.         c.mu.Lock()
  12.         c.items[k] = Item{
  13.                 Object:     x,
  14.                 Expiration: e,
  15.         }
  16.         // go-cache作者无法容忍defer的栈操作带来的ns级的开销
  17.         // 因此不适用defer去释放锁
  18.         c.mu.Unlock()
  19. }
  20. func (c *cache) set(k string, x interface{}, d time.Duration) {
  21.         // 与Set完全一致
  22. }
  23. func (c *cache) Add(k string, x interface{}, d time.Duration) error {
  24.         c.mu.Lock()
  25.         _, found := c.get(k)
  26.         if found {
  27.                 c.mu.Unlock()
  28.                 return fmt.Errorf("Item %s already exists", k)
  29.         }
  30.         c.set(k, x, d)
  31.         c.mu.Unlock()
  32.         return nil
  33. }
复制代码
接下来是数据获取操作,去items中取值,如果已经被GC掉了,直接返回,否则使用lazy的思想去判定key是否过期,过期则返回空,否则返回值(空接口对象可用于类型断言),
  1. func (c *cache) Get(k string) (interface{}, bool) {
  2.         c.mu.RLock()
  3.         // "Inlining" of get and Expired
  4.         item, found := c.items[k]
  5.         if !found {
  6.                 c.mu.RUnlock()
  7.                 return nil, false
  8.         }
  9.         if item.Expiration > 0 {
  10.                 if time.Now().UnixNano() > item.Expiration {
  11.                         c.mu.RUnlock()
  12.                         return nil, false
  13.                 }
  14.         }
  15.         c.mu.RUnlock()
  16.         return item.Object, true
  17. }
复制代码
紧接着提供了Incr操作,答应用户对Number泛型的val进行自增操作,同理还提供了一系列派生Incr操作,不再一一赘述,
  1. func (c *cache) Increment(k string, n int64) error {
  2.         c.mu.Lock()
  3.         v, found := c.items[k]
  4.         if !found || v.Expired() {
  5.                 c.mu.Unlock()
  6.                 return fmt.Errorf("Item %s not found", k)
  7.         }
  8.         switch v.Object.(type) {
  9.         case int:
  10.                 v.Object = v.Object.(int) + int(n)
  11.         case int8:
  12.                 v.Object = v.Object.(int8) + int8(n)
  13.         case int16:
  14.                 v.Object = v.Object.(int16) + int16(n)
  15.         case int32:
  16.                 v.Object = v.Object.(int32) + int32(n)
  17.         case int64:
  18.                 v.Object = v.Object.(int64) + n
  19.         case uint:
  20.                 v.Object = v.Object.(uint) + uint(n)
  21.         case uintptr:
  22.                 v.Object = v.Object.(uintptr) + uintptr(n)
  23.         case uint8:
  24.                 v.Object = v.Object.(uint8) + uint8(n)
  25.         case uint16:
  26.                 v.Object = v.Object.(uint16) + uint16(n)
  27.         case uint32:
  28.                 v.Object = v.Object.(uint32) + uint32(n)
  29.         case uint64:
  30.                 v.Object = v.Object.(uint64) + uint64(n)
  31.         case float32:
  32.                 v.Object = v.Object.(float32) + float32(n)
  33.         case float64:
  34.                 v.Object = v.Object.(float64) + float64(n)
  35.         default:
  36.                 c.mu.Unlock()
  37.                 return fmt.Errorf("The value for %s is not an integer", k)
  38.         }
  39.         c.items[k] = v
  40.         c.mu.Unlock()
  41.         return nil
  42. }
复制代码
对于key的删除操作,则是加锁接纳内置的map delete函数实,如果设置了钩子函数则会调用一次钩子函数:
  1. func (c *cache) Delete(k string) {
  2.         c.mu.Lock()
  3.         v, evicted := c.delete(k)
  4.         c.mu.Unlock()
  5.         if evicted {
  6.                 c.onEvicted(k, v)
  7.         }
  8. }
  9. func (c *cache) delete(k string) (interface{}, bool) {
  10.         if c.onEvicted != nil {
  11.                 if v, found := c.items[k]; found {
  12.                         delete(c.items, k)
  13.                         return v.Object, true
  14.                 }
  15.         }
  16.         delete(c.items, k)
  17.         return nil, false
  18. }
复制代码
此外,作者还基于gob(gob即go binary是go开发组提供的一组序列化操作方法集)提供了序列化和反序列化操作,如果对gob生疏,可以理解为用json包将map进行序列化后存储到Writer中
  1. func (c *cache) Save(w io.Writer) (err error) {
  2.         enc := gob.NewEncoder(w)
  3.         defer func() {
  4.                 if x := recover(); x != nil {
  5.                         err = fmt.Errorf("Error registering item types with Gob library")
  6.                 }
  7.         }()
  8.         c.mu.RLock()
  9.         defer c.mu.RUnlock()
  10.         for _, v := range c.items {
  11.                 gob.Register(v.Object)
  12.         }
  13.         // map序列化
  14.         err = enc.Encode(&c.items)
  15.         return
  16. }
复制代码
最厥后看一下GC协程,当触发定时器时,遍历全部key-val一一判定哪些key过期,然后统一收集钩子函数,最后实行,
  1. func (c *cache) DeleteExpired() {
  2.         var evictedItems []keyAndValue
  3.         now := time.Now().UnixNano()
  4.         c.mu.Lock()
  5.         for k, v := range c.items {
  6.                 // "Inlining" of expired
  7.                 if v.Expiration > 0 && now > v.Expiration {
  8.                         ov, evicted := c.delete(k)
  9.                         if evicted {
  10.                                 evictedItems = append(evictedItems, keyAndValue{k, ov})
  11.                         }
  12.                 }
  13.         }
  14.         c.mu.Unlock()
  15.         for _, v := range evictedItems {
  16.                 c.onEvicted(v.key, v.value)
  17.         }
  18. }
复制代码
对于平凡cache,计划并没有特殊惊艳之处,尤其是是GC协程的处理,宛如一记猛棍,加锁遍历全库去删除key,在Set key时因为ns级别的defer开销而弃用defer,在此处则没有去思考体系开销,内存逐出策略采用较简朴的ttl策略,适合go语言入门时阅读,并且可以加入该作者的仓库共同开发。
除了标准的cache外,作者提供了一个sharded版本的cache,旨在创建一个具有比标准缓存更好算法复杂度的缓存,主要通过防止在添加项目时对整个缓存进行写锁定实现。
  1. type shardedCache struct {
  2.         seed    uint32
  3.         m       uint32
  4.         cs      []*cache
  5.         janitor *shardedJanitor
  6. }
  7. // djb2 with better shuffling. 5x faster than FNV with the hash.Hash overhead.
  8. func djb33(seed uint32, k string) uint32 {
  9.         var (
  10.             // 字符串的长度,类型为 uint32。
  11.                 l = uint32(len(k))
  12.                 // 初始哈希值,使用 5381 + seed + l 进行初始化,基于 djb2 算法的核心思想。
  13.                 d = 5381 + seed + l
  14.                 // 用于迭代字符串的索引。
  15.                 i = uint32(0)
  16.         )
  17.         // Why is all this 5x faster than a for loop?
  18.         // 如果字符串长度大于等于 4,则每次循环处理 4 个字符。
  19.         // 每个字符通过 d * 33 的形式进行混淆,再通过异或操作将字符值混入哈希值。
  20.         if l >= 4 {
  21.                 for i < l-4 {
  22.                         d = (d * 33) ^ uint32(k[i])
  23.                         d = (d * 33) ^ uint32(k[i+1])
  24.                         d = (d * 33) ^ uint32(k[i+2])
  25.                         d = (d * 33) ^ uint32(k[i+3])
  26.                         i += 4
  27.                 }
  28.         }
  29.         // 对剩余不足 4 个字符的部分进行处理,使用 switch 来选择如何处理 1 到 4 个字符,确保每个字符都影响到哈希值。
  30.         switch l - i {
  31.         case 1:
  32.         case 2:
  33.                 d = (d * 33) ^ uint32(k[i])
  34.         case 3:
  35.                 d = (d * 33) ^ uint32(k[i])
  36.                 d = (d * 33) ^ uint32(k[i+1])
  37.         case 4:
  38.                 d = (d * 33) ^ uint32(k[i])
  39.                 d = (d * 33) ^ uint32(k[i+1])
  40.                 d = (d * 33) ^ uint32(k[i+2])
  41.         }
  42.         return d ^ (d >> 16)
  43. }
  44. func (sc *shardedCache) bucket(k string) *cache {
  45.         return sc.cs[djb33(sc.seed, k)%sc.m]
  46. }
复制代码
关于桶的选择,作者引入了djb2算法并且本身做了改良,
  1. djb2 是一种简单而高效的哈希算法,由著名程序员 Daniel J. Bernstein 发明。
  2. 它常用于生成字符串的哈希值,因其简洁性和良好的分布性受到广泛使用,尤其是在字符串哈希表中。
  3. djb2 算法的核心是通过将哈希值乘以一个常数(通常为 33)并加上当前字符的 ASCII 值来逐步生成哈希值。
  4. FNV(Fowler-Noll-Vo)是一种著名的哈希算法,最早由 Glenn Fowler、Landon Curt Noll 和 Kiem-Phong Vo 开发,故而得名。FNV 的设计目标是简洁且快速,适用于字符串和其他数据的哈希运算。
  5. FNV 算法的核心思想是将哈希值乘以一个素数(通常是 16777619 或 1099511628211)并对输入数据逐字节进行异或(XOR)操作。
  6. FNV 主要有两个版本:
  7.    FNV-1:每一步先乘素数再异或输入数据。
  8.    FNV-1a:每一步先异或输入数据再乘素数。
  9. FNV 算法因其简单和有效的哈希值分布在许多系统中得到了广泛应用。尤其在哈希表、查找、数据校验等应用中,FNV 被视为一个不错的选择。
  10. FNV 就是一个典型的逐字符哈希算法,因为它每次只处理一个字符或字节,并对哈希值进行更新。
  11. 相比之下,某些哈希函数(如你提到的 djb33 优化版)可以通过批量处理多个字符来提高效率,减少循环的次数,这就是为什么它们可能会比逐字符哈希函数更快。
复制代码
了解完djb2后回到选择bucket的函数,发现存储桶是通过hash值与m进行取模运算得到的:
  1. func (sc *shardedCache) bucket(k string) *cache {
  2.         return sc.cs[djb33(sc.seed, k)%sc.m]
  3. }
复制代码
但m怎么初始化,作者并没有开发完成,但可以猜测一手,m的值为桶的数量。对于GC部门代码则完全调用标准cache的GC,没有服从提高。
总之,这是一个正在发展中的项目,并且也与LRU相差甚远,ChatGpt保举的仓库似乎也不是十分精准嘛。最后,作者个人卡片写了本身是clovyr公司的CTO…,本人对此知之甚少,竣事!

hashicorp/golang-lru

This provides the lru package which implements a fixed-size thread safe LRU cache. It is based on the cache in Groupcache.
这是一个国人写的库。
安装使用

Install:
go get github.com/hashicorp/golang-lru/v2
Usage:
  1. package main
  2. import (
  3.         "fmt"
  4.         "time"
  5.         "github.com/hashicorp/golang-lru/v2/expirable"
  6. )
  7. func main() {
  8.         // make cache with 10ms TTL and 5 max keys
  9.         cache := expirable.NewLRU[string, string](5, nil, time.Millisecond*10)
  10.         // set value under key1.
  11.         cache.Add("key1", "val1")
  12.         // get value under key1
  13.         r, ok := cache.Get("key1")
  14.         // check for OK value
  15.         if ok {
  16.                 fmt.Printf("value before expiration is found: %v, value: %q\n", ok, r)
  17.         }
  18.         // wait for cache to expire
  19.         time.Sleep(time.Millisecond * 12)
  20.         // get value under key1 after key expiration
  21.         r, ok = cache.Get("key1")
  22.         fmt.Printf("value after expiration is found: %v, value: %q\n", ok, r)
  23.         // set value under key2, would evict old entry because it is already expired.
  24.         cache.Add("key2", "val2")
  25.         fmt.Printf("Cache len: %d\n", cache.Len())
  26.         // Output:
  27.         // value before expiration is found: true, value: "val1"
  28.         // value after expiration is found: false, value: ""
  29.         // Cache len: 1
  30. }
复制代码
代码剖析

golang-lru的核心文件在simpleLru,并且在interface声明中,作者引入泛型界说了实现Get,Set等方法的对象均为Cache对象:

紧接着来看看作者本身是如何实现Cache接口的,可以看到作者也是界说了逐出回调,并且使用典范的map+list来实现LRU算法,其次作者将KV存储于自界说的entry中方便LRU操作,以及通过size限制答应存储的key的个数:
  1. // EvictCallback is used to get a callback when a cache entry is evicted
  2. type EvictCallback[K comparable, V any] func(key K, value V)
  3. // LRU implements a non-thread safe fixed size LRU cache
  4. type LRU[K comparable, V any] struct {
  5.         size      int
  6.         evictList *internal.LruList[K, V]
  7.         items     map[K]*internal.Entry[K, V]
  8.         onEvict   EvictCallback[K, V]
  9. }
  10. // NewLRU constructs an LRU of the given size
  11. func NewLRU[K comparable, V any](size int, onEvict EvictCallback[K, V]) (*LRU[K, V], error) {
  12.         if size <= 0 {
  13.                 return nil, errors.New("must provide a positive size")
  14.         }
  15.         c := &LRU[K, V]{
  16.                 size:      size,
  17.                 evictList: internal.NewList[K, V](),
  18.                 items:     make(map[K]*internal.Entry[K, V]),
  19.                 onEvict:   onEvict,
  20.         }
  21.         return c, nil
  22. }
复制代码
步入list源码,感觉和golang内置的list声明无太大差别,只是扩展了len参数用于标识当前list存储的key的个数,
  1. // Entry is an LRU Entry
  2. type Entry[K comparable, V any] struct {
  3.         next, prev *Entry[K, V]
  4.         // The list to which this element belongs.
  5.         list *LruList[K, V]
  6.         // The LRU Key of this element.
  7.         Key K
  8.         // The Value stored with this element.
  9.         Value V
  10.         // The time this element would be cleaned up, optional
  11.         ExpiresAt time.Time
  12.         // The expiry bucket item was put in, optional
  13.         ExpireBucket uint8
  14. }
  15. // PrevEntry returns the previous list element or nil.
  16. func (e *Entry[K, V]) PrevEntry() *Entry[K, V] {
  17.         if p := e.prev; e.list != nil && p != &e.list.root {
  18.                 return p
  19.         }
  20.         return nil
  21. }
  22. type LruList[K comparable, V any] struct {
  23.         root Entry[K, V] // sentinel list element, only &root, root.prev, and root.next are used
  24.         len  int         // current list Length excluding (this) sentinel element
  25. }
  26. // Init initializes or clears list l.
  27. func (l *LruList[K, V]) Init() *LruList[K, V] {
  28.         l.root.next = &l.root
  29.         l.root.prev = &l.root
  30.         l.len = 0
  31.         return l
  32. }
复制代码
回到cache,继续阅读设置/取值操作,可以看到在Add操作时,判定了key是否存在,如果存在则更新值并且将key的频率革新(对于LRU就是将key移到开头),否则将直接在开头追加一个节点,完成插入操作后,会判定是否超出限制来触发GC(对于LRU GC就是移除末端节点),
  1. func (c *LRU[K, V]) Add(key K, value V) (evicted bool) {
  2.         // Check for existing item
  3.         if ent, ok := c.items[key]; ok {
  4.                 c.evictList.MoveToFront(ent)
  5.                 ent.Value = value
  6.                 return false
  7.         }
  8.         // Add new item
  9.         ent := c.evictList.PushFront(key, value)
  10.         c.items[key] = ent
  11.         evict := c.evictList.Length() > c.size
  12.         // Verify size not exceeded
  13.         if evict {
  14.                 c.removeOldest()
  15.         }
  16.         return evict
  17. }
  18. // Get looks up a key's value from the cache.
  19. func (c *LRU[K, V]) Get(key K) (value V, ok bool) {
  20.         if ent, ok := c.items[key]; ok {
  21.                 c.evictList.MoveToFront(ent)
  22.                 return ent.Value, true
  23.         }
  24.         return
  25. }
复制代码
LRU的核心就是removeOldest函数了,可以看到是一个典范的操作:
  1. // removeOldest removes the oldest item from the cache.
  2. func (c *LRU[K, V]) removeOldest() {
  3.         if ent := c.evictList.Back(); ent != nil {
  4.                 c.removeElement(ent)
  5.         }
  6. }
  7. // removeElement is used to remove a given list element from the cache
  8. func (c *LRU[K, V]) removeElement(e *internal.Entry[K, V]) {
  9.         c.evictList.Remove(e)
  10.         delete(c.items, e.Key)
  11.         if c.onEvict != nil {
  12.                 c.onEvict(e.Key, e.Value)
  13.         }
  14. }
复制代码
以上就是simpleLru的全部内容,同时作者还提供了一个带有超时时间的Cache实现和一个更复杂的ARCCache和TwoQueueCache实现。
总而言之,simpleLru的内存逐出过于粗糙,size竟然是key的数量,通过比力list的元素个数和size进行GC,而不是内存的使用状态,如开头所言,过于粗糙。
回到代码最外层,lru.go中引用的也是simpleLru,也就是说,除非手动选择另外两种Cache,否则默认是最差的simpleLru,同时为了保证线程安全,内嵌了一个读写锁。

代码目次比力混乱,有些cache放到了单独文件夹中,有些cache又单独拿了出来,该项目更像是一个Cache组,提供了多种cache供用户使用,比力有吸引力的是ARCCache和TwoQueueCache,接下来剖析这两个Cache:
  1. // ARCCache is a thread-safe fixed size Adaptive Replacement Cache (ARC).
  2. // ARC is an enhancement over the standard LRU cache in that tracks both
  3. // frequency and recency of use. This avoids a burst in access to new
  4. // entries from evicting the frequently used older entries. It adds some
  5. // additional tracking overhead to a standard LRU cache, computationally
  6. // it is roughly 2x the cost, and the extra memory overhead is linear
  7. // with the size of the cache. ARC has been patented by IBM, but is
  8. // similar to the TwoQueueCache (2Q) which requires setting parameters.
  9. type ARCCache[K comparable, V any] struct {
  10.         size int // Size is the total capacity of the cache
  11.         p    int // P is the dynamic preference towards T1 or T2
  12.         t1 simplelru.LRUCache[K, V]        // T1 是用于最近访问条目的 LRU 缓存
  13.         b1 simplelru.LRUCache[K, struct{}] // B1 是从 t1 驱逐条目的 LRU 缓存
  14.         t2 simplelru.LRUCache[K, V]        // T2 是用于频繁访问条目的 LRU 缓存
  15.         b2 simplelru.LRUCache[K, struct{}] // B2 是从 t2 驱逐条目的 LRU 缓存
  16.         lock sync.RWMutex
  17. }
复制代码
ARCCache 是一个线程安全的固定大小的自顺应更换缓存(ARC)是标准 LRU 缓存的改进版,它同时跟踪使用频率和近来使用环境。 这可以避免由于对新条目的突发访问而驱逐掉常用的旧条目。 相比标准 LRU 缓存,它增加了一些额外的跟踪开销,计算成本约莫是标准 LRU 缓存的两倍,额外的内存开销与缓存大小成线性关系。
根据ARC结构体的描述,感觉和sync.Map一样做了读写分离,以淘汰读写之间加锁的开销,并且用读写锁保证某些环境下的数据同等性和并发安全。
接着往下看,NewARC中使用size去初始化simpleLru的长度,以及设置最大key的数量,
  1. func NewARC[K comparable, V any](size int) (*ARCCache[K, V], error) {
  2.         // Create the sub LRUs
  3.         b1, err := simplelru.NewLRU[K, struct{}](size, nil)
  4.         if err != nil {
  5.                 return nil, err
  6.         }
  7.         b2, err := simplelru.NewLRU[K, struct{}](size, nil)
  8.         if err != nil {
  9.                 return nil, err
  10.         }
  11.         t1, err := simplelru.NewLRU[K, V](size, nil)
  12.         if err != nil {
  13.                 return nil, err
  14.         }
  15.         t2, err := simplelru.NewLRU[K, V](size, nil)
  16.         if err != nil {
  17.                 return nil, err
  18.         }
  19.         // Initialize the ARC
  20.         c := &ARCCache[K, V]{
  21.                 size: size,
  22.                 p:    0,
  23.                 t1:   t1,
  24.                 b1:   b1,
  25.                 t2:   t2,
  26.                 b2:   b2,
  27.         }
  28.         return c, nil
  29. }
复制代码
关键操作在于Add,起首由于涉及多个写操作,干脆用一个写锁把整个方法括起来,Contains方法只是去simpleLru的map中取值,如果存在当前key,则认为是频繁访问的key,从t1有移动到t2中…
  1. // Add adds a value to the cache.
  2. func (c *ARCCache[K, V]) Add(key K, value V) {
  3.         c.lock.Lock()
  4.         defer c.lock.Unlock()
  5.         // Check if the value is contained in T1 (recent), and potentially
  6.         // promote it to frequent T2
  7.         // 如果t1存在当前key,则认为是频繁访问的key,从t1移动到t2中
  8.         if c.t1.Contains(key) {
  9.                 c.t1.Remove(key)
  10.                 c.t2.Add(key, value)
  11.                 return
  12.         }
  13.         // Check if the value is already in T2 (frequent) and update it
  14.         // t1不存在但t2存在key,则将新值覆盖到t2
  15.         if c.t2.Contains(key) {
  16.                 c.t2.Add(key, value)
  17.                 return
  18.         }
  19.         // Check if this value was recently evicted as part of the
  20.         // recently used list
  21.         // 检查该值是否最近被从最近使用列表中驱逐
  22.         // 如果当前key在t1的待删除cache中则扩容并且将key移动到常用队列中
  23.         if c.b1.Contains(key) {
  24.                 // T1 set is too small, increase P appropriately
  25.                 delta := 1
  26.                 b1Len := c.b1.Len()
  27.                 b2Len := c.b2.Len()
  28.                 if b2Len > b1Len {
  29.                         delta = b2Len / b1Len
  30.                 }
  31.                 if c.p+delta >= c.size {
  32.                         c.p = c.size
  33.                 } else {
  34.                         c.p += delta
  35.                 }
  36.                 // Potentially need to make room in the cache
  37.                 if c.t1.Len()+c.t2.Len() >= c.size {
  38.                         c.replace(false)
  39.                 }
  40.                 // Remove from B1
  41.                 c.b1.Remove(key)
  42.                 // Add the key to the frequently used list
  43.                 c.t2.Add(key, value)
  44.                 return
  45.         }
  46.         // Check if this value was recently evicted as part of the
  47.         // frequently used list
  48.         if c.b2.Contains(key) {
  49.                 // T2 set is too small, decrease P appropriately
  50.                 delta := 1
  51.                 b1Len := c.b1.Len()
  52.                 b2Len := c.b2.Len()
  53.                 if b1Len > b2Len {
  54.                         delta = b1Len / b2Len
  55.                 }
  56.                 if delta >= c.p {
  57.                         c.p = 0
  58.                 } else {
  59.                         c.p -= delta
  60.                 }
  61.                 // Potentially need to make room in the cache
  62.                 // 如果总长度大于上线,需要执行GC
  63.                 if c.t1.Len()+c.t2.Len() >= c.size {
  64.                         c.replace(true)
  65.                 }
  66.                 // Remove from B2
  67.                 c.b2.Remove(key)
  68.                 // Add the key to the frequently used list
  69.                 c.t2.Add(key, value)
  70.                 return
  71.         }
  72.         // Potentially need to make room in the cache
  73.         if c.t1.Len()+c.t2.Len() >= c.size {
  74.                 c.replace(false)
  75.         }
  76.         // Keep the size of the ghost buffers trim
  77.         if c.b1.Len() > c.size-c.p {
  78.                 c.b1.RemoveOldest()
  79.         }
  80.         if c.b2.Len() > c.p {
  81.                 c.b2.RemoveOldest()
  82.         }
  83.         // Add to the recently seen list
  84.         c.t1.Add(key, value)
  85. }
复制代码
综上,起首再次夸大下T1,T2,B1,B2的作用:

当缓存已满或靠近满时,ARC根据访问模式来调整T1和T2的大小比例p。扩容实在是调整缓存的权重:

因此,当Add检测到某个键近来被驱逐(在B1或B2中找到)时,ARC根据环境调整p的值。这个“扩容”实在是重新分配T1和T2的空间,确保缓存能动态顺应数据的访问模式。
一句话:扩容是为了根据数据的访问频率来动态调整T1和T2的容量,从而提高缓存的命中率和性能。
关于p字段的使用,需要跳到replace方法中:
  1. func (c *ARCCache[K, V]) replace(b2ContainsKey bool) {
  2.         t1Len := c.t1.Len()
  3.         if t1Len > 0 && (t1Len > c.p || (t1Len == c.p && b2ContainsKey)) {
  4.                 k, _, ok := c.t1.RemoveOldest()
  5.                 if ok {
  6.                         c.b1.Add(k, struct{}{})
  7.                 }
  8.         } else {
  9.                 k, _, ok := c.t2.RemoveOldest()
  10.                 if ok {
  11.                         c.b2.Add(k, struct{}{})
  12.                 }
  13.         }
  14. }
复制代码
在 Add 方法中,每当需要在缓存中腾出空间时,都会调用 replace 方法。replace 方法的举动取决于 p 的值,这个方法决定了是从 T1(近来访问队列)中移除数据,照旧从 T2(频繁访问队列)中移除数据:

此外还需要更新 B1 或 B2:

回到命中B2 命中时的逻辑,
  1. delta := 1
  2. b1Len := c.b1.Len()
  3. b2Len := c.b2.Len()
  4. if b1Len > b2Len {
  5.         delta = b1Len / b2Len
  6. }
  7. if delta >= c.p {
  8.         c.p = 0
  9. } else {
  10.         c.p -= delta
  11. }
复制代码
它的目的是通过淘汰 p 的值来缩小 T1(近来访问的数据)的空间,扩大 T2(频繁访问的数据)的空间。这是因为 B2 中的键表现某些数据曾经是频繁访问的,但由于空间限制被移除了;当它再次被访问时,ARC 认为频繁访问的数据可能更重要,所以需要淘汰 T1 的空间来扩大 T2。
这段代码是处理 B2 命中时的逻辑。它的目的是通过淘汰 p 的值来缩小 T1(近来访问的数据)的空间,扩大 T2(频繁访问的数据)的空间。这是因为 B2 中的键表现某些数据曾经是频繁访问的,但由于空间限制被移除了;当它再次被访问时,ARC 认为频繁访问的数据可能更重要,所以需要淘汰 T1 的空间来扩大 T2
逻辑分析:
   为什么要淘汰T1 的空间呢? 不能值扩大T2吗?
  淘汰 T1 的空间而不是直接扩大 T2 的原因是因为 ARC 算法的计划理念在于保持缓存总容量固定,同时动态调整 T1 和 T2
的大小比例,以顺应不同的访问模式。
  再来看TwoQueueCache的实现:
Two-Queue 缓存算法,它是对标准 LRU(Least Recently Used,近来最少使用)算法的改进。2Q 算法通过将缓存分为两个队列来管理近来使用和频繁使用的条目,以避免标准 LRU 的某些缺陷。
2Q 算法的核心思想:

改进之处:2Q 算法的改进之处在于通过分离近来使用和频繁使用的条目,有用防止短期内大量新访问的条目迅速占据缓存空间,导致那些重要的频繁访问条目被驱逐的题目。
  1. type TwoQueueCache[K comparable, V any] struct {
  2.         size        int
  3.         recentSize  int
  4.         recentRatio float64
  5.         ghostRatio  float64
  6.    
  7.     // 最近使用队列(A1)
  8.         recent      simplelru.LRUCache[K, V]
  9.         // 频繁使用队列(A2)
  10.         frequent    simplelru.LRUCache[K, V]
  11.         // 最近驱逐队列(B1)
  12.         recentEvict simplelru.LRUCache[K, struct{}]
  13.         // 读写锁,用于在多线程环境中保护缓存的一致性。
  14.         lock        sync.RWMutex
  15. }
复制代码
同时界说了两个比率:
  1. const (
  2.         // Default2QRecentRatio 是 2Q 缓存中专门用于最近添加的、只被访问过一次的条目的比例。
  3.         Default2QRecentRatio = 0.25
  4.         // Default2QGhostEntries 是用于跟踪最近被驱逐条目的幽灵条目的默认比例。
  5.         Default2QGhostEntries = 0.50
  6. )
复制代码
然后用size初始化双Q,用ghostSize初始化镌汰队列:
  1. func New2QParams[K comparable, V any](size int, recentRatio, ghostRatio float64) (*TwoQueueCache[K, V], error) {
  2.         ...
  3.         // Determine the sub-sizes
  4.         recentSize := int(float64(size) * recentRatio)
  5.         evictSize := int(float64(size) * ghostRatio)
  6.         // Allocate the LRUs
  7.         recent, err := simplelru.NewLRU[K, V](size, nil)
  8.         if err != nil {
  9.                 return nil, err
  10.         }
  11.         frequent, err := simplelru.NewLRU[K, V](size, nil)
  12.         if err != nil {
  13.                 return nil, err
  14.         }
  15.         recentEvict, err := simplelru.NewLRU[K, struct{}](evictSize, nil)
  16.         if err != nil {
  17.                 return nil, err
  18.         }
  19.         ...
  20. }
复制代码
接着来看一下2Q cache的Add计划思想:
  1. func (c *TwoQueueCache[K, V]) Add(key K, value V) {
  2.         c.lock.Lock()
  3.         defer c.lock.Unlock()
  4.         // Check if the value is frequently used already,
  5.         // and just update the value
  6.         // 如果在常用Q中就更新key的值
  7.         if c.frequent.Contains(key) {
  8.                 c.frequent.Add(key, value)
  9.                 return
  10.         }
  11.         // Check if the value is recently used, and promote
  12.         // the value into the frequent list
  13.         // 如果在最近使用Q中就移动到常用Q
  14.         if c.recent.Contains(key) {
  15.                 c.recent.Remove(key)
  16.                 c.frequent.Add(key, value)
  17.                 return
  18.         }
  19.         // If the value was recently evicted, add it to the
  20.         // frequently used list
  21.         // 如果在淘汰队列中则调用ensureSpace,并且将key-val添加到常用Q
  22.         if c.recentEvict.Contains(key) {
  23.                 c.ensureSpace(true)
  24.                 c.recentEvict.Remove(key)
  25.                 c.frequent.Add(key, value)
  26.                 return
  27.         }
  28.         // Add to the recently seen list
  29.         c.ensureSpace(false)
  30.         // 完全新的key,第一次添加到最近使用Q中
  31.         c.recent.Add(key, value)
  32. }
复制代码
总之,2Q的Add操作非经常规,然后再来看一下2Q中确保空间足够使用的方法:
  1. func (c *TwoQueueCache[K, V]) ensureSpace(recentEvict bool) {
  2.         // If we have space, nothing to do
  3.         recentLen := c.recent.Len()
  4.         freqLen := c.frequent.Len()
  5.         // 空间足够什么都不做
  6.         if recentLen+freqLen < c.size {
  7.                 return
  8.         }
  9.         // If the recent buffer is larger than
  10.         // the target, evict from there
  11.         // 当最近使用Q空间不足时,将key移动到淘汰Q中
  12.         if recentLen > 0 && (recentLen > c.recentSize || (recentLen == c.recentSize && !recentEvict)) {
  13.                 k, _, _ := c.recent.RemoveOldest()
  14.                 c.recentEvict.Add(k, struct{}{})
  15.                 return
  16.         }
  17.     // 如果recentQ足够大小
  18.         // Remove from the frequent list otherwise
  19.         c.frequent.RemoveOldest()
  20. }
复制代码
总体是非经常规的,最后调用c.frequent.RemoveOldest()是因为当 recent 队列不需要驱逐条目时,可能因为缓存总容量已经超出限制,需要进一步从 frequent 队列中驱逐条目。
接着是Get方法,先从常用Q中判定是否存在key,然后再去recent Q 中判定,c.recent.Peek(key) 留意Peek操作,它和Get不同之处在于,Get会触发LRU的list移动节点操作,而Peek则不会。
  1. func (c *TwoQueueCache[K, V]) Get(key K) (value V, ok bool) {
  2.         c.lock.Lock()
  3.         defer c.lock.Unlock()
  4.         // Check if this is a frequent value
  5.         if val, ok := c.frequent.Get(key); ok {
  6.                 return val, ok
  7.         }
  8.         // If the value is contained in recent, then we
  9.         // promote it to frequent
  10.         if val, ok := c.recent.Peek(key); ok {
  11.                 c.recent.Remove(key)
  12.                 c.frequent.Add(key, val)
  13.                 return val, ok
  14.         }
  15.         // No hit
  16.         return
  17. }
  18. // simple Lru的peek
  19. func (c *LRU[K, V]) Peek(key K) (value V, ok bool) {
  20.         var ent *internal.Entry[K, V]
  21.         if ent, ok = c.items[key]; ok {
  22.                 return ent.Value, true
  23.         }
  24.         return
  25. }
  26. // simple Lru的Get
  27. func (c *LRU[K, V]) Get(key K) (value V, ok bool) {
  28.         if ent, ok := c.items[key]; ok {
  29.                 c.evictList.MoveToFront(ent)
  30.                 return ent.Value, true
  31.         }
  32.         return
  33. }
复制代码
最后还有一个Resize方法,全局搜索只有在测试文件里使用过,既然是基于用户传入的size去做限制,如果改变了size,就与计划相悖了。
  1. func (c *TwoQueueCache[K, V]) Resize(size int) (evicted int) {
  2.         c.lock.Lock()
  3.         defer c.lock.Unlock()
  4.         // Recalculate the sub-sizes
  5.         recentSize := int(float64(size) * c.recentRatio)
  6.         evictSize := int(float64(size) * c.ghostRatio)
  7.         c.size = size
  8.         c.recentSize = recentSize
  9.         // ensureSpace
  10.         diff := c.recent.Len() + c.frequent.Len() - size
  11.         if diff < 0 {
  12.                 diff = 0
  13.         }
  14.         for i := 0; i < diff; i++ {
  15.                 c.ensureSpace(true)
  16.         }
  17.         // Reallocate the LRUs
  18.         c.recent.Resize(size)
  19.         c.frequent.Resize(size)
  20.         c.recentEvict.Resize(evictSize)
  21.         return diff
  22. }
复制代码
groupcache

groupcache is a caching and cache-filling library, intended as a replacement for memcached in many cases.
groupcache旨在更换memcached,groupcache 的计划初衷是作为一种分布式缓存办理方案,特殊适用于避免使用外部缓存服务的环境,例如Memcached或Redis。
十分风趣的一点是:does not require running a separate set of servers, thus massively reducing deployment/configuration pain. groupcache is a client library as well as a server. It connects to its own peers, forming a distributed cache.
就是与Redis等其他常用cache实现不同,groupcache并不运行在单独的server上,而是作为library和app运行在同一进程中。所以groupcache既是server也是client。
安装使用

Install:
  1. go get github.com/golang/groupcache
复制代码
Usage:
  1. package main
  2. import (
  3.     "fmt"
  4.     "log"
  5.     "net/http"
  6.     "github.com/golang/groupcache"
  7. )
  8. func main() {
  9.     // 定义一个名为 "example" 的缓存组,最大缓存容量为 64MB
  10.     var cacheGroup = groupcache.NewGroup("example", 64<<20, groupcache.GetterFunc(
  11.         func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
  12.             // 模拟缓存失效时的数据获取逻辑
  13.             value := "Value for " + key
  14.             dest.SetString(value)
  15.             return nil
  16.         }))
  17.     http.HandleFunc("/cache/", func(w http.ResponseWriter, r *http.Request) {
  18.         key := r.URL.Path[len("/cache/"):]
  19.         var data string
  20.         err := cacheGroup.Get(nil, key, groupcache.StringSink(&data))
  21.         if err != nil {
  22.             http.Error(w, err.Error(), http.StatusInternalServerError)
  23.             return
  24.         }
  25.         fmt.Fprintf(w, "Cached value for %s: %s\n", key, data)
  26.     })
  27.     log.Println("Starting server on :8080")
  28.     log.Fatal(http.ListenAndServe(":8080", nil))
  29. }
复制代码
将以上代码保存为 main.go,然后运行:
  1. go run main.go
复制代码
服务会启动在 localhost:8080。可以通过浏览器或命令行访问:
  1. curl http://localhost:8080/cache/testkey
复制代码
这将返回类似于以下内容:
  1. Cached value for testkey: Value for testkey
复制代码
第一次访问时,groupcache 会通过 GetterFunc 添补缓存。随后的访问则会直接从缓存中返回该值。
分布式部署:要在多个节点之间使用 groupcache 进行分布式缓存,只需要在启动时指定每个节点的所在,并让它们相互毗连。例如,可以使用 groupcache.NewHTTPPool() 来设置每个节点的所在。
  1. peers := groupcache.NewHTTPPool("http://localhost:8080")
  2. peers.Set("http://localhost:8081", "http://localhost:8082")
复制代码
代码剖析

缓存镌汰算法

FIFO(First In First Out)
先进先出,也就是镌汰缓存中最老(最早添加)的记录。FIFO 认为,最早添加的记录,其不再被使用的可能性比刚添加的可能性大。这种算法的实现也非常简朴,创建一个队列,新增记录添加到队尾,每次内存不够时,镌汰队首。但是很多场景下,部门记录固然是最早添加但也最常被访问,而不得不因为呆的时间太长而被镌汰。这类数据会被频繁地添加进缓存,又被镌汰出去,导致缓存命中率降低。
LFU(Least Frequently Used)
最少使用,也就是镌汰缓存中访问频率最低的记录。LFU 认为,如果数据过去被访问多次,那么未来被访问的频率也更高。LFU 的实现需要维护一个按照访问次数排序的队列,每次访问,访问次数加1,队列重新排序,镌汰时选择访问次数最少的即可。LFU 算法的命中率是比力高的,但缺点也非常明显,维护每个记录的访问次数,对内存的消耗是很高的;另外,如果数据的访问模式发生变革,LFU 需要较长的时间去顺应,也就是说 LFU 算法受汗青数据的影响比力大。例如某个数据汗青上访问次数奇高,但在某个时间点之后几乎不再被访问,但因为汗青访问次数过高,而迟迟不能被镌汰。
LRU(Least Recently Used)
近来最少使用,相对于仅考虑时间因素的 FIFO 和仅考虑访问频率的 LFU,LRU 算法可以认为是相对均衡的一种镌汰算法。LRU 认为,如果数据近来被访问过,那么未来被访问的概率也会更高。LRU 算法的实现非常简朴,维护一个队列,如果某条记录被访问了,则移动到队尾,那么队首则是近来最少访问的数据,镌汰该条记录即可。

这张图很好地表现了 LRU 算法最核心的 2 个数据结构

起首来看groupcache的lru实现,可以看到groupcache使用了go原生的list来实现Lru(没有浪费时间去重新实现),而key使用空接口代表能传入恣意类型(任何可比力类型):
  1. type Key interface{}
  2. type entry struct {
  3.         key   Key
  4.         value interface{}
  5. }
  6. type Cache struct {
  7.         // MaxEntries is the maximum number of cache entries before
  8.         // an item is evicted. Zero means no limit.
  9.         MaxEntries int
  10.         // OnEvicted optionally specifies a callback function to be
  11.         // executed when an entry is purged from the cache.
  12.         OnEvicted func(key Key, value interface{})
  13.         ll    *list.List
  14.         cache map[interface{}]*list.Element
  15. }
  16. func New(maxEntries int) *Cache {
  17.         return &Cache{
  18.                 MaxEntries: maxEntries,
  19.                 ll:         list.New(),
  20.                 cache:      make(map[interface{}]*list.Element),
  21.         }
  22. }
复制代码
Get操作也十分的中规中矩,如果cache没有初始化则懒加载,同时在新增key的时间判定是否存在,移动节点,以及内存镌汰:
  1. func (c *Cache) Add(key Key, value interface{}) {
  2.         if c.cache == nil {
  3.                 c.cache = make(map[interface{}]*list.Element)
  4.                 c.ll = list.New()
  5.         }
  6.         if ee, ok := c.cache[key]; ok {
  7.                 c.ll.MoveToFront(ee)
  8.                 ee.Value.(*entry).value = value
  9.                 return
  10.         }
  11.         ele := c.ll.PushFront(&entry{key, value})
  12.         c.cache[key] = ele
  13.         if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
  14.                 c.RemoveOldest()
  15.         }
  16. }
复制代码
镌汰机制也非经常规:
  1. // RemoveOldest removes the oldest item from the cache.
  2. func (c *Cache) RemoveOldest() {
  3.         if c.cache == nil {
  4.                 return
  5.         }
  6.         ele := c.ll.Back()
  7.         if ele != nil {
  8.                 c.removeElement(ele)
  9.         }
  10. }
  11. func (c *Cache) removeElement(e *list.Element) {
  12.         c.ll.Remove(e)
  13.         kv := e.Value.(*entry)
  14.         delete(c.cache, kv.key)
  15.         if c.OnEvicted != nil {
  16.                 c.OnEvicted(kv.key, kv.value)
  17.         }
  18. }
复制代码
从以上代码可以看出,Cache结构是并发不安全的,欲知groupcache如何保证并发安全,请看下文!
并发缓存组

继续来阅读groupcache.go的代码,cache结构对lru.Cache又做了一层封装,并且加了一个读写锁和3个n标识,同时引入了基于内存占用的垃圾接纳:
  1. type cache struct {
  2.         mu         sync.RWMutex
  3.         nbytes     int64 // of all keys and values
  4.         lru        *lru.Cache
  5.         nhit, nget int64
  6.         nevict     int64 // number of evictions
  7. }
复制代码
紧接着对lru.Cache操作进行了二次封装,
  1. func (c *cache) add(key string, value ByteView) {
  2.     // add设计对map的并发读写不加锁会导致panic
  3.         c.mu.Lock()
  4.         defer c.mu.Unlock()
  5.         // 懒加载
  6.         if c.lru == nil {
  7.                 c.lru = &lru.Cache{
  8.                         OnEvicted: func(key lru.Key, value interface{}) {
  9.                             // 注册回调函数
  10.                                 val := value.(ByteView)
  11.                                 // 释放内存
  12.                                 c.nbytes -= int64(len(key.(string))) + int64(val.Len())
  13.                                 // 统计释放的key的个数
  14.                                 c.nevict++
  15.                         },
  16.                 }
  17.         }
  18.         // 加入key
  19.         c.lru.Add(key, value)
  20.         // 增加内存消耗计数
  21.         c.nbytes += int64(len(key)) + int64(value.Len())
  22. }
  23. func (c *cache) get(key string) (value ByteView, ok bool) {
  24.     // 为什么读操作还要加锁, 因为还涉及两个标识的自增
  25.         c.mu.Lock()
  26.         defer c.mu.Unlock()
  27.         // get次数+1
  28.         c.nget++
  29.         if c.lru == nil {
  30.                 return
  31.         }
  32.         vi, ok := c.lru.Get(key)
  33.         if !ok {
  34.                 return
  35.         }
  36.         // 命中次数+1
  37.         c.nhit++
  38.         return vi.(ByteView), true
  39. }
  40. func (c *cache) removeOldest() {
  41.         c.mu.Lock()
  42.         defer c.mu.Unlock()
  43.         if c.lru != nil {
  44.                 c.lru.RemoveOldest()
  45.         }
  46. }
  47. func (c *cache) bytes() int64 {
  48.         c.mu.RLock()
  49.         defer c.mu.RUnlock()
  50.         return c.nbytes
  51. }
  52. func (c *cache) items() int64 {
  53.         c.mu.RLock()
  54.         defer c.mu.RUnlock()
  55.         return c.itemsLocked()
  56. }
  57. func (c *cache) itemsLocked() int64 {
  58.         if c.lru == nil {
  59.                 return 0
  60.         }
  61.         // 返回list的元素个数
  62.         return int64(c.lru.Len())
  63. }
复制代码
同时声明一个状态结构,将cache的私有变量对外服务出去:
  1. type CacheStats struct {
  2.         Bytes     int64
  3.         Items     int64
  4.         Gets      int64
  5.         Hits      int64
  6.         Evictions int64
  7. }
  8. func (c *cache) stats() CacheStats {
  9.         c.mu.RLock()
  10.         defer c.mu.RUnlock()
  11.         return CacheStats{
  12.                 Bytes:     c.nbytes,
  13.                 Items:     c.itemsLocked(),
  14.                 Gets:      c.nget,
  15.                 Hits:      c.nhit,
  16.                 Evictions: c.nevict,
  17.         }
  18. }
复制代码
同时为了计算key-val结构占用的内存以及存储恣意类型数据(包罗图片,视频等数据),抽象了一层只读的ByteView出来:
  1. // ByteView 持有一个不可变的字节视图。
  2. // 在内部,它封装了一个 []byte 或一个 string,
  3. // 但这个细节对调用者是不可见的。
  4. type ByteView struct {
  5.         // If b is non-nil, b is used, else s is used.
  6.         b []byte
  7.         s string
  8. }
  9. // Len returns the view's length.
  10. func (v ByteView) Len() int {
  11.         if v.b != nil {
  12.                 return len(v.b)
  13.         }
  14.         return len(v.s)
  15. }
复制代码
cache也有了,为了提高存储本领和分级缓存机制,于是将cache放到了一个Group中:
  1. type Group struct {
  2.         name       string
  3.         getter     Getter
  4.         peersOnce  sync.Once
  5.         peers      PeerPicker
  6.         cacheBytes int64 // limit for sum of mainCache and hotCache size
  7.        
  8.         // 主存(高可信)
  9.         // mainCache is a cache of the keys for which this process
  10.         // (amongst its peers) is authoritative. That is, this cache
  11.         // contains keys which consistent hash on to this process's
  12.         // peer number.
  13.         mainCache cache
  14.     // 高热存储
  15.         hotCache cache
  16.         // loadGroup ensures that each key is only fetched once
  17.         // (either locally or remotely), regardless of the number of
  18.         // concurrent callers.
  19.         // loadGroup 确保每个键只被获取一次(无论是本地还是远程),
  20.     // 不论有多少并发调用者。
  21.         loadGroup flightGroup
  22.        
  23.         // 方便内存对齐
  24.         _ int32 // force Stats to be 8-byte aligned on 32-bit platforms
  25.         // Stats are statistics on the group.
  26.         Stats Stats
  27. }
复制代码
Group涉及到两个关键的知识点,同等性哈希和flightGroup.
同等性哈希

什么是同等性哈希?
同等性哈希算法是Cache 从单节点走向分布式节点的一个重要的环节。
对于分布式缓存来说,当一个节点吸收到哀求,如果该节点并没有存储缓存值,那么它面临的困难是,从谁那获取数据?本身,照旧节点1, 2, 3, 4… 。假设包罗本身在内一共有 10 个节点,当一个节点吸收到哀求时,随机选择一个节点,由该节点从数据源获取数据。
假设第一次随机选取了节点 1 ,节点 1 从数据源获取到数据的同时缓存该数据;那第二次,只有 1/10 的可能性再次选择节点 1, 有 9/10 的概率选择了其他节点,如果选择了其他节点,就意味着需要再一次从数据源获取数据,一样平常来说,这个操作是很耗时的。如许做,一是缓存服从低,二是各个节点上存储着雷同的数据,浪费了大量的存储空间。
那有什么办法,对于给定的 key,每一次都选择同一个节点呢?使用 hash 算法也能够做到这一点。那把 key 的每一个字符的 ASCII 码加起来,再除以 10 取余数可以吗?固然可以,这可以认为是自界说的 hash 算法。

从上面的图可以看到,恣意一个节点恣意时刻哀求查找键 Tom 对应的值,都会分配给节点 2,有用地办理了上述的题目。
但如果此时节点数量变了怎么办?简朴求取 Hash 值办理了缓存性能的题目,但是没有考虑节点数量变革的场景。假设,移除了其中一台节点,只剩下 9 个,那么之前 hash(key) % 10 变成了 hash(key) % 9,也就意味着几乎缓存值对应的节点都发生了改变。即几乎全部的缓存值都失效了。节点在吸收到对应的哀求时,均需要重新去数据源获取数据,容易引起 缓存雪崩。
  1. 缓存雪崩:缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。
  2. 常因为缓存服务器宕机,或缓存设置了相同的过期时间引起。
复制代码
为了防止Hash函数简陋无法应对节点扩缩容导致的缓存全部失效引发的雪崩,引入了同等性哈希算法:同等性哈希算法(Consistent Hashing)是一种特殊的哈希算法,旨在为分布式体系中负载均衡和数据分布提供高效、均衡的办理方案。它的核心思想是将全部的缓存节点和数据映射到一个虚拟的圆环上,然后通过哈希函数将数据映射到环上的位置,以确定数据应存储在哪个节点上。
同等性哈希算法将 key 映射到 2^32 的空间中,将这个数字首尾相连,形成一个环。


环上有 peer2,peer4,peer6 三个节点,key11,key2,key27 均映射到 peer2,key23 映射到 peer4。此时,如果新增节点/呆板 peer8,假设它新增位置如图所示,那么只有 key27 从 peer2 调整到 peer8,其余的映射均没有发生改变。
也就是说,同等性哈希算法,在新增/删除节点时,只需要重新定位该节点附近的一小部门数据,而不需要重新定位全部的节点,这就办理了上述的题目。
同时,如果服务器的节点过少,容易引起 key 的倾斜。例如上面例子中的 peer2,peer4,peer6 分布在环的上半部门,下半部门是空的。那么映射到环下半部门的 key 都会被分配给 peer2,key 过度向 peer2 倾斜,缓存节点间负载不均。
为了办理这个题目,引入了虚拟节点的概念,一个真实节点对应多个虚拟节点。
假设 1 个真实节点对应 3 个虚拟节点,那么 peer1 对应的虚拟节点是 peer1-1、 peer1-2、 peer1-3(通常以添加编号的方式实现),其余节点也以雷同的方式操作。

虚拟节点扩充了节点的数量,办理了节点较少的环境下数据容易倾斜的题目。而且代价非常小,只需要增加一个字典(map)维护真实节点与虚拟节点的映射关系即可。
我们跳转到consistenthash.go文件内看看go官方是怎么实现的,
  1. type Hash func(data []byte) uint32
  2. type Map struct {
  3.     // Hash 函数 hash
  4.         hash     Hash
  5.         // 虚拟节点倍数 replicas
  6.         replicas int
  7.         // 哈希环 keys
  8.         keys     []int // Sorted
  9.         // 虚拟节点与真实节点的映射表 hashMap,键是虚拟节点的哈希值,值是真实节点的名称。
  10.         hashMap  map[int]string
  11. }
复制代码
PS:不要再问为啥用uint和32位了,如果你确定一个数不可能是负数,为啥要浪费1bit去存储符号? 32位是因为有些体系寻址只支持32bit,go官方写一个开源的缓存,不可能丢弃掉32bit的用户,况且32bit在64bit上也能顺利运行。

接着往下看,go官方默认会采用src32.ChecksumIEEE作为hash函数,
   CRC32(Cyclic Redundancy Check
32-bit)是一种广泛使用的错误检测机制,用于检测数据在存储或传输过程中是否发生了错误。CRC32 使用 32
位的校验和,它通过多项式除法来生成一个 32 位的值,该值可以用来检测数据的完整性。
  1. func New(replicas int, fn Hash) *Map {
  2.         m := &Map{
  3.                 replicas: replicas,
  4.                 hash:     fn,
  5.                 hashMap:  make(map[int]string),
  6.         }
  7.         if m.hash == nil {
  8.                 m.hash = crc32.ChecksumIEEE
  9.         }
  10.         return m
  11. }
复制代码
当key加入缓存时,同时加入到hash环中:
  1. func (m *Map) Add(keys ...string) {
  2.         for _, key := range keys {
  3.                 for i := 0; i < m.replicas; i++ {
  4.                     // 计算key+虚节点的hash值
  5.                         hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
  6.                         // 把key的hash加入到总key中
  7.                         m.keys = append(m.keys, hash)
  8.                         // 添加hash到key的映射
  9.                         m.hashMap[hash] = key
  10.                 }
  11.         }
  12.         sort.Ints(m.keys)
  13. }
  14. func (m *Map) Get(key string) string {
  15.         if m.IsEmpty() {
  16.                 return ""
  17.         }
  18.         hash := int(m.hash([]byte(key)))
  19.         // Binary search for appropriate replica.
  20.         idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
  21.         // Means we have cycled back to the first replica.
  22.         if idx == len(m.keys) {
  23.                 idx = 0
  24.         }
  25.         return m.hashMap[m.keys[idx]]
  26. }
复制代码
防止缓存击穿——singleflight 的使用

singleflight 是一种编程模式或技能,用于办理同一时间内对雷同资源的重复哀求,避免并发调用的抖动或过载题目。该模式确保在并发环境下,雷同的哀求只会被实行一次,其余的并发哀求会等待第一个哀求的结果,而不是重复实行雷同的操作。
在 Go 语言中,singleflight 是由 golang.org/x/sync/singleflight 包提供的一个实现,用于避免并发的重复工作。
核心思想:

singleflight 主要用于以下场景:

小例子:
  1. package main
  2. import (
  3.     "fmt"
  4.     "golang.org/x/sync/singleflight"
  5.     "time"
  6. )
  7. func main() {
  8.     var group singleflight.Group
  9.     // 模拟三个并发请求
  10.     for i := 0; i < 3; i++ {
  11.         go func(i int) {
  12.             result, _, _ := group.Do("key", func() (interface{}, error) {
  13.                 fmt.Printf("Request %d: Executing...\n", i)
  14.                 time.Sleep(2 * time.Second)  // 模拟长时间的操作
  15.                 return "Result", nil
  16.             })
  17.             fmt.Printf("Request %d: Result = %s\n", i, result)
  18.         }(i)
  19.     }
  20.     time.Sleep(3 * time.Second)
  21. }
复制代码
groupCache在groupcache.go提供了flightGroup接口,并在singleflight给出了实现:
  1. type flightGroup interface {
  2.         // Done is called when Do is done.
  3.         Do(key string, fn func() (interface{}, error)) (interface{}, error)
  4. }
复制代码
  1. package singleflight
  2. import "sync"
  3. // call 是正在进行或已完成的 Do 调用
  4. type call struct {
  5.         wg  sync.WaitGroup
  6.         val interface{}
  7.         err error
  8. }
  9. type Group struct {
  10.         mu sync.Mutex       // protects m
  11.         // m是key-call的绑定
  12.         m  map[string]*call // lazily initialized
  13. }
  14. // 如果出现重复请求,重复的调用者会等待原始调用完成,并接收相同的结果。
  15. func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  16.     // 加锁保护m
  17.         g.mu.Lock()
  18.         if g.m == nil {
  19.                 g.m = make(map[string]*call)
  20.         }
  21.        
  22.         // 判断当前key是不是正在被call
  23.         if c, ok := g.m[key]; ok {
  24.             // 如果是则释放锁,因为我们不需要改变任何东西
  25.             // 只需要等待call返回
  26.                 g.mu.Unlock()
  27.                 // 等待
  28.                 c.wg.Wait()
  29.                 // 返回key对应的val
  30.                 return c.val, c.err
  31.         }
  32.        
  33.         // 当前key第一次被call
  34.         c := new(call)
  35.         // wg的并发g计数器加1
  36.         c.wg.Add(1)
  37.         g.m[key] = c
  38.         // 此时不需要再操作m,所以把锁放掉即可
  39.         g.mu.Unlock()
  40.    
  41.         // 所有协程各自调用自己的fn()
  42.         c.val, c.err = fn()
  43.         // 执行完成就给各自的call wg的并发g计数器-1
  44.         c.wg.Done()
  45.     // 再次加锁删除掉调用的纪录
  46.         g.mu.Lock()
  47.         delete(g.m, key)
  48.         g.mu.Unlock()
  49.         return c.val, c.err
  50. }
复制代码
接口型函数

回到groupcache.go中,发现作者针对取值的举动界说了一个接口型函数,
  1. // A Getter loads data for a key.
  2. type Getter interface {
  3.         // Get returns the value identified by key, populating dest.
  4.         //
  5.         // The returned data must be unversioned. That is, key must
  6.         // uniquely describe the loaded data, without an implicit
  7.         // current time, and without relying on cache expiration
  8.         // mechanisms.
  9.         Get(ctx context.Context, key string, dest Sink) error
  10. }
  11. // A GetterFunc implements Getter with a function.
  12. type GetterFunc func(ctx context.Context, key string, dest Sink) error
  13. func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error {
  14.         return f(ctx, key, dest)
  15. }
复制代码
接口型函数在go中被大量用到,再举一个net/http的例子:
  1. type Handler interface {
  2.         ServeHTTP(ResponseWriter, *Request)
  3. }
  4. type HandlerFunc func(ResponseWriter, *Request)
  5. func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
  6.         f(w, r)
  7. }
复制代码
想象这么一个使用场景,GetData 的作用是从某数据源获取结果,接口类型 Getter 是其中一个参数,代表某数据源,认为只要是实现了Get(string)的就是数据源,
  1. func GetData(getter Getter, key string) []byte {
  2.         buf, err := getter.Get(key)
  3.         if err == nil {
  4.                 return buf
  5.         }
  6.         return nil
  7. }
复制代码
可以有多种方式调用该函数:
方法1:传入平凡函数,但用Getter强转:
  1. func test(key string) ([]byte, error) {
  2.         return []byte(key), nil
  3. }
  4. func main() {
  5.     GetFromSource(GetterFunc(test), "hello")
  6. }
复制代码
方法2:实现了 Getter 接口的结构体作为参数
  1. type DB struct{ url string}
  2. func (db *DB) Query(sql string, args ...string) string {
  3.         // ...
  4.         return "hello"
  5. }
  6. func (db *DB) Get(key string) ([]byte, error) {
  7.         // ...
  8.         v := db.Query("SELECT NAME FROM TABLE WHEN NAME= ?", key)
  9.         return []byte(v), nil
  10. }
  11. func main() {
  12.         GetFromSource(new(DB), "hello")
  13. }
复制代码
如许,既能够将平凡的函数类型(需类型转换)作为参数,也可以将结构体作为参数,使用更为灵活,可读性也更好,这就是接口型函数的代价。如果是平凡结构体作为参数,则会直打仗发结构体的Get方法,如果是函数作为参数,则可以把函数的返回值作为Get的结果,非常巧妙。
对于net/http包的接口型函数同理,如果传入的是一个实现了 Handler 接口的结构体,就可以完全托管全部的 HTTP 哀求,后续怎么路由,怎么处理,哀求前后增加什么功能,都可以自界说了。
“Set”/Get方法

回归到groupcache的核心,使用NewGroup初始化得到一个cache,
  1. var (
  2.         mu     sync.RWMutex
  3.         groups = make(map[string]*Group)
  4.         initPeerServerOnce sync.Once
  5.         initPeerServer     func()
  6. )
  7. func callInitPeerServer() {
  8.         if initPeerServer != nil {
  9.                 initPeerServer()
  10.         }
  11. }
  12. func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
  13.         return newGroup(name, cacheBytes, getter, nil)
  14. }
  15. // If peers is nil, the peerPicker is called via a sync.Once to initialize it.
  16. func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
  17.         if getter == nil {
  18.                 panic("nil Getter")
  19.         }
  20.         mu.Lock()
  21.         defer mu.Unlock()
  22.         // 初始化
  23.         initPeerServerOnce.Do(callInitPeerServer)
  24.         // group的名称不允许重复
  25.         if _, dup := groups[name]; dup {
  26.                 panic("duplicate registration of group " + name)
  27.         }
  28.         g := &Group{
  29.                 name:       name,
  30.                 getter:     getter,
  31.                 peers:      peers,
  32.                 cacheBytes: cacheBytes,
  33.                 // singleflight调用组
  34.                 loadGroup:  &singleflight.Group{},
  35.         }
  36.         if fn := newGroupHook; fn != nil {
  37.                 fn(g)
  38.         }
  39.         groups[name] = g
  40.         return g
  41. }
复制代码
同时提供了两个注册钩子的方法,用于开启分布式缓存服务,
  1. func RegisterNewGroupHook(fn func(*Group)) {
  2.         if newGroupHook != nil {
  3.                 panic("RegisterNewGroupHook called more than once")
  4.         }
  5.         newGroupHook = fn
  6. }
  7. // RegisterServerStart registers a hook that is run when the first
  8. // group is created.
  9. func RegisterServerStart(fn func()) {
  10.         if initPeerServer != nil {
  11.                 panic("RegisterServerStart called more than once")
  12.         }
  13.         initPeerServer = fn
  14. }
复制代码
对于getter参数,可以从test文件里看到官方的传值样例:

groupcache.Group 本身不直接提供显式的 set 方法。这是因为 groupcache 的计划理念是只通过缓存加载函数(Getter)进行数据添补,而不是手动设置缓存值。groupcache 的工作机制类似于一个读缓存,当缓存中没有数据时,它会触发缓存加载函数来获取数据,并将结果存储在缓存中,之后再从缓存中读取。
它的工作流程如下:

核心逻辑在于Get方法:
  1. // 在不初始化picker的情况下会返回NoPeers
  2. func getPeers(groupName string) PeerPicker {
  3.         if portPicker == nil {
  4.                 return NoPeers{}
  5.         }
  6.         pk := portPicker(groupName)
  7.         if pk == nil {
  8.                 pk = NoPeers{}
  9.         }
  10.         return pk
  11. }
  12. // NewGroup给传入的peers为nil
  13. // 所以会100%调用一次getPeers
  14. func (g *Group) initPeers() {
  15.         if g.peers == nil {
  16.                 // g.peers为nil
  17.                 g.peers = getPeers(g.name)
  18.         }
  19. }
  20. func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
  21.     // once控制调用一次初始化节点
  22.         g.peersOnce.Do(g.initPeers)
  23.        
  24.         // 调用1次get就给Stats的get计数器+1
  25.         g.Stats.Gets.Add(1)
  26.         if dest == nil {
  27.                 return errors.New("groupcache: nil dest Sink")
  28.         }
  29.         // 查询key是否存在
  30.         value, cacheHit := g.lookupCache(key)
  31.         if cacheHit {
  32.             // 如果缓存命中后命中次数+1
  33.                 g.Stats.CacheHits.Add(1)
  34.                 return setSinkView(dest, value)
  35.         }
  36.         // Optimization to avoid double unmarshalling or copying: keep
  37.         // track of whether the dest was already populated. One caller
  38.         // (if local) will set this; the losers will not. The common
  39.         // case will likely be one caller.
  40.         destPopulated := false
  41.         value, destPopulated, err := g.load(ctx, key, dest)
  42.         if err != nil {
  43.                 return err
  44.         }
  45.         if destPopulated {
  46.                 return nil
  47.         }
  48.         return setSinkView(dest, value)
  49. }
  50. func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
  51.     // 如果未设置key直接返回
  52.         if g.cacheBytes <= 0 {
  53.                 return
  54.         }
  55.         // 如果在mainCache找也返回
  56.         value, ok = g.mainCache.get(key)
  57.         if ok {
  58.                 return
  59.         }
  60.         // 最后取hotCache中去找
  61.         value, ok = g.hotCache.get(key)
  62.         return
  63. }
复制代码
当从缓存中取到值(BytesView)后,调用setSinkView优化数据的通报路径,淘汰内存拷贝操作,提升性能。
  1. // sink 通常用来表示数据的最终接收者或目标。
  2. func setSinkView(s Sink, v ByteView) error {
  3.         // A viewSetter is a Sink that can also receive its value from
  4.         // a ByteView. This is a fast path to minimize copies when the
  5.         // item was already cached locally in memory (where it's
  6.         // cached as a ByteView)
  7.         type viewSetter interface {
  8.                 setView(v ByteView) error
  9.         }
  10.         if vs, ok := s.(viewSetter); ok {
  11.                 return vs.setView(v)
  12.         }
  13.         if v.b != nil {
  14.                 return s.SetBytes(v.b)
  15.         }
  16.         return s.SetString(v.s)
  17. }
复制代码
如果没有命中缓存则把key加入缓存,
  1.         ...
  2.         destPopulated := false
  3.         value, destPopulated, err := g.load(ctx, key, dest)
  4.         if err != nil {
  5.                 return err
  6.         }
  7.         // 不再二次填充
  8.         if destPopulated {
  9.                 return nil
  10.         }
  11.         return setSinkView(dest, value)
  12. }
  13. func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
  14.     // load计数器+1
  15.         g.Stats.Loads.Add(1)
  16.         viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
  17.                 // Check the cache again because singleflight can only dedup calls
  18.                 // that overlap concurrently.  It's possible for 2 concurrent
  19.                 // requests to miss the cache, resulting in 2 load() calls.  An
  20.                 // unfortunate goroutine scheduling would result in this callback
  21.                 // being run twice, serially.  If we don't check the cache again,
  22.                 // cache.nbytes would be incremented below even though there will
  23.                 // be only one entry for this key.
  24.                 //
  25.                 // Consider the following serialized event ordering for two
  26.                 // goroutines in which this callback gets called twice for the
  27.                 // same key:
  28.                 // 1: Get("key")
  29.                 // 2: Get("key")
  30.                 // 1: lookupCache("key")
  31.                 // 2: lookupCache("key")
  32.                 // 1: load("key")
  33.                 // 2: load("key")
  34.                 // 1: loadGroup.Do("key", fn)
  35.                 // 1: fn()
  36.                 // 2: loadGroup.Do("key", fn)
  37.                 // 2: fn()
  38.                 if value, cacheHit := g.lookupCache(key); cacheHit {
  39.                         g.Stats.CacheHits.Add(1)
  40.                         return value, nil
  41.                 }
  42.                
  43.                 g.Stats.LoadsDeduped.Add(1)
  44.                 var value ByteView
  45.                 var err error
  46.                 // 从其他节点尝试获取key
  47.                 if peer, ok := g.peers.PickPeer(key); ok {
  48.                         value, err = g.getFromPeer(ctx, peer, key)
  49.                         if err == nil {
  50.                                 g.Stats.PeerLoads.Add(1)
  51.                                 return value, nil
  52.                         }
  53.                         g.Stats.PeerErrors.Add(1)
  54.                         // TODO(bradfitz): log the peer's error? keep
  55.                         // log of the past few for /groupcachez?  It's
  56.                         // probably boring (normal task movement), so not
  57.                         // worth logging I imagine.
  58.                 }
  59.                 // 从本地获取key-val
  60.                 value, err = g.getLocally(ctx, key, dest)
  61.                 if err != nil {
  62.                         g.Stats.LocalLoadErrs.Add(1)
  63.                         return nil, err
  64.                 }
  65.                 g.Stats.LocalLoads.Add(1)
  66.                 destPopulated = true // only one caller of load gets this return value
  67.                 // 触发一次垃圾回收
  68.                 g.populateCache(key, value, &g.mainCache)
  69.                 return value, nil
  70.         })
  71.         if err == nil {
  72.                 value = viewi.(ByteView)
  73.         }
  74.         return
  75. }
  76. func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) {
  77.     // getter此刻被调用
  78.         err := g.getter.Get(ctx, key, dest)
  79.         if err != nil {
  80.                 return ByteView{}, err
  81.         }
  82.         return dest.view()
  83. }
  84. func (g *Group) populateCache(key string, value ByteView, cache *cache) {
  85.         if g.cacheBytes <= 0 {
  86.                 return
  87.         }
  88.         // 先把key-val加入缓存再淘汰旧的数据
  89.         cache.add(key, value)
  90.         // Evict items from cache(s) if necessary.
  91.         for {
  92.                 mainBytes := g.mainCache.bytes()
  93.                 hotBytes := g.hotCache.bytes()
  94.                 if mainBytes+hotBytes <= g.cacheBytes {
  95.                         return
  96.                 }
  97.                 // TODO(bradfitz): this is good-enough-for-now logic.
  98.                 // It should be something based on measurements and/or
  99.                 // respecting the costs of different resources.
  100.                 victim := &g.mainCache
  101.                 if hotBytes > mainBytes/8 {
  102.                         victim = &g.hotCache
  103.                 }
  104.                 victim.removeOldest()
  105.         }
  106. }
复制代码
镌汰策略优先镌汰主存,如果 hotCache 中的使用量(hotBytes)凌驾了 mainCache 使用量的 1/8(mainBytes/8),则选择 hotCache 作为镌汰对象。这表明 hotCache 中的数据量相对较大,可能需要优先进行镌汰。
hotCache 存储了热点数据,通常这些数据被频繁访问。保持热点数据的有用性可以显著提高体系性能。如果热点缓存的数据量很大,选择 hotCache 作为镌汰对象的逻辑实际上是为了避免 hotCache 过度膨胀。如果 hotCache 中的数据量过大,它可能占用了过多的缓存空间,影响到主缓存(mainCache)的性能和可用空间。因此,得当的镌汰可以资助控制 hotCache 的大小,使其保持在公道的范围内,从而避免对团体体系性能产生负面影响。
在 groupcache 的 load 方法中,destPopulated 的作用是指示是否有数据被成功添补到 dest 中。具体来说,它用于标记 dest 是否在缓存加载过程中被添补了数据。这是一个重要的机制,以确保只有一个调用者会负责添补缓存数据,避免重复的添补操作。

内存计算的思考

以上LRU cache实现,有各种各样的镌汰策略,只有groupcache是基于当前key占用的字节大小进行内存镌汰,其他cache以致用用户传入size作为阈值…
groupcache的内存占用计算:
  1. c.nbytes += int64(len(key)) + int64(value.Len())
复制代码
string和[]bytes实际占用的内存应该不是简朴的len(),string 类型在 Go 中是一个只读的字符串类型,底层由一个指向字节数组的指针和一个表现长度的整数构成。[]byte 在 Go 中是一个切片类型,底层结构大抵如下:
  1. type stringHeader struct {
  2.     Data uintptr
  3.     Len  int
  4. }
  5. type sliceHeader struct {
  6.     Data uintptr
  7.     Len  int
  8.     Cap  int
  9. }
复制代码
string 类型的总内存占用则包罗结构体的开销以及字符串数据的字节数,切片的实际内存占用包罗切片结构体的大小和实际存储的数据的字节数。
从内存管理的角度来看,groupcache 的内存计算确实是比力粗略的。groupcache 主要依赖于键值对的字节长度(即 len(key) + value.Len())来估算缓存占用的大小。然而,这种计算并不包罗其他与内存管理相关的开销,例如 Go 运行时分配的元数据、切片头部或字符串指针的大小,以及潜伏的内存对齐和额外的内存分配。
特殊是:

Redis 的内存计算相对比 groupcache 更加复杂和精确,因为 Redis 是专门为高效内存管理计划的内存数据库。它会尝试尽可能精确地跟踪每个键值对的内存占用,以确保在高负载下能够有用使用内存。
Redis 通过一些内存优化策略来只管淘汰内存占用:

Redis 的内存计算是非常精确的,考虑了多方面的因素,如数据的编码方式、数据结构的开销、内存碎片和分配器的影响。此外,Redis 通过多种内存优化策略来最大化内存使用服从。因此,相较于 groupcache 的粗略估算,Redis 能更正确地计算和管理内存。
reflect+unsafe计算内存消耗

unsafe.Sizeof 可以用来精确计算 Go 中某个变量或类型的大小。它返回的值是类型在内存中的占用大小(以字节为单元)。
  1. package main
  2. import (
  3.     "fmt"
  4.     "unsafe"
  5. )
  6. func main() {
  7.     var i int
  8.     fmt.Println("Size of int:", unsafe.Sizeof(i)) // 输出int类型的字节大小
  9.     var s string
  10.     fmt.Println("Size of string:", unsafe.Sizeof(s)) // 输出string类型的字节大小
  11. }
复制代码
但unsafe.Sizeof 只会计算类型的直接内存大小,不包罗底层引用类型(如切片、映射、字符串等)的间接内存大小。
reflect 包可以用来查抄 Go 变量的类型信息,并通过遍历复杂数据结构(如结构体、切片、映射等)来计算其内存使用。
刚好加上unsafe包就可以得到以下函数:
  1. package main
  2. import (
  3.     "fmt"
  4.     "reflect"
  5.     "unsafe"
  6. )
  7. func sizeOf(v interface{}) uintptr {
  8.     val := reflect.ValueOf(v)
  9.     return calculateSize(val)
  10. }
  11. func calculateSize(v reflect.Value) uintptr {
  12.     switch v.Kind() {
  13.     case reflect.Ptr, reflect.Interface:
  14.         if v.IsNil() {
  15.             return 0
  16.         }
  17.         return unsafe.Sizeof(v.Pointer()) + calculateSize(v.Elem())
  18.     case reflect.Slice, reflect.Array:
  19.         size := uintptr(0)
  20.         for i := 0; i < v.Len(); i++ {
  21.             size += calculateSize(v.Index(i))
  22.         }
  23.         return size
  24.     case reflect.Struct:
  25.         size := uintptr(0)
  26.         for i := 0; i < v.NumField(); i++ {
  27.             size += calculateSize(v.Field(i))
  28.         }
  29.         return size
  30.     default:
  31.         return unsafe.Sizeof(v.Interface())
  32.     }
  33. }
  34. func main() {
  35.     type Person struct {
  36.         Name string
  37.         Age  int
  38.     }
  39.     p := Person{Name: "Alice", Age: 30}
  40.     fmt.Printf("Memory size: %d bytes\n", sizeOf(p))
  41. }
复制代码
对于一个字符串s:


因此size+len可以粗略的估计string和bytes占用的内存,不过要计算字符串在内存中的总占用,则还需要考虑其他因素,好比堆栈的内存管理和对齐题目。
runtime.MemStats计算内存

Go 的 runtime 包提供了对 Go 运行时信息的访问,包罗内存分配、垃圾接纳等统计信息。你可以通过 runtime.MemStats 获取步调的团体内存使用环境:
  1. package main
  2. import (
  3.     "fmt"
  4.     "runtime"
  5. )
  6. func printMemUsage() {
  7.     var m runtime.MemStats
  8.     runtime.ReadMemStats(&m)
  9.     fmt.Printf("Alloc = %v MiB", m.Alloc / 1024 / 1024)
  10.     fmt.Printf("\tTotalAlloc = %v MiB", m.TotalAlloc / 1024 / 1024)
  11.     fmt.Printf("\tSys = %v MiB", m.Sys / 1024 / 1024)
  12.     fmt.Printf("\tNumGC = %v\n", m.NumGC)
  13. }
  14. func main() {
  15.     printMemUsage()
  16.     // 模拟一些内存分配
  17.     var mem []byte
  18.     for i := 0; i < 10; i++ {
  19.         mem = append(mem, make([]byte, 10*1024*1024)...)
  20.         printMemUsage()
  21.     }
  22. }
复制代码

runtime.MemStats 是 Go 标准库中用于获取内存使用统计信息的结构体,包含了 Go 运行时内存分配、垃圾接纳等方面的具体信息。这些信息对理解应用步调的内存占用、调优和诊断性能题目非常有资助。
下面是 runtime.MemStats 的主要字段及其寄义:
根本分配统计
堆统计
栈统计
垃圾接纳器统计
其他统计
比力关注的字段有三个,Alloc,HeapAlloc,Sys,具体对比这三者:
小结:

综上可以用Alloc去评估内存的使用环境,但groupcache大概是考虑到其开销而没有用这个方法。对于go cache如何更细粒度的控制内存逐出,欢迎大佬们增补。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4