ToB企服应用市场:ToB评测及商务社交产业平台

标题: 大数据-95 Spark 集群 SparkSQL Action与Transformation操作 具体表明与测 [打印本页]

作者: 玛卡巴卡的卡巴卡玛    时间: 2024-8-24 14:12
标题: 大数据-95 Spark 集群 SparkSQL Action与Transformation操作 具体表明与测
点一下关注吧!!!非常感谢!!持续更新!!!

现在已经更新到了:


章节内容

上节完成的内容如下:

焦点操作

Transformation(转换操作)

定义:
Transformation是懒执行的操作,意味着这些操作在调用时并不会立即执行计算,而是会天生一个新的数据集(或RDD),它们描述了从输入数据到输出数据的转换逻辑。Transformation的计算会被延迟,直到遇到一个Action操作时才会真正触发执行。
常见操作:

Action(举措操作)

定义:
Action操作会触发Spark的计算并返回结果。与Transformation不同,Action操作会执行整个计算逻辑,并产生最终的输出,如将结果写入外部存储或将数据返回给驱动步伐。
常见操作:

Action操作

与RDD类似的操作


与布局相关


天生数据

生存并上传到服务器上
  1. EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO
  2. 7369,SMITH,CLERK,7902,2001-01-02 22:12:13,800,,20
  3. 7499,ALLEN,SALESMAN,7698,2002-01-02 22:12:13,1600,300,30
  4. 7521,WARD,SALESMAN,7698,2003-01-02 22:12:13,1250,500,30
  5. 7566,JONES,MANAGER,7839,2004-01-02 22:12:13,2975,,20
  6. 7654,MARTIN,SALESMAN,7698,2005-01-02 22:12:13,1250,1400,30
  7. 7698,BLAKE,MANAGER,7839,2005-04-02 22:12:13,2850,,30
  8. 7782,CLARK,MANAGER,7839,2006-03-02 22:12:13,2450,,10
  9. 7788,SCOTT,ANALYST,7566,2007-03-02 22:12:13,3000,,20
  10. 7839,KING,PRESIDENT,,2006-03-02 22:12:13,5000,,10
  11. 7844,TURNER,SALESMAN,7698,2009-07-02 22:12:13,1500,0,30
  12. 7876,ADAMS,CLERK,7788,2010-05-02 22:12:13,1100,,20
  13. 7900,JAMES,CLERK,7698,2011-06-02 22:12:13,950,,30
  14. 7902,FORD,ANALYST,7566,2011-07-02 22:12:13,3000,,20
  15. 7934,MILLER,CLERK,7782,2012-11-02 22:12:13,1300,,10
复制代码
写入内容如下图所示:

测试运行

我们进入 spark-shell 举行测试
  1. // 处理头,使用自动类型推断
  2. val df1 = spark.read.option("header", true).option("infershema", "true").csv("test_spark_03.txt")
  3. df1.count
  4. // 缺省显示20行
  5. df1.union(df1).show()
  6. // 显示2行
  7. df1.show(2)
复制代码
执行结果如下图所示:

继承举行测试:
  1. // 不截断字符
  2. df1.toJSON.show(false)
  3. // 显示10行 不截断字符
  4. df1.toJSON.show(10, false)
复制代码
运行结果如下图所示:

继承举行测试:
  1. // collect 返回数组 Array[Row]
  2. val c1 = df1.collect()
  3. // collectAsList 返回List Lits[Row]
  4. val c2 = df1.collectAsList()
  5. // 返回 Row
  6. val h1 = df1.head()
  7. val f1 = df1.first()
  8. // 返回 Array[Row]
  9. val h2 = df1.head(3)
  10. val f2 = df1.take(3)
  11. // 返回 List[Row]
  12. val t2 = df1.takeAsList(2)
复制代码
运行结果如下图所示:

继承举行测试:
  1. // 结构属性
  2. // 查看列名
  3. df1.columns
  4. // 查看列名和类型
  5. df1.dtypes
  6. // 查看执行计划
  7. df1.explain()
  8. // 获取某个列
  9. df1.col("ENAME")
  10. // 常用
  11. df1.printSchema
复制代码
运行结果如下图所示:

Transformation 操作


与RDD类似的操作


我们举行测试:
  1. val df1 = spark.read.csv("/opt/wzk/data/people1.csv")
  2. // 获取第1列
  3. df1.map(row => row.getAs[String](0)).show
  4. // randomSplit 将DF、DS按给定参数分成多份
  5. val df2 = df1.randomSplit(Array(0.5, 0.6, 0.7))
  6. df2(0).count
  7. df2(1).count
  8. df2(2).count
复制代码
测试结果如下图:

我们继承举行测试:
  1. // 取10行数据生成新的Dataset
  2. val df2 = df1.limit(10)
  3. // distinct 去重
  4. val df2 = df1.union(df1)
  5. df2.distinct.count
  6. // dropDuplicates 按列值去重
  7. df2.dropDuplicates.show
  8. df2.dropDuplicates("_c0").show
复制代码
执行结果如下图:

存储相关


