ToB企服应用市场:ToB评测及商务社交产业平台
标题:
基于Hadoop的云计算与大数据处理(Spark Streaming WordCount)
[打印本页]
作者:
农妇山泉一亩田
时间:
2024-6-15 01:04
标题:
基于Hadoop的云计算与大数据处理(Spark Streaming WordCount)
实验目的
1.了解Spark Streaming的框架结构
2.准确理解Spark Streaming的实现原理
3.熟练把握Spark Streaming进行WordCount的实验流程
实验原理
Spark是一个雷同于MapReduce的分布式计算框架,其核心是弹性分布式数据集,提供了比MapReduce更丰富的模子,可以快速在内存中对数据集进行多次迭代,以支持复杂的数据挖掘算法和图形计算算法。Spark Streaming是一种构建在Spark上的及时计算框架,它扩展了Spark处理大规模流式数据的能力。
1.Spark Streaming的上风
(1)能运行在100个以上的结点上,并达到秒级延迟;
(2)使用基于内存的Spark作为执行引擎,具有高效和容错的特性;
(3)能集成Spark的批处理和交互查询;
(4)为实现复杂的算法提供了与批处理雷同的简单接口。
2.基于Spark on Yarn的Spark Streaming总体架构如下图所示:
Spark on Yarn启动后,由Spark AppMaster把Receiver作为一个Task提交给某一个Spark Executor;Receive启动后输入数据,天生数据块,然后通知Spark AppMaster;Spark AppMaster会根据数据块天生相应的Job,并把Job的Task提交给空闲Spark Executor 执行。图中粗箭头显示被处理的数据流,输入数据流可以是磁盘、网络和HDFS等,输出可以是HDFS,数据库等。
3.Spark Streaming的根本原理
将输入数据流以时间片(秒级)为单位进行拆分,然后以雷同批处理的方式处理每个时间片数据,其根本原理如下图所示。
首先,Spark Streaming把及时输入数据流以时间片Δt (如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会天生一个Spark Job处理,终极效果也返回多块。
4.Spark Streaming内部实现原理
使用Spark Streaming编写的程序与编写Spark程序非常相似,在Spark程序中,主要通过操作RDD(Resilient Distributed Datasets弹性分布式数据集)提供的接口,如map、reduce、filter等,实现数据的批处理。而在Spark Streaming中,则通过操作DStream(表示数据流的RDD序列)提供的接口,这些接口和RDD提供的接口雷同。
实验情况
Linux Ubuntu 16.04
jdk-7u75-linux-x64
scala-2.10.5
spark-1.6.0-bin-hadoop2.6
hadoop-2.6.0-cdh5.4.5
hadoop-2.6.0-eclipse-cdh5.4.5.jar
eclipse-java-juno-SR2-linux-gtk-x86_64
实验内容
下图为项目的流程图,通过nc下令,向9999端口持续发送消息,并使用spark streaming对从9999端口发来的数据进行统计,将统计的效果输出到console界面上。
实验步骤
1.使用jps查看HDFS以及Spark是否已经启动,若未启动,则切换对应目次下,启动Hadoop及Spark。
view plain copy
jps
cd /apps/hadoop/sbin
./start-dfs.sh
view plain copy
cd /apps/spark/sbin
./start-all.sh
2.使用nc下令,向9999端口,发送数据。
view plain copy
nc -lk 9999
nc 为netcat,一般多用于在局域网之间传输文件。在执行nc -lk 9999后,界面会进入持续等待输入内容状态。我们可以任意输入一些文字,来作为输入,发给9999端口。
3.打开一个新的毗连窗口,并切换目次到/apps/spark目次下,调用spark的example中,自带的wordcount程序。
view plain copy
cd /apps/spark
bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999
这里,可以调用example中,使用java程序编写的wordcount。在这里localhost和9999为spark steaming接收数据的主机和端口。在此作为参数,传递给wordcount程序。
可以看到wordcount程序,一直等待继承数据。
4.在执行nc -lk 9999的界面中,输入一些文本,并按回车。
view plain copy
hello spark streaming hello hadoop
再次切换到执行wordcount程序的终端界面,可以看到spark streaming对输入的数据,进行统计,得到单词个数如下:
此程序可以表明,Spark Streaming数据处理流程。
在两个终端界面中,输入CTRL+C,可以终止程序的运行。
5.使用Spark Streaming的Scala API编写wordcount程序,以实现对单词个数的统计。
首先在Linux本地,新建/data/spark7目次,用于存放所需文件。
view plain copy
mkdir -p /data/spark7
切换目次到/data/spark7下,使用wget下令,下载项目所需jar包spark-assembly-1.6.0-hadoop2.6.0.jar。
view plain copy
cd /data/spark7
wget http://172.16.103.12:60000/allfiles/spark7/spark-assembly-1.6.0-hadoop2.6.0.jar
创建一个Scala项目,定名为spark7。
在spark7项目下创建包,包名为my.sparkstreaming。
在my.sparkstreaming包下创建Scala Object,名为NetworkWordCount。
6.右键单击项目,新建一个目次,名为spark7lib,用于存放项目所需的jar包
将/data/spark7目次下的spark-assembly-1.6.0-hadoop2.6.0.jar包,拷贝到eclipse中spark7项目的spark7lib目次下
选中spark7lib目次下所有jar包,单击右键,选择Build Path→Add to Build Path。
7.编写sparkstreaming的wordcount代码。
view plain copy
if
(args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}
首先传递两个参数,第一个参数为发送数据的地址,第二个参数为发送数据的端口号。
view plain copy
val sparkConf =
new
SparkConf().setAppName("networkwordcount").setMaster("spark://localhost:7077")
创建一个sparkconf对象。
view plain copy
val ssc =
new
StreamingContext(sparkConf, Seconds(1) )
设置监听数据的时间窗口为1分钟。
view plain copy
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
Spark继承数据,并对数据进行存储。
view plain copy
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map( wd => (wd, 1)).reduceByKey( _ + _ )
wordCounts.print();
对接收到数据,放置在一行,并对数据以空格分隔。进行map和reduce操作。得到每个单词出现的个数,并进行打印输出。
view plain copy
ssc.start();
ssc.awaitTermination();
最后这两行代码,是开始执行,前面所界说的Spark Streaming的任务。
完备代码如下:
view plain copy
package
my.sparkstreaming
import
org.apache.spark.SparkConf
import
org.apache.spark.streaming.StreamingContext
import
org.apache.spark.streaming.Seconds
import
org.apache.spark.storage.StorageLevel
object NetworkWordCount {
def main(args: Array[String]) {
if
(args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}
val sparkConf =
new
SparkConf().setAppName("networkwordcount").setMaster("spark://localhost:7077")
val ssc =
new
StreamingContext(sparkConf, Seconds(1) )
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map( wd => (wd, 1)).reduceByKey( _ + _ )
wordCounts.print();
ssc.start();
ssc.awaitTermination();
}
}
8.切换到Linux本地,执行nc下令,发送数据。
view plain copy
nc -lk 9999
使用Spark Streaming处理发来的数据。
在NetworkWordCount.scala类上,单击右键选择Run As=>Run Configurations=>Arguments
在Program arguments后面的文本框中,输入执行nc下令发送数据的ip和端口。
view plain copy
localhost 9999
然后点击Main,进入下面界面,查看项目名和主类名是否与程序的项目名和主类名对应,
若不对应,则在Project下面的文本框,输入本程序的项目名为
view plain copy
spark7
在Main class下面的文本框中,输入本程序的包名.类名
view plain copy
my.sparkstreaming.NetworkWordCount
点击Run 执行。
9.在nc窗口,输入数据"hello world hello hadoop"。
view plain copy
hello word hello hadoop
可以看到程序的console界面,输出为:
实验结论及心得
1. 了解了Spark Streaming的框架结构,包罗Spark Streaming的总体架构和内部实现原理。
2. 对Spark Streaming的实现原理有了准确的理解,了解了将输入数据流以时间片为单位拆分并使用RDD操作处理的根本原理。
3. 熟练把握了使用Spark Streaming进行WordCount的实验流程。
4. Spark Streaming是一种强大的及时计算框架,通过扩展Spark的能力,可以快速处理大规模流式数据。
5. Spark Streaming的上风在于可以运行在大规模集群上并实现秒级延迟,同时联合了Spark的高效和容错特性。
6. 理解Spark Streaming的框架结构和实现原理对于正确使用和优化Spark Streaming应用程序非常紧张。
7. 在实验中,学会了使用Spark Streaming进行WordCount实验的流程,这是Spark Streaming的入门应用,也是理解Spark Streaming根本原理的一种方式。
通过这次实验,对Spark Streaming有了更深入的了解,并且把握了相关操作的根本流程。这将有助于在实际应用中更好地使用Spark Streaming进行及时计算和数据处理任务。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4