ToB企服应用市场:ToB评测及商务社交产业平台

标题: 云小课|MRS数据分析-通过Spark Streaming作业消费Kafka数据 [打印本页]

作者: 张春    时间: 2023-2-24 01:52
标题: 云小课|MRS数据分析-通过Spark Streaming作业消费Kafka数据
阅识风云是华为云信息大咖,擅长将复杂信息多元化呈现,其出品的一张图(云图说)、深入浅出的博文(云小课)或短视频(云视厅)总有一款能让您快速上手华为云。更多精彩内容请单击此处。
摘要:Spark Streaming是一种构建在Spark上的实时计算框架,扩展了Spark处理大规模流式数据的能力。本文介绍如何使用MRS集群运行Spark Streaming作业消费Kafka数据。
本文分享自华为云社区《【云小课】EI第48课 MRS数据分析-通过Spark Streaming作业消费Kafka数据》,作者: 阅识风云 。
Spark是分布式批处理框架,提供分析挖掘与迭代式内存计算能力,支持多种语言(Scala/Java/Python)的应用开发。
Spark Streaming是一种构建在Spark上的实时计算框架,扩展了Spark处理大规模流式数据的能力。本文介绍如何使用MRS集群运行Spark Streaming作业消费Kafka数据。
在本案例中,假定某个业务Kafka每1秒就会收到1个单词记录。基于业务需要,开发的Spark应用程序实现实时累加计算每个单词的记录总数的功能。
本案例基本操作流程如下所示:
场景描述

Spark提供分析挖掘与迭代式内存计算能力, 适用以下场景:
当前Spark支持两种数据处理方式:Direct Streaming和Receiver方式。
Direct Streaming方式主要通过采用Direct API对数据进行处理。以Kafka Direct接口为例,与启动一个Receiver来连续不断地从Kafka中接收数据并写入到WAL中相比,Direct API简单地给出每个batch区间需要读取的偏移量位置。然后,每个batch的Job被运行,而对应偏移量的数据在Kafka中已准备好。这些偏移量信息也被可靠地存储在checkpoint文件中,应用失败重启时可以直接读取偏移量信息。
Direct Kafka接口数据传输
需要注意的是,Spark Streaming可以在失败后重新从Kafka中读取并处理数据段。然而,由于语义仅被处理一次,重新处理的结果和没有失败处理的结果是一致的。
因此,Direct API消除了需要使用WAL和Receivers的情况,且确保每个Kafka记录仅被接收一次,这种接收更加高效。使得Spark Streaming和Kafka可以很好地整合在一起。总体来说,这些特性使得流处理管道拥有高容错性、高效性及易用性,因此推荐使用Direct Streaming方式处理数据。
在一个Spark Streaming应用开始时(也就是Driver开始时),相关的StreamingContext(所有流功能的基础)使用SparkContext启动Receiver成为长驻运行任务。这些Receiver接收并保存流数据到Spark内存中以供处理。用户传送数据的生命周期如图1-2所示:
数据传输生命周期
华为云MapReduce服务提供了Spark服务多种场景下的样例工程,本案例对应示例场景的开发思路:
步骤1:创建MRS集群

1、创建并购买一个包含有Spark2x、Kafka组件的MRS集群,详情请参见MRS用户指南的“购买自定义集群”。
说明:本文以购买的MRS 3.1.0版本的集群为例,集群未开启Kerberos认证。
2、集群购买成功后,在MRS集群的任一节点内,安装集群客户端,具体操作可参考MRS快速入门的“安装并使用集群客户端”。
例如客户端安装目录为“/opt/client”。
步骤2:准备应用程序

1、通过开源镜像站获取样例工程。
下载样例工程的Maven工程源码和配置文件,并在本地配置好相关开发工具,可参考MRS开发指南(普通版_3.x)的“通过开源镜像站获取样例工程”。
根据集群版本选择对应的分支,下载并获取MRS相关样例工程。
例如本章节场景对应示例为“SparkStreamingKafka010JavaExample”样例。
2、本地使用IDEA工具导入样例工程,等待Maven工程下载相关依赖包,具体操作可参考考MRS开发指南(普通版_3.x)的Spark开发指南(普通模式)的“配置并导入样例工程”。
在本示例工程中,通过使用Streaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数,关键代码片段如下:
  1. public class StreamingExampleProducer {
  2.     public static void main(String[] args) throws IOException {
  3.         if (args.length < 2) {
  4.             printUsage();
  5.         }
  6.         String brokerList = args[0];
  7.         String topic = args[1];
  8.         String filePath = "/home/data/";    //源数据获取路径
  9.         Properties props = new Properties();
  10.         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  11.         props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
  12.         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  13.         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14.         Producer<String, String> producer = new KafkaProducer<String, String>(props);
  15.         for (int m = 0; m < Integer.MAX_VALUE / 2; m++) {
  16.             File dir = new File(filePath);
  17.             File[] files = dir.listFiles();
  18.             if (files != null) {
  19.                 for (File file : files) {
  20.                     if (file.isDirectory()) {
  21.                         System.out.println(file.getName() + "This is a directory!");
  22.                     } else {
  23.                         BufferedReader reader = null;
  24.                         reader = new BufferedReader(new FileReader(filePath + file.getName()));
  25.                         String tempString = null;
  26.                         while ((tempString = reader.readLine()) != null) {
  27.                             // Blank line judgment
  28.                             if (!tempString.isEmpty()) {
  29.                                 producer.send(new ProducerRecord<String, String>(topic, tempString));
  30.                             }
  31.                         }
  32.                         // make sure the streams are closed finally.
  33.                         reader.close();
  34.                     }
  35.                 }
  36.             }
  37.             try {
  38.                 Thread.sleep(3);
  39.             } catch (InterruptedException e) {
  40.                 e.printStackTrace();
  41.             }
  42.         }
  43.     }
  44.     private static void printUsage() {
  45.         System.out.println("Usage: {brokerList} {topic}");
  46.     }
  47. }
复制代码
3、本地配置好Maven及SDK相关参数后,样例工程会自动加载相关依赖包。加载完毕后,执行package打包,获取打包后的Jar文件。
例如打包后的Jar文件为“SparkStreamingKafka010JavaExample-1.0.jar”。
步骤3:上传Jar包及源数据

1、准备向Kafka发送的源数据,例如如下的“input_data.txt”文件,将该文件上传到客户端节点的“/home/data”目录下。
  1. ZhangSan
  2. LiSi
  3. WangwWU
  4. Tom
  5. Jemmmy
  6. LinDa
复制代码
2、将编译后的Jar包上传到客户端节点,例如上传到“/opt”目录。
说明:如果本地网络无法直接连接客户端节点上传文件,可先将jar文件或者源数据上传至OBS文件系统中,然后通过MRS管理控制台集群内的“文件管理”页面导入HDFS中,再通过HDFS客户端使用hdfs dfs -get命令下载到客户端节点本地。
步骤4:运行作业并查看结果

1、使用root用户登录安装了集群客户端的节点。
  1. cd /opt/client
  2. source bigdata_env
复制代码
2、创建用于接收数据的Kafka Topic。
  1. kafka-topics.sh --create --zookeeper quorumpeer实例IP地址:ZooKeeper客户端连接端口/kafka --replication-factor 2 --partitions 3 --topic topic名称
复制代码
quorumpeer实例IP地址可登录集群的FusionInsight Manager界面,在“集群 > 服务 > ZooKeeper > 实例”界面中查询,多个地址可用“,”分隔。ZooKeeper客户端连接端口可通过ZooKeeper服务配置参数“clientPort”查询,默认为2181。
例如执行以下命令:
  1. kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 2 --partitions 2 --topic sparkkafka
复制代码
返回结果如下:
  1. Created topic sparkkafka.
复制代码
3、Topic创建成功后,运行程序向Kafka发送数据。
  1. java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer Broker实例IP地址:Kafka连接端口 topic名称
复制代码
Kafka Broker实例IP地址可登录集群的FusionInsight Manager界面,在“集群 > 服务 > Kafka > 实例”界面中查询,多个地址可用“,”分隔。Broker端口号可通过Kafka服务配置参数“port”查询,默认为9092。
例如执行以下命令:
  1. java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer 192.168.0.131:9092 sparkkafka
复制代码
  1. 返回结果如下:
复制代码
  1. ...
  2. transactional.id = null
  3. value.serializer = class org.apache.kafka.common.serialization.StringSerializer
  4. 2022-06-08 15:43:42 INFO  AppInfoParser:117 - Kafka version: xxx
  5. 2022-06-08 15:43:42 INFO  AppInfoParser:118 - Kafka commitId: xxx
  6. 2022-06-08 15:43:42 INFO  AppInfoParser:119 - Kafka startTimeMs: xxx
  7. 2022-06-08 15:43:42 INFO  Metadata:259 - [Producer clientId=DemoProducer] Cluster ID: d54RYHthSUishVb6nTHP0A
复制代码
4、重新打开一个客户端连接窗口,执行以下命令,读取Kafka Topic中的数据。
  1. cd /opt/client/Spark2x/spark
  2. source bigdata_env
  3. bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>
复制代码
指应用程序结果备份到HDFS的路径,自行指定即可,例如“/tmp”。
指获取元数据的Kafka地址,格式为“Broker实例IP地址:Kafka连接端口”。
指读取Kafka上的topic名称。
指Streaming分批的处理间隔,例如设置为“5”。
例如执行以下命令:
  1. cd /opt/client/Spark2x/spark
  2. source bigdata_env
  3. bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar /tmp 192.168.0.131:9092 sparkkafka 5
复制代码
程序运行后,可查看到Kafka中数据的统计结果:
  1. ....
  2. -------------------------------------------                                      
  3. Time: 1654674380000 ms
  4. -------------------------------------------
  5. (ZhangSan,6)
  6. (Tom,6)
  7. (LinDa,6)
  8. (WangwWU,6)
  9. (LiSi,6)
  10. (Jemmmy,6)
  11. -------------------------------------------                                      
  12. Time: 1654674385000 ms
  13. -------------------------------------------
  14. (ZhangSan,717)
  15. (Tom,717)
  16. (LinDa,717)
  17. (WangwWU,717)
  18. (LiSi,717)
  19. (Jemmmy,717)
  20. -------------------------------------------
  21. Time: 1654674390000 ms
  22. -------------------------------------------
  23. (ZhangSan,2326)
  24. (Tom,2326)
  25. (LinDa,2326)
  26. (WangwWU,2326)
  27. (LiSi,2326)
  28. (Jemmmy,2326)
  29. ...
复制代码
5、登录FusionInsight Manager界面,单击“集群 > 服务 > Spark2x”。
6、在服务概览页面点击Spark WebUI后的链接地址,可进入History Server页面。
单击待查看的App ID,您可以查看Spark Streaming作业的状态。
----结束
好了,本期云小课就介绍到这里,快去体验MapReduce(MRS)更多功能吧!猛戳这里
 
点击关注,第一时间了解华为云新鲜技术~

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4