第二部门:Reactive编程核心概念
2.1 数据流(Data Stream)
2.1.1 数据流的根本概念
数据流(Data Stream)是Reactive编程的核心概念之一,它代表一系列按时间顺序排列的变乱或数据项的集合。在传统的下令式编程中,数据通常是静态的,程序通过顺序实行指令来处理数据。而在Reactive编程中,数据被视为动态的、连续变化的流,程序通过订阅这些流来相应数据的变化。
数据流的特点包括:
- 时间连续性:数据流中的变乱是按时间顺序发生的。
- 异步性:数据项可能在任何时间点到达,不受程序控制流约束。
- 不可变性:流中的数据一旦发出就不能更改,只能通过转换操纵生成新流。
- 可组合性:多个数据流可以通过操纵符(Operators)举行组合、转换和过滤。
2.1.2 数据流的类型
在Reactive编程中,数据流可以分为以下几种类型:
(1) 冷流(Cold Stream)
- 特点:数据流的生成由订阅者触发,每个订阅者都会收到完整的数据序列。
- 示例:从文件读取数据、HTTP哀求返回的相应流。
- 代码示例(RxJS):
- import { of } from 'rxjs';
- const coldStream$ = of(1, 2, 3); // 冷流
- coldStream$.subscribe(value => console.log(`Subscriber 1: ${value}`));
- coldStream$.subscribe(value => console.log(`Subscriber 2: ${value}`));
- // 输出:
- // Subscriber 1: 1
- // Subscriber 1: 2
- // Subscriber 1: 3
- // Subscriber 2: 1
- // Subscriber 2: 2
- // Subscriber 2: 3
复制代码 (2) 热流(Hot Stream)
- 特点:数据流独立于订阅者存在,订阅者只能收到订阅后发出的数据。
- 示例:鼠标移动变乱、WebSocket及时消息。
- 代码示例(RxJS):
- import { fromEvent, interval } from 'rxjs';
- const hotStream$ = fromEvent(document, 'click'); // 热流(鼠标点击事件)
- setTimeout(() => {
- hotStream$.subscribe(event => console.log(`Subscriber: Click at ${event.timeStamp}`));
- }, 3000);
- // 3秒后订阅,只能收到3秒后的点击事件
复制代码 (3) 有限流(Finite Stream)
- 特点:数据流会在某个时刻结束(如HTTP哀求完成)。
- 示例:API哀求返回的单个相应。
(4) 无限流(Infinite Stream)
- 特点:数据流可能永远不会结束(如传感器数据、及时股票行情)。
- 示例:interval 生成的定时数据流。
2.1.3 数据流的操纵符
Reactive编程提供丰富的操纵符(Operators)来处理数据流,常见的操纵符包括:
类别操纵符示例功能形貌创建流of, from, interval从静态数据、数组或时间间隔创建流转换流map, scan, buffer对数据项举行转换或累积计算过滤流filter, take, skip根据条件筛选或限定命据项组合流merge, concat, zip合并多个流的数据错误处理catchError, retry捕捉和处理流中的错误高级调度debounceTime, throttle控制数据流的发射频率 代码示例(RxJS 操纵符组合):
- import { fromEvent } from 'rxjs';
- import { map, filter, debounceTime } from 'rxjs/operators';
- const input = document.querySelector('input');
- fromEvent(input, 'input')
- .pipe(
- map(event => event.target.value), // 提取输入值
- filter(text => text.length > 3), // 过滤长度≤3的输入
- debounceTime(500) // 防抖(500ms内无新输入才发射)
- )
- .subscribe(value => console.log(`Search for: ${value}`));
复制代码 2.1.4 数据流的实际应用
(1) 及时搜索发起
- // 使用RxJS实现搜索框自动补全
- searchInput$.pipe(
- debounceTime(300),
- distinctUntilChanged(),
- switchMap(query => fetch(`/api/search?q=${query}`))
- ).subscribe(results => updateUI(results));
复制代码 (2) 游戏开发(脚色移动)
- // 使用键盘事件流控制角色移动
- const keyPress$ = fromEvent(document, 'keydown');
- const move$ = keyPress$.pipe(
- filter(key => ['ArrowUp', 'ArrowDown'].includes(key.code)),
- map(key => key.code === 'ArrowUp' ? 1 : -1)
- );
- move$.subscribe(delta => character.y += delta * SPEED);
复制代码 (3) 金融生意业务数据聚合
- // 合并多个股票行情流
- const stock1$ = stockFeed('AAPL');
- const stock2$ = stockFeed('GOOG');
- merge(stock1$, stock2$)
- .pipe(bufferTime(1000)) // 每1秒聚合一次
- .subscribe(prices => calculatePortfolioValue(prices));
复制代码 2.1.5 数据流的背压问题(Backpressure)
当数据生产速度高出消耗速度时,系统可能因资源耗尽而瓦解。Reactive编程通过以下计谋处理背压:
- 丢弃计谋:throttle、sample
- 缓冲计谋:buffer、window
- 反馈控制:相应式流规范(Reactive Streams)的request(n)机制
代码示例(背压处理):
- // Reactor(Java)中的背压控制
- Flux.range(1, 1000)
- .onBackpressureBuffer(100) // 缓冲100个元素
- .subscribe(
- value -> processSlowly(value),
- err -> handleError(err),
- () -> System.out.println("Done")
- );
复制代码 2.2 观察者模式(Observer Pattern)
2.2.1 观察者模式基础
观察者模式是Reactive编程的底层设计模式,它定义了一对多的依靠关系:当一个对象(Subject)的状态改变时,全部依靠它的对象(Observers)会自动收到关照并更新。
模式脚色
脚色形貌Subject维护观察者列表,提供注册/注销方法,关照状态变化Observer定义更新接口,吸收Subject的关照并实行相应逻辑ConcreteSubject具体的被观察对象,存储状态并在变化时关照观察者ConcreteObserver具体的观察者,实现更新逻辑 2.2.2 观察者模式实现
经典实现(Java)
- // 主题接口
- interface Subject {
- void registerObserver(Observer o);
- void removeObserver(Observer o);
- void notifyObservers();
- }
- // 具体主题(温度传感器)
- class TemperatureSensor implements Subject {
- private List<Observer> observers = new ArrayList<>();
- private float temperature;
- public void setTemperature(float temp) {
- this.temperature = temp;
- notifyObservers();
- }
- @Override
- public void registerObserver(Observer o) {
- observers.add(o);
- }
- @Override
- public void notifyObservers() {
- for (Observer o : observers) {
- o.update(temperature);
- }
- }
- }
- // 观察者接口
- interface Observer {
- void update(float temperature);
- }
- // 具体观察者(温度显示器)
- class TemperatureDisplay implements Observer {
- @Override
- public void update(float temp) {
- System.out.println("当前温度: " + temp + "°C");
- }
- }
- // 使用示例
- public class Main {
- public static void main(String[] args) {
- TemperatureSensor sensor = new TemperatureSensor();
- sensor.registerObserver(new TemperatureDisplay());
- sensor.setTemperature(25.5f); // 输出:当前温度: 25.5°C
- }
- }
复制代码 Reactive扩展(RxJS)
- // 创建一个可观察对象(Subject)
- const subject = new rxjs.Subject();
- // 订阅观察者
- const subscription1 = subject.subscribe(
- value => console.log(`Observer 1: ${value}`)
- );
- const subscription2 = subject.subscribe(
- value => console.log(`Observer 2: ${value}`)
- );
- // 发送数据
- subject.next('Hello');
- subject.next('World');
- // 输出:
- // Observer 1: Hello
- // Observer 2: Hello
- // Observer 1: World
- // Observer 2: World
复制代码 2.2.3 观察者模式与Reactive编程的关系
- 数据流即Subject:Reactive框架中的Observable本质上是加强版的Subject,支持多播和操纵符。
- 观察者即Subscriber:订阅者通过subscribe()方法注册回调,相当于观察者模式的update()。
- 扩展本领:
- 多播(Multicast):一个数据流被多个观察者共享
- 生命周期管理:complete()和error()关照
- 操纵符链式调用:map、filter等转换操纵
2.2.4 观察者模式的实际应用
(1) 用户界面变乱处理
- // 按钮点击事件观察
- const button = document.getElementById('myButton');
- const click$ = fromEvent(button, 'click');
- click$.subscribe(event => {
- console.log('Button clicked at:', event.timeStamp);
- });
复制代码 (2) 状态管理(Redux Store)
- // Redux的Store本质上是一个Subject
- const store = createStore(reducer);
- store.subscribe(() => {
- console.log('State changed:', store.getState());
- });
- store.dispatch({ type: 'INCREMENT' });
复制代码 (3) WebSocket及时通信
- const socket = new WebSocket('ws://example.com');
- const message$ = new Subject();
- socket.onmessage = event => {
- message$.next(event.data);
- };
- message$.subscribe(data => {
- console.log('Received:', data);
- });
复制代码 2.2.5 观察者模式的优缺点
优点
- 松耦合:Subject和Observer之间无直接依靠
- 动态关系:可运行时添加/删除观察者
- 广播通信:一次状态变更可关照多个观察者
缺点
- 内存走漏风险:未正确注销观察者会导致引用残留
- 关照顺序不可控:观察者被调用的顺序可能影响系统举动
- 调试困难:数据流动的隐式流传可能增加调试复杂度
2.2.6 观察者模式的变体
变体形貌Reactive实现发布-订阅模式引入变乱通道,解耦发布者和订阅者RxJS的Subject相应式属性(Binding)自动同步对象属性变化(如Vue的v-model)MobX的observable数据总线(Event Bus)全局变乱中央,任意组件可发布/订阅变乱Vue的EventEmitter 总结
- 数据流是Reactive编程的核心抽象,代表随时间变化的变乱序列,可通过操纵符举行机动转换。
- 观察者模式是Reactive系统的底层机制,通过订阅/关照机制实现数据变化的自动流传。
- 两者结合使得Reactive编程可以或许高效处理异步、及时数据,实用于从UI交互到分布式系统的广泛场景。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |