KVstore :键值映射存储服务器

打印 上一主题 下一主题

主题 946|帖子 946|积分 2838

概述:本文介绍kv存储服务,所谓kv即key-value映射,用户存储键值对,提供:1.根据键查找值 2.根据键修改值 3.根据键删除值

效果:kv存储是运行在服务器上的一个历程,客户端通过套接字与服务器上的kvstore历程进行通信,客户端发送由协议规定的请求例如 SET name01 wjq ,kvstore服务器接收到请求并解析,复兴效果 SUCCESS; 又例如客户端发送 GET name01 ,接收到服务端的复兴 wjq

实现思路:

1.首先我们必要做到kvstore与客户端通信,这里使用tcp,也就是说设计之初kvstore就是一个支持百万级并发毗连的tcp服务器:这里使用一个reactor模型,直接附上代码,tcp服务器不在本文讲解范围内
  1. #include <sys/socket.h>
  2. #include <errno.h>
  3. #include <netinet/in.h>
  4. #include <stdio.h>
  5. #include <string.h>
  6. #include <unistd.h>
  7. #include <pthread.h>
  8. #include <sys/poll.h>
  9. #include <sys/epoll.h>
  10. #include <sys/time.h>
  11. #include "kvstore.h"
  12. // listenfd
  13. // EPOLLIN -->
  14. int accept_cb(int fd);
  15. // clientfd
  16. //
  17. int recv_cb(int fd);
  18. int send_cb(int fd);
  19. // conn, fd, buffer, callback
  20. int epfd = 0;
  21. struct conn_item connlist[1048576] = {0}; // 1024  2G     2 * 512 * 1024 * 1024
  22. // list
  23. struct timeval zvoice_king;
  24. //
  25. // 1000000
  26. #define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
  27. int set_event(int fd, int event, int flag) {
  28.         if (flag) { // 1 add, 0 mod
  29.                 struct epoll_event ev;
  30.                 ev.events = event ;
  31.                 ev.data.fd = fd;
  32.                 epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
  33.         } else {
  34.        
  35.                 struct epoll_event ev;
  36.                 ev.events = event;
  37.                 ev.data.fd = fd;
  38.                 epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
  39.         }
  40.        
  41. }
  42. int accept_cb(int fd) {
  43.         struct sockaddr_in clientaddr;
  44.         socklen_t len = sizeof(clientaddr);
  45.        
  46.         int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
  47.         if (clientfd < 0) {
  48.                 return -1;
  49.         }
  50.         set_event(clientfd, EPOLLIN, 1);
  51.         connlist[clientfd].fd = clientfd;
  52.         memset(connlist[clientfd].rbuffer, 0, BUFFER_LENGTH);
  53.         connlist[clientfd].rlen = 0;
  54.         memset(connlist[clientfd].wbuffer, 0, BUFFER_LENGTH);
  55.         connlist[clientfd].wlen = 0;
  56.        
  57.         connlist[clientfd].recv_t.recv_callback = recv_cb;
  58.         connlist[clientfd].send_callback = send_cb;
  59.         if ((clientfd % 1000) == 999) {
  60.                 struct timeval tv_cur;
  61.                 gettimeofday(&tv_cur, NULL);
  62.                 int time_used = TIME_SUB_MS(tv_cur, zvoice_king);
  63.                 memcpy(&zvoice_king, &tv_cur, sizeof(struct timeval));
  64.                
  65.                 printf("clientfd : %d, time_used: %d\n", clientfd, time_used);
  66.         }
  67.         return clientfd;
  68. }
  69. int recv_cb(int fd) { // fd --> EPOLLIN
  70.         char *buffer = connlist[fd].rbuffer;
  71.         int idx = connlist[fd].rlen;
  72.        
  73.         int count = recv(fd, buffer, BUFFER_LENGTH, 0);
  74.         if (count == 0) {
  75.                 printf("disconnect\n");
  76.                 epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);               
  77.                 close(fd);
  78.                
  79.                 return -1;
  80.         }
  81.        
  82.         connlist[fd].rlen = count;
  83.         kvstore_request(&connlist[fd]);
  84.         connlist[fd].wlen = strlen(connlist[fd].wbuffer);
  85.         set_event(fd, EPOLLOUT, 0);   
  86.        
  87.         return count;
  88. }
  89. int send_cb(int fd) {
  90.         char *buffer = connlist[fd].wbuffer;
  91.         int idx = connlist[fd].wlen;
  92.         int count = send(fd, buffer, idx, 0);
  93.         set_event(fd, EPOLLIN, 0);
  94.         return count;
  95. }
  96. int init_server(unsigned short port) {
  97.         int sockfd = socket(AF_INET, SOCK_STREAM, 0);
  98.         struct sockaddr_in serveraddr;
  99.         memset(&serveraddr, 0, sizeof(struct sockaddr_in));
  100.         serveraddr.sin_family = AF_INET;
  101.         serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
  102.         serveraddr.sin_port = htons(port);
  103.         if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {
  104.                 perror("bind");
  105.                 return -1;
  106.         }
  107.         listen(sockfd, 10);
  108.         return sockfd;
  109. }
  110. int epoll_entry(void) {
  111.         int port_count = 20;
  112.         unsigned short port = 2048;
  113.         int i = 0;
  114.        
  115.         epfd = epoll_create(1); // int size
  116.         for (i = 0;i < port_count;i ++) {
  117.                 int sockfd = init_server(port + i);  // 2048, 2049, 2050, 2051 ... 2057
  118.                 connlist[sockfd].fd = sockfd;
  119.                 connlist[sockfd].recv_t.accept_callback = accept_cb;
  120.                 set_event(sockfd, EPOLLIN, 1);
  121.         }
  122.         gettimeofday(&zvoice_king, NULL);
  123.         struct epoll_event events[1024] = {0};
  124.        
  125.         while (1) { // mainloop();
  126.                 int nready = epoll_wait(epfd, events, 1024, -1); //
  127.                 int i = 0;
  128.                 for (i = 0;i < nready;i ++) {
  129.                         int connfd = events[i].data.fd;
  130.                         if (events[i].events & EPOLLIN) { //
  131.                                 int count = connlist[connfd].recv_t.recv_callback(connfd);
  132.                                 //printf("recv count: %d <-- buffer: %s\n", count, connlist[connfd].rbuffer);
  133.                         } else if (events[i].events & EPOLLOUT) {
  134.                                 // printf("send --> buffer: %s\n",  connlist[connfd].wbuffer);
  135.                                
  136.                                 int count = connlist[connfd].send_callback(connfd);
  137.                         }
  138.                 }
  139.         }
  140.         //getchar();
  141.         //close(clientfd);
  142. }
复制代码
函数epoll_entry实现了与客户端之间的通信,并通过kvstore_request(&connlist[fd])这个函数实现了处置惩罚客户端请求,并将处置惩罚效果发送给客户端

2.kvstore存储引擎的实现

概要:由于服务器要将客户端请求存储的内容存储起来,有两种方式,一是存储到数据库,二是存储到服务端本地
为了简朴实现业务,本文使用存储到本地进行讲解,接纳的数据结构是哈希表
先介绍哈希表的实现以及为kvstore封装的接口:
  1. /*
  2. * 单线程版本,没有做线程安全!
  3. *
  4. */
  5. #include <stdio.h>
  6. #include <string.h>
  7. #include <stdlib.h>
  8. #include <pthread.h>
  9. #include "kvstore.h"
  10. #define MAX_KEY_LEN        128
  11. #define MAX_VALUE_LEN        512
  12. #define MAX_TABLE_SIZE        102400
  13. #define ENABLE_POINTER_KEY        1
  14. typedef struct hashnode_s { // hash node
  15. #if ENABLE_POINTER_KEY
  16.     char *key;
  17.     char *value;
  18. #else
  19.     char key[MAX_KEY_LEN];
  20.     char value[MAX_VALUE_LEN];
  21. #endif
  22.     struct hashnode_s *next;
  23. } hashnode_t;
  24. typedef struct hashtable_s { // hash table
  25.     hashnode_t **nodes; // hashnode_t * 类型的 *nodes,也就是存放着hashnode_t类型的指针的数组nodes
  26.     int max_slots;
  27.     int count;
  28. } hashtable_t;
  29. hashtable_t Hash;
  30. static int _hash(char *key, int size) { // hash函数,使用key确定hash值
  31.     if (!key) return -1;
  32.     int sum = 0;
  33.     int i = 0;
  34.     while (key[i] != 0) { // 使用ASCII计算hash值,由于key是字符数组,该方法通用
  35.         sum += key[i];
  36.         i ++;
  37.     }
  38.     return sum % size; // 返回hash值
  39. }
  40. hashnode_t *_create_node(char *key, char *value) {
  41.     hashnode_t *node = (hashnode_t *)kvstore_malloc(sizeof(hashnode_t));
  42.     if (!node) return NULL; // malloc filed
  43. #if ENABLE_POINTER_KEY
  44.     // 为节点的成员分配空间
  45.     node->key = kvstore_malloc(strlen(key) + 1);
  46.     if (!node->key) {
  47.         kvstore_free(node); // node分配成功但key失败
  48.         return NULL;
  49.     }
  50.     strcpy(node->key, key);
  51.     node->value = kvstore_malloc(strlen(value) + 1);
  52.     if (!node->value) {
  53.         kvstore_free(node->key); // node和key分配成功但value失败
  54.         kvstore_free(node);
  55.         return NULL;
  56.     }
  57.     strcpy(node->value, value);
  58. #else
  59.         strncpy(node->key, key, MAX_KEY_LEN);
  60.         strncpy(node->value, value, MAX_VALUE_LEN);
  61.        
  62. #endif
  63.     // 初始化 next
  64.     node->next = NULL;
  65.     return node;
  66. }
  67. int init_hashtable(hashtable_t *hash) {
  68.     if (!hash) return -1;
  69.     hash->nodes = (hashnode_t**)kvstore_malloc(sizeof(hashnode_t *) * MAX_TABLE_SIZE);
  70.     if (!hash->nodes) return -1;
  71.     hash->max_slots = MAX_TABLE_SIZE;
  72.     hash->count = 0;
  73.     return 0;
  74. }
  75. void dest_hashtable(hashtable_t *hash) { // 销毁哈希表
  76.     if (!hash) return;
  77.     // 遍历释放数组中所有链表
  78.     int i = 0;
  79.     for (i = 0; i < hash->max_slots; i++) {
  80.         hashnode_t *node = hash->nodes[i];
  81.         while (node != NULL) {
  82.             hashnode_t *tmp = node; // 保存当前节点
  83.             node = node->next; // 移动到下一个节点
  84.             hash->nodes[i] = node; // 更新头指针,在这段代码中没有作用
  85.             kvstore_free(tmp); // 释放当前节点
  86.         }
  87.     }
  88.     kvstore_free(hash->nodes); // 释放哈希表的数组成员
  89. }
  90. int put_kv_hashtable(hashtable_t *hash, char *key, char *value) {
  91.     if (!hash || !key || !value) return -1;
  92.     int idx = _hash(key, MAX_TABLE_SIZE); // 哈希值作为数组下标
  93.     hashnode_t *node = hash->nodes[idx]; // 获取正确数组位置的头指针
  94. #if 1
  95.     while (node != NULL) {    // 如果已经存在,直接退出,不重复插入
  96.         if (strcmp(node->key, key) == 0) {
  97.             return 1;
  98.         }
  99.         node = node->next;
  100.     }
  101. #endif
  102.     hashnode_t *new_node = _create_node(key, value);
  103.     // 头插法
  104.     new_node->next = hash->nodes[idx];
  105.     hash->nodes[idx] = new_node; // 更新头节点指针
  106.     hash->count ++;
  107.     return 0;
  108. }
  109. char *get_kv_hashtable(hashtable_t *hash, char *key) { // search
  110.     if (!hash || !key) return NULL;
  111.     int idx = _hash(key, MAX_TABLE_SIZE);
  112.     hashnode_t *node = hash->nodes[idx]; // 确定数组索引
  113.     while (node != NULL) { // 遍历查找
  114.         if (strcmp(node->key, key) == 0) {
  115.             return node->value;
  116.         }
  117.         node = node->next;
  118.     }
  119.     return NULL;
  120. }
  121. int count_kv_hashtable(hashtable_t *hash) {
  122.         return hash->count;
  123. }
  124. int delete_kv_hashtable(hashtable_t *hash, char *key) { // 根据key删除节点
  125.     if (!hash || !key) return -1;
  126.     int idx = _hash(key, MAX_TABLE_SIZE); // 哈希值作为索引
  127.     // 先判断头指针
  128.     hashnode_t *head = hash->nodes[idx];
  129.     if (head == NULL) return -1;
  130.     // 遍历链表
  131.     hashnode_t *cur = hash->nodes;
  132.     hashnode_t *prev = NULL;
  133.     while (cur != NULL) {
  134.         if (strcmp(cur->key, key) == 0) break;
  135.         prev = cur;
  136.         cur = cur->next;
  137.     }
  138.     if (cur == NULL) return -1; // 没找到
  139.     if (prev == NULL) { // 如果要删除的是头节点
  140.         hash->nodes[idx] = cur->next; // 删除cur
  141.     } else { // 不是头节点
  142.         prev->next = cur->next; // 删除cur
  143.     }
  144. // 释放cur节点的空间
  145. #if ENABLE_POINTER_KEY
  146.         if (cur->key) {
  147.                 kvstore_free(cur->key);
  148.         }
  149.         if (cur->value) {
  150.                 kvstore_free(cur->value);
  151.         }
  152.         kvstore_free(cur);
  153. #else
  154.         free(cur);
  155. #endif
  156.         hash->count --; // 更新count
  157.     return 0;
  158. }
  159. int exit_kv_hashtable(hashtable_t *hash, char *key) { // 判断是否存在该key的映射value
  160.     char *value = get_kv_hashtable(hash, key);
  161.     if (value) return 1;
  162.     else return 0;
  163. }
  164. int kvs_hash_modify(hashtable_t *hash, char *key, char *value) { // 先查找key再修改value
  165.     if (!hash || !key || !value) return -1;
  166.     int idx = _hash(key, MAX_TABLE_SIZE);
  167.     hashnode_t *node = hash->nodes[idx];
  168.     while (node != NULL) {
  169.         if (strcmp(node->key, key) == 0) {
  170.             // 先释放原空间,避免内存泄漏
  171.             kvstore_free(node->value); // 释放原value指向的空间
  172.             node->value = NULL; // 避免使用悬空指针
  173.             // 新分配空间
  174.             node->value = kvstore_malloc(strlen(value) + 1);
  175.             if (node->value) { // 分配成功
  176.                 strcpy(node->value, value);
  177.                 return 0;
  178.             } else
  179.                 assert(0);
  180.         }
  181.         node = node->next;
  182.     }
  183.     return -1;
  184. }
  185. int kvs_hash_count(hashtable_t *hash) {
  186.         return hash->count;
  187. }
  188. // 再封装一层接口:使用第三方库时,对库函数进行一层封装,适配自己的代码,
  189. // 排查问题或更新迭代时只需要修改这一层接口的内容就行,不需要在源代码主体上修改,相当于做了一层隔离
  190. int kvstore_hash_craete(hashtable_t *hash) {
  191.     return init_hashtable(hash);
  192. }
  193. void kvstore_hash_destory(hashtable_t *hash) {
  194.         return dest_hashtable(hash);
  195. }
  196. int kvs_hash_set(hashtable_t *hash, char *key, char *value) {
  197.         return put_kv_hashtable(hash, key, value);
  198. }
  199. char *kvs_hash_get(hashtable_t *hash, char *key) {
  200.         return get_kv_hashtable(hash, key);
  201. }
  202. int kvs_hash_delete(hashtable_t *hash, char *key) {
  203.         return delete_kv_hashtable(hash, key);
  204. }
复制代码
对于哈希表的设计与实现,注释说的很清晰了,末了封装的接口是用在接下来的kvstore主程序中的
3.kvstore主体

概要:这份代码集成了前面的tcp服务epoll_entry、存储组件哈希表以及末了要介绍的:对客户端请求进行解析处置惩罚的组件
先介绍kvstore主程序:
  1. int init_kvengine(void) {
  2.         kvstore_hash_create(&Hash);
  3. }
  4. int exit_kvengine(void) {
  5.    
  6.         kvstore_hash_destory(&Hash);
  7. }
  8. int main() {
  9.         init_kvengine(); // 创建存储引擎,这里是哈希表
  10.        
  11.         epoll_entry();  // 启动tcp服务器,处理并回复客户端请求
  12.         exit_kvengine(); // 销毁哈希表
  13. }
复制代码
而这里调用的init_kvengine();现实上就是前面的哈希表代码中的:
  1. int init_hashtable(hashtable_t *hash) {
  2.     if (!hash) return -1;
  3.     hash->nodes = (hashnode_t**)kvstore_malloc(sizeof(hashnode_t *) * MAX_TABLE_SIZE);
  4.     if (!hash->nodes) return -1;
  5.     hash->max_slots = MAX_TABLE_SIZE;
  6.     hash->count = 0;
  7.     return 0;
  8. }
复制代码
4.请求解析

我们对于kvstore主程序中的存储引擎、tcp服务都介绍完了,接下来介绍最核心的请求解析函数:
这两个函数位于epoll_entry的kvstore_request(&connlist[fd])函数中:
  1. int kvstore_request(struct conn_item *item) {
  2.     char *msg = item->rbuffer;
  3.     char *tokens[KVSTORE_MAX_TOKENS];
  4.     int count = kvstore_split_token(msg, tokens); // 解析请求
  5.     kvstore_parser_protocol(item, tokens, count); // 生成回复内容
  6.     return 0;
  7. }
复制代码
这个函数做到了对用户请求的解析以及复兴,而依靠的是以下两个函数:
解析请求:

  1. int kvstore_split_token(char *msg, char **tokens) { // 将msg字符串进行分割,结果保存在tokens字符串数字里
  2.     if (msg == NULL || tokens == NULL) return -1; // 参数检查
  3.     int idx = 0;
  4.     char *token = strtok(msg, " "); // 对msg按空格“ ”进行分割,返回第一个子字符串
  5.     while (token != NULL) { // 获取剩余的子字符串
  6.         tokens[idx++] = token; // 将子字符串保存在字符串数组里
  7.         token = strtok(NULL, " "); // 固定写法,依次获取除第一个外,剩余的子字符串
  8.     }
  9.     return idx; // 返回子字符串的个数
  10. }
复制代码
我们能对用户请求按空格进行分割的原因是,kvstore规定了应用层协议,只有按协议规定发送的请求才气被正确处置惩罚,就像linux shell 中的下令的名称以及使用方法一样
处置惩罚并复兴:

  1. int kvstore_parser_protocol(struct conn_item *item, char **tokens, int count) {
  2.     if (item == NULL || tokens[0] == NULL || count == 0) return -1; // 检查参数
  3.     char *msg = item->wbuffer; // 获取写缓冲区
  4.         memset(msg, 0, BUFFER_LENGTH);
  5.     // 对用户的命令的解析结果, 例如 SET name wjq 解析结果如下:
  6.     char *command = tokens[0];  // SET
  7.         char *key = tokens[1];      // name
  8.         char *value = tokens[2];    // wjq
  9.    
  10.     int cmd = KVS_CMD_START;
  11.    
  12.     for (cmd = KVS_CMD_START; cmd < KVS_CMD_SIZE; cmd++) { // 查找比对tokens里的命令
  13.         if (strcmp(commands[cmd], command) == 0) {
  14.             break; // 找到了或者不存在
  15.         }
  16.     }
  17.     // 匹配命令并回复结果
  18.     switch (cmd) {
  19.       
  20.         case KVS_CMD_HSET: {    // SET :添加
  21.                         int res = kvstore_hash_set(key, value); // 调用哈希表的函数
  22.                         if (!res) {
  23.                                 snprintf(msg, BUFFER_LENGTH, "SUCCESS");
  24.                         } else {
  25.                                 snprintf(msg, BUFFER_LENGTH, "FAILED");
  26.                         }
  27.                         break;
  28.                 }
  29.                
  30.                 case KVS_CMD_HGET: {   // GET :查询
  31.                         char *val = kvstore_hash_get(key); // 调用哈希表提供的接口
  32.                         if (val) {
  33.                                 snprintf(msg, BUFFER_LENGTH, "%s", val);
  34.                         } else {
  35.                                 snprintf(msg, BUFFER_LENGTH, "NO EXIST");
  36.                         }
  37.                        
  38.                         break;
  39.                 }
  40.                 case KVS_CMD_HDEL: { // DEL : 删除
  41.                         int res = kvstore_hash_delete(key);
  42.                         if (res < 0) {  // server
  43.                                 snprintf(msg, BUFFER_LENGTH, "%s", "ERROR");
  44.                         } else if (res == 0) {
  45.                                 snprintf(msg, BUFFER_LENGTH, "%s", "SUCCESS");
  46.                         } else {
  47.                                 snprintf(msg, BUFFER_LENGTH, "NO EXIST");
  48.                         }
  49.                        
  50.                         break;
  51.                 }
  52.         case KVS_CMD_HMOD: { // MOD : 修改
  53.                         int res = kvstore_hash_modify(key, value);
  54.                         if (res < 0) {  // server
  55.                                 snprintf(msg, BUFFER_LENGTH, "%s", "ERROR");
  56.                         } else if (res == 0) {
  57.                                 snprintf(msg, BUFFER_LENGTH, "%s", "SUCCESS");
  58.                         } else {
  59.                                 snprintf(msg, BUFFER_LENGTH, "NO EXIST");
  60.                         }
  61.                        
  62.                         break;
  63.                 }
  64.                 case KVS_CMD_HCOUNT: { // COUNT : 查询数量
  65.                         int count = kvstore_hash_count();
  66.                         if (count < 0) {  // server
  67.                                 snprintf(msg, BUFFER_LENGTH, "%s", "ERROR");
  68.                         } else {
  69.                                 snprintf(msg, BUFFER_LENGTH, "%d", count);
  70.                         }
  71.                         break;
  72.                 }
  73.                
  74.                 default: {
  75.                         printf("cmd: %s\n", commands[cmd]);
  76.                         assert(0);
  77.                 }
  78.                
  79.     }
  80. }
复制代码
可以看到解析查询的过程就是将用户按我们指定协议输入的请求,分成几段,为每一条请求进行一次解析、处置惩罚
增删改查用到了哈希表这个数据结构提供的函数,而只有按空格将字符串分割这个函数是我们自行设计的,难度并不大
至此,kvstore的设计实现已经全部完成
保举学习https://xxetb.xetslk.com/s/p5Ibb

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

灌篮少年

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表