ToB企服应用市场:ToB评测及商务社交产业平台
标题:
使用C++20协程和io_uring优雅地实现异步IO
[打印本页]
作者:
杀鸡焉用牛刀
时间:
2024-5-15 12:51
标题:
使用C++20协程和io_uring优雅地实现异步IO
距离2020年已颠末去很久了,各大编译器对于C++20各项尺度的支持也日趋完善,无栈协程也是此中之一,所以我就实验着拿协程与io_uring实现了一下proactor模式,这篇文章用来记录一下我的设计和想法。除此之外,我们能在网络上找到许多良好的C++20协程的教程以及许多良好的协程应用(库),但从协程入门到架构出成熟的应用(库)之间还存在着不小的鸿沟,而直接去啃大型工程的源代码绝对不算是一种高服从的学习方式。所以,如果这篇文章能够在这方面提供一定的帮助的话那就再好不过了。
正如上所述,这篇文章是介绍基于C++20协程实现异步IO的,而不是介绍C++20协程的,因此有一定的阅读门槛。在阅读之前,你应当至少熟悉一下C++20协程。
为什么要使用协程
因为协程能够让我们像写同步IO那样来实现异步IO,如下所示:
auto foo(tcp_connection connection) -> task<void> {
char buffer[1024];
int result = co_await connection.recv(buffer, sizeof(buffer));
// do something...
result = co_await connection.send(buffer, result);
// do something...
co_return;
}
复制代码
如果我们合理地实现了协程的挂起、恢复等操作,那么当我们执行co_await connection.recv时,我们实际上希望代码执行的操作如下:
告诉操作系统,监听recv操作,等待对方发送数据;
挂起当前协程;
去处理别的变乱。
当操作系统吸取到recv的数据时,执行以下操作:
处理recv,把数据读进来;
恢复之前挂起的协程,从挂起的地方恢复执行。
这就是我们需要协程做的变乱。如果你熟悉reactor模式的话,这应该并不陌生——我们只是把回调函数换成了协程而已。那么回到这一部分的标题——我们为什么要使用协程而不是回调函数呢?——因为使用协程写出来的代码更好看,也更好维护,仅此而已。
关于为什么要使用异步IO:异步IO能够进步程序的吞吐量。试想一下一台基于同步IO的HTTP服务器,一种不难想到的实现方式是每accept一个连接,就创建一个新的线程来处理这个连接的IO,最后当这个连接断开时销毁这个线程。这么实现当然可以,但创建和销毁线程的开销是很大的,而且这要求线程调度器能够很好地分配线程之间的时间。使用IO多路复用的方式能够使用有限(以致单线程)处理许多连接的IO,而不至于浪费过多的资源。
关于协程和回调函数的性能:我想二者应当是差不多的,或者协程可能还会更差一点,因为挂起协程和恢复协程需要执行一些额外操作。不过既然性能还没有紧张到需要去抠dpdk,那么和这一点点的性能上风相比较的话,代码的可维护性和可读性绝对也是不容忽视的问题。
关于异步IO的性能:我们通常讲异步IO性能更好指的是吞吐量,而不是低延时。不论是reactor模式还是proactor模式,其设计主旨都是要让CPU在等待IO的时候去处理别的变乱,不要让CPU闲下来。如果低延时很紧张的话,应当考虑使用同步IO与轮询的方式。
设计思路
根据第一部分,设计的基调就能够定下来了。我们重新考虑一下需要做的变乱:
当我们执行到co_await read(...)等异步IO时,挂起当前协程,去处理其他变乱;
当异步IO执行完毕时,恢复协程的执行。
细致思索一下上述两点,我们就能够得到全部要做的变乱:
我们需要适时挂起协程,所以起首我们要实现协程task;
协程可能会调用协程,所以需要维护一下协程的调用栈(我是在promise里维护的);
协程是用来处理异步IO的,所以我们需要有一些组件来处理io_uring的IO(我是在io_context_worker中处理的);
当异步IO执行完毕时,需要有什么东西恢复协程的执行(这也是在io_context_worker中处理的);
当整个协程执行完毕时,需要销毁协程(这也是在io_context_worker中处理的)。
在继续阅读之前,我先贴一下代码。对照着代码看的话会舒服一些:
GitHub
。
task与promise
task和promise均在coco/task.hpp中定义。我对task的定位正如协程最基本的功能——能够挂起和恢复的函数。task类本身只是对std::coroutine_handle的简易封装。在这里我只介绍一下task的operator co_await。
task的operator co_await只是返回task_awaitable,所以co_await处理的重点实际上是在task_awaitable中实现的。考虑一下,当我们co_await一个task时,我们究竟是在干什么:
挂起当前协程
维护协程的调用栈
启动被co_await的协程
在task_awaitable::await_suspend()中很轻易看出这三点:
template <class T>
template <class Promise>
auto task_awaitable<T>::await_suspend(
std::coroutine_handle<Promise> caller) noexcept -> coroutine_handle {
// Set caller for this coroutine.
promise_base &base = static_cast<promise_base &>(m_coroutine.promise());
promise_base &caller_base = static_cast<promise_base &>(caller.promise());
base.m_caller_or_top = &caller_base;
// Maintain stack bottom and top.
promise_base *stack_bottom = caller_base.m_stack_bottom;
assert(stack_bottom == stack_bottom->m_stack_bottom);
base.m_stack_bottom = stack_bottom;
stack_bottom->m_caller_or_top = &base;
return m_coroutine;
}
复制代码
这里偏重讲一下维护协程的调用栈。对于协程而言,至少存在一个协程,它是在协程外创建的(比如main函数)。因为它不在协程中,所以它也无法被co_await,这个协程我们称之为协程的栈底。在这个协程的执行过程中,它可能会创建和执行新的协程。当新的协程执行完毕时,它们会被整理,并且将执行权交还给调用者,这与普通函数的调用栈是一样的,只不过这个功能需要我们自己来实现。
因为promise在内存中的位置是不可移动的(我禁止了promise的拷贝与移动),所以我直接采用了类似链表的方式将协程的调用栈串了起来。在promise_base中,有两个成员变量用来维护这个调用栈:
promise_base *m_caller_or_top;
promise_base *m_stack_bottom;
复制代码
因为第一个变量被复用了(具备差别的含义),所以可能有点乱。对于
栈底协程
而言,m_caller_or_top指向当前调用栈的
栈顶协程
,对于其他协程而言,m_caller_or_top指向自己的调用者(父协程)。这么设计是因为
栈底协程
不存在调用者,所以就干脆用这个变量存一下栈顶了。m_stack_bottom顾名思义,就是指向栈底的协程。对于
栈底协程
而言,这个变量指向的就是它自己了。
有了m_caller_or_top,当一个协程执行完毕时,就能方便地找到它的父协程并交换执行权。有了m_stack_bottom和m_caller_or_top,我们就能很方便地找到协程的栈底和栈顶。当需要恢复task时,就能够保证总是恢复栈顶的协程。
当协程执行完毕时,需要将控制权交还给父协程。我们考虑一下交还控制权需要做的变乱:
维护调用栈,变更栈顶
如果不是栈底,则恢复父协程的执行
协程执行完毕时会去实验执行promise的final_suspend(),因此这部分代码在promise的final_suspend()中实现。final_suspend()返回的范例叫promise_awaitable,其对应的代码如下:
template <class Promise>
auto promise_awaitable::await_suspend(
std::coroutine_handle<Promise> coroutine) noexcept
-> std::coroutine_handle<> {
promise_base &base = static_cast<promise_base &>(coroutine.promise());
promise_base *stack = base.m_stack_bottom;
// Stack bottom completed. Nothing to resume.
if (stack == &base)
return std::noop_coroutine();
// Set caller coroutine as the top of the stack.
promise_base *caller = base.m_caller_or_top;
stack->m_caller_or_top = caller;
// Resume caller.
return caller->m_coroutine;
}
复制代码
这段代码应该非常易懂,不过我们很轻易联想到一个问题:既然交还了控制权,那么它是在何时销毁的?
实在这也不难想到,协程是由父协程销毁的。协程的返回值存放在promise中,当父协程co_await sometask时,父协程还需要读取子协程的promise以获取返回值。当子协程task析构时,子协程才真正被销毁。
io_context的设计
既然协程需要异步地处理IO,那么一定需要个处理IO的地方,就是io_context。io_context维护了一个线程池,线程池中每一个线程均执行一个worker,每个worker均维护一个io_uring来处理本线程的IO事件和协程。当需要提交新的task给线程池的时候,由io_context分配给某一个worker执行。
这听起来和reactor模式好像没啥区别,用epoll写reactor模式的时候基本上也是这么干的,这是因为我本来就是从reactor模式那边搬过来的。不过相比于reactor模式,这么做还是有不少细节要处理的。在使用io_uring时,每次我们启动异步IO时,都需要获取到io_uring对象——要使用io_uring_prep_系列函数,我们必须从io_uring对象中获取一个sqe。而如上所述,io_uring对象在worker中,这就造成了一个贫苦:我们无法在io_context给task分配worker的时候将io_uring对象的引用(指针)传递给task。虽然使用全局变量不失为一种选择,但我不想这么做,因为大概使用者想要在一个进程中创建几个差别的io_context用呢。
虽然无法直接将io_uring的引用传递给task,但尚有一种方法可以举行交互。在全部awaitable的await_suspend中,我们可以拿到当前协程的coroutine_handle,而在io_context中,我们也能拿到task的coroutine_handle,因此可以通过promise来传递io_uring的引用。
详细在实现时,我没有传递io_uring的引用,而是传递了worker的指针。这么做是因为当初我想同时支持IOCP,传递worker可以省掉一些贫苦,虽然厥后放弃了。worker的指针只被放在协程栈的栈底,这么做是因为当协程在差别worker之间转移时,能够很方便地修改协程所属的worker(只需要修改栈底就可以了),尽管厥后也没有实现work-stealing队列。在promise中,处理worker的方法如下所示:
/// \brief
/// Set I/O context for current coroutine.
/// \param[in] io_ctx
/// I/O context to be set for current coroutine.
auto set_worker(io_context_worker *io_ctx) noexcept -> void {
m_stack_bottom->m_worker.store(io_ctx, std::memory_order_release);
}
/// \brief
/// Get I/O context for current coroutine.
/// \return
/// I/O context for current coroutine.
[[nodiscard]] auto worker() const noexcept -> io_context_worker * {
return m_stack_bottom->m_worker.load(std::memory_order_acquire);
}
复制代码
不过这么做也有一个缺点,就是io_context侵入了promise的设计,使得task必须在io_context中才能发挥作用。
awaitable的设计
awaitable在coco/io.hpp中定义。各种awaitable的设计就比较简单了。以read_awaitable为例,它在await_suspend()中获取当前协程所属的worker,然后启用异步IO,如下所示:
template <class Promise>
auto await_suspend(std::coroutine_handle<Promise> coro) noexcept -> bool {
m_userdata.coroutine = coro.address();
return this->suspend(coro.promise().worker());
}
复制代码
获取了当前协程的worker后,就转入this->suspend()函数中去执行了。suspend()方法主要的工作是启动异步IO操作,并挂起当前协程:
auto coco::read_awaitable::suspend(io_context_worker *worker) noexcept -> bool {
assert(worker != nullptr);
m_userdata.cqe_res = 0;
m_userdata.cqe_flags = 0;
io_uring *ring = worker->io_ring();
io_uring_sqe *sqe = io_uring_get_sqe(ring);
while (sqe == nullptr) [[unlikely]] {
io_uring_submit(ring);
sqe = io_uring_get_sqe(ring);
}
io_uring_prep_read(sqe, m_file, m_buffer, m_size, m_offset);
sqe->user_data = reinterpret_cast<uint64_t>(&m_userdata);
int result = io_uring_submit(ring);
if (result < 0) [[unlikely]] { // Result is -errno.
m_userdata.cqe_res = result;
return false;
}
return true;
}
复制代码
我没有把启用异步IO部分放到模板函数中,这是因为对于各种差别的IO操作,这部分的代码实际上大同小异。但考虑到这部分代码的长度,放到模板中可能会导致比较严重的二进制膨胀,所以就单独拿出来放到.cpp文件中了。
如果去翻我之前的commit记录的话,会发现早先我并没有把各种awaitable暴露出来,而是让各种异步操作(比如connection.receive())返回task。厥后将awaitable暴露出来是考虑到诸如read、write等操作可能会被频繁地调用,而每次创建一个协程都需要申请一次堆内存,在循环中执行的话可能对运行服从有比较严重的影响。
关于代码
再放一遍代码地址:
GitHub
这份代码不长,总共两三千行,而且此中一多半都是注释,结合本文的话应该不会很难读。这本身只是一份实行性质的代码,同时我希望它适合拿来学习,所以我并没有计划塞入太多的功能。除此之外,我不是很建议你拿来放到工程中使用,因为我可能会一时兴起做出一些breaking change。如果你真的有这个需要的话,我建议你fork一份代码自己维护。
一些可能会被问到的问题
为什么没有实现UDP相关的IO?
因为io_uring似乎还没有支持recvfrom,至少我实现的时候还没有。
为什么不使用mmap和内核共享内存/为什么不向io_uring注册文件描述符等性能相关的问题
因为我不是io_uring专家。我写这个库的目标是学习用C++20的协程架构一个异步IO库,做这些性能优化会加大架构难度,并且花掉我大量的头发和时间,使我本不茂密的头发雪上加霜。除此之外,你会发现我也没有实现work-stealing队列,原因同理。Round Robin的性能虽然不至于最优,但也不会太差。
考不考虑加上HTTP支持?
考虑过,太懒所以放弃了。一方面是手写HTTP parser还是挺贫苦的,就算能用bison自动生成也还得去啃RFC。另一方面是,TCP作为一种基于流的协议,我没有想好怎样处理连接的缓存能够分身性能和使用的便捷性。如果你有这方面的需求的话,不妨先用着其他的HTTP parser,比如
llhttp
。
为什么没有实现yield?
我觉得异步IO一样寻常用不到这东西,所以就没写。如果需要的话就自己实现吧。
会支持Windows(IOCP)吗?
我有考虑过支持IOCP,但IOCP不支持定时器,我又不想删除Linux这边的timer,所以暂且没有这个想法。
作者你README写得好水啊
确实,我也觉得好水啊,有没有美意人帮忙修一修啊。
为什么不消中文写注释和README?
不消中文写注释是因为clang-format没法处理中文的断行。不消中文写README是因为懒,不想写两份README。说不定哪天心情好了就写一份中文版。
碎碎念
要不要塞张插图呢?
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4