利用Apache Beam进行统一批处理与流处理

打印 上一主题 下一主题

主题 989|帖子 989|积分 2967

Apache Beam是一个开源的统一编程模子,用于界说和执行数据处理流水线,支持批处理和流处理。Beam旨在提供一个简单、可扩展且机动的框架,适用于各种数据处理任务。本文将具体先容如何利用Apache Beam进行批处理和流处理,并通过Java代码示例帮助新人理解。
1. Apache Beam简介

Apache Beam的核心概念包罗:


  • Pipeline:代表整个数据处理任务。
  • PCollection:代表数据集,可以是有限的(批处理)或无限的(流处理)。
  • PTransform:代表数据转换操作。
  • Runner:负责执行Pipeline,可以是当地执行或分布式执行(如Google Cloud Dataflow、Apache Flink等)。
2. 安装与配置

首先,需要在项目中添加Apache Beam的依赖。在Maven项目中,可以在pom.xml中添加以下依赖:
  1. <dependency>
  2.     <groupId>org.apache.beam</groupId>
  3.     <artifactId>beam-sdks-java-core</artifactId>
  4.     <version>2.36.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>org.apache.beam</groupId>
  8.     <artifactId>beam-runners-direct-java</artifactId>
  9.     <version>2.36.0</version>
  10. </dependency>
复制代码
3. 创建一个简单的批处理Pipeline

以下是一个简单的批处理示例,读取一个文本文件并计算每个单词的出现次数。
  1. import org.apache.beam.sdk.Pipeline;
  2. import org.apache.beam.sdk.io.TextIO;
  3. import org.apache.beam.sdk.options.PipelineOptions;
  4. import org.apache.beam.sdk.options.PipelineOptionsFactory;
  5. import org.apache.beam.sdk.transforms.Count;
  6. import org.apache.beam.sdk.transforms.FlatMapElements;
  7. import org.apache.beam.sdk.transforms.MapElements;
  8. import org.apache.beam.sdk.values.KV;
  9. import org.apache.beam.sdk.values.TypeDescriptors;
  10. public class WordCountBatch {
  11.     public static void main(String[] args) {
  12.         PipelineOptions options = PipelineOptionsFactory.create();
  13.         Pipeline pipeline = Pipeline.create(options);
  14.         pipeline
  15.             .apply(TextIO.read().from("path/to/input.txt"))
  16.             .apply(FlatMapElements.into(TypeDescriptors.strings())
  17.                 .via(line -> Arrays.asList(line.split("\\s+"))))
  18.             .apply(Count.perElement())
  19.             .apply(MapElements.into(TypeDescriptors.strings())
  20.                 .via(kv -> kv.getKey() + ": " + kv.getValue()))
  21.             .apply(TextIO.write().to("path/to/output"));
  22.         pipeline.run().waitUntilFinish();
  23.     }
  24. }
复制代码
代码解释:


  • 创建Pipeline:利用PipelineOptionsFactory.create()创建Pipeline选项,然后创建Pipeline实例。
  • 读取文件:利用TextIO.read().from("path/to/input.txt")读取输入文件。
  • 分割单词:利用FlatMapElements将每行文天职割成单词。
  • 计数:利用Count.perElement()计算每个单词的出现次数。
  • 格式化输出:利用MapElements将结果格式化为字符串。
  • 写入文件:利用TextIO.write().to("path/to/output")将结果写入输出文件。
  • 运行Pipeline:调用pipeline.run().waitUntilFinish()运行并等候Pipeline完成。
4. 创建一个简单的流处理Pipeline

以下是一个简单的流处理示例,从Kafka读取数据并计算每个单词的出现次数。
  1. import org.apache.beam.sdk.Pipeline;
  2. import org.apache.beam.sdk.io.kafka.KafkaIO;
  3. import org.apache.beam.sdk.options.PipelineOptions;
  4. import org.apache.beam.sdk.options.PipelineOptionsFactory;
  5. import org.apache.beam.sdk.transforms.Count;
  6. import org.apache.beam.sdk.transforms.FlatMapElements;
  7. import org.apache.beam.sdk.transforms.MapElements;
  8. import org.apache.beam.sdk.values.KV;
  9. import org.apache.beam.sdk.values.TypeDescriptors;
  10. import org.apache.kafka.common.serialization.StringDeserializer;
  11. public class WordCountStream {
  12.     public static void main(String[] args) {
  13.         PipelineOptions options = PipelineOptionsFactory.create();
  14.         Pipeline pipeline = Pipeline.create(options);
  15.         pipeline
  16.             .apply(KafkaIO.<String, String>read()
  17.                 .withBootstrapServers("localhost:9092")
  18.                 .withTopic("input-topic")
  19.                 .withKeyDeserializer(StringDeserializer.class)
  20.                 .withValueDeserializer(StringDeserializer.class)
  21.                 .withoutMetadata())
  22.             .apply(MapElements.into(TypeDescriptors.strings())
  23.                 .via(kv -> kv.getValue()))
  24.             .apply(FlatMapElements.into(TypeDescriptors.strings())
  25.                 .via(line -> Arrays.asList(line.split("\\s+"))))
  26.             .apply(Count.perElement())
  27.             .apply(MapElements.into(TypeDescriptors.strings())
  28.                 .via(kv -> kv.getKey() + ": " + kv.getValue()))
  29.             .apply(TextIO.write().to("path/to/output"));
  30.         pipeline.run().waitUntilFinish();
  31.     }
  32. }
复制代码
代码解释:


  • 创建Pipeline:利用PipelineOptionsFactory.create()创建Pipeline选项,然后创建Pipeline实例。
  • 读取Kafka数据:利用KafkaIO.read()从Kafka读取数据。
  • 提取值:利用MapElements提取Kafka记录的值。
  • 分割单词:利用FlatMapElements将每行文天职割成单词。
  • 计数:利用Count.perElement()计算每个单词的出现次数。
  • 格式化输出:利用MapElements将结果格式化为字符串。
  • 写入文件:利用TextIO.write().to("path/to/output")将结果写入输出文件。
  • 运行Pipeline:调用pipeline.run().waitUntilFinish()运行并等候Pipeline完成。
5. 总结

Apache Beam提供了一个统一的编程模子,使得批处理和流处理可以无缝切换。通过上述示例,我们展示了如何利用Beam进行简单的批处理和流处理任务。希望这些示例能帮助新人更好地理解和利用Apache Beam。
通过深入学习Beam的各种转换和IO操作,你可以构建更复杂和强大的数据处理流水线,满足各种业务需求。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

知者何南

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表