Spark动态资源释放机制 详解

打印 上一主题 下一主题

主题 891|帖子 891|积分 2673

        Apache Spark 是一个分布式数据处理框架,其动态资源分配(或称为动态资源释放)机制,是为了更高效地使用集群资源,尤其是在实验具有差别工作负载的作业时。Spark 的动态资源释放机制允许它根据作业的需求自动分配和释放集群资源,从而提高资源使用率,降低空闲资源占用时间。
1. Spark 动态资源分配概述

动态资源分配(Dynamic Resource Allocation, DRA)机制主要办理以下题目:


  • 资源浪费:在传统的静态分配模式中,Spark 作业启动时会为其分配固定数目的实验器(Executor),即使任务实验过程中部门实验器处于空闲状态,这些资源也不会被释放。
  • 作业负载波动:Spark 作业的负载可能随时间厘革,差别阶段所需资源差别,动态资源分配允许作业根据任务负载的厘革自动调解资源数目。
        Spark 通过动态调解**实验器(Executor)**的数目,根据作业的负载来增长或减少资源。核心目的是:


  • 自动扩展资源:看成业必要更多资源时,可以动态增长实验器。
  • 释放空闲资源:看成业不再必要过多的资源时,自动释放空闲的实验器以减少资源占用。
2. Spark 动态资源分配的触发条件

Spark 的动态资源释放机制主要根据两个方面触发:

  • 作业的负载:如果作业在运行过程中,发现有更多的任务必要实验,但当前的实验器数目不敷,那么 Spark 可以请求新的实验器。
  • 实验器的空闲状态:如果发现某些实验器长时间处于空闲状态(没有任务运行),Spark 可以将这些实验器释放掉,归还集群资源。
        具体地,Spark 会监控每个实验器的任务分配和运行状态,并根据以下参数做出资源释放或扩展的决定:


  • spark.dynamicAllocation.enabled:启用或禁用动态资源分配机制。
  • spark.dynamicAllocation.minExecutors:设置最小实验器数目,即使资源空闲,Spark 也不会低于这个数目。
  • spark.dynamicAllocation.maxExecutors:设置最大实验器数目,限制作业可以使用的资源上限。
  • spark.dynamicAllocation.executorIdleTimeout:实验器空闲的最大时间,高出该时间后将被释放。
3. 底层原理

        Spark 的动态资源释放机制依靠于集群管理器(如 YARN、Kubernetes、Mesos)以及 Spark 自身的调度逻辑来实现资源的动态增减。其核心思想是通过监控任务的状态和资源使用情况,决定是否必要增长或者减少实验器。
3.1 动态资源分配的组件

以下是动态资源分配机制涉及的关键组件:


  • ExecutorAllocationManager:这是 Spark 动态资源分配的核心管理类,负责监控实验器的任务负载,决定是否要扩展或释放实验器。
  • ClusterManager:这是 Spark 的集群管理层,如 YARN 或 Kubernetes,负责现实的资源分配和管理。ExecutorAllocationManager 向集群管理器请求资源,集群管理器则现实分配或释放实验器。
  • TaskSchedulerImpl:任务调度器,用于调度任务给实验器,并与 ExecutorAllocationManager 协作,判断当前的资源使用状态。
  • BlockManager:负责 Spark 的存储管理,缓存数据块(RDD partitions 等),影响实验器是否可以释放。
3.2 实验器动态扩展的原理


  • 任务提交与资源不敷检测

    • 当一个 Spark 作业开始实验时,ExecutorAllocationManager 会根据当前待实验的任务数目和现有实验器的任务处理本领,判断是否必要更多实验器。
    • 如果发现待实验的任务远多于当前实验器可以大概处理的任务,ExecutorAllocationManager 就会向集群管理器(如 YARN 或 Kubernetes)请求更多的实验器。

  • 扩展实验器

    • ExecutorAllocationManager 通过 TaskSchedulerImpl 检查当前的任务负载。
    • 它会根据 spark.dynamicAllocation.initialExecutors 参数指定的初始实验器数目和当前任务队列长度,计算必要的实验器数目。
    • 然后向集群管理器发起请求,增长实验器数目,直到到达 spark.dynamicAllocation.maxExecutors 限制的最大值。

  1. class ExecutorAllocationManager(scheduler: TaskSchedulerImpl) {
  2.   def schedule(): Unit = {
  3.     // 根据任务负载计算需要的执行器数量
  4.     val numExecutorsNeeded = computeNumExecutorsNeeded()
  5.     if (numExecutorsNeeded > currentExecutors) {
  6.       // 请求集群管理器分配更多执行器
  7.       requestExecutors(numExecutorsNeeded - currentExecutors)
  8.     }
  9.   }
  10. }
复制代码
     3.实验器分配与任务调度
              1. 集群管理器响应后,分配新的实验器,并将它们添加到集群中。
              2. 新的实验器加入后,TaskSchedulerImpl 会为其分配待处理的任务,新的任务开始实验。
3.3 实验器动态释放的原理


  • 监控空闲实验器

    • ExecutorAllocationManager 也会持续监控实验器的使用情况,判断实验器是否空闲。每个实验器的状态(空闲或繁忙)会定期被更新。
    • 如果某个实验器高出 spark.dynamicAllocation.executorIdleTimeout 的空闲时间(默以为 60 秒),而且集群中运行的实验器数目大于 spark.dynamicAllocation.minExecutors,则该实验器会被标记为可释放状态。

  • 释放实验器

    • 一旦发现实验器空闲时间超时,ExecutorAllocationManager 会通知集群管理器释放这些空闲的实验器。
    • 在实验器释放之前,BlockManager 会确保该实验器上没有必要保存的缓存数据。如果缓存的数据紧张(如被其他实验器所依靠),它会将这些数据复制到其他实验器上。

  1. class ExecutorAllocationManager(scheduler: TaskSchedulerImpl) {
  2.   def schedule(): Unit = {
  3.     val idleExecutors = getIdleExecutors()
  4.     if (idleExecutors.nonEmpty) {
  5.       // 如果有空闲的执行器,且空闲时间超过阈值,则释放这些执行器
  6.       removeExecutors(idleExecutors)
  7.     }
  8.   }
  9. }
复制代码
     3.任务完成后的资源释放
              1.当 Spark 作业进入尾声,待实验的任务逐渐减少,实验器处于空闲状态。
     2.ExecutorAllocationManager 会根据 spark.dynamicAllocation.executorIdleTimeout 来判断哪些实验器可以释放,并渐渐将这些空闲实验器释放回集群管理器,从而克制浪费资源。
4. 源码解析:ExecutorAllocationManager 的工作流程

        ExecutorAllocationManager 是 Spark 动态资源分配的核心类。它通过定期检查任务队列和实验器状态来判断是否必要扩展或释放资源。其主要逻辑分为两个部门:实验器扩展和实验器释放。
4.1 实验器扩展逻辑

        扩展逻辑通过 schedule() 方法进行资源检查和调解。它会定期运行,检查是否有新的任务提交,如果有未分配的任务,且当前的实验器数目不敷以处理这些任务,就会请求新的实验器:
  1. private def schedule(): Unit = {
  2.   // 计算需要的执行器数量
  3.   val numExecutorsNeeded = computeNumExecutorsNeeded()
  4.   
  5.   if (numExecutorsNeeded > currentExecutors) {
  6.     // 请求新的执行器
  7.     requestExecutors(numExecutorsNeeded - currentExecutors)
  8.   }
  9. }
复制代码

4.2 实验器释放逻辑

        释放逻辑通过 schedule() 方法中的 removeExecutors() 进行资源释放。在每次调度周期内,ExecutorAllocationManager 会检查哪些实验器处于空闲状态,并判断它们是否可以被释放。
  1. private def removeExecutors(executors: Seq[String]): Unit = {
  2.   for (executor <- executors) {
  3.     if (canBeRemoved(executor)) {
  4.       // 通知集群管理器释放执行器
  5.       releaseExecutor(executor)
  6.     }
  7.   }
  8. }
复制代码

4.3 主要调度参数与其作用



  • spark.dynamicAllocation.enabled:是否启用动态资源分配。
  • spark.dynamicAllocation.minExecutors:最小实验器数目。
  • spark.dynamicAllocation.maxExecutors:最大实验器数目。
  • spark.dynamicAllocation.executorIdleTimeout:实验器空闲时间,高出该时间的空闲实验器会被释放。
5. 总结

Spark 的动态资源释放机制旨在提高资源使用服从,克制资源浪费。它通过以下步调实现:


  • 实验器动态扩展:根据任务负载,自动增长实验器。
  • 实验器动态释放:当实验器空闲时自动释放,减少资源占用。
  • 与集群管理器的协同工作:Spark 的资源扩展和释放都依靠于集群管理器(如 YARN 或 Kubernetes)来实现现实的资源管理。
        通过 ExecutorAllocationManager 和集群管理器的紧密协作,Spark 动态资源分配机制能有效地调度资源,保证作业实验的同时,最大限度地节省资源。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

惊雷无声

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表