ToB企服应用市场:ToB评测及商务社交产业平台
标题:
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队
[打印本页]
作者:
十念
时间:
2024-8-27 20:58
标题:
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队
点一下关注吧!!!非常感谢!!连续更新!!!
现在已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(正在更新!)
章节内容
上节我们完成了如下的内容:
Spark Streaming 根本概述
Spark Streaming 架构概念
编程模子
长处缺点概括
与 Kafka 集成
根本概念
根本数据源包括:
文件体系(File System):Spark Streaming 支持监控 HDFS、S3、本地文件体系等目录中的新文件,并将这些文件作为数据流的一部分进行处理。这个数据源实用于处理批量生成的文件。
Socket 数据流(Socket Stream):这是最简单的数据源之一,Spark Streaming 可以通过 TCP 套接字连接接收文本数据流。例如,你可以利用 nc(Netcat)工具向指定端口发送数据,Spark Streaming 可以及时读取这些数据。
Kafka:Kafka 是一个分布式消息体系,常用于构建及时流处理应用。Spark Streaming 提供了直接和高级两种 Kafka 数据源集成方式,支持从 Kafka 主题中读取数据流。
Flume:Apache Flume 是一个分布式、可靠且高可用的体系,用于高效收集、聚合和传输大量日记数据。Spark Streaming 可以通过 Flume 接收数据并处理,常用于日记收集和分析。
Kinesis:Amazon Kinesis 是一个及时流处理服务,Spark Streaming 提供了 Kinesis 数据源的支持,可以或许从 Kinesis 流中读取数据,并进行及时分析。
自定义数据源:Spark Streaming 允许用户实现自定义的输入源。用户可以通过实现 Receiver 类或利用 Direct DStream API 来创建新的数据源。
引入依赖
我们利用的话,需要引入依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
复制代码
文件数据流
根本概念
通过 textFileStreama 方法进行读取 HDFS 兼容的文件体系文件
Spark Streaming 将会监控 directory 目录,并不停处理移动进来的文件
不支持嵌套目录
文件需要有相同的数据格式
文件进入 Directory 的方式需要通过移动大概重命名来实现
一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据
文件流不需要接收器(Receiver),不需要单独分配CPU核
编写代码
package icu.wzk
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FileDStream {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf()
.setAppName("FileDStream")
.setMaster("local[*]")
// 时间间隔
val ssc = new StreamingContext(conf, Seconds(5))
// 本地文件,也可以使用 HDFS 文件
val lines = ssc.textFileStream("goodtbl.java")
val words = lines.flatMap(_.split("\\s+"))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
// 打印信息
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
复制代码
代码分析
object FileDStream: 定义了一个名为 FileDStream 的单例对象,包罗 main 方法,这是 Scala 中的入口点,相当于 Java 的 public static void main 方法。
Logger.getLogger(“org”).setLevel(Level.ERROR): 这行代码将日记级别设置为 ERROR,以减少控制台输出的日记信息,只显示错误级别的信息。这通常是为了避免不须要的日记干扰焦点的输出。
val conf = new SparkConf(): 创建一个 SparkConf 对象,包罗了应用程序的配置信息。
setAppName(“FileDStream”): 设置应用程序的名称为 “FileDStream”。这个名称会在 Spark Web UI 中显示,用于识别应用。
setMaster("local[
]"): 设置 Spark 的运行模式为本地模式(local[
]),这意味着应用程序将在本地运行,并利用全部可用的 CPU 焦点。
val ssc = new StreamingContext(conf, Seconds(5)): 创建一个 StreamingContext 对象,负责管理 Spark Streaming 应用程序的上下文。Seconds(5) 指定了微批处理的时间隔断为 5 秒,也就是每 5 秒钟会处理一次数据。
val words = lines.flatMap(_.split(“\s+”)): 对每一行文本内容进行处理,利用空格或其他空缺字符(\s+)进行分割,将每行文本拆分成单词。flatMap 操纵会将效果睁开为一个包罗全部单词的 DStream。
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _): 通过 map 操纵将每个单词映射为 (word, 1) 形式的键值对,然后利用 reduceByKey 按键(即单词)进行聚合,盘算每个单词的出现次数。
wordCounts.print(): 将盘算效果打印到控制台,每 5 秒钟输出一次当前批次中每个单词的计数效果。
ssc.start(): 启动 Spark Streaming 的盘算,这会使得 Spark 开始监听数据源并开始处理数据流。
ssc.awaitTermination(): 阻塞当前线程,等候流盘算结束,通常是等候手动克制应用程序。这个方法会让程序保持运行,直得手动停止或遇到非常。
运行效果
【备注:利用 local[
],可能会存在问题。】
【如果给虚拟机配置的CPU数为1,利用 local[
] 也会只启动一个线程,该线程用于 Receiver Task,此时没有资源处理继承到达的数据。】
【现象:程序正常执行,不会打印时间戳,屏幕上也不会有其他有消息信息】
Socket数据流
编写代码
Spark Streaming 可以通过Socket端口监听并继承数据,然后进行相应处理:
打开一个新的命令窗口,启动 nc 程序。(在Flink中也这么用过)
# 如果没有的话 你需要安装一下
nc -lk 9999
复制代码
编写运行的代码:
package icu.wzk
import org.apache.log4j.{Level, Logger}
import org.apachea.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SocketDStream {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf()
.setAppName("SocketStream")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1));
val lines = ssc.socketTextStream("0.0.0.0", 9999)
val words = lines.flatMap(_.split("\\s+"))
val wordCount = words.map(x => (x.trim, 1)).reduceByKey(_ + _)
wordCount.print()
ssc.start()
ssc.awaitTermination()
}
}
复制代码
随后可以在nc窗口中随意输入一些单词,监听窗口会自动获取单词数据流信息,在监听窗口每X秒就会打印出词频的统计信息,可以在屏幕是上出现效果。
运行效果
【备注:利用 local[
],可能会存在问题。】
【如果给虚拟机配置的CPU数为1,利用 local[
] 也会只启动一个线程,该线程用于 Receiver Task,此时没有资源处理继承到达的数据。】
【现象:程序正常执行,不会打印时间戳,屏幕上也不会有其他有消息信息】
此时,从控制台启动后,输入内容
RDD队列流
根本概念
调用 Spark Streaming应用程序的时候,可利用 streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream
备注:
oneAtTime:缺省为true,一次处理一个RDD,设为False,一次处理全部RDD
RDD队列流可以利用 local[1]
涉及到同时出队和入队操纵,所以要做同步
每秒创建一个RDD(RDD存放1-100的整数),Streaming每隔1秒就对数据进行处理,盘算RDD中数据除10取余的个数。
队列流长处
实用于测试和开发:RDD 队列流重要用于开发和调试阶段,它允许你在没有真实数据源的环境下测试 Spark Streaming 应用程序。
RDD 队列:你可以创建一个包罗 RDD 的队列(Queue),Spark Streaming 会从这个队列中逐一获取 RDD,并将其作为数据流的一部分进行处理。
机动性:由于是手动创建的 RDD 队列,因此你可以完全控制数据的内容、数目以及生成的速度,从而测试各种场景下的应用表现。
编写代码
编写代码如下:
package icu.wzk
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.Queue
object RDDQueueDStream {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
val sparkConf = new SparkConf()
.setAppName("RDDQueueStream")
.setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val rddQueue = new Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)
val mappedStream = queueStream.map(r => (r % 10, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
reducedStream.print()
ssc.start()
for (i <- 1 to 5) {
rddQueue.synchronized {
val range = (1 to 100).map(_ * i)
rddQueue += ssc.sparkContext.makeRDD(range, 2)
}
Thread.sleep(2000)
}
ssc.stop()
}
}
复制代码
运行效果
运行效果如图所示:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4