Rust 如何实现 async/await

打印 上一主题 下一主题

主题 903|帖子 903|积分 2709

目录

异步编程在 Rust 中的地位非常高,很多 crate 尤其是多IO操作的都使用了 async/await.
首先弄清楚异步编程的几个基本概念:
Future

Future 代表一个可在未来某个时候获取返回值的 task,为了获取这个 task 的执行状况,Future 提供了一个函数用于判断该 task 是否执行返回。
  1. trait Future {
  2.   type Output;
  3.   fn poll(self: Pin<&mut self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
  4. }
复制代码
之所以没有设计为 trait 形式,主要是 clone 函数,受限于 Rust 的 trait object safety,trait 中的任何函数的参数或返回值如果包含 Self 且有 type bound Sized,则不符合 trait object safe 规范,这样的 trait 可以被定义,可以被实现,但是无法与 dyn 一起进行动态绑定。
而 clones 函数又是必须的,因为 future 可能还会接着调用 future 的 poll 方法,就需要再 clone 一个 context 传入。
或许可以用 Box 或者 Arc 之类的,但是这些都不比 raw pointer 灵活,所以最终 Rust 还是选择定义一个包含函数指针的 struct。
async/await

这两个关键字可以说是异步编程领域的标志。,但在 Rust 中这两个关键字只是起到语法糖的作用,并不是异步的核心。
async 用于快速创建 Future,不管是函数还是代码块或者lambda表达式,都可以在前面加上 async 关键字快速变成 Future。对于
  1. struct Waker {
  2.   waker: RawWaker
  3. }
  4. struct RawWaker {
  5.   data: *const (),
  6.   vtable: &'static RawWakerVTable
  7. }
  8. struct RawWakerVTable {
  9.   clone: unsafe fn(*const ()) -> RawWaker,
  10.   wake: unsafe fn(*const ()),
  11.   wake_by_ref: unsafe fn(*const ()),
  12.   drop: unsafe fn(*const ())
  13. }
复制代码
编译器会自动生成类似下面的代码
  1. async fn bar() {
  2.   foo().await;
  3. }
复制代码
Tips:上面的代码可以在 Rust Playground 里面点生成 HIR 看到。
Executor

前面讲到 wake 的时候,其实现与具体的 executor 相关,但是我觉得如果不从 executor 的实现角度看一下比较难以理解,只能浅显地知道 wake 是告诉 executor 准备再 poll 一遍。
Rust 中我知道的 async runtime lib 就是 futures-rs 和 tokio,前者在 GitHub 上是 rust-lang 官方组织推出的 repo,而后者虽然不清楚是否有官方参与,但是功能明显比前者丰富,据我所知使用异步的项目大部分都是使用 tokio。
我这里选择更简单的 futures-rs 讲一下其 executor 的实现,虽然其更加轻量但起码也是官方推出的,有质量保证。
Waker struct 到 ArcWake trait

futures-rs 还是将标准库里面的 Waker 封装成了 ArcWake trait,并且是 pub 的。和 raw pointer 打交道毕竟是 unsafe 的,与其满篇的 unsafe 乱飞,不如将 unsafe 限制在一定的范围内。
Waker 本质上是一个变量的指针(data)带着四个函数指针的结构体(RawWakerVTable),因此在定义函数指针时只需要将指针强转成实现某个 trait 的泛型,再调用该 trait 的对应方法不就可以了。以 wake 函数为例:
  1. fn bar() -> impl Future {
  2.     std::future::from_generator(move |mut _task_context| {
  3.         let _t = {
  4.             match std::future::IntoFuture::into_future(foo()) {
  5.                 mut __awaitee => loop {
  6.                     match unsafe {
  7.                         std::future::Future::poll(
  8.                             std::pin::Pin::new_unchecked(&mut __awaitee),
  9.                             std::future::get_context(_task_context),
  10.                         )
  11.                     } {
  12.                         std::task::Poll::Ready { 0: result } => break result,
  13.                         std::task::Poll::Pending {} => {}
  14.                     }
  15.                     _task_context = (yield ());
  16.                 },
  17.             };
  18.         };
  19.         _t
  20.     })
  21. }
复制代码
这样就实现了 Waker struct 到 Waker trait 的转换。尽管如此,我们还需要一个结构体用来表示 Waker,满足下列条件:

  • 实现 Deref trait,在引用时返回 &std::task::Waker
  • 为了满足 Rust 的 safety rules,需要手动管理data的内存,显然某个实现了 Wake 的类型不会为了创建 waker 就交出自己的拥有权,因此只能通过传入的引用转成指针来创建 ManuallyDrop 实例,并考虑到 Deref trait 和后续的 Context 创建,需要通过 PhantomData 来管理 lifetime annotation
从而创建 WakeRef 结构体:
  1. trait Wake {
  2.   fn wake(self) {
  3.     Wake::wake_by_ref(&self);
  4.   }
  5.   fn wake_by_ref(&self);
  6. }
  7. unsafe fn wake<T: WakeTrait>(data: *const ()) {//对应RawWakerVTable里的函数指针
  8.   let v = data.cast::<T>();
  9.   v.wake();
  10. }
复制代码
如何根据引用创建 WakeRef 实例:
  1. use std::mem::ManuallyDrop;
  2. use std::task::Waker;
  3. use std::marker::PhantomData;
  4. struct WakeRef<'a> {
  5.   waker: ManuallyDrop<Waker>,
  6.   _marker: PhantomData<&'a ()>
  7. }
复制代码
因此对于某个实现 Wake 的类型来说,只需要传入引用就可以用 Context::from_waker(&waker) 来创建 context 了。
在 futures-rs 中,由于涉及到多线程,所以上述的其实并不安全,需要将普通引用改成 Arc 用于在多线程之间传递,Wake trait 也变成了 ArcWake,
  1. use std::task::{Waker, RawWaker};
  2. fn get_waker<W: Wake>(wake: &W) -> WakeRef<'_> {
  3.   let ptr = wake as *const _ as *const ();
  4.   WakeRef {
  5.     waker: ManuallyDrop::new(unsafe {Waker::from_raw(RawWaker::new(ptr, ...))}),//...省略的是创建RawWakerVTable的过程
  6.     _marker: PhantomData
  7.   }
  8. }
复制代码
但是道理差不多。RawWakerVTable 的四个函数也与这个有关,以 wake 函数为例:
  1. use std::task::Waker;
  2. impl std::ops::Deref for WakeRef<'_> {
  3.   type Target = Waker;
  4.   fn deref(&self) -> &Waker {
  5.     &self.waker
  6.   }
  7. }
复制代码
FuturesUnordered

FuturesUnordered 是一个 Future 的托管容器,其有一条链表维护所有的 Future,再通过一个队列维护所有需要运行的 Future(当然这里都不是 collections 里面那种普通的链表和队列,由于 FuturesUnordered 其实要与单线程和线程池 executor 共用,所以这两个数据结构其实还涉及很多原子化操作,在保证原子化且无锁的前提下要设计一个链表还挺麻烦的)。
  1. trait ArcWake: Send + Sync {
  2.   fn wake(self: Arc<Self>) {
  3.     Self::wake_by_ref(&self)
  4.   }
  5.   
  6.   fn wake_by_ref(arc_self: &Arc<Self>);
  7. }
复制代码
这里重点看 FuturesUnordered 如何实现 Waker,FuturesUnordered 将 Future 看作一个个 Task 。
  1. unsafe fn wake_arc_raw<T: ArcWake>(data: *const ()) {
  2.   let arc: Arc<T> = Arc::from_raw(data.cast::<T>());
  3.   ArcWake::wake(arc);
  4. }
复制代码
为 Task 实现 ArcWake
  1. struct FuturesUnordered<Fut> {
  2.   ready_to_run_queue: Arc<ReadyToRunQueue<Fut>>,//需要运行的Future队列
  3.   head_all: AtomicPtr<Task<Fut>>,//所有Future组成的链表
  4.   is_terminated: AtomicBool
  5. }
复制代码
当一个 Task 运行(被poll)时,其被从 FuturesUnordered 的 ready_to_run_queue 上摘下来,而在 wake 中又会重新放回去。因此,如果 Future 内部调用了 wake,则 Task 会再被放到 ready_to_run_queue 上运行,如果没有则不会。
所以每个 Future 使用的 context 其实是来自于 Task:
  1. struct Task<Fut> {
  2.   future: UnsafeCell<Option<Fut>>,
  3.   next_all: AtomicPtr<Task<Fut>>,//下一个Task节点
  4.   len_all: UnsafeCell<usize>,//链表长度
  5.   next_ready_to_run: AtomicPtr<Task<Fut>>,//下一个要运行的Task
  6.   ready_to_run_queue: Weak<ReadyToRunQueue<Fut>>,
  7.   queued: AtomicBool,//是否在Task链表内(Task运行时需要从链表上摘下)
  8.   woken: AtomicBool//是否已经调用wake函数
  9. }
复制代码
FuturesUnordered 本身实现了 Stream trait
  1. impl<Fut> ArcWake for Task<Fut> {
  2.   fn wake_by_ref(arc_self: &Arc<Self>) {
  3.     let inner = match arc_self.ready_to_run_queue.upgrade() {
  4.       Some(inner) => inner,
  5.       None => return,
  6.     };
  7.    
  8.     arc_self.woken.store(true, Relaxed);
  9.     let prev = arc_self.queued.swap(true, SeqCst);
  10.     if !prev {
  11.       inner.enqueue(Arc::as_ptr(arc_self));
  12.       inner.waker.wake();
  13.     }
  14.   }
  15. }
复制代码
单线程 executor 将 Waker 的 wake 与线程的 wake 绑定,当调用 wake 时,如果 executor 线程处于 park(即阻塞) 状态,则 unpark 线程。
  1. let waker = Task::waker_ref(task);
  2. let mut cx = Context::from_waker(&waker);
  3. future.poll(&mut cx);
复制代码
先看 LocalPool 如何定义 run 操作:
  1. trait Stream {
  2.   type Item;
  3.   fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
  4. }
复制代码
ThreadPool 也有自定义的 Task:
  1. struct LocalPool {
  2.   pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
  3.   incoming: Rc<Incoming>
  4. }
复制代码
线程池 executor 和单线程 executor 对待 Pending 的方式,相同点在于如果 Future 没有调用 wake,则放弃 Future,Future 要运行只能重新 spawn。不同点:

  • 线程池:如果 Future 调用 wake,所在的线程阻塞式调用 poll 直到返回 Ready 或者 Future 放弃调用 wake
  • 单线程:调用 wake 不会立刻再屌用 poll,但加入到 ready_to_run_queue 里面在下一次循环中被 poll
总结

本文只是一篇介绍 Rust 异步编程的原理,并通过具体的仓库稍微深挖一下实现的过程。具体的原因还是官方文档的介绍非常模糊,以我来说,第一次看到 Waker 完全不知道怎么用,底层到底是干了什么,"Future be ready to run again" 又是什么意思。如果不稍微看一下 runtime lib 的源码,有些东西很难理解。
本文只是简单介绍了一个 futures-rs 的实现,executor 方面都忽略了很多细节。而 futures-rs 还有大量的扩展代码藏在 util 目录下,但是这些东西一般看看文档就知道大概做了什么,懂得异步的实现原理就知道大概是怎么实现的,如果实在不懂还是可以去看源码。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

天空闲话

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表