Spark的一些重要概念

打印 上一主题 下一主题

主题 996|帖子 996|积分 2988

Shuffle的深入理解

什么是Shuffle,本意为洗牌,在数据处理领域里面,意为将数打散。
问题:shuffle一定有网络传输吗?有网络传输的一定是Shuffle吗?
Shuffle的概念

通过网络将数据传输到多台机器,数据被打散,但是有网络传输,不一定就有shuffle,Shuffle的功能是将具有相同规律的数据按照指定的分区器的分区规则,通过网络,传输到指定的机器的一个分区中,需要注意的是,不是上游的Task发送给下游的Task,而是下游的Task到上游拉取数据。

reduceByKey一定会Shuffle吗

不一定,如果一个RDD事先使用了HashPartitioner分区先进行分区,然后再调用reduceByKey方法,使用的也是HashPartitioner,并且没有改变分区数量,调用redcueByKey就不shuffle
如果自定义分区器,多次使用自定义的分区器,并且没有改变分区的数量,为了减少shuffle的次数,提高计算效率,需要重新自定义分区器的equals方法
例如:
  1. //创建RDD,并没有立即读取数据,而是触发Action才会读取数据
  2. val lines = sc.textFile("hdfs://node-1.51doit.cn:9000/words")
  3. val wordAndOne = lines.flatMap(_.split(" ")).map((_, 1))
  4. //先使用HashPartitioner进行partitionBy
  5. val partitioner = new HashPartitioner(wordAndOne.partitions.length)
  6. val partitioned = wordAndOne.partitionBy(partitioner)
  7. //然后再调用reduceByKey
  8. val reduced: RDD[(String, Int)] = partitioned.reduceByKey(_ + _)
  9. reduced.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out-36-82")
复制代码

join一定会Shuffle吗

不一定,join一般情况会shuffle,但是如果两个要join的rdd实现都使用相同的分区去进行分区了,并且join时,依然使用相同类型的分区器,并且没有改变分区数据,那么不shuffle
  1. //通过并行化的方式创建一个RDD
  2. val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
  3. //通过并行化的方式再创建一个RDD
  4. val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
  5. //该join一定有shuffle,并且是3个Stage
  6. val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
  7. val rdd11 = rdd1.groupByKey()
  8. val rdd22 = rdd2.groupByKey()
  9. //下面的join,没有shuffle
  10. val rdd33 = rdd11.join(rdd22)
  11. rdd33.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out-36-86")
复制代码

shuffle数据的复用

spark在shuffle时,会应用分区器,当读取达到一定大小或整个分区的数据被处理完,会将数据溢写磁盘磁盘(数据文件和索引文件),溢写持磁盘的数据,会保存在Executor所在机器的本地磁盘(默认是保存在/temp目录,也可以配置到其他目录),只要application一直运行,shuffle的中间结果数据就会被保存。如果以后再次触发Action,使用到了以前shuffle的中间结果,那么就不会从源头重新计算而是,而是复用shuffle中间结果,所有说,shuffle是一种特殊的persist,以后再次触发Action,就会跳过前面的Stage,直接读取shuffle的数据,这样可以提高程序的执行效率。
广播变量

广播变量的使用场景

在很多计算场景,经常会遇到两个RDD进行JOIN,如果一个RDD对应的数据比较大,一个RDD对应的数据比较小,如果使用JOIN,那么会shuffle,导致效率变低。广播变量就是将相对较小的数据,先收集到Driver,然后再通过网络广播到属于该Application对应的每个Executor中,以后处理大量数据对应的RDD关联数据,就不用shuffle了,而是直接在内存中关联已经广播好的数据,即通实现mapside join,可以将Driver端的数据广播到属于该application的Executor,然后通过Driver广播变量返回的引用,获取实现广播到Executor的数据
广播变量的特点:广播出去的数据就无法在改变了,在没有Executor中是只读的操作,在每个Executor中,多个Task使用一份广播变量

广播变量的实现原理

广播变量是通过BT的方式广播的(TorrentBroadcast),多个Executor可以相互传递数据,可以提高效率
sc.broadcast这个方法是阻塞的(同步的)
广播变量一但广播出去就不能改变,为了以后可以定期的改变要关联的数据,可以定义一个object[单例对象],在函数内使用,并且加一个定时器,然后定期更新数据
广播到Executor的数据,可以在Driver获取到引用,然后这个引用会伴随着每一个Task发送到Executor,然后通过这个引用,获取到事先广播好的数据
序列化问题

序列化问题的场景

spark任务在执行过程中,由于编写的程序不当,任务在执行时,会出序列化问题,通常有以下两种情况,
•        封装数据的Bean没有实现序列化接口(Task已经生成了),在ShuffleWirte之前要将数据溢写磁盘,会抛出异常
•        函数闭包问题,即函数的内部,使用到了外部没有实现序列化的引用(Task没有生成)
数据Bean未实现序列化接口

