YARN-7574. Add support for Node Labels on Auto Created Leaf Queue Template. Contribut...
[hadoop.git] / hadoop-yarn-project / hadoop-yarn / hadoop-yarn-server / hadoop-yarn-server-resourcemanager / src / main / java / org / apache / hadoop / yarn / server / resourcemanager / RMServerUtils.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 * <p>
10 * http://www.apache.org/licenses/LICENSE-2.0
11 * <p>
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.yarn.server.resourcemanager;
20
21 import java.io.IOException;
22 import java.text.ParseException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.EnumSet;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32
33 import com.google.common.collect.Sets;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.security.AccessControlException;
38 import org.apache.hadoop.security.UserGroupInformation;
39 import org.apache.hadoop.security.authorize.AccessControlList;
40 import org.apache.hadoop.security.authorize.ProxyUsers;
41 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
42 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
43 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
44 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
45 import org.apache.hadoop.yarn.api.records.ContainerId;
46 import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
47 import org.apache.hadoop.yarn.api.records.ExecutionType;
48 import org.apache.hadoop.yarn.api.records.NodeId;
49 import org.apache.hadoop.yarn.api.records.NodeState;
50 import org.apache.hadoop.yarn.api.records.QueueInfo;
51 import org.apache.hadoop.yarn.api.records.Resource;
52 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
53 import org.apache.hadoop.yarn.api.records.ResourceRequest;
54 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
55 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
56 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
57 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
58 import org.apache.hadoop.yarn.conf.YarnConfiguration;
59 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
60 import org.apache.hadoop.yarn.exceptions
61 .InvalidResourceBlacklistRequestException;
62 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
63 import org.apache.hadoop.yarn.exceptions.YarnException;
64 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
65 import org.apache.hadoop.yarn.factories.RecordFactory;
66 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
67 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
68 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
69 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
70 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
71 .RMAppAttemptState;
72 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
73 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
74 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
75 import org.apache.hadoop.yarn.server.resourcemanager.scheduler
76 .ResourceScheduler;
77 import org.apache.hadoop.yarn.server.resourcemanager.scheduler
78 .SchedContainerChangeRequest;
79 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
80 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
81 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
82 import org.apache.hadoop.yarn.util.Clock;
83 import org.apache.hadoop.yarn.util.SystemClock;
84 import org.apache.hadoop.yarn.util.Times;
85 import org.apache.hadoop.yarn.util.resource.Resources;
86
87 /**
88 * Utility methods to aid serving RM data through the REST and RPC APIs
89 */
90 public class RMServerUtils {
91
92 private static final Log LOG_HANDLE = LogFactory.getLog(RMServerUtils.class);
93
94 public static final String UPDATE_OUTSTANDING_ERROR =
95 "UPDATE_OUTSTANDING_ERROR";
96 private static final String INCORRECT_CONTAINER_VERSION_ERROR =
97 "INCORRECT_CONTAINER_VERSION_ERROR";
98 private static final String INVALID_CONTAINER_ID =
99 "INVALID_CONTAINER_ID";
100 private static final String RESOURCE_OUTSIDE_ALLOWED_RANGE =
101 "RESOURCE_OUTSIDE_ALLOWED_RANGE";
102
103 protected static final RecordFactory RECORD_FACTORY =
104 RecordFactoryProvider.getRecordFactory(null);
105
106 private static Clock clock = SystemClock.getInstance();
107
108 public static List<RMNode> queryRMNodes(RMContext context,
109 EnumSet<NodeState> acceptedStates) {
110 // nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING.
111 ArrayList<RMNode> results = new ArrayList<RMNode>();
112 if (acceptedStates.contains(NodeState.NEW) ||
113 acceptedStates.contains(NodeState.RUNNING) ||
114 acceptedStates.contains(NodeState.DECOMMISSIONING) ||
115 acceptedStates.contains(NodeState.UNHEALTHY)) {
116 for (RMNode rmNode : context.getRMNodes().values()) {
117 if (acceptedStates.contains(rmNode.getState())) {
118 results.add(rmNode);
119 }
120 }
121 }
122
123 // inactiveNodes contains nodes that are DECOMMISSIONED, LOST, OR REBOOTED
124 if (acceptedStates.contains(NodeState.DECOMMISSIONED) ||
125 acceptedStates.contains(NodeState.LOST) ||
126 acceptedStates.contains(NodeState.REBOOTED)) {
127 for (RMNode rmNode : context.getInactiveRMNodes().values()) {
128 if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) {
129 results.add(rmNode);
130 }
131 }
132 }
133 return results;
134 }
135
136 /**
137 * Check if we have:
138 * - Request for same containerId and different target resource.
139 * - If targetResources violates maximum/minimumAllocation.
140 * @param rmContext RM context.
141 * @param request Allocate Request.
142 * @param maximumAllocation Maximum Allocation.
143 * @param updateErrors Container update errors.
144 * @return ContainerUpdateRequests.
145 */
146 public static ContainerUpdates
147 validateAndSplitUpdateResourceRequests(RMContext rmContext,
148 AllocateRequest request, Resource maximumAllocation,
149 List<UpdateContainerError> updateErrors) {
150 ContainerUpdates updateRequests =
151 new ContainerUpdates();
152 Set<ContainerId> outstandingUpdate = new HashSet<>();
153 for (UpdateContainerRequest updateReq : request.getUpdateRequests()) {
154 RMContainer rmContainer = rmContext.getScheduler().getRMContainer(
155 updateReq.getContainerId());
156 String msg = validateContainerIdAndVersion(outstandingUpdate,
157 updateReq, rmContainer);
158 ContainerUpdateType updateType = updateReq.getContainerUpdateType();
159 if (msg == null) {
160 if ((updateType != ContainerUpdateType.PROMOTE_EXECUTION_TYPE) &&
161 (updateType !=ContainerUpdateType.DEMOTE_EXECUTION_TYPE)) {
162 if (validateIncreaseDecreaseRequest(
163 rmContext, updateReq, maximumAllocation)) {
164 if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
165 updateRequests.getIncreaseRequests().add(updateReq);
166 } else {
167 updateRequests.getDecreaseRequests().add(updateReq);
168 }
169 outstandingUpdate.add(updateReq.getContainerId());
170 } else {
171 msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
172 }
173 } else {
174 ExecutionType original = rmContainer.getExecutionType();
175 ExecutionType target = updateReq.getExecutionType();
176 if (target != original) {
177 if (target == ExecutionType.GUARANTEED &&
178 original == ExecutionType.OPPORTUNISTIC) {
179 updateRequests.getPromotionRequests().add(updateReq);
180 outstandingUpdate.add(updateReq.getContainerId());
181 } else if (target == ExecutionType.OPPORTUNISTIC &&
182 original == ExecutionType.GUARANTEED) {
183 updateRequests.getDemotionRequests().add(updateReq);
184 outstandingUpdate.add(updateReq.getContainerId());
185 }
186 }
187 }
188 }
189 checkAndcreateUpdateError(updateErrors, updateReq, rmContainer, msg);
190 }
191 return updateRequests;
192 }
193
194 private static void checkAndcreateUpdateError(
195 List<UpdateContainerError> errors, UpdateContainerRequest updateReq,
196 RMContainer rmContainer, String msg) {
197 if (msg != null) {
198 UpdateContainerError updateError = RECORD_FACTORY
199 .newRecordInstance(UpdateContainerError.class);
200 updateError.setReason(msg);
201 updateError.setUpdateContainerRequest(updateReq);
202 if (rmContainer != null) {
203 updateError.setCurrentContainerVersion(
204 rmContainer.getContainer().getVersion());
205 } else {
206 updateError.setCurrentContainerVersion(-1);
207 }
208 errors.add(updateError);
209 }
210 }
211
212 private static String validateContainerIdAndVersion(
213 Set<ContainerId> outstandingUpdate, UpdateContainerRequest updateReq,
214 RMContainer rmContainer) {
215 String msg = null;
216 if (rmContainer == null) {
217 msg = INVALID_CONTAINER_ID;
218 }
219 // Only allow updates if the requested version matches the current
220 // version
221 if (msg == null && updateReq.getContainerVersion() !=
222 rmContainer.getContainer().getVersion()) {
223 msg = INCORRECT_CONTAINER_VERSION_ERROR;
224 }
225 // No more than 1 container update per request.
226 if (msg == null &&
227 outstandingUpdate.contains(updateReq.getContainerId())) {
228 msg = UPDATE_OUTSTANDING_ERROR;
229 }
230 return msg;
231 }
232
233 /**
234 * Utility method to validate a list resource requests, by insuring that the
235 * requested memory/vcore is non-negative and not greater than max
236 */
237 public static void normalizeAndValidateRequests(List<ResourceRequest> ask,
238 Resource maximumResource, String queueName, YarnScheduler scheduler,
239 RMContext rmContext) throws InvalidResourceRequestException {
240 // Get queue from scheduler
241 QueueInfo queueInfo = null;
242 try {
243 queueInfo = scheduler.getQueueInfo(queueName, false, false);
244 } catch (IOException e) {
245 //Queue may not exist since it could be auto-created in case of
246 // dynamic queues
247 }
248
249 for (ResourceRequest resReq : ask) {
250 SchedulerUtils.normalizeAndvalidateRequest(resReq, maximumResource,
251 queueName, scheduler, rmContext, queueInfo);
252 }
253 }
254
255 /**
256 * Validate increase/decrease request.
257 * <pre>
258 * - Throw exception when any other error happens
259 * </pre>
260 */
261 public static void checkSchedContainerChangeRequest(
262 SchedContainerChangeRequest request, boolean increase)
263 throws InvalidResourceRequestException {
264 RMContext rmContext = request.getRmContext();
265 ContainerId containerId = request.getContainerId();
266 RMContainer rmContainer = request.getRMContainer();
267 Resource targetResource = request.getTargetCapacity();
268
269 // Compare targetResource and original resource
270 Resource originalResource = rmContainer.getAllocatedResource();
271
272 // Resource comparasion should be >= (or <=) for all resource vectors, for
273 // example, you cannot request target resource of a <10G, 10> container to
274 // <20G, 8>
275 if (increase) {
276 if (originalResource.getMemorySize() > targetResource.getMemorySize()
277 || originalResource.getVirtualCores() > targetResource
278 .getVirtualCores()) {
279 String msg =
280 "Trying to increase a container, but target resource has some"
281 + " resource < original resource, target=" + targetResource
282 + " original=" + originalResource + " containerId="
283 + containerId;
284 throw new InvalidResourceRequestException(msg);
285 }
286 } else {
287 if (originalResource.getMemorySize() < targetResource.getMemorySize()
288 || originalResource.getVirtualCores() < targetResource
289 .getVirtualCores()) {
290 String msg =
291 "Trying to decrease a container, but target resource has "
292 + "some resource > original resource, target=" + targetResource
293 + " original=" + originalResource + " containerId="
294 + containerId;
295 throw new InvalidResourceRequestException(msg);
296 }
297 }
298
299 // Target resource of the increase request is more than NM can offer
300 ResourceScheduler scheduler = rmContext.getScheduler();
301 RMNode rmNode = request.getSchedulerNode().getRMNode();
302 if (!Resources.fitsIn(scheduler.getResourceCalculator(), targetResource,
303 rmNode.getTotalCapability())) {
304 String msg = "Target resource=" + targetResource + " of containerId="
305 + containerId + " is more than node's total resource="
306 + rmNode.getTotalCapability();
307 throw new InvalidResourceRequestException(msg);
308 }
309 }
310
311 /*
312 * @throw <code>InvalidResourceBlacklistRequestException </code> if the
313 * resource is not able to be added to the blacklist.
314 */
315 public static void validateBlacklistRequest(
316 ResourceBlacklistRequest blacklistRequest)
317 throws InvalidResourceBlacklistRequestException {
318 if (blacklistRequest != null) {
319 List<String> plus = blacklistRequest.getBlacklistAdditions();
320 if (plus != null && plus.contains(ResourceRequest.ANY)) {
321 throw new InvalidResourceBlacklistRequestException(
322 "Cannot add " + ResourceRequest.ANY + " to the blacklist!");
323 }
324 }
325 }
326
327 // Sanity check and normalize target resource
328 private static boolean validateIncreaseDecreaseRequest(RMContext rmContext,
329 UpdateContainerRequest request, Resource maximumAllocation) {
330 if (request.getCapability().getMemorySize() < 0
331 || request.getCapability().getMemorySize() > maximumAllocation
332 .getMemorySize()) {
333 return false;
334 }
335 if (request.getCapability().getVirtualCores() < 0
336 || request.getCapability().getVirtualCores() > maximumAllocation
337 .getVirtualCores()) {
338 return false;
339 }
340 ResourceScheduler scheduler = rmContext.getScheduler();
341 request.setCapability(scheduler.getNormalizedResource(request.getCapability()));
342 return true;
343 }
344
345 /**
346 * It will validate to make sure all the containers belong to correct
347 * application attempt id. If not then it will throw
348 * {@link InvalidContainerReleaseException}
349 *
350 * @param containerReleaseList containers to be released as requested by
351 * application master.
352 * @param appAttemptId Application attempt Id
353 * @throws InvalidContainerReleaseException
354 */
355 public static void
356 validateContainerReleaseRequest(List<ContainerId> containerReleaseList,
357 ApplicationAttemptId appAttemptId)
358 throws InvalidContainerReleaseException {
359 for (ContainerId cId : containerReleaseList) {
360 if (!appAttemptId.equals(cId.getApplicationAttemptId())) {
361 throw new InvalidContainerReleaseException(
362 "Cannot release container : "
363 + cId.toString()
364 + " not belonging to this application attempt : "
365 + appAttemptId);
366 }
367 }
368 }
369
370 public static UserGroupInformation verifyAdminAccess(
371 YarnAuthorizationProvider authorizer, String method, final Log LOG)
372 throws IOException {
373 // by default, this method will use AdminService as module name
374 return verifyAdminAccess(authorizer, method, "AdminService", LOG);
375 }
376
377 /**
378 * Utility method to verify if the current user has access based on the
379 * passed {@link AccessControlList}
380 *
381 * @param authorizer the {@link AccessControlList} to check against
382 * @param method the method name to be logged
383 * @param module like AdminService or NodeLabelManager
384 * @param LOG the logger to use
385 * @return {@link UserGroupInformation} of the current user
386 * @throws IOException
387 */
388 public static UserGroupInformation verifyAdminAccess(
389 YarnAuthorizationProvider authorizer, String method, String module,
390 final Log LOG)
391 throws IOException {
392 UserGroupInformation user;
393 try {
394 user = UserGroupInformation.getCurrentUser();
395 } catch (IOException ioe) {
396 LOG.warn("Couldn't get current user", ioe);
397 RMAuditLogger.logFailure("UNKNOWN", method, "",
398 "AdminService", "Couldn't get current user");
399 throw ioe;
400 }
401
402 if (!authorizer.isAdmin(user)) {
403 LOG.warn("User " + user.getShortUserName() + " doesn't have permission" +
404 " to call '" + method + "'");
405
406 RMAuditLogger.logFailure(user.getShortUserName(), method, "", module,
407 RMAuditLogger.AuditConstants.UNAUTHORIZED_USER);
408
409 throw new AccessControlException("User " + user.getShortUserName() +
410 " doesn't have permission" +
411 " to call '" + method + "'");
412 }
413 if (LOG.isTraceEnabled()) {
414 LOG.trace(method + " invoked by user " + user.getShortUserName());
415 }
416 return user;
417 }
418
419 public static YarnApplicationState createApplicationState(
420 RMAppState rmAppState) {
421 switch (rmAppState) {
422 case NEW:
423 return YarnApplicationState.NEW;
424 case NEW_SAVING:
425 return YarnApplicationState.NEW_SAVING;
426 case SUBMITTED:
427 return YarnApplicationState.SUBMITTED;
428 case ACCEPTED:
429 return YarnApplicationState.ACCEPTED;
430 case RUNNING:
431 return YarnApplicationState.RUNNING;
432 case FINISHING:
433 case FINISHED:
434 return YarnApplicationState.FINISHED;
435 case KILLING:
436 case KILLED:
437 return YarnApplicationState.KILLED;
438 case FAILED:
439 return YarnApplicationState.FAILED;
440 default:
441 throw new YarnRuntimeException("Unknown state passed!");
442 }
443 }
444
445 public static YarnApplicationAttemptState createApplicationAttemptState(
446 RMAppAttemptState rmAppAttemptState) {
447 switch (rmAppAttemptState) {
448 case NEW:
449 return YarnApplicationAttemptState.NEW;
450 case SUBMITTED:
451 return YarnApplicationAttemptState.SUBMITTED;
452 case SCHEDULED:
453 return YarnApplicationAttemptState.SCHEDULED;
454 case ALLOCATED:
455 return YarnApplicationAttemptState.ALLOCATED;
456 case LAUNCHED:
457 return YarnApplicationAttemptState.LAUNCHED;
458 case ALLOCATED_SAVING:
459 case LAUNCHED_UNMANAGED_SAVING:
460 return YarnApplicationAttemptState.ALLOCATED_SAVING;
461 case RUNNING:
462 return YarnApplicationAttemptState.RUNNING;
463 case FINISHING:
464 return YarnApplicationAttemptState.FINISHING;
465 case FINISHED:
466 return YarnApplicationAttemptState.FINISHED;
467 case KILLED:
468 return YarnApplicationAttemptState.KILLED;
469 case FAILED:
470 return YarnApplicationAttemptState.FAILED;
471 default:
472 throw new YarnRuntimeException("Unknown state passed!");
473 }
474 }
475
476 /**
477 * Statically defined dummy ApplicationResourceUsageREport. Used as
478 * a return value when a valid report cannot be found.
479 */
480 public static final ApplicationResourceUsageReport
481 DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
482 BuilderUtils.newApplicationResourceUsageReport(-1, -1,
483 Resources.createResource(-1, -1), Resources.createResource(-1, -1),
484 Resources.createResource(-1, -1), new HashMap<>(), new HashMap<>());
485
486
487 /**
488 * Find all configs whose name starts with
489 * YarnConfiguration.RM_PROXY_USER_PREFIX, and add a record for each one by
490 * replacing the prefix with ProxyUsers.CONF_HADOOP_PROXYUSER
491 */
492 public static void processRMProxyUsersConf(Configuration conf) {
493 Map<String, String> rmProxyUsers = new HashMap<String, String>();
494 for (Map.Entry<String, String> entry : conf) {
495 String propName = entry.getKey();
496 if (propName.startsWith(YarnConfiguration.RM_PROXY_USER_PREFIX)) {
497 rmProxyUsers.put(ProxyUsers.CONF_HADOOP_PROXYUSER + "." +
498 propName.substring(YarnConfiguration.RM_PROXY_USER_PREFIX
499 .length()),
500 entry.getValue());
501 }
502 }
503 for (Map.Entry<String, String> entry : rmProxyUsers.entrySet()) {
504 conf.set(entry.getKey(), entry.getValue());
505 }
506 }
507
508 public static void validateApplicationTimeouts(
509 Map<ApplicationTimeoutType, Long> timeouts) throws YarnException {
510 if (timeouts != null) {
511 for (Map.Entry<ApplicationTimeoutType, Long> timeout : timeouts
512 .entrySet()) {
513 if (timeout.getValue() <= 0) {
514 String message = "Invalid application timeout, value="
515 + timeout.getValue() + " for type=" + timeout.getKey();
516 throw new YarnException(message);
517 }
518 }
519 }
520 }
521
522 /**
523 * Validate ISO8601 format with epoch time.
524 * @param timeoutsInISO8601 format
525 * @return expire time in local epoch
526 * @throws YarnException if given application timeout value is lesser than
527 * current time.
528 */
529 public static Map<ApplicationTimeoutType, Long> validateISO8601AndConvertToLocalTimeEpoch(
530 Map<ApplicationTimeoutType, String> timeoutsInISO8601)
531 throws YarnException {
532 long currentTimeMillis = clock.getTime();
533 Map<ApplicationTimeoutType, Long> newApplicationTimeout =
534 new HashMap<ApplicationTimeoutType, Long>();
535 if (timeoutsInISO8601 != null) {
536 for (Map.Entry<ApplicationTimeoutType, String> timeout : timeoutsInISO8601
537 .entrySet()) {
538 long expireTime = 0L;
539 try {
540 expireTime =
541 Times.parseISO8601ToLocalTimeInMillis(timeout.getValue());
542 } catch (ParseException ex) {
543 String message =
544 "Expire time is not in ISO8601 format. ISO8601 supported "
545 + "format is yyyy-MM-dd'T'HH:mm:ss.SSSZ. Configured "
546 + "timeout value is " + timeout.getValue();
547 throw new YarnException(message, ex);
548 }
549 if (expireTime < currentTimeMillis) {
550 String message =
551 "Expire time is less than current time, current-time="
552 + Times.formatISO8601(currentTimeMillis) + " expire-time="
553 + Times.formatISO8601(expireTime);
554 throw new YarnException(message);
555 }
556 newApplicationTimeout.put(timeout.getKey(), expireTime);
557 }
558 }
559 return newApplicationTimeout;
560 }
561
562 /**
563 * Get applicable Node count for AM.
564 *
565 * @param rmContext context
566 * @param conf configuration
567 * @param amReqs am resource requests
568 * @return applicable node count
569 */
570 public static int getApplicableNodeCountForAM(RMContext rmContext,
571 Configuration conf, List<ResourceRequest> amReqs) {
572 // Determine the list of nodes that are eligible based on the strict
573 // resource requests
574 Set<NodeId> nodesForReqs = new HashSet<>();
575 for (ResourceRequest amReq : amReqs) {
576 if (amReq.getRelaxLocality() &&
577 !amReq.getResourceName().equals(ResourceRequest.ANY)) {
578 nodesForReqs.addAll(
579 rmContext.getScheduler().getNodeIds(amReq.getResourceName()));
580 }
581 }
582
583 if (YarnConfiguration.areNodeLabelsEnabled(conf)) {
584 // Determine the list of nodes that are eligible based on the node label
585 String amNodeLabelExpression = amReqs.get(0).getNodeLabelExpression();
586 Set<NodeId> nodesForLabels =
587 getNodeIdsForLabel(rmContext, amNodeLabelExpression);
588 if (nodesForLabels != null && !nodesForLabels.isEmpty()) {
589 // If only node labels, strip out any wildcard NodeIds and return
590 if (nodesForReqs.isEmpty()) {
591 for (Iterator<NodeId> it = nodesForLabels.iterator(); it.hasNext();) {
592 if (it.next().getPort() == 0) {
593 it.remove();
594 }
595 }
596 return nodesForLabels.size();
597 } else {
598 // The NodeIds common to both the strict resource requests and the
599 // node label is the eligible set
600 return Sets.intersection(nodesForReqs, nodesForLabels).size();
601 }
602 }
603 }
604
605 // If no strict resource request NodeIds nor node label NodeIds, then just
606 // return the entire cluster
607 if (nodesForReqs.isEmpty()) {
608 return rmContext.getScheduler().getNumClusterNodes();
609 }
610 // No node label NodeIds, so return the strict resource request NodeIds
611 return nodesForReqs.size();
612 }
613
614 private static Set<NodeId> getNodeIdsForLabel(RMContext rmContext,
615 String label) {
616 label = (label == null || label.trim().isEmpty())
617 ? RMNodeLabelsManager.NO_LABEL : label;
618 if (label.equals(RMNodeLabelsManager.NO_LABEL)) {
619 // NO_LABEL nodes aren't tracked directly
620 return rmContext.getNodeLabelManager().getNodesWithoutALabel();
621 } else {
622 Map<String, Set<NodeId>> labelsToNodes =
623 rmContext.getNodeLabelManager().getLabelsToNodes(
624 Collections.singleton(label));
625 return labelsToNodes.get(label);
626 }
627 }
628
629 public static Long getOrDefault(Map<String, Long> map, String key,
630 Long defaultValue) {
631 if (map.containsKey(key)) {
632 return map.get(key);
633 }
634 return defaultValue;
635 }
636 }