flink sink kafka的变乱提交征象料想

[复制链接]
发表于 2025-12-8 10:31:21 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
征象

查察flink源码时 sink kafka有变乱提交机制,查察源码发现是使用两阶段提交战略,而变乱提交是checkpoint完成后才实行,那么假如checkpoint设置隔断时间比力长时,变乱未提交之前,后端应该斲丧不到数据,而观察现实征象为写入kafka的斲丧数据可以立马斲丧。
测试用例

测试流程

      
  • 编写任务1,设置较长的checkpoint时间,而且指定 CheckpointingMode.EXACTLY_ONCE,输出输出到kafka。  
  • 编写任务2斲丧任务的结果topic,打印控制台,验证结果。  
  • 根据征象查察源码,分析缘故起因。
测试用例

测试任务1

  
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2.         env.setParallelism(1);
  3.         env.enableCheckpointing(1000*60l, CheckpointingMode.EXACTLY_ONCE);
  4.         env.getCheckpointConfig().setCheckpointStorage("file:///flink/checkpoint");
  5. //        超时时间,checkpoint没在时间内完成则丢弃
  6.         env.getCheckpointConfig().setCheckpointTimeout(50000L); //10秒
  7.         env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
  8.         env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
  9.         //最小间隔时间(前一次结束时间,与下一次开始时间间隔)
  10.         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
  11. //        当 Flink 任务取消时,保留外部保存的 checkpoint 信息
  12.         
  13.         KafkaSource<String> source = KafkaSource.<String>builder()
  14.                 .setBootstrapServers("127.0.0.1:9092")
  15.                 .setTopics("test001")
  16.                 .setGroupId("my-group")
  17. //                .setStartingOffsets(OffsetsInitializer())
  18.                 .setStartingOffsets(OffsetsInitializer.committedOffsets())
  19.                 .setValueOnlyDeserializer(new SimpleStringSchema())
  20.                 .build();
  21.         DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
  22.         // 从文件读取数据
  23. //        DataStream<SensorReading> dataStream = env.addSource( new SourceTest4.MySensorSource() );
  24.         DataStream<String> map = kafkaSource.map(new MapFunction<String, String>() {
  25.             @Override
  26.             public String map(String s) throws Exception {
  27.             
  28.                 return s;
  29.             }
  30.         });
  31.         Properties properties = new Properties();
  32. // 根据上面的介绍自己计算这边的超时时间,满足条件即可
  33.         properties.setProperty("transaction.timeout.ms","900000");
  34. //        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
  35.         KafkaSink<String> sink = KafkaSink.<String>builder()
  36.                 .setBootstrapServers("192.168.65.128:9092")
  37.                 .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
  38.                         .setTopic("test002")
  39.                         .setValueSerializationSchema(new SimpleStringSchema())
  40.                         .build()
  41.                 )
  42.                 .setKafkaProducerConfig(properties)
  43.                 .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  44. //                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  45.                 .setTransactionalIdPrefix("flink-xhaodream-")
  46.                 .build();
  47.         map.sinkTo(sink);
  48.         // 打印输出
  49.         env.execute();
复制代码
测试任务2

  
  1.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2.         env.setParallelism(1);
  3.         env.enableCheckpointing(1000*150l, CheckpointingMode.EXACTLY_ONCE);
  4.         env.getCheckpointConfig().setCheckpointStorage("file:///flink/checkpoint");
  5. //        当 Flink 任务取消时,保留外部保存的 checkpoint 信息
  6.         Properties properties1 = new Properties();
  7. //        properties1.put("isolation.level","read_committed");
  8.         KafkaSource<String> source = KafkaSource.<String>builder()
  9.                 .setBootstrapServers("127.0.0.1:9092")
  10.                 .setTopics("test002")
  11.                 .setGroupId("my-group2")
  12.                 .setProperties(properties1)
  13.                 .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
  14.                 .setValueOnlyDeserializer(new SimpleStringSchema())
  15.                 .build();
  16.         DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
  17.         kafkaSource.print(" test2接受数据");
  18.         // 打印输出
  19.         env.execute();
复制代码
测试结果分析

测试结果:

任务1开启后,无论是否实行checkpoint,任务checkpoint都可以正常斲丧数据,与预期不符合。
缘故起因排查

查察kafkaSink 的源码,找到跟与两阶段提交干系的代码,1.18源码中TwoPhaseCommittingSink有重构。kafkasink实现TwoPhaseCommittingSink接口实现,创建Commiter和Writer。
  
  1. @PublicEvolving
  2. public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
  3.     PrecommittingSinkWriter<InputT, CommT> createWriter(Sink.InitContext var1) throws IOException;
  4.     Committer<CommT> createCommitter() throws IOException;
  5.     SimpleVersi
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表