spark汇总

打印 上一主题 下一主题

主题 770|帖子 770|积分 2310

描述


Apache Spark 是用于大规模数据处理惩罚的统一分析引擎。它提供 Java、Scala、Python 和 R 中的高级 API,以及支持通用实行图的优化引擎。它还支持一组丰富的高级工具,包罗用于 SQL 和结构化数据处理惩罚的Spark SQL 、用于机器学习的MLlib、用于图形处理惩罚的 GraphX 以及用于增量盘算和流处理惩罚的结构化流。
1. Spark Core
Spark的核心,是Spark运行的基础。Spark Core以RDD为数据抽象,提供Python、Java、Scala、R语言的API,可以编程举行海量离线数据批处理惩罚盘算。
2. Spark SQL
Spark SQL是Spark用来操纵结构化数据的组件。通过Spark SQL对数据举行处理惩罚。
3. Spark Streaming
Spark Streaming是Spark平台上针对实时数据举行流式盘算的组件,提供了丰富的处理惩罚数据流的API。
4. Spark MLlib
MLlib是Spark提供的一个机器学习算法库。MLlib不但提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语。
5. Spark GraphX
GraphX是Spark面向图盘算提供的框架与算法库。
运行模式

1. Windows模式

多用于当地测试,不必要假造机或服务器。
代码示例

WordCount.scala
  1. package com.wunaiieq
  2. //1.导入SparkConf,SparkContext
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.{SparkConf, SparkContext}
  5. object WordCount {
  6.   def main(args: Array[String]): Unit = {
  7.     //2.构建SparkConf对象,并设置本地运行和程序的名称
  8.     val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  9.     //3.通过SparkConf对象构建SparkContext对象
  10.     val sc = new SparkContext(conf)
  11.     //4.读取文件,并生成RDD对象
  12.     val fileRdd: RDD[String] = sc.textFile("data/words.txt")
  13.     //5.将单词进行切割,得到一个存储全部单词的集合对象
  14.     val wordsRdd: RDD[String] = fileRdd.flatMap(_.split(" "))
  15.     //6.将单词转换为Tuple2对象("hello"->("hello",1))
  16.     val wordAndOneRdd: RDD[(String, Int)] = wordsRdd.map((_, 1))
  17.     //7.将元组的value按照key进行分组,并对该组所有的value进行聚合操作
  18.     val resultRdd: RDD[(String, Int)] = wordAndOneRdd.reduceByKey(_ + _)
  19.     //8.通过collect方法收集RDD数据
  20.     val wordCount: Array[(String, Int)] = resultRdd.collect()
  21.     //9.输出结果
  22.     wordCount.foreach(println)
  23.   }
  24. }
复制代码
log4j.properties
这个没什么说的直接复制用即可
  1. # Set everything to be logged to the console
  2. log4j.rootCategory=ERROR, console
  3. log4j.appender.console=org.apache.log4j.ConsoleAppender
  4. log4j.appender.console.target=System.err
  5. log4j.appender.console.layout=org.apache.log4j.PatternLayout
  6. log4j.appender.console.layout.ConversionPattern=%d{MM/dd HH:mm:ss} %p %c{1}: %m%n
  7. # Set the default spark-shell/spark-sql log level to WARN. When running the
  8. # spark-shell/spark-sql, the log level for these classes is used to overwrite
  9. # the root logger's log level, so that the user can have different defaults
  10. # for the shell and regular Spark apps.
  11. log4j.logger.org.apache.spark.repl.Main=WARN
  12. log4j.logger.org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver=WARN
  13. # Settings to quiet third party logs that are too verbose
  14. log4j.logger.org.sparkproject.jetty=WARN
  15. log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
  16. log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
  17. log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
  18. log4j.logger.org.apache.parquet=ERROR
  19. log4j.logger.parquet=ERROR
  20. # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
  21. log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
  22. log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
  23. # For deploying Spark ThriftServer
  24. # SPARK-34128:Suppress undesirable TTransportException warnings involved in THRIFT-4805
  25. log4j.appender.console.filter.1=org.apache.log4j.varia.StringMatchFilter
  26. log4j.appender.console.filter.1.StringToMatch=Thrift error occurred during processing of message
  27. log4j.appender.console.filter.1.AcceptOnMatch=false
复制代码
2. Local模式

一台服务器或假造机搞定,所谓的Local模式,就是不必要其他任何节点资源就可以在当地实行Spark代码的环境,一般用于讲授,调试,演示等。
  1. # 进入spark根目录
  2. cd /opt/module/spark/bin
  3. # 运行视频spark-shell
  4. ./spark-shell
复制代码
webUI
  1. [atguigu@master bin]$ jps
  2. 2081 SparkSubmit
  3. 2206 Jps
  4. [atguigu@master bin]$ netstat -anp|grep 2081
  5. (Not all processes could be identified, non-owned process info
  6. will not be shown, you would have to be root to see it all.)
  7. tcp6       0      0 192.168.16.100:42050    :::*                    LISTEN      2081/java           
  8. tcp6       0      0 :::4040                 :::*                    LISTEN      2081/java           
  9. tcp6       0      0 192.168.16.100:35770    :::*                    LISTEN      2081/java           
  10. unix  2      [ ]         STREAM     CONNECTED     33071    2081/java            
  11. unix  2      [ ]         STREAM     CONNECTED     36801    2081/java        
复制代码
欣赏器访问
  1. http://192.168.16.100:4040/
复制代码
spark-submit
以下为利用spark提交jar包示例
  1. ./spark-submit --master local[2] --class org.apache.spark.examples.SparkPi /opt/module/spark/examples/jars/spark-examples_2.12-3.1.1.jar 100
复制代码
参数描述--class要实行程序的主类,可以更换为自己写的应用程序的主类名称--master local[2]摆设模式,默认为当地模式;数字 2 表示分配的假造 CPU 核数量spark-examples_2.12-3.2.1.jar运行的应用类所在的 jar 包,现实利用时可以设定为自己打的 jar 包20程序的入口参数,根据应用程序的必要,可以是任何有用的输入值 几种提交方式比较
工具功能特点利用场景bin/spark-submit提交 Java/Scala/Python/R 代码到 Spark 中运行提交代码用正式场所,正式提交 Spark 程序运行bin/spark-shell提供一个 Scala 解释器环境,用来以 Scala 代码实行 Spark 程序解释器环境,写一行实行一行测试、学习、写一行实行一行、用来验证代码等bin/pyspark提供一个 Python 解释器环境,用来以 Python 代码实行 Spark 程序解释器环境,写一行实行一行测试、学习、写一行实行一行、用来验证代码等 3. Standalone模式

Standalone是Spark自带的一个资源调度框架,它支持完全分布式,也支持HA

   

  • Master角色:管理整个集群的资源,重要负责资源的调度和分配,并举行集群的监控等职责;并托管运行各个使命的Driver。如Yarn的ResourceManager。
  • Worker角色:每个从节点分配资源信息给Worker管理,管理单个服务器的资源类,分配对应的资源来运行Executor(Task);资源信息包含内存Memory和CPU
    Cores核数。如Yarn的NodeManager。
  • Driver角色,管理单个Spark使命在运行的时候的工作,如Yarn的ApplicationMaster “
  • Executor角色,单个使命运行的时候的一堆工作者,干活的。它是集群中工作节点(Worker)中的一个JVM进程,负责在 Spark 作业中运行具体使命(Task),使命彼此之间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark应用也可以继续实行,会将出错节点上的使命调度到其他Executor节点上继续运行。
    Executor有两个核心功能:
    1.负责运行构成Spark应用的使命,并将效果返回给驱动器进程。
    2.它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的
    RDD 提供内存式存储。RDD 是直接缓存在Executor进程内的,因此使命可以在运行时充分利用缓存数据加速运算。
  总结
   资源管理维度
集群资源管理者:Master
单机资源管理者:Worker
  使命盘算维度
单使命管理者:Driver
单使命实行者:Executor
  注:Executor运行于Worker进程内,由Worker提供资源供给它们运行
扩展:历史服务器HistoryServer(可选),Spark Application运行完成以后,保存事件日志数据至HDFS,启动HistoryServer可以查看应用运行相关信息。
4. Yarn模式
Hadoop生态圈内里的一个资源调度框架,Spark也是可以基于Yarn来盘算的。
5. 云服务模式(运行在云平台上)
Kubernetes(K8S)容器模式
Spark中的各个角色运行在Kubernetes的容器内部,并构成Spark集群环境。容器化摆设是现在业界很流行的一项技术,基于Docker镜像运行能够让用户更加方便地对应用举行管理和运维。容器管理工具中最为流行的就是(K8S),而Spark也在新版本中支持了k8s摆设模式。
6. Mesos
Mesos是Apache下的开源分布式资源管理框架,它被称为是分布式系统的内核,在Twitter得到广泛利用,管理着Twitter超过30,0000台服务器上的应用摆设,但是在国内,依然利用着传统的Hadoop大数据框架,以是国内利用Mesos框架的并不多。
模式Spark安装机器数需启动的进程所属者应用场景Local1无Spark测试Standalone3Master及WorkerSpark单独摆设Yarn1Yarn及HDFSHadoop混合摆设 RDD

描述

Spark RDD(Resilient Distributed Dataset,弹性分布式数据集)代表一个不可变、可分区、元素可并行盘算的聚集,是Spark举行数据处理惩罚的根本单元。


  • 不可变性:RDD一旦创建,其数据就不可改变。对RDD的所有操纵(如map、filter、reduce等)都会生成一个新的RDD,而不会修改原始RDD。这种不可变性使得RDD在分布式盘算环境下非常稳定,制止了并发冲突。
  • 可分区性:RDD可以分成多个分区(Partition),每个分区就是一个数据集片段。一个RDD的差别分区可以保存到集群中的差别节点上,从而可以在集群中的差别节点上举行并行盘算。分区是Spark作业并行盘算的根本单元,每个分区都会被一个盘算使命处理惩罚,分区的数量决定了并行盘算的粒度。
  • 弹性:RDD具有弹性容错的特点。当运算中出现非常情况导致分区数据丢失或运算失败时,可以根据RDD的血统(Lineage)关系对数据举行重修。别的,RDD的数据可以保存在内存中,内存放不下时也可以保存在磁盘中,实现了存储的弹性。
特性

   1. 分区(Partitions) 寄义:RDD的数据被划分为多个分区,每个分区是一个数据块,分布在集群的差别节点上。 作用:每个分区会被一个盘算使命处理惩罚,分区的数量决定了并行盘算的粒度。用户可以在创建RDD时指定分区数,如果没有指定,Spark会根据集群的资源自动设置。
示例:从HDFS文件创建RDD时,默认分区数为文件的Block数。
2. 盘算函数(Compute Function) 寄义:RDD的盘算方法会作用到每个分区上。 作用:当对RDD举行操纵(如map、filter等)时,Spark会对每个分区应用这个函数。
示例:在map操纵中,盘算函数会对每个元素实行指定的转换逻辑。
3. 依赖关系(Dependencies) 寄义:RDD之间存在依赖关系。 作用:在部分分区数据丢失时,Spark可以利用依赖关系重新盘算丢失的数据,而不是重新盘算整个RDD,提高了容错本领。
分类:依赖关系分为窄依赖(Narrow Dependency)和宽依赖(Wide
Dependency)。窄依赖指一个父RDD的分区最多被一个子RDD的分区利用;宽依赖指一个父RDD的分区被多个子RDD的分区利用。
4. 分区器(Partitioner,可选,只有kv型RDD才有) 寄义:对于键值对(Key-Value)范例的RDD,可以指定一个分区器来决定数据的分区方式。
作用:分区器决定了数据在集群中的分布,影响并行盘算的性能。
范例:Spark支持多种分区器,如HashPartitioner(基于哈希值分区)和RangePartitioner(基于范围分区)。
5. 优先位置(Preferred Locations,可选) 寄义:RDD分区规划应当尽量靠近数据所在的服务器 作用:Spark在举行使命调度时,会优先将数据分配到其存储位置举行盘算,淘汰数据传输开销,提高盘算服从。
示例:对于HDFS文件,优先位置通常是文件块所在的节点。
  RDD创建

1. 通过并行化聚集创建,将当地聚集对象转分布式RDD
  1. val sc = new SparkContext(conf)
  2. val rdd1:RDD[Int]=sc.parallelize(List(1, 2, 3, 4, 5, 6), 3)
  3. rdd1.glom().collect()
复制代码
makeRdd()创建,本质上也是利用sc.parallelize(…)
  1. def makeRDD[T: ClassTag](
  2.   seq: Seq[T],
  3.   numSlices: Int = defaultParallelism): RDD[T] = withScope {
  4.   parallelize(seq, numSlices)
  5. }
复制代码
2. 读取外部数据源 (比如:读取文件 )
  1. //通过SparkConf对象构建SparkContext对象
  2. val sc = new SparkContext(conf)
  3. //读取文件
  4. val fileRdd:RDD[String] = sc.textFile("data/words.txt")
复制代码
程序实行入口:SparkContext对象
Spark RDD 编程的程序入口对象是SparkContext对象(Scala、Python、Java都是云云)
只有构建出SparkContext, 基于它才能实行后续的API调用和盘算
本质上, SparkContext对编程来说, 重要功能就是创建第一个RDD出来。
代码示例(并行化创建)

  1. package com.wunaiieq
  2. //1.导入SparkConf类、SparkContext
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.{SparkConf, SparkContext}
  5. object CreateByParallelize {
  6.   def main(args: Array[String]): Unit = {
  7.     //2.构建SparkConf对象。并设置本地运行和程序的名称,*表示使用全部cpu内核,可以指定数量
  8.     val sparkconf = new SparkConf().setMaster("local[*]").setAppName("CreateRdd1")
  9.     //3.构建SparkContext对象
  10.     val sparkContext = new SparkContext(sparkconf)
  11.     //4.通过并行化创建RDD对象:将本地集合->分布式的RDD对象,如果不指定分区,则根据cpu内核数进行自动分配
  12.     val rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8),3)
  13.     //5.输出默认的分区数
  14.     println("默认分区数:"+rdd.getNumPartitions)//已经指定为3
  15.     //6.collect方法:将rdd对象中每个分区的数据,都发送到Driver,形成一个Array对象
  16.     val array1: Array[Int] = rdd.collect()
  17.     println("rdd.collect()="+array1.mkString(","))
  18.     //7.显示出rdd对象中元素被分布到不同分区的数据信息
  19.     val array2: Array[Array[Int]] = rdd.glom().collect()
  20.     println("rdd.glom().collect()的内容是:")
  21.     for(eleArr<- array2){
  22.      println(eleArr.mkString(","))
  23.     }
  24.   }
  25. }
复制代码
代码示例(读取外部数据)

  1. package com.wunaiieq
  2. //1.导入SparkConf,SparkContext类
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.{SparkConf, SparkContext}
  5. object CreateByTextFile {
  6.   def main(args: Array[String]): Unit = {
  7.     //2.构建SparkConf对象,并设置本地运行和程序名
  8.     val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFile")
  9.     //3.通过sparkconf创建SparkContext对象
  10.     val sparkContext = new SparkContext(sparkConf)
  11.     //4.通过textFile读取文件
  12.     //4.1.读取hdfs分布式文件系统上的文件
  13. //    val hdfsRdd: RDD[String] = sparkContext.textFile("hdfs://192.168.16.100:9820/input/data.txt")
  14. //    val hdfsResult: Array[String] = hdfsRdd.collect()
  15. //    println("hdfsRdd分区数"+hdfsRdd.getNumPartitions)
  16. //    println("hdfsRdd内容"+hdfsResult.mkString(","))
  17.     //4.2读取本地文件
  18.     val localRdd1: RDD[String] = sparkContext.textFile("data/words.txt")
  19.     println("localRdd1分区数"+localRdd1.getNumPartitions)
  20.     println("localRdd1内容"+localRdd1.collect().mkString(","))
  21.     //5.设置最小分区数
  22.     val localRdd2: RDD[String] = sparkContext.textFile("data/words.txt",3)
  23.     println("localRdd2分区数"+localRdd2.getNumPartitions)
  24.     println("localRdd2内容"+localRdd2.collect().mkString(","))
  25.     //6.最小分区数设置是一个参考值,Spark会有自己的判断,值太大Spark不会理会
  26.     val localRdd3: RDD[String] = sparkContext.textFile("data/words.txt", 100)
  27.     println("localRdd3的分区数"+localRdd3.getNumPartitions)
  28.   }
  29. }
复制代码
代码示例(读取目录下的所有文件)

  1. package com.wunaiieq
  2. //1.导入类
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.{SparkConf, SparkContext}
  5. object CreateByWholeTextFiles {
  6.   def main(args: Array[String]): Unit = {
  7.     //2.构建SparkConf对象,并设置本地运行和程序名称
  8.     val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WholeTextFiles")
  9.     //3.使用sparkconf对象构建SparkContet对象
  10.     val sparkContext = new SparkContext(sparkConf)
  11.     //5.读取指定目录下的小文件
  12.     val rdd: RDD[(String, String)] = sparkContext.wholeTextFiles("data")
  13.     val tuples: Array[(String, String)] = rdd.collect()
  14.     tuples.foreach(ele=>println(ele._1,ele._2))
  15.     //6.获取小文件中的内容
  16.     val array: Array[String] = rdd.map(_._2).collect()
  17.     println("---------------------------")
  18.     println(array.mkString("|"))
  19.     //4.关闭sparkContext对象
  20.     sparkContext.stop()
  21.   }
  22. }
复制代码
算子

详见如下专题RDD算子聚集
DAG

详见如下专题DAG专题
SparkSQL

详见如下专题SparkSQL专题
SparkStreaming

详见如下专题SparkStreaming专题

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

祗疼妳一个

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表