大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共 ...

金歌  论坛元老 | 2024-8-18 09:08:57 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1712|帖子 1712|积分 5136

点一下关注吧!!!非常感谢!!持续更新!!!

如今已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)
章节内容

上节我们完成了如下的内容:


  • Spark 学习 WordCount 程序
  • Scala & Java 的方式分别编写 WordCount 程序

计算圆周率

需求背景

我们要实现一个程序来实现圆周率的计算,将使用下面的公式:

编写代码

  1. package icu.wzk
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. import scala.math.random
  4. object SparkPi {
  5.   def main(args: Array[String]): Unit = {
  6.     var conf = new SparkConf()
  7.       .setAppName("ScalaSparkPi")
  8.       .setMaster("local[*]")
  9.     val sc = new SparkContext(conf)
  10.     sc.setLogLevel("WARN")
  11.     val slices = if (args.length > 0) {
  12.       args(0).toInt
  13.     } else {
  14.       0
  15.     }
  16.     val N =  100000000
  17.     val count = sc.makeRDD(1 to N, slices)
  18.       .map(idx => {
  19.         val (x, y) = (random, random)
  20.         if (x*x + y*y <= 1) {
  21.           1
  22.         } else {
  23.           0
  24.         }
  25.       }).reduce(_ + _)
  26.     println(s"Pi is ${4.0 * count / N}")
  27.   }
  28. }
复制代码
代码部分截图如下所示:

代码解释

object SparkPi { … }

这个对象界说了一个 Scala 应用程序的入口。Scala 的 object 关键字用于界说一个单例对象,这意味着 SparkPi 只能有一个实例。
def main(args: Array[String]): Unit = { … }

main 方法是 Scala 应用程序的入口点,类似于 Java 中的 main 方法。args 是传递给程序的命令行参数,类型为 Array[String]。Unit 表现该方法没有返回值。
var conf = new SparkConf().setAppName(“ScalaSparkPi”)



  • SparkConf() 用于配置 Spark 应用程序。setAppName(“ScalaSparkPi”) 设置应用程序的名称为 ScalaSparkPi。
  • setMaster("local[]") 表现 Spark 应用程序将在本地运行,使用所有可用的 CPU 核心。local[] 是 Spark 中的特殊设置,表现本地模式下使用所有的 CPU 核心。
val sc = new SparkContext(conf)

SparkContext 是 Spark 应用程序的核心,负责与 Spark 集群举行交互。这里通过配置对象 conf 创建了一个新的 SparkContext 实例。
sc.setLogLevel(“WARN”)

设置日记的级别为 “WARN”。这意味着只会记录告诫级别及以上的日记信息,淘汰不必要的日记输出。
val slices = if (args.length > 0) { … }

这段代码用来处置惩罚传递给程序的第一个参数,如果有参数传递过来,则将其转换为整数,作为分片数 slices。如果没有参数,则默认值为 0。
val N = 100000000

界说一个常量 N,表现将举行一亿次随机点的天生,以此来估算 \pi 值。
val count = sc.makeRDD(1 to N, slices)



  • sc.makeRDD(1 to N, slices) 创建一个包含从 1 到 N 的整数的 RDD(弹性分布式数据集),并将其划分为 slices 个分片举行并行计算。
  • map(idx => { … }) 是对 RDD 中的每个元素举行映射操作。对于每个 idx,天生两个随机数 x 和 y,分别表现点的 x 和 y 坐标。
  • if (xx + yy <= 1) 判断点 (x, y) 是否在单位圆内。如果在圆内,则返回 1,否则返回 0。
reduce(_ + _)



  • reduce(_ + _) 将所有的 1 和 0 相加,得到在单位圆内的点的总数。
println(s"i is ${4.0 * count / N}")



  • 计算 \pi 的估计值:使用公式 \pi \approx 4 \times (\text{圆内点的数量} / \text{总点数})。
  • 输出计算结果。
打包上传

  1. mvn clean package
复制代码
打包完成上传Jar包:

运行项目

  1. spark-submit --master local[*] --class icu.wzk.SparkPi spark-wordcount-1.0-SNAPSHOT.jar 15
复制代码
运行等候结果

运行完毕的结果如下:

找共同挚友

需求背景

如今有一组数据
  1. 100, 200 300 400 500 600
  2. 200, 100 300 400
  3. 300, 100 200 400 500
  4. 400, 100 200 300
  5. 500, 100 300
  6. 600, 100
复制代码
第一列表现用户,后边的数字表现该用户的挚友,我们要对上面的这几枚举行分析计算,得出共同的挚友。

编写代码

方法一

核心思想使用笛卡尔积求两两之间的挚友 然后去除多余的数据
  1. package icu.wzk
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object FindFriends {
  5.   def main(args: Array[String]): Unit = {
  6.     val conf = new SparkConf()
  7.       .setAppName("SparkFindFriends")
  8.       .setMaster("local[*]")
  9.     val sc = new SparkContext(conf)
  10.     sc.setLogLevel("WARN")
  11.     val lines: RDD[String] = sc.textFile(args(0))
  12.     val friendsRDD: RDD[(String, Array[String])] = lines.map{
  13.       line =>
  14.         val fields: Array[String] = line.split(",")
  15.         val userId = fields(0).trim
  16.         val friends:  Array[String] = fields(1).trim.split("\\s+")
  17.         (userId, friends)
  18.     }
  19.     friendsRDD
  20.       .cartesian(friendsRDD)
  21.       .filter({
  22.         case ((id1, _), (id2, _)) => id1 < id2
  23.       })
  24.       .map{
  25.         case ((id1, friends1), (id2, friends2)) => ((id1, id2), friends1.intersect(friends2).sorted.toBuffer)
  26.       }
  27.       .sortByKey()
  28.       .collect()
  29.       .foreach(println)
  30.     sc.stop()
  31.   }
  32. }
复制代码
方法二

消除笛卡尔积 核心思想是:将数据变形,找到两两的挚友,再实行数据的归并
  1. package icu.wzk
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object FindFriends2 {
  5.   def main(args: Array[String]): Unit = {
  6.     val conf = new SparkConf()
  7.       .setAppName("SparkFindFriends")
  8.       .setMaster("local[*]")
  9.     val sc = new SparkContext(conf)
  10.     sc.setLogLevel("WARN")
  11.     val lines: RDD[String] = sc.textFile(args(0))
  12.     val friendsRDD: RDD[(String, Array[String])] = lines.map{
  13.       line =>
  14.         val fields: Array[String] = line.split(",")
  15.         val userId = fields(0).trim
  16.         val friends:  Array[String] = fields(1).trim.split("\\s+")
  17.         (userId, friends)
  18.     }
  19.     friendsRDD
  20.       .flatMapValues(friends => friends.combinations(2))
  21.       .map{
  22.         case (k, v) => (v.mkString(" & "), Set(k))
  23.       }
  24.       .reduceByKey(_ | _)
  25.       .sortByKey()
  26.       .collect()
  27.       .foreach(println)
  28.     sc.stop()
  29.   }
  30. }
复制代码
打包上传


运行项目

方法一

  1. spark-submit --master local[*] --class icu.wzk.FindFriends spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/friends.txt
复制代码
运行结果如下图:

方法二

  1. spark-submit --master local[*] --class icu.wzk.FindFriends2 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/friends.txt
复制代码
运行结果如下图所示:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

金歌

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表