大数据-118 - Flink DataSet 基本先容 核心特性 创建、转换、输出等 ...

打印 上一主题 下一主题

主题 974|帖子 974|积分 2922

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)
章节内容

上节我们完成了如下的内容:


  • Flink Sink JDBC
  • Flink Sink Kafka

注意事项

DataSetAPI 和 DataStream API一样有三个部分构成,各部分的作用对应一致,此处不再赘述。

FlinkDataSet

在 Apache Flink 中,DataSet API 是 Flink 批处置惩罚的核心接口,它紧张用于处置惩罚静态数据集。虽然 Flink 的 DataStream API 被广泛用于流式数据处置惩罚,但 DataSet API 实用于大规模批处置惩罚场景,如数据清洗、ETL、分析等。虽然近年来 Flink 更多地向流处置惩罚方向发展,但批处置惩罚仍然是数据处置惩罚中的一个紧张场景。
DataSource

对DataSet批处置惩罚而言,较为频繁的操作是读取HDFS中的文件数据,因为这里紧张先容两个 DataSource 组件:


  • 基于聚集:fromCollection 紧张是为了方便测试
  • 基于文件:readTextFile,基于HDFS中的数据举行盘算分析
基本概念

Flink 的 DataSet API 是一个功能强大的批处置惩罚 API,专为处置惩罚静态、离线数据集计划。DataSet 中的数据是有限的,处置惩罚时系统会先等待整个数据集加载完毕。DataSet 可以通过多种方式创建,例如从文件、数据库、聚集等加载数据,然后通过一系列转换操作(如 map、filter、join 等)举行处置惩罚。
核心特性



  • 支持丰富的转换操作。
  • 提供多种输入输出数据源。
  • 支持复杂的数据类型,包括基本类型、元组、POJO、列表等。
  • 支持优化计划,例如通过 cost-based optimizer 来优化查询执行计划。
DataSet 创建

在 Flink 中,可以通过多种方式创建 DataSet。以下是常见的数据源:
从本地文件读取

  1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. DataSet<String> text = env.readTextFile("path/to/file");
复制代码
从 CSV 文件读取

  1. DataSet<Tuple3<Integer, String, Double>> csvData = env.readCsvFile("path/to/file.csv")
  2.     .types(Integer.class, String.class, Double.class);
复制代码
从聚集中创建

  1. List<Tuple2<String, Integer>> data = Arrays.asList(
  2.     new Tuple2<>("Alice", 1),
  3.     new Tuple2<>("Bob", 2)
  4. );
  5. DataSet<Tuple2<String, Integer>> dataSet = env.fromCollection(data);
复制代码
从数据库中读取

可以通过自定义的输入格式(如 JDBC 输入格式)从数据库中读取数据,虽然 Flink 本身并没有内置 JDBC 源的批处置惩罚 API,但可以通过自定义实现。
DataSet 的转换操作(Transformation)

Flink 的 DataSet API 提供了丰富的转换操作,可以对数据举行各种变动,以下是常用的转换操作:


Map

将 DataSet 中的每一条记录举行映射操作,天生新的 DataSet。
  1. DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
  2. DataSet<Integer> squaredNumbers = numbers.map(n -> n * n);
复制代码
Filter

过滤掉不满足条件的记录。
  1. DataSet<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);
复制代码
FlatMap

雷同于 map,但答应一条记录天生多条输出记录。
  1. DataSet<String> lines = env.fromElements("hello world", "flink is great");
  2. DataSet<String> words = lines.flatMap((line, collector) -> {
  3.     for (String word : line.split(" ")) {
  4.         collector.collect(word);
  5.     }
  6. });
复制代码
Reduce

将数据集根据某种聚合逻辑举行合并
  1. DataSet<Integer> sum = numbers.reduce((n1, n2) -> n1 + n2);
复制代码
GroupBy 和 Reduce

对数据集举行分组,然后在每个组上执行聚合操作
  1. DataSet<Tuple2<String, Integer>> wordCounts = words
  2.     .map(word -> new Tuple2<>(word, 1))
  3.     .groupBy(0)
  4.     .reduce((t1, t2) -> new Tuple2<>(t1.f0, t1.f1 + t2.f1));
复制代码
Join

雷同于 SQL 中的毗连操作,毗连两个 DataSet。
  1. DataSet<Tuple2<Integer, String>> persons = env.fromElements(
  2.     new Tuple2<>(1, "Alice"),
  3.     new Tuple2<>(2, "Bob")
  4. );
  5. DataSet<Tuple2<Integer, String>> cities = env.fromElements(
  6.     new Tuple2<>(1, "Berlin"),
  7.     new Tuple2<>(2, "Paris")
  8. );
  9. DataSet<Tuple2<String, String>> personWithCities = persons.join(cities)
  10.     .where(0)
  11.     .equalTo(0)
  12.     .with((p, c) -> new Tuple2<>(p.f1, c.f1));
复制代码
DataSet 输出

DataSet API 提供多种方式将数据写出到外部系统:
写入文件

  1. wordCounts.writeAsCsv("output/wordcounts.csv", "\n", ",");
复制代码
写入数据库

虽然 DataSet API 没有直接提供 JDBC Sink,可以通过自定义 Sink 实现写入数据库功能。
打印控制台

  1. wordCounts.print();
复制代码
批处置惩罚的优化

DataSet API 提供了优化机制,通过成本模子和执行计划的分析来优化任务执行。在 Flink 内部,编译器会根据任务定义的转换操作天生一个优化的执行计划,这个过程雷同于 SQL 查询优化器的工作原理。


  • DataSet 的分区:Flink 可以根据数据集的分区举行优化。例如,通过 partitionByHash 或 partitionByRange 来手动控制数据的分布方式。
  • DataSet 的缓存:可以通过 rebalance()、hashPartition() 等方法来均衡数据负载,以提高并行度和盘算服从。
DataSet API 的容错机制

Flink 的 DataSet API 提供了容错机制,支持在发生故障时重新执行失败的任务。虽然 DataSet API 没有像 DataStream 那样依赖于 Checkpoint 机制,但其批处置惩罚特性答应任务重新开始重新执行,确保数据处置惩罚的正确性。
DataSet 与 DataStream 的对比

DataSet API 与 DataStream API 之间有一些紧张的区别:

DataSet API 的将来

需要注意的是,Flink 的官方路线图中已经不再优先开发 DataSet API 的新特性,将来的紧张开发将集中在 DataStream API,乃至批处置惩罚功能都将通过 DataStream API 来实现。
因此,如果可能,建议新项目只管使用 DataStream API 来更换 DataSet API。
特别是 Flink 的 Table API 和 SQL API 也实用于批处置惩罚和流处置惩罚,这些高层 API 提供了更简洁的语法和更强的优化本领。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

勿忘初心做自己

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表