PySpark用sort-merge join解决数据倾斜的完整案例

饭宝  金牌会员 | 2025-1-11 11:47:17 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 843|帖子 843|积分 2529

假设有两个大表 table1 和 table2 ,并通过 sort-merge join 来解决大概的数据倾斜问题。
  1. from pyspark.sql import SparkSession
  2. from pyspark.sql.functions import col
  3. # 初始化SparkSession
  4. spark = SparkSession.builder.appName("SortMergeJoinExample").getOrCreate()
  5. # 加载数据,假设数据来自parquet文件
  6. table1 = spark.read.parquet("path/to/table1.parquet")
  7. table2 = spark.read.parquet("path/to/table2.parquet")
  8. # 查看表的大小
  9. print("table1 size: ", table1.count())
  10. print("table2 size: ", table2.count())
  11. # 为了演示数据倾斜,假设我们直接使用join,这里用inner join举例
  12. joined = table1.join(table2, table1["id"] == table2["id"], "inner")
  13. # 先对连接键进行排序,为sort-merge join做准备
  14. sorted_table1 = table1.sortWithinPartitions("id")
  15. sorted_table2 = table2.sortWithinPartitions("id")
  16. # 使用sort-merge join进行连接
  17. joined = sorted_table1.join(sorted_table2, sorted_table1["id"] == sorted_table2["id"], "inner")
  18. # 触发Action,查看执行计划,此时可以去Spark WebUI查看任务执行情况
  19. joined.count()
  20. # 停止SparkSession
  21. spark.stop()
复制代码
代码解释
初始化SparkSession:创建一个SparkSession对象,这是与Spark交互的入口。
  1. spark = SparkSession.builder.appName("SortMergeJoinExample").getOrCreate()
复制代码
加载数据并查察表大小:从Parquet文件加载两张表,并打印出它们的行数,以此来相识表的规模。
  1. table1 = spark.read.parquet("path/to/table1.parquet")
  2. table2 = spark.read.parquet("path/to/table2.parquet")
  3. print("table1 size: ", table1.count())
  4. print("table2 size: ", table2.count())
复制代码
数据预处理:在进行 sort-merge join 之前,对两个表按照连接键 id 在每个分区内进行排序。
  1. sorted_table1 = table1.sortWithinPartitions("id")
  2. sorted_table2 = table2.sortWithinPartitions("id")
复制代码
实行sort-merge join:使用排序后的表,实行 sort-merge join 操作,这里选择的是内连接。
  1. joined = sorted_table1.join(sorted_table2, sorted_table1["id"] == sorted_table2["id"], "inner")
复制代码
触发Action并查察实行情况:调用 count() 方法触发一个Action,此时Spark会真正实行整个盘算流程。与此同时,可以打开Spark WebUI(通常是 http://your-spark-master:4040 ),在 Stages 页面查察使命实行筹划,尤其是查察各个阶段的数据分布情况,确认数据倾斜是否得到解决。
  1. joined.count()
复制代码
克制SparkSession:使命完成后,关闭SparkSession释放资源。
  1. spark.stop()
复制代码
要在Spark WebUI中查察数据倾斜:


  • 在实行 joined.count()
    后,敏捷打开浏览器访问Spark WebUI。进入 Stages 标签页,找到正在实行的 join 相关阶段。查察每个使命的处理数据量,假如之前存在数据倾斜,经过 sort-merge join 处理后,各个使命处理的数据量应该相对匀称。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

饭宝

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表