SparkStraming

打印 上一主题 下一主题

主题 571|帖子 571|积分 1713

SparkStraming 3.0.0

【Spark Version:3.0.0】
【Scala Version:2.12】
第 1 章 SparkStreaming 概述

1.1 Spark Streaming 是什么


Spark Streaming 用于流式数据的处置惩罚。Spark Streaming 支持的数据输入源很多,比方:Kafka、Flume、Twitter、ZeroMQ 和简朴的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语,如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。

和 Spark 基于 RDD 的概念很相似,Spark Streaming 利用离散化流(discretized stream)作为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所构成的序列(因此得名“离散化”)。所以简朴来将,DStream 就是对 RDD 在及时数据处置惩罚场景的一种封装。
FYI:通俗来讲,就是我们俗称的微批处置惩罚,与Flink流式处置惩罚框架的根本区别便在于此。
1.2 Spark Streaming 的特点

➢ 易用 Spark Streaming将Spark与语言集成的API引入到流处置惩罚中,可以像写批处置惩罚作业一样写流处置惩罚作业。支持Java、Scala和Python。
➢ 容错 有开箱即用的Exactly-once语义,联合Checkpoint实现流式处置惩罚中的高度容错本领。
➢ 易整合到 Spark 体系 通过在Spark上运行,Spark Streaming允许你重用相同的代码来进行批处置惩罚、将流与汗青数据关联,大概对流状态进行暂时查询,构建强大的交互式应用步伐。即:流批利用相同的逻辑和API,减少开和运维成本。
FYI:Streaming未能实现流批一体,但在背面Spark Structured Streaming发布时增补了这一缺陷,但我更喜欢Flink~
1.3 Spark Streaming 架构

1.3.1 架构图

➢ 团体架构图

➢ SparkStreaming 架构图

1.3.2 背压机制

Spark 1.5 以前版本,用户如果要限制 Receiver 的数据吸收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制吸收速率,来适配当前的处置惩罚本领,防止内存溢出,但也会引入其它问题。比如:producer 数据生产高于 maxRate,当前集群处置惩罚本领也高于 maxRate,这就会造成资源利用率下降等问题。
为了更好的协调数据吸收速率与资源处置惩罚本领,1.5 版本开始 Spark Streaming 可以动态控制数据吸收速率来适配集群数据处置惩罚本领。背压机制(即 Spark Streaming Backpressure): 根据JobScheduler 反馈作业的执行信息来动态调解 Receiver 数据吸收率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值false,即不启用。
第 2 章 Dstream 入门

2.1 WordCount 案例实操

➢ 需求:利用 netcat 工具向 9999 端口不停的发送数据,通过 SparkStreaming 读取端口数据并
统计不同单词出现的次数

  • 添加依赖
  1. <dependency>
  2.         <groupId>org.apache.spark</groupId>
  3.          <artifactId>spark-streaming_2.12</artifactId>
  4.          <version>3.0.0</version>
  5. </dependency>
复制代码

  • 编写代码
  1. object StreamWordCount {
  2. def main(args: Array[String]): Unit = {
  3. //1.初始化 Spark 配置信息
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
  5. //2.初始化 SparkStreamingContext
  6. val ssc = new StreamingContext(sparkConf, Seconds(3))
  7. //3.通过监控端口创建 DStream,读进来的数据为一行行
  8. val lineStreams = ssc.socketTextStream("linux1", 9999)
  9. //将每一行数据做切分,形成一个个单词
  10. val wordStreams = lineStreams.flatMap(_.split(" "))
  11. //将单词映射成元组(word,1)
  12. val wordAndOneStreams = wordStreams.map((_, 1))
  13. //将相同的单词次数做统计
  14. val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)
  15. //打印
  16. wordAndCountStreams.print()
  17. //启动 SparkStreamingContext
  18. ssc.start()
  19. ssc.awaitTermination()
  20. }
  21. }
复制代码

  • 启动步伐并通过 netcat 发送数据:
  1.         nc -lk 9999
  2.         hello spark
复制代码
2.2 WordCount 解析

Discretized Stream 是 Spark Streaming 的底子抽象,代表持续性的数据流和经过各种 Spark 原语操纵后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有一段时间间隔内的数据。

对数据的操纵也是按照 RDD 为单元来进行的
盘算过程由 Spark Engine 来完成

第 3 章 DStreams 创建

3.1 RDD 队列

3.1.1 用法及说明

测试过程中,可以通过利用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个 DStream 处置惩罚。
3.1.2 案例实操

➢ 需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,盘算WordCount。

  • 编写代码
  1. object RDDStream {
  2. def main(args: Array[String]) {
  3. //1.初始化 Spark 配置信息
  4. val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
  5. //2.初始化 SparkStreamingContext
  6. val ssc = new StreamingContext(conf, Seconds(4))
  7. //3.创建 RDD 队列
  8. val rddQueue = new mutable.Queue[RDD[Int]]()
  9. //4.创建 QueueInputDStream
  10. val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
  11. //5.处理队列中的 RDD 数据
  12. val mappedStream = inputStream.map((_,1))
  13. val reducedStream = mappedStream.reduceByKey(_ + _)
  14. //6.打印结果
  15. reducedStream.print()
  16. //7.启动任务
  17. ssc.start()
  18. //8.循环创建并向 RDD 队列中放入 RDD
  19. for (i <- 1 to 5) {
  20. rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
  21. Thread.sleep(2000)
  22. }
  23. ssc.awaitTermination()
  24. }
  25. }
复制代码

  • 结果展示
  1. -------------------------------------------
  2. Time: 1539075280000 ms
  3. -------------------------------------------
  4. (4,60)
  5. (0,60)
  6. (6,60)
  7. (8,60)
  8. (2,60)
  9. (1,60)
  10. (3,60)
  11. (7,60)
  12. (9,60)
  13. (5,60)
  14. -------------------------------------------
  15. Time: 1539075284000 ms
  16. -------------------------------------------
  17. (4,60)
  18. (0,60)
  19. (6,60)
  20. (8,60)
  21. (2,60)
  22. (1,60)
  23. (3,60)
  24. (7,60)
  25. (9,60)
  26. (5,60)
  27. -------------------------------------------
  28. Time: 1539075288000 ms
  29. -------------------------------------------
  30. (4,30)
  31. (0,30)
  32. (6,30)
  33. (8,30)
  34. (2,30)
  35. (1,30)
  36. (3,30)
  37. (7,30)
  38. (9,30)
  39. (5,30)
  40. -------------------------------------------
  41. Time: 1539075292000 ms
  42. -------------------------------------------
复制代码
3.2 自定义数据源

3.2.1 用法及说明

必要继续 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。
3.2.2 案例实操

需求:自定义数据源,实现监控某个端标语,获取该端标语内容

  • 自定义数据源
  1. class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  2. //最初启动的时候,调用该方法,作用为:读数据并将数据发送给 Spark
  3. override def onStart(): Unit = {
  4. new Thread("Socket Receiver") {
  5. override def run() {
  6. receive()
  7. }
  8. }.start()
  9. }
  10. //读数据并将数据发送给 Spark
  11. def receive(): Unit = {
  12. //创建一个 Socket
  13. var socket: Socket = new Socket(host, port)
  14. //定义一个变量,用来接收端口传过来的数据
  15. var input: String = null
  16. //创建一个 BufferedReader 用于读取端口传来的数据
  17. val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,
  18. StandardCharsets.UTF_8))
  19. //读取数据
  20. input = reader.readLine()
  21. //当 receiver 没有关闭并且输入数据不为空,则循环发送数据给 Spark
  22. while (!isStopped() && input != null) {
  23. store(input)
  24. input = reader.readLine()
  25. }
  26. //跳出循环则关闭资源
  27. reader.close()
  28. socket.close()
  29. //重启任务
  30. restart("restart")
  31. }
  32. override def onStop(): Unit = {}
  33. }
