ToB企服应用市场:ToB评测及商务社交产业平台

标题: 大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队 [打印本页]

作者: 十念    时间: 2024-8-27 20:58
标题: 大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队
点一下关注吧!!!非常感谢!!连续更新!!!

现在已经更新到了:


章节内容

上节我们完成了如下的内容:


根本概念

根本数据源包括:


引入依赖

我们利用的话,需要引入依赖:
  1. <dependency>
  2.   <groupId>org.apache.spark</groupId>
  3.   <artifactId>spark-streaming_2.12</artifactId>
  4.   <version>${spark.version}</version>
  5. </dependency>
复制代码
文件数据流

根本概念

通过 textFileStreama 方法进行读取 HDFS 兼容的文件体系文件
Spark Streaming 将会监控 directory 目录,并不停处理移动进来的文件

编写代码

  1. package icu.wzk
  2. import org.apache.log4j.{Level, Logger}
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.streaming.{Seconds, StreamingContext}
  5. object FileDStream {
  6.   def main(args: Array[String]): Unit = {
  7.     Logger.getLogger("org").setLevel(Level.ERROR)
  8.     val conf = new SparkConf()
  9.       .setAppName("FileDStream")
  10.       .setMaster("local[*]")
  11.     // 时间间隔
  12.     val ssc = new StreamingContext(conf, Seconds(5))
  13.     // 本地文件,也可以使用 HDFS 文件
  14.     val lines = ssc.textFileStream("goodtbl.java")
  15.     val words = lines.flatMap(_.split("\\s+"))
  16.     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  17.     // 打印信息
  18.     wordCounts.print()
  19.     ssc.start()
  20.     ssc.awaitTermination()
  21.   }
  22. }
复制代码
代码分析


运行效果

【备注:利用 local[],可能会存在问题。】
【如果给虚拟机配置的CPU数为1,利用 local[
] 也会只启动一个线程,该线程用于 Receiver Task,此时没有资源处理继承到达的数据。】
【现象:程序正常执行,不会打印时间戳,屏幕上也不会有其他有消息信息】

Socket数据流

编写代码

Spark Streaming 可以通过Socket端口监听并继承数据,然后进行相应处理:
打开一个新的命令窗口,启动 nc 程序。(在Flink中也这么用过)
  1. # 如果没有的话 你需要安装一下
  2. nc -lk 9999
复制代码
编写运行的代码:
  1. package icu.wzk
  2. import org.apache.log4j.{Level, Logger}
  3. import org.apachea.spark.SparkConf
  4. import org.apache.spark.streaming.{Seconds, StreamingContext}
  5. object SocketDStream {
  6.   def main(args: Array[String]): Unit = {
  7.     Logger.getLogger("org").setLevel(Level.ERROR)
  8.     val conf = new SparkConf()
  9.       .setAppName("SocketStream")
  10.       .setMaster("local[*]")
  11.     val ssc = new StreamingContext(conf, Seconds(1));
  12.     val lines = ssc.socketTextStream("0.0.0.0", 9999)
  13.     val words = lines.flatMap(_.split("\\s+"))
  14.     val wordCount = words.map(x => (x.trim, 1)).reduceByKey(_ + _)
  15.     wordCount.print()
  16.     ssc.start()
  17.     ssc.awaitTermination()
  18.   }
  19. }
复制代码
随后可以在nc窗口中随意输入一些单词,监听窗口会自动获取单词数据流信息,在监听窗口每X秒就会打印出词频的统计信息,可以在屏幕是上出现效果。
运行效果

【备注:利用 local[],可能会存在问题。】
【如果给虚拟机配置的CPU数为1,利用 local[
] 也会只启动一个线程,该线程用于 Receiver Task,此时没有资源处理继承到达的数据。】
【现象:程序正常执行,不会打印时间戳,屏幕上也不会有其他有消息信息】

此时,从控制台启动后,输入内容

RDD队列流

根本概念

调用 Spark Streaming应用程序的时候,可利用 streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream
备注:

每秒创建一个RDD(RDD存放1-100的整数),Streaming每隔1秒就对数据进行处理,盘算RDD中数据除10取余的个数。
队列流长处


编写代码

编写代码如下:
  1. package icu.wzk
  2. import org.apache.log4j.{Level, Logger}
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.rdd.RDD
  5. import org.apache.spark.streaming.{Seconds, StreamingContext}
  6. import scala.collection.mutable.Queue
  7. object RDDQueueDStream {
  8.   def main(args: Array[String]): Unit = {
  9.     Logger.getLogger("org").setLevel(Level.WARN)
  10.     val sparkConf = new SparkConf()
  11.       .setAppName("RDDQueueStream")
  12.       .setMaster("local[*]")
  13.     val ssc = new StreamingContext(sparkConf, Seconds(1))
  14.     val rddQueue = new Queue[RDD[Int]]()
  15.     val queueStream = ssc.queueStream(rddQueue)
  16.     val mappedStream = queueStream.map(r => (r % 10, 1))
  17.     val reducedStream = mappedStream.reduceByKey(_ + _)
  18.     reducedStream.print()
  19.     ssc.start()
  20.     for (i <- 1 to 5) {
  21.       rddQueue.synchronized {
  22.         val range = (1 to 100).map(_ * i)
  23.         rddQueue += ssc.sparkContext.makeRDD(range, 2)
  24.       }
  25.       Thread.sleep(2000)
  26.     }
  27.     ssc.stop()
  28.   }
  29. }
复制代码
运行效果

运行效果如图所示:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4