spark在运算过程中,由于很多场景必须要shuffle,即向数据溢写磁盘并且在网络间进行传输,但是由于封装数据的Bean没有实现序列化接口,就会导致出现序列化的错误!
  1. object C02_CustomSort {
  2.   def main(args: Array[String]): Unit = {
  3.     val sc = SparkUtil.getContext(this.getClass.getSimpleName, true)
  4.     //使用并行化的方式创建RDD
  5.     val lines = sc.parallelize(
  6.       List(
  7.         "laoduan,38,99.99",
  8.         "nianhang,33,99.99",
  9.         "laozhao,18,9999.99"
  10.       )
  11.     )
  12.     val tfBoy: RDD[Boy] = lines.map(line => {
  13.       val fields = line.split(",")
  14.       val name = fields(0)
  15.       val age = fields(1).toInt
  16.       val fv = fields(2).toDouble
  17.       new Boy(name, age, fv) //将数据封装到一个普通的class中
  18.     })
  19.     implicit val ord = new Ordering[Boy] {
  20.       override def compare(x: Boy, y: Boy): Int = {
  21.         if (x.fv == y.fv) {
  22.           x.age - y.age
  23.         } else {
  24.           java.lang.Double.compare(y.fv, x.fv)
  25.         }
  26.       }
  27.     }
  28.     //sortBy会产生shuffle,如果Boy没有实现序列化接口,Shuffle时会报错
  29.     val sorted: RDD[Boy] = tfBoy.sortBy(bean => bean)
  30.     val res = sorted.collect()
  31.     println(res.toBuffer)
  32.   }
  33. }
  34. //如果以后定义bean,建议使用case class
  35. class Boy(val name: String, var age: Int, var fv: Double)  //extends Serializable
  36. {
  37.   
  38.   override def toString = s"Boy($name, $age, $fv)"
  39. }
复制代码
函数闭包问题

闭包的现象

在调用RDD的Transformation和Action时,可能会传入自定义的函数,如果函数内部使用到了外部未被序列化的引用,就会报Task无法序列化的错误。原因是spark的Task是在Driver端生成的,并且需要通过网络传输到Executor中,Task本身实现了序列化接口,函数也实现了序列化接口,但是函数内部使用到的外部引用不支持序列化,就会函数导致无法序列化,从而导致Task没法序列化,就无法发送到Executor中了

在调用RDD的Transformation或Action是传入函数,第一步就进行检测,即调用sc的clean方法
为了避免错误,在Driver初始化的object或class必须实现序列化接口,不然会报错误
  1. def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  2.   val cleanF = sc.clean(f) //检测函数是否可以序列化,如果可以直接将函数返回,如果不可以,抛出异常
  3.   new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
  4. }
复制代码
  1. private def ensureSerializable(func: AnyRef): Unit = {
  2.   try {
  3.     if (SparkEnv.get != null) {
  4.       //获取spark执行换的的序列化器,如果函数无法序列化,直接抛出异常,程序退出,根本就没有生成Task
  5.       SparkEnv.get.closureSerializer.newInstance().serialize(func)
  6.     }
  7.   } catch {
  8.     case ex: Exception => throw new SparkException("Task not serializable", ex)
  9.   }
  10. }
复制代码
在Driver端初始化实现序列化的object

在一个Executor中,多个Task使用同一个object对象,因为在scala中,object就是单例对象,一个Executor中只有一个实例,Task会反序列化多次,但是引用的单例对象只反序列化一次
  1. //从HDFS中读取数据,创建RDD
  2. //HDFS指定的目录中有4个小文件,内容如下:
  3. //1,ln
  4. val lines = sc.textFile(args(1))
  5. //函数外部定义的一个引用类型(变量)
  6. //RuleObjectSer是一个静态对象,实在第一次使用的时候被初始化了(实在Driver被初始化的)
  7. val rulesObj = RuleObjectSer
  8. //函数实在Driver定义的
  9. val func = (line: String) => {
  10.   val fields = line.split(",")
  11.   val id = fields(0).toInt
  12.   val code = fields(1)
  13.   val name = rulesObj.rulesMap.getOrElse(code, "未知") //闭包
  14.   //获取当前线程ID
  15.   val treadId = Thread.currentThread().getId
  16.   //获取当前Task对应的分区编号
  17.   val partitiondId = TaskContext.getPartitionId()
  18.   //获取当前Task运行时的所在机器的主机名
  19.   val host = InetAddress.getLocalHost.getHostName
  20.   (id, code, name, treadId, partitiondId, host, rulesObj.toString)
  21. }
  22. //处理数据,关联维度
  23. val res = lines.map(func)
  24. res.saveAsTextFile(args(2))
复制代码

在Driver端初始化实现序列化的class
在一个Executor中,每个Task都会使用自己独享的class实例,因为在scala中,class就是多例,Task会反序列化多次,每个Task引用的class实例也会被序列化
  1. //从HDFS中读取数据,创建RDD
  2. //HDFS指定的目录中有4个小文件,内容如下:
  3. //1,ln
  4. val lines = sc.textFile(args(1))
  5. //函数外部定义的一个引用类型(变量)
  6. //RuleClassNotSer是一个类,需要new才能实现(实在Driver被初始化的)
  7. val rulesClass = new RuleClassSer
  8. //处理数据,关联维度
  9. val res = lines.map(e => {
  10.   val fields = e.split(",")
  11.   val id = fields(0).toInt
  12.   val code = fields(1)
  13.   val name = rulesClass.rulesMap.getOrElse(code, "未知") //闭包
  14.   //获取当前线程ID
  15.   val treadId = Thread.currentThread().getId
  16.   //获取当前Task对应的分区编号
  17.   val partitiondId = TaskContext.getPartitionId()
  18.   //获取当前Task运行时的所在机器的主机名
  19.   val host = InetAddress.getLocalHost.getHostName
  20.   (id, code, name, treadId, partitiondId, host, rulesClass.toString)
  21. })
  22. res.saveAsTextFile(args(2))
复制代码

在函数内部初始化未序列化的object
object没有实现序列化接口,不会出现问题,因为该object实现函数内部被初始化的,而不是在Driver初始化的
  1. //从HDFS中读取数据,创建RDD
  2. //HDFS指定的目录中有4个小文件,内容如下:
  3. //1,ln
  4. val lines = sc.textFile(args(1))
  5. //不再Driver端初始化RuleObjectSer或RuleClassSer
  6. //函数实在Driver定义的
  7. val func = (line: String) => {
  8.   val fields = line.split(",")
  9.   val id = fields(0).toInt
  10.   val code = fields(1)
  11.   //在函数内部初始化没有实现序列化接口的RuleObjectNotSer
  12.   val name = RuleObjectNotSer.rulesMap.getOrElse(code, "未知")
  13.   //获取当前线程ID
  14.   val treadId = Thread.currentThread().getId
  15.   //获取当前Task对应的分区编号
  16.   val partitiondId = TaskContext.getPartitionId()
  17.   //获取当前Task运行时的所在机器的主机名
  18.   val host = InetAddress.getLocalHost.getHostName
  19.   (id, code, name, treadId, partitiondId, host, RuleObjectNotSer.toString)
  20. }
  21. //处理数据,关联维度
  22. val res = lines.map(func)
  23. res.saveAsTextFile(args(2))
  24. sc.stop()
复制代码

在函数内部初始化未序列化的class

这种方式非常不好,因为每来一条数据,new一个class的实例,会导致消耗更多资源,jvm会频繁GC
  1. //从HDFS中读取数据,创建RDD
  2. //HDFS指定的目录中有4个小文件,内容如下:
  3. //1,ln
  4. val lines = sc.textFile(args(1))
  5. //处理数据,关联维度
  6. val res = lines.map(e => {
  7.   val fields = e.split(",")
  8.   val id = fields(0).toInt
  9.   val code = fields(1)
  10.   //RuleClassNotSer是在Executor中被初始化的
  11.   val rulesClass = new RuleClassNotSer
  12.   //但是如果每来一条数据new一个RuleClassNotSer,不好,效率低,浪费资源,频繁GC
  13.   val name = rulesClass.rulesMap.getOrElse(code, "未知")
  14.   //获取当前线程ID
  15.   val treadId = Thread.currentThread().getId
  16.   //获取当前Task对应的分区编号
  17.   val partitiondId = TaskContext.getPartitionId()
  18.   //获取当前Task运行时的所在机器的主机名
  19.   val host = InetAddress.getLocalHost.getHostName
  20.   (id, code, name, treadId, partitiondId, host, rulesClass.toString)
  21. })
  22. res.saveAsTextFile(args(2))
复制代码
调用mapPartitions在函数内部初始化未序列化的class

一个分区使用一个class的实例,即每个Task都是自己的class实例
  1. //从HDFS中读取数据,创建RDD
  2. //HDFS指定的目录中有4个小文件,内容如下:
  3. //1,ln
  4. val lines = sc.textFile(args(1))
  5. //处理数据,关联维度
  6. val res = lines.mapPartitions(it => {
  7.   //RuleClassNotSer是在Executor中被初始化的
  8.   //一个分区的多条数据,使用同一个RuleClassNotSer实例
  9.   val rulesClass = new RuleClassNotSer
  10.   it.map(e => {
  11.     val fields = e.split(",")
  12.     val id = fields(0).toInt
  13.     val code = fields(1)
  14.     val name = rulesClass.rulesMap.getOrElse(code, "未知")
  15.     //获取当前线程ID
  16.     val treadId = Thread.currentThread().getId
  17.     //获取当前Task对应的分区编号
  18.     val partitiondId = TaskContext.getPartitionId()
  19.     //获取当前Task运行时的所在机器的主机名
  20.     val host = InetAddress.getLocalHost.getHostName
  21.     (id, code, name, treadId, partitiondId, host, rulesClass.toString)
  22.   })
  23. })
  24. res.saveAsTextFile(args(2))
  25. sc.stop()
复制代码

Task线程安全问题

在一个Executor可以同时运行多个Task,如果多个Task使用同一个共享的单例对象,如果对共享的数据同时进行读写操作,会导致线程不安全的问题,为了避免这个问题,可以加锁,但效率变低了,因为在一个Executor中同一个时间点只能有一个Task使用共享的数据,这样就变成了串行了,效率低!
定义一个工具类object,格式化日期,因为SimpleDateFormat线程不安全,会出现异常
  1. val conf = new SparkConf()
  2.   .setAppName("WordCount")
  3.   .setMaster("local[*]") //本地模式,开多个线程
  4. //1.创建SparkContext
  5. val sc = new SparkContext(conf)
  6. val lines = sc.textFile("data/date.txt")
  7. val timeRDD: RDD[Long] = lines.map(e => {
  8.   //将字符串转成long类型时间戳
  9.   //使用自定义的object工具类
  10.   val time: Long = DateUtilObj.parse(e)
  11.   time
  12. })
  13. val res = timeRDD.collect()
  14. println(res.toBuffer)
复制代码
  1. object DateUtilObj {
  2.   //多个Task使用了一个共享的SimpleDateFormat,SimpleDateFormat是线程不安全
  3.   val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  4.   //线程安全的
  5.   //val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  6.   def parse(str: String): Long = {
  7.     //2022-05-23 11:39:30
  8.     sdf.parse(str).getTime
  9.   }
  10. }
复制代码
上面的程序会出现错误,因为多个Task同时使用一个单例对象格式化日期,报错,如果加锁,程序会变慢,改进后的代码:
  1. val conf = new SparkConf()
  2.   .setAppName("WordCount")
  3.   .setMaster("local[*]") //本地模式,开多个线程
  4. //1.创建SparkContext
  5. val sc = new SparkContext(conf)
  6. val lines = sc.textFile("data/date.txt")
  7. val timeRDD = lines.mapPartitions(it => {
  8.   //一个Task使用自己单独的DateUtilClass实例,缺点是浪费内存资源
  9.   val dataUtil = new DateUtilClass
  10.   it.map(e => {
  11.     dataUtil.parse(e)
  12.   })
  13. })
  14. val res = timeRDD.collect()
  15. println(res.toBuffer)
复制代码
  1. class DateUtilClass {
  2.   val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  3.   def parse(str: String): Long = {
  4.     //2022-05-23 11:39:30
  5.     sdf.parse(str).getTime
  6.   }
  7. }
复制代码
改进后,一个Task使用一个DateUtilClass实例,不会出现线程安全的问题。
累加器

累加器是Spark中用来做计数功能的,在程序运行过程当中,可以做一些额外的数据指标统计
触发一次Action,并且将附带的统计指标计算出来,可以使用Accumulator进行处理,Accumulator的本质数一个实现序列化接口class,每个Task都有自己的累加器,避免累加的数据发送冲突
  1. object C14_AccumulatorDemo3 {
  2.   def main(args: Array[String]): Unit = {
  3.     val conf = new SparkConf()
  4.       .setAppName("WordCount")
  5.       .setMaster("local[*]") //本地模式,开多个线程
  6.     //1.创建SparkContext
  7.     val sc = new SparkContext(conf)
  8.     val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
  9.     //在Driver定义一个特殊的变量,即累加器
  10.     //Accumulator可以将每个分区的计数结果,通过网络传输到Driver,然后进行全局求和
  11.     val accumulator: LongAccumulator = sc.longAccumulator("even-acc")
  12.     val rdd2 = rdd1.map(e => {
  13.       if (e % 2 == 0) {
  14.         accumulator.add(1)  //闭包,在Executor中累计的
  15.       }
  16.       e * 10
  17.     })
  18.     //就触发一次Action
  19.     rdd2.saveAsTextFile("out/113")
  20.     //每个Task中累计的数据会返回到Driver吗?
  21.     println(accumulator.count)
  22.   }
  23. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

不到断气不罢休

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表