Flink的简朴学习四

铁佛  金牌会员 | 2024-6-10 15:02:46 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 666|帖子 666|积分 1998

一 有状态计算

1.1 概念

1.状态;上一次计算的结果
2.必要基于上一个结果来进行计算,被称为有状态计算
1.2 未使用有状态计算

1.下面这个代码将相同的key发送到同一个task任务内里计算。就是由于这个导致了,明明之前没有输入b,但是输入b之后,立马酿成了2个。说明他是将上一条计算结果直接拿来用了,没有考虑key是不是一样
2.process算子可以在kvDS上面直接进行操作,内里必要传入重写了KeyedProcessFunction内里的processElement方法的对象。
  1. package com.shujia.flink.state;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.datastream.KeyedStream;
  5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  8. import org.apache.flink.util.Collector;
  9. public class Demo1State {
  10.     public static void main(String[] args)throws Exception {
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         DataStream<String> linesDS = env.socketTextStream("master", 8888);
  13.         KeyedStream<String, String> keyByDS = linesDS.keyBy(word -> word);
  14.         //KeyedProcessFunction<KEY, T, R> keyedProcessFunction
  15.         SingleOutputStreamOperator<Tuple2<String, Integer>> process = keyByDS.process(new KeyedProcessFunction<String, String, Tuple2<String, Integer>>() {
  16.             int count = 0;
  17.             /**
  18.              *
  19.              * @param word 一行数据
  20.              * @param ctx 上下文对象
  21.              * @param out 用于将结果发送到下游
  22.              *
  23.              */
  24.             @Override
  25.             public void processElement(String word,
  26.                                        KeyedProcessFunction<String, String, Tuple2<String, Integer>>.Context ctx,
  27.                                        Collector<Tuple2<String, Integer>> out) throws Exception {
  28.                 out.collect(Tuple2.of(word, count));
  29.                 count++;
  30.             }
  31.         });
  32.         process.print();
  33.         env.execute();
  34.     }
  35. }
复制代码


1.3 使用有状态计算

1.使用HashMap保存结果。
  1. package com.shujia.flink.state;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.datastream.KeyedStream;
  5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  8. import org.apache.flink.util.Collector;
  9. import java.util.HashMap;
  10. public class Demo2State {
  11.     public static void main(String[] args)throws Exception {
  12.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13.         DataStream<String> wordsDS = env.socketTextStream("master", 8888);
  14.         //分组
  15.         KeyedStream<String, String> keyByDS = wordsDS.keyBy(word -> word);
  16.         /*
  17.          * process算子时flink提供的一个底层算子,可以获取到flink底层的状态,时间和数据
  18.          */
  19.         DataStream<Tuple2<String, Integer>> countDS = keyByDS
  20.                 .process(new KeyedProcessFunction<String, String, Tuple2<String, Integer>>() {
  21.                     //保存之前统计的结果(状态)
  22.                     //问题:同一个task中的数据共享同一个count变量
  23.                     //int count = 0;
  24.                     //需要为每一个key保存一个结果
  25.                     //使用单词作为key,数量作为value
  26.                     //问题:使用hashmap保存计算的中间结果,flink的checkpoint不会将hashmap中的数据持久化到hdfs总
  27.                     //所以任务失败重启会丢失之前的结果
  28.                     final HashMap<String, Integer> map = new HashMap<>();
  29.                     /**
  30.                      * processElement方法每一条数据执行一次
  31.                      * @param word 一行数据
  32.                      * @param ctx 上下文对象,可以获取到flink的key和时间属性
  33.                      * @param out 用于将处理结果发送到下游
  34.                      */
  35.                     @Override
  36.                     public void processElement(String word,
  37.                                                KeyedProcessFunction<String, String, Tuple2<String, Integer>>.Context ctx,
  38.                                                Collector<Tuple2<String, Integer>> out) throws Exception {
  39.                         System.out.println(map);
  40.                         //1、通过key获取value
  41.                         //获取之前的结果(状态)
  42.                         Integer count = map.getOrDefault(word, 0);
  43.                         //基于之前的结果进行计算
  44.                         count++;
  45.                         //将计算结果发送到下游
  46.                         out.collect(Tuple2.of(word, count));
  47.                         //更新之前的结果
  48.                         map.put(word, count);
  49.                     }
  50.                 });
  51.         countDS.print();
  52.         env.execute();
  53.     }
  54. }
复制代码
运行事后输入f,会提示之前没有数据,输入h,会提示有个f,由于是在同一个task内里,再输入一个f,会显示2个h一个f


2.但是这些结果都不能长期化保存,想要长期化保存请看2.4节 
 二 checkpointing

2.1 概念

1.可以定时将flink计算的状态长期化到hdfs中,如果任务执行失败,可以基于hdfs中保存到的状态恢复任务,包管之前的结果不丢失。
2.2 设置

2.2.1 代码中设置

1.代码
flink计算的状态会先保存在taskmanager中,当触发checkpoint时会将状态长期化到hdfs中
  1. // 每 1000ms 开始一次 checkpoint
  2. env.enableCheckpointing(5000);
  3. // 高级选项:
  4. // 当手动取消任务时,是否保留HDFS中保留hdfs中的快照
  5. env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  6. //flink计算的状态会先保存在taskmanager中,当触发checkpoint时会将状态持久化到hdfs中
  7. //指定状态在算子中保存的位置(状态后端)
  8. //HashMapStateBackend:将状态保存在taskmanager的内存中
  9. env.setStateBackend(new HashMapStateBackend());
  10. //指定checkpoint保存快照的位置
  11. env.getCheckpointConfig().setCheckpointStorage("hdfs://master:9000/flink/checkpoint");
复制代码
1.ui界面第一次提交

提交jar包跟主类名即可


2.任务取消后,基于hdfs的快照重启任务 

必要找到快照的位置,先在任务取消之前,查察任务id

 再去hdfs上找到这个id相干的路径

 提交的时候加上hdfs://master:9000//然后后面跟上路径

 3.下令行第一次提交

flink run -t yarn-session -p 3 -Dyarn.application.id=application_1717552958247_0001 -c com.shujia.flink.state.Demo1CheckPointing flink-1.0.jar
4.任务取消大概失败后重新提交

1.先在hdfs上找到相对应的任务编号,然后点到chk那边

2.输入下令
flink run -t yarn-session -p 3 -Dyarn.application.id=application_1717552958247_0001 -c com.shujia.flink.state.Demo1CheckPointing -s
hdfs://master:9000/flink/checkpoint/9f54421b62240b04fbde1bc413c98934/chk-2105 flink-1.0.jar

2.2.2 配置文件中设置

1.修改flink-conf.yaml,然后重启Hadoop
  1. execution.checkpointing.interval: 5000
  2. execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
  3. execution.checkpointing.max-concurrent-checkpoints: 1
  4. execution.checkpointing.min-pause: 0
  5. execution.checkpointing.mode: EXACTLY_ONCE
  6. execution.checkpointing.timeout: 10min
  7. execution.checkpointing.tolerable-failed-checkpoints: 0
  8. execution.checkpointing.unaligned: false
  9. state.backend: hashmap
  10. state.checkpoints.dir: hdfs://master:9000/flink/checkpoint
复制代码
2.提交的方法跟上面一样
2.3 原理

1.JobManager的checkpoint Coordonator(协调器)定期向SourceTask发送Checkpoint Trigger(触发器)。
2.SourceTask在数据流中安排Checkpoint barrier(停滞)
3.SourceTask向下游通报barrier,并自身同步进行快照并将状态写入长期化存储中。
4.整个Task完成后,会汇总终极的快照结果,并将之前的快照删除

 
        

2.4 checkpoint所辨认的ValueState

1.由于1.3节使用Java主动HashMap不能被Flink辨认,中间状态不能被长期化保留,所以我们要用flink自带的接口去吸收中间状态
2.中间状态可以吸收的接口


  • ValueState<T>: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子吸收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
  • ListState<T>: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 大概 addAll(List<T>) 进行添加元素,通过 Iterable<T> get() 获得整个列表。还可以通过 update(List<T>) 覆盖当前的列表。
  • ReducingState<T>: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。
  • AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型差别。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
  • MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 大概 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包罗任何键值对
