【深入浅出 Yarn 架构与实现】5-3 Yarn 调度器资源抢占模型 ...

打印 上一主题 下一主题

主题 970|帖子 970|积分 2910

本篇将对 Yarn 调度器中的资源抢占方式进行探究。分析当集群资源不足时,占用量资源少的队列,是如何从其他队列中抢夺资源的。我们将深入源码,一步步分析抢夺资源的具体逻辑。
一、简介

在资源调度器中,以 CapacityScheduler 为例(Fair 类似),每个队列可设置一个最小资源量和最大资源量。其中,最小资源量是资源紧缺情况下每个队列需保证的资源量,而最大资源量则是极端情况下队列也不能超过的资源使用量。
资源抢占发生的原因,是为了提高资源利用率,资源调度器(包括 Capacity Scheduler 和 Fair Scheduler)会将负载较轻的队列的资源暂时分配给负载重的队列。
仅当负载较轻队列突然收到新提交的应用程序时,调度器才进一步将本属于该队列的资源归还给它。
但由于此时资源可能正被其他队列使用,因此调度器必须等待其他队列释放资源后,才能将这些资源“物归原主”,为了防止应用程序等待时间过长,RM 在等待一段时间后强制回收。
开启容器抢占需要配置的参数 yarn-site.xml:
  1. yarn.resourcemanager.scheduler.monitor.enable
  2. yarn.resourcemanager.scheduler.monitor.policies
复制代码
二、抢占具体逻辑

这里我们主要分析如何选出待抢占容器这一过程。
整理流程如下图所示:

接下来我们深入源码,看看具体的逻辑:
首先 ResourceManager 通过 ResourceManager#createPolicyMonitors 方法创建资源抢占服务:
  1.     protected void createPolicyMonitors() {
  2.       // 只有 capacity scheduler 实现了 PreemptableResourceScheduler 接口,fair 是如何实现资源抢占的?
  3.       if (scheduler instanceof PreemptableResourceScheduler
  4.           && conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
  5.           YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {
  6.         LOG.info("Loading policy monitors");
  7.         // 是否配置了 scheduler.monitor.policies
  8.         // 默认值是 ProportionalCapacityPreemptionPolicy? 代码中没看到默认值,但是 yarn-site.xml doc 中有默认值
  9.         List<SchedulingEditPolicy> policies = conf.getInstances(
  10.             YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
  11.             SchedulingEditPolicy.class);
  12.         if (policies.size() > 0) {
  13.           for (SchedulingEditPolicy policy : policies) {
  14.             LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
  15.             // periodically check whether we need to take action to guarantee
  16.             // constraints
  17.             // 此处创建了资源抢占服务类。
  18.             // 当此服务启动时,会启动一个线程每隔 PREEMPTION_MONITORING_INTERVAL(默认 3s)调用一次
  19.             // ProportionalCapacityPreemptionPolicy 类中的 editSchedule方法,
  20.             // 【重点】在此方法中实现了具体的资源抢占逻辑。
  21.             SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
  22.             addService(mon);
  23.           }
复制代码
资源抢占服务会启动一个线程每隔 3 秒钟调用配置的抢占规则,这里以 ProportionalCapacityPreemptionPolicy(比例容量抢占规则)为例介绍其中的抢占具体逻辑(editSchedule 方法):
  1. // ProportionalCapacityPreemptionPolicy#editSchedule
  2.   public void editSchedule() {
  3.     updateConfigIfNeeded();
  4.     long startTs = clock.getTime();
  5.     CSQueue root = scheduler.getRootQueue();
  6.     // 获取集群当前资源快照
  7.     Resource clusterResources = Resources.clone(scheduler.getClusterResource());
  8.     // 具体的资源抢占逻辑
  9.     containerBasedPreemptOrKill(root, clusterResources);
  10.     if (LOG.isDebugEnabled()) {
  11.       LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms.");
  12.     }
  13.   }
复制代码
editSchedule 方法很简单,逻辑都被封装到 containerBasedPreemptOrKill() 方法中,我们继续深入。
其中主要分三步:

  • 生成资源快照
  • 根据规则找出各队列待抢占的容器(重点)
  • 执行容器资源抢占 或 kill超时未自动停止的容器
  1. // 仅保留重要逻辑
  2.   private void containerBasedPreemptOrKill(CSQueue root,
  3.       Resource clusterResources) {
  4.     // ------------ 第一步 ------------ (生成资源快照)
  5.     // extract a summary of the queues from scheduler
  6.     // 将所有队列信息拷贝到 queueToPartitions - Map<队列名, Map<资源池, 队列详情>>。生成快照,防止队列变化造成计算问题。
  7.       for (String partitionToLookAt : allPartitions) {
  8.         cloneQueues(root, Resources
  9.                 .clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)), partitionToLookAt);
  10.       }
  11.     // ------------ 第二步 ------------ (找出待抢占的容器)
  12.     // compute total preemption allowed
  13.     // based on ideal allocation select containers to be preemptionCandidates from each queue and each application
  14.     // candidatesSelectionPolicies 默认会放入 FifoCandidatesSelector,
  15.     // 如果配置了 INTRAQUEUE_PREEMPTION_ENABLED,会增加 IntraQueueCandidatesSelector
  16.     for (PreemptionCandidatesSelector selector :
  17.         candidatesSelectionPolicies) {
  18.       // 【核心方法】 计算待抢占 Container 放到 preemptMap
  19.       toPreempt = selector.selectCandidates(toPreempt,
  20.           clusterResources, totalPreemptionAllowed);
  21.     }
  22.     // 这里有个类似 dryrun 的参数 yarn.resourcemanager.monitor.capacity.preemption.observe_only
  23.     if (observeOnly) {
  24.       return;
  25.     }
  26.     // ------------ 第三步 ------------ (执行容器资源抢占 或 kill超时未自动停止的容器)
  27.     // preempt (or kill) the selected containers
  28.     preemptOrkillSelectedContainerAfterWait(toPreempt);
  29.     // cleanup staled preemption candidates
  30.     cleanupStaledPreemptionCandidates();
  31.   }
复制代码
一)找出待抢占的容器

第一步资源快照没什么好说的,直接进入到重点:第二步找出待抢占的容器
即 selector.selectCandidates(),以默认的 FifoCandidatesSelector 实现为例讲解,其他的同理。
主要分两步:

  • 根据使用量和需求量重新分配资源,得到各队列要被抢占的资源量
  • 根据资源差额,计算要 kill 的 container
  1. // yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
  2.   public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
  3.       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
  4.       Resource clusterResource, Resource totalPreemptionAllowed) {
  5.     // ------------ 第一步 ------------ (根据使用量和需求量重新分配资源)
  6.     // Calculate how much resources we need to preempt
  7.     // 计算出每个资源池每个队列当前资源分配量,和实际要 preempt 的量
  8.     preemptableAmountCalculator.computeIdealAllocation(clusterResource,
  9.         totalPreemptionAllowed);
  10.     // ------------ 第二步 ------------ (根据资源差额,计算要 kill 的 container)
  11.     // 选 container 是有优先级的: 使用共享池的资源 -> 队列中后提交的任务 -> amContainer
  12.     for (String queueName : preemptionContext.getLeafQueueNames()) {
  13.       synchronized (leafQueue) {
  14.           // 省略了大部分逻辑,在后面介绍
  15.           // 从 application 中选出要被抢占的容器
  16.           preemptFrom(fc, clusterResource, resToObtainByPartition,
  17.               skippedAMContainerlist, skippedAMSize, selectedCandidates,
  18.               totalPreemptionAllowed);
  19.         }
  20.     }
复制代码
重新计算各队列分配的资源量

我们先来看「根据使用量和需求量重新分配资源」,即 PreemptableResourceCalculator#computeIdealAllocation()
  1.   // 计算每个队列实际要被 preempt 的量
  2.   public void computeIdealAllocation(Resource clusterResource,
  3.       Resource totalPreemptionAllowed) {
  4.     for (String partition : context.getAllPartitions()) {
  5.       TempQueuePerPartition tRoot = context.getQueueByPartition(
  6.           CapacitySchedulerConfiguration.ROOT, partition);
  7.       // 这里计算好每个队列超出资源配置的部分,存在 TempQueuePerPartition
  8.       // preemptableExtra 表示可以被抢占的
  9.       // untouchableExtra 表示不可被抢占的(队列配置了不可抢占)
  10.       // yarn.scheduler.capacity.<queue>.disable_preemption
  11.       updatePreemptableExtras(tRoot);
  12.       tRoot.idealAssigned = tRoot.getGuaranteed();
  13.       // 【重点】遍历队列树,重新计算资源分配,并计算出每个队列计划要 Preempt 的量
  14.       recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
  15.     }
  16.     // 计算实际每个队列要被 Preempt 的量 actuallyToBePreempted(有个阻尼因子,不会一下把所有超量的都干掉)
  17.     calculateResToObtainByPartitionForLeafQueues(context.getLeafQueueNames(),
  18.         clusterResource);
  19.   }
  20. }
复制代码
我们直接深入到 recursivelyComputeIdealAssignment() 方法中的核心逻辑:重新计算各队列资源分配值 AbstractPreemptableResourceCalculator#computeFixpointAllocation()
主要逻辑如下:

  • 首先保障每个队列有自己配置的资源。若使用量小于配置量,多余的资源会被分配到其他队列
  • 若队列有超出配置资源需求,则放到一个优先级队列中,按 (使用量 / 配置量) 从小到大排序
  • 对于有资源需求的队列,在剩余的资源中,按配置比例计算每个队列可分配的资源量
  • 每次从优先级队列中选需求优先级最高的,进行分配
  • 计算 min(可分配量, 队列最大剩余用量, 需求量)。作为本次分配的资源。若仍有资源需求则放回优先级队列,等待下次分配
  • 当满足所有队列资源需求,或者没有剩余资源时结束
  • 仍有资源需求的队列会记录在 underServedQueues
  1.   // 按一定规则将资源分给各个队列
  2.   protected void computeFixpointAllocation(Resource totGuarant,
  3.       Collection<TempQueuePerPartition> qAlloc, Resource unassigned,
  4.       boolean ignoreGuarantee) {
  5.     // 传进来 unassigned = totGuarant
  6.     // 有序队列,(使用量 / 配置量) 从小到大排序
  7.     PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
  8.         tqComparator);
  9.       // idealAssigned = min(使用量,配置量)。  对于不可抢占队列,则再加上超出的部分,防止资源被再分配。
  10.       if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
  11.         q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra);
  12.       } else {
  13.         q.idealAssigned = Resources.clone(used);
  14.       }
  15.       // 如果该队列有超出配置资源需求,就把这个队列放到 orderedByNeed 有序队列中(即这个队列有资源缺口)
  16.       if (Resources.lessThan(rc, totGuarant, q.idealAssigned, curPlusPend)) {
  17.         orderedByNeed.add(q);
  18.       }
  19.     }
  20.     // 此时 unassigned 是 整体可用资源 排除掉 所有已使用的资源(used)
  21.     // 把未分配的资源(unassigned)分配出去
  22.     // 方式就是从 orderedByNeed 中每次取出 most under-guaranteed 队列,按规则分配一块资源给他,如果仍不满足就按顺序再放回 orderedByNeed
  23.     // 直到满足所有队列资源,或者没有资源可分配
  24.     while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,
  25.         unassigned, Resources.none())) {
  26.       Resource wQassigned = Resource.newInstance(0, 0);
  27.       // 对于有资源缺口的队列,重新计算他们的资源保证比例:normalizedGuarantee。
  28.       // 即 (该队列保证量 / 所有资源缺口队列保证量)
  29.       resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
  30.       // 这里返回是个列表,是因为可能有需求度(优先级)相等的情况
  31.       Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
  32.           orderedByNeed, tqComparator);
  33.       for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
  34.           .hasNext();) {
  35.         TempQueuePerPartition sub = i.next();
  36.         // 按照 normalizedGuarantee 比例能从剩余资源中分走多少。
  37.         Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned,
  38.             sub.normalizedGuarantee, Resource.newInstance(1, 1));
  39.         // 【重点】按一定规则将资源分配给队列,并返回剩下的资源。
  40.         Resource wQidle = sub.offer(wQavail, rc, totGuarant,
  41.             isReservedPreemptionCandidatesSelector);
  42.         // 分配给队列的资源
  43.         Resource wQdone = Resources.subtract(wQavail, wQidle);
  44.         // 这里 wQdone > 0 证明本次迭代分配出去了资源,那么还会放回到待分配资源的集合中(哪怕本次已满足资源请求),直到未再分配资源了才退出。
  45.         if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {
  46.           orderedByNeed.add(sub);
  47.         }
  48.         Resources.addTo(wQassigned, wQdone);
  49.       }
  50.       Resources.subtractFrom(unassigned, wQassigned);
  51.     }
  52.     // 这里有可能整个资源都分配完了,还有队列资源不满足
  53.     while (!orderedByNeed.isEmpty()) {
  54.       TempQueuePerPartition q1 = orderedByNeed.remove();
  55.       context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
  56.     }
  57.   }
复制代码
上面第 5 步是重点,也就是 sub.offer(),是计算给该队列在保证值之外,还能提供多少资源:
  1.   /**
  2.    * 计算队列 idealAssigned,在原有基础上增加新分配的资源。同时返回 avail 中未使用的资源。
  3.    * 参数说明:
  4.    * avail 按比例该队列能从剩余资源中分配到的
  5.    * clusterResource 整体资源量
  6.    * considersReservedResource ?
  7.    * idealAssigned = min(使用量,配置量)
  8.    */
  9.   Resource offer(Resource avail, ResourceCalculator rc,
  10.       Resource clusterResource, boolean considersReservedResource) {
  11.     // 计算的是还有多少可分配资源的空间( maxCapacity - assigned )
  12.     Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
  13.         Resources.subtract(getMax(), idealAssigned),
  14.         Resource.newInstance(0, 0));
  15.     // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
  16.     // 队列接受资源的计算方法:可提供的资源,队列最大资源-已分配资源,当前已使用资源+未满足的资源-min(使用量,配置量) 三者中的最小值。
  17.     Resource accepted = Resources.min(rc, clusterResource,
  18.         absMaxCapIdealAssignedDelta,
  19.         Resources.min(rc, clusterResource, avail, Resources
  20.             .subtract(
  21.                 Resources.add((considersReservedResource
  22.                     ? getUsed()
  23.                     : getUsedDeductReservd()), pending),
  24.                 idealAssigned)));
  25.     Resource remain = Resources.subtract(avail, accepted);
  26.     Resources.addTo(idealAssigned, accepted);
  27.     return remain;
  28.   }
复制代码
核心的资源重新分配算法逻辑已经计算完毕,剩下的就是:
根据重新计算的资源分配,得到各队列超用的资源,这部分就是要被抢占的资源。
这里不会一下把队列超用的资源都干掉,有个阻尼因子,用于平滑抢占处理。
根据资源差额,计算要抢占的容器

回到 selector.selectCandidates(),上面已经介绍了各队列抢占量的计算逻辑,接下来介绍「如何选出各队列中的 container」

  • 抢占该队列在共享池使用资源的 container
  • 抢占后提交任务中,后生成的 container(也就是越晚生成的 container,会被先处理)
  • 抢占 amContainer
  1.   public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
  2.       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
  3.       Resource clusterResource, Resource totalPreemptionAllowed) {
  4.         // ......
  5.     // ------------ 第二步 ------------ (根据资源差额,计算要 kill 的 container)
  6.     // 根据计算得到的要抢占的量,计算各资源池各队列要 kill 的 container
  7.     List<RMContainer> skippedAMContainerlist = new ArrayList<>();
  8.     // Loop all leaf queues
  9.     // 这里是有优先级的: 使用共享池的资源 -> 队列中后提交的任务 -> amContainer
  10.     for (String queueName : preemptionContext.getLeafQueueNames()) {
  11.       // 获取该队列在每个资源池要被抢占的量
  12.       Map<String, Resource> resToObtainByPartition =
  13.           CapacitySchedulerPreemptionUtils
  14.               .getResToObtainByPartitionForLeafQueue(preemptionContext,
  15.                   queueName, clusterResource);
  16.       synchronized (leafQueue) {
  17.         // 使用共享池资源的,先处理
  18.         Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =
  19.             leafQueue.getIgnoreExclusivityRMContainers();
  20.         for (String partition : resToObtainByPartition.keySet()) {
  21.           if (ignorePartitionExclusivityContainers.containsKey(partition)) {
  22.             TreeSet<RMContainer> rmContainers =
  23.                 ignorePartitionExclusivityContainers.get(partition);
  24.             // 最后提交的任务,会被最先抢占
  25.             for (RMContainer c : rmContainers.descendingSet()) {
  26.               if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
  27.                   selectedCandidates)) {
  28.                 // Skip already selected containers
  29.                 continue;
  30.               }
  31.               // 将 Container 放到待抢占集合 preemptMap 中
  32.               boolean preempted = CapacitySchedulerPreemptionUtils
  33.                   .tryPreemptContainerAndDeductResToObtain(rc,
  34.                       preemptionContext, resToObtainByPartition, c,
  35.                       clusterResource, selectedCandidates,
  36.                       totalPreemptionAllowed);
  37.             }
  38.           }
  39.         }
  40.         // preempt other containers
  41.         Resource skippedAMSize = Resource.newInstance(0, 0);
  42.         // 默认是 FifoOrderingPolicy,desc 也就是最后提交的在最前面
  43.         Iterator<FiCaSchedulerApp> desc =
  44.             leafQueue.getOrderingPolicy().getPreemptionIterator();
  45.         while (desc.hasNext()) {
  46.           FiCaSchedulerApp fc = desc.next();
  47.           if (resToObtainByPartition.isEmpty()) {
  48.             break;
  49.           }
  50.           // 从 application 中选出要被抢占的容器(后面介绍)
  51.           preemptFrom(fc, clusterResource, resToObtainByPartition,
  52.               skippedAMContainerlist, skippedAMSize, selectedCandidates,
  53.               totalPreemptionAllowed);
  54.         }
  55.         // Can try preempting AMContainers
  56.         Resource maxAMCapacityForThisQueue = Resources.multiply(
  57.             Resources.multiply(clusterResource,
  58.                 leafQueue.getAbsoluteCapacity()),
  59.             leafQueue.getMaxAMResourcePerQueuePercent());
  60.         preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
  61.             resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
  62.             totalPreemptionAllowed);
  63.       }
  64.     }
  65.     return selectedCandidates;
  66.   }
复制代码
二)执行容器资源抢占

把要被抢占的 container 都选出来之后,就剩最后一步, kill 这些 container。
回到 containerBasedPreemptOrKill():
  1.   private void containerBasedPreemptOrKill(CSQueue root,
  2.       Resource clusterResources) {
  3.         // ......
  4.     // ------------ 第三步 ------------ (执行容器资源抢占 或 kill超时未自动停止的容器)
  5.     // preempt (or kill) the selected containers
  6.     preemptOrkillSelectedContainerAfterWait(toPreempt);
  7.     // cleanup staled preemption candidates
  8.     cleanupStaledPreemptionCandidates();
  9.   }
复制代码
三、总结

至此,分析完毕整个资源抢占的过程。
总结一下主要逻辑:

  • 重新计算各资源池中各队列应分配的资源;
  • 与现在已使用的资源进行对比,如果超过新计算的分配量,(超用的部分*阻尼系数)就是要被抢占的资源量;
  • 各队列根据要被抢占的资源量,选出要被 kill 的 container。优先度低的 container 就会被先处理(使用了共享资源的、后生成的 container);
  • 通过心跳通知 AM 要被 kill 的 container,或者处理掉通知过已超时的 container。
参考文章:
Yarn FairScheduler的抢占机制详解_小昌昌的博客的博客-CSDN博客
Yarn抢占最核心剖析_Geoffrey Turing的博客-CSDN博客 - 针对 fair
Yarn调度之CapacityScheduler源码分析资源抢占
Better SLAs via Resource-preemption in YARN's CapacityScheduler - Cloudera Blog

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

写过一篇

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表