实验目的
1.了解Spark Streaming的框架结构
2.准确理解Spark Streaming的实现原理
3.熟练把握Spark Streaming进行WordCount的实验流程
实验原理
Spark是一个雷同于MapReduce的分布式计算框架,其核心是弹性分布式数据集,提供了比MapReduce更丰富的模子,可以快速在内存中对数据集进行多次迭代,以支持复杂的数据挖掘算法和图形计算算法。Spark Streaming是一种构建在Spark上的及时计算框架,它扩展了Spark处理大规模流式数据的能力。
1.Spark Streaming的上风
(1)能运行在100个以上的结点上,并达到秒级延迟;
(2)使用基于内存的Spark作为执行引擎,具有高效和容错的特性;
(3)能集成Spark的批处理和交互查询;
(4)为实现复杂的算法提供了与批处理雷同的简单接口。
2.基于Spark on Yarn的Spark Streaming总体架构如下图所示:
Spark on Yarn启动后,由Spark AppMaster把Receiver作为一个Task提交给某一个Spark Executor;Receive启动后输入数据,天生数据块,然后通知Spark AppMaster;Spark AppMaster会根据数据块天生相应的Job,并把Job的Task提交给空闲Spark Executor 执行。图中粗箭头显示被处理的数据流,输入数据流可以是磁盘、网络和HDFS等,输出可以是HDFS,数据库等。
3.Spark Streaming的根本原理
将输入数据流以时间片(秒级)为单位进行拆分,然后以雷同批处理的方式处理每个时间片数据,其根本原理如下图所示。
首先,Spark Streaming把及时输入数据流以时间片Δt (如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会天生一个Spark Job处理,终极效果也返回多块。
4.Spark Streaming内部实现原理
使用Spark Streaming编写的程序与编写Spark程序非常相似,在Spark程序中,主要通过操作RDD(Resilient Distributed Datasets弹性分布式数据集)提供的接口,如map、reduce、filter等,实现数据的批处理。而在Spark Streaming中,则通过操作DStream(表示数据流的RDD序列)提供的接口,这些接口和RDD提供的接口雷同。
实验情况
Linux Ubuntu 16.04
jdk-7u75-linux-x64
scala-2.10.5
spark-1.6.0-bin-hadoop2.6
hadoop-2.6.0-cdh5.4.5
hadoop-2.6.0-eclipse-cdh5.4.5.jar
eclipse-java-juno-SR2-linux-gtk-x86_64
实验内容
下图为项目的流程图,通过nc下令,向9999端口持续发送消息,并使用spark streaming对从9999端口发来的数据进行统计,将统计的效果输出到console界面上。
实验步骤
1.使用jps查看HDFS以及Spark是否已经启动,若未启动,则切换对应目次下,启动Hadoop及Spark。
view plain copy
- jps
- cd /apps/hadoop/sbin
- ./start-dfs.sh
view plain copy
- cd /apps/spark/sbin
- ./start-all.sh
2.使用nc下令,向9999端口,发送数据。
view plain copy
nc 为netcat,一般多用于在局域网之间传输文件。在执行nc -lk 9999后,界面会进入持续等待输入内容状态。我们可以任意输入一些文字,来作为输入,发给9999端口。
3.打开一个新的毗连窗口,并切换目次到/apps/spark目次下,调用spark的example中,自带的wordcount程序。
view plain copy
- cd /apps/spark
- bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999
这里,可以调用example中,使用java程序编写的wordcount。在这里localhost和9999为spark steaming接收数据的主机和端口。在此作为参数,传递给wordcount程序。
可以看到wordcount程序,一直等待继承数据。
4.在执行nc -lk 9999的界面中,输入一些文本,并按回车。
view plain copy
- hello spark streaming hello hadoop
再次切换到执行wordcount程序的终端界面,可以看到spark streaming对输入的数据,进行统计,得到单词个数如下:
此程序可以表明,Spark Streaming数据处理流程。
在两个终端界面中,输入CTRL+C,可以终止程序的运行。
5.使用Spark Streaming的Scala API编写wordcount程序,以实现对单词个数的统计。
首先在Linux本地,新建/data/spark7目次,用于存放所需文件。
view plain copy
切换目次到/data/spark7下,使用wget下令,下载项目所需jar包spark-assembly-1.6.0-hadoop2.6.0.jar。
view plain copy
- cd /data/spark7
- wget http://172.16.103.12:60000/allfiles/spark7/spark-assembly-1.6.0-hadoop2.6.0.jar
创建一个Scala项目,定名为spark7。
在spark7项目下创建包,包名为my.sparkstreaming。
在my.sparkstreaming包下创建Scala Object,名为NetworkWordCount。
6.右键单击项目,新建一个目次,名为spark7lib,用于存放项目所需的jar包
将/data/spark7目次下的spark-assembly-1.6.0-hadoop2.6.0.jar包,拷贝到eclipse中spark7项目的spark7lib目次下
选中spark7lib目次下所有jar包,单击右键,选择Build Path→Add to Build Path。
7.编写sparkstreaming的wordcount代码。
view plain copy
- if (args.length < 2) {
- System.err.println("Usage: NetworkWordCount <hostname> <port>")
- System.exit(1)
- }
首先传递两个参数,第一个参数为发送数据的地址,第二个参数为发送数据的端口号。
view plain copy
- val sparkConf = new SparkConf().setAppName("networkwordcount").setMaster("spark://localhost:7077")
创建一个sparkconf对象。
view plain copy
- val ssc = new StreamingContext(sparkConf, Seconds(1) )
设置监听数据的时间窗口为1分钟。
view plain copy
- val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
Spark继承数据,并对数据进行存储。
view plain copy
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map( wd => (wd, 1)).reduceByKey( _ + _ )
- wordCounts.print();
对接收到数据,放置在一行,并对数据以空格分隔。进行map和reduce操作。得到每个单词出现的个数,并进行打印输出。
view plain copy
- ssc.start();
- ssc.awaitTermination();
最后这两行代码,是开始执行,前面所界说的Spark Streaming的任务。
完备代码如下:
view plain copy
- package my.sparkstreaming
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.StreamingContext
- import org.apache.spark.streaming.Seconds
- import org.apache.spark.storage.StorageLevel
- object NetworkWordCount {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: NetworkWordCount <hostname> <port>")
- System.exit(1)
- }
-
- val sparkConf = new SparkConf().setAppName("networkwordcount").setMaster("spark://localhost:7077")
- val ssc = new StreamingContext(sparkConf, Seconds(1) )
- val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map( wd => (wd, 1)).reduceByKey( _ + _ )
- wordCounts.print();
- ssc.start();
- ssc.awaitTermination();
-
- }
- }
8.切换到Linux本地,执行nc下令,发送数据。
view plain copy
使用Spark Streaming处理发来的数据。
在NetworkWordCount.scala类上,单击右键选择Run As=>Run Configurations=>Arguments
在Program arguments后面的文本框中,输入执行nc下令发送数据的ip和端口。
view plain copy
然后点击Main,进入下面界面,查看项目名和主类名是否与程序的项目名和主类名对应,
若不对应,则在Project下面的文本框,输入本程序的项目名为
view plain copy
在Main class下面的文本框中,输入本程序的包名.类名
view plain copy
- my.sparkstreaming.NetworkWordCount
点击Run 执行。
9.在nc窗口,输入数据"hello world hello hadoop"。
view plain copy
可以看到程序的console界面,输出为:
实验结论及心得
1. 了解了Spark Streaming的框架结构,包罗Spark Streaming的总体架构和内部实现原理。
2. 对Spark Streaming的实现原理有了准确的理解,了解了将输入数据流以时间片为单位拆分并使用RDD操作处理的根本原理。
3. 熟练把握了使用Spark Streaming进行WordCount的实验流程。
4. Spark Streaming是一种强大的及时计算框架,通过扩展Spark的能力,可以快速处理大规模流式数据。
5. Spark Streaming的上风在于可以运行在大规模集群上并实现秒级延迟,同时联合了Spark的高效和容错特性。
6. 理解Spark Streaming的框架结构和实现原理对于正确使用和优化Spark Streaming应用程序非常紧张。
7. 在实验中,学会了使用Spark Streaming进行WordCount实验的流程,这是Spark Streaming的入门应用,也是理解Spark Streaming根本原理的一种方式。
通过这次实验,对Spark Streaming有了更深入的了解,并且把握了相关操作的根本流程。这将有助于在实际应用中更好地使用Spark Streaming进行及时计算和数据处理任务。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |