ToB企服应用市场:ToB评测及商务社交产业平台
标题:
Spark 组件 GraphX、Streaming
[打印本页]
作者:
羊蹓狼
时间:
2024-8-16 21:45
标题:
Spark 组件 GraphX、Streaming
Spark 组件 GraphX、Streaming
一、Spark GraphX
1.1 GraphX 的主要概念
1.2 GraphX 的焦点操纵
1.3 示例代码
1.4 GraphX 的应用场景
二、Spark Streaming
2.1 Spark Streaming 的主要概念
2.2 示例代码
2.3 Spark Streaming 的集成
2.4 Spark Streaming 的应用场景
Spark GraphX
用于处理
图和图并行计算
。GraphX 将图的表达和操纵嵌入到 Spark 的数据流 API 中,允许用户在图上实行高效的并行计算。GraphX 结合了图计算和数据流计算的功能,使得它可以或许处理复杂的数据分析任务。
Spark Streaming
用于
处理实时数据流
。它允许开发者以微批处理(Micro-batch)的方式处理实时数据,提供了一个高层次的 API,可以轻松地将批处理操纵应用于实时数据流。
一、Spark GraphX
1.1 GraphX 的主要概念
极点 (Vertex)
:
图中的节点,表示实体或对象。每个极点都有一个唯一的标识符(ID)和属性。
边 (Edge)
:
图中的连接,表示极点之间的关系。每条边连接两个极点,而且也可以有属性。
图 (Graph)
:
由极点和边组成的结构。GraphX 利用 Graph 类来表示图,极点和边的聚集分别由 RDD[VertexId, VD] 和 RDD[Edge[ED]] 表示,此中 VD 和 ED 是极点和边的属性类型。
Triplet
:
GraphX 中的 EdgeTriplet 代表一条边及其连接的两个极点的信息,允许同时访问极点和边的属性。
1.2 GraphX 的焦点操纵
图构造 (Graph Construction)
:
通过极点和边的 RDD 来构建图。比方,利用 Graph(vertices, edges) 构造一个图。
图转换 (Graph Transformation)
:
对图进行操纵,比方过滤极点和边 (subgraph),或将极点和边的属性映射到新属性 (mapVertices 和 mapEdges)。
聚合消息 (Aggregate Messages)
:
用于从邻接极点或边聚合信息。这在实现图算法(如 PageRank)时特殊有效。
图算法 (Graph Algorithms)
:
GraphX 提供了一些预定义的图算法,如 PageRank、Connected Components、Shortest Paths 和 Triangle Counting。
1.3 示例代码
1.3.1 简朴的 GraphX 示例,创建一个图并运行 PageRank 算法:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object GraphXExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("graphx example")
.setMaster("local[*]")
// 创建 SparkSession
val spark = SparkSession
.builder
.config(conf)
.getOrCreate()
// 创建顶点 RDD
val vertices: RDD[(VertexId, String)] = spark.sparkContext.parallelize(Seq(
(1L, "Alice"),
(2L, "Bob"),
(3L, "Charlie"),
(4L, "David")
))
// 创建边 RDD
val edges: RDD[Edge[Int]] = spark.sparkContext.parallelize(Seq(
Edge(1L, 2L, 1),
Edge(2L, 3L, 1),
Edge(3L, 4L, 1),
Edge(4L, 1L, 1)
))
// 创建图
val graph = Graph(vertices, edges)
// 运行 PageRank 算法
// PageRank 算法最初是用于搜索引擎中对网页进行排序的基础算法,通过分析网络中的链接结构来评估每个网页的相对重要性。
// 这里0.0001为收敛阈值
val ranks = graph.pageRank(0.0001).vertices
// 打印结果
ranks.collect().foreach { case (id, rank) => println(s"Vertex $id has rank $rank") }
// 停止 SparkSession
spark.stop()
}
}
复制代码
1.3.2 Spark GraphX 的一些基本操纵和概念
// 创建图
val graph = Graph(vertices, edges)
// vertices:获取图中的所有顶点。
graph.vertices.collect.foreach(println)
// mapVertices:对顶点的属性进行变换。
val newGraph = graph.mapVertices((id, attr) => attr.toUpperCase)
// edges:获取图中的所有边。
graph.edges.collect.foreach(println)
// mapEdges:对边的属性进行变换。
val newGraph = graph.mapEdges(e => e.attr * 2)
// 查看所有的边(将点带入)【完整】
graph.triplets.foreach(println)
// 入度:指向自己的个数
graph.inDegrees.foreach(println) // (人id,入度)
// 出度:指向别人的个数
graph.outDegrees.foreach(println) // (人id,出度)
// 将入度和出度一起显示
graph
.inDegrees
.join(graph.outDegrees)
.foreach(println) // (人id,(入度,出度))
// 度:入度+出度
graph.degrees.foreach(println)
// subgraph:根据条件创建一个子图,保留满足条件的顶点和边。
val subGraph = graph.subgraph(vpred = (id, attr) => attr != "Bob")
// joinVertices:将图的顶点属性与一个新的 RDD 进行连接,并更新顶点属性。
val newAttrs: RDD[(Long, String)] = sc.parallelize(Seq(
(1L, "Alice_new"),
(4L, "David_new")
))
val joinedGraph = graph.joinVertices(newAttrs) {
case (id, oldAttr, newAttr) => newAttr
}
复制代码
1.3.3 图算法
// PageRank 算法用于计算图中每个顶点的重要性。
val ranks = graph.pageRank(0.0001).vertices
ranks.collect.foreach(println)
// 连接组件: 连通组件算法用于查找图中的连通子图。
val connectedComponents = graph.connectedComponents().vertices
connectedComponents.collect.foreach(println)
// 三角计数: 三角计数算法用于计算每个顶点所属的三角形数量。
val triangleCounts = graph.triangleCount().vertices
triangleCounts.collect.foreach(println)
// 图的持久化与加载: 图可以通过将顶点和边的 RDD 存储在 HDFS 或其他文件系统中进行持久化。
// 保存图的顶点和边
graph.vertices.saveAsTextFile("hdfs://path/to/vertices")
graph.edges.saveAsTextFile("hdfs://path/to/edges")
// 从文件中加载图
val loadedVertices: RDD[(Long, String)] = sc.textFile("hdfs://path/to/vertices").map { line =>
val parts = line.split(",")
(parts(0).toLong, parts(1))
}
val loadedEdges: RDD[Edge[Int]] = sc.textFile("hdfs://path/to/edges").map { line =>
val parts = line.split(",")
Edge(parts(0).toLong, parts(1).toLong, parts(2).toInt)
}
val loadedGraph: Graph[String, Int] = Graph(loadedVertices, loadedEdges)
复制代码
1.3.4 Pregel 模型
Pregel 算法是一种迭代、消息通报的计算模型,特殊适用于处理图的遍历和递归题目,如最短路径计算、PageRank、连通组件检测等。
Pregel 的焦点头脑是将图计算任务表示为一个超等步(superstep)序列,每个超等步由以下几个阶段组成:
消息通报
:每个极点可以向相邻的极点发送消息。
消息处理
:每个极点接收来自相邻极点的消息,并更新自己的状态。
极点计算
:极点在处理完消息后可以决定是否继续活跃或制止计算(halt)。
这个过程会不停迭代,直到所有极点都制止计算或达到指定的迭代次数。
// Pregel 模型:
val initialMsg = ... // 定义初始消息
val maxIterations = 10 // 最大迭代次数
val resultGraph = graph.pregel(initialMsg, maxIterations)(
// 顶点程序,处理接收到的消息并更新顶点属性
vprog = (id, attr, msg) => {
// 根据顶点的属性和收到的消息,更新顶点属性
},
// 发送消息,定义如何从一个顶点向相邻的顶点发送消息
sendMsg = triplet => {
// 根据边的属性和顶点的状态,决定发送的消息
},
// 聚合消息,定义如何合并一个顶点接收到的所有消息
mergeMsg = (msg1, msg2) => {
// 合并接收到的多个消息
}
)
复制代码
利用 Pregel 模型可以计算图中某个出发点到其他所有极点的最短路径。
import org.apache.spark.SparkConf
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object GraphxTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("graphx example")
.setMaster("local[*]")
// 创建 SparkSession
val spark = SparkSession
.builder
.config(conf)
.getOrCreate()
val vertices: RDD[(VertexId, String)] = spark.sparkContext.parallelize(Seq(
(1L, "Alice"),
(2L, "Bob"),
(3L, "Charlie"),
(4L, "David")
))
// 创建边 RDD
val edges: RDD[Edge[Int]] = spark.sparkContext.parallelize(Seq(
Edge(1L, 2L, 1),
Edge(2L, 3L, 1),
Edge(2L, 4L, 5),
Edge(3L, 4L, 2),
Edge(4L, 1L, 1)
))
// 创建图
val graph: Graph[String, PartitionID] = Graph(vertices, edges)
// 设置源顶点 ID
val sourceId: VertexId = 1L
// 初始化图的顶点属性,将源顶点的距离设为 0.0,其他顶点设为无穷大
val initialGraph = graph.mapVertices((id, name) =>
if (id == sourceId) (name, 0.0) else (name, Double.PositiveInfinity)
)
// 运行 Pregel 算法计算单源最短路径 (SSSP)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
// 顶点程序:更新顶点属性(即距离)为当前距离与新接收到的距离中的较小值
vprog = (id, attr, msg) => (attr._1, Math.min(attr._2, msg)),
// 消息发送函数:计算从源顶点到目标顶点的距离,如果新计算的距离小于目标顶点当前的距离,则发送消息
sendMsg = triplet => {
if (triplet.srcAttr._2 + triplet.attr < triplet.dstAttr._2) {
Iterator((triplet.dstId, triplet.srcAttr._2 + triplet.attr))
} else {
Iterator.empty
}
},
// 消息合并函数:在目标顶点收到多个消息时,取最短的距离
mergeMsg = (a, b) => math.min(a, b)
)
// 打印最终的最短路径结果
sssp.vertices.collect.foreach { case (id, (name, dist)) =>
println(s"Distance from $sourceId to $id ($name) is $dist")
}
// 停止 SparkSession
spark.stop()
}
}
复制代码
1.4 GraphX 的应用场景
外交网络分析
:分析用户之间的关系,如挚友保举、外交影响力分析。
路径分析
:计算最短路径、页面排名等。
社区检测
:辨认图中连接精密的子图或社区。
网络优化
:通过图分析网络拓扑结构,优化数据流路由等。
二、Spark Streaming
2.1 Spark Streaming 的主要概念
Spark Streaming 提供了一个强大且易用的 API,使开发者可以或许轻松地构建实时数据处理应用,特殊适合必要低延长、高吞吐量的场景。
DStream (Discretized Stream)
:
DStream 是 Spark Streaming 中的焦点抽象,表示一个连续的数据流。它可以被视为一系列 RDD(Resilient Distributed Datasets)的聚集,每个 RDD 都包含某个时间隔断内的数据。
DStream 可以从各种输入源(如 Kafka、Flume、TCP 套接字等)创建,也可以通过对现有 DStream 的转换来创建。
Transformations (转换操纵)
:
雷同于 Spark 的 RDD 转换操纵,DStream 支持各种转换操纵,如 map、filter、reduceByKey 等。每个转换操纵都会应用于 DStream 中的每个 RDD,生成一个新的 DStream。
Output Operations (输出操纵)
:
DStream 提供多种输出操纵,将处理后的数据输出到外部系统。比方,print、saveAsTextFiles、saveAsHadoopFiles、foreachRDD 等。
Window Operations (窗口操纵)
:
Spark Streaming 支持窗口操纵,允许对一段时间范围内的数据进行聚合。比方,可以计算已往 10 秒钟内的每 2 秒的数据。
2.2 示例代码
一个简朴的 Spark Streaming 示例,读取 TCP 套接字数据,并进行词频统计:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStream {
def main(args: Array[String]): Unit = {
// 创建 Spark 配置对象,并设置应用名称和运行模式
val conf = new SparkConf()
.setMaster("local[*]") // 使用本地模式运行,[*] 表示使用所有可用的 CPU 核心
.setAppName("spark-streaming") // 设置应用程序名称
// 创建 StreamingContext,设置批处理间隔为 5 秒
val scc = new StreamingContext(conf, Seconds(5))
// 监听本地端口 9999,接收流式数据
scc.socketTextStream("localhost", 9999)
.mapPartitions(
_.flatMap(
_.replaceAll("[^a-zA-Z ]+", "")
.split("\\s+")
.map((_, 1))
)
)
// 按单词进行归约,计算每个单词的出现次数
.reduceByKey(_ + _)
// 打印结果到控制台
.print()
// 启动 Spark Streaming 计算
scc.start()
// 等待应用程序终止
scc.awaitTermination()
}
}
复制代码
2.3 Spark Streaming 的集成
与 Kafka 集成
:Spark Streaming 可以从 Kafka 中读取数据流,用于实时日记处理、监控等场景。
与 Flume 集成
:结合 Flume 进行分布式日记收集,然后利用 Spark Streaming 实时处理和分析日记。
与 HDFS、S3 等集成
:将处理后的数据输出到 HDFS、S3 等分布式文件系统进行长期化存储。
与 SQL 和 MLlib 集成
:Spark Streaming 可以与 Spark SQL 和 MLlib 集成,进行实时的数据分析和机器学习任务。
2.4 Spark Streaming 的应用场景
实时日记分析
:监控服务器日记、应用日记,检测异常环境。
实时 ETL (Extract, Transform, Load)
:对流式数据进行洗濯、转换,并写入到数据仓库。
实时监控与报警
:对实时数据流进行分析,当检测到特定条件时触发报警。
在线保举系统
:基于实时用户行为数据进行保举,比方在线广告保举。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4