实行五 Spark SQL编程初级实践

打印 上一主题 下一主题

主题 819|帖子 819|积分 2457

Spark SQL编程初级实践


  • Spark SQL基本操作
将下列JSON格式数据复制到Linux体系中,并保存命名为employee.json。
{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }
为employee.json创建DataFrame,并写出Scala语句完成下列操作:

  • 查询所有数据;
  • 查询所有数据,并去除重复的数据;
  • 查询所有数据,打印时去除id字段;
  • 筛选出age>30的记载;
  • 将数据按age分组;
  • 将数据按name升序分列;
  • 取出前3行数据;
  • 查询所有记载的name列,并为其取别名为username;
  • 查询年龄age的均匀值;
  • 查询年龄age的最小值。



  • 编程实现将RDD转换为DataFrame
源文件内容如下(包罗id,name,age):
1,Ella,36
2,Bob,29
3,Jack,29
请将数据复制保存到Linux体系中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。



  • 编程实现使用DataFrame读写MySQL的数据
(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包罗如表6-2所示的两行数据。
表6-2 employee表原有数据

id

name

gender

Age

1

Alice

F

22

2

John

M

25


(2)设置Spark通过JDBC连接数据库MySQL,编程实现使用DataFrame插入如表6-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。
表6-3 employee表新增数据

id

name

gender

age

3

Mary

F

26

4

Tom

M

23

实行一 :Spark SQL基本操作
  1. 1)
  2. // 导入必要的库
  3. import org.apache.spark.sql.SparkSession
  4. // 创建SparkSession
  5. val spark = SparkSession.builder()
  6.   .appName("Spark SQL Basic Operations")
  7.   .getOrCreate()
  8. // 读取JSON文件创建DataFrame
  9.         val df = spark.read.json("file:///home/hadoop/employee.json")
  10.           // (1) 查询所有数据
  11. df.show()
  12. (2)查询所有数据,并去除重复的数据
  13. df.distinct().show()
  14. (3)
  15. 查询所有数据,打印时去除id字段
  16. df.drop("id").show()
  17. (4)
  18. 筛选出age>30的记录
  19. df.filter("age > 30").show()
  20. (5)
  21. 将数据按age分组
  22. df.groupBy("age").count().show()
  23. (6)
  24. 将数据按name升序排列
  25. df.orderBy("name").show()
  26. (7)
  27. 取出前3行数据
  28. df.limit(3).show()
  29. (8)
  30. 查询所有记录的name列,并为其取别名为username
  31. df.select($"name".alias("username")).show()
  32. (9)
  33. 查询年龄age的平均值
  34. df.selectExpr("avg(age)").show()
  35. (10)
  36. 查询年龄age的最小值
  37. df.selectExpr("min(age)").show()
复制代码
实行二 :编程实现将RDD转换为DataFrame
编程代码:
  1. import org.apache.spark.sql.{SparkSession, Row}  
  2. import org.apache.spark.sql.types._  
  3.   
  4. object RDDToDataFrameExample {  
  5.   def main(args: Array[String]): Unit = {  
  6.     // 创建SparkSession  
  7.     val spark = SparkSession.builder()  
  8.       .appName("RDD to DataFrame Example")  
  9.       .master("local[*]") // 使用本地模式,如果连接到集群请更改这里  
  10.       .getOrCreate()  
  11.   
  12.     import spark.implicits._  
  13.   
  14.     // 指定employee.txt文件的位置  
  15.     val inputFilePath = "file:///home/hadoop/employee.txt"  
  16.   
  17.     // 从文本文件读取数据创建RDD  
  18.     val rdd = spark.sparkContext.textFile(inputFilePath)  
  19.   
  20.     // 定义DataFrame的schema  
  21.     val schema = StructType(Array(  
  22.       StructField("id", IntegerType, nullable = false),  
  23.       StructField("name", StringType, nullable = false),  
  24.       StructField("age", IntegerType, nullable = false)  
  25.     ))  
  26.   
  27.     // 将RDD转换为DataFrame  
  28.     val dataFrame = spark.createDataFrame(rdd.map { line =>  
  29.       val parts = line.split(",")  
  30.       Row(parts(0).toInt, parts(1), parts(2).toInt)  
  31.     }, schema)  
  32.   
  33.     // 显示DataFrame内容  
  34.     dataFrame.show(false)  
  35.   
  36.     // 按照指定格式打印所有数据  
  37.     dataFrame.collect().foreach { row =>  
  38.       println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}")  
  39.     }  
  40.   
  41.     // 停止SparkSession  
  42.     spark.stop()  
  43.   }  
  44. }