复制代码

  • 利用自定义的数据源采集数据
  1. object FileStream {
  2. def main(args: Array[String]): Unit = {
  3. //1.初始化 Spark 配置信息
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
  5. //2.初始化 SparkStreamingContext
  6. val ssc = new StreamingContext(sparkConf, Seconds(5))
  7. //3.创建自定义 receiver 的 Streaming
  8. val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))
  9. //4.将每一行数据做切分,形成一个个单词
  10. val wordStream = lineStream.flatMap(_.split("\t"))
  11. //5.将单词映射成元组(word,1)
  12. val wordAndOneStream = wordStream.map((_, 1))
  13. //6.将相同的单词次数做统计
  14. val wordAndCountStream = wordAndOneStream.reduceByKey(_+_)
  15. //7.打印
  16. wordAndCountStream.print()
  17. //8.启动 SparkStreamingContext
  18. ssc.start()
  19. ssc.awaitTermination()
  20. }
  21. }
复制代码
linux开放监听端口: nc -lk 9999
linux开放监听端口: nc -lp 9999
3.3 Kafka 数据源(面试、开发重点)

3.3.1 版本选型

ReceiverAPI:必要一个专门的 Executor 去吸收数据,然后发送给其他的 Executor 做盘算。存在的问题,吸收数据的 Executor 和盘算的 Executor 速率会有所不同,特别在吸收数据的 Executor速率大于盘算的 Executor 速率,会导致盘算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用。
DirectAPI:是由盘算的 Executor 来主动消费 Kafka 的数据,速率由自身控制。
3.3.2 Kafka 0-8 Receiver 模式(看看就行,缺点太大,现实项目不消。当前版本不适用,所以必要引入低版本的依赖包)

1) 需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简朴盘算,终极打印到控制台。
2)导入依赖
  1. <dependency>
  2.     <groupId>org.apache.spark</groupId>
  3.     <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  4.         <version>2.4.5</version>
  5. </dependency>
复制代码
3)编写代码
  1. package com.atguigu.kafka
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.streaming.dstream.ReceiverInputDStream
  4. import org.apache.spark.streaming.kafka.KafkaUtils
  5. import org.apache.spark.streaming.{Seconds, StreamingContext}
  6. object ReceiverAPI {
  7. def main(args: Array[String]): Unit = {
  8. //1.创建 SparkConf
  9. val sparkConf: SparkConf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
  10. //2.创建 StreamingContext
  11. val ssc = new StreamingContext(sparkConf, Seconds(3))
  12. //3.读取 Kafka 数据创建 DStream(基于 Receive 方式)
  13. val kafkaDStream: ReceiverInputDStream[(String, String)] =
  14. KafkaUtils.createStream(ssc,
  15. "linux1:2181,linux2:2181,linux3:2181",
  16. "atguigu",
  17. Map[String, Int]("atguigu" -> 1))
  18. //4.计算 WordCount
  19. kafkaDStream.map { case (_, value) =>
  20. (value, 1)}
  21. .reduceByKey(_ + _)
  22. .print()
  23. //5.开启任务
  24. ssc.start()
  25. ssc.awaitTermination()
  26. }
  27. }
复制代码
3.3.3 Kafka 0-8 Direct 模式(当前版本不适用,看看就行)

1)需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简朴盘算,终极打印
到控制台。
2)导入依赖
  1. <dependency>
  2.         <groupId>org.apache.spark</groupId>
  3.          <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  4.         <version>2.4.5</version>
  5. </dependency>
复制代码
3)编写代码(自动维护 offset)
  1. import kafka.serializer.StringDecoder
  2. import org.apache.kafka.clients.consumer.ConsumerConfig
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.streaming.dstream.InputDStream
  5. import org.apache.spark.streaming.kafka.KafkaUtils
  6. import org.apache.spark.streaming.{Seconds, StreamingContext}
  7. object DirectAPIAuto02 {
  8. val getSSC1: () => StreamingContext = () => {
  9. val sparkConf: SparkConf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
  10. val ssc = new StreamingContext(sparkConf, Seconds(3))
  11. ssc
  12. }
  13. def getSSC: StreamingContext = {
  14. //1.创建 SparkConf
  15. val sparkConf: SparkConf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
  16. //2.创建 StreamingContext
  17. val ssc = new StreamingContext(sparkConf, Seconds(3))
  18. //设置 CK
  19. ssc.checkpoint("./ck2")
  20. //3.定义 Kafka 参数
  21. val kafkaPara: Map[String, String] = Map[String, String](
  22. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
  23. "linux1:9092,linux2:9092,linux3:9092",
  24. ConsumerConfig.GROUP_ID_CONFIG -> "atguigu"
  25. )
  26. //4.读取 Kafka 数据
  27. val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder,StringDecoder](
  28. ssc,
  29. kafkaPara,
  30. Set("atguigu")
  31. )
  32. //5.计算 WordCount
  33. kafkaDStream.map(_._2)
  34. .flatMap(_.split(" "))
  35. .map((_, 1))
  36. .reduceByKey(_ + _)
  37. .print()
  38. //6.返回数据
  39. ssc
  40. }
  41. def main(args: Array[String]): Unit = {
  42. //获取 SSC
  43. val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck2", () =>
  44. getSSC)
  45. //开启任务
  46. ssc.start()
  47. ssc.awaitTermination()
  48. }
  49. }
复制代码
4)编写代码(手动维护 offset)
  1. import kafka.common.TopicAndPartition
  2. import kafka.message.MessageAndMetadata
  3. import kafka.serializer.StringDecoder
  4. import org.apache.kafka.clients.consumer.ConsumerConfig
  5. import org.apache.spark.SparkConf
  6. import org.apache.spark.streaming.dstream.{DStream, InputDStream}
  7. import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils,
  8. OffsetRange}
  9. import org.apache.spark.streaming.{Seconds, StreamingContext}
  10. object DirectAPIHandler {
  11. def main(args: Array[String]): Unit = {
  12. //1.创建 SparkConf
  13. val sparkConf: SparkConf = new
  14. SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
  15. //2.创建 StreamingContext
  16. val ssc = new StreamingContext(sparkConf, Seconds(3))
  17. //3.Kafka 参数
  18. val kafkaPara: Map[String, String] = Map[String, String](
  19. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
  20. "hadoop102:9092,hadoop103:9092,hadoop104:9092",
  21. ConsumerConfig.GROUP_ID_CONFIG -> "atguigu"
  22. )
  23. //4.获取上一次启动最后保留的 Offset=>getOffset(MySQL)
  24. val fromOffsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition,
  25. Long](TopicAndPartition("atguigu", 0) -> 20)
  26. //5.读取 Kafka 数据创建 DStream
  27. val kafkaDStream: InputDStream[String] = KafkaUtils.createDirectStream[String,
  28. String, StringDecoder, StringDecoder, String](ssc,
  29. kafkaPara,
  30. fromOffsets,
  31. (m: MessageAndMetadata[String, String]) => m.message())
  32. //6.创建一个数组用于存放当前消费数据的 offset 信息
  33. var offsetRanges = Array.empty[OffsetRange]
  34. //7.获取当前消费数据的 offset 信息
  35. val wordToCountDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd
  36. =>
  37. offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  38. rdd
  39. }.flatMap(_.split(" "))
  40. .map((_, 1))
  41. .reduceByKey(_ + _)
  42. //8.打印 Offset 信息
  43. wordToCountDStream.foreachRDD(rdd => {
  44. for (o <- offsetRanges) {
  45. println(s"${o.topic}:${o.partition}:${o.fromOffset}:${o.untilOffset}")
  46. }
  47. rdd.foreach(println)
  48. })
  49. //9.开启任务
  50. ssc.start()
  51. ssc.awaitTermination()
  52. }
  53. }
