01-Spark的Local模式与应用开发入门

打印 上一主题 下一主题

主题 855|帖子 855|积分 2567

1 Spark 的 local 模式

Spark 运行模式之一,用于在本地呆板上单机模拟分布式计算的环境。在 local 模式下,Spark 会利用单个 JVM 进程来模拟分布式集群行为,所有 Spark 组件(如 SparkContext、Executor 等)都运行在同一个 JVM 进程中,不涉及集群间通信,实用本地开发、测试和调试。
1.1 重要特点和利用场景


  • 本地开发和测试:在开发 Spark 应用程序时,可以利用 local 模式进行本地开发和测试。如许可以制止毗连到集群的开销,加快开发迭代速度。同时,可以模拟集群环境中的作业执行流程,验证代码逻辑和功能。
  • 单机数据处理:对于较小规模的数据处理使命,例如处理数百兆或数个 GB 的数据,可以利用 local 模式进行单机数据处理。如许可以充实利用本地呆板的资源,快速完成数据处理使命。
  • 调试和故障排查:在调试和故障排查过程中,利用 local 模式可以更方便地查看日志、变量和数据,加快发现息争决问题的速度。可以在本地环境中模拟各种情况,验证代码的健壮性和可靠性。
  • 教学和学习:对于 Spark 的初学者或教学场景,local 模式提供了一个简单直观的学习环境。学习者可以在本地环境中快速运行 Spark 应用程序,理解 Spark 的基本概念和工作原理。
1.2 利用 local 模式

设置 SparkConf 中的 spark.master 属性为 "local" 来指定运行模式。如Scala中如许设置:
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. object SparkLocalExample {
  3.   def main(args: Array[String]): Unit = {
  4.     val conf = new SparkConf().setAppName("SparkLocalExample").setMaster("local")
  5.     val sc = new SparkContext(conf)
  6.     // 在这里编写你的 Spark 应用程序逻辑
  7.     sc.stop()  // 停止 SparkContext
  8.   }
  9. }
复制代码
1.3 注意

local 模式仅实用于小规模数据处理和本地开发测试场景,并不实用于生产环境的大规模数据处理使命。在生产环境中,必要利用集群模式(如 standalone、YARN、Mesos 等)来运行 Spark 应用程序,以便充实利用集群资源和进步作业的并行度。
2 Spark应用开发

2.1 SparkContext

通常一个 Spark 程序对应一个 SparkContext 实例。SparkContext 是 Spark 应用程序的主入口点,负责与集群进行通信,管理作业的调治和执行,以及维护应用程序的状态。因此,一个 SparkContext 实例通常对应一个独立的 Spark 应用程序。
在正常情况下,创建多个 SparkContext 实例是不推荐的,由于这可能会导致资源冲突、内存泄漏和性能降落等问题。Spark 本身设计为单个应用程序对应一个 SparkContext,以便于有用地管理资源和执行作业。
然而,在某些特殊情况下,可能会存在多个 SparkContext 实例的情况:

  • 测试和调试:在测试和调试阶段,有时会创建额外的 SparkContext 实例来模拟不同的场景或测试不同的配置。如许可以更好地理解 Spark 应用程序的行为和性能,以便进行优化和调整。
  • 交互式环境:在交互式环境下(如 Spark Shell、Jupyter Notebook 等),有时会创建多个 SparkContext 实例来进行实验、测试或不同的作业执行。这些 SparkContext 实例可能是由不同的用户或会话创建的,用于并行执行不同的使命或查询。
  • 多应用程序共享资源:在同一个集群上运行多个独立的 Spark 应用程序,而且它们必要共享同一组集群资源时,可能会创建多个 SparkContext 实例来管理各自的作业和资源。这种情况下,必要确保各个应用程序的 SparkContext 实例可以或许精确地管理资源,制止资源冲突和竞争。
创建多个 SparkContext 实例时必要谨慎处理,而且必要确保它们可以或许精确地管理资源、制止冲突,而且不会影响其他应用程序或作业的正常运行。在生产环境中,发起仅利用一个 SparkContext 实例来管理整个应用程序。
SparkContext是Spark应用的入口点,负责初始化Spark应用所必要的环境和数据结构。
2.2 运行一个Spark应用的步骤


  • 创建SparkContext,这会初始化Spark应用环境、资源和驱动程序
  • 通过SparkContext 创建RDD、DataFrame和Dataset
  • 在RDD、DataFrame和Dataset上进行转换和行动操作
  • 关闭SparkContext来关闭Spark应用
以是,一个标准的Spark应用对应一个SparkContext实例。通过创建SparkContext来开始我们的程序,在其上执行各种操作,并在竣事时关闭该实例。
3 案例

3.1 测试数据文件

input.txt
  1. JavaEdge,JavaEdge,JavaEdge
  2. go,go
  3. scalascala
复制代码
3.2 代码
  1. package com.javaedge.bigdata.chapter02
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4. * 词频统计案例
  5. * 输入:文件
  6. * 需求:统计出文件中每个单词出现的次数
  7. * 1)读每一行数据
  8. * 2)按照分隔符把每一行的数据拆成单词
  9. * 3)每个单词赋上次数为1
  10. * 4)按照单词进行分发,然后统计单词出现的次数
  11. * 5)把结果输出到文件中
  12. * 输出:文件
  13. */
  14. object SparkWordCountApp {
  15.   def main(args: Array[String]): Unit = {
  16.     val sparkConf = new SparkConf()
  17.     val sc = new SparkContext(sparkConf)
  18.     val rdd = sc.textFile("/Users/javaedge/Downloads/sparksql-train/data/input.txt")
  19.     rdd.collect().foreach(println)
  20.     sc.stop()
  21. }
复制代码
发现启动后,报错啦:
  1. ERROR SparkContext: Error initializing SparkContext.
  2. org.apache.spark.SparkException: A master URL must be set in your configuration
  3.         at org.apache.spark.SparkContext.<init>(SparkContext.scala:368)
  4.         at com.javaedge.bigdata.chapter02.SparkWordCountApp$.main(SparkWordCountApp.scala:25)
  5.         at com.javaedge.bigdata.chapter02.SparkWordCountApp.main(SparkWordCountApp.scala)
  6. ERROR Utils: Uncaught exception in thread main
复制代码
必须设置集群?我才刚入门大数据诶,这么麻烦?劝退,不学了!还好 spark 也支持简单摆设:
  1. val sparkConf = new SparkConf().setMaster("local")
复制代码
重启,又报错:
  1. ERROR SparkContext: Error initializing SparkContext.
  2. org.apache.spark.SparkException: An application name must be set in your configuration
  3.         at org.apache.spark.SparkContext.<init>(SparkContext.scala:371)
  4.         at com.javaedge.bigdata.chapter02.SparkWordCountApp$.main(SparkWordCountApp.scala:25)
  5.         at com.javaedge.bigdata.chapter02.SparkWordCountApp.main(SparkWordCountApp.scala)
  6. ERROR Utils: Uncaught exception in thread main
复制代码
  1. val sparkConf = new SparkConf().setMaster("local").setAppName("SparkWordCountApp")
复制代码
成功了!
  1. val rdd = sc.textFile("/Users/javaedge/Downloads/sparksql-train/data/input.txt")
  2. rdd.flatMap(_.split(","))
  3.   .map(word => (word, 1)).collect().foreach(println)
  4. sc.stop()
  5. output:
  6. (pk,1)
  7. (pk,1)
  8. (pk,1)
  9. (jepson,1)
  10. (jepson,1)
  11. (xingxing,1)
复制代码
3.3 输出到文件
  1. rdd.flatMap(_.split(","))
  2.   // 3)每个单词赋上次数为1
  3.   .map(word => (word, 1))
  4.   .reduceByKey(_ + _)
  5.   .saveAsTextFile("/Users/javaedge/Downloads/sparksql-train/data/output.txt")
复制代码


3.4 按频率降序排
  1. // 2)按照分隔符把每一行的数据拆成单词
  2. rdd.flatMap(_.split(","))
  3.   // 3)每个单词赋上次数为1
  4.   .map(word => (word, 1))
  5.   // 4)按照单词进行分发,然后统计单词出现的次数
  6.   .reduceByKey(_ + _)
  7.   // 结果按单词频率降序排列,既然之前是 <单词,频率> 且 sortKey 只能按 key 排序,那就在这里反转 kv 顺序
  8.   .map(x => (x._2, x._1))
  9.   .collect().foreach(println)
  10. output:
  11. (2,go)
  12. (1,scalascala)
  13. (3,JavaEdge)
复制代码
显然结果不符合期望。怎样调整呢?再翻转一次!
  1. rdd.flatMap(_.split(","))
  2.   .map(word => (word, 1))
  3.   .reduceByKey(_ + _)
  4.   // 结果按单词频率降序排列,既然之前是 <单词,频率> 且 sortKey 只能按 key 排序,那就在这里反转 kv 顺序
  5.   .map(x => (x._2, x._1))
  6.   .sortByKey(false)
  7.   .map(x => (x._2, x._1))
  8.   .collect().foreach(println)
  9. output:
  10. (JavaEdge,3)
  11. (go,2)
  12. (scalascala,1)
复制代码
4 spark-shell启动
  1. javaedge@JavaEdgedeMac-mini bin % ./spark-shell --master local
  2. 23/03/23 16:28:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  3. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
  4. Setting default log level to "WARN".
  5. To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
  6. Spark context Web UI available at http://172.16.1.55:4040
  7. Spark context available as 'sc' (master = local, app id = local-1679560146321).
  8. Spark session available as 'spark'.
  9. Welcome to
  10.       ____              __
  11.      / __/__  ___ _____/ /__
  12.     _\ \/ _ \/ _ `/ __/  '_/
  13.    /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
  14.       /_/
  15. Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_362)
  16. Type in expressions to have them evaluated.
  17. Type :help for more information.
  18. scala>
复制代码
4 通过YARN提交使命
  1. $ ./spark-submit --master yarn \
  2.   --deploy-mode client \
  3.   --class <main_class> \
  4.   --num-executors <num_executors> \
  5.   --executor-memory <executor_memory> \
  6.   --executor-cores <executor_cores> \
  7.   <path_to_jar_or_py_file> \
  8.   <app_arguments>
复制代码
各参数含义:

  • --master yarn: 指定利用YARN作为Spark的资源管理器。
  • --deploy-mode client: 指定摆设模式为client模式,即Driver程序运行在提交Spark使命的客户端呆板上。
  • --class : 指定Spark应用程序的主类。
  • --num-executors : 指定执行器的数目。
  • --executor-memory : 指定每个执行器的内存大小。
  • --executor-cores : 指定每个执行器的核心数。
  • : 指定要提交的Spark应用程序的JAR文件或Python文件的路径。
  • : 指定Spark应用程序的参数。
如提交一个Scala版本的Spark应用程序的命令:
  1. $ ./spark-submit --master yarn \
  2.   --deploy-mode client \
  3.   --class com.example.MySparkApp \
  4.   --num-executors 4 \
  5.   --executor-memory 2G \
  6.   --executor-cores 2 \
  7.   /path/to/my-spark-app.jar \
  8.   arg1 arg2 arg3
复制代码
假如你要提交一个Python版本的Spark应用程序,可以利用以下命令:
  1. $ ./spark-submit --master yarn \
  2.   --deploy-mode client \
  3.   /path/to/my-spark-app.py \
  4.   arg1 arg2 arg3
复制代码
如许就可以通过YARN提交Spark使命,Spark会向YARN请求资源并在集群上执行使命。
关注我,紧跟本系列专栏文章,咱们下篇再续!
作者简介:魔都技能专家兼架构,多家大厂后端一线研发经验,各大技能社区头部专家博主。具有丰富的引领团队经验,深厚业务架构息争决方案的积累。
负责:

  • 中心/分销预订体系性能优化
  • 活动&优惠券等营销中台建设
  • 生意业务平台及数据中台等架构和开发设计
  • 车联网核心平台-物联网毗连平台、大数据平台架构设计及优化
目前主攻低落软件复杂性设计、构建高可用体系方向。
参考:
本文由博客一文多发平台 OpenWrite 发布!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

tsx81428

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表