// datastruct/dict/dict.go

type (
	// Consumer is the callback handed to Dict.ForEach; it receives one
	// key/value pair per call and reports through its boolean result whether
	// the traversal should go on (true) or stop (false).
	Consumer func(key string, val interface{}) bool

	// Dict is the string-keyed container abstraction the database layer
	// stores its data in. The integer results of the mutating methods are
	// counts of affected keys (see the concrete implementation for the
	// exact values).
	Dict interface {
		// Get looks up key and reports whether it was present.
		Get(key string) (val interface{}, exists bool)
		// Len reports the number of stored keys.
		Len() int
		// Put stores val under key unconditionally.
		Put(key string, val interface{}) (result int)
		// PutIfAbsent stores val only when key is not present yet.
		PutIfAbsent(key string, val interface{}) (result int)
		// PutIfExists stores val only when key is already present.
		PutIfExists(key string, val interface{}) (result int)
		// Remove deletes key.
		Remove(key string) (result int)
		// ForEach feeds the stored pairs to consumer.
		ForEach(consumer Consumer)
		// Keys lists the stored keys.
		Keys() []string
		// RandomKeys picks up to limit keys; repeats are allowed.
		RandomKeys(limit int) []string
		// RandomDistinctKeys picks up to limit keys without repeats.
		RandomDistinctKeys(limit int) []string
		// Clear drops every entry.
		Clear()
	}
)
Dict接口:Redis数据结构的接口。这里我们使用sync.Map作为字典的实现,如果想用别的数据结构,换一个实现即可
Consumer:遍历字典键值对时使用的回调函数,返回值是布尔:返回true继续遍历,返回false停止遍历
datastruct/dict/sync_dict.go- type SyncDict struct {
- m sync.Map
- }
- func MakeSyncDict() *SyncDict {
- return &SyncDict{}
- }
- func (dict *SyncDict) Get(key string) (val interface{}, exists bool) {
- val, ok := dict.m.Load(key)
- return val, ok
- }
- func (dict *SyncDict) Len() int {
- length := 0
- dict.m.Range(func(k, v interface{}) bool {
- length++
- return true
- })
- return length
- }
- func (dict *SyncDict) Put(key string, val interface{}) (result int) {
- _, existed := dict.m.Load(key)
- dict.m.Store(key, val)
- if existed {
- return 0
- }
- return 1
- }
- func (dict *SyncDict) PutIfAbsent(key string, val interface{}) (result int) {
- _, existed := dict.m.Load(key)
- if existed {
- return 0
- }
- dict.m.Store(key, val)
- return 1
- }
- func (dict *SyncDict) PutIfExists(key string, val interface{}) (result int) {
- _, existed := dict.m.Load(key)
- if existed {
- dict.m.Store(key, val)
- return 1
- }
- return 0
- }
- func (dict *SyncDict) Remove(key string) (result int) {
- _, existed := dict.m.Load(key)
- dict.m.Delete(key)
- if existed {
- return 1
- }
- return 0
- }
- func (dict *SyncDict) ForEach(consumer Consumer) {
- dict.m.Range(func(key, value interface{}) bool {
- consumer(key.(string), value)
- return true
- })
- }
- func (dict *SyncDict) Keys() []string {
- result := make([]string, dict.Len())
- i := 0
- dict.m.Range(func(key, value interface{}) bool {
- result[i] = key.(string)
- i++
- return true
- })
- return result
- }
- func (dict *SyncDict) RandomKeys(limit int) []string {
- result := make([]string, limit)
- for i := 0; i < limit; i++ {
- dict.m.Range(func(key, value interface{}) bool {
- result[i] = key.(string)
- return false
- })
- }
- return result
- }
- func (dict *SyncDict) RandomDistinctKeys(limit int) []string {
- result := make([]string, limit)
- i := 0
- dict.m.Range(func(key, value interface{}) bool {
- result[i] = key.(string)
- i++
- if i == limit {
- return false
- }
- return true
- })
- return result
- }
- func (dict *SyncDict) Clear() {
- *dict = *MakeSyncDict()
- }
使用sync.Map实现Dict接口
database/db.go- type DB struct {
- index int
- data dict.Dict
- }
- type ExecFunc func(db *DB, args [][]byte) resp.Reply
- type CmdLine = [][]byte
- func makeDB() *DB {
- db := &DB{
- data: dict.MakeSyncDict(),
- }
- return db
- }
- func (db *DB) Exec(c resp.Connection, cmdLine [][]byte) resp.Reply {
- cmdName := strings.ToLower(string(cmdLine[0]))
- cmd, ok := cmdTable[cmdName]
- if !ok {
- return reply.MakeErrReply("ERR unknown command '" + cmdName + "'")
- }
- if !validateArity(cmd.arity, cmdLine) {
- return reply.MakeArgNumErrReply(cmdName)
- }
- fun := cmd.executor
- return fun(db, cmdLine[1:]) // 把 set k v 中的set切掉
- }
// validateArity checks the token count of a command line (command name
// included) against arity. A non-negative arity demands an exact match; a
// negative arity -n demands at least n tokens.
func validateArity(arity int, cmdArgs [][]byte) bool {
	got := len(cmdArgs)
	if arity < 0 {
		return got >= -arity
	}
	return got == arity
}
- func (db *DB) GetEntity(key string) (*database.DataEntity, bool) {
- raw, ok := db.data.Get(key)
- if !ok {
- return nil, false
- }
- entity, _ := raw.(*database.DataEntity)
- return entity, true
- }
- func (db *DB) PutEntity(key string, entity *database.DataEntity) int {
- return db.data.Put(key, entity)
- }
- func (db *DB) PutIfExists(key string, entity *database.DataEntity) int {
- return db.data.PutIfExists(key, entity)
- }
- func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int {
- return db.data.PutIfAbsent(key, entity)
- }
- func (db *DB) Remove(key string) {
- db.data.Remove(key)
- }
- func (db *DB) Removes(keys ...string) (deleted int) {
- deleted = 0
- for _, key := range keys {
- _, exists := db.data.Get(key)
- if exists {
- db.Remove(key)
- deleted++
- }
- }
- return deleted
- }
- func (db *DB) Flush() {
- db.data.Clear()
- }
实现Redis中的分数据库
ExecFunc:所有Redis的指令都写成这样的类型
validateArity方法:
- 定长:set k v => arity=3;
- 变长:exists k1 k2 k3 ... => arity=-2,表示参数>=2个
database/command.go- var cmdTable = make(map[string]*command)
- type command struct {
- executor ExecFunc
- arity int
- }
- func RegisterCommand(name string, executor ExecFunc, arity int) {
- name = strings.ToLower(name)
- cmdTable[name] = &command{
- executor: executor,
- arity: arity,
- }
- }
command:每一个command结构体都是一个指令,例如ping,keys等等
arity:参数数量(包含指令名本身)
cmdTable:记录所有指令和command结构体的关系
RegisterCommand:注册指令的实现,在程序启动时由各指令文件的init方法调用
database/ping.go- func Ping(db *DB, args [][]byte) resp.Reply {
- if len(args) == 0 {
- return &reply.PongReply{}
- } else if len(args) == 1 {
- return reply.MakeStatusReply(string(args[0]))
- } else {
- return reply.MakeErrReply("ERR wrong number of arguments for 'ping' command")
- }
- }
- func init() {
- RegisterCommand("ping", Ping, 1)
- }
init方法:在启动程序时就会调用这个方法,用于初始化
database/keys.go- func execDel(db *DB, args [][]byte) resp.Reply {
- keys := make([]string, len(args))
- for i, v := range args {
- keys[i] = string(v)
- }
- deleted := db.Removes(keys...)
- return reply.MakeIntReply(int64(deleted))
- }
- func execExists(db *DB, args [][]byte) resp.Reply {
- result := int64(0)
- for _, arg := range args {
- key := string(arg)
- _, exists := db.GetEntity(key)
- if exists {
- result++
- }
- }
- return reply.MakeIntReply(result)
- }
- func execFlushDB(db *DB, args [][]byte) resp.Reply {
- db.Flush()
- return &reply.OkReply{}
- }
- func execType(db *DB, args [][]byte) resp.Reply {
- key := string(args[0])
- entity, exists := db.GetEntity(key)
- if !exists {
- return reply.MakeStatusReply("none")
- }
- switch entity.Data.(type) {
- case []byte:
- return reply.MakeStatusReply("string")
- }
- return &reply.UnknownErrReply{}
- }
- func execRename(db *DB, args [][]byte) resp.Reply {
- if len(args) != 2 {
- return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")
- }
- src := string(args[0])
- dest := string(args[1])
-
- entity, ok := db.GetEntity(src)
- if !ok {
- return reply.MakeErrReply("no such key")
- }
- db.PutEntity(dest, entity)
- db.Remove(src)
- return &reply.OkReply{}
- }
- func execRenameNx(db *DB, args [][]byte) resp.Reply {
- src := string(args[0])
- dest := string(args[1])
- _, exist := db.GetEntity(dest)
- if exist {
- return reply.MakeIntReply(0)
- }
- entity, ok := db.GetEntity(src)
- if !ok {
- return reply.MakeErrReply("no such key")
- }
- db.Removes(src, dest)
- db.PutEntity(dest, entity)
- return reply.MakeIntReply(1)
- }
- func execKeys(db *DB, args [][]byte) resp.Reply {
- pattern := wildcard.CompilePattern(string(args[0]))
- result := make([][]byte, 0)
- db.data.ForEach(func(key string, val interface{}) bool {
- if pattern.IsMatch(key) {
- result = append(result, []byte(key))
- }
- return true
- })
- return reply.MakeMultiBulkReply(result)
- }
- func init() {
- RegisterCommand("Del", execDel, -2)
- RegisterCommand("Exists", execExists, -2)
- RegisterCommand("Keys", execKeys, 2)
- RegisterCommand("FlushDB", execFlushDB, -1)
- RegisterCommand("Type", execType, 2)
- RegisterCommand("Rename", execRename, 3)
- RegisterCommand("RenameNx", execRenameNx, 3)
- }
keys.go实现以下指令:
execDel:del k1 k2 k3 ...
execExists:exists k1 k2 k3 ...
execFlushDB:flushdb
execType:type k1
execRename:rename k1 k2
execRenameNx:renamenx k1 k2
execKeys:keys(依赖lib包的工具类wildcard.go)
database/string.go- func execGet(db *DB, args [][]byte) resp.Reply {
- key := string(args[0])
- bytes, err := db.getAsString(key)
- if err != nil {
- return err
- }
- if bytes == nil {
- return &reply.NullBulkReply{}
- }
- return reply.MakeBulkReply(bytes)
- }
- func (db *DB) getAsString(key string) ([]byte, reply.ErrorReply) {
- entity, ok := db.GetEntity(key)
- if !ok {
- return nil, nil
- }
- bytes, ok := entity.Data.([]byte)
- if !ok {
- return nil, &reply.WrongTypeErrReply{}
- }
- return bytes, nil
- }
- func execSet(db *DB, args [][]byte) resp.Reply {
- key := string(args[0])
- value := args[1]
- entity := &database.DataEntity{
- Data: value,
- }
- db.PutEntity(key, entity)
- return &reply.OkReply{}
- }
- func execSetNX(db *DB, args [][]byte) resp.Reply {
- key := string(args[0])
- value := args[1]
- entity := &database.DataEntity{
- Data: value,
- }
- result := db.PutIfAbsent(key, entity)
- return reply.MakeIntReply(int64(result))
- }
- func execGetSet(db *DB, args [][]byte) resp.Reply {
- key := string(args[0])
- value := args[1]
- entity, exists := db.GetEntity(key)
- db.PutEntity(key, &database.DataEntity{Data: value})
- if !exists {
- return reply.MakeNullBulkReply()
- }
- old := entity.Data.([]byte)
- return reply.MakeBulkReply(old)
- }
- func execStrLen(db *DB, args [][]byte) resp.Reply {
- key := string(args[0])
- entity, exists := db.GetEntity(key)
- if !exists {
- return reply.MakeNullBulkReply()
- }
- old := entity.Data.([]byte)
- return reply.MakeIntReply(int64(len(old)))
- }
- func init() {
- RegisterCommand("Get", execGet, 2)
- RegisterCommand("Set", execSet, -3)
- RegisterCommand("SetNx", execSetNX, 3)
- RegisterCommand("GetSet", execGetSet, 3)
- RegisterCommand("StrLen", execStrLen, 2)
- }
string.go实现以下指令:
execGet:get k1
execSet:set k v
execSetNX:setnx k v
execGetSet:getset k v 返回旧值
execStrLen:strlen k
database/database.go- type Database struct {
- dbSet []*DB
- }
- func NewDatabase() *Database {
- mdb := &Database{}
- if config.Properties.Databases == 0 {
- config.Properties.Databases = 16
- }
- mdb.dbSet = make([]*DB, config.Properties.Databases)
- for i := range mdb.dbSet {
- singleDB := makeDB()
- singleDB.index = i
- mdb.dbSet[i] = singleDB
- }
- return mdb
- }
- func (mdb *Database) Exec(c resp.Connection, cmdLine [][]byte) (result resp.Reply) {
- defer func() {
- if err := recover(); err != nil {
- logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
- }
- }()
- cmdName := strings.ToLower(string(cmdLine[0]))
- if cmdName == "select" {
- if len(cmdLine) != 2 {
- return reply.MakeArgNumErrReply("select")
- }
- return execSelect(c, mdb, cmdLine[1:])
- }
- dbIndex := c.GetDBIndex()
- selectedDB := mdb.dbSet[dbIndex]
- return selectedDB.Exec(c, cmdLine)
- }
- func execSelect(c resp.Connection, mdb *Database, args [][]byte) resp.Reply {
- dbIndex, err := strconv.Atoi(string(args[0]))
- if err != nil {
- return reply.MakeErrReply("ERR invalid DB index")
- }
- if dbIndex >= len(mdb.dbSet) {
- return reply.MakeErrReply("ERR DB index is out of range")
- }
- c.SelectDB(dbIndex)
- return reply.MakeOkReply()
- }
- func (mdb *Database) Close() {
- }
- func (mdb *Database) AfterClientClose(c resp.Connection) {
- }
Database:一组db的集合
Exec:执行切换db指令或者其他指令
execSelect方法:选择db(指令:select 2)
resp/handler/handler.go- import (
- database2 "go-redis/database"
- )
- func MakeHandler() *RespHandler {
- var db database.Database
- db = database2.NewDatabase()
- return &RespHandler{
- db: db,
- }
- }
修改实现协议层handler的database实现
架构小结
TCP层服务TCP的连接,然后将连接交给RESP协议层的handler,handler监听客户端的连接,将指令解析后发给管道,管道转给database层(database/database.go),核心层根据命令类型执行不同的方法,然后返回。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |