光之使者 发表于 2025-4-23 08:33:47

pyspark详细讲解



spark概念

Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。
简而言之,spark是一款分布式的计算框架,用于调度成千上万的服务器集群,计算TB、PB乃至EB级别的海量数据
spark支持众多的编程语言进行开发,而python语言,则是spark的重点支持的方向。
pyspark是spark官方开发的python语言第三方库。
https://i-blog.csdnimg.cn/direct/b56d35666fc84884b59f23160dcb69c0.png
pyspark基础使用:

安装pyspark 包在命令提示符中 pip install --version
首先需要一个执行代码环境的入口对象:类 SparkContext的类对象
打开pycharm 基础示范:
#第一步 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象,服务器入口,执行环境的入口对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark、")
#上一行为链式结构,
#与下面三行作用相同
"""
conf = SparkConf()
conf.setMaster("local[*]")#服务器为本机或者是数据集群
conf.setAppName("test_spark_app")#程序的名字"""

#基于SparkConf类对象创建SparkContext类对象,服务器入口,执行环境的入口对象
sc = SparkContext(conf=conf)#pyspark代码写入的入口,连接集群和驱动代码
#打印Pyspark版本
print(sc.version)
#停止Pyspark程序运行
sc.stop()#关闭入口 结果为:
你所下载的pyspark第三包的版本
数据输入

RDD(弹性分布式数据集 Resilient Distributed Datassets)对象

SparkContext类对象是pyspark编程所有功能的入口,相当店肆的门,打开后才能进行售卖,制作
数据输入:
通过SparkContext类对象的成员方法完成数据的读取操作,读取后得到RDD类对象,
挑选原质料
数据处理计算:
通过RDD类对象的成员方法完成各种数据计算的需求,将原材做成面,蛋糕等成品
数据输出:
将处理完成后的RDD对象调用各种成员方法完成写出文件、转换为list等操作
相当于打包售卖
https://i-blog.csdnimg.cn/direct/f37d5a2da03a4a76892435bd8fbed9b5.png
让我们来使用一下rdd对象来感受一下吧
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象连接服务器,设置配置
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
#基于SparkConf类对象创建SparkContext类对象,服务器入口,执行环境的入口对象
sc = SparkContext(conf=conf)

# parallelize(数据容器)转化为RDD数据
rdd1 = sc.parallelize()
rdd2 = sc.parallelize((1,2,456))
rdd3 = sc.parallelize("hello word")
rdd4 = sc.parallelize({123,"jane",67})
rdd5 = sc.parallelize({'key1': 50,'key2':60})

# collect()收集数据
print(rdd1.collect())#
print(rdd2.collect())#
print(rdd3.collect())#['h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'd']
print(rdd4.collect())#
print(rdd5.collect())#['key1', 'key2']

# 通过textFile将本地文件转化为RDD对象
rdd =sc.textFile("D:\Hello.txt")

print(rdd.collect())#['Hello world!', 'Meet you to nice!']
#停止Pyspark程序运行
sc.stop() 此为hello.txt文件内容:
https://i-blog.csdnimg.cn/direct/b3852eec346a4ca48542ad218c36aa55.png
数据计算

map()

将RDD 数据一条条处理(处理逻辑取决于map吸收的处理函数),返回新的rdd
语法:
rdd.map(func)
func(): (T) -->U
():传入参数,(T):传入一个参数
-->U :返回值
(T)-->U:传入一个恣意范例的参数,返回一个返回值,范例不限
(A) -->A:传入一个恣意范例的参数,返回一个同范例参数
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
conf=SparkConf().setMaster("local[*]").setAppName("text_spark_app")
sc =SparkContext(conf=conf)
rdd = sc.parallelize()
#map(func)传入参数为函数,rdd中的每一个数据都会执行func函数,返回RDD对象
# 且func()是传入参数为T 而返回值为U,两者不同且存在
rdd = rdd.map(lambda x:x*10)
print(rdd.collect())#
sc.stop() 如果以上map()代码出现这种报错环境:

https://i-blog.csdnimg.cn/direct/f5d2fade13294279ba509ee3b0e41b24.png
而且为python3.13版本的,是由于pyspark与该版本不兼容,不能支持python3.13,
解决办法:
1.低落python版本
2.安装虚拟环境anaconda,在虚拟环境中安装python版本:
发起安装3.10
记得查询pyspark与哪个版本python兼容
flatMap()

对rdd执行map操作,然后进行排除嵌套操作:

from pyspark import SparkContext,SparkConf
import os
os.environ['PYSPARK_PYTHON'] ="D:conda/envs/myenv/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("text_spark_app")
sc = SparkConftext(conf = conf)

#准备一个rdd
rdd = sc.parallelize(["itheima itcast 666","itheima itheima itcast","python itheima"])
rdd = rdd.map(lambda x: x.split())
print(rdd.collect())#[["itheima","itcast","666"],["itheima","itheima","itcast"],["python","itheima"]]

#将单词逐个取出,逐个封装到列表中
rdd = rdd.flatMap(lambda x: x.split())
print(rdd.collect())#["itheima","itcast","666","itheima","itheima","itcast","python","itheima"]

#关闭入口
​sc.stop() reduceByKey()

针对KV型(->二元元组,即(10,"it"))RDD,
自动按key(二元元组的第一个元素)分组,
然后根据你提供的聚合限定,完
成组内数据value(二元元组的第二个元素)的聚合操作
语法:
rdd.reduceByKey(func)
func:(v,v) ->v 传入两个返回一个,范例相称
代码实现:
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_app")
sc = SparkContext(conf = conf)

rdd = sc.parallelize([('a',1),('b',5),('a',5),('b',9)])

#以元组的第一个进行分组,以第二元素进行相加
rdd = rdd.reduceByKey(lambda a,b:a+b)

print(rdd.collect())
#[('b', 14), ('a', 6)]
sc.stop()#关闭入口 filter()

功能:过滤想要的数据进行保存
语法:
rdd.filter(func)
函数范例 func: (T) --> bool
传入参数T,返回值必须是bool范例:True or False
而返回值为True的将会被保存
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_app")
sc = SparkContext(conf = conf)
#准备一个rdd对象
rdd = sc.parallelize()
#filter筛选偶数,返回值True的保留
rdd = rdd.filter(lambda num: num%2==0)#返回值为num%2==0,逻辑判断语句返回的是True/False
print(rdd.collect())#result:
sc.stop() distinct()

对RDD数据进行去重,返回新的RDD
语法:
rdd.distinct() 无需传参
from pyspark import SparkConf,SparkContext
import os
#准备入口环境
os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_app")
sc =SparkContext(conf = conf)
#准备一个rdd数据
rdd=sc.parallelize()
#将rdd数据去重
rdd = rdd.distinct()
print(rdd.collect())#result:
sc.stop()#关闭入口 sortBy()

对RDD数据进行排序,基于你指定的排序依据
语法:
rdd.sortBy(func ,ascending = False ,numPartitions=1)
func:(T)-->U:须告知按照rdd中的哪个数据,进行排序,比如lambda x:x,代表rdd中的第二列元素进行排序
ascending:True升序(默认值),False降序
numPartitions:用几个分区排序,numPartitions=1
from pyspark import SparkConf,SparkContext
import os
#准备入口环境
os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_app")
sc =SparkContext(conf = conf)
#准备一个rdd对象
rdd=sc.parallelize([('itcast', 4), ('python', 6), ('itheima', 7), ('spark', 4), ('pyspark', 3)])
#列表中每一个元组的的第二个元素为根据,进行排序
sor_rdd = rdd.sortBy(lambda x:x,ascending=True,numPartitions=1)
print(sort_rdd.collect())#result:[('pyspark', 3), ('itcast', 4), ('spark', 4), ('python', 6), ('itheima', 7)]
# 关闭入口
sc.stop() 数据输出

collect()

将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
语法:
rdd.collect() 返回值是一个list
from pyspark import SparkConf,SparkContext
import os
#准备入口环境
os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_app")
sc = SparkContext(conf = conf)
#准备一个rdd对象
rdd = sc.parallelize()
print(rdd)
#输出的是一个RDD类对象名
# result:ParallelCollectionRDD at readRDDFromFile at PythonRDD.scala:289
print(rdd.collect())
# result:
sc.stop()#关闭入口 reduce()

对RDD数据集按照你传入的逻辑进行聚合
语法:
rdd.reduce(func)
func:(T,T) --> T
from pyspark import SparkConf,SparkContext
import os
#准备入口环境
os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_app")
sc = SparkContext(conf = conf)
#准备一个rdd对象,1~9数字
rdd = sc.parallelize(range(1,10))
#逐个取出rdd数据,reduce聚合处理
print(rdd.reduce(lambda a,b:a+b))
sc.stop()#关闭入口 lambda a,b:a+b内部计算大致过程:
https://i-blog.csdnimg.cn/direct/9cd91e8e9e9d4125b06668944ec42c2e.png

take()

取RDD的前N个元素,组合成list返回法:
语法:
rdd.take()
from pyspark import SparkConf,SparkContext
import os
#准备入口环境
os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_app")
sc = SparkContext(conf = conf)
# 准备一个RDD对象
rdd = sc.parallelize()
#take()取走前三个数据,返回存储在一个列表中
take_list = rdd.take(3)
print(take_list)
# result:
sc.stop()#关闭入口 count()

计算RDD 中有多少条数据,返回值为一个数字
语法:
rdd.count()
from pyspark import SparkConf,SparkContext
import os
#准备入口环境
os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_app")
sc = SparkContext(conf = conf)
# 准备一个RDD对象
rdd = sc.parallelize()
# 计算RDD中有多少数据
count = rdd.count()
print(count)
#result:6
sc.stop()#关闭入口 savaAsTextFile()

将RDD数据写入文本文件中
注意:文本文件的路径名字在未运行前需不存在
from pyspark import SparkConf,SparkContext
import os
#准备入口环境
os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_app")
sc = SparkContext(conf = conf)
# 准备一个RDD对象
rdd = sc.parallelize()
# 将RDD数据存储到文件
rdd.saveAsTextFile("D:/output1")
sc.stop() 如果出现题目:

https://i-blog.csdnimg.cn/direct/d48fedd47a9a4b589c22aa0d1d989971.png
原因:调用保存文件的savaAsTextFile(),需要配置Hadoop依赖
1.下载winutils.exe:https://codeload.github.com/cdarlint/winutils/zip/refs/heads/master
2.配置环境变量:
搜索:高级系统设置:
https://i-blog.csdnimg.cn/direct/3eea3b09f8d140f6944bdbc3ac0c1ff4.png
点击框选
https://i-blog.csdnimg.cn/direct/65c7611db0174eeeb84d4f6e48ad6b6b.png
点击环境变量
https://i-blog.csdnimg.cn/direct/c4ec102394e6407bbe9fa105c764f9c0.png
下一步:
https://i-blog.csdnimg.cn/direct/9334c3078fe2433dbb92cdeb5b8e32a0.png
下一步:
https://i-blog.csdnimg.cn/direct/4ac18c2a833f4003ad9271d684839192.png
点击path这一行
https://i-blog.csdnimg.cn/direct/afb48bec8532478ea4a410a1ac58b79c.png
点击新建
https://i-blog.csdnimg.cn/direct/9479d45da33646c6949e19d373670580.png
在输入框中填入以下内容:
https://i-blog.csdnimg.cn/direct/ad44dac12294450ab294ed13a824437f.png
最后一连点击三个确认.
之后再次运行上述代码,会在D盘文件中出现output1文件
打开后:
https://i-blog.csdnimg.cn/direct/06db45bad1cf4696883ea93a08d7676c.png
有很多文件这就表明当前的RDD有多个分区,
原因:saveAsTextFile()输出文件的个数是由RDD的分区决定的。
如果有24个文件,这表明有24分区
电脑CPU有多少核心数就默认有多少分区,
rdd数据会匀称的分散到各个分区去存储,
输出文件有的是空的,有的是列表中数据元素
将RDD分区修改为1个

方法一:
SparkConf对象设置属性全局并行度为1:
conf = SparkConf().setMaster("local[*]").setAppName("test_app")

conf.set("spark.default.parallelism",1) sc =SparkContext(conf = conf)

sc = SparkContext(conf = conf) 方法二:
创建RDD对象时设置(parallelize方法传入numSlices参数为1):
rdd=sc.parallelize(,1)  or:
rdd=sc.parallelize(,numSlices=1) 加入以上方法之一以后的代码和运行结果:
from pyspark import SparkConf,SparkContext
import os
#准备入口环境
os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_app")
conf.set("spark.default.parallelism",1)
sc = SparkContext(conf = conf)
# 准备一个RDD对象,设置分区为1
rdd = sc.parallelize(,1)
# 将RDD数据存储到文件
rdd.saveAsTextFile("D:/output2")
sc.stop()#关闭入口         D:/output2文件的内容
https://i-blog.csdnimg.cn/direct/5291db8a314748369b22cd2b99fde416.png
 part-00000的内容:
https://i-blog.csdnimg.cn/direct/5139eef0786a4c2ab7023efa3c32a1f3.png



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: pyspark详细讲解