pyspark详细讲解

打印 上一主题 下一主题

主题 1368|帖子 1368|积分 4104



spark概念

Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。
简而言之,spark是一款分布式的计算框架,用于调度成千上万的服务器集群,计算TB、PB乃至EB级别的海量数据
spark支持众多的编程语言进行开发,而python语言,则是spark的重点支持的方向。
pyspark是spark官方开发的python语言第三方库。

pyspark基础使用:

安装pyspark 包在命令提示符中 pip install --version
首先需要一个执行代码环境的入口对象:类 SparkContext的类对象
打开pycharm 基础示范:
  1. #第一步 导包
  2. from pyspark import SparkConf,SparkContext
  3. # 创建SparkConf类对象,服务器入口,执行环境的入口对象
  4. conf = SparkConf().setMaster("local[*]").setAppName("test_spark、")
  5. #上一行为链式结构,
  6. #与下面三行作用相同
  7. """
  8. conf = SparkConf()
  9. conf.setMaster("local[*]")#服务器为本机或者是数据集群
  10. conf.setAppName("test_spark_app")#程序的名字"""
  11. #基于SparkConf类对象创建SparkContext类对象,服务器入口,执行环境的入口对象
  12. sc = SparkContext(conf=conf)#pyspark代码写入的入口,连接集群和驱动代码
  13. #打印Pyspark版本
  14. print(sc.version)
  15. #停止Pyspark程序运行
  16. sc.stop()#关闭入口
复制代码
结果为:
你所下载的pyspark第三包的版本
数据输入

RDD(弹性分布式数据集 Resilient Distributed Datassets)对象

SparkContext类对象是pyspark编程所有功能的入口,相当店肆的门,打开后才能进行售卖,制作
数据输入:
通过SparkContext类对象的成员方法完成数据的读取操作,读取后得到RDD类对象,
挑选原质料
数据处理计算:
通过RDD类对象的成员方法完成各种数据计算的需求,将原材做成面,蛋糕等成品
数据输出:
将处理完成后的RDD对象调用各种成员方法完成写出文件、转换为list等操作
相当于打包售卖

让我们来使用一下rdd对象来感受一下吧
  1. # 导包
  2. from pyspark import SparkConf,SparkContext
  3. # 创建SparkConf类对象连接服务器,设置配置
  4. conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
  5. #基于SparkConf类对象创建SparkContext类对象,服务器入口,执行环境的入口对象
  6. sc = SparkContext(conf=conf)
  7. # parallelize(数据容器)转化为RDD数据
  8. rdd1 = sc.parallelize([123,"hello",56])
  9. rdd2 = sc.parallelize((1,2,456))
  10. rdd3 = sc.parallelize("hello word")
  11. rdd4 = sc.parallelize({123,"jane",67})
  12. rdd5 = sc.parallelize({'key1': 50,'key2':60})
  13. # collect()收集数据
  14. print(rdd1.collect())#[123, 'hello', 56]
  15. print(rdd2.collect())#[1, 2, 456]
  16. print(rdd3.collect())#['h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'd']
  17. print(rdd4.collect())#[123, 67, 'jane']
  18. print(rdd5.collect())#['key1', 'key2']
  19. # 通过textFile将本地文件转化为RDD对象
  20. rdd =sc.textFile("D:\Hello.txt")
  21. print(rdd.collect())#['Hello world!', 'Meet you to nice!']
  22. #停止Pyspark程序运行
  23. sc.stop()
复制代码
此为hello.txt文件内容:

数据计算

map()

将RDD 数据一条条处理(处理逻辑取决于map吸收的处理函数),返回新的rdd
语法:
rdd.map(func)
func(): (T) -->U
():传入参数,(T):传入一个参数
-->U :返回值
(T)-->U:传入一个恣意范例的参数,返回一个返回值,范例不限
(A) -->A:传入一个恣意范例的参数,返回一个同范例参数
  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
  4. conf=SparkConf().setMaster("local[*]").setAppName("text_spark_app")
  5. sc =SparkContext(conf=conf)
  6. rdd = sc.parallelize([1,2,3,4,5])
  7. #map(func)传入参数为函数,rdd中的每一个数据都会执行func函数,返回RDD对象
  8. # 且func()是传入参数为T 而返回值为U,两者不同且存在
  9. rdd = rdd.map(lambda x:x*10)
  10. print(rdd.collect())#[10,20,30,40,50]
  11. sc.stop()
复制代码
如果以上map()代码出现这种报错环境:


而且为python3.13版本的,是由于pyspark与该版本不兼容,不能支持python3.13,
解决办法:
1.低落python版本
2.安装虚拟环境anaconda,在虚拟环境中安装python版本:
发起安装3.10
记得查询pyspark与哪个版本python兼容
flatMap()

rdd执行map操作,然后进行排除嵌套操作:
  1. from pyspark import SparkContext,SparkConf
  2. import os
  3. os.environ['PYSPARK_PYTHON'] ="D:conda/envs/myenv/python.exe"
  4. conf = SparkConf().setMaster("local[*]").setAppName("text_spark_app")
  5. sc = SparkConftext(conf = conf)
  6. #准备一个rdd
  7. rdd = sc.parallelize(["itheima itcast 666","itheima itheima itcast","python itheima"])
  8. rdd = rdd.map(lambda x: x.split())
  9. print(rdd.collect())#[["itheima","itcast","666"],["itheima","itheima","itcast"],["python","itheima"]]
  10. #将单词逐个取出,逐个封装到列表中
  11. rdd = rdd.flatMap(lambda x: x.split())
  12. print(rdd.collect())#["itheima","itcast","666","itheima","itheima","itcast","python","itheima"]
  13. #关闭入口
  14. ​sc.stop()
复制代码
reduceByKey()

针对KV型(->二元元组,即(10,"it"))RDD,
自动按key(二元元组的第一个元素)分组,
然后根据你提供的聚合限定,完
成组内数据value(二元元组的第二个元素)的聚合操作
语法:
rdd.reduceByKey(func)
func:(v,v) ->v 传入两个返回一个,范例相称
代码实现:
  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
  4. conf = SparkConf().setMaster("local[*]").setAppName("test_app")
  5. sc = SparkContext(conf = conf)
  6. rdd = sc.parallelize([('a',1),('b',5),('a',5),('b',9)])
  7. #以元组的第一个进行分组,以第二元素进行相加
  8. rdd = rdd.reduceByKey(lambda a,b:a+b)
  9. print(rdd.collect())
  10. #[('b', 14), ('a', 6)]
  11. sc.stop()#关闭入口
复制代码
filter()

功能:过滤想要的数据进行保存
语法:
rdd.filter(func)
函数范例 func: (T) --> bool
传入参数T,返回值必须是bool范例:True or False
而返回值为True的将会被保存
  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
  4. conf = SparkConf().setMaster("local[*]").setAppName("test_app")
  5. sc = SparkContext(conf = conf)
  6. #准备一个rdd对象
  7. rdd = sc.parallelize([1,2,3,4,5])
  8. #filter筛选偶数,返回值True的保留
  9. rdd = rdd.filter(lambda num: num%2==0)#返回值为num%2==0,逻辑判断语句返回的是True/False
  10. print(rdd.collect())#result:[2, 4]
  11. sc.stop()
复制代码
distinct()

对RDD数据进行去重,返回新的RDD
语法:
rdd.distinct() 无需传参
  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. #准备入口环境
  4. os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_app")
  6. sc =SparkContext(conf = conf)
  7. #准备一个rdd数据
  8. rdd=sc.parallelize([1,2,1,4,2,54,6,45,75,3,24,5,6,4,6])
  9. #将rdd数据去重
  10. rdd = rdd.distinct()
  11. print(rdd.collect())#result:[24, 1, 2, 75, 3, 4, 5, 54, 6, 45]
  12. sc.stop()#关闭入口
复制代码
sortBy()

对RDD数据进行排序,基于你指定的排序依据
语法:
rdd.sortBy(func ,ascending = False ,numPartitions=1)
funcT)-->U:须告知按照rdd中的哪个数据,进行排序,比如lambda x:x[1],代表rdd中的第二列元素进行排序
ascending:True升序(默认值),False降序
numPartitions:用几个分区排序,numPartitions=1
  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. #准备入口环境
  4. os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_app")
  6. sc =SparkContext(conf = conf)
  7. #准备一个rdd对象
  8. rdd=sc.parallelize([('itcast', 4), ('python', 6), ('itheima', 7), ('spark', 4), ('pyspark', 3)])
  9. #列表中每一个元组的的第二个元素为根据,进行排序
  10. sor_rdd = rdd.sortBy(lambda x:x[1],ascending=True,numPartitions=1)
  11. print(sort_rdd.collect())#result:[('pyspark', 3), ('itcast', 4), ('spark', 4), ('python', 6), ('itheima', 7)]
  12. # 关闭入口
  13. sc.stop()
复制代码
数据输出

collect()

将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
语法:
rdd.collect() 返回值是一个list
  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. #准备入口环境
  4. os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_app")
  6. sc = SparkContext(conf = conf)
  7. #准备一个rdd对象
  8. rdd = sc.parallelize([1,2,3,4,5])
  9. print(rdd)
  10. #输出的是一个RDD类对象名
  11. # result:ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289
  12. print(rdd.collect())
  13. # result:[1, 2, 3, 4, 5]
  14. sc.stop()#关闭入口
复制代码
reduce()

对RDD数据集按照你传入的逻辑进行聚合
语法:
rdd.reduce(func)
funcT,T) --> T
  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. #准备入口环境
  4. os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_app")
  6. sc = SparkContext(conf = conf)
  7. #准备一个rdd对象,1~9数字
  8. rdd = sc.parallelize(range(1,10))
  9. #逐个取出rdd数据,reduce聚合处理
  10. print(rdd.reduce(lambda a,b:a+b))
  11. sc.stop()#关闭入口
复制代码
lambda a,b:a+b内部计算大致过程:


take()

取RDD的前N个元素,组合成list返回法:
语法:
rdd.take()
  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. #准备入口环境
  4. os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_app")
  6. sc = SparkContext(conf = conf)
  7. # 准备一个RDD对象
  8. rdd = sc.parallelize([32,263,4,5,8,120])
  9. #take()取走前三个数据,返回存储在一个列表中
  10. take_list = rdd.take(3)
  11. print(take_list)
  12. # result:[32, 263, 4]
  13. sc.stop()#关闭入口
复制代码
count()

计算RDD 中有多少条数据,返回值为一个数字
语法:
rdd.count()
  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. #准备入口环境
  4. os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_app")
  6. sc = SparkContext(conf = conf)
  7. # 准备一个RDD对象
  8. rdd = sc.parallelize([32,263,4,5,8,120])
  9. # 计算RDD中有多少数据
  10. count = rdd.count()
  11. print(count)
  12. #result:6
  13. sc.stop()#关闭入口
复制代码
savaAsTextFile()

将RDD数据写入文本文件中
注意:文本文件的路径名字在未运行前需不存在
  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. #准备入口环境
  4. os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_app")
  6. sc = SparkContext(conf = conf)
  7. # 准备一个RDD对象
  8. rdd = sc.parallelize([32,263,4,5,8,120])
  9. # 将RDD数据存储到文件
  10. rdd.saveAsTextFile("D:/output1")
  11. sc.stop()
复制代码
如果出现题目:


原因:调用保存文件的savaAsTextFile(),需要配置Hadoop依赖
1.下载winutils.exe:https://codeload.github.com/cdarlint/winutils/zip/refs/heads/master
2.配置环境变量:
搜索:高级系统设置:

点击框选

点击环境变量

下一步:

下一步:

点击path这一行

点击新建

在输入框中填入以下内容:

最后一连点击三个确认.
之后再次运行上述代码,会在D盘文件中出现output1文件
打开后:

有很多文件这就表明当前的RDD有多个分区,
原因:saveAsTextFile()输出文件的个数是由RDD的分区决定的。
如果有24个文件,这表明有24分区
电脑CPU有多少核心数就默认有多少分区,
rdd数据会匀称的分散到各个分区去存储,
输出文件有的是空的,有的是列表中数据元素
将RDD分区修改为1个

方法一
SparkConf对象设置属性全局并行度为1:
  1. conf = SparkConf().setMaster("local[*]").setAppName("test_app")
  2. conf.set("spark.default.parallelism",1) sc =SparkContext(conf = conf)
  3. sc = SparkContext(conf = conf)
复制代码
方法二
创建RDD对象时设置(parallelize方法传入numSlices参数为1):
  1. rdd=sc.parallelize([32,263,4,5,8,120],1)
复制代码
 or:
  1. rdd=sc.parallelize([32,263,4,5,8,120],numSlices=1)
复制代码
加入以上方法之一以后的代码和运行结果:
  1. from pyspark import SparkConf,SparkContext
  2. import os
  3. #准备入口环境
  4. os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
  5. conf = SparkConf().setMaster("local[*]").setAppName("test_app")
  6. conf.set("spark.default.parallelism",1)
  7. sc = SparkContext(conf = conf)
  8. # 准备一个RDD对象,设置分区为1
  9. rdd = sc.parallelize([32,263,4,5,8,120],1)
  10. # 将RDD数据存储到文件
  11. rdd.saveAsTextFile("D:/output2")
  12. sc.stop()#关闭入口
复制代码
        D:/output2文件的内容

 part-00000的内容:




免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

光之使者

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表