ToB企服应用市场:ToB评测及商务社交产业平台

标题: Spark 组件 GraphX、Streaming [打印本页]

作者: 羊蹓狼    时间: 2024-8-16 21:45
标题: Spark 组件 GraphX、Streaming
Spark 组件 GraphX、Streaming


   Spark GraphX 用于处理图和图并行计算。GraphX 将图的表达和操纵嵌入到 Spark 的数据流 API 中,允许用户在图上实行高效的并行计算。GraphX 结合了图计算和数据流计算的功能,使得它可以或许处理复杂的数据分析任务。
  Spark Streaming 用于处理实时数据流。它允许开发者以微批处理(Micro-batch)的方式处理实时数据,提供了一个高层次的 API,可以轻松地将批处理操纵应用于实时数据流。
  一、Spark GraphX

1.1 GraphX 的主要概念

1.2 GraphX 的焦点操纵

1.3 示例代码

1.3.1 简朴的 GraphX 示例,创建一个图并运行 PageRank 算法:

  1. import org.apache.spark.graphx._
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.sql.SparkSession
  4. object GraphXExample {
  5.   def main(args: Array[String]): Unit = {
  6.     val conf = new SparkConf()
  7.       .setAppName("graphx example")
  8.       .setMaster("local[*]")
  9.    
  10.     // 创建 SparkSession
  11.     val spark = SparkSession
  12.       .builder
  13.       .config(conf)
  14.       .getOrCreate()
  15.     // 创建顶点 RDD
  16.     val vertices: RDD[(VertexId, String)] = spark.sparkContext.parallelize(Seq(
  17.       (1L, "Alice"),
  18.       (2L, "Bob"),
  19.       (3L, "Charlie"),
  20.       (4L, "David")
  21.     ))
  22.     // 创建边 RDD
  23.     val edges: RDD[Edge[Int]] = spark.sparkContext.parallelize(Seq(
  24.       Edge(1L, 2L, 1),
  25.       Edge(2L, 3L, 1),
  26.       Edge(3L, 4L, 1),
  27.       Edge(4L, 1L, 1)
  28.     ))
  29.     // 创建图
  30.     val graph = Graph(vertices, edges)
  31.     // 运行 PageRank 算法
  32.     // PageRank 算法最初是用于搜索引擎中对网页进行排序的基础算法,通过分析网络中的链接结构来评估每个网页的相对重要性。
  33.     // 这里0.0001为收敛阈值
  34.     val ranks = graph.pageRank(0.0001).vertices
  35.     // 打印结果
  36.     ranks.collect().foreach { case (id, rank) => println(s"Vertex $id has rank $rank") }
  37.     // 停止 SparkSession
  38.     spark.stop()
  39.   }
  40. }
复制代码
1.3.2 Spark GraphX 的一些基本操纵和概念

  1. // 创建图
  2. val graph = Graph(vertices, edges)
  3. // vertices:获取图中的所有顶点。
  4. graph.vertices.collect.foreach(println)
  5. // mapVertices:对顶点的属性进行变换。
  6. val newGraph = graph.mapVertices((id, attr) => attr.toUpperCase)
  7. // edges:获取图中的所有边。
  8. graph.edges.collect.foreach(println)
  9. // mapEdges:对边的属性进行变换。
  10. val newGraph = graph.mapEdges(e => e.attr * 2)
  11. // 查看所有的边(将点带入)【完整】
  12. graph.triplets.foreach(println)
  13. // 入度:指向自己的个数
  14. graph.inDegrees.foreach(println) // (人id,入度)
  15. // 出度:指向别人的个数
  16. graph.outDegrees.foreach(println) // (人id,出度)
  17. // 将入度和出度一起显示
  18. graph
  19.     .inDegrees
  20.     .join(graph.outDegrees)
  21.     .foreach(println) // (人id,(入度,出度))
  22. // 度:入度+出度
  23. graph.degrees.foreach(println)
  24. // subgraph:根据条件创建一个子图,保留满足条件的顶点和边。
  25. val subGraph = graph.subgraph(vpred = (id, attr) => attr != "Bob")
  26. // joinVertices:将图的顶点属性与一个新的 RDD 进行连接,并更新顶点属性。
  27. val newAttrs: RDD[(Long, String)] = sc.parallelize(Seq(
  28.   (1L, "Alice_new"),
  29.   (4L, "David_new")
  30. ))
  31. val joinedGraph = graph.joinVertices(newAttrs) {
  32.   case (id, oldAttr, newAttr) => newAttr
  33. }
复制代码
1.3.3 图算法

  1. // PageRank 算法用于计算图中每个顶点的重要性。
  2. val ranks = graph.pageRank(0.0001).vertices
  3. ranks.collect.foreach(println)
  4. // 连接组件: 连通组件算法用于查找图中的连通子图。
  5. val connectedComponents = graph.connectedComponents().vertices
  6. connectedComponents.collect.foreach(println)
  7. // 三角计数: 三角计数算法用于计算每个顶点所属的三角形数量。
  8. val triangleCounts = graph.triangleCount().vertices
  9. triangleCounts.collect.foreach(println)
  10. // 图的持久化与加载: 图可以通过将顶点和边的 RDD 存储在 HDFS 或其他文件系统中进行持久化。
  11. // 保存图的顶点和边
  12. graph.vertices.saveAsTextFile("hdfs://path/to/vertices")
  13. graph.edges.saveAsTextFile("hdfs://path/to/edges")
  14. // 从文件中加载图
  15. val loadedVertices: RDD[(Long, String)] = sc.textFile("hdfs://path/to/vertices").map { line =>
  16.   val parts = line.split(",")
  17.   (parts(0).toLong, parts(1))
  18. }
  19. val loadedEdges: RDD[Edge[Int]] = sc.textFile("hdfs://path/to/edges").map { line =>
  20.   val parts = line.split(",")
  21.   Edge(parts(0).toLong, parts(1).toLong, parts(2).toInt)
  22. }
  23. val loadedGraph: Graph[String, Int] = Graph(loadedVertices, loadedEdges)
复制代码
1.3.4 Pregel 模型

   Pregel 算法是一种迭代、消息通报的计算模型,特殊适用于处理图的遍历和递归题目,如最短路径计算、PageRank、连通组件检测等。
Pregel 的焦点头脑是将图计算任务表示为一个超等步(superstep)序列,每个超等步由以下几个阶段组成:
    这个过程会不停迭代,直到所有极点都制止计算或达到指定的迭代次数。
  1. // Pregel 模型:
  2. val initialMsg = ... // 定义初始消息
  3. val maxIterations = 10 // 最大迭代次数
  4. val resultGraph = graph.pregel(initialMsg, maxIterations)(
  5.   // 顶点程序,处理接收到的消息并更新顶点属性
  6.   vprog = (id, attr, msg) => {
  7.     // 根据顶点的属性和收到的消息,更新顶点属性
  8.   },
  9.   // 发送消息,定义如何从一个顶点向相邻的顶点发送消息
  10.   sendMsg = triplet => {
  11.     // 根据边的属性和顶点的状态,决定发送的消息
  12.   },
  13.   // 聚合消息,定义如何合并一个顶点接收到的所有消息
  14.   mergeMsg = (msg1, msg2) => {
  15.     // 合并接收到的多个消息
  16.   }
  17. )
复制代码
利用 Pregel 模型可以计算图中某个出发点到其他所有极点的最短路径。
  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.graphx._
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.sql.SparkSession
  5. object GraphxTest {
  6.   def main(args: Array[String]): Unit = {
  7.     val conf = new SparkConf()
  8.       .setAppName("graphx example")
  9.       .setMaster("local[*]")
  10.     // 创建 SparkSession
  11.     val spark = SparkSession
  12.       .builder
  13.       .config(conf)
  14.       .getOrCreate()
  15.     val vertices: RDD[(VertexId, String)] = spark.sparkContext.parallelize(Seq(
  16.       (1L, "Alice"),
  17.       (2L, "Bob"),
  18.       (3L, "Charlie"),
  19.       (4L, "David")
  20.     ))
  21.     // 创建边 RDD
  22.     val edges: RDD[Edge[Int]] = spark.sparkContext.parallelize(Seq(
  23.       Edge(1L, 2L, 1),
  24.       Edge(2L, 3L, 1),
  25.       Edge(2L, 4L, 5),
  26.       Edge(3L, 4L, 2),
  27.       Edge(4L, 1L, 1)
  28.     ))
  29.     // 创建图
  30.     val graph: Graph[String, PartitionID] = Graph(vertices, edges)
  31.     // 设置源顶点 ID
  32.     val sourceId: VertexId = 1L
  33.     // 初始化图的顶点属性,将源顶点的距离设为 0.0,其他顶点设为无穷大
  34.     val initialGraph = graph.mapVertices((id, name) =>
  35.       if (id == sourceId) (name, 0.0) else (name, Double.PositiveInfinity)
  36.     )
  37.     // 运行 Pregel 算法计算单源最短路径 (SSSP)
  38.     val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  39.       // 顶点程序:更新顶点属性(即距离)为当前距离与新接收到的距离中的较小值
  40.       vprog = (id, attr, msg) => (attr._1, Math.min(attr._2, msg)),
  41.       // 消息发送函数:计算从源顶点到目标顶点的距离,如果新计算的距离小于目标顶点当前的距离,则发送消息
  42.       sendMsg = triplet => {
  43.         if (triplet.srcAttr._2 + triplet.attr < triplet.dstAttr._2) {
  44.           Iterator((triplet.dstId, triplet.srcAttr._2 + triplet.attr))
  45.         } else {
  46.           Iterator.empty
  47.         }
  48.       },
  49.       // 消息合并函数:在目标顶点收到多个消息时,取最短的距离
  50.       mergeMsg = (a, b) => math.min(a, b)
  51.     )
  52.     // 打印最终的最短路径结果
  53.     sssp.vertices.collect.foreach { case (id, (name, dist)) =>
  54.       println(s"Distance from $sourceId to $id ($name) is $dist")
  55.     }
  56.     // 停止 SparkSession
  57.     spark.stop()
  58.   }
  59. }
复制代码
1.4 GraphX 的应用场景


二、Spark Streaming

2.1 Spark Streaming 的主要概念

   Spark Streaming 提供了一个强大且易用的 API,使开发者可以或许轻松地构建实时数据处理应用,特殊适合必要低延长、高吞吐量的场景。
   2.2 示例代码

一个简朴的 Spark Streaming 示例,读取 TCP 套接字数据,并进行词频统计:
  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3. object SparkStream {
  4.   def main(args: Array[String]): Unit = {
  5.     // 创建 Spark 配置对象,并设置应用名称和运行模式
  6.     val conf = new SparkConf()
  7.       .setMaster("local[*]")  // 使用本地模式运行,[*] 表示使用所有可用的 CPU 核心
  8.       .setAppName("spark-streaming")  // 设置应用程序名称
  9.     // 创建 StreamingContext,设置批处理间隔为 5 秒
  10.     val scc = new StreamingContext(conf, Seconds(5))
  11.     // 监听本地端口 9999,接收流式数据
  12.     scc.socketTextStream("localhost", 9999)
  13.       .mapPartitions(
  14.         _.flatMap(
  15.           _.replaceAll("[^a-zA-Z ]+", "")  
  16.             .split("\\s+")
  17.             .map((_, 1))
  18.         )
  19.       )
  20.       // 按单词进行归约,计算每个单词的出现次数
  21.       .reduceByKey(_ + _)
  22.       // 打印结果到控制台
  23.       .print()
  24.     // 启动 Spark Streaming 计算
  25.     scc.start()
  26.     // 等待应用程序终止
  27.     scc.awaitTermination()
  28.   }
  29. }
复制代码
2.3 Spark Streaming 的集成


2.4 Spark Streaming 的应用场景



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4