ToB企服应用市场:ToB评测及商务社交产业平台
标题:
自用 云盘算 | pyspark | 常见RDD算子及例子(云盘算期末)
[打印本页]
作者:
数据人与超自然意识
时间:
2024-6-21 13:45
标题:
自用 云盘算 | pyspark | 常见RDD算子及例子(云盘算期末)
媒介
期末复习
主要包括云盘算基本的概念
常见的pyspark算子于对应的例子
一、基本概念(云盘算、RDD等)
1. 云盘算的定义和优点
定义:云盘算是一种通过互联网提供盘算服务的技术。相比于传统盘算,它的资源获取方式,从“买”变为“租”
优点:
资源池化
弹性伸缩
安全可靠
2.RDD定义和五大特性
定义 :RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行盘算的集合。
五大特性:
RDD由一系列的分区组成
RDD的方法会作用在其所有的分区上
RDD之间是有依赖关系(RDD有血缘关系)
Key-Value型的RDD可以有分区器
RDD的分区规划,会尽量靠近数据所在的服务器
3. 云盘算部署类型(私有云/公有云/社区云/混淆云之间的区别)
部署类型私有云公有云社区云混淆云定义由单一构造独享的云盘算资源。由第三方服务提供商提供的云盘算资源,面向公众或多个租户。由多个构造共同拥有和利用的云盘算资源,通常有相似的需求和目的。联合了私有云和公有云的优点,允许数据和应用在私有云和公有云之间进行迁移和交互。优点1. 安全;
2.自主可控。1.本钱低;
2.高扩展性;
3.便捷性。1.本钱共享;
2.安全性和隐私性;
3.协作。1.灵活性;
2.本钱效益;
3.高可用性。缺点1.本钱高;
2.维护复杂;
3.扩展性有限。1.安全性和隐私性;
2.控制权有限;
3.依赖性。1. 复杂性;
2.扩展性受限;
3.合规性问题。1. 管理复杂;
2.安全挑战;
3.集成难度。
4. 云盘算服务类型
Infrastructure as a Service(IaaS):根本设备即服务
Platform as a Service(PaaS):平台即服务
Software as a Service(SaaS):软件即服务
二、常见RDD算子及例子
分布式集合对象上的API称之为算子
Python中利用pyspark初始化
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
//构建SparkConf对象
conf = SparkConf().setAppName('helloworld').setMaster('local[*]')
//构建SparkContext执行环境入口对象
sc = SparkContext(conf = conf)
复制代码
1. Transformation算子:举措算子
1)map算子和flatMap算子的区别和接洽
map
:是将RDD的数据一条条处置惩罚,返回新的RDD
一对一映射。
返回一个新的RDD,此中的每个元素是通过函数转换得到的单个元素。
适用于输入和输出元素之间存在一对一关系的情况。
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
#使用map 算子将每个元素乘以 2
mapped_rdd = rdd.map(lambda x: x * 2)
print(mapped_rdd.collect())
#输出: [2, 4, 6, 8, 10]
复制代码
flatMap
: 对rdd执行map操作,然后进行解除嵌套操作
一对多映射。
返回一个新的RDD,此中每个输入元素可以被映射为0个或多个输出元素。最终结果是扁平化的。
适用于输入元素必要映射为多个输出元素的情况,例如将句子拆分为单词。
data = ["hello world", "apache spark"]
rdd = sc.parallelize(data)
// 使用 flatMap 算子将每个字符串拆分为单词
flat_mapped_rdd = rdd.flatMap(lambda x: x.split(" "))
print(flat_mapped_rdd.collect())
// 输出: ['hello', 'world', 'apache', 'spark']
复制代码
2)sortBy算子和sortByKey算子的接洽区别
sortBy和sortByKey算子都是用于对RDD进行排序的
sortBy
可以对任意类型的RDD进行排序
必要指定一个函数,该函数从RDD的元素中提取排序键
data = [("Alice", 23), ("Bob", 20), ("Charlie", 25)]
rdd = sc.parallelize(data)
//按年龄排序
sorted_rdd = rdd.sortBy(lambda x: x[1])
print(sorted_rdd.collect())
//输出: [('Bob', 20), ('Alice', 23), ('Charlie', 25)]
复制代码
sortByKey
仅适用于键值对情势的RDD
根据键值对进行排序
data = [("Alice", 23), ("Bob", 20), ("Charlie", 25)]
rdd = sc.parallelize(data)
//按名字排序
sorted_rdd = rdd.sortByKey()
print(sorted_rdd.collect())
//输出: [('Alice', 23), ('Bob', 20), ('Charlie', 25)]
复制代码
rdd = sc.parallelize([('a', 1), ('E', 1), ('C', 1), ('D', 1), ('b', 1), ('g', 1), ('f', 1)], 3)
// 默认按照key进行升序排序
print("默认: ",rdd.sortByKey().collect())
#如果要确保全局有序,排序分区数要给1,不是1的话,只能确保各个分区内排好序整体上不保证
print("多分区: ",rdd.sortByKey(ascending=False,numPartitions=5).collect())
#对排序的key进行处理,拍排序前处理一下key ,让key以你处理的样子进行排序(不影响数据本身)
print("单分区: ",rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: str(key).lower()).collect())
// 默认: [('C', 1), ('D', 1), ('E', 1), ('a', 1), ('b', 1), ('f', 1), ('g', 1)]
// 多分区: [('g', 1), ('f', 1), ('b', 1), ('a', 1), ('E', 1), ('D', 1), ('C', 1)]
// 单分区: [('a', 1), ('b', 1), ('C', 1), ('D', 1), ('E', 1), ('f', 1), ('g', 1)]
复制代码
3)reduceByKey算子和groupByKey算子的接洽区别
reduceByKey和groupByKey算子都是用于对键值对(Pair RDD)进行操作的算子
reduceByKey
对相同键的值进行合并,返回每个键一个单一的值。
适用于必要对键对应的值进行聚合操作的情况,如求和、求均匀等。
在本地进行预聚合,减少网络传输,性能较高。
data = [("a", 1), ("b", 1), ("a", 2)]
rdd = sc.parallelize(data)
// 对相同键的值进行求和
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
print(reduced_rdd.collect())
// 输出: [('a', 3), ('b', 1)]
复制代码
groupeByKey
对相同键的值进行分组,返回每个键一个可迭代对象。
适用于必要对键对应的值进行分组操作的情况,如必要对分组后的数据进行进一步处置惩罚。
可能会导致数据倾斜和网络传输增多,性能较低
rdd = sc.parallelize([1,2,3,4,5])
// 分组,将数字分层偶数和奇数2个组
rdd2 = rdd.groupBy(lambda num: 'even' if (num % 2 == 0) else 'odd')
// 将rdd2的元素的value转换成list,这样print可以输出内容.
print(rdd2.map(lambda x: (x[0], list(x[1]))).collect())
// [('odd', [1, 3, 5]), ('even', [2, 4])]
复制代码
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 2), ('b', 3)])
result = rdd.groupBy(lambda t: t[0])
print(result.map(lambda t:(t[0], list(t[1]))).collect())
// [('b', [('b', 2), ('b', 3)]), ('a', [('a', 1), ('a', 1)])]
复制代码
data = [("a", 1), ("b", 1), ("a", 2)]
rdd = sc.parallelize(data)
// 对相同键的值进行分组
grouped_rdd = rdd.groupByKey()
// 将结果转换为列表以便查看
result = [(k, list(v)) for k, v in grouped_rdd.collect()]
print(result)
// 输出: [('a', [1, 2]), ('b', [1])]
复制代码
4)filter、distinct、union算子
filter算子
:返回是True的数据被保留,False的数据被扬弃
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
// 通过Filter算子, 保留奇数
result = rdd.filter(lambda x: x % 2 == 1)
print(result.collect())
// 结果:[1,3,5]
复制代码
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
// 保留奇数
rdd.filter(lambda x: True if (x % 2 == 1) else False)
print(rdd.filter(lambda x: x % 2 == 1).collect())
// 结果:[1,3,5]
复制代码
distinct算子
:去重操作
rdd1 = sc.parallelize([('a', 1), ('a', 1), ('a', 3)])
print(rdd1.distinct().collect())
// 结果: [('a', 3), ('a', 1)]
复制代码
union算子
:2个rdd合并成1个rdd返回
union算子是不会去重。
RDD的类型差异也是可以合并的
rdd1 = sc.parallelize([1, 1, 3, 3])
rdd2 = sc.parallelize(["a", "b", "a"])
rdd3 = rdd1.union(rdd2)
print(rdd3.collect())
// 结果: [1, 1, 3, 3, 'a', 'b', 'a']
复制代码
5)join、leftOuterJoin和rightOuterJoin算子
join
: join算子只能用于二元元组
结果RDD包罗两个RDD中键相同的元素对
结果RDD中的元素格式为 (key, (value1, value2))。
data1 = [("a", 1), ("b", 2)]
data2 = [("a", 3), ("a", 4), ("b", 5)]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)
// 内连接
joined_rdd = rdd1.join(rdd2)
print(joined_rdd.collect())
// 输出: [('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5))]
复制代码
leftOuterJoin 算子
返回左RDD中的所有元素,对于右RDD中没有匹配的键,用None添补
结果RDD中的元素格式为 (key, (value1, value2_opt)),value2_opt为None或value2。
data1 = [("a", 1), ("b", 2), ("c", 3)]
data2 = [("a", 3), ("a", 4), ("b", 5)]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)
// 左外连接
left_joined_rdd = rdd1.leftOuterJoin(rdd2)
print(left_joined_rdd.collect())
// 输出: [('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5)), ('c', (3, None))]
复制代码
rightOuterJoin 算子
返回右RDD中的所有元素,对于左RDD中没有匹配的键,用None添补。
结果RDD中的元素格式为 (key, (value1_opt, value2)),value1_opt为None或value1。
data1 = [("a", 1), ("b", 2), ("d", 4)]
data2 = [("a", 3), ("a", 4), ("b", 5), ("c", 6)]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)
// 右外连接
right_joined_rdd = rdd1.rightOuterJoin(rdd2)
print(right_joined_rdd.collect())
// 输出: [('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5)), ('c', (None, 6))]
复制代码
6)intersection、glom算子
intersection算子
:求2个rdd的交集,返回一个新rdd
rdd1 = sc.parallelize([('a', 1), ('a', 3)])
rdd2 = sc.parallelize([('a', 1), ('b', 3)])
// 通过intersection算子求RDD之间的交集, 将交集取出 返回新RDD
rdd3 = rdd1.intersection(rdd2)
print(rdd3.collect())
// 结果:[('a', 1)]
复制代码
glom
:将RDD的数据,加上嵌套,这个嵌套按照分区 来进行
比如RDD数据[1,2,3,4,5]有2个分区那么,被glom后,数据酿成:[[1,2,3],[4,5]]
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2)
print(rdd.glom().flatMap(lambda x: x).collect())
//结果:[[1, 2, 3, 4], [5, 6, 7, 8, 9]]
复制代码
2. Action算子:举措算子
countByKey
: 统计key出现的次数(一样平常适用KV型RDD)
rdd = sc.textFile("../data/input/words.txt")
rdd2 = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1))
# 通过countByKey来对key进行计数, 这是一个Action算子
# result 不是rdd而是dict
result = rdd2.countByKey()
print(result)
print(type(result))
# 结果:defaultdict(<class 'int'>, {'hello': 3, 'xing': 2, 'qian': 4})<class 'collections.defaultdict'>
复制代码
reduce
:对RDD数据集按照你传入的逻辑进行聚合
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.reduce(lambda a, b: a + b))
# 结果:15
复制代码
fold
:对RDD数据集按照你传入的逻辑进行聚合
这个初始值聚合,会作用在:分区内聚合、分区间聚合
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
print(rdd.fold(10, lambda a, b: a + b))
# 结果:85
#分区1: 123聚合的时候带上10作为初始值得到16=1+2+3+ 10
#分区2: 456聚合的时候带上10作为初始值得到25 =4+5+6+ 10
#分区3: 789聚合的时候带上10作为初始值得到34 =7+8+9+ 10
#3个分区的结果做聚合也带上初始值10,
#所以结果是: 16+25+34+10 = 85
复制代码
first
:取出RDD的第一个元素
result = sc.parallelize([2,4,5,6,7]).first()
print(result)
# 结果 2
复制代码
take
:取RDD的前N个元素,组合成list返回给你
result = sc.parallelize([22,4,5,346,7,56,78,55]).take(5)
print(result)
# 结果 [22, 4, 5, 346, 7]
复制代码
top
:对RDD数据集进行降序排序,取前N个
result = sc.parallelize([22,4,5,346,7,56,78,55]).top(5)
print(result)
# 结果:[346, 78, 56, 55, 22]
复制代码
count
:盘算RDD有多少条数据,返回值是一个数字
result = sc.parallelize([22,4,5,346,7,56,78,55]).count(5)
print(result)
# 结果:8
复制代码
takeSample
:随机抽样RDD的数据
rdd = sc.parallelize([1, 3, 5, 3, 1, 3, 2, 6, 7, 8, 6], 1)
print(rdd.takeSample(False, 5, 1))
# 结果:[2, 7, 6, 6, 3]
复制代码
takeOrdered
:对RDD进行排序取前N个
rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)
print(rdd.takeOrdered(3, lambda x: -x))
# 结果: [9, 7, 6]
复制代码
foreach
:对RDD的每一个元素,执行你提供的逻辑的操作(和map一个意思),但是这个方法没有返回值
rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)
#注意,forreach函数没有返回值,因此在里面直接打印了
result = rdd.foreach(lambda x: print(x * 10, end=" "))
#这个result对象是None
print(result)
# 结果:None
复制代码
saveAsTextFile
:将RDD的数据写入文本文件中支持本地写出, hdfs等文件系统.
P.S.
foreach 与 saveAsTestFile这两个算子是分区(Executor) 跳过Driver,由分区所在的Executor直接执行。别的的Action算子都会将结果发送至Driver
编程题
某个类别在哪些城市销售(以电脑为例子)
盘算IDF值
总结
自用
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4