军用大数据--Spark图数据计算与分析实战

打印 上一主题 下一主题

主题 806|帖子 806|积分 2418

第1关:Spark GraphX 定义图结构

任务形貌

本关任务:编写一个计算出度和入度总数相同站点的小程序。
  1. package chapter4
  2. import org.apache.spark.graphx.{Edge, Graph, VertexId}
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.{SparkConf, SparkContext, graphx}
  5. object Demo {
  6.   def main(args: Array[String]): Unit = {
  7.     // 初始化spark
  8.     val sparkConf = new SparkConf().setAppName("GraphStreamDemo").set("spark.master", "local[*]")
  9.     val sc = new SparkContext(sparkConf)
  10.     // 顶点属性
  11.     val vertices: RDD[(VertexId, String)] = sc.parallelize(List(
  12.       (1L, "南马补给站"),
  13.       (2L, "多贝尔补给站"),
  14.       (3L, "安其补给站"),
  15.       (4L, "雪山补给站"),
  16.       (5L, "终极火力补给站"),
  17.       (6L, "末日补给站"),
  18.       (7L, "英迪补给站"),
  19.       (8L, "远洋补给站")
  20.     ))
  21.     // 各顶点间的关系数据
  22.     val edges: RDD[Edge[Double]] = sc.parallelize(List(
  23.         Edge(1L, 2L, 2.1),
  24.         Edge(2L, 3L, 2.2),
  25.         Edge(3L, 4L, 4.4),
  26.         Edge(1L, 5L, 3.4),
  27.         Edge(5L, 4L, 1.1),
  28.         Edge(4L, 8L, 2.3),
  29.         Edge(1L, 8L, 6.5),
  30.         Edge(3L, 7L, 3.4),
  31.         Edge(7L, 5L, 8.5),
  32.         Edge(2L, 6L, 3.3),
  33.         Edge(6L, 7L, 3.2),
  34.         Edge(7L, 8L, 2.2)
  35.     ))
  36.     // 定义图结构
  37.     val graph = Graph(vertices, edges)
  38.     // 计算所有出度和入度总数相同的补给站点,若出度和入度总数量相同则打印至控制台
  39.     val inDegrees = graph.inDegrees
  40.     val outDegrees = graph.outDegrees
  41.     val inOutDegrees = inDegrees.join(outDegrees).filter {
  42.       case (_, (inDeg, outDeg)) => inDeg == outDeg
  43.     }.sortByKey()
  44.     inOutDegrees.collect().foreach {
  45.       case (vertexId, (inDeg, outDeg)) =>
  46.         println(s"“${vertexId}”号补给站点的出度和入度总数相同,数量为:“${inDeg}”")
  47.     }
  48.   }
  49. }
复制代码
第2关:Spark GraphX 军用物资运输路线规划

任务形貌

本关任务:使用 Spark GraphX 编写一个计算生成最短路径的小程序。
  1. package chapter4
  2. import org.apache.spark.graphx.{Edge, EdgeDirection, EdgeTriplet, Graph, PartitionID, VertexId}
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.{SparkConf, SparkContext, graphx}
  5. import scala.collection.mutable.ArrayBuffer
  6. object Demo {
  7.   def main(args: Array[String]): Unit = {
  8.     // 初始化spark
  9.     val sparkConf = new SparkConf().setAppName("GraphStreamDemo").set("spark.master", "local[*]")
  10.     val sc = new SparkContext(sparkConf)
  11.     // 顶点属性
  12.     val vertices: RDD[(VertexId, String)] = sc.parallelize(List((1L, "南马补给站"),(2L, "多贝尔补给站"),(3L, "安其补给站"),(4L, "雪山补给站"),(5L, "终极火力补给站"),(6L, "末日补给站"),
  13. (7L, "英迪补给站"),(8L, "远洋补给站")))
  14.     // 各顶点间的关系数据
  15.     val edges: RDD[Edge[Double]] = sc.parallelize(List(
  16.       Edge(1L, 2L, 2.1),Edge(2L, 3L, 2.2),Edge(3L, 4L, 4.4),Edge(1L, 5L, 3.4),Edge(5L, 4L, 1.1),Edge(4L, 8L, 2.3),Edge(1L, 8L, 6.5),Edge(3L, 7L, 3.4),Edge(7L, 5L, 8.5),Edge(2L, 6L, 3.3),Edge(6L, 7L, 3.2),Edge(7L, 8L, 2.2)))
  17.     // 定义图结构
  18.     val graph = Graph(vertices, edges)
  19.     //被计算的图中 起始顶点id
  20.     val srcVertexId = 1L
  21.     // 初始化图结构
  22.     var initialGraph = graph.mapVertices((id, _) =>
  23.       if (id == srcVertexId) (ArrayBuffer(id), 0.0) else (ArrayBuffer[Long](), Double.PositiveInfinity)
  24.     )
  25.     // 使用 pregel 函数计算生成最短路径
  26.     val resultGraph = initialGraph.pregel(
  27.       (ArrayBuffer[Long](), Double.PositiveInfinity),
  28.       Int.MaxValue,
  29.       EdgeDirection.Out
  30.     )((id, oldDist, newDist) => {
  31.         if (oldDist._2 < newDist._2) oldDist else newDist
  32.       },
  33.       triplet => {
  34.         if (triplet.srcAttr._2 + triplet.attr < triplet.dstAttr._2) {
  35.           Iterator((triplet.dstId, (triplet.srcAttr._1 :+ triplet.dstId, triplet.srcAttr._2 + triplet.attr)))
  36.         } else {
  37.           Iterator.empty
  38.         }
  39.       },
  40.       (a, b) => if (a._2 < b._2) a else b
  41.     ).subgraph(vpred = (id, attr) => id != srcVertexId)
  42.     // 遍历数据集 打印格式为:从总补给点“xxx”前往目标补给点“xxx”配送物资的最近距离为“xxx”km,其行走线路为:xxx
  43.     resultGraph.vertices.collect().foreach { case (id, attr) =>
  44.       val distance = f"${attr._2}%.1f" // 格式化距离保留一位小数
  45.       val path = if (attr._1.nonEmpty) attr._1.mkString("->") else id.toString
  46.       println(s"从总补给点“${srcVertexId}”前往目标补给点“${id}”配送物资的最近距离为“${distance}”km,其行走线路为:${path}")
  47.     }
  48.   }
  49. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

何小豆儿在此

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表