IT评测·应用市场-qidao123.com

标题: 响应式编程之Project Reactor [打印本页]

作者: 勿忘初心做自己    时间: 2025-3-24 15:57
标题: 响应式编程之Project Reactor
Project Reactor作为响应式编程范式的核心实现框架,严格遵循Reactive Streams规范体系,其架构设计完备包含了规范定义的四个核心组件:Publisher(数据源)、Subscriber(订阅者)、Subscription(订阅关系)和Processor(处理节点)。在该框架中,FluxMono不仅实现了Publisher接口的标准语义,更构建了完备的响应式数据流处理范式:通过订阅关系建立生产-消费通道,基于事件驱动机制实现非壅闭式数据推送,同时通过背压(backpressure)协议保障体系的弹性通信。
基本流程

从团体上理解 Project Reactor 的工作原理,可以或许帮助我们更清晰地把握其中的各种概念和操作,克制迷失方向。实际上,从大局来看,整个 Reactor 就是基于订阅-发布模式的。Flux 和 Mono 作为体系中默认的 Publisher,简化了我们自定义 Publisher 的工作。Flux 和 Mono 集成了大量的操作符,这些操作符的存在镌汰了我们自定义 Subscriber 和 Processor 的需求。通过这些操作符的组合,我们可以直接对数据源和元素进行操作,而无需本身编写额外的 Processor 和 Subscriber。除非在特殊情况下,否则不发起自动去自定义 Subscriber 和 Processor。
  1. 创建数据源(Flux,Mono)->转换和处理数据(map,filter...)->subscribe订阅数据源
复制代码
一、响应式数据源:

1.1 Flux与Mono

作为Project Reactor的核心发布者,Flux和Mono的主要区别如下:
  1. // 创建Flux
  2. Flux.just("1", "2", "3").subscribe(System.out::println);
  3. // 创建Mono
  4. Mono.just("a").subscribe(System.out::println);
复制代码
1.2 数据源类型

了解了Flux和Mono之后,我们知道了如何简朴的创建数据源,其中Flux和Mono也给我们提供了非常多的创建数据源的方式,大概分为以下几类。
Mono 和 Flux 数据源创建方式分类总结
类别‌‌描述‌‌Mono 方法示例‌‌Flux 方法示例‌‌空数据源‌创建不发射任何元素的数据流。Mono.empty()Flux.empty()‌单个元素‌发射单个静态值或对象。Mono.just(T)Flux.just(T...)‌多个元素‌发射多个静态值或对象(仅 Flux 支持)。N/AFlux.just(T1, T2...)‌集合/数组‌从集合或数组天生元素。N/AFlux.fromIterable(List) Flux.fromArray(T[])‌流(Stream)‌从 Java Stream 天生元素。N/AFlux.fromStream(Stream)‌动态天生‌通过天生器函数动态天生元素。Mono.create(sink -> {...})Flux.generate(sink -> {...}) Flux.create(sink -> {...})‌异步数据源‌从异步操作(如 Future、Callable)获取数据。Mono.fromFuture(Future) Mono.fromCallable(Callable)Flux.from(Publisher) Flux.fromStream(Supplier)‌错误信号‌直接发射错误信号。Mono.error(Throwable)Flux.error(Throwable)‌延迟初始化‌惰性天生数据(订阅时才执行逻辑)。Mono.defer(() -> ...) Mono.fromSupplier(Supplier)Flux.defer(() -> ...) Flux.fromStream(Supplier)‌时间驱动‌基于时间天生数据(如定时、延迟)。Mono.delay(Duration)Flux.interval(Duration)‌合并/组合‌合并多个数据源。Mono.zip(Mono1, Mono2...)Flux.merge(Flux1, Flux2...) Flux.concat(Flux1, Flux2...) Flux.zip(Flux1, Flux2...)‌背压适配‌适配外部背压机制(如 Sink 手动控制)。Mono.create(MonoSink)Flux.create(FluxSink)‌条件触发‌根据条件天生数据(如 first、takeUntil)。Mono.firstWithValue(Mono1, Mono2)Flux.firstWithValue(Publisher...) Flux.takeUntil(Predicate)1.3 数据源发布模子

Project Reactor 的发布模子是其响应式编程的核心机制,主要分为 ‌冷发布者(Cold Publisher)‌ 和 ‌热发布者(Hot Publisher)‌。它们的区别在于数据流的天生、共享方式以及订阅者的消费行为。以下是详细表明:
1.3.1、冷发布者(Cold Publisher)

定义‌:冷发布者为每个订阅者天生‌独立的数据流‌。每个订阅者都会触发数据源的完备天生过程,即使其他订阅者已订阅过。
特点‌:
适用场景‌:
1.3.2、热发布者(Hot Publisher)

热发布者的实现方式有如下几种:
通过 publish() 方法将 Flux 转换为 ConnectableFlux,需手动调用 connect() 启动数据流。
代码示例
  1.         // 创建 ConnectableFlux 并转换为热发布者
  2.         ConnectableFlux<Integer> hotFlux = Flux.range(1, 3)
  3.                 .doOnNext(i -> System.out.println("热发布者发出: " + i))
  4.                 .publish(); // 转换为 ConnectableFlux
  5.         // 订阅者A
  6.         hotFlux.subscribe(i -> System.out.println("订阅者A: " + i));
  7.         // 订阅者B
  8.         hotFlux.subscribe(i -> System.out.println("订阅者B: " + i));
  9.         // 手动触发数据流开始
  10.         hotFlux.connect();
复制代码
输出
  1. 热发布者发出: 1
  2. 订阅者A: 1
  3. 订阅者B: 1
  4. 热发布者发出: 2
  5. 订阅者A: 2
  6. 订阅者B: 2
  7. 热发布者发出: 3
  8. 订阅者A: 3
  9. 订阅者B: 3
复制代码
当达到指定订阅者数量时,自动启动数据流。
  1.         Flux<Integer> autoFlux = Flux.range(1, 3)
  2.                 .doOnNext(i -> System.out.println("热发布者发出: " + i))
  3.                 .publish()
  4.                 .autoConnect(2);// 当有 2 个订阅者时自动启动
  5.         autoFlux.subscribe(i -> System.out.println("订阅者A: " + i));
  6.         autoFlux.subscribe(i -> System.out.println("订阅者B: " + i));
复制代码
输出
  1. 热发布者发出: 1
  2. 订阅者A: 1
  3. 订阅者B: 1
  4. 热发布者发出: 2
  5. 订阅者A: 2
  6. 订阅者B: 2
  7. 热发布者发出: 3
  8. 订阅者A: 3
  9. 订阅者B: 3
复制代码
等价于 publish().refCount(1):当第一个订阅者到来时启动,最后一个取消订阅时终止。
  1.         Flux<Long> sharedFlux = Flux.interval(Duration.ofSeconds(1))
  2.                 .doOnNext(i -> System.out.println("热发布者发出: " + i))
  3.                 .take(5)
  4.                 .share();
  5.         sharedFlux.subscribe(i -> System.out.println("订阅者A: " + i));
  6.         Thread.sleep(2500);
  7.         sharedFlux.subscribe(i -> System.out.println("订阅者B: " + i)); // 订阅者B错过前2个数据
复制代码
输出
  1. 热发布者发出: 0
  2. 订阅者A: 0
  3. 热发布者发出: 1
  4. 订阅者A: 1
  5. 热发布者发出: 2
  6. 订阅者A: 2
  7. 订阅者B: 2
  8. 热发布者发出: 3
  9. 订阅者A: 3
  10. 订阅者B: 3
  11. 热发布者发出: 4
  12. 订阅者A: 4
  13. 订阅者B: 4
复制代码
允许新订阅者消费订阅前的历史数据(缓存策略可设置)。
  1.         ConnectableFlux<Integer> replayFlux = Flux.range(1, 3)
  2.                 .doOnNext(i -> System.out.println("热发布者发出: " + i))
  3.                 .replay(2);// 缓存最近2个数据
  4.         replayFlux.subscribe(i -> System.out.println("订阅者A: " + i));
  5.         replayFlux.connect();
  6.         Thread.sleep(1000);
  7.         replayFlux.subscribe(i -> System.out.println("订阅者B: " + i)); // 订阅者B收到最后2个数据
复制代码
输出‌:
  1. 热发布者发出: 1
  2. 订阅者A: 1
  3. 热发布者发出: 2
  4. 订阅者A: 2
  5. 热发布者发出: 3
  6. 订阅者A: 3
  7. 订阅者B: 2
  8. 订阅者B: 3
复制代码
冷发布者和热发布者对比表格
特性‌‌冷发布者‌‌热发布者‌数据天生时机订阅时天生提前天生(或由 connect() 触发)订阅者独立性每个订阅者独立消费完备数据共享同一数据流资源消耗高(每个订阅者独立天生)低(共享天生逻辑)典型场景数据库查询、静态数据及时事件、广播二、强大的操作符生态体系

2.1 核心操作符分类

类别‌‌操作符示例‌‌功能描述‌‌转换操作符‌buffer, map, flatMap, window修改流中元素结构或内容(如分组、映射、扁平化)‌过滤操作符‌filter, take, skip按条件筛选元素(如保留满意条件的元素、跳过前N项)‌组合操作符‌merge, concat, zip合并多个流(如按次序连接、并行合并、元素一一配对)‌条件操作符‌any, all, hasElement判定流中元素是否满意条件(如是否存在满意条件的元素)‌数学操作符‌count, sum, reduce对元素进行聚合盘算(如统计总数、求和、累加)‌错误处理操作符‌onErrorReturn, onErrorResume异常时提供备选值或切换至备用流(如返回静态值、动态规复逻辑)‌工具操作符‌delay, timeout, log, subscribe控制流生命周期(如延迟发送、超时停止、记录日志、触发订阅)整个数据源操作doOnNext,,,doOnRequest,doOnSubscribe,doOnComplete等其中以doOn开头的可以对整个数据链的不同状态进行操作2.2 常见操作(类似Java的Stream)
  1. //转换操作符、过滤操、条件及数学操作符类似Java的Stream这里不做过多赘述
  2. //map
  3. Flux.just(1, 2, 3).map(i -> i + 1).subscribe(System.out::println);
  4. //filter
  5. Flux.just("a", "b", "c").filter(s -> s.equals("a")).subscribe(System.out::println);
  6. //flatMap
  7. Flux.just("a", "b", "c").flatMap(s -> Flux.just(s.toUpperCase())).subscribe(System.out::println);
  8. //reduce
  9. Flux.just(1, 2, 3).reduce(0, (a, b) -> a + b).subscribe(System.out::println);
  10. //window  窗口使用
  11. Flux.just(1, 2, 3, 4, 5, 6).window(3, 1).flatMap(e -> e.reduce(0, Integer::sum)).subscribe(System.out::println);
  12. //buffer  背压或者批处理使用,会缓存数据
  13. Flux.just(1, 2, 3, 4, 5, 6).buffer(3, 1).subscribe(System.out::println);
复制代码
2.3 组合操作符

zip操作符可以将多个(最多8个)流合并成一个流,合并的方式是将两个流中的元素按照次序一一对应,然后将两个元素组合成一个元素。 假如两个流的长度不同等,那么终极合并成的流的长度就是两个流中长度较短的那个流的长度。
  1. Flux<String> flux1 = Flux.just("a", "b", "c");
  2. Flux<String> flux2 = Flux.just("d", "e", "f");
  3. Flux<String> flux3 = Flux.just("1", "2", "3");
  4. Flux.zip(flux1, flux2, flux3).subscribe(System.out::println);
  5. //输出
  6. [a,d,1]
  7. [b,e,2]
  8. [c,f,3]
复制代码
merge 操作符可以将两个流合并成一个流,合并的方式是将两个流中的元素交替地放入到合并后的流中。同时运行,根据时间先后运行。
  1. Flux<Integer> flux3 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(80));
  2. Flux<Integer> flux4 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));
  3. flux3.mergeWith(flux4).subscribe(System.out::println);
  4. //输出  由于是根据时间先后处理,所以结果大概率是这样,也有可能会稍有不同
  5. 4
  6. 1
  7. 5
  8. 2
  9. 6
  10. 3
复制代码
concat 操作符可以将两个流合并成一个流,合并的方式是将两个流中的元素按照次序放入到合并后的流中。按照次序分别运行,flux1运行完成以后再运行flux2
  1. Flux<Integer> flux1 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(80));
  2. Flux<Integer> flux2 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));
  3. flux1.concatWith(flux2).subscribe(System.out::println);
  4. //输出
  5. 1
  6. 2
  7. 3
  8. 4
  9. 5
  10. 6
复制代码
2.4 整个数据源操作

Project Reactor 提供了大量的以doOn开头的方法,这些方法用于在数据流的生命周期中插入副作用逻辑(如日志、监控或资源管理),‌不修改数据流本身,仅用于观察或触发行为‌。
每个方法的使用方法大致相同,下面以doOnRequest和doOnNext做一下简朴的示例。
  1. Flux.just(1, 2, 3, 4, 5, 6).doOnNext(s -> System.out.println("doOnNext: " + s)).subscribe();
  2. System.out.println("----------------");
  3. Flux.just(1, 2, 3).doOnRequest(s -> System.out.println("doOnRequest: " + s)).subscribe(System.out::println);
  4. //输出
  5. doOnNext: 1
  6. doOnNext: 2
  7. doOnNext: 3
  8. doOnNext: 4
  9. doOnNext: 5
  10. doOnNext: 6
  11. ----------------
  12. doOnRequest: 9223372036854775807
  13. 1
  14. 2
  15. 3
复制代码
下面是每个方法的使用场景和触发时机。
方法触发时机参数类型典型场景doOnSubscribe订阅时Consumer资源初始化doOnNext元素推送时Consumer日志记录、状态更新doOnError发生错误时Consumer错误监控、报警doOnComplete流正常结束时Runnable完成关照doOnRequest下游请求数据时Consumer背压调试、请求量监控doOnCancel取消订阅时Runnable资源释放doOnEach所有事件发生时Consumer统一事件处理doOnTerminate流终止前(完成/错误前)Runnable终止前清理逻辑doAfterTerminate流终止后(完成/错误后)Runnable终止后统计doOnDiscard元素被丢弃时Consumer资源回收、数据同等性查抄三、执行控制:订阅与调度

3.1 订阅机制

subscribe 操作符用来订阅流中的元素。 当流中的元素没有被订阅的时候,所有的操作都不会触发,只有当流中的元素被订阅的时候,所有的操作才会触发。 通过上面内容的阅读,信赖你已经对Project Reactor的发布订阅模子已经了解了个大概,上面的订阅的例子也有很多,这里不做过多的赘述。
3.2 调度器策略

Schedulers‌ 是管理线程和并发使命的核心工具,用于控制响应式流的执行上下文。通过合理选择调度器,可以优化资源使用、克制壅闭,并提拔应用性能
调度器线程模子适用场景注意事项immediate当前线程轻量级同步操作克制壅闭single单线程严格次序执行克制长时间壅闭boundedElastic动态线程池壅闭 I/O 操作控制最大线程数和队列容量parallel固定大小线程池盘算麋集型并行使命线程数默认等于 CPU 核心数fromExecutorService自定义线程池集成现有线程池需自行管理生命周期3.3 默认调度器

在 Project Reactor 中,可以很方便的通过publishOn和subscribeOn来切换使用的线程调度器。
  1. Flux.range(1, 10)
  2.         .publishOn(Schedulers.boundedElastic()) //切换调度器
  3.         .log("publish thread:")
  4.         .flatMap(n -> Mono.fromCallable(() -> n).subscribeOn(Schedulers.parallel()))  //切换调度器
  5.         .log("subscribe thread:")
  6.         .subscribe();
复制代码
3.4 自定义虚拟线程调度器

当然在JDK17及更改的版本中也可以联合虚拟线程进一步提高并发量。
  1. Scheduler customSchedule = Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor());
  2. Flux.range(1, 10)
  3.         .publishOn(customSchedule)
  4.         .log("publish thread:")
  5.         .flatMap(n -> Mono.fromCallable(() -> n).subscribeOn(Schedulers.parallel()))
  6.         .log("subscribe thread:")
  7.         .subscribe();
复制代码
四、高级控制组件

4.1 Processor与Sink的关系

在 Project Reactor 中,‌Processor‌ 曾是一个关键组件,但随着 Reactor 3.4+ 版本的演进,官方逐渐将其标志为‌弃用(Deprecated)‌,并推荐使用更现代的 ‌Sink API‌ 替代。以下是弃用缘故起因、两者核心区别。
1. ‌线程安全

2. ‌脚色定位

3. ‌背压处理

4. ‌生命周期管理复杂

5. ‌API 设计

4.2 API使用示例

​    由于processor已经被弃用,不推荐使用,这里不做过多介绍。
  1. // 创建重播 Sink,保留最近 2 个元素
  2. Sinks.Many<String> sink = Sinks.many().replay().limit(2);
  3. sink.tryEmitNext("A");
  4. sink.tryEmitNext("B");
  5. // 订阅者1 (接收历史数据 A, B)
  6. sink.asFlux().subscribe(s -> System.out.println("Sub1: " + s));
  7. // 推送新数据
  8. sink.tryEmitNext("C");
  9. // 订阅者2(接收历史数据 B, C)
  10. sink.asFlux().subscribe(s -> System.out.println("Sub2: " + s));
  11. //输出
  12. Sub1: A
  13. Sub1: B
  14. Sub1: C
  15. Sub2: B
  16. Sub2: C
复制代码
4.2  背压

4.2.1 背压策略

1.onBackpressureBuffer(缓冲策略)

2. onBackpressureError(错误策略)

3. directBestEffort(尽力而为策略)

4. replay(重播策略)

4.2.2. 默认策略

五、 Hooks与Context

5.1 Hooks

在 Project Reactor 中,‌Hooks‌ 是一组全局回调机制,允许对 Reactor 库的默认行为进行‌定制化扩展‌,用于调试、监控或修改响应式流的执行逻辑。
1、Hooks 的核心用途

2、常用 Hooks 及功能

1. onOperatorError
2. onNextDropped
3. onErrorDropped
4. onOperatorDebug
5. onEachOperator / onLastOperator
6. 重置 Hooks
4.4 Context

在 Project Reactor 中,‌Context‌ 是用于在响应式流的各个阶段之间传递‌上下文数据‌的核心机制。它办理了传统 ThreadLocal 在异步、多线程环境中的范围性,允许数据在操作符链中安全传递。以下是 Context 的详细分析,涵盖其设计头脑、API 使用及典型场景。
1. 为什么需要 Context?

2. Context 的特点

通过 contextWrite 操作符将 Context 写入响应式流,通过deferContextual在流中读取 Context
  1. //注意由于ontext的传递是从底部往上传递的,所以必须在下面(A点)先写入才能在(B点读取到)
  2. Flux.just("A", "B", "C", "D")
  3.         //记为B点  拼接 Context 中的值
  4.         .flatMap(s -> {
  5.                     System.out.println("ssss:" + s);
  6.                     return Mono.deferContextual(ctx -> Mono.just(s + ctx.get("suffix")));
  7.                 }
  8.         )
  9.         //记为A点  写入 Context(关键:必须在读取操作之前调用)
  10.         .contextWrite(Context.of("suffix", "-ctx"))
  11.         // 订阅输出结果
  12.         .subscribe(System.out::println);
复制代码
Context自底向上(Downstream → Upstream)传播示例
由于Context自底向上的传播特性,所以Context中B点的值会覆盖A点的值
  1. Flux.just("A", "B", "C", "D")
  2.         // 拼接 Context 中的值
  3.         .flatMap(s -> {
  4.                     //由于ctx222会覆盖ctx111,所以此处拼接的是ctx222
  5.                     System.out.println("ssss:" + s);
  6.                     return Mono.deferContextual(ctx -> Mono.just(s + ctx.get("suffix")));
  7.                 }
  8.         )
  9.         //记为B点,    写入 Context ctx222会覆盖ctx111
  10.         .contextWrite(Context.of("suffix", "-ctx222"))
  11.         //记为A点,    写入 Context
  12.         .contextWrite(Context.of("suffix", "-ctx111"))
  13.         // 订阅输出结果
  14.         .subscribe(System.out::println);
  15. //输出
  16. ssss:A
  17. A-ctx222
  18. ssss:B
  19. B-ctx222
  20. ssss:C
  21. C-ctx222
  22. ssss:D
  23. D-ctx222
复制代码
结语

通过深入理解Project Reactor这些核心概念,可以更好地驾御响应式编程范式,构建出更高效、更弹性的分布式体系。与现代虚拟线程的联合,为构建新一代高并发应用提供了更优解。通过合理选择调度策略、优化线程模子,可以在保持代码简便的同时,充分发挥硬件性能。
路漫漫其修远兮,吾将上下而求索

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4