2.3.1(项目)kv存储——框架梳理(待定)

打印 上一主题 下一主题

主题 844|帖子 844|积分 2532

一、过一遍代码路线:

(1)底层网络框架;(2)业务层的协议筹划(发什么数据,以及收什么数据);(3)业务层的引擎筹划;
1. 底层网络框架

目标:通过定义函数创建网络层和业务层的联系,并从原有已以网络框架为主改造后以业务框架为main函数,确保此后网络层隔离和透明(以reactor.c为例)。
要求:reactor.c(手写)、ntyco协程(看懂)、io_uring(看懂)。
做法:用做reactor的做法来做kvstore的网络框架
核心:是设置自定义的布局体,明白什么变乱执行什么回调函数。while(1){}循环里,(a)EPOLLIN变乱执行recv_callback,EPOLLOUT变乱执行send_callback。(b)EPOLLIN变乱包罗sockfd和clientfd,分别执行accept函数和recv/send函数,可以通过recv_callback的index加以区分。
要点:
知识点(1):通过宏定义开启或关闭不同的功能,由此方便新功能编写。
  1. #define ENABLE_KVSTORE        1
复制代码
  1. #if    ENABLE_KVSTORE
  2.         kvs_request(&connect_list[fd]);
  3. #endif
  4. #if    ENABLE_KVSTORE
  5.         kvs_response(&connect_list[fd]);
  6. #endif
复制代码
知识点(2):(1) 协议处理 → 放到kvstore.c;(2)main入口函数 → 放到网络框架(reactor.c)里面?还是放到kvstore.c里面?
答:如果把main入口函数放到网络框架(reactor.c)就表明主要步调由网络框架实现,按照这个逻辑kvstore.c只是其中的一个业务,而流程不应该如许。所以main要放到kvstore.c里面,以保证网络层被完全的隔离。对于一个服务而言,我们要终极封装成:网络层仅必要传输端口(port)协议的接口(kvs_protocol)这两个端口已往,
知识点(3)网络框架(reactor.c)里怎样与kvstore.c毗连:
(a)定义kvs_request和kvs_response;
(b)send_cb和recv_cb分别设置kvs_request和kvs_response;
(c)send_cb里的kvs_request使用kvs_handler处理数据 ←
kvs_handler通过“static msg_handler kvs_handler” 定义范例,在网络框架(reactor.c)主函数里被handler赋值← 
handler作为网络框架(reactor.c)主函数里其中一个输入参数,被msg_handler布局体赋值 ←  
布局体msg_handler在网络框架(reactor.c)被定义,包罗*msg、length、*response,同时也在kvstore.h文件里。←
作为在网络框架(reactor.c)主函数输入参数的handler,在kvstore.c里被定义为kvs_protocal。网络框架(reactor.c)的命令行 “static msg_handler kvs_handler” 为了 “kvs_handler = handler”。←

网络层的编译命令

此处的代码:
server.h(基本没变化)
  1. #ifndef __SERVER_H__
  2. #define __SERVER_H__
  3. #define BUFFER_LENGTH                128
  4. #define ENABLE_KVSTORE              1
  5. typedef int  (*RCALLBACK)(int fd);
  6. struct conn {
  7.         int       fd;
  8.         char    rbuffer[BUFFER_LENGTH];
  9.         int       rlength;
  10.         char    wbuffer[BUFFER_LENGTH];
  11.         int       wlength;
  12.         RCALLBACK  send_callback;
  13.         RCALLBACK  recv_callback;
  14. };
  15. #if ENABLE_KVSTORE
  16. int kvs_request(struct conn *c);
  17. int kvs_response(struct conn *c);
  18. #endif
  19. #endif
复制代码
kvs_reactor.c
  1. #include <stdio.h>
  2. #include <sys/types.h>
  3. #include <sys/socket.h>
  4. #include <arpa/inet.h>
  5. #include <string.h>
  6. #include <errno.h>
  7. #include <sys/epoll.h>
  8. #include <unistd.h>
  9. #include <sys/time.h>
  10. #include "server.h"
  11. //#define BUFFER_LENGTH       1024
  12. #define CONNECT_SIZE           1048576
  13. #define PORT_SIZE                   1
  14. #define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
  15. #if ENABLE_KVSTORE
  16. typedef int (*msg_handler)(char *msg, int length, char *response);
  17. static msg_handler kvs_handler;
  18. int kvs_request(struct conn *c){
  19. //        printf("recv %d: %s\n", c->rlength ,c->rbuffer);
  20.         c->wlength = kvs_handler(c->rbuffer, c->rlength, c->wbuffer);
  21. }
  22. #endif
  23. int kvs_response(struct conn *c){
  24. }
  25. //typedef int  (*RCALLBACK)(int fd);
  26. int accept_cb(int fd);
  27. int recv_cb(int fd);
  28. int send_cb(int fd);
  29. int epfd = 0;
  30. struct timeval begin;
  31. /*
  32. struct conn {
  33.         int       fd;
  34.         char    rbuffer[BUFFER_LENGTH];
  35.         int       rlength;
  36.         char    wbuffer[BUFFER_LENGTH];
  37.         int       wlength;
  38.         RCALLBACK  send_callback;
  39.         RCALLBACK  recv_callback;
  40. };
  41. */
  42. struct conn connect_list[CONNECT_SIZE] = {0};
  43. int set_event(int fd, int event, int flag){
  44.         if (flag == 1){ // non-zero add
  45.         struct epoll_event ev;
  46.         ev.data.fd = fd;
  47.         ev.events = event;
  48.         epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
  49.         } else {           // zero MOD
  50.         struct epoll_event ev;
  51.         ev.data.fd = fd;
  52.         ev.events = event;
  53.         epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
  54.         }
  55. }
  56. int event_register(int fd, int event){
  57.         if (fd < 0){
  58.                 return -1;
  59.         }
  60.         connect_list[fd].fd = fd;
  61.         connect_list[fd].recv_callback = recv_cb;
  62.         connect_list[fd].send_callback = send_cb;
  63.         memset(connect_list[fd].rbuffer, 0, BUFFER_LENGTH);
  64.         connect_list[fd].rlength = 0;
  65.         memset(connect_list[fd].wbuffer, 0, BUFFER_LENGTH);
  66.         connect_list[fd].wlength = 0;
  67.         set_event(fd, EPOLLIN, 1);
  68. }
  69. int accept_cb(int fd){
  70.         struct sockaddr_in clientaddr;
  71.         socklen_t len = sizeof(clientaddr);
  72.         int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
  73.         if (clientfd < 0){
  74.                 printf("accept error:  %d, %s\n", errno, strerror(errno));
  75.                 return -1;
  76.         }
  77.         event_register(clientfd, EPOLLIN);
  78.         if((clientfd % 1000) == 0){
  79.                 struct timeval current;
  80.                 gettimeofday(&current, NULL);
  81.                 int time_used = TIME_SUB_MS(current, begin);
  82.                 memcpy(&begin, &current, sizeof(struct timeval));
  83.                 printf("Accept clientfd : %d, time_used: %d\n", clientfd, time_used);
  84.         }
  85.         return 0;
  86. }
  87. int recv_cb(int fd){
  88.         int count = recv(fd, connect_list[fd].rbuffer, BUFFER_LENGTH, 0);
  89.         if (count ==0){
  90.                 printf("client disconnect: %d\n", fd);
  91.                 close(fd);
  92.                 epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);  // unfinished
  93.                 return 0;
  94.         }
  95.         connect_list[fd].rlength = count;
  96. //        printf("rbuffer: %s, recv: %d\n", connect_list[fd].rbuffer, count);
  97. # if 0 //echo
  98.         connect_list[fd].wlength = connect_list[fd].rlength;
  99.         memcpy(connect_list[fd].wbuffer, connect_list[fd].rbuffer, connect_list[fd].wlength);
  100. #endif
  101. #if    ENABLE_KVSTORE
  102.         kvs_request(&connect_list[fd]);
  103. #endif
  104.         set_event(fd, EPOLLOUT, 0);
  105.         return count;
  106. }
  107. int send_cb(int fd){
  108. #if    ENABLE_KVSTORE
  109.         kvs_response(&connect_list[fd]);
  110. #endif
  111.         int count = 0;
  112.         count = send(fd, connect_list[fd].wbuffer, connect_list[fd].wlength, 0);
  113. //        printf("wbuffer: %s, send: %d\n", connect_list[fd].wbuffer, count);
  114.         set_event(fd, EPOLLIN, 0);
  115.         return count;
  116. }
  117. int r_init_server(unsigned short port){
  118.         int sockfd = socket(AF_INET, SOCK_STREAM, 0);
  119.         struct sockaddr_in servaddr;
  120.         servaddr.sin_family = AF_INET;
  121.         servaddr.sin_addr.s_addr= htonl(INADDR_ANY);  // uint32_t
  122.         servaddr.sin_port = htons(port);
  123.         if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {
  124.                 printf("bind failed: %s\n", strerror(errno));
  125.         }
  126.         listen(sockfd, 10);
  127. //        printf("listen finished: %d\n", sockfd);
  128.         return sockfd;
  129. }
  130. int reactor_start(unsigned short port, msg_handler handler){
  131.         epfd = epoll_create(1);
  132.         //unsigned short port = 2000;
  133.         kvs_handler =  handler;
  134.         int i = 0;
  135.         for (i = 0; i < PORT_SIZE; i ++){
  136.                 int sockfd = r_init_server(port + i);
  137.                 connect_list[sockfd].fd =  sockfd;
  138.                 connect_list[sockfd].recv_callback = accept_cb;
  139.                 printf("sockfd: %d\n", sockfd);
  140.                 set_event(sockfd, EPOLLIN, 1);
  141.         }
  142.         gettimeofday(&begin, NULL);
  143.         while(1){
  144.                 struct epoll_event events[1024] = {0};
  145.                 int nready = epoll_wait(epfd, events, 1024, -1);
  146. //                printf("epoll_wait is %d\n", nready);
  147.                 int i = 0;
  148.                 for (i = 0; i < nready; i ++){
  149.                         int connfd = events[i].data.fd;
  150.                         if (events[i].events & EPOLLIN){
  151.                                 connect_list[connfd].recv_callback(connfd);
  152.                         }
  153.                         if (events[i].events & EPOLLOUT){
  154.                                 connect_list[connfd].send_callback(connfd);
  155.                         }
  156.                 }
  157.         }
  158.         return 0;
  159. }
复制代码
kvstore.c
  1. #include <stdio.h>
  2. #include <string.h>
  3. #include <stdlib.h>
  4. #include "kvstore.h"
  5. /*
  6. * msg: request message
  7. * length: length of request message
  8. * response:  need to seed
  9. *@return: length of response
  10. */
  11. int kvs_protocal(char *msg, int length, char *response){
  12.         printf("recv %d: %s\n", length, msg);
  13.         memcpy(response, msg, length);
  14.         return strlen(response);
  15. }
  16. int main(int argc, char *argv[]){ //  input parameters
  17.         if (argc != 2) return -1;
  18.         // ./program Hello World: Number of arguments is 3
  19.         int port = atoi(argv[1]);
  20. #if   (NETWORK_SELECT  ==  NETWORK_REACTOR)
  21.         reactor_start(port, kvs_protocal);
  22. #elif
  23. #endif
  24. }
复制代码
kvstore.h
  1. #ifndef   __KV_STORE_H__
  2. #define  __KV_STORE_H__
  3. #define NETWORK_REACTOR     0
  4. #define NETWORK_SELECT        NETWORK_REACTOR
  5. typedef int (*msg_handler)(char *msg, int length, char *response);
  6. extern int reactor_start(unsigned short port, msg_handler handler);
  7. const char *command[] = {
  8.         "SET", "GET", "DEL", "MOD", "EXIST"
  9. };
  10. const char *response[] = {
  11. };
  12. #endif
复制代码
2. 业务层的协议筹划:

目标:在前面已经创建tcp公共协议毗连的根本上,创建发什么数据和接收什么数据的私有协议 / 自定义协议 / 发数据的格式。
具体做法:对于网络部门已经实现了数据的接收和发送,所以接下来我们要在收数据的地方修改,在原有reactor.c的根本上,
(1)新建kvstore.c定义有关业务联系的函数(接收request和输出response);
(2)新建kvstore.h为函数(接收request和输出response)定义函数具体的协议、具体response
具体5个response格式:
  1. command(命令)
  2. SET         Key     Value
  3. GET         Key
  4. DEL         Key
  5. MOD         Key     Value
  6.   
  7. EXIST       Key
复制代码
(3)在上一部门网络已经封装根本上,以kvstore.c里的自定义kvs_protocol函数(也就是对应网络框架(reactor.c) 的主函数第2个输入参数)为入口函数,进行具体的协议解析功能(也就是定义接收和发送的数据规则寄义,如下2种模式)
  1. SET Key Value
  2. Get Key
复制代码
要点:
知识点(1):针对tcp的分包和粘包,问题1是什么是分包和粘包,问题2是怎么解决这个问题?
答1:由于send(fd, buffer, length, 0)的次数和recv(fd, buffer, length, 0)大概不一致,大概send()多次发送recv一次性都接收了(这就是粘包),所以要区分出每次的数据(分包解数据)。
答2:分包的时间必要和对方约定好格式,同时tcp毗连满足假设(1) recv()先接收的数据就是对方先发送的数据,保证了接收数据的次序;(2)  数据在传输过程中不会发生丢包情况,保证了数据必达性
方式1. send()在发包时就确定约定的长度,比如接收2次就可以接收完整的数据包,第一次接收2字节作为标志,第二次再接收数据包。
  1. SEND:
  2. [15]SET Key Value\r\n   // 15表示15个字符
复制代码
  1. 第一次:接收开头的2个字符,同时通过这2个字符的值得知后面还要接收15字符
  2. short length = 0;
  3. recv(fd, &length, 2, 0);
  4. 第二次:接收剩余数据15个字符
  5. recv(fd, buffer, length, 0);
复制代码
方式2. 仿照Redis,先确定有多少(N)个token后就先发送字符N,具体发送格式如下:
  1. Refis → SET Key Value
  2. 发送顺序如下
  3. 3\r\n     // 3表示“SET Key Value”一共是3个token(单词)
  4. 3\r\n     // 3表示第一个token(单词)“SET"是3个字符的长度
  5. SET\r\n   // 发送一个token,“SET"
  6. 3\r\n     // 3表示第二个token(单词)“Key"也是3个字符的长度
  7. Key\r\n   // 发送一个token,“Key"
  8. 5\r\n     // 5表示第三个token(单词)“Value"也是5个字符的长度
  9. Value\r\n   // 发送一个token,“Value"
复制代码
伪代码如下,终极要读(1+2*3)=7次
  1. 自行实现readline()函数,要求每一行读到"\r\n"就停止
  2. count = atoi(buffer);
  3. for (i = 0; i < count; i ++){
  4.     readline(fd, tokenlen);
  5.     readline(fd, token);
  6. }
复制代码
3. 业务层的引擎筹划

数据布局用红黑树、希哈、数组、链表(手写)



二、备注:

(1) “const char *command[] ={};” 什么意思?以及为什么是const以及为什么*?
答:const: 确保字符串内容不能被修改。*: 表现指针,每个数组元素都指向一个字符串。
(2) 定义函数不消分号";",而布局体必要,那么什么时间必要加“;”?
答:定义一个范例(比方 struct 或 typedef)告诉编译器这个范例存在但并不提供执行行为,必要以分号结束。
(a)必要分号的地方:变量声明、布局体定义、typedef 定义新范例
  1. int a = 10; // 表示变量声明结束
  2. struct Point {
  3.     int x;
  4.     char y;
  5. }; // 分号表示 `struct Point` 的定义结束
  6. typedef struct {
  7.     int x;
  8.     char y;
  9. } PointType; // 分号表示 `typedef` 的结束
复制代码
(b)不必要分号的地方:函数定义、#define 宏(宏是简单的文本替换)。
  1. int add(int a, int b) {
  2.     return a + b; // 函数体内语句的分号只是代码逻辑的一部分
  3. }
  4. #define SQUARE(x) ((x) * (x)) // 宏定义
复制代码
(3)为什么分为 .c 和 .h 文件?
答:每个 .c 文件负责一个模块的实现,多个模块通过 .h 文件互相调用。.h 文件只能声明接口,不能包罗具体实现。自定义头文件使用双引号:#include "xxxx.h"。
(4)"int main(int argc, char *argv[]){}" 是一个标准的 C 语言步调入口函数。
答:argc表现参数个数 (Argument Count),argv表现参数向量 (Argument Vector)。用于获取命令行传递给步调的参数。
  1. int main(int argc, char *argv[]) {
  2.     printf("Number of arguments: %d\n", argc);
  3.     for (int i = 0; i < argc; i++) {
  4.         printf("Argument %d: %s\n", i, argv[i]);
  5.     }
  6.     return 0;
  7. }
  8. 运行:
  9. ./program Hello World
  10. 输出:
  11. Number of arguments: 3
  12. Argument 0: ./program
  13. Argument 1: Hello
  14. Argument 2: World
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

卖不甜枣

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表