IT评测·应用市场-qidao123.com技术社区

标题: Reactive编程:数据流和观察者 [打印本页]

作者: 立聪堂德州十三局店    时间: 2025-4-6 18:02
标题: Reactive编程:数据流和观察者


  

第二部门:Reactive编程核心概念

2.1 数据流(Data Stream)

2.1.1 数据流的根本概念

数据流(Data Stream)是Reactive编程的核心概念之一,它代表一系列按时间顺序排列的变乱或数据项的集合。在传统的下令式编程中,数据通常是静态的,程序通过顺序实行指令来处理数据。而在Reactive编程中,数据被视为动态的、连续变化的流,程序通过订阅这些流来相应数据的变化。
数据流的特点包括:
2.1.2 数据流的类型

在Reactive编程中,数据流可以分为以下几种类型:
(1) 冷流(Cold Stream)


(2) 热流(Hot Stream)


(3) 有限流(Finite Stream)


(4) 无限流(Infinite Stream)


2.1.3 数据流的操纵符

Reactive编程提供丰富的操纵符(Operators)来处理数据流,常见的操纵符包括:
类别操纵符示例功能形貌创建流of, from, interval从静态数据、数组或时间间隔创建流转换流map, scan, buffer对数据项举行转换或累积计算过滤流filter, take, skip根据条件筛选或限定命据项组合流merge, concat, zip合并多个流的数据错误处理catchError, retry捕捉和处理流中的错误高级调度debounceTime, throttle控制数据流的发射频率 代码示例(RxJS 操纵符组合)
  1. import { fromEvent } from 'rxjs';
  2. import { map, filter, debounceTime } from 'rxjs/operators';
  3. const input = document.querySelector('input');
  4. fromEvent(input, 'input')
  5.   .pipe(
  6.     map(event => event.target.value), // 提取输入值
  7.     filter(text => text.length > 3), // 过滤长度≤3的输入
  8.     debounceTime(500) // 防抖(500ms内无新输入才发射)
  9.   )
  10.   .subscribe(value => console.log(`Search for: ${value}`));
复制代码
2.1.4 数据流的实际应用

(1) 及时搜索发起

  1. // 使用RxJS实现搜索框自动补全
  2. searchInput$.pipe(
  3.   debounceTime(300),
  4.   distinctUntilChanged(),
  5.   switchMap(query => fetch(`/api/search?q=${query}`))
  6. ).subscribe(results => updateUI(results));
复制代码
(2) 游戏开发(脚色移动)

  1. // 使用键盘事件流控制角色移动
  2. const keyPress$ = fromEvent(document, 'keydown');
  3. const move$ = keyPress$.pipe(
  4.   filter(key => ['ArrowUp', 'ArrowDown'].includes(key.code)),
  5.   map(key => key.code === 'ArrowUp' ? 1 : -1)
  6. );
  7. move$.subscribe(delta => character.y += delta * SPEED);
复制代码
(3) 金融生意业务数据聚合

  1. // 合并多个股票行情流
  2. const stock1$ = stockFeed('AAPL');
  3. const stock2$ = stockFeed('GOOG');
  4. merge(stock1$, stock2$)
  5.   .pipe(bufferTime(1000)) // 每1秒聚合一次
  6.   .subscribe(prices => calculatePortfolioValue(prices));
复制代码
2.1.5 数据流的背压问题(Backpressure)

当数据生产速度高出消耗速度时,系统可能因资源耗尽而瓦解。Reactive编程通过以下计谋处理背压:
代码示例(背压处理)
  1. // Reactor(Java)中的背压控制
  2. Flux.range(1, 1000)
  3.     .onBackpressureBuffer(100) // 缓冲100个元素
  4.     .subscribe(
  5.         value -> processSlowly(value),
  6.         err -> handleError(err),
  7.         () -> System.out.println("Done")
  8.     );
复制代码

2.2 观察者模式(Observer Pattern)

2.2.1 观察者模式基础

观察者模式是Reactive编程的底层设计模式,它定义了一对多的依靠关系:当一个对象(Subject)的状态改变时,全部依靠它的对象(Observers)会自动收到关照并更新。
模式脚色

脚色形貌Subject维护观察者列表,提供注册/注销方法,关照状态变化Observer定义更新接口,吸收Subject的关照并实行相应逻辑ConcreteSubject具体的被观察对象,存储状态并在变化时关照观察者ConcreteObserver具体的观察者,实现更新逻辑 2.2.2 观察者模式实现

经典实现(Java)

  1. // 主题接口
  2. interface Subject {
  3.     void registerObserver(Observer o);
  4.     void removeObserver(Observer o);
  5.     void notifyObservers();
  6. }
  7. // 具体主题(温度传感器)
  8. class TemperatureSensor implements Subject {
  9.     private List<Observer> observers = new ArrayList<>();
  10.     private float temperature;
  11.     public void setTemperature(float temp) {
  12.         this.temperature = temp;
  13.         notifyObservers();
  14.     }
  15.     @Override
  16.     public void registerObserver(Observer o) {
  17.         observers.add(o);
  18.     }
  19.     @Override
  20.     public void notifyObservers() {
  21.         for (Observer o : observers) {
  22.             o.update(temperature);
  23.         }
  24.     }
  25. }
  26. // 观察者接口
  27. interface Observer {
  28.     void update(float temperature);
  29. }
  30. // 具体观察者(温度显示器)
  31. class TemperatureDisplay implements Observer {
  32.     @Override
  33.     public void update(float temp) {
  34.         System.out.println("当前温度: " + temp + "°C");
  35.     }
  36. }
  37. // 使用示例
  38. public class Main {
  39.     public static void main(String[] args) {
  40.         TemperatureSensor sensor = new TemperatureSensor();
  41.         sensor.registerObserver(new TemperatureDisplay());
  42.         sensor.setTemperature(25.5f); // 输出:当前温度: 25.5°C
  43.     }
  44. }
复制代码
Reactive扩展(RxJS)

  1. // 创建一个可观察对象(Subject)
  2. const subject = new rxjs.Subject();
  3. // 订阅观察者
  4. const subscription1 = subject.subscribe(
  5.   value => console.log(`Observer 1: ${value}`)
  6. );
  7. const subscription2 = subject.subscribe(
  8.   value => console.log(`Observer 2: ${value}`)
  9. );
  10. // 发送数据
  11. subject.next('Hello');
  12. subject.next('World');
  13. // 输出:
  14. // Observer 1: Hello
  15. // Observer 2: Hello
  16. // Observer 1: World
  17. // Observer 2: World
复制代码
2.2.3 观察者模式与Reactive编程的关系

2.2.4 观察者模式的实际应用

(1) 用户界面变乱处理

  1. // 按钮点击事件观察
  2. const button = document.getElementById('myButton');
  3. const click$ = fromEvent(button, 'click');
  4. click$.subscribe(event => {
  5.   console.log('Button clicked at:', event.timeStamp);
  6. });
复制代码
(2) 状态管理(Redux Store)

  1. // Redux的Store本质上是一个Subject
  2. const store = createStore(reducer);
  3. store.subscribe(() => {
  4.   console.log('State changed:', store.getState());
  5. });
  6. store.dispatch({ type: 'INCREMENT' });
复制代码
(3) WebSocket及时通信

  1. const socket = new WebSocket('ws://example.com');
  2. const message$ = new Subject();
  3. socket.onmessage = event => {
  4.   message$.next(event.data);
  5. };
  6. message$.subscribe(data => {
  7.   console.log('Received:', data);
  8. });
复制代码
2.2.5 观察者模式的优缺点

优点


缺点


2.2.6 观察者模式的变体

变体形貌Reactive实现发布-订阅模式引入变乱通道,解耦发布者和订阅者RxJS的Subject相应式属性(Binding)自动同步对象属性变化(如Vue的v-model)MobX的observable数据总线(Event Bus)全局变乱中央,任意组件可发布/订阅变乱Vue的EventEmitter
总结



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/) Powered by Discuz! X3.4