冬雨财经 发表于 2024-12-18 01:32:17

Spark SQL编程初级实践

参考链接

Spark编程: Spark SQL基本操作 2020.11.01_df.agg("age"->"avg")-CSDN博客
RDD编程初级实践-CSDN博客
Spark和Hadoop的安装-CSDN博客
Spark SQL 编程初级实践-CSDN博客
1. Spark SQL基本操作

{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }
创建employee.json文件
sudo vim employee.json
cat employee.json https://i-blog.csdnimg.cn/blog_migrate/2db0356a6712ba0f5b0c679c18c84116.png
启动spark-shell
cd /usr/local/spark/
./bin/spark-shell
https://i-blog.csdnimg.cn/blog_migrate/01938445984d69790ba60ab5fed407a1.png
1.1  查询全部数据

import spark.implicits._
val df=spark.read.json("file:home/hadoop/下载/employee.json")
df.show()
import spark.implicits._是Spark的一个工具,资助 我们将RDD 转换为DataFrame。
spark.read.json是 Apache Spark 中的一个方法,用于从 JSON 文件中读取数据并将其加载到 DataFrame 中。
df.show()用于显示DataFrame中的内容。
https://i-blog.csdnimg.cn/blog_migrate/276cf707dc2f68ce170ae82d2ea97d58.png
1.2  查询全部数据,并去除重复的数据

df.distinct().show()  distinct()去重。
 https://i-blog.csdnimg.cn/blog_migrate/5cae1bca4cd363bc170bf6455dab7fd5.png
1.3  查询全部数据,打印时去除id字段

df.drop(df("id")).show()  df.drop()用于删除DataFrame中指定的列。
 https://i-blog.csdnimg.cn/blog_migrate/2c53956e05e448cfddb427b5314ded4b.png
1.4  筛选出age>30的记载

df.filter(df("age")>30).show()  df.filter()用于根据指定条件过滤DataFrame中的行。
https://i-blog.csdnimg.cn/blog_migrate/10e063c05c1a21a565908282475b7e91.png
1.5  将数据按age分组

df.groupBy(df("age")).count.show() df.groupBy()用于根据指定的列对DataFrame举行分组。
df.count().show()用于显示分组后的DataFrame的内容。
https://i-blog.csdnimg.cn/blog_migrate/18bce17b7a8b2eeb6065e4922e5f7827.png
1.6  将数据按name升序分列

df.sort(df("name").asc).show() df.sort()用于对DataFrame中的行举行排序(默认升序)。
升序asc
降序desc
这里“Ella”比“Bob”小是由于“Ella”字符串现实上是“ Ella”,所以他的第一个字符不是‘E’而是‘ ’,对应的ASCII,‘E’是69,‘B’是66,‘ ’是32.
https://i-blog.csdnimg.cn/blog_migrate/27dbcaf5276a0cdc616e3db385dd601a.png
 https://i-blog.csdnimg.cn/blog_migrate/9160646a2283e563d748cbfd480a935d.png
https://i-blog.csdnimg.cn/blog_migrate/2e7896776c0d1a5c3b9203b3ec7ae0a6.png
1.7  取出前3行数据

df.show(3)  df.show(n)用于显示DataFrame的前n行。(n超出后会打印原始的大小)
https://i-blog.csdnimg.cn/blog_migrate/d8af214a323bc74bc80427c504014990.png
https://i-blog.csdnimg.cn/blog_migrate/f85e24dd266b55f3e548b510c7a1bb74.png
1.8  查询全部记载的name列,并为其取别名为username

df.select(df("name").as("username")).show()  df.select()用于选择DataFrame中指定的列。
 https://i-blog.csdnimg.cn/blog_migrate/63c4112224ad9f2cb0055d6cd7b9bb77.png
1.9  查询年龄age的均匀值

df.agg("age"->"avg").show()  df.agg()用于对DataFrame举行聚合操作。
avg均匀。
https://i-blog.csdnimg.cn/blog_migrate/79dc3a10bdd45ae26d425bb4c24e3f5b.png
1.10 查询年龄age的最小值

df.agg("age"->"min").show() min最小。 
https://i-blog.csdnimg.cn/blog_migrate/c9804949a5e3f30cb2430d43a2683053.png
2.编程实现将RDD转换为DataFrame

1,Ella,36
2,Bob,29
3,Jack,29
创建项目
sudo mkdir -p /example/sparkapp6/src/main/scala
cd /example/sparkapp6/src/main/scala
  创建employee.txt
sudo vim employee.txt
cat employee.txt https://i-blog.csdnimg.cn/blog_migrate/d8f140965de45d066c60f71dcc8be593.png
创建Scala文件
sudo vim ./SimpleApp.scala import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object SimpleApp {
def main(args: Array): Unit = {
    val spark = SparkSession.builder
      .appName("MyApp")
      .getOrCreate()
    val peopleRDD = spark.sparkContext.textFile("file:///example/sparkapp6/src/main/scala/employee.txt")
    val schemaString = "id name age"
    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    peopleDF.createOrReplaceTempView("people")
    val results = spark.sql("SELECT id,name,age FROM people")
    results.map(attributes => "id: " + attributes(0) + "," + "name:" + attributes(1) + "," + "age:" + attributes(2)).show()
}
} 这个代码没乐成,继续往下面看。
 创建.sbt文件
sudo vim build.sbt 这里须要的依赖发生了变化,不改会报错。 
name := "Simple Project"
version := "1.0"
scalaVersion := "2.13.13"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1"
 打包执行
/usr/local/sbt/sbt package
spark-submit --class "SimpleApp" ./target/scala-2.13/simple-project_2.13-1.0.jar https://i-blog.csdnimg.cn/blog_migrate/2a4ace5dedc6b522cb84b9c21858e5a9.png 直接启动spark-shell(乐成运行是看这里)
cd /usr/local/spark/
./bin/spark-shell
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val peopleRDD = spark.sparkContext.textFile("file:///example/sparkapp6/src/main/scala/employee.txt")
val schemaString = "id name age"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("SELECT id,name,age FROM people")
results.map(attributes => "id: " + attributes(0) + "," + "name:" + attributes(1) + "," + "age:" + attributes(2)).show()

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val peopleRDD = spark.sparkContext.textFile("file:///example/sparkapp6/src/main/scala/employee.txt")
val peopleRDD: org.apache.spark.rdd.RDD = file:///example/sparkapp6/src/main/scala/employee.txt MapPartitionsRDD at textFile at <console>:1

scala> val schemaString = "id name age"
val schemaString: String = id name age

scala> val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val fields: Array = Array(StructField(id,StringType,true), StructField(name,StringType,true), StructField(age,StringType,true))

scala> val schema = StructType(fields)
val schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true),StructField(name,StringType,true),StructField(age,StringType,true))

scala> val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))
val rowRDD: org.apache.spark.rdd.RDD = MapPartitionsRDD at map at <console>:1

scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
val peopleDF: org.apache.spark.sql.DataFrame =

scala> peopleDF.createOrReplaceTempView("people")

scala> val results = spark.sql("SELECT id,name,age FROM people")
val results: org.apache.spark.sql.DataFrame =

scala> results.map(attributes => "id: " + attributes(0) + "," + "name:" + attributes(1) + "," + "age:" + attributes(2)).show()
+--------------------+
|               value|
+--------------------+
|id: 1,name:Ella,a...|
|id: 2,name:Bob,ag...|
|id: 3,name:Jack,a...|
+--------------------+
https://i-blog.csdnimg.cn/blog_migrate/0842be751b008fac1c53424ee10cb6a3.png
https://i-blog.csdnimg.cn/blog_migrate/38cbc939fc3f428ebed8efe2e1b594d3.png
3.编程实现利用DataFrame读写MySQL的数据

安装MySQL
MySQL :: Download MySQL Connector/J (Archived Versions)
https://i-blog.csdnimg.cn/blog_migrate/fe261564f581097e3d3723b46d91bfc1.png
sudo tar -zxf ./mysql-connector-java-5.1.40.tar.gz -C /usr/local
cd /usr/local/
mv mysql-connector-java-5.1.40/ mysql
3.1 在MySQL数据库中新建数据库sparktest,再创建表employee

https://i-blog.csdnimg.cn/blog_migrate/8ef6e14e0a4ce6fc8b791635d9371530.png
su root
service mysql start
mysql -u root -p  建库
create database sparktest; https://i-blog.csdnimg.cn/blog_migrate/21b6b3f536f6bb3ad032cb56c0dac916.png 
建表
use sparktest;
create table employee(id int(4),name char(20),gender char(4),Age int(4));
mysql> use sparktest;
Database changed
mysql> create table employee(id int(4),name char(20),gender char(4),Age int(4));
Query OK, 0 rows affected, 2 warnings (0.02 sec)
https://i-blog.csdnimg.cn/blog_migrate/5b829ca67d82dc493218c1b7cbe45319.png 
插入数据
insert into employee values(1,'Alice','F',22);
insert into employee values(2,'John','M',25);
select * from employee; mysql> insert into employee values(1,'Alice','F',22);
Query OK, 1 row affected (0.01 sec)

mysql> insert into employee values(2,'John','M',25);
Query OK, 1 row affected (0.01 sec)

mysql> select * from employee;

+------+-------+--------+------+
| id   | name| gender | Age|
+------+-------+--------+------+
|    1 | Alice | F      |   22 |
|    2 | John| M      |   25 |
+------+-------+--------+------+
2 rows in set (0.00 sec)
https://i-blog.csdnimg.cn/blog_migrate/827375d033d7cd148fb2517dfd9c7402.png 
3.2 配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入




免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Spark SQL编程初级实践