羊蹓狼 发表于 2024-8-16 21:45:23

Spark 组件 GraphX、Streaming

Spark 组件 GraphX、Streaming



[*]一、Spark GraphX

[*]1.1 GraphX 的主要概念
[*]1.2 GraphX 的焦点操纵
[*]1.3 示例代码
[*]1.4 GraphX 的应用场景

[*]二、Spark Streaming

[*]2.1 Spark Streaming 的主要概念
[*]2.2 示例代码
[*]2.3 Spark Streaming 的集成
[*]2.4 Spark Streaming 的应用场景

   Spark GraphX 用于处理图和图并行计算。GraphX 将图的表达和操纵嵌入到 Spark 的数据流 API 中,允许用户在图上实行高效的并行计算。GraphX 结合了图计算和数据流计算的功能,使得它可以或许处理复杂的数据分析任务。
Spark Streaming 用于处理实时数据流。它允许开发者以微批处理(Micro-batch)的方式处理实时数据,提供了一个高层次的 API,可以轻松地将批处理操纵应用于实时数据流。
一、Spark GraphX

1.1 GraphX 的主要概念


[*]极点 (Vertex):

[*]图中的节点,表示实体或对象。每个极点都有一个唯一的标识符(ID)和属性。

[*]边 (Edge):

[*]图中的连接,表示极点之间的关系。每条边连接两个极点,而且也可以有属性。

[*]图 (Graph):

[*]由极点和边组成的结构。GraphX 利用 Graph 类来表示图,极点和边的聚集分别由 RDD 和 RDD] 表示,此中 VD 和 ED 是极点和边的属性类型。

[*]Triplet:

[*]GraphX 中的 EdgeTriplet 代表一条边及其连接的两个极点的信息,允许同时访问极点和边的属性。

1.2 GraphX 的焦点操纵


[*]图构造 (Graph Construction):

[*]通过极点和边的 RDD 来构建图。比方,利用 Graph(vertices, edges) 构造一个图。

[*]图转换 (Graph Transformation):

[*]对图进行操纵,比方过滤极点和边 (subgraph),或将极点和边的属性映射到新属性 (mapVertices 和 mapEdges)。

[*]聚合消息 (Aggregate Messages):

[*]用于从邻接极点或边聚合信息。这在实现图算法(如 PageRank)时特殊有效。

[*]图算法 (Graph Algorithms):

[*]GraphX 提供了一些预定义的图算法,如 PageRank、Connected Components、Shortest Paths 和 Triangle Counting。

1.3 示例代码

1.3.1 简朴的 GraphX 示例,创建一个图并运行 PageRank 算法:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object GraphXExample {
def main(args: Array): Unit = {
    val conf = new SparkConf()
      .setAppName("graphx example")
      .setMaster("local[*]")
   
    // 创建 SparkSession
    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()

    // 创建顶点 RDD
    val vertices: RDD[(VertexId, String)] = spark.sparkContext.parallelize(Seq(
      (1L, "Alice"),
      (2L, "Bob"),
      (3L, "Charlie"),
      (4L, "David")
    ))

    // 创建边 RDD
    val edges: RDD] = spark.sparkContext.parallelize(Seq(
      Edge(1L, 2L, 1),
      Edge(2L, 3L, 1),
      Edge(3L, 4L, 1),
      Edge(4L, 1L, 1)
    ))

    // 创建图
    val graph = Graph(vertices, edges)

    // 运行 PageRank 算法
    // PageRank 算法最初是用于搜索引擎中对网页进行排序的基础算法,通过分析网络中的链接结构来评估每个网页的相对重要性。
    // 这里0.0001为收敛阈值
    val ranks = graph.pageRank(0.0001).vertices

    // 打印结果
    ranks.collect().foreach { case (id, rank) => println(s"Vertex $id has rank $rank") }

    // 停止 SparkSession
    spark.stop()
}
}

1.3.2 Spark GraphX 的一些基本操纵和概念

// 创建图
val graph = Graph(vertices, edges)

// vertices:获取图中的所有顶点。
graph.vertices.collect.foreach(println)

// mapVertices:对顶点的属性进行变换。
val newGraph = graph.mapVertices((id, attr) => attr.toUpperCase)

// edges:获取图中的所有边。
graph.edges.collect.foreach(println)

// mapEdges:对边的属性进行变换。
val newGraph = graph.mapEdges(e => e.attr * 2)

// 查看所有的边(将点带入)【完整】
graph.triplets.foreach(println)
// 入度:指向自己的个数
graph.inDegrees.foreach(println) // (人id,入度)

// 出度:指向别人的个数
graph.outDegrees.foreach(println) // (人id,出度)

// 将入度和出度一起显示
graph
    .inDegrees
    .join(graph.outDegrees)
    .foreach(println) // (人id,(入度,出度))

// 度:入度+出度
graph.degrees.foreach(println)

// subgraph:根据条件创建一个子图,保留满足条件的顶点和边。
val subGraph = graph.subgraph(vpred = (id, attr) => attr != "Bob")

// joinVertices:将图的顶点属性与一个新的 RDD 进行连接,并更新顶点属性。
val newAttrs: RDD[(Long, String)] = sc.parallelize(Seq(
(1L, "Alice_new"),
(4L, "David_new")
))

val joinedGraph = graph.joinVertices(newAttrs) {
case (id, oldAttr, newAttr) => newAttr
}

1.3.3 图算法

// PageRank 算法用于计算图中每个顶点的重要性。
val ranks = graph.pageRank(0.0001).vertices
ranks.collect.foreach(println)

// 连接组件: 连通组件算法用于查找图中的连通子图。
val connectedComponents = graph.connectedComponents().vertices
connectedComponents.collect.foreach(println)

// 三角计数: 三角计数算法用于计算每个顶点所属的三角形数量。
val triangleCounts = graph.triangleCount().vertices
triangleCounts.collect.foreach(println)

// 图的持久化与加载: 图可以通过将顶点和边的 RDD 存储在 HDFS 或其他文件系统中进行持久化。
// 保存图的顶点和边
graph.vertices.saveAsTextFile("hdfs://path/to/vertices")
graph.edges.saveAsTextFile("hdfs://path/to/edges")

// 从文件中加载图
val loadedVertices: RDD[(Long, String)] = sc.textFile("hdfs://path/to/vertices").map { line =>
val parts = line.split(",")
(parts(0).toLong, parts(1))
}

val loadedEdges: RDD] = sc.textFile("hdfs://path/to/edges").map { line =>
val parts = line.split(",")
Edge(parts(0).toLong, parts(1).toLong, parts(2).toInt)
}

val loadedGraph: Graph = Graph(loadedVertices, loadedEdges)
1.3.4 Pregel 模型

   Pregel 算法是一种迭代、消息通报的计算模型,特殊适用于处理图的遍历和递归题目,如最短路径计算、PageRank、连通组件检测等。
Pregel 的焦点头脑是将图计算任务表示为一个超等步(superstep)序列,每个超等步由以下几个阶段组成:

[*]消息通报:每个极点可以向相邻的极点发送消息。
[*]消息处理:每个极点接收来自相邻极点的消息,并更新自己的状态。
[*]极点计算:极点在处理完消息后可以决定是否继续活跃或制止计算(halt)。
这个过程会不停迭代,直到所有极点都制止计算或达到指定的迭代次数。
// Pregel 模型:
val initialMsg = ... // 定义初始消息
val maxIterations = 10 // 最大迭代次数

val resultGraph = graph.pregel(initialMsg, maxIterations)(
// 顶点程序,处理接收到的消息并更新顶点属性
vprog = (id, attr, msg) => {
    // 根据顶点的属性和收到的消息,更新顶点属性
},
// 发送消息,定义如何从一个顶点向相邻的顶点发送消息
sendMsg = triplet => {
    // 根据边的属性和顶点的状态,决定发送的消息
},
// 聚合消息,定义如何合并一个顶点接收到的所有消息
mergeMsg = (msg1, msg2) => {
    // 合并接收到的多个消息
}
)
利用 Pregel 模型可以计算图中某个出发点到其他所有极点的最短路径。
import org.apache.spark.SparkConf
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object GraphxTest {
def main(args: Array): Unit = {

    val conf = new SparkConf()
      .setAppName("graphx example")
      .setMaster("local[*]")

    // 创建 SparkSession
    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()

    val vertices: RDD[(VertexId, String)] = spark.sparkContext.parallelize(Seq(
      (1L, "Alice"),
      (2L, "Bob"),
      (3L, "Charlie"),
      (4L, "David")
    ))

    // 创建边 RDD
    val edges: RDD] = spark.sparkContext.parallelize(Seq(
      Edge(1L, 2L, 1),
      Edge(2L, 3L, 1),
      Edge(2L, 4L, 5),
      Edge(3L, 4L, 2),
      Edge(4L, 1L, 1)
    ))

    // 创建图
    val graph: Graph = Graph(vertices, edges)

    // 设置源顶点 ID
    val sourceId: VertexId = 1L

    // 初始化图的顶点属性,将源顶点的距离设为 0.0,其他顶点设为无穷大
    val initialGraph = graph.mapVertices((id, name) =>
      if (id == sourceId) (name, 0.0) else (name, Double.PositiveInfinity)
    )

    // 运行 Pregel 算法计算单源最短路径 (SSSP)
    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      // 顶点程序:更新顶点属性(即距离)为当前距离与新接收到的距离中的较小值
      vprog = (id, attr, msg) => (attr._1, Math.min(attr._2, msg)),

      // 消息发送函数:计算从源顶点到目标顶点的距离,如果新计算的距离小于目标顶点当前的距离,则发送消息
      sendMsg = triplet => {
      if (triplet.srcAttr._2 + triplet.attr < triplet.dstAttr._2) {
          Iterator((triplet.dstId, triplet.srcAttr._2 + triplet.attr))
      } else {
          Iterator.empty
      }
      },
      // 消息合并函数:在目标顶点收到多个消息时,取最短的距离
      mergeMsg = (a, b) => math.min(a, b)
    )

    // 打印最终的最短路径结果
    sssp.vertices.collect.foreach { case (id, (name, dist)) =>
      println(s"Distance from $sourceId to $id ($name) is $dist")
    }

    // 停止 SparkSession
    spark.stop()
}
}

