大数据技术之Spark
1、Spark先容1.1、Spark是什么
Spark是什么定义:Apache Spark是用于大规模数据(large-scala data)处理的同一(unified)分析引擎。
https://i-blog.csdnimg.cn/img_convert/01488aafa44d53923a4effc6fcb5db7e.png
Spark最早源于一篇论文 Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing, 该论文是由加州大学柏克莱分校的 Matei Zaharia 等人发表的。论文中提出了一种弹性分布式数据集(即 RDD)的概念。
https://i-blog.csdnimg.cn/img_convert/65eb00ebdc479f22a8a9896bd6a5958a.png
翻译过来就是:RDD 是一种分布式内存抽象,其使得步伐员可以大概在大规模集群中做内存运算,而且有一定的容错方式。而这也是整个Spark的核心数据结构,Spark 整个平台都围绕着RDD进行。
https://i-blog.csdnimg.cn/img_convert/2bd216f937d587da726858eb41841efe.png
简而言之,Spark 借鉴了 MapReduce 思想发展而来,保留了其分布式并行计算的优点并改进了其明显的缺陷。让中心数据存储在内存中提 高了运行速率、并提供丰富的操作数据的API进步了开发速率。
[*] Spark是一款分布式内存计算的同一分析引擎。 其特点就是对恣意类型的数据进行自定义计算。
[*] Spark可以计算:结构化、半结构化、非结构化等各种类型的数据结构,同时也支持使用Python、Java、Scala、R以及SQL语言去开发应用 步伐计算数据。
[*] Spark的适用面非常广泛,以是,被称之为 同一的(适用面广)的分析引擎(数据处理)
1.2、Spark风雨十年
Spark 是加州大学伯克利分校AMP实行室(Algorithms Machines and People Lab)开发的通用大数据处理框架。 Spark的发展历史,履历过几大重要阶段,如下图所示:
https://i-blog.csdnimg.cn/img_convert/9ff77c58cc86293877173f64b1146957.png
Stack Overflow的数据可以看出,2015年开始Spark每月的问题提交数目已经逾越Hadoop,而2018年Spark Python版本的APIPySpark每月的问题提交数目也已超过Hadoop。2019年排名Spark第一,PySpark第二;而十年的累计排名是Spark第一,PySpark第 三。按照这个趋势发展下去,Spark和PySpark在未来很长一段时间内应该还会处于垄断地位。
https://i-blog.csdnimg.cn/img_convert/f360e212e0fc9399888b70d88ec02a29.png
十年走来,Spark目前已经迭代到了3.2.0版本(2021.10.13发布),本次课程基于最新的Spark 3.2.0版本进行授课
https://i-blog.csdnimg.cn/img_convert/b0e1c5595a3b115dc45e57af4528d139.png
1.3、Spark VS Hadoop(MapReduce)
Spark和前面学习的Hadoop技术栈有何区别呢?
https://i-blog.csdnimg.cn/img_convert/1920d0d94abc0ee55ebf24d9573bdaef.png
尽管Spark相对于Hadoop而言具有较大优势,但Spark并不能完全替代Hadoop
[*] 在计算层面,Spark相比较MR(MapReduce)有巨大的性能优势,但至今仍有很多计算工具基于MR构架,比如非常成熟的Hive
[*] Spark仅做计算,而Hadoop生态圈不仅有计算(MR)也有存储(HDFS)和资源管理调理(YARN),HDFS和YARN仍是很多大数据 体系的核心架构。
1.4、Spark四大特点
https://i-blog.csdnimg.cn/img_convert/02b530889964d15cb78d060b270ca3d9.png
速率快:由于Apache Spark支持内存计算,而且通过DAG(有向无环图)实行引擎支持无环数据流,以是官方宣称其在内存中的运算速率要比Hadoop的MapReduce快100倍,在硬盘中要快10倍。
https://i-blog.csdnimg.cn/img_convert/f2ac70b223e95e81decd18e2c0b8b957.png
Spark处理数据与MapReduce处理数据相比,有如下两个不同点:
[*] 其一、Spark处理数据时,可以将中心处理结果数据存储到内存中;
[*] 其二、Spark 提供了非常丰富的算子(API), 可以做到复杂使命在一个Spark 步伐中完成.
易于使用:Spark 的版本已经更新到 Spark 3.2.0(截止日期2021.10.13),支持了包括 Java、Scala、Python 、R和SQL语言在内的多种语言。为了 兼容Spark2.x企业级应用场景,Spark仍然持续更新Spark2版本。
https://i-blog.csdnimg.cn/img_convert/c7d1ce85657a85a84465c87d814d1e40.png
通用性强:在 Spark 的基础上,Spark 还提供了包括Spark SQL、Spark Streaming、MLib 及GraphX在内的多个工具库,我们可以在一个应用中无缝 地使用这些工具库。
https://i-blog.csdnimg.cn/img_convert/e57263bfc842a45e648966ed199b91e8.png
运行方式:Spark 支持多种运行方式,包括在 Hadoop 和 Mesos 上,也支持 Standalone的独立运行模式,同时也可以运行在云Kubernetes(Spark2.3开始支持)上。
https://i-blog.csdnimg.cn/img_convert/f5ec1a9b86a74833eaa968cfdb7560ac.png
对于数据源而言,Spark 支持从HDFS、HBase、Cassandra 及 Kafka 等多种途径获取数据。
1.5、Spark 框架模块-了解
整个Spark 框架模块包罗:Spark Core、 Spark SQL、 Spark Streaming、 Spark GraphX、 Spark MLlib,而后四项的本领都是建立在核心引擎之上
https://i-blog.csdnimg.cn/img_convert/41af7b8a464eb42971134422d5abceab.png
Spark Core:Spark的核心,Spark核心功能均由Spark Core模块提供,是Spark运行的基础。Spark Core以RDD为数据抽象,提供Python、Java、 Scala、R语言的API,可以编程进行海量离线数据批处理计算。
SparkSQL:基于SparkCore之上,提供结构化数据的处理模块。SparkSQL支持以SQL语言对数据进行处理,SparkSQL自己针对离线计算场景。同 时基于SparkSQL,Spark提供了StructuredStreaming模块,可以以SparkSQL为基础,进行数据的流式计算。
SparkStreaming:以SparkCore为基础,提供数据的流式计算功能。
MLlib:以SparkCore为基础,进行机器学习计算,内置了大量的机器学习库和API算法等。方便用户以分布式计算的模式进行机器学习计算。
GraphX:以SparkCore为基础,进行图计算,提供了大量的图计算API,方便用于以分布式计算模式进行图计算。
1.6、Spark的运行模式 - 了解
https://i-blog.csdnimg.cn/img_convert/3ec9bf8d65936ed654a76be6615ea9c4.png
Spark提供多种运行模式,包括:
[*] 本地模式(单机)本地模式就是以一个独立的进程,通过其内部的多个线程来模拟整个Spark运行时环境
[*] Standalone模式(集群)Spark中的各个角色以独立进程的情势存在,并构成Spark集群环境
[*] Hadoop YARN模式(集群)Spark中的各个角色运行在YARN的容器内部,并构成Spark集群环境
[*] Kubernetes模式(容器集群)Spark中的各个角色运行在Kubernetes的容器内部,并构成Spark集群环境
[*] 云服务模式(运行在云平台上)......
1.7、Spark的架构角色 - 理解
https://i-blog.csdnimg.cn/img_convert/8cefb27b5ca8b5fb5b81a1a8ec658d8f.png
YARN重要有4类角色,从2个层面去看
https://i-blog.csdnimg.cn/img_convert/858fb56e5d29a0b132e61e0c2f445ea4.png
Spark运行角色
https://i-blog.csdnimg.cn/img_convert/7a8dd7a0e71e07433399e63d7c9f7a15.png
https://i-blog.csdnimg.cn/img_convert/d52af37845dd97ca6689b572d9ab590f.png
https://i-blog.csdnimg.cn/img_convert/3125989b1405ac1a454373b8b2c405b0.png
注意:正常环境下Executor是干活的角色,不过在Local模式下,Driver即管理又干活
2、Spark环境搭建-Local
2.1、课程服务器环境
本次课程使用三台Linux虚拟机服务器来学习, 三台虚拟机的功能分配是:
hadoop102: Master和 Worker
hadoop103: Worker
hadoop104: Worker 2.2、根本原理
https://i-blog.csdnimg.cn/img_convert/0a15ca3673b6914f89a817c29f202cdc.png
本质:启动一个JVM Process进程(一个进程内里有多个线程),实行使命Task
Local模式可以限制模拟Spark集群环境的线程数目, 即Local 或 Local
[*]
此中N代表可以使用N个线程,每个线程拥有一个cpu core。如果不指定N, 则默认是1个线程(该线程有1个core)。 通常Cpu有几个Core,就指定几个 线程,最大化利用计算本领.*
如果是local
[*],则代表 Run Spark locally with as many worker threads as logical cores on your machine.按照Cpu最多的Cores设置线程数
Local 下的角色分布:
资源管理:
[*] Master:Local进程自己
[*] Worker:Local进程自己
使命实行:
[*] Driver:Local进程自己
[*] Executor:不存在,没有独立的Executor角色, 由Local进程(也就是Driver)内的线程提供计算本领
PS: Driver也算一种特别的Executor, 只不过多数时间, 我们将Executor当做纯Worker对待, 这样和Driver好区分(一类是管理 一类是工人)
2.3、搭建环境
[*] 开箱即用:直接启动bin目次下的
spark-shell:/opt/module/spark/bin/spark-shell 运行成功以后,有如下提示信息:
https://i-blog.csdnimg.cn/img_convert/2ca27818c7d5de14f297e8f75dd4558e.png
[*] sc:SparkContext实例对象:
[*] spark:SparkSession实例对象
[*] 4040:Web监控页面端标语
2.4、基于bin/pyspark
bin/pyspark 步伐, 可以提供一个 交互式的 Python解释器环境, 在这内里可以用Python语言调用Spark API 进行计算
https://i-blog.csdnimg.cn/img_convert/3fc1fab1610b034f4467cc44ae1a4237.png
示例代码 ,将数组内容都+进行 计 算 :
sc.parallelize().map(lambda x: x + 1).collect() 4040端口是一个WEBUI端口, 可以在浏览器内打开, 输入:服务器ip:4040 即可打开
https://i-blog.csdnimg.cn/img_convert/64345ce9cf87244ac903801415ba7306.png
打开监控页面后, 可以发现 在步伐内仅有一个Driver由于我们是Local模式, Driver即管理又干活. 同时, 输入jps可以看到local模式下的唯一进程存在 这个进程 即是master也是worker
2.5、基于bin/spark-submit测试
bin/spark-submit步伐, 作用: 提交指定的Spark代码到Spark环境中运行
./spark-submit ../examples/src/main/python/pi.py10 pyspark/spark-shell/spark-submit 对比
https://i-blog.csdnimg.cn/img_convert/cf81993cebca39d84268c8a577fc53f0.png
2.6、基于spark-shell
sc.parallelize(Array(1,2,3,4,5)).map(x=> x + 1).collect()
3、Spark环境搭建-Standalone
3.1、Standalone 架构
Standalone模式是Spark自带的一种集群模式,不同于前面本地模式启动多个进程来模拟集群的环境,Standalone模 式是真实地在多个机器之间搭建Spark集群的环境,完全可以利用该模式搭建多机器集群,用于实际的大数据处理。
StandAlone 是完整的Spark运行环境,此中:
Master角色以Master进程存在, Worker角色以Worker进程存在
Driver角色运行时间有大概存在Master节点上,也有大概不存在一起,Executor运行于Worker进程内,属于Worker一个子进程 由Worker提供资源供给它们运行
https://i-blog.csdnimg.cn/img_convert/0c8bdf1505e71d55206ba7a0ffb610be.png
StandAlone集群在进程上重要有3类进程:
主节点Master进程:
[*] Master角色, 管理整个集群资源,并托管运行各个使命的Driver
从节点Workers:
[*] Worker角色, 管理每个机器的资源,分配对应的资源来运行Executor(Task);每个从节点分配资源信息给Worker管理,资源信息包罗内存Memory和CPU Cores核数
历史服务器HistoryServer(可选):
Spark Application运行完成以后,生存变乱日记数据至HDFS,启动HistoryServer可以查看应用运行相干信息。
https://i-blog.csdnimg.cn/img_convert/b9dd7d0dd505b6d981c061819ced8ecb.png
3.2、环境搭建
3.2.1、安装Spark
上传Spark到/opt/software下
https://i-blog.csdnimg.cn/img_convert/d9f782a667d699c2e6bb89723ce3c8b1.png
tar -zxvf spark-3.2.0-bin-hadoop3.2.tgz -C /opt/module 解压到/opt/module下,
https://i-blog.csdnimg.cn/img_convert/5cd741de8435024810955a859341ba36.png
进入到/opt/module下,把文件夹重名为spark
mv spark-3.2.0-bin-hadoop3.2/ spark
https://i-blog.csdnimg.cn/img_convert/8f8a63a71890954e61e2344006a6e0fc.png
3.2.2、安装JDK和Python
JDK前提安装完毕
上传Anaconda3-2021.05-Linux-x86_64.sh
https://i-blog.csdnimg.cn/img_convert/95dd2081a19e6b4c7dffceb55df87d12.png
安装Anaconda3-2021.05-Linux-x86_64.sh
bash Anaconda3-2021.05-Linux-x86_64.sh
https://i-blog.csdnimg.cn/img_convert/b95b5b8db4fc6018edb9ebb95106ee2b.png
之间回车就可以
https://i-blog.csdnimg.cn/img_convert/423ece48f5f44817b7a35e0eca7a3aa8.png
输入Yes
https://i-blog.csdnimg.cn/img_convert/0378087947f4a906f9c9c11c208c4574.png
输入安装路径
https://i-blog.csdnimg.cn/img_convert/89fd7c46c08f842c118e01fc3c9f9790.png
输入yes进行初始化
退出机器,重新进入
https://i-blog.csdnimg.cn/img_convert/5aef5024a14df65b1b35b7928799bd15.png
3.2.3、设置国内源
如果你安装好后, 可以打开:vim ~/.condarc这个文件, 追加如下内容:
channels:
- defaults
show_channel_urls: true
default_channels:
- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/r
- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/msys2
custom_channels:
conda-forge: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
msys2: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
bioconda: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
menpo: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
pytorch: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
simpleitk: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud 3.2.4、创建捏造环境
conda env list
conda create -n pyspark python=3.8 #创建python3.8.8环境
conda activate pyspark #激活环境
conda deactivate pyspark #退出环境 3.3、环境变量
-SPARK_HOME: 表示Spark安装路径在哪里
-PYSPARK_PYTHON: 表示Spark想运行Python程序, 那么去哪里找python执行器
-JAVA_HOME: 告知Spark Java在哪里
-HADOOP_CONF_DIR: 告知Spark Hadoop的配置文件在哪里
-HADOOP_HOME: 告知Spark Hadoop安装在哪里 3.3.1、设置Spark
编辑 vim /etc/profile.d/my_env.sh
#SPARK_HOME
export SPARK_HOME=/opt/module/spark
export PYSPARK_PYTHON=/opt/module/anaconda3/envs/pyspark/bin/python3.8
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop 编辑 vim ~/.bashrc
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PYSPARK_PYTHON=/opt/module/anaconda3/envs/pyspark/bin/python3.8 3.4、StandAlone部署
3.4.1、集群规划
hadoop102运行: Spark的Master进程 和 1个Worker进程
hadoop103运行: spark的1个worker进程
hadoop104运行: spark的1个worker进程
整个集群提供: 1个master进程 和 3个worker进程 3.4.2、在所有机器安装Anaconda3
3.4.3、编辑设置文件
进入到/opt/module/spark
[*] workers
# 改名
mv workers.template workers
# 编辑worker文件
vim workers
将里面的localhost删除, 追加
hadoop102
hadoop103
hadoop104
[*] 设置spark-env.sh文件
# 1. 改名
mv spark-env.sh.template spark-env.sh # 2. 编辑spark-env.sh, 在底部追加如下内容
## 设置JAVA安装目录
JAVA_HOME=/opt/module/jdk1.8.0_212
## HADOOP软件配置文件目录,读取HDFS上文件和运行YARN集群
HADOOP_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
## 指定spark老大Master的IP和提交任务的通信端口
# 告知Spark的master运行在哪个机器上
export SPARK_MASTER_HOST=hadoop102
# 告知sparkmaster的通讯端口
export SPARK_MASTER_PORT=7077
# 告知spark master的 webui端口
SPARK_MASTER_WEBUI_PORT=8080
# worker cpu可用核数
SPARK_WORKER_CORES=1
# worker可用内存
SPARK_WORKER_MEMORY=1g
# worker的工作通讯地址
SPARK_WORKER_PORT=7078
# worker的 webui地址
SPARK_WORKER_WEBUI_PORT=8081
## 设置历史服务器
# 配置的意思是将spark程序运行的历史日志 存到hdfs的/sparklog文件夹中
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://hadoop102:9820/sparklog/ -Dspark.history.fs.cleaner.enabled=true" 在HDFS上创建步伐运行历史记录存放的文件夹:
hadoop fs -mkdir /sparklog
hadoop fs -chmod 777 /sparklog
[*] 设置spark-defaults.conf文件
# 1. 改名
mv spark-defaults.conf.template spark-defaults.conf
# 2. 修改内容, 追加如下内容
# 开启spark的日期记录功能
spark.eventLog.enabled true
# 设置spark日志记录的路径
spark.eventLog.dir hdfs://hadoop102:9820/sparklog/
# 设置spark日志是否启动压缩
spark.eventLog.compress true
[*] 设置log4j.properties(可选设置)
1、mv log4j.properties.template log4j.properties 2、把INFO级别改成WARN级别
3.4.4、启动历史服务器
进入到spark的sbin目次下
./start-history-server.sh
端口:18080 3.4.5、启动集群
./start-all.sh
https://i-blog.csdnimg.cn/img_convert/d7298a44e02883bbf291426fc540dff1.png
查看Master的WEB UI
默认端口master我们设置到了8080
如果端口被占用, 会顺延到8081 ...;8082... 8083... 直到申请到端口为止
可以在日记中查看, 详细顺延到哪个端口上:
https://i-blog.csdnimg.cn/img_convert/7b569d944ac555a02861f9f55a9f029b.png
3.4.5、集群测试
[*] bin/pyspark
bin/pyspark --master spark://hadoop102:7077
sc.parallelize().map(lambda x:x+1).collect()
[*] bin/spark-shell
bin/spark-shell --master spark://hadoop102:7077sc.parallelize(Array(1,2,3,4,5)).map(x=> x + 1).collect()
[*] bin/spark-submit (PI)
bin/spark-submit --master spark://hadoop102:7077 /opt/module/spark/examples/src/main/python/pi.py 100
3.4.6、Spark应用架构
https://i-blog.csdnimg.cn/img_convert/d0bb1b42e5afae5959154b514f88c09d.png
https://i-blog.csdnimg.cn/img_convert/d89f19ca7db9495f533bed9d5f9e19d3.png
Spark 应用架构从图中可以看到Spark Application运行到集群上时,由两部门构成:Driver Program和Executors。
第一、Driver Program
[*] 相当于AppMaster,整个应用管理者,负责应用中所有Job的调理实行;
[*] 运行JVM Process,运行步伐的MAIN函数,必须创建SparkContext上下文对象;
[*] 一个SparkApplication仅有一个;
第二、Executors
[*] 相当于一个线程池,运行JVM Process,此中有很多线程,每个线程运行一个Task使命,一个Task使命运行需要1 Core CPU,所 有可以认为Executor中线程数就等于CPU Core核数;
[*] 一个Spark Application可以有多个,可以设置个数和资源信息;
用户程序从最开始的提交到最终的计算执行,需要经历以下几个阶段:
用户程序创建 SparkContext 时,新创建的 SparkContext 实例会连接到 ClusterManager。 Cluster Manager 会根据用户 提交时设置的 CPU 和内存等信息为本次提交分配计算资源,启动 Executor。
Driver会将用户程序划分为不同的执行阶段Stage,每个执行阶段Stage由一组完全相同Task组成,这些Task分别作用于待处 理数据的不同分区。在阶段划分完成和Task创建后, Driver会向Executor发送 Task;
Executor在接收到Task后,会下载Task的运行时依赖,在准备好Task的执行环境后,会开始执行Task,并且将Task的运行状态 汇报给Driver;
Driver会根据收到的Task的运行状态来处理不同的状态更新。 Task分为两种:一种是Shuffle Map Task,它实现数据的重新 洗牌,洗牌的结果保存到Executor 所在节点的文件系统中;另外一种是Result Task,它负责生成结果数据;
Driver 会不断地调用Task,将Task发送到Executor执行,在所有的Task 都正确执行或者超过执行次数的限制仍然没有执行成 功时停止; 3.4.7、Spark步伐运行条理结构
在前面我们接触到了不少的监控页面,有4040,有8080,有18080,它们有何区别吗?
[*] 4040: 是一个运行的Application在运行的过程中临时绑定的端口,用以查看当前使命的状态.4040被占用会顺延到4041.4042等4040是一个临时端口,当前步伐运行完成后, 4040就会被注销
[*] 8080: 默认是StandAlone下, Master角色(进程)的WEB端口,用以查看当前Master(集群)的状态
[*] 18080: 默认是历史服务器的端口, 由于每个步伐运行完成后,4040端口就被注销了. 在以后想回看某个步伐的运行状态就可以通过历史 服务器查看,历史服务器恒久稳定运行,可供随时查看被记录的步伐的运行过程.
sc.textFile("hdfs://hadoop102:9820/word.txt").flatMap(lambda x:x.split(" ")).map(lambda x,(x,1)).reduceByKey(lambda a,b:a+b).collect()
https://i-blog.csdnimg.cn/img_convert/ae58f016da73495333a297baeb5f9099.png
Spark Application步伐运行时三个核心概念:Job、Stage、 Task,分析如下:
[*] Job:由多个 Task 的并行计算部门,一样平常 Spark 中的action 操作(如 save、collect,背面进一步分析),会 生成一个 Job。
[*] Stage:Job的构成单元,一个Job会切分成多个 Stage,Stage 彼此之间相互依赖顺序实行,而每个 Stage是多个Task 的集合,类似map 和reduce stage。
[*] Task:被分配到各个 Executor 的单元工作内容,它是Spark 中的最小实行单元,一样平常来说有多少个 Paritition(物理层面的概念,即分支可以理解为将数据分别成不同 部门并行处理),就会有多少个 Task,每个 Task 只会处 理单一分支上的数据。
3.4.8、总结
StandAlone的原理?
Master和Worker角色以独立进程的形式存在,并组成Spark运行时环境(集群) Standalone如何提交Spark应用?
bin/spark-submit --master spark://server:7077 4040\8080\18080分别是什么?
4040是单个程序运行的时候绑定的端口可供查看本任务运行情况 Job\State\Task的关系?
一个Spark程序会被分成多个子任务(Job)运行, 每一个Job会分成多个State(阶段)来 运行, 每一个State内会分出来多个Task(线程)来执行具体任务
4、Standalone HA
4.1、高可用HA
Spark Standalone集群是Master-Slaves架构的集群模式,和大部门的Master-Slaves结构集群一样,存在着Master单点故障(SPOF)的问题
https://i-blog.csdnimg.cn/img_convert/35a704b680d165291d5c98a6504dfe00.png
如何解决这个单点故障的问题,Spark提供了两种方案:
[*] 基于文件体系的单点恢复(Single-Node Recovery with Local File System)--只能用于开发或测试环境。
[*] 基于zookeeper的Standby Masters(Standby Masters with ZooKeeper)--可以用于生产环境。ZooKeeper提供了一个Leader Election机制,利用这个机制可以包管虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被推选出来。由于集群的信息,包括Worker, Driver和Application的信息都已经长期化到文件体系,因此在切换的过程中只会影响新Job的提交,对 于正在进行的Job没有任何的影响。参加ZooKeeper的集群整体架构如下图所示。
https://i-blog.csdnimg.cn/img_convert/25eef0cacd30ec4be1e23247c599810d.png
4.2、基于Zookeeper实现HA
https://i-blog.csdnimg.cn/img_convert/9e408ed7dfae727c52bbfdb55e3640e5.png
在spark-env.sh中,删除
#export SPARK_MASTER_HOST=hadoop102 在spark-env.sh中, 增长:
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop102:2181,hadoop103:2181,hadoop104:2181 -Dspark.deploy.zookeeper.dir=/spark-ha" 将spark-env.sh 分发到每一台服务器上
xsync spark-env.sh 停止当前StandAlone集群
sbin/stop-all.sh 启动集群:
# 在hadoop102上 启动一个master 和全部worker
sbin/start-all.sh
# 注意, 下面命令在hadoop103上执行
sbin/start-master.sh
# 在hadoop103上启动一个备用的master进程 4.3、查看WebUI
http://hadoop102:8080/
http://hadoop103:8080/ 如果将hadoop102的Master进程Kill掉,hadoop103的Master在1Min-2Min左右会接替hadoop102的Master作用。 也就是在执行过程中,使用jps查看Active Master进程ID,将其kill,观察Master是否自动切换与应用运行完成结束。(需要等待1-2min)
5、Spark On YARN 环境搭建
5.1、引言
按照前面环境部署中所学习的, 如果我们想要一个稳定的生产Spark环境, 那么最优的选择就是构建:HA StandAlone集群.
不过在企业中, 服务器的资源总是告急的, 很多企业不管做什么业务,都根本上会有Hadoop集群. 也就是会有YARN集群.
对于企业来说,在已有YARN集群的前提下在单独准备Spark StandAlone集群,对资源的利用就不高. 以是, 在企业中,多 数场景下,会将Spark运行到YARN集群中.
YARN自己是一个资源调理框架, 负责对运行在内部的计算框架进行资源调理管理. 作为典型的计算框架, Spark自己也是直接运行在YARN中, 并接受YARN的调理的.
以是, 对于Spark On YARN, 无需部署Spark集群, 只要找一台服务器, 充当Spark的客户端, 即可提交使命到YARN集群中运行.
5.2、本质
https://i-blog.csdnimg.cn/img_convert/f9ca54ad1adf3cdd44ceeec1c177b7b8.png
Spark On Yarn的本质?
[*] Master角色由YARN的ResourceManager担任
[*] Worker角色由YARN的NodeManager担任
[*] Driver角色运行在YARN容器内 或 提交使命的客户端进程
[*] 真正干活的Executor运行在YARN提供的容器内
5.3、设置
确保:
[*] HADOOP_CONF_DIR
[*] YARN_CONF_DIR
在spark-env.sh 以及 环境变量设置文件中即可
5.4、测试
Spark On YARN是有两种运行模式的,
一种是Cluster模式一种是Client模式.这两种模式的区别就是Driver运行的位置.
Cluster模式即:Driver运行在YARN容器内部, 和ApplicationMaster在同一个容器内
Client模式即:Driver运行在客户端进程中, 比如Driver运行在spark-submit步伐的进程中
如图, 此为Cluster模式Driver运行在容器内部
https://i-blog.csdnimg.cn/img_convert/9ffed1c7b805ca2a669ffd2825837882.png
如图, 此为Client模式Driver运行在客户端步伐进程中(以spark-submit为例)
https://i-blog.csdnimg.cn/img_convert/c0031898cc958ce544cf4bf7348a4371.png
两种模式的区别
https://i-blog.csdnimg.cn/img_convert/915f2d43a1b311ec1b16456386ea0af0.png
bin/pyspark
bin/pyspark --master yarn --deploy-mode client|cluster
# --deploy-mode 选项是指定部署模式, 默认是 客户端模式
# client就是客户端模式
# cluster就是集群模式
# --deploy-mode 仅可以用在YARN模式下 注意: 交互式环境 pyspark 和 spark-shell 无法运行 cluster模式
bin/spark-shell
bin/spark-shell --master yarn --deploy-mode client|cluster 注意: 交互式环境 pyspark 和 spark-shell 无法运行 cluster模式
bin/spark-submit (PI)
bin/spark-submit --master yarn --deploy-mode client|cluster /xxx/xxx/xxx.py 参数
5.5、两种模式运行代码
假设运行圆周率PI程序,采用client模式,命令如下:
bin/spark-submit
--master yarn
--deploy-mode client
--driver-memory 512m
--executor-memory 512m
--num-executors 1
--texecutor-cores 2
/examples/src/main/python/pi.py10
bin/spark-submit
--master yarn
--deploy-mode cluster
--driver-memory 512m
--executor-memory 512m
--num-executors 1
--executor-cores 2
/examples/src/main/python/pi.py10
5.6、Spark On Yarn两种模式总结
Client模式和Cluster模式最最本质的区别是:Driver步伐运行在哪里。
[*] Client模式:学习测试时使用,生产不保举(要用也可以,性能略低,稳定性略低)
[*] Driver运行在Client上,和集群的通信成本高
[*] Driver输出结果会在客户端显示
[*] Cluster模式:生产环境中使用该模式
[*] Driver步伐在YARN集群中,和集群的通信成本低
[*] Driver输出结果不能在客户端显示
[*] 该模式下Driver运行ApplicattionMaster这个节点上,由Yarn管理,如果出现问题,yarn会重启 ApplicattionMaster(Driver)
在YARN Client模式下,Driver在使命提交的本地机器上运行,示意图如下
https://i-blog.csdnimg.cn/img_convert/b5a6a08cd2fd431f60a923d2c5a1ae15.png
https://i-blog.csdnimg.cn/img_convert/4220f04013be0e619380555916836cf0.png
1、Driver在任务提交的本地机器上运行,Driver启动后会和ResourceManager通讯申请启动ApplicationMaster
2、随后ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster的功能相当于一个ExecutorLaucher,只负责向ResourceManager申请Executor内存;
3、ResourceManager接到ApplicationMaster的资源申请后会分配Container,然后ApplicationMaster在资源分 配指定的NodeManager上启动Executor进程;
4、Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数;
5、之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分Stage,每个Stage生成对应的TaskSet,之后 将Task分发到各个Executor上执行。
在YARN Cluster模式下,Driver运行在NodeManager Contanier中,此时Driver与AppMaster合为一体,示意图如
https://i-blog.csdnimg.cn/img_convert/16e1b6a772c6cad0c6d6f2bc15d4afce.png
1、任务提交后会和ResourceManager通讯申请启动ApplicationMaster;
2、随后ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver;
3、Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请 后会分配Container,然后在合适的NodeManager上启动Executor进程;
4、Executor进程启动后会向Driver反向注册;
5、Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开 始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行;
6、Pyspark
我们前面使用过bin/pyspark 步伐, 要注意, 这个只是一个应用步伐, 提供一个Python解释器实行环境来运行Spark使命 我们现在说的PySpark, 指的是Python的运行类库, 是可以在Python代码中:import pyspark
PySpark 是Spark官方提供的一个Python类库, 内置了完全的Spark API, 可以通过PySpark类库来编写Spark应用步伐, 并将其提交到Spark集群中运行.下图是,PySpark类库和标准Spark框架的简单对比
https://i-blog.csdnimg.cn/img_convert/6b8e1cc5038ec11bbf0cd06408dd11cd.png
6.1、在集群安装Pyspark
conda activate pyspark #激活环境
pip install pyspark==3.2.0 6.2、在本地安装Pyspark
pip install pyspark==3.2.0 -i https://pypi.douban.com/simple 6.3、连接集群环境
通过pycharm--->file--->settings
https://i-blog.csdnimg.cn/img_convert/4df4880b00f37a1f01b48670ed0f647b.png
https://i-blog.csdnimg.cn/img_convert/6573f08bb8d51527be9c64ca3e597298.png
/opt/module/anaconda3/envs/pyspark/bin/python
https://i-blog.csdnimg.cn/img_convert/51dd3a83131b70ed53d800ed39fcd1ca.png
7、PySpark开发入口
Spark Application步伐入口为:SparkContext,任何一个应用首先需要构建SparkContext对象,如下两步构建:
[*] 第一步、创建SparkConf对象设置Spark Application根本信息,比如应用的名称AppName和应用运行Master
[*] 第二步、基于SparkConf对象,创建SparkContext对象
文档:http://spark.apache.org/docs/3.1.2/rdd-programming-guide.html
https://i-blog.csdnimg.cn/img_convert/3bd75ae89a52199c71ddb0d74e6563c5.png
7.1、WordCount
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setMaster("yarn").set("spark.hadoop.fs.defaultFS", "hdfs://hadoop102:9820").setAppName("wc")
sc = SparkContext(conf=conf)
rdd = sc.parallelize().map(lambda x: x + 1).collect()
print(rdd) # coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("WordCountHelloWorld").setMaster("local[*]")
# 通过SparkConf对象构建SparkContext对象
sc = SparkContext(conf=conf)
# 需求 : wordcount单词计数, 读取HDFS上的words.txt文件, 对其内部的单词统计出现 的数量
# 读取文件
file_rdd = sc.textFile("hdfs://hadoop102:9820/word.txt")
# 将单词进行切割, 得到一个存储全部单词的集合对象
words_rdd = file_rdd.flatMap(lambda line: line.split(" "))
# 将单词转换为元组对象, key是单词, value是数字1
words_with_one_rdd = words_rdd.map(lambda x: (x, 1))
# 将元组的value 按照key来分组, 对所有的value执行聚合操作(相加)
result_rdd = words_with_one_rdd.reduceByKey(lambda a, b: a + b)
# 通过collect方法收集RDD的数据打印输出结果
print(result_rdd.collect())
原理分析
https://i-blog.csdnimg.cn/img_convert/b7e4f2b5e0a5344e9bb26ff6d8fd0051.png
7.2、提交到集群运行
注意:去掉setMaster("local
[*]")
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("WordCountHelloWorld")
# 通过SparkConf对象构建SparkContext对象
sc = SparkContext(conf=conf)
# 需求 : wordcount单词计数, 读取HDFS上的words.txt文件, 对其内部的单词统计出现 的数量
# 读取文件
file_rdd = sc.textFile("hdfs://hadoop102:9820/word.txt")
# 将单词进行切割, 得到一个存储全部单词的集合对象
words_rdd = file_rdd.flatMap(lambda line: line.split(" "))
# 将单词转换为元组对象, key是单词, value是数字1
words_with_one_rdd = words_rdd.map(lambda x: (x, 1))
# 将元组的value 按照key来分组, 对所有的value执行聚合操作(相加)
result_rdd = words_with_one_rdd.reduceByKey(lambda a, b: a + b)
# 通过collect方法收集RDD的数据打印输出结果
print(result_rdd.collect()) bin/spark-submit --master yarn /tmp/pycharm_project_931/00_example/hw.py
7.3、Python On Spark 实行原理
PySpark宗旨是在不粉碎Spark已有的运行时架构,在Spark架构外层包装一层Python API,借助Py4j实现Python和Java的交互,进而实现通过Python编写Spark应用步伐,其运行时架构如下图所示。
https://i-blog.csdnimg.cn/img_convert/b024428e7c6ae8b9352b03799e92007f.png
https://i-blog.csdnimg.cn/img_convert/5a6c9a7a16e8e662b1fcfb0af30e34df.png
8、分布式代码实行分析
8.1、Spark 集群角色回顾(以YARN为例)
https://i-blog.csdnimg.cn/img_convert/128ba005423d8928cea36d107e2d888c.png
8.2、分布式代码实行分析
Spark Application应用步伐运行时,无论client还是cluster部署模式DeployMode,当Driver Program和Executors启动完成以后,就要开始实行应用步伐中MAIN函数的代码,以词频统计WordCount步伐为例剖析解说
https://i-blog.csdnimg.cn/img_convert/d95ed6e52703708e332ed22df6ad1ca4.png
第一、构建SparkContex对象和关闭SparkContext资源,都是在Driver Program中实行,上图中①和③都是,如 下图所示:
https://i-blog.csdnimg.cn/img_convert/4a856d110108ba2d678985685b16036f.png
第二、上图中②的加载数据【A】、处理数据【B】和输出数据【C】代码,都在Executors上实行,从WEB UI监控 页面可以看到此Job(RDD#action触发一个Job)对应DAG图,如下所示:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]