GO实现Redis:GO实现内存数据库(3)

打印 上一主题 下一主题

主题 666|帖子 666|积分 1998


datastruct/dict/dict.go
  1. type Consumer func(key string, val interface{}) bool
  2. type Dict interface {
  3.    Get(key string) (val interface{}, exists bool)
  4.    Len() int
  5.    Put(key string, val interface{}) (result int)
  6.    PutIfAbsent(key string, val interface{}) (result int)
  7.    PutIfExists(key string, val interface{}) (result int)
  8.    Remove(key string) (result int)
  9.    ForEach(consumer Consumer)
  10.    Keys() []string
  11.    RandomKeys(limit int) []string
  12.    RandomDistinctKeys(limit int) []string
  13.    Clear()
  14. }
复制代码
Dict接口:Redis数据结构的接口。这里我们使用sync.Map作为字典的实现,如果想用别的数据结构,换一个实现即可
Consumer:遍历字典所有的键值对,返回值是布尔,true继续遍历,false停止遍历


datastruct/dict/sync_dict.go
  1. type SyncDict struct {
  2.    m sync.Map
  3. }
  4. func MakeSyncDict() *SyncDict {
  5.    return &SyncDict{}
  6. }
  7. func (dict *SyncDict) Get(key string) (val interface{}, exists bool) {
  8.    val, ok := dict.m.Load(key)
  9.    return val, ok
  10. }
  11. func (dict *SyncDict) Len() int {
  12.    length := 0
  13.    dict.m.Range(func(k, v interface{}) bool {
  14.       length++
  15.       return true
  16.    })
  17.    return length
  18. }
  19. func (dict *SyncDict) Put(key string, val interface{}) (result int) {
  20.    _, existed := dict.m.Load(key)
  21.    dict.m.Store(key, val)
  22.    if existed {
  23.       return 0
  24.    }
  25.    return 1
  26. }
  27. func (dict *SyncDict) PutIfAbsent(key string, val interface{}) (result int) {
  28.    _, existed := dict.m.Load(key)
  29.    if existed {
  30.       return 0
  31.    }
  32.    dict.m.Store(key, val)
  33.    return 1
  34. }
  35. func (dict *SyncDict) PutIfExists(key string, val interface{}) (result int) {
  36.    _, existed := dict.m.Load(key)
  37.    if existed {
  38.       dict.m.Store(key, val)
  39.       return 1
  40.    }
  41.    return 0
  42. }
  43. func (dict *SyncDict) Remove(key string) (result int) {
  44.    _, existed := dict.m.Load(key)
  45.    dict.m.Delete(key)
  46.    if existed {
  47.       return 1
  48.    }
  49.    return 0
  50. }
  51. func (dict *SyncDict) ForEach(consumer Consumer) {
  52.    dict.m.Range(func(key, value interface{}) bool {
  53.       consumer(key.(string), value)
  54.       return true
  55.    })
  56. }
  57. func (dict *SyncDict) Keys() []string {
  58.    result := make([]string, dict.Len())
  59.    i := 0
  60.    dict.m.Range(func(key, value interface{}) bool {
  61.       result[i] = key.(string)
  62.       i++
  63.       return true
  64.    })
  65.    return result
  66. }
  67. func (dict *SyncDict) RandomKeys(limit int) []string {
  68.    result := make([]string, limit)
  69.    for i := 0; i < limit; i++ {
  70.       dict.m.Range(func(key, value interface{}) bool {
  71.          result[i] = key.(string)
  72.          return false
  73.       })
  74.    }
  75.    return result
  76. }
  77. func (dict *SyncDict) RandomDistinctKeys(limit int) []string {
  78.    result := make([]string, limit)
  79.    i := 0
  80.    dict.m.Range(func(key, value interface{}) bool {
  81.       result[i] = key.(string)
  82.       i++
  83.       if i == limit {
  84.          return false
  85.       }
  86.       return true
  87.    })
  88.    return result
  89. }
  90. func (dict *SyncDict) Clear() {
  91.    *dict = *MakeSyncDict()
  92. }
复制代码
使用sync.Map实现Dict接口


database/db.go
  1. type DB struct {
  2.         index int
  3.         data  dict.Dict
  4. }
  5. type ExecFunc func(db *DB, args [][]byte) resp.Reply
  6. type CmdLine = [][]byte
  7. func makeDB() *DB {
  8.         db := &DB{
  9.                 data: dict.MakeSyncDict(),
  10.         }
  11.         return db
  12. }
  13. func (db *DB) Exec(c resp.Connection, cmdLine [][]byte) resp.Reply {
  14.         cmdName := strings.ToLower(string(cmdLine[0]))
  15.         cmd, ok := cmdTable[cmdName]
  16.         if !ok {
  17.                 return reply.MakeErrReply("ERR unknown command '" + cmdName + "'")
  18.         }
  19.         if !validateArity(cmd.arity, cmdLine) {
  20.                 return reply.MakeArgNumErrReply(cmdName)
  21.         }
  22.         fun := cmd.executor
  23.         return fun(db, cmdLine[1:]) // 把 set k v 中的set切掉
  24. }
  25. func validateArity(arity int, cmdArgs [][]byte) bool {
  26.         argNum := len(cmdArgs)
  27.         if arity >= 0 {
  28.                 return argNum == arity
  29.         }
  30.         return argNum >= -arity
  31. }
  32. func (db *DB) GetEntity(key string) (*database.DataEntity, bool) {
  33.         raw, ok := db.data.Get(key)
  34.         if !ok {
  35.                 return nil, false
  36.         }
  37.         entity, _ := raw.(*database.DataEntity)
  38.         return entity, true
  39. }
  40. func (db *DB) PutEntity(key string, entity *database.DataEntity) int {
  41.         return db.data.Put(key, entity)
  42. }
  43. func (db *DB) PutIfExists(key string, entity *database.DataEntity) int {
  44.         return db.data.PutIfExists(key, entity)
  45. }
  46. func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int {
  47.         return db.data.PutIfAbsent(key, entity)
  48. }
  49. func (db *DB) Remove(key string) {
  50.         db.data.Remove(key)
  51. }
  52. func (db *DB) Removes(keys ...string) (deleted int) {
  53.         deleted = 0
  54.         for _, key := range keys {
  55.                 _, exists := db.data.Get(key)
  56.                 if exists {
  57.                         db.Remove(key)
  58.                         deleted++
  59.                 }
  60.         }
  61.         return deleted
  62. }
  63. func (db *DB) Flush() {
  64.         db.data.Clear()
  65. }
复制代码
实现Redis中的分数据库
ExecFunc:所有Redis的指令都写成这样的类型
validateArity方法:

  • 定长:set k v => arity=3;
  • 变长:exists k1 k2 k3 ... => arity=-2,表示参数>=2个
database/command.go
  1. var cmdTable = make(map[string]*command)
  2. type command struct {
  3.    executor ExecFunc
  4.    arity    int
  5. }
  6. func RegisterCommand(name string, executor ExecFunc, arity int) {
  7.    name = strings.ToLower(name)
  8.    cmdTable[name] = &command{
  9.       executor: executor,
  10.       arity:    arity,
  11.    }
  12. }
复制代码
command:每一个command结构体都是一个指令,例如ping,keys等等
arity:参数数量
cmdTable:记录所有指令和command结构体的关系
RegisterCommand:注册指令的实现,在程序


database/ping.go
  1. func Ping(db *DB, args [][]byte) resp.Reply {
  2.     if len(args) == 0 {
  3.         return &reply.PongReply{}
  4.     } else if len(args) == 1 {
  5.         return reply.MakeStatusReply(string(args[0]))
  6.     } else {
  7.         return reply.MakeErrReply("ERR wrong number of arguments for 'ping' command")
  8.     }
  9. }
  10. func init() {
  11.     RegisterCommand("ping", Ping, 1)
  12. }
复制代码
init方法:在启动程序时就会调用这个方法,用于初始化


database/keys.go
  1. func execDel(db *DB, args [][]byte) resp.Reply {
  2.    keys := make([]string, len(args))
  3.    for i, v := range args {
  4.       keys[i] = string(v)
  5.    }
  6.    deleted := db.Removes(keys...)
  7.    return reply.MakeIntReply(int64(deleted))
  8. }
  9. func execExists(db *DB, args [][]byte) resp.Reply {
  10.    result := int64(0)
  11.    for _, arg := range args {
  12.       key := string(arg)
  13.       _, exists := db.GetEntity(key)
  14.       if exists {
  15.          result++
  16.       }
  17.    }
  18.    return reply.MakeIntReply(result)
  19. }
  20. func execFlushDB(db *DB, args [][]byte) resp.Reply {
  21.    db.Flush()
  22.    return &reply.OkReply{}
  23. }
  24. func execType(db *DB, args [][]byte) resp.Reply {
  25.    key := string(args[0])
  26.    entity, exists := db.GetEntity(key)
  27.    if !exists {
  28.       return reply.MakeStatusReply("none")
  29.    }
  30.    switch entity.Data.(type) {
  31.    case []byte:
  32.       return reply.MakeStatusReply("string")
  33.    }
  34.    return &reply.UnknownErrReply{}
  35. }
  36. func execRename(db *DB, args [][]byte) resp.Reply {
  37.    if len(args) != 2 {
  38.       return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")
  39.    }
  40.    src := string(args[0])
  41.    dest := string(args[1])
  42.    
  43.    entity, ok := db.GetEntity(src)
  44.    if !ok {
  45.       return reply.MakeErrReply("no such key")
  46.    }
  47.    db.PutEntity(dest, entity)
  48.    db.Remove(src)
  49.    return &reply.OkReply{}
  50. }
  51. func execRenameNx(db *DB, args [][]byte) resp.Reply {
  52.    src := string(args[0])
  53.    dest := string(args[1])
  54.    _, exist := db.GetEntity(dest)
  55.    if exist {
  56.       return reply.MakeIntReply(0)
  57.    }
  58.    entity, ok := db.GetEntity(src)
  59.    if !ok {
  60.       return reply.MakeErrReply("no such key")
  61.    }
  62.    db.Removes(src, dest)
  63.    db.PutEntity(dest, entity)
  64.    return reply.MakeIntReply(1)
  65. }
  66. func execKeys(db *DB, args [][]byte) resp.Reply {
  67.    pattern := wildcard.CompilePattern(string(args[0]))
  68.    result := make([][]byte, 0)
  69.    db.data.ForEach(func(key string, val interface{}) bool {
  70.       if pattern.IsMatch(key) {
  71.          result = append(result, []byte(key))
  72.       }
  73.       return true
  74.    })
  75.    return reply.MakeMultiBulkReply(result)
  76. }
  77. func init() {
  78.    RegisterCommand("Del", execDel, -2)
  79.    RegisterCommand("Exists", execExists, -2)
  80.    RegisterCommand("Keys", execKeys, 2)
  81.    RegisterCommand("FlushDB", execFlushDB, -1)
  82.    RegisterCommand("Type", execType, 2)
  83.    RegisterCommand("Rename", execRename, 3)
  84.    RegisterCommand("RenameNx", execRenameNx, 3)
  85. }
复制代码
keys.go实现以下指令:
execDel:del k1 k2 k3 ...
execExists:exist k1 k2 k3 ...
execFlushDB:flushdb
execType:type k1
execRename:rename k1 k2
execRenameNx:renamenx k1 k2
execKeys:keys(依赖lib包的工具类wildcard.go)