备注:Dataset默认的存储级别是 MEMEORY_AND_DISK
  1. spark.sparkContext.setCheckpointDir("hdfs://h121.wzk.icu:9000/checkpoint")
  2. df1.show()
  3. df1.checkpoint()
  4. df1.cache()
  5. import org.apache.spark.storage.StorageLevel
  6. df1.persist(StorageLevel.MEMORY_ONLY)
  7. df1.count()
  8. df1.unpersist(true)
复制代码
执行结果如下图所示:

select相关


启动 Spark-Shell 继承举行测试
  1. // 这里注意 option("header", "true") 自动解析一下表头
  2. val df1 = spark.read.option("header", "true").csv("/opt/wzk/data/people1.csv")
  3. // $ col() 等等 不可以混用!!!(有解决方法,但是建议不混用!!!)
  4. // 可以多种形式获取到列
  5. df1.select($"name", $"age", $"job").show
复制代码
执行结果如下图所示:

继承举行测试

  1. df1.select("name", "age", "job").show(3)
  2. df1.select(col("name"), col("age"), col("job")).show(3)
  3. df1.select($"name", $"age"+1000, $"job").show(5)
复制代码
运行结果如下图所示:

where相关

接着对上述内容举行测试:
  1. df1.filter("age > 25").show
  2. df1.filter("age > 25 and name == 'wzk18'").show
  3. df1.where("age > 25").show
  4. df1.where("age > 25 and name == 'wzk19'").show
复制代码
运行测试结果如下图:

groupBy相关


举行测试:
  1. // 由于我的字段中没有数值类型的,就不做测试了
  2. df1.groupBy("Job").sum("sal").show
  3. df1.groupBy("Job").max("sal").show
  4. df1.groupBy("Job").min("sal").show
  5. df1.groupBy("Job").avg("sal").show
  6. df1.groupBy("Job").count.show
  7. df1.groupBy("Job").avg("sal").where("avg(sal) > 2000").show
  8. df1.groupBy("Job").avg("sal").where($"avg(sal)" > 2000).show
  9. df1.groupBy("Job").agg("sal"->"max", "sal"->"min", "sal"-
  10. >"avg", "sal"->"sum", "sal"->"count").show
  11. df1.groupBy("deptno").agg("sal"->"max", "sal"->"min", "sal"-
  12. >"avg", "sal"->"sum", "sal"->"count").show
复制代码
orderBy相关

orderBy == sort
  1. df1.orderBy("name").show(5)
  2. df1.orderBy($"name".asc).show(5)
  3. df1.orderBy(-$"age").show(5)
复制代码
运行测试的结果如下图所示:

继承举行测试:
  1. df1.sort("age").show(3)
  2. df1.sort($"age".asc).show(3)
  3. df1.sort(col("age")).show(3)
复制代码
测试结果如下图所示:

JOIN相关

  1. // 笛卡尔积
  2. df1.crossJoin(df1).count
  3. // 等值连接(单字段)
  4. df1.join(df1, "name").count
  5. // 等值连接(多字段)
  6. df1.join(df1, Seq("name", "age")).show
复制代码
运行的测试结果如下图所示:

这里编写两个case:
  1. // 第一个数据集
  2. case class StudentAge(sno: Int, name: String, age: Int)
  3. val lst = List(StudentAge(1,"Alice", 18), StudentAge(2,"Andy", 19), StudentAge(3,"Bob", 17), StudentAge(4,"Justin", 21), StudentAge(5,"Cindy", 20))
  4. val ds1 = spark.createDataset(lst)
  5. // 第二个数据集
  6. case class StudentHeight(sname: String, height: Int)
  7. val rdd = sc.makeRDD(List(StudentHeight("Alice", 160), StudentHeight("Andy", 159), StudentHeight("Bob", 170), StudentHeight("Cindy", 165), StudentHeight("Rose", 160)))
  8. val ds2 = rdd.toDS
复制代码
运行测试的结果如下图所示:

接下来我们举行连表操作:
  1. // 连表操作 不可以使用 "name"==="sname" !!!
  2. ds1.join(ds2, 'name==='sname).show
  3. ds1.join(ds2, ds1("name")===ds2("sname")).show
  4. ds1.join(ds2, $"name"===$"sname").show
  5. ds1.join(ds2, $"name"===$"sname", "inner").show
复制代码
测试的运行结果如下图所示:


聚集相关

  1. val ds3 = ds1.select("name")
  2. val ds4 = ds2.select("sname")
  3. // union 求并集、不去重
  4. ds3.union(ds4).show
  5. // unionAll(过时了)与union等价
  6. // intersect 求交
  7. ds3.intersect(ds4).show
  8. // except 求差
  9. ds3.except(ds4).show
复制代码
运行结果如下图所示:

空值处理

  1. math.sqrt(-1.0)
  2. math.sqrt(-1.0).inNaN()
  3. df1.show
  4. // 删除所有列的空值和NaN
  5. df1.na.drop.show
  6. // 删除某列的空值和NaN
  7. df1.na.drop(Array("xxx")).show
  8. // 对列进行填充
  9. df1.na.fill(1000).show
  10. df1.na.fill(1000, Array("xxx")).show
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4