守听 发表于 2025-4-14 23:11:56

【无标题】spark编程

Value类型:
9) distinct
➢ 函数署名
def distinct()(implicit ord: Ordering = null): RDD
def distinct(numPartitions: Int)(implicit ord: Ordering = null): RDD
➢ 函数阐明
将数据会合重复的数据去重
 
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
))
val dataRDD1 = dataRDD.distinct()
val dataRDD2 = dataRDD.distinct(2)
 
10) coalesce
➢ 函数署名
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option = Option.empty) 
(implicit ord: Ordering = null)
: RDD
➢ 函数阐明
根据数据量缩减分区,用于大数据集过滤后,进步小数据集的实行服从
当 spark 步伐中,存在过多的小任务的时候,可以通过 coalesce 方法,紧缩合并分区,减少分区的个数,减小任务调度成本
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),6)
val dataRDD1 = dataRDD.coalesce(2)
 
11) repartition
➢ 函数署名
def repartition(numPartitions: Int)(implicit ord: Ordering = null): RDD
➢ 函数阐明
该操作内部实在实行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,照旧将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论怎样都会经 shuffle 过程。
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.repartition(4)
 
12) sortBy
➢ 函数署名
def sortBy(
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length) 
(implicit ord: Ordering, ctag: ClassTag): RDD
➢ 函数阐明
该操作用于排序数据。在排序之前,可以将数据通过f 函数举行处理,之后按照 f 函数处理的效果举行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程。
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
val dataRDD2 = dataRDD.sortBy(num=>num, true, 4)
 
 
双Value类型:
13) intersection
➢ 函数署名
def intersection(other: RDD): RDD
➢ 函数阐明
对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.intersection(dataRDD2)
 
14) union
➢ 函数署名
def union(other: RDD): RDD
➢ 函数阐明
对源 RDD 和参数 RDD 求并集后返回一个新的 RDD(重复数据不会去重)
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.union(dataRDD2)
 
15) subtract
➢ 函数署名
def subtract(other: RDD): RDD
➢ 函数阐明
以源 RDD 元素为主,去除两个 RDD 中重复元素,将源RDD的其他元素保留下来。(求差集)
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.subtract(dataRDD2)
 
16) zip
➢ 函数署名
def zip(other: RDD): RDD[(T, U)]
➢ 函数阐明
将两个 RDD 中的元素,以键值对的情势举行合并。其中,键值对中的 Key 为第 1 个 RDD
中的元素,Value 为第 2 个 RDD 中的相同位置的元素。
val dataRDD1 = sparkContext.makeRDD(List("a","b","c","d"))
val dataRDD2 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD = dataRDD1.zip(dataRDD2)
flatMap
➢ 函数署名
def flatMap(f: T => TraversableOnce): RDD
➢ 函数阐明
将处理的数据举行扁平化后再举行映射处理,所以算子也称之为扁平映射。
val dataRDD = sparkContext.makeRDD(List(
 List(1,2),List(3,4)
),1)
val dataRDD1 = dataRDD.flatMap(
 list => list
)
 
map和flatMap的区别:
 
map会将每一条输入数据映射为一个新对象。
 
flatMap包含两个操作:会将每一个输入对象输入映射为一个新集合,然后把这些新集合连成一个大集合。
partitionBy
➢ 函数署名
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
➢ 函数阐明
将数据按照指定 Partitioner 重新举行分区。Spark 默认的分区器是 HashPartitioner
val rdd: RDD[(Int, String)] =
 sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)

val rdd2: RDD[(Int, String)] =
 rdd.partitionBy(new HashPartitioner(2))
函数阐明
将数据源的数据根据 key 对 value 举行分组
val dataRDD1 =
 sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.groupByKey()
val dataRDD3 = dataRDD1.groupByKey(2)
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))
可以将数据按照相同的 Key 对 Value 举行聚合
val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
将数据根据不同的规则举行分区内计算和分区间计算val dataRDD1 =
 sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 =
 dataRDD1.aggregateByKey(0)(_+_,_+_)
val dataRDD1 =
 sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
现有数据 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),求每个key的总值及每个key对应键值对的个数
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93),("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRDD: RDD[(String, (Int, Int))] = input.combineByKey(
 (_, 1), //a=>(a,1)
 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //acc_1为数据源的value,acc_2为key出现的次数,二者举行分区内部的计算
 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) //将分区内部计算的效果举行分区间的汇总计算,得到每个key的总值以及每个key出现的次数
)
在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 举行排序
val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD2: RDD[(String, Int)] = dataRDD1.sortByKey(false)
 
 
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 【无标题】spark编程