大数据-101 Spark Streaming DStream转换 窗口利用状态 跟踪利用 附带多个 ...

打印 上一主题 下一主题

主题 879|帖子 879|积分 2637

点一下关注吧!!!非常感谢!!连续更新!!!

现在已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)
章节内容

上节我们完成了如下的内容:


  • Spark Streaming DStream 转换函数
  • DStream 无状态转换
  • DStream 无状态转换 案例

转换方式

有两个类型:


  • 无状态转换(已经完成)
  • 有状态转换
接下来开始有状态转换。
有状态转换

有状态转换主要有两种:


  • 窗口利用
  • 状态跟踪利用
窗口利用

Window Operations 可以设置窗口巨细和滑动窗口间隔来动态获取当前Streaming的状态
基于窗口的利用会在一个比 StreamingContext 的 batchDuration(批次间隔)更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

基于窗口的利用必要两个参数:


  • 窗口长度(Window Duration):控制每次计算近来的多少个批次的数据
  • 滑动间隔(Slide Duration):用来控制对新的 DStream 举行计算的间隔
两者都必须是StreamingContext中批次间隔(batchDuration)的整数倍
准备编码

我们先编写一个每秒发送一个数字:
  1. package icu.wzk
  2. import java.io.PrintWriter
  3. import java.net.{ServerSocket, Socket}
  4. object SocketWithWindow {
  5.   def main(args: Array[String]): Unit = {
  6.     val port = 9999
  7.     val ss = new ServerSocket(port)
  8.     val socket: Socket = ss.accept()
  9.     var i = 0
  10.     while (true) {
  11.       i += 1
  12.       val out = new PrintWriter(socket.getOutputStream)
  13.       out.println(i)
  14.       out.flush()
  15.       Thread.sleep(1000)
  16.     }
  17.   }
  18. }
复制代码
[窗口利用] 案例2观察窗口数据



  • 观察窗口的数据
  • 观察 batchDuration、windowDuration、slideDuration 三者之间的关系
  • 使用窗口相关的利用
编写代码

  1. package icu.wzk
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  4. import org.apache.spark.streaming.{Seconds, StreamingContext}
  5. object WindowDemo {
  6.   def main(args: Array[String]): Unit = {
  7.     val conf = new SparkConf()
  8.       .setAppName("WindowDemo")
  9.       .setMaster("local[*]")
  10.     val ssc = new StreamingContext(conf, Seconds(5))
  11.     ssc.sparkContext.setLogLevel("WARN")
  12.     val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
  13.     lines.foreachRDD {
  14.       (rdd, time) => {
  15.         println(s"rdd = ${rdd.id}; time = $time")
  16.       }
  17.         rdd.foreach(value => println(value))
  18.     }
  19.     // 20秒窗口长度(DS包含窗口长度范围内的数据)
  20.     // 10秒滑动间隔(多次时间处理一次数据)
  21.     val res1: DStream[String] = lines
  22.       .reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
  23.     res1.print()
  24.     val res2: DStream[String] = lines
  25.       .reduceByWindow(_ + _, Seconds(20), Seconds(10))
  26.     res2.print()
  27.     // 求窗口元素的和
  28.     val res3: DStream[Int] = lines
  29.       .map(_.toInt)
  30.       .reduceByWindow(_ + _, Seconds(20), Seconds(10))
  31.     res3.print()
  32.     // 请窗口元素和
  33.     val res4 = res2.map(_.toInt).reduce(_ + _)
  34.     res4.print()
  35.     // 程序启动
  36.     ssc.start()
  37.     ssc.awaitTermination()
  38.   }
  39. }
复制代码
运行结果

  1. -------------------------------------------
  2. Time: 1721628860000 ms
  3. -------------------------------------------
  4. rdd = 39; time = 1721628865000 ms
  5. rdd = 40; time = 1721628870000 ms
  6. -------------------------------------------
  7. Time: 1721628870000 ms
  8. -------------------------------------------
  9. -------------------------------------------
  10. Time: 1721628870000 ms
  11. -------------------------------------------
  12. -------------------------------------------
  13. Time: 1721628870000 ms
  14. -------------------------------------------
复制代码
运行之后控制截图如下:

[窗口利用] 案例3 热门搜刮词实时统计

编写代码

  1. package icu.wzk
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  4. import org.apache.spark.streaming.{Seconds, StreamingContext}
  5. object HotWordStats {
  6.   def main(args: Array[String]): Unit = {
  7.     val conf = new SparkConf()
  8.       .setAppName("HotWordStats")
  9.       .setMaster("local[*]")
  10.     val ssc = new StreamingContext(conf, Seconds(2))
  11.     // 检查点设置 也可以设置到 HDFS
  12.     ssc.sparkContext.setLogLevel("ERROR")
  13.     ssc.checkpoint("checkpoint")
  14.     val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
  15.     val words: DStream[String] = lines.flatMap(_.split("\\s+"))
  16.     val pairs: DStream[(String, Int)] = words.map(x => (x, 1))
  17.     // 通过 reduceByKeyAndWindow算子 每隔10秒统计最近20秒的词出现的的次数
  18.     val wordCounts1: DStream[(String, Int)] = pairs
  19.       .reduceByKeyAndWindow(
  20.         (a: Int, b: Int) => a + b, Seconds(20), Seconds(10), 2
  21.       )
  22.     wordCounts1.print()
  23.     // 需要CheckPoint的支持
  24.     val wordCounts2: DStream[(String, Int)] = pairs
  25.       .reduceByKeyAndWindow(
  26.         _ + _, _ - _, Seconds(20), Seconds(10), 2
  27.       )
  28.     wordCounts2.print()
  29.     // 运行程序
  30.     ssc.start()
  31.     ssc.awaitTermination()
  32.   }
  33. }
复制代码
运行结果

  1. -------------------------------------------
  2. Time: 1721629842000 ms
  3. -------------------------------------------
  4. (4,1)
  5. (8,1)
  6. (6,1)
  7. (2,1)
  8. (7,1)
  9. (5,1)
  10. (3,1)
  11. (1,1)
  12. -------------------------------------------
  13. Time: 1721629842000 ms
  14. --------------------
复制代码
运行结果如下图:

[状态追踪利用] updateStateByKey

UpdateStateByKey的主要功能:


  • 为Streaming中每一个Key维护一份State状态,state类型可以是任意类型的,可以是自界说对象,更新函数也可以是自界说的
  • 通过更新函数对该Key的状态不断更新,对于每个新的batch而言,Spark Streaming会在使用updateStateByKey的时候已经存在的key举行state状态更新
  • 使用updateStateByKey时要开启 CheckPoint 功能
编写代码1

流式步伐启动后计算wordcount的累计值,将每个批次的结果保存到文件
  1. package icu.wzk
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  4. import org.apache.spark.streaming.{Seconds, StreamingContext}
  5. object StateTracker1 {
  6.   def main(args: Array[String]): Unit = {
  7.     val conf = new SparkConf()
  8.       .setAppName("StateTracker1")
  9.       .setMaster("local[*]")
  10.     val ssc = new StreamingContext(conf, Seconds(5))
  11.     ssc.sparkContext.setLogLevel("ERROR")
  12.     ssc.checkpoint("checkpoint")
  13.     val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
  14.     val words: DStream[String] = lines.flatMap(_.split("\\s+"))
  15.     val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))
  16.     // 定义状态更新函数
  17.     // 函数常量定义 返回类型是 Some(Int),表示的含义是最新状态
  18.     // 函数的功能是将当前时间间隔内产生的Key的Value集合,加到上一个状态中,得到最新状态
  19.     val updateFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
  20.       // 通过Spark内部的reduceByKey按Key规约,然后这里传入某Key当前批次的Seq,再计算当前批次的总和
  21.       val currentCount = currValues.sum
  22.       // 已累加的值
  23.       val previousCount = prevValueState.getOrElse(0)
  24.       Some(currentCount + previousCount)
  25.     }
  26.     val stateDStream: DStream[(String, Int)] = wordDStream.updateStateByKey[Int](updateFunc)
  27.     stateDStream.print()
  28.     // 把DStream保存到文本文件中 会生成很多的小文件 一个批次生成一个目录
  29.     val outputDir = "output1"
  30.     stateDStream
  31.       .repartition(1)
  32.       .saveAsTextFiles(outputDir)
  33.     // 开始运行
  34.     ssc.start()
  35.     ssc.awaitTermination()
  36.   }
  37. }
复制代码
运行结果1

  1. -------------------------------------------
  2. Time: 1721631080000 ms
  3. -------------------------------------------
  4. (1,1)
  5. (2,1)
  6. (3,1)
  7. -------------------------------------------
  8. Time: 1721631085000 ms
  9. -------------------------------------------
  10. (8,1)
  11. (1,1)
  12. (2,1)
  13. (3,1)
  14. (4,1)
  15. (5,1)
  16. (6,1)
  17. (7,1)
复制代码
运行结果是:

统计全局的Key的状态,但是就算没有数据输入,也会在每一个批次的时候返回之前的Key的状态。
如许的缺点:


  • 如果数据量很大的话,CheckPoint数据会占用较大存储,而且服从也不高
编写代码2

mapWithState:也是用于全局统计Key的状态,如果没有数据输入,便不会返回之前的Key的状态,有一点增量的感觉。
如许做的利益是,只关心那些已经发生的变革的Key,对于没有数据输入,则不会返回那些没有变革的Key的数据,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储。
  1. package icu.wzk
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  4. import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
  5. object StateTracker2 {
  6.   def main(args: Array[String]): Unit = {
  7.     val conf: SparkConf = new SparkConf()
  8.       .setAppName("StateTracker2")
  9.       .setMaster("local[*]")
  10.     val ssc = new StreamingContext(conf, Seconds(2))
  11.     ssc.sparkContext.setLogLevel("ERROR")
  12.     ssc.checkpoint("checkpoint")
  13.     val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
  14.     val words: DStream[String] = lines.flatMap(_.split("\\s+"))
  15.     val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))
  16.     def mappingFunction(key: String, one: Option[Int], state: State[Int]): (String, Int) = {
  17.       val sum: Int = one.getOrElse(0) + state.getOption.getOrElse(0)
  18.       state.update(sum)
  19.       (key, sum)
  20.     }
  21.     val spec = StateSpec.function(mappingFunction _)
  22.     val resultDStream: DStream[(String, Int)] = wordDStream.mapWithState(spec)
  23.     resultDStream.cache()
  24.     // 把DStream保存到文本文件中,会生成很多的小文件。一个批次生成一个目录
  25.     val outputDir = "output2"
  26.     resultDStream.repartition(1).saveAsTextFiles(outputDir)
  27.     ssc.start()
  28.     ssc.awaitTermination()
  29.   }
  30. }
复制代码
运行代码



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

光之使者

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表