Spark 应用程序优化指南
1. 内存设置
a. Executor 和 Driver 的内存
- --executor-memory:指定每个 executor 的堆内存大小。比方,--executor-memory 4g 表示每个 executor 分配 4GB 内存。
- --driver-memory:指定 driver 程序的堆内存大小。比方,--driver-memory 2g。
b. Spark 内存管理参数
- spark.memory.fraction:定义用于存储和 shuffle 的内存比例,默认值是 0.6,意味着60%的executor内存将被分配给此用途。
- spark.memory.storageFraction:定义了 spark.memory.fraction 中有多少比例专门用于缓存数据,默认值是 0.5。
2. 并行度设置
a. Executor 数量 (--num-executors)
- 指定运行应用程序时使用的 executor 数量。更多 executors 可以提高任务的并行度,但也会增加集群管理开销。
b. 默认并行度 (spark.default.parallelism)
- 设置默认的并行度,即每个 stage 的最小 task 数量。它通常应等于或略大于集群中的总核心数。更高的并行度可以加速处理速率,但也大概导致过多的任务竞争资源。
c. 分区数量 (partition count)
- 在读取文件时可以通过 repartition() 或 coalesce() 方法来控制 RDD/DataFrame 的分区数量。公道的分区数量有助于均衡负载,并且避免过多的小文件导致的性能题目。
3. 其他告急参数
a. 每个 Executor 的核心数 (spark.executor.cores)
- 定义每个 executor 使用的核心数。这影响到单个 executor 上可以并发实验的任务数量。
b. Shuffle 后的分区数量 (spark.sql.shuffle.partitions)
- 控制 shuffle 操作后的分区数量,默认值为 200。根据数据集大小适当调整这个参数可以资助淘汰 shuffle 数据量,从而提升性能。
c. 每个 Task 的 CPU 核心数 (spark.task.cpus)
- 每个 task 所需的 CPU 核心数,默认为 1。如果您有 CPU 麋集型任务,大概须要增加这个值。
4. 动态分配资源
- 启用动态分配:通过设置 spark.dynamicAllocation.enabled=true 来启用动态分配功能,这样可以根据工作负载自动调整分配给应用的资源。
- 最小/最大 executors:使用 spark.dynamicAllocation.minExecutors 和 spark.dynamicAllocation.maxExecutors 来限制最小和最大的 executors 数量。
5. 数据本地性(Data Locality)
- spark.locality.wait:定义了 Spark 等候数据本地性的时间,默认是3秒。如果在该时间段内找不到符合的数据本地性节点,任务将被发送到其他节点实验。
- spark.locality.wait.node:指定等候同一节点上数据本地性的最长时间。
6. 垃圾接纳 (Garbage Collection, GC) 调优
- 选择符合的 GC 算法:对于大堆内存的应用,建议使用 G1GC 或者 Parallel GC 来淘汰停顿时间。可以通过 spark.executor.extraJavaOptions 和 spark.driver.extraJavaOptions 设置 JVM 参数来启用这些 GC 算法。
- 对于 G1GC: -XX:+UseG1GC
- 对于 Parallel GC: -XX:+UseParallelGC
- 调整新生代大小:通过设置 -Xmn 参数来调整新生代的大小,确保有足够的空间处理短期对象,从而淘汰老年代的压力。
7. 高效编程实践
a. 广播变量 (Broadcast Variables)
- 当须要在多个任务之间共享一个小而常驻的数据集时,使用广播变量可以节省网络传输开销。比方,如果有一个小表须要与大数据集进行连接操作,可以考虑将其广播给所有 worker。
- val broadcastVar = sc.broadcast(smallDataset)
复制代码 b. 累加器 (Accumulators)
- 使用累加器来进行分布式计数或聚合操作,如统计错误数量、盘算总和等。它们提供了一种安全的方式来进行跨节点的数值汇总。
- val accum = sc.accumulator(0)
- rdd.foreach(x => accum += x)
- println(s"Total count: ${accum.value}")
复制代码 c. 长期化策略 (Persistence Levels)
- 根据差异的需求选择符合的长期化级别(如 MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY)。对于频繁读取的数据,可以选择更高的长期化级别以加速访问速率;而对于不常访问的数据,则可以选择较低级别的长期化来节省内存。
- rdd.persist(StorageLevel.MEMORY_AND_DISK)
复制代码 8. Shuffle 操作优化
- 避免不须要的 shuffle:尽量淘汰 shuffle 操作的数量,因为它们会导致大量的 I/O 和网络通信。可以通过适当调整业务逻辑或者使用 join 的替换方案(如 map-side join)来实现这一点。
- 使用宽依赖转窄依赖的方法:比方,通过 key-based 分区取代全局排序,可以在某些情况下转换为更高效的窄依赖关系。
9. 资源管理器集成
a. YARN 集成
- 如果您使用的是 YARN 作为资源管理器,可以通过 spark.yarn.am.memory 和 spark.yarn.am.cores 来控制 Application Master 的资源分配。
- 还可以利用 YARN 的动态资源请求功能 (spark.yarn.executor.memoryOverheadFactor) 来根据实际须要动态调整每个 executor 的内存开销。
b. Kubernetes 集成
- 在 Kubernetes 情况中,可以通过 spark.kubernetes.* 系列参数来配置 pod 规格、服务账户以及其他 K8s 特定选项。
10. 监控与调优工具
- Spark UI:通过 Spark 提供的 Web 界面查看作业实验情况、阶段信息、任务进度等。它提供了丰富的可视化信息,有助于快速定位性能瓶颈。
- Metrics System:启用 Spark 内置的 metrics system 可以网络详细的运行时指标,并通过 Graphite、Ganglia 等体系进行外部监控。
- 第三方工具:考虑使用诸如 Cloudera Manager、Ambari 或 Datadog 等平台提供的监控工具,这些工具通常包含更全面的集群管理和性能分析功能。
11. 最佳实践
- 预分区数据:在输入源端就对数据进行适当的分区,这样可以淘汰后续处理中的 shuffle 操作。
- 压缩中间结果:启用压缩可以淘汰磁盘 I/O 和网络传输量,提高整体性能。比方,设置 spark.io.compression.codec=snappy。
- 缓存常用数据:对于反复使用的数据集,应该考虑使用 cache() 或 persist() 方法将其保存在内存中,以加速后续访问。
12. 调试与日志
- 启用详细日志记载:当碰到题目时,增加日志级别可以资助更好地理解应用程序的举动。比方,设置 log4j.rootCategory=DEBUG, console。
- 捕捉堆栈跟踪:当任务失败时,确保能够获取完整的异常堆栈跟踪信息,这有助于快速诊断题目根源。
实践建议
- 渐渐调整:不要一次性更改太多参数,应该渐渐进行调整,每次只改变一个或少数几个参数,以便观察其对性能的影响。
- 监控与分析:利用 Spark UI(http://:4040)监控作业实验情况,查看内存使用、任务完成时间等指标,资助做出更明智的调整决策。
- 测试与验证:在生产情况中实施任何庞大变更之前,请先在一个小型测试情况中验证新配置的效果。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |