8 Flink 状态管理

打印 上一主题 下一主题

主题 1010|帖子 1010|积分 3030

A Flink中的状态


状态的分类

1)托管状态(Managed State)和原始状态(Raw State)
Flink的状态有两种:托管状态(Managed State)和原始状态(Raw State)。托管状态就是由Flink同一管理的,状态的存储访问、故障恢复和重组等一系列题目都由Flink实现,我们只要调接口就可以;而原始状态则是自定义的,相称于就是开发了一块内存,需要我们自己管理,实近况态的序列化和故障恢复。
通常我们接纳Flink托管状态来实现需求。
2)算子状态(Operator State)和按键分区状态(Keyed State)
接下来我们的重点就是托管状态(Managed State)。
我们知道在Flink中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的slot在盘算资源上是物理隔离的,以是Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有用。
而许多有状态的操纵(比如聚合、窗口)都是要先做keyBy举行按键分区的。按键分区之后,任务所举行的全部盘算都应该只针对当前key有用,以是状态也应该按照key彼此隔离。在这种环境下,状态的访问方式又会有所不同。
基于这样的想法,我们又可以将托管状态分为两类:算子状态和按键分区状态。


别的,也可以通过富函数类(Rich Function)来自定义Keyed State,以是只要提供了富函数类接口的算子,也都可以使用Keyed State。以是纵然是map、filter这样无状态的基本转换算子,我们也可以通过富函数类给它们“追加”Keyed State。比如RichMapFunction、RichFilterFunction。在富函数中,我们可以调用.getRuntimeContext()获取当前的运行时上下文(RuntimeContext),进而获取到访问状态的句柄;这种富函数中自定义的状态也是Keyed State。从这个角度讲,Flink中全部的算子都可以是有状态的。
无论是Keyed State照旧Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着对应的状态,算子的子任务之间状态不共享。
B 按键分区状态(Keyed State)

按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以key为作用范围举行隔离。
需要注意,使用Keyed State必须基于KeyedStream。没有举行keyBy分区的DataStream,纵然转换算子实现了对应的富函数类,也不能通过运行时上下文访问Keyed State。
8.2.1 值状态(ValueState)

顾名思义,状态中只保存一个“值”(value)。ValueState本身是一个接口,源码中定义如下:
  1. public interface ValueState<T> extends State {
  2.     T value() throws IOException;
  3.     void update(T value) throws IOException;
  4. }
复制代码
这里的T是泛型,表示状态的数据内容可以是任何具体的数据类型。如果想要保存一个长整型值作为状态,那么类型就是ValueState。
我们可以在代码中读写值状态,实现对于状态的访问和更新。
T value():获取当前状态的值;
update(T value):对状态举行更新,传入的参数value就是要覆写的状态值。
在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState的状态描述器构造方法如下:
  1. public ValueStateDescriptor(String name, Class<T> typeClass) {
  2.     super(name, typeClass, null);
  3. }
复制代码
这里需要传入状态的名称和类型——这跟我们声明一个变量时做的事变完全一样。
案例需求:检测每种传感器的水位值,如果一连的两个水位值超过10,就输出报警。
  1. public class KeyedValueStateDemo {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setParallelism(1);
  5.         SingleOutputStreamOperator<WaterSensor> sensorDS = env
  6.                 .socketTextStream("hadoop102", 7777)
  7.                 .map(new WaterSensorMapFunction())
  8.                 .assignTimestampsAndWatermarks(
  9.                         WatermarkStrategy
  10.                                 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  11.                                 .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
  12.                 );
  13.         sensorDS.keyBy(r -> r.getId())
  14.                 .process(
  15.                         new KeyedProcessFunction<String, WaterSensor, String>() {
  16.                             // TODO 1.定义状态
  17.                             ValueState<Integer> lastVcState;
  18.                             @Override
  19.                             public void open(Configuration parameters) throws Exception {
  20.                                 super.open(parameters);
  21.                                 // TODO 2.在open方法中,初始化状态
  22.                                 // 状态描述器两个参数:第一个参数,起个名字,不重复;第二个参数,存储的类型
  23.                                 lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT));
  24.                             }
  25.                             @Override
  26.                             public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
  27. //                                lastVcState.value();  // 取出 本组 值状态 的数据
  28. //                                lastVcState.update(); // 更新 本组 值状态 的数据
  29. //                                lastVcState.clear();  // 清除 本组 值状态 的数据
  30.                                 // 1. 取出上一条数据的水位值(Integer默认值是null,判断)
  31.                                 int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();
  32.                                 // 2. 求差值的绝对值,判断是否超过10
  33.                                 Integer vc = value.getVc();
  34.                                 if (Math.abs(vc - lastVc) > 10) {
  35.                                     out.collect("传感器=" + value.getId() + "==>当前水位值=" + vc + ",与上一条水位值=" + lastVc + ",相差超过10!!!!");
  36.                                 }
  37.                                 // 3. 更新状态里的水位值
  38.                                 lastVcState.update(vc);
  39.                             }
  40.                         }
  41.                 )
  42.                 .print();
  43.         env.execute();
  44.     }
  45. }
复制代码
8.2.2 列表状态(ListState)

将需要保存的数据,以列表(List)的情势构造起来。在ListState接口中同样有一个类型参数T,表示列表中数据的类型。ListState也提供了一系列的方法来操纵状态,使用方式与一般的List非常相似。
Iterable get():获取当前的列表状态,返回的是一个可迭代类型Iterable;
update(List values):传入一个列表values,直接对状态举行覆盖;
add(T value):在状态列表中添加一个元素value;
addAll(List values):向列表中添加多个元素,以列表values情势传入。
雷同地,ListState的状态描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致。
案例:针对每种传感器输出最高的3个水位值
  1. public class KeyedListStateDemo {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setParallelism(1);
  5.         SingleOutputStreamOperator<WaterSensor> sensorDS = env
  6.                 .socketTextStream("hadoop102", 7777)
  7.                 .map(new WaterSensorMapFunction())
  8.                 .assignTimestampsAndWatermarks(
  9.                         WatermarkStrategy
  10.                                 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  11.                                 .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
  12.                 );
  13.         sensorDS.keyBy(r -> r.getId())
  14.                 .process(
  15.                         new KeyedProcessFunction<String, WaterSensor, String>() {
  16.                             ListState<Integer> vcListState;
  17.                             @Override
  18.                             public void open(Configuration parameters) throws Exception {
  19.                                 super.open(parameters);
  20.                                 vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Types.INT));
  21.                             }
  22.                             @Override
  23.                             public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
  24.                                 // 1.来一条,存到list状态里
  25.                                 vcListState.add(value.getVc());
  26.                                 // 2.从list状态拿出来(Iterable), 拷贝到一个List中,排序, 只留3个最大的
  27.                                 Iterable<Integer> vcListIt = vcListState.get();
  28.                                 // 2.1 拷贝到List中
  29.                                 List<Integer> vcList = new ArrayList<>();
  30.                                 for (Integer vc : vcListIt) {
  31.                                     vcList.add(vc);
  32.                                 }
  33.                                 // 2.2 对List进行降序排序
  34.                                 vcList.sort((o1, o2) -> o2 - o1);
  35.                                 // 2.3 只保留最大的3个(list中的个数一定是连续变大,一超过3就立即清理即可)
  36.                                 if (vcList.size() > 3) {
  37.                                     // 将最后一个元素清除(第4个)
  38.                                     vcList.remove(3);
  39.                                 }
  40.                                 out.collect("传感器id为" + value.getId() + ",最大的3个水位值=" + vcList.toString());
  41.                                 // 3.更新list状态
  42.                                 vcListState.update(vcList);
  43. //                                vcListState.get();            //取出 list状态 本组的数据,是一个Iterable
  44. //                                vcListState.add();            // 向 list状态 本组 添加一个元素
  45. //                                vcListState.addAll();         // 向 list状态 本组 添加多个元素
  46. //                                vcListState.update();         // 更新 list状态 本组数据(覆盖)
  47. //                                vcListState.clear();          // 清空List状态 本组数据
  48.                             }
  49.                         }
  50.                 )
  51.                 .print();
  52.         env.execute();
  53.     }
  54. }
复制代码
8.2.3 Map状态(MapState)

把一些键值对(key-value)作为状态整体保存起来,可以以为就是一组key-value映射的列表。对应的MapState<UK, UV>接口中,就会有UK、UV两个泛型,分别表示保存的key和value的类型。同样,MapState提供了操纵映射状态的方法,与Map的使用非常雷同。
UV get(UK key):传入一个key作为参数,查询对应的value值;
put(UK key, UV value):传入一个键值对,更新key对应的value值;
putAll(Map<UK, UV> map):将传入的映射map中全部的键值对,全部添加到映射状态中;
remove(UK key):将指定key对应的键值对删除;
boolean contains(UK key):判断是否存在指定的key,返回一个boolean值。
别的,MapState也提供了获取整个映射干系信息的方法;
Iterable<Map.Entry<UK, UV>> entries():获取映射状态中全部的键值对;
Iterable keys():获取映射状态中全部的键(key),返回一个可迭代Iterable类型;
Iterable values():获取映射状态中全部的值(value),返回一个可迭代Iterable类型;
boolean isEmpty():判断映射是否为空,返回一个boolean值。
案例需求:统计每种传感器每种水位值出现的次数。
  1. public class KeyedMapStateDemo {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setParallelism(1);
  5.         SingleOutputStreamOperator<WaterSensor> sensorDS = env
  6.                 .socketTextStream("hadoop102", 7777)
  7.                 .map(new WaterSensorMapFunction())
  8.                 .assignTimestampsAndWatermarks(
  9.                         WatermarkStrategy
  10.                                 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  11.                                 .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
  12.                 );
  13.         sensorDS.keyBy(r -> r.getId())
  14.                 .process(
  15.                         new KeyedProcessFunction<String, WaterSensor, String>() {
  16.                             MapState<Integer, Integer> vcCountMapState;
  17.                             @Override
  18.                             public void open(Configuration parameters) throws Exception {
  19.                                 super.open(parameters);
  20.                                 vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Types.INT, Types.INT));
  21.                             }
  22.                             @Override
  23.                             public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
  24.                                 // 1.判断是否存在vc对应的key
  25.                                 Integer vc = value.getVc();
  26.                                 if (vcCountMapState.contains(vc)) {
  27.                                     // 1.1 如果包含这个vc的key,直接对value+1
  28.                                     Integer count = vcCountMapState.get(vc);
  29.                                     vcCountMapState.put(vc, ++count);
  30.                                 } else {
  31.                                     // 1.2 如果不包含这个vc的key,初始化put进去
  32.                                     vcCountMapState.put(vc, 1);
  33.                                 }
  34.                                 // 2.遍历Map状态,输出每个k-v的值
  35.                                 StringBuilder outStr = new StringBuilder();
  36.                                 outStr.append("======================================\n");
  37.                                 outStr.append("传感器id为" + value.getId() + "\n");
  38.                                 for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {
  39.                                     outStr.append(vcCount.toString() + "\n");
  40.                                 }
  41.                                 outStr.append("======================================\n");
  42.                                 out.collect(outStr.toString());
  43. //                                vcCountMapState.get();          // 对本组的Map状态,根据key,获取value
  44. //                                vcCountMapState.contains();     // 对本组的Map状态,判断key是否存在
  45. //                                vcCountMapState.put(, );        // 对本组的Map状态,添加一个 键值对
  46. //                                vcCountMapState.putAll();  // 对本组的Map状态,添加多个 键值对
  47. //                                vcCountMapState.entries();      // 对本组的Map状态,获取所有键值对
  48. //                                vcCountMapState.keys();         // 对本组的Map状态,获取所有键
  49. //                                vcCountMapState.values();       // 对本组的Map状态,获取所有值
  50. //                                vcCountMapState.remove();   // 对本组的Map状态,根据指定key,移除键值对
  51. //                                vcCountMapState.isEmpty();      // 对本组的Map状态,判断是否为空
  52. //                                vcCountMapState.iterator();     // 对本组的Map状态,获取迭代器
  53. //                                vcCountMapState.clear();        // 对本组的Map状态,清空
  54.                             }
  55.                         }
  56.                 )
  57.                 .print();
  58.         env.execute();
  59.     }
  60. }
复制代码
8.2.4 归约状态(ReducingState)

雷同于值状态(Value),不过需要对添加进来的全部数据举行归约,将归约聚合之后的值作为状态保存下来。ReducingState这个接口调用的方法雷同于ListState,只不过它保存的只是一个聚合值,以是调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态举行归约,并用得到的效果更新状态。
归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。这里的归约函数,就是我们之前介绍reduce聚合算子时讲到的ReduceFunction,以是状态类型跟输入的数据类型是一样的。
  1. public ReducingStateDescriptor(
  2.     String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}
复制代码
这里的描述器有三个参数,其中第二个参数就是定义了归约聚合逻辑的ReduceFunction,别的两个参数则是状态的名称和类型。
案例:盘算每种传感器的水位和

  1. .process(new KeyedProcessFunction<String, WaterSensor, Integer>() {
  2.     private ReducingState<Integer> sumVcState;
  3.     @Override
  4.     public void open(Configuration parameters) throws Exception {
  5.         sumVcState = this
  6.           .getRuntimeContext()
  7.           .getReducingState(new ReducingStateDescriptor<Integer>("sumVcState",Integer::sum,Integer.class));
  8.     }
  9.     @Override
  10.     public void processElement(WaterSensor value, Context ctx, Collector<Integer> out) throws Exception {
  11.         sumVcState.add(value.getVc());
  12.         out.collect(sumVcState.get());
  13.     }
  14. })
复制代码
8.2.5 聚合状态(AggregatingState)

与归约状态非常雷同,聚合状态也是一个值,用来保存添加进来的全部数据的聚合效果。与ReducingState不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数(AggregateFunction)来定义的;这也就是之前我们讲过的AggregateFunction,内里通过一个累加器(Accumulator)来表示状态,以是聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。
同样地,AggregatingState接口调用方法也与ReducingState相同,调用.add()方法添加元素时,会直接使用指定的AggregateFunction举行聚合并更新状态。
案例需求:盘算每种传感器的均匀水位
  1. public class KeyedAggregatingStateDemo {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setParallelism(1);
  5.         SingleOutputStreamOperator<WaterSensor> sensorDS = env
  6.                 .socketTextStream("hadoop102", 7777)
  7.                 .map(new WaterSensorMapFunction())
  8.                 .assignTimestampsAndWatermarks(
  9.                         WatermarkStrategy
  10.                                 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  11.                                 .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
  12.                 );
  13.         sensorDS.keyBy(r -> r.getId())
  14.                 .process(
  15.                         new KeyedProcessFunction<String, WaterSensor, String>() {
  16.                             AggregatingState<Integer, Double> vcAvgAggregatingState;
  17.                             @Override
  18.                             public void open(Configuration parameters) throws Exception {
  19.                                 super.open(parameters);
  20.                                 vcAvgAggregatingState = getRuntimeContext()
  21.                                         .getAggregatingState(
  22.                                                 new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
  23.                                                         "vcAvgAggregatingState",
  24.                                                         new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
  25.                                                             @Override
  26.                                                             public Tuple2<Integer, Integer> createAccumulator() {
  27.                                                                 return Tuple2.of(0, 0);
  28.                                                             }
  29.                                                             @Override
  30.                                                             public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
  31.                                                                 return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
  32.                                                             }
  33.                                                             @Override
  34.                                                             public Double getResult(Tuple2<Integer, Integer> accumulator) {
  35.                                                                 return accumulator.f0 * 1D / accumulator.f1;
  36.                                                             }
  37.                                                             @Override
  38.                                                             public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
  39. //                                                                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
  40.                                                                 return null;
  41.                                                             }
  42.                                                         },
  43.                                                         Types.TUPLE(Types.INT, Types.INT))
  44.                                         );
  45.                             }
  46.                             @Override
  47.                             public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
  48.                                 // 将 水位值 添加到  聚合状态中
  49.                                 vcAvgAggregatingState.add(value.getVc());
  50.                                 // 从 聚合状态中 获取结果
  51.                                 Double vcAvg = vcAvgAggregatingState.get();
  52.                                 out.collect("传感器id为" + value.getId() + ",平均水位值=" + vcAvg);
  53. //                                vcAvgAggregatingState.get();    // 对 本组的聚合状态 获取结果
  54. //                                vcAvgAggregatingState.add();    // 对 本组的聚合状态 添加数据,会自动进行聚合
  55. //                                vcAvgAggregatingState.clear();  // 对 本组的聚合状态 清空数据
  56.                             }
  57.                         }
  58.                 )
  59.                 .print();
  60.         env.execute();
  61.     }
  62. }
复制代码
8.2.6 状态生存时间(TTL)

在实际应用中,许多状态会随着时间的推移渐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思绪是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。
具体实现上,如果用一个历程不停地扫描全部状态看是否过期,显然会占用大量资源做无用功。状态的失效着实不需要立即删除,以是我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间举行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而举行清除了。
配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的.enableTimeToLive()方法启动TTL功能。
  1. StateTtlConfig ttlConfig = StateTtlConfig
  2.     .newBuilder(Time.seconds(10))
  3.     .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  4.     .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  5.     .build();
  6. ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);
  7. stateDescriptor.enableTimeToLive(ttlConfig);
复制代码
这里用到了几个配置项:
.newBuilder()
状态TTL配置的构造器方法,必须调用,返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig了。方法需要传入一个Time作为参数,这就是设定的状态生存时间。
.setUpdateType()
设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的OnCreateAndWrite表示只有创建状态和更改状态(写操纵)时更新失效时间。另一种类型OnReadAndWrite则表示无论读写操纵都会更新失效时间,也就是只要对状态举行了访问,就表明它是活跃的,从而延伸生存时间。这个配置默以为OnCreateAndWrite。
.setStateVisibility()
设置状态的可见性。所谓的“状态可见性”,是指因为清除操纵并不是实时的,以是当状态过期之后还有可能继续存在,这时如果对它举行访问,能否正常读取到就是一个题目了。这里设置的NeverReturnExpired是默认行为,表示从不返回过期值,也就是只要过期就以为它已经被清除了,应用不能继续读取;这在处剖析话或者隐私数据时比较紧张。对应的另一种配置是ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。
除此之外,TTL配置还可以设置在保存查抄点(checkpoint)时触发清除操纵,或者配置增量的清算(incremental cleanup),还可以针对RocksDB状态后端使用压缩过滤器(compaction filter)举行配景清算。这里需要注意,目前的TTL设置只支持处理时间。
  1. public class StateTTLDemo {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setParallelism(1);
  5.         SingleOutputStreamOperator<WaterSensor> sensorDS = env
  6.                 .socketTextStream("hadoop102", 7777)
  7.                 .map(new WaterSensorMapFunction())
  8.                 .assignTimestampsAndWatermarks(
  9.                         WatermarkStrategy
  10.                                 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  11.                                 .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
  12.                 );
  13.         sensorDS.keyBy(r -> r.getId())
  14.                 .process(
  15.                         new KeyedProcessFunction<String, WaterSensor, String>() {
  16.                             ValueState<Integer> lastVcState;
  17.                             @Override
  18.                             public void open(Configuration parameters) throws Exception {
  19.                                 super.open(parameters);
  20.                                 // TODO 1.创建 StateTtlConfig
  21.                                 StateTtlConfig stateTtlConfig = StateTtlConfig
  22.                                         .newBuilder(Time.seconds(5)) // 过期时间5s
  23. //                                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入(更新) 更新 过期时间
  24.                                         .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入(更新) 更新 过期时间
  25.                                         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值
  26.                                         .build();
  27.                                 // TODO 2.状态描述器 启用 TTL
  28.                                 ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT);
  29.                                 stateDescriptor.enableTimeToLive(stateTtlConfig);
  30.                                 this.lastVcState = getRuntimeContext().getState(stateDescriptor);
  31.                             }
  32.                             @Override
  33.                             public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
  34.                                 // 先获取状态值,打印 ==》 读取状态
  35.                                 Integer lastVc = lastVcState.value();
  36.                                 out.collect("key=" + value.getId() + ",状态值=" + lastVc);
  37.                                 // 如果水位大于10,更新状态值 ===》 写入状态
  38.                                 if (value.getVc() > 10) {
  39.                                     lastVcState.update(value.getVc());
  40.                                 }
  41.                             }
  42.                         }
  43.                 )
  44.                 .print();
  45.         env.execute();
  46.     }
  47. }
复制代码
C 算子状态(Operator State)

算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的key无关,以是不同key的数据只要被分发到同一个并行子任务,就会访问到同一个Operator State。
算子状态的实际应用场景不如Keyed State多,一般用在Source或Sink等与外部体系毗连的算子上,或者完全没有key定义的场景。比如Flink的Kafka毗连器中,就用到了算子状态。
当算子的并行度发生变革时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。
算子状态也支持不同的布局类型,主要有三种:ListState、UnionListState和BroadcastState。
8.3.1 列表状态(ListState)

