利用C++20协程和io_uring优雅地实现异步IO
距离2020年已经过去很久了,各大编译器对于C++20各项标准的支持也日趋美满,无栈协程也是其中之一,所以我就尝试着拿协程与io_uring实现了一下proactor模式,这篇文章用来记载一下我的操持和想法。除此之外,我们能在网络上找到许多优秀的C++20协程的教程以及许多优秀的协程应用(库),但从协程入门到架构出成熟的应用(库)之间还存在着不小的鸿沟,而直接去啃大型工程的源代码绝对不算是一种高效率的学习方式。所以,如果这篇文章能够在这方面提供一定的资助的话那就再好不外了。正如上所述,这篇文章是介绍基于C++20协程实现异步IO的,而不是介绍C++20协程的,因此有一定的阅读门槛。在阅读之前,你应当至少熟悉一下C++20协程。
为什么要利用协程
因为协程能够让我们像写同步IO那样来实现异步IO,如下所示:
auto foo(tcp_connection connection) -> task<void> {
char buffer;
int result = co_await connection.recv(buffer, sizeof(buffer));
// do something...
result = co_await connection.send(buffer, result);
// do something...
co_return;
}如果我们合理地实现了协程的挂起、规复等利用,那么当我们执行co_await connection.recv时,我们实际上希望代码执行的利用如下:
[*]告诉利用系统,监听recv利用,等候对方发送数据;
[*]挂起当前协程;
[*]去处理别的变乱。
当利用系统接收到recv的数据时,执行以下利用:
[*]处理recv,把数据读进来;
[*]规复之前挂起的协程,从挂起的地方规复执行。
这就是我们必要协程做的变乱。如果你熟悉reactor模式的话,这应该并不生疏——我们只是把回调函数换成了协程而已。那么回到这一部分的标题——我们为什么要利用协程而不是回调函数呢?——因为利用协程写出来的代码更好看,也更好维护,仅此而已。
关于为什么要利用异步IO:异步IO能够提高程序的吞吐量。试想一下一台基于同步IO的HTTP服务器,一种不难想到的实现方式是每accept一个毗连,就创建一个新的线程来处理这个毗连的IO,最后当这个毗连断开时销毁这个线程。这么实现当然可以,但创建和销毁线程的开销是很大的,而且这要求线程调理器能够很好地分配线程之间的时间。利用IO多路复用的方式能够利用有限(甚至单线程)处理许多毗连的IO,而不至于浪费过多的资源。
关于协程和回调函数的性能:我想二者应当是差不多的,大概协程可能还会更差一点,因为挂起协程和规复协程必要执行一些额外利用。不外既然性能还没有紧张到必要去抠dpdk,那么和这一点点的性能上风相比较的话,代码的可维护性和可读性绝对也是不容忽视的问题。
关于异步IO的性能:我们通常讲异步IO性能更好指的是吞吐量,而不是低延时。岂论是reactor模式还是proactor模式,其操持主旨都是要让CPU在等候IO的时候去处理别的变乱,不要让CPU闲下来。如果低延时很重要的话,应当考虑利用同步IO与轮询的方式。
操持思路
根据第一部分,操持的基调就能够定下来了。我们重新考虑一下必要做的变乱:
[*]当我们执行到co_await read(...)等异步IO时,挂起当前协程,去处理其他变乱;
[*]当异步IO执行完毕时,规复协程的执行。
仔细思考一下上述两点,我们就能够得到全部要做的变乱:
[*]我们必要适时挂起协程,所以首先我们要实现协程task;
[*]协程可能会调用协程,所以必要维护一下协程的调用栈(我是在promise里维护的);
[*]协程是用来处理异步IO的,所以我们必要有一些组件来处理io_uring的IO(我是在io_context_worker中处理的);
[*]当异步IO执行完毕时,必要有什么东西规复协程的执行(这也是在io_context_worker中处理的);
[*]当整个协程执行完毕时,必要销毁协程(这也是在io_context_worker中处理的)。
在继承阅读之前,我先贴一下代码。对照着代码看的话会舒服一些:GitHub。
task与promise
task和promise均在coco/task.hpp中界说。我对task的定位正如协程最基本的功能——能够挂起和规复的函数。task类本身只是对std::coroutine_handle的简易封装。在这里我只介绍一下task的operator co_await。
task的operator co_await只是返回task_awaitable,所以co_await处理的重点实际上是在task_awaitable中实现的。考虑一下,当我们co_await一个task时,我们究竟是在干什么:
[*]挂起当前协程
[*]维护协程的调用栈
[*]启动被co_await的协程
在task_awaitable::await_suspend()中很容易看出这三点:
template <class T>
template <class Promise>
auto task_awaitable<T>::await_suspend(
std::coroutine_handle<Promise> caller) noexcept -> coroutine_handle {
// Set caller for this coroutine.
promise_base &base = static_cast<promise_base &>(m_coroutine.promise());
promise_base &caller_base = static_cast<promise_base &>(caller.promise());
base.m_caller_or_top = &caller_base;
// Maintain stack bottom and top.
promise_base *stack_bottom = caller_base.m_stack_bottom;
assert(stack_bottom == stack_bottom->m_stack_bottom);
base.m_stack_bottom = stack_bottom;
stack_bottom->m_caller_or_top = &base;
return m_coroutine;
}这里偏重讲一下维护协程的调用栈。对于协程而言,至少存在一个协程,它是在协程外创建的(好比main函数)。因为它不在协程中,所以它也无法被co_await,这个协程我们称之为协程的栈底。在这个协程的执行过程中,它可能会创建和执行新的协程。当新的协程执行完毕时,它们会被清算,并且将执行权交还给调用者,这与普通函数的调用栈是一样的,只不外这个功能必要我们本身来实现。
因为promise在内存中的位置是不可移动的(我禁止了promise的拷贝与移动),所以我直接接纳了类似链表的方式将协程的调用栈串了起来。在promise_base中,有两个成员变量用来维护这个调用栈:
promise_base *m_caller_or_top;
promise_base *m_stack_bottom;因为第一个变量被复用了(具备差别的含义),所以可能有点乱。对于栈底协程而言,m_caller_or_top指向当前调用栈的栈顶协程,对于其他协程而言,m_caller_or_top指向本身的调用者(父协程)。这么操持是因为栈底协程不存在调用者,所以就干脆用这个变量存一下栈顶了。m_stack_bottom顾名思义,就是指向栈底的协程。对于栈底协程而言,这个变量指向的就是它本身了。
有了m_caller_or_top,当一个协程执行完毕时,就能方便地找到它的父协程并互换执行权。有了m_stack_bottom和m_caller_or_top,我们就能很方便地找到协程的栈底和栈顶。当必要规复task时,就能够包管总是规复栈顶的协程。
当协程执行完毕时,必要将控制权交还给父协程。我们考虑一下交还控制权必要做的变乱:
[*]维护调用栈,变更栈顶
[*]如果不是栈底,则规复父协程的执行
协程执行完毕时会去尝试执行promise的final_suspend(),因此这部分代码在promise的final_suspend()中实现。final_suspend()返回的类型叫promise_awaitable,其对应的代码如下:
template <class Promise>
auto promise_awaitable::await_suspend(
std::coroutine_handle<Promise> coroutine) noexcept
-> std::coroutine_handle<> {
promise_base &base= static_cast<promise_base &>(coroutine.promise());
promise_base *stack = base.m_stack_bottom;
// Stack bottom completed. Nothing to resume.
if (stack == &base)
return std::noop_coroutine();
// Set caller coroutine as the top of the stack.
promise_base *caller = base.m_caller_or_top;
stack->m_caller_or_top = caller;
// Resume caller.
return caller->m_coroutine;
}这段代码应该非常易懂,不外我们很容易联想到一个问题:既然交还了控制权,那么它是在何时销毁的?
其实这也不难想到,协程是由父协程销毁的。协程的返回值存放在promise中,当父协程co_await sometask时,父协程还必要读取子协程的promise以获取返回值。当子协程task析构时,子协程才真正被销毁。
io_context的操持
既然协程必要异步地处理IO,那么必然必要个处理IO的地方,就是io_context。io_context维护了一个线程池,线程池中每一个线程均执行一个worker,每个worker均维护一个io_uring来处理本线程的IO事件和协程。当必要提交新的task给线程池的时候,由io_context分配给某一个worker执行。
这听起来和reactor模式好像没啥区别,用epoll写reactor模式的时候基本上也是这么干的,这是因为我本来就是从reactor模式那边搬过来的。不外相比于reactor模式,这么做还是有不少细节要处理的。在利用io_uring时,每次我们启动异步IO时,都必要获取到io_uring对象——要利用io_uring_prep_系列函数,我们必须从io_uring对象中获取一个sqe。而如上所述,io_uring对象在worker中,这就造成了一个麻烦:我们无法在io_context给task分配worker的时候将io_uring对象的引用(指针)通报给task。固然利用全局变量不失为一种选择,但我不想这么做,因为也许利用者想要在一个历程中创建几个差别的io_context用呢。
固然无法直接将io_uring的引用通报给task,但还有一种方法可以进行交互。在全部awaitable的await_suspend中,我们可以拿到当前协程的coroutine_handle,而在io_context中,我们也能拿到task的coroutine_handle,因此可以通过promise来通报io_uring的引用。
具体在实现时,我没有通报io_uring的引用,而是通报了worker的指针。这么做是因为当初我想同时支持IOCP,通报worker可以省掉一些麻烦,固然后来放弃了。worker的指针只被放在协程栈的栈底,这么做是因为当协程在差别worker之间转移时,能够很方便地修改协程所属的worker(只必要修改栈底就可以了),尽管后来也没有实现work-stealing队列。在promise中,处理worker的方法如下所示:
/// \brief
/// Set I/O context for current coroutine.
/// \param io_ctx
/// I/O context to be set for current coroutine.
auto set_worker(io_context_worker *io_ctx) noexcept -> void {
m_stack_bottom->m_worker.store(io_ctx, std::memory_order_release);
}
/// \brief
/// Get I/O context for current coroutine.
/// \return
/// I/O context for current coroutine.
[] auto worker() const noexcept -> io_context_worker * {
return m_stack_bottom->m_worker.load(std::memory_order_acquire);
}不外这么做也有一个缺点,就是io_context侵入了promise的操持,使得task必须在io_context中才能发挥作用。
awaitable的操持
awaitable在coco/io.hpp中界说。各种awaitable的操持就比较简朴了。以read_awaitable为例,它在await_suspend()中获取当前协程所属的worker,然后启用异步IO,如下所示:
template <class Promise>
auto await_suspend(std::coroutine_handle<Promise> coro) noexcept -> bool {
m_userdata.coroutine = coro.address();
return this->suspend(coro.promise().worker());
}获取了当前协程的worker后,就转入this->suspend()函数中去执行了。suspend()方法主要的工作是启动异步IO利用,并挂起当前协程:
auto coco::read_awaitable::suspend(io_context_worker *worker) noexcept -> bool {
assert(worker != nullptr);
m_userdata.cqe_res = 0;
m_userdata.cqe_flags = 0;
io_uring *ring = worker->io_ring();
io_uring_sqe *sqe= io_uring_get_sqe(ring);
while (sqe == nullptr) [] {
io_uring_submit(ring);
sqe = io_uring_get_sqe(ring);
}
io_uring_prep_read(sqe, m_file, m_buffer, m_size, m_offset);
sqe->user_data = reinterpret_cast<uint64_t>(&m_userdata);
int result = io_uring_submit(ring);
if (result < 0) [] { // Result is -errno.
m_userdata.cqe_res = result;
return false;
}
return true;
}我没有把启用异步IO部分放到模板函数中,这是因为对于各种差别的IO利用,这部分的代码实际上大同小异。但考虑到这部分代码的长度,放到模板中可能会导致比较严重的二进制膨胀,所以就单独拿出来放到.cpp文件中了。
如果去翻我之前的commit记载的话,会发现起初我并没有把各种awaitable暴袒露来,而是让各种异步利用(好比connection.receive())返回task。后来将awaitable暴袒露来是考虑到诸如read、write等利用可能会被频繁地调用,而每次创建一个协程都必要申请一次堆内存,在循环中执行的话可能对运行效率有比较严重的影响。
关于代码
再放一遍代码地址:GitHub
这份代码不长,统共两三千行,而且其中一多半都是注释,结合本文的话应该不会很难读。这本身只是一份实行性质的代码,同时我希望它适合拿来学习,所以我并没有操持塞入太多的功能。除此之外,我不是很发起你拿来放到工程中利用,因为我可能会一时鼓起做出一些breaking change。如果你真的有这个必要的话,我发起你fork一份代码本身维护。
一些可能会被问到的问题
[*]为什么没有实现UDP相干的IO?
因为io_uring似乎还没有支持recvfrom,至少我实现的时候还没有。
[*]为什么不利用mmap和内核共享内存/为什么不向io_uring注册文件描述符等性能相干的问题
因为我不是io_uring专家。我写这个库的目标是学习用C++20的协程架构一个异步IO库,做这些性能优化会加大架构难度,并且花掉我大量的头发和时间,使我本不茂密的头发雪上加霜。除此之外,你会发现我也没有实现work-stealing队列,原因同理。Round Robin的性能固然不至于最优,但也不会太差。
[*]考不考虑加上HTTP支持?
考虑过,太懒所以放弃了。一方面是手写HTTP parser还是挺麻烦的,就算能用bison自动生成也还得去啃RFC。另一方面是,TCP作为一种基于流的协议,我没有想好如那边理毗连的缓存能够分身性能和利用的便捷性。如果你有这方面的需求的话,不妨先用着其他的HTTP parser,好比llhttp。
[*]为什么没有实现yield?
我以为异步IO一般用不到这东西,所以就没写。如果必要的话就本身实现吧。
[*]会支持Windows(IOCP)吗?
我有考虑过支持IOCP,但IOCP不支持定时器,我又不想删除Linux这边的timer,所以暂且没有这个想法。
[*]作者你README写得好水啊
确实,我也以为好水啊,有没有好心人帮忙修一修啊。
[*]为什么不用中文写注释和README?
不用中文写注释是因为clang-format没法处理中文的断行。不用中文写README是因为懒,不想写两份README。说不定哪天心情好了就写一份中文版。
碎碎念
要不要塞张插图呢?
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]