ToB企服应用市场:ToB评测及商务社交产业平台

标题: 2023_Spark_实行十一:RDD基础算子操作 [打印本页]

作者: 老婆出轨    时间: 2024-11-12 23:16
标题: 2023_Spark_实行十一:RDD基础算子操作
一、RDD的练习可以使用两种方式


二、使用Shell练习RDD

当你打开 Spark 的交互式命令行界面(也就是 Spark shell)的时间,它已经自动为你准备好了一个叫做 sc 的特别对象,这个对象是用来和 Spark 集群沟通的。你不需要,也不应该本身再创建一个如许的对象。
如果你想告诉 Spark 用哪个计算机大概计算机集群来执行你的命令,可以通过 --master 这个选项来设置。比如,你想在当地计算机上只用四个核心来运行,就可以在命令里加上 --master local[4]。
  1. $ ./bin/spark-shell --master local[4]
复制代码
如果你有一些本身的代码打包成了 JAR 文件,想要在 Spark shell 里用,可以通过 --jars 选项,反面跟上你的 JAR 文件名,用逗号分隔,来把它们加入到可以识别的路径里。
  1. $ ./bin/spark-shell --master local[4] --jars code.jar
复制代码
此外,如果你需要一些额外的库大概 Spark 的扩展包,可以通过 --packages 选项,反面跟上这些库的 Maven 坐标(一种常用的依靠管理方式),用逗号分隔,来添加它们。假设你需要的包是 org.apache.spark:spark-mllib_2.13:3.4.1,这是Spark的呆板学习库。
  1. $ ./bin/spark-shell --master local[4] --packages "org.apache.spark:spark-mllib_2.13:3.4.1"
复制代码
简单来说,这些选项就是让你告诉 Spark 怎么运行你的代码,以及在哪里找到运行代码所需要的资源。

RDD基础

  1. // 从array中创建RDD
  2. val data = Array(1, 2, 3, 4, 5)
  3. val distData = sc.parallelize(data)
  4. distData.foreach(println)
  5. // 读取文件创建RDD
  6. val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
  7. val lineLengths = lines.map(s => s.length)
  8. val totalLength = lineLengths.reduce((a, b) => a + b)
  9. println(totalLength)
  10. // 数据持久化
  11. lineLengths.persist()
  12. print(lineLengths.reduce((a, b) => a + b))
  13. // 对象的函数
  14. object MyFunctions {
  15.   def func1(s: String): String = { s"打印RDD中的字符串,包含的字符串有: $s" }
  16. }
  17. val myRdd = lines.flatMap(lines => lines.split(" "))
  18. myRdd.map(MyFunctions.func1).foreach(println)
  19. import org.apache.spark.rdd.RDD
  20. // 类的函数
  21. class MyClass extends Serializable {
  22.   def func1(s: String): String = { f"在MyClass类中,打印RDD中的字符串,包含的字符串有: $s" }
  23.   def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
  24. }
  25. val f1 = new MyClass()
  26. f1.doStuff(myRdd).foreach(println)
  27. // 类的应用
  28. class MyClass2 extends Serializable {
  29.   val field = "你好,测试案例..."
  30.   def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
  31. }
  32. val f2 = new MyClass2()
  33. f2.doStuff(myRdd).foreach(println)
  34. // Pair RDD应用
  35. val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
  36. val pairs = lines.flatMap(_.split(" ")).map(s => (s, 1))
  37. val counts = pairs.reduceByKey((a, b) => a + b)
  38. // 交换键和值的位置
  39. val swappedCounts = counts.map(_.swap)
  40. // 先根据值排序(降序),然后根据键排序(升序)
  41. val sortedByValueThenKeyDesc = swappedCounts.sortByKey(ascending = false)
  42. val CountsDescondvalue = sortedByValueThenKeyDesc .map(_.swap)
  43. CountsDescondvalue .collect()
  44. // 广播变量 Broadcast Variables
  45. val broadcastVar = sc.broadcast(Array(1, 2, 3))
  46. broadcastVar.value
  47. val accum = sc.longAccumulator("My Accumulator")
  48. sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
  49. accum.value
复制代码









三、使用IDEA练习RDD

基于Spark3.4.1,IDEA练习基础的RDD
  1. package test
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.sql.SparkSession
  4. /**
  5. * @projectName GNUSpark20204  
  6. * @package test  
  7. * @className test.RDD_spark341  
  8. * @description ${description}  
  9. * @author pblh123
  10. * @date 2024/9/26 23:08
  11. * @version 1.0
  12. *
  13. */
  14.    
  15. object RDD_spark341 extends App {
  16. //  创建SparkSession sparkcontext
  17.   val spark = SparkSession.builder
  18.     .appName("RDD_spark341")
  19.     .master("local[2]")
  20.     .getOrCreate()
  21.   val sc: SparkContext = spark.sparkContext
  22. //  spark代码主体
  23.   // 从array中创建RDD
  24.     val data = Array(1, 2, 3, 4, 5)
  25.     val distData = sc.parallelize(data)
  26.     distData.foreach(println)
  27.     // 读取文件创建RDD
  28.     val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
  29.     val lineLengths = lines.map(s => s.length)
  30.     val totalLength = lineLengths.reduce((a, b) => a + b)
  31.     println(totalLength)
  32.     // 数据持久化
  33.     lineLengths.persist()
  34.     print(lineLengths.reduce((a, b) => a + b))
  35.     // 对象的函数
  36.     object MyFunctions {
  37.       def func1(s: String): String = {
  38.         s"打印RDD中的字符串,包含的字符串有: $s"
  39.       }
  40.     }
  41.     val myRdd = lines.flatMap(lines => lines.split(" "))
  42.     myRdd.map(MyFunctions.func1).foreach(println)
  43.     import org.apache.spark.rdd.RDD
  44.     // 类的函数
  45.     class MyClass extends Serializable {
  46.       def func1(s: String): String = {
  47.         f"在MyClass类中,打印RDD中的字符串,包含的字符串有: $s"
  48.       }
  49.       def doStuff(rdd: RDD[String]): RDD[String] = {
  50.         rdd.map(func1)
  51.       }
  52.     }
  53.     val f1 = new MyClass()
  54.     f1.doStuff(myRdd).foreach(println)
  55.     // 类的应用
  56.     class MyClass2 extends Serializable {
  57.       val field = "你好,测试案例..."
  58.       def doStuff(rdd: RDD[String]): RDD[String] = {
  59.         rdd.map(x => field + x)
  60.       }
  61.     }
  62.     val f2 = new MyClass2()
  63.     f2.doStuff(myRdd).foreach(println)
  64.     // Pair RDD应用
  65.     val pairs = lines.flatMap(_.split(" ")).map(s => (s, 1))
  66.     val counts = pairs.reduceByKey((a, b) => a + b)
  67.     // 交换键和值的位置
  68.     val swappedCounts = counts.map(_.swap)
  69.     // 先根据值排序(降序),然后根据键排序(升序)
  70.     val sortedByValueThenKeyDesc = swappedCounts.sortByKey(ascending = false)
  71.     val CountsDescondvalue = sortedByValueThenKeyDesc.map(_.swap)
  72.     println(CountsDescondvalue.collect())
  73.     // 广播变量 Broadcast Variables
  74.     val broadcastVar = sc.broadcast(Array(1, 2, 3))
  75.     println(broadcastVar.value)
  76.     val accum = sc.longAccumulator("My Accumulator")
  77.     sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
  78.     println(accum.value)
  79. //  关闭sparkSesssion sparkcontext
  80.   sc.stop()
  81.   spark.stop()
  82. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4