点一下关注吧!!!非常感谢!!持续更新!!!
如今已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(正在更新!)
章节内容
上节我们完成了如下的内容:
- Spark 学习 WordCount 程序
- Scala & Java 的方式分别编写 WordCount 程序
计算圆周率
需求背景
我们要实现一个程序来实现圆周率的计算,将使用下面的公式:
编写代码
- package icu.wzk
- import org.apache.spark.{SparkConf, SparkContext}
- import scala.math.random
- object SparkPi {
- def main(args: Array[String]): Unit = {
- var conf = new SparkConf()
- .setAppName("ScalaSparkPi")
- .setMaster("local[*]")
- val sc = new SparkContext(conf)
- sc.setLogLevel("WARN")
- val slices = if (args.length > 0) {
- args(0).toInt
- } else {
- 0
- }
- val N = 100000000
- val count = sc.makeRDD(1 to N, slices)
- .map(idx => {
- val (x, y) = (random, random)
- if (x*x + y*y <= 1) {
- 1
- } else {
- 0
- }
- }).reduce(_ + _)
- println(s"Pi is ${4.0 * count / N}")
- }
- }
复制代码 代码部分截图如下所示:
代码解释
object SparkPi { … }
这个对象界说了一个 Scala 应用程序的入口。Scala 的 object 关键字用于界说一个单例对象,这意味着 SparkPi 只能有一个实例。
def main(args: Array[String]): Unit = { … }
main 方法是 Scala 应用程序的入口点,类似于 Java 中的 main 方法。args 是传递给程序的命令行参数,类型为 Array[String]。Unit 表现该方法没有返回值。
var conf = new SparkConf().setAppName(“ScalaSparkPi”)
- SparkConf() 用于配置 Spark 应用程序。setAppName(“ScalaSparkPi”) 设置应用程序的名称为 ScalaSparkPi。
- setMaster("local[]") 表现 Spark 应用程序将在本地运行,使用所有可用的 CPU 核心。local[] 是 Spark 中的特殊设置,表现本地模式下使用所有的 CPU 核心。
val sc = new SparkContext(conf)
SparkContext 是 Spark 应用程序的核心,负责与 Spark 集群举行交互。这里通过配置对象 conf 创建了一个新的 SparkContext 实例。
sc.setLogLevel(“WARN”)
设置日记的级别为 “WARN”。这意味着只会记录告诫级别及以上的日记信息,淘汰不必要的日记输出。
val slices = if (args.length > 0) { … }
这段代码用来处置惩罚传递给程序的第一个参数,如果有参数传递过来,则将其转换为整数,作为分片数 slices。如果没有参数,则默认值为 0。
val N = 100000000
界说一个常量 N,表现将举行一亿次随机点的天生,以此来估算 \pi 值。
val count = sc.makeRDD(1 to N, slices)
- sc.makeRDD(1 to N, slices) 创建一个包含从 1 到 N 的整数的 RDD(弹性分布式数据集),并将其划分为 slices 个分片举行并行计算。
- map(idx => { … }) 是对 RDD 中的每个元素举行映射操作。对于每个 idx,天生两个随机数 x 和 y,分别表现点的 x 和 y 坐标。
- if (xx + yy <= 1) 判断点 (x, y) 是否在单位圆内。如果在圆内,则返回 1,否则返回 0。
reduce(_ + _)
- reduce(_ + _) 将所有的 1 和 0 相加,得到在单位圆内的点的总数。
println(s" i is ${4.0 * count / N}")
- 计算 \pi 的估计值:使用公式 \pi \approx 4 \times (\text{圆内点的数量} / \text{总点数})。
- 输出计算结果。
打包上传
打包完成上传Jar包:
运行项目
- spark-submit --master local[*] --class icu.wzk.SparkPi spark-wordcount-1.0-SNAPSHOT.jar 15
复制代码 运行等候结果
运行完毕的结果如下:
找共同挚友
需求背景
如今有一组数据
- 100, 200 300 400 500 600
- 200, 100 300 400
- 300, 100 200 400 500
- 400, 100 200 300
- 500, 100 300
- 600, 100
复制代码 第一列表现用户,后边的数字表现该用户的挚友,我们要对上面的这几枚举行分析计算,得出共同的挚友。
编写代码
方法一
核心思想使用笛卡尔积求两两之间的挚友 然后去除多余的数据
- package icu.wzk
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
- object FindFriends {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf()
- .setAppName("SparkFindFriends")
- .setMaster("local[*]")
- val sc = new SparkContext(conf)
- sc.setLogLevel("WARN")
- val lines: RDD[String] = sc.textFile(args(0))
- val friendsRDD: RDD[(String, Array[String])] = lines.map{
- line =>
- val fields: Array[String] = line.split(",")
- val userId = fields(0).trim
- val friends: Array[String] = fields(1).trim.split("\\s+")
- (userId, friends)
- }
- friendsRDD
- .cartesian(friendsRDD)
- .filter({
- case ((id1, _), (id2, _)) => id1 < id2
- })
- .map{
- case ((id1, friends1), (id2, friends2)) => ((id1, id2), friends1.intersect(friends2).sorted.toBuffer)
- }
- .sortByKey()
- .collect()
- .foreach(println)
- sc.stop()
- }
- }
复制代码 方法二
消除笛卡尔积 核心思想是:将数据变形,找到两两的挚友,再实行数据的归并
- package icu.wzk
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
- object FindFriends2 {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf()
- .setAppName("SparkFindFriends")
- .setMaster("local[*]")
- val sc = new SparkContext(conf)
- sc.setLogLevel("WARN")
- val lines: RDD[String] = sc.textFile(args(0))
- val friendsRDD: RDD[(String, Array[String])] = lines.map{
- line =>
- val fields: Array[String] = line.split(",")
- val userId = fields(0).trim
- val friends: Array[String] = fields(1).trim.split("\\s+")
- (userId, friends)
- }
- friendsRDD
- .flatMapValues(friends => friends.combinations(2))
- .map{
- case (k, v) => (v.mkString(" & "), Set(k))
- }
- .reduceByKey(_ | _)
- .sortByKey()
- .collect()
- .foreach(println)
- sc.stop()
- }
- }
复制代码 打包上传
运行项目
方法一
- spark-submit --master local[*] --class icu.wzk.FindFriends spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/friends.txt
复制代码 运行结果如下图:
方法二
- spark-submit --master local[*] --class icu.wzk.FindFriends2 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/friends.txt
复制代码 运行结果如下图所示:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |