day06_Spark SQL

王柳  论坛元老 | 2025-1-14 12:40:19 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1911|帖子 1911|积分 5733

day06_Spark SQL课程笔记

一、今日课程内容



  • 1- DataFrame详解(掌握)
  • 2- Spark SQL的综合案例(掌握)
  • 3- Spark SQL函数定义(掌握)
今日目的:掌握DataFrame详解

二、DataFrame详解(掌握)

5.清洗相干的API

API是什么
   

  • 简单来说:API(应用程序编程接口)就像是“软件之间的翻译官”,定义了不同系统或组件之间如何交互和通讯,让开发者能够更方便地利用外部功能或服务。
  • 具体而言

    • 定义:API是一组预定义的函数、协议和工具,用于构建软件应用程序。它规定了如何哀求服务、通报数据以及吸收结果。
    • 类型

      • Web API:基于HTTP协议,用于Web服务之间的通讯,如RESTful API、GraphQL。
      • 库API:编程语言提供的函数库,如Python的NumPy、Pandas。
      • 操作系统API:操作系统提供的接口,如Windows API、Linux系统调用。

    • 工作原理

      • 哀求:客户端发送哀求(如HTTP哀求)到API。
      • 处理:服务器吸收哀求并处理。
      • 响应:服务器返回结果(如JSON数据)给客户端。


  • 实际生产场景

    • 在移动应用中,利用Web API获取天气数据或支付服务。
    • 在数据分析中,利用Python库API(如Pandas)处理数据。
    • 在系统开发中,利用操作系统API管理文件或进程。

  • 总之:API是软件开发的基石,通过标准化接口简化了系统间的交互,进步了开发效率和代码复用性。
  1. 总结:
  2. 1- dropDuplicates(subset):用来删除重复数据。
  3.         1.1- 如果没有指定参数subset,那么要比对行中的所有字段内容,如果全部相同,就认为是重复数据,会被删除;
  4.         1.2- 如果有指定参数subset,那么只比对subset中指定的字段范围内删除重复数据
  5.             
  6. 2- dropna(thresh,subset):删除缺失值数据.
  7.    2.1- 如果不传递任何参数,只要有任意一个字段值为null,那么就删除整行数据
  8.    2.2- 如果只指定了subset,那么空值的检查,就只会限定在subset指定的范围内
  9.    2.3- 如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh个字段的值不为空,才不会被删除
  10.             
  11. 3- fillna(value,subset): 替换缺失值数据
  12.    3.1- value: 必须要传递参数.是用来填充缺失值的,默认填充所有的缺失值
  13.    3.1- subset: 如果有指定参数subset,那么只比对subset中指定的字段范围内替换
  14.    
  15.    注意:value最常用的是传递字典的形式
复制代码
代码演示:
  1. # 导包
  2. import os
  3. from pyspark.sql import SparkSession
  4. # 绑定指定的python解释器
  5. os.environ['SPARK_HOME'] = '/export/server/spark'
  6. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  8. def demo1_dropDuplicates():
  9.     # dropDuplicates: 如果有重复就会去重,保留第一个,当然也可以指定参考的列
  10.     df.dropDuplicates().show()
  11.     df.dropDuplicates(['name']).show()
  12. def demo2_dropna():
  13.     # dropna: 默认去除带null每行数据,当然也可以指定参考的列
  14.     df.dropna().show()
  15.     df.dropna(thresh=2).show()
  16.     df.dropna(thresh=1, subset=['id', 'name']).show()
  17. def demo3_fillna():
  18.     # fillna: 默认只要带null就补充指定内容,当然也可以指定参考列
  19.     df.fillna('空').show()
  20.     df.fillna('空', subset=['address']).show()
  21.     df.fillna({'name': '未知', 'address': '广州'}).show()
  22. # 创建main函数
  23. if __name__ == '__main__':
  24.     # 1.创建spark对象
  25.     # appName:应用程序名称  master:提交模式
  26.     # getOrCreate:在builder构建器中获取一个存在的SparkSession,如果不存在,则创建一个新的
  27.     spark = SparkSession.builder.appName('sparksql_demo').master('local[*]').getOrCreate()
  28.     # 2.通过read读取外部文件方式创建DF对象
  29.     df = spark.read \
  30.         .format('csv') \
  31.         .option('header', True) \
  32.         .load('file:///export/data/spark_project/spark_sql/data/clear_data.csv')
  33.     # 3.show直接展示
  34.     df.show()
  35.     # 4.清洗数据
  36.     # dropDuplicates: 如果有重复就会去重,保留第一个,当然也可以指定参考的列
  37.     # demo1_dropDuplicates()
  38.     print('------------------------------------------------------')
  39.     # dropna: 默认去除带null每行数据,当然也可以指定参考的列
  40.     # demo2_dropna()
  41.     print('------------------------------------------------------')
  42.     # fillna: 默认只要带null就补充指定内容,当然也可以指定参考列
  43.     demo3_fillna()
  44.     # 5.释放资源
  45.     spark.stop()
复制代码
6.Spark SQL的Shuffle分区设置

   补充:
  如果运行sparksql,发现Shuffle分区每次都是1,大概后续count_distinct找不到,那么是因为pyspark版本缘故原由导致。解决办法如下:
  1- 检查自己3台呆板的pyspark版本是否是3.1.2版本
  pip list | grep pyspark
  2-如果不是3.1.2版本,那么先卸载pyspark
  下令: pip uninstall pyspark
  3- 再按照【Spark课程阶段_部署文档.doc】中重新安装3.1.2版本pyspark
  下令: pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark==3.1.2
  ​
​ Spark SQL底层本质上还是Spark的RDD程序,以为 Spark SQL组件就是一款翻译软件,用于将SQL/DSL翻译为Spark RDD程序, 实验运行
​ Spark SQL中同样也是存在shuffle的分区的,在实验shuffle分区后, shuffle分区数量默以为 200个,但是实际中, 一般都是需要调整这个分区的, 因为当数据量比较少的数据, 200个分区相对来说比较大一些, 但是当数据量比较大的时间, 200个分区显得比较小

如何调整shuffle分区数量呢? spark.sql.shuffle.partitions
  1. 方案一(不推荐):  直接修改spark的配置文件spark-defaults.conf。全局设置,默认值为200。设置为:
  2.                 spark.sql.shuffle.partitions     20
  3. 方案二(常用,推荐使用): 在客户端通过submit命令提交的时候, 动态设置shuffle的分区数量。部署、上线的时候、基于spark-submit提交运行的时候
  4.         ./spark-submit --conf "spark.sql.shuffle.partitions=20"
  5. 方案三(比较常用): 在代码中设置。主要在测试环境中使用, 但是一般在部署上线的时候, 会删除。优先级也是最高的。一般的使用场景是,当你的数据量未来不会发生太大的波动。
  6. 设置shuffle分区的数量方式1:SparkSession.builder.config('spark.sql.shuffle.partitions', 1)
  7. 设置shuffle分区的数量方式2:spark.conf.set("spark.sql.shuffle.partitions",'1')
  8.        
  9. 获取shuffle分区的数量:spark.conf.get("spark.sql.shuffle.partitions")
