大数据-124 - Flink State 01篇 状态原理和原理分析:状态范例 实行分析 ...

打印 上一主题 下一主题

主题 932|帖子 932|积分 2796

点一下关注吧!!!非常感谢!!连续更新!!!

目前已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)
章节内容

上节我们完成了如下的内容:


  • Flink 并行度
  • Flink 并行度详解
  • Flink 并行度 案例

状态范例

Flink根据是否需要保存中间结果,把计算分为有状态计算和无状态计算。


  • 有状态计算:依靠之前或之后的变乱
  • 无状态计算:独立
根据数据布局差别,Flink定义了多种State,应用于差别的场景。


  • ValueState:即范例为T的单值状态,这个状态与对应的Key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过 value() 方法获取状态值
  • ListState:即Key上的状态值为一个列表,可以通过add方法往列表中附加值,也可以通过get()方法返回一个Iterable来遍历状态值
  • ReducingState:这种状态通过用户传入的ReduceFunction,每次调用add方法添加值的时间,会调用ReduceFunction,最后合并到一个单一的状态值。
  • FoldingState:跟ReducingState有点类似,不外它的状态值范例可以与add方法中传入的元素范例差别(这种状态会在未来的Flink版本当中删除)
  • MapState:即状态值为一个Map,用户通过put和putAll方法添加元素
State按照是否有Key划分为:


  • KeyedState
  • OperatorState
案例1 使用State求均匀值

实现思路



  • 读数据源
  • 将数据源根据Key分组
  • 按照Key分组战略,对流式数据调用状态化处理:实例化出一个状态实例,随着流式数据的到来更新状态,最后输出结果
编写代码

  1. package icu.wzk;
  2. import org.apache.flink.api.common.functions.RichFlatMapFunction;
  3. import org.apache.flink.api.common.state.ValueState;
  4. import org.apache.flink.api.common.state.ValueStateDescriptor;
  5. import org.apache.flink.api.common.typeinfo.TypeHint;
  6. import org.apache.flink.api.common.typeinfo.TypeInformation;
  7. import org.apache.flink.api.java.functions.KeySelector;
  8. import org.apache.flink.api.java.tuple.Tuple2;
  9. import org.apache.flink.configuration.Configuration;
  10. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  11. import org.apache.flink.streaming.api.datastream.KeyedStream;
  12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  14. import org.apache.flink.util.Collector;
  15. public class FlinkStateTest01 {
  16.     public static void main(String[] args) throws Exception {
  17.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  18.         env.setParallelism(1);
  19.         DataStreamSource<Tuple2<Long, Long>> data = env
  20.                 .fromElements(
  21.                         Tuple2.of(1L, 3L),
  22.                         Tuple2.of(1L, 5L),
  23.                         Tuple2.of(1L, 7L),
  24.                         Tuple2.of(1L, 4L),
  25.                         Tuple2.of(1L, 2L)
  26.                 );
  27.         KeyedStream<Tuple2<Long, Long>, Long> keyed = data
  28.                 .keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {
  29.                     @Override
  30.                     public Long getKey(Tuple2<Long, Long> value) throws Exception {
  31.                         return value.f0;
  32.                     }
  33.                 });
  34.         SingleOutputStreamOperator<Tuple2<Long, Long>> flatMapped = keyed
  35.                 .flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
  36.                     private transient ValueState<Tuple2<Long, Long>> sum;
  37.                     @Override
  38.                     public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
  39.                         Tuple2<Long, Long> currentSum = sum.value();
  40.                         if (currentSum == null) {
  41.                             currentSum = Tuple2.of(0L, 0L);
  42.                         }
  43.                         // 更新
  44.                         currentSum.f0 += 1L;
  45.                         currentSum.f1 += value.f1;
  46.                         System.out.println("currentValue: " + currentSum);
  47.                         // 更新状态值
  48.                         sum.update(currentSum);
  49.                         // 如果 count >= 5 清空状态值 重新计算
  50.                         if (currentSum.f0 >= 5) {
  51.                             out.collect(new Tuple2<>(value.f0, currentSum.f1 / currentSum.f0));
  52.                             sum.clear();
  53.                         }
  54.                     }
  55.                     @Override
  56.                     public void open(Configuration parameters) throws Exception {
  57.                         ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
  58.                                 "average",
  59.                                 TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})
  60.                         );
  61.                         sum = getRuntimeContext().getState(descriptor);
  62.                     }
  63.                 });
  64.         flatMapped.print();
  65.         env.execute("Flink State Test");
  66.     }
  67. }
复制代码
运行结果


实行分析



Keyed State

表示和Key相干的一种State, 只能用于KeyedStream范例数据集对应的Function和Operator之上,KeyedState是OperatorState的特例,区别在于KeyedState事先按照Key对数据集进行了区分,每个KeyState仅对应一个Operator和Key的组合。
KeyedState可以通过KeyGroups进行管理,主要用于当算子并行度发生变革时,自动重新分布KeyedState数据。在系统运行过程中,一个Keyed算子实例大概运行一个大概多个KeyGroups的Keys。
Operator State

与 Keyed State 差别的是,Operator State 只和并行的算子实例绑定,和数据元素中的Key无关,每个算子实例中持有全部数据元素中的一部分状态数据。Operator State 支持算子实例并行度发生变革时自动重新分配状态数据。
同时在Flink中KeyedState和OperatorState均具有两种形式,此中一种为托管状态(Managed State)形式,由FlinkRuntime中控制和管理状态数据,并将状态数据转换为内存HashTables或RocksDB的对象存储,然后将这些状态数据通过内部的接口长期话到CheckPoints中,使命非常时可以通过这些状态数据恢复使命。另外一种是原生状态(Row State)形式,由算子本身管理数据布局,当触发CheckPoint中,当从CheckPoint恢复使命时,算子本身再返序列化出状态的数据布局。
DataStreamAPI支持使用ManagedState和RawState两种状态形式,在Flink中推荐用户使用ManagedState管理状态数据,主要缘故原由是ManagedState可以或许更好地支持状态数据的重平衡以及更加完善的内存管理。
状态描述


State既然是暴露给用户的,那么就需要有一些属性需要指定:


  • State名称
  • Value Serializer
  • State Type Info
在对应的StateBackend中,会去调用对应的create方法获取到stateDescriptor中的值。
Flink通过StateDescriptor来定义一个状态,这是一个抽象类,内部定义了状态名称、范例、序列化器等底子信息,与上面的状态对应,从StateDescriptor派生ValueStateDescriptor、ListStateDescriptor等等


  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptot)

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

卖不甜枣

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表