Spark集群搭建
一、基础集群脚本
1、长途调用脚本(remote_call.sh)
如果有传命令参数,则执行该命令;如果没有传命令参数,则不执行。
- #!/bin/bash
- cmd=$1
- if [ ! $cmd ];then
- cmd="jps"
- fi
- # 提取集群免密通信的虚拟机主机名
- hosts=`sed -n '3,$p' /etc/hosts | awk '{print $2}'`
- # 遍历所有主机
- for host in $hosts;do
- echo "-------- $host --------"
- # 此处使用"$cmd"的原因是避免将命令中的空格识别为多条命令
- ssh root@$host "$cmd"
- done
复制代码 2、长途复制目次脚本(remote_copy.sh)
起首,要验证待复制的目次在本机是否存在;然后需要从/etc/hosts文件中获取撤消当前主机名的其他主机名,并且对每个主机举行循环操作,先判断父目次是否存在,再举行递归复制。
- #!/bin/bash
- path=$1 # 本机目录
- # 验证路径是否存在
- if [ ! -e $path ];then
- echo "目录 $path 不存在,无法拷贝"
- exit 0
- fi
- # 提取集群中排除当前主机名之外的所有主机名
- # 只需验证父目录是否存在即可
- me=`hostname`
- parent=`dirname $path`
- hosts=`sed -n '3,$p' /etc/hosts | awk '{print $2}' | grep -v "$me"`
- for host in $hosts;do
- scp -r $path root@$host:$parent
- echo "-------- 拷贝 $path 到 $host 成功 --------"
- done
复制代码 3、Spark集群服务管理脚本
按照"先开的后关"的原则,按照"ZooKeeper-HDFS-YARN-Spark-启用其他Master节点"的次序举行集群的服务管理
- #!/bin/bash
- if [ "$1" == "start" ];then
- # 自定义启动服务函数,传入两个字符串参数:参数1为启动命令,参数2为启动方式
- function startService(){
- # 定义两个局部遍历依次接收用户传入的两个参数
- # service 为启动命令
- local service=$1
- # exeType 为启动方式(daemon:后台启动(用于那些会占用窗口的服务),非daemon表示正常启动)
- local exeType=$2
- if [ "$exeType" == "daemon" ];then
- # 后台启动
- nohup $service 1>/dev/null 2>&1 &
- else
- # 正常启动
- $service 1>/dev/null 2>&1
- fi
- # 判断执行启动脚本后的状态
- if [ $? -ne 0 ];then
- # 因为服务按从上到下有依赖关系,所以任何一个脚本执行出错,将退出整个脚本,所以选择exit而非return
- # 函数按通用规则 return 0:正常,非0:表示异常;若函数中return缺省,则默认返回最后一条命名的状态即改命令执行后的$?
- echo "ERROR : FAIL TO EXECUTE $service IN MODE OF $exeType ... EXIT ..."
- # 退出脚本
- exit 1
- else
- # 成功则正常输出提示结果
- echo "INFO : SUCCESS TO EXECUTE $service IN MODE OF $exeType"
- fi
- }
- # 依次按服务依赖顺序调用启动服务函数,并为每次调用传入启动命令和启动方式
- # 启动 Zookeeper
- /root/install/remote_call.sh "zkServer.sh start"
- # 启动 hadoop hdfs
- startService "start-dfs.sh" "non-daemon"
- # 启动 hadoop yarn
- startService "start-yarn.sh" "non-daemon"
- # 启动 Spark
- startService "start-all.sh" "non-daemon"
- # 实现集群高可用
- ssh root@master02 "start-master.sh"
- elif [ "$1" == "stop" ];then
- # 自定义启动服务函数,传入两个字符串参数:参数1为关闭命令,参数2为关闭方式
- function stopService(){
- # 服务名称或关闭命令
- local service=$1
- # 关闭方式(command:命令关闭,pid:根据服务查找进程编号(pid)在借助kill命令关闭)
- local exeType=$2
- if [ "$exeType" == "command" ];then
- # 直接执行参数一命令关闭服务
- $service 1>/dev/null 2>&1
- # 根据关闭命令执行的状态展示结果
- if [ $? -eq 0 ];then
- echo "INFO : SUCCESS TO EXECUTE $service"
- else
- echo "ERROR : FAIL TO EXECUTE $service"
- fi
- else
- # 根据参数一传入的服务名称查看进程编号(pid)
- local pid=$(jps -ml|grep $service|awk '{print $1}')
- if [ "$pid" ];then
- # 如果进程编号存在,则直接强制 kill
- kill -9 $pid
- # 根据kill的状态展示结果
- if [ $? -eq 0 ];then
- echo "INFO : SUCCESS TO KILL $service WITH PID $pid"
- else
- echo "ERROR : FAIL TO KILL $service WITH PID $pid"
- fi
- else
- echo "INFO : NO SERVICE EXIST WITH NAME OF $service"
- fi
- fi
- }
- # 根据服务的依赖关系,逆向逐个关闭服务
- # 关闭高可用另外起的master
- ssh root@master02 "stop-master.sh"
- # 关闭Spark
- stopService "stop-all.sh" "command"
- # 最后关闭 yarn 和 hdfs
- stopService "stop-yarn.sh" "command"
- stopService "stop-dfs.sh" "command"
- # 关闭ZooKeeper
- /root/install/remote_call.sh "zkServer.sh stop"
- else
- # 附带查一下java进程
- jps -ml
- fi
复制代码 如果需要切换主节点的状态,可以先通过stop-master.sh使A-Master节点状态由Active->Standby,待B-Master根据机制切换为Active态之后再重启A-Master节点。
二、配置相关文件
- cd /opt/software/spark-3.1.2/conf
- // mv xxx.template xxx 将所有template文件去除后缀为正式配置文件
- vim spark-env.sh
- -------------------------------------------------------------------
- export JAVA_HOME=/opt/software/jdk1.8.0_171
- export HADOOP_CONF_DIR=/opt/software/hadoop-3.1.3/etc/hadoop
- SPARK_MASTER_WEBUI_PORT=9090
- SPARK_HISTORY_OPTS="
- -Dspark.history.ui.port=9091
- -Dspark.history.fs.logDirectory=hdfs://master01:8020/spark_event_log_dir
- -Dspark.history.retainedApplications=30
- "
- SPARK_DAEMON_JAVA_OPTS="
- -Dspark.deploy.recoveryMode=ZOOKEEPER
- -Dspark.deploy.zookeeper.url=master01,master02,worker01
- -Dspark.deploy.zookeeper.dir=/spark
- "
- -------------------------------------------------------------------
- vim workers // 配置集群所有机器名称
- -------------------------------------------------------------------
- master01
- master02
- worker01
- -------------------------------------------------------------------
- vim spark-default.conf
- -------------------------------------------------------------------
- spark.eventLog.enabled true
- spark.eventLog.dir hdfs://master01:8020/spark_event_log_dir
- spark.yarn.historyServer.address=master01:9091
- spark.history.ui.port=9091
- -------------------------------------------------------------------
- vim yarn-site.xml // [选配]:在启动Yarn模式时才需要配置
- -------------------------------------------------------------------
- <property>
- <name>yarn.nodemanager.pmem-check-enabled</name>
- <value>false</value>
- </property>
- <property>
- <name>yarn.nodemanager.vmem-check-enabled</name>
- <value>false</value>
- </property>
- -------------------------------------------------------------------
- ./remote_copy.sh /opt/software/spark-3.1.2/ // 拷贝Spark内容
- ./remote_copy.sh /etc/profile.d/myenv.sh // 拷贝环境变量
- ./remote_call.sh "source /etc/profile"
- ./remote_call.sh "/opt/software/service-bigdata.sh start" // 完整启动Spark集群中的所有服务
复制代码 在此配置下,有以下关键信息:
- 端口配置
- Spark Master Web UI:9090
- Spark HistoryServer UI:9091
- 日志文件路径
- 日志文件路径:hdfs://master01:8020/spark_event_log_dir
- 关闭内存检查的原因:Spark使命吃资源,不能为了保证资源不被太过利用就强行停止Spark使命。
三、使命提交案例
1. 案例详解
ProductsAnalyzer类
val path = args(0) val dirPath = args(1) 表示输入和输出路径在提交使命时具体指定。
- package envtest
- import core.{SparkSessionBuilder, SparkSessionBuilderDeployer}
- import org.apache.spark.SparkContext
- import org.apache.spark.rdd.RDD
- import org.apache.spark.sql.expressions.Window
- object ProductsAnalyzer {
- case class Product(name: String, price: Double, date: String, market: String, province: String, city: String)
- implicit class StrExt(line: String){
- // 香菜 2.80 2018/1/1 山西汾阳市晋阳农副产品批发市场 山西 汾阳
- val regexProduct = "(.*?)\t(.*?)\t(.*?)\t(.*?)\t(.*?)\t(.*?)".r
- def toProduct: Product = {
- line match {
- // 使用模式匹配将字符串和正则表达式进行匹配,如果成功,则将各个匹配到的部分转换为Product对象并进行返回
- // java.lang.NumberFormatException: empty String:输入数据中的某些行可能不符合预期的格式,导致 toDouble 转换失败。解决方案是:使其转化为Product对象时,如果为空,则转化为0.0f
- case regexProduct(name, price, date, market, province, city) => Product(name, if (price.trim.isEmpty) 0.0f else price.toDouble, date, market, province, city)
- case _ => throw new RuntimeException(s"产品数据格式有误:$line")
- }
- }
- }
- def main(args: Array[String]): Unit = {
- /**
- * 在Edit Configurations中指定参数,通过args[0]和args[1]获取输入路径和输出路径
- */
- val path = args(0)
- val dirPath = args(1)
- val ssb: SparkSessionBuilderDeployer = SparkSessionBuilderDeployer()
- val spark = ssb.spark
- val sc: SparkContext = ssb.sc
- val rddProduct: RDD[Product] = sc.textFile(path)
- .mapPartitions(_.map(_.toProduct))
- import org.apache.spark.sql.functions._
- import spark.implicits._
- // 统计每个省份下每个商品的平均价格和商品数量,并且在不同省份下按照商品数量创建排名指标
- spark.createDataFrame(rddProduct)
- .groupBy($"province",$"name")
- .agg(
- avg($"price").as("avg_price"),
- count("*").as("product_cnt")
- )
- .select($"province",$"name",$"avg_price",$"product_cnt",dense_rank().over(Window.partitionBy($"province").orderBy($"avg_price".desc)).as("rank"))
- .repartition(1)
- .write
- .mode(saveMode = "overwrite")
- .option("separator",",")
- .option("header", "true")
- .csv(dirPath)
- spark.stop()
- }
- }
复制代码 SparkSessionBuilderDeployer类
SparkSessionBuilderDeployer() 使使命提交案例关于运行模式和名称的配置在使命提交时再举行设置,提升案例的重用性。
- package core
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.{SparkConf, SparkContext}
- // 封装SparkSession的创建方法
- // SparkSessionBuilderDeployer() 使任务提交案例关于运行模式和名称的配置在任务提交时再进行设置,提升案例的重用性。
- class SparkSessionBuilderDeployer(){
- lazy val config:SparkConf = {
- new SparkConf()
- }
- lazy val spark:SparkSession = {
- SparkSession.builder()
- .config(config)
- .getOrCreate()
- }
- lazy val sc:SparkContext = {
- spark.sparkContext
- }
- def stop(): Unit = {
- if (null != spark) {
- spark.stop()
- }
- }
- }
- object SparkSessionBuilderDeployer {
- def apply(): SparkSessionBuilderDeployer = new SparkSessionBuilderDeployer
- }
复制代码 在具体测试案例代码时,可以先通过对path和dirpath指定具体路径的方式来测试整体逻辑,再通过Edit Configurations设置具体参数来测试传参后的程序逻辑,测试完毕之后,即可直接将输入和输出路径分别修改为args(0)和args(1),并且直接打包发布到假造机,供给Spark集群举行执行。
2. Spark Submit 选项详解
- 主要选项
- –class :
- 指定应用程序的主类,即包含 main 方法的类。
- 示例:--class envtest.ProductsAnalyzer
- –master :
- 指定 Spark Master 节点的 URL。
- 示例:
- Standalone 模式:
- --master spark://master01:7077
复制代码
- 表示 Spark Master 节点运行在 master01 主机的 7077 端口上。
- YARN 模式:
- –deploy-mode :
- 指定应用程序的运行模式。
- 模式:
- client:Driver 程序运行在提交使命的当地呆板上。
- cluster:Driver 程序运行在集群的某个节点上。
- 示例:--deploy-mode cluster
- –name :
- 指定应用程序的名称。
- 示例:--name spark-sql-product
- :
- Spark 应用程序的 JAR 文件的路径,包含了主类及其依靠。
- 示例:/root/spark/spark_sql-1.0.jar
- <application-argument(s)>:
- 应用程序的输入参数。
- 示例:
- 输入路径:hdfs://master02:8020/spark/data/products.txt
- 输出路径:hdfs://master02:8020/spark/result/product_result
- 具体代码
standalone
- spark-submit \--class envtest.ProductsAnalyzer \--master spark://master01:7077
- \--deploy-mode client \--name spark-sql-product \/root/spark/spark_sql-1.0.jar \hdfs://master02:8020/spark/data/products.txt \hdfs://master02:8020/spark/result/product_result
复制代码 yarn-cluster
- spark-submit \--class envtest.ProductsAnalyzer \--master yarn
- \--deploy-mode cluster \--name spark-sql-product \/root/spark/spark_sql-1.0.jar \hdfs://master02:8020/spark/data/products.txt \hdfs://master02:8020/spark/result/product_result
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |