【Go | 从0实现简朴分布式缓存】-3:分布式节点通信

打印 上一主题 下一主题

主题 844|帖子 844|积分 2532



  
本文为极客兔兔动手写分布式缓存GeeCache学习条记。
一、通信流程

在前面一节中,已经为 HTTPPool 实现了服务端功能,通信不但需要服务端还需要客户端,因此本节来实现 HTTPPool 客户端的功能。
最开始我们的流程定义如下,前面已经实现了1和3,那么现在需要实现2这个点。

现在我们进一步来细化2这个点的步调,来看看是怎么实现的。

二、peers.go

首先抽象出 2 个接口,PeerPicker 的 PickPeer() 方法用于根据传入的 key 选择相应节点 PeerGetter。
接口 PeerGetter 的 Get() 方法用于从对应 group 查找缓存值。PeerGetter 就对应于上述流程中的 HTTP 客户端。
  1. type PeerPicker interface {
  2.         PickPeer(key string) (peer PeerGetter, ok bool)
  3.         //根据传进来的键,去选择相对应的接口(就是调用一致性哈希算法)。
  4. }
  5. // PeerGetter is the interface that must be implemented by a peer.
  6. type PeerGetter interface {
  7.         Get(group string, key string) ([]byte, error)
  8.         //从客户端去请求其他节点的对应的值的过程。
  9. }
复制代码
三、http.go

首先需要编写Peers的Set函数,也就是节点的注册函数。
先初始化一个一致性哈希环,将传入的节点地点添加到哈希环中。为每个节点创建一个 httpGetter 对象,并将它们存储到一个映射中,以便后续通过节点地点快速访问对应的 Getter 对象。整个过程是线程安全的,通过互斥锁掩护共享资源的并发访问。
一句话来表现就是:Set() 方法实例化了一致性哈希算法,并且添加了传入的节点。并为每一个节点创建了一个 HTTP 客户端 httpGetter。
peers ...string 是方法的参数,表现一个可变数目的字符串切片。调用时可以传入恣意数目的字符串作为参数,这些字符串代表 HTTP 节点的地点。
  1. func (p *HTTPPool) Set(peers ...string) {
  2.         p.mu.Lock()
  3.         defer p.mu.Unlock()
  4.         p.peers = consistenthash.New(defaultReplicas, nil)
  5.         p.peers.Add(peers...)
  6.         p.httpGetters = make(map[string]*httpGetter, len(peers))
  7.         for _, peer := range peers {
  8.                 p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
  9.         }
  10. }
复制代码
consistenthash.New 是一致性哈希库的构造函数,用于创建一个新的一致性哈希环。
defaultReplicas 是一个常量,表现每个节点在哈希环上的假造副本数目。一致性哈希通过假造节点(副本)来提高负载均衡的效果。
nil 是一致性哈希的哈希函数参数,这里使用默认的哈希函数。
p.peers.Add 方法会将每个节点添加到一致性哈希环中。一致性哈希环会根据节点的哈希值将它们分布在环上。
p.httpGetters 是一个映射(map),用于存储每个节点的 HTTP Getter 对象。
make(map[string]*httpGetter, len(peers)) 创建了一个映射,键是节点地点(string),值是指向 httpGetter 的指针。
for _, peer := range peers 遍历传入的 peers 切片,peer 是当前节点的地点。p.httpGetters[peer] 是将节点地点作为键存储到映射中。&httpGetter{baseURL: peer + p.basePath} 创建了一个 httpGetter 对象,并将其地点存储到映射中。也就是peer 是节点的地点。p.basePath 是 HTTPPool 中定义的一个字段,表现 HTTP 哀求的路径前缀。peer + p.basePath 拼接成完备的 HTTP 哀求的基地点。

然后是Pickeer函数。PickerPeer() 包装了一致性哈希算法的 Get() 方法,根据具体的 key,选择节点peer,并返回该节点对应的 HTTP 客户端,也就是PeerGetter,bool就代表是否成功的找到对应的节点。
p.peers.Get(key):调用一致性哈希环的 Get 方法,根据输入的 key 选择一个节点。peer != "":检查返回的节点地点是否为空。如果为空,说明没有找到合适的节点。peer != p.self:检查返回的节点是否是当前节点(p.self)。如果返回的节点是当前节点,说明不需要跨节点远程获取数据,可以直接跳过。
  1. func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
  2.         p.mu.Lock()
  3.         defer p.mu.Unlock()
  4.         if peer := p.peers.Get(key); peer != "" && peer != p.self {
  5.                 p.Log("Pick peer %s", peer)
  6.                 return p.httpGetters[peer], true
  7.         }
  8.         return nil, false
  9. }
复制代码
四、geecache.go

在geecache.go中需要添加方法。
首先,新增 getFromPeer() 方法,使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值,并将其封装为一个 ByteView 类型返回。。
  1. func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
  2.         bytes, err := peer.Get(g.name, key)
  3.         if err != nil {
  4.                 return ByteView{}, err
  5.         }
  6.         return ByteView{b: bytes}, nil
  7. }
复制代码
原本的load()函数非常简朴,是直接调用getLocally()函数,现在 需要改进。
首先实行从分布式缓存体系中加载指定键(key)的值。它首先实行从远程节点加载数据,也就是getFromPeer这个方法会帮我们去调用peer的Get()方法,如果失败,则退回到当地加载,也就是getLocally(key)。
  1. func (g *Group) load(key string) (value ByteView, err error) {
  2.         if g.peers != nil {
  3.                 if peer, ok := g.peers.PickPeer(key); ok {
  4.                         if value, err = g.getFromPeer(peer, key); err == nil {
  5.                                 return value, nil
  6.                         }
  7.                         log.Println("[GeeCache] Failed to get from peer", err)
  8.                 }
  9.         }
  10.         return g.getLocally(key)
  11. }
复制代码

接下来又到了调用http.go中的Get方法。
首先创建具体的 HTTP 客户端类 httpGetter,实现 PeerGetter 接口。
baseURL 表现将要访问的远程节点的地点,例如 http://example.com/_geecache/。
使用 http.Get() 方式获取返回值(http.Get(u)是发送一个 HTTP GET 哀求到构造好的 URL),并转换为 []bytes 类型。
  1. type httpGetter struct {
  2.         baseURL string
  3. }
  4. func (h *httpGetter) Get(group string, key string) ([]byte, error) {
  5.         u := fmt.Sprintf(
  6.                 "%v%v/%v",
  7.                 h.baseURL,
  8.                 url.QueryEscape(group),
  9.                 url.QueryEscape(key),
  10.         )
  11.         res, err := http.Get(u)
  12.         if err != nil {
  13.                 return nil, err
  14.         }
  15.         defer res.Body.Close()
  16.         if res.StatusCode != http.StatusOK {
  17.                 return nil, fmt.Errorf("server returned: %v", res.Status)
  18.         }
  19.         bytes, err := ioutil.ReadAll(res.Body)
  20.         if err != nil {
  21.                 return nil, fmt.Errorf("reading response body: %v", err)
  22.         }
  23.         return bytes, nil
  24. }
  25. var _ PeerGetter = (*httpGetter)(nil)
复制代码
接下来讲讲具体代码的作用:
url.QueryEscape(group) 和 url.QueryEscape(key):使用 url.QueryEscape 函数对缓存组名称和键进行 URL 编码,确保它们可以安全地嵌入到 URL 中。
  1. var _ PeerGetter = (*httpGetter)(nil)
复制代码
这行代码就比较熟悉了:通过类型断言,用于确保 *httpGetter 类型实现了 PeerGetter 接口。
也就是通过将 (*httpGetter)(nil) 赋值给 PeerGetter 类型的变量,逼迫编译器检查 *httpGetter 是否实现了 PeerGetter 接口。

以是在geecache.go中,我们一共实现需要添加下面的代码。
  1. // A Group is a cache namespace and associated data loaded spread overtype Group struct {        name      string        getter    Getter        mainCache cache        peers     PeerPicker}// RegisterPeers registers a PeerPicker for choosing remote peerfunc (g *Group) RegisterPeers(peers PeerPicker) {        if g.peers != nil {                panic("RegisterPeerPicker called more than once")        }        g.peers = peers}func (g *Group) load(key string) (value ByteView, err error) {
  2.         if g.peers != nil {
  3.                 if peer, ok := g.peers.PickPeer(key); ok {
  4.                         if value, err = g.getFromPeer(peer, key); err == nil {
  5.                                 return value, nil
  6.                         }
  7.                         log.Println("[GeeCache] Failed to get from peer", err)
  8.                 }
  9.         }
  10.         return g.getLocally(key)
  11. }
  12. func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
  13.         bytes, err := peer.Get(g.name, key)
  14.         if err != nil {
  15.                 return ByteView{}, err
  16.         }
  17.         return ByteView{b: bytes}, nil
  18. }
复制代码
新增 RegisterPeers() 方法,将 实现了 PeerPicker 接口的 HTTPPool 注入到 Group 中。
新增 getFromPeer() 方法,使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值。
修改 load 方法,使用 PickPeer() 方法选择节点,若非本机节点,则调用 getFromPeer() 从远程获取。若是本机节点或失败,则回退到 getLocally()。
五、测试代码

测试总体代码如下:
  1. var db = map[string]string{
  2.         "Tom":  "630",
  3.         "Jack": "589",
  4.         "Sam":  "567",
  5. }
  6. func createGroup() *geecache.Group {
  7.         return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
  8.                 func(key string) ([]byte, error) {
  9.                         log.Println("[SlowDB] search key", key)
  10.                         if v, ok := db[key]; ok {
  11.                                 return []byte(v), nil
  12.                         }
  13.                         return nil, fmt.Errorf("%s not exist", key)
  14.                 }))
  15. }
  16. func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
  17.         peers := geecache.NewHTTPPool(addr)
  18.         peers.Set(addrs...)
  19.         gee.RegisterPeers(peers)
  20.         log.Println("geecache is running at", addr)
  21.         log.Fatal(http.ListenAndServe(addr[7:], peers))
  22. }
  23. func startAPIServer(apiAddr string, gee *geecache.Group) {
  24.         http.Handle("/api", http.HandlerFunc(
  25.                 func(w http.ResponseWriter, r *http.Request) {
  26.                         key := r.URL.Query().Get("key")
  27.                         view, err := gee.Get(key)
  28.                         if err != nil {
  29.                                 http.Error(w, err.Error(), http.StatusInternalServerError)
  30.                                 return
  31.                         }
  32.                         w.Header().Set("Content-Type", "application/octet-stream")
  33.                         w.Write(view.ByteSlice())
  34.                 }))
  35.         log.Println("fontend server is running at", apiAddr)
  36.         log.Fatal(http.ListenAndServe(apiAddr[7:], nil))
  37. }
  38. func main() {
  39.         var port int
  40.         var api bool
  41.         flag.IntVar(&port, "port", 8001, "Geecache server port")
  42.         flag.BoolVar(&api, "api", false, "Start a api server?")
  43.         flag.Parse()
  44.         apiAddr := "http://localhost:9999"
  45.         addrMap := map[int]string{
  46.                 8001: "http://localhost:8001",
  47.                 8002: "http://localhost:8002",
  48.                 8003: "http://localhost:8003",
  49.         }
  50.         var addrs []string
  51.         for _, v := range addrMap {
  52.                 addrs = append(addrs, v)
  53.         }
  54.         gee := createGroup()
  55.         if api {
  56.                 go startAPIServer(apiAddr, gee)
  57.         }
  58.         startCacheServer(addrMap[port], addrs, gee)
  59. }
复制代码
接下来挨个讲讲对应的功能。
定义了一个模仿的数据库,使用一个字符串到字符串的映射来存储键值对。它模仿了后端存储,用于在缓存未命中时提供数据。
也就是提供Getter回调函数,方便在没有找到数据的时候来返回。
  1. var db = map[string]string{
  2.         "Tom":  "630",
  3.         "Jack": "589",
  4.         "Sam":  "567",
  5. }
复制代码

创建了一个缓存组,名为 “scores”,缓存大小为 2KB。它使用了一个自定义的 GetterFunc,当缓存未命中时,会从模仿数据库 db(也就是上面的当地db模仿数据库) 中获取数据。如果键不存在,则返回错误。
  1. func createGroup() *geecache.Group {
  2.         return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
  3.                 func(key string) ([]byte, error) {
  4.                         log.Println("[SlowDB] search key", key)
  5.                         if v, ok := db[key]; ok {
  6.                                 return []byte(v), nil
  7.                         }
  8.                         return nil, fmt.Errorf("%s not exist", key)
  9.                 }))
  10. }
复制代码

启动了一个缓存服务器,监听指定的地点 addr。它创建了一个 HTTP 节点池 peers,并将全部节点地点 addrs 添加到池中。然后,它将节点池注册到缓存组 gee 中,并启动 HTTP 服务以监听缓存哀求。
  1. func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
  2.         peers := geecache.NewHTTPPool(addr)
  3.         peers.Set(addrs...)
  4.         gee.RegisterPeers(peers)
  5.         log.Println("geecache is running at", addr)
  6.         log.Fatal(http.ListenAndServe(addr[7:], peers))
  7. }
复制代码

启动了一个 API 服务器,监听指定的地点 apiAddr。它为 /api 路径提供了一个 HTTP 处理函数,该函数从哀求中提取键 key,并从缓存组 gee 中获取数据。如果获取成功,它将数据作为相应返回;如果失败,则返回错误。
  1. func startAPIServer(apiAddr string, gee *geecache.Group) {
  2.         http.Handle("/api", http.HandlerFunc(
  3.                 func(w http.ResponseWriter, r *http.Request) {
  4.                         key := r.URL.Query().Get("key")
  5.                         view, err := gee.Get(key)
  6.                         if err != nil {
  7.                                 http.Error(w, err.Error(), http.StatusInternalServerError)
  8.                                 return
  9.                         }
  10.                         w.Header().Set("Content-Type", "application/octet-stream")
  11.                         w.Write(view.ByteSlice())
  12.                 }))
  13.         log.Println("fontend server is running at", apiAddr)
  14.         log.Fatal(http.ListenAndServe(apiAddr[7:], nil))
  15. }
复制代码

通过下令行参数解析来设置缓存服务器的端口和是否启动 API 服务器。它定义了一个地点映射 addrMap,用于存储全部缓存节点的地点。然后,它创建了一个缓存组 gee,并根据参数决定是否启动 API 服务器。最后,它启动一个缓存服务器,监听指定的端口。
  1. func main() {
  2.         var port int
  3.         var api bool
  4.         flag.IntVar(&port, "port", 8001, "Geecache server port")
  5.         flag.BoolVar(&api, "api", false, "Start a api server?")
  6.         flag.Parse()
  7.         apiAddr := "http://localhost:9999"
  8.         addrMap := map[int]string{
  9.                 8001: "http://localhost:8001",
  10.                 8002: "http://localhost:8002",
  11.                 8003: "http://localhost:8003",
  12.         }
  13.         var addrs []string
  14.         for _, v := range addrMap {
  15.                 addrs = append(addrs, v)
  16.         }
  17.         gee := createGroup()
  18.         if api {
  19.                 go startAPIServer(apiAddr, gee)
  20.         }
  21.         startCacheServer(addrMap[port], addrs, gee)
  22. }
复制代码
也就是总的来说,startCacheServer() 用来启动缓存服务器:创建 HTTPPool,添加节点信息,注册到 gee 中,启动 HTTP 服务(共3个端口,8001/8002/8003),用户不感知。
startAPIServer() 用来启动一个 API 服务(端口 9999),与用户进行交互,用户感知。
main() 函数需要下令行传入 port 和 api 2 个参数,用来在指定端口启动 HTTP 服务。

测试会发现一个显着的问题,就是测试的时候,并发了 3 个哀求 ?key=Tom,从日志中可以看到,三次均选择了节点 8001,这是一致性哈希算法的功劳。但是有一个问题在于,同时向 8001 发起了 3 次哀求。
如果有 10 万个在并发哀求该数据呢?那就会向 8001 同时发起 10 万次哀求,如果 8001 又同时向数据库发起 10 万次查询哀求,很容易导致缓存被击穿。
三次哀求的效果是一致的,对于相同的 key,能不能只向 8001 发起一次哀求?这就是需要优化的点,也就是防止缓存击穿。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

八卦阵

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表