Reactive编程:数据流和观察者

打印 上一主题 下一主题

主题 2302|帖子 2302|积分 6906



  

第二部门:Reactive编程核心概念

2.1 数据流(Data Stream)

2.1.1 数据流的根本概念

数据流(Data Stream)是Reactive编程的核心概念之一,它代表一系列按时间顺序排列的变乱或数据项的集合。在传统的下令式编程中,数据通常是静态的,程序通过顺序实行指令来处理数据。而在Reactive编程中,数据被视为动态的、连续变化的流,程序通过订阅这些流来相应数据的变化。
数据流的特点包括:

  • 时间连续性:数据流中的变乱是按时间顺序发生的。
  • 异步性:数据项可能在任何时间点到达,不受程序控制流约束。
  • 不可变性:流中的数据一旦发出就不能更改,只能通过转换操纵生成新流。
  • 可组合性:多个数据流可以通过操纵符(Operators)举行组合、转换和过滤。
2.1.2 数据流的类型

在Reactive编程中,数据流可以分为以下几种类型:
(1) 冷流(Cold Stream)



  • 特点:数据流的生成由订阅者触发,每个订阅者都会收到完整的数据序列。
  • 示例:从文件读取数据、HTTP哀求返回的相应流。
  • 代码示例(RxJS)
    1. import { of } from 'rxjs';
    2. const coldStream$ = of(1, 2, 3); // 冷流
    3. coldStream$.subscribe(value => console.log(`Subscriber 1: ${value}`));
    4. coldStream$.subscribe(value => console.log(`Subscriber 2: ${value}`));
    5. // 输出:
    6. // Subscriber 1: 1
    7. // Subscriber 1: 2
    8. // Subscriber 1: 3
    9. // Subscriber 2: 1
    10. // Subscriber 2: 2
    11. // Subscriber 2: 3
    复制代码
(2) 热流(Hot Stream)



  • 特点:数据流独立于订阅者存在,订阅者只能收到订阅后发出的数据。
  • 示例:鼠标移动变乱、WebSocket及时消息。
  • 代码示例(RxJS)
    1. import { fromEvent, interval } from 'rxjs';
    2. const hotStream$ = fromEvent(document, 'click'); // 热流(鼠标点击事件)
    3. setTimeout(() => {
    4.   hotStream$.subscribe(event => console.log(`Subscriber: Click at ${event.timeStamp}`));
    5. }, 3000);
    6. // 3秒后订阅,只能收到3秒后的点击事件
    复制代码
(3) 有限流(Finite Stream)



  • 特点:数据流会在某个时刻结束(如HTTP哀求完成)。
  • 示例:API哀求返回的单个相应。
(4) 无限流(Infinite Stream)



  • 特点:数据流可能永远不会结束(如传感器数据、及时股票行情)。
  • 示例:interval 生成的定时数据流。
2.1.3 数据流的操纵符

Reactive编程提供丰富的操纵符(Operators)来处理数据流,常见的操纵符包括:
类别操纵符示例功能形貌创建流of, from, interval从静态数据、数组或时间间隔创建流转换流map, scan, buffer对数据项举行转换或累积计算过滤流filter, take, skip根据条件筛选或限定命据项组合流merge, concat, zip合并多个流的数据错误处理catchError, retry捕捉和处理流中的错误高级调度debounceTime, throttle控制数据流的发射频率 代码示例(RxJS 操纵符组合)
  1. import { fromEvent } from 'rxjs';
  2. import { map, filter, debounceTime } from 'rxjs/operators';
  3. const input = document.querySelector('input');
  4. fromEvent(input, 'input')
  5.   .pipe(
  6.     map(event => event.target.value), // 提取输入值
  7.     filter(text => text.length > 3), // 过滤长度≤3的输入
  8.     debounceTime(500) // 防抖(500ms内无新输入才发射)
  9.   )
  10.   .subscribe(value => console.log(`Search for: ${value}`));
复制代码
2.1.4 数据流的实际应用

(1) 及时搜索发起

  1. // 使用RxJS实现搜索框自动补全
  2. searchInput$.pipe(
  3.   debounceTime(300),
  4.   distinctUntilChanged(),
  5.   switchMap(query => fetch(`/api/search?q=${query}`))
  6. ).subscribe(results => updateUI(results));
复制代码
(2) 游戏开发(脚色移动)

  1. // 使用键盘事件流控制角色移动
  2. const keyPress$ = fromEvent(document, 'keydown');
  3. const move$ = keyPress$.pipe(
  4.   filter(key => ['ArrowUp', 'ArrowDown'].includes(key.code)),
  5.   map(key => key.code === 'ArrowUp' ? 1 : -1)
  6. );
  7. move$.subscribe(delta => character.y += delta * SPEED);
复制代码
(3) 金融生意业务数据聚合

  1. // 合并多个股票行情流
  2. const stock1$ = stockFeed('AAPL');
  3. const stock2$ = stockFeed('GOOG');
  4. merge(stock1$, stock2$)
  5.   .pipe(bufferTime(1000)) // 每1秒聚合一次
  6.   .subscribe(prices => calculatePortfolioValue(prices));
复制代码
2.1.5 数据流的背压问题(Backpressure)

当数据生产速度高出消耗速度时,系统可能因资源耗尽而瓦解。Reactive编程通过以下计谋处理背压:

  • 丢弃计谋:throttle、sample
  • 缓冲计谋:buffer、window
  • 反馈控制:相应式流规范(Reactive Streams)的request(n)机制
