3. PySpark的根本操作

打印 上一主题 下一主题

主题 906|帖子 906|积分 2718

一、连接Spark集群

  初始化一个Spark应用程序,设置其名称(以便在Spark Web UI中识别)和连接到指定的Spark集群(通过spark://192.168.126.10:7077)。创建SparkSession对象,作为Spark应用程序的单一入口点,提供了对Spark SQL和DataFrame API的访问。
  代码如下:
  1. from pyspark import SparkConf, SparkContext
  2. from pyspark.sql import SparkSession, Row
  3. from pyspark.sql.functions import mean, max, col, length, substring, count, avg
  4. from pyspark.sql.types import StructType, StructField, StringType, IntegerType
  5. conf = SparkConf().setAppName("PySpark的基本操作").setMaster('spark://192.168.126.10:7077')
  6. sc = SparkContext.getOrCreate(conf)
  7. spark = SparkSession(sc)
复制代码
二、创建数据框DataFrame

  创建PySpark DataFrame数据框。创建3张数据框(数据表),分别为:门生表(stu),课程信息表(cou),门生选课表(sc)。
  门生表(stu)

  课程信息表(cou)

  门生选课表(sc)

  代码如下:
  1. # *********************创建数据框DataFrame*************************
  2. #  1.学生信息表
  3. da = [('202301', '张三', '男', '大一', 20, 1.72, '广西'), ('202302', '李四', '女', '大二', 21, 1.70, '广西'),
  4.       ('202303', '王小五', '男', '大三', 20, 1.60, '湖南'), ('202304', '小马', '男', '大一', 19, 1.65, '四川'),
  5.       ('202305', '小张', '女', '大二', None, 1.68, '贵州'), ('202306', '小李', '男', '大三', 22, 1.68, '福建'),
  6.       ('202307', '明明', '女', '大三', 23, 1.62, '广西'), ('202308', '王霆锋', '女', '大一', 20, None, '四川'),
  7.       ('202303', '王小五', '男', '大三', 20, 1.60, '湖南'), ]
  8. col = ['学号', '姓名', '性别', '年级', '年龄', '身高', '籍贯']
  9. stu = spark.createDataFrame(data=da, schema=col)
  10. # col = ['sno','name', 'sex', 'grade', 'age', 'height','birthplace']
  11. stu.show()
  12. stu.printSchema()  # 查看表结构
  13. stu.describe().show()
  14. # stu.summary().show()
  15. # 2.课程信息表
  16. da2 = [('s01', '大数据开发技术', 32, 2), ('s02', '数据库原理', 48, 3), ('s03', '机器学习', 48, 3)]
  17. col2 = ['课程号', '课程名', '学时', '学分']
  18. cou = spark.createDataFrame(data=da2, schema=col2)
  19. cou.printSchema()
  20. cou.show()
  21. # 3.学生选课表
  22. da3 = [('202301', 's01', 89), ('202301', 's02', 95), ('202302', 's02', 98), ('202303', 's03', 100),
  23.        ('202304', 's01', 96), ('202305', 's03', 99), ('202306', 's03', 94), ('202307', 's01', 93), ]
  24. col3 = ['学号', '课程号', '成绩']
  25. schema = StructType([
  26.     StructField("学号", StringType(), True),  # 学号字段是字符串类型,且可以为null
  27.     StructField("课程号", StringType(), True),
  28.     StructField("成绩", IntegerType(), True)])  # 成绩字段是整数类型,且可以为null
  29. sc = spark.createDataFrame(data=da3, schema=schema)
  30. sc.printSchema()  # 打印表结构
  31. sc.show()
复制代码
三、Spark SQL的根本用法

  可以通过SQL方式操作Spark中的数据,需要先将DataFrame对象映射出一个表名,然后通过表名进行各类操作。如要对门生信息表(stu)操作,先将stu映射为student表,再操作。
  代码如下:
  1. print('*************SQL操作***********')
  2. stu.createOrReplaceTempView("student")  # 需要先将DataFrame对象stu映射出一个表名student,然后通过表名进行各类操作
  3. stu2 = spark.sql("SELECT * FROM student")
  4. stu2.show()
  5. stu3 = spark.sql("SELECT `姓名`, `性别`, `年龄` FROM student")  # 需要反引号(键盘左上角波浪线那个按键)来包围中文列名,英文列名不需要反引号。
  6. stu3.show()
复制代码
  PySpark SQL可以对中文列名操作,但(键盘左上角波浪线谁人按键)来困绕中文列名,英文列名不需要反引号。
1. 一样平常查询及去重 (DISTINCT )

  查询的下令和SQL语句是一样的。
  门生表中,王小五有两条重复的记录,需要去重,使用DISTINCT 下令。
  代码如下:
  1. spark.sql("SELECT DISTINCT *  FROM student ").show()  # 去除重复值(记录)
  2. spark.sql("SELECT DISTINCT `年级` FROM student").show()  # 去除 年级 的重复值,可以看出有几个年级。
复制代码
2. 条件查询(WHERE )

  使用WHERE实现条件查询,联合LIKE实现模糊查询,联合REGEXP实现正则匹配查询。
  1. # 2.条件查询
  2. spark.sql("SELECT `姓名`, `性别`, `年龄` FROM student WHERE `年龄`>21").show()  # 查询年龄大于21岁的学生
  3. spark.sql("SELECT `姓名`, `性别`, `年龄` ,`籍贯` FROM student "
  4.           "WHERE `籍贯`='广西' AND `性别`='女'").show()  # 查询籍贯为广西的女生
  5. spark.sql("SELECT `姓名` FROM student WHERE `姓名` LIKE '%小%'").show()  # LIKE模糊查询,查询含“小”的姓名
  6. spark.sql("SELECT * FROM student WHERE `姓名` REGEXP '^王.+(五|锋)$'").show()  # 正则匹配查询,查询姓名第一个为王,最后一个为五或峰,且中间至少有一个字的学生
  7. spark.sql("SELECT `姓名`, `性别`, `年龄` FROM student WHERE `年龄` IS NULL ").show()  # 查询年龄为空值的学生
复制代码
3. 聚合函数(aggregation function)

  使用聚合函数(aggregation function),统计总和(sum)、平均数(mean)、最大值(max)、最小值(min)。聚合函数会对一组值进行统计并返回统计效果
  1. # 3.聚合函数(aggregation function),统计总和、平均数、最大值、最小值。聚合函数会对一组值进行统计并返回统计结果
  2. spark.sql("SELECT COUNT(*) AS `总人数` FROM student").show()  # 统计学生总人数
  3. spark.sql("SELECT MAX(`年龄`) AS `最大年龄`,AVG(`身高`) AS `平均身高`,"
  4.           "SUM(`年龄`) AS `总年龄` FROM student").show()  # 统计年龄的最大值、总值,身高的平均值。
复制代码
4. 分组 查询(GROUP BY)

  在分组查询(GROUP BY)。除聚合函数外,SELECT语句中的每个列都必须在GROUP BY子句中给出。如:如果需要再查询效果中显示性别和年级,则GROUP BY子句必须给出’性别’和’年级’。如:spark.sql("SELECT 性别,年级, COUNT(*) AS 人数 FROM student GROUP BY 性别,年级 ").show() ,则查询效果会有性别和年级两列数据。
  代码如下:
  1. # 4.分组查询(GROUP BY)。除聚合函数外,SELECT语句中的每个列都必须在GROUP BY子句中给出。
  2. spark.sql("SELECT `性别`, COUNT(*) AS `人数` FROM student GROUP BY `性别` ").show()  # 查询不同性别的学生人数
  3. spark.sql("SELECT `性别`, AVG(`年龄`) AS `平均年龄`,MAX(`身高`) AS `最高身高` "
  4.           "FROM student "
  5.           "GROUP BY `性别` ").show()  # 按性别分组,求每组平均年龄,最高身高。
  6. # 对分组查询结果进行过滤。where分组前过滤,having分组后过滤
  7. spark.sql("SELECT `性别`, COUNT(*) AS `人数` FROM student GROUP BY `性别` HAVING COUNT(*)>4").show()  # 查询性别人数大于4的
复制代码
  where和having:都是对分组查询效果进行过滤。where分组前过滤,having分组后过滤
5.排序(ORDER BY)

  使用排序(ORDER BY)排序,ASC升序,DESC降序。
  1. # 5.排序(ORDER BY)。
  2. spark.sql("SELECT * FROM student ORDER BY `年龄` ASC").show()  # 按年龄升序ASC排序,DESC为降序
  3. spark.sql("SELECT * FROM student ORDER BY `年龄` ASC, `身高` DESC").show()  # 按年龄升序ASC排序,身高DESC降序
复制代码
6. 多表联查

  先用createOrReplaceTempView下令,把DataFrame对象映射出一个表名。
  1. # 6.多表连接查询
  2. cou.createOrReplaceTempView('course')  # 将DataFrame对象映射出一个表名
  3. sc.createOrReplaceTempView('stuCourse')  # 将DataFrame对象映射出一个表名
  4. spark.sql("SELECT student.`姓名`,stuCourse.`课程号`,stuCourse.`成绩` "
  5.           "FROM student, stuCourse "
  6.           "WHERE student.`学号`=stuCourse.`学号`").show()  # 连表查询姓名,课程号,成绩
  7. spark.sql("SELECT S.`姓名`,C.`课程名`,SC.`成绩` "
  8.           "FROM student AS S,course AS C,stuCourse AS SC "
  9.           "WHERE S.`学号`=SC.`学号` AND C.`课程号`=SC.`课程号`").show()  # 连表查询姓名,课程名,成绩
复制代码
7. udf函数

  在Apache Spark中,UDF(User-Defined Function,用户自定义函数)答应扩展Spark SQL的功能,使其可以或许执行在标准SQL中不直接支持的复杂操作。UDF可以是用Scala、Java或Python等语言编写的函数,然后可以在Spark SQL查询中像调用内置函数一样调用它们。
  先使用spark.udf.register注册UDF,然后再调用。
  如下所示,如计算门生表姓名列的长度。先注册UDF函数,注册名为strLen,再使用UDF函数。
  代码如下:
  1. # 用udf函数
  2. spark.udf.register("strLen", lambda x: len(x))  # 注册udf函数,实现Spark SQL 自定义函数。计算某列元素的长度
  3. spark.sql("SELECT *, strLen(`姓名`) AS len FROM student").show()   # 计算姓名的长度,并新增一列len。
复制代码
  查询效果如下图:

四、DataFrame根本操作

  除了用Spark SQL对DataFrame对象进行操作外,DataFrame自身也支持各类数据操作。
1.一样平常查询(select,selectExpr)

  select()方法用于选择特定列生成新的DataFrame。如果一个DataFrame列字段太多,只需查看某些列内容,可以使用select()方法。
  selectExpr()方法看成SQL查询,既可以用于查看指定列,还可以对选定的列进行特别处理,如改列名、取绝对值、四舍五入等,终极返回新的DataFrame。
  1. # 1.一般查询(select,selectExpr),其中,selectExpr可对列进行特殊处理,如列相加、计算、绝对值、四舍五入。它被当作SQL查询
  2. stu2 = stu.select('姓名', '性别', '年龄')  # 查询学生的姓名,性别,年龄
  3. stu2.show()
  4. stu.selectExpr("`姓名` ", "ROUND( `身高`,1) AS RoundedHeight").show()  # selectExpr被当作SQL查询。取身高的四舍五入,保留1位小数。
  5. stu.selectExpr("`姓名`", "`年龄`", "`年龄`+10 AS AddAge").show()   # 年龄增加10岁。
复制代码
2. 条件查询(where,filter)

  根据指定条件筛选数据,where()和filter()方法都可以用于条件查询。与select()方法、contains()方法或正则匹配查询方法rlike()联合,可实现更复杂的查询功能。
  1. # 2. 条件查询。where,filter,效果一样。
  2. stu.select('姓名', '性别', '年龄').where(stu['年龄'] > 21).show()  # 查询年龄大于21岁的学生
  3. stu.select("*").where((stu['籍贯'] == '广西') & (stu.性别 == '女')).show()  # 查询籍贯为广西的女生信息
  4. stu.where(stu["姓名"].contains("小")).show()  # 查询含“小”的姓名
  5. stu.where(stu["姓名"].rlike(r'^王.+(五|锋)$')).show()  # 正则匹配查询,查询姓名第一个为王,最后一个为五或峰,且中间至少有一个字的学生
复制代码
3. 聚合(agg)

  agg()方法用于聚合操作,用于部门列的统计,也可以与groupBy()方法组合使用,从而实现分组统计的功能。
  用agg()方法做聚合时,常用的统计方法有:mean()、max()、min()、sum()等,这些方法来自pyspark.sql.functions类,需要先导入。
  alias()方法为重命名列方法。
  1. # 3. 聚合(agg)
  2. stu.agg({'姓名': 'count'}).show()  # 统计学生人数
  3. stu.agg(count('姓名').alias('学生人数')).show()  # 统计学生人数,并重命名为:学生人数
  4. stu.agg({'年龄': 'max', '身高': 'avg'}).show()  # 统计年龄的最大值,身高的平均值。
  5. stu.agg(max('年龄').alias('最大年龄'), avg('身高').alias('平均身高')).show()  # 求最大年龄,平均身高,并重命名列名
复制代码
4. 分组(groupBy)

  groupBy()方法可以根据指定的字段进行分组,在groupBy()方法之后,通常使用统计方法进行计算,如:count()(总和,仅用于数值型字段),mean()、max()、min()、sum()等。
  1. # 4. 分组(groupBy),根据指定的字段分组
  2. stu.groupBy('性别').count().show()  # 查询不同性别的学生人数。求每组的总数
  3. stu.groupBy('性别').mean('年龄').show()  # 按性别分组,求每组平均年龄
  4. # groupBy与agg组合使用,实现分组统计功能。可以重命名统计的列名
  5. stu.groupBy('性别').agg(mean('年龄').alias('平均年龄')).show()  # 按性别分组,求每组平均年龄,并重命名列名为平均年龄
  6. stu.groupBy('性别').agg(mean('年龄').alias('平均年龄'),
  7.                       max('身高').alias('最高身高')).show()  # 按性别分组,求每组平均年龄,最高身高。
复制代码
5. 排序(orderBy)

  1. # 5.排序(orderBy)
  2. stu.orderBy(stu.年龄.asc()).show()  # 按照年龄升序排序
  3. stu.orderBy(stu.年龄.asc(), stu['身高'].desc()).show()  # 按照年龄升序排序,如果年龄相同,再按照身高降序排序
复制代码
6. 数据去重(distinct)

  去重是数据预处理中的重要环节。
  **distinct()**方法用于删除重复行,返回不包罗重复记录的DataFrame。可以联合count()方法来判定数据是否有重复。
  dropDuplicates()方法可以根据指定的字段进行去重操作。
  1. # 6.数据去重
  2. print('所有列名:', stu.columns)  # 获取所有列名
  3. # 去重。distinct,dropDuplicates(可指定字段去重)
  4. print("去重前的数据行数:", stu.count())
  5. stu.distinct().show()  # 去除重复记录
  6. print("去重后的数据行数:", stu.distinct().count())
  7. stu.dropDuplicates(['籍贯']).select('籍贯').show()  # 去除 籍贯相同的记录,并选择查看籍贯,可以看出学生来自哪些省份。
复制代码
7. 缺失值处理(dropna,fillna)

  缺失值指的是现有数据集中某个或某些属性的值是不完全的,存在空缺。处理缺失值是数据预处理时必不可少的环节。处理缺失值的方法有:删除、填充等。
  **dropna()**方法可以删除含有缺失值的记录。只要某行记录有一个缺失值,则该行记录被删除。
  **fillna()**方法可以用其他值填充缺失值。
  先用summary()方法查看数据的根本信息,如下图所示。发现年事和身高两列只有8条记录,比其他列的9条记录少,存在缺失值。

代码如下:
  1. # 方法1:删除缺失值
  2. stu.summary().show()  # 查看表数据的基本信息。发现年龄和身高只有8条记录,而其他列为9条。因此年龄和身高各有1个缺失值。
  3. print(stu.年龄.isNull())  # 判断年龄是否有缺失值。
  4. stu.dropna().show()  # 删除缺失值,查看数据。删除含有缺失值的记录
  5. # 方法2:填充缺失值
  6. avg_age = stu.agg(mean("年龄")).collect()[0][0]  # 先求年龄的平均值
  7. max_height = stu.agg(max("身高")).collect()[0][0]  # 先求身高的最大值
  8. stu.fillna({'年龄': avg_age, '身高': max_height}).show()  # 用年龄平均值、身高最大值填充对应的列的缺失值。
复制代码
  上面的代码中,collect()方法返回一个列表,用于获取DataFrame的所有记录,并将DataFrame中每行的数据以Row情势完备地展示出来。如:stu.agg(mean(“年事”)).collect(),返回如下数据:

  collect():返回的 DataFrame 只包罗一行一列(即“年事”的平均值),以是 collect() 方法将返回一个包罗单个元素的列表,这个元素自己也是一个列表(或 Row 对象),它包罗了平均值。
  [0][0]: 这是一个索引操作,用于从 collect() 方法返回的列表中提取数据。[0] 提取了列表中的第一个(也是唯一一个)元素(它自己也是一个列表或 Row 对象),然后 [0] 再次被用来从这个内部列表中提取第一个(也是唯一一个)元素,即“年事”的平均值。
  用聚合函数agg()和collect()方法计算出平均年事和最高身高 后,再用fillna()函数填充相应的缺失值。
8. 新增列(withColumn),重命名列(withColumnRenamed)

  1. # 7. 新增列(withColumn)、重命名列(withColumnRenamed)
  2. stu2 = stu.withColumn('AddAge', stu['年龄'] + 10)  # 新增一列:AddAge,年龄+10
  3. stu2.show()
  4. stu2.withColumnRenamed('AddAge', '新增年龄').show()  # 将AddAge列名改为:新增年龄
复制代码
9. udf函数

  与Spark SQL注册udf函数有所差异,请留意区分。
  1. from pyspark.sql.functions import udf
  2. #   创建一个 UDF,它接受一个字符串并返回其长度(整数)
  3. mystrlen = udf(lambda x: len(x) if x is not None else 0, IntegerType())
  4. # 使用 UDF 在 stu DataFrame 上添加一个新列 'len',该列包含 '姓名' 列中每个字符串的长度
  5. stu.withColumn("len", mystrlen(stu['姓名'])).show()
复制代码
10. 其他常用方法(describe、printSchema)

  describe():统计DataFrame数值型字段的信息,包括记录条数、平均值、样本标准差、最小值、最大值等。
  printSchema():以树状格式输出DataFrame的模式信息,输出效果中有DataFrame的列名称、数据类型以及该数据字段的值是否可以为空。
  1. # 统计DataFrame数值型字段的信息,包括记录条数、平均值、样本标准差、最小值、最大值等。
  2. stu.describe().show()  
  3. # 以树状格式输出DataFrame的模式信息,输出结果中有DataFrame的列名称、数据类型以及该数据字段的值是否可以为空。
  4. stu.printSchema()  
复制代码
  输出效果如下图所示:

五、 多表连接查询(join)

1. 内连接:inner

  内连接:inner,把两张表中相互匹配的行选择出来
  1. # 学生信息表:stu,课程信息表:cou,学生选课表:sc
  2. stu.join(sc, stu.学号 == sc.学号) \
  3.     .select(stu.姓名, sc.课程号, sc['成绩']).show()  # 连接学生信息表:stu和学生选课表:sc。 查询姓名,课程号,成绩
  4. # 三表连接
  5. stu2 = stu.join(sc, stu.学号 == sc.学号, 'inner')  # 1.先连接stu表和sc表。默认内连接inner
  6. stu3 = stu2.join(cou, stu2.课程号 == cou.课程号)  # 2.再连接cou表
  7. stu3.select('姓名', '课程名', '成绩').show()  # 3.查询学生 姓名,课程名,成绩
复制代码
  三表内连接后的数据如下:

2. 外连接

   **full/full_outer:**这种join就是把两张表的所有记录选择出来,如果一张表里有对应数据,另一张表里没有对应数据,就用NULL取代。
  **left/left_outer:**这种join就是把左边的表的所有行都取出来,如果右边表有匹配的行,就用匹配的行,如果右边表没有匹配的行,就用NULL取代。
  **right/right_outer:**这种join就是把右边的表的所有行都取出来,如果左边表有匹配的行,就用匹配的行,如果左边表没有匹配的行,就用NULL取代。
  left_semi:这种join就是把左边表中能和右表中的行匹配的行取出来,只取左边表的记录。
  **left_anti:**这种join是把左边表中不能和右边表中的行匹配的行取出来,也是只取左边表中的行。
  **cross:**这种join就是把左边表中的所有行和右边表的所有行做乘积,相当于左边表中的每一行都和右边表中的所有行组合一次,即左边表×右边表。
六、小案例:分析哪种水果季节性最强

  有四张水果贩卖数据表,分别是春季、夏季、秋季和冬季的水果贩卖情况。先对每个季度水果的贩卖量排名,然后取出每个季度销量最高的前3种水果做成一张表,分析这些水果中,哪种水果的季节性最强。所谓季节性,即有的季节销量非常高,而有的季节销量非常低。可以先计算四个季度之间的排名差,排名差最大的水果,就是季节性最强的水果。
1.创建春季、夏季、秋季和冬季的水果销量表

  如下图所示:




代码如下:
  1. # -----------------小案例:计算哪种水果是季节性最强的----------------------------------------
  2. from pyspark.sql import Window
  3. from pyspark.sql.functions import *  # 导入 functions的所有函数,包括:lit,col,min,max等
  4. data = [('香蕉', 90, 6), ('苹果', 50, 8), ('雪梨', 80, 7),
  5.         ('葡萄', 150, 4), ('龙眼', 100, 5), ('荔枝', 200, 1),
  6.         ('西瓜', 180, 2), ('榴莲', 170, 3), ('蓝莓', 15, 9),
  7.         ('草莓', 13, 10), ('橙子', 12, 11)]  # 创建数据
  8. spring = spark.createDataFrame(data, ['水果', '销量', '排名'])  # spring,summer,autumn,winter
  9. spring = spring.withColumn("季节", lit("spring"))  # 增加一列:季节
  10. spring.show()
  11. data = [('香蕉', 70, 5), ('苹果', 40, 8), ('雪梨', 60, 6),
  12.         ('葡萄', 50, 7), ('龙眼', 150, 3), ('荔枝', 160, 2),
  13.         ('西瓜', 200, 1), ('榴莲', 100, 4), ('蓝莓', 15, 9),
  14.         ('草莓', 13, 10), ('橙子', 12, 11)]  # 创建数据
  15. summer = spark.createDataFrame(data, ['水果', '销量', '排名'])  # spring,summer,autumn,winter
  16. summer = summer.withColumn("季节", lit("summer"))
  17. summer.show()
  18. data = [('香蕉', 40), ('苹果', 150), ('雪梨', 120),
  19.         ('葡萄', 60), ('龙眼', 20), ('荔枝', 30),
  20.         ('西瓜', 50), ('榴莲', 35), ('蓝莓', 15),
  21.         ('草莓', 13), ('橙子', 12)]
  22. autumn = spark.createDataFrame(data, ['水果', '销量'])
  23. import pyspark.sql.functions as F
  24. # windowSpec = Window.orderBy(autumn['销量'].desc())  # 创建一个 WindowSpec 对象,按销量降序排序。全局排序,会有警告信息提示没有分区
  25. windowSpec = Window.partitionBy(F.lit(1)).orderBy(autumn['销量'].desc())  # 按水果分区,可以均衡集群计算负载,避免警告信息
  26. autumn = autumn.withColumn("排名", row_number().over(windowSpec))  # 使用 row_number() 函数在窗口上生成排名,并添加到新的列 '排名'
  27. autumn = autumn.withColumn("季节", lit("autumn"))
  28. autumn.show()
  29. data = [('香蕉', 30), ('苹果', 200), ('雪梨', 182),
  30.         ('葡萄', 50), ('龙眼', 20), ('荔枝', 20),
  31.         ('西瓜', 30), ('榴莲', 35), ('蓝莓', 15),
  32.         ('草莓', 13), ('橙子', 12)]
  33. winter = spark.createDataFrame(data, ['水果', '销量'])
  34. windowSpec = Window.partitionBy(F.lit(1)).orderBy(winter['销量'].desc())  # 使用 F.lit(1) 创建一个常量分区以及WindowSpec 对象。也可以消除警告
  35. winter = winter.withColumn("排名", dense_rank().over(windowSpec))  # dense_rank()在遇到相同值时给出相同的排名,
  36. winter = winter.withColumn("季节", lit("winter"))
  37. winter.show()
  38. # 合并四个季节的数据,计算全年每类水果的总销量
  39. all_seasons = spring.union(summer).union(autumn).union(winter)  # 合并所有季节的DataFrame
  40. all_seasons.show()
  41. full_year = all_seasons.groupBy("水果").agg(F.sum("销量").alias("总销量"))  # 按水果分组,计算每类水果的总销量
  42. full_year.show()
  43. windowSpec = Window.partitionBy(F.lit(1)).orderBy(full_year['总销量'].desc())  # 创建一个WindowSpec对象,对整个DataFrame进行排序
  44. full_year = full_year.withColumn("排名", dense_rank().over(windowSpec))  # 使用dense_rank()进行排名,并添加到新的列'销量排名'
  45. full_year.show()
复制代码
2.提取每个季节销量排名前3的水果名字

  提取每个季节销量排名前3的水果名字,并将它们放在一张表top_fruits 中。效果如下图。

  代码如下:
  1. # 提取每个季节销量排名前3的水果名字,并将它们合并放在一起。
  2. top_spring = spring.filter(spring["排名"] <= 3).select("水果")
  3. top_summer = summer.filter(summer['排名'] <= 3).select("水果")
  4. top_autumn = autumn.filter(autumn['排名'] <= 3).select("水果")
  5. top_winter = winter.filter(winter['排名'] <= 3).select("水果")
  6. top_winter.show()
  7. top_fruits = top_spring.union(top_summer).union(top_autumn).union(top_winter).distinct()  # 使用union合并结果,去除重复项
  8. top_fruits.show()
复制代码
3. 春夏秋冬销量表左连接表top_fruits

  将春季、夏季、秋季和冬季销量表,左连接排名前3的水果表top_fruits ,用于计算四个季度的排名差。需要留意,数据表连接后,有许多列名是重复的,因此连接时需要为每张表指定一个别名,否则连接后无法选择列。

  连接表后,重命名列名,并选择相关的列,即选择每个季度的排名列。

  代码如下:
  1. # 外连接:左连接4张表,需要为数据表指定一个别名,否则连接后无法选择列
  2. left_fruits = top_fruits.alias("t") \
  3.     .join(spring.alias("s"), col('t.水果') == col('s.水果'), how='left_outer') \
  4.     .join(summer.alias('su'), col('t.水果') == col('su.水果'), how='left_outer') \
  5.     .join(autumn.alias('a'), col('t.水果') == col('a.水果'), how='left_outer') \
  6.     .join(winter.alias('w'), col('t.水果') == col('w.水果'), how='left_outer')  # 连接后有很多重复的列
  7. left_fruits.show()
  8. left_fruits = left_fruits.select(col('t.水果').alias('水果'),
  9.                                  col('s.排名').alias('spring_排名'), col('s.销量').alias('spring_销量'),
  10.                                  col('su.排名').alias('summer_排名'), col('su.销量').alias('summer_销量'),
  11.                                  col('a.排名').alias('autumn_排名'), col('a.销量').alias('autumn_销量'),
  12.                                  col('w.排名').alias('winter_排名'), col('w.销量').alias('winter_销量'))  # 重命名相关列
  13. left_fruits_sel = left_fruits.select('水果', 'spring_排名', 'summer_排名', 'autumn_排名', 'winter_排名')
  14. left_fruits_sel.show()
复制代码
4.计算每类水果的季度排名差

  先计算每类水果的季度排名差,然后计算排名差的最大值最小值,最后按照排名差最大值降序排序。

  从效果可以看出,苹果和荔枝的季度排名差分别为7和6,名列排名差的第一和第二位,它们是季节性最强的水果。现实上,大部门水果都是季节性产品,本案例仅为说明外连接的编程思绪,分析效果无现实意义。
代码如下:
  1. # 计算每类水果各个季度排名差的最大值和最小值
  2. left_fruits_diffs = left_fruits_sel.withColumn(
  3.     "min_diff",
  4.     F.least(
  5.         F.abs(F.col("spring_排名") - F.col("summer_排名")),
  6.         F.abs(F.col("spring_排名") - F.col("autumn_排名")),
  7.         F.abs(F.col("spring_排名") - F.col("winter_排名")),
  8.         F.abs(F.col("summer_排名") - F.col("autumn_排名")),
  9.         F.abs(F.col("summer_排名") - F.col("winter_排名")),
  10.         F.abs(F.col("autumn_排名") - F.col("winter_排名"))
  11.     )
  12. ).withColumn(
  13.     "max_diff",
  14.     F.greatest(
  15.         F.abs(F.col("spring_排名") - F.col("summer_排名")),
  16.         F.abs(F.col("spring_排名") - F.col("autumn_排名")),
  17.         F.abs(F.col("spring_排名") - F.col("winter_排名")),
  18.         F.abs(F.col("summer_排名") - F.col("autumn_排名")),
  19.         F.abs(F.col("summer_排名") - F.col("winter_排名")),
  20.         F.abs(F.col("autumn_排名") - F.col("winter_排名"))
  21.     )
  22. )
  23. left_fruits_diffs.orderBy('max_diff', ascending=False).show()  # 排名差降序排序
复制代码
5. 提取季节性最强的前2类水果

  选择季节性最强的前2类水果,提取它们四个季度的销量情况,用于后续的可视化绘图。

  代码如下:
  1. # 取季节性最强的前2类水果,提取它们四个季度的销量情况,用于后续的可视化绘图。
  2. top_diff = left_fruits_diffs.orderBy('max_diff', ascending=False).limit(2)  # 取前两行数据
  3. top_diff.show()  # 可以看出,苹果和荔枝的季节排名差最大,也就是它们是季节性最强的水果。
  4. top_diff = top_diff.select("水果")
  5. # collect_data=top_diff.collect()# 收集数据到驱动程序(只调用一次)。或者用topandas将pyspark dataframe转为pandas dataframe
  6. print(top_diff.collect())
  7. print(top_diff.collect()[0])  # 第1行数据
  8. print(top_diff.collect()[0][0])  # 第1行数据的值:苹果
  9. print(top_diff.collect()[1][0])  # 第2行数据的值:荔枝
  10. df5 = left_fruits.select("水果", 'spring_销量', 'summer_销量', 'autumn_销量', 'winter_销量') \
  11.     .where(col('水果').isin(['苹果', top_diff.collect()[1][0]]))  # 提取 苹果、荔枝 四个季度的销量情况
  12. df5.show()
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

络腮胡菲菲

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表