.NET 响应式编程 System.Reactive 系列文章(二):深入理解 IObservable<T ...

打印 上一主题 下一主题

主题 834|帖子 834|积分 2502

.NET 响应式编程 System.Reactive 系列文章(二):深入理解 IObservable 和 IObserver

弁言:为什么我们调整了学习顺序?

在上一篇文章的结尾,我原本筹划在本篇介绍 System.Reactive 的基础操作符,比如怎样创建、转换和过滤数据流。但在撰写内容时,我意识到,对于刚接触 System.Reactive 的读者来说,直接介绍操作符可能有些匆匆,因为 操作符的使用必须建立在对 IObservable 和 IObserver 这两个焦点接口的深刻理解之上
正如在传统编程中,你必要先理解 集合(Collection)迭代器(Iterator) 的本质,才气更好地使用 LINQ 操作符一样。而在 Rx 中,IObservable 是数据流的生产者,IObserver 是数据流的斲丧者,理解这两个接口是掌握 Rx 的基础。
因此,我决定调整顺序,在本篇文章中,深入介绍 IObservable 和 IObserver 的焦点概念、方法和使用方式,为后续学习操作符打下坚实的基础。
IObservable 和 IObserver 的关系

在 Rx 中,数据流的生产和斲丧是通过 观察者模式(Observer Pattern) 实现的。这种模式定义了两种角色:

  • IObservable(可观察对象/数据流的生产者)
  • IObserver(观察者/数据流的斲丧者)
二者的关系可以简单理解为:

  • IObservable 负责“推送”数据项
  • IObserver 负责“接收”数据项
订阅(Subscribe) 是连接这两者的桥梁。当 IObserver 订阅一个 IObservable 时,数据流开始传递。
1. IObservable 的定义和职责

IObservable 接口定义
  1. public interface IObservable<out T>
  2. {
  3.     IDisposable Subscribe(IObserver<T> observer);
  4. }
复制代码
IObservable 的职责:

  • 代表一个数据流,它可以产生零个、一个或多个数据项。
  • 当一个观察者(IObserver)订阅这个数据流时,它会调用 Subscribe 方法,并开始推送数据。
  • 数据流可能会因为正常完成发生错误而终止。
2. IObserver 的定义和职责

IObserver 接口定义
  1. public interface IObserver<in T>
  2. {
  3.     void OnNext(T value);
  4.     void OnError(Exception error);
  5.     void OnCompleted();
  6. }
复制代码
IObserver 的职责:

  • 代表一个数据的斲丧者,它对 IObservable 提供的数据流做出响应。
  • IObserver 必要实现三个方法:

    • OnNext(T value):当有新的数据项时调用。
    • OnError(Exception error):当数据流发生错误时调用。
    • OnCompleted():当数据流正常结束时调用。

3. IObservable 和 IObserver 的交互流程

让我们通过一个实际的交互流程图来直观地理解 IObservable 和 IObserver 的关系:

  • 观察者(Observer)通过 Subscribe 方法订阅可观察对象(Observable)。
  • 可观察对象(Observable)调用 Observer 的 OnNext 方法推送数据。
  • 如果发生错误,可观察对象(Observable)调用 OnError 方法终止数据流。
  • 如果数据流正常结束,可观察对象(Observable)调用 OnCompleted 方法终止数据流。
sequenceDiagram    participant Observable as IObservable    participant Observer as IObserver    Observer ->> Observable: Subscribe()    Observable ->> Observer: OnNext(T value)    Observable ->> Observer: OnNext(T value)    alt 数据流正常结束        Observable ->> Observer: OnCompleted()    else 发生错误        Observable ->> Observer: OnError(Exception error)    end4. 示例代码:实现一个简单的 Observable 和 Observer

为了更好地理解这两个接口,我们从零开始,手动实现一个简单的 IObservable 和 IObserver。
实现自定义 Observable
  1. using System;
  2. using System.Threading;
  3. using System.Threading.Tasks;
  4. public sealed class SimpleObservable : IObservable<int>
  5. {
  6.         IDisposable IObservable<int>.Subscribe(IObserver<int> observer)
  7.         {
  8.                 SimpleDisposable disposable = new();
  9.                 Task.Run(() =>
  10.                 {
  11.                         // 模拟数据的生产,以及假设每次生产都需要时间,消费者可以随时调用Dispose方法取消订阅
  12.                         for (int i = 1; i <= 5; i++)
  13.                         {
  14.                                 if (disposable.IsDisposed)
  15.                                 {
  16.                                         return;
  17.                                 }
  18.                                 observer.OnNext(i);
  19.                 // 模拟产生数据需要耗时50毫秒
  20.                                 Thread.Sleep(50);
  21.                         }
  22.                         observer.OnCompleted();
  23.                 });
  24.                 return disposable;
  25.         }
  26.         private sealed class SimpleDisposable : IDisposable
  27.         {
  28.                 internal bool IsDisposed { get; private set; }
  29.                 void IDisposable.Dispose()
  30.                 {
  31.                         IsDisposed = true;
  32.                         Console.WriteLine("Subscription disposed.");
  33.                 }
  34.         }
  35. }
复制代码
订阅和运行
  1. using System;
  2. public sealed class SimpleObserver : IObserver<int>
  3. {
  4.         void IObserver<int>.OnNext(int value) => Console.WriteLine($"Received: {value}");
  5.         void IObserver<int>.OnError(Exception error) => Console.WriteLine($"Error: {error.Message}");
  6.         void IObserver<int>.OnCompleted() => Console.WriteLine("Sequence Completed.");
  7. }
复制代码
输出结果:
  1. using System;
  2. using System.Threading;
  3. class Program
  4. {
  5.         static void Main(string[] args)
  6.         {
  7.                 IObservable<int> observable = new SimpleObservable();
  8.                 IObserver<int> observer = new SimpleObserver();
  9.                 IDisposable subscription = observable.Subscribe(observer);
  10.         // 模拟消费数据100毫秒后取消订阅
  11.                 Thread.Sleep(100);
  12.                 subscription.Dispose();
  13.         }
  14. }
复制代码
5. 常见问题解答

Q1:为什么 Subscribe 方法返回 IDisposable?

Subscribe 方法返回一个 IDisposable 对象,允许订阅者在不再必要数据流时取消订阅,以释放资源,制止内存泄漏。
Q2:OnError 和 OnCompleted 可以同时调用吗?

不能。数据流要么以错误终止,要么正常结束,二者是互斥的
Q3:IObservable 可以被多个 IObserver 订阅吗?

可以。一个 IObservable 可以被多个观察者订阅,每个观察者都会接收到数据流的推送。
总结

在本篇文章中,我们深入探讨了 IObservable 和 IObserver 这两个焦点接口的定义和职责,并通过代码示例展示了它们怎样交互。
焦点要点:


  • IObservable 是数据流的生产者,它负责推送数据。
  • IObserver 是数据流的斲丧者,它负责接收和处理数据。
  • Subscribe 方法将生产者和斲丧者连接起来,并返回一个 IDisposable 对象,用于取消订阅。
下一篇文章预告

《.NET 响应式编程 System.Reactive 系列文章(三):Subscribe 和 IDisposable 的深入理解》
在下一篇文章中,我们将重点探讨 Subscribe 方法的内部工作机制IDisposable 的作用,以及怎样优雅地管理订阅的生命周期。敬请期待!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

前进之路

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表