Spark 基础
Spark 简朴先容
Why Spark
一、MapReduce编程模子的局限性
1、繁杂:只有Map和Reduce两个操作,复杂的逻辑须要大量的样板代码
2、处理效率低:
2.1、Map中间效果写磁盘,Reduce写HDFS,多个Map通过HDFS交换数据
2.2、任务调理与启动开销大
3、不适合迭代处理、交互式处理和流式处理
二、Spark是类Hadoop MapReduce的通用【并行】框架
1、Job中间输出效果可以保存在内存,不再须要读写HDFS
2、比MapReduce平均快10倍以上
三、版本
2014 1.0
2016 2.x
2020 3.x
四、上风
1、速度快
基于内存数据处理,比MR快100个数量级以上(逻辑回归算法测试)
基于硬盘数据处理,比MR快10个数量级以上
2、易用性
支持Java、【Scala】、【Python:pyspark】、R语言
交互式shell方便开发测试
3、通用性
一栈式办理方案:
批处理
交互式查询 spark sql
实时流处理(微批处理) spark streaming
图计算 spark graphic
呆板学习 spark mlib
4、多种运行模式
YARN,生产环境(ApplicationMaster)
Standalone,生产环境(Master/Worker)
Local
,本地模式【初学】√
Spark 技术栈
(图片侵权可删)
1、Spark Core:核心组件,分布式计算引擎 RDD
2、Spark SQL:高性能的基于Hadoop的SQL办理方案
3、Spark Streaming:可以实现高吞吐量、具备容错机制的准实时流处理系统
4、Spark GraphX:分布式图处理框架
5、Spark MLlib:构建在Spark上的分布式呆板学习库
spark-shell:Spark自带的交互式工具
local:spark-shell --master local
alone:spark-shell --master spark://MASTERHOST:7077
yarn :spark-shell --master yarn
Spark服务(服务上跑的是进程)
Master : Cluster Manager
Worker : Worker Node
Spark架构核心组件
Application :
建立在Spark上的用户程序,包罗Driver代码和运行在集群各节点Executor中的代码
Driver Program :
驱动程序,运行Application中的main函数并创建SparkContext
SparkContext :
Spark应用程序的入口,负责调理各个运算资源,协调各个Worker Node上的Executor
Cluster Manager :
在集群(Standalone、Mesos、YARN)上获取资源的外部服务
Worker Node :
集群中任何可以运行Application代码的节点,运行一个或多个Executor进程
Executor :
Application运行在Worker Node上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上
Task :
运行在Executor上的工作单元
Job :
多个Task构成的并行计算,由Action触发天生,一个Application中含多个Job
Stage :
每个Job会被拆分成多组Task,作为一个TaskSet,其名称为Stage
1、在驱动程序中,通过SparkContext主导应用的执行
2、SparkContext可以毗连差别类型的 CM(Standalone、YARN),毗连后,获得节点上的 Executor
3、一个节点默认一个Executor,可通过 SPARK_WORKER_INSTANCES 调解
4、每个应用获取自己的Executor
5、每个Task处理一个RDD分区
SparkContext
- /*
- 连接Driver与Spark Cluster(Workers)
- Spark的主入口
- 每个JVM仅能有一个活跃的SparkContext
- 【配置】
- master:
- local[*] : CPU核数为当前环境的最大值
- local[2] : CPU核数为2
- local : CPU核数为1
- yarn
- */
- val conf:SparkConf = new SparkConf()
- .setAppName(name:String)
- .set(key:String,value:String) // 多项设置
- .setMaster(master:String)
- val sc: SparkContext = SparkContext.getOrCreate(conf)
- /**
- 封装:工具类
- */
- class SparkCom(appName:String,master:String,logLevel:String="INFO") {
- private val conf:SparkConf = new SparkConf().setAppName(appName).setMaster(master)
- private var _sc:SparkContext = _
- private var _spark:SparkSession = _
- def sc() = {
- if (Objects.isNull(_sc)) {
- _sc = new SparkContext(conf)
- _sc.setLogLevel(logLevel)
- }
- _sc
- }
- def spark() = {
- if (Objects.isNull(_spark)) {
- _spark = SparkSession.builder().config(conf).getOrCreate()
- }
- _spark
- }
- def stop() = {
- if (Objects.nonNull(_sc)) {
- _sc.stop()
- }
- if (Objects.nonNull(_spark)) {
- _spark.stop()
- }
- }
- }
- object SparkCom{
- def apply(appName:String): SparkCom = new SparkCom(appName,"local[*]")
- def apply(appName:String, master:String): SparkCom = new SparkCom(appName,master)
- def apply(appName:String, master:String, logLevel:String): SparkCom = new SparkCom(appName,master,logLevel)
- }
复制代码 RDD
RDD的简朴先容
RDD:Spark核心,重要数据抽象
将数据项拆分为多个分区的集合,存储在集群的工作节点上的内存和【磁盘】
RDD是用于数据转换的接口
RDD指向了
或存储在(HIVE)HDFS、Cassandra、HBase等
或缓存(内存、内存+磁盘、仅磁盘等)
或在故障或缓存收回时重新计算其他RDD分区中的数据
RDD:弹性分布式数据集(Resilient Distributed Datasets)
抽象的:RDD并不存储真正的数据,只是【对数据和操作】的描述
分布式数据集:
RDD是只读的、分区记录的集合,每个分区分布在集群的差别节点上,每个任务处理一个分区,每个分区上都有compute函数,用来计算该分区中的数据
弹性:
RDD默认存放在内存中,当内存不足,Spark自动将RDD写入磁盘
容错性:
根据【数据血统】,可以自动从节点失败中恢复分区
RDD分区:Partition -> Partitioner -> Hash | Range …
分区是RDD被拆分并发送到节点的差别块之一
我们拥有的分区越多,得到的并行性就越强(RDD 是可以并行操作的元素的容错集合)
每个分区都是被分发到差别Worker Node的候选者
每个分区对应一个Task
获取RDD的方式
- /*
- 1.小型化数据集
- 2.读本地文件
- 3.读远程文件(HDFS)
- */
- // 集合创建:小数据集,可通过 numSlices 指定并行度(分区数)
- val rdd: RDD[T] = sc.parallelize(seq:Seq[T], numSlices:Int) // ✔
- val rdd: RDD[T] = sc.makeRDD(seq:Seq[T], numSlices:Int) // 调用了 parallelize
- // 外部数据源创建: 可通过 minPartitions 指定分区数,CPU核占用数
- // 文件系统:local(file:///...)或hadoop(hdfs://)
- val rdd: RDD[String] = sc.textFile(path:String, minPartitions:Int)【path】
- val rdd: RDD[String] = sc.wholeTextFiles(dir:String, minPartitions:Int)【dir】
- // 其他 RDD 创建
- val rdd2: RDD[Map[String, Int]] = rdd
- .mapPartitions(_
- .map(
- _
- .split("[^a-zA-Z]+")
- .map((_, 1))
- .groupBy(_._1)
- .map(t2 => (t2._1, t2._2.length))
- )
- )
复制代码 RDD操作类型
分为lazy与non-lazy两种
Transformation(lazy):也称转换操作、转换算子,返回一个新的RDD
Actions(non-lazy):立即执行,也称动作操作、动作算子,举措(为)算子,无返回值或者返回其他的
RDD中的全部转换都是惰性求值/延迟执行的,也就是说不会直接进行计算,只有当发生一个要求返回效果给 Driver 的 Action(举措算子) 动作时,这些转换才会真正进行。
之所以使用惰性求值/延迟执行,是因为这样可以在 Action 时对 RDD 操作形成 DAG 有向无环图进行 Stage 的分别和并行优化,这种设计让 Spark 更加有用率的进行。
RDD与DAG: Spark提供的核心抽象
DAG【有向无环图:如下图】反映了RDD之间的依赖关系
转换(transform)算子(窄依赖)==》stage1里面的相互平行
shuffle算子(宽依赖)==》stage2里面的相互交叉
action算子(行为算子)==》末了一个算子为行为算子,不到它不进行计算
(图片侵权可删)
数据预备
- // 数据类
- case class Customer(
- cus_id :Int,
- cus_fname :String,
- cus_lname :String,
- cus_email :String,
- cus_password :String,
- cus_street :String,
- cus_city :String,
- cus_state :String,
- cus_zipcode :String
- )
- case class OrderItem(
- ori_id :Int,
- ori_order_id :Int,
- ori_product_id :Int,
- ori_quantity :Int,
- ori_subtotal :Float,
- ori_product_price :Float
- )
- case class Order(
- or_id :Int,
- or_date :String,
- or_customer_id :Int,
- or_status :String
- )
- case class Product(
- pro_id :Int,
- pro_category_id :Int,
- pro_name :String,
- pro_description :String,
- pro_price :Float,
- pro_image :String
- )
- // 工具
- implicit class EntityUtil(line:String){
- val r_customer = ""(.*?)","(.*?)","(.*?)","(.*?)","(.*?)","(.*?)","(.*?)","(.*?)","(.*?)"".r
- val r_product = ""(.*?)","(.*?)","(.*?)","(.*?)","(.*?)","(.*?)"".r
- val r_order = ""(.*?)","(.*?)","(.*?)","(.*?)"".r
- val r_order_item = ""(.*?)","(.*?)","(.*?)","(.*?)","(.*?)","(.*?)"".r
- def toCustomer = line match {
- case r_customer(cust_id,fName,lName,email,password,street,city,state,zipcode)
- =>Customer(cust_id.toInt,fName,lName,email,password,street,city,state,zipcode)
- }
- def toProduct = line match {
- case r_product(prod_id,category_id,name,description,price,image)
- =>Product(prod_id.toInt,category_id.toInt,name,description,price.toFloat,image)
- }
- def toOrder = line match {
- case r_order(or_id,date,customer_id,status)
- =>Order(or_id.toInt,date,customer_id.toInt,status)
- }
- def toOrderItem = line match {
- case r_order_item(ori_id,order_id,product_id,quantity,subtotal,product_price)
- =>OrderItem(ori_id.toInt,order_id.toInt,product_id.toInt,quantity.toInt,subtotal.toFloat,product_price.toFloat)
- }
- }
复制代码 转换算子:RDD transform(窄依赖)
- /*
- 默认的并行度:200
- 控制并行度:
- 带(并行度) 的算子(执行完计算后对数据进行再分配)
- 并行度的体现方式:
- 分区数:numParitions|numSlices:Int 5[, fieldName]
- partitionIndex = fieldName.hashCode() % numParitions
- 扩展随机字段:0~numPartitions
- 分区器:partioner:Partioner (key)
- 默认的分区器:HashPartitioner
- 再分区算子(对数据进行重新分配)
- coalesce(numPartitions:Int, shuffle:Boolean)
- ✔ repartition(numPartitions:Int) => coalesce(numPartitions, true)
- 简单类型 RDD[T]
- val rddLine: RDD[String] = sc.textFile("/spark/practice_01/customers", 4)
- */
- // 【逐条处理】
- val rdd2: RDD[U] = rdd.map(f:T=>U)
- val rddAggByState: RDD[(String, Int)] = rddGroupByState.map(t => (t._1, t._2.size))
- // 【扁平化处理】:TraversableOnce : Trait用于遍历和处理集合类型元素,类似于java:Iterable
- val rdd2: RDD[U] = rdd.flatMap(f:T=>TraversableOnce[U]) //经典案例为:WordCount
- /* 【✔ 分区内逐行处理】:以分区为单位(分区不变)逐行处理数据
- map VS mapPartitions
- 1、数量:前者一进一出IO数量一致,后者多进多出IO数量不一定一致
- 2、性能:前者多分区逐条处理,后者各分区并行逐条处理更佳,常时间占用内存易导致OOM,内存小不推荐
- 3、类型:两者入口和出口类型都必须一致,后者进出都必须是迭代器
-
- // 推荐
- mapParitions(
- ≈ 子查询
- it.filter(...) <=> 谓词下缀
- )
-
- // 不推荐
- mapParitions(...)
- fielter(...) <=> where
- */
- val rdd2: RDD[U] = rdd.mapPartitions(f:Iterator[T]=>Iterator[U][,preservePar:Boolean])
- // 【分区内逐行处理】:以分区为单位(分区不变)逐行处理数据,并追加分区编号
- val rdd2: RDD[U] = rdd.mapPartitionsWithIndex(f:(Int,Iterator[T])=>Iterator[U][,preservePar:Boolean])
- val rddCust: RDD[Customer] = rddLine.mapPartitions(_.map(_.toCustomer))
- // 【转内存数组】:同分区的数据转为同类型的内存数组,分区不变 rdd:RDD[T]
- val rdd2: RDD[Array[T]] = rdd.glom();
- val rddCustGlom: RDD[Array[Customer]] = rddCust.glom()
- // 【× 数据过滤】:过滤规则 f:T=>Boolean,保留满足条件数据,分区不变,【数据可能倾斜skew】
- val rdd2: RDD[T] = rdd.filter(f:T=>Boolean)
- val rddFilter: RDD[Customer] = rddCust.filter(_.cus_state.matches("TX|CO|PR"))
- // 【数据分组】:同键同组同区,同区可多组;打乱shuffle,按f:T=>K规则,分区不变,【数据可能倾斜skew】
- val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K)
- val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K, partioner:Partitioner)
- val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K, numPartitions:Int)
- val rddGroupByCity: RDD[(String, Iterable[Customer])] = rddFilter.groupBy(_.cus_state)
- /* 【✔ 数据抽样】
- withReplacement:Boolean 是否有放回抽样
- fraction:Double 抽样率
- seed:Long 随机种子,默认为当前时间戳(一般缺省)
- 若数据为100条
- false, 0.4 => 抽样40%的数据,40条左右
- true, 0.4 => 每条数据被抽取的概率为40%
- */
- val rdd2: RDD[T] = rdd.sample(withReplacement:Boolean,fraction:Double,seed:Long)
- val rddSample: RDD[Customer] = rddCust.sample(true, 0.1, 1)
- // 【× 数据去重】:numPartitions:Int 设定去重后的分区数
- // 采用该方法去重,数据规模比较大的情况下,数据压力比较大,因为数据需要在不同的分区间比较
- // 一般采用分组的方式,将去重字段作为分组字段,在不同的分区内并行去重
- val rdd2: RDD[T] = rdd.distinct()
- val rdd2: RDD[T] = rdd.distinct(numPartitions:Int)(implicit order:Ording[T] = null)
- /* 【数据排序】
- 处理数据f:T=>K,升降序asc:Boolean,分区数numPartitions:Int
- 默认排序前后分区一致,【有shuffle】,除非重新设定 numPartitions
- 全局排序,多分区间交换数据,效率较低。优化见 PairRDD
- 若:K为基本类型,则无需提供第二参数列表中的隐式参数 ord: Ordering[K]
- 若:K为自定义类型,则必须提供第二参数
- */
- val rdd2: RDD[T] = rdd.sortBy(f:T=>K,asc:Boolean,numPartitions:Int)(implicit ord: Ordering[K], ctag: ClassTag[K])
- val rddSortBy: RDD[Customer] = rddCust.sortBy(c => c.cus_state, asc=true, np=3)
- /*
- 多个类型 RDD[T]:纵向
- 交并差操作:数据类型一致,根据元素 equals 认定是否相同
- 【自定义类型】:必须重写 equals 方法,因为默认等值判断 == 判断地址
- 拉链操作:要求分区数和分区内的数据量一致
- */
- // 【求交集】:重载可重设分区数numPartitions:Int,或定义分区规则par:Partitioner[T]
- val rdd2: RDD[T] = rdd.intersection(rdd3:RDD[T])
- val rdd2: RDD[T] = rdd.intersection(rdd3:RDD[T], numPartitions:Int)
- val rdd2: RDD[T] = rdd.intersection(rdd3:RDD[T], par:Partitioner[T])
- // 【求并集】:不去重
- val rdd2: RDD[T] = rdd.union(rdd3:RDD[T])
- // 【求差集】:重载可重设分区数numPartitions:Int,或定义分区规则par:Partitioner[T]
- val rdd2: RDD[T] = rdd.subtract(rdd3:RDD[T])
- val rdd2: RDD[T] = rdd.subtract(rdd3:RDD[T], numPartitions:Int)
- val rdd2: RDD[T] = rdd.subtract(rdd3:RDD[T], par:Partitioner[T])
- // 【拉链操作】
- val rdd2: RDD[(T,U)] = rdd.zip(rdd3:RDD[U])
- val rdd2: RDD[(T,Long)] = rdd.zipWithIndex()
- val rdd2: RDD[(T,Long)] = rdd.zipWithUniqueId()
- // 有三个重载:1+1,1+2,1+3
- val rdd2: RDD[V]=rdd.zipPartitions(rddA:RDD[A])(f:(Iterator[T],Iterator[A])=>Iterator[V])
- val rdd2: RDD[V]=rdd.zipPartitions(rddA:RDD[A],preserveParitioning:Boolean)(f:(Iterator[T],Iterator[A])=>Iterator[V])
复制代码 键值对算子:PairRDD(K,V)
- // 若再分区器和现有分区器相同,则不执行分区操作
- val rddPair: RDD[(String, Int)] = rddLine.mapPartitions(it => {
- it.map(line => {
- val c: Customer = line.toCustomer
- (c.cus_state, 1)
- })
- })
- // 按指定分区器执行再分区操作,根据 K 的自定义 Partitioner 进行,默认为 HashPartitioner
- val pairRdd2: RDD[(K,V)] = pairRdd.partitionBy(p:Partitioner)
- // 【按键排序】:
- /*
- 排序优化:
- 1、自定义分区规则,实现分区内排序
- org.apache.spark.Partitioner
- HashPartitioner
- RangePargitioner
- 2、自定义类型作为 K ,需要提供隐式 Ordering,才可以看到 sortByKey
- 3、使用 repartitionAndSortWithinPartitions(par:Partitioner)方法
- */
- val rddCI: RDD[(Customer, Int)] = rddCust
- .mapPartitions(_.map(c=>(c, 1)))
- .cache()
- // 【方法一】
- val rddCIOrder: RDD[(Customer, Int)] = rddCI
- .partitionBy(new CustPartitioner)
- .sortBy(_.cust_id)
- // 【方法二】
- val pairRdd2: RDD[(K,V)] = pairRdd.sortByKey(ascending:Boolean=true, numPartitions:Int)
- // 【方法二.一】
- implicit val orderCust:Ordering[Customer] = Ordering.by(_.cus_id)
- val rddCIOrder: RDD[(Customer, Int)] = rddCI.sortByKey(true)
- // 【方法二.二】
- case class Customer2(
- cus_id :Int,
- cus_fname :String,
- cus_lname :String,
- cus_email :String,
- cus_password :String,
- cus_street :String,
- cus_city :String,
- cus_state :String,
- cus_zipcode :String
- )
- extends Ordered[Customer2]
- with Serializable {
- override def compare(that: Customer2): Int =
- if(cus_state.compareTo(that.cus_state)==0) cus_id-that.cus_id
- else cus_state.compareTo(that.cus_state)
- }
- sc.textFile("D:\\projects\\0.2_phrase_hive&spark\\spark_first\\data\\retail_db\\customers.csv", 4)
- .mapPartitions(_.map(line=>(line.toCustomer2,1)))
- .sortByKey(false)
- .foreach(println)
- // 【方法三】:重新分区,并在当前分区内排序
- val pairRdd2: RDD[(K,V)] = pairRdd.repartitionAndSortWithinPartitions(par:Partitioner)
- class CustPartitioner extends Partitioner {
- // 分区数:Executor 数量
- override def numPartitions: Int = 3
- // 根据键获取分区号
- override def getPartition(key: Any): Int = key match {
- case Customer(_,_,_,_,_,_,_,state,_) => state match {
- case "Canada" => 0
- case "China" => 1
- case "United States" => 2
- }
- }
- }
- implicit val orderCust:Ordering[Customer] = Ordering.by(_.cus_id)
- val rddCIOrder: RDD[(Customer, Int)] = rddCI
- .repartitionAndSortWithinPartitions(new CustPartitioner)
- // reduceByKey + foldByKey + aggregateByKey 都调用 combineByKeyClassTag
- // 【✔ 按键聚合值】: combiner和reduce的值类型相同,计算规则相同
- // group by + combiner + reduce
- val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(f:(V,V)=>V)
- val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(f:(V,V)=>V, numPartitions:Int)
- val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(partitioner:Partitioner, f:(V,V)=>V)
- val rddPair2: RDD[(String, Int)] = rddPair.reduceByKey(_+_)
- // 【按键聚合值】: combiner和reduce的值类型相同,计算规则相同,带初值
- val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V)(inParOp:(V,V)=>V)
- val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V,numPartitions:Int)(inParOp:(V,V)=>V)
- val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V,partitioner:Partitioner)(inParOp:(V,V)=>V)
- // 【✔ 按键分别执行分区内和分区间计算】: combiner和reduce的值类型可不同,计算规则可不同
- val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
- val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U,numPartitions:Int)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
- val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U,partitioner:Partitioner)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
- val rddPair2: RDD[(String, Float)] = rddPair.aggregateByKey(0.0f)(_+_,_+_)
- // 【按键分别执行分区内和分区间计算】: combiner和reduce的值类型可不同,计算规则可不同
- val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U)
- val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U,numPartitions:Int)
- val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U,partitioner:Partitioner,mapSideCombine:Boolean,serializer:Serializer)
- // 【✔ 按键分组】
- val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey()
- val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey(numPartitions:Int)
- val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey(partitioner:Partitioner)
- // 【多数据集分组】:1VN 同键同组,不同RDD值进入TupleN的不同Iterable
- -------------------------------------------------------------------------------
- val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.groupWith(otherA: RDD[(K,V1)])
- val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1],Iterable[V2])] = pairRdd.groupWith(otherA: RDD[(K,V1)],otherB: RDD[(K,V2)])
- val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1],Iterable[V2],Iterable[V3])] = pairRdd.groupWith(otherA: RDD[(K,V1)],otherB: RDD[(K,V2)],otherC: RDD[(K,V3)])
- -------------------------------------------------------------------------------
- // 重载 1+1 1+2 1+3,追加再分区操作
- val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)])
- val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)],numPartitions:Int)
- val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)],partitioner:Partitioner)
- // 每国家有多少人?
- val rddAvgByState: RDD[(String, Float)] = rddLine.mapPartitions(it => {
- it.map(line => {
- val c: Customer = line.toCustomer
- (c.cus_state, 1)
- })
- })
- .reduceByKey(_ + _)
- // 每个国家平均每个城市有多少人?
- val rddAvgByState: RDD[(String, Float)] = rddLine.mapPartitions(it => {
- it.map(line => {
- val c: Customer = line.toCustomer
- (s"${c.cus_state}_${c.cus_city}", 1)
- })
- })
- .reduceByKey(_ + _)
- .mapPartitions(it => {
- it.map(t => {
- val ps: Array[String] = t._1.split("_")
- (ps(0), t._2)
- })
- })
- .groupByKey()
- .mapPartitions(_.map(t=>(t._1,t._2.sum.toFloat/t._2.size)))
- /*
- 【关联操作】:1V1 Shuffle ?
- 横向,根据键做关联
- 重载:numPartitions:Int 或 partitioner:Partitioner
- */
- val pairRdd: RDD[(K, (V, V1))] = pairRdd1.join(pairRdd3:RDD[(K,V1)])
- val pairRdd: RDD[(K, (V, Option[V1]))] = pairRdd1.leftOuterJoin(pairRdd3:RDD[(K,V1)])
- val pairRdd: RDD[(K, (Option[V]), V1)] = pairRdd1.rightOuterJoin(pairRdd3:RDD[(K,V1)])
- val pairRdd: RDD[(K, (Option[V]), Option[V1])] = pairRdd1.fullOuterJoin( pairRdd3:RDD[(K,V1)])
- val rddCus: RDD[Customer] =
- sc.textFile("D:\\projects\\0.2_phrase_hive&spark\\spark_first\\data\\retail_db\\customers.csv", 4)
- .mapPartitions(_.map(_.toCustomer)).cache()
- val rddOrder: RDD[(Int,Order)] =
- sc.textFile("D:\\projects\\0.2_phrase_hive&spark\\spark_first\\data\\retail_db\\orders.csv", 4)
- .mapPartitions(_.map(line=>{
- val order: Order = line.toOrder
- (order.or_customer_id,order)
- }))
- .cache()
- // 用户的订单数量
- val rddCusOrd: RDD[(Int, Int)] = rddCus.mapPartitions(
- _.map(c => (c.cus_id, c))
- ).leftOuterJoin(rddOrder)
- .mapPartitions(_.map(t => (t._1, if (t._2._2.isEmpty) 0 else 1)))
- .reduceByKey(_ + _)
- // 用户订单金额
- val rddOI: RDD[(Int,Float)] =
- sc.textFile("D:\\projects\\0.2_phrase_hive&spark\\spark_first\\data\\retail_db\\order_items", 4)
- .mapPartitions(_.map(line=>{
- val oi: OrderItem = line.toOrderItem
- (oi.ori_order_id,oi.ori_subtotal)
- }))
- .cache()
-
- val rddCusSum: RDD[(Int, Float)] = rddCus.mapPartitions(
- _.map(c => (c.cus_id, c))
- ).leftOuterJoin(rddOrder)
- .mapPartitions(_.map(t => (if (t._2._2.isEmpty) 0 else t._2._2.get.or_id, t._1)))
- .leftOuterJoin(rddOI)
- .mapPartitions(_.map(t => (t._2._1, if (t._2._2.isEmpty) 0 else t._2._2.get)))
- .reduceByKey(_ + _)
复制代码 举措算子:action
- /* 【返回】所有元素分别在分区间和分区内执行【聚集】操作的结果
- reduce & fold 分区内和分区间执行相同操作,且类型与元素类型一致
- aggregate 分区内和分区间执行不同操作,且类型与元素类型不一致
- */
- val rst:T = rdd.reduce(f:(T,T)=>T)
- val rst:T = rdd.fold(init:T)(f:(T,T)=>T)
- val rst:U = rdd.aggregate(init:U)(f:(U,T)=>U,f:(U,T)=>U)
- // 返回包含数据集中所有元素的数组
- val array:Array[T] = rdd.collect()
- // 返回数据集中元素数量
- val rst:Long = rdd.count()
- val rst:Map[K,Long] = pairRdd.countByKey()
- // 返回数据集中最大值
- val rst:T = rdd.max()
- // 返回数据集中最小值
- val rst:T = rdd.min()
- // 返回数据集中的第一个元素
- val rst:T = rdd.first()
- // 返回数据集中的前 num 个元素
- val array:Array[T] = rdd.take(num:Int)
- // 返回排序后数据集中的前 num 个元素
- val array:Array[T] = rdd.takeOrdered(num:Int)(implicit ord:Ordering[T])
- /* 持久化至文本文件,重载追加压缩功能
- import org.apache.hadoop.io.compress.{BZip2Codec, SnappyCodec}
- import io.airlift.compress.lzo.LzopCodec
- rdd.saveAsTextFile("out_path",classOf[BZip2Codec])
- */
- rdd.saveAsTextFile(path:String)
- rdd.saveAsTextFile(path:String,codec: Class[_ <: CompressionCodec])
- rdd.saveAsObjectFile(path:String)
- // 遍历迭代
- rdd.foreach(f:T=>Unit)
复制代码 经典案例(wordCount《==》单词计数功能)