复制代码
3.3.4 Kafka 0-10 Direct 模式(这才是重点)

1)需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简朴盘算,终极打印到控制台。
2)导入依赖
  1. <dependency>
  2.          <groupId>org.apache.spark</groupId>
  3.          <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  4.          <version>3.0.0</version>
  5. </dependency>
  6. <dependency>
  7.          <groupId>com.fasterxml.jackson.core</groupId>
  8.          <artifactId>jackson-core</artifactId>
  9.          <version>2.10.1</version>
  10. </dependency>
复制代码
3)编写代码
  1. import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.streaming.dstream.{DStream, InputDStream}
  4. import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
  5. import org.apache.spark.streaming.{Seconds, StreamingContext}
  6. object DirectAPI {
  7. def main(args: Array[String]): Unit = {
  8. //1.创建 SparkConf
  9. val sparkConf: SparkConf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
  10. //2.创建 StreamingContext
  11. val ssc = new StreamingContext(sparkConf, Seconds(3))
  12. //3.定义 Kafka 参数
  13. val kafkaPara: Map[String, Object] = Map[String, Object](
  14. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux1:9092,linux2:9092,linux3:9092",
  15. ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
  16. "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  17. "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
  18. )
  19. //4.读取 Kafka 数据创建 DStream
  20. val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))
  21. //5.将每条消息的 KV 取出
  22. val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
  23. //6.计算 WordCount
  24. valueDStream.flatMap(_.split(" "))
  25. .map((_, 1))
  26. .reduceByKey(_ + _)
  27. .print()
  28. //7.开启任务
  29. ssc.start()
  30. ssc.awaitTermination()
  31. }
  32. }
复制代码
第 4 章 DStream 转换

DStream 上的操纵与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,别的转换操纵中尚有一些比较特别的原语,如:updateStateByKey()、transform()以及各种 Window 相关的原语。
4.1 无状态转化操纵

无状态转化操纵就是把简朴的 RDD 转化操纵应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操纵列在了下表中。注意,针对键值对的 DStream 转化操纵(比如reduceByKey())要添加 import StreamingContext._才能在 Scala 中利用。

