etcd MVCC 存储结构及流程

打印 上一主题 下一主题

主题 896|帖子 896|积分 2688

什么是 MVCC

MVCC 是 Multi-Version Concurrency Control 的缩写,即多版本并发控制。它是一种并发控制的方法,用于在数据库体系中实现事务的隔离性。MVCC 是一种乐观锁机制,它通过保存数据的多个版本来实现事务的隔禽性。在 etcd 中,MVCC 是用于实现数据的版本控制的。而且可以查看历史版本的数据。
测试
  1. # 添加数据
  2. etcdctl put /test t1
  3. OK
  4. etcdctl put /test t2
  5. OK
  6. # 查看数据
  7. etcdctl get /test
  8. /test
  9. t2
  10. # 查看 json 格式数据
  11. etcdctl get /test --write-out=json
  12. # {"header":{"cluster_id":8735285696067307020,"member_id":7131777314758672153,"revision":15,"raft_term":4},"kvs":[{"key":"L3Rlc3Q=","create_revision":14,"mod_revision":15,"version":2,"value":"dDI="}],"count":1}
  13. # 查看历史版本
  14. etcdctl get /test --rev=14
  15. /test
  16. t1
复制代码
可以看到,通过 --rev 参数可以查看历史版本的数据。也就是我第一次添加的数据。那么 json 中 revision 是什么意思呢?
revision

reversion 中是 etcd 中的一个概念,它是一个递增的整数,用于标识 etcd 中的数据版本。他是一个 int64 类型。没操纵一次 etcd 数据(增,删,改),reversion 就会递增。
  1. # 删除数据
  2. etcdctl del /test
  3. 1
  4. # 查看 revision
  5. etcdctl get / -wjson
  6. # {"header":{"cluster_id":8735285696067307020,"member_id":7131777314758672153,"revision":16,"raft_term":4}}
  7. # 刚才是 15 现在是 16
  8. # 添加 /test2 数据
  9. etcdctl put /test2 t3
  10. OK
  11. # 查看 revision
  12. etcdctl get / -wjson
  13. # {"header":{"cluster_id":8735285696067307020,"member_id":7131777314758672153,"revision":17,"raft_term":4}}
复制代码
存储结构

etcd mvcc 中,维护了两个数据结构,分别是 treeindex 和 boltDB。treeindex 是一个 B 树,用于存储 key 和 revision 之间的映射关系,它主要维护在内存中。而 boltDB 是一个 key-value 数据库,用于存储 key 和 value 之间的映射关系, 它主要维护在磁盘中, 用于长期化数据,虽然 boltdb 使用了 mmap 机制,但是它还是一个磁盘数据库。
treeindex

为什么 etcd 的 treeindex 使用 B-tree 而不使用哈希表、平衡二叉树?
由于 etcd 需要范围查询,以是哈希表不适合。而且etcd 中的 key 过多,平衡二叉树的查询效率不高,以是使用 B tree。
b-tree:

在 treeindex 中,数据的每个 key 是一个 keyIndex 结构,它保存了 key 和 revision 之间的映射关系。keyIndex 结构如下:
  1. type keyIndex struct {
  2.         key         []byte // key 的值
  3.         modified    Revision // 最后一次修改的 main revision
  4.         generations []generation // 保存了 key 的历史版本 没删除一次然后添加一次就是一个 generation
  5. }
  6. type Revision struct {
  7.         // 就是 revision 的值,比如上边的 15 等
  8.         Main int64
  9.         // 子 revision 的值 主要是在事务中使用 比如事务中多个操作 那么就是 0 1 2 3 等
  10.         Sub int64
  11. }
  12. // generation 保存了 key 的历史版本
  13. type generation struct {
  14.         ver     int64 // 版本号
  15.         created Revision // 最后一次被创建的 revision
  16.         revs    []Revision // 保存了 key 的历史 revision
  17. }
复制代码
在 treeindex 中,每个 keyIndex 保存了 key 的历史版本,而且每个 keyIndex 中的 generations 保存了 key 的历史版本。而且每个 generation 中的 revs 保存了 key 的历史 revision。这样就可以实现历史版本的查询。
获取 resersion 的值
  1. func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
  2.         ti.RLock()
  3.         defer ti.RUnlock()
  4.         return ti.unsafeGet(key, atRev)
  5. }
  6. func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
  7.         keyi := &keyIndex{key: key}
  8.     // 从 B 树中获取 keyIndex
  9.         if keyi = ti.keyIndex(keyi); keyi == nil {
  10.                 return Revision{}, Revision{}, 0, ErrRevisionNotFound
  11.         }
  12.     // 从 keyIndex 中获取 revision
  13.         return keyi.get(ti.lg, atRev)
  14. }
  15. func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
  16.         if ki, ok := ti.tree.Get(keyi); ok {
  17.                 return ki
  18.         }
  19.         return nil
  20. }
复制代码
[code]func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created Revision, ver int64, err error) {        if ki.isEmpty() {                lg.Panic(                        "'get' got an unexpected empty keyIndex",                        zap.String("key", string(ki.key)),                )        }    // 找到 key 的 generation        g := ki.findGeneration(atRev)        if g.isEmpty() {                return Revision{}, Revision{}, 0, ErrRevisionNotFound        }    // 从 generation 中获取 revision 找到第一次小于 atRev 的 revision        n := g.walk(func(rev Revision) bool { return rev.Main > atRev })        if n != -1 {                return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil        }        return Revision{}, Revision{}, 0, ErrRevisionNotFound}// 基本的意思就是从后往前找到第一个 revision 小于 atRev 的 generationfunc (ki *keyIndex) findGeneration(rev int64) *generation {        lastg := len(ki.generations) - 1        cg := lastg        for cg >= 0 {                if len(ki.generations[cg].revs) == 0 {                        cg--                        continue                }                g := ki.generations[cg]                if cg != lastg {            // 如果当前 generation 的末了一个 revision 小于等于 rev 那么就返回 nil                        if tomb := g.revs[len(g.revs)-1].Main; tomb

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

玛卡巴卡的卡巴卡玛

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表