基础概念
Flink是一个框架和分布式处置惩罚引擎,用于对无界数据流和有界数据流举行有状态计算,它的核心目标是“数据流上的有状态计算”。
有界流和无界流
- 有界流:具有明白的开始和结束时间,数据量有限。适合利用批处置惩罚技术,可以在处置惩罚前将全部数据一次性读入内存举行处置惩罚。有界流通常用于历史数据分析、数据迁徙等场景。
- 无界流:没有明白的开始和结束时间,数据接二连三天生。由于数据是无穷且持续的,无界流需要实时处置惩罚,并且必须持续摄取和处置惩罚数据,不能等待全部数据到达后再举行处置惩罚。适合适用于流处置惩罚。
名词
源算子(source)
Flink可以从各种来源获取数据,然后构建DataStream举行转换处置惩罚。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。
- // 创建执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // 从集合读取数据
- DataStreamSource<Integer> collectionSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15));
- // 从文件中读取数据
- DataStreamSource<String> fileSource = env.fromSource(FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path("input/word.txt")).build(),WatermarkStrategy.noWatermarks(),"fileSource");
- // 从kafka读取数据
- DataStreamSource<String> kafkaSource = env.fromSource(KafkaSource.<String>builder()
- // kafka地址--可配置多个
- .setBootstrapServers("")
- // topic名称--可配置多个
- .setTopics("")
- // 消费组id
- .setGroupId("")
- // 反序列化方式
- .setValueOnlyDeserializer(new SimpleStringSchema())
- // kafka 消费偏移量 方式 earliest(一定从最早的开始消费)、latest(一定从最新的开始消费)或者手动设置偏移量 ,默认是earliest
- .setStartingOffsets(OffsetsInitializer.latest())
- // 水位线,自定义数据源算子名称
- .build(), WatermarkStrategy.noWatermarks(), "kafkaSource");
- // 从socket读取数据
- DataStreamSource<String> socketSource = env.socketTextStream("...","1234");
复制代码 基本转换算子
Map
对元素的数据类型和内容做转换。
- // 第一个参数为输入流,第二个参数为输出流
- SingleOutputStreamOperator<UserDto> userDataStream = kafkaSource.map(new MapFunction<String, UserDto>() {
- @Override
- public UserDto map(String message) throws Exception {
- return JSONObject.parseObject(message, UserDto.class);
- }
- });
复制代码 FlatMap
输入一个元素同时产生零个、一个或多个元素。
- // 第一个参数为输入流,第二个参数为输出流
- // 可做转换,可做条件过滤
- SingleOutputStreamOperator<UserDto> userDataStream =kafkaSource.flatMap(new FlatMapFunction<String, UserDto>() {
- @Override
- public void flatMap(String message, Collector<UserDto> collector) throws Exception {
- UserDto userDto = JSONObject.parseObject(message, UserDto.class);
- collector.collect(userDto);
- }
- });
复制代码 Filter
对数据源根据条件过滤数据,保留满足条件的数据
- // 过滤出年龄大于18的用户
- SingleOutputStreamOperator<UserDto> filterDataStream = userDataStream.filter(new FilterFunction<UserDto>() {
- @Override
- public boolean filter(UserDto userDto) throws Exception {
- return userDto.getAge() > 18;
- }
- });
复制代码 聚合算子
KeyBy
根据指定的字段(key),将数据划分到不相交的分区中。相同key的元素会被分到同一个分区中。
- // 将用户id一样的用户分到一个分区内
- KeyedStream<UserDto, Integer> userKeyedStream = userDataStream.keyBy(new KeySelector<UserDto, Integer>() {
- @Override
- public Integer getKey(UserDto userDto) throws Exception {
- return userDto.getId();
- }
- });
复制代码 Reduce (仅支持同类型的数据)
对流的数据,来一条计算一条,将当前元素和上一次聚合后的数据组合,输出新值,并将新值举行保存,作为下一次计算的元素。
聚合前和聚合后的数据类型是一致的。
当第一条数据进来时,不会触发计算。
- // 计算一个用户的订单总价格
- SingleOutputStreamOperator<UserDto> reduce = userKeyedStream.reduce(new ReduceFunction<UserDto>() {
- @Override
- public UserDto reduce(UserDto t1, UserDto t2) throws Exception {
- int totalPrice = t1.getTotalPrice() + t2.getOrderPrice();
- UserDto userDto = new UserDto();
- userDto.setId(t1.getId());
- userDto.setAge(t1.getAge());
- userDto.setTotalPrice(totalPrice);
- return userDto;
- }
- });
复制代码 Aggregate (支持差异类型的数据)
- SingleOutputStreamOperator<String> aggregate = windowedStream.aggregate(new AggregateFunction<UserDto, Integer, String>() {
- /**
- * 创建累加器,就是初始化累加器
- * @return
- */
- @Override
- public Integer createAccumulator() {
- return 0;
- }
- /**
- * 计算逻辑或者是聚合逻辑
- * @param userDto
- * @param beforeData
- * @return
- */
- @Override
- public Integer add(UserDto userDto, Integer beforeData) {
- return beforeData + userDto.getAge();
- }
- /**
- * 获取最终结果,窗口触发时输出
- * @param integer
- * @return
- */
- @Override
- public String getResult(Integer integer) {
- return "计算结束,最终结果为:" + integer.toString();
- }
- /**
- * 只有会话窗口才会使用到
- * @param integer
- * @param acc1
- * @return
- */
- @Override
- public Integer merge(Integer integer, Integer acc1) {
- return 0;
- }
- });
复制代码 窗口(window)
把流切割成有限巨细的多个“存储桶”;每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据举行计算处置惩罚。窗口不是静态天生的,是动态创建的。当这个窗口范围的进入第一条数据时,才会创建对应的窗口。
滚动窗口
有固定的巨细,是一种对数据举行“匀称切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。每个数据都会分配到一个窗口,而且只会属于一个窗口。滚动窗口可以基于时间定义,也可以基于数据的个数定义,需要的参数只有一个,就是窗口的巨细。
- // 分组
- KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(p -> p.f0);
- // 基于处理时间开窗,窗口长度为10s,窗口开始时间为 窗口长度整数倍向下取整,结束时间为开始时间+窗口长度
- WindowedStream<Tuple2<String, Integer>, String, TimeWindow> tumblingProcessingTimeStream = keyByStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
- // 基于事件时间开窗,窗口长度为10s,窗口开始时间为数据源事件时间,结束时间为开始时间+窗口长度
- WindowedStream<Tuple2<String, Integer>, String, TimeWindow> tumblingEventTimeStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(10)));
- // 基于次数开窗
- WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindowStream = keyedStream.countWindow(10);
复制代码 滑动窗口
巨细是固定的,但是窗口之间不是收尾相接的,而是可以“错开”肯定的位置。定义滑动窗口的参数有2个:窗口巨细和滑动步长,滑动步长代表了窗口计算的频率。因此,如果 slide 小于窗口巨细,滑动窗口可以答应窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。
- // 分组
- KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(p -> p.f0);
- // 基于处理事件开窗,窗口长度为10s,滑动步长为1s
- WindowedStream<Tuple2<String, Integer>, String, TimeWindow> slidingProcessingTimeWindowStream = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(1)));
- // 基于事件事件开窗,窗口长度为10s,滑动步长为1s
- WindowedStream<Tuple2<String, Integer>, String, TimeWindow> slidingEventTimeWindowStream = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(1)));
- // 基于次数开窗
- WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindowStream = keyedStream.countWindow(10, 1);
复制代码 会话窗口
是基于会话来对数据举行分组的。会话窗口只能基于时间来定义。会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的巨细(size),那说明还在保持会话,他们就属于同一个窗口;如果gap大于Size,那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。会话窗口的长度不固定,起始和结束时间也不是确定的,各个分区之间窗口没有任何关联。会话窗口之间肯定不会重叠的,而且会保留至少size的间隔。
- // 分组
- KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(p -> p.f0);
- // 基于处理时间开窗,会话间隔时间为10s
- WindowedStream<Tuple2<String, Integer>, String, TimeWindow> sessionWindow = keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));
- // 基于事件时间开窗,会话间隔时间为10s
- WindowedStream<Tuple2<String, Integer>, String, TimeWindow> sessionWindow = keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(10)));
复制代码 全局窗口
这种窗口全局有效,会把相同key的全部数据都分配到同一个窗口中。这种窗口没有结束的时间,默认是不会触发计算的。如果盼望它能对数据举行计算,还需要自定义“触发器”(Trigger)。全局窗口没有结束的时间点,所以一般在盼望做更加机动的窗口处置惩罚时自定义利用。Flink中的计数窗口底层就是用全局窗口实现的。
窗口触发器(trigger)
定义了窗口何时被触发并决定触发后的行为(如举行窗口数据的计算或清理)。
EventTimeTrigger
基于变乱时间和水印机制来触发窗口计算。当窗口的最大时间戳小于等于当前的水印时,立刻触发窗口计算。
ProcessingTimeTrigger
基于处置惩罚时间(即呆板的体系时间)来触发窗口计算。当处置惩罚时间达到窗口的结束时间时,触发窗口计算。
CountTrigger
根据窗口内元素的数量来触发计算。当窗口内的元素数量达到预设的阈值时,触发窗口计算。
关键方法
- onElement(T element, long timestamp, W window, TriggerContext ctx)
当元素被添加到窗口时调用,用于注册定时器或更新窗口状态。
- onEventTime(long time, W window, TriggerContext ctx)
当变乱时间计时器触发时调用,用于处置惩罚变乱时间相关的触发逻辑。
- onProcessingTime(long time, W window, TriggerContext ctx)
当处置惩罚时间计时器触发时调用,用于处置惩罚处置惩罚时间相关的触发逻辑。
- onMerge(W window, OnMergeContext ctx)
当两个窗口合并时调用,用于合并窗口的状态和定时器。
- clear(W window, TriggerContext ctx)
当窗口被删除时调用,用于清理窗口的状态和定时器。
- @Override
- public TriggerResult onElement(BatteryRuntimeFlinkDto batteryRuntimeDto, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
- ReducingState<Long> countState = triggerContext.getPartitionedState(countStateDescriptor);
-
- }
- @Override
- public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
- log.info("窗口清除定时器触发,清除计数器和定时器,并关窗");
- this.clear(globalWindow, triggerContext);
- return TriggerResult.PURGE;
- }
- @Override
- public TriggerResult onEventTime(long time, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
- return TriggerResult.CONTINUE;
- }
- @Override
- public void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
- // 清除计数器
- triggerContext.getPartitionedState(countStateDescriptor).clear();
- // 清除定时器
- triggerContext.deleteProcessingTimeTimer(triggerContext.getPartitionedState(processTimerDescription).get());
- }
复制代码 处置惩罚算子(process)
ProcessFunction
最基本的处置惩罚函数,基于DataStream直接调用.process()时作为参数传入。
- public class CabinetDetailProcessFunction extends ProcessFunction<CabinetDetailDto, BatteryPutTakeLogDataSourceDto> {
- //往redis中写入
- private transient RedisService redisService;
- private String platform;
- public CabinetDetailProcessFunction(String platform) {
- this.platform = platform;
- }
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- this.redisService = ApplicationContextHolder.getBean(RedisService.class);
- }
- @Override
- public void processElement(CabinetDetailDto cabinetDetailDto, Context context, Collector<BatteryPutTakeLogDataSourceDto> collector) throws Exception {
-
- }
- }
复制代码 KeyedProcessFunction
对流按键分区后的处置惩罚函数,基于KeyedStream调用.process()时作为参数传入。
ProcessWindowFunction
开窗之后的处置惩罚函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。
- public class BatteryRuntimeProcessFunction extends ProcessWindowFunction<BatteryRuntimeFlinkDto, BatteryRuntimeFlinkDto, String, GlobalWindow> {
- @Override
- public void process(String s, Context context, Iterable<BatteryRuntimeFlinkDto> iterable, Collector<BatteryRuntimeFlinkDto> collector) throws Exception {
- List<BatteryRuntimeFlinkDto> batteryRuntimeDtos = new ArrayList<>();
- iterable.forEach(p -> batteryRuntimeDtos.add(p));
- if (CollectionUtils.isEmpty(batteryRuntimeDtos)) {
- return;
- }
- BatteryRuntimeFlinkDto batteryRuntimeFlinkDto =
- batteryRuntimeDtos.get(0);
- collector.collect(batteryRuntimeFlinkDto);
- }}
复制代码 ProcessAllWindowFunction
同样是开窗之后的处置惩罚函数,基于AllWindowedStream调用.process()时作为参数传入。
CoProcessFunction
合并(connect)两条流之后的处置惩罚函数,基于ConnectedStreams调用.process()时作为参数传入。
ProcessJoinFunction
间隔毗连(interval join)两条流之后的处置惩罚函数,基于IntervalJoined调用.process()时作为参数传入。
BroadcastProcessFunction
广播毗连流处置惩罚函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播毗连流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做毗连(conncet)之后的产物。
- public class BatteryRuntimeConnectProcessFunction extends BroadcastProcessFunction<BatteryRuntimeDto, BatteryPutTakeLogDataSourceDto, BatteryRuntimeFlinkDto> {
- // 状态
- MapStateDescriptor<String, BatteryInBoxStatusDto> descriptor = new MapStateDescriptor<>("boxInStatus", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<BatteryInBoxStatusDto>(){}));
- @Override
- public void processElement(BatteryRuntimeDto batteryRuntimeDto, ReadOnlyContext readOnlyContext, Collector<BatteryRuntimeFlinkDto> collector) throws Exception {
- // dosometing
- }
- @Override
- public void processBroadcastElement(BatteryPutTakeLogDataSourceDto batteryPutTakeLogDataSourceDto, Context context, Collector<BatteryRuntimeFlinkDto> collector) throws Exception {
- // dosometing
- }
复制代码 KeyedBroadcastProcessFunction
按键分区的广播毗连流处置惩罚函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction差异的是,这时的广播毗连流,是一个KeyedStream与广播流(BroadcastStream)做毗连之后的产物。
输出算子(sink)
输出算子,就是经过一系列处置惩罚算子后的数据输出到某个位置。例如:kafka,redis,数据库等等。
KafkaSink
- DataStream stream...;
- KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
- // 指定 kafka 的地址和端口
- .setBootstrapServers("kafka地址和端口")
- // 指定序列化器:指定Topic名称、具体的序列化
- .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic名称") .setValueSerializationSchema(new SimpleStringSchema()) .build() )
- /**
- * EXACTLY_ONCE: 精准一次投送。这是最严格,最理想的数据投送保证。数据不丢失不重复。
- * AT_LEAST_ONCE: 至少一次投送。数据保证不丢失,但可能会重复。
- * NONE: 无任何额外机制保证。数据有可能丢失或者重复。
- */
- // sink设置保证级别为 至少一次投送。数据保证不丢失,但可能会重复
- .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); stream.sinkTo(kafkaSink);
复制代码 JDBCSink
- DataStream<UserDto> reduceStream...;
- // 构建jdbc sink
- SinkFunction<UserDto> jdbcSink = JdbcSink.sink(
- // 数据插入sql语句
- "insert into user (`name`, `age`) values(?, ?)",
- new JdbcStatementBuilder<UserDto>() {
- @Override
- // 字段映射配置
- public void accept(PreparedStatement pStmt, UserDto userDto) throws SQLException {
- pStmt.setString(1, userDto.getUserName());
- pStmt.setInt(2, userDto.getAge()); } },
- JdbcExecutionOptions
- .builder()
- // 批次大小,条数
- .withBatchSize(10)
- // 批次最大等待时间
- .withBatchIntervalMs(5000)
- // 重复次数
- .withMaxRetries(1) .build(),
- // jdbc信息配置
- new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
- .withDriverName("com.mysql.jdbc.Driver")
- .withUrl("数据库地址")
- .withUsername("root")
- .withPassword("password")
- .build() );
- // 添加jdbc sink
- reduceStream.addSink(jdbcSink);
复制代码 其他方式的sink: File、MongoDB、RabbitMQ、Elasticsearch、Apache Pulsar 等利用方式,可参考官方文档(Apache Flink Documentation)。
Flink 相关依靠:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>1.17.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
- <version>1.17.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime-web</artifactId>
- <version>1.17.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>1.17.0</version>
- </dependency>
- <!-- File连接器 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-files</artifactId>
- <version>1.17.0</version>
- </dependency>
- <!-- kafka连接器 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka</artifactId>
- <version>1.17.0</version>
- </dependency>
- <!-- jdbc连接器 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-jdbc</artifactId>
- <version>1.16.0</version>
- </dependency>
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |