YARN-7574. Add support for Node Labels on Auto Created Leaf Queue Template. Contribut...
[hadoop.git] / hadoop-yarn-project / hadoop-yarn / hadoop-yarn-server / hadoop-yarn-server-resourcemanager / src / main / java / org / apache / hadoop / yarn / server / resourcemanager / scheduler / capacity / ManagedParentQueue.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 * <p>
10 * http://www.apache.org/licenses/LICENSE-2.0
11 * <p>
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
19
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
    .FiCaSchedulerApp;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
37
38 /**
39 * Auto Creation enabled Parent queue. This queue initially does not have any
40 * children to start with and all child
41 * leaf queues will be auto created. Currently this does not allow other
42 * pre-configured leaf or parent queues to
43 * co-exist along with auto-created leaf queues. The auto creation is limited
44 * to leaf queues currently.
45 */
46 public class ManagedParentQueue extends AbstractManagedParentQueue {
47
48 private boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded = false;
49
50 private static final Logger LOG = LoggerFactory.getLogger(
51 ManagedParentQueue.class);
52
/**
 * Builds a managed parent queue: delegates construction to the superclass,
 * caches the "fail when guaranteed capacity exceeded" flag for this queue
 * path, builds the leaf queue template and initializes the queue management
 * policy.
 *
 * @param cs scheduler context, forwarded to the superclass constructor
 * @param queueName name of this queue
 * @param parent parent queue, forwarded to the superclass constructor
 * @param old forwarded to the superclass constructor
 *            (NOTE(review): presumably the previous instance of this queue
 *            - confirm against the superclass)
 * @throws IOException propagated from the superclass constructor or from
 *                     initializing the queue management policy
 */
public ManagedParentQueue(final CapacitySchedulerContext cs,
    final String queueName, final CSQueue parent, final CSQueue old)
    throws IOException {
  super(cs, queueName, parent, old);

  shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
      csContext.getConfiguration()
          .getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
              getQueuePath());

  leafQueueTemplate = initializeLeafQueueConfigs().build();

  // Parameterized logging instead of a hand-built StringBuffer; the queue
  // name is now wrapped in a matching pair of brackets (the opening one was
  // missing before).
  LOG.info("Created Managed Parent Queue: [{}]\nwith capacity: [{}]"
      + "\nwith max capacity: [{}].", queueName, super.getCapacity(),
      super.getMaximumCapacity());

  initializeQueueManagementPolicy();
}
74
75 @Override
76 public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
77 throws IOException {
78
79 try {
80 writeLock.lock();
81 validate(newlyParsedQueue);
82
83 shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
84 csContext.getConfiguration()
85 .getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
86 getQueuePath());
87
88 //validate if capacity is exceeded for child queues
89 if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) {
90 float childCap = sumOfChildCapacities();
91 if (getCapacity() < childCap) {
92 throw new IOException(
93 "Total of Auto Created leaf queues guaranteed capacity : "
94 + childCap + " exceeds Parent queue's " + getQueuePath()
95 + " guaranteed capacity " + getCapacity() + ""
96 + ".Cannot enforce policy to auto"
97 + " create queues beyond parent queue's capacity");
98 }
99 }
100
101 leafQueueTemplate = initializeLeafQueueConfigs().build();
102
103 super.reinitialize(newlyParsedQueue, clusterResource);
104
105 // run reinitialize on each existing queue, to trigger absolute cap
106 // recomputations
107 for (CSQueue res : this.getChildQueues()) {
108 res.reinitialize(res, clusterResource);
109 }
110
111 //clear state in policy
112 reinitializeQueueManagementPolicy();
113
114 //reassign capacities according to policy
115 final List<QueueManagementChange> queueManagementChanges =
116 queueManagementPolicy.computeQueueManagementChanges();
117
118 validateAndApplyQueueManagementChanges(queueManagementChanges);
119
120 StringBuffer queueInfo = new StringBuffer();
121 queueInfo.append("Reinitialized Managed Parent Queue: ").append(queueName)
122 .append("]\nwith capacity: [").append(super.getCapacity()).append(
123 "]\nwith max capacity: [").append(super.getMaximumCapacity()).append(
124 "].");
125 LOG.info(queueInfo.toString());
126 } catch (YarnException ye) {
127 LOG.error("Exception while computing policy changes for leaf queue : "
128 + getQueueName(), ye);
129 throw new IOException(ye);
130 } finally {
131 writeLock.unlock();
132 }
133 }
134
135 private void initializeQueueManagementPolicy() throws IOException {
136 queueManagementPolicy =
137 csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
138 getQueuePath());
139
140 queueManagementPolicy.init(csContext, this);
141 }
142
143 private void reinitializeQueueManagementPolicy() throws IOException {
144 AutoCreatedQueueManagementPolicy managementPolicy =
145 csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
146 getQueuePath());
147
148 if (!(managementPolicy.getClass().equals(
149 this.queueManagementPolicy.getClass()))) {
150 queueManagementPolicy = managementPolicy;
151 queueManagementPolicy.init(csContext, this);
152 } else{
153 queueManagementPolicy.reinitialize(csContext, this);
154 }
155 }
156
157 protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs() {
158
159 AutoCreatedLeafQueueConfig.Builder builder =
160 new AutoCreatedLeafQueueConfig.Builder();
161
162 String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(
163 csContext.getConfiguration());
164 //Load template configuration
165 builder.configuration(
166 super.initializeLeafQueueConfigs(leafQueueTemplateConfPrefix));
167
168 //Load template capacities
169 QueueCapacities queueCapacities = new QueueCapacities(false);
170 CSQueueUtils.loadUpdateAndCheckCapacities(csContext.getConfiguration()
171 .getAutoCreatedQueueTemplateConfPrefix(getQueuePath()),
172 csContext.getConfiguration(), queueCapacities, getQueueCapacities());
173 builder.capacities(queueCapacities);
174
175 return builder;
176 }
177
178 protected void validate(final CSQueue newlyParsedQueue) throws IOException {
179 // Sanity check
180 if (!(newlyParsedQueue instanceof ManagedParentQueue) || !newlyParsedQueue
181 .getQueuePath().equals(getQueuePath())) {
182 throw new IOException(
183 "Trying to reinitialize " + getQueuePath() + " from "
184 + newlyParsedQueue.getQueuePath());
185 }
186 }
187
188 @Override
189 public void addChildQueue(CSQueue childQueue)
190 throws SchedulerDynamicEditException, IOException {
191 try {
192 writeLock.lock();
193
194 if (childQueue == null || !(childQueue instanceof AutoCreatedLeafQueue)) {
195 throw new SchedulerDynamicEditException(
196 "Expected child queue to be an instance of AutoCreatedLeafQueue");
197 }
198
199 CapacitySchedulerConfiguration conf = csContext.getConfiguration();
200 ManagedParentQueue parentQueue =
201 (ManagedParentQueue) childQueue.getParent();
202
203 String leafQueueName = childQueue.getQueueName();
204 int maxQueues = conf.getAutoCreatedQueuesMaxChildQueuesLimit(
205 parentQueue.getQueuePath());
206
207 if (parentQueue.getChildQueues().size() >= maxQueues) {
208 throw new SchedulerDynamicEditException(
209 "Cannot auto create leaf queue " + leafQueueName + ".Max Child "
210 + "Queue limit exceeded which is configured as : " + maxQueues
211 + " and number of child queues is : " + parentQueue
212 .getChildQueues().size());
213 }
214
215 if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) {
216 if (getLeafQueueTemplate().getQueueCapacities().getAbsoluteCapacity()
217 + parentQueue.sumOfChildAbsCapacities() > parentQueue
218 .getAbsoluteCapacity()) {
219 throw new SchedulerDynamicEditException(
220 "Cannot auto create leaf queue " + leafQueueName + ". Child "
221 + "queues capacities have reached parent queue : "
222 + parentQueue.getQueuePath() + "'s guaranteed capacity");
223 }
224 }
225
226 AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
227 super.addChildQueue(leafQueue);
228 final AutoCreatedLeafQueueConfig initialLeafQueueTemplate =
229 queueManagementPolicy.getInitialLeafQueueConfiguration(leafQueue);
230
231 leafQueue.reinitializeFromTemplate(initialLeafQueueTemplate);
232 } finally {
233 writeLock.unlock();
234 }
235 }
236
237 public List<FiCaSchedulerApp> getScheduleableApplications() {
238 try {
239 readLock.lock();
240 List<FiCaSchedulerApp> apps = new ArrayList<>();
241 for (CSQueue childQueue : getChildQueues()) {
242 apps.addAll(((LeafQueue) childQueue).getApplications());
243 }
244 return Collections.unmodifiableList(apps);
245 } finally {
246 readLock.unlock();
247 }
248 }
249
250 public List<FiCaSchedulerApp> getPendingApplications() {
251 try {
252 readLock.lock();
253 List<FiCaSchedulerApp> apps = new ArrayList<>();
254 for (CSQueue childQueue : getChildQueues()) {
255 apps.addAll(((LeafQueue) childQueue).getPendingApplications());
256 }
257 return Collections.unmodifiableList(apps);
258 } finally {
259 readLock.unlock();
260 }
261 }
262
263 public List<FiCaSchedulerApp> getAllApplications() {
264 try {
265 readLock.lock();
266 List<FiCaSchedulerApp> apps = new ArrayList<>();
267 for (CSQueue childQueue : getChildQueues()) {
268 apps.addAll(((LeafQueue) childQueue).getAllApplications());
269 }
270 return Collections.unmodifiableList(apps);
271 } finally {
272 readLock.unlock();
273 }
274 }
275
276 public String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) {
277 return CapacitySchedulerConfiguration.PREFIX + conf
278 .getAutoCreatedQueueTemplateConfPrefix(getQueuePath());
279 }
280
281 public boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded() {
282 return shouldFailAutoCreationWhenGuaranteedCapacityExceeded;
283 }
284
285 /**
286 * Asynchronously called from scheduler to apply queue management changes
287 *
288 * @param queueManagementChanges
289 */
290 public void validateAndApplyQueueManagementChanges(
291 List<QueueManagementChange> queueManagementChanges)
292 throws IOException, SchedulerDynamicEditException {
293 try {
294 writeLock.lock();
295
296 validateQueueManagementChanges(queueManagementChanges);
297
298 applyQueueManagementChanges(queueManagementChanges);
299
300 AutoCreatedQueueManagementPolicy policy =
301 getAutoCreatedQueueManagementPolicy();
302
303 //acquires write lock on policy
304 policy.commitQueueManagementChanges(queueManagementChanges);
305
306 } finally {
307 writeLock.unlock();
308 }
309 }
310
311 public void validateQueueManagementChanges(
312 List<QueueManagementChange> queueManagementChanges)
313 throws SchedulerDynamicEditException {
314
315 for (QueueManagementChange queueManagementChange : queueManagementChanges) {
316
317 CSQueue childQueue = queueManagementChange.getQueue();
318
319 if (!(childQueue instanceof AutoCreatedLeafQueue)) {
320 throw new SchedulerDynamicEditException(
321 "queue should be " + "AutoCreatedLeafQueue. Found " + childQueue
322 .getClass());
323 }
324
325 if (!(AbstractManagedParentQueue.class.
326 isAssignableFrom(childQueue.getParent().getClass()))) {
327 LOG.error("Queue " + getQueueName()
328 + " is not an instance of PlanQueue or ManagedParentQueue." + " "
329 + "Ignoring update " + queueManagementChanges);
330 throw new SchedulerDynamicEditException(
331 "Queue " + getQueueName() + " is not a AutoEnabledParentQueue."
332 + " Ignoring update " + queueManagementChanges);
333 }
334
335 switch (queueManagementChange.getQueueAction()){
336 case UPDATE_QUEUE:
337 AutoCreatedLeafQueueConfig template =
338 queueManagementChange.getUpdatedQueueTemplate();
339 ((AutoCreatedLeafQueue) childQueue).validateConfigurations(template);
340 break;
341 }
342
343 }
344 }
345
346 private void applyQueueManagementChanges(
347 List<QueueManagementChange> queueManagementChanges)
348 throws SchedulerDynamicEditException, IOException {
349 for (QueueManagementChange queueManagementChange : queueManagementChanges) {
350 switch (queueManagementChange.getQueueAction()){
351 case UPDATE_QUEUE:
352 AutoCreatedLeafQueue childQueueToBeUpdated =
353 (AutoCreatedLeafQueue) queueManagementChange.getQueue();
354 //acquires write lock on leaf queue
355 childQueueToBeUpdated.reinitializeFromTemplate(
356 queueManagementChange.getUpdatedQueueTemplate());
357 break;
358 }
359 }
360 }
361
362 public CapacitySchedulerConfiguration getLeafQueueConfigs(
363 String leafQueueName) {
364 return getLeafQueueConfigs(getLeafQueueTemplate().getLeafQueueConfigs(),
365 leafQueueName);
366 }
367
368 public CapacitySchedulerConfiguration getLeafQueueConfigs(
369 CapacitySchedulerConfiguration templateConfig, String leafQueueName) {
370 CapacitySchedulerConfiguration leafQueueConfigTemplate = new
371 CapacitySchedulerConfiguration(new Configuration(false), false);
372 for (final Iterator<Map.Entry<String, String>> iterator =
373 templateConfig.iterator(); iterator.hasNext(); ) {
374 Map.Entry<String, String> confKeyValuePair = iterator.next();
375 final String name = confKeyValuePair.getKey().replaceFirst(
376 CapacitySchedulerConfiguration
377 .AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX,
378 leafQueueName);
379 leafQueueConfigTemplate.set(name, confKeyValuePair.getValue());
380 }
381 return leafQueueConfigTemplate;
382 }
383 }