一、连接Spark集群
初始化一个Spark应用程序,设置其名称(以便在Spark Web UI中识别)和连接到指定的Spark集群(通过spark://192.168.126.10:7077)。创建SparkSession对象,作为Spark应用程序的单一入口点,提供了对Spark SQL和DataFrame API的访问。
代码如下:
- from pyspark import SparkConf, SparkContext
- from pyspark.sql import SparkSession, Row
- from pyspark.sql.functions import mean, max, col, length, substring, count, avg
- from pyspark.sql.types import StructType, StructField, StringType, IntegerType
- conf = SparkConf().setAppName("PySpark的基本操作").setMaster('spark://192.168.126.10:7077')
- sc = SparkContext.getOrCreate(conf)
- spark = SparkSession(sc)
复制代码 二、创建数据框DataFrame
创建PySpark DataFrame数据框。创建3张数据框(数据表),分别为:门生表(stu),课程信息表(cou),门生选课表(sc)。
门生表(stu)

课程信息表(cou)

门生选课表(sc)

代码如下:
- # *********************创建数据框DataFrame*************************
- # 1.学生信息表
- da = [('202301', '张三', '男', '大一', 20, 1.72, '广西'), ('202302', '李四', '女', '大二', 21, 1.70, '广西'),
- ('202303', '王小五', '男', '大三', 20, 1.60, '湖南'), ('202304', '小马', '男', '大一', 19, 1.65, '四川'),
- ('202305', '小张', '女', '大二', None, 1.68, '贵州'), ('202306', '小李', '男', '大三', 22, 1.68, '福建'),
- ('202307', '明明', '女', '大三', 23, 1.62, '广西'), ('202308', '王霆锋', '女', '大一', 20, None, '四川'),
- ('202303', '王小五', '男', '大三', 20, 1.60, '湖南'), ]
- col = ['学号', '姓名', '性别', '年级', '年龄', '身高', '籍贯']
- stu = spark.createDataFrame(data=da, schema=col)
- # col = ['sno','name', 'sex', 'grade', 'age', 'height','birthplace']
- stu.show()
- stu.printSchema() # 查看表结构
- stu.describe().show()
- # stu.summary().show()
- # 2.课程信息表
- da2 = [('s01', '大数据开发技术', 32, 2), ('s02', '数据库原理', 48, 3), ('s03', '机器学习', 48, 3)]
- col2 = ['课程号', '课程名', '学时', '学分']
- cou = spark.createDataFrame(data=da2, schema=col2)
- cou.printSchema()
- cou.show()
- # 3.学生选课表
- da3 = [('202301', 's01', 89), ('202301', 's02', 95), ('202302', 's02', 98), ('202303', 's03', 100),
- ('202304', 's01', 96), ('202305', 's03', 99), ('202306', 's03', 94), ('202307', 's01', 93), ]
- col3 = ['学号', '课程号', '成绩']
- schema = StructType([
- StructField("学号", StringType(), True), # 学号字段是字符串类型,且可以为null
- StructField("课程号", StringType(), True),
- StructField("成绩", IntegerType(), True)]) # 成绩字段是整数类型,且可以为null
- sc = spark.createDataFrame(data=da3, schema=schema)
- sc.printSchema() # 打印表结构
- sc.show()
复制代码 三、Spark SQL的根本用法
可以通过SQL方式操作Spark中的数据,需要先将DataFrame对象映射出一个表名,然后通过表名进行各类操作。如要对门生信息表(stu)操作,先将stu映射为student表,再操作。
代码如下:
- print('*************SQL操作***********')
- stu.createOrReplaceTempView("student") # 需要先将DataFrame对象stu映射出一个表名student,然后通过表名进行各类操作
- stu2 = spark.sql("SELECT * FROM student")
- stu2.show()
- stu3 = spark.sql("SELECT `姓名`, `性别`, `年龄` FROM student") # 需要反引号(键盘左上角波浪线那个按键)来包围中文列名,英文列名不需要反引号。
- stu3.show()
复制代码 PySpark SQL可以对中文列名操作,但(键盘左上角波浪线谁人按键)来困绕中文列名,英文列名不需要反引号。
1. 一样平常查询及去重 (DISTINCT )
查询的下令和SQL语句是一样的。
门生表中,王小五有两条重复的记录,需要去重,使用DISTINCT 下令。
代码如下:
- spark.sql("SELECT DISTINCT * FROM student ").show() # 去除重复值(记录)
- spark.sql("SELECT DISTINCT `年级` FROM student").show() # 去除 年级 的重复值,可以看出有几个年级。
复制代码 2. 条件查询(WHERE )
使用WHERE实现条件查询,联合LIKE实现模糊查询,联合REGEXP实现正则匹配查询。
- # 2.条件查询
- spark.sql("SELECT `姓名`, `性别`, `年龄` FROM student WHERE `年龄`>21").show() # 查询年龄大于21岁的学生
- spark.sql("SELECT `姓名`, `性别`, `年龄` ,`籍贯` FROM student "
- "WHERE `籍贯`='广西' AND `性别`='女'").show() # 查询籍贯为广西的女生
- spark.sql("SELECT `姓名` FROM student WHERE `姓名` LIKE '%小%'").show() # LIKE模糊查询,查询含“小”的姓名
- spark.sql("SELECT * FROM student WHERE `姓名` REGEXP '^王.+(五|锋)$'").show() # 正则匹配查询,查询姓名第一个为王,最后一个为五或峰,且中间至少有一个字的学生
- spark.sql("SELECT `姓名`, `性别`, `年龄` FROM student WHERE `年龄` IS NULL ").show() # 查询年龄为空值的学生
复制代码 3. 聚合函数(aggregation function)
使用聚合函数(aggregation function),统计总和(sum)、平均数(mean)、最大值(max)、最小值(min)。聚合函数会对一组值进行统计并返回统计效果
- # 3.聚合函数(aggregation function),统计总和、平均数、最大值、最小值。聚合函数会对一组值进行统计并返回统计结果
- spark.sql("SELECT COUNT(*) AS `总人数` FROM student").show() # 统计学生总人数
- spark.sql("SELECT MAX(`年龄`) AS `最大年龄`,AVG(`身高`) AS `平均身高`,"
- "SUM(`年龄`) AS `总年龄` FROM student").show() # 统计年龄的最大值、总值,身高的平均值。
复制代码 4. 分组 查询(GROUP BY)
在分组查询(GROUP BY)。除聚合函数外,SELECT语句中的每个列都必须在GROUP BY子句中给出。如:如果需要再查询效果中显示性别和年级,则GROUP BY子句必须给出’性别’和’年级’。如:spark.sql("SELECT 性别,年级, COUNT(*) AS 人数 FROM student GROUP BY 性别,年级 ").show() ,则查询效果会有性别和年级两列数据。
代码如下:
- # 4.分组查询(GROUP BY)。除聚合函数外,SELECT语句中的每个列都必须在GROUP BY子句中给出。
- spark.sql("SELECT `性别`, COUNT(*) AS `人数` FROM student GROUP BY `性别` ").show() # 查询不同性别的学生人数
- spark.sql("SELECT `性别`, AVG(`年龄`) AS `平均年龄`,MAX(`身高`) AS `最高身高` "
- "FROM student "
- "GROUP BY `性别` ").show() # 按性别分组,求每组平均年龄,最高身高。
- # 对分组查询结果进行过滤。where分组前过滤,having分组后过滤
- spark.sql("SELECT `性别`, COUNT(*) AS `人数` FROM student GROUP BY `性别` HAVING COUNT(*)>4").show() # 查询性别人数大于4的
复制代码 where和having:都是对分组查询效果进行过滤。where分组前过滤,having分组后过滤
5.排序(ORDER BY)
使用排序(ORDER BY)排序,ASC升序,DESC降序。
- # 5.排序(ORDER BY)。
- spark.sql("SELECT * FROM student ORDER BY `年龄` ASC").show() # 按年龄升序ASC排序,DESC为降序
- spark.sql("SELECT * FROM student ORDER BY `年龄` ASC, `身高` DESC").show() # 按年龄升序ASC排序,身高DESC降序
复制代码 6. 多表联查
先用createOrReplaceTempView下令,把DataFrame对象映射出一个表名。
- # 6.多表连接查询
- cou.createOrReplaceTempView('course') # 将DataFrame对象映射出一个表名
- sc.createOrReplaceTempView('stuCourse') # 将DataFrame对象映射出一个表名
- spark.sql("SELECT student.`姓名`,stuCourse.`课程号`,stuCourse.`成绩` "
- "FROM student, stuCourse "
- "WHERE student.`学号`=stuCourse.`学号`").show() # 连表查询姓名,课程号,成绩
- spark.sql("SELECT S.`姓名`,C.`课程名`,SC.`成绩` "
- "FROM student AS S,course AS C,stuCourse AS SC "
- "WHERE S.`学号`=SC.`学号` AND C.`课程号`=SC.`课程号`").show() # 连表查询姓名,课程名,成绩
复制代码 7. udf函数
在Apache Spark中,UDF(User-Defined Function,用户自定义函数)答应扩展Spark SQL的功能,使其可以或许执行在标准SQL中不直接支持的复杂操作。UDF可以是用Scala、Java或Python等语言编写的函数,然后可以在Spark SQL查询中像调用内置函数一样调用它们。
先使用spark.udf.register注册UDF,然后再调用。
如下所示,如计算门生表姓名列的长度。先注册UDF函数,注册名为strLen,再使用UDF函数。
代码如下:
- # 用udf函数
- spark.udf.register("strLen", lambda x: len(x)) # 注册udf函数,实现Spark SQL 自定义函数。计算某列元素的长度
- spark.sql("SELECT *, strLen(`姓名`) AS len FROM student").show() # 计算姓名的长度,并新增一列len。
复制代码 查询效果如下图:
四、DataFrame根本操作
除了用Spark SQL对DataFrame对象进行操作外,DataFrame自身也支持各类数据操作。
1.一样平常查询(select,selectExpr)
select()方法用于选择特定列生成新的DataFrame。如果一个DataFrame列字段太多,只需查看某些列内容,可以使用select()方法。
selectExpr()方法看成SQL查询,既可以用于查看指定列,还可以对选定的列进行特别处理,如改列名、取绝对值、四舍五入等,终极返回新的DataFrame。
- # 1.一般查询(select,selectExpr),其中,selectExpr可对列进行特殊处理,如列相加、计算、绝对值、四舍五入。它被当作SQL查询
- stu2 = stu.select('姓名', '性别', '年龄') # 查询学生的姓名,性别,年龄
- stu2.show()
- stu.selectExpr("`姓名` ", "ROUND( `身高`,1) AS RoundedHeight").show() # selectExpr被当作SQL查询。取身高的四舍五入,保留1位小数。
- stu.selectExpr("`姓名`", "`年龄`", "`年龄`+10 AS AddAge").show() # 年龄增加10岁。
复制代码 2. 条件查询(where,filter)
根据指定条件筛选数据,where()和filter()方法都可以用于条件查询。与select()方法、contains()方法或正则匹配查询方法rlike()联合,可实现更复杂的查询功能。
- # 2. 条件查询。where,filter,效果一样。
- stu.select('姓名', '性别', '年龄').where(stu['年龄'] > 21).show() # 查询年龄大于21岁的学生
- stu.select("*").where((stu['籍贯'] == '广西') & (stu.性别 == '女')).show() # 查询籍贯为广西的女生信息
- stu.where(stu["姓名"].contains("小")).show() # 查询含“小”的姓名
- stu.where(stu["姓名"].rlike(r'^王.+(五|锋)$')).show() # 正则匹配查询,查询姓名第一个为王,最后一个为五或峰,且中间至少有一个字的学生
复制代码 3. 聚合(agg)
agg()方法用于聚合操作,用于部门列的统计,也可以与groupBy()方法组合使用,从而实现分组统计的功能。
用agg()方法做聚合时,常用的统计方法有:mean()、max()、min()、sum()等,这些方法来自pyspark.sql.functions类,需要先导入。
alias()方法为重命名列方法。
- # 3. 聚合(agg)
- stu.agg({'姓名': 'count'}).show() # 统计学生人数
- stu.agg(count('姓名').alias('学生人数')).show() # 统计学生人数,并重命名为:学生人数
- stu.agg({'年龄': 'max', '身高': 'avg'}).show() # 统计年龄的最大值,身高的平均值。
- stu.agg(max('年龄').alias('最大年龄'), avg('身高').alias('平均身高')).show() # 求最大年龄,平均身高,并重命名列名
复制代码 4. 分组(groupBy)
groupBy()方法可以根据指定的字段进行分组,在groupBy()方法之后,通常使用统计方法进行计算,如:count()(总和,仅用于数值型字段),mean()、max()、min()、sum()等。
- # 4. 分组(groupBy),根据指定的字段分组
- stu.groupBy('性别').count().show() # 查询不同性别的学生人数。求每组的总数
- stu.groupBy('性别').mean('年龄').show() # 按性别分组,求每组平均年龄
- # groupBy与agg组合使用,实现分组统计功能。可以重命名统计的列名
- stu.groupBy('性别').agg(mean('年龄').alias('平均年龄')).show() # 按性别分组,求每组平均年龄,并重命名列名为平均年龄
- stu.groupBy('性别').agg(mean('年龄').alias('平均年龄'),
- max('身高').alias('最高身高')).show() # 按性别分组,求每组平均年龄,最高身高。
复制代码 5. 排序(orderBy)
- # 5.排序(orderBy)
- stu.orderBy(stu.年龄.asc()).show() # 按照年龄升序排序
- stu.orderBy(stu.年龄.asc(), stu['身高'].desc()).show() # 按照年龄升序排序,如果年龄相同,再按照身高降序排序
复制代码 6. 数据去重(distinct)
去重是数据预处理中的重要环节。
**distinct()**方法用于删除重复行,返回不包罗重复记录的DataFrame。可以联合count()方法来判定数据是否有重复。
dropDuplicates()方法可以根据指定的字段进行去重操作。
- # 6.数据去重
- print('所有列名:', stu.columns) # 获取所有列名
- # 去重。distinct,dropDuplicates(可指定字段去重)
- print("去重前的数据行数:", stu.count())
- stu.distinct().show() # 去除重复记录
- print("去重后的数据行数:", stu.distinct().count())
- stu.dropDuplicates(['籍贯']).select('籍贯').show() # 去除 籍贯相同的记录,并选择查看籍贯,可以看出学生来自哪些省份。
复制代码 7. 缺失值处理(dropna,fillna)
缺失值指的是现有数据集中某个或某些属性的值是不完全的,存在空缺。处理缺失值是数据预处理时必不可少的环节。处理缺失值的方法有:删除、填充等。
**dropna()**方法可以删除含有缺失值的记录。只要某行记录有一个缺失值,则该行记录被删除。
**fillna()**方法可以用其他值填充缺失值。
先用summary()方法查看数据的根本信息,如下图所示。发现年事和身高两列只有8条记录,比其他列的9条记录少,存在缺失值。
代码如下:
- # 方法1:删除缺失值
- stu.summary().show() # 查看表数据的基本信息。发现年龄和身高只有8条记录,而其他列为9条。因此年龄和身高各有1个缺失值。
- print(stu.年龄.isNull()) # 判断年龄是否有缺失值。
- stu.dropna().show() # 删除缺失值,查看数据。删除含有缺失值的记录
- # 方法2:填充缺失值
- avg_age = stu.agg(mean("年龄")).collect()[0][0] # 先求年龄的平均值
- max_height = stu.agg(max("身高")).collect()[0][0] # 先求身高的最大值
- stu.fillna({'年龄': avg_age, '身高': max_height}).show() # 用年龄平均值、身高最大值填充对应的列的缺失值。
复制代码 上面的代码中,collect()方法返回一个列表,用于获取DataFrame的所有记录,并将DataFrame中每行的数据以Row情势完备地展示出来。如:stu.agg(mean(“年事”)).collect(),返回如下数据:
collect():返回的 DataFrame 只包罗一行一列(即“年事”的平均值),以是 collect() 方法将返回一个包罗单个元素的列表,这个元素自己也是一个列表(或 Row 对象),它包罗了平均值。
[0][0]: 这是一个索引操作,用于从 collect() 方法返回的列表中提取数据。[0] 提取了列表中的第一个(也是唯一一个)元素(它自己也是一个列表或 Row 对象),然后 [0] 再次被用来从这个内部列表中提取第一个(也是唯一一个)元素,即“年事”的平均值。
用聚合函数agg()和collect()方法计算出平均年事和最高身高 后,再用fillna()函数填充相应的缺失值。
8. 新增列(withColumn),重命名列(withColumnRenamed)
- # 7. 新增列(withColumn)、重命名列(withColumnRenamed)
- stu2 = stu.withColumn('AddAge', stu['年龄'] + 10) # 新增一列:AddAge,年龄+10
- stu2.show()
- stu2.withColumnRenamed('AddAge', '新增年龄').show() # 将AddAge列名改为:新增年龄
复制代码 9. udf函数
与Spark SQL注册udf函数有所差异,请留意区分。
- from pyspark.sql.functions import udf
- # 创建一个 UDF,它接受一个字符串并返回其长度(整数)
- mystrlen = udf(lambda x: len(x) if x is not None else 0, IntegerType())
- # 使用 UDF 在 stu DataFrame 上添加一个新列 'len',该列包含 '姓名' 列中每个字符串的长度
- stu.withColumn("len", mystrlen(stu['姓名'])).show()
复制代码 10. 其他常用方法(describe、printSchema)
describe():统计DataFrame数值型字段的信息,包括记录条数、平均值、样本标准差、最小值、最大值等。
printSchema():以树状格式输出DataFrame的模式信息,输出效果中有DataFrame的列名称、数据类型以及该数据字段的值是否可以为空。
- # 统计DataFrame数值型字段的信息,包括记录条数、平均值、样本标准差、最小值、最大值等。
- stu.describe().show()
- # 以树状格式输出DataFrame的模式信息,输出结果中有DataFrame的列名称、数据类型以及该数据字段的值是否可以为空。
- stu.printSchema()
复制代码 输出效果如下图所示:
五、 多表连接查询(join)
1. 内连接:inner
内连接:inner,把两张表中相互匹配的行选择出来
- # 学生信息表:stu,课程信息表:cou,学生选课表:sc
- stu.join(sc, stu.学号 == sc.学号) \
- .select(stu.姓名, sc.课程号, sc['成绩']).show() # 连接学生信息表:stu和学生选课表:sc。 查询姓名,课程号,成绩
- # 三表连接
- stu2 = stu.join(sc, stu.学号 == sc.学号, 'inner') # 1.先连接stu表和sc表。默认内连接inner
- stu3 = stu2.join(cou, stu2.课程号 == cou.课程号) # 2.再连接cou表
- stu3.select('姓名', '课程名', '成绩').show() # 3.查询学生 姓名,课程名,成绩
复制代码 三表内连接后的数据如下:
2. 外连接
**full/full_outer:**这种join就是把两张表的所有记录选择出来,如果一张表里有对应数据,另一张表里没有对应数据,就用NULL取代。
**left/left_outer:**这种join就是把左边的表的所有行都取出来,如果右边表有匹配的行,就用匹配的行,如果右边表没有匹配的行,就用NULL取代。
**right/right_outer:**这种join就是把右边的表的所有行都取出来,如果左边表有匹配的行,就用匹配的行,如果左边表没有匹配的行,就用NULL取代。
left_semi:这种join就是把左边表中能和右表中的行匹配的行取出来,只取左边表的记录。
**left_anti:**这种join是把左边表中不能和右边表中的行匹配的行取出来,也是只取左边表中的行。
**cross:**这种join就是把左边表中的所有行和右边表的所有行做乘积,相当于左边表中的每一行都和右边表中的所有行组合一次,即左边表×右边表。
六、小案例:分析哪种水果季节性最强
有四张水果贩卖数据表,分别是春季、夏季、秋季和冬季的水果贩卖情况。先对每个季度水果的贩卖量排名,然后取出每个季度销量最高的前3种水果做成一张表,分析这些水果中,哪种水果的季节性最强。所谓季节性,即有的季节销量非常高,而有的季节销量非常低。可以先计算四个季度之间的排名差,排名差最大的水果,就是季节性最强的水果。
1.创建春季、夏季、秋季和冬季的水果销量表
如下图所示:


 

代码如下:
- # -----------------小案例:计算哪种水果是季节性最强的----------------------------------------
- from pyspark.sql import Window
- from pyspark.sql.functions import * # 导入 functions的所有函数,包括:lit,col,min,max等
- data = [('香蕉', 90, 6), ('苹果', 50, 8), ('雪梨', 80, 7),
- ('葡萄', 150, 4), ('龙眼', 100, 5), ('荔枝', 200, 1),
- ('西瓜', 180, 2), ('榴莲', 170, 3), ('蓝莓', 15, 9),
- ('草莓', 13, 10), ('橙子', 12, 11)] # 创建数据
- spring = spark.createDataFrame(data, ['水果', '销量', '排名']) # spring,summer,autumn,winter
- spring = spring.withColumn("季节", lit("spring")) # 增加一列:季节
- spring.show()
- data = [('香蕉', 70, 5), ('苹果', 40, 8), ('雪梨', 60, 6),
- ('葡萄', 50, 7), ('龙眼', 150, 3), ('荔枝', 160, 2),
- ('西瓜', 200, 1), ('榴莲', 100, 4), ('蓝莓', 15, 9),
- ('草莓', 13, 10), ('橙子', 12, 11)] # 创建数据
- summer = spark.createDataFrame(data, ['水果', '销量', '排名']) # spring,summer,autumn,winter
- summer = summer.withColumn("季节", lit("summer"))
- summer.show()
- data = [('香蕉', 40), ('苹果', 150), ('雪梨', 120),
- ('葡萄', 60), ('龙眼', 20), ('荔枝', 30),
- ('西瓜', 50), ('榴莲', 35), ('蓝莓', 15),
- ('草莓', 13), ('橙子', 12)]
- autumn = spark.createDataFrame(data, ['水果', '销量'])
- import pyspark.sql.functions as F
- # windowSpec = Window.orderBy(autumn['销量'].desc()) # 创建一个 WindowSpec 对象,按销量降序排序。全局排序,会有警告信息提示没有分区
- windowSpec = Window.partitionBy(F.lit(1)).orderBy(autumn['销量'].desc()) # 按水果分区,可以均衡集群计算负载,避免警告信息
- autumn = autumn.withColumn("排名", row_number().over(windowSpec)) # 使用 row_number() 函数在窗口上生成排名,并添加到新的列 '排名'
- autumn = autumn.withColumn("季节", lit("autumn"))
- autumn.show()
- data = [('香蕉', 30), ('苹果', 200), ('雪梨', 182),
- ('葡萄', 50), ('龙眼', 20), ('荔枝', 20),
- ('西瓜', 30), ('榴莲', 35), ('蓝莓', 15),
- ('草莓', 13), ('橙子', 12)]
- winter = spark.createDataFrame(data, ['水果', '销量'])
- windowSpec = Window.partitionBy(F.lit(1)).orderBy(winter['销量'].desc()) # 使用 F.lit(1) 创建一个常量分区以及WindowSpec 对象。也可以消除警告
- winter = winter.withColumn("排名", dense_rank().over(windowSpec)) # dense_rank()在遇到相同值时给出相同的排名,
- winter = winter.withColumn("季节", lit("winter"))
- winter.show()
- # 合并四个季节的数据,计算全年每类水果的总销量
- all_seasons = spring.union(summer).union(autumn).union(winter) # 合并所有季节的DataFrame
- all_seasons.show()
- full_year = all_seasons.groupBy("水果").agg(F.sum("销量").alias("总销量")) # 按水果分组,计算每类水果的总销量
- full_year.show()
- windowSpec = Window.partitionBy(F.lit(1)).orderBy(full_year['总销量'].desc()) # 创建一个WindowSpec对象,对整个DataFrame进行排序
- full_year = full_year.withColumn("排名", dense_rank().over(windowSpec)) # 使用dense_rank()进行排名,并添加到新的列'销量排名'
- full_year.show()
复制代码 2.提取每个季节销量排名前3的水果名字
提取每个季节销量排名前3的水果名字,并将它们放在一张表top_fruits 中。效果如下图。
代码如下:
- # 提取每个季节销量排名前3的水果名字,并将它们合并放在一起。
- top_spring = spring.filter(spring["排名"] <= 3).select("水果")
- top_summer = summer.filter(summer['排名'] <= 3).select("水果")
- top_autumn = autumn.filter(autumn['排名'] <= 3).select("水果")
- top_winter = winter.filter(winter['排名'] <= 3).select("水果")
- top_winter.show()
- top_fruits = top_spring.union(top_summer).union(top_autumn).union(top_winter).distinct() # 使用union合并结果,去除重复项
- top_fruits.show()
复制代码 3. 春夏秋冬销量表左连接表top_fruits
将春季、夏季、秋季和冬季销量表,左连接排名前3的水果表top_fruits ,用于计算四个季度的排名差。需要留意,数据表连接后,有许多列名是重复的,因此连接时需要为每张表指定一个别名,否则连接后无法选择列。
连接表后,重命名列名,并选择相关的列,即选择每个季度的排名列。
代码如下:
- # 外连接:左连接4张表,需要为数据表指定一个别名,否则连接后无法选择列
- left_fruits = top_fruits.alias("t") \
- .join(spring.alias("s"), col('t.水果') == col('s.水果'), how='left_outer') \
- .join(summer.alias('su'), col('t.水果') == col('su.水果'), how='left_outer') \
- .join(autumn.alias('a'), col('t.水果') == col('a.水果'), how='left_outer') \
- .join(winter.alias('w'), col('t.水果') == col('w.水果'), how='left_outer') # 连接后有很多重复的列
- left_fruits.show()
- left_fruits = left_fruits.select(col('t.水果').alias('水果'),
- col('s.排名').alias('spring_排名'), col('s.销量').alias('spring_销量'),
- col('su.排名').alias('summer_排名'), col('su.销量').alias('summer_销量'),
- col('a.排名').alias('autumn_排名'), col('a.销量').alias('autumn_销量'),
- col('w.排名').alias('winter_排名'), col('w.销量').alias('winter_销量')) # 重命名相关列
- left_fruits_sel = left_fruits.select('水果', 'spring_排名', 'summer_排名', 'autumn_排名', 'winter_排名')
- left_fruits_sel.show()
复制代码 4.计算每类水果的季度排名差
先计算每类水果的季度排名差,然后计算排名差的最大值最小值,最后按照排名差最大值降序排序。
从效果可以看出,苹果和荔枝的季度排名差分别为7和6,名列排名差的第一和第二位,它们是季节性最强的水果。现实上,大部门水果都是季节性产品,本案例仅为说明外连接的编程思绪,分析效果无现实意义。
代码如下:
- # 计算每类水果各个季度排名差的最大值和最小值
- left_fruits_diffs = left_fruits_sel.withColumn(
- "min_diff",
- F.least(
- F.abs(F.col("spring_排名") - F.col("summer_排名")),
- F.abs(F.col("spring_排名") - F.col("autumn_排名")),
- F.abs(F.col("spring_排名") - F.col("winter_排名")),
- F.abs(F.col("summer_排名") - F.col("autumn_排名")),
- F.abs(F.col("summer_排名") - F.col("winter_排名")),
- F.abs(F.col("autumn_排名") - F.col("winter_排名"))
- )
- ).withColumn(
- "max_diff",
- F.greatest(
- F.abs(F.col("spring_排名") - F.col("summer_排名")),
- F.abs(F.col("spring_排名") - F.col("autumn_排名")),
- F.abs(F.col("spring_排名") - F.col("winter_排名")),
- F.abs(F.col("summer_排名") - F.col("autumn_排名")),
- F.abs(F.col("summer_排名") - F.col("winter_排名")),
- F.abs(F.col("autumn_排名") - F.col("winter_排名"))
- )
- )
- left_fruits_diffs.orderBy('max_diff', ascending=False).show() # 排名差降序排序
复制代码 5. 提取季节性最强的前2类水果
选择季节性最强的前2类水果,提取它们四个季度的销量情况,用于后续的可视化绘图。
代码如下:
- # 取季节性最强的前2类水果,提取它们四个季度的销量情况,用于后续的可视化绘图。
- top_diff = left_fruits_diffs.orderBy('max_diff', ascending=False).limit(2) # 取前两行数据
- top_diff.show() # 可以看出,苹果和荔枝的季节排名差最大,也就是它们是季节性最强的水果。
- top_diff = top_diff.select("水果")
- # collect_data=top_diff.collect()# 收集数据到驱动程序(只调用一次)。或者用topandas将pyspark dataframe转为pandas dataframe
- print(top_diff.collect())
- print(top_diff.collect()[0]) # 第1行数据
- print(top_diff.collect()[0][0]) # 第1行数据的值:苹果
- print(top_diff.collect()[1][0]) # 第2行数据的值:荔枝
- df5 = left_fruits.select("水果", 'spring_销量', 'summer_销量', 'autumn_销量', 'winter_销量') \
- .where(col('水果').isin(['苹果', top_diff.collect()[1][0]])) # 提取 苹果、荔枝 四个季度的销量情况
- df5.show()
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |