【Flink入门修炼】2-2 Flink State 状态

打印 上一主题 下一主题

主题 873|帖子 873|积分 2619


  • 什么是状态?状态有什么作用?
  • 如果你来计划,对于一个流式服务,怎样根据不断输入的数据计算呢?
  • 又怎样做故障恢复呢?
一、为什么要管理状态

流计算不像批计算,数据是连续流入的,而不是一个确定的数据集。在进行计算的时候,不可能把之前已经输入的数据全都生存下来,然后再和新数据归并计算。效率低下不说,内存也扛不住。
另外,如果程序出现故障重启,没有之前计算过的状态生存,那么也就无法再继续计算了。
因此,就需要一个东西来记录各个算子之前已经计算过值的结果,当有新数据来的时候,直接在这个结果上计算更新。这个就是状态
常见的流处理状态功能如下:

  • 数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
  • 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的情势缓存下来。比如,判断一个温度传感器数据流中的温度是否在连续上升。
  • 对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。
  • 在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。
二、state 简介

Flink的状态是由算子的子使命来创建和管理的。一个状态更新和获取的流程如下图所示,一个算子子使命接收输入流,获取对应的状态,根据新的计算结果更新状态。

状态的生存:
需要考虑的问题:

  • container 非常后,状态不丢
  • 状态可能越来越大
因此,状态不能直接放在内存中,以上两点问题都无法包管。
需要有一个外部持久化存储方式,常见的如放到 HDFS 中。(此部分读者感兴趣可自行搜索资料探索)
三、Flink 状态范例


一)Managed State 和 Raw State


  • Managed State 是由 Flink 管理的。Flink帮助存储、恢复和优化。
  • Raw State 是开发者自己管理的,需要自己序列化(较少用到)。
在 Flink 中保举用户利用Managed State管理状态数据 ,主要原因是 Managed State 能够更好地支持状态数据的重平衡以及更加完善的内存管理。
Managed StateRaw State状态管理方式Flink Runtime 管理,自动存储,自动恢复,内存管理方式上优化明显用户自己管理,需要用户自己序列化状态数据结构已知的数据结构 value , list ,mapflink不知道你存的是什么结构,都转换为二进制字节数据利用场景大多数场景适用需要满足特殊业务,自界说operator时利用,flink满足不了你的需求时候,利用复杂下文将重点介绍Managed State。
二)Keyed State 和 Operator State

Managed State 又有两种范例:Keyed State 和 Operator State。
keyed stateoperator state适用场景只能应用在 KeyedSteam 上可以用于所有的算子State 处理方式每个 key 对应一个 state,一个 operator 处理多个 key ,会访问相应的多个 state一个 operator 对应一个 state并发改变并发改变时,state随着key在实例间迁移并发改变时需要你选择分配方式,内置:1.匀称分配 2.所有state归并后再分发给每个实例访问方式通过RuntimeContext访问,需要operator是一个richFunction需要你实现CheckPointedFunction或ListCheckPointed接口支持数据结构ValuedState, ListState, Reducing State, Aggregating State, MapState, FoldingState(1.4弃用)只支持 listStateKeyed State

简单来说,通过 keyBy 分组的就会用到 Keyed State。就是按照分组来的状态。(Keyed State 是Operator State的特例,区别在于 Keyed State 事先按照 key 对数据集进行了分区,每个 Key State 仅对应ー个Operator和 Key 的组合。)

Keyed State可以通过 Key Groups 进行管理,主要用于当算子并行度发生变化时,自动重新分布Keyed State数据 。分配代码如下:
  1. // KeyGroupRangeAssignment.java
  2.     public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
  3.                 return MathUtils.murmurHash(keyHash) % maxParallelism;
  4.         }
复制代码
Operator State

Operator State 可以用在所有算子上,每个算子子使命或者说每个算子实例共享一个状态,流入这个算子子使命的数据可以访问和更新这个状态。
例如 Kafka Connector 中,每一个并行的 Kafka Consumer 都在 Operator State 中维护当前 Consumer 订阅的 partiton 和 offset。

三)Flink 实现类


在开发中,需要生存的状态也有不同的数据结构,那么 Flink 也提供了相应的类。
如上图所示:

  • ValueState[T] 生存单一变量状态
  • MapState[K, V] 同 java map,生存 kv 型状态
  • ListState[T] 数组范例状态
  • ReducingState[T] 单一状态,将原状态和新状态归并后再更新
  • AggregatingState[IN, OUT] 同样是归并更新,只不过前后数据范例可以不一样
四、实践

实现一个简单的计数窗口。
输入数据是一个元组 Tuple2.of(1L, 3L),把元组的第一个元素当作 key(在示例中都 key 都是 “1”),第二个元素当 value。
该函数将出现的次数以及总和存储在 ValueState 中。 一旦出现次数达到 2,则将匀称值发送到鄙俚,并扫除状态重新开始。 请注意,我们会为每个不同的 key(元组中第一个元素)生存一个单独的值。
  1. public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
  2.     /**
  3.      * The ValueState handle. The first field is the count, the second field a running sum.
  4.      */
  5.     private transient ValueState<Tuple2<Long, Long>> sum;
  6.     @Override
  7.     public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
  8.         // access the state value
  9.         Tuple2<Long, Long> currentSum = sum.value();
  10.         // update the count
  11.         currentSum.f0 += 1;
  12.         // add the second field of the input value
  13.         currentSum.f1 += input.f1;
  14.         // update the state
  15.         sum.update(currentSum);
  16.         // if the count reaches 2, emit the average and clear the state
  17.         if (currentSum.f0 >= 2) {
  18.             out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
  19.             sum.clear();
  20.         }
  21.     }
  22.     @Override
  23.     public void open(Configuration config) {
  24.         ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  25.                 new ValueStateDescriptor<>(
  26.                         "average", // the state name
  27.                         TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
  28.                         Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
  29.         sum = getRuntimeContext().getState(descriptor);
  30.     }
  31. }
  32. // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
  33. env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
  34.         .keyBy(value -> value.f0)
  35.         .flatMap(new CountWindowAverage())
  36.         .print();
  37. // the printed output will be (1,4) and (1,5)
复制代码
四、小结

本节我们介绍了 Flink 状态,是用于流式计算中中间数据存储和故障恢复的。
Flink 状态分为 Raw State 和 Manage State,其中 Manage State 中又包含 Keyed State 和 Operator State。最重要的是 Keyed State 要重点理解和掌握。
在编程开发过程中,针对不同的数据结构,Flink 提供了对应的 State 类。并提供了一个 state demo 代码供学习。
参考文章:
七、Flink入门--状态管理_flink流式使命怎样包管7*24小时运行-CSDN博客
Flink状态管理详解:Keyed State和Operator List State深度解析
爆肝 3 月,3w 字、15 章节详解 Flink 状态管理!(建议收藏)-腾讯云开发者社区-腾讯云(较详细)
Flink 笔记二 Flink的State--状态原理及原理剖析_flink key state是每个key对应一个state照旧每个分区对应一个state-CSDN博客(源码剖析)
Flink 状态管理详解(State TTL、Operator state、Keyed state)-腾讯云开发者社区-腾讯云
Flink 源码阅读笔记(10)- State 管理

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

飞不高

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表