本文为极客兔兔动手写分布式缓存GeeCache学习条记。
一、通信流程
在前面一节中,已经为 HTTPPool 实现了服务端功能,通信不但需要服务端还需要客户端,因此本节来实现 HTTPPool 客户端的功能。
最开始我们的流程定义如下,前面已经实现了1和3,那么现在需要实现2这个点。
现在我们进一步来细化2这个点的步调,来看看是怎么实现的。
二、peers.go
首先抽象出 2 个接口,PeerPicker 的 PickPeer() 方法用于根据传入的 key 选择相应节点 PeerGetter。
接口 PeerGetter 的 Get() 方法用于从对应 group 查找缓存值。PeerGetter 就对应于上述流程中的 HTTP 客户端。
- type PeerPicker interface {
- PickPeer(key string) (peer PeerGetter, ok bool)
- //根据传进来的键,去选择相对应的接口(就是调用一致性哈希算法)。
- }
- // PeerGetter is the interface that must be implemented by a peer.
- type PeerGetter interface {
- Get(group string, key string) ([]byte, error)
- //从客户端去请求其他节点的对应的值的过程。
- }
复制代码 三、http.go
首先需要编写Peers的Set函数,也就是节点的注册函数。
先初始化一个一致性哈希环,将传入的节点地点添加到哈希环中。为每个节点创建一个 httpGetter 对象,并将它们存储到一个映射中,以便后续通过节点地点快速访问对应的 Getter 对象。整个过程是线程安全的,通过互斥锁掩护共享资源的并发访问。
一句话来表现就是:Set() 方法实例化了一致性哈希算法,并且添加了传入的节点。并为每一个节点创建了一个 HTTP 客户端 httpGetter。
peers ...string 是方法的参数,表现一个可变数目的字符串切片。调用时可以传入恣意数目的字符串作为参数,这些字符串代表 HTTP 节点的地点。
- func (p *HTTPPool) Set(peers ...string) {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.peers = consistenthash.New(defaultReplicas, nil)
- p.peers.Add(peers...)
- p.httpGetters = make(map[string]*httpGetter, len(peers))
- for _, peer := range peers {
- p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
- }
- }
复制代码 consistenthash.New 是一致性哈希库的构造函数,用于创建一个新的一致性哈希环。
defaultReplicas 是一个常量,表现每个节点在哈希环上的假造副本数目。一致性哈希通过假造节点(副本)来提高负载均衡的效果。
nil 是一致性哈希的哈希函数参数,这里使用默认的哈希函数。
p.peers.Add 方法会将每个节点添加到一致性哈希环中。一致性哈希环会根据节点的哈希值将它们分布在环上。
p.httpGetters 是一个映射(map),用于存储每个节点的 HTTP Getter 对象。
make(map[string]*httpGetter, len(peers)) 创建了一个映射,键是节点地点(string),值是指向 httpGetter 的指针。
for _, peer := range peers 遍历传入的 peers 切片,peer 是当前节点的地点。p.httpGetters[peer] 是将节点地点作为键存储到映射中。&httpGetter{baseURL: peer + p.basePath} 创建了一个 httpGetter 对象,并将其地点存储到映射中。也就是peer 是节点的地点。p.basePath 是 HTTPPool 中定义的一个字段,表现 HTTP 哀求的路径前缀。peer + p.basePath 拼接成完备的 HTTP 哀求的基地点。
然后是Pickeer函数。PickerPeer() 包装了一致性哈希算法的 Get() 方法,根据具体的 key,选择节点peer,并返回该节点对应的 HTTP 客户端,也就是PeerGetter,bool就代表是否成功的找到对应的节点。
p.peers.Get(key):调用一致性哈希环的 Get 方法,根据输入的 key 选择一个节点。peer != "":检查返回的节点地点是否为空。如果为空,说明没有找到合适的节点。peer != p.self:检查返回的节点是否是当前节点(p.self)。如果返回的节点是当前节点,说明不需要跨节点远程获取数据,可以直接跳过。
- func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
- p.mu.Lock()
- defer p.mu.Unlock()
- if peer := p.peers.Get(key); peer != "" && peer != p.self {
- p.Log("Pick peer %s", peer)
- return p.httpGetters[peer], true
- }
- return nil, false
- }
复制代码 四、geecache.go
在geecache.go中需要添加方法。
首先,新增 getFromPeer() 方法,使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值,并将其封装为一个 ByteView 类型返回。。
- func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
- bytes, err := peer.Get(g.name, key)
- if err != nil {
- return ByteView{}, err
- }
- return ByteView{b: bytes}, nil
- }
复制代码 原本的load()函数非常简朴,是直接调用getLocally()函数,现在 需要改进。
首先实行从分布式缓存体系中加载指定键(key)的值。它首先实行从远程节点加载数据,也就是getFromPeer这个方法会帮我们去调用peer的Get()方法,如果失败,则退回到当地加载,也就是getLocally(key)。
- func (g *Group) load(key string) (value ByteView, err error) {
- if g.peers != nil {
- if peer, ok := g.peers.PickPeer(key); ok {
- if value, err = g.getFromPeer(peer, key); err == nil {
- return value, nil
- }
- log.Println("[GeeCache] Failed to get from peer", err)
- }
- }
- return g.getLocally(key)
- }
复制代码 接下来又到了调用http.go中的Get方法。
首先创建具体的 HTTP 客户端类 httpGetter,实现 PeerGetter 接口。
baseURL 表现将要访问的远程节点的地点,例如 http://example.com/_geecache/。
使用 http.Get() 方式获取返回值(http.Get(u)是发送一个 HTTP GET 哀求到构造好的 URL),并转换为 []bytes 类型。
- type httpGetter struct {
- baseURL string
- }
- func (h *httpGetter) Get(group string, key string) ([]byte, error) {
- u := fmt.Sprintf(
- "%v%v/%v",
- h.baseURL,
- url.QueryEscape(group),
- url.QueryEscape(key),
- )
- res, err := http.Get(u)
- if err != nil {
- return nil, err
- }
- defer res.Body.Close()
- if res.StatusCode != http.StatusOK {
- return nil, fmt.Errorf("server returned: %v", res.Status)
- }
- bytes, err := ioutil.ReadAll(res.Body)
- if err != nil {
- return nil, fmt.Errorf("reading response body: %v", err)
- }
- return bytes, nil
- }
- var _ PeerGetter = (*httpGetter)(nil)
复制代码 接下来讲讲具体代码的作用:
url.QueryEscape(group) 和 url.QueryEscape(key):使用 url.QueryEscape 函数对缓存组名称和键进行 URL 编码,确保它们可以安全地嵌入到 URL 中。
- var _ PeerGetter = (*httpGetter)(nil)
复制代码 这行代码就比较熟悉了:通过类型断言,用于确保 *httpGetter 类型实现了 PeerGetter 接口。
也就是通过将 (*httpGetter)(nil) 赋值给 PeerGetter 类型的变量,逼迫编译器检查 *httpGetter 是否实现了 PeerGetter 接口。
以是在geecache.go中,我们一共实现需要添加下面的代码。
- // A Group is a cache namespace and associated data loaded spread overtype Group struct { name string getter Getter mainCache cache peers PeerPicker}// RegisterPeers registers a PeerPicker for choosing remote peerfunc (g *Group) RegisterPeers(peers PeerPicker) { if g.peers != nil { panic("RegisterPeerPicker called more than once") } g.peers = peers}func (g *Group) load(key string) (value ByteView, err error) {
- if g.peers != nil {
- if peer, ok := g.peers.PickPeer(key); ok {
- if value, err = g.getFromPeer(peer, key); err == nil {
- return value, nil
- }
- log.Println("[GeeCache] Failed to get from peer", err)
- }
- }
- return g.getLocally(key)
- }
- func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
- bytes, err := peer.Get(g.name, key)
- if err != nil {
- return ByteView{}, err
- }
- return ByteView{b: bytes}, nil
- }
复制代码 新增 RegisterPeers() 方法,将 实现了 PeerPicker 接口的 HTTPPool 注入到 Group 中。
新增 getFromPeer() 方法,使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值。
修改 load 方法,使用 PickPeer() 方法选择节点,若非本机节点,则调用 getFromPeer() 从远程获取。若是本机节点或失败,则回退到 getLocally()。
五、测试代码
测试总体代码如下:
- var db = map[string]string{
- "Tom": "630",
- "Jack": "589",
- "Sam": "567",
- }
- func createGroup() *geecache.Group {
- return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
- func(key string) ([]byte, error) {
- log.Println("[SlowDB] search key", key)
- if v, ok := db[key]; ok {
- return []byte(v), nil
- }
- return nil, fmt.Errorf("%s not exist", key)
- }))
- }
- func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
- peers := geecache.NewHTTPPool(addr)
- peers.Set(addrs...)
- gee.RegisterPeers(peers)
- log.Println("geecache is running at", addr)
- log.Fatal(http.ListenAndServe(addr[7:], peers))
- }
- func startAPIServer(apiAddr string, gee *geecache.Group) {
- http.Handle("/api", http.HandlerFunc(
- func(w http.ResponseWriter, r *http.Request) {
- key := r.URL.Query().Get("key")
- view, err := gee.Get(key)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- w.Header().Set("Content-Type", "application/octet-stream")
- w.Write(view.ByteSlice())
- }))
- log.Println("fontend server is running at", apiAddr)
- log.Fatal(http.ListenAndServe(apiAddr[7:], nil))
- }
- func main() {
- var port int
- var api bool
- flag.IntVar(&port, "port", 8001, "Geecache server port")
- flag.BoolVar(&api, "api", false, "Start a api server?")
- flag.Parse()
- apiAddr := "http://localhost:9999"
- addrMap := map[int]string{
- 8001: "http://localhost:8001",
- 8002: "http://localhost:8002",
- 8003: "http://localhost:8003",
- }
- var addrs []string
- for _, v := range addrMap {
- addrs = append(addrs, v)
- }
- gee := createGroup()
- if api {
- go startAPIServer(apiAddr, gee)
- }
- startCacheServer(addrMap[port], addrs, gee)
- }
复制代码 接下来挨个讲讲对应的功能。
定义了一个模仿的数据库,使用一个字符串到字符串的映射来存储键值对。它模仿了后端存储,用于在缓存未命中时提供数据。
也就是提供Getter回调函数,方便在没有找到数据的时候来返回。
- var db = map[string]string{
- "Tom": "630",
- "Jack": "589",
- "Sam": "567",
- }
复制代码 创建了一个缓存组,名为 “scores”,缓存大小为 2KB。它使用了一个自定义的 GetterFunc,当缓存未命中时,会从模仿数据库 db(也就是上面的当地db模仿数据库) 中获取数据。如果键不存在,则返回错误。
- func createGroup() *geecache.Group {
- return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
- func(key string) ([]byte, error) {
- log.Println("[SlowDB] search key", key)
- if v, ok := db[key]; ok {
- return []byte(v), nil
- }
- return nil, fmt.Errorf("%s not exist", key)
- }))
- }
复制代码 启动了一个缓存服务器,监听指定的地点 addr。它创建了一个 HTTP 节点池 peers,并将全部节点地点 addrs 添加到池中。然后,它将节点池注册到缓存组 gee 中,并启动 HTTP 服务以监听缓存哀求。
- func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
- peers := geecache.NewHTTPPool(addr)
- peers.Set(addrs...)
- gee.RegisterPeers(peers)
- log.Println("geecache is running at", addr)
- log.Fatal(http.ListenAndServe(addr[7:], peers))
- }
复制代码 启动了一个 API 服务器,监听指定的地点 apiAddr。它为 /api 路径提供了一个 HTTP 处理函数,该函数从哀求中提取键 key,并从缓存组 gee 中获取数据。如果获取成功,它将数据作为相应返回;如果失败,则返回错误。
- func startAPIServer(apiAddr string, gee *geecache.Group) {
- http.Handle("/api", http.HandlerFunc(
- func(w http.ResponseWriter, r *http.Request) {
- key := r.URL.Query().Get("key")
- view, err := gee.Get(key)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- w.Header().Set("Content-Type", "application/octet-stream")
- w.Write(view.ByteSlice())
- }))
- log.Println("fontend server is running at", apiAddr)
- log.Fatal(http.ListenAndServe(apiAddr[7:], nil))
- }
复制代码 通过下令行参数解析来设置缓存服务器的端口和是否启动 API 服务器。它定义了一个地点映射 addrMap,用于存储全部缓存节点的地点。然后,它创建了一个缓存组 gee,并根据参数决定是否启动 API 服务器。最后,它启动一个缓存服务器,监听指定的端口。
- func main() {
- var port int
- var api bool
- flag.IntVar(&port, "port", 8001, "Geecache server port")
- flag.BoolVar(&api, "api", false, "Start a api server?")
- flag.Parse()
- apiAddr := "http://localhost:9999"
- addrMap := map[int]string{
- 8001: "http://localhost:8001",
- 8002: "http://localhost:8002",
- 8003: "http://localhost:8003",
- }
- var addrs []string
- for _, v := range addrMap {
- addrs = append(addrs, v)
- }
- gee := createGroup()
- if api {
- go startAPIServer(apiAddr, gee)
- }
- startCacheServer(addrMap[port], addrs, gee)
- }
复制代码 也就是总的来说,startCacheServer() 用来启动缓存服务器:创建 HTTPPool,添加节点信息,注册到 gee 中,启动 HTTP 服务(共3个端口,8001/8002/8003),用户不感知。
startAPIServer() 用来启动一个 API 服务(端口 9999),与用户进行交互,用户感知。
main() 函数需要下令行传入 port 和 api 2 个参数,用来在指定端口启动 HTTP 服务。
测试会发现一个显着的问题,就是测试的时候,并发了 3 个哀求 ?key=Tom,从日志中可以看到,三次均选择了节点 8001,这是一致性哈希算法的功劳。但是有一个问题在于,同时向 8001 发起了 3 次哀求。
如果有 10 万个在并发哀求该数据呢?那就会向 8001 同时发起 10 万次哀求,如果 8001 又同时向数据库发起 10 万次查询哀求,很容易导致缓存被击穿。
三次哀求的效果是一致的,对于相同的 key,能不能只向 8001 发起一次哀求?这就是需要优化的点,也就是防止缓存击穿。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |