.NET 响应式编程 System.Reactive 系列文章(三):Subscribe 和 IDisposab ...

打印 上一主题 下一主题

主题 866|帖子 866|积分 2608

.NET 响应式编程 System.Reactive 系列文章(三):Subscribe 和 IDisposable 的深入理解

引言:为什么理解 Subscribe 和 IDisposable 很紧张?

在前两篇文章中,我们详细先容了 IObservable 和 IObserver 的核心概念及交互流程。但在实际使用 System.Reactive 时,一个常见的误区是认为数据流一旦订阅,就不需要额外管理。这种认知是危险的,因为 Observable 的订阅可能是无穷的,如果不管理好订阅的生命周期,很轻易导致内存泄漏资源浪费
在 Rx 中,Subscribe() 方法返回一个 IDisposable 接口对象,用于手动取消订阅和开释资源。另外,System.Reactive 还提供了不返回 IDisposable 的 Subscribe 重载,这些重载方法通过 CancellationToken 管理订阅的生命周期。在本篇文章中,我们将深入探讨 Subscribe 和 IDisposable 的原理、这些特别重载的设计原因,以及在实际使用中的应用场景。
1. Subscribe 的内部机制

1.1 Subscribe 的作用

Subscribe 是连接 IObservableIObserver 的桥梁。当你调用 Subscribe() 方法时:

  • IObservable 开始向 IObserver 推送数据
  • 订阅会保持活泼状态,直到:

    • 数据流竣事(调用 OnCompleted())。
    • 发生错误(调用 OnError())。
    • 手动取消订阅(调用 Dispose())。
    • 超时取消订阅(向CancellationToken注册超时回调)。

1.2 为什么 Subscribe 返回 IDisposable?

普通的 Subscribe 重载 返回一个 IDisposable 对象,允许你通过调用 Dispose() 方法取消订阅。这是管理数据流生命周期的核心机制之一。
2. Subscribe 重载:不返回 IDisposable 的特别情况

System.Reactive 提供了一些特别的 Subscribe 重载方法,它们不返回 IDisposable,而是依赖于 CancellationToken 来控制订阅的生命周期。这些方法设计的目的是为了提供一种外部取消订阅的机制,让你无需手动管理 Dispose() 的调用。
2.1 方法签名

以下是其中一个不返回 IDisposable 的 Subscribe 重载:
  1. public static void Subscribe<T>(
  2.     this IObservable<T> source,
  3.     Action<T> onNext,
  4.     Action<Exception> onError,
  5.     Action onCompleted,
  6.     CancellationToken cancellationToken
  7. );
复制代码
这种重载方法的使用场景是:你希望通过 CancellationToken 来控制订阅的生命周期,而不是手动调用 Dispose()。
2.2 示例代码:使用 CancellationToken 管理订阅

示例:超时取消订阅
  1. using System;
  2. using System.Reactive.Linq;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. class Program
  6. {
  7.         static void Main(string[] args)
  8.         {
  9.                 IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(1));
  10.                 CancellationTokenSource cts = new();
  11.                 // 使用 Subscribe 方法并传入 CancellationToken
  12.                 observable.Subscribe(
  13.                         onNext: static value => Console.WriteLine($"Received: {value}"),
  14.                         onError: static ex => Console.WriteLine($"Error: {ex.Message}"),
  15.                         onCompleted: static () => Console.WriteLine("Completed"),
  16.                         token: cts.Token
  17.                 );
  18.                 // 模拟运行 5 秒后取消订阅
  19.                 Console.WriteLine("Running for 5 seconds...");
  20.                 Thread.Sleep(5000);
  21.                 cts.Cancel();
  22.                 Console.WriteLine("Subscription cancelled.");
  23.         }
  24. }
复制代码
输出结果:
  1. Running for 5 seconds...
  2. Received: 0
  3. Received: 1
  4. Received: 2
  5. Received: 3
  6. Subscription cancelled.
复制代码
2.3 使用场景:什么时候使用 CancellationToken?

使用场景推荐的 Subscribe 重载需要手动取消订阅返回 IDisposable 的重载使用外部控制(如用户交互、超时)控制订阅带 CancellationToken 的重载典型场景:


  • 异步任务取消
    在异步任务中使用 CancellationToken 取消订阅数据流,制止阻塞或内存泄漏。
  • 超时控制
    使用 CancellationTokenSource.CancelAfter() 设置超时取消订阅。
2.4 示例:设置超时取消订阅
  1. using System;
  2. using System.Reactive.Linq;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. class Program
  6. {
  7.         static void Main(string[] args)
  8.         {
  9.                 IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(1));
  10.                 CancellationTokenSource cts = new();
  11.                 cts.CancelAfter(TimeSpan.FromSeconds(3)); // 设置 3 秒后自动取消订阅
  12.                 observable.Subscribe(
  13.                         onNext: static value => Console.WriteLine($"Received: {value}"),
  14.                         onError: static ex => Console.WriteLine($"Error: {ex.Message}"),
  15.                         onCompleted: static () => Console.WriteLine("Completed"),
  16.                         token: cts.Token
  17.                 );
  18.                 Console.WriteLine("Running...");
  19.                 Thread.Sleep(5000);
  20.                 Console.WriteLine("Program ended.");
  21.         }
  22. }
复制代码
输出结果:
  1. Running...
  2. Received: 0
  3. Received: 1
  4. Received: 2
  5. Program ended.
复制代码
3. 使用场景总结

使用方式特点适用场景Subscribe 返回 IDisposable允许手动取消订阅长时间订阅或频繁管理多个订阅Subscribe 接受 CancellationToken通过外部控制(如超时或用户交互)取消订阅异步任务、超时控制、用户交互场景4. 注意事项:CancellationToken 的局限性

固然使用 CancellationToken 可以简化订阅管理,但也有一些需要注意的地方:

  • 不支持手动取消
    如果你使用的是返回 IDisposable 的 Subscribe 方法,你可以手动调用 Dispose() 取消订阅。但如果你使用带 CancellationToken 的重载,就无法通过 Dispose() 取消订阅。
  • 更得当一次性订阅
    CancellationToken 的 Subscribe 重载更得当一次性订阅的场景。如果你需要频繁管理多个订阅,使用 CompositeDisposable 或手动管理 IDisposable 可能更合适。
5. 两种订阅方式的对比

特性返回 IDisposable 的 Subscribe带 CancellationToken 的 Subscribe是否支持手动取消订阅✅ 支持❌ 不支持是否支持外部控制订阅生命周期❌ 需要手动调用 Dispose()✅ 通过 CancellationToken 控制是否得当长期订阅✅ 得当❌ 更得当一次性订阅6. Subscribe 和 IDisposable 的交互流程图

sequenceDiagram    participant Observer as IObserver    participant Observable as IObservable    participant IDisposable as IDisposable    Observer ->> Observable: Subscribe()    Observable ->> Observer: OnNext(T value)    Observable ->> Observer: OnNext(T value)    Observer ->> IDisposable: Dispose()    Observable -->> Observer: 停止推送数据总结

在本篇文章中,我们详细探讨了 Subscribe 和 IDisposable 的内部机制,并重点先容了 带 CancellationToken 的 Subscribe 重载

  • Subscribe() 方法返回 IDisposable,用于管理订阅的生命周期。
  • 不返回 IDisposable 的 Subscribe 重载,通过 CancellationToken 控制订阅的停止。
  • 使用场景不同:IDisposable 更得当长期订阅,CancellationToken 更得当一次性或外部控制的订阅。
下一篇文章预报

《.NET 响应式编程 System.Reactive 系列文章(四):操作符基础》
下一篇文章将先容 System.Reactive 的基础操作符,包括怎样创建转换过滤数据流。我们将通过实战示例,资助你快速掌握 Rx 的操作符使用方法。敬请等待!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

泉缘泉

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表