大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现 ...

打印 上一主题 下一主题

主题 345|帖子 345|积分 1035

喜大普奔!破百了!

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)
章节内容

上节我们完成了如下的内容:


  • Spark Streaming 底子数据源
  • 文件流、Socket流、RDD队列流
  • 引入依赖、Java编写多种流举行测试

DStream 转换

DStream上的操作与RDD类似,分为Transformations(转换)和 Output Operations(输出)两种,别的转换操作中还有一些比力特别的方法,如:


  • updateStateByKey
  • transform
  • window干系操作



map(func)

对 DStream 中的每个元素应用 func 函数,并返回一个新的 DStream。
例如,将每个记载转换为其长度。
示例:val lengths = lines.map(line => line.length)
flatMap(func)

对 DStream 中的每个元素应用 func 函数,并将结果展平(即将集合的集合展开)。
例如,将每一行文本拆分为单词。
示例:val words = lines.flatMap(line => line.split(" "))
filter(func)

对 DStream 中的每个元素应用 func 函数,并保留返回值为 true 的元素。
例如,过滤掉长度小于 5 的单词。
示例:val filteredWords = words.filter(word => word.length > 5)
reduceByKey(func)

对键值对 DStream 举行聚合操作,对具有相同键的元素应用 func 函数。
例如,盘算每个单词的总数。
示例:val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
groupByKey()

对键值对 DStream 中的每个键举行分组,并将具有相同键的值聚合到一个列表中。
示例:val grouped = pairs.groupByKey()
count()

统计 DStream 中每个 RDD 的元素个数。
示例:val count = words.count()
countByValue()

统计 DStream 中每个 RDD 中每个值的出现次数。
示例:val valueCounts = words.countByValue()
union(otherDStream)

将两个 DStream 归并为一个新的 DStream,包罗两个 DStream 中的所有元素。
示例:val mergedStream = stream1.union(stream2)
join(otherDStream)

对两个键值对 DStream 举行毗连操作,类似 SQL 中的 JOIN 操作。
示例:val joinedStream = stream1.join(stream2)
备注:


  • 在DStream与RDD上的转换操作非常类似(无状态操作)
  • DStream有自己特别的操作(窗口操作、追踪状态变化操作)
  • 在DStream上的转换操作比RDD上的转换操作少
DStream 的转换操作可以分为 无状态(stateless)和 有状态(stateful)两种:


  • 无状态转换操作,每个批次的处理不依赖与之前批次的数据,常见的RDD转化操作,例如:map、Filter、reduceByKey等
  • 有状态转换操作,需要使用之前批次的数据或者是中间结果来盘算当前批次的数据,有状态转换操作包括:基于滑动窗口的转换操作或追踪状态变化的转化操作
无状态转换

无状态转换操作就是把简单的RDD转换操作应用到每个批次上,也就是转换DStream中的每一个RDD。
常见的无状态转换包括:


  • map
  • flatMap
  • repartition
  • reduceByKey
  • groupByKey
重要的转换操作:transform,通过对源DStream的每个RDD应用RDD-To-RDD函数,创建一个新的DStream,支持在新的DStream中任何RDD操作。
这是一个功能强大的函数,它可以允许开发者直接操作其内部的RDD,也就是说开发者,可以恣意提供一个RDDToRDD的函数,这个函数在数据流每个批次中都被调用,天生一个新的流。
案例1 黑名单过滤

  1. 假设:arr1为黑名单数据(自定义),true表示数据生效,需要被过滤掉;false表示数据
  2. 未生效
  3. val arr1 = Array(("spark", true), ("scala", false))
  4. 假设:流式数据格式为"time word",需要根据黑名单中的数据对流式数据执行过滤操
  5. 作。如"2 spark"要被过滤掉
  6. 1 hadoop
  7. 2 spark
  8. 3 scala
  9. 4 java
  10. 5 hive
  11. 结果:"2 spark" 被过滤
复制代码
方案1 外毗连实现

  1. package icu.wzk
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.streaming.dstream.ConstantInputDStream
  5. import org.apache.spark.streaming.{Seconds, StreamingContext}
  6. object BlackListFilter1 {
  7.   def main(args: Array[String]): Unit = {
  8.     val conf = new SparkConf()
  9.       .setAppName("BlackListFilter1")
  10.       .setMaster("local[*]")
  11.     val ssc = new StreamingContext(conf, Seconds(10))
  12.     // 黑名单
  13.     val blackList = Array(("spark", true), ("scala", true))
  14.     val blackListRDD = ssc.sparkContext.makeRDD(blackList)
  15.     // 测试数据
  16.     val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper"
  17.       .split("\\s+")
  18.       .zipWithIndex
  19.       .map {
  20.         case (word, index) => s"$index $word"
  21.       }
  22.     val rdd = ssc.sparkContext.makeRDD(strArray)
  23.     val clickStream = new ConstantInputDStream(ssc, rdd)
  24.     // 流式数据的处理
  25.     val clickStreamFormatted = clickStream
  26.       .map(value => (value.split(" ")(1), value))
  27.     clickStreamFormatted.transform(clickRDD => {
  28.       val joinedBlockListRDD: RDD[(String, (String, Option[Boolean]))] = clickRDD.leftOuterJoin(blackListRDD)
  29.       joinedBlockListRDD.filter {
  30.         case (word, (streamingLine, flag)) => {
  31.           if (flag.getOrElse(false)) {
  32.             false
  33.           } else {
  34.             true
  35.           }
  36.         }
  37.       }.map {
  38.         case (word, (streamingLine, flag)) => streamingLine
  39.       }
  40.     }).print()
  41.     // 启动
  42.     ssc.start()
  43.     ssc.awaitTermination()
  44.   }
  45. }
复制代码
方案1 运行结果

  1. -------------------------------------------
  2. Time: 1721618670000 ms
  3. -------------------------------------------
  4. 5 hive
  5. 6 hbase
  6. 1 java
  7. 7 zookeeper
  8. 3 hadoop
  9. 4 kafka
  10. ... 下一批
复制代码
对应的结果如下图所示:

方案2 SQL实现

  1. package icu.wzk
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.sql.{DataFrame, SparkSession}
  4. import org.apache.spark.streaming.dstream.ConstantInputDStream
  5. import org.apache.spark.streaming.{Seconds, StreamingContext}
  6. object BlackListFilter2 {
  7.   def main(args: Array[String]): Unit = {
  8.     val conf = new SparkConf()
  9.       .setAppName("BlackListFilter2")
  10.       .setMaster("local[*]")
  11.     val ssc = new StreamingContext(conf, Seconds(10))
  12.     ssc.sparkContext.setLogLevel("WARN")
  13.     // 黑名单
  14.     val blackList = Array(("spark", true), ("scala", true))
  15.     val blackListRDD = ssc.sparkContext.makeRDD(blackList)
  16.     // 生成测试 DStream
  17.     val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper"
  18.       .split("\\s+")
  19.       .zipWithIndex
  20.       .map {
  21.         case (word, index) => s"$index $word"
  22.       }
  23.     val rdd = ssc.sparkContext.makeRDD(strArray)
  24.     val clickStream = new ConstantInputDStream(ssc, rdd)
  25.     // 流式数据的处理
  26.     val clickStreamFormatted = clickStream
  27.       .map(value => (value.split(" ")(1), value))
  28.     clickStreamFormatted.transform {
  29.       clickRDD =>
  30.         val spark = SparkSession
  31.           .builder()
  32.           .config(rdd.sparkContext.getConf)
  33.           .getOrCreate()
  34.         import spark.implicits._
  35.         val clickDF: DataFrame = clickRDD.toDF("word", "line")
  36.         val blackDF: DataFrame = blackListRDD.toDF("word", "flag")
  37.         clickDF.join(blackDF, Seq("word"), "left")
  38.           .filter("flag is null or flag == false")
  39.           .select("line")
  40.           .rdd
  41.     }.print()
  42.     ssc.start()
  43.     ssc.awaitTermination()
  44.   }
  45. }
复制代码
方案2 SQL运行结果

  1. -------------------------------------------
  2. Time: 1721619900000 ms
  3. -------------------------------------------
  4. [6 hbase]
  5. [4 kafka]
  6. [7 zookeeper]
  7. [1 java]
  8. [3 hadoop]
  9. [5 hive]
复制代码
运行结果截图如下图所示:

方案3 直接过滤

  1. package icu.wzk
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.broadcast.Broadcast
  4. import org.apache.spark.streaming.dstream.ConstantInputDStream
  5. import org.apache.spark.streaming.{Seconds, StreamingContext}
  6. object BlackListFilter3 {
  7.   def main(args: Array[String]): Unit = {
  8.     val conf = new SparkConf()
  9.       .setAppName("BlackListFilter3")
  10.       .setMaster("local[*]")
  11.     val ssc = new StreamingContext(conf, Seconds(10))
  12.     ssc.sparkContext.setLogLevel("WARN")
  13.     // 黑名单
  14.     val blackList = Array(("spark", true), ("scala", true))
  15.     val blackListBC: Broadcast[Array[String]] = ssc
  16.       .sparkContext
  17.       .broadcast(blackList.filter(_._2).map(_._1))
  18.     // 生成测试DStream
  19.     val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper"
  20.       .split("\\s+")
  21.       .zipWithIndex
  22.       .map {
  23.         case (word, index) => s"$index $word"
  24.       }
  25.     val rdd = ssc.sparkContext.makeRDD(strArray)
  26.     val clickStream = new ConstantInputDStream(ssc, rdd)
  27.     // 流式数据的处理
  28.     clickStream.map(value => (value.split(" ")(1), value))
  29.       .filter {
  30.         case (word, _) => !blackListBC.value.contains(word)
  31.       }
  32.       .map(_._2)
  33.       .print()
  34.     // 启动
  35.     ssc.start()
  36.     ssc.awaitTermination()
  37.    
  38.   }
  39. }
复制代码
方案3 直接过滤运行结果

  1. -------------------------------------------
  2. Time: 1721627600000 ms
  3. -------------------------------------------
  4. 1 java
  5. 3 hadoop
  6. 4 kafka
  7. 5 hive
  8. 6 hbase
  9. 7 zookeeper
  10. ... 下一批
复制代码
运行结果如下图所示:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

张裕

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表