spark概念
Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。
简而言之,spark是一款分布式的计算框架,用于调度成千上万的服务器集群,计算TB、PB乃至EB级别的海量数据
spark支持众多的编程语言进行开发,而python语言,则是spark的重点支持的方向。
pyspark是spark官方开发的python语言第三方库。
pyspark基础使用:
安装pyspark 包在命令提示符中 pip install --version
首先需要一个执行代码环境的入口对象:类 SparkContext的类对象
打开pycharm 基础示范:
- #第一步 导包
- from pyspark import SparkConf,SparkContext
- # 创建SparkConf类对象,服务器入口,执行环境的入口对象
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark、")
- #上一行为链式结构,
- #与下面三行作用相同
- """
- conf = SparkConf()
- conf.setMaster("local[*]")#服务器为本机或者是数据集群
- conf.setAppName("test_spark_app")#程序的名字"""
- #基于SparkConf类对象创建SparkContext类对象,服务器入口,执行环境的入口对象
- sc = SparkContext(conf=conf)#pyspark代码写入的入口,连接集群和驱动代码
- #打印Pyspark版本
- print(sc.version)
- #停止Pyspark程序运行
- sc.stop()#关闭入口
复制代码 结果为:
你所下载的pyspark第三包的版本
数据输入
RDD(弹性分布式数据集 Resilient Distributed Datassets)对象
SparkContext类对象是pyspark编程所有功能的入口,相当店肆的门,打开后才能进行售卖,制作
数据输入:
通过SparkContext类对象的成员方法完成数据的读取操作,读取后得到RDD类对象,
挑选原质料
数据处理计算:
通过RDD类对象的成员方法完成各种数据计算的需求,将原材做成面,蛋糕等成品
数据输出:
将处理完成后的RDD对象调用各种成员方法完成写出文件、转换为list等操作
相当于打包售卖
让我们来使用一下rdd对象来感受一下吧
- # 导包
- from pyspark import SparkConf,SparkContext
- # 创建SparkConf类对象连接服务器,设置配置
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
- #基于SparkConf类对象创建SparkContext类对象,服务器入口,执行环境的入口对象
- sc = SparkContext(conf=conf)
- # parallelize(数据容器)转化为RDD数据
- rdd1 = sc.parallelize([123,"hello",56])
- rdd2 = sc.parallelize((1,2,456))
- rdd3 = sc.parallelize("hello word")
- rdd4 = sc.parallelize({123,"jane",67})
- rdd5 = sc.parallelize({'key1': 50,'key2':60})
- # collect()收集数据
- print(rdd1.collect())#[123, 'hello', 56]
- print(rdd2.collect())#[1, 2, 456]
- print(rdd3.collect())#['h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'd']
- print(rdd4.collect())#[123, 67, 'jane']
- print(rdd5.collect())#['key1', 'key2']
- # 通过textFile将本地文件转化为RDD对象
- rdd =sc.textFile("D:\Hello.txt")
- print(rdd.collect())#['Hello world!', 'Meet you to nice!']
- #停止Pyspark程序运行
- sc.stop()
复制代码 此为hello.txt文件内容:
数据计算
map()
将RDD 数据一条条处理(处理逻辑取决于map吸收的处理函数),返回新的rdd
语法:
rdd.map(func)
func(): (T) -->U
():传入参数,(T):传入一个参数
-->U :返回值
(T)-->U:传入一个恣意范例的参数,返回一个返回值,范例不限
(A) -->A:传入一个恣意范例的参数,返回一个同范例参数
- from pyspark import SparkConf,SparkContext
- import os
- os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
- conf=SparkConf().setMaster("local[*]").setAppName("text_spark_app")
- sc =SparkContext(conf=conf)
- rdd = sc.parallelize([1,2,3,4,5])
- #map(func)传入参数为函数,rdd中的每一个数据都会执行func函数,返回RDD对象
- # 且func()是传入参数为T 而返回值为U,两者不同且存在
- rdd = rdd.map(lambda x:x*10)
- print(rdd.collect())#[10,20,30,40,50]
- sc.stop()
复制代码 如果以上map()代码出现这种报错环境:
而且为python3.13版本的,是由于pyspark与该版本不兼容,不能支持python3.13,
解决办法:
1.低落python版本
2.安装虚拟环境anaconda,在虚拟环境中安装python版本:
发起安装3.10
记得查询pyspark与哪个版本python兼容
flatMap()
对rdd执行map操作,然后进行排除嵌套操作:
-
- from pyspark import SparkContext,SparkConf
- import os
- os.environ['PYSPARK_PYTHON'] ="D:conda/envs/myenv/python.exe"
- conf = SparkConf().setMaster("local[*]").setAppName("text_spark_app")
- sc = SparkConftext(conf = conf)
- #准备一个rdd
- rdd = sc.parallelize(["itheima itcast 666","itheima itheima itcast","python itheima"])
- rdd = rdd.map(lambda x: x.split())
- print(rdd.collect())#[["itheima","itcast","666"],["itheima","itheima","itcast"],["python","itheima"]]
- #将单词逐个取出,逐个封装到列表中
- rdd = rdd.flatMap(lambda x: x.split())
- print(rdd.collect())#["itheima","itcast","666","itheima","itheima","itcast","python","itheima"]
- #关闭入口
- sc.stop()
复制代码 reduceByKey()
针对KV型(->二元元组,即(10,"it"))RDD,
自动按key(二元元组的第一个元素)分组,
然后根据你提供的聚合限定,完
成组内数据value(二元元组的第二个元素)的聚合操作
语法:
rdd.reduceByKey(func)
func:(v,v) ->v 传入两个返回一个,范例相称
代码实现:
- from pyspark import SparkConf,SparkContext
- import os
- os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
- conf = SparkConf().setMaster("local[*]").setAppName("test_app")
- sc = SparkContext(conf = conf)
- rdd = sc.parallelize([('a',1),('b',5),('a',5),('b',9)])
- #以元组的第一个进行分组,以第二元素进行相加
- rdd = rdd.reduceByKey(lambda a,b:a+b)
- print(rdd.collect())
- #[('b', 14), ('a', 6)]
- sc.stop()#关闭入口
复制代码 filter()
功能:过滤想要的数据进行保存
语法:
rdd.filter(func)
函数范例 func: (T) --> bool
传入参数T,返回值必须是bool范例:True or False
而返回值为True的将会被保存
- from pyspark import SparkConf,SparkContext
- import os
- os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
- conf = SparkConf().setMaster("local[*]").setAppName("test_app")
- sc = SparkContext(conf = conf)
- #准备一个rdd对象
- rdd = sc.parallelize([1,2,3,4,5])
- #filter筛选偶数,返回值True的保留
- rdd = rdd.filter(lambda num: num%2==0)#返回值为num%2==0,逻辑判断语句返回的是True/False
- print(rdd.collect())#result:[2, 4]
- sc.stop()
复制代码 distinct()
对RDD数据进行去重,返回新的RDD
语法:
rdd.distinct() 无需传参
- from pyspark import SparkConf,SparkContext
- import os
- #准备入口环境
- os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
- conf = SparkConf().setMaster("local[*]").setAppName("test_app")
- sc =SparkContext(conf = conf)
- #准备一个rdd数据
- rdd=sc.parallelize([1,2,1,4,2,54,6,45,75,3,24,5,6,4,6])
- #将rdd数据去重
- rdd = rdd.distinct()
- print(rdd.collect())#result:[24, 1, 2, 75, 3, 4, 5, 54, 6, 45]
- sc.stop()#关闭入口
复制代码 sortBy()
对RDD数据进行排序,基于你指定的排序依据
语法:
rdd.sortBy(func ,ascending = False ,numPartitions=1)
func T)-->U:须告知按照rdd中的哪个数据,进行排序,比如lambda x:x[1],代表rdd中的第二列元素进行排序
ascending:True升序(默认值),False降序
numPartitions:用几个分区排序,numPartitions=1
- from pyspark import SparkConf,SparkContext
- import os
- #准备入口环境
- os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
- conf = SparkConf().setMaster("local[*]").setAppName("test_app")
- sc =SparkContext(conf = conf)
- #准备一个rdd对象
- rdd=sc.parallelize([('itcast', 4), ('python', 6), ('itheima', 7), ('spark', 4), ('pyspark', 3)])
- #列表中每一个元组的的第二个元素为根据,进行排序
- sor_rdd = rdd.sortBy(lambda x:x[1],ascending=True,numPartitions=1)
- print(sort_rdd.collect())#result:[('pyspark', 3), ('itcast', 4), ('spark', 4), ('python', 6), ('itheima', 7)]
- # 关闭入口
- sc.stop()
复制代码 数据输出
collect()
将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
语法:
rdd.collect() 返回值是一个list
- from pyspark import SparkConf,SparkContext
- import os
- #准备入口环境
- os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
- conf = SparkConf().setMaster("local[*]").setAppName("test_app")
- sc = SparkContext(conf = conf)
- #准备一个rdd对象
- rdd = sc.parallelize([1,2,3,4,5])
- print(rdd)
- #输出的是一个RDD类对象名
- # result:ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289
- print(rdd.collect())
- # result:[1, 2, 3, 4, 5]
- sc.stop()#关闭入口
复制代码 reduce()
对RDD数据集按照你传入的逻辑进行聚合
语法:
rdd.reduce(func)
func T,T) --> T
- from pyspark import SparkConf,SparkContext
- import os
- #准备入口环境
- os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
- conf = SparkConf().setMaster("local[*]").setAppName("test_app")
- sc = SparkContext(conf = conf)
- #准备一个rdd对象,1~9数字
- rdd = sc.parallelize(range(1,10))
- #逐个取出rdd数据,reduce聚合处理
- print(rdd.reduce(lambda a,b:a+b))
- sc.stop()#关闭入口
复制代码 lambda a,b:a+b内部计算大致过程:
take()
取RDD的前N个元素,组合成list返回法:
语法:
rdd.take()
- from pyspark import SparkConf,SparkContext
- import os
- #准备入口环境
- os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
- conf = SparkConf().setMaster("local[*]").setAppName("test_app")
- sc = SparkContext(conf = conf)
- # 准备一个RDD对象
- rdd = sc.parallelize([32,263,4,5,8,120])
- #take()取走前三个数据,返回存储在一个列表中
- take_list = rdd.take(3)
- print(take_list)
- # result:[32, 263, 4]
- sc.stop()#关闭入口
复制代码 count()
计算RDD 中有多少条数据,返回值为一个数字
语法:
rdd.count()
- from pyspark import SparkConf,SparkContext
- import os
- #准备入口环境
- os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
- conf = SparkConf().setMaster("local[*]").setAppName("test_app")
- sc = SparkContext(conf = conf)
- # 准备一个RDD对象
- rdd = sc.parallelize([32,263,4,5,8,120])
- # 计算RDD中有多少数据
- count = rdd.count()
- print(count)
- #result:6
- sc.stop()#关闭入口
复制代码 savaAsTextFile()
将RDD数据写入文本文件中
注意:文本文件的路径名字在未运行前需不存在
- from pyspark import SparkConf,SparkContext
- import os
- #准备入口环境
- os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
- conf = SparkConf().setMaster("local[*]").setAppName("test_app")
- sc = SparkContext(conf = conf)
- # 准备一个RDD对象
- rdd = sc.parallelize([32,263,4,5,8,120])
- # 将RDD数据存储到文件
- rdd.saveAsTextFile("D:/output1")
- sc.stop()
复制代码 如果出现题目:
原因:调用保存文件的savaAsTextFile(),需要配置Hadoop依赖
1.下载winutils.exe:https://codeload.github.com/cdarlint/winutils/zip/refs/heads/master
2.配置环境变量:
搜索:高级系统设置:
点击框选
点击环境变量
下一步:
下一步:
点击path这一行
点击新建
在输入框中填入以下内容:
最后一连点击三个确认.
之后再次运行上述代码,会在D盘文件中出现output1文件
打开后:
有很多文件这就表明当前的RDD有多个分区,
原因:saveAsTextFile()输出文件的个数是由RDD的分区决定的。
如果有24个文件,这表明有24分区
电脑CPU有多少核心数就默认有多少分区,
rdd数据会匀称的分散到各个分区去存储,
输出文件有的是空的,有的是列表中数据元素
将RDD分区修改为1个
方法一:
SparkConf对象设置属性全局并行度为1:
- conf = SparkConf().setMaster("local[*]").setAppName("test_app")
- conf.set("spark.default.parallelism",1) sc =SparkContext(conf = conf)
- sc = SparkContext(conf = conf)
复制代码 方法二:
创建RDD对象时设置(parallelize方法传入numSlices参数为1):
- rdd=sc.parallelize([32,263,4,5,8,120],1)
复制代码 or:
- rdd=sc.parallelize([32,263,4,5,8,120],numSlices=1)
复制代码 加入以上方法之一以后的代码和运行结果:
- from pyspark import SparkConf,SparkContext
- import os
- #准备入口环境
- os.environ['PYSPARK_PYTHON'] = "D:conda/envs/myenv/python.exe"
- conf = SparkConf().setMaster("local[*]").setAppName("test_app")
- conf.set("spark.default.parallelism",1)
- sc = SparkContext(conf = conf)
- # 准备一个RDD对象,设置分区为1
- rdd = sc.parallelize([32,263,4,5,8,120],1)
- # 将RDD数据存储到文件
- rdd.saveAsTextFile("D:/output2")
- sc.stop()#关闭入口
复制代码 D:/output2文件的内容
part-00000的内容:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |