Spark处置惩罚过程-转换算子

[复制链接]
发表于 2025-9-8 03:27:57 | 显示全部楼层 |阅读模式
RDD的处置惩罚过程

Spark使用Scala语言实现了RDD的API,步伐开发者可以通过调用API对RDD进行操纵处置惩罚。RDD的处置惩罚过程如图所示;

RDD经过一系列的“转换”操纵,每一次转换都会产生差别的RDD,以供给下一次“转换”操纵使用,直到最后一个RDD经过“行动”操纵才会真正被计算处置惩罚。
这里有两点注意:
延迟。RDD中所有的转换都是延迟的,它们并不会直接计算效果。相反,他们只是记住这些应用到基础数据集上的转换动作。只有当发生要求返回效果给driver的动作时,这些转换才会真正运行。
血缘关系。一个RDD运算之后,会产生新的RDD。

转换算子

转换算子用于对 RDD 进行转换操纵,生成一个新的 RDD。转换操纵是惰性的,即当调用转换算子时,Spark 并不会立即实行计算,而是记载下操纵步调,直到遇到行动算子时才会触发实际的计算。
从格式和用法上来看,它就是聚集对象的方法。
以下是一些常见的转换算子:
1.map 算子

作用:对 RDD 中的每个元素应用给定的函数 f,将每个元素转换为另一个元素,终极返回一个新的 RDD。这个函数 f 接收一个输入范例为 T 的元素,返回一个范例为 U 的元素。
格式:def map[U: ClassTag](f: T => U): RDD[U]
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. object MapExample {
  3.   def main(args: Array[String]): Unit = {
  4.     val conf = new SparkConf().setAppName("MapExample").setMaster("local[*]")
  5.     val sc = new SparkContext(conf)
  6.     val rdd = sc.parallelize(Seq(1, 2, 3, 4))
  7.     val newRdd = rdd.map(x => x * 2)
  8.     newRdd.collect().foreach(println)
  9.     sc.stop()
  10.   }
  11. }
复制代码
2.filter 算子

作用:筛选出 RDD 中满足函数 f 条件(即 f 函数返回 true)的元素,返回一个新的 RDD,新 RDD 中的元素范例与原 RDD 相同。
格式:def filter(f: T => Boolean): RDD[T]
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. object FilterExample {
  3.   def main(args: Array[String]): Unit = {
  4.     val conf = new SparkConf().setAppName("FilterExample").setMaster("local[*]")
  5.     val sc = new SparkContext(conf)
  6.     val rdd = sc.parallelize(Seq(1, 2, 3, 4))
  7.     val newRdd = rdd.filter(x => x % 2 == 0)
  8.     newRdd.collect().foreach(println)
  9.     sc.stop()
  10. }}
复制代码
3.flatMap算子

作用:对 RDD 中的每个元素应用函数 f,函数 f 返回一个可遍历的聚集,然后将这些聚集中的元素扁平化归并成一个新的 RDD。
格式:def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. object FlatMapExample {
  3.   def main(args: Array[String]): Unit = {
  4.     val conf = new SparkConf().setAppName("FlatMapExample").setMaster("local[*]")
  5.     val sc = new SparkContext(conf)
  6.     val rdd = sc.parallelize(Seq("hello world", "spark is great"))
  7.     val newRdd = rdd.flatMap(x => x.split(" "))
  8.     newRdd.collect().foreach(println)
  9.     sc.stop()
  10.   }}
复制代码
4.reduceByKey 算子

reduceByKey 是 Spark 中用于处置惩罚键值对(Key - Value)范例 RDD 的一个重要转换算子。它的核心作用是对具有相同键的所有值进行聚合操纵,通过用户提供的聚合函数将这些值归并成一个效果,从而实现数据的归约和统计。例如统计每个键出现的次数、计算每个键对应值的总和、均匀值等。
格式
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
参数阐明:
func: (V, V) => V:这是一个二元函数,用于界说如何对相同键的值进行聚合。函数接收两个范例为 V 的值,返回一个范例为 V 的效果。例如,若要对相同键的值进行求和,func 可以是 (x, y) => x + y。
numPartitions: Int(可选):指定效果 RDD 的分区数。如果不提供该参数,将使用默认的分区数。
以下是一个使用 reduceByKey 计算每个单词出现次数的示例:
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. object ReduceByKeyExample {
  3.   def main(args: Array[String]): Unit = {
  4.     // 创建 SparkConf 对象
  5.     val conf = new SparkConf().setAppName("ReduceByKeyExample").setMaster("local[*]")
  6.     // 创建 SparkContext 对象
  7.     val sc = new SparkContext(conf)
  8.     // 创建一个包含单词的 RDD
  9.     val words = sc.parallelize(List("apple", "banana", "apple", "cherry", "banana", "apple"))
  10.     // 将每个单词映射为 (单词, 1) 的键值对
  11.     val wordPairs = words.map(word => (word, 1))
  12.     // 使用 reduceByKey 计算每个单词的出现次数
  13.     val wordCounts = wordPairs.reduceByKey(_ + _)
  14.     // 输出结果
  15.     wordCounts.collect().foreach(println)
  16.     // 停止 SparkContext
  17.     sc.stop()
  18.   }
  19. }
复制代码

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

×
登录参与点评抽奖,加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表