- // 工具类的创建(SparkUtil,实现一劳永逸)
- package spark_util
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.{SparkConf, SparkContext}
- class SparkUtil {
- /**
- * spark configuration:配置工具类
- * appName,master,logLevel,sparkHome
- */
- private var config:SparkConf = null
- /**
- * spark rdd:弹性分布式数据集(rdd算子)
- */
- private var sc:SparkContext = null
- /**
- * spark sql:结构化查询(sql算子,sql语句)
- */
- private var spark:SparkSession = null
- private def check(info:String,value:Any,regex:String=".*")={
- if(null==value){
- throw new RuntimeException(s"$info 空指针异常")
- }
- if(value.isInstanceOf[String] && value.toString.size==0){
- throw new RuntimeException(s"$info 空字符串异常")
- }
- if (null!=regex && !value.toString.matches(regex)) {
- throw new RuntimeException(s"$info 不符合正则规则 $regex 异常: ${value.toString}")
- }
- }
- def build()={
- config = new SparkConf()
- this
- }
- def appName(name:String) ={
- check("SparkConfig",config)
- check("appName",name,"[a-z]\\w+")
- config.setAppName(name)
- this
- }
- /**
- * @param master
- * 1、local:单线程
- * 2、local[*]:所有可用线程
- * 3、local[N]:N条线程
- * 4、spark://host:7077: standalone
- * 5、yarn-client:yarn模式,本地化运行
- * 6、yarn-cluster:yarn模式,ApplicationMaster
- */
- def master(master:String)={
- check("SparkConfig",config)
- check("appName",master,
- "local\\[(\\*|\\d+)]|spark://[a-z][a-z0-9]*:\\d+|yarn-(client|cluster)")
- config.setMaster(master)
- this
- }
- def set(item:String,value:String)={
- check("SparkConfig",config)
- check("ConfigItemName",item)
- check(item,value)
- config.set(item,value)
- }
- def sparkContext(level:String="INFO")={
- if(null==sc){
- check("SparkConfig",config)
- check("LogLevel",level.toUpperCase, "INFO|DEBUG|WARN|ERROR|FATAL")
- sc = new SparkContext(config)
- sc.setLogLevel(level)
- }
- sc
- }
- def sparkSession(hiveSupport:Boolean=false,level:String="INFO")={
- if(null==spark){
- check("SparkConfig",config)
- check("LogLevel",level.toUpperCase, "INFO|DEBUG|WARN|ERROR|FATAL")
- // 在创建 SparkSession 之前先创建 SparkContext
- spark ={
- if(hiveSupport)
- SparkSession
- .builder()
- .config(config)
- .enableHiveSupport()
- .getOrCreate()
- else
- SparkSession
- .builder()
- .config(config)
- .getOrCreate()
- }
- sc = spark.sparkContext
- }
- spark
- }
- def close={
- if (null != spark) {
- // SparkSession 和 SparkContext 并存
- // 关闭 SparkSession 同时自动关闭 SparkContext
- spark.stop()
- }else if(null != sc){
- sc.stop()
- }
- }
- }
- object SparkUtil{
- def apply(): SparkUtil = new SparkUtil()
- }
- // wordCount 具体实现代码( SparkRDD )
- // 读取一个文本文件,统计每个单词出现的次数,并将结果按单词出现次数降序排序
- package spark_rdd
- import org.apache.spark.{Partitioner, SparkContext}
- import spark_util.SparkUtil
- import spark_util.Data._
- object SparkRDD {
- def main(args: Array[String]): Unit = {
- val sparkUtil: SparkUtil = SparkUtil()
- .build()
- .appName("spark_rdd_01")
- .master("local[4]")
-
- val sc: SparkContext = sparkUtil.sparkContext(level = "ERROR")
-
- // 从指定路径读取文本文件story.txt,并设置并行度为2。这意味着Spark将尝试使用两个分区来处理文件
- sc.textFile("D:\\projects\\0.2_phrase_hive&spark\\spark_first\\data\\story.txt",2)
- // .mapPartitions(it=> { ... }):
- // 一个转换操作,mapPartitions允许你处理每个分区的数据。参数it是一个Iterator[String],表示每个分区中的行
- .mapPartitions(it=>{
- // it.flatMap(_.trim.replaceAll("[^a-zA-Z ]+","").split("\\s+").map((_,1)):
- /**
- flatMap:将每个元素转换为多个元素
- trim:去除每行的前后空白字符
- replaceAll("[^a-zA-Z ]+",""):删除行中除了字母和空格之外的所有字符
- split("\\s+"):按空白字符分割字符串,生成单词数组
- map((_,1)):将每个单词与常量1配对,形成一个二元组
- */
- it.flatMap(
- _.trim
- .replaceAll("[^a-zA-Z ]+","")
- .split("\\s+")
- .map((_,1))
- ).toArray // 将 Iterator 转换为数组
- // 分区内聚合 : Combiner
- .groupBy(_._1) // 按二元组的第一个元素进行分组
- .mapValues(_.size) // 计算每个单词出现的次数
- .toIterator // 奖结果转换回 Iterator
- }).groupBy(_._1,4) // 再次按单词进行分组,这次指定了4个分区。这有助于在后续步骤中减少数据的shuffle
- // 再次使用mapPartitions来处理每个分区的数据
- .mapPartitions(it=>{
- it.map(tp2=>{
- val word: String = tp2._1
- // Reducer : 不同分区同一个单词出现的次数,将不同分区同一个单词的次数求和
- val count: Int = tp2._2 // Iterator[(String,Int)]
- .map(_._2) // Iterator[Int]
- .sum
- (word,count)
- }).toArray
- .sortWith(_._2>_._2) // (a:(String,Int),b:(String,Int))=>{...} 按单词出现的次数进行降序排序
- .toIterator
- }).sortBy(_._2,false)
- .saveAsTextFile("file:///D:\\projects\\0.2_phrase_hive&spark\\spark_first\\data\\result\\wc")
- }
- }
复制代码 RDD专项练习
- /*
- 现有客户信息文件 customers.csv,请找出:
- 客户中的前5个最大家族
- 客户中的前10个最流行的名字
- */
- /*
- 现有客户信息文件 scores.txt,请找出:
- 班级 ID 姓名 年龄 性别 科目 成绩
- 需求如下:
- 1. 一共有多少人参加考试?
- 1.1 一共有多少个小于 20 岁的人参加考试?
- 1.2 一共有多少个等于 20 岁的人参加考试?
- 1.3 一共有多少个大于 20 岁的人参加考试?
- 2. 一共有多个男生参加考试?
- 2.1 一共有多少个女生参加考试?
- 3. 12 班有多少人参加考试?
- 3.1 13 班有多少人参加考试?
- 4. 语文科目的平均成绩是多少?
- 4.1 数学科目的平均成绩是多少?
- 4.2 英语科目的平均成绩是多少?
- 5. 单个人平均成绩是多少?
- 6. 12 班平均成绩是多少?
- 6.1 12 班男生平均总成绩是多少?
- 6.2 12 班女生平均总成绩是多少?
- 6.3 同理求 13 班相关成绩
- 7. 全校语文成绩最高分是多少?
- 7.1 12 班语文成绩最低分是多少?
- 7.2 13 班数学最高成绩是多少?
- 8. 总成绩大于 150 分的 12 班的女生有几个?
- */
- package spark_rdd2
- import org.apache.spark.rdd.RDD
- import org.apache.spark.util.LongAccumulator
- import org.apache.spark.{SparkConf, SparkContext}
- object SparkRDD {
- // 样例类参与 RDD 运算不能写在 main 中,否则报错:序列化异常
- case class Score(classId:Int,name:String,age:Int,gender:String,subject:String,score:Int,_type:String)
- def main(args: Array[String]): Unit = {
- val conf: SparkConf = new SparkConf()
- .setAppName("spark")
- .setMaster("local[4]")
- val sc: SparkContext = SparkContext.getOrCreate(conf)
- // 设置检查点路径
- sc.setCheckpointDir("hdfs://single01:9000/spark/checkpoint")
- val map = Map(
- "GT20"->"20岁以上",
- "EQ20"-> "20岁",
- "LT20"-> "20岁以下"
- )
- // val mapBroad: Broadcast[Map[String, String]] = sc.broadcast(map)
- val maleCount: LongAccumulator = sc.longAccumulator("maleCount")
- val path = "file:///D:\\projects\\0.2_phrase_hive&spark\\spark_first\\data\\scores.txt"
- val scores: RDD[Score] = sc
- .textFile(path, 3)
- .mapPartitions(
- _.drop(1)
- .map(line=>{
- val a: Array[String] = line.split("\\s+")
- val age: Int = a(2).toInt
- val _type = age match {
- case age if (age > 20) => map.get("GT20").get
- case age if (age == 20) => map.get("EQ20").get
- case _ => map.get("LT20").get
- }
- if(a(3).equals("男")){
- maleCount.add(1)
- }
- Score(a(0).toInt,a(1),age,a(3),a(4),a(5).toInt,_type)
- })
- )
- .cache()
- scores.foreach(println)
- println(maleCount.value)
- /**
- * mapPartitions接收一个函数,该函数将每个分区中的迭代器转换为另一个迭代器(出入相同,都为Iterator)
- * groupByKey方法的作用是将具有相同键的所有值聚集在一起
- * reduceByKey方法用于合并具有相同键的所有值
- */
- // 一共有多少人参加考试(小于20,等于20,大于20)
- val personCount: Long = scores
- .mapPartitions(_.map(_.name))
- .distinct()
- .count()
- println(s"$personCount 人考试")
- scores
- .mapPartitions(_.map(s=>(s._type,1)))
- .reduceByKey(_+_)
- .foreach(println)
- // 一共有多少男生(女生)参加考试
- val genderCount: RDD[(String, Int)] = scores
- .mapPartitions(_.map(s => (s.gender, 1)))
- .reduceByKey(_ + _)
- genderCount.checkpoint()
- genderCount.foreach(println)
- // 12班(13班)一共有多少人参加考试
- scores
- .mapPartitions(_.map(s=>((s.classId,s.name),1)))
- .groupByKey()
- .mapPartitions(_.map(t=>(t._1._1,1)))
- .reduceByKey(_+_)
- .foreach(println)
- scores
- .mapPartitions(_.map(t=>(t.classId,t.name)).toSet[(Int,String)].toIterator.map(t=>(t._1,1)))
- .reduceByKey(_+_)
- .foreach(println)
- // 语数外各科目的平均成绩
- scores
- .mapPartitions(_.map({
- case s if s.subject matches("chinese|math|english") => (s.subject,s.score)
- }))
- .groupByKey()
- .mapPartitions(_.map(t=>(t._1,t._2.sum*1.0f/t._2.size)))
- .reduceByKey(_+_)
- .foreach(println)
- // 单个人的平均成绩
- scores
- .mapPartitions(_.map(t=>(t.name,t.score)))
- .groupByKey()
- .mapPartitions(_.map(t=>(t._1,t._2.sum*1.0f/t._1.size)))
- .reduceByKey(_+_)
- .foreach(println)
- // 12班(13)平均总成绩
- scores
- .mapPartitions(_.map(t=>(t.classId,t.score)))
- .groupByKey()
- .mapPartitions(_.map(t=>(t._1,t._2.sum*1.0f/t._2.size)))
- .reduceByKey(_+_)
- .foreach(println)
- // 12班(13班)男生(女生)平均总成绩
- scores
- .mapPartitions(_.map(t=>((t.classId,t.gender),t.score)))
- .groupByKey()
- .mapPartitions(_.map(t=>(t._1,t._2.sum*1.0f/t._2.size)))
- .reduceByKey(_+_)
- .foreach(println)
- // 全校语文成绩最高分
- val maxScore: Int = scores
- .mapPartitions(_.filter(_.subject.equals("chinese")).map(_.score))
- .max()
- println(s"全校语文成绩最高分为 $maxScore")
- // 12 班语文成绩最高分
- val chineseScore: RDD[(Int, Int)] = scores
- .mapPartitions(_.filter(_.subject == "chinese").map(t => (t.classId, t.score)))
- val chineseMaxScore: Int = chineseScore.mapPartitions(_.filter(_._1 == 12).map(_._2)).max()
- println(s"12 班语文成绩最高分为 $chineseMaxScore")
- // 13 班数学成绩最高分
- val mathScore: RDD[(Int, Int)] = scores
- .mapPartitions(_.filter(_.subject == "math").map(t => (t.classId, t.score)))
- val mathMaxScore: Int = mathScore.mapPartitions(_.filter(_._1 == 13).map(_._2)).max()
- println(s"13 班数学成绩最高分为 $mathMaxScore")
- // 总成绩大于 150 分 的 12 班的女生有几个
- // 方法一
- val BigScores: RDD[((Int, String), Iterable[Int])] = scores.mapPartitions(_.map(t => ((t.classId, t.gender), t.score)))
- .groupByKey()
- .mapPartitions(_.map({
- case t if t._2.sum > 150 => (t._1, t._2)
- }))
- val BigGenderScore: RDD[(Int, Iterable[Int])] = BigScores.mapPartitions(_.filter(_._1._2 == "女")).map(t => (t._1._1, t._2))
- val resultScore: Long = BigGenderScore.mapPartitions(_.filter(_._1 == 12)).map(_._2).count()
- println(s"总成绩大于 150 分的 12 班的女生有 $resultScore 个")
- val numSumScore12Gt150: Long = scores
- .filter(score => score.classId == 12 && score.gender.equals("女"))
- .mapPartitions(_.map(score => (score.name, score.score)))
- .reduceByKey(_+_)
- .filter(_._2 > 150)
- .count()
- sc.stop()
- }
- }
复制代码 优化:optimize
- org.apache.spark.util.Utils
-
- /*
- shuffle性能较差:因为shuffle必须落盘,内存中等数据会OOM
- groupByKey只分组(存在Shuffle) + reduce只聚合
- <=结果同,性能不同=>
- reduceByKey先分组、预聚合、再聚合(存在Shuffle) ✔
- */
- /*
- 【设置日志管理】
- 日志级别:INFO|DEGUG|WARN|ERROR|FATAL
- */
- sc.setLogLevel(logLevel:String)
- /*
- 【设置检查点:容错,恢复】
- */
- sc.setCheckpointDir(path:String)
- /*
- 【RDD重用:检查点、缓存与持久化】
- cache 临时存储于【内存】重用,job结束后自动删除 ✔
- cache 相当于 persist(StorageLevel.MEMORY_ONLY)
- persisit 临时存储于【磁盘】重用,job结束后自动删除,涉及IO性能较差
- StorageLevel.MEMORY_ONLY
- StorageLevel.DISK_ONLY
- StorageLevel.OFF_HEAP
- StorageLevel.MEMORY_AND_DISK
- StorageLevel.MEMORY_AND_DISK_SER
- StorageLevel.MEMORY_AND_DISK_SER_2
- checkpoint 长久存储于【磁盘】重用,job结束后不会删除,涉及IO性能较差,安全且一般和cache组合使用
- */
- val rddCache: RDD[T] = rdd.cache()
- val rddCache: RDD[T] = rdd.persist(level:StorageLevel)
- rdd.checkpoint()
- /*
- 【广播变量】(分布式只读变量):broadcast
- 作用:
- 广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本
- 将数据集或配置广播到每个Executor以readonly方式存在,不会在Task之间传输
- 优势:
- 如果Executor端需要访问Driver端的某个变量,spark会向Executor端每个task都发送一个此变量的副本,
- 如果此变量很大,就会占用大量的Executor节点的内存,
- 利用广播变量,spark只会给一个Executor节点发送一个变量
- */
- val bc:BroadCast[T] = sc.broadcast(value:T)
- rdd.mapPartitions(itPar=>{
- val v:T = bc.value
- ...
- })
- /*
- 累加器:accumulate:只能 add 操作,常用于计数(累加器在Driver定义初始化,在Excutor端更新)
- 1、定义在Driver端的一个变量,Cluster中每一个Task都会有一份Copy
- 2、所有的Task都计算完成后,将所有Task中的Copy合并到驱动程序中的变量中
- 非累加器:在所有Task中的都会是独立Copy,不会有合并
- 自定义累加器:写一个类继承 AccumulatorV2[IN, OUT]
- abstract class AccumulatorV2[IN, OUT] extends Serializable {
- // Returns if this accumulator is zero value or not
- def isZero: Boolean
- // Creates a new copy of this accumulator, which is zero value
- def copyAndReset(): AccumulatorV2[IN, OUT] = {...}
- // Creates a new copy of this accumulator.
- def copy(): AccumulatorV2[IN, OUT]
- // Resets this accumulator, which is zero value.
- def reset(): Unit
- // 添加:Takes the inputs and accumulates.
- def add(v: IN): Unit
- // 合并:Merges another same-type accumulator and update its state.
- def merge(other: AccumulatorV2[IN, OUT]): Unit
- // 值列表:Defines the current value of this accumulator
- def value: OUT
- }
- */
- val accLong: LongAccumulator = sc.longAccumulator("longAcc")
- val accDouble: DoubleAccumulator = sc.doubleAccumulator("doubleAcc")
- rdd.mapPartitions(itPar=>{
- ...
- accLong.add(v:Long)
- accDouble.add(v:Double)
- ...
- })
- accXxx.reset()
- val isZero:Boolean = accXxx.isZero
- val num:Long|Double = accXxx.value|sum|count|avg
- /*
- 【分区控制】
- 【缩减分区节省资源】 或 【扩大分区提高并行度】
- coalesce(numPartitions:Int, shuffle:Boolean):
- 缩小分区
- 存在过多的小任务的时候收缩合并分区,减少分区的个数,减少任务调度成本
- 默认情况下,不会对数据重组,比如:3个合成2个,采用 {1+2},{3},容易导致数据倾斜
- 若需数据均衡,则将 shuffle 参数设置为 true 即可
- 扩大分区
- 若需要扩大分区,shuffle 参数必须设置为 true
- 若将2个分区拆分成3个,必须打乱重新分区,否则数据还是在两个分区,{1},{2},{空}
- repartition(numPartitions:Int) <=> coalesce(numPartitions,true)
- */
- val rdd: RDD[String] = rdd.coalesce(numPartitions:Int, shuffle:Boolean)
- val rdd: RDD[String] = rdd.repartition(numPartitions:Int) // ✔
复制代码 阶段分别 DAG
- /*
- 【为什么要划分阶段】
- 1、基于数据的分区,本着传递计算的性能远高于传递数据,所以数据本地化是提升性能的重要途径之一
- 2、一组串行的算子,无需 Shuffle,基于数据本地化可以作为一个独立的阶段连续执行
- 3、经过一组串行算子计算,遇到 Shuffle 操作,默认情况下 Shuffle 不会改变分区数量,但会因为numPartitions:Int, partitioner:Partitioner 等参数重新分配,过程数据会【写盘供子RDD拉取(类MapReduce)】
- */
- /*
- Driver程序提交后
- 1、Spark调度器将所有的RDD看成是一个Stage
- 2、然后对此Stage进行逆向回溯,遇到Shuffle就断开,形成一个新的Stage
- 3、遇到窄依赖,则归并到同一个Stage(TaskSet)
- 4、等到所有的步骤回溯完成,便生成一个DAG图
- RDD依赖关系
- Lineage:血统、遗传
- RDD最重要的特性之一,保存了RDD的依赖关系
- RDD实现了基于Lineage的容错机制
- 依赖关系 org.apache.spark.Dependency
- 窄依赖 NarrowDependency
- 1V1 OneToOneDependency
- 1VN RangeDependency
- 宽依赖 ShuffleDependency
- 当RDD分区丢失时
- Spark会对数据进行重新计算,对于窄依赖只需重新计算一次子RDD的父RDD分区
- 若配合持久化更佳:cache,persist,checkpoint
- */
复制代码 算子宽窄依赖分别
- // 窄依赖
- rdd.dependencies
- map
- flatMap
- mapPartitions ✔
- mapPartitionsWithIndex
- glom
- filter
- distinct
- intersection ✔
- sample ✔
- union ✔
- subtract ✔
- zip...
- cogroup
- // 宽依赖
- ShuffledRDD extends RDD
- sortBy
- sortByKey
- partitionBy
- repartition
- // 不一定
- /*
- reduceByKey(【partitioner: Partitioner】, func: (V, V) => V)
- 若使用的是带 partitioner 的重载且 Partitioner 和父RDD的 Partitioner一致
- 则为窄依赖RDD,否则为宽依赖ShuffledRDD
- */
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |