YARN-7574. Add support for Node Labels on Auto Created Leaf Queue Template. Contribut...
[hadoop.git] / hadoop-yarn-project / hadoop-yarn / hadoop-yarn-server / hadoop-yarn-server-resourcemanager / src / main / java / org / apache / hadoop / yarn / server / resourcemanager / scheduler / capacity / CapacitySchedulerConfiguration.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
20
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.collect.ImmutableSet;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.classification.InterfaceAudience.Private;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.security.authorize.AccessControlList;
28 import org.apache.hadoop.util.ReflectionUtils;
29 import org.apache.hadoop.util.StringUtils;
30 import org.apache.hadoop.yarn.api.records.Priority;
31 import org.apache.hadoop.yarn.api.records.QueueACL;
32 import org.apache.hadoop.yarn.api.records.QueueState;
33 import org.apache.hadoop.yarn.api.records.ReservationACL;
34 import org.apache.hadoop.yarn.api.records.Resource;
35 import org.apache.hadoop.yarn.api.records.ResourceInformation;
36 import org.apache.hadoop.yarn.conf.YarnConfiguration;
37 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
38 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
39 import org.apache.hadoop.yarn.security.AccessType;
40 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
41 import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMappingEntity;
42 import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
43 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
44 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
45 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType;
46 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
47 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
48 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
49 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
50 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
51 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
52 import org.apache.hadoop.yarn.util.UnitsConversionUtil;
53 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
54 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
55 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
56 import org.apache.hadoop.yarn.util.resource.Resources;
57
58 import java.util.ArrayList;
59 import java.util.Collection;
60 import java.util.Collections;
61 import java.util.HashMap;
62 import java.util.HashSet;
63 import java.util.Iterator;
64 import java.util.List;
65 import java.util.Map;
66 import java.util.Map.Entry;
67 import java.util.regex.Matcher;
68 import java.util.regex.Pattern;
69 import java.util.Set;
70 import java.util.StringTokenizer;
71
72 public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
73
// Class-wide logger used by the accessors below for debug/info/warn output.
private static final Log LOG =
    LogFactory.getLog(CapacitySchedulerConfiguration.class);

// Resource name handed to addResource(...) by the constructor when the
// local configuration provider is requested.
private static final String CS_CONFIGURATION_FILE = "capacity-scheduler.xml";
78
/** Common prefix of the configuration keys assembled by this class. */
@Private
public static final String PREFIX = "yarn.scheduler.capacity.";

/** Separator placed between the segments of a configuration key. */
@Private
public static final String DOT = ".";

/**
 * Key suffix for the maximum number of applications; combined with
 * {@link #PREFIX} for the global key and with a queue prefix in
 * getMaximumApplicationsPerQueue.
 */
@Private
public static final String MAXIMUM_APPLICATIONS_SUFFIX =
    "maximum-applications";

/** Global key read by getMaximumSystemApplications. */
@Private
public static final String MAXIMUM_SYSTEM_APPLICATIONS =
    PREFIX + MAXIMUM_APPLICATIONS_SUFFIX;

/**
 * Key suffix for the maximum AM resource percent; used globally, per queue
 * and per node label by the accessors in this class.
 */
@Private
public static final String MAXIMUM_AM_RESOURCE_SUFFIX =
    "maximum-am-resource-percent";

/** Global key read by getMaximumApplicationMasterResourcePercent. */
@Private
public static final String MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT =
    PREFIX + MAXIMUM_AM_RESOURCE_SUFFIX;

/** Per-queue key suffix holding the child queue names (see getQueues). */
@Private
public static final String QUEUES = "queues";

/** Key suffix for a queue (or queue/label) capacity value. */
@Private
public static final String CAPACITY = "capacity";

/** Key suffix for a queue (or queue/label) maximum capacity value. */
@Private
public static final String MAXIMUM_CAPACITY = "maximum-capacity";

/** Per-queue key suffix read by getUserLimit. */
@Private
public static final String USER_LIMIT = "minimum-user-limit-percent";

/** Per-queue key suffix read by getUserLimitFactor. */
@Private
public static final String USER_LIMIT_FACTOR = "user-limit-factor";

// NOTE(review): USER_WEIGHT, USER_SETTINGS and DEFAULT_USER_WEIGHT are not
// referenced in the visible part of this file; their consumers are
// presumably further down - confirm before documenting their semantics.
@Private
public static final String USER_WEIGHT = "weight";

@Private
public static final String USER_SETTINGS = "user-settings";

@Private
public static final float DEFAULT_USER_WEIGHT = 1.0f;

/** Per-queue key suffix read by getConfiguredState. */
@Private
public static final String STATE = "state";

/**
 * Per-queue key suffix for the accessible node labels; also a path segment
 * of label-specific keys (see getNodeLabelPrefix).
 */
@Private
public static final String ACCESSIBLE_NODE_LABELS = "accessible-node-labels";

/** Per-queue key suffix read by getDefaultNodeLabelExpression. */
@Private
public static final String DEFAULT_NODE_LABEL_EXPRESSION =
    "default-node-label-expression";

/** Global key read by getReservationContinueLook. */
public static final String RESERVE_CONT_LOOK_ALL_NODES = PREFIX
    + "reservations-continue-look-all-nodes";

/** Value used when {@link #RESERVE_CONT_LOOK_ALL_NODES} is not set. */
@Private
public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;

/** Per-queue key suffix read by getMaximumAllocationPerQueue (memory). */
@Private
public static final String MAXIMUM_ALLOCATION_MB = "maximum-allocation-mb";

/** Per-queue key suffix read by getMaximumAllocationPerQueue (vcores). */
@Private
public static final String MAXIMUM_ALLOCATION_VCORES =
    "maximum-allocation-vcores";
147
/**
 * Key suffix for ordering policies. It is appended to a queue prefix both by
 * getAppOrderingPolicy (policy of applications inside a queue) and by
 * getQueueOrderingPolicyPrefix.
 */
public static final String ORDERING_POLICY = "ordering-policy";

/*
 * Short aliases accepted by getAppOrderingPolicy for the ordering policy
 * that sorts apps inside a leaf queue; any other value is treated there as a
 * class name.
 */
public static final String FIFO_APP_ORDERING_POLICY = "fifo";

public static final String FAIR_APP_ORDERING_POLICY = "fair";

/** Alias used by getAppOrderingPolicy when no policy is configured. */
public static final String DEFAULT_APP_ORDERING_POLICY =
    FIFO_APP_ORDERING_POLICY;
162
/**
 * Default returned by getMaximumSystemApplications. The misspelling in the
 * identifier is part of the public name and is kept as-is.
 */
@Private
public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;

/** Default returned by getMaximumApplicationMasterResourcePercent. */
@Private
public static final float
    DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT = 0.1f;

/**
 * Sentinel meaning "not configured"; cast to int and used as the getInt
 * default in getMaximumApplicationsPerQueue and
 * getMaximumAllocationPerQueue.
 */
@Private
public static final float UNDEFINED = -1;

/** Lower bound enforced on capacity values read by this class. */
@Private
public static final float MINIMUM_CAPACITY_VALUE = 0;

/** Upper bound enforced on capacity values read or set by this class. */
@Private
public static final float MAXIMUM_CAPACITY_VALUE = 100;

/**
 * Configured maximum-capacity value that getNonLabeledQueueMaximumCapacity
 * maps to {@link #MAXIMUM_CAPACITY_VALUE}.
 */
@Private
public static final float DEFAULT_MAXIMUM_CAPACITY_VALUE = -1.0f;

/** Default returned by getUserLimit. */
@Private
public static final int DEFAULT_USER_LIMIT = 100;

/** Default returned by getUserLimitFactor. */
@Private
public static final float DEFAULT_USER_LIMIT_FACTOR = 1.0f;

/** Default ACL string in getAcl for the root queue, and in getReservationAcl and getPriorityAcls. */
@Private
public static final String ALL_ACL = "*";

/** ACL string (a single space) used by getAcl as default for non-root queues. */
@Private
public static final String NONE_ACL = " ";

/** Global key read by getEnableUserMetrics, and its default. */
@Private public static final String ENABLE_USER_METRICS =
    PREFIX +"user-metrics.enable";
@Private public static final boolean DEFAULT_ENABLE_USER_METRICS = false;

/** ResourceComparator for scheduling. */
@Private public static final String RESOURCE_CALCULATOR_CLASS =
    PREFIX + "resource-calculator";

// NOTE(review): presumably the fallback for RESOURCE_CALCULATOR_CLASS; the
// accessor that would confirm this is cut off at the end of the visible file.
@Private public static final Class<? extends ResourceCalculator>
    DEFAULT_RESOURCE_CALCULATOR_CLASS = DefaultResourceCalculator.class;
204
/** Name of the root queue; several accessors special-case it. */
@Private
public static final String ROOT = "root";

/** Global key read by getNodeLocalityDelay. */
@Private
public static final String NODE_LOCALITY_DELAY =
    PREFIX + "node-locality-delay";

/** Default returned by getNodeLocalityDelay. */
@Private
public static final int DEFAULT_NODE_LOCALITY_DELAY = 40;

/** Global key read by getRackLocalityAdditionalDelay. */
@Private
public static final String RACK_LOCALITY_ADDITIONAL_DELAY =
    PREFIX + "rack-locality-additional-delay";

/** Default returned by getRackLocalityAdditionalDelay. */
@Private
public static final int DEFAULT_RACK_LOCALITY_ADDITIONAL_DELAY = -1;

/** Global key read by getRackLocalityFullReset. */
@Private
public static final String RACK_LOCALITY_FULL_RESET =
    PREFIX + "rack-locality-full-reset";

/**
 * Default for {@link #OFFSWITCH_PER_HEARTBEAT_LIMIT}; it equals the minimum
 * that getOffSwitchPerHeartbeatLimit enforces.
 */
@Private
public static final int DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT = 1;

/** Global key read by getOffSwitchPerHeartbeatLimit. */
@Private
public static final String OFFSWITCH_PER_HEARTBEAT_LIMIT =
    PREFIX + "per-node-heartbeat.maximum-offswitch-assignments";

/** Default returned by getRackLocalityFullReset. */
@Private
public static final boolean DEFAULT_RACK_LOCALITY_FULL_RESET = true;

// NOTE(review): the asynchronous-scheduling and queue-mapping keys below are
// not read in the visible part of this file; only their key composition is
// certain from here.
@Private
public static final String SCHEDULE_ASYNCHRONOUSLY_PREFIX =
    PREFIX + "schedule-asynchronously";

@Private
public static final String SCHEDULE_ASYNCHRONOUSLY_ENABLE =
    SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".enable";

@Private
public static final String SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD =
    SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".maximum-threads";

@Private
public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;

@Private
public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";

@Private
public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable";

@Private
public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
259
// NOTE(review): none of the constants in this group are read in the visible
// part of this file. They look like per-queue key suffixes (no PREFIX is
// applied here) - confirm against the accessors further down the class.
// Note that this one uses an underscore where the other keys use hyphens.
@Private
public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption";

@Private
public static final String DEFAULT_APPLICATION_PRIORITY = "default-application-priority";

@Private
public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = 0;

@Private
public static final String AVERAGE_CAPACITY = "average-capacity";

@Private
public static final String IS_RESERVABLE = "reservable";

@Private
public static final String RESERVATION_WINDOW = "reservation-window";

@Private
public static final String INSTANTANEOUS_MAX_CAPACITY =
    "instantaneous-max-capacity";

@Private
public static final String RESERVATION_ADMISSION_POLICY =
    "reservation-policy";

@Private
public static final String RESERVATION_AGENT_NAME = "reservation-agent";

@Private
public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE =
    "show-reservations-as-queues";

@Private
public static final String RESERVATION_PLANNER_NAME = "reservation-planner";

@Private
public static final String RESERVATION_MOVE_ON_EXPIRY =
    "reservation-move-on-expiry";

@Private
public static final String RESERVATION_ENFORCEMENT_WINDOW =
    "reservation-enforcement-window";
303
// NOTE(review): the lazy-preemption and per-heartbeat assignment keys below
// are global keys (built from PREFIX); their readers are outside the visible
// part of this file.
@Private
public static final String LAZY_PREEMPTION_ENABLED =
    PREFIX + "lazy-preemption-enabled";

@Private
public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false;

@Private
public static final String ASSIGN_MULTIPLE_ENABLED = PREFIX
    + "per-node-heartbeat.multiple-assignments-enabled";

@Private
public static final boolean DEFAULT_ASSIGN_MULTIPLE_ENABLED = true;

/** Maximum number of containers to assign on each check-in. */
@Private
public static final String MAX_ASSIGN_PER_HEARTBEAT = PREFIX
    + "per-node-heartbeat.maximum-container-assignments";

@Private
public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1;

/** Configuring absolute min/max resources in a queue. **/
@Private
public static final String MINIMUM_RESOURCE = "min-resource";

@Private
public static final String MAXIMUM_RESOURCE = "max-resource";

public static final String DEFAULT_RESOURCE_TYPES = "memory,vcores";

/**
 * Regular expression for a value given as an absolute resource: the whole
 * string must be a bracketed run of word characters, '.', ',', '-', '_',
 * '=', ' ' or '/' (for example {@code [memory=10240,vcores=12]}).
 */
public static final String PATTERN_FOR_ABSOLUTE_RESOURCE = "^\\[[\\w\\.,\\-_=\\ /]+\\]$";

// Compiled once; the capacity getters use it to detect values that are
// absolute resources rather than percentages.
private static final Pattern RESOURCE_PATTERN = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE);
338
/**
 * Resource types supported, as enumerated by this configuration class.
 */
public enum AbsoluteResourceType {
  MEMORY,
  VCORES
}
345
// Parser that getPriorityAcls delegates to for turning the configured
// priority ACL string into AppPriorityACLGroup entries.
AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser();
347
/**
 * Builds a configuration on top of a newly created {@link Configuration},
 * delegating to the single-argument constructor.
 */
public CapacitySchedulerConfiguration() {
  this(new Configuration());
}
351
/**
 * Builds a configuration from {@code configuration} with the local
 * configuration provider enabled.
 *
 * @param configuration base configuration handed to the superclass
 */
public CapacitySchedulerConfiguration(Configuration configuration) {
  this(configuration, true);
}
355
/**
 * Builds a configuration from {@code configuration} and optionally registers
 * the capacity scheduler resource file.
 *
 * @param configuration base configuration handed to the superclass
 * @param useLocalConfigurationProvider when true, the scheduler's own
 *          resource file is added via {@code addResource}
 */
public CapacitySchedulerConfiguration(Configuration configuration,
    boolean useLocalConfigurationProvider) {
  super(configuration);
  if (!useLocalConfigurationProvider) {
    return;
  }
  addResource(CS_CONFIGURATION_FILE);
}
363
364 static String getQueuePrefix(String queue) {
365 String queueName = PREFIX + queue + DOT;
366 return queueName;
367 }
368
369 static String getQueueOrderingPolicyPrefix(String queue) {
370 String queueName = PREFIX + queue + DOT + ORDERING_POLICY + DOT;
371 return queueName;
372 }
373
374 private String getNodeLabelPrefix(String queue, String label) {
375 if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
376 return getQueuePrefix(queue);
377 }
378 return getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + DOT;
379 }
380
381 public int getMaximumSystemApplications() {
382 int maxApplications =
383 getInt(MAXIMUM_SYSTEM_APPLICATIONS, DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS);
384 return maxApplications;
385 }
386
387 public float getMaximumApplicationMasterResourcePercent() {
388 return getFloat(MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
389 DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT);
390 }
391
392
393 /**
394 * Get the maximum applications per queue setting.
395 * @param queue name of the queue
396 * @return setting specified or -1 if not set
397 */
398 public int getMaximumApplicationsPerQueue(String queue) {
399 int maxApplicationsPerQueue =
400 getInt(getQueuePrefix(queue) + MAXIMUM_APPLICATIONS_SUFFIX,
401 (int)UNDEFINED);
402 return maxApplicationsPerQueue;
403 }
404
405 /**
406 * Get the maximum am resource percent per queue setting.
407 * @param queue name of the queue
408 * @return per queue setting or defaults to the global am-resource-percent
409 * setting if per queue setting not present
410 */
411 public float getMaximumApplicationMasterResourcePerQueuePercent(String queue) {
412 return getFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX,
413 getMaximumApplicationMasterResourcePercent());
414 }
415
416 public void setMaximumApplicationMasterResourcePerQueuePercent(String queue,
417 float percent) {
418 setFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
419 }
420
421 public float getNonLabeledQueueCapacity(String queue) {
422 String configuredCapacity = get(getQueuePrefix(queue) + CAPACITY);
423 boolean matcher = (configuredCapacity != null)
424 && RESOURCE_PATTERN.matcher(configuredCapacity).find();
425 if (matcher) {
426 // Return capacity in percentage as 0 for non-root queues and 100 for
427 // root.From AbstractCSQueue, absolute resource will be parsed and
428 // updated. Once nodes are added/removed in cluster, capacity in
429 // percentage will also be re-calculated.
430 return queue.equals("root") ? 100.0f : 0f;
431 }
432
433 float capacity = queue.equals("root")
434 ? 100.0f
435 : (configuredCapacity == null)
436 ? 0f
437 : Float.parseFloat(configuredCapacity);
438 if (capacity < MINIMUM_CAPACITY_VALUE
439 || capacity > MAXIMUM_CAPACITY_VALUE) {
440 throw new IllegalArgumentException(
441 "Illegal " + "capacity of " + capacity + " for queue " + queue);
442 }
443 LOG.debug("CSConf - getCapacity: queuePrefix=" + getQueuePrefix(queue)
444 + ", capacity=" + capacity);
445 return capacity;
446 }
447
448 public void setCapacity(String queue, float capacity) {
449 if (queue.equals("root")) {
450 throw new IllegalArgumentException(
451 "Cannot set capacity, root queue has a fixed capacity of 100.0f");
452 }
453 setFloat(getQueuePrefix(queue) + CAPACITY, capacity);
454 LOG.debug("CSConf - setCapacity: queuePrefix=" + getQueuePrefix(queue) +
455 ", capacity=" + capacity);
456 }
457
458 public float getNonLabeledQueueMaximumCapacity(String queue) {
459 String configuredCapacity = get(getQueuePrefix(queue) + MAXIMUM_CAPACITY);
460 boolean matcher = (configuredCapacity != null)
461 && RESOURCE_PATTERN.matcher(configuredCapacity).find();
462 if (matcher) {
463 // Return capacity in percentage as 0 for non-root queues and 100 for
464 // root.From AbstractCSQueue, absolute resource will be parsed and
465 // updated. Once nodes are added/removed in cluster, capacity in
466 // percentage will also be re-calculated.
467 return 100.0f;
468 }
469
470 float maxCapacity = (configuredCapacity == null)
471 ? MAXIMUM_CAPACITY_VALUE
472 : Float.parseFloat(configuredCapacity);
473 maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE)
474 ? MAXIMUM_CAPACITY_VALUE
475 : maxCapacity;
476 return maxCapacity;
477 }
478
479 public void setMaximumCapacity(String queue, float maxCapacity) {
480 if (maxCapacity > MAXIMUM_CAPACITY_VALUE) {
481 throw new IllegalArgumentException("Illegal " +
482 "maximum-capacity of " + maxCapacity + " for queue " + queue);
483 }
484 setFloat(getQueuePrefix(queue) + MAXIMUM_CAPACITY, maxCapacity);
485 LOG.debug("CSConf - setMaxCapacity: queuePrefix=" + getQueuePrefix(queue) +
486 ", maxCapacity=" + maxCapacity);
487 }
488
489 public void setCapacityByLabel(String queue, String label, float capacity) {
490 setFloat(getNodeLabelPrefix(queue, label) + CAPACITY, capacity);
491 }
492
493 public void setMaximumCapacityByLabel(String queue, String label,
494 float capacity) {
495 setFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, capacity);
496 }
497
498 public int getUserLimit(String queue) {
499 int userLimit = getInt(getQueuePrefix(queue) + USER_LIMIT,
500 DEFAULT_USER_LIMIT);
501 return userLimit;
502 }
503
504 // TODO (wangda): We need to better distinguish app ordering policy and queue
505 // ordering policy's classname / configuration options, etc. And dedup code
506 // if possible.
507 @SuppressWarnings("unchecked")
508 public <S extends SchedulableEntity> OrderingPolicy<S> getAppOrderingPolicy(
509 String queue) {
510
511 String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY,
512 DEFAULT_APP_ORDERING_POLICY);
513
514 OrderingPolicy<S> orderingPolicy;
515
516 if (policyType.trim().equals(FIFO_APP_ORDERING_POLICY)) {
517 policyType = FifoOrderingPolicy.class.getName();
518 }
519 if (policyType.trim().equals(FAIR_APP_ORDERING_POLICY)) {
520 policyType = FairOrderingPolicy.class.getName();
521 }
522 try {
523 orderingPolicy = (OrderingPolicy<S>)
524 Class.forName(policyType).newInstance();
525 } catch (Exception e) {
526 String message = "Unable to construct ordering policy for: " + policyType + ", " + e.getMessage();
527 throw new RuntimeException(message, e);
528 }
529
530 Map<String, String> config = new HashMap<String, String>();
531 String confPrefix = getQueuePrefix(queue) + ORDERING_POLICY + ".";
532 for (Map.Entry<String, String> kv : this) {
533 if (kv.getKey().startsWith(confPrefix)) {
534 config.put(kv.getKey().substring(confPrefix.length()), kv.getValue());
535 }
536 }
537 orderingPolicy.configure(config);
538 return orderingPolicy;
539 }
540
541 public void setUserLimit(String queue, int userLimit) {
542 setInt(getQueuePrefix(queue) + USER_LIMIT, userLimit);
543 LOG.debug("here setUserLimit: queuePrefix=" + getQueuePrefix(queue) +
544 ", userLimit=" + getUserLimit(queue));
545 }
546
547 public float getUserLimitFactor(String queue) {
548 float userLimitFactor =
549 getFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR,
550 DEFAULT_USER_LIMIT_FACTOR);
551 return userLimitFactor;
552 }
553
554 public void setUserLimitFactor(String queue, float userLimitFactor) {
555 setFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR, userLimitFactor);
556 }
557
558 public QueueState getConfiguredState(String queue) {
559 String state = get(getQueuePrefix(queue) + STATE);
560 if (state == null) {
561 return null;
562 } else {
563 return QueueState.valueOf(StringUtils.toUpperCase(state));
564 }
565 }
566
567 public QueueState getState(String queue) {
568 QueueState state = getConfiguredState(queue);
569 return (state == null) ? QueueState.RUNNING : state;
570 }
571
572 @Private
573 @VisibleForTesting
574 public void setState(String queue, QueueState state) {
575 set(getQueuePrefix(queue) + STATE, state.name());
576 }
577
578 public void setAccessibleNodeLabels(String queue, Set<String> labels) {
579 if (labels == null) {
580 return;
581 }
582 String str = StringUtils.join(",", labels);
583 set(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS, str);
584 }
585
586 public Set<String> getAccessibleNodeLabels(String queue) {
587 String accessibleLabelStr =
588 get(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS);
589
590 // When accessible-label is null,
591 if (accessibleLabelStr == null) {
592 // Only return null when queue is not ROOT
593 if (!queue.equals(ROOT)) {
594 return null;
595 }
596 } else {
597 // print a warning when accessibleNodeLabel specified in config and queue
598 // is ROOT
599 if (queue.equals(ROOT)) {
600 LOG.warn("Accessible node labels for root queue will be ignored,"
601 + " it will be automatically set to \"*\".");
602 }
603 }
604
605 // always return ANY for queue root
606 if (queue.equals(ROOT)) {
607 return ImmutableSet.of(RMNodeLabelsManager.ANY);
608 }
609
610 // In other cases, split the accessibleLabelStr by ","
611 Set<String> set = new HashSet<String>();
612 for (String str : accessibleLabelStr.split(",")) {
613 if (!str.trim().isEmpty()) {
614 set.add(str.trim());
615 }
616 }
617
618 // if labels contains "*", only keep ANY behind
619 if (set.contains(RMNodeLabelsManager.ANY)) {
620 set.clear();
621 set.add(RMNodeLabelsManager.ANY);
622 }
623 return Collections.unmodifiableSet(set);
624 }
625
626 private float internalGetLabeledQueueCapacity(String queue, String label, String suffix,
627 float defaultValue) {
628 String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix;
629 boolean matcher = (capacityPropertyName != null)
630 && RESOURCE_PATTERN.matcher(capacityPropertyName).find();
631 if (matcher) {
632 // Return capacity in percentage as 0 for non-root queues and 100 for
633 // root.From AbstractCSQueue, absolute resource will be parsed and
634 // updated. Once nodes are added/removed in cluster, capacity in
635 // percentage will also be re-calculated.
636 return defaultValue;
637 }
638
639 float capacity = getFloat(capacityPropertyName, defaultValue);
640 if (capacity < MINIMUM_CAPACITY_VALUE
641 || capacity > MAXIMUM_CAPACITY_VALUE) {
642 throw new IllegalArgumentException("Illegal capacity of " + capacity
643 + " for node-label=" + label + " in queue=" + queue
644 + ", valid capacity should in range of [0, 100].");
645 }
646 if (LOG.isDebugEnabled()) {
647 LOG.debug("CSConf - getCapacityOfLabel: prefix="
648 + getNodeLabelPrefix(queue, label) + ", capacity=" + capacity);
649 }
650 return capacity;
651 }
652
653 public float getLabeledQueueCapacity(String queue, String label) {
654 return internalGetLabeledQueueCapacity(queue, label, CAPACITY, 0f);
655 }
656
657 public float getLabeledQueueMaximumCapacity(String queue, String label) {
658 return internalGetLabeledQueueCapacity(queue, label, MAXIMUM_CAPACITY, 100f);
659 }
660
661 public String getDefaultNodeLabelExpression(String queue) {
662 String defaultLabelExpression = get(getQueuePrefix(queue)
663 + DEFAULT_NODE_LABEL_EXPRESSION);
664 if (defaultLabelExpression == null) {
665 return null;
666 }
667 return defaultLabelExpression.trim();
668 }
669
670 public void setDefaultNodeLabelExpression(String queue, String exp) {
671 set(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION, exp);
672 }
673
674 public float getMaximumAMResourcePercentPerPartition(String queue,
675 String label) {
676 // If per-partition max-am-resource-percent is not configured,
677 // use default value as max-am-resource-percent for this queue.
678 return getFloat(getNodeLabelPrefix(queue, label)
679 + MAXIMUM_AM_RESOURCE_SUFFIX,
680 getMaximumApplicationMasterResourcePerQueuePercent(queue));
681 }
682
683 public void setMaximumAMResourcePercentPerPartition(String queue,
684 String label, float percent) {
685 setFloat(getNodeLabelPrefix(queue, label)
686 + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
687 }
688
689 /*
690 * Returns whether we should continue to look at all heart beating nodes even
691 * after the reservation limit was hit. The node heart beating in could
692 * satisfy the request thus could be a better pick then waiting for the
693 * reservation to be fullfilled. This config is refreshable.
694 */
695 public boolean getReservationContinueLook() {
696 return getBoolean(RESERVE_CONT_LOOK_ALL_NODES,
697 DEFAULT_RESERVE_CONT_LOOK_ALL_NODES);
698 }
699
700 private static String getAclKey(QueueACL acl) {
701 return "acl_" + StringUtils.toLowerCase(acl.toString());
702 }
703
704 public AccessControlList getAcl(String queue, QueueACL acl) {
705 String queuePrefix = getQueuePrefix(queue);
706 // The root queue defaults to all access if not defined
707 // Sub queues inherit access if not defined
708 String defaultAcl = queue.equals(ROOT) ? ALL_ACL : NONE_ACL;
709 String aclString = get(queuePrefix + getAclKey(acl), defaultAcl);
710 return new AccessControlList(aclString);
711 }
712
713 public void setAcl(String queue, QueueACL acl, String aclString) {
714 String queuePrefix = getQueuePrefix(queue);
715 set(queuePrefix + getAclKey(acl), aclString);
716 }
717
718 private static String getAclKey(ReservationACL acl) {
719 return "acl_" + StringUtils.toLowerCase(acl.toString());
720 }
721
722 private static String getAclKey(AccessType acl) {
723 return "acl_" + StringUtils.toLowerCase(acl.toString());
724 }
725
726 @Override
727 public Map<ReservationACL, AccessControlList> getReservationAcls(String
728 queue) {
729 Map<ReservationACL, AccessControlList> resAcls = new HashMap<>();
730 for (ReservationACL acl : ReservationACL.values()) {
731 resAcls.put(acl, getReservationAcl(queue, acl));
732 }
733 return resAcls;
734 }
735
736 private AccessControlList getReservationAcl(String queue, ReservationACL
737 acl) {
738 String queuePrefix = getQueuePrefix(queue);
739 // The root queue defaults to all access if not defined
740 // Sub queues inherit access if not defined
741 String defaultAcl = ALL_ACL;
742 String aclString = get(queuePrefix + getAclKey(acl), defaultAcl);
743 return new AccessControlList(aclString);
744 }
745
746 private void setAcl(String queue, ReservationACL acl, String aclString) {
747 String queuePrefix = getQueuePrefix(queue);
748 set(queuePrefix + getAclKey(acl), aclString);
749 }
750
751 private void setAcl(String queue, AccessType acl, String aclString) {
752 String queuePrefix = getQueuePrefix(queue);
753 set(queuePrefix + getAclKey(acl), aclString);
754 }
755
756 public Map<AccessType, AccessControlList> getAcls(String queue) {
757 Map<AccessType, AccessControlList> acls =
758 new HashMap<AccessType, AccessControlList>();
759 for (QueueACL acl : QueueACL.values()) {
760 acls.put(SchedulerUtils.toAccessType(acl), getAcl(queue, acl));
761 }
762 return acls;
763 }
764
765 public void setAcls(String queue, Map<QueueACL, AccessControlList> acls) {
766 for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
767 setAcl(queue, e.getKey(), e.getValue().getAclString());
768 }
769 }
770
771 @VisibleForTesting
772 public void setReservationAcls(String queue,
773 Map<ReservationACL, AccessControlList> acls) {
774 for (Map.Entry<ReservationACL, AccessControlList> e : acls.entrySet()) {
775 setAcl(queue, e.getKey(), e.getValue().getAclString());
776 }
777 }
778
779 @VisibleForTesting
780 public void setPriorityAcls(String queue, Priority priority,
781 Priority defaultPriority, String[] acls) {
782 StringBuilder aclString = new StringBuilder();
783
784 StringBuilder userAndGroup = new StringBuilder();
785 for (int i = 0; i < acls.length; i++) {
786 userAndGroup.append(AppPriorityACLKeyType.values()[i] + "=" + acls[i].trim())
787 .append(" ");
788 }
789
790 aclString.append("[" + userAndGroup.toString().trim() + " "
791 + "max_priority=" + priority.getPriority() + " " + "default_priority="
792 + defaultPriority.getPriority() + "]");
793
794 setAcl(queue, AccessType.APPLICATION_MAX_PRIORITY, aclString.toString());
795 }
796
797 public List<AppPriorityACLGroup> getPriorityAcls(String queue,
798 Priority clusterMaxPriority) {
799 String queuePrefix = getQueuePrefix(queue);
800 String defaultAcl = ALL_ACL;
801 String aclString = get(
802 queuePrefix + getAclKey(AccessType.APPLICATION_MAX_PRIORITY),
803 defaultAcl);
804
805 return priorityACLConfig.getPriorityAcl(clusterMaxPriority, aclString);
806 }
807
808 public String[] getQueues(String queue) {
809 LOG.debug("CSConf - getQueues called for: queuePrefix=" + getQueuePrefix(queue));
810 String[] queues = getStrings(getQueuePrefix(queue) + QUEUES);
811 List<String> trimmedQueueNames = new ArrayList<String>();
812 if (null != queues) {
813 for (String s : queues) {
814 trimmedQueueNames.add(s.trim());
815 }
816 queues = trimmedQueueNames.toArray(new String[0]);
817 }
818
819 LOG.debug("CSConf - getQueues: queuePrefix=" + getQueuePrefix(queue) +
820 ", queues=" + ((queues == null) ? "" : StringUtils.arrayToString(queues)));
821 return queues;
822 }
823
824 public void setQueues(String queue, String[] subQueues) {
825 set(getQueuePrefix(queue) + QUEUES, StringUtils.arrayToString(subQueues));
826 LOG.debug("CSConf - setQueues: qPrefix=" + getQueuePrefix(queue) +
827 ", queues=" + StringUtils.arrayToString(subQueues));
828 }
829
830 public Resource getMinimumAllocation() {
831 int minimumMemory = getInt(
832 YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
833 YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
834 int minimumCores = getInt(
835 YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
836 YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
837 return Resources.createResource(minimumMemory, minimumCores);
838 }
839
840 @Private
841 public Priority getQueuePriority(String queue) {
842 String queuePolicyPrefix = getQueuePrefix(queue);
843 Priority pri = Priority.newInstance(
844 getInt(queuePolicyPrefix + "priority", 0));
845 return pri;
846 }
847
848 @Private
849 public void setQueuePriority(String queue, int priority) {
850 String queuePolicyPrefix = getQueuePrefix(queue);
851 setInt(queuePolicyPrefix + "priority", priority);
852 }
853
854 /**
855 * Get the per queue setting for the maximum limit to allocate to
856 * each container request.
857 *
858 * @param queue
859 * name of the queue
860 * @return setting specified per queue else falls back to the cluster setting
861 */
862 public Resource getMaximumAllocationPerQueue(String queue) {
863 // Only support to specify memory and vcores maximum allocation per queue
864 // for now.
865 String queuePrefix = getQueuePrefix(queue);
866 long maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB,
867 (int)UNDEFINED);
868 int maxAllocationVcoresPerQueue = getInt(
869 queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED);
870 if (LOG.isDebugEnabled()) {
871 LOG.debug("max alloc mb per queue for " + queue + " is "
872 + maxAllocationMbPerQueue);
873 LOG.debug("max alloc vcores per queue for " + queue + " is "
874 + maxAllocationVcoresPerQueue);
875 }
876 Resource clusterMax = ResourceUtils.fetchMaximumAllocationFromConfig(this);
877 if (maxAllocationMbPerQueue == (int)UNDEFINED) {
878 LOG.info("max alloc mb per queue for " + queue + " is undefined");
879 maxAllocationMbPerQueue = clusterMax.getMemorySize();
880 }
881 if (maxAllocationVcoresPerQueue == (int)UNDEFINED) {
882 LOG.info("max alloc vcore per queue for " + queue + " is undefined");
883 maxAllocationVcoresPerQueue = clusterMax.getVirtualCores();
884 }
885 // Copy from clusterMax and overwrite per-queue's maximum memory/vcore
886 // allocation.
887 Resource result = Resources.clone(clusterMax);
888 result.setMemorySize(maxAllocationMbPerQueue);
889 result.setVirtualCores(maxAllocationVcoresPerQueue);
890 if (maxAllocationMbPerQueue > clusterMax.getMemorySize()
891 || maxAllocationVcoresPerQueue > clusterMax.getVirtualCores()) {
892 throw new IllegalArgumentException(
893 "Queue maximum allocation cannot be larger than the cluster setting"
894 + " for queue " + queue
895 + " max allocation per queue: " + result
896 + " cluster setting: " + clusterMax);
897 }
898 return result;
899 }
900
901 public boolean getEnableUserMetrics() {
902 return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS);
903 }
904
905 public int getOffSwitchPerHeartbeatLimit() {
906 int limit = getInt(OFFSWITCH_PER_HEARTBEAT_LIMIT,
907 DEFAULT_OFFSWITCH_PER_HEARTBEAT_LIMIT);
908 if (limit < 1) {
909 LOG.warn(OFFSWITCH_PER_HEARTBEAT_LIMIT + "(" + limit + ") < 1. Using 1.");
910 limit = 1;
911 }
912 return limit;
913 }
914
915 public void setOffSwitchPerHeartbeatLimit(int limit) {
916 setInt(OFFSWITCH_PER_HEARTBEAT_LIMIT, limit);
917 }
918
919 public int getNodeLocalityDelay() {
920 return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
921 }
922
923 @VisibleForTesting
924 public void setNodeLocalityDelay(int nodeLocalityDelay) {
925 setInt(NODE_LOCALITY_DELAY, nodeLocalityDelay);
926 }
927
928 public int getRackLocalityAdditionalDelay() {
929 return getInt(RACK_LOCALITY_ADDITIONAL_DELAY,
930 DEFAULT_RACK_LOCALITY_ADDITIONAL_DELAY);
931 }
932
933 public boolean getRackLocalityFullReset() {
934 return getBoolean(RACK_LOCALITY_FULL_RESET,
935 DEFAULT_RACK_LOCALITY_FULL_RESET);
936 }
937
938 public ResourceCalculator getResourceCalculator() {
939 return ReflectionUtils.newInstance(
940 getClass(
941 RESOURCE_CALCULATOR_CLASS,
942 DEFAULT_RESOURCE_CALCULATOR_CLASS,
943 ResourceCalculator.class),
944 this);
945 }
946
947 public boolean getUsePortForNodeName() {
948 return getBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
949 YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
950 }
951
952 public void setResourceComparator(
953 Class<? extends ResourceCalculator> resourceCalculatorClass) {
954 setClass(
955 RESOURCE_CALCULATOR_CLASS,
956 resourceCalculatorClass,
957 ResourceCalculator.class);
958 }
959
960 public boolean getScheduleAynschronously() {
961 return getBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE,
962 DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE);
963 }
964
965 public void setScheduleAynschronously(boolean async) {
966 setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async);
967 }
968
969 public boolean getOverrideWithQueueMappings() {
970 return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE,
971 DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
972 }
973
974 @Private
975 @VisibleForTesting
976 public void setOverrideWithQueueMappings(boolean overrideWithQueueMappings) {
977 setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, overrideWithQueueMappings);
978 }
979
980 public List<QueueMappingEntity> getQueueMappingEntity(
981 String queueMappingSuffix) {
982 String queueMappingName = buildQueueMappingRuleProperty(queueMappingSuffix);
983
984 List<QueueMappingEntity> mappings =
985 new ArrayList<QueueMappingEntity>();
986 Collection<String> mappingsString =
987 getTrimmedStringCollection(queueMappingName);
988 for (String mappingValue : mappingsString) {
989 String[] mapping =
990 getTrimmedStringCollection(mappingValue, ":")
991 .toArray(new String[] {});
992 if (mapping.length != 2 || mapping[1].length() == 0) {
993 throw new IllegalArgumentException(
994 "Illegal queue mapping " + mappingValue);
995 }
996
997 QueueMappingEntity m = new QueueMappingEntity(mapping[0], mapping[1]);
998
999 mappings.add(m);
1000 }
1001
1002 return mappings;
1003 }
1004
1005 private String buildQueueMappingRuleProperty (String queueMappingSuffix) {
1006 StringBuilder queueMapping = new StringBuilder();
1007 queueMapping.append(YarnConfiguration.QUEUE_PLACEMENT_RULES)
1008 .append(".").append(queueMappingSuffix);
1009 return queueMapping.toString();
1010 }
1011
1012 @VisibleForTesting
1013 public void setQueueMappingEntities(List<QueueMappingEntity> queueMappings,
1014 String queueMappingSuffix) {
1015 if (queueMappings == null) {
1016 return;
1017 }
1018
1019 List<String> queueMappingStrs = new ArrayList<>();
1020 for (QueueMappingEntity mapping : queueMappings) {
1021 queueMappingStrs.add(mapping.toString());
1022 }
1023
1024 String mappingRuleProp = buildQueueMappingRuleProperty(queueMappingSuffix);
1025 setStrings(mappingRuleProp, StringUtils.join(",", queueMappingStrs));
1026 }
1027
1028 /**
1029 * Returns a collection of strings, trimming leading and trailing whitespeace
1030 * on each value
1031 *
1032 * @param str
1033 * String to parse
1034 * @param delim
1035 * delimiter to separate the values
1036 * @return Collection of parsed elements.
1037 */
1038 private static Collection<String> getTrimmedStringCollection(String str,
1039 String delim) {
1040 List<String> values = new ArrayList<String>();
1041 if (str == null)
1042 return values;
1043 StringTokenizer tokenizer = new StringTokenizer(str, delim);
1044 while (tokenizer.hasMoreTokens()) {
1045 String next = tokenizer.nextToken();
1046 if (next == null || next.trim().isEmpty()) {
1047 continue;
1048 }
1049 values.add(next.trim());
1050 }
1051 return values;
1052 }
1053
1054 /**
1055 * Get user/group mappings to queues.
1056 *
1057 * @return user/groups mappings or null on illegal configs
1058 */
1059 public List<QueueMapping> getQueueMappings() {
1060 List<QueueMapping> mappings =
1061 new ArrayList<QueueMapping>();
1062 Collection<String> mappingsString =
1063 getTrimmedStringCollection(QUEUE_MAPPING);
1064 for (String mappingValue : mappingsString) {
1065 String[] mapping =
1066 getTrimmedStringCollection(mappingValue, ":")
1067 .toArray(new String[] {});
1068 if (mapping.length != 3 || mapping[1].length() == 0
1069 || mapping[2].length() == 0) {
1070 throw new IllegalArgumentException(
1071 "Illegal queue mapping " + mappingValue);
1072 }
1073
1074 QueueMapping m;
1075 try {
1076 QueueMapping.MappingType mappingType;
1077 if (mapping[0].equals("u")) {
1078 mappingType = QueueMapping.MappingType.USER;
1079 } else if (mapping[0].equals("g")) {
1080 mappingType = QueueMapping.MappingType.GROUP;
1081 } else {
1082 throw new IllegalArgumentException(
1083 "unknown mapping prefix " + mapping[0]);
1084 }
1085 m = new QueueMapping(
1086 mappingType,
1087 mapping[1],
1088 mapping[2]);
1089 } catch (Throwable t) {
1090 throw new IllegalArgumentException(
1091 "Illegal queue mapping " + mappingValue);
1092 }
1093
1094 if (m != null) {
1095 mappings.add(m);
1096 }
1097 }
1098
1099 return mappings;
1100 }
1101
1102 @Private
1103 @VisibleForTesting
1104 public void setQueuePlacementRules(Collection<String> queuePlacementRules) {
1105 if (queuePlacementRules == null) {
1106 return;
1107 }
1108 String str = StringUtils.join(",", queuePlacementRules);
1109 setStrings(YarnConfiguration.QUEUE_PLACEMENT_RULES, str);
1110 }
1111
1112 @Private
1113 @VisibleForTesting
1114 public void setQueueMappings(List<QueueMapping> queueMappings) {
1115 if (queueMappings == null) {
1116 return;
1117 }
1118
1119 List<String> queueMappingStrs = new ArrayList<>();
1120 for (QueueMapping mapping : queueMappings) {
1121 queueMappingStrs.add(mapping.toString());
1122 }
1123
1124 setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs));
1125 }
1126
1127 public boolean isReservable(String queue) {
1128 boolean isReservable =
1129 getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false);
1130 return isReservable;
1131 }
1132
1133 public void setReservable(String queue, boolean isReservable) {
1134 setBoolean(getQueuePrefix(queue) + IS_RESERVABLE, isReservable);
1135 LOG.debug("here setReservableQueue: queuePrefix=" + getQueuePrefix(queue)
1136 + ", isReservableQueue=" + isReservable(queue));
1137 }
1138
1139 @Override
1140 public long getReservationWindow(String queue) {
1141 long reservationWindow =
1142 getLong(getQueuePrefix(queue) + RESERVATION_WINDOW,
1143 DEFAULT_RESERVATION_WINDOW);
1144 return reservationWindow;
1145 }
1146
1147 @Override
1148 public float getAverageCapacity(String queue) {
1149 float avgCapacity =
1150 getFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY,
1151 MAXIMUM_CAPACITY_VALUE);
1152 return avgCapacity;
1153 }
1154
1155 @Override
1156 public float getInstantaneousMaxCapacity(String queue) {
1157 float instMaxCapacity =
1158 getFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
1159 MAXIMUM_CAPACITY_VALUE);
1160 return instMaxCapacity;
1161 }
1162
1163 public void setInstantaneousMaxCapacity(String queue, float instMaxCapacity) {
1164 setFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
1165 instMaxCapacity);
1166 }
1167
1168 public void setReservationWindow(String queue, long reservationWindow) {
1169 setLong(getQueuePrefix(queue) + RESERVATION_WINDOW, reservationWindow);
1170 }
1171
1172 public void setAverageCapacity(String queue, float avgCapacity) {
1173 setFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, avgCapacity);
1174 }
1175
1176 @Override
1177 public String getReservationAdmissionPolicy(String queue) {
1178 String reservationPolicy =
1179 get(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY,
1180 DEFAULT_RESERVATION_ADMISSION_POLICY);
1181 return reservationPolicy;
1182 }
1183
1184 public void setReservationAdmissionPolicy(String queue,
1185 String reservationPolicy) {
1186 set(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, reservationPolicy);
1187 }
1188
1189 @Override
1190 public String getReservationAgent(String queue) {
1191 String reservationAgent =
1192 get(getQueuePrefix(queue) + RESERVATION_AGENT_NAME,
1193 DEFAULT_RESERVATION_AGENT_NAME);
1194 return reservationAgent;
1195 }
1196
1197 public void setReservationAgent(String queue, String reservationPolicy) {
1198 set(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, reservationPolicy);
1199 }
1200
1201 @Override
1202 public boolean getShowReservationAsQueues(String queuePath) {
1203 boolean showReservationAsQueues =
1204 getBoolean(getQueuePrefix(queuePath)
1205 + RESERVATION_SHOW_RESERVATION_AS_QUEUE,
1206 DEFAULT_SHOW_RESERVATIONS_AS_QUEUES);
1207 return showReservationAsQueues;
1208 }
1209
1210 @Override
1211 public String getReplanner(String queue) {
1212 String replanner =
1213 get(getQueuePrefix(queue) + RESERVATION_PLANNER_NAME,
1214 DEFAULT_RESERVATION_PLANNER_NAME);
1215 return replanner;
1216 }
1217
1218 @Override
1219 public boolean getMoveOnExpiry(String queue) {
1220 boolean killOnExpiry =
1221 getBoolean(getQueuePrefix(queue) + RESERVATION_MOVE_ON_EXPIRY,
1222 DEFAULT_RESERVATION_MOVE_ON_EXPIRY);
1223 return killOnExpiry;
1224 }
1225
1226 @Override
1227 public long getEnforcementWindow(String queue) {
1228 long enforcementWindow =
1229 getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW,
1230 DEFAULT_RESERVATION_ENFORCEMENT_WINDOW);
1231 return enforcementWindow;
1232 }
1233
1234 /**
1235 * Sets the <em>disable_preemption</em> property in order to indicate
1236 * whether or not container preemption will be disabled for the specified
1237 * queue.
1238 *
1239 * @param queue queue path
1240 * @param preemptionDisabled true if preemption is disabled on queue
1241 */
1242 public void setPreemptionDisabled(String queue, boolean preemptionDisabled) {
1243 setBoolean(getQueuePrefix(queue) + QUEUE_PREEMPTION_DISABLED,
1244 preemptionDisabled);
1245 }
1246
1247 /**
1248 * Indicates whether preemption is disabled on the specified queue.
1249 *
1250 * @param queue queue path to query
1251 * @param defaultVal used as default if the <em>disable_preemption</em>
1252 * is not set in the configuration
1253 * @return true if preemption is disabled on <em>queue</em>, false otherwise
1254 */
1255 public boolean getPreemptionDisabled(String queue, boolean defaultVal) {
1256 boolean preemptionDisabled =
1257 getBoolean(getQueuePrefix(queue) + QUEUE_PREEMPTION_DISABLED,
1258 defaultVal);
1259 return preemptionDisabled;
1260 }
1261
1262 /**
1263 * Indicates whether intra-queue preemption is disabled on the specified queue
1264 *
1265 * @param queue queue path to query
1266 * @param defaultVal used as default if the property is not set in the
1267 * configuration
1268 * @return true if preemption is disabled on queue, false otherwise
1269 */
1270 public boolean getIntraQueuePreemptionDisabled(String queue,
1271 boolean defaultVal) {
1272 return
1273 getBoolean(getQueuePrefix(queue) + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX
1274 + QUEUE_PREEMPTION_DISABLED, defaultVal);
1275 }
1276
1277 /**
1278 * Get configured node labels in a given queuePath
1279 */
1280 public Set<String> getConfiguredNodeLabels(String queuePath) {
1281 Set<String> configuredNodeLabels = new HashSet<String>();
1282 Entry<String, String> e = null;
1283
1284 Iterator<Entry<String, String>> iter = iterator();
1285 while (iter.hasNext()) {
1286 e = iter.next();
1287 String key = e.getKey();
1288
1289 if (key.startsWith(getQueuePrefix(queuePath) + ACCESSIBLE_NODE_LABELS
1290 + DOT)) {
1291 // Find <label-name> in
1292 // <queue-path>.accessible-node-labels.<label-name>.property
1293 int labelStartIdx =
1294 key.indexOf(ACCESSIBLE_NODE_LABELS)
1295 + ACCESSIBLE_NODE_LABELS.length() + 1;
1296 int labelEndIndx = key.indexOf('.', labelStartIdx);
1297 String labelName = key.substring(labelStartIdx, labelEndIndx);
1298 configuredNodeLabels.add(labelName);
1299 }
1300 }
1301
1302 // always add NO_LABEL
1303 configuredNodeLabels.add(RMNodeLabelsManager.NO_LABEL);
1304
1305 return configuredNodeLabels;
1306 }
1307
1308 public Integer getDefaultApplicationPriorityConfPerQueue(String queue) {
1309 Integer defaultPriority = getInt(getQueuePrefix(queue)
1310 + DEFAULT_APPLICATION_PRIORITY,
1311 DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
1312 return defaultPriority;
1313 }
1314
1315 @VisibleForTesting
1316 public void setOrderingPolicy(String queue, String policy) {
1317 set(getQueuePrefix(queue) + ORDERING_POLICY, policy);
1318 }
1319
1320 @VisibleForTesting
1321 public void setOrderingPolicyParameter(String queue,
1322 String parameterKey, String parameterValue) {
1323 set(getQueuePrefix(queue) + ORDERING_POLICY + "." + parameterKey,
1324 parameterValue);
1325 }
1326
1327 public boolean getLazyPreemptionEnabled() {
1328 return getBoolean(LAZY_PREEMPTION_ENABLED, DEFAULT_LAZY_PREEMPTION_ENABLED);
1329 }
1330
1331 private static final String PREEMPTION_CONFIG_PREFIX =
1332 "yarn.resourcemanager.monitor.capacity.preemption.";
1333
1334 private static final String INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX =
1335 "intra-queue-preemption.";
1336
1337 /** If true, run the policy but do not affect the cluster with preemption and
1338 * kill events. */
1339 public static final String PREEMPTION_OBSERVE_ONLY =
1340 PREEMPTION_CONFIG_PREFIX + "observe_only";
1341 public static final boolean DEFAULT_PREEMPTION_OBSERVE_ONLY = false;
1342
1343 /** Time in milliseconds between invocations of this policy */
1344 public static final String PREEMPTION_MONITORING_INTERVAL =
1345 PREEMPTION_CONFIG_PREFIX + "monitoring_interval";
1346 public static final long DEFAULT_PREEMPTION_MONITORING_INTERVAL = 3000L;
1347
1348 /** Time in milliseconds between requesting a preemption from an application
1349 * and killing the container. */
1350 public static final String PREEMPTION_WAIT_TIME_BEFORE_KILL =
1351 PREEMPTION_CONFIG_PREFIX + "max_wait_before_kill";
1352 public static final long DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL = 15000L;
1353
1354 /** Maximum percentage of resources preemptionCandidates in a single round. By
1355 * controlling this value one can throttle the pace at which containers are
1356 * reclaimed from the cluster. After computing the total desired preemption,
1357 * the policy scales it back within this limit. */
1358 public static final String TOTAL_PREEMPTION_PER_ROUND =
1359 PREEMPTION_CONFIG_PREFIX + "total_preemption_per_round";
1360 public static final float DEFAULT_TOTAL_PREEMPTION_PER_ROUND = 0.1f;
1361
1362 /** Maximum amount of resources above the target capacity ignored for
1363 * preemption. This defines a deadzone around the target capacity that helps
1364 * prevent thrashing and oscillations around the computed target balance.
1365 * High values would slow the time to capacity and (absent natural
1366 * completions) it might prevent convergence to guaranteed capacity. */
1367 public static final String PREEMPTION_MAX_IGNORED_OVER_CAPACITY =
1368 PREEMPTION_CONFIG_PREFIX + "max_ignored_over_capacity";
1369 public static final double DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1;
1370 /**
1371 * Given a computed preemption target, account for containers naturally
1372 * expiring and preempt only this percentage of the delta. This determines
1373 * the rate of geometric convergence into the deadzone ({@link
1374 * #PREEMPTION_MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5
1375 * will reclaim almost 95% of resources within 5 * {@link
1376 * #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
1377 public static final String PREEMPTION_NATURAL_TERMINATION_FACTOR =
1378 PREEMPTION_CONFIG_PREFIX + "natural_termination_factor";
1379 public static final double DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR =
1380 0.2;
1381
1382 /**
1383 * By default, reserved resource will be excluded while balancing capacities
1384 * of queues.
1385 *
1386 * Why doing this? In YARN-4390, we added preemption-based-on-reserved-container
1387 * Support. To reduce unnecessary preemption for large containers. We will
1388 * not include reserved resources while calculating ideal-allocation in
1389 * FifoCandidatesSelector.
1390 *
1391 * Changes in YARN-4390 will significantly reduce number of containers preempted
1392 * When cluster has heterogeneous container requests. (Please check test
1393 * report: https://issues.apache.org/jira/secure/attachment/12796197/YARN-4390-test-results.pdf
1394 *
1395 * However, on the other hand, in some corner cases, especially for
1396 * fragmented cluster. It could lead to preemption cannot kick in in some
1397 * cases. Please see YARN-5731.
1398 *
1399 * So to solve the problem, make this change to be configurable, and please
1400 * note that it is an experimental option.
1401 */
1402 public static final String
1403 ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS =
1404 PREEMPTION_CONFIG_PREFIX
1405 + "additional_res_balance_based_on_reserved_containers";
1406 public static final boolean
1407 DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS = false;
1408
1409 /**
1410 * When calculating which containers to be preempted, we will try to preempt
1411 * containers for reserved containers first. By default is false.
1412 */
1413 public static final String PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
1414 PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers";
1415 public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
1416 false;
1417
1418 /**
1419 * For intra-queue preemption, priority/user-limit/fairness based selectors
1420 * can help to preempt containers.
1421 */
1422 public static final String INTRAQUEUE_PREEMPTION_ENABLED =
1423 PREEMPTION_CONFIG_PREFIX +
1424 INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "enabled";
1425 public static final boolean DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED = false;
1426
1427 /**
1428 * For intra-queue preemption, consider those queues which are above used cap
1429 * limit.
1430 */
1431 public static final String INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD =
1432 PREEMPTION_CONFIG_PREFIX +
1433 INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "minimum-threshold";
1434 public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD =
1435 0.5f;
1436
1437 /**
1438 * For intra-queue preemption, allowable maximum-preemptable limit per queue.
1439 */
1440 public static final String INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
1441 PREEMPTION_CONFIG_PREFIX +
1442 INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "max-allowable-limit";
1443 public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
1444 0.2f;
1445
1446 /**
1447 * For intra-queue preemption, enforce a preemption order such as
1448 * "userlimit_first" or "priority_first".
1449 */
1450 public static final String INTRAQUEUE_PREEMPTION_ORDER_POLICY = PREEMPTION_CONFIG_PREFIX
1451 + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "preemption-order-policy";
1452 public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first";
1453
1454 /**
1455 * Maximum application for a queue to be used when application per queue is
1456 * not defined.To be consistent with previous version the default value is set
1457 * as UNDEFINED.
1458 */
1459 @Private
1460 public static final String QUEUE_GLOBAL_MAX_APPLICATION =
1461 PREFIX + "global-queue-max-application";
1462
1463 public int getGlobalMaximumApplicationsPerQueue() {
1464 int maxApplicationsPerQueue =
1465 getInt(QUEUE_GLOBAL_MAX_APPLICATION, (int) UNDEFINED);
1466 return maxApplicationsPerQueue;
1467 }
1468
1469 public void setGlobalMaximumApplicationsPerQueue(int val) {
1470 setInt(QUEUE_GLOBAL_MAX_APPLICATION, val);
1471 }
1472
1473 /**
1474 * Ordering policy inside a parent queue to sort queues
1475 */
1476
1477 /**
1478 * Less relative usage queue can get next resource, this is default
1479 */
1480 public static final String QUEUE_UTILIZATION_ORDERING_POLICY = "utilization";
1481
1482 /**
1483 * Combination of relative usage and priority
1484 */
1485 public static final String QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY =
1486 "priority-utilization";
1487
1488 public static final String DEFAULT_QUEUE_ORDERING_POLICY =
1489 QUEUE_UTILIZATION_ORDERING_POLICY;
1490
1491
1492 @Private
1493 public void setQueueOrderingPolicy(String queue, String policy) {
1494 set(getQueuePrefix(queue) + ORDERING_POLICY, policy);
1495 }
1496
1497 @Private
1498 public QueueOrderingPolicy getQueueOrderingPolicy(String queue,
1499 String parentPolicy) {
1500 String defaultPolicy = parentPolicy;
1501 if (null == defaultPolicy) {
1502 defaultPolicy = DEFAULT_QUEUE_ORDERING_POLICY;
1503 }
1504
1505 String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY,
1506 defaultPolicy);
1507
1508 QueueOrderingPolicy qop;
1509 if (policyType.trim().equals(QUEUE_UTILIZATION_ORDERING_POLICY)) {
1510 // Doesn't respect priority
1511 qop = new PriorityUtilizationQueueOrderingPolicy(false);
1512 } else if (policyType.trim().equals(
1513 QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY)) {
1514 qop = new PriorityUtilizationQueueOrderingPolicy(true);
1515 } else {
1516 String message =
1517 "Unable to construct queue ordering policy=" + policyType + " queue="
1518 + queue;
1519 throw new YarnRuntimeException(message);
1520 }
1521
1522 return qop;
1523 }
1524
1525 /*
1526 * Get global configuration for ordering policies
1527 */
1528 private String getOrderingPolicyGlobalConfigKey(String orderPolicyName,
1529 String configKey) {
1530 return PREFIX + ORDERING_POLICY + DOT + orderPolicyName + DOT + configKey;
1531 }
1532
1533 /**
1534 * Global configurations of queue-priority-utilization ordering policy
1535 */
1536 private static final String UNDER_UTILIZED_PREEMPTION_ENABLED =
1537 "underutilized-preemption.enabled";
1538
1539 /**
1540 * Do we allow under-utilized queue with higher priority to preempt queue
1541 * with lower priority *even if queue with lower priority is not satisfied*.
1542 *
1543 * For example, two queues, a and b
1544 * a.priority = 1, (a.used-capacity - a.reserved-capacity) = 40%
1545 * b.priority = 0, b.used-capacity = 30%
1546 *
1547 * Set this configuration to true to allow queue-a to preempt container from
1548 * queue-b.
1549 *
1550 * (The reason why deduct reserved-capacity from used-capacity for queue with
1551 * higher priority is: the reserved-capacity is just scheduler's internal
1552 * implementation to allocate large containers, it is not possible for
1553 * application to use such reserved-capacity. It is possible that a queue with
1554 * large container requests have a large number of containers but cannot
1555 * allocate from any of them. But scheduler will make sure a satisfied queue
1556 * will not preempt resource from any other queues. A queue is considered to
1557 * be satisfied when queue's used-capacity - reserved-capacity ≥
1558 * guaranteed-capacity.)
1559 *
1560 * @return allowed or not
1561 */
1562 public boolean getPUOrderingPolicyUnderUtilizedPreemptionEnabled() {
1563 return getBoolean(getOrderingPolicyGlobalConfigKey(
1564 QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
1565 UNDER_UTILIZED_PREEMPTION_ENABLED), false);
1566 }
1567
1568 @VisibleForTesting
1569 public void setPUOrderingPolicyUnderUtilizedPreemptionEnabled(
1570 boolean enabled) {
1571 setBoolean(getOrderingPolicyGlobalConfigKey(
1572 QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
1573 UNDER_UTILIZED_PREEMPTION_ENABLED), enabled);
1574 }
1575
1576 private static final String UNDER_UTILIZED_PREEMPTION_DELAY =
1577 "underutilized-preemption.reserved-container-delay-ms";
1578
1579 /**
1580 * When a reserved container of an underutilized queue is created. Preemption
1581 * will kick in after specified delay (in ms).
1582 *
1583 * The total time to preempt resources for a reserved container from higher
1584 * priority queue will be: reserved-container-delay-ms +
1585 * {@link CapacitySchedulerConfiguration#PREEMPTION_WAIT_TIME_BEFORE_KILL}.
1586 *
1587 * This parameter is added to make preemption from lower priority queue which
1588 * is underutilized to be more careful. This parameter takes effect when
1589 * underutilized-preemption.enabled set to true.
1590 *
1591 * @return delay
1592 */
1593 public long getPUOrderingPolicyUnderUtilizedPreemptionDelay() {
1594 return getLong(getOrderingPolicyGlobalConfigKey(
1595 QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
1596 UNDER_UTILIZED_PREEMPTION_DELAY), 60000L);
1597 }
1598
1599 @VisibleForTesting
1600 public void setPUOrderingPolicyUnderUtilizedPreemptionDelay(
1601 long timeout) {
1602 setLong(getOrderingPolicyGlobalConfigKey(
1603 QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
1604 UNDER_UTILIZED_PREEMPTION_DELAY), timeout);
1605 }
1606
1607 private static final String UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION =
1608 "underutilized-preemption.allow-move-reservation";
1609
1610 /**
1611 * When doing preemption from under-satisfied queues for priority queue.
1612 * Do we allow move reserved container from one host to another?
1613 *
1614 * @return allow or not
1615 */
1616 public boolean getPUOrderingPolicyUnderUtilizedPreemptionMoveReservation() {
1617 return getBoolean(getOrderingPolicyGlobalConfigKey(
1618 QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
1619 UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), false);
1620 }
1621
1622 @VisibleForTesting
1623 public void setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(
1624 boolean allowMoveReservation) {
1625 setBoolean(getOrderingPolicyGlobalConfigKey(
1626 QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
1627 UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), allowMoveReservation);
1628 }
1629
1630 /**
1631 * Get the weights of all users at this queue level from the configuration.
1632 * Used in computing user-specific user limit, relative to other users.
1633 * @param queuePath full queue path
1634 * @return map of user weights, if they exists. Otherwise, return empty map.
1635 */
1636 public Map<String, Float> getAllUserWeightsForQueue(String queuePath) {
1637 Map <String, Float> userWeights = new HashMap <String, Float>();
1638 String qPathPlusPrefix =
1639 getQueuePrefix(queuePath).replaceAll("\\.", "\\\\.")
1640 + USER_SETTINGS + "\\.";
1641 String weightKeyRegex =
1642 qPathPlusPrefix + "\\w+\\." + USER_WEIGHT;
1643 Map<String, String> props = getValByRegex(weightKeyRegex);
1644 for (Entry<String, String> e : props.entrySet()) {
1645 String userName =
1646 e.getKey().replaceFirst(qPathPlusPrefix, "")
1647 .replaceFirst("\\." + USER_WEIGHT, "");
1648 if (userName != null && !userName.isEmpty()) {
1649 userWeights.put(userName, new Float(e.getValue()));
1650 }
1651 }
1652 return userWeights;
1653 }
1654
1655 public boolean getAssignMultipleEnabled() {
1656 return getBoolean(ASSIGN_MULTIPLE_ENABLED, DEFAULT_ASSIGN_MULTIPLE_ENABLED);
1657 }
1658
1659 public int getMaxAssignPerHeartbeat() {
1660 return getInt(MAX_ASSIGN_PER_HEARTBEAT, DEFAULT_MAX_ASSIGN_PER_HEARTBEAT);
1661 }
1662
1663 public static final String MAXIMUM_LIFETIME_SUFFIX =
1664 "maximum-application-lifetime";
1665
1666 public static final String DEFAULT_LIFETIME_SUFFIX =
1667 "default-application-lifetime";
1668
1669 public long getMaximumLifetimePerQueue(String queue) {
1670 long maximumLifetimePerQueue = getLong(
1671 getQueuePrefix(queue) + MAXIMUM_LIFETIME_SUFFIX, (long) UNDEFINED);
1672 return maximumLifetimePerQueue;
1673 }
1674
1675 public void setMaximumLifetimePerQueue(String queue, long maximumLifetime) {
1676 setLong(getQueuePrefix(queue) + MAXIMUM_LIFETIME_SUFFIX, maximumLifetime);
1677 }
1678
1679 public long getDefaultLifetimePerQueue(String queue) {
1680 long maximumLifetimePerQueue = getLong(
1681 getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, (long) UNDEFINED);
1682 return maximumLifetimePerQueue;
1683 }
1684
1685 public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) {
1686 setLong(getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, defaultLifetime);
1687 }
1688
1689 @Private
1690 public static final boolean DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED = false;
1691
1692 @Private
1693 private static final String AUTO_CREATE_CHILD_QUEUE_PREFIX =
1694 "auto-create-child-queue.";
1695
1696 @Private
1697 public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED =
1698 AUTO_CREATE_CHILD_QUEUE_PREFIX + "enabled";
1699
1700 @Private
1701 public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX =
1702 "leaf-queue-template";
1703
1704 @Private
1705 public static final String AUTO_CREATE_QUEUE_MAX_QUEUES =
1706 "auto-create-child-queue.max-queues";
1707
1708 @Private
1709 public static final int DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES = 1000;
1710
1711 /**
1712 * If true, this queue will be created as a Parent Queue which Auto Created
1713 * leaf child queues
1714 *
1715 * @param queuePath The queues path
1716 * @return true if auto create is enabled for child queues else false. Default
1717 * is false
1718 */
1719 @Private
1720 public boolean isAutoCreateChildQueueEnabled(String queuePath) {
1721 boolean isAutoCreateEnabled = getBoolean(
1722 getQueuePrefix(queuePath) + AUTO_CREATE_CHILD_QUEUE_ENABLED,
1723 DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED);
1724 return isAutoCreateEnabled;
1725 }
1726
1727 @Private
1728 @VisibleForTesting
1729 public void setAutoCreateChildQueueEnabled(String queuePath,
1730 boolean autoCreationEnabled) {
1731 setBoolean(getQueuePrefix(queuePath) +
1732 AUTO_CREATE_CHILD_QUEUE_ENABLED,
1733 autoCreationEnabled);
1734 }
1735
1736 /**
1737 * Get the auto created leaf queue's template configuration prefix
1738 * Leaf queue's template capacities are configured at the parent queue
1739 *
1740 * @param queuePath parent queue's path
1741 * @return Config prefix for leaf queue template configurations
1742 */
1743 @Private
1744 public String getAutoCreatedQueueTemplateConfPrefix(String queuePath) {
1745 return queuePath + DOT + AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX;
1746 }
1747
1748 @Private
1749 public static final String FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY =
1750 "auto-create-child-queue.fail-on-exceeding-parent-capacity";
1751
1752 @Private
1753 public static final boolean DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY =
1754 false;
1755
1756 /**
1757 * Fail further auto leaf queue creation when parent's guaranteed capacity is
1758 * exceeded.
1759 *
1760 * @param queuePath the parent queue's path
1761 * @return true if configured to fail else false
1762 */
1763 @Private
1764 public boolean getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
1765 String queuePath) {
1766 boolean shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity =
1767 getBoolean(getQueuePrefix(queuePath)
1768 + FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY,
1769 DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY);
1770 return shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity;
1771 }
1772
1773 @VisibleForTesting
1774 @Private
1775 public void setShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
1776 String queuePath, boolean autoCreationEnabled) {
1777 setBoolean(
1778 getQueuePrefix(queuePath) +
1779 FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY,
1780 autoCreationEnabled);
1781 }
1782
1783 /**
1784 * Get the max number of leaf queues that are allowed to be created under
1785 * a parent queue
1786 *
1787 * @param queuePath the paret queue's path
1788 * @return the max number of leaf queues allowed to be auto created
1789 */
1790 @Private
1791 public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) {
1792 return getInt(getQueuePrefix(queuePath) +
1793 AUTO_CREATE_QUEUE_MAX_QUEUES,
1794 DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES);
1795 }
1796
1797 @Private
1798 public static final String AUTO_CREATED_QUEUE_MANAGEMENT_POLICY =
1799 AUTO_CREATE_CHILD_QUEUE_PREFIX + "management-policy";
1800
1801 @Private
1802 public static final String DEFAULT_AUTO_CREATED_QUEUE_MANAGEMENT_POLICY =
1803 "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity"
1804 + ".queuemanagement."
1805 + "GuaranteedOrZeroCapacityOverTimePolicy";
1806
1807 @Private
1808 private static final String QUEUE_MANAGEMENT_CONFIG_PREFIX =
1809 "yarn.resourcemanager.monitor.capacity.queue-management.";
1810
1811 /**
1812 * Time in milliseconds between invocations of this policy
1813 */
1814 @Private
1815 public static final String QUEUE_MANAGEMENT_MONITORING_INTERVAL =
1816 QUEUE_MANAGEMENT_CONFIG_PREFIX + "monitoring-interval";
1817
1818 @Private
1819 public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL =
1820 1500L;
1821
1822 /**
1823 * Queue Management computation policy for Auto Created queues
1824 * @param queue The queue's path
1825 * @return Configured policy class name
1826 */
1827 @Private
1828 public String getAutoCreatedQueueManagementPolicy(String queue) {
1829 String autoCreatedQueueManagementPolicy =
1830 get(getQueuePrefix(queue) + AUTO_CREATED_QUEUE_MANAGEMENT_POLICY,
1831 DEFAULT_AUTO_CREATED_QUEUE_MANAGEMENT_POLICY);
1832 return autoCreatedQueueManagementPolicy;
1833 }
1834
1835 /**
1836 * Get The policy class configured to manage capacities for auto created leaf
1837 * queues under the specified parent
1838 *
1839 * @param queueName The parent queue's name
1840 * @return The policy class configured to manage capacities for auto created
1841 * leaf queues under the specified parent queue
1842 */
1843 @Private
1844 protected AutoCreatedQueueManagementPolicy
1845 getAutoCreatedQueueManagementPolicyClass(
1846 String queueName) {
1847
1848 String queueManagementPolicyClassName =
1849 getAutoCreatedQueueManagementPolicy(queueName);
1850 LOG.info("Using Auto Created Queue Management Policy: "
1851 + queueManagementPolicyClassName + " for queue: " + queueName);
1852 try {
1853 Class<?> queueManagementPolicyClazz = getClassByName(
1854 queueManagementPolicyClassName);
1855 if (AutoCreatedQueueManagementPolicy.class.isAssignableFrom(
1856 queueManagementPolicyClazz)) {
1857 return (AutoCreatedQueueManagementPolicy) ReflectionUtils.newInstance(
1858 queueManagementPolicyClazz, this);
1859 } else{
1860 throw new YarnRuntimeException(
1861 "Class: " + queueManagementPolicyClassName + " not instance of "
1862 + AutoCreatedQueueManagementPolicy.class.getCanonicalName());
1863 }
1864 } catch (ClassNotFoundException e) {
1865 throw new YarnRuntimeException(
1866 "Could not instantiate " + "AutoCreatedQueueManagementPolicy: "
1867 + queueManagementPolicyClassName + " for queue: " + queueName,
1868 e);
1869 }
1870 }
1871
1872 @VisibleForTesting
1873 @Private
1874 public void setAutoCreatedLeafQueueConfigCapacity(String queuePath,
1875 float val) {
1876 String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
1877 queuePath);
1878 setCapacity(leafQueueConfPrefix, val);
1879 }
1880
1881 @VisibleForTesting
1882 @Private
1883 public void setAutoCreatedLeafQueueTemplateCapacityByLabel(String queuePath,
1884 String label, float val) {
1885 String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
1886 queuePath);
1887 setCapacityByLabel(leafQueueConfPrefix, label, val);
1888 }
1889
1890 @Private
1891 @VisibleForTesting
1892 public void setAutoCreatedLeafQueueConfigMaxCapacity(String queuePath,
1893 float val) {
1894 String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
1895 queuePath);
1896 setMaximumCapacity(leafQueueConfPrefix, val);
1897 }
1898
1899 @Private
1900 @VisibleForTesting
1901 public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
1902 String label, float val) {
1903 String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
1904 queuePath);
1905 setMaximumCapacityByLabel(leafQueueConfPrefix, label, val);
1906 }
1907
1908 @VisibleForTesting
1909 @Private
1910 public void setAutoCreatedLeafQueueConfigUserLimit(String queuePath,
1911 int val) {
1912 String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
1913 queuePath);
1914 setUserLimit(leafQueueConfPrefix, val);
1915 }
1916
1917 @VisibleForTesting
1918 @Private
1919 public void setAutoCreatedLeafQueueConfigUserLimitFactor(String queuePath,
1920 float val) {
1921 String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
1922 queuePath);
1923 setUserLimitFactor(leafQueueConfPrefix, val);
1924 }
1925
1926 @Private
1927 @VisibleForTesting
1928 public void setAutoCreatedLeafQueueConfigDefaultNodeLabelExpression(String
1929 queuePath,
1930 String expression) {
1931 String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
1932 queuePath);
1933 setDefaultNodeLabelExpression(leafQueueConfPrefix, expression);
1934 }
1935
1936 public static String getUnits(String resourceValue) {
1937 String units;
1938 for (int i = 0; i < resourceValue.length(); i++) {
1939 if (Character.isAlphabetic(resourceValue.charAt(i))) {
1940 units = resourceValue.substring(i);
1941 if (StringUtils.isAlpha(units)) {
1942 return units;
1943 }
1944 }
1945 }
1946 return "";
1947 }
1948
1949 /**
1950 * Get absolute minimum resource requirement for a queue.
1951 *
1952 * @param label
1953 * NodeLabel
1954 * @param queue
1955 * queue path
1956 * @param resourceTypes
1957 * Resource types
1958 * @return ResourceInformation
1959 */
1960 public Resource getMinimumResourceRequirement(String label, String queue,
1961 Set<String> resourceTypes) {
1962 return internalGetLabeledResourceRequirementForQueue(queue, label,
1963 resourceTypes, CAPACITY);
1964 }
1965
1966 /**
1967 * Get absolute maximum resource requirement for a queue.
1968 *
1969 * @param label
1970 * NodeLabel
1971 * @param queue
1972 * queue path
1973 * @param resourceTypes
1974 * Resource types
1975 * @return Resource
1976 */
1977 public Resource getMaximumResourceRequirement(String label, String queue,
1978 Set<String> resourceTypes) {
1979 return internalGetLabeledResourceRequirementForQueue(queue, label,
1980 resourceTypes, MAXIMUM_CAPACITY);
1981 }
1982
1983 @VisibleForTesting
1984 public void setMinimumResourceRequirement(String label, String queue,
1985 Resource resource) {
1986 updateMinMaxResourceToConf(label, queue, resource, CAPACITY);
1987 }
1988
1989 @VisibleForTesting
1990 public void setMaximumResourceRequirement(String label, String queue,
1991 Resource resource) {
1992 updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_CAPACITY);
1993 }
1994
1995 private void updateMinMaxResourceToConf(String label, String queue,
1996 Resource resource, String type) {
1997 if (queue.equals("root")) {
1998 throw new IllegalArgumentException(
1999 "Cannot set resource, root queue will take 100% of cluster capacity");
2000 }
2001
2002 StringBuilder resourceString = new StringBuilder();
2003 resourceString
2004 .append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "="
2005 + resource.getMemorySize() + ","
2006 + AbsoluteResourceType.VCORES.toString().toLowerCase() + "="
2007 + resource.getVirtualCores() + "]");
2008
2009 String prefix = getQueuePrefix(queue) + type;
2010 if (!label.isEmpty()) {
2011 prefix = getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label
2012 + DOT + type;
2013 }
2014 set(prefix, resourceString.toString());
2015 }
2016
2017 private Resource internalGetLabeledResourceRequirementForQueue(String queue,
2018 String label, Set<String> resourceTypes, String suffix) {
2019 String propertyName = getNodeLabelPrefix(queue, label) + suffix;
2020 String resourceString = get(propertyName);
2021 if (resourceString == null || resourceString.isEmpty()) {
2022 return Resources.none();
2023 }
2024
2025 // Define resource here.
2026 Resource resource = Resource.newInstance(0L, 0);
2027 Matcher matcher = RESOURCE_PATTERN.matcher(resourceString);
2028
2029 /*
2030 * Absolute resource configuration for a queue will be grouped by "[]".
2031 * Syntax of absolute resource config could be like below
2032 * "memory=4Gi vcores=2". Ideally this means "4GB of memory and 2 vcores".
2033 */
2034 if (matcher.find()) {
2035 // Get the sub-group.
2036 String subGroup = matcher.group(0);
2037 if (subGroup.trim().isEmpty()) {
2038 return Resources.none();
2039 }
2040
2041 subGroup = subGroup.substring(1, subGroup.length() - 1);
2042 for (String kvPair : subGroup.trim().split(",")) {
2043 String[] splits = kvPair.split("=");
2044
2045 // Ensure that each sub string is key value pair separated by '='.
2046 if (splits != null && splits.length > 1) {
2047 updateResourceValuesFromConfig(resourceTypes, resource, splits);
2048 }
2049 }
2050 }
2051
2052 // Memory has to be configured always.
2053 if (resource.getMemorySize() == 0L) {
2054 return Resources.none();
2055 }
2056
2057 if (LOG.isDebugEnabled()) {
2058 LOG.debug("CSConf - getAbsolueResourcePerQueue: prefix="
2059 + getNodeLabelPrefix(queue, label) + ", capacity=" + resource);
2060 }
2061 return resource;
2062 }
2063
2064 private void updateResourceValuesFromConfig(Set<String> resourceTypes,
2065 Resource resource, String[] splits) {
2066
2067 // If key is not a valid type, skip it.
2068 if (!resourceTypes.contains(splits[0])) {
2069 return;
2070 }
2071
2072 String units = getUnits(splits[1]);
2073 Long resourceValue = Long
2074 .valueOf(splits[1].substring(0, splits[1].length() - units.length()));
2075
2076 // Convert all incoming units to MB if units is configured.
2077 if (!units.isEmpty()) {
2078 resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue);
2079 }
2080
2081 // map it based on key.
2082 AbsoluteResourceType resType = AbsoluteResourceType
2083 .valueOf(StringUtils.toUpperCase(splits[0].trim()));
2084 switch (resType) {
2085 case MEMORY :
2086 resource.setMemorySize(resourceValue);
2087 break;
2088 case VCORES :
2089 resource.setVirtualCores(resourceValue.intValue());
2090 break;
2091 default :
2092 resource.setResourceInformation(splits[0].trim(), ResourceInformation
2093 .newInstance(splits[0].trim(), units, resourceValue));
2094 break;
2095 }
2096 }
2097 }