3.我们使用ValueState,必要在底层算子process中,先重写open方法,用来创建状态吸收对象。
  1.   ValueState<Integer> valueState;
  2.                     //open方法每一个task启动的时候执行一次,一般用于初始化
  3.                     @Override
  4.                     public void open(Configuration parameters) throws Exception {
  5.                         //获取flink环境对象
  6.                         RuntimeContext runtimeContext = getRuntimeContext();
  7.                         //创建状态的描述对象。指定状态的类型和名称
  8.                         ValueStateDescriptor<Integer> valueStateDescriptor = new ValueStateDescriptor<>("count", Types.INT);
  9.                         //初始化状态
  10.                         //ValueState: 单值状态,为每一个key在状态中保存一个值
  11.                         valueState=runtimeContext.getState(valueStateDescriptor);
  12.                     }
复制代码
ValueState中的value方法是获取上一阶段的状态值,update是更新数据的。完整代码如下
  1. package com.shujia.flink.state;import org.apache.flink.api.common.functions.RuntimeContext;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.KeyedStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.KeyedProcessFunction;import org.apache.flink.util.Collector;import java.util.HashMap;public class Demo4ValueState {    public static void main(String[] args)throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        DataStream<String> wordsDS = env.socketTextStream("master", 8888);        //分组        KeyedStream<String, String> keyByDS = wordsDS.keyBy(word -> word);        /*         * process算子时flink提供的一个底层算子,可以获取到flink底层的状态,时间和数据         */        DataStream<Tuple2<String, Integer>> countDS = keyByDS                .process(new KeyedProcessFunction<String, String, Tuple2<String, Integer>>() {                    ValueState<Integer> valueState;
  2.                     //open方法每一个task启动的时候执行一次,一般用于初始化
  3.                     @Override
  4.                     public void open(Configuration parameters) throws Exception {
  5.                         //获取flink环境对象
  6.                         RuntimeContext runtimeContext = getRuntimeContext();
  7.                         //创建状态的描述对象。指定状态的类型和名称
  8.                         ValueStateDescriptor<Integer> valueStateDescriptor = new ValueStateDescriptor<>("count", Types.INT);
  9.                         //初始化状态
  10.                         //ValueState: 单值状态,为每一个key在状态中保存一个值
  11.                         valueState=runtimeContext.getState(valueStateDescriptor);
  12.                     }                    /**                     * processElement方法每一条数据执行一次                     * @param word 一行数据                     * @param ctx 上下文对象,可以获取到flink的key和时间属性                     * @param out 用于将处理结果发送到下游                     */                    @Override                    public void processElement(String word,                                               KeyedProcessFunction<String, String, Tuple2<String, Integer>>.Context ctx,                                               Collector<Tuple2<String, Integer>> out) throws Exception {                        //获取状态中保存和值                        Integer count = valueState.value();                        //判断count是否为null                        if (count==null){                            count=0;                        }                        //累加计算                        count++;                        //将结果发送到下游                        out.collect(Tuple2.of(word,count));                        //更新数据                        valueState.update(count);                    }                });        countDS.print();        env.execute();    }}
复制代码
这个是我第一次执行任务输入的

取消任务,看看能不能保存这个状态,然后提交重新提交任务,
发现还在
 

三 Exactly Once

3.1 生产端

1.kafka 0.11之后,Producer的send操作现在是幂等的,在任何导致producer重试的情况下,相同的消息,如果被producer发送多次,也只会被写入Kafka一次 ACKS机制+副本,包管数据不丢失

 kafka保存数据处理的唯一一次:
幂等性:保持数据不重复
事务:保存数据不重复
ACKS+副本:包管数据不丢失
3.1.1 kafka事务

1.开启事务
  1. package com.shujia.flink.state;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.Producer;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import java.util.Properties;
  6. public class Demo6KafkaAffairs {
  7.     public static void main(String[] args)throws Exception {
  8.         Properties properties = new Properties();
  9.         //指定broker列表
  10.         properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");
  11.         //指定key和value的数据格式
  12.         properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  13.         properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  14.         //给事务取一个名字
  15.         properties.setProperty("transactional.id", "hhh");
  16.         Producer<String, String> producer = new KafkaProducer<String, String>(properties);
  17.         //开启事务
  18.         producer.initTransactions();
  19.         producer.beginTransaction();
  20.         //ProducerRecord<K, V> record
  21.         producer.send(new ProducerRecord<>("train","java"));
  22.         Thread.sleep(10000);
  23.         producer.send(new ProducerRecord<>("train","flink"));
  24.         //提交事务
  25.         producer.commitTransaction();
  26.         producer.flush();
  27.         producer.close();
  28.     }
  29. }
复制代码
2.消费的下令记得使用读并提交。发现两条数据是一起过来的,如果中间有一条数据是失败的,那么整个数据都过不来,这样包管了数据不重复。
3.1.2 ACKS+副本

1.topic创建是必要多个副本
2.将acks设置成-1大概all。
acks机制:当acks=1时(默认),当主分区写入成功,就会返回成功。如果这个时候主分区所在的节点挂了,刚刚写入的数据就会丢失。当acks=0时,生产者只负责生产数据,不负责验证数据是否写入成功,会丢失数据,但是写入的性能好。当acks=-1大概all时,生产者生产数据后必须比及所有副本都同步成功才会返回成功,这样不会丢失数据,但是写入的性能差。
3.1.3 幂等性

Producer的send操作现在是幂等的,在任何导致producer重试的情况下,相同的消息,如果被producer发送多次,也只会被写入Kafka一次
3.2 消费端

1.Flink 分布式快照保存数据计算的状态和消费的偏移量,包管程序重启之后不丢失状态和消费偏移量
2.flink的数据源如果来自于socket,那么在发生checkpoint之前,有数据进去了并又取消了任务,那么这个数据没有写进hdfs。所以我们换数据源,换成Kafka的生产者产生的数据。这样checkpoint会定时将flink的计算状态和Kafka消费偏移量同时保存到hdfs中,这样不会丢失数据
  1. package com.shujia.flink.state;
  2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  3. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  4. import org.apache.flink.api.common.typeinfo.Types;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.connector.kafka.source.KafkaSource;
  7. import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
  8. import org.apache.flink.streaming.api.datastream.DataStream;
  9. import org.apache.flink.streaming.api.datastream.KeyedStream;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. public class Demo7ExactlyOnce {
  12.     public static void main(String[] args) throws Exception{
  13.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14.         //创建kafka source
  15.         KafkaSource<String> source = KafkaSource.<String>builder()
  16.                 .setBootstrapServers("master:9092,node1:9092,node2:9092")//kafka集群列表
  17.                 .setTopics("kafka_flink")//指定消费的topic
  18.                 .setGroupId("my-group")//指定消费者组
  19.                 .setStartingOffsets(OffsetsInitializer.earliest())
  20.                 .setValueOnlyDeserializer(new SimpleStringSchema())//指定读取数据的格式
  21.                 .build();
  22.         //使用kafka source
  23.         DataStream<String> wordsDS = env
  24.                 .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
  25.         //3、统计单词的数量
  26.         DataStream<Tuple2<String, Integer>> kvDS = wordsDS
  27.                 .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));
  28.         //分组统计单词的数量
  29.         KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);
  30.         //对下标为1的列求和
  31.         DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);
  32.         //打印数据
  33.         countDS.print();
  34.         //启动flink
  35.         env.execute();
  36.     }
  37. }
