马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
征象
查察flink源码时 sink kafka有变乱提交机制,查察源码发现是使用两阶段提交战略,而变乱提交是checkpoint完成后才实行,那么假如checkpoint设置隔断时间比力长时,变乱未提交之前,后端应该斲丧不到数据,而观察现实征象为写入kafka的斲丧数据可以立马斲丧。
测试用例
测试流程
- 编写任务1,设置较长的checkpoint时间,而且指定 CheckpointingMode.EXACTLY_ONCE,输出输出到kafka。
- 编写任务2斲丧任务的结果topic,打印控制台,验证结果。
- 根据征象查察源码,分析缘故起因。
测试用例
测试任务1
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.enableCheckpointing(1000*60l, CheckpointingMode.EXACTLY_ONCE);
- env.getCheckpointConfig().setCheckpointStorage("file:///flink/checkpoint");
- // 超时时间,checkpoint没在时间内完成则丢弃
- env.getCheckpointConfig().setCheckpointTimeout(50000L); //10秒
- env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
- env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
- //最小间隔时间(前一次结束时间,与下一次开始时间间隔)
- env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
- // 当 Flink 任务取消时,保留外部保存的 checkpoint 信息
-
- KafkaSource<String> source = KafkaSource.<String>builder()
- .setBootstrapServers("127.0.0.1:9092")
- .setTopics("test001")
- .setGroupId("my-group")
- // .setStartingOffsets(OffsetsInitializer())
- .setStartingOffsets(OffsetsInitializer.committedOffsets())
- .setValueOnlyDeserializer(new SimpleStringSchema())
- .build();
- DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
- // 从文件读取数据
- // DataStream<SensorReading> dataStream = env.addSource( new SourceTest4.MySensorSource() );
- DataStream<String> map = kafkaSource.map(new MapFunction<String, String>() {
- @Override
- public String map(String s) throws Exception {
-
- return s;
- }
- });
- Properties properties = new Properties();
- // 根据上面的介绍自己计算这边的超时时间,满足条件即可
- properties.setProperty("transaction.timeout.ms","900000");
- // properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
- KafkaSink<String> sink = KafkaSink.<String>builder()
- .setBootstrapServers("192.168.65.128:9092")
- .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
- .setTopic("test002")
- .setValueSerializationSchema(new SimpleStringSchema())
- .build()
- )
- .setKafkaProducerConfig(properties)
- .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
- // .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
- .setTransactionalIdPrefix("flink-xhaodream-")
- .build();
- map.sinkTo(sink);
- // 打印输出
- env.execute();
复制代码 测试任务2
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.enableCheckpointing(1000*150l, CheckpointingMode.EXACTLY_ONCE);
- env.getCheckpointConfig().setCheckpointStorage("file:///flink/checkpoint");
- // 当 Flink 任务取消时,保留外部保存的 checkpoint 信息
- Properties properties1 = new Properties();
- // properties1.put("isolation.level","read_committed");
- KafkaSource<String> source = KafkaSource.<String>builder()
- .setBootstrapServers("127.0.0.1:9092")
- .setTopics("test002")
- .setGroupId("my-group2")
- .setProperties(properties1)
- .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
- .setValueOnlyDeserializer(new SimpleStringSchema())
- .build();
- DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
- kafkaSource.print(" test2接受数据");
- // 打印输出
- env.execute();
复制代码 测试结果分析
测试结果:
任务1开启后,无论是否实行checkpoint,任务checkpoint都可以正常斲丧数据,与预期不符合。
缘故起因排查
查察kafkaSink 的源码,找到跟与两阶段提交干系的代码,1.18源码中TwoPhaseCommittingSink有重构。kafkasink实现TwoPhaseCommittingSink接口实现,创建Commiter和Writer。
- @PublicEvolving
- public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
- PrecommittingSinkWriter<InputT, CommT> createWriter(Sink.InitContext var1) throws IOException;
- Committer<CommT> createCommitter() throws IOException;
- SimpleVersi
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |