Posted by 风雨同行 on 2024-7-21 15:49:14

Spark Programming Basics (Lab 4): Introductory Spark SQL Programming Practice

I. Lab Environment

Operating system: Ubuntu 16.04
Spark version: 2.4.0
Database: MySQL
Python version: 3.4.3
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os
# Point Spark at the local JDK and the Python interpreter to use
os.environ["JAVA_HOME"] = "/home/spark021/servers/jdk"
os.environ["PYSPARK_PYTHON"] = "/home/spark021/servers/anaconda3/envs/pyspark/bin/python3.6"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/spark021/servers/anaconda3/envs/pyspark/bin/python3.6"
# Create the SparkSession for this lab
spark = SparkSession.builder.appName("employee021").config(conf=SparkConf()).getOrCreate()
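A quick sanity check that the session started correctly (a minimal sketch, not part of the original lab):
print(spark.version)    # should print 2.4.0 in this environment
spark.range(5).show()   # trivial job to confirm the session can run work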

II. Lab Content and Results

1. Basic Spark SQL operations

Create a DataFrame for employee.json and write Python statements to complete the following:
(1) Query all the data;
(2) Query all the data, removing duplicate rows;
(3) Query all the data, omitting the id field when printing;
(4) Filter the records with age > 30;
(5) Group the data by age;
(6) Sort the data by name in ascending order;
(7) Take the first 3 rows of data;
(8) Query the name column of all records, aliased as username;
(9) Query the average value of age;
(10) Query the minimum value of age.

# (1) Load the JSON file and query all the data
df = spark.read.json("file:///home/spark021/data/employee021.json")
df.show()
https://img-blog.csdnimg.cn/direct/e8015ed7b0d944b38f150c5920c507c6.png
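Note that spark.read.json expects the JSON Lines format, one object per line. The lab's actual employee021.json is not reproduced in the post; a hypothetical file in that shape would look like:
{"id":1,"name":"Ella","age":36}
{"id":2,"name":"Bob","age":29}
{"id":3,"name":"Jack","age":29}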
# (2) Remove duplicate rows
unique_df = df.dropDuplicates()
unique_df.show()
https://img-blog.csdnimg.cn/direct/d7c44ed268dc49e8889bde9d5c8b3a2b.png
# (3) Drop the id field when printing
df_no_id = df.drop('id')
df_no_id.show()
https://img-blog.csdnimg.cn/direct/4fe79599c0a5451a905dae117e08656c.png
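An equivalent way to exclude the column is to select everything except id with a list comprehension (same df as above):
df_no_id2 = df.select([c for c in df.columns if c != 'id'])
df_no_id2.show()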
# (4) Filter the records with age > 30
older_than_30 = df.filter(df['age'] > 30)
older_than_30.show()
https://img-blog.csdnimg.cn/direct/73805ce7b30d4791aadee5292c23f75c.png
# (5) Group the data by age
grouped_by_age = df.groupBy('age')
# groupBy returns GroupedData; count() turns it into a showable DataFrame
grouped_by_age.count().show()
https://img-blog.csdnimg.cn/direct/a4d4946e5b404df6a98626d5a8df7085.png
# (6) Sort by name in ascending order
sorted_by_name = df.orderBy('name')
sorted_by_name.show()
https://img-blog.csdnimg.cn/direct/c99ffc8c3af943528b7b643c6670592d.png
# (7) Take the first 3 rows of data
first_three = df.limit(3)
first_three.show()
https://img-blog.csdnimg.cn/direct/ec6e4947dc694e569ca8dc6f20848544.png
# (8) Show the name column under the alias username (done here via a rename)
renamed_df = df.withColumnRenamed('name', 'username')
renamed_df.show()
https://img-blog.csdnimg.cn/direct/80b1e63d56014f75b6a779ab0b5b152e.png
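Since the task asks for an alias rather than a permanent rename, Column.alias is the more direct route; an equivalent sketch:
renamed2 = df.select(df['name'].alias('username'))
renamed2.show()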
# (9) Compute the average age with the avg() aggregate function
average_age = df.agg({"age": "avg"}).collect()[0][0]
print("Average age:", average_age)
# (10) Compute the minimum age with the min() aggregate function
min_age = df.agg({"age": "min"}).collect()[0][0]
print("Minimum age:", min_age)
# Stop the SparkSession
spark.stop()
https://img-blog.csdnimg.cn/direct/80f0e143ace649ae93850e06fdedcbcb.png
2. Converting an RDD to a DataFrame programmatically

from pyspark import SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
import os
os.environ["JAVA_HOME"] = "/home/spark021/servers/jdk"
os.environ["PYSPARK_PYTHON"] = "/home/spark021/servers/anaconda3/envs/pyspark/bin/python3.6"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/spark021/servers/anaconda3/envs/pyspark/bin/python3.6"

# Configure the Spark application
spark = SparkSession.builder.appName("RDD to DataFrame").config(conf=SparkConf()).getOrCreate()

# Read the text file and turn each comma-separated line into a Row
employee021 = spark.sparkContext.\
    textFile("file:///home/spark021/data/employee021.txt").\
    map(lambda line: line.split(",")).\
    map(lambda x: Row(id=x[0], name=x[1], age=x[2]))
employee021.foreach(print)

# Create a DataFrame (its schema is inferred from the Row fields)
schemaPeople = spark.createDataFrame(employee021)

# Register a temporary view so the data can be queried with SQL
schemaPeople.createOrReplaceTempView("people")
peopleDF = spark.sql("select * from people")
peopleDF.show()

# Convert back to an RDD of formatted strings
peopleRDD = peopleDF.rdd.map(lambda p: "id:" + p.id + ",name:" + p.name + ",age:" + p.age)
peopleRDD.foreach(print)
https://img-blog.csdnimg.cn/direct/61ba6c13bce6489d8305fdfc41277503.png
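The code above lets Spark infer the schema from the Row fields. The unused StructType import hints at the other standard route, an explicitly specified schema; a minimal sketch of that approach, assuming the same employee021.txt with one id,name,age triple per line:
from pyspark.sql.types import StructType, StructField, StringType
# Explicit schema: every field read from the text file stays a string
schema = StructType([StructField("id", StringType(), True),
                     StructField("name", StringType(), True),
                     StructField("age", StringType(), True)])
rowRDD = spark.sparkContext.textFile("file:///home/spark021/data/employee021.txt")\
    .map(lambda line: line.split(","))\
    .map(lambda x: (x[0], x[1], x[2]))
df_explicit = spark.createDataFrame(rowRDD, schema)
df_explicit.show()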

3. Reading and writing MySQL data with DataFrames

(1) Create the database and table

Log in to MySQL and create the database (the name sparktest matches the JDBC URL used below) and the table:
create database sparktest;
use sparktest;
create table employee021 (id int(4), name char(20), gender char(4), age int(4));
Insert data:
insert into employee021 values(1,'Alice','F',22);
insert into employee021 values(2,'John','M',25);
View the table:
select * from employee021;

(2) Start Spark, configure it to connect to MySQL over JDBC, use a DataFrame to insert data into MySQL, and finally print the maximum value and the sum of age.
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os
os.environ["JAVA_HOME"] = "/home/spark021/servers/jdk"
os.environ["PYSPARK_PYTHON"] = "/home/spark021/servers/anaconda3/envs/pyspark/bin/python3.6"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/spark021/servers/anaconda3/envs/pyspark/bin/python3.6"

# Put the MySQL JDBC connector jar on the classpath
spark = SparkSession.builder.config("spark.jars", "/home/spark021/servers/spark-local/jars/mysql-connector-java-5.1.40-bin.jar").getOrCreate()

# Configure Spark to read from MySQL over JDBC
employeeDF = spark.read \
    .format("jdbc")\
    .option("driver","com.mysql.jdbc.Driver")\
    .option("url","jdbc:mysql://localhost:3306/sparktest")\
    .option("dbtable","employee021")\
    .option("user","spark021")\
    .option("password","123456")\
    .load()
employeeDF.show()

# Insert data into MySQL through a DataFrame
# Define the schema
schema =StructType([StructField("id", IntegerType(),True),\
                    StructField("name",StringType(),True),\
                    StructField("gender",StringType(),True),\
                    StructField("age",IntegerType(),True)])

# Two new rows, each describing one employee
employeeRDD =spark\
            .sparkContext\
            .parallelize(["3 Mary F 26","4 Tom M 23"])\
            .map(lambda x:x.split(" "))

# Build a Row object for each element of the RDD
rowRDD = employeeRDD.map(lambda p: Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))

# Bind the Row objects to the schema, i.e. pair the data with its structure
employeeDF = spark.createDataFrame(rowRDD,schema)

# Write to the database
prop = {}
prop['user'] = 'spark021'
prop['password'] = '123456'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest", 'employee021', 'append', prop)

employeeDF.show()
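The write can also be expressed in the same option style as the read above, which keeps the two symmetric (an equivalent sketch):
employeeDF.write.format("jdbc")\
    .option("driver", "com.mysql.jdbc.Driver")\
    .option("url", "jdbc:mysql://localhost:3306/sparktest")\
    .option("dbtable", "employee021")\
    .option("user", "spark021")\
    .option("password", "123456")\
    .mode("append")\
    .save()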

# Finally, print the maximum and the sum of age
jdbcDF = spark.read.format("jdbc")\
    .option("driver", "com.mysql.jdbc.Driver")\
    .option("url", "jdbc:mysql://localhost:3306/sparktest")\
    .option("dbtable", "employee021")\
    .option("user", "spark021")\
    .option("password", "123456")\
    .load()
jdbcDF.createOrReplaceTempView("employee021")
spark.sql("select MAX(age) from employee021").show()
spark.sql("select Sum(age) from employee021").show()

 https://img-blog.csdnimg.cn/direct/93a07b0e71494bcfbdf9c115a8170953.png
https://img-blog.csdnimg.cn/direct/133c553b5a4346db87db046c39109957.png
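The same aggregates can also be computed without registering a temporary view, through the DataFrame API directly (alternative sketch):
from pyspark.sql.functions import max as sf_max, sum as sf_sum
jdbcDF.agg(sf_max("age"), sf_sum("age")).show()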