复制代码
3.3 sink端

1.flink在聚合计算后将结果写进hdfs大概kafka中,如果在中间某一个时间有数据进去但是任务又取消大概失败了,但是这样结果不会重复。然而,在非聚合计算中,如果在中间某一个时间有数据进去但是任务又取消大概失败了,这样kafka大概hdfs中数据会重复
  1. package com.shujia.flink.state;
  2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  3. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  4. import org.apache.flink.connector.base.DeliveryGuarantee;
  5. import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
  6. import org.apache.flink.connector.kafka.sink.KafkaSink;
  7. import org.apache.flink.connector.kafka.source.KafkaSource;
  8. import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
  9. import org.apache.flink.streaming.api.datastream.DataStream;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import java.util.Properties;
  12. public class Demo8ExactlyOnceKafkaSink {
  13.     public static void main(String[] args)throws Exception {
  14.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15.         //创建kafka source
  16.         KafkaSource<String> source = KafkaSource.<String>builder()
  17.                 .setBootstrapServers("master:9092,node1:9092,node2:9092")//kafka集群列表
  18.                 .setTopics("kafka_flink")//指定消费的topic
  19.                 .setGroupId("my-group")//指定消费者组
  20.                 .setStartingOffsets(OffsetsInitializer.earliest())
  21.                 .setValueOnlyDeserializer(new SimpleStringSchema())//指定读取数据的格式
  22.                 .build();
  23.         //使用kafka source
  24.         DataStream<String> wordsDS = env
  25.                 .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
  26.         DataStream<String> filterDS = wordsDS.filter(word -> !"".equals(word));
  27. //        Properties properties = new Properties();
  28.         //指定事务超时时间,不能大于15分钟
  29. //        properties.setProperty("transaction.timeout.ms", 1000 * 60 * 10 + "");
  30.         //创建kafka sink
  31.         KafkaSink<String> sink = KafkaSink.<String>builder()
  32.                 .setBootstrapServers("master:9092,node1:9092,node2:9092")//kafka集群列表
  33. //                .setKafkaProducerConfig(properties)
  34.                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
  35.                         .setTopic("filter")//指定topic
  36.                         .setValueSerializationSchema(new SimpleStringSchema())//指定数据格式
  37.                         .build()
  38.                 )
  39.                 //指定数据处理的语义
  40.                 .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  41.                 .build();
  42.         //使用kafka sink
  43.         filterDS.sinkTo(sink);
  44.         //启动flink
  45.         env.execute();
  46.     }
  47. }
