ToB企服应用市场:ToB评测及商务社交产业平台

标题: Spark的动态资源分配算法 [打印本页]

作者: 十念    时间: 2024-7-27 16:32
标题: Spark的动态资源分配算法
前言

在《Spark RPC框架详解》这篇文章中,介绍了Spark RPC通信的基本流程。可以看到,Spark中的Driver通过Stage调度生成了物理执行计划,这个物理执行计划包罗了所有必要运行的Task,以及,最关键的,这些Task希望运行的节点信息,我们叫做Locality Preference,即本地性偏好。
但是在Yarn的场景下,资源是以Executor(Container)为单位进行调度,整个Container的粒度与Task不一一对应,Container的生命周期与Task不一一对应。在这种场景下,动态资源调度的基本使命就是

本文将具体讲解这个过程。
对于资源申请算法的基本流程,以及将Task和资源进行匹配的基本流程,本文都用实际例子进行讲解。
基于使命需求进行资源请求的整体过程

向Yarn请求资源是由客户端向ApplicationMaster申请,然后ApplicationMaster向Yarn发起请求的,而不是客户端直接向Yarn申请的。
资源是为了服务于Task的运行,Task的生成显然是Driver端负责的,Driver会根据物理执行计划生成的Task信息发送给ApplicationMaster,ApplicationMaster根据这些Task的相干信息进行资源申请。
ApplicationMaster启动以后,会有一个独立线程不断通过调用YarnAllocator.allocateResources()进行连续的资源更新(查看ApplicationMaster的launchReporterThread()方法)。这里叫资源更新,而不叫资源申请,由于这里的操纵包罗新的资源的申请,旧的无用的Container的取消,以及Blocklist Node的更新等多种操纵。
总而言之,ApplicationMaster作为客户端和Yarn的中心方,其资源申请的方法allocateResource()在逻辑上的功能为:
ApplicationMaster端的allocateResources()方法的基本流程在代码YarnAllocator.allocateResources()中:
  1.    ---------------------------------- YarnAllocator ------------------------------------
  2.   def allocateResources(): Unit = synchronized {
  3.    
  4.     updateResourceRequests()
  5. // 与Yarn进行资源交互
  6.    
  7.     val allocateResponse = amClient.allocate(progressIndicator) // 从Yarn端获取资源结果,包括新分配的、已经结束的等等
  8.     val allocatedContainers = allocateResponse.getAllocatedContainers()
  9.         handleAllocatedContainers(allocatedContainers.asScala) // 处理新分配的Container
  10.     val completedContainers = allocateResponse.getCompletedContainersStatuses() // 处理已经结束的Container
  11.      processCompletedContainers(completedContainers.asScala)
  12.     }
  13.   }
复制代码
ApplicationMaster端的allocateResources()的基本流程如下图所示:

可以看到,updateResourceRequests()
是资源请求的核心方法,它会负责同Yarn进行通信以进行资源请求。
在《超时导致SparkContext构造失败的题目探究》中,我们也介绍过,生成资源请求,其决定过程发生在方法updateResourceRequests()
中。我们紧张来看updateResourceRequests()
方法:
  1.    ---------------------------------- YarnAllocator ------------------------------------  def updateResourceRequests()
  2. : Unit = {    // 获取已经发送给Yarn但是待分配的ContainerRequest,计算待分配容器请求的数量    // 这些ContainerRequest是之前通过调用amClient.addContainerRequest 发送出去的    val pendingAllocate = getPendingAllocate    val numPendingAllocate = pendingAllocate.size    // 还没有发送请求的executor的数量    val missing = targetNumExecutors - numPendingAllocate -      numExecutorsStarting.get - numExecutorsRunning.get    // 还没有发送给Yarn的资源请求    if (missing > 0) {              /**       * 将待处置惩罚的container请求分为三组:本地匹配列表、本地不匹配列表和非本地列表。       */      val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
  3.         hostToLocalTaskCounts, pendingAllocate)
  4.       // staleRequests 的意思是,ApplicationMaster已经请求了这个Container,      // 但是这个ContainerRequest所要求的hosts里面没有一个是在 hostToLocalTaskCounts (即task所倾向于)中的,因此,必要取消这个Container Request,由于已经没故意义了      // cancel "stale" requests for locations that are no longer needed      staleRequests.foreach { stale =>        amClient.removeContainerRequest(stale)      }      val cancelledContainers = staleRequests.size       // consider the number of new containers and cancelled stale containers available      // 将新的container请求,以及刚刚取消的container,作为available container      val availableContainers = missing + cancelledContainers      // to maximize locality, include requests with no locality preference that can be cancelled      // 在availableContainers的基础上,再算上没有任何locality要求的并且还没有分配成功的container      val potentialContainers = availableContainers + anyHostRequests.size      // LocalityPreferredContainerPlacementStrategy,计算每一个Container 的Node locality和 Rack locality      val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(        potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,          allocatedHostToContainersMap, localRequests)      val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]      // 遍历ContainerLocalityPreferences数组中的每一个ContainerLocalityPreferences      containerLocalityPreferences.foreach {        case ContainerLocalityPreferences(nodes, racks) if nodes != null =>          newLocalityRequests += createContainerRequest(resource, nodes, racks)// 根据获取的locality,重新创建ContainerRequest请求      }      // 除了有locality需求的container以外,还有更多的available container必要被请求,因此对这些container请求也发送出去      if (availableContainers >= newLocalityRequests.size) {        // more containers are available than needed for locality, fill in requests for any host        for (i <- 0 until (availableContainers - newLocalityRequests.size)) {          newLocalityRequests += createContainerRequest(resource, null, null) // 构造ContainerRequest对象        }      } else {        val numToCancel = newLocalityRequests.size - availableContainers        // cancel some requests without locality preferences to schedule more local containers        anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>          amClient.removeContainerRequest(nonLocal)        }      }    } else if (numPendingAllocate > 0 && missing < 0) {      val numToCancel = math.min(numPendingAllocate, -missing)      val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)      matchingRequests.iterator().next().asScala                  .take(numToCancel).foreach(amClient.removeContainerRequest)    }  }
复制代码
其基本过程为:
所以,从上面可以看到,最关键的方法是LocalityPreferredContainerPlacementStrategy.localityOfRequestedContainers()方法,它根据当前的已有信息(统共的Container需求,有locality需求的task的数量,这些locality分布在每一个task上的数量等),生成一个Array[ContainerLocalityPreferences]数组,数组中的每一个元素代表了一个Container的需求,并包含了其locality的要求信息,然后基于生成的ContainerLocalityPreferences经过转换成ContainerRequest,发送给Yarn。
资源申请的生成过程详解

资源申请的生成,就是根据当前集群运行的基本情况,Task的基本需求,生成Yarn上的资源请求的过程。
资源申请的生成过程的简单例子

在相识其具体实现以前,我们以具体例子的方式,看一下localityOfRequestedContainers()方法的基本实现逻辑,从而对其动机和达成的效果有一个很好的理解,然后,我们再看其实现细节。
资源调度算法的代码解析

