SparkStreaming

打印 上一主题 下一主题

主题 838|帖子 838|积分 2514


第一章 SparkStreaming 概述

1.1 Spark Streaming 是什么

sparkStreaming 用于流式数据处置惩罚,Spark Streaming 支持的数据输入源有很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等举行运算。而效果也能生存在很多地方,如HDFS,数据库等

和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表现,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。所以简单来将,DStream就是对RDD在及时数据处置惩罚场景的一种封装。
1.2 Spark Streaming 的特点



  • 易用
  • 容错
  • 易整合到Spark体系
1.3 SparkStreaming 架构

1.3.1 架构图


Spark Streaming架构图

1.3.2 背压机制

Spark Streaming执行过程中 ,由于接收器和执行器不在同一节点,所以无法保证内部数据的同等且高效,所以spark1.5以后Spark Streaming可以动态控制数据接收速率来适配集群数据处置惩罚本领。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。
但是假如是SparkStreaming外部传输数据过快,那就只能增长节点
第二章Dstream入门

2.1WordCount案例

需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计差异单词出现的次数

  • 添加依靠
  1. <dependency>
  2.     <groupId>org.apache.spark</groupId>
  3.     <artifactId>spark-streaming_2.12</artifactId>
  4.     <version>3.0.0</version>
  5. </dependency>
复制代码

  • 编写代码
  1. def main(args: Array[String]): Unit = {
  2.     val conf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
  3.     val ssc = new StreamingContext(conf,Seconds(3))
  4.     //通过监控端口创建Dstream ,读取数据为一行一行
  5.     val lineStreams: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.23.100",9999)
  6.     //切分数据
  7.     val wordStreams: DStream[String] = lineStreams.flatMap(_.split(" "))
  8.     //将单词映射成元组(word,1)并将相同的单词次数做统计
  9.     val wordAndCountStreams: DStream[(String, Int)] = wordStreams.map(
  10.       word => {
  11.         (word, 1)
  12.       }
  13.     ).reduceByKey(_ + _)
  14.     wordAndCountStreams.print()
  15.     //启动SparkStreamingContext
  16.     ssc.start()
  17.     ssc.awaitTermination()
  18.   }
复制代码

  • 启动步伐并通过netcat发送数据:
    nc -lk 9999
    hello spark
第三章 Dstream 创建

3.1 RDD队列

3.1.1 用法及阐明

测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处置惩罚。
3.1.2 案例实操

需求:循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount

  • 编写代码
  1. def main(args: Array[String]): Unit = {
  2.     val conf = new SparkConf().setMaster("local[*]").setAppName("QueueWordCount")
  3.     val ssc = new StreamingContext(conf,Seconds(4))
  4.     // 创建RDD队列
  5.     val rddQueue = new mutable.Queue[RDD[Int]]()
  6.     //创建QueueInputDStream
  7.     val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue,oneAtATime = false)
  8.     //处理队列中的RDD数据
  9.     val reduceStream: DStream[(Int, Int)] = inputStream.map((_,1)).reduceByKey(_+_)
  10.     reduceStream.print()
  11.     //启动SparkStreamingContext
  12.     ssc.start()
  13.     //循环创建RDD并向RDD队列放入
  14.     for (i <- 1 to 5 ){
  15.       val rdd: RDD[Int] = ssc.sparkContext.makeRDD(1 to 100 ,2)
  16.       rddQueue.enqueue(rdd)
  17.       Thread.sleep(2000)
  18.     }
  19.     ssc.awaitTermination()
  20.   }
复制代码
3.2 自界说数据源

3.2.1 用法及阐明

需要继续Receiver,并实现onStart、onStop方法来自界说数据源采集。
3.2.2 案例实操

需求:自界说数据源,实现监控某个端标语,获取该端标语内容。
  1. //自定义数据源
  2. class CustomerReceiver(host: String,port: Int)extends Receiver[String](StorageLevel.MEMORY_ONLY){
  3.   //最初启动的时候,调用该方法,作用为:读取数据并发给Spark
  4.   override def onStart(): Unit = {
  5.     new Thread("Socket Receiver") {
  6.       override def run() {
  7.         receive()
  8.       }
  9.     }.start()
  10.     //读取数据并发给Spark
  11.     def receive():Unit = {
  12.       //创建一个Socket
  13.        val socket = new Socket(host,port)
  14.       //定义一个变量,用来接收端口传过来的数据
  15.       var input : String = null
  16.       //创建一个BufferedReader用于读取端口传来的数据
  17.       val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,StandardCharsets.UTF_8))
  18.       //读取数据
  19.       input = reader.readLine()
  20.       //当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark
  21.       while (!isStopped() && input  != null){
  22.         store(input)
  23.         input = reader.readLine()
  24.       }
  25.       //跳出循环则关闭资源
  26.       reader.close()
  27.       socket.close()
  28.       //重启任务
  29.       restart("restart")
  30.       }
  31.   }
  32.   override def onStop(): Unit = {}
  33. }
  34. //使用数据源
  35. object SparkStreaming03_FlileStream {
  36.   def main(args: Array[String]): Unit = {
  37.     val conf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
  38.     val ssc = new StreamingContext(conf,Seconds(3))
  39.     //创建自定义receiver的Streaming
  40.     val lineStream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomerReceiver("192.168.23.100",9999))
  41.     //处理读到的数据
  42.     val wordStream = lineStream.flatMap(_.split("\t"))
  43.     val reduceStream: DStream[(String, Int)] = wordStream.map((_,1)).reduceByKey(_+_)
  44.     reduceStream.print()
  45.     //启动SparkStreamingContext
  46.     ssc.start()
  47.     ssc.awaitTermination()
  48.   }
复制代码
3.3Kafka数据源(紧张)

3.3.1 版本选型

ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题,接收数据的Executor和计算的Executor速度会有所差异,特殊在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用
DirectAPI:是由计算的Executor来自动消费Kafka的数据,速度由自身控制。
3.3.2Kafka 0-10 Direct模式


  • 需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做计算,终极打印到控制台。
  • 导入依靠
  1. <dependency>
  2.      <groupId>org.apache.spark</groupId>
  3.      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  4.      <version>3.0.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>com.fasterxml.jackson.core</groupId>
  8.     <artifactId>jackson-core</artifactId>
  9.     <version>2.10.1</version>
  10. </dependency>
复制代码

  • 编写代码
  1. def main(args: Array[String]): Unit = {
  2.     val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaStreamWordCount")
  3.     val ssc = new StreamingContext(conf,Seconds(3))
  4.     //读取Kafka数据创建DStream
  5.     val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
  6.       ssc,
  7.       LocationStrategies.PreferConsistent,
  8.       ConsumerStrategies.Subscribe(List("itguigu"), Map(
  9.         "bootstrap.servers" -> "192.168.23.100:9092,192.168.23.110:9092,192.168.23.120:9092",
  10.         "group.id" -> "itguigu",
  11.         "auto.offset.reset" -> "earliest",
  12.         "enable.auto.commit" -> (false:lang.Boolean),
  13.         "key.deserializer" -> classOf[StringDeserializer],
  14.         "value.deserializer" -> classOf[StringDeserializer]
  15.       )))
  16.     //处理读到的数据
  17.     val mapStream: DStream[String] = kafkaDStream.map(
  18.       result => {
  19.         result.value()
  20.       }
  21.     )
  22.     mapStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
  23.     //启动SparkStreamingContext
  24.     ssc.start()
  25.     ssc.awaitTermination()
  26.   }
复制代码
第4章 DStream转换

DStream上的操纵与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,别的转换操纵中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。
4.1 无状态转化操纵

无状态转化操纵就是把简单的RDD转化操纵应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操纵列在了下表中。留意,针对键值对的DStream转化操纵(比如 reduceByKey())要添加import StreamingContext._才能在Scala中使用。

需要留意的是,尽管这些函数看起来像作用在整个流上,但是事实上DStream在内部是由很多RDD(批次)组成,且无状态转化操纵是分别应用到每个RDD上的。
例如:热度测ByKey()会归约每个时间区中的数据,但不会归约差异时间区之间的数据
4.1.1 Transform

Transform 允许DStream上执行恣意的RDD-to-RDD函数,即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API 该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换
  1. def main(args: Array[String]): Unit = {
  2.     val conf = new SparkConf().setMaster("local[*]").setAppName("StreamTransform")
  3.     val ssc = new StreamingContext(conf,Seconds(3))
  4.     //通过监控端口创建Dstream ,读取数据
  5.     val lineDStreams: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.23.100",9999)
  6.    //转换RDD
  7.     val wordCountStreams: DStream[(String, Int)] = lineDStreams.transform(rdd => {
  8.       val words: RDD[String] = rdd.flatMap(_.split(" "))
  9.       words.map((_, 1)).reduceByKey(_ + _)
  10.     })
  11.     wordCountStreams.print()
  12.     //启动SparkStreamingContext
  13.     ssc.start()
  14.     ssc.awaitTermination()
  15.   }
复制代码
说白了,无状态转换就是对每一个RDD举行直接操纵
4.1.2join

两个流之间的join需要两个流的批次巨细同等,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD举行join,与两个RDD的join效果雷同。
  1. def main(args: Array[String]): Unit = {
  2.     val conf = new SparkConf().setMaster("local[*]").setAppName("StreamJoin")
  3.     val ssc = new StreamingContext(conf,Seconds(5))
  4.     //通过监控端口创建Dstream ,读取数据
  5.     val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.23.100",9999)
  6.     val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.23.110",8888)
  7.     //将两个流处理为KV类型
  8.     val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_,1))
  9.     val wordToADStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_,"a"))
  10.     // 流之间进行join
  11.     val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADStream)
  12.     joinDStream.print()
  13.     //启动SparkStreamingContext
  14.     ssc.start()
  15.     ssc.awaitTermination()
  16.   }
复制代码
4.2 有状态转化操纵

所谓有状态转化,即在RDD操纵时,要用到前面RDD计算的效果,将RDD视为整体,不独立
4.2.1UpdateStateByKey

UpdateStateByKey原语用于记录历史记录,有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对情势的DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
updateStateByKey()的效果会是一个全新的DStream 。其内部的RDD序列是由每个时间区间对应的(键,状态)对组成
updateStateByKey操纵使得我们可以在用新信息举行更新时保持恣意的状态。为使用这个功能,需要做下面两步:

  • 界说状态,状态可以是一个恣意的数据类型。
  • 界说状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态举行更新。
    编写代码:
  1. def main(args: Array[String]): Unit = {
  2.     val conf = new SparkConf().setMaster("local[*]").setAppName("StreamJoin")
  3.     val ssc = new StreamingContext(conf,Seconds(5))
  4.     //如果使用有状态转换,就要设定检查节点,方便保存和计算
  5.     ssc.checkpoint("./ck")
  6.     val DStreams: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.23.100",9999)
  7.     val pairs: DStream[(String, Int)] = DStreams.flatMap(_.split(" ")).map((_,1))
  8.    // 使用updateStateByKey来更新状态,统计从运行开始以来 的单词次数
  9.     val stateDStream: DStream[(String, Int)] = pairs.updateStateByKey[Int](
  10.       (value: Seq[Int], state: Option[Int]) => {
  11.         val valueCount: Int = value.foldLeft(0)(_ + _)
  12.         val oldConut: Int = state.getOrElse(0)
  13.         Some(valueCount + oldConut)
  14.       })
  15.     stateDStream.print()
  16.     //启动SparkStreamingContext
  17.     ssc.start()
  18.     ssc.awaitTermination()
  19.   }
复制代码
4.2.2 WindowOperations

window Operations可以设置窗口巨细和华东窗口隔断来动态获取当前Streaming的运行状态。全部基于窗口的操纵都需要两参数,分别为窗口时长以及滑动步长。


  • 窗口时长 : 计算内容的时间范围
  • 滑动步长 : 隔多久触发一次计算
    ** 留意:这两者都必须为采集周期巨细的整数倍
    如:3秒一个批次,窗口12秒,滑步6秒。
  1. def main(args: Array[String]): Unit = {
  2.     val conf = new SparkConf().setMaster("local[*]").setAppName("StreamJoin")
  3.     val ssc = new StreamingContext(conf,Seconds(3))
  4.     //如果使用有状态转换,就要设定检查节点,方便保存和计算
  5.     ssc.checkpoint("./ck")
  6.     val DStreams: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.23.100",9999)
  7.     val pairs: DStream[(String, Int)] = DStreams.flatMap(_.split(" ")).map((_,1))
  8.     val wordCountByWindow: DStream[(String, Int)] = pairs.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(12),Seconds(6))
  9.     wordCountByWindow.print()
  10.     //启动SparkStreamingContext
  11.     ssc.start()
  12.     ssc.awaitTermination()
  13.   }
复制代码
关于window的操纵还有如下方法:

  • window(windowLength,sideInterval):基于对源DStream窗化的批次举行计算返回一个新的DStream
  • countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
  • reduceByWindow(func, windowLength, slideInterval): 通过使用自界说函数整合滑动区间流元素来创建一个新的单位素流;
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的厘革版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操纵。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的reduce函数”,也就是这些reduce函数有相应的”反reduce”函数(以参数invFunc情势传入)。如前述函数,reduce任务的数目通过可选参数来配置。

第五章 DStream输出

输出操纵指定了对流数据经转化操纵得到的数据所要执行的操纵(例如把效果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,假如一个DStream及其派生出的DStream都没有被执行输出操纵,那么这些DStream就都不会被求值。假如StreamingContext中没有设定输出操纵,整个context就都不会启动。
输出操纵如下:


  • print():在运行流步伐的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操纵叫print()。
  • saveAsTextFiles(prefix, [suffix]):以text文件情势存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。
  • saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据生存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中现在不可用。
  • saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据生存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API 中现在不可用。
  • foreachRDD(func):这是最通用的输出操纵,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部体系,如将RDD存入文件或者通过网络将其写入数据库。
    通用的输出操纵foreachRDD(),它用来对DStream中的RDD运行恣意计算。这和transform() 有些类似,都可以让我们访问恣意RDD。在foreachRDD()中,可以重用我们在Spark中实现的全部举措操纵。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。
    留意:

  • 连接不能写在driver层面(序列化)
  • 假如写在foreach则每个RDD中的每一条数据都创建,得不偿失;
  • 增长foreachPartition,在分区创建(获取)。
第六章 优雅关闭

流式任务需要7*24小时执行,但是有时涉及到升级代码需要自动停止步伐,但是分布式步伐,没办法做到一个个进程去杀死,全部配置优雅的关闭就显得至关紧张了。
使用外部文件体系来控制内部步伐关闭。
MonitorStop:
  1. import java.net.URI
  2. import org.apache.hadoop.conf.Configuration
  3. import org.apache.hadoop.fs.{FileSystem, Path}
  4. import org.apache.spark.streaming.{StreamingContext, StreamingContextState}
  5. class MonitorStop(ssc: StreamingContext) extends Runnable {
  6.   override def run(): Unit = {
  7.     val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "atguigu")
  8.     while (true) {
  9.       try
  10.         Thread.sleep(5000)
  11.       catch {
  12.         case e: InterruptedException =>
  13.           e.printStackTrace()
  14.       }
  15.       val state: StreamingContextState = ssc.getState
  16.       val bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark"))
  17.       if (bool) {
  18.         if (state == StreamingContextState.ACTIVE) {
  19.           ssc.stop(stopSparkContext = true, stopGracefully = true)
  20.           System.exit(0)
  21.         }
  22.       }
  23.     }
  24.   }
  25. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

祗疼妳一个

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表