在sheel中运行Spark

打印 上一主题 下一主题

主题 1713|帖子 1713|积分 5139

(一RDD根本概念

  Resilient Distributed Dataset 叫做弹性分布式数据集,是Spark中最根本的数据抽象,是分布式盘算的实现载体,代表一个不可变,可分区,里面的元素并行盘算的集合。
  先来看看这三个单词的寄义:
  - Dataset: 一个数据集合,用来存放数据的。之前我们学习的Scala中,Array, Set等也叫数据集。
  - Distributed: 分布式存储的,表示数据是存放在不同的呆板上的。这就和我们前面学习数据结构就不同了。
  - Resilient: 数据可以保存在内存大概磁盘中。
  然后,我们在看看它的定义中的一些关键字:
  不可变的:immutable。类比明白scala中的不可变集合大概是利用val修饰的变量。
  可分区的:集合的数据课分别成为许多部分,每部分称为分区:Partition
  并行盘算:集合中的数据可以被并行的盘算处理,每个分区数据被一个Task使命处理。
  (二RDD的创建

  spark的盘算功能是通过RDD来实现的,那么怎样去创建RDD呢?有两种创建方式。
  1.从集合内存中创建
  可以通过将本地集合(如数组、列表等)传递给 SparkContext 的 parallelize 方法来创建 RDD。
  1.     // 创建 SparkConf 和 SparkContext
  2.     val conf = new SparkConf().setAppName("RDDFromCollection").setMaster("local[*]")
  3.     val sc = new SparkContext(conf)
  4.     // 创建一个本地集合
  5.     val data = Array(1, 2, 3, 4, 5)
  6.     // 通过 parallelize 方法将本地集合转换为 RDD
  7.     val distData = sc.parallelize(data, 2) // 第二个参数是分区数
复制代码
  2.从外部存储中创建。比方,读入外部的文件。
  1. // 创建 SparkConf 和 SparkContext
  2.     val conf = new SparkConf().setAppName("RDDFromHDFS").setMaster("local[*]")
  3.     val sc = new SparkContext(conf)
  4.     // 从 HDFS 加载文本文件
  5.     val hdfsRDD = sc.textFile("hdfs://namenode:8020/path/to/your/file.txt")
  6. // 获取并打印分区数val partitionCount = hdfsRDD.getNumPartitions
  7. println(s"The number of partitions is: $partitionCount")
复制代码
 可以通过getNumPartitions来获取分区的数量。
   (三SparkConf 和 SparkContext

    SparkConf :类用于设置 Spark 应用程序的各种参数。通过 SparkConf 类,你可以设置应用程序的名称、运行模式(如本地模式、集群模式)、资源分配(如内存、CPU 焦点数)等。
    主要作用设置应用程序参数:可以设置 Spark 应用程序的各种属性,如应用程序名称、主节点地点等。
    管理设置信息:将设置信息封装在一个对象中,方便在应用程序中传递和利用。
    SparkContext :是 Spark 应用程序的入口点,它代表了与 Spark 集群的连接。通过 SparkContext,你可以创建 RDD(弹性分布式数据集)、累加器、广播变量等,还可以与外部数据源举行交互。
     (四在shell中运行RDD程序

     案例:启动hdfs集群,打开hadoop100:9870,在wcinput目录下上传一个包罗许多个单词的文本文件。
     启动之后在spark-shell中写代码。
   

   
  1.     // 读取文件,得到RDD
  2.     val rdd1 = sc.textFile("hdfs://hadoop100:8020/wcinput/words.txt")
  3.     // 将单词进行切割,得到一个存储全部单词的RDD
  4.     val rdd2= fileRDD.flatMap(line => line.split(" "))
  5.     // 将单词转换为元组对象,key是单词,value是数字1
  6.     val rdd3= wordsRDD.map(word => (word, 1))
  7.     // 将元组的value按照key来分组,对所有的value执行聚合操作(相加)
  8.     val rdd4= wordsWithOneRDD.reduceByKey((a, b) => a + b)
  9.     // 收集RDD的数据并打印输出结果
  10.     rdd4.collect().foreach(println)
复制代码
  执行过程如下

   (五RDD的五大特征

  RDD有5个特征,我们分别来先容。
  1.RDD是有分区的。
  RDD的分区是RDD数据存储的最小单位。一份数据本质是分隔了多个分区。 如下图示,如果1个RDD有3个分区,RDD内存储了123456,那么数据本质上分散在三个分区内举行存储。
  

  举个生存中的例子:高考的时间,每个班的同学都打散到不同的科场,此时的高3(8)班就是一个抽象的概念,在现实中,这个班级的门生可能分布在5个不同的科场。
  2.盘算函数会作用于每个分区
  RDD的方法会作用在所有的分区上。
  3.每个RDD之间是有依赖关系(RDD有血缘关系)
  RDD的每次转换都会生成一个新的RDD,以是RDD之间就会形成雷同于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新盘算丢失的分区数据,而不是对RDD的所有分区举行重新盘算。
  
        4.Key-Value型的RDD可以有分区器
      数据默认分区器:Hash分区规则,可以手动设置一个分区器(rdd.partitionBy的方式来设置)
         5.每一个分区都有一个优先位置列表
    优先位置列表会存储每个Partition的优先位置,对于一个HDFS文件来说,就是每个Partition块的位置。按照“移动数据不如移动盘算”的理念,Spark在举行使命调理时,会尽可能地将使命分配到其所要处理数据块的存储位置。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

风雨同行

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表