上面以实际例子解释了Spark将当前的Task的Locality需求信息转换成Yarn的资源请求的细节。下面,我们结合代码,具体看一下localityOfRequestedContainers()方法的实现细节:
  1.   def localityOfRequestedContainers(
  2.       numContainer: Int, // 需要进行计算的container的数量,包括missing的,cancel掉的(本地性不符合任何task要求的pending container),以及对本地性没有要求的pending的container
  3.       numLocalityAwareTasks: Int, // 对locality有要求的task的数量,这个是Driver端通过stageIdToExecutorPlacementHints计算然后通过RequestExecutor传递过来的
  4.       hostToLocalTaskCount: Map[String, Int], // 在Stage提交了以后,这个map里面保存了从host到期望分配到这个host的task的数量,这个是Driver端通过stageIdToExecutorPlacementHints传递过来的
  5.       allocatedHostToContainersMap: HashMap[String, Set[ContainerId]], // 已经launch起来的host -> container的映射关系
  6.       localityMatchedPendingAllocations: Seq[ContainerRequest] // 对本地性有要求的pending的container
  7.     ): Array[ContainerLocalityPreferences] = {
  8.     //  预期的从host到期望在上面再launch的新的container数量的映射关系
  9.     val updatedHostToContainerCount = expectedHostToContainerCount(
  10.       numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,
  11.         localityMatchedPendingAllocations)
  12.     // 希望再launch的所有Host上的container的数量之和,在这里的例子中,是15
  13.     val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum
  14.     // The number of containers to allocate, divided into two groups, one with preferred locality,
  15.     // and the other without locality preference.
  16.     //  没有locality需求的container的数量
  17.     val requiredLocalityFreeContainerNum =
  18.       math.max(0, numContainer - updatedLocalityAwareContainerNum)
  19.     //  有locality需求的container的数量
  20.     val requiredLocalityAwareContainerNum = numContainer - requiredLocalityFreeContainerNum
  21.     val containerLocalityPreferences = ArrayBuffer[ContainerLocalityPreferences]()
  22.     if (requiredLocalityFreeContainerNum > 0) { // 如果有container是没有locality需求的
  23.       for (i <- 0 until requiredLocalityFreeContainerNum) {
  24.         containerLocalityPreferences += ContainerLocalityPreferences( // 为这些没有locality需求的container一一创建container需求
  25.           null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]])
  26.       }
  27.     }
  28.     if (requiredLocalityAwareContainerNum > 0) { // 如果有container有locality需求
  29.       val largestRatio = updatedHostToContainerCount.values.max // 全局的所有host中最大的container数量
  30.       // Round the ratio of preferred locality to the number of locality required container
  31.       // number, which is used for locality preferred host calculating.
  32.       var preferredLocalityRatio = updatedHostToContainerCount.map { case(k, ratio) =>
  33.         val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
  34.         (k, adjustedRatio.ceil.toInt) // 往上取整
  35.       }
  36.       // 每个有locality需求的的Container request,为他们确定对应的hosts和rack
  37.       for (i <- 0 until requiredLocalityAwareContainerNum) {
  38.         // Only filter out the ratio which is larger than 0, which means the current host can
  39.         // still be allocated with new container request.
  40.         val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray // 还有container可以分配的一个或者多个hosts
  41.         val racks = hosts.map { h =>
  42.           resolver.resolve(yarnConf, h) // 解析这些host所在的rack
  43.         }.toSet
  44.         // 每一个ContainerLocalityPreferences代表一个Container
  45.         containerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray)
  46.         // Minus 1 each time when the host is used. When the current ratio is 0,
  47.         // which means all the required ratio is satisfied, this host will not be allocated again.
  48.         preferredLocalityRatio = preferredLocalityRatio.map { case (k, v) => (k, v - 1) }
  49.       }
  50.     }
  51.     // containerLocalityPreferences中的每一项都会变成一个新的Container Request
  52.     containerLocalityPreferences.toArray
  53.   }
复制代码
其参数的基本寄义是:

localityOfRequestedContainers()算法的基本过程为:
申请资源以后的处置惩罚:Executor的启动或者竣事

上面讲过,资源调度的入口方法allocateResources()会通过updateResourceRequests()
来计算所需资源并向Yarn进行资源的更新,包罗申请新的资源、开释无用的资源等:
  1.   def allocateResources(): Unit = synchronized {        updateResourceRequests()
  2. //         val allocateResponse = amClient.allocate(progressIndicator)    val allocatedContainers = allocateResponse.getAllocatedContainers()        handleAllocatedContainers(allocatedContainers.asScala)    val completedContainers = allocateResponse.getCompletedContainersStatuses()     processCompletedContainers(completedContainers.asScala)    }  }
复制代码
通过调用Yarn的尺度API allocate(),获取了资源分配的结果。再次强调,Yarn这一端的资源调度是异步调度,因此这个资源分配的结果并不是刚刚通过addContainerRequest()进行资源申请的结果,只是调用者在两次调用allocate() API的之间Yarn对于这个Application的新的资源分配结果。拿到了分配的Container,Spark就可以将Executor启动起来了(注意,是启动一个空的Executor,不是启动Task)。启动起来的Executor随后就会向DriverEndpoint注册自己,通信的具体过程参考《超时导致SparkContext构造失败的题目探究》。这里不再赘述。
对分配结果的处置惩罚,紧张是处置惩罚已经分配的Container以及已经运行竣事的Container:
  1.     val allocatedContainers = allocateResponse.getAllocatedContainers()
  2.     handleAllocatedContainers(allocatedContainers.asScala)
  3.     val completedContainers = allocateResponse.getCompletedContainersStatuses()
  4.     processCompletedContainers(completedContainers.asScala)
复制代码
对于新启动的Container的处置惩罚

对于一个刚刚分配成功的Container,其处置惩罚工作紧张包罗两个

这些过程在方法handleAllocatedContainers()中进行:
  1. --------------------------------- YarnAllocator --------------------------------------
  2. def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
  3.   val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
  4.   // 先处理Host Match的Container
  5.   val remainingAfterHostMatches = new ArrayBuffer[Container]
  6.   for (allocatedContainer <- allocatedContainers) {
  7.     matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
  8.       containersToUse, remainingAfterHostMatches)
  9.   }
  10.   // 处理Host Match以后剩余的Container
  11.   val remainingAfterRackMatches = new ArrayBuffer[Container]
  12.   for (allocatedContainer <- remainingAfterHostMatches) {
  13.     val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
  14.     matchContainerToRequest(allocatedContainer, rack, containersToUse,
  15.       remainingAfterRackMatches)
  16.   }
  17.   // 处理Host Match和Rack Match以后剩余的Container
  18.   val remainingAfterOffRackMatches = new ArrayBuffer[Container]
  19.   for (allocatedContainer <- remainingAfterRackMatches) {
  20.     matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
  21.       remainingAfterOffRackMatches)
  22.   }
  23.   // 在Host Match,Rack Match,以及ANY_HOST Match以后,依然还有剩余的Container,这只能是Bug
  24.   if (!remainingAfterOffRackMatches.isEmpty) {
  25.     for (container <- remainingAfterOffRackMatches) {
  26.       internalReleaseContainer(container)
  27.     }
  28.   }
  29.   /**
  30.    * 在这里会打印 Launching container container_1714042499037_5294_01_000002 on host
  31.     */
  32.   runAllocatedContainers(containersToUse)
  33.   }
复制代码
对于竣事的Container的处置惩罚

对于竣事的Container的处置惩罚在方法processCompletedContainers()中进行:
  1. ---------------------------------- YarnAllocator ----------------------------------------
  2. private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
  3.   for (completedContainer <- completedContainers) {
  4.     val containerId = completedContainer.getContainerId
  5.     val alreadyReleased = releasedContainers.remove(containerId) //  alreadyReleased记录这个Container是否已经被释放
  6.     val hostOpt = allocatedContainerToHostMap.get(containerId)
  7.     val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")
  8.     val exitReason = if (!alreadyReleased) { // 这个Container还没有释放,那么走释放流程
  9.       // Decrement the number of executors running. The next iteration of
  10.       // the ApplicationMaster's reporting thread will take care of allocating.
  11.       numExecutorsRunning.decrementAndGet()
  12.       // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
  13.       // there are some exit status' we shouldn't necessarily count against us, but for
  14.       // now I think its ok as none of the containers are expected to exit.
  15.       val exitStatus = completedContainer.getExitStatus
  16.       val (exitCausedByApp, containerExitReason) = exitStatus match {
  17.         case ContainerExitStatus.SUCCESS =>
  18.           .....
  19.         case ContainerExitStatus.PREEMPTED =>
  20.          ....
  21.         case VMEM_EXCEEDED_EXIT_CODE =>
  22.           ....
  23.         case PMEM_EXCEEDED_EXIT_CODE =>
  24.           ....
  25.       }
  26.       if (exitCausedByApp) {
  27.         logWarning(containerExitReason)
  28.       } else {
  29.         logInfo(containerExitReason)
  30.       }
  31.       ExecutorExited(exitStatus, exitCausedByApp, containerExitReason)
  32.     } else {
  33.       // 如果我们释放了这个Container,那么说明一定是Driver直接通过 killExecutor
  34.       // 释放掉了这个Container,而不是它自行结束
  35.       ExecutorExited(completedContainer.getExitStatus, exitCausedByApp = false,
  36.         s"Container $containerId exited from explicit termination request.")
  37.     }
  38.     // 解除host -> container 以及 container -> host mapping
  39.     for {
  40.       host <- hostOpt // 这个Container对应的Host
  41.       containerSet <- allocatedHostToContainersMap.get(host) // 这个Container对应的Host上的所有container
  42.     } {
  43.       containerSet.remove(containerId) // 删除这个container
  44.       if (containerSet.isEmpty) { // 这个container是这个host上的最后一个container
  45.         allocatedHostToContainersMap.remove(host) // 删除host
  46.       } else {
  47.         allocatedHostToContainersMap.update(host, containerSet)
  48.       }
  49.       allocatedContainerToHostMap.remove(containerId) // 解除container -> host map
  50.     }
  51.     // 解除container -> executor mapping
  52.     containerIdToExecutorId.remove(containerId).foreach { eid =>
  53.       executorIdToContainer.remove(eid)
  54.       ....
  55.       if (!alreadyReleased) {
  56.         // The executor could have gone away (like no route to host, node failure, etc)
  57.         // Notify backend about the failure of the executor
  58.         numUnexpectedContainerRelease += 1
  59.         driverRef.send(RemoveExecutor(eid, exitReason))
  60.       }
  61.     }
  62.   }
  63. }
复制代码
可以看到,方法processCompletedContainers()会遍历Yarn返回的每一个Completed(注意,Completed只是代表Container运行竣事,但是运行结果大概是Succeed大概是Fail),然后逐个处置惩罚:
基于资源分配结果进行使命调度

上面讲到对于新分配的Container的处置惩罚,在收到Yarn返回的分配的Container以后,ApplicationMaster会启动对应的Executor。这些Executor启动以后,会向Driver注册自己以告知Driver自己的存在,Driver进而将Task调度到Executor中。
其实,Task的调度的触发不仅仅是新分配了Container或者新Launch了Executor,基本上在集群的资源大概发生变化的情况下,都会触发Task的调度,因此,Task的调度是一个不断将Pending Task与可用资源进行匹配然后调度出去的过程
我列举了下面四种可以触发Driver端的CoarseGrainedSchedulerBackend通过运行makeOffers(或者只针对某一个Executor的makeOffer)来进行使命调度:
在以Locality为思量重点的Task的调度,就是根据locality从高到低(executor-local 优先级最高),参照当前答应的优先级,取出对应的Task进行调度。如果当前优先级的Task已经调度完毕,或者当前locality的一部门或者全部Task经过了很久还没有完成调度(即当前的系统资源无法完全满足当前的locality需求),那么就必要低落locality再次尝试进行调度。
下文会具体讲解makeOffers()的具体流程。
PendingTask的生成:

Task的调度的紧张逻辑是满足Task 的Location Preference, 即每一个Task对运行位置 (Executor, Host, Rack等等)上的偏好。我们先看看TaskLocation的寄义和生成过程,然后看看Spark的Driver是怎样通过Locality-Aware的调度方式,尽最大大概满足每一个Task的本地性需求。
TaskLocation的构造过程

在Spark中,一个Task的Location归根结底是由这个Task的Split决定的,Split代表了一个切分,比如,对一个HDFS文件的切分。在Hadoop上,一个Split表现为一个InputSplit接口的实现类的一个实例。
一个HDFS文件的一个Split是由FileSplit对象表达,对象中hostInfos存放了一个SplitLocationInfo数组,每一个SplitLocationInfo对象存放了一个这个Split的一个Replica的Location信息(由于在HDFS上文件是多副本的),包罗具体的Hostname(即DataNode地点的节点)以及这个Host是否在内存中缓存了这个Split的标记位。
当然,在HDFS的维度,我们说一个Split被缓存,其实缓存的是这个Split对应的Replica。在Hadoop中一个文件的Split对应一个FileSplit对象,代码如下:
  1. --------------------------------------- FileSplit ------------------------------------------------
  2. public FileSplit(Path file, long start, long length, String[] hosts,
  3.      String[] inMemoryHosts) {
  4.    this(file, start, length, hosts);
  5.    hostInfos = new SplitLocationInfo[hosts.length];
  6.    for (int i = 0; i < hosts.length; i++) {
  7.      // because N will be tiny, scanning is probably faster than a HashSet
  8.      boolean inMemory = false;
  9.      for (String inMemoryHost : inMemoryHosts) {
  10.        if (inMemoryHost.equals(hosts[i])) {
  11.          inMemory = true;
  12.          break;
  13.        }
  14.      }
  15.      hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);
  16.    }
  17. }
复制代码
其中,一个Replica的Location信息对应了一个SplitLocationInfo对象,包含了这个Split是否在对应的Host上缓存了,以及,对应的Host(DataNode) 信息。在多副本的环境下,一个FileSplit对应的是一个数组 SplitLocationInfo[]:
  1. ------------------------------------------ SplitLocationInfo -----------------------------------------
  2. public class SplitLocationInfo {
  3.   private boolean inMemory;
  4.   private String location;
  5.   
  6.   public SplitLocationInfo(String location, boolean inMemory) {
  7.     this.location = location;
  8.     this.inMemory = inMemory;
  9.   }
复制代码
我们看一下Spark在提交Stage和Task的过程中是怎么获取Task的Location信息的,以及在获取以后是如何基于Location调度Task的。
在HDFS的场景下,Spark是将每一个Stage对应的RDD(一个NewHadoopRDD对象)中的Partition(一个NewHadoopPartition对象)的Location信息(如果有)存放在这个NewHadoopPartition的serializableHadoopSplit中,其实是对InputSplit的封装:
  1. --------------------------------------- NewHadoopPartition ------------------------------------
  2. private[spark] class NewHadoopPartition(
  3.     rddId: Int,
  4.     val index: Int,
  5.     rawSplit: InputSplit with Writable) // 这个HadoopPartition对应的InputSplit
  6.   extends Partition {
  7.   val serializableHadoopSplit = new SerializableWritable(rawSplit)
复制代码
Stage以及Partition的生成是在提交以进步行的,即执行计划的生成阶段形成的。那么,到了Stage的提交阶段,是怎么利用Stage和Partition中的Location信息,生成具有Location Preference的Task信息的呢?我们看看Stage提交的时间是如何利用Partition的Location信息的。
下面的这段代码是一段经典代码,Spark中DagScheduler提交Stage的时间,会首先查抄这个Stage是否有未提交的Parent Stage:

  1.   /** Submits stage, but first recursively submits any missing parents. */
  2.   private def submitStage(stage: Stage) {
  3.     val jobId = activeJobForStage(stage)
  4.     if (jobId.isDefined) {
  5.       if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
  6.         val missing = getMissingParentStages(stage).sortBy(_.id)
  7.         if (missing.isEmpty) { // 没有missing parent stage,那么提交这个Stage的所有task
  8.           submitMissingTasks(stage, jobId.get)
  9.         } else {
  10.           for (parent <- missing) { // 有missing parent stage,那么先提交
  11.             submitStage(parent)
  12.           }
  13.           waitingStages += stage // 将当前stage放入到waitingStages中随后提交
  14.         }
  15.       }
  16.     }
  17.   }
复制代码
在提交Stage的时间,会提交这个Stage所有的Task。我们忽略其他细节,从下面的代码可以看到,Spark会通过方法getPreferredLocs()针对这个Stage的每一个Partition获取这个Partition的Location信息,放到一个taskIdToLocations: Map[Int, Seq[TaskLocation]]中,然后,会提交这些已经带有Location信息的Task:
  1. -------------------------------------- DAGScheduler --------------------------------------
  2.   /** Called when stage's parents are available and we can now do its task. */
  3.   private def submitMissingTasks(stage: Stage, jobId: Int) {
  4.     // First figure out the indexes of partition ids to compute.
  5.         val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  6.     .....
  7.     val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  8.       stage match {
  9.         case s: ShuffleMapStage =>
  10.           // 计算每一个partition(这里的partition就是一个task)的prefered location的list
  11.           partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
  12.         case s: ResultStage =>
  13.           partitionsToCompute.map { id =>
  14.             val p = s.partitions(id) // ResultStage.partitions中存放的是对应的partition id
  15.             (id, getPreferredLocs(stage.rdd, p))
  16.           }.toMap
  17.       }
  18.     }
  19.     // 在获取了这个Stage的每一个task的location preference信息(如果有)以后,创建一个新的Stage Attempt
  20.     // 随后,在调度这个Stage的task的时候,这个task中就包含了对应的location preference
  21.     stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  22.     // 提交这个Stage。基于listenerBus的事件触发机制,调度方法就会将包含了Location Preference的Task调度出去
  23.     listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
  24.     .......
  25.     val tasks: Seq[Task[_]] = try {
  26.       val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  27.       stage match {
  28.         case stage: ShuffleMapStage =>
  29.           stage.pendingPartitions.clear()
  30.           partitionsToCompute.map { id =>
  31.             val locs = taskIdToLocations(id)
  32.             val part = partitions(id)
  33.             stage.pendingPartitions += id
  34.             new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
  35.               taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
  36.               Option(sc.applicationId), sc.applicationAttemptId)
  37.           }
  38.         case stage: ResultStage =>
  39.           partitionsToCompute.map { id =>
  40.             val p: Int = stage.partitions(id)
  41.             val part = partitions(p)
  42.             val locs = taskIdToLocations(id)
  43.             new ResultTask(stage.id, stage.latestInfo.attemptNumber,
  44.               taskBinary, part, locs, id, properties, serializedTaskMetrics,
  45.               Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
  46.           }
  47.       }
  48.     }
  49.     if (tasks.size > 0) {
  50.       // 提交task
  51.       taskScheduler.submitTasks(new TaskSet(
  52.         tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  53.     }
  54.   }
复制代码
submitMissingTasks()方法的基本过程为:
下面的代码显示了getPreferredLocsInternal()方法的细节,即获取一个RDD中某一个Partition的Location信息(一个Seq[String]),返回对应的TaskLocation接口的某个实现类的Seq:
  1. -------------------------------------- DAGScheduler --------------------------------------
  2.   private def getPreferredLocsInternal(
  3.       rdd: RDD[_], // 当前Stage的RDD
  4.       partition: Int, // 这个Partitoin的index,即在rdd.partitions中的index
  5.       visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  6.     // If the partition has already been visited, no need to re-visit.
  7.     // This avoids exponential path exploration.  SPARK-695
  8.     if (!visited.add((rdd, partition))) {
  9.       // Nil has already been returned for previously visited partitions.
  10.       return Nil
  11.     }
  12.     // 如果这个partition已经缓存,那么就返回缓存以后的Partion的Location信息
  13.     val cached = getCacheLocs(rdd)(partition)
  14.     if (cached.nonEmpty) {
  15.       return cached
  16.     }
  17.     // 如果这个RDD含有一个Location Preference,那么就返回这个Location Preference
  18.     val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  19.     if (rddPrefs.nonEmpty) {
  20.       return rddPrefs.map(TaskLocation(_)) // 这里会根据getPreferredLocations()的返回值Seq[String],将List中的每一个值映射到一个TaskLocation的实现类,即一个Seq[TaskLocation]
  21.     }
复制代码
TaskLocation的apply()方法会根据传入的参数的类型,返回其对应的差别实现类:
  1. ---------------------------------- TaskLocation ------------------------------------
  2.   /**
  3.    * Create a TaskLocation from a string returned by getPreferredLocations.
  4.    * These strings have the form executor_[hostname]_[executorid], [hostname], or
  5.    * hdfs_cache_[hostname], depending on whether the location is cached.
  6.    */
  7.   def apply(str: String): TaskLocation = {
  8.     val hstr = str.stripPrefix(inMemoryLocationTag)
  9.     if (hstr.equals(str)) {
  10.       if (str.startsWith(executorLocationTag)) {
  11.         val hostAndExecutorId = str.stripPrefix(executorLocationTag)
  12.         val splits = hostAndExecutorId.split("_", 2)
  13.         val Array(host, executorId) = splits
  14.         new ExecutorCacheTaskLocation(host, executorId)
  15.       } else {
  16.         new HostTaskLocation(str)
  17.       }
  18.     } else {
  19.       new HDFSCacheTaskLocation(hstr)
  20.     }
  21.   }
  22. }
复制代码
可以看到,rdd.partitions(partition)方法根据这个Partition Index,返回了这个Partition对象。对于NewHadoopRDD,这个Partition的实现是NewHadoopPartition。一个NewHadoopPartition对象中存放了对应的InputSplit接口的实现类。比如,对于一个文件,InputSplit的实现是FileSplit,而InputSplit则包含了对应的Location信息:
然后,调用对应的RDD的getPreferredLocations()方法获取Location Preference,即一个NewHadoopRDD中包含了所有的Partition,每一个Partition由一个Partition接口的实现类NewHadoopPartition对象表现,每个NewHadoopPartition中存放了这个Partition的InputSplit信息:
  1.   ----------------------------------- NewHadoopRDD ---------------------------------------
  2.   override def getPreferredLocations(hsplit: Partition): Seq[String] = {
  3.     // 返回这个 NewHadoopPartition对应的Hadoop层面的Split信息,比如,一个文件的FileSplit,
  4.     // 包含了这个File中这个split的起始位置,长度,replica的位置
  5.     val split = hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value
  6.     // 提取这个FileSplit的多个Replica位置信息
  7.     val locs = HadoopRDD.convertSplitLocationInfo(split.getLocationInfo)
  8.     locs.getOrElse(split.getLocations.filter(_ != "localhost"))
  9.   }
复制代码
在Spark端,在生成每一个RDD中的Partition的信息的时间,其映射关系其实从底向上为 HDFS Split(InputSplit) -> Partition(NewHadoopPartition) -> Task 的关系,因此,在生成RDD Partition的时间,会将对应的Split的Location信息转换成Partition的Location信息,这个Partition的Location信息利用TaskLocation对象的具体实现类来表达的。这个转换是由object方法convertSplitLocationInfo()负责的,该方法的输入是一个Array[SplitLocationInfo],输出为对应的Seq[String],即对应的Location的字符串表现,即:

TaskLocation的实现类除了HDFSCacheTaskLocation和HostTaskLocation,还有ExecutorCacheTaskLocation,即这个Task希望运行在这个Executor上,比如,下面两种情况,这个Partition的Locality Preference是希望精确到对应的Executor的:
  1.   private[scheduler]
  2.   def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
  3.     // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
  4.     if (!cacheLocs.contains(rdd.id)) {
  5.       // Note: if the storage level is NONE, we don't need to get locations from block manager.
  6.       val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
  7.         IndexedSeq.fill(rdd.partitions.length)(Nil) // 如果对应RDD的StorageLevel是NONE,那么返回一个空个的TaskLocation数组
  8.       } else { // 如果RDD的StorageLevel不是空的,那么会构造对应的ExecutorCacheTaskLocation
  9.         val blockIds =
  10.           rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
  11.         blockManagerMaster.getLocations(blockIds).map { bms => // 通过BlockManagerMaster获取对应的Block的位置,然后构造对应的ExecutorCacheTaskLocation
  12.           bms.map(bm => TaskLocation(bm.host, bm.executorId))
  13.         }
  14.       }
  15.       cacheLocs(rdd.id) = locs
  16.     }
  17.     cacheLocs(rdd.id)
  18.   }
复制代码
对应的TaskLocation接口的实现类的toString()方法就返回了这个TaskLocation的String表现。比如,HostTaskLocation.toString()就返回对应的Hostname:
  1. ----------------------------------------- HostTaskLocation ----------------------------------
  2. /**
  3. * A location on a host.
  4. */
  5. private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
  6.   override def toString: String = host
  7. }
复制代码
  1. -------------------------------------- HDFSCacheTaskLocation ----------------------------------
  2. /**
  3. * A location on a host that is cached by HDFS.
  4. */
  5. private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
  6.   override def toString: String = TaskLocation.inMemoryLocationTag + host
  7. }
复制代码
这样,DAGScheduler.submitMissingTasks()就会提交这些带有Location Preference的Task。这些Task会在Driver端经过一些统计计算,让ApplicationMaster根据提供的资源量、Task的位置倾向等,向Yarn申请资源,申请到的资源向Driver注册,Driver再将Task和Container进行最有匹配,最大程度满足Task的位置需求。
根据TaskLocation信息,将Task添加到差别的pendingTask数组中

所有必要运行因此必要相应资源的Task,Driver都会将对应的Task添加到pendingTask中,这些pendingTask是由TaskSetManager维护的,一个TaskSetManager对象是一个Stage的使命聚集,紧张负责这个Stage的Task的管理和调度。
根据每一个Task的资源本地性需求的差别,pendingTask分别维护在下面的Map中,代码如下:
  1.   // 希望在某一个Executor上运行的pendingTasks,Key是Executor ID,Value是希望在这个Executor上运行的TaskID的列表
  2.   private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
  3.   // 希望在某一个Host上运行的pendingTasks,Key是对应的Host,Value是希望在这个Host上运行的TaskID的列表
  4.   private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
  5.   // 希望在某个一Rack上运行的pendingTasks,Key是对应的Rack,Value是希望在这个Rack上运行的TaskID的列表
  6.   private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
  7.   // 没有任何的本地性需求的pendingTasks的列表
  8.   private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
复制代码
向PendingTask中添加使命的时间,会根据这个task的Locality Preference,将对应的pendingTask放入到上面差别的使命聚会合:
  1.   /** 将一个Task添加到它所属的所有的pending-task lists中去 */
  2.   private[spark] def addPendingTask(index: Int) {
  3.     for (loc <- tasks(index).preferredLocations) {
  4.       loc match {
  5.         case e: ExecutorCacheTaskLocation => // Executor级别的Location Preference
  6.           pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
  7.         case e: HDFSCacheTaskLocation =>  // HDFS缓存 级别的Location Preference
  8.           val exe = sched.getExecutorsAliveOnHost(loc.host) // 获取这个location所在的host上的所有executor id
  9.           exe match {
  10.             case Some(set) =>
  11.               for (e <- set) { // 对于这个host上的每一个executor
  12.                  // 将这个task添加到这个host上的每一个executor上去
  13.                 pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
  14.               }
  15.           }
  16.         case _ =>
  17.       }
  18.       // 将这个task添加到pendingTasksForHost
  19.       pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
  20.       for (rack <- sched.getRackForHost(loc.host)) {
  21.         pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
  22.       }
  23.     } // end of for loop
  24.     if (tasks(index).preferredLocations == Nil) {
  25.       pendingTasksWithNoPrefs += index
  26.     }
  27.     allPendingTasks += index  // No point scanning this whole list to find the old task there
  28.   }
复制代码
其基本逻辑为:
  1.     for (loc <- tasks(index).preferredLocations) {
  2.       ......
  3.     }
复制代码
总之,我们必要注意:

可用的LocationLevel的计算

Spark用一个枚举类型TaskLocality来表达差别的本地级别:
  1. object TaskLocality extends Enumeration {
  2.   // Process local is expected to be used ONLY within TaskSetManager for now.
  3.   val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
复制代码
PROCESS_LOCAL: 进程本地,即这个Task希望调度到某一个特定的Executor上
NODE_LOCAL: 节点本地,即这个Task希望调度到某一个特定的节点
NO_PREF: 没有本地倾向,即这个Task可以任意调度到集群中的任何节点
RACK_LOCAL: 机架本地,即这个Task希望调度到某一个特定的rack上
ANY: 无法满足要求,是前面四种本地性倾向都无法满足要求时的最后的选择策略,将有节点倾向的Task任意调度到集群中无法满足其本地性倾向的位置上。
这里必要注意的是:
在一个TaskSet的TaskSetManager构造的时间,以及后来一个新的Executor到场或者丢失的时间,都会重新计算这个TaskSet的Valid Locality Levels。所以,应该注意到:

下面的方法显示了计算Valid Locality Levels的基本过程:
  1. ------------------------------------- TaskSetManager --------------------------------------
  2.   private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
  3.     import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
  4.     val levels = new ArrayBuffer[TaskLocality.TaskLocality]
  5.     // 如果有任务在等待执行器(executor),并且这些执行器中有活着的执行器,那么就将PROCESS_LOCAL添加到有效级别列表中。
  6.     if (!pendingTasksForExecutor.isEmpty &&
  7.         pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
  8.       levels += PROCESS_LOCAL
  9.     }
  10.     // 如果有任务在等待主机(host),并且这些主机上有活着的执行器,那么就将NODE_LOCAL添加到有效级别列表中。
  11.     if (!pendingTasksForHost.isEmpty &&
  12.         pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
  13.       levels += NODE_LOCAL
  14.     }
  15.     if (!pendingTasksWithNoPrefs.isEmpty) {
  16.       levels += NO_PREF
  17.     }
  18.     // 如果有任务在等待机架(rack),并且这些机架上有活着的主机,那么就将RACK_LOCAL添加到有效级别列表中。
  19.     if (!pendingTasksForRack.isEmpty &&
  20.         pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
  21.       levels += RACK_LOCAL
  22.     }
  23.     levels += ANY
  24.     levels.toArray
  25.   }
复制代码
该方法返回一个TaskLocality数组,即如果对应的pendingTask数组中有Task,那么就将对应的TaskLocality添加到数组中。这里必要注意,往数组中添加TaskLocality是按照TaskLocality的值从低到高(Locality优先级从高到低)的顺序添加的,从下文介绍基于Locality的Task调度可以看到,调度时会遍历这个返回的TaskLocality数组,即调度时按照Locality优先级从高到低进行的。最后的TaskLocality.ANY一定会最后添加到结果中,作为一个优先级的保底操纵
基于Locality的Task调度

上文讲过,Task的调度的触发,以及其通过makeOffers()方法进行调度。在这里,我们具体讲解这个调度的基本细节。
makeOffers()的基本功能,就是为当前的Executor的资源剩余情况,生成对应的WorkerOffer,代表这些Executor可提供的运行资源,其中包含了对应的executorId,地点的host信息以及可用的vCore信息:
  1. private[spark]
  2. case class WorkerOffer(executorId: String, host: String, cores: Int)
复制代码
然后TaskScheduler会根据这些剩余资源,将对应的pendingTask调度出去,当然,调度过程中必要依赖对应的Task的Locality信息。
  1. -------------------------------------- CoarseGrainedSchedulerBackend -------------------------------
  2.     private def makeOffers() {
  3.       // 根据集群当前的可用资源状况,生成Task的调度结果
  4.       val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
  5.         // 筛选出alive的executor
  6.         val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  7.         val workOffers = activeExecutors.map {
  8.           case (id, executorData) =>
  9.             new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  10.         }.toIndexedSeq // 构造每一个Executor上的可用资源
  11.         scheduler.resourceOffers(workOffers)  // 在这里依赖于TaskScheduler来调度对应的task到对应的worker上
  12.       }
  13.       launchTasks(taskDescs) // 启动这些tasks
  14.     }
复制代码
resourceOffers()方法是TaskSchedulerImpl的成员方法,其输入是一系列的WorkerOffer,返回可以进行调度的所有Task(每一个Task由一个TaskDescription表现)。我们后面会看到,TaskSchedulerImpl会遍历当前必要调度的所有TaskSet尝试进行调度,而不是某一个TaskSet。
resourceOffers()代码如下所示:
  1. ---------------------------------------- TaskSchedulerImpl ------------------------------
  2.   def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  3.     // Mark each slave as alive and remember its hostname
  4.     // Also track if new executor is added
  5.     var newExecAvail = false
  6.     for (o <- offers) { // 对于每一个WorkerOffer
  7.       if (!hostToExecutors.contains(o.host)) {
  8.         hostToExecutors(o.host) = new HashSet[String]() // 构建这个Host的Executor的map
  9.       }
  10.       // 是否有新增的executor进来。如果有,则维护executor相关的map信息
  11.       if (!executorIdToRunningTaskIds.contains(o.executorId)) {
  12.         hostToExecutors(o.host) += o.executorId // 将这个executor加到这个host -> executors 的map中
  13.         executorAdded(o.executorId, o.host)
  14.         executorIdToHost(o.executorId) = o.host // 将这个host加到executor -> host 的毛重
  15.         executorIdToRunningTaskIds(o.executorId) = HashSet[Long]() // 构建这个executor -> tasks的map
  16.         newExecAvail = true
  17.       }
  18.       for (rack <- getRackForHost(o.host)) {
  19.         hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host // 构建rack -> hosts的map
  20.       }
  21.     }
  22.     // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
  23.     // this here to avoid a separate thread and added synchronization overhead, and also because
  24.     // updating the blacklist is only relevant when task offers are being made.
  25.     blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
  26.     val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
  27.       offers.filter { offer =>
  28.         !blacklistTracker.isNodeBlacklisted(offer.host) &&
  29.           !blacklistTracker.isExecutorBlacklisted(offer.executorId)
  30.       }
  31.     }.getOrElse(offers)
  32.     // 将offer进行随机shuffle,返回打乱顺序以后的IndexedSeq[WorkerOffer]
  33.     val shuffledOffers = shuffleOffers(filteredOffers)
  34.     // Build a list of tasks to assign to each worker.
  35.     // 根据当前的WorkerOffer,预构建一个TaskDescription的二维数组
  36.     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
  37.     val availableCpus = shuffledOffers.map(o => o.cores).toArray // 构建WorkerOffer -> 可用CPU的对应关系
  38.     val sortedTaskSets = rootPool.getSortedTaskSetQueue
  39.     for (taskSet <- sortedTaskSets) {
  40.       if (newExecAvail) { // 如果有新的Executor加入进来
  41.         taskSet.executorAdded() // 重新计算这个TaskSet的locality的相关信息
  42.       }
  43.     }
  44.     // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  45.     // of locality levels so that it gets a chance to launch local tasks on all of them.
  46.     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  47.     for (taskSet <- sortedTaskSets) { // 按照调度顺序,取出每一个TaskSet
  48.       var launchedAnyTask = false
  49.       var launchedTaskAtCurrentMaxLocality = false
  50.       // locality从低到高遍历这个TaskSet中的每一个可用的localityLevels,locality越低代表本地性越好
  51.       for (currentMaxLocality <- taskSet.myLocalityLevels) {
  52.         do {
  53.           launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
  54.             taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
  55.           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
  56.         } while (launchedTaskAtCurrentMaxLocality)
  57.       }
  58.       if (!launchedAnyTask) { // 没有launch任何一个task
  59.         taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
  60.       }
  61.     }
  62.     if (tasks.size > 0) {
  63.       hasLaunchedTask = true
  64.     }
  65.     return tasks
  66.   }
复制代码
resourceOffers()的基本流程如下图所示:

其基本流程为:
所以,对于一个TaskSet中的task根据当前要求的TaskLocality进行使命调度,发生在方法resourceOfferSingleTaskSet()中。这个方法根据答应的最大的locality(currentMaxLocality, 这里的最大指的是最低要求,即,不可以比这个locality更宽松了),当前可用资源(shuffledOffers),必要调度的TaskSet,返回成功调度的这个TaskSet中的task:
  1.   private def resourceOfferSingleTaskSet(
  2.       taskSet: TaskSetManager, // 当前的TaskSet
  3.       maxLocality: TaskLocality, // 当前最大的locality,最大的意思是最优的locality
  4.       shuffledOffers: Seq[WorkerOffer], // 每一个WorkerOffer代表了一个可用资源
  5.       availableCpus: Array[Int], // 这个shuffledOffers中的每一个WorkerOffer所代表的可用的VCores
  6.       // 一个WorkerOffer按照可用cpu以及每个task的cpu,算出Task的数量
  7.       tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
  8.     var launchedTask = false
  9.     // nodes and executors that are blacklisted for the entire application have already been
  10.     // filtered out by this point
  11.     for (i <- 0 until shuffledOffers.size) { // 遍历每一个WorkerOffer(每一个WorkerOffer对应了一个executor)
  12.       val execId = shuffledOffers(i).executorId
  13.       val host = shuffledOffers(i).host
  14.       for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { // 往这个Offer上调度一个Task
  15.                tasks(i) += task // 保存调度结果
  16.                val tid = task.taskId
  17.                ....
  18.                launchedTask = true
  19.      }
  20.     }
  21.     return launchedTask
  22.   }
复制代码
从上面的代码可以看到,resourceOfferSingleTaskSet()方法会遍历当前所有的可用资源WorkerOffer,尝试按照当前的maxLocality,调用resourceOffer()方法,往这个WorkerOffer上面调度一个task。
resourceOffer()方法是TaskSet的成员方法,其根据executorID, hostname以及最大答应的locality(maxLocality, 即locality不可以再差了),尝试从pendingTask中选出一个满足条件的task调度:
  1.   def resourceOffer(
  2.       execId: String, // executor id
  3.       host: String, // executor所在的host
  4.       maxLocality: TaskLocality.TaskLocality) // 所容许的locality,不能比这个locality更宽松
  5.     : Option[TaskDescription] =
  6.   {
  7.       ....
  8.       val curTime = clock.getTimeMillis()
  9.       // allowedLocality 代表当前最宽松的locality是什么,显然,在开始的时候,我们希望allowedLocality严格一点儿,
  10.       // 后面如果分配失败了,再逐渐放松要求
  11.       var allowedLocality = maxLocality
  12.       //  如果 maxLocality == TaskLocality.NO_PREF,那么allowedLocality = maxLocality,
  13.       // 进入TaskLocality.NO_PREF本来就是对调度没有任何要求
  14.       if (maxLocality != TaskLocality.NO_PREF) {
  15.         allowedLocality = getAllowedLocalityLevel(curTime) // 根据当前的时间,更新当前时间节点下的allowedLocality,这个allowedLocality可能小于maxLocality,这时候就会使用这个更小(更严格)的locality
  16.         if (allowedLocality > maxLocality) { // 如果,allowedLocality比maxLocality更松弛,那么还是以maxLocality为准
  17.           // We're not allowed to search for farther-away tasks
  18.           allowedLocality = maxLocality
  19.         }
  20.       }
  21.       // 根据当前计算得到的locality 弹出对应的tasks,然后调度起来
  22.       dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
  23.         // Found a task; do some bookkeeping and return a task description
  24.         .....
  25.         currentLocalityIndex = getLocalityIndex(taskLocality)
  26.         lastLaunchTime = curTime // 更新这个TaskSetManager的最后一个task的启动时间
  27.         .....
  28.         ....
  29.         addRunningTask(taskId)
  30.         sched.dagScheduler.taskStarted(task, info)
  31.         new TaskDescription(.....)
  32.       }
  33.    
  34.   }
复制代码
这个方法的基本逻辑为:
可以看到,TaskSetManager.getAllowedLocalityLevel()方法根据当前时间,计算当前真正的答应的locality,其核心原理为,每一个TaskSet都维护了currentLocalityIndex,myLocalityLevels[currentLocalityIndex]就对应了这个TaskSet当前应该利用的Locality级别。只有在某种条件下,currentLocalityIndex索引会通过自增的方式向前移动以移动到下一个Locality(即Locality变得更松懈),即getAllowedLocalityLevel()方法控制了currentLocalityIndex移动条件,保证currentLocalityIndex只有在某些条件下才气移动到下一级。移动到下一级的条件为:
具体代码如下所示:
  1. ------------------------------------ TaskSetManager --------------------------------------------
  2.   private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  3.     // Remove the scheduled or finished tasks lazily
  4.     def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
  5.       ....
  6.     }
  7.     def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
  8.       ......
  9.     }
  10.     // 循环遍历所有位置级别,直到达到最后一级,currentLocalityIndex从小到大,本地性由强到弱
  11.     while (currentLocalityIndex < myLocalityLevels.length - 1) {
  12.       // 查看当前的locality下面是否有task还没有运行
  13.       val moreTasks = myLocalityLevels(currentLocalityIndex) match {
  14.         case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
  15.         case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
  16.         case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
  17.         case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
  18.       }
  19.       if (!moreTasks) {
  20.         // 如果当前locality级别没有更多的任务,将lastLaunchTime更新为当前时间,并将位置级别索引增加1。
  21.         lastLaunchTime = curTime
  22.         currentLocalityIndex += 1 // 查看下一级locality
  23.       } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
  24.         // 尽管这个localityWaits(currentLocalityIndex) 的locality上还有对应的pending task,
  25.         // 但是达到等待时间的情况,会进入下一个locality
  26.         lastLaunchTime += localityWaits(currentLocalityIndex)
  27.         currentLocalityIndex += 1 // 查看下一级locality
  28.       } else {
  29.         // 在myLocalityLevels(currentLocalityIndex)的locality下还有更多的task,并且没有超时,那么就返回这个locality
  30.         return myLocalityLevels(currentLocalityIndex)
  31.       }
  32.     }
  33.     myLocalityLevels(currentLocalityIndex)
  34.   }
