ToB企服应用市场:ToB评测及商务社交产业平台

标题: SparkStreaming [打印本页]

作者: 祗疼妳一个    时间: 2024-11-13 02:44
标题: SparkStreaming

第一章 SparkStreaming 概述

1.1 Spark Streaming 是什么

sparkStreaming 用于流式数据处置惩罚,Spark Streaming 支持的数据输入源有很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等举行运算。而效果也能生存在很多地方,如HDFS,数据库等

和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表现,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。所以简单来将,DStream就是对RDD在及时数据处置惩罚场景的一种封装。
1.2 Spark Streaming 的特点


1.3 SparkStreaming 架构

1.3.1 架构图


Spark Streaming架构图

1.3.2 背压机制

Spark Streaming执行过程中 ,由于接收器和执行器不在同一节点,所以无法保证内部数据的同等且高效,所以spark1.5以后Spark Streaming可以动态控制数据接收速率来适配集群数据处置惩罚本领。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。
但是假如是SparkStreaming外部传输数据过快,那就只能增长节点
第二章Dstream入门

2.1WordCount案例

需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计差异单词出现的次数
  1. <dependency>
  2.     <groupId>org.apache.spark</groupId>
  3.     <artifactId>spark-streaming_2.12</artifactId>
  4.     <version>3.0.0</version>
  5. </dependency>
复制代码
  1. def main(args: Array[String]): Unit = {
  2.     val conf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
  3.     val ssc = new StreamingContext(conf,Seconds(3))
  4.     //通过监控端口创建Dstream ,读取数据为一行一行
  5.     val lineStreams: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.23.100",9999)
  6.     //切分数据
  7.     val wordStreams: DStream[String] = lineStreams.flatMap(_.split(" "))
  8.     //将单词映射成元组(word,1)并将相同的单词次数做统计
  9.     val wordAndCountStreams: DStream[(String, Int)] = wordStreams.map(
  10.       word => {
  11.         (word, 1)
  12.       }
  13.     ).reduceByKey(_ + _)
  14.     wordAndCountStreams.print()
  15.     //启动SparkStreamingContext
  16.     ssc.start()
  17.     ssc.awaitTermination()
  18.   }
复制代码
第三章 Dstream 创建

3.1 RDD队列

3.1.1 用法及阐明

测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处置惩罚。
3.1.2 案例实操

需求:循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount
  1. def main(args: Array[String]): Unit = {
  2.     val conf = new SparkConf().setMaster("local[*]").setAppName("QueueWordCount")
  3.     val ssc = new StreamingContext(conf,Seconds(4))
  4.     // 创建RDD队列
  5.     val rddQueue = new mutable.Queue[RDD[Int]]()
  6.     //创建QueueInputDStream
  7.     val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue,oneAtATime = false)
  8.     //处理队列中的RDD数据
  9.     val reduceStream: DStream[(Int, Int)] = inputStream.map((_,1)).reduceByKey(_+_)
  10.     reduceStream.print()
  11.     //启动SparkStreamingContext
  12.     ssc.start()
  13.     //循环创建RDD并向RDD队列放入
  14.     for (i <- 1 to 5 ){
  15.       val rdd: RDD[Int] = ssc.sparkContext.makeRDD(1 to 100 ,2)
  16.       rddQueue.enqueue(rdd)
  17.       Thread.sleep(2000)
  18.     }
  19.     ssc.awaitTermination()
  20.   }
复制代码
3.2 自界说数据源

3.2.1 用法及阐明

需要继续Receiver,并实现onStart、onStop方法来自界说数据源采集。
3.2.2 案例实操

需求:自界说数据源,实现监控某个端标语,获取该端标语内容。
  1. //自定义数据源
  2. class CustomerReceiver(host: String,port: Int)extends Receiver[String](StorageLevel.MEMORY_ONLY){
  3.   //最初启动的时候,调用该方法,作用为:读取数据并发给Spark
  4.   override def onStart(): Unit = {
  5.     new Thread("Socket Receiver") {
  6.       override def run() {
  7.         receive()
  8.       }
  9.     }.start()
  10.     //读取数据并发给Spark
  11.     def receive():Unit = {
  12.       //创建一个Socket
  13.        val socket = new Socket(host,port)
  14.       //定义一个变量,用来接收端口传过来的数据
  15.       var input : String = null
  16.       //创建一个BufferedReader用于读取端口传来的数据
  17.       val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,StandardCharsets.UTF_8))
  18.       //读取数据
  19.       input = reader.readLine()
  20.       //当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark
  21.       while (!isStopped() && input  != null){
  22.         store(input)
  23.         input = reader.readLine()
  24.       }
  25.       //跳出循环则关闭资源
  26.       reader.close()
  27.       socket.close()
  28.       //重启任务
  29.       restart("restart")
  30.       }
  31.   }
  32.   override def onStop(): Unit = {}
  33. }
  34. //使用数据源
  35. object SparkStreaming03_FlileStream {
  36.   def main(args: Array[String]): Unit = {
  37.     val conf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
  38.     val ssc = new StreamingContext(conf,Seconds(3))
  39.     //创建自定义receiver的Streaming
  40.     val lineStream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomerReceiver("192.168.23.100",9999))
  41.     //处理读到的数据
  42.     val wordStream = lineStream.flatMap(_.split("\t"))
  43.     val reduceStream: DStream[(String, Int)] = wordStream.map((_,1)).reduceByKey(_+_)
  44.     reduceStream.print()
  45.     //启动SparkStreamingContext
  46.     ssc.start()
  47.     ssc.awaitTermination()
  48.   }
复制代码
3.3Kafka数据源(紧张)

3.3.1 版本选型

ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题,接收数据的Executor和计算的Executor速度会有所差异,特殊在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用
DirectAPI:是由计算的Executor来自动消费Kafka的数据,速度由自身控制。
3.3.2Kafka 0-10 Direct模式

  1. <dependency>
  2.      <groupId>org.apache.spark</groupId>
  3.      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  4.      <version>3.0.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>com.fasterxml.jackson.core</groupId>
  8.     <artifactId>jackson-core</artifactId>
  9.     <version>2.10.1</version>
  10. </dependency>
复制代码
  1. def main(args: Array[String]): Unit = {
  2.     val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaStreamWordCount")
  3.     val ssc = new StreamingContext(conf,Seconds(3))
  4.     //读取Kafka数据创建DStream
  5.     val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
  6.       ssc,
  7.       LocationStrategies.PreferConsistent,
  8.       ConsumerStrategies.Subscribe(List("itguigu"), Map(
  9.         "bootstrap.servers" -> "192.168.23.100:9092,192.168.23.110:9092,192.168.23.120:9092",
  10.         "group.id" -> "itguigu",
  11.         "auto.offset.reset" -> "earliest",
  12.         "enable.auto.commit" -> (false:lang.Boolean),
  13.         "key.deserializer" -> classOf[StringDeserializer],
  14.         "value.deserializer" -> classOf[StringDeserializer]
  15.       )))
  16.     //处理读到的数据
  17.     val mapStream: DStream[String] = kafkaDStream.map(
  18.       result => {
  19.         result.value()
  20.       }
  21.     )
  22.     mapStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
  23.     //启动SparkStreamingContext
  24.     ssc.start()
  25.     ssc.awaitTermination()
  26.   }
复制代码
第4章 DStream转换

DStream上的操纵与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,别的转换操纵中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。
4.1 无状态转化操纵

无状态转化操纵就是把简单的RDD转化操纵应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操纵列在了下表中。留意,针对键值对的DStream转化操纵(比如 reduceByKey())要添加import StreamingContext._才能在Scala中使用。

需要留意的是,尽管这些函数看起来像作用在整个流上,但是事实上DStream在内部是由很多RDD(批次)组成,且无状态转化操纵是分别应用到每个RDD上的。
例如:热度测ByKey()会归约每个时间区中的数据,但不会归约差异时间区之间的数据
4.1.1 Transform

Transform 允许DStream上执行恣意的RDD-to-RDD函数,即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API 该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换
  1. def main(args: Array[String]): Unit = {
  2.     val conf = new SparkConf().setMaster("local[*]").setAppName("StreamTransform")
  3.     val ssc = new StreamingContext(conf,Seconds(3))
  4.     //通过监控端口创建Dstream ,读取数据
  5.     val lineDStreams: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.23.100",9999)
  6.    //转换RDD
  7.     val wordCountStreams: DStream[(String, Int)] = lineDStreams.transform(rdd => {
  8.       val words: RDD[String] = rdd.flatMap(_.split(" "))
  9.       words.map((_, 1)).reduceByKey(_ + _)
  10.     })
  11.     wordCountStreams.print()
  12.     //启动SparkStreamingContext
  13.     ssc.start()
  14.     ssc.awaitTermination()
  15.   }
复制代码
说白了,无状态转换就是对每一个RDD举行直接操纵
4.1.2join

两个流之间的join需要两个流的批次巨细同等,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD举行join,与两个RDD的join效果雷同。
  1. def main(args: Array[String]): Unit = {
  2.     val conf = new SparkConf().setMaster("local[*]").setAppName("StreamJoin")
  3.     val ssc = new StreamingContext(conf,Seconds(5))
  4.     //通过监控端口创建Dstream ,读取数据
  5.     val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.23.100",9999)
  6.     val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.23.110",8888)
  7.     //将两个流处理为KV类型
  8.     val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_,1))
  9.     val wordToADStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_,"a"))
  10.     // 流之间进行join
  11.     val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADStream)
  12.     joinDStream.print()
  13.     //启动SparkStreamingContext
  14.     ssc.start()
  15.     ssc.awaitTermination()
  16.   }
复制代码
4.2 有状态转化操纵

所谓有状态转化,即在RDD操纵时,要用到前面RDD计算的效果,将RDD视为整体,不独立
4.2.1UpdateStateByKey

UpdateStateByKey原语用于记录历史记录,有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对情势的DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
updateStateByKey()的效果会是一个全新的DStream 。其内部的RDD序列是由每个时间区间对应的(键,状态)对组成
updateStateByKey操纵使得我们可以在用新信息举行更新时保持恣意的状态。为使用这个功能,需要做下面两步:
  1. def main(args: Array[String]): Unit = {
  2.     val conf = new SparkConf().setMaster("local[*]").setAppName("StreamJoin")
  3.     val ssc = new StreamingContext(conf,Seconds(5))
  4.     //如果使用有状态转换,就要设定检查节点,方便保存和计算
  5.     ssc.checkpoint("./ck")
  6.     val DStreams: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.23.100",9999)
  7.     val pairs: DStream[(String, Int)] = DStreams.flatMap(_.split(" ")).map((_,1))
  8.    // 使用updateStateByKey来更新状态,统计从运行开始以来 的单词次数
  9.     val stateDStream: DStream[(String, Int)] = pairs.updateStateByKey[Int](
  10.       (value: Seq[Int], state: Option[Int]) => {
  11.         val valueCount: Int = value.foldLeft(0)(_ + _)
  12.         val oldConut: Int = state.getOrElse(0)
  13.         Some(valueCount + oldConut)
  14.       })
  15.     stateDStream.print()
  16.     //启动SparkStreamingContext
  17.     ssc.start()
  18.     ssc.awaitTermination()
  19.   }
复制代码
4.2.2 WindowOperations

window Operations可以设置窗口巨细和华东窗口隔断来动态获取当前Streaming的运行状态。全部基于窗口的操纵都需要两参数,分别为窗口时长以及滑动步长。

  1. def main(args: Array[String]): Unit = {
  2.     val conf = new SparkConf().setMaster("local[*]").setAppName("StreamJoin")
  3.     val ssc = new StreamingContext(conf,Seconds(3))
  4.     //如果使用有状态转换,就要设定检查节点,方便保存和计算
  5.     ssc.checkpoint("./ck")
  6.     val DStreams: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.23.100",9999)
  7.     val pairs: DStream[(String, Int)] = DStreams.flatMap(_.split(" ")).map((_,1))
  8.     val wordCountByWindow: DStream[(String, Int)] = pairs.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(12),Seconds(6))
  9.     wordCountByWindow.print()
  10.     //启动SparkStreamingContext
  11.     ssc.start()
  12.     ssc.awaitTermination()
  13.   }
复制代码
关于window的操纵还有如下方法:
第五章 DStream输出

输出操纵指定了对流数据经转化操纵得到的数据所要执行的操纵(例如把效果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,假如一个DStream及其派生出的DStream都没有被执行输出操纵,那么这些DStream就都不会被求值。假如StreamingContext中没有设定输出操纵,整个context就都不会启动。
输出操纵如下:

第六章 优雅关闭

流式任务需要7*24小时执行,但是有时涉及到升级代码需要自动停止步伐,但是分布式步伐,没办法做到一个个进程去杀死,全部配置优雅的关闭就显得至关紧张了。
使用外部文件体系来控制内部步伐关闭。
MonitorStop:
  1. import java.net.URI
  2. import org.apache.hadoop.conf.Configuration
  3. import org.apache.hadoop.fs.{FileSystem, Path}
  4. import org.apache.spark.streaming.{StreamingContext, StreamingContextState}
  5. class MonitorStop(ssc: StreamingContext) extends Runnable {
  6.   override def run(): Unit = {
  7.     val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "atguigu")
  8.     while (true) {
  9.       try
  10.         Thread.sleep(5000)
  11.       catch {
  12.         case e: InterruptedException =>
  13.           e.printStackTrace()
  14.       }
  15.       val state: StreamingContextState = ssc.getState
  16.       val bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark"))
  17.       if (bool) {
  18.         if (state == StreamingContextState.ACTIVE) {
  19.           ssc.stop(stopSparkContext = true, stopGracefully = true)
  20.           System.exit(0)
  21.         }
  22.       }
  23.     }
  24.   }
  25. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4