Flink基本概念和算子利用

守听  金牌会员 | 2024-10-4 02:37:29 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 815|帖子 815|积分 2445

基础概念

Flink是一个框架和分布式处置惩罚引擎,用于对无界数据流有界数据流举行有状态计算,它的核心目标是“数据流上的有状态计算”。

有界流和无界流



  • 有界流:具有明白的开始和结束时间,数据量有限。适合利用批处置惩罚技术,可以在处置惩罚前将全部数据一次性读入内存举行处置惩罚。有界流通常用于历史数据分析、数据迁徙等场景。‌
  • 无界流:没有明白的开始和结束时间,数据接二连三天生。由于数据是无穷且持续的,无界流需要实时处置惩罚,并且必须持续摄取和处置惩罚数据,不能等待全部数据到达后再举行处置惩罚。适合适用于流处置惩罚。
名词

源算子(source)

Flink可以从各种来源获取数据,然后构建DataStream举行转换处置惩罚。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。
  1. // 创建执行环境
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. // 从集合读取数据
  4. DataStreamSource<Integer> collectionSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15));
  5. // 从文件中读取数据
  6. DataStreamSource<String> fileSource = env.fromSource(FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path("input/word.txt")).build(),WatermarkStrategy.noWatermarks(),"fileSource");
  7. //  从kafka读取数据
  8. DataStreamSource<String> kafkaSource = env.fromSource(KafkaSource.<String>builder()
  9.         // kafka地址--可配置多个
  10.         .setBootstrapServers("")
  11.         // topic名称--可配置多个
  12.         .setTopics("")
  13.         // 消费组id
  14.         .setGroupId("")
  15.         // 反序列化方式
  16.         .setValueOnlyDeserializer(new SimpleStringSchema())
  17.         // kafka 消费偏移量 方式 earliest(一定从最早的开始消费)、latest(一定从最新的开始消费)或者手动设置偏移量 ,默认是earliest
  18.         .setStartingOffsets(OffsetsInitializer.latest())
  19.         // 水位线,自定义数据源算子名称
  20.         .build(), WatermarkStrategy.noWatermarks(), "kafkaSource");
  21. // 从socket读取数据
  22. DataStreamSource<String> socketSource = env.socketTextStream("...","1234");
复制代码
基本转换算子

Map

对元素的数据类型和内容做转换。
  1. // 第一个参数为输入流,第二个参数为输出流
  2. SingleOutputStreamOperator<UserDto> userDataStream = kafkaSource.map(new MapFunction<String, UserDto>() {   
  3.     @Override     
  4.     public UserDto map(String message) throws Exception {        
  5.         return JSONObject.parseObject(message, UserDto.class);   
  6.     }
  7. });
复制代码
FlatMap

输入一个元素同时产生零个、一个或多个元素。
  1. // 第一个参数为输入流,第二个参数为输出流
  2. // 可做转换,可做条件过滤
  3. SingleOutputStreamOperator<UserDto> userDataStream =kafkaSource.flatMap(new FlatMapFunction<String, UserDto>() {   
  4.     @Override   
  5.     public void flatMap(String message, Collector<UserDto> collector) throws Exception {        
  6.         UserDto userDto = JSONObject.parseObject(message, UserDto.class);        
  7.         collector.collect(userDto);
  8.     }
  9. });
复制代码
Filter

对数据源根据条件过滤数据,保留满足条件的数据
  1. // 过滤出年龄大于18的用户
  2. SingleOutputStreamOperator<UserDto> filterDataStream = userDataStream.filter(new FilterFunction<UserDto>() {   
  3.      @Override   
  4.      public boolean filter(UserDto userDto) throws Exception {        
  5.         return userDto.getAge() > 18;   
  6.      }
  7. });
复制代码
聚合算子

KeyBy

根据指定的字段(key),将数据划分到不相交的分区中。相同key的元素会被分到同一个分区中。
  1. // 将用户id一样的用户分到一个分区内
  2. KeyedStream<UserDto, Integer> userKeyedStream = userDataStream.keyBy(new KeySelector<UserDto, Integer>() {   
  3.     @Override   
  4.     public Integer getKey(UserDto userDto) throws Exception {        
  5.         return userDto.getId();   
  6.     }
  7. });
复制代码
Reduce (仅支持同类型的数据)

对流的数据,来一条计算一条,将当前元素和上一次聚合后的数据组合,输出新值,并将新值举行保存,作为下一次计算的元素。
聚合前和聚合后的数据类型是一致的。
当第一条数据进来时,不会触发计算。
  1. // 计算一个用户的订单总价格
  2. SingleOutputStreamOperator<UserDto> reduce = userKeyedStream.reduce(new ReduceFunction<UserDto>() {
  3.     @Override
  4.     public UserDto reduce(UserDto t1, UserDto t2) throws Exception {
  5.         int totalPrice = t1.getTotalPrice() + t2.getOrderPrice();
  6.         UserDto userDto = new UserDto();
  7.         userDto.setId(t1.getId());
  8.         userDto.setAge(t1.getAge());
  9.         userDto.setTotalPrice(totalPrice);
  10.         return userDto;
  11.     }
  12. });
复制代码
Aggregate (支持差异类型的数据)

  1. SingleOutputStreamOperator<String> aggregate = windowedStream.aggregate(new AggregateFunction<UserDto, Integer, String>() {   
  2.     /**     
  3.     * 创建累加器,就是初始化累加器     
  4.     * @return     
  5.     */   
  6.     @Override   
  7.     public Integer createAccumulator() {      
  8.         return 0;   
  9.     }   
  10.     /**     
  11.     * 计算逻辑或者是聚合逻辑     
  12.     * @param userDto     
  13.     * @param beforeData     
  14.     * @return     
  15.     */   
  16.     @Override   
  17.     public Integer add(UserDto userDto, Integer beforeData) {        
  18.         return beforeData + userDto.getAge();   
  19.     }   
  20.     /**     
  21.     * 获取最终结果,窗口触发时输出     
  22.     * @param integer     
  23.     * @return     
  24.     */   
  25.     @Override   
  26.     public String getResult(Integer integer) {        
  27.         return "计算结束,最终结果为:" + integer.toString();   
  28.     }   
  29.     /**     
  30.     * 只有会话窗口才会使用到     
  31.     * @param integer     
  32.     * @param acc1     
  33.     * @return     
  34.     */   
  35.     @Override   
  36.     public Integer merge(Integer integer, Integer acc1) {        
  37.         return 0;   
  38.     }
  39. });
复制代码
窗口(window)

把流切割成有限巨细的多个“存储桶”;每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据举行计算处置惩罚。窗口不是静态天生的,是动态创建的。当这个窗口范围的进入第一条数据时,才会创建对应的窗口。
滚动窗口

有固定的巨细,是一种对数据举行“匀称切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。每个数据都会分配到一个窗口,而且只会属于一个窗口。滚动窗口可以基于时间定义,也可以基于数据的个数定义,需要的参数只有一个,就是窗口的巨细。
  1. // 分组
  2. KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(p -> p.f0);
  3. // 基于处理时间开窗,窗口长度为10s,窗口开始时间为 窗口长度整数倍向下取整,结束时间为开始时间+窗口长度
  4. WindowedStream<Tuple2<String, Integer>, String, TimeWindow> tumblingProcessingTimeStream = keyByStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
  5. // 基于事件时间开窗,窗口长度为10s,窗口开始时间为数据源事件时间,结束时间为开始时间+窗口长度
  6. WindowedStream<Tuple2<String, Integer>, String, TimeWindow> tumblingEventTimeStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(10)));
  7. // 基于次数开窗
  8. WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindowStream = keyedStream.countWindow(10);
复制代码
滑动窗口

巨细是固定的,但是窗口之间不是收尾相接的,而是可以“错开”肯定的位置。定义滑动窗口的参数有2个:窗口巨细和滑动步长,滑动步长代表了窗口计算的频率。因此,如果 slide 小于窗口巨细,滑动窗口可以答应窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。
  1. // 分组
  2. KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(p -> p.f0);
  3. // 基于处理事件开窗,窗口长度为10s,滑动步长为1s
  4. WindowedStream<Tuple2<String, Integer>, String, TimeWindow> slidingProcessingTimeWindowStream = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(1)));
  5. // 基于事件事件开窗,窗口长度为10s,滑动步长为1s
  6. WindowedStream<Tuple2<String, Integer>, String, TimeWindow> slidingEventTimeWindowStream = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(1)));
  7. // 基于次数开窗
  8. WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindowStream = keyedStream.countWindow(10, 1);
复制代码
会话窗口

是基于会话来对数据举行分组的。会话窗口只能基于时间来定义。会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的巨细(size),那说明还在保持会话,他们就属于同一个窗口;如果gap大于Size,那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。会话窗口的长度不固定,起始和结束时间也不是确定的,各个分区之间窗口没有任何关联。会话窗口之间肯定不会重叠的,而且会保留至少size的间隔。
  1. // 分组
  2. KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(p -> p.f0);
  3. // 基于处理时间开窗,会话间隔时间为10s
  4. WindowedStream<Tuple2<String, Integer>, String, TimeWindow> sessionWindow = keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));
  5. // 基于事件时间开窗,会话间隔时间为10s
  6. WindowedStream<Tuple2<String, Integer>, String, TimeWindow> sessionWindow = keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(10)));
复制代码
全局窗口

这种窗口全局有效,会把相同key的全部数据都分配到同一个窗口中。这种窗口没有结束的时间,默认是不会触发计算的。如果盼望它能对数据举行计算,还需要自定义“触发器”(Trigger)。全局窗口没有结束的时间点,所以一般在盼望做更加机动的窗口处置惩罚时自定义利用。Flink中的计数窗口底层就是用全局窗口实现的。
窗口触发器(trigger)

定义了窗口何时被触发并决定触发后的行为(如举行窗口数据的计算或清理)。
EventTimeTrigger

基于变乱时间和水印机制来触发窗口计算。当窗口的最大时间戳小于等于当前的水印时,立刻触发窗口计算。
ProcessingTimeTrigger

基于处置惩罚时间(即呆板的体系时间)来触发窗口计算。当处置惩罚时间达到窗口的结束时间时,触发窗口计算。
CountTrigger

根据窗口内元素的数量来触发计算。当窗口内的元素数量达到预设的阈值时,触发窗口计算。
关键方法



  • onElement(T element, long timestamp, W window, TriggerContext ctx)
    当元素被添加到窗口时调用,用于注册定时器或更新窗口状态。
  • onEventTime(long time, W window, TriggerContext ctx)
    当变乱时间计时器触发时调用,用于处置惩罚变乱时间相关的触发逻辑。
  • onProcessingTime(long time, W window, TriggerContext ctx)
    当处置惩罚时间计时器触发时调用,用于处置惩罚处置惩罚时间相关的触发逻辑。
  • onMerge(W window, OnMergeContext ctx)
    当两个窗口合并时调用,用于合并窗口的状态和定时器。
  • clear(W window, TriggerContext ctx)
    当窗口被删除时调用,用于清理窗口的状态和定时器。
  1. @Override
  2. public TriggerResult onElement(BatteryRuntimeFlinkDto batteryRuntimeDto, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {   
  3.     ReducingState<Long> countState = triggerContext.getPartitionedState(countStateDescriptor);   
  4.    
  5. }
  6. @Override
  7. public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {   
  8.     log.info("窗口清除定时器触发,清除计数器和定时器,并关窗");   
  9.     this.clear(globalWindow, triggerContext);   
  10.     return TriggerResult.PURGE;
  11. }
  12. @Override
  13. public TriggerResult onEventTime(long time, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {   
  14.     return TriggerResult.CONTINUE;
  15. }
  16. @Override
  17. public void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {   
  18.     // 清除计数器   
  19.     triggerContext.getPartitionedState(countStateDescriptor).clear();   
  20.     // 清除定时器   
  21.     triggerContext.deleteProcessingTimeTimer(triggerContext.getPartitionedState(processTimerDescription).get());
  22. }
复制代码
处置惩罚算子(process)

ProcessFunction

最基本的处置惩罚函数,基于DataStream直接调用.process()时作为参数传入。
  1. public class CabinetDetailProcessFunction extends ProcessFunction<CabinetDetailDto, BatteryPutTakeLogDataSourceDto> {        
  2.     //往redis中写入   
  3.     private transient RedisService redisService;   
  4.     private String platform;   
  5.     public CabinetDetailProcessFunction(String platform) {        
  6.         this.platform = platform;   
  7.     }   
  8.     @Override   
  9.     public void open(Configuration parameters) throws Exception {        
  10.         super.open(parameters);        
  11.         this.redisService = ApplicationContextHolder.getBean(RedisService.class);   
  12.     }   
  13.         @Override   
  14.         public void processElement(CabinetDetailDto cabinetDetailDto, Context context, Collector<BatteryPutTakeLogDataSourceDto> collector) throws Exception {        
  15.             
  16.         }
  17. }
复制代码
KeyedProcessFunction

对流按键分区后的处置惩罚函数,基于KeyedStream调用.process()时作为参数传入。
ProcessWindowFunction

开窗之后的处置惩罚函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。
  1. public class BatteryRuntimeProcessFunction extends ProcessWindowFunction<BatteryRuntimeFlinkDto, BatteryRuntimeFlinkDto, String, GlobalWindow> {   
  2. @Override   
  3. public void process(String s, Context context, Iterable<BatteryRuntimeFlinkDto> iterable, Collector<BatteryRuntimeFlinkDto> collector) throws Exception {        
  4.     List<BatteryRuntimeFlinkDto> batteryRuntimeDtos = new ArrayList<>();      
  5.     iterable.forEach(p -> batteryRuntimeDtos.add(p));      
  6.     if (CollectionUtils.isEmpty(batteryRuntimeDtos)) {            
  7.         return;        
  8.     }      
  9.     BatteryRuntimeFlinkDto batteryRuntimeFlinkDto =
  10.     batteryRuntimeDtos.get(0);            
  11.     collector.collect(batteryRuntimeFlinkDto);   
  12. }}
复制代码
ProcessAllWindowFunction

同样是开窗之后的处置惩罚函数,基于AllWindowedStream调用.process()时作为参数传入。
CoProcessFunction

合并(connect)两条流之后的处置惩罚函数,基于ConnectedStreams调用.process()时作为参数传入。
ProcessJoinFunction

间隔毗连(interval join)两条流之后的处置惩罚函数,基于IntervalJoined调用.process()时作为参数传入。
BroadcastProcessFunction

广播毗连流处置惩罚函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播毗连流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做毗连(conncet)之后的产物。
  1. public class BatteryRuntimeConnectProcessFunction extends BroadcastProcessFunction<BatteryRuntimeDto, BatteryPutTakeLogDataSourceDto, BatteryRuntimeFlinkDto> {
  2. // 状态
  3. MapStateDescriptor<String, BatteryInBoxStatusDto> descriptor =   new MapStateDescriptor<>("boxInStatus", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<BatteryInBoxStatusDto>(){}));   
  4. @Override
  5. public void processElement(BatteryRuntimeDto batteryRuntimeDto, ReadOnlyContext readOnlyContext, Collector<BatteryRuntimeFlinkDto> collector) throws Exception {   
  6. // dosometing
  7. }
  8. @Override   
  9. public void processBroadcastElement(BatteryPutTakeLogDataSourceDto batteryPutTakeLogDataSourceDto, Context context, Collector<BatteryRuntimeFlinkDto> collector) throws Exception {
  10. // dosometing
  11. }
复制代码
KeyedBroadcastProcessFunction

按键分区的广播毗连流处置惩罚函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction差异的是,这时的广播毗连流,是一个KeyedStream与广播流(BroadcastStream)做毗连之后的产物。
输出算子(sink)

输出算子,就是经过一系列处置惩罚算子后的数据输出到某个位置。例如:kafka,redis,数据库等等。
KafkaSink

  1. DataStream stream...;
  2. KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
  3. // 指定 kafka 的地址和端口
  4. .setBootstrapServers("kafka地址和端口")
  5. // 指定序列化器:指定Topic名称、具体的序列化
  6. .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic名称") .setValueSerializationSchema(new SimpleStringSchema()) .build() )
  7. /**
  8. * EXACTLY_ONCE: 精准一次投送。这是最严格,最理想的数据投送保证。数据不丢失不重复。
  9. * AT_LEAST_ONCE: 至少一次投送。数据保证不丢失,但可能会重复。
  10. * NONE: 无任何额外机制保证。数据有可能丢失或者重复。
  11. */
  12. // sink设置保证级别为 至少一次投送。数据保证不丢失,但可能会重复
  13. .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); stream.sinkTo(kafkaSink);
复制代码
JDBCSink

  1. DataStream<UserDto> reduceStream...;
  2. // 构建jdbc sink
  3. SinkFunction<UserDto> jdbcSink = JdbcSink.sink(
  4. // 数据插入sql语句
  5. "insert into user (`name`, `age`) values(?, ?)",
  6. new JdbcStatementBuilder<UserDto>() {
  7. @Override
  8. // 字段映射配置
  9. public void accept(PreparedStatement pStmt, UserDto userDto) throws SQLException {
  10. pStmt.setString(1, userDto.getUserName());
  11. pStmt.setInt(2, userDto.getAge()); } },
  12. JdbcExecutionOptions
  13. .builder()
  14. // 批次大小,条数
  15. .withBatchSize(10)
  16. // 批次最大等待时间
  17. .withBatchIntervalMs(5000)
  18. // 重复次数
  19. .withMaxRetries(1) .build(),
  20. // jdbc信息配置
  21. new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  22. .withDriverName("com.mysql.jdbc.Driver")
  23. .withUrl("数据库地址")
  24. .withUsername("root")
  25. .withPassword("password")
  26. .build() );
  27. // 添加jdbc sink
  28. reduceStream.addSink(jdbcSink);
复制代码
其他方式的sink: File、MongoDB、RabbitMQ、Elasticsearch、Apache Pulsar 等利用方式,可参考官方文档(Apache Flink Documentation)。
Flink 相关依靠:

  1. <dependency>
  2.     <groupId>org.apache.flink</groupId>
  3.     <artifactId>flink-java</artifactId>
  4.     <version>1.17.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>org.apache.flink</groupId>
  8.     <artifactId>flink-streaming-java</artifactId>
  9.     <version>1.17.0</version>
  10. </dependency>
  11. <dependency>
  12.     <groupId>org.apache.flink</groupId>
  13.     <artifactId>flink-runtime-web</artifactId>
  14.     <version>1.17.0</version>
  15. </dependency>
  16. <dependency>
  17.     <groupId>org.apache.flink</groupId>
  18.     <artifactId>flink-clients</artifactId>
  19.     <version>1.17.0</version>
  20. </dependency>
  21. <!-- File连接器 -->
  22. <dependency>
  23.     <groupId>org.apache.flink</groupId>
  24.     <artifactId>flink-connector-files</artifactId>
  25.     <version>1.17.0</version>
  26. </dependency>
  27. <!-- kafka连接器 -->
  28. <dependency>
  29.     <groupId>org.apache.flink</groupId>
  30.     <artifactId>flink-connector-kafka</artifactId>
  31.     <version>1.17.0</version>
  32. </dependency>
  33. <!-- jdbc连接器 -->
  34. <dependency>
  35.     <groupId>org.apache.flink</groupId>
  36.     <artifactId>flink-connector-jdbc</artifactId>
  37.     <version>1.16.0</version>
  38. </dependency>
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

守听

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表