复制代码
其中,TaskSetManager中的lastLaunchTime维护了这个TaskSet中上一次启动一个Task的时间,即,只要我们在满足allowedLocalityLevel的条件下,在某一个WorkOffer上启动了一个Task,那么就将这个TaskSet的lastLaunchTime更新为当前时间。这样,如果一个TaskSet上长时间无法launch一个Task,那么这个allowedLocalityLevel就会自增,即低落Locality Preference的要求。
通过TaskSetManager中getLocalityWait()方法可以看到我们可以为每一个TaskLocality配置差别的超时时间:
  1. ------------------------------------ TaskSetManager -------------------------------------------
  2.   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
  3.     val defaultWait = conf.get(config.LOCALITY_WAIT)
  4.     val localityWaitKey = level match {
  5.       case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
  6.       case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
  7.       case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
  8.       case _ => null
  9.     }
  10.     if (localityWaitKey != null) {
  11.       conf.getTimeAsMs(localityWaitKey, defaultWait.toString)
  12.     } else {
  13.       0L
  14.     }
  15.   }
复制代码
在获取了最后的allowedLocalityLevel以后,就尝试通过dequeueTask(execId, host, allowedLocality)方法,根据当前WorkOffer的信息(execID, host)以及allowedLocalityLevel,从对应的pendingTask中取出符合要求的task,这里的符合要求指的是,这个Task的locality偏好是答应在这个host+executorID上运行的,并且如果在这个host + executorID上运行,其locality是不会比maxLocality更差的。即:
dequeueTask()的逻辑如下所示。它根据当前的WorkerOffer(Executor ID + hostname)和答应的最大的locality(maxLocality),返回一个locality满足要求的Task准备运行。这里的满足要求指的是,这个Task如果运行在这个WorkerOffer上,能够满足所答应的最差的locality(maxLocality)的要求。
  1. ------------------------------------ TaskSetManager --------------------------------------------
  2.   private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
  3.     : Option[(Int, TaskLocality.Value, Boolean)] =
  4.   {
  5.     // TaskLocality.isAllowed(maxLocality, TaskLocality.PROCESS_LOCAL) 永远返回true,因此不做判断
  6.     for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
  7.       return Some((index, TaskLocality.PROCESS_LOCAL, false))
  8.     }
  9.     // 假如maxLocality允许NODE_LOCAL,那么就从getPendingTasksForHost中取出对应的Host的task返回
  10.     if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
  11.       for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
  12.         return Some((index, TaskLocality.NODE_LOCAL, false))
  13.       }
  14.     }
  15.     // 假如maxLocality允许NO_PREF,那么就从pendingTasksWithNoPrefs中取出对应的task返回
  16.     if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
  17.       // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
  18.       for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
  19.         return Some((index, TaskLocality.PROCESS_LOCAL, false))
  20.       }
  21.     }
  22.     // 假如maxLocality允许RACK_LOCAL,那么就获取当前的host的rack,从对应的rack的pendingTasks中取出对应的task返回
  23.     if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
  24.       for {
  25.         rack <- sched.getRackForHost(host)
  26.         index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
  27.       } {
  28.         return Some((index, TaskLocality.RACK_LOCAL, false))
  29.       }
  30.     }
  31.     if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
  32.       for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
  33.         return Some((index, TaskLocality.ANY, false))
  34.       }
  35.     }
  36.     // find a speculative task if all others tasks have been scheduled
  37.     dequeueSpeculativeTask(execId, host, maxLocality).map {
  38.       case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
  39.   }
复制代码
示例1:
当前的WorkerOffer信息如下:

在当前的TaskSet中pendingTasksForExecutor中有一个Task A,其preference恰好是 Executor 25535。因此,根据上文所讲解的pendingTask的添加过程,Task A会同时存在于pendingTasksForExecutor,pendingTasksForHost,pendingTasksForRack以及allTasks中。
这时间dequeueTask()的运行过程为:

示例2:
当前的WorkerOffer信息如下:

在当前的TaskSet中pendingTasksForExecutor中没有针对ExecutorID: 25536
的pending Task。但是在pendingTasksForHost中有一个Task B, 其 locality preference恰好是Hostname: testhost1.corp.com。 根据上文所讲解的pendingTask的添加过程,Task B会同时存在于pendingTasksForHost,pendingTasksForRack以及allTasks中。
这时间dequeueTask()的运行过程为:

示例3:
当前的WorkerOffer信息如下:

在当前的TaskSet中pendingTasksForExecutor中没有针对ExecutorID: 25536
的pending Task。但是在pendingTasksForHost中有一个Task B, 其 locality preference是Hostname: testhost2-rack1.corp.com,即它的locality preference同当前的WorkerOffer不在一个节点但是在一个机架上。 根据上文所讲解的pendingTask的添加过程,Task B会同时存在于pendingTasksForHost,pendingTasksForRack以及allTasks中。
这时间dequeueTask()的运行过程为:

示例4:
当前的WorkerOffer信息如下:

在当前的TaskSet中pendingTasksForExecutor中没有针对ExecutorID: 25536的的pending Task。pendingTasksWithNoPrefs也没有Task。但是在pendingTasksForHost中有一个Task D, 其 locality preference是别的一个Hostname: testhost2-rack2.corp.com(这个testhost2地点的rack为rack2)。 根据上文所讲解的pendingTask的添加过程,Task D会同时存在于pendingTasksForHost,pendingTasksForRack以及allTasks中。
这时间dequeueTask()的运行过程为:

结语

Spark在Yarn上的资源管理是粗粒度的资源管理(Coarse Grained),即资源和Task并不一一对应。ApplicationMaster作为资源请求的署理,充当了细粒度的Task和粗粒度的Yarn Container之间的桥梁,即,根据细粒度的、全量的Task资源需求状态不断生成增量的、粗粒度的资源请求,并将Yarn不断异步返回的资源和当前的Task进行匹配,以最优化Task的放置。
本文全面地讲解了整个资源请求和调度过程。读者从中可以看到基于Yarn的整个资源调度过程,Spark的差别角色之间的通信过程,整个过程虽然是以Yarn为基础,但是其反映的是一个通用的资源调度决定的基本思路,因此很有参考意义。
我以为,感兴趣的读者,可以和Spark on K8S这种细粒度的资源调度过程作比力,将会有更多的收益。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4