必要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在内部是由很多 RDD(批次)构成,且无状态转化操纵是分别应用到每个 RDD 上的。
比方:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
4.1.1 Transform

Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。实在也就是对 DStream 中的 RDD 应用转换。
  1. object Transform {
  2. def main(args: Array[String]): Unit = {
  3. //创建 SparkConf
  4. `val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  5. //创建 StreamingContext
  6. val ssc = new StreamingContext(sparkConf, Seconds(3))
  7. //创建 DStream
  8. val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
  9. //转换为 RDD 操作
  10. val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => {
  11. val words: RDD[String] = rdd.flatMap(_.split(" "))
  12. val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
  13. val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
  14. value
  15. })
  16. //打印
  17. wordAndCountDStream.print
  18. //启动
  19. ssc.start()
  20. ssc.awaitTermination()
  21. }
  22. }
复制代码
两个流之间的 join 必要两个流的批次大小同等,如许才能做到同时触发盘算。盘算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。
  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  4. object JoinTest {
  5. def main(args: Array[String]): Unit = {
  6. //1.创建 SparkConf
  7. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JoinTest")
  8. //2.创建 StreamingContext
  9. val ssc = new StreamingContext(sparkConf, Seconds(5))
  10. //3.从端口获取数据创建流
  11. val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
  12. val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("linux2", 8888)
  13. //4.将两个流转换为 KV 类型
  14. val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1))
  15. val wordToADStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a"))
  16. //5.流的 JOIN
  17. val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADStream)
  18. //6.打印
  19. joinDStream.print()
  20. //7.启动任务
  21. ssc.start()
  22. ssc.awaitTermination()
  23. }
  24. }
复制代码
4.2 有状态转化操纵

4.2.1 UpdateStateByKey

UpdateStateByKey 原语用于记载汗青记载,偶尔,我们必要在 DStream 中跨批次维护状态(比方流盘算中累加 wordcount)。针对这种环境,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对情势的 DStream。给定一个由(键,事件)对构成的 DStream,并通报一个指定怎样根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对构成的。
updateStateByKey 操纵使得我们可以在用新信息进行更新时保持任意的状态。为利用这个功能,必要做下面两步:

  • 定义状态,状态可以是一个任意的数据类型。
  • 定义状态更新函数,用此函数分析怎样利用之前的状态和来自输入流的新值对状态进行更新。
    利用 updateStateByKey 必要对检查点目次进行配置,会利用检查点来保存状态
    更新版的 wordcount

  • 编写代码
  1. object WorldCount {
  2. def main(args: Array[String]) {
  3. // 定义更新状态方法,参数 values 为当前批次单词频度,state 为以往批次单词频度
  4. val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  5. val currentCount = values.foldLeft(0)(_ + _)
  6. val previousCount = state.getOrElse(0)
  7. Some(currentCount + previousCount)
  8. }
  9. val conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount")
  10. val ssc = new StreamingContext(conf, Seconds(3))
  11. ssc.checkpoint("./ck")
  12. // Create a DStream that will connect to hostname:port, like hadoop102:9999
  13. val lines = ssc.socketTextStream("linux1", 9999)
  14. // Split each line into words
  15. val words = lines.flatMap(_.split(" "))
  16. //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
  17. // Count each word in each batch
  18. val pairs = words.map(word => (word, 1))
  19. // 使用 updateStateByKey 来更新状态,统计从运行开始以来单词总的次数
  20. val stateDstream = pairs.updateStateByKey[Int](updateFunc)
  21. stateDstream.print()
  22. ssc.start() // Start the computation
  23. ssc.awaitTermination() // Wait for the computation to terminate
  24. //ssc.stop()
  25. }
  26. }
复制代码

  • 启动步伐并向 9999 端口发送数据
  1. nc -lk 9999
  2. Hello World
  3. Hello Scala
复制代码

  • 结果展示
  1. -------------------------------------------
  2. Time: 1504685175000 ms
  3. -------------------------------------------
  4. -------------------------------------------
  5. Time: 1504685181000 ms
  6. -------------------------------------------
  7. (shi,1)
  8. (shui,1)
  9. (ni,1)
  10. -------------------------------------------
  11. Time: 1504685187000 ms
  12. -------------------------------------------
  13. (shi,1)
  14. (ma,1)
  15. (hao,1)
  16. (shui,1)
复制代码
4.2.2 WindowOperations

Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许状态。所有基于窗口的操纵都必要两个参数,分别为窗口时长以及滑动步长。
➢ 窗口时长:盘算内容的时间范围;
➢ 滑动步长:隔多久触发一次盘算。
注意:这两者都必须为采集周期大小的整数倍。
WordCount 第三版:3 秒一个批次,窗口 12 秒,滑步 6 秒。
  1. object WorldCount {
  2. def main(args: Array[String]) {
  3. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  4. val ssc = new StreamingContext(conf, Seconds(3))
  5. ssc.checkpoint("./ck")
  6. // Create a DStream that will connect to hostname:port, like localhost:9999
  7. val lines = ssc.socketTextStream("linux1", 9999)
  8. // Split each line into words
  9. val words = lines.flatMap(_.split(" "))
  10. // Count each word in each batch
  11. val pairs = words.map(word => (word, 1))
  12. val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(12), Seconds(6))
  13. // Print the first ten elements of each RDD generated in this DStream to the console
  14. wordCounts.print()
  15. ssc.start() // Start the computation
  16. ssc.awaitTermination() // Wait for the computation to terminate
  17. }
  18. }
复制代码
关于 Window 的操纵尚有如下方法:
(1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行盘算返回一个新的 Dstream;
(2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
(3)reduceByWindow(func, windowLength, slideInterval): 通过利用自定义函数整合滑动区间流元素来创建一个新的单元素流;
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据利用 reduce 函数来整合每个 key 的 value 值。
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增盘算。通过 reduce 进入到滑动窗口数据并”反向 reduce”脱离窗口的旧数据来实现这个操纵。一个例子是随着窗口滑动对 keys 的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的 reduce 函数”,也就是这些reduce 函数有相应的”反 reduce”函数(以参数 invFunc 情势传入)。如前述函数,reduce 任务的数目通过可选参数来配置。

  1. val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
  2. val ipCountDStream = ipDStream.reduceByKeyAndWindow(
  3. {(x, y) => x + y},
  4. {(x, y) => x - y},
  5. Seconds(30),
  6. Seconds(10))
  7. //加上新进入窗口的批次中的元素 //移除离开窗口的老批次中的元素 //窗口时长// 滑动步长
复制代码
countByWindow()和 countByValueAndWindow()作为对数据进行计数操纵的简写。
countByWindow()返回一个表示每个窗口中元素个数的 DStream,而 countByValueAndWindow()返回的 DStream 则包罗窗口中每个值的个数。
  1. val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
  2. val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30),
  3. Seconds(10))
  4. val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
复制代码
第 5 章 DStream 输出

输出操纵指定了对流数据经转化操纵得到的数据所要执行的操纵(比方把结果推入外部数据库或输出到屏幕上)。与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操纵,那么这些 DStream 就都不会被求值。如果 StreamingContext 中没有设定输出操纵,整个 context 就都不会启动。(懒加载机制,必要行动算子触发盘算)
输出操纵如下:
➢ print():在运行流步伐的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。这用于开发和调试。在 Python API 中,同样的操纵叫 print()。
➢ saveAsTextFiles(prefix, [suffix]):以 text 文件情势存储这个 DStream 的内容。每一批次的存储文件名基于参数中的 prefix 和 suffix。”prefix-Time_IN_MS[.suffix]”。
➢ saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 Stream 中的数据保存为SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]“. Python中目前不可用。
➢ saveAsHadoopFiles(prefix, [suffix]):将 Stream 中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]”。Python API 中目前不可用。
➢ foreachRDD(func):这是最通用的输出操纵,即将函数 func 用于产生于 stream 的每一个RDD。此中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部体系,如将RDD 存入文件大概通过网络将其写入数据库。
通用的输出操纵 foreachRDD(),它用来对 DStream 中的 RDD 运行任意盘算。这和 transform() 有些类似,都可以让我们访问任意 RDD。在 foreachRDD()中,可以重用我们在 Spark 中实现的所有行动操纵。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。
注意:

  • 毗连不能写在 driver 层面(序列化)
  • 如果写在 foreach 则每个 RDD 中的每一条数据都创建,得不偿失;
  • 增加 foreachPartition,在分区创建(获取)。
第 6 章 优雅关闭

流式任务必要 7*24 小时执行,但是偶尔涉及到升级代码必要主动停止步伐,但是分布式步伐,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。利用外部文件体系来控制内部步伐关闭。
➢ MonitorStop
  1. import java.net.URI
  2. import org.apache.hadoop.conf.Configuration
  3. import org.apache.hadoop.fs.{FileSystem, Path}
  4. import org.apache.spark.streaming.{StreamingContext, StreamingContextState}
  5. class MonitorStop(ssc: StreamingContext) extends Runnable {
  6.         override def run(): Unit = {
  7.          val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "atguigu")
  8.          while (true) {
  9.                  try
  10.                         Thread.sleep(5000)
  11.                 catch {
  12.                         case e: InterruptedException => e.printStackTrace()
  13.                  }
  14.                 val state: StreamingContextState = ssc.getState
  15.                 val bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark"))
  16.                  if (bool) {
  17.                         if (state == StreamingContextState.ACTIVE) {
  18.                                 ssc.stop(stopSparkContext = true, stopGracefully = true)
  19.                                 System.exit(0)
  20.                                  }
  21.                          }
  22.                  }
  23.         }
  24. }
复制代码
➢ SparkTest
  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  3. import org.apache.spark.streaming.{Seconds, StreamingContext}
  4. object SparkTest {
  5. // _root_.org.apache.spark.streaming.StreamingContext(_root_解决在当前域中类名或对象名冲突,慎用,可读性太TM差了)
  6. def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
  7. val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: Option[Int]) => {
  8. //当前批次内容的计算
  9. val sum: Int = values.sum
  10. //取出状态信息中上一次状态
  11. val lastStatu: Int = status.getOrElse(0)
  12. Some(sum + lastStatu)
  13. }
  14. val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkTest")
  15. //设置优雅的关闭
  16. sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
  17. val ssc = new StreamingContext(sparkConf, Seconds(5))
  18. ssc.checkpoint("./ck")
  19. val line: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
  20. val word: DStream[String] = line.flatMap(_.split(" "))
  21. val wordAndOne: DStream[(String, Int)] = word.map((_, 1))
  22. val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)
  23. wordAndCount.print()
  24. ssc
  25. }
  26. def main(args: Array[String]): Unit = {
  27. val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => createSSC())
  28. new Thread(new MonitorStop(ssc)).start()
  29. ssc.start()
  30. ssc.awaitTermination()
  31. }
  32. }
复制代码
第 7 章 SparkStreaming 案例实操

7.1 环境准备

7.1.1 pom 文件

  1. <dependencies>
  2.          <dependency>
  3.                 <groupId>org.apache.spark</groupId>
  4.                 <artifactId>spark-core_2.12</artifactId>
  5.                 <version>3.0.0</version>
  6.          </dependency>
  7.          <dependency>
  8.                 <groupId>org.apache.spark</groupId>
  9.                 <artifactId>spark-streaming_2.12</artifactId>
  10.                 <version>3.0.0</version>
  11.         </dependency>
  12.         <dependency>
  13.                 <groupId>org.apache.spark</groupId>
  14.                 <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  15.                 <version>3.0.0</version>
  16.         </dependency>
  17. <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
  18.         <dependency>
  19.                 <groupId>com.alibaba</groupId>
  20.                 <artifactId>druid</artifactId>
  21.                 <version>1.1.10</version>
  22.         </dependency>
  23.         <dependency>
  24.                 <groupId>mysql</groupId>
  25.                 <artifactId>mysql-connector-java</artifactId>
  26.                 <version>5.1.27</version>
  27.         </dependency>
  28.         <dependency>
  29.                 <groupId>com.fasterxml.jackson.core</groupId>
  30.                 <artifactId>jackson-core</artifactId>
  31.                 <version>2.10.1</version>
  32.         </dependency>
  33. </dependencies>
复制代码
7.1.2 工具类

➢ PropertiesUtil
  1. import java.io.InputStreamReader
  2. import java.util.Properties
  3. object PropertiesUtil {
  4. def load(propertiesName:String): Properties ={
  5. val prop=new Properties()
  6. prop.load(new InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(propertiesName) ,"UTF-8"))
  7. prop
  8. }
  9. }
复制代码
7.2 及时数据天生模块

➢ config.properties
  1. #jdbc 配置
  2. jdbc.datasource.size=10
  3. jdbc.url=jdbc:mysql://linux1:3306/spark2020?useUnicode=true&characterEncoding=utf
  4. 8&rewriteBatchedStatements=true
  5. jdbc.user=root
  6. jdbc.password=000000
  7. # Kafka 配置
  8. kafka.broker.list=linux1:9092,linux2:9092,linux3:9092
复制代码
➢ CityInfo
  1. /**
  2. *
  3. * 城市信息表
  4. *
  5. * @param city_id 城市 id
  6. * @param city_name 城市名称
  7. * @param area 城市所在大区
  8. */
  9. case class CityInfo (city_id:Long,city_name:String,area:String)
复制代码
➢ RandomOptions
  1. import scala.collection.mutable.ListBuffer
  2. import scala.util.Random
  3. case class RanOpt[T](value: T, weight: Int)
  4. object RandomOptions {
  5. def apply[T](opts: RanOpt[T]*): RandomOptions[T] = {
  6. val randomOptions = new RandomOptions[T]()
  7. for (opt <- opts) {
  8. randomOptions.totalWeight += opt.weight
  9. for (i <- 1 to opt.weight) {
  10. randomOptions.optsBuffer += opt.value
  11. }
  12. }
  13. randomOptions
  14. }
  15. }
  16. class RandomOptions[T](opts: RanOpt[T]*) {
  17. var totalWeight = 0
  18. var optsBuffer = new ListBuffer[T]
  19. def getRandomOpt: T = {
  20. val randomNum: Int = new Random().nextInt(totalWeight)
  21. optsBuffer(randomNum)
  22.         }
  23. }
复制代码
➢ MockerRealTime
  1. import java.util.{Properties, Random}
  2. import com.atguigu.bean.CityInfo
  3. import com.atguigu.utils.{PropertiesUtil, RanOpt, RandomOptions}
  4. import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
  5. ProducerRecord}
  6. import scala.collection.mutable.ArrayBuffer
  7. object MockerRealTime {
  8. /**
  9. * 模拟的数据
  10. *
  11. * 格式 :timestamp area city userid adid
  12. * 某个时间点 某个地区 某个城市 某个用户 某个广告
  13. */
  14. def generateMockData(): Array[String] = {
  15. val array: ArrayBuffer[String] = ArrayBuffer[String]()
  16. val CityRandomOpt = RandomOptions(RanOpt(CityInfo(1, "北京", "华北"), 30),
  17. RanOpt(CityInfo(2, "上海", "华东"), 30),
  18. RanOpt(CityInfo(3, "广州", "华南"), 10),
  19. RanOpt(CityInfo(4, "深圳", "华南"), 20),
  20. RanOpt(CityInfo(5, "天津", "华北"), 10))
  21. val random = new Random()
  22. // 模拟实时数据:
  23. // timestamp province city userid adid
  24. for (i <- 0 to 50) {
  25. val timestamp: Long = System.currentTimeMillis()
  26. val cityInfo: CityInfo = CityRandomOpt.getRandomOpt
  27. val city: String = cityInfo.city_name
  28. val area: String = cityInfo.area
  29. val adid: Int = 1 + random.nextInt(6)
  30. val userid: Int = 1 + random.nextInt(6)
  31. // 拼接实时数据
  32. array += timestamp + " " + area + " " + city + " " + userid + " " + adid
  33. }
  34. array.toArray
  35. }
  36. def createKafkaProducer(broker: String): KafkaProducer[String, String] = {
  37. // 创建配置对象
  38. val prop = new Properties()
  39. // 添加配置
  40. prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
  41. prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  42. "org.apache.kafka.common.serialization.StringSerializer")
  43. prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  44. "org.apache.kafka.common.serialization.StringSerializer")
  45. // 根据配置创建 Kafka 生产者
  46. new KafkaProducer[String, String](prop)
  47. }
  48. def main(args: Array[String]): Unit = {
  49. // 获取配置文件 config.properties 中的 Kafka 配置参数
  50. val config: Properties = PropertiesUtil.load("config.properties")
  51. val broker: String = config.getProperty("kafka.broker.list")
  52. val topic = "test"
  53. // 创建 Kafka 消费者
  54. val kafkaProducer: KafkaProducer[String, String] = createKafkaProducer(broker)
  55. while (true) {
  56. // 随机产生实时数据并通过 Kafka 生产者发送到 Kafka 集群中
  57. for (line <- generateMockData()) {
  58. kafkaProducer.send(new ProducerRecord[String, String](topic, line))
  59. println(line)
  60. }
  61. Thread.sleep(2000)
  62. }
  63. }
  64. }
复制代码
7.3 需求一:广告黑名单

实现及时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑。
注:黑名单保存到 MySQL 中。
7.3.1 思路分析

1)读取 Kafka 数据之后,并对 MySQL 中存储的黑名单数据做校验;
2)校验通过则对给用户点击广告次数累加一并存入 MySQL;
3)在存入 MySQL 之后对数据做校验,如果单日超过 100 次则将该用户参加黑名单。
7.3.2 MySQL 建表

创建库 spark2020
1)存放黑名单用户的表
  1. CREATE TABLE black_list (userid CHAR(1) PRIMARY KEY);
复制代码
2)存放单日各用户点击每个广告的次数
  1. CREATE TABLE user_ad_count (
  2. dt varchar(255),
  3. userid CHAR (1),
  4. adid CHAR (1),
  5. count BIGINT,
  6. PRIMARY KEY (dt, userid, adid)
  7. );
复制代码
7.3.3 环境准备

接下来开始及时需求的分析,必要用到 SparkStreaming 来做及时数据的处置惩罚,在生产环境中,绝大部分时间都是对接的 Kafka 数据源,创建一个 SparkStreaming 读取 Kafka 数据的工具类。
➢ MyKafkaUtil
  1. import java.util.Properties
  2. import org.apache.kafka.clients.consumer.ConsumerRecord
  3. import org.apache.kafka.common.serialization.StringDeserializer
  4. import org.apache.spark.streaming.StreamingContext
  5. import org.apache.spark.streaming.dstream.InputDStream
  6. import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils,
  7. LocationStrategies}
  8. object MyKafkaUtil {
  9. //1.创建配置信息对象
  10. private val properties: Properties = PropertiesUtil.load("config.properties")
  11. //2.用于初始化链接到集群的地址
  12. val broker_list: String = properties.getProperty("kafka.broker.list")
  13. //3.kafka 消费者配置
  14. val kafkaParam = Map(
  15. "bootstrap.servers" -> broker_list,
  16. "key.deserializer" -> classOf[StringDeserializer],
  17. "value.deserializer" -> classOf[StringDeserializer],
  18. //消费者组
  19. "group.id" -> "commerce-consumer-group",
  20. //如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
  21. //可以使用这个配置,latest 自动重置偏移量为最新的偏移量
  22. "auto.offset.reset" -> "latest",
  23. //如果是 true,则这个消费者的偏移量会在后台自动提交,但是 kafka 宕机容易丢失数据
  24. //如果是 false,会需要手动维护 kafka 偏移量
  25. "enable.auto.commit" -> (true: java.lang.Boolean)
  26. )
  27. // 创建 DStream,返回接收到的输入数据
  28. // LocationStrategies:根据给定的主题和集群地址创建 consumer
  29. // LocationStrategies.PreferConsistent:持续的在所有 Executor 之间分配分区
  30. // ConsumerStrategies:选择如何在 Driver 和 Executor 上创建和配置 Kafka Consumer
  31. // ConsumerStrategies.Subscribe:订阅一系列主题
  32. def getKafkaStream(topic: String, ssc: StreamingContext):
  33. InputDStream[ConsumerRecord[String, String]] = {
  34. val dStream: InputDStream[ConsumerRecord[String, String]] =
  35. KafkaUtils.createDirectStream[String, String](ssc,
  36. LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,
  37. String](Array(topic), kafkaParam))
  38. dStream
  39. }
  40. }
复制代码
➢ JdbcUtil
  1. import java.sql.{Connection, PreparedStatement, ResultSet}
  2. import java.util.Properties
  3. import javax.sql.DataSource
  4. import com.alibaba.druid.pool.DruidDataSourceFactory
  5. object JdbcUtil {
  6. //初始化连接池
  7. var dataSource: DataSource = init()
  8. //初始化连接池方法
  9. def init(): DataSource = {
  10. val properties = new Properties()
  11. val config: Properties = PropertiesUtil.load("config.properties")
  12. properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
  13. properties.setProperty("url", config.getProperty("jdbc.url"))
  14. properties.setProperty("username", config.getProperty("jdbc.user"))
  15. properties.setProperty("password", config.getProperty("jdbc.password"))
  16. properties.setProperty("maxActive",
  17. config.getProperty("jdbc.datasource.size"))
  18. DruidDataSourceFactory.createDataSource(properties)
  19. }
  20. //获取 MySQL 连接
  21. def getConnection: Connection = {
  22. dataSource.getConnection
  23. }
  24. //执行 SQL 语句,单条数据插入
  25. def executeUpdate(connection: Connection, sql: String, params: Array[Any]): Int = {
  26. var rtn = 0
  27. var pstmt: PreparedStatement = null
  28. try {
  29. connection.setAutoCommit(false)
  30. pstmt = connection.prepareStatement(sql)
  31. if (params != null && params.length > 0) {
  32. for (i <- params.indices) {
  33. pstmt.setObject(i + 1, params(i))
  34. }
  35. }
  36. rtn = pstmt.executeUpdate()
  37. connection.commit()
  38. pstmt.close()
  39. } catch {
  40. case e: Exception => e.printStackTrace()
  41. }
  42. rtn
  43. }
  44. //执行 SQL 语句,批量数据插入
  45. def executeBatchUpdate(connection: Connection, sql: String, paramsList:
  46. Iterable[Array[Any]]): Array[Int] = {
  47. var rtn: Array[Int] = null
  48. var pstmt: PreparedStatement = null
  49. try {
  50. connection.setAutoCommit(false)
  51. pstmt = connection.prepareStatement(sql)
  52. for (params <- paramsList) {
  53. if (params != null && params.length > 0) {
  54. for (i <- params.indices) {
  55. pstmt.setObject(i + 1, params(i))
  56. }
  57. pstmt.addBatch()
  58. }
  59. }
  60. rtn = pstmt.executeBatch()
  61. connection.commit()
  62. pstmt.close()
  63. } catch {
  64. case e: Exception => e.printStackTrace()
  65. }
  66. rtn
  67. }
  68. //判断一条数据是否存在
  69. def isExist(connection: Connection, sql: String, params: Array[Any]): Boolean =
  70. {
  71. var flag: Boolean = false
  72. var pstmt: PreparedStatement = null
  73. try {
  74. pstmt = connection.prepareStatement(sql)
  75. for (i <- params.indices) {
  76. pstmt.setObject(i + 1, params(i))
  77. }
  78. flag = pstmt.executeQuery().next()
  79. pstmt.close()
  80. } catch {
  81. case e: Exception => e.printStackTrace()
  82. }
  83. flag
  84. }
  85. //获取 MySQL 的一条数据
  86. def getDataFromMysql(connection: Connection, sql: String, params: Array[Any]):
  87. Long = {
  88. var result: Long = 0L
  89. var pstmt: PreparedStatement = null
  90. try {
  91. pstmt = connection.prepareStatement(sql)
  92. for (i <- params.indices) {
  93. pstmt.setObject(i + 1, params(i))
  94. }
  95. val resultSet: ResultSet = pstmt.executeQuery()
  96. while (resultSet.next()) {
  97. result = resultSet.getLong(1)
  98. }
  99. resultSet.close()
  100. pstmt.close()
  101. } catch {
  102. case e: Exception => e.printStackTrace()
  103. }
  104. result
  105. }
  106. //主方法,用于测试上述方法
  107. def main(args: Array[String]): Unit = {
  108. }
  109. }
复制代码
7.3.4 代码实现

➢ Ads_log
  1. case class Ads_log(
  2. timestamp: Long,
  3. area: String,
  4. city: String,
  5. userid: String,
  6. adid: String)
复制代码
➢ BlackListHandler
  1. import java.sql.Connection
  2. import java.text.SimpleDateFormat
  3. import java.util.Date
  4. import com.atguigu.bean.Ads_log
  5. import com.atguigu.utils.JdbcUtil
  6. import org.apache.spark.streaming.dstream.DStream
  7. object BlackListHandler {
  8. //时间格式化对象
  9. private val sdf = new SimpleDateFormat("yyyy-MM-dd")
  10. def addBlackList(filterAdsLogDSteam: DStream[Ads_log]): Unit = {
  11. //统计当前批次中单日每个用户点击每个广告的总次数
  12. //1.将数据接转换结构 ads_log=>((date,user,adid),1)
  13. val dateUserAdToOne: DStream[((String, String, String), Long)] =
  14. filterAdsLogDSteam.map(adsLog => {
  15. //a.将时间戳转换为日期字符串
  16. val date: String = sdf.format(new Date(adsLog.timestamp))
  17. //b.返回值
  18. ((date, adsLog.userid, adsLog.adid), 1L)
  19. })
  20. //2.统计单日每个用户点击每个广告的总次数
  21. ((date,user,adid),1)=>((date,user,adid),count)
  22. val dateUserAdToCount: DStream[((String, String, String), Long)] = dateUserAdToOne.reduceByKey(_ + _)
  23. dateUserAdToCount.foreachRDD(rdd => {
  24. rdd.foreachPartition(iter => {
  25. val connection: Connection = JdbcUtil.getConnection
  26. iter.foreach { case ((dt, user, ad), count) =>
  27. JdbcUtil.executeUpdate(connection,
  28. """
  29. |INSERT INTO user_ad_count (dt,userid,adid,count)
  30. |VALUES (?,?,?,?)
  31. |ON DUPLICATE KEY
  32. |UPDATE count=count+?
  33. """.stripMargin, Array(dt, user, ad, count, count))
  34. val ct: Long = JdbcUtil.getDataFromMysql(connection, "select count from user_ad_count where dt=? and userid=? and adid =?", Array(dt, user, ad))
  35. if (ct >= 30) {
  36. JdbcUtil.executeUpdate(connection, "INSERT INTO black_list (userid)
  37. VALUES (?) ON DUPLICATE KEY update userid=?", Array(user, user))
  38. }
  39. }
  40. connection.close()
  41. })
  42. })
  43. }
  44. def filterByBlackList(adsLogDStream: DStream[Ads_log]): DStream[Ads_log] = {
  45. adsLogDStream.transform(rdd => {
  46. rdd.filter(adsLog => {
  47. val connection: Connection = JdbcUtil.getConnection
  48. val bool: Boolean = JdbcUtil.isExist(connection, "select * from black_list
  49. where userid=?", Array(adsLog.userid))
  50. connection.close()
  51. !bool
  52. })
  53. })
  54. }
  55. }
复制代码
➢ RealtimeApp
  1. import com.atguigu.bean.Ads_log
  2. import com.atguigu.handler.BlackListHandler
  3. import com.atguigu.utils.MyKafkaUtil
  4. import org.apache.kafka.clients.consumer.ConsumerRecord
  5. import org.apache.spark.SparkConf
  6. import org.apache.spark.streaming.{Seconds, StreamingContext}
  7. import org.apache.spark.streaming.dstream.{DStream, InputDStream}
  8. object RealTimeApp {
  9. def main(args: Array[String]): Unit = {
  10. //1.创建 SparkConf
  11. val sparkConf: SparkConf = new SparkConf().setAppName("RealTimeApp").setMaster("local[*]")
  12. //2.创建 StreamingContext
  13. val ssc = new StreamingContext(sparkConf, Seconds(3))
  14. //3.读取数据
  15. val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream("ads_log", ssc)
  16. //4.将从 Kafka 读出的数据转换为样例类对象
  17. val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(record => {
  18. val value: String = record.value()
  19. val arr: Array[String] = value.split(" ")
  20. Ads_log(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
  21. })
  22. //5.需求一:根据 MySQL 中的黑名单过滤当前数据集
  23. val filterAdsLogDStream: DStream[Ads_log] = BlackListHandler2.filterByBlackList(adsLogDStream)
  24. //6.需求一:将满足要求的用户写入黑名单
  25. BlackListHandler2.addBlackList(filterAdsLogDStream)
  26. //测试打印
  27. filterAdsLogDStream.cache()
  28. filterAdsLogDStream.count().print()
  29. //启动任务
  30. ssc.start()
  31. ssc.awaitTermination()
  32. }
  33. }
复制代码
7.4 需求二:广告点击量及时统计

形貌:及时统计每天各地域各城市各广告的点击总流量,并将其存入 MySQL。
7.4.1 思路分析

1)单个批次内对数据进行按照天维度的聚合统计;
2)联合 MySQL 数据跟当前批次数据更新原有的数据。
7.4.2 MySQL 建表

  1. CREATE TABLE area_city_ad_count (
  2. dt VARCHAR(255),
  3. area VARCHAR(255),
  4. city VARCHAR(255),
  5. adid VARCHAR(255),
  6. count BIGINT,
  7. PRIMARY KEY (dt,area,city,adid)
  8. );
复制代码
7.4.3 代码实现

➢ DateAreaCityAdCountHandler
  1. import java.sql.Connection
  2. import java.text.SimpleDateFormat
  3. import java.util.Date
  4. import com.atguigu.bean.Ads_log
  5. import com.atguigu.utils.JdbcUtil
  6. import org.apache.spark.streaming.dstream.DStream
  7. object DateAreaCityAdCountHandler {
  8. //时间格式化对象
  9. private val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
  10. /**
  11. * 统计每天各大区各个城市广告点击总数并保存至 MySQL 中
  12. *
  13. * @param filterAdsLogDStream 根据黑名单过滤后的数据集
  14. */
  15. def saveDateAreaCityAdCountToMysql(filterAdsLogDStream: DStream[Ads_log]): Unit
  16. = {
  17. //1.统计每天各大区各个城市广告点击总数
  18. val dateAreaCityAdToCount: DStream[((String, String, String, String), Long)] =
  19. filterAdsLogDStream.map(ads_log => {
  20. //a.取出时间戳
  21. val timestamp: Long = ads_log.timestamp
  22. //b.格式化为日期字符串
  23. val dt: String = sdf.format(new Date(timestamp))
  24. //c.组合,返回
  25. ((dt, ads_log.area, ads_log.city, ads_log.adid), 1L)
  26. }).reduceByKey(_ + _)
  27. //2.将单个批次统计之后的数据集合 MySQL 数据对原有的数据更新
  28. dateAreaCityAdToCount.foreachRDD(rdd => {
  29. //对每个分区单独处理
  30. rdd.foreachPartition(iter => {
  31. //a.获取连接
  32. val connection: Connection = JdbcUtil.getConnection
  33. //b.写库
  34. iter.foreach { case ((dt, area, city, adid), count) =>
  35. JdbcUtil.executeUpdate(connection,
  36. """
  37. |INSERT INTO area_city_ad_count (dt,area,city,adid,count)
  38. |VALUES(?,?,?,?,?)
  39. |ON DUPLICATE KEY
  40. |UPDATE count=count+?;
  41. """.stripMargin,
  42. Array(dt, area, city, adid, count, count))
  43. }
  44. //c.释放连接
  45. connection.close()
  46. })
  47. })
  48. }
  49. }
复制代码
➢ RealTimeApp
  1. import java.sql.Connection
  2. import com.atguigu.bean.Ads_log
  3. import com.atguigu.handler.{BlackListHandler, DateAreaCityAdCountHandler,
  4. LastHourAdCountHandler}
  5. import com.atguigu.utils.{JdbcUtil, MyKafkaUtil, PropertiesUtil}
  6. import org.apache.kafka.clients.consumer.ConsumerRecord
  7. import org.apache.spark.SparkConf
  8. import org.apache.spark.streaming.dstream.{DStream, InputDStream}
  9. import org.apache.spark.streaming.{Seconds, StreamingContext}
  10. object RealTimeApp {
  11. def main(args: Array[String]): Unit = {
  12. //1.创建 SparkConf
  13. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RealTimeApp")
  14. //2.创建 StreamingContext
  15. val ssc = new StreamingContext(sparkConf, Seconds(3))
  16. //3.读取 Kafka 数据 1583288137305 华南 深圳 4 3
  17. val topic: String = PropertiesUtil.load("config.properties").getProperty("kafka.topic")
  18. val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(topic, ssc)
  19. //4.将每一行数据转换为样例类对象
  20. val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(record => {
  21. //a.取出 value 并按照" "切分
  22. val arr: Array[String] = record.value().split(" ")
  23. //b.封装为样例类对象
  24. Ads_log(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
  25. })
  26. //5.根据 MySQL 中的黑名单表进行数据过滤
  27. val filterAdsLogDStream: DStream[Ads_log] = adsLogDStream.filter(adsLog => {
  28. //查询 MySQL,查看当前用户是否存在。
  29. val connection: Connection = JdbcUtil.getConnection
  30. val bool: Boolean = JdbcUtil.isExist(connection, "select * from black_list where userid=?", Array(adsLog.userid))
  31. connection.close()
  32. !bool
  33. })
  34. filterAdsLogDStream.cache()
  35. //6.对没有被加入黑名单的用户统计当前批次单日各个用户对各个广告点击的总次数,
  36. // 并更新至 MySQL
  37. // 之后查询更新之后的数据,判断是否超过 100 次。
  38. // 如果超过则将给用户加入黑名单
  39. BlackListHandler.saveBlackListToMysql(filterAdsLogDStream)
  40. //7.统计每天各大区各个城市广告点击总数并保存至 MySQL 中
  41. dateAreaCityAdCountHandler.saveDateAreaCityAdCountToMysql(filterAdsLogDStream)
  42. //10.开启任务
  43. ssc.start()
  44. ssc.awaitTermination()
  45. }
  46. }
复制代码
7.5 需求三:最近一小时广告点击量

结果展示:
  1. 1:List [15:50->10,15:51->25,15:52->30]
  2. 2:List [15:50->10,15:51->25,15:52->30]
  3. 3:List [15:50->10,15:51->25,15:52->30]
复制代码
7.5.1 思路分析

1)开窗确定时间范围;
2)在窗口内将数据转换数据布局为((adid,hm),count);
3)按照广告 id 进行分组处置惩罚,组内按照时分排序。
7.5.2 代码实现

➢ LastHourAdCountHandler
  1. import java.text.SimpleDateFormat
  2. import java.util.Date
  3. import com.atguigu.bean.Ads_log
  4. import org.apache.spark.streaming.Minutes
  5. import org.apache.spark.streaming.dstream.DStream
  6. object LastHourAdCountHandler {
  7. //时间格式化对象
  8. private val sdf: SimpleDateFormat = new SimpleDateFormat("HH:mm")
  9. /**
  10. * 统计最近一小时(2 分钟)广告分时点击总数
  11. *
  12. * @param filterAdsLogDStream 过滤后的数据集
  13. * @return
  14. */
  15. def getAdHourMintToCount(filterAdsLogDStream: DStream[Ads_log]): DStream[(String, List[(String, Long)])] = {
  16. //1.开窗 => 时间间隔为 1 个小时 window()
  17. val windowAdsLogDStream: DStream[Ads_log] = filterAdsLogDStream.window(Minutes(2))
  18. //2.转换数据结构 ads_log =>((adid,hm),1L) map()
  19. val adHmToOneDStream: DStream[((String, String), Long)] = windowAdsLogDStream.map(adsLog => {
  20. val timestamp: Long = adsLog.timestamp
  21. val hm: String = sdf.format(new Date(timestamp))
  22. ((adsLog.adid, hm), 1L)
  23. })
  24. //3.统计总数 ((adid,hm),1L)=>((adid,hm),sum) reduceBykey(_+_)
  25. val adHmToCountDStream: DStream[((String, String), Long)] = adHmToOneDStream.reduceByKey(_ + _)
  26. //4.转换数据结构 ((adid,hm),sum)=>(adid,(hm,sum)) map()
  27. val adToHmCountDStream: DStream[(String, (String, Long))] = adHmToCountDStream.map { case ((adid, hm), count) =>
  28. (adid, (hm, count))
  29. }
  30. //5.按照 adid 分组 (adid,(hm,sum))=>(adid,Iter[(hm,sum),...]) groupByKey
  31. adToHmCountDStream.groupByKey()
  32. .mapValues(iter =>
  33. iter.toList.sortWith(_._1 < _._1)
  34. )
  35. }
  36. }
复制代码
➢ RealTimeApp
  1. import java.sql.Connection
  2. import com.atguigu.bean.Ads_log
  3. import com.atguigu.handler.{BlackListHandler, DateAreaCityAdCountHandler, LastHourAdCountHandler}
  4. import com.atguigu.utils.{JdbcUtil, MyKafkaUtil, PropertiesUtil}
  5. import org.apache.kafka.clients.consumer.ConsumerRecord
  6. import org.apache.spark.SparkConf
  7. import org.apache.spark.streaming.dstream.{DStream, InputDStream}
  8. import org.apache.spark.streaming.{Seconds, StreamingContext}
  9. object RealTimeApp {
  10. def main(args: Array[String]): Unit = {
  11. //1.创建 SparkConf
  12. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RealTimeApp")
  13. //2.创建 StreamingContext
  14. val ssc = new StreamingContext(sparkConf, Seconds(3))
  15. //3.读取 Kafka 数据 1583288137305 华南 深圳 4 3
  16. val topic: String = PropertiesUtil.load("config.properties").getProperty("kafka.topic")
  17. val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(topic, ssc)
  18. //4.将每一行数据转换为样例类对象
  19. val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(record => {
  20. //a.取出 value 并按照" "切分
  21. val arr: Array[String] = record.value().split(" ")
  22. //b.封装为样例类对象
  23. Ads_log(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
  24. })
  25. //5.根据 MySQL 中的黑名单表进行数据过滤
  26. val filterAdsLogDStream: DStream[Ads_log] = adsLogDStream.filter(adsLog => {
  27. //查询 MySQL,查看当前用户是否存在。
  28. val connection: Connection = JdbcUtil.getConnection
  29. val bool: Boolean = JdbcUtil.isExist(connection, "select * from black_list where userid=?", Array(adsLog.userid))
  30. connection.close()
  31. !bool
  32. })
  33. filterAdsLogDStream.cache()
  34. //6.对没有被加入黑名单的用户统计当前批次单日各个用户对各个广告点击的总次数,
  35. // 并更新至 MySQL
  36. // 之后查询更新之后的数据,判断是否超过 100 次。
  37. // 如果超过则将给用户加入黑名单
  38. BlackListHandler.saveBlackListToMysql(filterAdsLogDStream)
  39. //7.统计每天各大区各个城市广告点击总数并保存至 MySQL 中
  40. DateAreaCityAdCountHandler.saveDateAreaCityAdCountToMysql(filterAdsLogDStream)
  41. //8.统计最近一小时(2 分钟)广告分时点击总数
  42. val adToHmCountListDStream: DStream[(String, List[(String, Long)])] = LastHourAdCountHandler.getAdHourMintToCount(filterAdsLogDStream)
  43. //9.打印
  44. adToHmCountListDStream.print()
  45. //10.开启任务
  46. ssc.start()
  47. ssc.awaitTermination()
  48. }
  49. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

张裕

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表