代码示例(背压处理)
  1. // Reactor(Java)中的背压控制
  2. Flux.range(1, 1000)
  3.     .onBackpressureBuffer(100) // 缓冲100个元素
  4.     .subscribe(
  5.         value -> processSlowly(value),
  6.         err -> handleError(err),
  7.         () -> System.out.println("Done")
  8.     );
复制代码

2.2 观察者模式(Observer Pattern)

2.2.1 观察者模式基础

观察者模式是Reactive编程的底层设计模式,它定义了一对多的依靠关系:当一个对象(Subject)的状态改变时,全部依靠它的对象(Observers)会自动收到关照并更新。
模式脚色

脚色形貌Subject维护观察者列表,提供注册/注销方法,关照状态变化Observer定义更新接口,吸收Subject的关照并实行相应逻辑ConcreteSubject具体的被观察对象,存储状态并在变化时关照观察者ConcreteObserver具体的观察者,实现更新逻辑 2.2.2 观察者模式实现

经典实现(Java)

  1. // 主题接口
  2. interface Subject {
  3.     void registerObserver(Observer o);
  4.     void removeObserver(Observer o);
  5.     void notifyObservers();
  6. }
  7. // 具体主题(温度传感器)
  8. class TemperatureSensor implements Subject {
  9.     private List<Observer> observers = new ArrayList<>();
  10.     private float temperature;
  11.     public void setTemperature(float temp) {
  12.         this.temperature = temp;
  13.         notifyObservers();
  14.     }
  15.     @Override
  16.     public void registerObserver(Observer o) {
  17.         observers.add(o);
  18.     }
  19.     @Override
  20.     public void notifyObservers() {
  21.         for (Observer o : observers) {
  22.             o.update(temperature);
  23.         }
  24.     }
  25. }
  26. // 观察者接口
  27. interface Observer {
  28.     void update(float temperature);
  29. }
  30. // 具体观察者(温度显示器)
  31. class TemperatureDisplay implements Observer {
  32.     @Override
  33.     public void update(float temp) {
  34.         System.out.println("当前温度: " + temp + "°C");
  35.     }
  36. }
  37. // 使用示例
  38. public class Main {
  39.     public static void main(String[] args) {
  40.         TemperatureSensor sensor = new TemperatureSensor();
  41.         sensor.registerObserver(new TemperatureDisplay());
  42.         sensor.setTemperature(25.5f); // 输出:当前温度: 25.5°C
  43.     }
  44. }
复制代码
Reactive扩展(RxJS)

  1. // 创建一个可观察对象(Subject)
  2. const subject = new rxjs.Subject();
  3. // 订阅观察者
  4. const subscription1 = subject.subscribe(
  5.   value => console.log(`Observer 1: ${value}`)
  6. );
  7. const subscription2 = subject.subscribe(
  8.   value => console.log(`Observer 2: ${value}`)
  9. );
  10. // 发送数据
  11. subject.next('Hello');
  12. subject.next('World');
  13. // 输出:
  14. // Observer 1: Hello
  15. // Observer 2: Hello
  16. // Observer 1: World
  17. // Observer 2: World
复制代码
2.2.3 观察者模式与Reactive编程的关系


  • 数据流即Subject:Reactive框架中的Observable本质上是加强版的Subject,支持多播和操纵符。
  • 观察者即Subscriber:订阅者通过subscribe()方法注册回调,相当于观察者模式的update()。
  • 扩展本领

    • 多播(Multicast):一个数据流被多个观察者共享
    • 生命周期管理:complete()和error()关照
    • 操纵符链式调用:map、filter等转换操纵

2.2.4 观察者模式的实际应用

(1) 用户界面变乱处理

  1. // 按钮点击事件观察
  2. const button = document.getElementById('myButton');
  3. const click$ = fromEvent(button, 'click');
  4. click$.subscribe(event => {
  5.   console.log('Button clicked at:', event.timeStamp);
  6. });
复制代码
(2) 状态管理(Redux Store)

  1. // Redux的Store本质上是一个Subject
  2. const store = createStore(reducer);
  3. store.subscribe(() => {
  4.   console.log('State changed:', store.getState());
  5. });
  6. store.dispatch({ type: 'INCREMENT' });
复制代码
(3) WebSocket及时通信

  1. const socket = new WebSocket('ws://example.com');
  2. const message$ = new Subject();
  3. socket.onmessage = event => {
  4.   message$.next(event.data);
  5. };
  6. message$.subscribe(data => {
  7.   console.log('Received:', data);
  8. });
复制代码
2.2.5 观察者模式的优缺点

优点



  • 松耦合:Subject和Observer之间无直接依靠
  • 动态关系:可运行时添加/删除观察者
  • 广播通信:一次状态变更可关照多个观察者
缺点



  • 内存走漏风险:未正确注销观察者会导致引用残留
  • 关照顺序不可控:观察者被调用的顺序可能影响系统举动
  • 调试困难:数据流动的隐式流传可能增加调试复杂度
2.2.6 观察者模式的变体

变体形貌Reactive实现发布-订阅模式引入变乱通道,解耦发布者和订阅者RxJS的Subject相应式属性(Binding)自动同步对象属性变化(如Vue的v-model)MobX的observable数据总线(Event Bus)全局变乱中央,任意组件可发布/订阅变乱Vue的EventEmitter
总结



  • 数据流是Reactive编程的核心抽象,代表随时间变化的变乱序列,可通过操纵符举行机动转换。
  • 观察者模式是Reactive系统的底层机制,通过订阅/关照机制实现数据变化的自动流传。
  • 两者结合使得Reactive编程可以或许高效处理异步、及时数据,实用于从UI交互到分布式系统的广泛场景。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

立聪堂德州十三局店

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表