复制代码
提交这个任务代码,执行一次后,再他执行checkpoint之前再次输入shujiashujia,取消任务,然后再通过上一次的checkpoint重启任务,发现:消费端居然消费了两次shujiashujia
生产端:

消费端:

2.为了避免在非聚合计算中,状态大概消费的偏移量存储到kafka大概hdfs中,数据不重复,我们必要开启 Kafka事务。代码如下
  1. package com.shujia.flink.state;
  2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  3. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  4. import org.apache.flink.connector.base.DeliveryGuarantee;
  5. import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
  6. import org.apache.flink.connector.kafka.sink.KafkaSink;
  7. import org.apache.flink.connector.kafka.source.KafkaSource;
  8. import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
  9. import org.apache.flink.streaming.api.datastream.DataStream;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import java.util.Properties;
  12. public class Demo8ExactlyOnceKafkaSink {
  13.     public static void main(String[] args)throws Exception {
  14.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15.         //创建kafka source
  16.         KafkaSource<String> source = KafkaSource.<String>builder()
  17.                 .setBootstrapServers("master:9092,node1:9092,node2:9092")//kafka集群列表
  18.                 .setTopics("kafka_flink")//指定消费的topic
  19.                 .setGroupId("my-group")//指定消费者组
  20.                 .setStartingOffsets(OffsetsInitializer.earliest())
  21.                 .setValueOnlyDeserializer(new SimpleStringSchema())//指定读取数据的格式
  22.                 .build();
  23.         //使用kafka source
  24.         DataStream<String> wordsDS = env
  25.                 .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
  26.         DataStream<String> filterDS = wordsDS.filter(word -> !"".equals(word));
  27.         Properties properties = new Properties();
  28.         //指定事务超时时间,不能大于15分钟
  29.         properties.setProperty("transaction.timeout.ms", 1000 * 60 * 10 + "");
  30.         //创建kafka sink
  31.         KafkaSink<String> sink = KafkaSink.<String>builder()
  32.                 .setBootstrapServers("master:9092,node1:9092,node2:9092")//kafka集群列表
  33.                 .setKafkaProducerConfig(properties)
  34.                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
  35.                         .setTopic("filter")//指定topic
  36.                         .setValueSerializationSchema(new SimpleStringSchema())//指定数据格式
  37.                         .build()
  38.                 )
  39.                 //指定数据处理的语义
  40.                 .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  41.                 .build();
  42.         //使用kafka sink
  43.         filterDS.sinkTo(sink);
  44.         //启动flink
  45.         env.execute();
  46.     }
  47. }
复制代码
这样我们在生产端产生数据,只有产生checkpoint,才会消费数据(消费端使用读并提交的方式)
但是这样会增加数据延迟

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

铁佛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表