Spark的算子

打印 上一主题 下一主题

主题 792|帖子 792|积分 2376

目次
一、算子
二、转换算子(Transformations)
2.1、map算子
2.2、flatMap算子 
2.3、filter算子
2.4、union算子
2.5、distinct算子
2.6、分组聚合算子
2.6.1groupByKey算子
2.6.2reduceByKey算子
2.7、排序算子
2.7.1sortBy算子
2.7.2sortByKey
2.8、重分区算子
 2.8.1repartition算子
2.8.2coalesce算子 
三、动作算子(Action)
3.1count
3.2、foreach算子
3.3、saveAsTextFile算子
3.4、first 算子
3.5、take 算子
3.6、collect 算子
3.7、reduce算子
3.8、top算子


一、算子

在 Apache Spark 中,算子(Operators)是指用于处置处罚和转换数据的各种操作。Spark 的核心概念之一是 RDD(弹性分布式数据集),而算子则是对 RDD 进行操作的方法。算子可以分为两大类:转换算子(Transformation)和动作算子(Action)。
这篇文章要说的是转换算子
二、转换算子(Transformations)

转换算子是对 RDD 进行转换,天生新的 RDD。转换算子是惰性的(lazy),即它们不会立即实验,而是在遇到动作算子时才会触发实验。
2.1、map算子

功能:对RDD中每个元素调用一次参数中的函数,并将每次调用的返回值直接放入一个新的RDD中
分类:转换算子
场景:一对一的转换,须要返回值
语法格式:
def map(self , f: T -> U ) -> RDD[U]
f:代表参数是一个函数
T:代表RDD中的每个元素
U:代表RDD中每个元素转换的结果
   list01 = [1,2,3,4,5,6]
listRdd = sc.parallelize(list01)
mapRdd = listRdd.map(lambda x: math.pow(x,3))
mapRdd.foreach(lambda x: print(x))
  2.2、flatMap算子 

功能:将两层嵌套集合中的每个元素取出,扁平化处置处罚,放入一层
集合中返回,类似于SQL中explode函数
分类:转换算子
场景:多层集合元素睁开,一个集合对应多个元素【一对多】
语法:
def flatMap(self , f : T -> Iterable[U]) -> RDD[U]
   夜曲/发如雪/东风破/七里香
十年/爱情转移/你的背包
日不落/舞娘/倒带
鼓楼/成都/吉姆餐厅/无法长大
玉轮之上/荷塘月色
  编写代码:
fileRdd = sc.textFile("../datas/a.txt",2)
flatRdd = fileRdd.flatMap(lambda line: line.split("/"))
flatRdd.foreach(lambda x: print(x))
  2.3、filter算子

功能:对RDD集合中的每个元素调用一次参数中的表达式对数据进行过滤,符合条件就保存,不符合就过滤
场景:行的过滤,类似于SQL中where大概having
 def filter(self, f: T -> bool ) -> RDD[T]
   1 周杰伦 0 夜曲/发如雪/东风破/七里香
2 陈奕迅 0 十年/爱情转移/你的背包
3 1 日不落/舞娘/倒带
4 赵雷 0 鼓楼/成都/吉姆餐厅/无法长大
5 凤凰传奇 -1 玉轮之上/荷塘月色
  代码演示:
fileRdd = sc.textFile("../datas/b.txt",2)
filterRdd = fileRdd.filter(lambda line: re.split(r"\s",line)[2] != '-1' and len(re.split("\\s",line)) == 4)  #每一行第三列不等于‘-1’的和不足4列的会被剔除
 filterRdd.foreach(lambda x: print(x))
  2.4、union算子

union算子
功能:实现两个RDD中数据的归并
分类:转换算子
语法:
def union(self,other:RDD[U]) -> RDD[T/U]
   list1 = [1, 2, 3, 4, 5, 6, 7, 8]
list2 = [5, 6, 7, 8, 9, 10]
rdd1 = sc.parallelize(list1,2)
rdd2 = sc.parallelize(list2,2)
rdd3 = rdd1.union(rdd2)
  rdd3.foreach(print)
  2.5、distinct算子

功能:实现对RDD元素的去重
分类:转换算子
语法:
def distinct(self) -> RDD[T]
   list1 = [1, 2, 3, 4, 5, 6, 7, 8]
list2 = [5, 6, 7, 8, 9, 10]
rdd1 = sc.parallelize(list1,2)
rdd2 = sc.parallelize(list2,2)
rdd3 = rdd1.union(rdd2)
rdd4 = rdd3.distinct()
rdd4.foreach(print)
  2.6、分组聚合算子

2.6.1groupByKey算子

xxxByKey算子,只有KV类型的RDD才气调用
功能:对KV类型的RDD按照Key进行分组,相同K的Value放入一 个集合列表中,返回一个新的RDD
语法:RDD【K,V】.groupByKey => RDD【K, List[V]】
分类:转换算子
场景:须要对数据进行分组的场景,大概说分组以后的聚合逻辑 比力复杂,不适合用reduce
特点:必须颠末Shuffle,可以指定新的RDD分区个数,可以指定分区规则
   rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd2 = rdd1.groupByKey()  # ("word",List[10,5])
rdd2.foreach(lambda x: print(x[0], *x[1]))
  2.6.2reduceByKey算子

功能:对KV类型的RDD按照Key进行分组,并对相同Key的全部
Value使用参数中的reduce函数进行聚合
要求:只有KV类型的RDD才气调用
分类:转换算子
特点:必须颠末shuffle,可以指定新的RDD分区个数,可以指定分区规则
语法:
def reduceByKey(self,f: (T,T) ->T,numPartitions,partitionFunction) ->RDD[Tuple[K,V]]
   rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
  rdd2= rdd1.reduceByKey(lambda total,num: total * num)
 rdd2.foreach(print)
  留意:能用reduceByKey就不要用groupByKey+map
reduceByKey代码更简洁,而且性能会更好
2.7、排序算子

2.7.1sortBy算子

功能:对RDD中的全部元素进行整体排序,可以指定排序规则
【按照谁排序,升序大概降序】
分类:转换算子
场景:适用于全部对大数据排序的场景,一般用于对大数据量非KV类型的RDD的数据排序
特点:颠末Shuffle,可以指定排序后新RDD的分区个数,底层只能使用RangePartitioner来实现
def sortBy(self, keyFuncT) -> 0, asc: bool,numPartitions) -> RDD
keyFuncT) -> 0:用于指定按照数据中的哪个值进行排序
asc: bool:用于指定升序还是降序,默认是升序
   rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd1.sortByKey(ascending=False).foreach(print)
  2.7.2sortByKey

功能:对RDD中的全部元素按照Key进行整体排序,可以指定排序规则
要求:只有KV类型的RDD才气调用
分类:转换算子【sortByKey会触发job的运行】
场景:适用于大数据量的KV类型的RDD按照Key排序的场景
特点:颠末Shuffle,可以指定排序后新RDD的分区个数
语法:def sortByKey(self, asc, numPartitions) -> RR[Tuple[K,V]]
   rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd1.sortBy(lambda tuple:tuple[1],ascending=False).foreach(print) #如许会根据value排序
  2.8、重分区算子

 2.8.1repartition算子

功能:调整RDD的分区个数
分类:转换算子
场景:一般用于调大分区个数,必须颠末shuffle才气实现
语法:
def repartition(self,numPartitions) -> RDD[T]
   list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
    # 没有指定分区,走默认,默认分区个数,因为是local 模式,以是跟核数有关,以是 分区数为2
rdd = sc.parallelize(list01)
print(rdd.getNumPartitions()) # 2
# repartition 是一个转换算子,必然经历shuffle过程
bigrdd = rdd.repartition(4)
print(bigrdd.getNumPartitions()) # 4
  2.8.2coalesce算子 

功能:调整RDD的分区个数
分类:转换算子
特点:可以选择是否颠末Shuffle,默认环境下不颠末shuffle
def coalesce(self, numPartitions, shuffle:bool) -> RDD[T]
   bigbigrdd = bigrdd.coalesce(8,shuffle=True) # 8
 print(bigbigrdd.getNumPartitions())
  三、动作算子(Action)

动作算子用于触发 RDD 的计算,并返回结果或将其保存到外部存储系统。动作算子会立即实验全部之前界说的转换算子。
3.1count

count算子
功能:统计RDD集合中元素的个数,返回一个int值
分类:动作算子
场景:统计RDD的数据量,计算行数
语法:
def count(self) -> int
   data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)
count = flow.count()
print(f"RDD 中的元素数量: {count}")
  3.2、foreach算子

功能:对RDD中每个元素调用一次参数中的函数,没有返回值【与map场景上区别】
分类:触发算子
场景:对RDD中的每个元素进行输出大概保存,一般用于测试打印大概保存数据到第三方系统【数据库等】
   data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)
  # 过滤数据