复制代码
 下令
  1. /usr/local/spark-3.5.1/bin/spark-submit --class "RDDToDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar
复制代码
 具体操作参考博客
如何安装sbt(sbt在ubuntu上的安装与设置)(有具体安装网站和图解)-CSDN博客
实行三:编程实现使用DataFrame读写MySQL的数据
mysql代码
  1. CREATE DATABASE sparktest;  
  2. USE sparktest;  
  3.   
  4. CREATE TABLE employee (  
  5.   id INT PRIMARY KEY,  
  6.   name VARCHAR(50),  
  7.   gender CHAR(1),  
  8.   age INT  
  9. );  
  10.   
  11. INSERT INTO employee (id, name, gender, age) VALUES (1, 'Alice', 'F', 22);  
  12. INSERT INTO employee (id, name, gender, age) VALUES (2, 'John', 'M', 25);
复制代码
如何安装msyql参考博客
 在ubuntu上安装mysql(在线安装必要)-CSDN博客
如何安装mysl驱动程序jar包-CSDN博客
编程代码
  1. import org.apache.spark.sql.{SparkSession, Row}  
  2. import java.util.Properties  
  3. import org.apache.spark.sql.SparkSession  
  4. import org.apache.spark.sql.Dataset  
  5. import org.apache.spark.sql.Row  
  6. import org.apache.spark.sql.functions.max  
  7. import org.apache.spark.sql.functions.sum  
  8.   
  9. object MySQLDataFrameExample {  
  10.   def main(args: Array[String]): Unit = {  
  11.     // 创建SparkSession  
  12.     val spark = SparkSession.builder()  
  13.       .appName("MySQL DataFrame Example")  
  14.       .master("local[*]") // 使用本地模式,如果连接到集群请更改这里  
  15.       .getOrCreate()  
  16.   
  17.     import spark.implicits._  
  18.   
  19.     // 配置MySQL JDBC连接  
  20.     val jdbcProperties = new Properties()  
  21.     jdbcProperties.setProperty("user", "root")  
  22.     jdbcProperties.setProperty("password", "mysql")  
  23.     jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")  
  24.   
  25.     // 定义MySQL的JDBC连接URL  
  26.     val jdbcUrl = "jdbc:mysql://localhost:3306/sparktest"  
  27.   
  28.     // 创建DataFrame以插入数据  
  29.     val newEmployeeData = Seq(  
  30.       (3, "Mary", "F", 26),  
  31.       (4, "Tom", "M", 23)  
  32.     ).toDF("id", "name", "gender", "age")  
  33.   
  34.     // 将DataFrame数据插入到MySQL的employee表中  
  35.     newEmployeeData.write  
  36.       .mode("append") // 使用append模式来添加数据,而不是覆盖  
  37.       .jdbc(jdbcUrl, "employee", jdbcProperties)  
  38.   
  39.     // 从MySQL读取employee表的数据  
  40.     val employeeDF = spark.read  
  41.       .jdbc(jdbcUrl, "employee", jdbcProperties)  
  42.   
  43.     // 打印age的最大值  
  44.     val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0)  
  45.     println(s"Max age: $maxAge")  
  46.   
  47.     // 打印age的总和  
  48.     val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0)  
  49.     println(s"Sum of ages: $sumAge")  
  50.   
  51.     // 停止SparkSession  
  52.     spark.stop()  
  53.   }  
  54. }
复制代码
编程具体步骤参考
 如何安装sbt(sbt在ubuntu上的安装与设置)(有具体安装网站和图解)-CSDN博客
 运行下令
  1. /usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar
复制代码
产生错误
重要问题都在实行三中,因为实行三中涉及到一个mysql数据库连接
下令更新为
/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar
加了一个mysl驱动的jar的引用
如何安装mysql驱动参考博客
如何安装mysl驱动程序jar包-CSDN博客
打包失败

这个问题是代码错误
代码未引入一些包
加上下面这些就可以了
import org.apache.spark.sql.{SparkSession, Row}  
import java.util.Properties  
import org.apache.spark.sql.SparkSession  
import org.apache.spark.sql.Dataset  
import org.apache.spark.sql.Row  
import org.apache.spark.sql.functions.max  
import org.apache.spark.sql.functions.sum  

运行失败

未引入mysl驱动程序
要下载mysql驱动

采取下令引入
  1. /usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar
复制代码


参考链接
如何安装sbt(sbt在ubuntu上的安装与设置)(有具体安装网站和图解)-CSDN博客
在ubuntu上安装mysql(在线安装必要)-CSDN博客
在ubuntu上安装mysql(在线安装必要)-CSDN博客

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

风雨同行

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表