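Dimension conversion: the resource request sent by the Driver describes the final, global state of the resources it wants, whereas YARN's API expects incremental requests, one per container. allocateResources() therefore compares the final state requested by the Driver with the states that already exist in the system (containers that are running, containers that have been allocated but are not yet running, requests that have been sent but not yet allocated, and so on) and derives the incremental request to send to YARN. This is a conversion from a full (global) view to an incremental one.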
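Stripped of YARN specifics, the full-to-incremental conversion is a small piece of arithmetic. The sketch below is a simplified model, not Spark's code: the object `IncrementalRequest`, the `Action` cases and the parameter names are hypothetical, chosen to mirror `targetNumExecutors`, the pending requests, `numExecutorsStarting` and `numExecutorsRunning` in the code that follows.

```scala
// Simplified, hypothetical model of the full-to-incremental conversion.
object IncrementalRequest {
  sealed trait Action
  final case class RequestMore(n: Int) extends Action   // send n new container requests
  final case class CancelPending(n: Int) extends Action // withdraw n pending requests
  case object NoChange extends Action

  // Desired total minus everything already accounted for:
  // requested-but-unallocated, allocated-but-starting, and running.
  def missing(target: Int, pending: Int, starting: Int, running: Int): Int =
    target - pending - starting - running

  def decide(target: Int, pending: Int, starting: Int, running: Int): Action = {
    val m = missing(target, pending, starting, running)
    // When m < 0, only pending (not yet allocated) requests can be withdrawn.
    if (m > 0) RequestMore(m)
    else if (pending > 0 && m < 0) CancelPending(math.min(pending, -m))
    else NoChange
  }
}
```

For example, with a target of 10 executors, 3 pending requests, 2 starting and 1 running executor, `decide(10, 3, 2, 1)` yields `RequestMore(4)`: only the difference is sent to YARN, never the full target.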
```scala
: Unit = {
  // Get the ContainerRequests that were already sent to YARN but are still waiting for
  // allocation (submitted earlier via amClient.addContainerRequest), and count them.
  val pendingAllocate = getPendingAllocate
  val numPendingAllocate = pendingAllocate.size
  // Number of executors for which no request has been sent yet
  val missing = targetNumExecutors - numPendingAllocate -
    numExecutorsStarting.get - numExecutorsRunning.get

  // Some resource requests have not been sent to YARN yet
  if (missing > 0) {
    // Split the pending container requests into three groups: locality matched,
    // locality unmatched (stale), and no locality preference (any host).
    val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
      hostToLocalTaskCounts, pendingAllocate)

    // A stale request is one the ApplicationMaster has already made, but none of its
    // requested hosts appears in hostToLocalTaskCounts (the hosts the tasks prefer).
    // It is no longer useful, so cancel "stale" requests for locations no longer needed.
    staleRequests.foreach { stale =>
      amClient.removeContainerRequest(stale)
    }
    val cancelledContainers = staleRequests.size

    // Count both the new containers and the just-cancelled stale containers as available
    val availableContainers = missing + cancelledContainers

    // To maximize locality, also include the unallocated requests with no locality
    // preference, since they can be cancelled and re-requested with locality
    val potentialContainers = availableContainers + anyHostRequests.size

    // LocalityPreferredContainerPlacementStrategy computes node and rack locality per container
    val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
      potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
      allocatedHostToContainersMap, localRequests)

    // Build a new ContainerRequest for every preference that names specific nodes
    val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
    containerLocalityPreferences.foreach {
      case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
        newLocalityRequests += createContainerRequest(resource, nodes, racks)
      case _ => // no node preference: covered by the any-host fill below
    }

    if (availableContainers >= newLocalityRequests.size) {
      // More containers are available than needed for locality:
      // fill the remainder with requests for any host
      for (_ <- 0 until (availableContainers - newLocalityRequests.size)) {
        newLocalityRequests += createContainerRequest(resource, null, null)
      }
    } else {
      // Cancel some requests without locality preference to schedule more local containers
      val numToCancel = newLocalityRequests.size - availableContainers
      anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
        amClient.removeContainerRequest(nonLocal)
      }
    }

    // Submit the new requests to YARN
    newLocalityRequests.foreach { request =>
      amClient.addContainerRequest(request)
    }
  } else if (numPendingAllocate > 0 && missing < 0) {
    // More executors pending, starting or running than the target: cancel surplus pending requests
    val numToCancel = math.min(numPendingAllocate, -missing)
    val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
    if (!matchingRequests.isEmpty) {
      matchingRequests.iterator().next().asScala
        .take(numToCancel).foreach(amClient.removeContainerRequest)
    }
  }
}
```
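The bookkeeping inside the `missing > 0` branch can be modelled without any YARN classes. In this hypothetical sketch a pending request is just `Option[Set[String]]` (`None` meaning no locality preference), `split` reproduces the three-way grouping described in the comments, and `plan` reproduces the `availableContainers` arithmetic. The number of locality requests is taken as an input, standing in for what `containerPlacementStrategy.localityOfRequestedContainers` would produce; none of these names are Spark APIs.

```scala
// Hypothetical, YARN-free model of the locality bookkeeping in the `missing > 0` branch.
object LocalityReconcile {
  // None = request without locality preference; Some(hosts) = request for those hosts.
  type Pending = Option[Set[String]]

  // Returns (locality-matched, stale, any-host) pending requests.
  def split(preferredHosts: Set[String],
            pending: Seq[Pending]): (Seq[Pending], Seq[Pending], Seq[Pending]) = {
    val (anyHost, withHosts) = pending.partition(_.isEmpty)
    // Matched if at least one requested host is still preferred by some task; otherwise stale.
    val (matched, stale) =
      withHosts.partition(req => req.exists(hosts => hosts.intersect(preferredHosts).nonEmpty))
    (matched, stale, anyHost)
  }

  // Given `missing`, the number of stale requests cancelled, and the number of
  // locality-aware requests produced by the placement strategy, returns
  // (any-host requests to add, existing any-host requests to cancel).
  def plan(missing: Int, numStaleCancelled: Int, numLocalityRequests: Int): (Int, Int) = {
    val available = missing + numStaleCancelled
    if (available >= numLocalityRequests) (available - numLocalityRequests, 0)
    else (0, numLocalityRequests - available)
  }
}
```

With `missing = 3` and one stale request cancelled there are 4 available slots; if the strategy yields 2 locality requests, `plan(3, 1, 2)` returns `(2, 0)`, so two any-host requests are added. Assuming the strategy returns at most `potentialContainers` preferences, the number to cancel can never exceed `anyHostRequests.size`, because `potentialContainers` already includes those requests.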
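Assume each task needs 1 vCore and each container (executor) has 2 vCores, so one container can run 2 tasks at a time and each host's task count is halved when expressed in containers. The result of the conversion is shown in the table below: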
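| | Host 1 | Host 2 | Host 3 | Host 4 |
|---|---|---|---|---|
| 20 Tasks | 20 | 20 | 20 | |
| 10 Tasks | 10 | 10 | | 10 |
| Sum of Tasks | 30 | 30 | 20 | 10 |
| Sum of Containers | 15 | 15 | 10 | 5 |

The Sum of Containers values above only express a ratio; they are not the number of containers that actually has to be requested on each host. In total we need only 15 containers (30 tasks at 2 tasks per container; the per-host sums add up to 90 tasks only because each task is counted once for every host it prefers). So how should these 15 containers be distributed across the hosts?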
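The conversion in the table can be reproduced in a few lines. This is a hypothetical sketch, not Spark's implementation: the task groups are written as `(numTasks, preferredHosts)` pairs and the host names `h1` to `h4` stand for Host 1 to Host 4.

```scala
// Hypothetical sketch: from task locality preferences to per-host container weights.
object TasksToContainers {
  // Each group: (number of tasks, hosts those tasks prefer).
  // A task is counted once for every host it prefers ("Sum of Tasks").
  def hostTaskCounts(groups: Seq[(Int, Set[String])]): Map[String, Int] =
    groups.foldLeft(Map.empty[String, Int]) { case (acc, (numTasks, hosts)) =>
      hosts.foldLeft(acc)((m, h) => m.updated(h, m.getOrElse(h, 0) + numTasks))
    }

  // Containers actually needed: ceil(totalTasks * cpusPerTask / coresPerContainer).
  def containersNeeded(totalTasks: Int, cpusPerTask: Int, coresPerContainer: Int): Int =
    (totalTasks * cpusPerTask + coresPerContainer - 1) / coresPerContainer

  // "Sum of Containers": each host's task count expressed in containers (a ratio only).
  def hostContainerWeights(hostTasks: Map[String, Int], cpusPerTask: Int,
                           coresPerContainer: Int): Map[String, Double] =
    hostTasks.map { case (h, n) => h -> n.toDouble * cpusPerTask / coresPerContainer }
}
```

Feeding in the two groups from the table gives the weights 15, 15, 10 and 5, which sum to 45, three times the 15 containers actually needed, because every task appears in three hosts' counts.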
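For example, Host 1's Sum of Containers is 15 and the total across all hosts is 15 + 15 + 10 + 5 = 45, so Host 1's share is 1/3 and it should receive 15 × 1/3 = 5 of the 15 containers. Likewise, Host 3 gets 15 × 10/45 ≈ 3.33 and Host 4 gets 15 × 5/45 ≈ 1.67. After rounding up (it is better to over-allocate slightly than to under-allocate), the 15 containers are distributed across the hosts as follows: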
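| | Host 1 | Host 2 | Host 3 | Host 4 |
|---|---|---|---|---|
| 20 Tasks | 20 | 20 | 20 | |
| 10 Tasks | 10 | 10 | | 10 |
| Sum of Tasks | 30 | 30 | 20 | 10 |
| Sum of Containers | 15 | 15 | 10 | 5 |
| Allocated Container Target | 5 | 5 | 4 | 2 |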
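The proportional split with rounding up can be sketched as below. The object and method names are hypothetical; integer weights are assumed, and the ceiling is computed with integer arithmetic so that exact multiples such as 15 × 15 / 45 = 5 are never pushed up to 6 by floating-point error.

```scala
// Hypothetical sketch: scale per-host weights to the total container count, rounding up.
object ContainerTargets {
  def targets(weights: Map[String, Int], total: Int): Map[String, Int] = {
    val sum = weights.values.sum
    weights.map { case (h, w) =>
      // Integer ceiling division: ceil(total * w / sum)
      h -> (if (sum == 0) 0 else (total * w + sum - 1) / sum)
    }
  }
}
```

Calling `targets(Map("h1" -> 15, "h2" -> 15, "h3" -> 10, "h4" -> 5), 15)` gives 5, 5, 4 and 2. Because of the rounding up, the targets add up to 16, one more than the 15 containers needed. Passing the Sum of Tasks values (30, 30, 20, 10) instead gives the same targets, since the two rows are proportional.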
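Once the Allocated Container Target has been computed, we still have to subtract the containers that already exist on each host (either running there or pending for that host), because the container requests we finally send to YARN are incremental. Suppose each host already has 1 container, i.e. 4 of the 15 containers have already been allocated. After subtracting the existing containers we get the result below, so we need to request 4 + 4 + 3 + 1 = 12 new containers: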
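| | Host 1 | Host 2 | Host 3 | Host 4 |
|---|---|---|---|---|
| 20 Tasks | 20 | 20 | 20 | |
| 10 Tasks | 10 | 10 | | 10 |
| Sum of Tasks | 30 | 30 | 20 | 10 |
| Sum of Containers | 15 | 15 | 10 | 5 |
| Allocated Container Target | 5 | 5 | 4 | 2 |
| Newly Allocated Container | 4 | 4 | 3 | 1 |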
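The subtraction step is a per-host difference. In this hypothetical sketch, `existing` holds the containers already running on, or pending for, each host; clamping at zero is a design choice of the sketch so that a host that already has more containers than its target simply gets no new request.

```scala
// Hypothetical sketch: turn per-host targets into incremental (new) requests.
object NewRequests {
  // existing(host) = containers already running on, or pending for, that host.
  def perHost(targets: Map[String, Int], existing: Map[String, Int]): Map[String, Int] =
    targets.map { case (h, t) => h -> math.max(0, t - existing.getOrElse(h, 0)) }
}
```

With targets 5, 5, 4, 2 and one existing container per host, the result is 4, 4, 3, 1, i.e. 12 new requests, matching the last row of the table.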
In short, this calculation converts Sum of Tasks (each host's share of the tasks) into Sum of Containers (each host's share of the containers), scales those shares to the 15 containers actually needed to get the Allocated Container Target, and then subtracts the containers that already exist on each host to obtain Newly Allocated Container, the number of containers that should be newly requested on each host, as shown in the table above.