Project Reactor作为响应式编程范式的核心实现框架,严格遵循Reactive Streams规范体系,其架构设计完备包含了规范定义的四个核心组件:Publisher(数据源)、Subscriber(订阅者)、Subscription(订阅关系)和Processor(处理节点)。在该框架中,Flux和Mono不仅实现了Publisher接口的标准语义,更构建了完备的响应式数据流处理范式:通过订阅关系建立生产-消费通道,基于事件驱动机制实现非壅闭式数据推送,同时通过背压(backpressure)协议保障体系的弹性通信。
基本流程
从团体上理解 Project Reactor 的工作原理,可以或许帮助我们更清晰地把握其中的各种概念和操作,克制迷失方向。实际上,从大局来看,整个 Reactor 就是基于订阅-发布模式的。Flux 和 Mono 作为体系中默认的 Publisher,简化了我们自定义 Publisher 的工作。Flux 和 Mono 集成了大量的操作符,这些操作符的存在镌汰了我们自定义 Subscriber 和 Processor 的需求。通过这些操作符的组合,我们可以直接对数据源和元素进行操作,而无需本身编写额外的 Processor 和 Subscriber。除非在特殊情况下,否则不发起自动去自定义 Subscriber 和 Processor。- 创建数据源(Flux,Mono)->转换和处理数据(map,filter...)->subscribe订阅数据源
复制代码 一、响应式数据源:
1.1 Flux与Mono
作为Project Reactor的核心发布者,Flux和Mono的主要区别如下:
- Flux代表0-N个元素的异步序列
- Mono表示0-1个结果的异步操作
- // 创建Flux
- Flux.just("1", "2", "3").subscribe(System.out::println);
- // 创建Mono
- Mono.just("a").subscribe(System.out::println);
复制代码 1.2 数据源类型
了解了Flux和Mono之后,我们知道了如何简朴的创建数据源,其中Flux和Mono也给我们提供了非常多的创建数据源的方式,大概分为以下几类。
- 空数据源: 用于表示无数据的完成信号(如删除操作的结果)。
- 动态天生:Mono.create 和 Flux.generate/Flux.create 允许手动控制元素发射(同步或异步)。
- 异步数据源: 从 Future、Callable 或 Supplier 中获取数据,支持非壅闭操作。
- 时间驱动: Mono.delay 延迟发射,Flux.interval 周期性发射递增数值。
- 合并/组合: zip 严格对齐元素,merge 无序合并,concat 次序连接。
- 背压适配: 通过 FluxSink 或 MonoSink 手动控制背压和元素发射。
Mono 和 Flux 数据源创建方式分类总结
类别描述Mono 方法示例Flux 方法示例空数据源创建不发射任何元素的数据流。Mono.empty()Flux.empty()单个元素发射单个静态值或对象。Mono.just(T)Flux.just(T...)多个元素发射多个静态值或对象(仅 Flux 支持)。N/AFlux.just(T1, T2...)集合/数组从集合或数组天生元素。N/AFlux.fromIterable(List) Flux.fromArray(T[])流(Stream)从 Java Stream 天生元素。N/AFlux.fromStream(Stream)动态天生通过天生器函数动态天生元素。Mono.create(sink -> {...})Flux.generate(sink -> {...}) Flux.create(sink -> {...})异步数据源从异步操作(如 Future、Callable)获取数据。Mono.fromFuture(Future) Mono.fromCallable(Callable)Flux.from(Publisher) Flux.fromStream(Supplier)错误信号直接发射错误信号。Mono.error(Throwable)Flux.error(Throwable)延迟初始化惰性天生数据(订阅时才执行逻辑)。Mono.defer(() -> ...) Mono.fromSupplier(Supplier)Flux.defer(() -> ...) Flux.fromStream(Supplier)时间驱动基于时间天生数据(如定时、延迟)。Mono.delay(Duration)Flux.interval(Duration)合并/组合合并多个数据源。Mono.zip(Mono1, Mono2...)Flux.merge(Flux1, Flux2...) Flux.concat(Flux1, Flux2...) Flux.zip(Flux1, Flux2...)背压适配适配外部背压机制(如 Sink 手动控制)。Mono.create(MonoSink)Flux.create(FluxSink)条件触发根据条件天生数据(如 first、takeUntil)。Mono.firstWithValue(Mono1, Mono2)Flux.firstWithValue(Publisher...) Flux.takeUntil(Predicate)1.3 数据源发布模子
Project Reactor 的发布模子是其响应式编程的核心机制,主要分为 冷发布者(Cold Publisher) 和 热发布者(Hot Publisher)。它们的区别在于数据流的天生、共享方式以及订阅者的消费行为。以下是详细表明:
1.3.1、冷发布者(Cold Publisher)
定义:冷发布者为每个订阅者天生独立的数据流。每个订阅者都会触发数据源的完备天生过程,即使其他订阅者已订阅过。
特点:
- 数据流独立:每个订阅者从头开始消费数据。
- 延迟天生:数据在订阅时才开始天生(惰性盘算)。
- 资源隔离:不同订阅者的数据天生逻辑互不影响。
适用场景:
- 需要每个订阅者获取完备数据(如 HTTP 请求、数据库查询)。
- 数据源的天生成本较高,但需确保订阅者的独立性。
代码示例- // 创建冷发布者
- Flux<Integer> coldFlux = Flux.range(1, 3).doOnNext(i -> System.out.println("冷发布者发出: " + i));
- // 第一个订阅者
- coldFlux.subscribe(i -> System.out.println("订阅者1: " + i));
- // 第二个订阅者
- coldFlux.subscribe(i -> System.out.println("订阅者2: " + i));
复制代码 输出- 冷发布者发出: 1
- 订阅者1: 1
- 冷发布者发出: 2
- 订阅者1: 2
- 冷发布者发出: 1
- 订阅者2: 1
- 冷发布者发出: 2
- 订阅者2: 2
复制代码 1.3.2、热发布者(Hot Publisher)
- 定义:热发布者共享一个统一的数据流,所有订阅者消费同一份数据。数据源的天生与订阅者的订阅时间无关,后订阅的订阅者可能错过早期数据。
特点:
- 数据流共享:所有订阅者吸收同一数据源。
- 及时性:数据源的天生独立于订阅行为。
- 资源复用:多个订阅者共享同一数据天生逻辑。
适用场景:
- 及时事件推送(如传感器数据、股票报价)。
- 需要广播数据,克制重复天生高成本操作(如 WebSocket 消息)。
热发布者的实现方式有如下几种:
通过 publish() 方法将 Flux 转换为 ConnectableFlux,需手动调用 connect() 启动数据流。
代码示例- // 创建 ConnectableFlux 并转换为热发布者
- ConnectableFlux<Integer> hotFlux = Flux.range(1, 3)
- .doOnNext(i -> System.out.println("热发布者发出: " + i))
- .publish(); // 转换为 ConnectableFlux
- // 订阅者A
- hotFlux.subscribe(i -> System.out.println("订阅者A: " + i));
- // 订阅者B
- hotFlux.subscribe(i -> System.out.println("订阅者B: " + i));
- // 手动触发数据流开始
- hotFlux.connect();
复制代码 输出- 热发布者发出: 1
- 订阅者A: 1
- 订阅者B: 1
- 热发布者发出: 2
- 订阅者A: 2
- 订阅者B: 2
- 热发布者发出: 3
- 订阅者A: 3
- 订阅者B: 3
复制代码 当达到指定订阅者数量时,自动启动数据流。- Flux<Integer> autoFlux = Flux.range(1, 3)
- .doOnNext(i -> System.out.println("热发布者发出: " + i))
- .publish()
- .autoConnect(2);// 当有 2 个订阅者时自动启动
- autoFlux.subscribe(i -> System.out.println("订阅者A: " + i));
- autoFlux.subscribe(i -> System.out.println("订阅者B: " + i));
复制代码 输出- 热发布者发出: 1
- 订阅者A: 1
- 订阅者B: 1
- 热发布者发出: 2
- 订阅者A: 2
- 订阅者B: 2
- 热发布者发出: 3
- 订阅者A: 3
- 订阅者B: 3
复制代码 等价于 publish().refCount(1):当第一个订阅者到来时启动,最后一个取消订阅时终止。- Flux<Long> sharedFlux = Flux.interval(Duration.ofSeconds(1))
- .doOnNext(i -> System.out.println("热发布者发出: " + i))
- .take(5)
- .share();
- sharedFlux.subscribe(i -> System.out.println("订阅者A: " + i));
- Thread.sleep(2500);
- sharedFlux.subscribe(i -> System.out.println("订阅者B: " + i)); // 订阅者B错过前2个数据
复制代码 输出- 热发布者发出: 0
- 订阅者A: 0
- 热发布者发出: 1
- 订阅者A: 1
- 热发布者发出: 2
- 订阅者A: 2
- 订阅者B: 2
- 热发布者发出: 3
- 订阅者A: 3
- 订阅者B: 3
- 热发布者发出: 4
- 订阅者A: 4
- 订阅者B: 4
复制代码 允许新订阅者消费订阅前的历史数据(缓存策略可设置)。- ConnectableFlux<Integer> replayFlux = Flux.range(1, 3)
- .doOnNext(i -> System.out.println("热发布者发出: " + i))
- .replay(2);// 缓存最近2个数据
- replayFlux.subscribe(i -> System.out.println("订阅者A: " + i));
- replayFlux.connect();
- Thread.sleep(1000);
- replayFlux.subscribe(i -> System.out.println("订阅者B: " + i)); // 订阅者B收到最后2个数据
复制代码 输出:- 热发布者发出: 1
- 订阅者A: 1
- 热发布者发出: 2
- 订阅者A: 2
- 热发布者发出: 3
- 订阅者A: 3
- 订阅者B: 2
- 订阅者B: 3
复制代码 冷发布者和热发布者对比表格
特性冷发布者热发布者数据天生时机订阅时天生提前天生(或由 connect() 触发)订阅者独立性每个订阅者独立消费完备数据共享同一数据流资源消耗高(每个订阅者独立天生)低(共享天生逻辑)典型场景数据库查询、静态数据及时事件、广播二、强大的操作符生态体系
2.1 核心操作符分类
类别操作符示例功能描述转换操作符buffer, map, flatMap, window修改流中元素结构或内容(如分组、映射、扁平化)过滤操作符filter, take, skip按条件筛选元素(如保留满意条件的元素、跳过前N项)组合操作符merge, concat, zip合并多个流(如按次序连接、并行合并、元素一一配对)条件操作符any, all, hasElement判定流中元素是否满意条件(如是否存在满意条件的元素)数学操作符count, sum, reduce对元素进行聚合盘算(如统计总数、求和、累加)错误处理操作符onErrorReturn, onErrorResume异常时提供备选值或切换至备用流(如返回静态值、动态规复逻辑)工具操作符delay, timeout, log, subscribe控制流生命周期(如延迟发送、超时停止、记录日志、触发订阅)整个数据源操作doOnNext,,,doOnRequest,doOnSubscribe,doOnComplete等其中以doOn开头的可以对整个数据链的不同状态进行操作2.2 常见操作(类似Java的Stream)
- //转换操作符、过滤操、条件及数学操作符类似Java的Stream这里不做过多赘述
- //map
- Flux.just(1, 2, 3).map(i -> i + 1).subscribe(System.out::println);
- //filter
- Flux.just("a", "b", "c").filter(s -> s.equals("a")).subscribe(System.out::println);
- //flatMap
- Flux.just("a", "b", "c").flatMap(s -> Flux.just(s.toUpperCase())).subscribe(System.out::println);
- //reduce
- Flux.just(1, 2, 3).reduce(0, (a, b) -> a + b).subscribe(System.out::println);
- //window 窗口使用
- Flux.just(1, 2, 3, 4, 5, 6).window(3, 1).flatMap(e -> e.reduce(0, Integer::sum)).subscribe(System.out::println);
- //buffer 背压或者批处理使用,会缓存数据
- Flux.just(1, 2, 3, 4, 5, 6).buffer(3, 1).subscribe(System.out::println);
复制代码 2.3 组合操作符
zip操作符可以将多个(最多8个)流合并成一个流,合并的方式是将两个流中的元素按照次序一一对应,然后将两个元素组合成一个元素。 假如两个流的长度不同等,那么终极合并成的流的长度就是两个流中长度较短的那个流的长度。- Flux<String> flux1 = Flux.just("a", "b", "c");
- Flux<String> flux2 = Flux.just("d", "e", "f");
- Flux<String> flux3 = Flux.just("1", "2", "3");
- Flux.zip(flux1, flux2, flux3).subscribe(System.out::println);
- //输出
- [a,d,1]
- [b,e,2]
- [c,f,3]
复制代码 merge 操作符可以将两个流合并成一个流,合并的方式是将两个流中的元素交替地放入到合并后的流中。同时运行,根据时间先后运行。- Flux<Integer> flux3 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(80));
- Flux<Integer> flux4 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));
- flux3.mergeWith(flux4).subscribe(System.out::println);
- //输出 由于是根据时间先后处理,所以结果大概率是这样,也有可能会稍有不同
- 4
- 1
- 5
- 2
- 6
- 3
复制代码 concat 操作符可以将两个流合并成一个流,合并的方式是将两个流中的元素按照次序放入到合并后的流中。按照次序分别运行,flux1运行完成以后再运行flux2- Flux<Integer> flux1 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(80));
- Flux<Integer> flux2 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));
- flux1.concatWith(flux2).subscribe(System.out::println);
- //输出
- 1
- 2
- 3
- 4
- 5
- 6
复制代码 2.4 整个数据源操作
Project Reactor 提供了大量的以doOn开头的方法,这些方法用于在数据流的生命周期中插入副作用逻辑(如日志、监控或资源管理),不修改数据流本身,仅用于观察或触发行为。
每个方法的使用方法大致相同,下面以doOnRequest和doOnNext做一下简朴的示例。- Flux.just(1, 2, 3, 4, 5, 6).doOnNext(s -> System.out.println("doOnNext: " + s)).subscribe();
- System.out.println("----------------");
- Flux.just(1, 2, 3).doOnRequest(s -> System.out.println("doOnRequest: " + s)).subscribe(System.out::println);
- //输出
- doOnNext: 1
- doOnNext: 2
- doOnNext: 3
- doOnNext: 4
- doOnNext: 5
- doOnNext: 6
- ----------------
- doOnRequest: 9223372036854775807
- 1
- 2
- 3
复制代码 下面是每个方法的使用场景和触发时机。
方法触发时机参数类型典型场景doOnSubscribe订阅时Consumer资源初始化doOnNext元素推送时Consumer日志记录、状态更新doOnError发生错误时Consumer错误监控、报警doOnComplete流正常结束时Runnable完成关照doOnRequest下游请求数据时Consumer背压调试、请求量监控doOnCancel取消订阅时Runnable资源释放doOnEach所有事件发生时Consumer统一事件处理doOnTerminate流终止前(完成/错误前)Runnable终止前清理逻辑doAfterTerminate流终止后(完成/错误后)Runnable终止后统计doOnDiscard元素被丢弃时Consumer资源回收、数据同等性查抄三、执行控制:订阅与调度
3.1 订阅机制
subscribe 操作符用来订阅流中的元素。 当流中的元素没有被订阅的时候,所有的操作都不会触发,只有当流中的元素被订阅的时候,所有的操作才会触发。 通过上面内容的阅读,信赖你已经对Project Reactor的发布订阅模子已经了解了个大概,上面的订阅的例子也有很多,这里不做过多的赘述。
3.2 调度器策略
Schedulers 是管理线程和并发使命的核心工具,用于控制响应式流的执行上下文。通过合理选择调度器,可以优化资源使用、克制壅闭,并提拔应用性能
调度器线程模子适用场景注意事项immediate当前线程轻量级同步操作克制壅闭single单线程严格次序执行克制长时间壅闭boundedElastic动态线程池壅闭 I/O 操作控制最大线程数和队列容量parallel固定大小线程池盘算麋集型并行使命线程数默认等于 CPU 核心数fromExecutorService自定义线程池集成现有线程池需自行管理生命周期3.3 默认调度器
在 Project Reactor 中,可以很方便的通过publishOn和subscribeOn来切换使用的线程调度器。- Flux.range(1, 10)
- .publishOn(Schedulers.boundedElastic()) //切换调度器
- .log("publish thread:")
- .flatMap(n -> Mono.fromCallable(() -> n).subscribeOn(Schedulers.parallel())) //切换调度器
- .log("subscribe thread:")
- .subscribe();
复制代码 3.4 自定义虚拟线程调度器
当然在JDK17及更改的版本中也可以联合虚拟线程进一步提高并发量。- Scheduler customSchedule = Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor());
- Flux.range(1, 10)
- .publishOn(customSchedule)
- .log("publish thread:")
- .flatMap(n -> Mono.fromCallable(() -> n).subscribeOn(Schedulers.parallel()))
- .log("subscribe thread:")
- .subscribe();
复制代码 四、高级控制组件
4.1 Processor与Sink的关系
在 Project Reactor 中,Processor 曾是一个关键组件,但随着 Reactor 3.4+ 版本的演进,官方逐渐将其标志为弃用(Deprecated),并推荐使用更现代的 Sink API 替代。以下是弃用缘故起因、两者核心区别。
1. 线程安全
- processor:大多数 Processor 实现(如 DirectProcessor、UnicastProcessor)非线程安全,直接调用 onNext、onComplete 等方法需手动同步。
- Sink: 原子性操作:Sink 提供 tryEmitNext、tryEmitError 等方法,确保多线程推送数据时的安全性。
2. 脚色定位
- Processor: 同时作为 Publisher 和 Subscriber,这种设计虽然机动,但导致职责不清晰,轻易误用。
- Sink:仅作为纯生产者(仅天生数据流)
3. 背压处理
- processor
对背压的支持差别大:
- DirectProcessor 完全忽略背压(无界队列)。
- UnicastProcessor 支持单订阅者的背压,但需手动设置缓冲区。
- Sink:内置设置,通过 onBackpressureBuffer、onBackpressureError 等链式方法直接定义背压行为。
4. 生命周期管理复杂
- processor:需显式调用 onComplete 或 onError 结束流,若遗漏可能导致资源泄漏或订阅者挂起。
- Sink:通过 tryEmitComplete 和 tryEmitError 明确结束流,克制资源泄漏。
5. API 设计
- processor:Processor 的 API 未针对现代响应式编程模式优化(如缺少对重试、重播的内置支持)。
- Sink:机动简朴,通过 Sinks.Many 的 multicast()、unicast() 或 replay() 快速设置多订阅者行为。
4.2 API使用示例
由于processor已经被弃用,不推荐使用,这里不做过多介绍。
- 1:发送单个数据
- Sinks.One<String> sink = Sinks.one();
- Mono<String> mono = sink.asMono();
- mono.subscribe(
- value -> System.out.println("Received: " + value),
- error -> System.err.println("Error: " + error),
- () -> System.out.println("Completed")
- );
- sink.tryEmitValue("Hello"); // 等效于 tryEmitNext + tryEmitComplete
复制代码 - 2:发送多个数据
- // 创建多播 Sink, 并设计缓冲被压策略
- Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
- //转换为flux
- Flux<String> hotFlux = sink.asFlux().map(String::toUpperCase);
- // 订阅者A
- hotFlux.subscribe(i -> System.out.println("订阅者A: " + i));
- // 订阅者B
- hotFlux.subscribe(i -> System.out.println("订阅者B: " + i));
- // 发送数据
- sink.tryEmitNext("hello");
- sink.tryEmitNext("world");
- sink.tryEmitComplete();
复制代码 - 3:支持历史数据
- // 创建重播 Sink,保留最近 2 个元素
- Sinks.Many<String> sink = Sinks.many().replay().limit(2);
- sink.tryEmitNext("A");
- sink.tryEmitNext("B");
- // 订阅者1 (接收历史数据 A, B)
- sink.asFlux().subscribe(s -> System.out.println("Sub1: " + s));
- // 推送新数据
- sink.tryEmitNext("C");
- // 订阅者2(接收历史数据 B, C)
- sink.asFlux().subscribe(s -> System.out.println("Sub2: " + s));
- //输出
- Sub1: A
- Sub1: B
- Sub1: C
- Sub2: B
- Sub2: C
复制代码 4.2 背压
4.2.1 背压策略
1.onBackpressureBuffer(缓冲策略)
- 行为:将未消费的数据存储在缓冲区中,等待下游请求时发送。
- 设置选项
- 缓冲区大小:可指定有界或无界(默认无界,需审慎使用)。
- 溢出策略
- ERROR:缓冲区满时抛出 IllegalStateException。
- DROP_LATEST:丢弃新数据,保留旧数据。
- DROP_OLDEST:丢弃最旧数据,保留新数据。
- 适用场景:允许短暂的速率不匹配,但需控制内存占用。
2. onBackpressureError(错误策略)
- 行为:当缓冲区满或下游未请求时,立刻抛出错误(IllegalStateException)。
- 适用场景:严格要求及时性,容忍数据丢失但需快速失败。
3. directBestEffort(尽力而为策略)
- 行为:无缓冲区,直接推送数据到下游。假如下游未请求,静默丢弃新数据。
- 特点:克制内存占用,但可能导致数据丢失。
- 适用场景:及时事件处理(如日志、指标收罗),允许偶然丢失。
4. replay(重播策略)
- 行为
新订阅者重播历史数据
- 同时支持及时数据推送。
- 可设置重播的缓冲区大小(如保留最近的 N 个元素)。
- 背压处理
- 对新订阅者:重播历史数据时遵循背压请求。
- 对及时数据:使用 onBackpressureBuffer 或 directBestEffort 策略。
- 适用场景:需要新订阅者获取历史数据的场景。
4.2.2. 默认策略
- multicast():默认使用 directBestEffort(无缓冲区)。
- unicast():默认使用 onBackpressureBuffer(无界缓冲区)。
- replay():默认保留所有历史数据(无界缓冲区)。
五、 Hooks与Context
5.1 Hooks
在 Project Reactor 中,Hooks 是一组全局回调机制,允许对 Reactor 库的默认行为进行定制化扩展,用于调试、监控或修改响应式流的执行逻辑。
1、Hooks 的核心用途
- 全局错误处理:捕捉未被下游处理的异常。
- 操作符生命周期监控:在操作符执行前后插入自定义逻辑。
- 调试与追踪:增强堆栈跟踪信息,定位异步流问题。
- 行为修改:动态替换或包装操作符的实现。
2、常用 Hooks 及功能
1. onOperatorError
- 作用:捕捉操作符执行过程中抛出的未处理异常。
- 典型场景:统一日志记录、转换错误类型。
- Hooks.onOperatorError((error, context) -> {
- System.err.println("全局捕获异常: " + error);
- return error;
- });
复制代码 2. onNextDropped
- 作用:处理因下游取消订阅、背压溢出等缘故起因被丢弃的 onNext 元素。
- 典型场景:记录丢失的数据,用于审计或补偿。
- Hooks.onNextDropped(item ->
- System.out.println("元素被丢弃: " + item)
- );
复制代码 3. onErrorDropped
- 作用:处理因下游已终止(如已调用 onComplete)而被丢弃的 onError 信号。
- 典型场景:克制静默忽略错误。
- Hooks.onErrorDropped(error ->
- System.err.println("错误被丢弃: " + error)
- );
复制代码 4. onOperatorDebug
- 作用:启用调试模式,为异步操作符天生增强的堆栈跟踪信息(含订阅点位置)。
- 代价:增长性能开销,仅限开发环境使用。
- Hooks.onOperatorDebug(); // 启用调试模式
复制代码 5. onEachOperator / onLastOperator
- 作用:在每个操作符执行前后插入自定义逻辑(如日志、指标收罗)。
- 典型场景:性能监控、动态修改数据流。
- Hooks.onEachOperator(operator -> {
- long start = System.currentTimeMillis();
- return original -> original.doFinally(signal ->
- System.out.println("操作符耗时: " + (System.currentTimeMillis() - start) + "ms")
- );
- });
复制代码 6. 重置 Hooks
- 规复默认行为
- javaCopy CodeHooks.resetOnOperatorError();
- Hooks.resetOnNextDropped();
- Hooks.resetOnOperatorDebug();
复制代码 4.4 Context
在 Project Reactor 中,Context 是用于在响应式流的各个阶段之间传递上下文数据的核心机制。它办理了传统 ThreadLocal 在异步、多线程环境中的范围性,允许数据在操作符链中安全传递。以下是 Context 的详细分析,涵盖其设计头脑、API 使用及典型场景。
1. 为什么需要 Context?
- 问题:在异步响应式流中,数据可能由不同线程处理,ThreadLocal 无法跨线程传递。
- 办理方案:Context 提供一种与订阅链绑定的、不可变的键值存储,确保上下文数据在流的生命周期内可被安全访问。
2. Context 的特点
- 不可变性:每次修改会天生新实例,确保线程安全。
- 订阅链绑定:数据跟随订阅链传递,而非依赖线程(需要注意的是Context的传递是从底部往上传递的)。
- 键值存储:类似 Map 结构,支持类型安全的键(ContextKey)。
- 自底向上(Downstream → Upstream)
- 写入次序:后调用的 contextWrite 会覆盖先调用的。
- 读取次序:下游(靠近订阅点)的 Context 优先被访问。
通过 contextWrite 操作符将 Context 写入响应式流,通过deferContextual在流中读取 Context- //注意由于ontext的传递是从底部往上传递的,所以必须在下面(A点)先写入才能在(B点读取到)
- Flux.just("A", "B", "C", "D")
- //记为B点 拼接 Context 中的值
- .flatMap(s -> {
- System.out.println("ssss:" + s);
- return Mono.deferContextual(ctx -> Mono.just(s + ctx.get("suffix")));
- }
- )
- //记为A点 写入 Context(关键:必须在读取操作之前调用)
- .contextWrite(Context.of("suffix", "-ctx"))
- // 订阅输出结果
- .subscribe(System.out::println);
复制代码 Context自底向上(Downstream → Upstream)传播示例
由于Context自底向上的传播特性,所以Context中B点的值会覆盖A点的值- Flux.just("A", "B", "C", "D")
- // 拼接 Context 中的值
- .flatMap(s -> {
- //由于ctx222会覆盖ctx111,所以此处拼接的是ctx222
- System.out.println("ssss:" + s);
- return Mono.deferContextual(ctx -> Mono.just(s + ctx.get("suffix")));
- }
- )
- //记为B点, 写入 Context ctx222会覆盖ctx111
- .contextWrite(Context.of("suffix", "-ctx222"))
- //记为A点, 写入 Context
- .contextWrite(Context.of("suffix", "-ctx111"))
- // 订阅输出结果
- .subscribe(System.out::println);
- //输出
- ssss:A
- A-ctx222
- ssss:B
- B-ctx222
- ssss:C
- C-ctx222
- ssss:D
- D-ctx222
复制代码 结语
通过深入理解Project Reactor这些核心概念,可以更好地驾御响应式编程范式,构建出更高效、更弹性的分布式体系。与现代虚拟线程的联合,为构建新一代高并发应用提供了更优解。通过合理选择调度策略、优化线程模子,可以在保持代码简便的同时,充分发挥硬件性能。
路漫漫其修远兮,吾将上下而求索
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |