一、RDD的练习可以使用两种方式
二、使用Shell练习RDD
当你打开 Spark 的交互式命令行界面(也就是 Spark shell)的时间,它已经自动为你准备好了一个叫做 sc 的特别对象,这个对象是用来和 Spark 集群沟通的。你不需要,也不应该本身再创建一个如许的对象。
如果你想告诉 Spark 用哪个计算机大概计算机集群来执行你的命令,可以通过 --master 这个选项来设置。比如,你想在当地计算机上只用四个核心来运行,就可以在命令里加上 --master local[4]。
- $ ./bin/spark-shell --master local[4]
复制代码 如果你有一些本身的代码打包成了 JAR 文件,想要在 Spark shell 里用,可以通过 --jars 选项,反面跟上你的 JAR 文件名,用逗号分隔,来把它们加入到可以识别的路径里。
- $ ./bin/spark-shell --master local[4] --jars code.jar
复制代码 此外,如果你需要一些额外的库大概 Spark 的扩展包,可以通过 --packages 选项,反面跟上这些库的 Maven 坐标(一种常用的依靠管理方式),用逗号分隔,来添加它们。假设你需要的包是 org.apache.spark:spark-mllib_2.13:3.4.1,这是Spark的呆板学习库。
- $ ./bin/spark-shell --master local[4] --packages "org.apache.spark:spark-mllib_2.13:3.4.1"
复制代码 简单来说,这些选项就是让你告诉 Spark 怎么运行你的代码,以及在哪里找到运行代码所需要的资源。
RDD基础
- // 从array中创建RDD
- val data = Array(1, 2, 3, 4, 5)
- val distData = sc.parallelize(data)
- distData.foreach(println)
- // 读取文件创建RDD
- val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
- val lineLengths = lines.map(s => s.length)
- val totalLength = lineLengths.reduce((a, b) => a + b)
- println(totalLength)
- // 数据持久化
- lineLengths.persist()
- print(lineLengths.reduce((a, b) => a + b))
- // 对象的函数
- object MyFunctions {
- def func1(s: String): String = { s"打印RDD中的字符串,包含的字符串有: $s" }
- }
- val myRdd = lines.flatMap(lines => lines.split(" "))
- myRdd.map(MyFunctions.func1).foreach(println)
- import org.apache.spark.rdd.RDD
- // 类的函数
- class MyClass extends Serializable {
- def func1(s: String): String = { f"在MyClass类中,打印RDD中的字符串,包含的字符串有: $s" }
- def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
- }
- val f1 = new MyClass()
- f1.doStuff(myRdd).foreach(println)
- // 类的应用
- class MyClass2 extends Serializable {
- val field = "你好,测试案例..."
- def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
- }
- val f2 = new MyClass2()
- f2.doStuff(myRdd).foreach(println)
- // Pair RDD应用
- val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
- val pairs = lines.flatMap(_.split(" ")).map(s => (s, 1))
- val counts = pairs.reduceByKey((a, b) => a + b)
- // 交换键和值的位置
- val swappedCounts = counts.map(_.swap)
- // 先根据值排序(降序),然后根据键排序(升序)
- val sortedByValueThenKeyDesc = swappedCounts.sortByKey(ascending = false)
- val CountsDescondvalue = sortedByValueThenKeyDesc .map(_.swap)
- CountsDescondvalue .collect()
- // 广播变量 Broadcast Variables
- val broadcastVar = sc.broadcast(Array(1, 2, 3))
- broadcastVar.value
- val accum = sc.longAccumulator("My Accumulator")
- sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
- accum.value
复制代码


三、使用IDEA练习RDD
基于Spark3.4.1,IDEA练习基础的RDD
- package test
- import org.apache.spark.SparkContext
- import org.apache.spark.sql.SparkSession
- /**
- * @projectName GNUSpark20204
- * @package test
- * @className test.RDD_spark341
- * @description ${description}
- * @author pblh123
- * @date 2024/9/26 23:08
- * @version 1.0
- *
- */
-
- object RDD_spark341 extends App {
- // 创建SparkSession sparkcontext
- val spark = SparkSession.builder
- .appName("RDD_spark341")
- .master("local[2]")
- .getOrCreate()
- val sc: SparkContext = spark.sparkContext
- // spark代码主体
- // 从array中创建RDD
- val data = Array(1, 2, 3, 4, 5)
- val distData = sc.parallelize(data)
- distData.foreach(println)
- // 读取文件创建RDD
- val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
- val lineLengths = lines.map(s => s.length)
- val totalLength = lineLengths.reduce((a, b) => a + b)
- println(totalLength)
- // 数据持久化
- lineLengths.persist()
- print(lineLengths.reduce((a, b) => a + b))
- // 对象的函数
- object MyFunctions {
- def func1(s: String): String = {
- s"打印RDD中的字符串,包含的字符串有: $s"
- }
- }
- val myRdd = lines.flatMap(lines => lines.split(" "))
- myRdd.map(MyFunctions.func1).foreach(println)
- import org.apache.spark.rdd.RDD
- // 类的函数
- class MyClass extends Serializable {
- def func1(s: String): String = {
- f"在MyClass类中,打印RDD中的字符串,包含的字符串有: $s"
- }
- def doStuff(rdd: RDD[String]): RDD[String] = {
- rdd.map(func1)
- }
- }
- val f1 = new MyClass()
- f1.doStuff(myRdd).foreach(println)
- // 类的应用
- class MyClass2 extends Serializable {
- val field = "你好,测试案例..."
- def doStuff(rdd: RDD[String]): RDD[String] = {
- rdd.map(x => field + x)
- }
- }
- val f2 = new MyClass2()
- f2.doStuff(myRdd).foreach(println)
- // Pair RDD应用
- val pairs = lines.flatMap(_.split(" ")).map(s => (s, 1))
- val counts = pairs.reduceByKey((a, b) => a + b)
- // 交换键和值的位置
- val swappedCounts = counts.map(_.swap)
- // 先根据值排序(降序),然后根据键排序(升序)
- val sortedByValueThenKeyDesc = swappedCounts.sortByKey(ascending = false)
- val CountsDescondvalue = sortedByValueThenKeyDesc.map(_.swap)
- println(CountsDescondvalue.collect())
- // 广播变量 Broadcast Variables
- val broadcastVar = sc.broadcast(Array(1, 2, 3))
- println(broadcastVar.value)
- val accum = sc.longAccumulator("My Accumulator")
- sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
- println(accum.value)
- // 关闭sparkSesssion sparkcontext
- sc.stop()
- spark.stop()
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |