响应式编程的适用场景

打印 上一主题 下一主题

主题 1978|帖子 1978|积分 5934

0 前提

响应式流、背压及响应式流规范。
1 弁言


  • 响应式编程能应用到啥场景?
  • 目前有啥框架用到这技术体系?
2 响应式编程应用场景

响应式编程不仅是编程技术,更是一种架构设计的系统方法,可应用于任何地方:

  • 简单的 Web 应用系统
  • 大型企业解决方案
数据流处置处罚是响应式编程一大应用场景,流式系统特点:

  • 低耽误
  • 高吞吐量
用非阻塞式通信,可确保资源高效利用,实现低耽误、高吞吐量。
高并发通常涉及大量 IO 操作,相比传统同步阻塞式 IO 模型,响应式编程的异步非阻塞式IO模型得当应对。
网关:响应来自前端系统的流量,并将其转发到后端服务。
焦点诉求

构建一个具有异步非阻塞式的请求处置处罚流程的 Web 服务,需要高效处置处罚跨服务之间的网络请求。
响应式编程的广泛应用:Hystrix、Spring Cloud Gateway 及 Spring WebFlux。
3 响应式流规范

3.1 Hystrix滑动窗口

Spring Cloud Netflix Hystrix 基于 Netflix Hystrix 实现服务熔断功能。Netflix Hystrix,Netflix 开源的一款容错库,利用HystrixCircuitBreaker类实现熔断器。
咋动态获取系统运行时的各项数据?

HealthCountsStream采用滑动窗口,大量采用数据流处置处罚方面技术及 RxJava 响应式编程框架。Hystrix 以s为单位统计系统中所有请求的处置处罚情况,再每次取最近 10s 数据盘算,如失败率超过阈值,熔断。
实现

把系统运行时产生数据视为一个个事件,滑动窗口中每个桶的数据都来自事件,通常需对其转换以便后续操作。
Hystrix采用RxJava,用其一系列操作符实现滑动窗口:

  • window 操作符,把当前流中的元素网络到另外的流序列
  • flatMap 操作符,把流中的每个元素转换成一个流,再把转换之后得到的所有流中的元素举行合并
  • reduce 操作符,对流中包罗的所有元素举行累积操作,得到一个包罗盘算结果的流
  1. this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
  2.     @Override
  3.     public Observable<Bucket> call() {
  4.         return inputEventStream
  5.             .observe()
  6.             // 使用 window 操作符收集一个 Bucket 时间内的数据
  7.             .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
  8.             // 将每个 window 内聚集起来的事件集合汇总成 Bucket
  9.             .flatMap(reduceBucketToSummary)
  10.             .startWith(emptyEventCountsToStart);
  11.     }
  12. });
复制代码
  1. this.sourceStream = bucketedStream
  2.     // 将 N 个 Bucket 进行汇总
  3.     .window(numBuckets, 1)
  4.     // 汇总成一个窗口
  5.     .flatMap(reduceWindowToSummary)
  6.     ...
  7.     // 添加背压控制
  8.     .onBackpressureDrop();
复制代码
Hystrix 用 RxJava 的 window、flatMap等操作符来将单位窗口时间内的事件。以及将一个窗口大小内的 Bucket 聚集到一起形成滑动窗口,并基于滑动窗口集成指标数据。
3.2 Spring Cloud Gateway中的过滤器

Spring开发的API网关,基于Spring5和Spring Boot2和Proiect Reactor框架提供响应式、非阻塞式I/O模型:

只需实现GlobalFilter接口,重写 filter():
  1. public class IPLimitFilter implements GlobalFilter
  2.     @Override
  3.     public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  4.         // 1 获取当前请求路径
  5.         String url = exchange.getRequest().getURI().getPath();
  6.         // 2 获得所有需ip限流校验的url list
  7.         List<String> ipLimitList = excludeUrlProperties.getIpLimitUrls();
  8.         // 3 校验并判断
  9.         if (ipLimitList != null && !ipLimitList.isEmpty()) {
  10.             for (String limitUrl : ipLimitList) {
  11.                 if (antPathMatcher.matchStart(limitUrl, url)) {
  12.                     // 若匹配到,则表明需进行ip拦截校验
  13.                     log.info("IPLimitFilter - 拦截到需要进行ip限流校验的方法:URL = " + url);
  14.                     return doLimit(exchange, chain);
  15.                 }
  16.             }
  17.         }
  18.         // 4 默认放行
  19.         return chain.filter(exchange);
  20.     }
  21. }
复制代码
filter()返回的Mono对象,是响应式编程框架 Project Reactor 中代表单个返回值的流式对象
案例
  1. @Component
  2. public class PreGatewayFilterFactory extends AbstractGatewayFilterFactory<PreGatewayFilterFactory.Config> {
  3.         public PreGatewayFilterFactory() {
  4.                 super(Config.class);
  5.         }
  6.         @Override
  7.         public GatewayFilter apply(Config config) {
  8.                 // grab configuration from Config object
  9.                 return (exchange, chain) -> {
  10.                         //If you want to build a "pre" filter you need to manipulate the
  11.                         //request before calling chain.filter
  12.                         ServerHttpRequest.Builder builder = exchange.getRequest().mutate();
  13.                         //use builder to manipulate the request
  14.                         return chain.filter(exchange.mutate().request(builder.build()).build());
  15.                 };
  16.         }
  17.         public static class Config {
  18.                 //Put the configuration properties for your filter here
  19.         }
  20. }
复制代码
3.3 Spring Webflux 中的请求处置处罚流程

Spring 5 中引入的全新的响应式 Web 服务开发框架。针对涉及大量I/O 操作的服务化架构,WebFlux也是解决方案。
工作流程


示例
  1. public Mono<Void> handle(ServerWebExchange exchange) {
  2.     if (this.handlerMappings == null) {
  3.         return createNotFoundError();
  4.     }
  5.     return Flux.fromIterable(this.handlerMappings)
  6.             .concatMap(mapping -> mapping.getHandler(exchange))
  7.             .next()
  8.             // 如果没有找到 HandlerMapping,则抛出异常
  9.             .switchIfEmpty(createNotFoundError())
  10.             // 触发 HandlerAdapter 的 handle 方法
  11.             .flatMap(handler -> invokeHandler(exchange, handler))
  12.             // 触发HandlerResultHandler的handleResult方法
  13.             .flatMap(result -> handleResult(exchange, result));
  14. }
复制代码
总结

通过理论联系实际,讨论了响应式编程的详细应用场景。
响应式编程技术已经应用到了日常开发的很多开源框架中,这些框架在分布式系统和微服务架构中得到了广泛的应用。
FAQ

Q:Netflix Hystrix 中基于响应式流的滑动窗口实现机制?
A:Netflix Hystrix 中基于响应式流的滑动窗口实现机制是通过在数据流中利用滑动窗口来实现的。滑动窗口是一种将数据流分成固定大小的块的技术,每个块的大小和时间范围是可配置的。在 Hystrix 中,滑动窗口被用来网络服务调用的响应时间、成功率等指标,并在这些指标上实验断路器逻辑。
详细实现


  • Hystrix为每个服务调用创建一个独立的滑动窗口,滑动窗口包罗最近一段时间内所有调用指标
  • 滑动窗口会根据配置的时间范围和块大小举行分割,并在每个块中记载指标数据
  • 每个块都有一个计数器来记载成功和失败的调用次数以及响应时间等指标
  • 在每个块的结束时,Hystrix会根据计数器中的数据盘算出该块的成功率、平均响应时间等指标,并将这些数据发送到断路器中举行判断
  • 如果断路器发现连续若干个时间段内的成功率低于阈值或平均响应时间超过阈值,就会触发断路器打开操作,停止向该服务发送请求
  • 当断路器打开后,Hystrix 会定期实验发送一个测试请求到该服务,如果测试请求成功,则断路器关闭,否则继续保持打开状态
通过基于响应式流的滑动窗口实现机制,Hystrix 可以快速地检测到服务调用失败、超时等问题,并在出现问题时快速地停止向该服务发送请求,从而提高了系统的可靠性和稳定性。
本文已收录在Github关注我,紧跟本系列专栏文章,咱们下篇再续!
<ul>
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

杀鸡焉用牛刀

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表