Spark-Streaming有状态计算

打印 上一主题 下一主题

主题 1728|帖子 1728|积分 5184

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
一、上下文

《Spark-Streaming初识》中的NetworkWordCount示例只能统计每个微批下的单词的数量,那么如何才能统计从开始加载数据到当下的所有数量呢?下面我们就来通过官方例子学习下Spark-Streaming有状态计算。
二、官方例子

所属包:org.apache.spark.examples.streaming
  1. object StatefulNetworkWordCount {
  2.   def main(args: Array[String]): Unit = {
  3.     if (args.length < 2) {
  4.       System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
  5.       System.exit(1)
  6.     }
  7.     StreamingExamples.setStreamingLogLevels()
  8.     val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
  9.     //创建微批为 1 秒的上下文
  10.     val ssc = new StreamingContext(sparkConf, Seconds(1))
  11.     //指定 checkpoint 目录
  12.     ssc.checkpoint(".")
  13.     // 用一个 List 初始化一个 RDD
  14.     val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
  15.     // 在目标ip:port上创建一个ReceiverInputDStream,并对分隔测试的输入流中的单词进行计数(例如由'nc'生成)
  16.     val lines = ssc.socketTextStream(args(0), args(1).toInt)
  17.     val words = lines.flatMap(_.split(" "))
  18.     val wordDstream = words.map(x => (x, 1))
  19.     // 使用mapWithState更新累积计数这将给出一个由状态组成的DStream(即单词的累积计数)
  20.     val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  21.       val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  22.       val output = (word, sum)
  23.       state.update(sum)
  24.       output
  25.     }
  26.     val stateDstream = wordDstream.mapWithState(
  27.       StateSpec.function(mappingFunc).initialState(initialRDD))
  28.     stateDstream.print()
  29.     ssc.start()
  30.     ssc.awaitTermination()
  31.   }
  32. }
复制代码
三、分析

1、构建SparkConf

它是Spark应用程序的配置,用于设置Spark的各种参数。支持链式设置
  1. new SparkConf().setMaster("local").setAppName("My app")
复制代码
 一旦SparkConf对象通报给Spark,用户就不能再对其进行修改。Spark不支持在运行时修改配置
2、构建StreamingContext

它是Spark Streaming功能的主要入口点,且提供了从各种输入源创建[[org.apache.spark.streaming.dstream.DStream]] 的方法。
创建和转换DStreams后,可以分别使用start()、stop()启动和停止流计算,awaitTermination()答应当前线程通过stop()或非常等候上下文的停止。
3、设置checkpoint

StreamingContext最终还是通过SparkContext来设置checkpoint,但其实都是为各自的checkpointDir设置checkpoint路径,在有状态计算中checkpoint是必须的。
所谓有状态计算就必须要把历史状态给存储下来,spark中使用使用checkpoint来实现这个存储,每个微批的数据的计算都要更新到历史状态中。
  1. class SparkContext(config: SparkConf) extends Logging {
  2.   private[spark] var checkpointDir: Option[String] = None
  3. }
复制代码
  1. class StreamingContext private[streaming] (
  2.     _sc: SparkContext,
  3.     _cp: Checkpoint,
  4.     _batchDur: Duration
  5.   ) extends Logging {
  6.   private[streaming] var checkpointDir: String = {
  7.     if (isCheckpointPresent) {
  8.       sc.setCheckpointDir(_cp.checkpointDir)
  9.       _cp.checkpointDir
  10.     } else {
  11.       null
  12.     }
  13.   }
  14. }
复制代码
4、初始化一个RDD

为什么要初始化一个RDD呢?我们看看下面是如何用到的。
5、创建一个ReceiverInputDStream

这里是从TCP源hostname:port创建输入流。使用TCP套接字接收数据,并使用给定的转换器将接收字节解释为对象
6、处理单词
从源码中可以看出会把这样的文本
   hadoop spark flink kafka hadoop spark-streaming
  处理成这样的格式
   hadoop 1
  spark 1
  flink 1
  kafka 1
  hadoop 1
  spark-streaming 1
  6、使用mapWithState更新累积计数

该算子可以维护并更新每个key的状态。
这里用到一个新对象:StateSpec,且用到了它的两个方法,initialState和function
initialState:设置包罗“mapWithState”将使用的初始状态的RDD`
function:设置实际的状态更新操作
  1. //第1个参数:状态 key 的类别
  2. //第2个参数:状态 value 的类别
  3. //第3个参数:状态 数据 的类别
  4. //第4个参数:状态 处理完要返回 的类别
  5. def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
  6.   // 使用state.exists()、state.get()、state.update()和state.remove()来管理状态,并返回必要的字符串
  7. }
复制代码
四、运行

运行Netcat
   nc -lk 9999
  新建一个窗口运行官方例子
   cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount cdh1 9999
  




   大多数高校硕博生毕业要求必要参加学术会议,发表EI大概SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:
  第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)



  • 广州
  • https://ais.cn/u/fi2yym
第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)



  • 青岛
  • https://ais.cn/u/nuQr6f
第六届大数据与信息化教诲国际学术会议(ICBDIE 2025)



  • 苏州
  • https://ais.cn/u/eYnmQr
第三届通讯网络与机器学习国际学术会议(CNML 2025)



  • 南京
  • https://ais.cn/u/vUNva2

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

曂沅仴駦

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表