【详细介绍及演示】Flink之checkpoint查抄点的使用

打印 上一主题 下一主题

主题 897|帖子 897|积分 2691

目次

一、介绍
二、 设置checkpoint查抄点演示
1、 代码演示
 2、测试代码效果
3、查看快照环境
​编辑
三、在集群上运行
1、第一次运行
2、第二次运行
四、自定义查抄点savePoint
1、提交一个flink job  打成jar包
2、输入一些数据,观察单词对应的数字的变革
​编辑 3、执行savepoint操纵,添加查抄点
 4、查看最近完成的flink job对应的savepoint
5、重新启动flink job,进行测试
6、观察变革
五、总结


一、介绍

Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息。
一句话概括: Checkpoint就是State的快照。
二、 设置checkpoint查抄点演示

简单的举例说明:
1、 代码演示

  1. package com.bigdata.checkpoint;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.api.common.functions.FlatMapFunction;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.api.common.restartstrategy.RestartStrategies;
  6. import org.apache.flink.api.common.time.Time;
  7. import org.apache.flink.api.java.functions.KeySelector;
  8. import org.apache.flink.api.java.tuple.Tuple2;
  9. import org.apache.flink.runtime.state.filesystem.FsStateBackend;
  10. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  11. import org.apache.flink.streaming.api.environment.CheckpointConfig;
  12. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  13. import org.apache.flink.util.Collector;
  14. import java.util.concurrent.TimeUnit;
  15. public class CheckPointWordCountDemo {
  16.     public static void main(String[] args) throws Exception {
  17.         //1. env-准备环境
  18.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  19.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  20.         // 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。
  21.         System.setProperty("HADOOP_USER_NAME", "root");
  22.         // 在这个基础之上,添加快照
  23.         // 第一句:开启快照,每隔1s保存一次快照
  24.         env.enableCheckpointing(1000);
  25.         // 第二句:设置快照保存的位置
  26.         env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));
  27.         // 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录
  28.         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  29.         //checkpoint默认重启策略是一直重启,可以自己定义重启策略
  30.         //重启策略可以单独使用,不设置checkpoint也可使用   //savepoint可以手动使用命令设置checkpoint
  31.         //2分钟内重启3次,重启时间间隔是5s
  32.         env.setRestartStrategy(
  33.                 RestartStrategies.failureRateRestart(3,
  34.                         Time.of(2, TimeUnit.MINUTES),
  35.                         Time.of(5,TimeUnit.SECONDS))
  36.         );
  37.         //2. source-加载数据
  38.         DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
  39.         //3. transformation-数据处理转换
  40.         source.flatMap(new FlatMapFunction<String, String>() {
  41.             @Override
  42.             public void flatMap(String line, Collector<String> collector) throws Exception {
  43.                 String[] arr = line.split(" ");
  44.                 for (String word : arr) {
  45.                     collector.collect(word);
  46.                 }
  47.             }
  48.         }).map(new MapFunction<String, Tuple2<String,Integer>>() {
  49.             @Override
  50.             public Tuple2<String, Integer> map(String word) throws Exception {
  51.                 return Tuple2.of(word,1);
  52.             }
  53.         }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
  54.             @Override
  55.             public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
  56.                 return tuple2.f0;
  57.             }
  58.         }).sum(1).print();
  59.         //4. sink-数据输出
  60.         //5. execute-执行
  61.         env.execute();
  62.     }
  63. }
复制代码
 2、测试代码效果

首先启动当地的nc, 启动hdfs服务

3、查看快照环境


运行,刷新查看checkpoint生存的数据,它会老师成一个新的文件夹,然后再删除老的文件夹,在某一时刻,会出现两个文件夹同时存在的环境。



三、在集群上运行

 首先启动flink:start-cluster.sh
由上一步可以发现数据是生存了,但是并没有起作用,想起作用需要在集群上运行,以下演示集群上的效果:
1、第一次运行

在当地先clean, 再package ,再Wagon一下:


在bigdata01服务器上执行以下下令 
  1. #flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar
  2. flink run -c com.bigdata.day06._01CheckPointDemo /opt/app/FlinkDemo-1.0-SNAPSHOT.jar
  3. #记得,先启动nc ,再启动任务,否则报错!
复制代码
通过nc -lk 9999 输入以下内容:



进入bigdata01:8081页面上查看结果 

想查看运行结果,可以通过使用的slot数目判断一下:




取消flink job的运行


 查看一下这次的单词统计到哪个数字了:


2、第二次运行

  1. #flink run -c 全类名  -s hdfs://hadoop10:8020/flink-checkpoint/293395ef7e496bda2eddd153a18d5212/chk-34  /opt/app/flink-test-1.0-SNAPSHOT.jar
  2. #启动
  3. flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar
复制代码
-s 指定从checkpoint目次规复状态数据 ,注意每个人都不一样
从上一次离开时,停止的checkpoint目次




观察数据:在nc 上输入一个hello,1 得到新的结果hello,8


四、自定义查抄点savePoint

checkpoint自动完成state快照、savePoint是手动的完成快照。
如果步伐在没有设置checkpoint的环境,可以通过savePoint设置state快照
1、提交一个flink job  打成jar包

  1. package com.bigdata.checkpoint;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.api.common.functions.FlatMapFunction;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.api.common.restartstrategy.RestartStrategies;
  6. import org.apache.flink.api.common.time.Time;
  7. import org.apache.flink.api.java.functions.KeySelector;
  8. import org.apache.flink.api.java.tuple.Tuple2;
  9. import org.apache.flink.runtime.state.filesystem.FsStateBackend;
  10. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  11. import org.apache.flink.streaming.api.environment.CheckpointConfig;
  12. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  13. import org.apache.flink.util.Collector;
  14. import java.util.concurrent.TimeUnit;
  15. public class CheckPointWordCountDemo {
  16.     public static void main(String[] args) throws Exception {
  17.         //1. env-准备环境
  18.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  19.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  20.         //重启策略可以单独使用,不设置checkpoint也可使用  
  21.         //savepoint可以手动使用命令设置checkpoint
  22.         //2分钟内重启3次,重启时间间隔是5s
  23.         env.setRestartStrategy(
  24.                 RestartStrategies.failureRateRestart(3,
  25.                         Time.of(2, TimeUnit.MINUTES),
  26.                         Time.of(5,TimeUnit.SECONDS))
  27.         );
  28.         //2. source-加载数据
  29.         DataStreamSource<String> source = env.socketTextStream("bigdata01", 2727);
  30.         //3. transformation-数据处理转换
  31.         source.flatMap(new FlatMapFunction<String, String>() {
  32.             @Override
  33.             public void flatMap(String line, Collector<String> collector) throws Exception {
  34.                 String[] arr = line.split(",");
  35.                 for (String word : arr) {
  36.                     collector.collect(word);
  37.                 }
  38.             }
  39.         }).map(new MapFunction<String, Tuple2<String,Integer>>() {
  40.             @Override
  41.             public Tuple2<String, Integer> map(String word) throws Exception {
  42.                 //自制一个bug用来测试
  43.                 if(word.equals("bug")){
  44.                     throw new Exception("出错了,请重试");
  45.                 }
  46.                 return Tuple2.of(word,1);
  47.             }
  48.         }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
  49.             @Override
  50.             public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
  51.                 return tuple2.f0;
  52.             }
  53.         }).sum(1).print();
  54.         //4. sink-数据输出
  55.         //5. execute-执行
  56.         env.execute();
  57.     }
  58. }
复制代码
执行改任务 
2、输入一些数据,观察单词对应的数字的变革

 3、执行savepoint操纵,添加查抄点




  • 停止flink job,并且触发savepoint操纵
  1. flink stop --savepointPath  hdfs://bigdata01:9820/flink-savepoint  152e493da9cdeb327f6cbbad5a7f8e41
复制代码
    背面的序号为Job 的ID


  • 不会停止flink的job,只是完成savepoint操纵(执行这个操纵)
  1. flink savepoint 79f53c5c0bb3563b6b6ed3011176c411 hdfs://bigdata01:9820/flink-savepoint
复制代码


  •  停止一个 flink 的任务
  1. flink stop 6a27b580aa5c6b57766ae6241d9270ce
复制代码
    背面的序号为Job 的ID
 4、查看最近完成的flink job对应的savepoint

 

发现任务中已经有查抄点 
5、重新启动flink job,进行测试

停止任务后,查看最终查抄点的路径
然后重新启动
  1. flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink-savepoint/savepoint-79f53c-64b5d94771eb
  2. /opt/app/flink-test-1.0-SNAPSHOT.jar
复制代码
-c后是全类名,-s 后是查抄的路径 ,最后一部门是jar包的位置
6、观察变革

再次输入单词,可以看到在之前的底子上累加

另外,在集群中运行我们的步伐,默认并行度为1,它不会按照呆板的CPU核数,而是按照配置文件中的一个默认值运行的
五、总结

有两种添加查抄点的方式:
1、在java代码中自动添加
在执行任务时会在hdfs上创建查抄点
  1. // 第一句:开启快照,每隔1s保存一次快照
  2. env.enableCheckpointing(1000);
  3. // 第二句:设置快照保存的位置
  4. env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));
  5. // 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录
  6.   env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
复制代码
2、在集群上通过下令在指定位置手动添加
flink savepoint 任务号  hdfs://bigdata01:9820/flink-savepoint

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王海鱼

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表