复制代码
  1. # 导包
  2. import os
  3. import time
  4. from pyspark.sql import SparkSession, functions as F
  5. # 绑定指定的python解释器
  6. os.environ['SPARK_HOME'] = '/export/server/spark'
  7. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  8. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  9. # 创建main函数
  10. if __name__ == '__main__':
  11.     # 7.1 TODO: 记录程序开始时间
  12.     start = time.time()
  13.     # 1.创建spark对象
  14.     # appName:应用程序名称  master:提交模式
  15.     # getOrCreate:在builder构建器中获取一个存在的SparkSession,如果不存在,则创建一个新的
  16.     spark = SparkSession.builder \
  17.         .config('spark.sql.shuffle.partitions', 1)\
  18.         .appName('sparksql_demo')\
  19.         .master('local[*]')\
  20.         .getOrCreate()
  21.     # 获取shuffle分区的数量
  22.     shuffle_partitions = spark.conf.get("spark.sql.shuffle.partitions")
  23.     print("Shuffle partitions:", shuffle_partitions)
  24.    
  25.     # 2.通过read读取外部文件方式创建DF对象
  26.     df = spark.read \
  27.         .format('text') \
  28.         .schema('words string') \
  29.         .load('file:///export/data/spark_project/spark_sql/data/data3.txt')
  30.     print(type(df))
  31.     # 需求: 从data3.txt读取所有单词,然后统计每个单词出现的次数
  32.     # 3.SQL风格
  33.     # 方式1: 使用子查询方式
  34.     # 先创建临时视图,然后通过sql语句查询展示
  35.     df.createTempView('words_tb')
  36.     qdf = spark.sql(
  37.         "select words,count(1) as cnt from (select explode(split(words,' ')) as words from words_tb) t group by words"
  38.     )
  39.     print(type(qdf))
  40.     qdf.show()
  41.     # 4.DSL风格
  42.     # 方式2: 分组后用agg函数
  43.     df.select(
  44.         F.explode(F.split('words', ' ')).alias('words')
  45.     ).groupBy('words').agg(
  46.         F.count('words').alias('cnt')
  47.     ).show()
  48.     # 7.2 TODO:记录结束时间
  49.     end = time.time()
  50.     # 7.3 计算运行时间
  51.     # 结论: 合理设置分区数,效率会提高!
  52.     t = end - start
  53.     print(f"程序运行了{t}秒")
  54.     # 6.TODO:为了方便查看web页面可以让程序多睡会儿
  55.     # time.sleep(1000)
  56.     # 5.释放资源
  57.     spark.stop()
复制代码
7.数据写出操作

写出到文件

同一的输出语法:

  1. 对应的简写API格式如下,以CSV为例:
  2. init_df.write.csv(
  3.     path='存储路径',
  4.     mode='模式',
  5.     header=True,
  6.     sep='\001',
  7.     encoding='UTF-8'
  8. )
  9. 常用参数说明:
  10.     1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统
  11.     2- mode:当输出目录中文件已经存在的时候处理办法
  12.         2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件
  13.         2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去
  14.         2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作
  15.         2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path        
  16.                                 file:xxx already exists.
  17.         
  18.     3- sep:字段间的分隔符
  19.     4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True
  20.     5- encoding:文件输出的编码方式
复制代码


  • 演示1: 输出到文件中 json csv orc text …
  1. # 导包
  2. import os
  3. from pyspark.sql import SparkSession
  4. # 绑定指定的python解释器
  5. os.environ['SPARK_HOME'] = '/export/server/spark'
  6. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  8. # 创建main函数
  9. if __name__ == '__main__':
  10.     # 1.创建SparkContext对象
  11.     spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()
  12.     spark.conf.set("spark.sql.shuffle.partitions", '1')
  13.     # 2.数据输入
  14.     df = spark.read\
  15.         .format('csv')\
  16.         .option('header',True)\
  17.         .load('file:///export/data/spark_project/spark_sql/data/clear_data.csv')
  18.     # 3.数据处理(切分,转换,分组聚合)
  19.     etldf = df.dropDuplicates().dropna()
  20.     etldf.show()
  21.     # 4.数据输出
  22.     # 原始API
  23.     etldf.write\
  24.         .format('csv')\
  25.         .option('sep',',')\
  26.         .option('header',True)\
  27.         .mode('overwrite')\
  28.         .save('file:///export/data/spark_project/spark_sql/data/output')
  29.     # 简化API
  30.     etldf.write.csv(
  31.         sep=',',
  32.         header=True,
  33.         mode='overwrite',
  34.         path='file:///export/data/spark_project/spark_sql/data/output2'
  35.     )
  36.     # 5.关闭资源
  37.     spark.stop()
复制代码
写出到数据库



  • 将结果数据基于JDBC方案, 输出到关系型数据库, 比方说: MySql
  1. from pyspark import SparkConf, SparkContext
  2. import os
  3. from pyspark.sql import SparkSession
  4. # 绑定指定的Python解释器
  5. os.environ['SPARK_HOME'] = '/export/server/spark'
  6. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  8. if __name__ == '__main__':
  9.     print("数据输出到数据库")
  10.     # 1- 创建SparkSession对象
  11.     spark = SparkSession.builder\
  12.         .config("spark.sql.shuffle.partitions","1")\
  13.         .appName('sparksql_database')\
  14.         .master('local[*]')\
  15.         .getOrCreate()
  16.     # 2- 数据输入
  17.     init_df = spark.read.csv(
  18.         path='file:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt',
  19.         sep=' ',
  20.         encoding='UTF-8',
  21.         header="True",
  22.         inferSchema=True
  23.     )
  24.     # 3- 数据处理
  25.     result = init_df.where('age>=20')
  26.     # 4- 数据输出
  27.     result.show()
  28.     result.printSchema()
  29.     # 数据输出到数据
  30.     """
  31.         创建数据库命令:create database day06 character set utf8;
  32.     """
  33.     result.write.jdbc(
  34.         url='jdbc:mysql://node1:3306/day06?useUnicode=true&characterEncoding=utf-8',
  35.         table='student',
  36.         mode='append',
  37.         sql={ 'user' : 'root', 'password' : '123456' }
  38.     )
  39.     # 5- 释放资源
  40.     spark.stop()
复制代码
运行结果截图:

可能出现的错误一:

  1.     etldf.write.jdbc(
  2.        url='jdbc:mysql://node1:3306/库名',
  3.        table='表名',
  4.        mode='append',
  5.        # 解决方法: 给密码123456加上引号
  6.        sql={'user':'root','password':'123456'}
  7.     )
复制代码
可能出现的错误 二:

  1. 原因:  缺少连接MySQL数据库的驱动
  2. 解决方法如下:
  3. 数据库的驱动包, 一般都是一些Jar包,放置【mysql-connector-java-5.1.41.jar】驱动包到以下位置:
  4.         1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,
  5.                 目录位置: /export/server/spark/jars
  6.        
  7.         2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包
  8.                 目录位置:
  9.                         /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/
  10.        
  11.         3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下
  12.                 hdfs的spark的jars目录下:  hdfs://node1:8020/spark/jars
  13.                
  14.         请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案:  spark-submit --jars  ....
复制代码
可能出现的错误三:


  1. 原因:将中文输出到了数据表中
  2. 解决办法:
  3. 1- 数据库连接要加上:  ?useUnicode=true&characterEncoding=utf-8
  4. 2- 创建数据库的时候需要指定编码: character set utf8
复制代码
  1.     # 最终连接数据代码如下:
  2.     etldf.write.jdbc(
  3.        url='jdbc:mysql://node1:3306/spark_db1?useUnicode=true&characterEncoding=utf-8',
  4.        table='student',
  5.        mode='append',
  6.        sql={'user':'root','password':'123456'}
  7.     )
复制代码
三、Spark SQL的综合案例(掌握)

1、常见DSL代码整理

分类格式含义示例API/方法select查询字段select(‘id1’, ‘id2’)where对数据过滤where(‘avg_score>3’)groupBy对数据分组groupBy(‘userid’)orderBy对数据排序orderBy(‘cnt’, ascending=False)limit取前几条数据orderBy(‘cnt’, ascending=False).limit(1)agg聚合操作,里面可以写多个聚合表达式agg(F.round(F.avg(‘score’), 2).alias(‘avg_score’))show打印数据init_df.show()printSchema打印数据的schema信息,也就是元数据信息init_df.printSchema()alias对字段取别名F.count(‘movieid’).alias(‘cnt’)join关联2个DataFrameetl_df.join(avg_score_dsl_df, ‘movieid’)withColumn基于目前的数据产生一个新列init_df.withColumn(‘word’,F.explode(F.split(‘value’, ’ ')))dropDuplicates删除重复数据init_df.dropDuplicates(subset=[“id”,“name”])dropna删除缺失值init_df.dropna(thresh=2,subset=[“name”,“age”,“address”])fillna替换缺失值init_df.fillna(value={“id”:111,“name”:“未知姓名”,“age”:100,“address”:“北京”})first取DataFrame中的第一行数据over创建一个窗口列窗口partitionBy对数据分区orderBy对数据排序orderBy(F.desc(‘pv’))函数row_number行号。从1开始编号desc降序排序avg计算均值count计数round保存小数位col将字段包装成Column对象,一般用于对新列的包装
  1. 1- 什么使用使用select(),什么时候使用groupBy()+agg()/select()实现聚合?:如果不需要对数据分组,那么可以直接使用select()实现聚合;如果有分组操作,需要使用groupBy()+agg()/select(),推荐使用agg()
  2.         
  3. 2- first(): 如果某个DataFrame中只有一行数据,并且不使用join来对比数据,那么一般需要使用first()明确指定和第一行进行比较
  4.    
  5. 3- F.col(): 对于在计算过程中临时产生的字段,需要使用F.col()封装成Column对象,然后去使用
复制代码


  • API/方法:是由DataFrame来调用
  • 函数:需要先通过import pyspark.sql.functions as F导入,利用F调用。Spark SQL内置提供的函数https://spark.apache.org/docs/3.1.2/api/sql/index.html
  • 窗口:需要先通过from pyspark.sql import Window导入
2、电影分析案例

需求说明:

数据集的介绍:

  1. 数据说明 :  userid,movieid,score,datestr
  2. 字段的分隔符号为:  \t
复制代码

需求分析:



  • 需求一: 查询用户平均分
           需求分析:
        维度:用户
    指标:平均分
  • 需求二: 查询每部电影的平均分(课后作业,将自己对需求的分析步调以文字的形式放在代码中)
  • 需求三: 查询大于平均分的电影的数量
           需求分析:
        1- 统计全部打分的平均分,这个结果就是一个数字
        2- 统计每部电影各自的平均分
        3- 查询大于平均分的电影的数量
  • 需求四: 查询高分电影中(电影平均分大于3)打分次数最多的用户, 并且求出此人全部的打分记载中, 打的平均分是多少
           需求分析:
        1- 筛选出高分电影。统计每部电影的平均分,再过滤出>3分的电影信息
        2- 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户
        3- 统计该用户全部打分记载的平均分
  • 需求五: 查询每个用户的平均打分, 最低打分, 最高打分(课后作业)
  • 需求六: 查询被评分超过100次的电影的平均分 排名 TOP10(课后作业)
一三四需求实现代码:
  1. # 导包
  2. import os
  3. from pyspark.sql import SparkSession,functions as F
  4. # 绑定指定的python解释器
  5. os.environ['SPARK_HOME'] = '/export/server/spark'
  6. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  8. def demo1_get_user_avg_score():
  9.     # 方式1: SQL方式
  10.     spark.sql(
  11.         'select userid,round(avg(score),3) as user_avg_score from movie group by userid'
  12.     ).show()
  13.     # 方式2: DSL方式
  14.     etldf.groupBy('userid').agg(
  15.         F.round(F.avg('score'), 3).alias('user_avg_score')
  16.     ).show()
  17. def demo2_get_lag_avg_movie_cnt():
  18.     # 方式1: SQL
  19.     spark.sql(
  20.         """
  21.         select count(1) as cnt from (
  22.             select movieid,avg(score) as movie_avg_score from movie group by movieid
  23.             having movie_avg_score > (select avg(score) as all_avg_score from movie)  
  24.         ) t
  25.         """
  26.     ).show()
  27.     # 方式2: DSL
  28.     # col():把临时结果作为新列使用   first():取第一个值
  29.     etldf.groupBy('movieid').agg(
  30.         F.avg('score').alias('movie_avg_score')
  31.     ).where(
  32.         F.col('movie_avg_score') > etldf.select(F.avg('score').alias('all_avg_score')).first()['all_avg_score']
  33.     ).agg(
  34.         F.count('movieid').alias('cnt')
  35.     ).show()
  36. def demo3_get_top1_user_avg_sql():
  37.     # 方式1: SQL
  38.     # ①先查询高分电影:
  39.     spark.sql(
  40.         "select movieid,avg(score) as movie_avg_score from movie group by movieid having movie_avg_score > 3"
  41.     ).createTempView('hight_score_tb')
  42.     # ②再求打分次数最多的用户(先不考虑并列,只取最大1个)
  43.     spark.sql(
  44.         "select userid,count(1) as cnt from hight_score_tb h join movie m on h.movieid = m.movieid group by userid order by cnt desc limit 1"
  45.     ).createTempView('top1_user_tb')
  46.     # ③最后求此人所有打分的平均分
  47.     spark.sql(
  48.         "select avg(score) as top1_user_avg from movie where userid = (select userid from top1_user_tb)"
  49.     ).show()
  50. def demo3_get_top1_user_avg_dsl():
  51.     # ①先查询高分电影:
  52.     hight_score_df = etldf.groupBy('movieid').agg(
  53.         F.avg('score').alias('movie_avg_score')
  54.     ).where('movie_avg_score>3')
  55.     # ②再求打分次数最多的用户(先不考虑并列,只取最大1个)
  56.     top1_user_df = hight_score_df.join(etldf, on=hight_score_df['movieid'] == etldf['movieid']) \
  57.         .groupBy('userid').agg(F.count('userid').alias('cnt')) \
  58.         .orderBy('cnt', ascending=False).limit(1)
  59.     # ③最后求此人所有打分的平均分
  60.     etldf.where(
  61.         etldf['userid'] == top1_user_df.first()['userid']
  62.     ).agg(
  63.         F.avg('score').alias('top1_user_avg')
  64.     ).show()
  65. # 创建main函数
  66. if __name__ == '__main__':
  67.     # 1.创建SparkContext对象
  68.     spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()
  69.     # 2.数据输入
  70.     df = spark.read.csv(
  71.         schema='userid string,movieid string,score int,datestr string',
  72.         sep='\t',
  73.         path='file:///export/data/spark_project/spark_sql/data/u.data'
  74.     )
  75.     print(df.count())
  76.     # 3.数据处理(切分,转换,分组聚合)
  77.     etldf = df.dropDuplicates().dropna()
  78.     print(etldf.count())
  79.     # 4.数据分析
  80.     # 方便后续所有SQL方式使用,提前创建临时视图作为表
  81.     etldf.createTempView('movie')
  82.     # 需求1: 查询用户的平均分
  83.     # demo1_get_user_avg_score()
  84.     # 需求3: 查询大于平均分的电影的数量
  85.     # demo2_get_lag_avg_movie_cnt()
  86.     # 需求4: 查询高分电影(平均分>3)中,打分次数最多的用户,并求出此人所有打分的平均分
  87.     # 方式1: SQL
  88.     demo3_get_top1_user_avg_sql()
  89.     # 方式2: DSL
  90.     demo3_get_top1_user_avg_dsl()
  91.     # 5.数据输出
  92.     # 6.关闭资源
  93.     spark.stop()
复制代码
附录: 题目
可能出现的错误一:

  1. 原因: 是使用withColumn产生新列,但是表达式中有聚合的操作。缺少groupBy调用
复制代码
可能出现的错误二:

错误缘故原由:DataFrame结果是单行的情况,列值获取错误

解决办法:
  1. 将df_total_avg_score['total_avg_score']改成df_total_avg_score.first()['total_avg_score']
复制代码
可能遇到的错误三:

缘故原由:对于在计算过程中临时产生的字段,需要利用F.col封装成Column对象
解决办法:F.col(‘avg_score’)
四、Spark SQL函数定义

1、窗口函数

回首之前学习过的窗口函数:
  1. 分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])
  2. 分析函数可以大致分成如下3类:
  3. 1- 第一类: 聚合函数 sum() count() avg() max() min()
  4. 2- 第二类: row_number() rank() dense_rank() ntile()
  5. 3- 第三类: first_value() last_value() lead() lag()
复制代码
在Spark SQL中利用窗口函数案例:
需求是找出每个cookie中pv排在前3位的数据,也就是分组取TOPN题目
  1. from pyspark import SparkConf, SparkContext
  2. import os
  3. from pyspark.sql import SparkSession
  4. import pyspark.sql.functions as F
  5. from pyspark.sql import Window as win
  6. # 绑定指定的Python解释器
  7. os.environ['SPARK_HOME'] = '/export/server/spark'
  8. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  9. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  10. if __name__ == '__main__':
  11.     # 1- 创建SparkSession对象
  12.     spark = SparkSession.builder\
  13.         .config('spark.sql.shuffle.partitions',1)\
  14.         .appName('sparksql_win_function')\
  15.         .master('local[*]')\
  16.         .getOrCreate()
  17.     # 2- 数据输入
  18.     init_df = spark.read.csv(
  19.         path='file:///export/data/gz16_pyspark/02_spark_sql/data/cookie.txt',
  20.         schema='cookie string,datestr string,pv int',
  21.         sep=',',
  22.         encoding='UTF-8'
  23.     )
  24.     init_df.createTempView('win_data')
  25.     init_df.show()
  26.     init_df.printSchema()
  27.     # 3- 数据处理
  28.     # SQL
  29.     spark.sql("""
  30.         select
  31.             cookie,datestr,pv
  32.         from (
  33.             select
  34.                 cookie,datestr,pv,
  35.                 row_number() over (partition by cookie order by pv desc) as rn
  36.             from win_data
  37.         ) tmp where rn<=3
  38.     """).show()
  39.     # DSL
  40.     """
  41.         select:注意点,结果中需要看到哪几个字段,就要明确写出来
  42.     """
  43.     init_df.select(
  44.         "cookie","datestr","pv",
  45.         F.row_number().over(win.partitionBy('cookie').orderBy(F.desc('pv'))).alias('rn')
  46.     ).where('rn<=3').select("cookie","datestr","pv").show()
  47.     # 4- 数据输出
  48.     # 5- 释放资源
  49.     spark.stop()
复制代码
运行结果截图:

2、SQL函数分类

SQL函数,主要分为以下三大类:


  • UDF函数:用户自定义函数

    • 特点:一对一,输入一个得到一个
    • 比方:split() substr()
    • 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,实在就是实现一个方法;

  • UDAF函数:用户自定义聚合函数

    • 特点:多对一,输入多个得到一个
    • 比方:sum() avg() count() min()

  • UDTF函数:用户自定义表数据生成函数

    • 特点:一对多,输入一个得到多个
    • 比方:explode()

   

  • 简单来说:SQL函数中的UDF、UDAF和UDTF就像是“数据处理的三剑客”,分别善于“单点突破”、“团队协作”和“多点开花”,满足不同的数据处理需求。
  • 具体而言

    • UDF(用户自定义函数)

      • 比喻:像是“单兵作战”,一对一处理数据。
      • 特点:输入一个值,输出一个值。
      • 示例:split()(字符串分割)、substr()(字符串截取)。
      • 实现:在Hive中继承UDF类,实现evaluate方法。

    • UDAF(用户自定义聚合函数)

      • 比喻:像是“团队协作”,多对一聚合数据。
      • 特点:输入多行数据,输出一个聚合结果。
      • 示例:sum()(求和)、avg()(平均值)、count()(计数)。
      • 实现:在Hive中继承UDAF类,实现init()、iterate()、merge()等方法。

    • UDTF(用户自定义表生成函数)

      • 比喻:像是“多点开花”,一对多展开数据。
      • 特点:输入一行数据,输出多行数据。
      • 示例:explode()(将数组或Map展开为多行)。
      • 实现:在Hive中继承GenericUDTF类,实现initialize()、process()等方法。


  • 实际生产场景

    • 在数据清洗中,利用UDF对字符串进行格式化处理。
    • 在数据分析中,利用UDAF计算复杂的业务指标,如加权平均值。
    • 在数据展开中,利用UDTF将嵌套的JSON数据拆分为多行,便于后续分析。

  • 总之:UDF、UDAF和UDTF是SQL中强大的扩展工具,分别适用于“一对一”、“多对一”和“一对多”的数据处理场景,为业务需求提供了机动的支持。
  在SQL中提供的全部的内置函数,都是属于以上三类中某一类函数
思考:有这么多的内置函数,为啥还需要自定义函数呢?
  1. 为了扩充函数功能。在实际使用中,并不能保证所有的操作函数都已经提前的内置好了。很多基于业务处理的功能,其实并没有提供对应的函数,提供的函数更多是以公共功能函数。此时需要进行自定义,来扩充新的功能函数
复制代码


  1. 1- SparkSQL原生的时候,Python只能开发UDF函数
  2. 2- SparkSQL借助其他第三方组件,Python可以开发UDF、UDAF函数
复制代码
​ 在Spark SQL中,针对Python语言,对于自定义函数,原生支持的并不是特别好。目前原生仅支持自定义UDF函数,而无法自定义UDAF函数和UDTF函数。
​ 在1.6版本后,Java 和scala语言支持自定义UDAF函数,但Python并不支持。
Spark SQL原生存在的题目:大量的序列化和反序列

  1. 虽然Python支持自定义UDF函数,但是其效率并不是特别的高效。因为在使用的时候,传递一行处理一行,返回一行的方式。这样会带来非常大的序列化的开销的问题,导致原生UDF函数效率不好
  2.        
  3. 早期解决方案: 基于Java/Scala来编写自定义UDF函数,然后基于python调用即可
  4.        
  5. 目前主要的解决方案: 引入Arrow框架,可以基于内存来完成数据传输工作,可以大大的降低了序列化的开销,提供传输的效率,解决原生的问题。同时还可以基于pandas的自定义函数,利用pandas的函数优势完成各种处理操作
复制代码
3、Spark原生自定义UDF函数

自定义函数流程:
  1. 第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可
  2. 第二步: 将Python函数注册到Spark SQL中
  3.         注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)
  4.                 参数1: 【UDF函数名称】,此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范
  5.                 参数2: 【自定义的Python函数】,表示将哪个Python的函数注册为Spark SQL的函数
  6.                 参数3: 【UDF函数的返回值类型】。用于表示当前这个Python的函数返回的类型
  7.                 udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用
  8.                 注意: 如果通过方式一来注册函数, 【可以用在SQL和DSL】
  9.        
  10.         注册方式二:  udf对象 = F.udf(参数1,参数2)
  11.                 参数1: Python函数的名称,表示将那个Python的函数注册为Spark SQL的函数
  12.                 参数2: 返回值的类型。用于表示当前这个Python的函数返回的类型
  13.                 udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用
  14.                 注意: 如果通过方式二来注册函数,【仅能用在DSL中】
  15.                
  16.         注册方式三:  语法糖写法  @F.udf(returnType=返回值类型)  放置到对应Python的函数上面
  17.                 说明: 实际是方式二的扩展。如果通过方式三来注册函数,【仅能用在DSL中】
  18.                
  19. 第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可
复制代码
自定义演示一:
请自定义一个函数,完成对数据同一添加一个后缀名的操作
  1. from pyspark import SparkConf, SparkContext
  2. import os
  3. from pyspark.sql import SparkSession
  4. import pyspark.sql.functions as F
  5. # 绑定指定的Python解释器
  6. from pyspark.sql.types import StringType
  7. os.environ['SPARK_HOME'] = '/export/server/spark'
  8. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  9. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  10. if __name__ == '__main__':
  11.     print("请自定义一个函数,完成对数据统一添加一个后缀名的操作_itheima")
  12.     # 1- 创建SparkSession对象
  13.     spark = SparkSession.builder\
  14.         .config("spark.sql.shuffle.partitions",1)\
  15.         .appName('sparksql_udf_basetype')\
  16.         .master('local[*]')\
  17.         .getOrCreate()
  18.     # 2- 数据输入
  19.     init_df = spark.createDataFrame(
  20.         data=[(1,'张三','广州'),(2,'李四','深圳')],
  21.         schema='id int,name string,address string'
  22.     )
  23.     init_df.printSchema()
  24.     init_df.show()
  25.     init_df.createTempView('tmp')
  26.     # 3- 数据处理
  27.     # 3.1- 创建自定义的Python函数
  28.     def add_suffix(address):
  29.         return address + "_itheima"
  30.     # 3.2- 将Python函数注册到Spark SQL
  31.     # 注册方式一
  32.     dsl_add_suffix = spark.udf.register('sql_add_suffix',add_suffix,StringType())
  33.     # 3.3- 在SQL/DSL中调用
  34.     # SQL
  35.     spark.sql("""
  36.         select
  37.             id,name,address,
  38.             sql_add_suffix(address) as new_address
  39.         from tmp
  40.     """).show()
  41.     # DSL
  42.     init_df.select(
  43.         "id",
  44.         "name",
  45.         "address",
  46.         dsl_add_suffix("address").alias("new_address")
  47.     ).show()
  48.     print("-"*30)
  49.     # 在错误的地方调用了错误的函数。spark.udf.register参数1取的函数名只能在SQL中使用,不能在DSL中用。
  50.     # spark.sql("""
  51.     #     select
  52.     #         id,name,address,
  53.     #         dsl_add_suffix(address) as new_address
  54.     #     from tmp
  55.     # """).show()
  56.     # 注册方式二:UDF返回值类型传值方式一
  57.     dsl2_add_suffix = F.udf(add_suffix,StringType())
  58.     # DSL
  59.     init_df.select(
  60.         "id",
  61.         "name",
  62.         "address",
  63.         dsl2_add_suffix("address").alias("new_address")
  64.     ).show()
  65.     # 注册方式二:UDF返回值类型传值方式二
  66.     dsl3_add_suffix = F.udf(add_suffix, 'string')
  67.     # DSL
  68.     init_df.select(
  69.         "id",
  70.         "name",
  71.         "address",
  72.         dsl3_add_suffix("address").alias("new_address")
  73.     ).show()
  74.     # 注册方式三:语法糖/装饰器
  75.     @F.udf(returnType=StringType())
  76.     def add_suffix_candy(address):
  77.         return address + "_itheima"
  78.     # DSL
  79.     init_df.select(
  80.         "id",
  81.         "name",
  82.         "address",
  83.         add_suffix_candy("address").alias("new_address")
  84.     ).show()
  85.     # 4- 数据输出
  86.     # 5- 释放资源
  87.     spark.stop()
复制代码
运行结果截图:

可能遇到的题目:

  1. 原因: 在错误的地方调用了错误的函数。spark.udf.register参数1取的函数名只能在SQL中使用,不能在DSL中用。
复制代码
自定义演示二:
自定义UDF函数,让其返回值类型为复杂类型: 字典 列表 元组
  1. 1- 返回列表和元组,表现一致
  2. 2- 返回字典。需要注意,字典中的key需要和schema中的字段名保持一致,否则获取不到值,以null填充
复制代码
  1. from pyspark import SparkConf, SparkContext
  2. import os
  3. from pyspark.sql import SparkSession
  4. import pyspark.sql.functions as F
  5. # 绑定指定的Python解释器
  6. from pyspark.sql.types import StringType, StructType
  7. os.environ['SPARK_HOME'] = '/export/server/spark'
  8. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  9. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  10. if __name__ == '__main__':
  11.     # 1- 创建SparkSession对象
  12.     spark = SparkSession.builder\
  13.         .config("spark.sql.shuffle.partitions",1)\
  14.         .appName('sparksql_udf_compaxtype')\
  15.         .master('local[*]')\
  16.         .getOrCreate()
  17.     # 2- 数据输入
  18.     init_df = spark.createDataFrame(
  19.         data=[(1,'张三 广州'),(2,'李四 深圳')],
  20.         schema='id int,name_address string'
  21.     )
  22.     init_df.printSchema()
  23.     init_df.show()
  24.     init_df.createTempView('tmp')
  25.     # 3- 数据处理
  26.     # 3.1- 创建自定义的Python函数
  27.     def my_split(name_address):
  28.         data_list = name_address.split(' ')
  29.         # 返回列表
  30.         # return data_list
  31.         # 返回元组。与列表表现一致
  32.         # return tuple(data_list)
  33.         # 返回字典。需要注意,字典中的key需要和schema中的字段名保持一致,否则获取不到值,以null填充
  34.         # return {"n": data_list[0], "a": data_list[1]}
  35.         return {"name":data_list[0],"address":data_list[1]}
  36.     # 3.2- 将Python函数注册到Spark SQL
  37.     schema = StructType().add("name",StringType()).add("address",StringType())
  38.     dsl_my_split = spark.udf.register('sql_my_split',my_split,schema)
  39.     # 3.3- 在SQL/DSL中调用
  40.     # SQL方式
  41.     sql_result = spark.sql("""
  42.         select
  43.             id,name_address,
  44.             sql_my_split(name_address) as new_field,
  45.             sql_my_split(name_address)['name'] as name,
  46.             sql_my_split(name_address)['address'] as address
  47.         from tmp
  48.     """)
  49.     sql_result.printSchema()
  50.     sql_result.show()
  51.     # DSL方式
  52.     init_df.select(
  53.         "id",
  54.         "name_address",
  55.         dsl_my_split("name_address").alias("new_field"),
  56.         dsl_my_split("name_address")['name'].alias("name"),
  57.         dsl_my_split("name_address")['address'].alias("address")
  58.     ).show()
  59.     # 4- 数据输出
  60.     # 5- 释放资源
  61.     spark.stop()
复制代码
运行结果截图:

01_数据清洗的dropDuplicates函数.py

dropDuplicates 行来看,保存第一个重复值,有subset也是

  1. # 导包
  2. import os
  3. from pyspark.sql import SparkSession
  4. # 解决JAVA_HOME 未设置问题
  5. os.environ['SPARK_HOME'] = '/export/server/spark'
  6. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  8. if __name__ == '__main__':
  9.     # 先创建spark session对象
  10.     spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()
  11.     # 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象
  12.     sc = spark.sparkContext
  13.     # 先读取数据生成df对象
  14.     df = spark.read.csv(
  15.         path='file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv',
  16.         sep=',',
  17.         header=True
  18.     )
  19.     # 测试是否有数据
  20.     print(df.count())
  21.     df.show()
  22.     print('-------------------------------------------------------------')
  23.     # 对df数据清洗
  24.     # dropDuplicates(): 删除重复数据,默认整行一样就删除
  25.     df1 = df.dropDuplicates(subset=['id','name'])
  26.     # 测试数据是否清洗
  27.     print(df1.count())
  28.     df1.show()
  29.     print('-------------------------------------------------------------')
  30.     # dropDuplicates(): 删除重复数据,默认整行一样就删除
  31.     # 参数subset: 指定删除哪几列数据重复,默认整行一样就删除
  32.     # 测试数据是否清洗
  33.     # 注意: 最后一定释放资源
  34.     sc.stop()
  35.     spark.stop()
  36.    
  37. ### dropDuplicates ==行来看,保留第一个重复值,有subset也是==
复制代码

结果


02_数据清洗的dropna函数.py

dropna 行来看,how默认=any行中有空就删行,how='all’行中全部都是空才删行,subset=[‘name’]列中有空删行,thresh=3最少有三个 正常值(非空值)

  1. # 导包
  2. import os
  3. from pyspark.sql import SparkSession
  4. # 解决JAVA_HOME 未设置问题
  5. os.environ['SPARK_HOME'] = '/export/server/spark'
  6. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  8. if __name__ == '__main__':
  9.     # 先创建spark session对象
  10.     spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()
  11.     # 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象
  12.     sc = spark.sparkContext
  13.     # 先读取数据生成df对象
  14.     # 注意: 如果读取的文件首行是表头,可以使用header=True,把首行作为表头使用
  15.     df = spark.read.csv(
  16.         path='file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv',
  17.         sep=',',
  18.         header=True
  19.     )
  20.     # 测试是否有数据
  21.     print(df.count())
  22.     df.show()
  23.     print('-------------------------------------------------------------')
  24.     # 对df数据清洗
  25.     # dropna() : 默认删除有空值的行
  26.     df1 = df.dropna()
  27.     # 测试是否数据被清洗
  28.     print(df1.count())
  29.     df1.show()
  30.     print('-------------------------------------------------------------')
  31.     # 对df数据清洗
  32.     # dropna() : 默认删除有空值的行
  33.     # how参数: how默认=any行中有空就删行,how='all'行中全部都是空才删行
  34.     df2 = df.dropna(how='all')
  35.     # 测试是否数据被清洗
  36.     print(df2.count())
  37.     df2.show()
  38.     print('-------------------------------------------------------------')
  39.     # 对df数据清洗
  40.     # dropna() : 默认删除有空值的行
  41.     # subset参数: 指定哪几列有空值就删除对应行
  42.     df3 = df.dropna(subset=['name'])
  43.     # 测试是否数据被清洗
  44.     print(df3.count())
  45.     df3.show()
  46.     print('-------------------------------------------------------------')
  47.     # 对df数据清洗
  48.     # dropna() : 默认删除有空值的行
  49.     # thresh参数: 表示至少有thresh个非空值才保留,默认是1
  50.     df3 = df.dropna(thresh=3)
  51.     # 测试是否数据被清洗
  52.     print(df3.count())
  53.     df3.show()
  54.     # 注意: 最后一定释放资源
  55.     sc.stop()
  56.     spark.stop()
  57. ### dropna ==行来看,how默认=any行中有空就删行,how='all'行中全部都是空才删行,subset=['name']列中有空删行,thresh=3最少有三个 正常值(非空值)==
复制代码

结果


03_数据清洗的fillna函数.py

