实行五 Spark SQL编程初级实践
Spark SQL编程初级实践[*]Spark SQL基本操作
将下列JSON格式数据复制到Linux体系中,并保存命名为employee.json。
{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }
为employee.json创建DataFrame,并写出Scala语句完成下列操作:
[*]查询所有数据;
[*]查询所有数据,并去除重复的数据;
[*]查询所有数据,打印时去除id字段;
[*]筛选出age>30的记载;
[*]将数据按age分组;
[*]将数据按name升序分列;
[*]取出前3行数据;
[*]查询所有记载的name列,并为其取别名为username;
[*]查询年龄age的均匀值;
[*]查询年龄age的最小值。
[*]编程实现将RDD转换为DataFrame
源文件内容如下(包罗id,name,age):
1,Ella,36
2,Bob,29
3,Jack,29
请将数据复制保存到Linux体系中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。
[*]编程实现使用DataFrame读写MySQL的数据
(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包罗如表6-2所示的两行数据。
表6-2 employee表原有数据
id
name
gender
Age
1
Alice
F
22
2
John
M
25
(2)设置Spark通过JDBC连接数据库MySQL,编程实现使用DataFrame插入如表6-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。
表6-3 employee表新增数据
id
name
gender
age
3
Mary
F
26
4
Tom
M
23
实行一 :Spark SQL基本操作
1)
// 导入必要的库
import org.apache.spark.sql.SparkSession
// 创建SparkSession
val spark = SparkSession.builder()
.appName("Spark SQL Basic Operations")
.getOrCreate()
// 读取JSON文件创建DataFrame
val df = spark.read.json("file:///home/hadoop/employee.json")
// (1) 查询所有数据
df.show()
(2)查询所有数据,并去除重复的数据
df.distinct().show()
(3)
查询所有数据,打印时去除id字段
df.drop("id").show()
(4)
筛选出age>30的记录
df.filter("age > 30").show()
(5)
将数据按age分组
df.groupBy("age").count().show()
(6)
将数据按name升序排列
df.orderBy("name").show()
(7)
取出前3行数据
df.limit(3).show()
(8)
查询所有记录的name列,并为其取别名为username
df.select($"name".alias("username")).show()
(9)
查询年龄age的平均值
df.selectExpr("avg(age)").show()
(10)
查询年龄age的最小值
df.selectExpr("min(age)").show() 实行二 :编程实现将RDD转换为DataFrame
编程代码:
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types._
object RDDToDataFrameExample {
def main(args: Array): Unit = {
// 创建SparkSession
val spark = SparkSession.builder()
.appName("RDD to DataFrame Example")
.master("local[*]") // 使用本地模式,如果连接到集群请更改这里
.getOrCreate()
import spark.implicits._
// 指定employee.txt文件的位置
val inputFilePath = "file:///home/hadoop/employee.txt"
// 从文本文件读取数据创建RDD
val rdd = spark.sparkContext.textFile(inputFilePath)
// 定义DataFrame的schema
val schema = StructType(Array(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
))
// 将RDD转换为DataFrame
val dataFrame = spark.createDataFrame(rdd.map { line =>
val parts = line.split(",")
Row(parts(0).toInt, parts(1), parts(2).toInt)
}, schema)
// 显示DataFrame内容
dataFrame.show(false)
// 按照指定格式打印所有数据
dataFrame.collect().foreach { row =>
println(s"id:${row.getAs("id")},name:${row.getAs("name")},age:${row.getAs("age")}")
}
// 停止SparkSession
spark.stop()
}
} 下令
/usr/local/spark-3.5.1/bin/spark-submit --class "RDDToDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar 具体操作参考博客
如何安装sbt(sbt在ubuntu上的安装与设置)(有具体安装网站和图解)-CSDN博客
实行三:编程实现使用DataFrame读写MySQL的数据
mysql代码
CREATE DATABASE sparktest;
USE sparktest;
CREATE TABLE employee (
id INT PRIMARY KEY,
name VARCHAR(50),
gender CHAR(1),
age INT
);
INSERT INTO employee (id, name, gender, age) VALUES (1, 'Alice', 'F', 22);
INSERT INTO employee (id, name, gender, age) VALUES (2, 'John', 'M', 25); 如何安装msyql参考博客
在ubuntu上安装mysql(在线安装必要)-CSDN博客
如何安装mysl驱动程序jar包-CSDN博客
编程代码
import org.apache.spark.sql.{SparkSession, Row}
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.functions.sum
object MySQLDataFrameExample {
def main(args: Array): Unit = {
// 创建SparkSession
val spark = SparkSession.builder()
.appName("MySQL DataFrame Example")
.master("local[*]") // 使用本地模式,如果连接到集群请更改这里
.getOrCreate()
import spark.implicits._
// 配置MySQL JDBC连接
val jdbcProperties = new Properties()
jdbcProperties.setProperty("user", "root")
jdbcProperties.setProperty("password", "mysql")
jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
// 定义MySQL的JDBC连接URL
val jdbcUrl = "jdbc:mysql://localhost:3306/sparktest"
// 创建DataFrame以插入数据
val newEmployeeData = Seq(
(3, "Mary", "F", 26),
(4, "Tom", "M", 23)
).toDF("id", "name", "gender", "age")
// 将DataFrame数据插入到MySQL的employee表中
newEmployeeData.write
.mode("append") // 使用append模式来添加数据,而不是覆盖
.jdbc(jdbcUrl, "employee", jdbcProperties)
// 从MySQL读取employee表的数据
val employeeDF = spark.read
.jdbc(jdbcUrl, "employee", jdbcProperties)
// 打印age的最大值
val maxAge = employeeDF.agg(max("age")).collect()(0).getAs(0)
println(s"Max age: $maxAge")
// 打印age的总和
val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs(0)
println(s"Sum of ages: $sumAge")
// 停止SparkSession
spark.stop()
}
}
编程具体步骤参考
如何安装sbt(sbt在ubuntu上的安装与设置)(有具体安装网站和图解)-CSDN博客
运行下令
/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar 产生错误
重要问题都在实行三中,因为实行三中涉及到一个mysql数据库连接
下令更新为
/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar
加了一个mysl驱动的jar的引用
如何安装mysql驱动参考博客
如何安装mysl驱动程序jar包-CSDN博客
打包失败
https://i-blog.csdnimg.cn/blog_migrate/b729702f14e08d57eebfb324272e3def.png
这个问题是代码错误
代码未引入一些包
加上下面这些就可以了
import org.apache.spark.sql.{SparkSession, Row}
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.functions.sum
运行失败
https://i-blog.csdnimg.cn/blog_migrate/6ce710d6c55a926a54743a89c2f6842f.png
未引入mysl驱动程序
要下载mysql驱动
https://i-blog.csdnimg.cn/blog_migrate/9edc8948a643c0445467aba0b0b9a7e9.png
采取下令引入
/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar
参考链接
如何安装sbt(sbt在ubuntu上的安装与设置)(有具体安装网站和图解)-CSDN博客
在ubuntu上安装mysql(在线安装必要)-CSDN博客
在ubuntu上安装mysql(在线安装必要)-CSDN博客
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]