大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现
喜大普奔!破百了!点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
[*]Hadoop(已更完)
[*]HDFS(已更完)
[*]MapReduce(已更完)
[*]Hive(已更完)
[*]Flume(已更完)
[*]Sqoop(已更完)
[*]Zookeeper(已更完)
[*]HBase(已更完)
[*]Redis (已更完)
[*]Kafka(已更完)
[*]Spark(正在更新!)
章节内容
上节我们完成了如下的内容:
[*]Spark Streaming 底子数据源
[*]文件流、Socket流、RDD队列流
[*]引入依赖、Java编写多种流举行测试
https://i-blog.csdnimg.cn/direct/23fccf04d4b344b28cd5ca79b9be5856.png
DStream 转换
DStream上的操作与RDD类似,分为Transformations(转换)和 Output Operations(输出)两种,别的转换操作中还有一些比力特别的方法,如:
[*]updateStateByKey
[*]transform
[*]window干系操作
https://i-blog.csdnimg.cn/direct/5a2e67e477264e0798caf4f10b739c1a.png
https://i-blog.csdnimg.cn/direct/88c59c0ec96143fc96a687be396fdbb7.png
https://i-blog.csdnimg.cn/direct/cebec74241234979a8df6ecdf54138ab.png
map(func)
对 DStream 中的每个元素应用 func 函数,并返回一个新的 DStream。
例如,将每个记载转换为其长度。
示例:val lengths = lines.map(line => line.length)
flatMap(func)
对 DStream 中的每个元素应用 func 函数,并将结果展平(即将集合的集合展开)。
例如,将每一行文本拆分为单词。
示例:val words = lines.flatMap(line => line.split(" "))
filter(func)
对 DStream 中的每个元素应用 func 函数,并保留返回值为 true 的元素。
例如,过滤掉长度小于 5 的单词。
示例:val filteredWords = words.filter(word => word.length > 5)
reduceByKey(func)
对键值对 DStream 举行聚合操作,对具有相同键的元素应用 func 函数。
例如,盘算每个单词的总数。
示例:val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
groupByKey()
对键值对 DStream 中的每个键举行分组,并将具有相同键的值聚合到一个列表中。
示例:val grouped = pairs.groupByKey()
count()
统计 DStream 中每个 RDD 的元素个数。
示例:val count = words.count()
countByValue()
统计 DStream 中每个 RDD 中每个值的出现次数。
示例:val valueCounts = words.countByValue()
union(otherDStream)
将两个 DStream 归并为一个新的 DStream,包罗两个 DStream 中的所有元素。
示例:val mergedStream = stream1.union(stream2)
join(otherDStream)
对两个键值对 DStream 举行毗连操作,类似 SQL 中的 JOIN 操作。
示例:val joinedStream = stream1.join(stream2)
备注:
[*]在DStream与RDD上的转换操作非常类似(无状态操作)
[*]DStream有自己特别的操作(窗口操作、追踪状态变化操作)
[*]在DStream上的转换操作比RDD上的转换操作少
DStream 的转换操作可以分为 无状态(stateless)和 有状态(stateful)两种:
[*]无状态转换操作,每个批次的处理不依赖与之前批次的数据,常见的RDD转化操作,例如:map、Filter、reduceByKey等
[*]有状态转换操作,需要使用之前批次的数据或者是中间结果来盘算当前批次的数据,有状态转换操作包括:基于滑动窗口的转换操作或追踪状态变化的转化操作
无状态转换
无状态转换操作就是把简单的RDD转换操作应用到每个批次上,也就是转换DStream中的每一个RDD。
常见的无状态转换包括:
[*]map
[*]flatMap
[*]repartition
[*]reduceByKey
[*]groupByKey
重要的转换操作:transform,通过对源DStream的每个RDD应用RDD-To-RDD函数,创建一个新的DStream,支持在新的DStream中任何RDD操作。
这是一个功能强大的函数,它可以允许开发者直接操作其内部的RDD,也就是说开发者,可以恣意提供一个RDDToRDD的函数,这个函数在数据流每个批次中都被调用,天生一个新的流。
案例1 黑名单过滤
假设:arr1为黑名单数据(自定义),true表示数据生效,需要被过滤掉;false表示数据
未生效
val arr1 = Array(("spark", true), ("scala", false))
假设:流式数据格式为"time word",需要根据黑名单中的数据对流式数据执行过滤操
作。如"2 spark"要被过滤掉
1 hadoop
2 spark
3 scala
4 java
5 hive
结果:"2 spark" 被过滤
方案1 外毗连实现
package icu.wzk
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object BlackListFilter1 {
def main(args: Array): Unit = {
val conf = new SparkConf()
.setAppName("BlackListFilter1")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(10))
// 黑名单
val blackList = Array(("spark", true), ("scala", true))
val blackListRDD = ssc.sparkContext.makeRDD(blackList)
// 测试数据
val strArray: Array = "spark java scala hadoop kafka hive hbase zookeeper"
.split("\\s+")
.zipWithIndex
.map {
case (word, index) => s"$index $word"
}
val rdd = ssc.sparkContext.makeRDD(strArray)
val clickStream = new ConstantInputDStream(ssc, rdd)
// 流式数据的处理
val clickStreamFormatted = clickStream
.map(value => (value.split(" ")(1), value))
clickStreamFormatted.transform(clickRDD => {
val joinedBlockListRDD: RDD[(String, (String, Option))] = clickRDD.leftOuterJoin(blackListRDD)
joinedBlockListRDD.filter {
case (word, (streamingLine, flag)) => {
if (flag.getOrElse(false)) {
false
} else {
true
}
}
}.map {
case (word, (streamingLine, flag)) => streamingLine
}
}).print()
// 启动
ssc.start()
ssc.awaitTermination()
}
}
方案1 运行结果
-------------------------------------------
Time: 1721618670000 ms
-------------------------------------------
5 hive
6 hbase
1 java
7 zookeeper
3 hadoop
4 kafka
... 下一批
对应的结果如下图所示:
https://i-blog.csdnimg.cn/direct/adaef28cce5a4bf282174876b2e2434e.png
方案2 SQL实现
package icu.wzk
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object BlackListFilter2 {
def main(args: Array): Unit = {
val conf = new SparkConf()
.setAppName("BlackListFilter2")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.sparkContext.setLogLevel("WARN")
// 黑名单
val blackList = Array(("spark", true), ("scala", true))
val blackListRDD = ssc.sparkContext.makeRDD(blackList)
// 生成测试 DStream
val strArray: Array = "spark java scala hadoop kafka hive hbase zookeeper"
.split("\\s+")
.zipWithIndex
.map {
case (word, index) => s"$index $word"
}
val rdd = ssc.sparkContext.makeRDD(strArray)
val clickStream = new ConstantInputDStream(ssc, rdd)
// 流式数据的处理
val clickStreamFormatted = clickStream
.map(value => (value.split(" ")(1), value))
clickStreamFormatted.transform {
clickRDD =>
val spark = SparkSession
.builder()
.config(rdd.sparkContext.getConf)
.getOrCreate()
import spark.implicits._
val clickDF: DataFrame = clickRDD.toDF("word", "line")
val blackDF: DataFrame = blackListRDD.toDF("word", "flag")
clickDF.join(blackDF, Seq("word"), "left")
.filter("flag is null or flag == false")
.select("line")
.rdd
}.print()
ssc.start()
ssc.awaitTermination()
}
}
方案2 SQL运行结果
-------------------------------------------
Time: 1721619900000 ms
-------------------------------------------
运行结果截图如下图所示:
https://i-blog.csdnimg.cn/direct/de58c95bc8a3466eb546824f8c9111a8.png
方案3 直接过滤
package icu.wzk
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object BlackListFilter3 {
def main(args: Array): Unit = {
val conf = new SparkConf()
.setAppName("BlackListFilter3")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.sparkContext.setLogLevel("WARN")
// 黑名单
val blackList = Array(("spark", true), ("scala", true))
val blackListBC: Broadcast] = ssc
.sparkContext
.broadcast(blackList.filter(_._2).map(_._1))
// 生成测试DStream
val strArray: Array = "spark java scala hadoop kafka hive hbase zookeeper"
.split("\\s+")
.zipWithIndex
.map {
case (word, index) => s"$index $word"
}
val rdd = ssc.sparkContext.makeRDD(strArray)
val clickStream = new ConstantInputDStream(ssc, rdd)
// 流式数据的处理
clickStream.map(value => (value.split(" ")(1), value))
.filter {
case (word, _) => !blackListBC.value.contains(word)
}
.map(_._2)
.print()
// 启动
ssc.start()
ssc.awaitTermination()
}
}
方案3 直接过滤运行结果
-------------------------------------------
Time: 1721627600000 ms
-------------------------------------------
1 java
3 hadoop
4 kafka
5 hive
6 hbase
7 zookeeper
... 下一批
运行结果如下图所示:
https://i-blog.csdnimg.cn/direct/d6c0a3df20a141989a1bc0c99525a52c.png
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]