Apache Flink 超简朴讲解,Java中的分布式流盘算框架

打印 上一主题 下一主题

主题 853|帖子 853|积分 2559

嘿,各人好!今天咱们聊聊 Java 生态中一个超酷炫的工具——Apache Flink。它就像一把瑞士军刀,专门用来处置惩罚那些海量的、及时活动的数据流。想象一下,如果你有一条永不停息的数据河道,而你想要在这条河里捞出有效的信息,Flink 就是你的好帮忙!
戳底部手刺,一起变现

一、为啥有 Flink?

在互联网时代,数据像潮水一样涌来,企业不仅必要存储这些数据,更希望可以或许迅速从中发掘出价值。传统的批处置惩罚方法有点跟不上节奏了,因为它们处置惩罚完一批数据得等下一批,这就导致了延迟。于是,一些早期的流处置惩罚框架出现了,比如 Storm 和 Spark Streaming。但是这些框架也有些小问题,比如状态管理不敷机动,大概在保证数据一致性方面做得不是特别好。
Flink 就是在这样的背景下诞生的。它的开辟者们想:“如果能有个东西既能做到及时处置惩罚,又能确保数据不丢不乱,那该多好啊!”所以他们造出了 Flink。这个家伙从一开始就被计划成可以处置惩罚无穷的数据流,并且可以或许保证即使在网络故障或节点瓦解的环境下也能保持数据的一致性和准确性。
二、Flink 都干啥用?

说白了,Flink 可以用来做许多风趣的变乱:

  • 及时数据分析:比如说你想知道现在有多少人在看你的直播,大概哪个产物最受欢迎,Flink 可以帮你及时统计。
  • 物联网 (IoT) 数据处置惩罚:想象一下智能家居系统里的温度传感器、湿度传感器,Flink 可以及时收集和分析这些数据,告诉你家里是不是太热了大概太湿了。
  • 金融风控:银行和支付平台可以用 Flink 来检测异常生意业务,防止诈骗。
  • 推荐系统:电商网站可以根据用户的欣赏历史给出个性化的商品推荐,让购物体验更贴心。
  • A/B 测试与实验:测试差别的广告计谋效果怎样,看看哪种方式最吸引人。
三、Flink 的基础知识

1. DataStream API

DataStream API 是 Flink 的核心接口之一,就比如是搭建乐高积木时使用的各种形状的小块。你可以用它来做许多变乱,比如把数据映射成新的情势、过滤掉不必要的东西、按某个字段分组等等。下面是一个简朴的例子,我们读取一段文本,然后盘算每个单词出现的次数。
  1. import org.apache.flink.api.common.functions.FlatMapFunction;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.util.Collector;
  6. public class SimpleWordCount {
  7.     public static void main(String[] args) throws Exception {
  8.         // 创建执行环境
  9.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10.         // 假设有一个文本文件作为输入源
  11.         DataStream<String> text = env.fromElements("hello world", "hello flink");
  12.         // 解析数据,按空格分割字符串,并映射为 (word, 1) 的键值对
  13.         DataStream<Tuple2<String, Integer>> counts =
  14.             text
  15.                 .flatMap(new Tokenizer())
  16.                 .keyBy(value -> value.f0) // 按第一个字段分组
  17.                 .sum(1);                  // 对第二个字段求和
  18.         // 输出结果到控制台
  19.         counts.print();
  20.         // 执行程序
  21.         env.execute("Simple Word Count");
  22.     }
  23.     // 自定义 FlatMapFunction 实现,用于将每行文本拆分为多个单词
  24.     public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
  25.         @Override
  26.         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
  27.             // 将每一行按照空格分割成单词
  28.             String[] tokens = value.toLowerCase().split("\\W+");
  29.             // 发射每个单词作为一个元组 (word, 1)
  30.             for (String token : tokens) {
  31.                 if (token.length() > 0) {
  32.                     out.collect(new Tuple2<>(token, 1));
  33.                 }
  34.             }
  35.         }
  36.     }
  37. }
复制代码
戳底部手刺,一起变现

2. Windowing

有时候我们必要对一段时间内的数据举行聚合操作,比如盘算过去一分钟内发生了多少次点击。这就是窗口函数的作用。Flink 支持多种范例的窗口,比如滚动窗口(固定时间间隔)、滑动窗口(重叠的时间段)和会话窗口(基于活动间隙)。下面的例子展示了怎样使用滚动窗口来盘算每个单词在过去5秒内出现的次数。
  1. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  2. import org.apache.flink.api.common.functions.AggregateFunction;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.streaming.api.windowing.time.Time;
  7. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  8. import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
  9. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  10. import org.apache.flink.util.Collector;
  11. public class WordCountWithWindow {
  12.     public static void main(String[] args) throws Exception {
  13.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14.         // 模拟一个持续不断的文本流
  15.         DataStream<String> text = env.fromElements("hello world", "hello flink", "hello again");
  16.         // 设置事件时间戳和水印策略(这里简化处理)
  17.         DataStream<Tuple2<String, Integer>> wordCounts = text
  18.             .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
  19.                 for (String word : line.split(" ")) {
  20.                     if (!word.isEmpty()) {
  21.                         out.collect(Tuple2.of(word, 1));
  22.                     }
  23.                 }
  24.             })
  25.             .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
  26.                 .withTimestampAssigner((event, timestamp) -> System.currentTimeMillis()))
  27.             .keyBy(value -> value.f0) // 按单词分组
  28.             .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 定义5秒的滚动窗口
  29.             .aggregate(new WordCountAggregate(), new ResultFormatter());
  30.         wordCounts.print();
  31.         env.execute("Word Count with Windows");
  32.     }
  33.     // 自定义聚合函数
  34.     public static class WordCountAggregate implements AggregateFunction<Tuple2<String, Integer>, Integer, Tuple2<String, Integer>> {
  35.         @Override
  36.         public Integer createAccumulator() {
  37.             return 0;
  38.         }
  39.         @Override
  40.         public Integer add(Tuple2<String, Integer> value, Integer accumulator) {
  41.             return accumulator + value.f1;
  42.         }
  43.         @Override
  44.         public Tuple2<String, Integer> getResult(Integer accumulator) {
  45.             return Tuple2.of("count", accumulator);
  46.         }
  47.         @Override
  48.         public Integer merge(Integer a, Integer b) {
  49.             return a + b;
  50.         }
  51.     }
  52.     // 格式化输出结果
  53.     public static class ResultFormatter implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
  54.         @Override
  55.         public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
  56.             int sum = 0;
  57.             for (Tuple2<String, Integer> in : input) {
  58.                 sum += in.f1;
  59.             }
  60.             out.collect(Tuple2.of(key, sum));
  61.         }
  62.     }
  63. }
复制代码
3. State Management

Flink 还允许你在运行过程中保存临时的状态信息,这就像玩游戏时存档一样。比如说,如果你正在统计某个用户的行为模式,那么你可以把已经看到的数据暂时存起来,比及后面再继续处置惩罚。下面是使用 ValueState 来跟踪每个单词的最新计数。
  1. import org.apache.flink.api.common.state.ValueState;
  2. import org.apache.flink.api.common.state.ValueStateDescriptor;
  3. import org.apache.flink.api.common.functions.RichFlatMapFunction;
  4. import org.apache.flink.api.java.tuple.Tuple2;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.util.Collector;
  8. public class WordCountWithState {
  9.     public static void main(String[] args) throws Exception {
  10.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11.         // 模拟一个持续不断的文本流
  12.         DataStream<String> text = env.fromElements("hello world", "hello flink", "hello again");
  13.         // 使用 RichFlatMapFunction 来访问状态
  14.         DataStream<Tuple2<String, Integer>> wordCounts = text
  15.             .flatMap(new Tokenizer())
  16.             .keyBy(value -> value.f0) // 按单词分组
  17.             .flatMap(new CountWithState());
  18.         wordCounts.print();
  19.         env.execute("Word Count with State");
  20.     }
  21.     // 自定义 FlatMapFunction 实现,用于将每行文本拆分为多个单词
  22.     public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
  23.         @Override
  24.         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
  25.             for (String word : value.split(" ")) {
  26.                 if (!word.isEmpty()) {
  27.                     out.collect(Tuple2.of(word, 1));
  28.                 }
  29.             }
  30.         }
  31.     }
  32.     // 使用 ValueState 来跟踪每个单词的计数
  33.     public static final class CountWithState extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
  34.         private transient ValueState<Integer> countState;
  35.         @Override
  36.         public void open(Configuration parameters) {
  37.             ValueStateDescriptor<Integer> descriptor =
  38.                 new ValueStateDescriptor<>("word-count", Integer.class);
  39.             countState = getRuntimeContext().getState(descriptor);
  40.         }
  41.         @Override
  42.         public void flatMap(Tuple2<String, Integer> input, Collector<Tuple2<String, Integer>> out) throws Exception {
  43.             Integer currentCount = countState.value();
  44.             if (currentCount == null) {
  45.                 currentCount = 0;
  46.             }
  47.             currentCount += input.f1;
  48.             countState.update(currentCount);
  49.             out.collect(new Tuple2<>(input.f0, currentCount));
  50.         }
  51.     }
  52. }
复制代码
4. Checkpointing

为了防止不测发生,Flink 提供了一个叫做 checkpoint 的功能,就像是给步伐拍快照一样。每隔一段时间,它会把当前的状态保存下来,万一之后出错了,就可以直接从近来的一个 checkpoint 开始重新运行,而不会丢失任何数据。默认环境下,Flink 会主动为你设置 checkpoint,但你也可以自定义设置,比如多久拍一次快照,大概把快照存在哪里。
  1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  2. import org.apache.flink.runtime.state.filesystem.FsStateBackend;
  3. public class EnableCheckpointing {
  4.     public static void main(String[] args) throws Exception {
  5.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  6.         // 启用 checkpoint 功能,每隔5分钟创建一次检查点
  7.         env.enableCheckpointing(5 * 60 * 1000);
  8.         // 设置 checkpoint 存储位置
  9.         env.setStateBackend(new FsStateBackend("file:///path/to/checkpoints"));
  10.         // ... 其他代码 ...
  11.         
  12.         env.execute("Enable Checkpointing Example");
  13.     }
  14. }
复制代码
5. Fault Tolerance

最后,Flink 还内置了许多容错机制,确保即使部分组件挂掉了也不会影响整个系统的正常工作。比方,它支持使命重试、失败隔离等功能,这些都是为了让我们的应用更加稳定可靠。
戳底部手刺,一起变现


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

何小豆儿在此

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表