Linux高性能异步io,io_uring实现服务器

打印 上一主题 下一主题

主题 959|帖子 959|积分 2877

Linux系统中io_uring是2019年首次出现的一个新的异步IO框架,用于实现高效的异步IO操作。
1. io_uring为什么高效



  • 相较于常用的同步io,read/write/accpet等,io_uring使用两个队列来实现io操作,一个是提交队列Submission Queue(SQ),另一个是完成队列Completion Queue(CQ),SQ存放用户提交的io哀求,CQ存放内核io哀求完成后的结果,发起哀求后不须要壅闭等待io操作的结果,实现真正的异步。
  • 使用两个队列会带来两个问题,一个是使命用户空间和两个队列之间使命的频繁拷贝带来的性能消耗,另一个是线程安全问题。

    • 性能问题:io_uring创建时会使用mmap做内存映射,用户空间和内核空间共享一块内存;
    • 线程安全:io_uring使用环形队列,并且队列都使用原子型的头尾指针来管理队列中的条目;

  • io_uring 支持批量提交 I/O 哀求和批量获取完成结果,这淘汰了上下文切换和系统调用的开销,提高了效率。
2. io_uring怎样使用



  • Linux内核版本须要大于5.4,以更好地支持io_uring
  • io_uring的三个系统调用
    1. int io_uring_setup(u32 entries, struct io_uring_params *p);
    复制代码

    • 初始化一个 io_uring 实例。它在内核中创建和配置 io_uring 数据结构,并返回一个文件描述符,用于与该实例进行交互。
    1. int io_uring_register(unsigned int fd, unsigned int opcode, void *arg, unsigned int nr_args);
    复制代码
      

    • 用于向 io_uring 实例注册各种资源,如文件描述符、缓冲区、事件文件描述符等。这些注册操作可以提高 I/O 操作的效率,因为它们允许 io_uring 在执行 I/O 操作时直接访问这些预先注册的资源,而不须要在每次操作时重新设置。
    1. int io_uring_enter(unsigned int fd, unsigned int to_submit, unsigned int min_complete, unsigned int flags, sigset_t *sig);
    复制代码
      

    • 用于提交和等待 io_uring 队列中的 I/O 操作。这个系统调用允许用户提交新的 I/O 哀求,并等待这些哀求的完成,或者仅提交哀求而不等待。

  • 原始系统调用比较抽象,可以借助liburing来使用异步io,在Linux 5.4以上使用以下下令进行安装,低版本可能可以安装,但是不能正常运行。
           git clone https://github.com/axboe/liburing.git
    cd liburing
    ./configure
    make
    sudo make install
3. 使用liburing常用API构建异步服务器

  1. int io_uring_queue_init_params(unsigned entries, struct io_uring *ring,        struct io_uring_params *p);
复制代码


  • 初始化一个io_uring实例,并指定SQ和CQ中的条目数,通报io_uring的配置参数,该函数使用了系统调用io_uring_setup。
  1. struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring)
复制代码


  • 获取SQ中第一个空io_uring_sqe结构体,后续用户可以填充io哀求并提交给io_uring。
  1. io_uring_prep_accept(struct io_uring_sqe *sqe, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
复制代码


  • 预备一个 accept 系统调用的提交队列条目,sqe是指向要填充的 io_uring_sqe 结构体的指针,后续参数同accept4,非壅闭调用,只提交哀求,不等待返回结果,对于同步的send/recv也有对应的io_uring_prep_send/io_uring_prep_recv,底层使用io_uring_register系统调用。
  1. int io_uring_submit(struct io_uring *ring);
复制代码


  • 将提交队列条目(io_uring_sqe)一次性提交到内核进行处理。这些哀求会被内核调理并执行,执行完成后,结果会被放入CQ中,底层使用io_uring_enter系统调用。
  1. int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr)
复制代码


  • 壅闭等待CQ中的条目,结果存入cqe_ptr。
  1. unsigned io_uring_peek_batch_cqe(struct io_uring *ring,        struct io_uring_cqe **cqes, unsigned count);
复制代码


  • 批量获取CQ中的条目,支持一次性返回多个,可以类比epoll_wait,非壅闭。
  1. void io_uring_cq_advance(struct io_uring *ring, unsigned nr)
复制代码


  • 通知内核有多少个CQ中的条目已经被处理完成,可以被io_ring重用,在每次调用io_uring_peek_batch_cqe后,必须调用io_uring_cq_advance。