1.4 GraphX 的应用场景



[*]外交网络分析:分析用户之间的关系,如挚友保举、外交影响力分析。
[*]路径分析:计算最短路径、页面排名等。
[*]社区检测:辨认图中连接精密的子图或社区。
[*]网络优化:通过图分析网络拓扑结构,优化数据流路由等。
二、Spark Streaming

2.1 Spark Streaming 的主要概念

   Spark Streaming 提供了一个强大且易用的 API,使开发者可以或许轻松地构建实时数据处理应用,特殊适合必要低延长、高吞吐量的场景。

[*]DStream (Discretized Stream):

[*]DStream 是 Spark Streaming 中的焦点抽象,表示一个连续的数据流。它可以被视为一系列 RDD(Resilient Distributed Datasets)的聚集,每个 RDD 都包含某个时间隔断内的数据。
[*]DStream 可以从各种输入源(如 Kafka、Flume、TCP 套接字等)创建,也可以通过对现有 DStream 的转换来创建。

[*]Transformations (转换操纵):

[*]雷同于 Spark 的 RDD 转换操纵,DStream 支持各种转换操纵,如 map、filter、reduceByKey 等。每个转换操纵都会应用于 DStream 中的每个 RDD,生成一个新的 DStream。

[*]Output Operations (输出操纵):

[*]DStream 提供多种输出操纵,将处理后的数据输出到外部系统。比方,print、saveAsTextFiles、saveAsHadoopFiles、foreachRDD 等。

[*]Window Operations (窗口操纵):

[*]Spark Streaming 支持窗口操纵,允许对一段时间范围内的数据进行聚合。比方,可以计算已往 10 秒钟内的每 2 秒的数据。

2.2 示例代码

一个简朴的 Spark Streaming 示例,读取 TCP 套接字数据,并进行词频统计:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStream {
def main(args: Array): Unit = {
    // 创建 Spark 配置对象,并设置应用名称和运行模式
    val conf = new SparkConf()
      .setMaster("local[*]")// 使用本地模式运行,[*] 表示使用所有可用的 CPU 核心
      .setAppName("spark-streaming")// 设置应用程序名称

    // 创建 StreamingContext,设置批处理间隔为 5 秒
    val scc = new StreamingContext(conf, Seconds(5))

    // 监听本地端口 9999,接收流式数据
    scc.socketTextStream("localhost", 9999)
      .mapPartitions(
      _.flatMap(
          _.replaceAll("[^a-zA-Z ]+", "")
            .split("\\s+")
            .map((_, 1))
      )
      )
      // 按单词进行归约,计算每个单词的出现次数
      .reduceByKey(_ + _)
      // 打印结果到控制台
      .print()

    // 启动 Spark Streaming 计算
    scc.start()
    // 等待应用程序终止
    scc.awaitTermination()
}
}
2.3 Spark Streaming 的集成



[*]与 Kafka 集成:Spark Streaming 可以从 Kafka 中读取数据流,用于实时日记处理、监控等场景。
[*]与 Flume 集成:结合 Flume 进行分布式日记收集,然后利用 Spark Streaming 实时处理和分析日记。
[*]与 HDFS、S3 等集成:将处理后的数据输出到 HDFS、S3 等分布式文件系统进行长期化存储。
[*]与 SQL 和 MLlib 集成:Spark Streaming 可以与 Spark SQL 和 MLlib 集成,进行实时的数据分析和机器学习任务。
2.4 Spark Streaming 的应用场景



[*]实时日记分析:监控服务器日记、应用日记,检测异常环境。
[*]实时 ETL (Extract, Transform, Load):对流式数据进行洗濯、转换,并写入到数据仓库。
[*]实时监控与报警:对实时数据流进行分析,当检测到特定条件时触发报警。
[*]在线保举系统:基于实时用户行为数据进行保举,比方在线广告保举。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Spark 组件 GraphX、Streaming