第1关:Spark GraphX 定义图结构
任务描述
本关任务:编写一个计算出度和入度总数相同站点的小程序。
- package chapter4
- import org.apache.spark.graphx.{Edge, Graph, VertexId}
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext, graphx}
/**
 * Level 1 exercise: builds a small supply-station graph with Spark GraphX and prints
 * every station whose in-degree equals its out-degree.
 */
object Demo {

  /**
   * Entry point. Builds the graph, selects the stations with matching in/out degree and
   * prints one line per match, ordered by vertex id.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Default to an in-process master, but keep a master that was already configured
    // (e.g. via spark-submit) instead of overriding it.
    val sparkConf = new SparkConf()
      .setAppName("GraphStreamDemo")
      .setIfMissing("spark.master", "local[*]")
    val sc = new SparkContext(sparkConf)

    try {
      // Vertex attributes: (station id, station name).
      val vertices: RDD[(VertexId, String)] = sc.parallelize(List(
        (1L, "南马补给站"),
        (2L, "多贝尔补给站"),
        (3L, "安其补给站"),
        (4L, "雪山补给站"),
        (5L, "终极火力补给站"),
        (6L, "末日补给站"),
        (7L, "英迪补给站"),
        (8L, "远洋补给站")
      ))

      // Directed relations between stations, each carrying a Double attribute.
      val edges: RDD[Edge[Double]] = sc.parallelize(List(
        Edge(1L, 2L, 2.1),
        Edge(2L, 3L, 2.2),
        Edge(3L, 4L, 4.4),
        Edge(1L, 5L, 3.4),
        Edge(5L, 4L, 1.1),
        Edge(4L, 8L, 2.3),
        Edge(1L, 8L, 6.5),
        Edge(3L, 7L, 3.4),
        Edge(7L, 5L, 8.5),
        Edge(2L, 6L, 3.3),
        Edge(6L, 7L, 3.2),
        Edge(7L, 8L, 2.2)
      ))

      // Assemble the graph structure.
      val graph = Graph(vertices, edges)

      // Keep the stations whose in-degree and out-degree are equal, sorted by id.
      // This is an inner join, so a station absent from either degree RDD is dropped;
      // per the GraphX docs, vertices with no edges in a direction are not listed there.
      val balancedStations = graph.inDegrees
        .join(graph.outDegrees)
        .filter { case (_, (inDeg, outDeg)) => inDeg == outDeg }
        .sortByKey()

      // Both degrees are equal at this point, so printing the in-degree is sufficient.
      balancedStations.collect().foreach { case (vertexId, (inDeg, _)) =>
        println(s"“${vertexId}”号补给站点的出度和入度总数相同,数量为:“${inDeg}”")
      }
    } finally {
      // Always release the Spark context, even when the job fails part-way through.
      sc.stop()
    }
  }
}
第2关:Spark GraphX 军用物资运输路线规划
任务描述
本关任务:使用 Spark GraphX 编写一个计算生成最短路径的小程序。
- package chapter4
- import org.apache.spark.graphx.{Edge, EdgeDirection, EdgeTriplet, Graph, PartitionID, VertexId}
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext, graphx}
- import scala.collection.mutable.ArrayBuffer
/**
 * Level 2 exercise: uses the GraphX Pregel API to compute, from one source supply
 * station, the shortest distance and the route to every other station.
 */
object Demo {

  /**
   * Entry point. Builds the station graph, runs a Pregel shortest-path computation
   * starting at vertex 1 and prints one line per remaining vertex with the distance
   * (one decimal place) and the route as `id->id->...`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Default to an in-process master, but keep a master that was already configured
    // (e.g. via spark-submit) instead of overriding it.
    val sparkConf = new SparkConf()
      .setAppName("GraphStreamDemo")
      .setIfMissing("spark.master", "local[*]")
    val sc = new SparkContext(sparkConf)

    try {
      // Vertex attributes: (station id, station name).
      val vertices: RDD[(VertexId, String)] = sc.parallelize(List(
        (1L, "南马补给站"),
        (2L, "多贝尔补给站"),
        (3L, "安其补给站"),
        (4L, "雪山补给站"),
        (5L, "终极火力补给站"),
        (6L, "末日补给站"),
        (7L, "英迪补给站"),
        (8L, "远洋补给站")
      ))

      // Directed relations between stations; the edge attribute is used as the distance.
      val edges: RDD[Edge[Double]] = sc.parallelize(List(
        Edge(1L, 2L, 2.1),
        Edge(2L, 3L, 2.2),
        Edge(3L, 4L, 4.4),
        Edge(1L, 5L, 3.4),
        Edge(5L, 4L, 1.1),
        Edge(4L, 8L, 2.3),
        Edge(1L, 8L, 6.5),
        Edge(3L, 7L, 3.4),
        Edge(7L, 5L, 8.5),
        Edge(2L, 6L, 3.3),
        Edge(6L, 7L, 3.2),
        Edge(7L, 8L, 2.2)
      ))

      // Assemble the graph structure.
      val graph = Graph(vertices, edges)

      // Id of the vertex every route starts from.
      val srcVertexId = 1L

      // Per-vertex state is (route so far, distance so far). The source starts with
      // itself at distance 0; every other vertex starts with no route at infinity.
      val initialGraph = graph.mapVertices { (id, _) =>
        if (id == srcVertexId) (Vector(id), 0.0)
        else (Vector.empty[VertexId], Double.PositiveInfinity)
      }

      // Run Pregel, then drop the source vertex from the result.
      val resultGraph = initialGraph
        .pregel(
          (Vector.empty[VertexId], Double.PositiveInfinity),
          Int.MaxValue,
          EdgeDirection.Out
        )(
          // Vertex program: keep the current state only if it is strictly shorter.
          (_, current, incoming) => if (current._2 < incoming._2) current else incoming,
          // Send: offer the destination a route through the source end of the edge,
          // but only when that route improves on what the destination already has.
          triplet => {
            val (routeToSrc, distToSrc) = triplet.srcAttr
            val candidate = distToSrc + triplet.attr
            if (candidate < triplet.dstAttr._2) {
              Iterator((triplet.dstId, (routeToSrc :+ triplet.dstId, candidate)))
            } else {
              Iterator.empty
            }
          },
          // Merge: of two offers for the same vertex, keep the shorter one.
          (a, b) => if (a._2 < b._2) a else b
        )
        .subgraph(vpred = (id, _) => id != srcVertexId)

      // Print one line per vertex; no ordering is applied to the collected result.
      resultGraph.vertices.collect().foreach { case (id, attr) =>
        val distance = f"${attr._2}%.1f" // distance formatted to one decimal place
        // A vertex that never received a route falls back to printing its own id.
        val path = if (attr._1.nonEmpty) attr._1.mkString("->") else id.toString
        println(s"从总补给点“${srcVertexId}”前往目标补给点“${id}”配送物资的最近距离为“${distance}”km,其行走线路为:${path}")
      }
    } finally {
      // Always release the Spark context, even when the job fails part-way through.
      sc.stop()
    }
  }
}
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。