database/string.go
  1. func execGet(db *DB, args [][]byte) resp.Reply {
  2.    key := string(args[0])
  3.    bytes, err := db.getAsString(key)
  4.    if err != nil {
  5.       return err
  6.    }
  7.    if bytes == nil {
  8.       return &reply.NullBulkReply{}
  9.    }
  10.    return reply.MakeBulkReply(bytes)
  11. }
  12. func (db *DB) getAsString(key string) ([]byte, reply.ErrorReply) {
  13.    entity, ok := db.GetEntity(key)
  14.    if !ok {
  15.       return nil, nil
  16.    }
  17.    bytes, ok := entity.Data.([]byte)
  18.    if !ok {
  19.       return nil, &reply.WrongTypeErrReply{}
  20.    }
  21.    return bytes, nil
  22. }
  23. func execSet(db *DB, args [][]byte) resp.Reply {
  24.    key := string(args[0])
  25.    value := args[1]
  26.    entity := &database.DataEntity{
  27.       Data: value,
  28.    }
  29.    db.PutEntity(key, entity)
  30.    return &reply.OkReply{}
  31. }
  32. func execSetNX(db *DB, args [][]byte) resp.Reply {
  33.    key := string(args[0])
  34.    value := args[1]
  35.    entity := &database.DataEntity{
  36.       Data: value,
  37.    }
  38.    result := db.PutIfAbsent(key, entity)
  39.    return reply.MakeIntReply(int64(result))
  40. }
  41. func execGetSet(db *DB, args [][]byte) resp.Reply {
  42.    key := string(args[0])
  43.    value := args[1]
  44.    entity, exists := db.GetEntity(key)
  45.    db.PutEntity(key, &database.DataEntity{Data: value})
  46.    if !exists {
  47.       return reply.MakeNullBulkReply()
  48.    }
  49.    old := entity.Data.([]byte)
  50.    return reply.MakeBulkReply(old)
  51. }
  52. func execStrLen(db *DB, args [][]byte) resp.Reply {
  53.    key := string(args[0])
  54.    entity, exists := db.GetEntity(key)
  55.    if !exists {
  56.       return reply.MakeNullBulkReply()
  57.    }
  58.    old := entity.Data.([]byte)
  59.    return reply.MakeIntReply(int64(len(old)))
  60. }
  61. func init() {
  62.    RegisterCommand("Get", execGet, 2)
  63.    RegisterCommand("Set", execSet, -3)
  64.    RegisterCommand("SetNx", execSetNX, 3)
  65.    RegisterCommand("GetSet", execGetSet, 3)
  66.    RegisterCommand("StrLen", execStrLen, 2)
  67. }
复制代码
string.go实现以下指令:
execGet:get k1
execSet:set k v
execSetNX:setnex k v
execGetSet:getset k v 返回旧值
execStrLen:strlen k


database/database.go
  1. type Database struct {
  2.    dbSet []*DB
  3. }
  4. func NewDatabase() *Database {
  5.    mdb := &Database{}
  6.    if config.Properties.Databases == 0 {
  7.       config.Properties.Databases = 16
  8.    }
  9.    mdb.dbSet = make([]*DB, config.Properties.Databases)
  10.    for i := range mdb.dbSet {
  11.       singleDB := makeDB()
  12.       singleDB.index = i
  13.       mdb.dbSet[i] = singleDB
  14.    }
  15.    return mdb
  16. }
  17. func (mdb *Database) Exec(c resp.Connection, cmdLine [][]byte) (result resp.Reply) {
  18.    defer func() {
  19.       if err := recover(); err != nil {
  20.          logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
  21.       }
  22.    }()
  23.    cmdName := strings.ToLower(string(cmdLine[0]))
  24.    if cmdName == "select" {
  25.       if len(cmdLine) != 2 {
  26.          return reply.MakeArgNumErrReply("select")
  27.       }
  28.       return execSelect(c, mdb, cmdLine[1:])
  29.    }
  30.    dbIndex := c.GetDBIndex()
  31.    selectedDB := mdb.dbSet[dbIndex]
  32.    return selectedDB.Exec(c, cmdLine)
  33. }
  34. func execSelect(c resp.Connection, mdb *Database, args [][]byte) resp.Reply {
  35.    dbIndex, err := strconv.Atoi(string(args[0]))
  36.    if err != nil {
  37.       return reply.MakeErrReply("ERR invalid DB index")
  38.    }
  39.    if dbIndex >= len(mdb.dbSet) {
  40.       return reply.MakeErrReply("ERR DB index is out of range")
  41.    }
  42.    c.SelectDB(dbIndex)
  43.    return reply.MakeOkReply()
  44. }
  45. func (mdb *Database) Close() {
  46. }
  47. func (mdb *Database) AfterClientClose(c resp.Connection) {
  48. }
复制代码
Database:一组db的集合
Exec:执行切换db指令或者其他指令
execSelect方法:选择db(指令:select 2)


resp/handler/handler.go
  1. import (
  2.         database2 "go-redis/database"
  3. )
  4. func MakeHandler() *RespHandler {
  5.    var db database.Database
  6.    db = database2.NewDatabase()
  7.    return &RespHandler{
  8.       db: db,
  9.    }
  10. }
复制代码
修改实现协议层handler的database实现
架构小结

TCP层服务TCP的连接,然后将连接交给RESP协议层的handler,handler监听客户端的连接,将指令解析后发给管道,管道转给database层(database/database.go),核心层根据命令类型执行不同的方法,然后返回。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

滴水恩情

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表