fillna 列来看,value用指定值添补空值

  1. # 导包
  2. import os
  3. from pyspark.sql import SparkSession
  4. # 解决JAVA_HOME 未设置问题
  5. os.environ['SPARK_HOME'] = '/export/server/spark'
  6. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  8. if __name__ == '__main__':
  9.     # 先创建spark session对象
  10.     spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()
  11.     # 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象
  12.     sc = spark.sparkContext
  13.     # 读取文件生成df对象
  14.     df = spark.read.csv(
  15.         path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv",
  16.         sep=',',
  17.         header=True
  18.     )
  19.     df.show()
  20.     print('--------------------------------------------------------------------------------')
  21.     # fillna(): 默认用指定值填充所有空值
  22.     # value: 制定单值
  23.     df2 = df.fillna(value='未知')
  24.     df2.show()
  25.     print('--------------------------------------------------------------------------------')
  26.     # value: 制定单值
  27.     # subset: 指定哪几列有空值就填充对应列
  28.     df3 = df.fillna(value='未知', subset=['name', 'age'])
  29.     df3.show()
  30.     print('--------------------------------------------------------------------------------')
  31.     # value: 制定字典(每个字段都能单独指定值)
  32.     # 通过字典方式填充空值,k就是列名,v就是要填充的值
  33.     df4 = df.fillna(value={'name': '未知', 'age': 0, 'address': '深圳'})
  34.     df4.show()
  35.     # 注意: 最后一定释放资源
  36.     sc.stop()
  37.     spark.stop()
  38. ### fillna ==列来看,value用指定值填充空值==
复制代码

结果


04_清洗后数据存储到文件.py

  1. # 导包
  2. import os
  3. from pyspark.sql import SparkSession
  4. # 解决JAVA_HOME 未设置问题
  5. os.environ['SPARK_HOME'] = '/export/server/spark'
  6. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  8. if __name__ == '__main__':
  9.     # 先创建spark session对象
  10.     spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()
  11.     # 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象
  12.     sc = spark.sparkContext
  13.     # 读取原始未清洗文件生成df对象
  14.     df = spark.read.csv(
  15.         path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv",
  16.         sep=',',
  17.         header=True
  18.     )
  19.     # 清洗df数据
  20.     etl_df = df.dropDuplicates().fillna('未知')
  21.     # 输出清洗后的数据 方式1: 各种文件   方式2: 数据库
  22.     # 先演示 输出到文件
  23.     # 注意: 路径可以hdfs可以linux本地,分隔符也可以自定义
  24.     etl_df.write.csv(
  25.         path='hdfs://node1:8020/output',
  26.         sep='\001',
  27.         header=True,
  28.         mode='overwrite',
  29.         encoding='utf-8'
  30.     )
  31.     data = "John\00125\001Male\001New York"
  32.     fields = data.split('\001')
  33.     print(fields)  # 输出:['John', '25', 'Male', 'New York']
  34.     # 注意: 最后一定释放资源
  35.     sc.stop()
  36.     spark.stop()
复制代码
结果



05_清洗后数据存储到mysql数据库.py

  1. # 导包
  2. import os
  3. from pyspark.sql import SparkSession
  4. # 解决JAVA_HOME 未设置问题
  5. os.environ['SPARK_HOME'] = '/export/server/spark'
  6. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  8. if __name__ == '__main__':
  9.     # 先创建spark session对象
  10.     spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()
  11.     # 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象
  12.     sc = spark.sparkContext
  13.     # 读取原始未清洗文件生成df对象
  14.     df = spark.read.csv(
  15.         path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv",
  16.         sep=',',
  17.         header=True
  18.     )
  19.     # 清洗df数据
  20.     etl_df = df.dropDuplicates().fillna('未知')
  21.     # 输出清洗后的数据 方式1: 各种文件   方式2: 数据库
  22.     # 再演示 输出到数据库
  23.     # 注意: 输出到数据库,需要先在spark中配置jdbc驱动,否则会报错
  24.     etl_df.write.jdbc(
  25.         url="jdbc:mysql://node1:3306/数据库名字?useUnicode=true&characterEncoding=utf-8",
  26.         table="clear_data_etl_tb2",
  27.         mode="append",
  28.         properties={
  29.             "user": "root",
  30.             "password": "123456"
  31.         }
  32.     )
  33.        ###############################################################################################
  34.     etl_df.write.jdbc(
  35.         url="jdbc:mysql://{host}:{port}/{db_name}?useUnicode=true&characterEncoding=utf-8",
  36.         table="表名",  # 替换表名,例如 "clear_data_etl_tb2"
  37.         mode="模式",  # 替换模式,例如 "append" 或 "overwrite"
  38.         properties={
  39.             "user": "用户",  # 替换用户名,例如 "root"
  40.             "password": "密码"  # 替换密码,例如 "123456"
  41.         }
  42.     )
  43.     ################################################################################################
  44.     # 注意: 最后一定释放资源
  45.     sc.stop()
  46.     spark.stop()
复制代码
结果


07_清洗后数据存储到文件_优化分区数.py

  1. # 导包
  2. import os
  3. import time
  4. from pyspark.sql import SparkSession
  5. # 解决JAVA_HOME 未设置问题
  6. os.environ['SPARK_HOME'] = '/export/server/spark'
  7. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  8. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  9. if __name__ == '__main__':
  10.     # 先创建spark session对象
  11.     spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()
  12.     # TODO: 任务前: 修改saprkSQL的分区数
  13.     spark.conf.set("spark.sql.shuffle.partitions", '1')
  14.     # TODO: 获取SparkSQL的分区数
  15.     print(spark.conf.get("spark.sql.shuffle.partitions"))
  16.     # 读取原始未清洗文件生成df对象
  17.     df = spark.read.csv(
  18.         path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv",
  19.         sep=',',
  20.         header=True
  21.     )
  22.     # 清洗df数据
  23.     etl_df = df.dropDuplicates().fillna('未知')
  24.     # 输出清洗后的数据 方式1: 各种文件   方式2: 数据库
  25.     # 先演示 输出到文件
  26.     # 注意: path路径可以hdfs可以linux本地,sep分隔符也可以自定义
  27.     etl_df.write.csv(
  28.         path='hdfs://node1:8020/output1',
  29.         sep='\001',
  30.         header=True,
  31.         mode='overwrite',
  32.         encoding='utf-8'
  33.     )
  34.     # 为了查看分区效果,可以先让程序休息会儿
  35.     time.sleep(500)
  36.     # 注意: 最后一定释放资源
  37.     spark.stop()
复制代码
结果



08_电影数据分析案例.py

  1. # 导包
  2. import os
  3. from pyspark.sql import SparkSession, functions as F
  4. # 解决JAVA_HOME 未设置问题
  5. os.environ['SPARK_HOME'] = '/export/server/spark'
  6. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  8. def get_user_avg_score():
  9.     """
  10.     需求一: 统计每个用户的平均评分
  11.     """
  12.     # 方式1: SQL风格
  13.     spark.sql("""
  14.         select user_id,round(avg(score),2) as avg_score
  15.         from movie_tb
  16.         group by user_id
  17.     """).show()
  18.     # 方式2: DSL风格
  19.     movie_df.groupby("user_id").agg(
  20.         F.round(F.avg("score"), 2).alias("avg_score")
  21.     ).show()
  22. def get_big_avg_score_movie_cnt():
  23.     """
  24.     需求: 获取大于平均分电影数量
  25.         1- 统计所有打分的平均分,这个结果就是一个数字
  26.         2- 统计每部电影各自的平均分
  27.         3- 查询大于平均分的电影的数量
  28.     """
  29.     # 方式1: SQL风格
  30.     spark.sql("""
  31.         with t as (
  32.             select movie_id,avg(score) from movie_tb group by movie_id
  33.                                             having avg(score) > (select avg(score) as all_movie_avg_score from movie_tb)
  34.         )
  35.         select count(1) as cnt from t
  36.     """).show()
  37.     # 方式2: DSL风格
  38.     movie_df.groupby("movie_id").agg(
  39.         F.avg("score").alias("movie_avg_score")
  40.     ).where(
  41.         F.col("movie_avg_score") > movie_df.select(F.avg("score").alias("all_movie_avg_score")).first()[
  42.             'all_movie_avg_score']
  43.     ).agg(
  44.         F.count('movie_avg_score').alias("cnt")
  45.     ).show()
  46. def get_big_score_top1_user_avg_score_sql():
  47.     """
  48.     需求四: 查询高分电影中(电影平均分大于3)打分次数最多的用户, 并且求出此人所有的打分记录中, 打的平均分是多少
  49.         1- 筛选出高分电影。统计每部电影的平均分,再过滤出>3分的电影信息
  50.         2- 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户
  51.         3- 统计该用户所有打分记录的平均分
  52.     """
  53.     # 1 - 筛选出高分电影。统计每部电影的平均分,再过滤出 > 3分的电影信息
  54.     spark.sql("""
  55.         select movie_id,avg(score) as movie_avg_score
  56.         from movie_tb
  57.         group by movie_id
  58.         having movie_avg_score > 3
  59.     """).createTempView("high_score_movie_tb")
  60.     # 2 - 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户(不考虑并列)
  61.     spark.sql("""
  62.         select user_id,count(*) as cnt
  63.         from high_score_movie_tb h
  64.                        join movie_tb m on h.movie_id = m.movie_id
  65.         group by user_id
  66.         order by cnt desc
  67.         limit 1
  68.     """).createTempView("top1_user_tb")
  69.     # 3 - 统计该用户所有打分记录的平均分
  70.     spark.sql("""
  71.         select user_id,round(avg(score),2) as avg_score
  72.         from movie_tb where user_id = (select user_id from top1_user_tb)
  73.         group by user_id
  74.     """).show()
  75. def get_big_score_top1_user_avg_score_dsl():
  76.     """
  77.     需求四: 查询高分电影中(电影平均分大于3)打分次数最多的用户, 并且求出此人所有的打分记录中, 打的平均分是多少
  78.         1- 筛选出高分电影。统计每部电影的平均分,再过滤出>3分的电影信息
  79.         2- 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户
  80.         3- 统计该用户所有打分记录的平均分
  81.     """
  82.     # 1 - 筛选出高分电影。统计每部电影的平均分,再过滤出 > 3分的电影信息
  83.     high_score_movie_df = movie_df.groupby("movie_id").agg(
  84.         F.avg("score").alias('movie_avg_score')
  85.     ).where(
  86.         F.col("movie_avg_score") > 3
  87.     )
  88.     # 2 - 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户
  89.     top1_user_df = high_score_movie_df.join(movie_df, on="movie_id", how="inner").groupby("user_id").agg(
  90.         F.count("user_id").alias("cnt")
  91.     ).orderBy("cnt", ascending=False).limit(1)
  92.     # 3 - 统计该用户所有打分记录的平均分
  93.     movie_df.where(
  94.         F.col("user_id") == top1_user_df.first()['user_id']
  95.     ).groupby("user_id").agg(
  96.         F.round(F.avg("score"), 2).alias("avg_score")
  97.     ).show()
  98. if __name__ == '__main__':
  99.     # 先创建spark session对象
  100.     spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()
  101.     # 1.数据收集
  102.     df = spark.read.csv(
  103.         path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/u.data",
  104.         sep="\t",
  105.         schema="user_id string, movie_id string, score int, dt string"
  106.     )
  107.     df.show()
  108.     df.printSchema()
  109.     print(df.count())
  110.     # 2.数据处理
  111.     movie_df = df.dropDuplicates().dropna()
  112.     print(movie_df.count())
  113.     # 3.数据分析
  114.     # 因为多个需求都会用到临时表,所以提前创建临时表
  115.     movie_df.createTempView("movie_tb")
  116.     # 需求一: 统计每个用户的平均评分
  117.     # get_user_avg_score()
  118.     # 需求三: 查询大于平均分的电影的数量
  119.     # get_big_avg_score_movie_cnt()
  120.     # 需求四: 查询高分电影中(电影平均分大于3)打分次数最多的用户, 并且求出此人所有的打分记录中, 打的平均分是多少
  121.     # get_big_score_top1_user_avg_score_sql()
  122.     # get_big_score_top1_user_avg_score_dsl()
  123.     # 4.数据可视化
  124.     # 可视化本次略
  125.     # 注意: 最后一定释放资源
  126.     spark.stop()
复制代码
09_sparkSQL中应用开窗函数.py

  1. # 导包
  2. import os
  3. from pyspark.sql import SparkSession, functions as F
  4. from pyspark.sql.window import Window
  5. # 解决JAVA_HOME 未设置问题
  6. os.environ['SPARK_HOME'] = '/export/server/spark'
  7. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  8. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  9. if __name__ == '__main__':
  10.     # 先创建spark session对象
  11.     spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()
  12.     # 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象
  13.     sc = spark.sparkContext
  14.     # 数据收集
  15.     df = spark.read.csv(
  16.         path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/cookie.txt",
  17.         sep=',',
  18.         schema="cid string,dt string,pv int"
  19.     )
  20.     df.show()
  21.     # 数据处理
  22.     etldf = df.dropDuplicates().fillna('未知')
  23.     # 数据分析
  24.     # 需求: 统计每个cookie的top3的pv值
  25.     # 方式1: SQL方式
  26.     etldf.createTempView("cookie_tb")
  27.     spark.sql("""
  28.     with t as (
  29.         select cid,dt,pv,
  30.                row_number() over(partition by cid order by pv desc) as rn
  31.         from cookie_tb
  32.     )
  33.     select * from t where rn <=3
  34.         
  35.     """).show()
  36.     # 方式2: dsl方式
  37.     etldf.select(
  38.         "cid",
  39.         "dt",
  40.         "pv",
  41.         F.row_number().over(
  42.             Window.partitionBy("cid").orderBy(F.desc("pv"))
  43.         ).alias("rn")
  44.     ).where("rn <= 3").show()
  45.     # 数据可视化: 略
  46.     # 注意: 最后一定释放资源
  47.     sc.stop()
  48.     spark.stop()
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王柳

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表