2.1.7-1 io_uring的使用

打印 上一主题 下一主题

主题 1093|帖子 1093|积分 3279

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
一、背景

(1)下面几个有关异步操纵的例子:
a)客户端和服务端的异步关系,就是客户端发送哀求后不必要等待结果,接下来发送其他哀求。
b)对于服务端,客户端来哀求后,服务端获得哀求后去mySQl查询,然后在返回结果。在这里,处理哀求、检测对应的io,读写数据和处理数据做到异步。

(2)以下4种都是同步的io,怎么理解?
read()读数据和返回数据是一起的,读数据就返回。读成功就把数据带回来,不成功返回的就是-1。读这个函数自己就是同步的。发起读哀求和返回数据是合到一起的,io是同步的io。
write()发起写哀求和把数据写进内核也是一起的。
recv()在发吸收哀求和吸收数据也是一起的。


(3)异步io,是指比如本来同步的read()中读哀求和数据返回没有做到一起,而是做成2个步骤。优点是使至少本来copy_to_user()的时间节省下来。
思路框架:某个操纵为了做到异步,可以通过引入一个线程池/任务队列实现。第1步是把这个操纵作为任务抛到任务队列里面。→  第2步就是通过线程池(worker)从任务队列里面取任务执行。→  第3步是把执行后的结果放入另一个队列里面。→  第4步开启类似线程的东西(while循环)不断地从“complete queue”里面取结果就可以了。对外界而言就是,只是必要不断投放信息到"submit queue"就可以了。
参数先容:
对于封装的a_read()有3个参数:fd,读数据放到哪个buffer里,另有这个buffer多长。打成task后放到submit queue里面,然后由别的的线程(worker)执行,执行出的结果放到complete queue里面,等待外部来取。

以上执行过程的2个问题:
a) 以上的操纵避免不了频仍的把task任务copy到队列的过程,怎么处理?
答:我们可以把这块频仍分配的内存以mmaped的方式映射出来,就是不必要每次别的创建空间,而是调用定义的“read(fd, buffer, length)”这个函数通过 mmap 提前申请一块内存区域,将其直接映射到任务队列(submit queue和complete queue)所需的空间上,这样,队列的生产和消耗操纵不必要频仍分配和释放内存而是直接复用这块已映射的内存。
提前取出一块内存举行内存共享。
b) 队列怎么做到高效?对外界而言,要求就是做到线程安全,就是每个线程都可以往里面提交。而假如仅仅为了线程安全而采用频仍加锁的方式,效率肯定不高。
答:做成无锁的环形队列。比如submit queue设置出长度为100的环形队列。可以把提交的index对100取模(index%100)得到当前是把这个提交的任务存储到了第几个,因此构建成了一个环(固然存储空间依然是连续的,但在执行的逻辑上面变成了1个环)。(传统队列通常必要锁来保护队列团体,因为插入操纵和取出操纵容易产生访问重叠的区域;而环形队列的插入操纵(写入任务)和 取出操纵(读取任务)互不干扰,同时不必要因为队列数量的变革导致重新分配存储空间)。

(4)io-uring的原理,内核在19年版本里为io-uring新增了3个体系调用。
先通过submit queue和complete queue把环境构建起来,然后在submit queue里一次一次的把事件setup,然后把事件register,当register所有事件后用enter把信息输入(push)进worker里处理。
a). io_uring_setup
b). io_uring_enter
c). io_uring_register
b). 把以上3个体系调用封装起来的库——liburing,一般就用这个库。
io-uring和liburing的关系,可以理解为io_uring提供裸接口,liburing是包装好的库。

(5)io-uring里的u是指user,指用户空间提供的ring。但此处都已经是体系调用了,为什么还是用户空间的ring?
答: register提供事件的时间,用户空间和内存空间是同一个内存,通过mmap方式建立。是针对用户空间对io操纵的环形队列。
二、实现io_uring + tcpserver

io_uring能为开发做什么?
答:
2.1 io_uring安装
(1)linux内核版本最好大于5.4
(2)安装下令:
git clone https://github.com/axboe/liburing.git
cd liburing
make
sudo make instal
2.2 代码实现

2.2.1 流程

1)server初始化,到listen()为止



2)构建submit squeue,拿到submit squeue头指针

3)通过io_uring_prep_accept实现accept()哀求的异步,向submit squeue增长节点。


4)while循环:
(a)通过io_uring_submit提交任务到worker;
(b)通过io_uring_wait_cqe拿到complete squeue头指针,然后看worker里有没有之前的结果
(c)通过io_uring_peek_batch_cqe把结果complete squeue里的结果带出来,功能类似epoll_wait。之后的for循环也是类似epoll_wait的返回值操纵

(d)针对for循环的结果处理
2.2.2 第一次io_uring代码——构建出io_uring可以正常运行的代码框架

  1. #include <stdio.h>
  2. #include <liburing.h>
  3. #include <netinet/in.h>
  4. #include <string.h>
  5. #include <unistd.h>
  6. int init_server(unsigned short port) {       
  7.         int sockfd = socket(AF_INET, SOCK_STREAM, 0);       
  8.         struct sockaddr_in serveraddr;       
  9.         memset(&serveraddr, 0, sizeof(struct sockaddr_in));       
  10.         serveraddr.sin_family = AF_INET;       
  11.         serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);       
  12.         serveraddr.sin_port = htons(port);       
  13.         if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {               
  14.                 perror("bind");               
  15.                 return -1;       
  16.         }       
  17.         listen(sockfd, 10);
  18.        
  19.         return sockfd;
  20. }
  21. #define ENTRIES_LENGTH                1024
  22. //#define BUFFER_LENGTH                1024
  23. int main(int argc, char *argv[]) {
  24.         unsigned short port = 9999;
  25.         int sockfd = init_server(port);
  26. // 创建一个uring参数
  27.         struct io_uring_params params;
  28.         memset(&params, 0, sizeof(params));
  29. // 创建一个uring
  30.         struct io_uring ring;
  31. // 初始化一个uring,把submit queue和complete queue这两个队列构建起来(已执行了io_uring_setup)
  32.         io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
  33. // 拿到submit queue环形队列的头指针(已执行了io_uring_setup)
  34.         struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
  35.        
  36. #if 0
  37.         struct sockaddr_in clientaddr;       
  38.         socklen_t len = sizeof(clientaddr);
  39.         accept(sockfd, (struct sockaddr*)&clientaddr, &len);
  40. #else
  41.        
  42.         struct sockaddr_in clientaddr;       
  43.         socklen_t len = sizeof(clientaddr);
  44. // 向submit queue里增加1个节点(已执行了io_uring_register)
  45.         io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
  46. //        set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
  47.        
  48. #endif
  49.         while(1){
  50. // 把所有事件一起提交给worker(已执行了io_uring_enter)
  51.                 io_uring_submit(&ring);
  52.                 printf("Before");
  53. // complete queue查看有无woker完成的结果
  54.                 struct io_uring_cqe *cqe;
  55.                
  56. // 拿到complete queue环形队列头指针
  57.                 io_uring_wait_cqe(&ring, &cqe);
  58.                 printf("After");
  59.         struct io_uring_cqe *cqes[128];
  60. // 取complete queue内部的节点,效果类似epoll_wait()
  61.         int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);
  62.                 int i = 0;
  63.                 for (i = 0; i < nready; i++){
  64.                         printf(" io_uring_peek_batch_cqe\n");
  65. // for循环的结果
  66.                 struct io_uring_cqe *entries = cqes[i];
  67.                 int clientfd = entries->res;
  68.                 printf("clientfd: %d\n", clientfd);
  69.                 }
  70. // 清空cqe里已经运行过的结果
  71.                 io_uring_cq_advance(&ring, nready);
  72.         }
  73. }
复制代码
2.2.2 第二次io_uring代码——可以连续举行连接哀求

代码逻辑:起首初始化1个server,初始化1个ring,首次执行的时间通过设置set_event_accept往submit queue里增长1个节点,下面进入while循环,循环里在每次accept以后在事件处理完后清空,再次通过set_event_accept增长1个节点。
提问1:sq(submit sequeue)的entry(节点)与cq(complete sequeue)的entry有什么关系?
答:sq和cq共用一块内存,但是是同一块内存的两个独立区域,彼此互不干扰
提问2:io_uring_cq_advance为什么没把sqe里设置的set_event_accept()内数据清空?
答:io_uring_cq_advance只清空complete sequeue的数据。
提问3:io_uring和epoll区别?
答:epoll在每次设置完后可以等待时间触发,不必要进一步修改就会连续有事件处理;而io_uring是每次处理完事件后必要再次设置后才能处理下一个事件。在设计思路上,就是reactor和proactor的区别。
  1. #include <stdio.h>
  2. #include <liburing.h>
  3. #include <netinet/in.h>
  4. #include <string.h>
  5. #include <unistd.h>
  6. #define EVENT_ACCEPT   0
  7. #define EVENT_READ       1
  8. #define EVENT_WRITE     2
  9. struct conn_info{    // sizeof()是8,原因是int 是4,2个int是8
  10.         int fd;
  11.         int event;
  12. };
  13. int init_server(unsigned short port) {       
  14.         int sockfd = socket(AF_INET, SOCK_STREAM, 0);       
  15.         struct sockaddr_in serveraddr;       
  16.         memset(&serveraddr, 0, sizeof(struct sockaddr_in));       
  17.         serveraddr.sin_family = AF_INET;       
  18.         serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);       
  19.         serveraddr.sin_port = htons(port);       
  20.         if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {               
  21.                 perror("bind");               
  22.                 return -1;       
  23.         }       
  24.         listen(sockfd, 10);
  25.        
  26.         return sockfd;
  27. }
  28. #define ENTRIES_LENGTH                1024
  29. int set_event_accept(struct io_uring *ring, int fd, struct sockaddr *addr,
  30.                                         socklen_t *addrlen, int flags){
  31.         struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
  32.         struct conn_info accept_info = {
  33.                 .fd = fd,                          // .fd 明确指定成员并初始化它的值
  34.                 .event = EVENT_ACCEPT,     //  .event 明确指定成员并初始化它的值
  35.         };
  36.        
  37.         io_uring_prep_accept(sqe, fd, (struct sockaddr*)addr, addrlen, flags);
  38.         memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
  39. }
  40. int main(int argc, char *argv[]) {
  41.         unsigned short port = 9999;
  42.         int sockfd = init_server(port);
  43.         struct io_uring_params params;
  44.         memset(&params, 0, sizeof(params));
  45.         struct io_uring ring;
  46.         io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
  47.        
  48.         struct sockaddr_in clientaddr;       
  49.         socklen_t len = sizeof(clientaddr);
  50.     set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
  51.         while(1){
  52.                 io_uring_submit(&ring);
  53.                 printf("Before");
  54.                 struct io_uring_cqe *cqe;
  55.                
  56.                 io_uring_wait_cqe(&ring, &cqe);
  57.                 printf("After");
  58.         struct io_uring_cqe *cqes[128];
  59.         int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);
  60.                 int i = 0;
  61.                 for (i = 0; i < nready; i++){
  62.                         printf(" io_uring_peek_batch_cqe\n");
  63.                         struct io_uring_cqe *entries = cqes[i];
  64.                         struct conn_info result;
  65.                         memcpy(&result, &entries->user_data, sizeof(struct conn_info));
  66.                
  67.                         if (result.event == EVENT_ACCEPT){
  68.                                 set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
  69.                                 printf("set_event_accept\n");
  70.                         }
  71.                 }
  72.                 io_uring_cq_advance(&ring, nready);
  73.         }
  74. }
复制代码
2.2.3 第三次io_uring代码——可以收发数据

  1. #include <stdio.h>
  2. #include <liburing.h>
  3. #include <netinet/in.h>
  4. #include <string.h>
  5. #include <unistd.h>
  6. #include <unistd.h>
  7. #define EVENT_ACCEPT   0
  8. #define EVENT_READ       1
  9. #define EVENT_WRITE     2
  10. struct conn_info{    // sizeof()是8,原因是int 是4?2个int是8
  11.         int fd;
  12.         int event;
  13. };
  14. int init_server(unsigned short port) {       
  15.         int sockfd = socket(AF_INET, SOCK_STREAM, 0);       
  16.         struct sockaddr_in serveraddr;       
  17.         memset(&serveraddr, 0, sizeof(struct sockaddr_in));       
  18.         serveraddr.sin_family = AF_INET;       
  19.         serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);       
  20.         serveraddr.sin_port = htons(port);       
  21.         if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {               
  22.                 perror("bind");               
  23.                 return -1;       
  24.         }       
  25.         listen(sockfd, 10);
  26.        
  27.         return sockfd;
  28. }
  29. #define ENTRIES_LENGTH                1024
  30. #define BUFFER_LENGTH           1024
  31. int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,
  32.                                         socklen_t *addrlen, int flags){
  33.         struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
  34.         struct conn_info accept_info = {
  35.                 .fd = sockfd,                          // .fd 明确指定成员并初始化它的值
  36.                 .event = EVENT_ACCEPT,     //  .event 明确指定成员并初始化它的值
  37.         };
  38.        
  39.         io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags);
  40.         memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
  41. }
  42. int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){
  43.         struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
  44.         struct conn_info accept_info = {
  45.                 .fd = sockfd,                          // .fd 明确指定成员并初始化它的值
  46.                 .event = EVENT_READ,     //  .event 明确指定成员并初始化它的值
  47.         };
  48.        
  49.         io_uring_prep_recv(sqe, sockfd, buf, len, flags);
  50.         memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
  51. }
  52. int set_event_send(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){
  53.         struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
  54.         struct conn_info accept_info = {
  55.                 .fd = sockfd,                          // .fd 明确指定成员并初始化它的值
  56.                 .event = EVENT_WRITE,     //  .event 明确指定成员并初始化它的值
  57.         };
  58.        
  59.         io_uring_prep_send(sqe, sockfd, buf, len, flags);
  60.         memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
  61. }
  62. int main(int argc, char *argv[]) {
  63.         unsigned short port = 9998;
  64.         int sockfd = init_server(port);
  65.         struct io_uring_params params;
  66.         memset(&params, 0, sizeof(params));
  67.         struct io_uring ring;
  68.         io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
  69.        
  70.         struct sockaddr_in clientaddr;       
  71.         socklen_t len = sizeof(clientaddr);
  72.         set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
  73.         char buffer[BUFFER_LENGTH] = {0};
  74.         while(1){
  75.                 io_uring_submit(&ring);
  76.                 printf("Before");
  77.                 struct io_uring_cqe *cqe;
  78.                
  79.                 io_uring_wait_cqe(&ring, &cqe);
  80.                 printf("After");
  81.         struct io_uring_cqe *cqes[128];
  82.         int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);
  83.                 int i = 0;
  84.                 for (i = 0; i < nready; i++){
  85.                         printf(" io_uring_peek_batch_cqe\n");
  86.                         struct io_uring_cqe *entries = cqes[i];
  87.                         struct conn_info result;
  88.                         memcpy(&result, &entries->user_data, sizeof(struct conn_info));
  89.                
  90.                         if (result.event == EVENT_ACCEPT){
  91.                                 set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
  92.                                 printf("set_event_accept\n");
  93.                                 int connfd = entries->res;
  94.                                 set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
  95. //收数据                               
  96.                         } else if (result.event == EVENT_READ){
  97.                                 int ret = entries->res;
  98.                                 printf("set_event_recv ret:  %d %s\n",  ret, buffer);
  99.                                
  100.                                 if (ret == 0){
  101.                                         close(result.fd);
  102.                                 } else if (ret > 0){
  103.                                         //保证了连续发送
  104.                                         set_event_send(&ring, result.fd, buffer, BUFFER_LENGTH, 0);               
  105.                                 } else if (result.event == EVENT_WRITE){
  106.                                         int ret = entries->res;                               
  107.                                         printf("set_event_send ret:  %d %s\n",        ret, buffer);
  108.                                        
  109.                                         set_event_send(&ring, result.fd, buffer, BUFFER_LENGTH, 0);       
  110.                                 }
  111.                 }
  112.                 io_uring_cq_advance(&ring, nready);
  113.         }
  114. }
复制代码
2.3 报错备注:

(1)通过“htop”看到有cup达到100%,很有大概是while()进入死循环。
(2)reactor和proactor的区别是什么?至少找出3点区别
答:reactor是一个事件对应一个动作,proactor是提交事件后结果直接给出来了。
             

五、面试:

(1)到现在纵然学完了,却发现不知道怎么用,会有混沌的状态,有关面试的问题:
a)面试的时间,社招是投50份简历大概有4~5份面试机会,有1~2个offer。校招是投200份简历该带有15~20个面试机会,有1~2个offer。比如面试十频频没过,阐明技术水平有问题。
b)面试时间。假如面试低于30分钟,面试成功的概率低于5%,面试时间低于45分钟,面试成功概率小于30%。
c)所以进步面试时间、面试竣事要复盘(面了什么问题,你是怎么表述的)。
(2)假如UDP的开发如何做?说不清楚可以通过github的开源代码举行梳理,然后找老师讨论。总结到技术文章里
(3)TCP和UDP的区别?
答:我们有5点:<1>第1个TCP基于连接的,UDP是基于数据报的。<2>第2点在分包和粘包的解决方案上是不一样的。TCP可以通过2种方式,1种方式是通过在TCP数据包的前面加上一个长度,另一种可以做分隔符。而UDF必须为每个包加上一个id值。<3>第3点在并发的方式上面也是不一样的,TCP可以直接通过epoll直接管理io就可以,而对于UDP我们必要通过模拟TCP的方式建立连接。<4>第4点在使用场景上面,UDF是做实时性比力强的方面,比如实时对战的游戏。另有就是在数据下载的使用场景下,UDP由于不带拥塞控制可以把整个网络耗尽加快下载速率,比如迅雷会员。<5>udp更偏向于短连接,TCP适合长连接。(解释:TCP基于连接,UDP是基于数据包。(a)TCP情况下server有一个连接、客户端有一个连接,所以连接是一对一的,服务端通过send(),客户端通过recv(fd);(b)TCP像水管一样连接,所以是有次序的,就是先发的先收到。而UDP不一样,UDP没有次序,必要自己为包定义一个id和明确确认机制,以此知道这是第几个包。(c)并发上的区别,UDP在做并发上面必要模拟TCP从而做到并发建立连接,这是为了更好的管理每个io。就是每个客户建立连接,然后通过其他的fd把这连接发出来,从而构建了与TCP不同的方式。(d)使用场景的区别,UDP的2个上风TCP没有的,第1是实时性,TCP为了包管次序所以做了耽误确认,而UDP可以快速确认每个包。或者第2个是下载传输数据的情况,因为UDP是不带拥塞控制的,可以把整个网络耗尽加快下载速率,比如迅雷会员。(e)udp更偏向于短连接,特别是发起一次哀求,比如发起DNS哀求然后返回ip不必要建立连接只要发包收包回包就可以了。TCP适合长连接,就是建立连接不停存在。)
(4)TCP的分包和粘包的解决办法?
答:(a)基于TCP是有次序的,把TCP数据包协议头的前面加几个字节当做长度。(b)做一个分隔符,比如在一个字节发完后加上"\r\n",对方在读的时间每次在读完后就判断有没有这个分隔符,从而做一个包的切割。而UDP在发送数据的时间必要我们在用户空间上自己为包定义一个id,而且要明确确认机制,以此知道这是第几个包
(5)网络部分怎么学?
答:除了网卡驱动没有讲,从网络协议栈、posix api、应用层的多线程、多进程、客户端。(1)总结。(2)把代码调通。
六、 随想或总结

(1)mmp提前申请一块内容,用来避免频仍内存分配。
(2)环形队列以取模(index%环形长度)的方式建立,作用是(a) 通过头指针(head)和尾指针(tail)使 取出操纵(读取任务)和 插入操纵(写入任务)和互不干扰,进步线程安全;(b) 避免因队列大小变革导致的反复的内存分配和内存释放。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

种地

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表