Spark集群搭建

立山  金牌会员 | 2024-7-15 16:20:06 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 840|帖子 840|积分 2520




  
Spark集群搭建

一、基础集群脚本

1、长途调用脚本(remote_call.sh)

如果有传命令参数,则执行该命令;如果没有传命令参数,则不执行。
  1. #!/bin/bash
  2. cmd=$1
  3. if [ ! $cmd ];then
  4.         cmd="jps"
  5. fi
  6. # 提取集群免密通信的虚拟机主机名
  7. hosts=`sed -n '3,$p' /etc/hosts | awk '{print $2}'`
  8. # 遍历所有主机
  9. for host in $hosts;do
  10.         echo "-------- $host --------"
  11.         # 此处使用"$cmd"的原因是避免将命令中的空格识别为多条命令
  12.         ssh root@$host "$cmd"
  13. done
复制代码
2、长途复制目次脚本(remote_copy.sh)

起首,要验证待复制的目次在本机是否存在;然后需要从/etc/hosts文件中获取撤消当前主机名的其他主机名,并且对每个主机举行循环操作,先判断父目次是否存在,再举行递归复制。
  1. #!/bin/bash
  2. path=$1 # 本机目录
  3. # 验证路径是否存在
  4. if [ ! -e $path ];then
  5.         echo "目录 $path 不存在,无法拷贝"
  6.         exit 0
  7. fi
  8. # 提取集群中排除当前主机名之外的所有主机名
  9. # 只需验证父目录是否存在即可
  10. me=`hostname`
  11. parent=`dirname $path`
  12. hosts=`sed -n '3,$p' /etc/hosts | awk '{print $2}' | grep -v "$me"`
  13. for host in $hosts;do
  14.         scp -r $path root@$host:$parent
  15.         echo "-------- 拷贝 $path 到 $host 成功 --------"
  16. done
复制代码
3、Spark集群服务管理脚本

按照"先开的后关"的原则,按照"ZooKeeper-HDFS-YARN-Spark-启用其他Master节点"的次序举行集群的服务管理
  1. #!/bin/bash
  2. if [ "$1" == "start" ];then
  3.         # 自定义启动服务函数,传入两个字符串参数:参数1为启动命令,参数2为启动方式
  4.     function startService(){
  5.                 # 定义两个局部遍历依次接收用户传入的两个参数
  6.                 # service 为启动命令
  7.                 local service=$1
  8.                 # exeType 为启动方式(daemon:后台启动(用于那些会占用窗口的服务),非daemon表示正常启动)
  9.                 local exeType=$2
  10.                 if [ "$exeType" == "daemon" ];then
  11.                         # 后台启动
  12.                         nohup $service 1>/dev/null 2>&1 &
  13.                 else
  14.                         # 正常启动
  15.                         $service 1>/dev/null 2>&1
  16.                 fi
  17.                 # 判断执行启动脚本后的状态
  18.                 if [ $? -ne 0 ];then
  19.                         # 因为服务按从上到下有依赖关系,所以任何一个脚本执行出错,将退出整个脚本,所以选择exit而非return
  20.                         # 函数按通用规则 return 0:正常,非0:表示异常;若函数中return缺省,则默认返回最后一条命名的状态即改命令执行后的$?
  21.                         echo "ERROR : FAIL TO EXECUTE $service IN MODE OF $exeType ... EXIT ..."
  22.                         # 退出脚本
  23.                         exit 1
  24.                 else
  25.                         # 成功则正常输出提示结果
  26.                         echo "INFO : SUCCESS TO EXECUTE $service IN MODE OF $exeType"
  27.                 fi
  28.     }
  29.         # 依次按服务依赖顺序调用启动服务函数,并为每次调用传入启动命令和启动方式
  30.         # 启动 Zookeeper
  31.         /root/install/remote_call.sh "zkServer.sh start"
  32.         # 启动 hadoop hdfs
  33.     startService "start-dfs.sh" "non-daemon"
  34.         # 启动 hadoop yarn
  35.     startService "start-yarn.sh" "non-daemon"
  36.         # 启动 Spark
  37.         startService "start-all.sh" "non-daemon"
  38.         # 实现集群高可用
  39.         ssh root@master02 "start-master.sh"
  40. elif [ "$1" == "stop" ];then
  41.         # 自定义启动服务函数,传入两个字符串参数:参数1为关闭命令,参数2为关闭方式
  42.     function stopService(){
  43.                 # 服务名称或关闭命令
  44.                 local service=$1
  45.                 # 关闭方式(command:命令关闭,pid:根据服务查找进程编号(pid)在借助kill命令关闭)
  46.                 local exeType=$2
  47.                 if [ "$exeType" == "command" ];then
  48.                         # 直接执行参数一命令关闭服务
  49.                         $service 1>/dev/null 2>&1
  50.                         # 根据关闭命令执行的状态展示结果
  51.                         if [ $? -eq  0 ];then
  52.                                         echo "INFO : SUCCESS TO EXECUTE $service"
  53.                         else
  54.                                         echo "ERROR : FAIL TO EXECUTE $service"
  55.                         fi
  56.                 else
  57.                         # 根据参数一传入的服务名称查看进程编号(pid)
  58.                         local pid=$(jps -ml|grep $service|awk '{print $1}')
  59.                         if [ "$pid" ];then
  60.                                 # 如果进程编号存在,则直接强制 kill
  61.                                 kill -9 $pid
  62.                                 # 根据kill的状态展示结果
  63.                                 if [ $? -eq  0 ];then
  64.                                                         echo "INFO : SUCCESS TO KILL $service WITH PID $pid"
  65.                                         else
  66.                                                         echo "ERROR : FAIL TO KILL $service WITH PID $pid"
  67.                                         fi
  68.                         else
  69.                                 echo "INFO : NO SERVICE EXIST WITH NAME OF $service"
  70.                         fi
  71.                 fi
  72.     }
  73.         # 根据服务的依赖关系,逆向逐个关闭服务
  74.         # 关闭高可用另外起的master
  75.         ssh root@master02 "stop-master.sh"
  76.         # 关闭Spark
  77.         stopService "stop-all.sh" "command"
  78.         # 最后关闭 yarn 和 hdfs
  79.     stopService "stop-yarn.sh" "command"
  80.     stopService "stop-dfs.sh" "command"
  81.         # 关闭ZooKeeper
  82.         /root/install/remote_call.sh "zkServer.sh stop"
  83. else
  84.         # 附带查一下java进程
  85.     jps -ml
  86. fi
复制代码
如果需要切换主节点的状态,可以先通过stop-master.sh使A-Master节点状态由Active->Standby,待B-Master根据机制切换为Active态之后再重启A-Master节点。
二、配置相关文件

  1. cd /opt/software/spark-3.1.2/conf
  2. // mv xxx.template xxx 将所有template文件去除后缀为正式配置文件
  3. vim spark-env.sh
  4. -------------------------------------------------------------------
  5. export JAVA_HOME=/opt/software/jdk1.8.0_171
  6. export HADOOP_CONF_DIR=/opt/software/hadoop-3.1.3/etc/hadoop
  7. SPARK_MASTER_WEBUI_PORT=9090
  8. SPARK_HISTORY_OPTS="
  9. -Dspark.history.ui.port=9091
  10. -Dspark.history.fs.logDirectory=hdfs://master01:8020/spark_event_log_dir
  11. -Dspark.history.retainedApplications=30
  12. "
  13. SPARK_DAEMON_JAVA_OPTS="
  14. -Dspark.deploy.recoveryMode=ZOOKEEPER
  15. -Dspark.deploy.zookeeper.url=master01,master02,worker01
  16. -Dspark.deploy.zookeeper.dir=/spark
  17. "
  18. -------------------------------------------------------------------
  19. vim workers // 配置集群所有机器名称
  20. -------------------------------------------------------------------
  21. master01
  22. master02
  23. worker01
  24. -------------------------------------------------------------------
  25. vim spark-default.conf
  26. -------------------------------------------------------------------
  27. spark.eventLog.enabled           true
  28. spark.eventLog.dir               hdfs://master01:8020/spark_event_log_dir
  29. spark.yarn.historyServer.address=master01:9091
  30. spark.history.ui.port=9091
  31. -------------------------------------------------------------------
  32. vim yarn-site.xml // [选配]:在启动Yarn模式时才需要配置
  33. -------------------------------------------------------------------
  34. <property>
  35.     <name>yarn.nodemanager.pmem-check-enabled</name>
  36.     <value>false</value>
  37. </property>
  38. <property>
  39.     <name>yarn.nodemanager.vmem-check-enabled</name>
  40.     <value>false</value>
  41. </property>
  42. -------------------------------------------------------------------
  43. ./remote_copy.sh /opt/software/spark-3.1.2/ // 拷贝Spark内容
  44. ./remote_copy.sh /etc/profile.d/myenv.sh // 拷贝环境变量
  45. ./remote_call.sh "source /etc/profile"
  46. ./remote_call.sh "/opt/software/service-bigdata.sh start" // 完整启动Spark集群中的所有服务
复制代码
在此配置下,有以下关键信息:


  • 端口配置

    • Spark Master Web UI:9090
    • Spark HistoryServer UI:9091

  • 日志文件路径

    • 日志文件路径:hdfs://master01:8020/spark_event_log_dir

  • 关闭内存检查的原因:Spark使命吃资源,不能为了保证资源不被太过利用就强行停止Spark使命。
三、使命提交案例

1. 案例详解

ProductsAnalyzer类
val path = args(0) val dirPath = args(1) 表示输入和输出路径在提交使命时具体指定。
  1. package envtest
  2. import core.{SparkSessionBuilder, SparkSessionBuilderDeployer}
  3. import org.apache.spark.SparkContext
  4. import org.apache.spark.rdd.RDD
  5. import org.apache.spark.sql.expressions.Window
  6. object ProductsAnalyzer {
  7.   case class Product(name: String, price: Double, date: String, market: String, province: String, city: String)
  8.   implicit class StrExt(line: String){
  9.     // 香菜        2.80        2018/1/1        山西汾阳市晋阳农副产品批发市场        山西        汾阳
  10.     val regexProduct = "(.*?)\t(.*?)\t(.*?)\t(.*?)\t(.*?)\t(.*?)".r
  11.     def toProduct: Product = {
  12.       line match {
  13.         // 使用模式匹配将字符串和正则表达式进行匹配,如果成功,则将各个匹配到的部分转换为Product对象并进行返回
  14.         // java.lang.NumberFormatException: empty String:输入数据中的某些行可能不符合预期的格式,导致 toDouble 转换失败。解决方案是:使其转化为Product对象时,如果为空,则转化为0.0f
  15.         case regexProduct(name, price, date, market, province, city) => Product(name, if (price.trim.isEmpty) 0.0f else price.toDouble, date, market, province, city)
  16.         case _ => throw new RuntimeException(s"产品数据格式有误:$line")
  17.       }
  18.     }
  19.   }
  20.   def main(args: Array[String]): Unit = {
  21.     /**
  22.      * 在Edit Configurations中指定参数,通过args[0]和args[1]获取输入路径和输出路径
  23.      */
  24.     val path = args(0)
  25.     val dirPath = args(1)
  26.     val ssb: SparkSessionBuilderDeployer = SparkSessionBuilderDeployer()
  27.     val spark = ssb.spark
  28.     val sc: SparkContext = ssb.sc
  29.     val rddProduct: RDD[Product] = sc.textFile(path)
  30.       .mapPartitions(_.map(_.toProduct))
  31.     import org.apache.spark.sql.functions._
  32.     import spark.implicits._
  33.     // 统计每个省份下每个商品的平均价格和商品数量,并且在不同省份下按照商品数量创建排名指标
  34.     spark.createDataFrame(rddProduct)
  35.       .groupBy($"province",$"name")
  36.       .agg(
  37.         avg($"price").as("avg_price"),
  38.         count("*").as("product_cnt")
  39.       )
  40.       .select($"province",$"name",$"avg_price",$"product_cnt",dense_rank().over(Window.partitionBy($"province").orderBy($"avg_price".desc)).as("rank"))
  41.       .repartition(1)
  42.       .write
  43.       .mode(saveMode = "overwrite")
  44.       .option("separator",",")
  45.       .option("header", "true")
  46.       .csv(dirPath)
  47.     spark.stop()
  48.   }
  49. }
复制代码
SparkSessionBuilderDeployer类
SparkSessionBuilderDeployer() 使使命提交案例关于运行模式和名称的配置在使命提交时再举行设置,提升案例的重用性。
  1. package core
  2. import org.apache.spark.sql.SparkSession
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. // 封装SparkSession的创建方法
  5. // SparkSessionBuilderDeployer() 使任务提交案例关于运行模式和名称的配置在任务提交时再进行设置,提升案例的重用性。
  6. class SparkSessionBuilderDeployer(){
  7.   lazy val config:SparkConf = {
  8.     new SparkConf()
  9.   }
  10.   lazy val spark:SparkSession = {
  11.     SparkSession.builder()
  12.       .config(config)
  13.       .getOrCreate()
  14.   }
  15.   lazy val sc:SparkContext = {
  16.     spark.sparkContext
  17.   }
  18.   def stop(): Unit = {
  19.     if (null != spark) {
  20.       spark.stop()
  21.     }
  22.   }
  23. }
  24. object SparkSessionBuilderDeployer {
  25.   def apply(): SparkSessionBuilderDeployer = new SparkSessionBuilderDeployer
  26. }
复制代码
在具体测试案例代码时,可以先通过对path和dirpath指定具体路径的方式来测试整体逻辑,再通过Edit Configurations设置具体参数来测试传参后的程序逻辑,测试完毕之后,即可直接将输入和输出路径分别修改为args(0)和args(1),并且直接打包发布到假造机,供给Spark集群举行执行。
2. Spark Submit 选项详解



  • 主要选项

    • –class :

      • 指定应用程序的主类,即包含 main 方法的类。
      • 示例:--class envtest.ProductsAnalyzer

    • –master :

      • 指定 Spark Master 节点的 URL。
      • 示例:

        • Standalone 模式:
          1. --master spark://master01:7077
          复制代码

          • 表示 Spark Master 节点运行在 master01 主机的 7077 端口上。

        • YARN 模式:
          1. --master yarn
          复制代码

          • 利用 YARN 作为资源管理器。



    • –deploy-mode :

      • 指定应用程序的运行模式。
      • 模式:

        • client:Driver 程序运行在提交使命的当地呆板上。
        • cluster:Driver 程序运行在集群的某个节点上。

      • 示例:--deploy-mode cluster

    • –name :

      • 指定应用程序的名称。
      • 示例:--name spark-sql-product

    • :

      • Spark 应用程序的 JAR 文件的路径,包含了主类及其依靠。
      • 示例:/root/spark/spark_sql-1.0.jar

    • <application-argument(s)>:

      • 应用程序的输入参数。
      • 示例:

        • 输入路径:hdfs://master02:8020/spark/data/products.txt
        • 输出路径:hdfs://master02:8020/spark/result/product_result



  • 具体代码
standalone
  1. spark-submit \--class envtest.ProductsAnalyzer \--master spark://master01:7077
  2. \--deploy-mode client \--name spark-sql-product \/root/spark/spark_sql-1.0.jar \hdfs://master02:8020/spark/data/products.txt \hdfs://master02:8020/spark/result/product_result
复制代码
yarn-cluster
  1. spark-submit \--class envtest.ProductsAnalyzer \--master yarn
  2. \--deploy-mode cluster \--name spark-sql-product \/root/spark/spark_sql-1.0.jar \hdfs://master02:8020/spark/data/products.txt \hdfs://master02:8020/spark/result/product_result
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

立山

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表