Spark Streaming实时数据处置惩罚实战:从DStream基础到自定义数据源集成

[复制链接]
发表于 2025-4-29 02:13:58 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
park-Streaming概述
Spark-Streaming是什么
Spark Streaming 用于流式数据的处置惩罚。Spark Streaming 支持的数据输入源很多,比方:Kafka、Flume、Twitter等,以及和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。
和 Spark 基于 RDD 的概念很相似,Spark Streaming 利用离散化流(discretized stream)作为抽象表示,叫作 DStream。
DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。
所以简单来将,DStream 就是对 RDD 在实时数据处置惩罚场景的一种封装。
Spark-Streaming的特点:易用、容错、易整合到spark体系。
易用性:Spark Streaming支持Java、Python、Scala等编程语言,可以像编写离线步伐一样编写实时盘算的步伐
容错:Spark Streaming在没有额外代码配置的环境下,可以恢复丢失的数据。对于实时盘算来说,容错性至关重要。
易整合:Spark Streaming可以在Spark上运行,并且还允许重复利用雷同的代码进行批处置惩罚。也就是说,实时处置惩罚可以与离线处置惩罚相结合,实现交互式的查询操作。
Spark-Streaming架构
背压机制:
在Spark 1.5 从前版本,用户假如要限制 Receiver 的数据吸收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制吸收速率,来适配当前的处置惩罚能力,防止内存溢出,但也会引入其它问题。好比:producer 数据生产高于 maxRate,当前集群处置惩罚能力也高于 maxRate,这就会造成资源利用率下降等问题。
为了更好的协调数据吸收速率与资源处置惩罚能力,1.5 版本开始 Spark Streaming 可以动态控制数据吸收速率来适配集群数据处置惩罚能力。背压机制(即 Spark Streaming Backpressure): 根据JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据吸收率。通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值为false,即不启用。
DStream实操
案例一:WordCount案例
需求:利用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统计不同单词出现的次数
实验步调:
1.添加依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
2.编写代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object value26 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local
  • ").setAppName("streaming")
        val ssc = new StreamingContext(sparkConf, Seconds(3))
        val lineStreams = ssc.socketTextStream("node01", 9999)
        val wordStreams = lineStreams.flatMap(_.split(" "))
        val wordAndOneStreams = wordStreams.map((_, 1))
        val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)
        wordAndCountStreams.print()
        ssc.start()
        ssc.awaitTermination()
    3.启动netcat发送数据
    nc -lk 9999
    案例解析:
    Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和颠末各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列一连的 RDD 来表示。每个 RDD 含有 一段时间间隔内的数据。
    对数据的操作也是按照 RDD 为单元来进行的
    DStream 创建
    创建DStream的三种方式:RDD队列、自定义数据源、kafka数据源
    RDD队列
    可以通过利用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个DStream 处置惩罚。
    案例:
    需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,盘算 WordCount
    代码:
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scala.collection.mutable.Queue
    object value27 {
      def main(args: Array[String]): Unit = {
        // 创建Spark配置,设置应用在本地模式运行,利用所有可用核心,应用名为"RDDStream"
        val sparkConf = new SparkConf().setMaster("local
  • ").setAppName("RDDStream")
        // 创建StreamingContext,批处置惩罚间隔为4秒
        val ssc = new StreamingContext(sparkConf, Seconds(4))
        // 创建一个可变队列用于存放RDD
        val rddQueue = new mutable.Queue[org.apache.spark.rdd.RDD[Int]]()
        // 通过队列创建DStream,设置不每次处置惩罚一个RDD
        val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)
        // 将DStream中的元素映射为键值对,值为1
        val mappedStream = inputStream.map((_, 1))
        // 按键对值进行累加
        val reducedStream = mappedStream.reduceByKey(_ + _)
        // 打印结果
        reducedStream.print()
        // 启动StreamingContext
        ssc.start()
        // 循环创建5个RDD并放入队列,每个RDD包含1到300的数字,分区数为10
        for (i <- 1 to 5) {
          rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
          try {
            Thread.sleep(2000)
          } catch {
            case e: InterruptedException => e.printStackTrace()
          }
        }
        // 等待StreamingContext终止
        ssc.awaitTermination()
      }
    }
    自定义数据源
    自定义数据源需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。
    案例:自定义数据源,实现监控监控某个端口号,获取该端口号内容。
    1)自定义数据源
    import org.apache.spark.streaming.receiver.Receiver
    import org.apache.spark.storage.StorageLevel
    import java.net.Socket
    import java.io.{BufferedReader, InputStreamReader}
    import java.nio.charset.StandardCharsets
    Class CustomerReceiver(host:String,port:Int)extends Receiver[String](StorageLevel.MEMORY_ONLY) {
      override def onStart(): Unit = {
        new Thread("Socket - Receiver") {
          override def run(): Unit = {
            receive()
          }
        }.start()
      }
      def receive(): Unit = {
        var socket: Socket = null
        var input: String = null
        var reader: BufferedReader = null
        try {
          socket = new Socket(host, port)
          reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
          while (!isStopped && {input = reader.readLine(); input != null}) {
            store(input)
          }
        } catch {
          case e: Exception =>
            restart("Error receiving data", e)
        } finally {
          if (reader != null) reader.close()
          if (socket != null) socket.close()
          restart("restart")
        }
      }
      override def onStop(): Unit = {
        // 可添加停止相关逻辑,目前为空实现
      }
    }
    2)利用自定义的数据源采集数据
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    object value28 {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local
  • ").setAppName("stream")
        val ssc = new StreamingContext(sparkConf, Seconds(3))
        // 利用自定义数据源创建DStream
        val customStream = ssc.receiverStream(new CustomerReceiver("localhost", 9999))
        customStream.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

    免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
    继续阅读请点击广告
  • 回复

    使用道具 举报

    © 2001-2025 Discuz! Team. Powered by Discuz! X3.5

    GMT+8, 2025-7-10 03:06 , Processed in 0.242411 second(s), 32 queries 手机版|qidao123.com技术社区-IT企服评测▪应用市场 ( 浙ICP备20004199 )|网站地图

    快速回复 返回顶部 返回列表