# 1. 过滤掉手机号码不正确(长度不为11位)的数据
# 2. 过滤掉数据长度不等于11的数据
flow_filtered = flow.filter(lambda line: len(re.split(r"\s+", line)) == 11 and len(re.split(r"\s+", line)[1]) == 11)
  3.3、saveAsTextFile算子

功能:用于将RDD的数据保存到外部文件系统中
分类:触发算子
场景:保存RDD的计算的结果,一般用于将结果保存到HDFS
文件个数 = Task个数 = 分区个数
def saveAsTextFile(self , path ) -> None
   # 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)
  # 过滤数据
# 1. 过滤掉手机号码不正确(长度不为11位)的数据
# 2. 过滤掉数据长度不一致的数据
flow_filtered = flow.filter(lambda line: len(re.split(r"\s+", line)) == 11 and len(re.split(r"\s+", line)[1]) == 11)
  # 剖析数据,提取手机号码、上行流量和下行流量
flow_parsed = flow_filtered.map(lambda line: (re.split(r"\s+", line)[1], (int(re.split(r"\s+", line)[2]), int(re.split(r"\s+", line)[3]))))
  # 计算每个手机号码的总流量
total_traffic = flow_parsed.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
  # 提取总流量
total_traffic_result = total_traffic.mapValues(lambda v: v[0] + v[1])
  # 将结果保存为文本文件
output_path = "D:\\pythonCode\\PySpark\\data\\output\\total_traffic"
total_traffic_result.saveAsTextFile(output_path)
  3.4、first 算子

功能:返回RDD集合中的第一个元素【RDD有多个分区,返回的是第一个分区的第一个元素】
分类:触发算子
语法:def first(self) -> T
   # 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)
  # 获取 RDD 中的第一个元素
first_element = flow.first()
  # 打印结果
print(f"RDD 中的第一个元素: {first_element}")
  3.5、take 算子

功能:返回RDD集合中的前N个元素【先从第一个分区取,如果不够再从第二个分区取】
分类:触发算子
留意:take返回的结果放入Driver内存中的,take数据量不能过大
   # 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)
  # 获取 RDD 中的前 5 个元素
first_five_elements = flow.take(5)
  # 打印结果
print(f"RDD 中的前 5 个元素: {first_five_elements}")
  3.6、collect 算子

collect算子
功能:将RDD转化成一个列表返回
分类:触发算子
这个RDD的数据一定不能过大,如果RDD数据量很大,导致Driver内存溢出
理解:假如如今有三个分区,三个分区中都有数据,假如你如今想打印数据,此时打印哪个分区呢?先网络,将数据汇总在一起,再打印。
   # 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)
  # 获取 RDD 中的全部元素
all_elements = flow.collect()
  # 打印结果
print(f"RDD 中的全部元素: {all_elements}")
  3.7、reduce算子

功能:将RDD中的每个元素按照给定的聚合函数进行聚合,返回聚合的结果
分类:触发算子
# tmp用于存储每次计算临时结果,item就是RDD中的每个元素
def reduce(self,f : (T,T) -> U) -> U
   # 创建一个包罗整数的 RDD
numbers = sc.parallelize([1, 2, 3, 4, 5])
  # 使用 reduce 算子计算总和
total_sum = numbers.reduce(lambda a, b: a + b)
  # 打印结果
print(f"整数的总和: {total_sum}")
  3.8、top算子

功能:对RDD中的全部元素降序排序,并返回前N个元素,即返回RDD中最大的前N个元数据
分类:触发算子
场景:取RDD数据中的最大的TopN个元素
特点:不颠末Shuffle,将全部元素放入Driver内存中排序,性能更好,只能适合处置处罚小数据量
语法:def top(self,num) -> List[0]
   # 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)
  # 获取 RDD 中的前 5 个元素,按字符串长度降序排序
top_five_elements = flow.top(5, key=lambda x: len(x))
  # 打印结果
print(f"RDD 中的前 5 个元素(按字符串长度降序): {top_five_elements}")
  3.9、takeOrdered算子
功能:对RDD中的全部元素升序排序,并返回前N个元素,即返回RDD中最小的前N个元数据
分类:触发算子
场景:取RDD数据中的最小的TopN个元素
特点:不颠末Shuffle,将全部元素放入Driver内存中排序,只能
适合处置处罚小数据量
语法:def takeOrdered(self,num) -> List[0]
   # 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)
  # 获取 RDD 中的前 5 个元素,按字符串长度升序排序
ordered_five_elements = flow.takeOrdered(5, key=lambda x: len(x))
  # 打印结果
print(f"RDD 中的前 5 个元素(按字符串长度升序): {ordered_five_elements}")

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

小小小幸运

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表