ToB企服应用市场:ToB评测及商务社交产业平台
标题:
【智能大数据分析 | 实验四】Spark实验:Spark Streaming
[打印本页]
作者:
熊熊出没
时间:
2024-10-22 15:02
标题:
【智能大数据分析 | 实验四】Spark实验:Spark Streaming
【作者主页】Francek Chen
【专栏介绍】 ⌈ ⌈ ⌈智能大数据分析 ⌋ ⌋ ⌋ 智能大数据分析是教唆用先进的技能和算法对大规模数据举行深入分析和挖掘,以提取有价值的信息和洞察。它联合了大数据技能、人工智能(AI)、机器学习(ML)和数据挖掘等多种方法,旨在通过自动化的方式分析复杂数据集,发现潜在的价值和关联性,实现数据的自动化处理和分析,从而支持决媾和优化业务流程。与传统的人工分析相比,智能大数据分析具有自动化、深度挖掘、及时性和可视化等特点。智能大数据分析广泛应用于各个范畴,包括金融服务、医疗康健、零售、市场营销等,帮助企业做出更为精准的决议,提升竞争力。
【GitCode】专栏资源生存在我的GitCode仓库:https://gitcode.com/Morse_Chen/Intelligent_bigdata_analysis。
一、实验目的
相识 Spark Streaming 版本的 WordCount 和 MapReduce 版本的 WordCount 的区别;
理解 Spark Streaming 的工作流程;
理解 Spark Streaming 的工作原理。
二、实验要求
要求实验竣事时,每位学生能正确运行乐资本实验中所写的 jar 包程序,能正确的盘算出单词数目。
三、实验原理
(一)Spark Streaming 架构
盘算流程
:Spark Streaming 是将流式盘算分解成一系列短小的批处理作业。这里的批处理引擎是 Spark,也就是把 Spark Streaming 的输入数据按照 batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成 Spark 中的 RDD(Resilient Distributed Dataset),然后将 Spark Streaming 中对 DStream 的 Transformation 操作变为针对 Spark 中对 RDD 的 Transformation 操作,将 RDD 经过操作酿成中间结果生存在内存中。整个流式盘算根据业务的需求可以对中间的结果举行叠加,或者存储到外部设备。如图1所示:
图1
容错性
:对于流式盘算来说,容错性至关重要。起首我们要明白一下 Spark 中 RDD 的容错机制。每一个 RDD 都是一个不可变的分布式可重算的数据集,其纪录着确定性的操作继续关系(lineage),所以只要输入数据是可容错的,那么恣意一个 RDD 的分区(Partition)堕落或不可用,都是可以使用原始输入数据通过转换操作而重新算出的。
对于 Spark Streaming 来说,其 RDD 的传承关系如下图所示,图中的每一个椭圆形表现一个 RDD,椭圆形中的每个圆形代表一个 RDD 中的一个 Partition,图中的每一列的多个 RDD 表现一个 DStream(图中有三个 DStream),而每一行末了一个 RDD 则表现每一个 Batch Size 所产生的中间结果 RDD。我们可以看到图中的每一个 RDD 都是通过 lineage 相毗连的,由于 Spark Streaming 输入数据可以来自于磁盘,例如 HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming 会将网络输入数据的每一个数据流拷贝两份到其他的机器)都能保证容错性。所以 RDD 中恣意的 Partition 堕落,都可以并行地在其他机器上将缺失的 Partition 盘算出来。这个容错恢复方式比连续盘算模型(如 Storm)的服从更高。 如图2所示:
图2
及时性
:对于及时性的讨论,会牵涉到流式处理框架的应用场景。Spark Streaming 将流式盘算分解成多个 Spark Job,对于每一段数据的处理都会经过 Spark DAG 图分解,以及 Spark 的任务集的调度过程。对于目前版本的 Spark Streaming 而言,其最小的 Batch Size 的选取在0.5~2秒钟之间(Storm 目前最小的延迟是100ms左右),所以 Spark Streaming 能够满足除对及时性要求非常高(如高频及时生意业务)之外的全部流式准及时盘算场景。
扩展性与吞吐量
:Spark 目前在 EC2 上已能够线性扩展到100个节点(每个节点4Core),可以以数秒的延迟处理6GB/s的数据量(60M records/s),其吞吐量也比流行的 Storm 高2~5倍,图3是 Berkeley 使用 WordCount 和 Grep 两个用例所做的测试,在 Grep 这个测试中,Spark Streaming 中的每个节点的吞吐量是 670k records/s,而 Storm 是 115k records/s。如图3所示:
图3
(二)Spark Streaming 编程模型
Spark Streaming 的编程和 Spark 的编程如出一辙,对于编程的理解也非常类似。对于 Spark 来说,编程就是对于 RDD 的操作;而对于 Spark Streaming 来说,就是对 DStream 的操作。下面将通过一个各人熟悉的 WordCount 的例子来说明 Spark Streaming 中的输入操作、转换操作和输出操作。
Spark Streaming 初始化
:在开始举行 DStream 操作之前,需要对 Spark Streaming 举行初始化天生 StreamingContext。参数中比较重要的是第一个和第三个,第一个参数是指定 Spark Streaming 运行的集群地点,而第三个参数是指定 Spark Streaming 运行时的 batch 窗口大小。在这个例子中就是将1秒钟的输入数据举行一次 Spark Job 处理。
val ssc = new StreamingContext("Spark://…", "WordCount", Seconds(1), [Homes], [Jars])
Spark Streaming 的输入操作
:目前 Spark Streaming 已支持了丰富的输入接口,大致分为两类:一类是磁盘输入,如以 batch size 作为时间间隔监控 HDFS 文件系统的某个目录,将目录中内容的变革作为 Spark Streaming 的输入;另一类就是网络流的方式,目前支持 Kafka、Flume、Twitter 和 TCP socket。在 WordCount 例子中,假定通过网络 socket 作为输入流,监听某个特定的端口,末了得出输入 DStream(lines)。
val lines = ssc.socketTextStream("localhost",8888)
Spark Streaming 的转换操作
:与 Spark RDD 的操作极为类似,Spark Streaming 也就是通过转换操作将一个或多个 DStream 转换成新的 DStream。常用的操作包括 map、filter、flatmap 和 join,以及需要举行 shuffle 操作的 groupByKey/reduceByKey 等。在 WordCount 例子中,我们起首需要将 DStream(lines) 切分成单词,然后将相同单词的数目举行叠加, 终极得到的 wordCounts 就是每一个 batch size 的(单词,数目)中间结果。
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
另外,Spark Streaming 有特定的
窗口操作
,窗口操作涉及两个参数:一个是滑动窗口的宽度(Window Duration);另一个是窗口滑动的频率(Slide Duration),这两个参数必须是 batch size 的倍数。例如以过去5秒钟为一个输入窗口,每1秒统计一下 WordCount,那么我们会将过去5秒钟的每一秒钟的 WordCount 都举行统计,然后举行叠加,得出这个窗口中的单词统计。
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(5s),seconds(1))
但上面这种方式还不够高效。如果我们以增量的方式来盘算就更加高效,例如,盘算 t+4 秒这个时刻过去5秒窗口的 WordCount,那么我们可以将 t+3 时刻过去5秒的统计量加上 [t+3,t+4] 的统计量,在减去 [t-2,t-1] 的统计量,这种方法可以复用中间三秒的统计量,提高统计的服从。如图4所示:
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5s),seconds(1))
图4
Spark Streaming 的输出操作
:对于输出操作,Spark 提供了将数据打印到屏幕及输入到文件中。在 WordCount 中我们将 DStream wordCounts 输入到 HDFS 文件中。
wordCounts = saveAsHadoopFiles("WordCount")
Spark Streaming 启动
:经过上述的操作,Spark Streaming 还没有举行工作,我们还需要调用 Start 操作,Spark Streaming 才开始监听相应的端口,然后收取数据,并举行统计。
ssc.start()
(三)Spark Streaming 典型案例
在互联网应用中,网站流量统计作为一种常用的应用模式,需要在不同粒度上对不同数据举行统计,既有及时性的需求,又需要涉及到聚合、去重、毗连等较为复杂的统计需求。传统上,若是使用 Hadoop MapReduce 框架,虽然可以容易地实现较为复杂的统计需求,但及时性却无法得到保证;反之若是接纳 Storm 如许的流式框架,及时性虽可以得到保证,但需求的实现复杂度也大大提高了。Spark Streaming 在两者之间找到了一个平衡点,能够以准及时的方式容易地实现较为复杂的统计需求。 下面介绍一下使用 Kafka 和 Spark Streaming 搭建及时流量统计框架。
数据暂存
:Kafka 作为分布式消息队列,既有非常良好的吞吐量,又有较高的可靠性和扩展性,在这里接纳Kafka作为日记传递中间件来接收日记,抓取客户端发送的流量日记,同时接受 Spark Streaming 的哀求,将流量日记按序发送给 Spark Streaming 集群。
数据处理
:将 Spark Streaming 集群与 Kafka 集群对接,Spark Streaming 从 Kafka 集群中获取流量日记并举行处理。Spark Streaming 会及时地从 Kafka 集群中获取数据并将其存储在内部的可用内存空间中。当每一个 batch 窗口到来时,便对这些数据举行处理。
结果存储
:为了便于前端展示和页面哀求,处理得到的结果将写入到数据库中。
相比于传统的处理框架,Kafka+Spark Streaming 的架构有以下几个优点。Spark 框架的高效和低延迟保证了 Spark Streaming 操作的准及时性。使用 Spark 框架提供的丰富 API 和高灵活性,可以精简地写出较为复杂的算法。编程模型的高度同等使得上手 Spark Streaming 相称容易,同时也可以保证业务逻辑在及时处理和批处理上的复用。
Spark Streaming 提供了一套高效、可容错的准及时大规模流式处理框架,它能和批处理及即时查询放在同一个软件栈中。如果你学会了 Spark 编程,那么也就学会了 Spark Streaming 编程,如果理解了 Spark 的调度和存储,Spark Streaming 也类似。按照目前的发展趋势,Spark Streaming 肯定将会得到更大范围的使用。
四、实验环境
云创大数据实验平台:
Java 版本:jdk1.7.0_79
Hadoop 版本:hadoop-2.7.1
Spark 版本:spark-1.6.0
ZooKeeper 版本:zookeeper-3.4.6
Kafka 版本:kafka_2.10-0.9.0.1
IntelliJ IDEA 版本:IntelliJ IDEA Community Edition 2016.3.1
五、实验步骤
(一)启动 Hadoop 集群和 Spark 集群
详细摆设 Hadoop 和 Spark 集群的步骤可参考:【智能大数据分析 | 实验二】Spark实验:摆设Spark集群
这里,登录大数据实验一体机,启动实验,并点击右上方的一键搭建按钮,等待一键搭建完成。
使用jps
查验 Hadoop 集群和 Spark 集群是否乐成启动。乐成启动 Hadoop 集群和 Spark 集群的情况使用jps
命令能乐成看到以下 java 历程。
jps
复制代码
(二)编写 SparkStreaming 代码
打开 IntelliJ IDEA 准备编写 Spark-streaming 代码。点击 File -> New -> Module -> Maven -> Next -> 输入 GroupId 和 AriifactId -> Next -> 输入 Module name 新建一个 maven 的 Module。
打开项目录,点击目录下的pom.xml文件,在标签中输入 maven 的依赖。然后右键 -> maven -> Reimport 导入 maven 依赖, 结果如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cstor.sparkstreaming</groupId>
<artifactId>nice</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
<!-- https://mvnrepository.com/artifact/org.apache.spark/Spark Streaming_2.10 -->
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies>
</project>
复制代码
在src/main/java的目录下,点击java目录新建一个 package 命名为spark.streaming.test,然后在包下新建一个SparkStreaming的 java class。在SparkStreaming中键入代码。
package spark.streaming.test;
import scala.Tuple2;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.util.Iterator;
import java.util.regex.Pattern;
public class SparkStreaming {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws InterruptedException {
if (args.length < 2) {
System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x){
return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
ssc.start();
ssc.awaitTermination();
}
}
复制代码
点击 File -> Project Structure -> Aritifacts -> 点击加号 -> JAR -> from modules with dependences -> 选择刚才新建的 module -> 选择 Main Class -> Ok -> 选择 Output directory -> 点击 Ok。
去掉除guava-14.0.1.jar和guice-3.0.jar以外全部的 JAR 包,点击 Apply,再点击 Ok。
点击 Build -> Build Aritifacts 。
然后,就可以在类似该路径下D:\DELL\AppData\IdealWorkSpace\out\artifacts\sparkstreaming_jar找到刚才天生的 jar 包。
选择刚才设置的 jar 包,上传到 master 上去。
(三)运行 Sparksteaming JAR包
新建一个 SSH 毗连,登录 master 服务器,使用命令nc -lk 9999
设置路由器。
nc -lk 9999
复制代码
注
:如果系统只没有nc这个命令,可以使用yum install nc安装nc命令。
进入 spark 的安装目录,执行下面的命令。
cd /usr/cstor/spark
bin/spark-submit --class spark.streaming.test.SparkStreaming ~/sparkstreaming.jar localhost 9999
复制代码
在网络流中输入单词。按回车竣事一次输出。
在命令提交的 xshell 毗连中观察程序输出。按 Ctrl+C 可终止程序运行。
六、实验结果
在提交任务之后应该能看到以下结果(因屏幕刷新很快,所以只能看到部门结果)。在nc -lk 9999
命令下输入:
所示结果中应该立刻表现出如下内容:
七、实验心得
深入理解 Spark Streaming 的工作原理
: 通过本次实验,我对 Spark Streaming 的流处理机制有了更直观的理解。实验让我看到,Spark Streaming 通过将流式数据划分成一系列的批处理任务,将及时数据按指定时间窗口转换为 RDD,并对 RDD 举行一系列的转换操作。这种批处理方式较好地平衡了及时性和容错性,能够处理大规模的数据流并确保系统的稳固运行。
不同于传统 MapReduce 的及时性处理
: 实验中,我们使用了类似 WordCount 的例子,直观地体会到 Spark Streaming 相比 MapReduce 在及时处理方面的优势。传统的 MapReduce 虽然适合处理大批量数据,但及时性表现较差。而 Spark Streaming 能将数据按时间窗口举行切片处理,险些能做到准及时的盘算,这对于需要快速响应的应用场景非常适用。
Kafka与Spark Streaming的联合
: 实验提到了通过 Kafka 举行流数据传输的典型应用案例。这让我意识到,Kafka 作为消息队列与 Spark Streaming 的联合,不但提高了系统的数据吞吐量,还能保证数据的可靠性和扩展性。在当代大数据处理环境中,这种组合能更好地满足高效处理及时数据的需求。
编程实践中的挑衅与收获
: 实验过程中,我实际编写并运行了 Spark Streaming 程序。在编程实践中,我学会了如何通过 Java 编写流处理任务,如何通过 socket 监听数据流,并通过 RDD 转换和窗口操作处理数据。实验对编码要求较高,我在调试过程中也遇到了一些标题,比如依赖包的导入、环境配置等,这些标题的办理过程让我对大数据编程环境的搭建有了更多的实战经验。
系统的扩展性与容错性
: 实验中,Spark Streaming 展示了其在扩展性和容错性方面的优势。通过对 RDD lineage 的追踪机制,即使在某些节点发生故障时,系统也能够通过重新盘算 RDD 的分区来恢复数据。这种容错机制相较于其他及时流处理框架如 Storm 更加高效可靠。
总的来说,本次实验让我更好地理解了 Spark Streaming 的工作机制和实际应用场景,同时也强化了我的编程本领和对大数据处理框架的认识。
附
:以上文中的数据文件及相关资源下载地点:
链接:https://pan.quark.cn/s/e71b7b0d440a
提取码:rB2f
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4