以下是完整实现代码:
  1. #include <stdio.h>
  2. #include <liburing.h>
  3. #include <netinet/in.h>
  4. #include <string.h>
  5. #include <unistd.h>
  6. #define EVENT_ACCEPT           0
  7. #define EVENT_READ                1
  8. #define EVENT_WRITE                2
  9. #define ENTRIES_LENGTH                1024
  10. #define BUFFER_LENGTH                1024
  11. struct conn_info {
  12.         int fd;
  13.         int event;
  14. };
  15. int init_server(unsigned short port) {       
  16.         int sockfd = socket(AF_INET, SOCK_STREAM, 0);       
  17.         struct sockaddr_in serveraddr;       
  18.         memset(&serveraddr, 0, sizeof(struct sockaddr_in));       
  19.         serveraddr.sin_family = AF_INET;       
  20.         serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);       
  21.         serveraddr.sin_port = htons(port);       
  22.         if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {               
  23.                 perror("bind");               
  24.                 return -1;       
  25.         }       
  26.         listen(sockfd, 10);
  27.        
  28.         return sockfd;
  29. }
  30. int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags) {
  31.         struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
  32.         struct conn_info accept_info = {
  33.                 .fd = sockfd,
  34.                 .event = EVENT_READ,
  35.         };
  36.        
  37.         io_uring_prep_recv(sqe, sockfd, buf, len, flags);
  38.         memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
  39. }
  40. int set_event_send(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags) {
  41.         struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
  42.         struct conn_info accept_info = {
  43.                 .fd = sockfd,
  44.                 .event = EVENT_WRITE,
  45.         };
  46.        
  47.         io_uring_prep_send(sqe, sockfd, buf, len, flags);
  48.         memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
  49. }
  50. int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
  51.         struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
  52.         struct conn_info accept_info = {
  53.                 .fd = sockfd,
  54.                 .event = EVENT_ACCEPT,
  55.         };
  56.        
  57.         io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags);
  58.         memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
  59. }
  60. int main(int argc, char *argv[]) {
  61.         unsigned short port = 9999;
  62.         int sockfd = init_server(port);
  63.         struct io_uring_params params;
  64.         memset(&params, 0, sizeof(params));
  65.         struct io_uring ring;
  66.         io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
  67.         struct sockaddr_in clientaddr;       
  68.         socklen_t len = sizeof(clientaddr);
  69.         set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
  70.         char buffer[BUFFER_LENGTH] = {0};
  71.         while (1) {
  72.                 io_uring_submit(&ring);
  73.                 struct io_uring_cqe *cqe;
  74.                 io_uring_wait_cqe(&ring, &cqe);
  75.                 struct io_uring_cqe *cqes[128];
  76.                 int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);  // epoll_wait
  77.                 int i = 0;
  78.                 for (i = 0;i < nready;i ++) {
  79.                         struct io_uring_cqe *entries = cqes[i];
  80.                         struct conn_info result;
  81.                         memcpy(&result, &entries->user_data, sizeof(struct conn_info));
  82.                         if (result.event == EVENT_ACCEPT) {
  83.                                 set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
  84.                                 int connfd = entries->res;
  85.                                 set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
  86.                         } else if (result.event == EVENT_READ) {
  87.                                 int ret = entries->res;
  88.                                 if (ret == 0) {
  89.                                         close(result.fd);
  90.                                 } else if (ret > 0) {
  91.                                         set_event_send(&ring, result.fd, buffer, ret, 0);
  92.                                 }
  93.                         }  else if (result.event == EVENT_WRITE) {
  94.                                 int ret = entries->res;
  95.                                 set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
  96.                         }
  97.                        
  98.                 }
  99.                 io_uring_cq_advance(&ring, nready);
  100.         }
  101. }
复制代码
4. io_uring机制对比epoll机制



  • epoll通过epoll_ctl增加关注的事件之后,只要不删除,下一次执行epoll_wait时fd发生事件仍然能够返回,但是io_uring注册事件并返回事件后,如果须要继承关注事件,则须要重新注册;
  • epoll_wait返回事件时,是检测到io事件发生,用户层仍然须要对io进行详细的操作,是reactor模式;io_uring的CQ返回时,io操作已经完成,是proactor模式;
  • io_uring可以一次性提交多个io哀求,淘汰系统调用次数;
末了,保举一个Linux c/c++内容学习平台
   https://github.com/0voice

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

万有斥力

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表