与Keyed State中的ListState一样,将状态表示为一组数据的列表。
与Keyed State中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,以是每一个并行子任务上只会保存一个“列表”(list),也就是当前并行子任务上全部状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。
当算子并行度举行缩放调解时,算子的列表状态中的全部元素项会被同一收集起来,相称于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给全部并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的rebanlance数据传输方式雷同,是通过逐一“发牌”的方式将状态项均匀分配的。这种方式也叫作“均匀分割重组”(even-split redistribution)。
算子状态中不会存在“键组”(key group)这样的布局,以是为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。
案例实操:在map算子中盘算数据的个数。
  1. public class OperatorListStateDemo {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setParallelism(2);
  5.         env
  6.                 .socketTextStream("hadoop102", 7777)
  7.                 .map(new MyCountMapFunction())
  8.                 .print();
  9.         env.execute();
  10.     }
  11.     // TODO 1.实现 CheckpointedFunction 接口
  12.     public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {
  13.         private Long count = 0L;
  14.         private ListState<Long> state;
  15.         @Override
  16.         public Long map(String value) throws Exception {
  17.             return ++count;
  18.         }
  19.         /**
  20.          * TODO 2.本地变量持久化:将 本地变量 拷贝到 算子状态中,开启checkpoint时才会调用
  21.          *
  22.          * @param context
  23.          * @throws Exception
  24.          */
  25.         @Override
  26.         public void snapshotState(FunctionSnapshotContext context) throws Exception {
  27.             System.out.println("snapshotState...");
  28.             // 2.1 清空算子状态
  29.             state.clear();
  30.             // 2.2 将 本地变量 添加到 算子状态 中
  31.             state.add(count);
  32.         }
  33.         /**
  34.          * TODO 3.初始化本地变量:程序启动和恢复时, 从状态中 把数据添加到 本地变量,每个子任务调用一次
  35.          *
  36.          * @param context
  37.          * @throws Exception
  38.          */
  39.         @Override
  40.         public void initializeState(FunctionInitializationContext context) throws Exception {
  41.             System.out.println("initializeState...");
  42.             // 3.1 从 上下文 初始化 算子状态
  43.             state = context
  44.                     .getOperatorStateStore()
  45.                     .getListState(new ListStateDescriptor<Long>("state", Types.LONG));
  46.             // 3.2 从 算子状态中 把数据 拷贝到 本地变量
  47.             if (context.isRestored()) {
  48.                 for (Long c : state.get()) {
  49.                     count += c;
  50.                 }
  51.             }
  52.         }
  53.     }
  54. }
复制代码
8.3.2 团结列表状态(UnionListState)

与ListState雷同,团结列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度举行缩放调解时对于状态的分配方式不同。
UnionListState的重点就在于“团结”(union)。在并行度调解时,常规列表状态是轮询分配状态项,而团结列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了团结后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“团结重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用团结重组的方式。
使用方式同ListState,区别在如下标红部分:
  1. state = context
  2.               .getOperatorStateStore()
  3.               .getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));
复制代码
8.3.3 广播状态(BroadcastState)

有时我们盼望算子并行子任务都保持同一份“全局”状态,用来做同一的配置和规则设定。这时全部分区的全部数据都会访问到同一个状态,状态就像被“广播”到全部分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。
因为广播状态在每个并行子任务上的实例都一样,以是在并行度调解的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的环境,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。
案例实操:水位超过指定的阈值发送告警,阈值可以动态修改。
  1. public class OperatorBroadcastStateDemo {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setParallelism(2);
  5.         // 数据流
  6.         SingleOutputStreamOperator<WaterSensor> sensorDS = env
  7.                 .socketTextStream("hadoop102", 7777)
  8.                 .map(new WaterSensorMapFunction());
  9.         // 配置流(用来广播配置)
  10.         DataStreamSource<String> configDS = env.socketTextStream("hadoop102", 8888);
  11.         // TODO 1. 将 配置流 广播
  12.         MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);
  13.         BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState);
  14.         // TODO 2.把 数据流 和 广播后的配置流 connect
  15.         BroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDS.connect(configBS);
  16.         // TODO 3.调用 process
  17.         sensorBCS
  18.                 .process(
  19.                         new BroadcastProcessFunction<WaterSensor, String, String>() {
  20.                             /**
  21.                              * 数据流的处理方法: 数据流 只能 读取 广播状态,不能修改
  22.                              * @param value
  23.                              * @param ctx
  24.                              * @param out
  25.                              * @throws Exception
  26.                              */
  27.                             @Override
  28.                             public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
  29.                                 // TODO 5.通过上下文获取广播状态,取出里面的值(只读,不能修改)
  30.                                 ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
  31.                                 Integer threshold = broadcastState.get("threshold");
  32.                                 // 判断广播状态里是否有数据,因为刚启动时,可能是数据流的第一条数据先来
  33.                                 threshold = (threshold == null ? 0 : threshold);
  34.                                 if (value.getVc() > threshold) {
  35.                                     out.collect(value + ",水位超过指定的阈值:" + threshold + "!!!");
  36.                                 }
  37.                             }
  38.                             /**
  39.                              * 广播后的配置流的处理方法:  只有广播流才能修改 广播状态
  40.                              * @param value
  41.                              * @param ctx
  42.                              * @param out
  43.                              * @throws Exception
  44.                              */
  45.                             @Override
  46.                             public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
  47.                                 // TODO 4. 通过上下文获取广播状态,往里面写数据
  48.                                 BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
  49.                                 broadcastState.put("threshold", Integer.valueOf(value));
  50.                             }
  51.                         }
  52.                 )
  53.                 .print();
  54.         env.execute();
  55.     }
  56. }
复制代码
D 状态后端(State Backends)

在Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责管理本地状态的存储方式和位置。
8.4.1 状态后端的分类(HashMapStateBackend/RocksDB)

状态后端是一个“开箱即用”的组件,可以在不改变应用步伐逻辑的环境下独立配置。Flink中提供了两类不同的状态后端,
一种是“哈希表状态后端”(HashMapStateBackend),
另一种是“内嵌RocksDB状态后端”(EmbeddedRocksDBStateBackend)。
如果没有特别配置,体系默认的状态后端是HashMapStateBackend。
(1)哈希表状态后端(HashMapStateBackend)
HashMapStateBackend是把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在Taskmanager的JVM堆上。普通的状态,以及窗口中收集的数据和触发器,都会以键值对的情势存储起来,以是底层是一个哈希表(HashMap),这种状态后端也因此得名。
(2)内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)
RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里。
RocksDB的状态数据被存储为序列化的字节数组,读写操纵需要序列化/反序列化,因此状态的访问性能要差一些。别的,因为做了序列化,key的比较也会按照字节举行,而不是直接调用.hashCode()和.equals()方法。
EmbeddedRocksDBStateBackend始终执行的是异步快照,以是不会因为保存查抄点而壅闭数据的处理;而且它还提供了增量式保存查抄点的机制,这在许多环境下可以大大提升保存效率。
8.4.2 怎样选择精确的状态后端

HashMap和RocksDB两种状态后端最大的区别,就在于本地状态存放在哪里。
HashMapStateBackend是内存盘算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。
而RocksDB是硬盘存储,以是可以根据可用的磁盘空间举行扩展,以是它非常得当于超等海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的低落,均匀读写性能要比HashMapStateBackend慢一个数量级。
8.4.3 状态后端的配置

在不做配置的时候,应用步伐使用的默认状态后端是由集群配置文件flink-conf.yaml中指定的,配置的键名称为state.backend。这个默认配置对集群上运行的全部作业都有用,我们可以通过更改配置值来改变默认的状态后端。别的,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。
(1)配置默认的状态后端
在flink-conf.yaml中,可以使用state.backend来配置默认状态后端。
配置项的可能值为hashmap,这样配置的就是HashMapStateBackend;如果配置项的值是rocksdb,这样配置的就是EmbeddedRocksDBStateBackend。
下面是一个配置HashMapStateBackend的例子:
  1. # 默认状态后端
  2. state.backend: hashmap
  3. # 存放检查点的文件路径
  4. state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints
复制代码
这里的state.checkpoints.dir配置项,定义了查抄点和元数据写入的目录。
(2)为每个作业(Per-job/Application)单独配置状态后端
通过执行环境设置,HashMapStateBackend。
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setStateBackend(new HashMapStateBackend());
复制代码
通过执行环境设置,EmbeddedRocksDBStateBackend。
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setStateBackend(new EmbeddedRocksDBStateBackend());
复制代码
需要注意,如果想在IDE中使用EmbeddedRocksDBStateBackend,需要为Flink项目添加依赖:
  1. <dependency>
  2.     <groupId>org.apache.flink</groupId>
  3.     <artifactId>flink-statebackend-rocksdb</artifactId>
  4.     <version>${flink.version}</version>
  5. </dependency>
复制代码
而由于Flink发行版中默认就包罗了RocksDB(服务器上解压的Flink),以是只要我们的代码中没有使用RocksDB的干系内容,就不需要引入这个依赖。
补充

我们前面写的 wordcount 的例子,没有包罗状态管理。如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,全部的数据都需要重新盘算。从容错和消息处理的语义上(at least once, exactly once),Flink引入了state和checkpoint。
因此可以说flink因为引入了state和checkpoint以是才支持的exactly once
首先区分一下两个概念:
state:
state一般指一个具体的task/operator的状态:
state数据默认保存在java的堆内存中,TaskManage节点的内存中。
operator表示一些算子在运行的过程中会产生的一些中间效果。
checkpoint:
checkpoint可以理解为checkpoint是把state数据定时持久化存储了,则表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包罗了全部task/operator的状态。
注意:task(subTask)是Flink中执行的基本单位。operator指算子(transformation)
State可以被记载,在失败的环境下数据还可以恢复。
Flink中有两种基本类型的State:
Keyed State
Operator State
Keyed State和Operator State,可以以两种情势存在:
原始状态(raw state)
托管状态(managed state)
托管状态是由Flink框架管理的状态。
我们说operator算子保存了数据的中间效果,中间效果保存在什么类型中,如果我们这里是托管状态,则由flink框架自行管理
原始状态由用户自行管理状态具体的数据布局,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据布局一无所知。
通常在DataStream上的状态保举使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。
1. State-Keyed State

基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state,比如:stream.keyBy(…)。KeyBy之后的Operator State,可以理解为分区过的Operator State。
保存state的数据布局:
ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。
ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。
ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,末了合并到一个单一的状态值。
MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素。
需要注意的是,以上所述的State对象,仅仅用于与状态举行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储体系中。相称于我们只是持有了这个状态的句柄。

  • ValueState
    使用ValueState保存中间效果对下面数据举行分组求和。
    开发步骤:
  • 获取流处理执行环境
  • 加载数据源
  • 数据分组
  • 数据转换,定义ValueState,保存中间效果
  • 数据打印
  • 触发执行
    ValueState:测试数据源:
  1. List(
  2.    (1L, 4L),
  3.    (2L, 3L),
  4.    (3L, 1L),
  5.    (1L, 2L),
  6.    (3L, 2L),
  7.    (1L, 2L),
  8.    (2L, 2L),
  9.    (2L, 9L)
  10. )
复制代码
示例代码:
  1. import org.apache.flink.api.common.functions.RichFlatMapFunction
  2. import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
  3. import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
  4. import org.apache.flink.configuration.Configuration
  5. import org.apache.flink.streaming.api.scala._
  6. import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  7. import org.apache.flink.util.Collector
  8. object TestKeyedState {
  9.   class CountWithKeyedState extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
  10.     /**
  11.      * ValueState状态句柄. 第一个值为count,第二个值为sum。
  12.      */
  13.     private var sum: ValueState[(Long, Long)] = _
  14.     override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
  15.       // 获取当前状态值
  16.       val tmpCurrentSum: (Long, Long) = sum.value
  17.       // 状态默认值
  18.       val currentSum = if (tmpCurrentSum != null) {
  19.         tmpCurrentSum
  20.       } else {
  21.         (0L, 0L)
  22.       }
  23.       // 更新
  24.       val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
  25.       // 更新状态值
  26.       sum.update(newSum)
  27.       // 如果count >=3 清空状态值,重新计算
  28.       if (newSum._1 >= 3) {
  29.         out.collect((input._1, newSum._2 / newSum._1))
  30.         sum.clear()
  31.       }
  32.     }
  33.     override def open(parameters: Configuration): Unit = {
  34.       sum = getRuntimeContext.getState(
  35.         new ValueStateDescriptor[(Long, Long)]("average", // 状态名称
  36.           TypeInformation.of(new TypeHint[(Long, Long)](){}) )// 状态类型
  37.       )
  38.     }
  39.   }  
  40.   def main(args: Array[String]): Unit = {
  41.     //初始化执行环境
  42.     val env = StreamExecutionEnvironment.getExecutionEnvironment
  43.     //构建数据源
  44.     val inputStream: DataStream[(Long, Long)] = env.fromCollection(
  45.       List(
  46.         (1L, 4L),
  47.         (2L, 3L),
  48.         (3L, 1L),
  49.         (1L, 2L),
  50.         (3L, 2L),
  51.         (1L, 2L),
  52.         (2L, 2L),
  53.         (2L, 9L))
  54.     )
  55.     //执行数据处理
  56.     inputStream.keyBy(0)
  57.       .flatMap(new CountWithKeyedState)
  58.       .setParallelism(1)
  59.       .print
  60.     //运行任务
  61.     env.execute
  62.   }
  63. }  
复制代码

  • MapState
    使用MapState保存中间效果对下面数据举行分组求和:
  • 获取流处理执行环境
  • 加载数据源
  • 数据分组
  • 数据转换,定义MapState,保存中间效果
  • 数据打印
  • 触发执行
    MapState:测试数据源:
  1. List(
  2.    ("java", 1),
  3.    ("python", 3),
  4.    ("java", 2),
  5.    ("scala", 2),
  6.    ("python", 1),
  7.    ("java", 1),
  8.    ("scala", 2)
  9. )  
复制代码
示例代码:
  1. object MapState {
  2.   def main(args: Array[String]): Unit = {
  3.     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  4.     env.setParallelism(1)
  5.     /**
  6.       * 使用MapState保存中间结果对下面数据进行分组求和
  7.       * 1.获取流处理执行环境
  8.       * 2.加载数据源
  9.       * 3.数据分组
  10.       * 4.数据转换,定义MapState,保存中间结果
  11.       * 5.数据打印
  12.       * 6.触发执行
  13.       */
  14.     val source: DataStream[(String, Int)] = env.fromCollection(List(
  15.       ("java", 1),
  16.       ("python", 3),
  17.       ("java", 2),
  18.       ("scala", 2),
  19.       ("python", 1),
  20.       ("java", 1),
  21.       ("scala", 2)))
  22.   
  23.     source.keyBy(0)
  24.       .map(new RichMapFunction[(String, Int), (String, Int)] {
  25.         var mste: MapState[String, Int] = _
  26.         override def open(parameters: Configuration): Unit = {
  27.           val msState = new MapStateDescriptor[String, Int]("ms",
  28.             TypeInformation.of(new TypeHint[(String)] {}),
  29.             TypeInformation.of(new TypeHint[(Int)] {}))
  30.           mste = getRuntimeContext.getMapState(msState)
  31.         }
  32.         override def map(value: (String, Int)): (String, Int) = {
  33.           val i: Int = mste.get(value._1)
  34.           mste.put(value._1, value._2 + i)
  35.           (value._1, value._2 + i)
  36.         }
  37.       }).print()
  38.     env.execute()
  39.   }
  40. }  
复制代码
2. State-Operator State

与Key无关的State,与Operator绑定的state,整个operator只对应一个state。
保存state的数据布局:
ListState
举例来说,Flink中的 Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消耗topic的全部(partition, offset)映射。
步骤:
1.获取执行环境
2.设置查抄点机制:路径,重启策略
3.自定义数据源
需要继承并行数据源和CheckpointedFunction
设置listState,通过上下文对象context获取
数据处理,保存offset
制作快照
4.数据打印
5.触发执行
示例代码:
  1. import java.util
  2. import org.apache.flink.api.common.restartstrategy.RestartStrategies
  3. import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
  4. import org.apache.flink.api.common.time.Time
  5. import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
  6. import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
  7. import org.apache.flink.runtime.state.filesystem.FsStateBackend
  8. import org.apache.flink.streaming.api.CheckpointingMode
  9. import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
  10. import org.apache.flink.streaming.api.environment.CheckpointConfig
  11. import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
  12. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  13. import org.apache.flink.streaming.api.scala._
  14. object ListOperate {
  15.   def main(args: Array[String]): Unit = {
  16.     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  17.     env.setParallelism(1)
  18.     env.enableCheckpointing(5000)
  19.     env.setStateBackend(new FsStateBackend("hdfs://node01:8020/tmp/check/8"))
  20.     env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  21.     env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  22.     env.getCheckpointConfig.setCheckpointTimeout(60000)
  23.     env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  24.     env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  25.     //重启策略
  26.     env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(5)))
  27.     //模拟kakfa偏移量
  28.     env.addSource(new MyRichParrelSourceFun)
  29.       .print()
  30.     env.execute()
  31.   }
  32. }
  33. class MyRichParrelSourceFun extends RichParallelSourceFunction[String]
  34.   with CheckpointedFunction {
  35.   var listState: ListState[Long] = _
  36.   var offset: Long = 0L
  37.   //任务运行
  38.   override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
  39.     val iterState: util.Iterator[Long] = listState.get().iterator()
  40.     while (iterState.hasNext) {
  41.       offset = iterState.next()
  42.     }
  43.     while (true) {
  44.       offset += 1
  45.       ctx.collect("offset:"+offset)
  46.       Thread.sleep(1000)
  47.       if(offset > 10){
  48.         1/0
  49.       }
  50.     }
  51.   }
  52.   //取消任务
  53.   override def cancel(): Unit = ???
  54.   //制作快照
  55.   override def snapshotState(context: FunctionSnapshotContext): Unit = {
  56.     listState.clear()
  57.     listState.add(offset)
  58.   }
  59.   //初始化状态
  60.   override def initializeState(context: FunctionInitializationContext): Unit = {
  61.     listState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Long](
  62.       "listState", TypeInformation.of(new TypeHint[Long] {})
  63.     ))
  64.   }
  65. }
复制代码
3. Broadcast State

Broadcast State 是 Flink 1.5 引入的新特性。在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游全部 task 时,就可以使用 Broadcast State 特性。下游的 task 吸取这些配置、规则并保存为 BroadcastState, 将这些配置应用到另一个数据流的盘算中 。

  • API介绍
    通常,我们首先会创建一个Keyed或Non-Keyed的Data Stream,然后再创建一个Broadcasted Stream,末了通过Data Stream来毗连(调用connect方法)到Broadcasted Stream上,这样实现将Broadcast State广播到Data Stream下游的每个Task中。
    如果Data Stream是Keyed Stream,则毗连到Broadcasted Stream后,添加处理ProcessFunction时需要使用KeyedBroadcastProcessFunction来实现,下面是KeyedBroadcastProcessFunction的API,代码如下所示:
  1. public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
  2.     public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
  3.     public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
  4. }
复制代码
上面泛型中的各个参数的含义,说明如下:
KS:表示Flink步伐从最上游的Source Operator开始构建Stream,当调用keyBy时所依赖的Key的类型;
IN1:表示非Broadcast的Data Stream中的数据记载的类型;
IN2:表示Broadcast Stream中的数据记载的类型;
OUT:表示经过KeyedBroadcastProcessFunction的processElement()和processBroadcastElement()方法处理后输出效果数据记载的类型。
如果Data Stream是Non-Keyed Stream,则毗连到Broadcasted Stream后,添加处理ProcessFunction时需要使用BroadcastProcessFunction来实现,下面是BroadcastProcessFunction的API,代码如下所示:
  1. public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
  2.   public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
  3.   public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
  4.     }
复制代码
上面泛型中的各个参数的含义,与前面KeyedBroadcastProcessFunction的泛型类型中的后3个含义相同,只是没有调用keyBy操纵对原始Stream举行分区操纵,就不需要KS泛型参数。
注意事项:
1.Broadcast State 是Map类型,即K-V类型。
2.Broadcast State 只有在广播一侧的方法中processBroadcastElement可以修改;在非广播一侧方法中processElement只读。
3.Broadcast State在运行时保存在内存中。
2) 场景举例
1.动态更新盘算规则: 如事件流需要根据最新的规则举行盘算,则可将规则作为广播状态广播到下游Task中。
2.实时增加额外字段: 如事件流需要实时增加用户的基础信息,则可将用户的基础信息作为广播状态广播到下游Task中。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

兜兜零元

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表