【大数据篇】Spark转换算子(Transformations)和举措算子(Actions)详解 ...

打印 上一主题 下一主题

主题 556|帖子 556|积分 1668

Apache Spark 提供了大量的算子(操纵),这些算子大致可以分为两类:转换算子(Transformations)和举措算子(Actions)。转换算子用于创建一个新的RDD,而举措算子则对RDD进行操纵并产生效果。下面是一些常用的Spark算子和相应的代码示例:
转换算子(Transformations)


  • map(func)

    • 对RDD的每个元素应用函数func,返回一个新的RDD。
    • 示例:将每个元素乘以2。
      1. scalaCopy codeval rdd = sc.parallelize(List(1, 2, 3, 4))
      2. val mappedRDD = rdd.map(x => x * 2)
      复制代码

  • filter(func)

    • 返回一个新的RDD,包罗应用函数func后返回值为true的原始元素。
    • 示例:过滤出大于2的元素。
      1. scalaCopy code
      2. val filteredRDD = rdd.filter(x => x > 2)
      复制代码

  • flatMap(func)

    • 与map类似,但每个输入元素可以映射到0或多个输出元素(因此,func应返回一个序列,而不是单一元素)。
    • 示例:将每个数字映射到它的值和它的平方。
      1. scalaCopy code
      2. val flatMappedRDD = rdd.flatMap(x => List(x, x*x))
      复制代码

  • reduceByKey(func)

    • 适用于键值对(Pair RDDs)的RDD,返回一个新的RDD,其中每个键的值是应用函数func聚合的效果。
    • 示例:对每个键进行值的累加。
      1. scalaCopy codeval pairRDD = sc.parallelize(List(("a", 1), ("b", 1), ("a", 1)))
      2. val reducedRDD = pairRDD.reduceByKey((x, y) => x + y)
      复制代码

  • join(otherDataset)

    • 对于两个键值对RDD,返回一个新的RDD,包罗两个RDD中键相同的元素的组合。
    • 示例:连接两个RDD。
      1. scalaCopy codeval rdd1 = sc.parallelize(List(("a", 1), ("b", 2)))
      2. val rdd2 = sc.parallelize(List(("a", 3), ("a", 4), ("b", 5)))
      3. val joinedRDD = rdd1.join(rdd2)
      复制代码

举措算子(Actions)


  • collect()

    • 在Driver步伐中以数组的形式返回RDD的所有元素。
    • 示例:收集RDD中的所有元素。
      1. scalaCopy code
      2. val result = mappedRDD.collect()
      复制代码

  • count()

    • 返回RDD中元素的数量。
    • 示例:盘算RDD中的元素个数。
      1. scalaCopy code
      2. val count = rdd.count()
      复制代码

  • reduce(func)

    • 通过函数func(接受两个参数并返回一个)来聚合RDD中的元素。
    • 示例:求和。
      1. scalaCopy code
      2. val sum = rdd.reduce((x, y) => x + y)
      复制代码

  • take(n)

    • 返回一个数组,包罗RDD中的前n个元素。
    • 示例:取RDD的前3个元素。
      1. scalaCopy code
      2. val first3 = rdd.take(3)
      复制代码

  • saveAsTextFile(path)

    • 将RDD中的元素写入到一个文本文件中,或者文本文件的集合中(取决于RDD的分区数)。
    • 示例:将RDD生存到文件系统。
      1. scalaCopy code
      2. mappedRDD.saveAsTextFile("path/to/output")
      复制代码

更多转换算子(Transformations)


  • distinct()

    • 返回一个新的RDD,其中包罗源RDD的所有差别元素。
    • 示例:去除重复元素。
      1. scalaCopy codeval rdd = sc.parallelize(List(1, 1, 2, 3, 3, 4))
      2. val distinctRDD = rdd.distinct()
      复制代码

  • union(otherDataset)

    • 返回一个新的RDD,包罗源RDD和另一个RDD的所有元素。
    • 示例:合并两个RDD。
      1. scalaCopy codeval rdd1 = sc.parallelize(List(1, 2, 3))
      2. val rdd2 = sc.parallelize(List(4, 5, 6))
      3. val unionRDD = rdd1.union(rdd2)
      复制代码

  • intersection(otherDataset)

    • 返回一个新的RDD,包罗两个RDD的共同元素。
    • 示例:找出两个RDD的交集。
      1. scalaCopy codeval rdd1 = sc.parallelize(List(1, 2, 3, 4))
      2. val rdd2 = sc.parallelize(List(3, 4, 5, 6))
      3. val intersectionRDD = rdd1.intersection(rdd2)
      复制代码

  • subtract(otherDataset)

    • 返回一个新的RDD,包罗源RDD中有而另一个RDD中没有的元素。
    • 示例:从一个RDD中减去另一个RDD的元素。
      1. scalaCopy codeval rdd1 = sc.parallelize(List(1, 2, 3, 4))
      2. val rdd2 = sc.parallelize(List(3, 4, 5, 6))
      3. val subtractedRDD = rdd1.subtract(rdd2)
      复制代码

  • cartesian(otherDataset)

    • 对两个RDD中的所有元素进行笛卡尔积操纵,返回所有可能的元素对。
    • 示例:盘算两个RDD的笛卡尔积。
      1. scalaCopy codeval rdd1 = sc.parallelize(List(1, 2))
      2. val rdd2 = sc.parallelize(List("a", "b"))
      3. val cartesianRDD = rdd1.cartesian(rdd2)
      复制代码

更多举措算子(Actions)


  • foreach(func)

    • 对RDD中的每个元素应用函数func。
    • 示例:对每个元素实行打印操纵。
      1. scalaCopy code
      2. rdd.foreach(x => println(x))
      复制代码

  • aggregate(zeroValue)(seqOp, combOp)

    • 聚合RDD中的元素,起首使用seqOp操纵聚合每个分区的数据,然后使用combOp操纵聚合所有分区的效果。
    • 示例:盘算RDD中所有元素的总和和计数。
      1. scalaCopy codeval result = rdd.aggregate((0, 0))(
      2.     (acc, value) => (acc._1 + value, acc._2 + 1),
      3.     (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
      复制代码

  • fold(zeroValue)(func)

    • 与aggregate类似,但是seqOp和combOp使用相同的函数。
    • 示例:盘算RDD中所有元素的总和。
      1. scalaCopy code
      2. val sum = rdd.fold(0)((acc, value) => acc + value)
      复制代码

  • countByKey()

    • 仅适用于键值对RDD,对每个键计数。
    • 示例:盘算每个键的出现次数。
      1. scalaCopy codeval pairRDD = sc.parallelize(List(("a", 1), ("b", 1), ("a", 1)))
      2. val counts = pairRDD.countByKey()
      复制代码

  • saveAsNewAPIHadoopFile(path)

    • 将RDD生存到Hadoop支持的文件系统中。
    • 示例:将RDD生存为Hadoop文件。
      1. scalaCopy coderdd.saveAsNewAPIHadoopFile("path/to/output",
      2.                            classOf[Text],
      3.                            classOf[IntWritable],
      4.                            classOf[TextOutputFormat[Text, IntWritable]])
      复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我可以不吃啊

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表