IT评测·应用市场-qidao123.com技术社区
标题:
Reactive编程:数据流和观察者
[打印本页]
作者:
立聪堂德州十三局店
时间:
2025-4-6 18:02
标题:
Reactive编程:数据流和观察者
第二部门:Reactive编程核心概念
2.1 数据流(Data Stream)
2.1.1 数据流的根本概念
数据流(Data Stream)是Reactive编程的核心概念之一,它代表一系列按时间顺序排列的变乱或数据项的集合。在传统的下令式编程中,数据通常是静态的,程序通过顺序实行指令来处理数据。而在Reactive编程中,数据被视为
动态的、连续变化的流
,程序通过订阅这些流来相应数据的变化。
数据流的特点包括:
时间连续性
:数据流中的变乱是按时间顺序发生的。
异步性
:数据项可能在任何时间点到达,不受程序控制流约束。
不可变性
:流中的数据一旦发出就不能更改,只能通过转换操纵生成新流。
可组合性
:多个数据流可以通过操纵符(Operators)举行组合、转换和过滤。
2.1.2 数据流的类型
在Reactive编程中,数据流可以分为以下几种类型:
(1) 冷流(Cold Stream)
特点
:数据流的生成由订阅者触发,每个订阅者都会收到完整的数据序列。
示例
:从文件读取数据、HTTP哀求返回的相应流。
代码示例(RxJS)
:
import { of } from 'rxjs';
const coldStream$ = of(1, 2, 3); // 冷流
coldStream$.subscribe(value => console.log(`Subscriber 1: ${value}`));
coldStream$.subscribe(value => console.log(`Subscriber 2: ${value}`));
// 输出:
// Subscriber 1: 1
// Subscriber 1: 2
// Subscriber 1: 3
// Subscriber 2: 1
// Subscriber 2: 2
// Subscriber 2: 3
复制代码
(2) 热流(Hot Stream)
特点
:数据流独立于订阅者存在,订阅者只能收到订阅后发出的数据。
示例
:鼠标移动变乱、WebSocket及时消息。
代码示例(RxJS)
:
import { fromEvent, interval } from 'rxjs';
const hotStream$ = fromEvent(document, 'click'); // 热流(鼠标点击事件)
setTimeout(() => {
hotStream$.subscribe(event => console.log(`Subscriber: Click at ${event.timeStamp}`));
}, 3000);
// 3秒后订阅,只能收到3秒后的点击事件
复制代码
(3) 有限流(Finite Stream)
特点
:数据流会在某个时刻结束(如HTTP哀求完成)。
示例
:API哀求返回的单个相应。
(4) 无限流(Infinite Stream)
特点
:数据流可能永远不会结束(如传感器数据、及时股票行情)。
示例
:interval 生成的定时数据流。
2.1.3 数据流的操纵符
Reactive编程提供丰富的操纵符(Operators)来处理数据流,常见的操纵符包括:
类别
操纵符示例
功能形貌
创建流
of, from, interval从静态数据、数组或时间间隔创建流
转换流
map, scan, buffer对数据项举行转换或累积计算
过滤流
filter, take, skip根据条件筛选或限定命据项
组合流
merge, concat, zip合并多个流的数据
错误处理
catchError, retry捕捉和处理流中的错误
高级调度
debounceTime, throttle控制数据流的发射频率
代码示例(RxJS 操纵符组合)
:
import { fromEvent } from 'rxjs';
import { map, filter, debounceTime } from 'rxjs/operators';
const input = document.querySelector('input');
fromEvent(input, 'input')
.pipe(
map(event => event.target.value), // 提取输入值
filter(text => text.length > 3), // 过滤长度≤3的输入
debounceTime(500) // 防抖(500ms内无新输入才发射)
)
.subscribe(value => console.log(`Search for: ${value}`));
复制代码
2.1.4 数据流的实际应用
(1) 及时搜索发起
// 使用RxJS实现搜索框自动补全
searchInput$.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => fetch(`/api/search?q=${query}`))
).subscribe(results => updateUI(results));
复制代码
(2) 游戏开发(脚色移动)
// 使用键盘事件流控制角色移动
const keyPress$ = fromEvent(document, 'keydown');
const move$ = keyPress$.pipe(
filter(key => ['ArrowUp', 'ArrowDown'].includes(key.code)),
map(key => key.code === 'ArrowUp' ? 1 : -1)
);
move$.subscribe(delta => character.y += delta * SPEED);
复制代码
(3) 金融生意业务数据聚合
// 合并多个股票行情流
const stock1$ = stockFeed('AAPL');
const stock2$ = stockFeed('GOOG');
merge(stock1$, stock2$)
.pipe(bufferTime(1000)) // 每1秒聚合一次
.subscribe(prices => calculatePortfolioValue(prices));
复制代码
2.1.5 数据流的背压问题(Backpressure)
当数据生产速度高出消耗速度时,系统可能因资源耗尽而瓦解。Reactive编程通过以下计谋处理背压:
丢弃计谋
:throttle、sample
缓冲计谋
:buffer、window
反馈控制
:相应式流规范(Reactive Streams)的request(n)机制
代码示例(背压处理)
:
// Reactor(Java)中的背压控制
Flux.range(1, 1000)
.onBackpressureBuffer(100) // 缓冲100个元素
.subscribe(
value -> processSlowly(value),
err -> handleError(err),
() -> System.out.println("Done")
);
复制代码
2.2 观察者模式(Observer Pattern)
2.2.1 观察者模式基础
观察者模式是Reactive编程的底层设计模式,它定义了
一对多的依靠关系
:当一个对象(Subject)的状态改变时,全部依靠它的对象(Observers)会自动收到关照并更新。
模式脚色
脚色
形貌
Subject维护观察者列表,提供注册/注销方法,关照状态变化Observer定义更新接口,吸收Subject的关照并实行相应逻辑ConcreteSubject具体的被观察对象,存储状态并在变化时关照观察者ConcreteObserver具体的观察者,实现更新逻辑
2.2.2 观察者模式实现
经典实现(Java)
// 主题接口
interface Subject {
void registerObserver(Observer o);
void removeObserver(Observer o);
void notifyObservers();
}
// 具体主题(温度传感器)
class TemperatureSensor implements Subject {
private List<Observer> observers = new ArrayList<>();
private float temperature;
public void setTemperature(float temp) {
this.temperature = temp;
notifyObservers();
}
@Override
public void registerObserver(Observer o) {
observers.add(o);
}
@Override
public void notifyObservers() {
for (Observer o : observers) {
o.update(temperature);
}
}
}
// 观察者接口
interface Observer {
void update(float temperature);
}
// 具体观察者(温度显示器)
class TemperatureDisplay implements Observer {
@Override
public void update(float temp) {
System.out.println("当前温度: " + temp + "°C");
}
}
// 使用示例
public class Main {
public static void main(String[] args) {
TemperatureSensor sensor = new TemperatureSensor();
sensor.registerObserver(new TemperatureDisplay());
sensor.setTemperature(25.5f); // 输出:当前温度: 25.5°C
}
}
复制代码
Reactive扩展(RxJS)
// 创建一个可观察对象(Subject)
const subject = new rxjs.Subject();
// 订阅观察者
const subscription1 = subject.subscribe(
value => console.log(`Observer 1: ${value}`)
);
const subscription2 = subject.subscribe(
value => console.log(`Observer 2: ${value}`)
);
// 发送数据
subject.next('Hello');
subject.next('World');
// 输出:
// Observer 1: Hello
// Observer 2: Hello
// Observer 1: World
// Observer 2: World
复制代码
2.2.3 观察者模式与Reactive编程的关系
数据流即Subject
:Reactive框架中的Observable本质上是加强版的Subject,支持多播和操纵符。
观察者即Subscriber
:订阅者通过subscribe()方法注册回调,相当于观察者模式的update()。
扩展本领
:
多播(Multicast):一个数据流被多个观察者共享
生命周期管理:complete()和error()关照
操纵符链式调用:map、filter等转换操纵
2.2.4 观察者模式的实际应用
(1) 用户界面变乱处理
// 按钮点击事件观察
const button = document.getElementById('myButton');
const click$ = fromEvent(button, 'click');
click$.subscribe(event => {
console.log('Button clicked at:', event.timeStamp);
});
复制代码
(2) 状态管理(Redux Store)
// Redux的Store本质上是一个Subject
const store = createStore(reducer);
store.subscribe(() => {
console.log('State changed:', store.getState());
});
store.dispatch({ type: 'INCREMENT' });
复制代码
(3) WebSocket及时通信
const socket = new WebSocket('ws://example.com');
const message$ = new Subject();
socket.onmessage = event => {
message$.next(event.data);
};
message$.subscribe(data => {
console.log('Received:', data);
});
复制代码
2.2.5 观察者模式的优缺点
优点
松耦合
:Subject和Observer之间无直接依靠
动态关系
:可运行时添加/删除观察者
广播通信
:一次状态变更可关照多个观察者
缺点
内存走漏风险
:未正确注销观察者会导致引用残留
关照顺序不可控
:观察者被调用的顺序可能影响系统举动
调试困难
:数据流动的隐式流传可能增加调试复杂度
2.2.6 观察者模式的变体
变体
形貌
Reactive实现
发布-订阅模式引入变乱通道,解耦发布者和订阅者RxJS的Subject相应式属性(Binding)自动同步对象属性变化(如Vue的v-model)MobX的observable数据总线(Event Bus)全局变乱中央,任意组件可发布/订阅变乱Vue的EventEmitter
总结
数据流
是Reactive编程的核心抽象,代表随时间变化的变乱序列,可通过操纵符举行机动转换。
观察者模式
是Reactive系统的底层机制,通过订阅/关照机制实现数据变化的自动流传。
两者结合使得Reactive编程可以或许高效处理异步、及时数据,实用于从UI交互到分布式系统的广泛场景。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/)
Powered by Discuz! X3.4