YARN-8091. Revisit checkUserAccessToQueue RM REST API. (wangda)
[hadoop.git] / hadoop-yarn-project / hadoop-yarn / hadoop-yarn-server / hadoop-yarn-server-router / src / main / java / org / apache / hadoop / yarn / server / router / webapp / FederationInterceptorREST.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 * <p>
10 * http://www.apache.org/licenses/LICENSE-2.0
11 * <p>
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.yarn.server.router.webapp;
20
21 import java.io.IOException;
22 import java.security.Principal;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Random;
28 import java.util.Set;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.CompletionService;
31 import java.util.concurrent.ExecutorCompletionService;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Future;
34
35 import javax.servlet.http.HttpServletRequest;
36 import javax.servlet.http.HttpServletRequestWrapper;
37 import javax.servlet.http.HttpServletResponse;
38 import javax.ws.rs.core.HttpHeaders;
39 import javax.ws.rs.core.Response;
40 import javax.ws.rs.core.Response.Status;
41
42 import org.apache.commons.lang.NotImplementedException;
43 import org.apache.hadoop.conf.Configuration;
44 import org.apache.hadoop.security.authorize.AuthorizationException;
45 import org.apache.hadoop.util.ReflectionUtils;
46 import org.apache.hadoop.util.concurrent.HadoopExecutors;
47 import org.apache.hadoop.yarn.api.records.ApplicationId;
48 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
49 import org.apache.hadoop.yarn.conf.YarnConfiguration;
50 import org.apache.hadoop.yarn.exceptions.YarnException;
51 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
52 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
53 import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
54 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
55 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
56 import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
57 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
58 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
59 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
60 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
61 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
62 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
63 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
64 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
65 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
66 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
67 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
68 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
69 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
70 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
71 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
72 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
73 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
74 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
75 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
76 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
77 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
78 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
79 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
80 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
81 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
82 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
83 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
84 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
85 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
86 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
87 import org.apache.hadoop.yarn.server.router.RouterMetrics;
88 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
89 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
90 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
91 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
92 import org.apache.hadoop.yarn.util.Clock;
93 import org.apache.hadoop.yarn.util.MonotonicClock;
94 import org.apache.hadoop.yarn.webapp.NotFoundException;
95 import org.slf4j.Logger;
96 import org.slf4j.LoggerFactory;
97
98 import com.google.common.annotations.VisibleForTesting;
99 import com.google.common.util.concurrent.ThreadFactoryBuilder;
100
101 /**
102 * Extends the {@code AbstractRESTRequestInterceptor} class and provides an
103 * implementation for federation of YARN RM and scaling an application across
104 * multiple YARN SubClusters. All the federation specific implementation is
105 * encapsulated in this class. This is always the last interceptor in the chain.
106 */
107 public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
108
109 private static final Logger LOG =
110 LoggerFactory.getLogger(FederationInterceptorREST.class);
111
112 private int numSubmitRetries;
113 private FederationStateStoreFacade federationFacade;
114 private Random rand;
115 private RouterPolicyFacade policyFacade;
116 private RouterMetrics routerMetrics;
117 private final Clock clock = new MonotonicClock();
118 private boolean returnPartialReport;
119
120 private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
121
122 /**
123 * Thread pool used for asynchronous operations.
124 */
125 private ExecutorService threadpool;
126
127 @Override
128 public void init(String user) {
129 federationFacade = FederationStateStoreFacade.getInstance();
130 rand = new Random();
131
132 final Configuration conf = this.getConf();
133
134 try {
135 SubClusterResolver subClusterResolver =
136 this.federationFacade.getSubClusterResolver();
137 policyFacade = new RouterPolicyFacade(
138 conf, federationFacade, subClusterResolver, null);
139 } catch (FederationPolicyInitializationException e) {
140 throw new YarnRuntimeException(e);
141 }
142
143 numSubmitRetries = conf.getInt(
144 YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
145 YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
146
147 interceptors = new HashMap<>();
148 routerMetrics = RouterMetrics.getMetrics();
149 threadpool = HadoopExecutors.newCachedThreadPool(
150 new ThreadFactoryBuilder()
151 .setNameFormat("FederationInterceptorREST #%d")
152 .build());
153
154 returnPartialReport = conf.getBoolean(
155 YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED,
156 YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
157 }
158
159 private SubClusterId getRandomActiveSubCluster(
160 Map<SubClusterId, SubClusterInfo> activeSubclusters,
161 List<SubClusterId> blackListSubClusters) throws YarnException {
162
163 if (activeSubclusters == null || activeSubclusters.size() < 1) {
164 RouterServerUtil.logAndThrowException(
165 FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
166 }
167 List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
168
169 FederationPolicyUtils.validateSubClusterAvailability(
170 list, blackListSubClusters);
171
172 if (blackListSubClusters != null) {
173
174 // Remove from the active SubClusters from StateStore the blacklisted ones
175 for (SubClusterId scId : blackListSubClusters) {
176 list.remove(scId);
177 }
178 }
179
180 return list.get(rand.nextInt(list.size()));
181 }
182
183 @VisibleForTesting
184 protected DefaultRequestInterceptorREST getInterceptorForSubCluster(
185 SubClusterId subClusterId) {
186 if (interceptors.containsKey(subClusterId)) {
187 return interceptors.get(subClusterId);
188 } else {
189 LOG.error(
190 "The interceptor for SubCluster {} does not exist in the cache.",
191 subClusterId);
192 return null;
193 }
194 }
195
196 private DefaultRequestInterceptorREST createInterceptorForSubCluster(
197 SubClusterId subClusterId, String webAppAddress) {
198
199 final Configuration conf = this.getConf();
200
201 String interceptorClassName = conf.get(
202 YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
203 YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS);
204 DefaultRequestInterceptorREST interceptorInstance = null;
205 try {
206 Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
207 if (DefaultRequestInterceptorREST.class
208 .isAssignableFrom(interceptorClass)) {
209 interceptorInstance = (DefaultRequestInterceptorREST) ReflectionUtils
210 .newInstance(interceptorClass, conf);
211
212 } else {
213 throw new YarnRuntimeException(
214 "Class: " + interceptorClassName + " not instance of "
215 + DefaultRequestInterceptorREST.class.getCanonicalName());
216 }
217 } catch (ClassNotFoundException e) {
218 throw new YarnRuntimeException(
219 "Could not instantiate ApplicationMasterRequestInterceptor: "
220 + interceptorClassName,
221 e);
222 }
223
224 interceptorInstance.setWebAppAddress("http://" + webAppAddress);
225 interceptorInstance.setSubClusterId(subClusterId);
226 interceptors.put(subClusterId, interceptorInstance);
227 return interceptorInstance;
228 }
229
230 @VisibleForTesting
231 protected DefaultRequestInterceptorREST getOrCreateInterceptorForSubCluster(
232 SubClusterId subClusterId, String webAppAddress) {
233 DefaultRequestInterceptorREST interceptor =
234 getInterceptorForSubCluster(subClusterId);
235 if (interceptor == null) {
236 interceptor = createInterceptorForSubCluster(subClusterId, webAppAddress);
237 }
238 return interceptor;
239 }
240
241 /**
242 * YARN Router forwards every getNewApplication requests to any RM. During
243 * this operation there will be no communication with the State Store. The
244 * Router will forward the requests to any SubCluster. The Router will retry
245 * to submit the request on #numSubmitRetries different SubClusters. The
246 * SubClusters are randomly chosen from the active ones.
247 * <p>
248 * Possible failures and behaviors:
249 * <p>
250 * Client: identical behavior as {@code RMWebServices}.
251 * <p>
252 * Router: the Client will timeout and resubmit.
253 * <p>
254 * ResourceManager: the Router will timeout and contacts another RM.
255 * <p>
256 * StateStore: not in the execution.
257 */
258 @Override
259 public Response createNewApplication(HttpServletRequest hsr)
260 throws AuthorizationException, IOException, InterruptedException {
261
262 long startTime = clock.getTime();
263
264 Map<SubClusterId, SubClusterInfo> subClustersActive;
265 try {
266 subClustersActive = federationFacade.getSubClusters(true);
267 } catch (YarnException e) {
268 routerMetrics.incrAppsFailedCreated();
269 return Response.status(Status.INTERNAL_SERVER_ERROR)
270 .entity(e.getLocalizedMessage()).build();
271 }
272
273 List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
274
275 for (int i = 0; i < numSubmitRetries; ++i) {
276
277 SubClusterId subClusterId;
278 try {
279 subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
280 } catch (YarnException e) {
281 routerMetrics.incrAppsFailedCreated();
282 return Response.status(Status.SERVICE_UNAVAILABLE)
283 .entity(e.getLocalizedMessage()).build();
284 }
285
286 LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId);
287
288 DefaultRequestInterceptorREST interceptor =
289 getOrCreateInterceptorForSubCluster(subClusterId,
290 subClustersActive.get(subClusterId).getRMWebServiceAddress());
291 Response response = null;
292 try {
293 response = interceptor.createNewApplication(hsr);
294 } catch (Exception e) {
295 LOG.warn("Unable to create a new ApplicationId in SubCluster {}",
296 subClusterId.getId(), e);
297 }
298
299 if (response != null &&
300 response.getStatus() == HttpServletResponse.SC_OK) {
301
302 long stopTime = clock.getTime();
303 routerMetrics.succeededAppsCreated(stopTime - startTime);
304
305 return response;
306 } else {
307 // Empty response from the ResourceManager.
308 // Blacklist this subcluster for this request.
309 blacklist.add(subClusterId);
310 }
311 }
312
313 String errMsg = "Fail to create a new application.";
314 LOG.error(errMsg);
315 routerMetrics.incrAppsFailedCreated();
316 return Response
317 .status(Status.INTERNAL_SERVER_ERROR)
318 .entity(errMsg)
319 .build();
320 }
321
322 /**
323 * Today, in YARN there are no checks of any applicationId submitted.
324 * <p>
325 * Base scenarios:
326 * <p>
327 * The Client submits an application to the Router. • The Router selects one
328 * SubCluster to forward the request. • The Router inserts a tuple into
329 * StateStore with the selected SubCluster (e.g. SC1) and the appId. • The
330 * State Store replies with the selected SubCluster (e.g. SC1). • The Router
331 * submits the request to the selected SubCluster.
332 * <p>
333 * In case of State Store failure:
334 * <p>
335 * The client submits an application to the Router. • The Router selects one
336 * SubCluster to forward the request. • The Router inserts a tuple into State
337 * Store with the selected SubCluster (e.g. SC1) and the appId. • Due to the
338 * State Store down the Router times out and it will retry depending on the
339 * FederationFacade settings. • The Router replies to the client with an error
340 * message.
341 * <p>
342 * If State Store fails after inserting the tuple: identical behavior as
343 * {@code RMWebServices}.
344 * <p>
345 * In case of Router failure:
346 * <p>
347 * Scenario 1 – Crash before submission to the ResourceManager
348 * <p>
349 * The Client submits an application to the Router. • The Router selects one
350 * SubCluster to forward the request. • The Router inserts a tuple into State
351 * Store with the selected SubCluster (e.g. SC1) and the appId. • The Router
352 * crashes. • The Client timeouts and resubmits the application. • The Router
353 * selects one SubCluster to forward the request. • The Router inserts a tuple
354 * into State Store with the selected SubCluster (e.g. SC2) and the appId. •
355 * Because the tuple is already inserted in the State Store, it returns the
356 * previous selected SubCluster (e.g. SC1). • The Router submits the request
357 * to the selected SubCluster (e.g. SC1).
358 * <p>
359 * Scenario 2 – Crash after submission to the ResourceManager
360 * <p>
361 * • The Client submits an application to the Router. • The Router selects one
362 * SubCluster to forward the request. • The Router inserts a tuple into State
363 * Store with the selected SubCluster (e.g. SC1) and the appId. • The Router
364 * submits the request to the selected SubCluster. • The Router crashes. • The
365 * Client timeouts and resubmit the application. • The Router selects one
366 * SubCluster to forward the request. • The Router inserts a tuple into State
367 * Store with the selected SubCluster (e.g. SC2) and the appId. • The State
368 * Store replies with the selected SubCluster (e.g. SC1). • The Router submits
369 * the request to the selected SubCluster (e.g. SC1). When a client re-submits
370 * the same application to the same RM, it does not raise an exception and
371 * replies with operation successful message.
372 * <p>
373 * In case of Client failure: identical behavior as {@code RMWebServices}.
374 * <p>
375 * In case of ResourceManager failure:
376 * <p>
377 * The Client submits an application to the Router. • The Router selects one
378 * SubCluster to forward the request. • The Router inserts a tuple into State
379 * Store with the selected SubCluster (e.g. SC1) and the appId. • The Router
380 * submits the request to the selected SubCluster. • The entire SubCluster is
381 * down – all the RMs in HA or the master RM is not reachable. • The Router
382 * times out. • The Router selects a new SubCluster to forward the request. •
383 * The Router updates a tuple into State Store with the selected SubCluster
384 * (e.g. SC2) and the appId. • The State Store replies with OK answer. • The
385 * Router submits the request to the selected SubCluster (e.g. SC2).
386 */
387 @Override
388 public Response submitApplication(ApplicationSubmissionContextInfo newApp,
389 HttpServletRequest hsr)
390 throws AuthorizationException, IOException, InterruptedException {
391
392 long startTime = clock.getTime();
393
394 if (newApp == null || newApp.getApplicationId() == null) {
395 routerMetrics.incrAppsFailedSubmitted();
396 String errMsg = "Missing ApplicationSubmissionContextInfo or "
397 + "applicationSubmissionContex information.";
398 return Response
399 .status(Status.BAD_REQUEST)
400 .entity(errMsg)
401 .build();
402 }
403
404 ApplicationId applicationId = null;
405 try {
406 applicationId = ApplicationId.fromString(newApp.getApplicationId());
407 } catch (IllegalArgumentException e) {
408 routerMetrics.incrAppsFailedSubmitted();
409 return Response
410 .status(Status.BAD_REQUEST)
411 .entity(e.getLocalizedMessage())
412 .build();
413 }
414
415 List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
416
417 for (int i = 0; i < numSubmitRetries; ++i) {
418
419 ApplicationSubmissionContext context =
420 RMWebAppUtil.createAppSubmissionContext(newApp, this.getConf());
421
422 SubClusterId subClusterId = null;
423 try {
424 subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
425 } catch (YarnException e) {
426 routerMetrics.incrAppsFailedSubmitted();
427 return Response
428 .status(Status.SERVICE_UNAVAILABLE)
429 .entity(e.getLocalizedMessage())
430 .build();
431 }
432 LOG.info("submitApplication appId {} try #{} on SubCluster {}",
433 applicationId, i, subClusterId);
434
435 ApplicationHomeSubCluster appHomeSubCluster =
436 ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
437
438 if (i == 0) {
439 try {
440 // persist the mapping of applicationId and the subClusterId which has
441 // been selected as its home
442 subClusterId =
443 federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
444 } catch (YarnException e) {
445 routerMetrics.incrAppsFailedSubmitted();
446 String errMsg = "Unable to insert the ApplicationId " + applicationId
447 + " into the FederationStateStore";
448 return Response
449 .status(Status.SERVICE_UNAVAILABLE)
450 .entity(errMsg + " " + e.getLocalizedMessage())
451 .build();
452 }
453 } else {
454 try {
455 // update the mapping of applicationId and the home subClusterId to
456 // the new subClusterId we have selected
457 federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
458 } catch (YarnException e) {
459 String errMsg = "Unable to update the ApplicationId " + applicationId
460 + " into the FederationStateStore";
461 SubClusterId subClusterIdInStateStore;
462 try {
463 subClusterIdInStateStore =
464 federationFacade.getApplicationHomeSubCluster(applicationId);
465 } catch (YarnException e1) {
466 routerMetrics.incrAppsFailedSubmitted();
467 return Response
468 .status(Status.SERVICE_UNAVAILABLE)
469 .entity(e1.getLocalizedMessage())
470 .build();
471 }
472 if (subClusterId == subClusterIdInStateStore) {
473 LOG.info("Application {} already submitted on SubCluster {}",
474 applicationId, subClusterId);
475 } else {
476 routerMetrics.incrAppsFailedSubmitted();
477 return Response
478 .status(Status.SERVICE_UNAVAILABLE)
479 .entity(errMsg)
480 .build();
481 }
482 }
483 }
484
485 SubClusterInfo subClusterInfo;
486 try {
487 subClusterInfo = federationFacade.getSubCluster(subClusterId);
488 } catch (YarnException e) {
489 routerMetrics.incrAppsFailedSubmitted();
490 return Response
491 .status(Status.SERVICE_UNAVAILABLE)
492 .entity(e.getLocalizedMessage())
493 .build();
494 }
495
496 Response response = null;
497 try {
498 response = getOrCreateInterceptorForSubCluster(subClusterId,
499 subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp,
500 hsr);
501 } catch (Exception e) {
502 LOG.warn("Unable to submit the application {} to SubCluster {}",
503 applicationId, subClusterId.getId(), e);
504 }
505
506 if (response != null &&
507 response.getStatus() == HttpServletResponse.SC_ACCEPTED) {
508 LOG.info("Application {} with appId {} submitted on {}",
509 context.getApplicationName(), applicationId, subClusterId);
510
511 long stopTime = clock.getTime();
512 routerMetrics.succeededAppsSubmitted(stopTime - startTime);
513
514 return response;
515 } else {
516 // Empty response from the ResourceManager.
517 // Blacklist this subcluster for this request.
518 blacklist.add(subClusterId);
519 }
520 }
521
522 routerMetrics.incrAppsFailedSubmitted();
523 String errMsg = "Application " + newApp.getApplicationName()
524 + " with appId " + applicationId + " failed to be submitted.";
525 LOG.error(errMsg);
526 return Response
527 .status(Status.SERVICE_UNAVAILABLE)
528 .entity(errMsg)
529 .build();
530 }
531
532 /**
533 * The YARN Router will forward to the respective YARN RM in which the AM is
534 * running.
535 * <p>
536 * Possible failure:
537 * <p>
538 * Client: identical behavior as {@code RMWebServices}.
539 * <p>
540 * Router: the Client will timeout and resubmit the request.
541 * <p>
542 * ResourceManager: the Router will timeout and the call will fail.
543 * <p>
544 * State Store: the Router will timeout and it will retry depending on the
545 * FederationFacade settings - if the failure happened before the select
546 * operation.
547 */
548 @Override
549 public AppInfo getApp(HttpServletRequest hsr, String appId,
550 Set<String> unselectedFields) {
551
552 long startTime = clock.getTime();
553
554 ApplicationId applicationId = null;
555 try {
556 applicationId = ApplicationId.fromString(appId);
557 } catch (IllegalArgumentException e) {
558 routerMetrics.incrAppsFailedRetrieved();
559 return null;
560 }
561
562 SubClusterInfo subClusterInfo = null;
563 SubClusterId subClusterId = null;
564 try {
565 subClusterId =
566 federationFacade.getApplicationHomeSubCluster(applicationId);
567 if (subClusterId == null) {
568 routerMetrics.incrAppsFailedRetrieved();
569 return null;
570 }
571 subClusterInfo = federationFacade.getSubCluster(subClusterId);
572 } catch (YarnException e) {
573 routerMetrics.incrAppsFailedRetrieved();
574 return null;
575 }
576
577 DefaultRequestInterceptorREST interceptor =
578 getOrCreateInterceptorForSubCluster(
579 subClusterId, subClusterInfo.getRMWebServiceAddress());
580 AppInfo response = interceptor.getApp(hsr, appId, unselectedFields);
581
582 long stopTime = clock.getTime();
583 routerMetrics.succeededAppsRetrieved(stopTime - startTime);
584
585 return response;
586 }
587
588 /**
589 * The YARN Router will forward to the respective YARN RM in which the AM is
590 * running.
591 * <p>
592 * Possible failures and behaviors:
593 * <p>
594 * Client: identical behavior as {@code RMWebServices}.
595 * <p>
596 * Router: the Client will timeout and resubmit the request.
597 * <p>
598 * ResourceManager: the Router will timeout and the call will fail.
599 * <p>
600 * State Store: the Router will timeout and it will retry depending on the
601 * FederationFacade settings - if the failure happened before the select
602 * operation.
603 */
604 @Override
605 public Response updateAppState(AppState targetState, HttpServletRequest hsr,
606 String appId) throws AuthorizationException, YarnException,
607 InterruptedException, IOException {
608
609 long startTime = clock.getTime();
610
611 ApplicationId applicationId = null;
612 try {
613 applicationId = ApplicationId.fromString(appId);
614 } catch (IllegalArgumentException e) {
615 routerMetrics.incrAppsFailedKilled();
616 return Response
617 .status(Status.BAD_REQUEST)
618 .entity(e.getLocalizedMessage())
619 .build();
620 }
621
622 SubClusterInfo subClusterInfo = null;
623 SubClusterId subClusterId = null;
624 try {
625 subClusterId =
626 federationFacade.getApplicationHomeSubCluster(applicationId);
627 subClusterInfo = federationFacade.getSubCluster(subClusterId);
628 } catch (YarnException e) {
629 routerMetrics.incrAppsFailedKilled();
630 return Response
631 .status(Status.BAD_REQUEST)
632 .entity(e.getLocalizedMessage())
633 .build();
634 }
635
636 Response response = getOrCreateInterceptorForSubCluster(subClusterId,
637 subClusterInfo.getRMWebServiceAddress()).updateAppState(targetState,
638 hsr, appId);
639
640 long stopTime = clock.getTime();
641 routerMetrics.succeededAppsRetrieved(stopTime - startTime);
642
643 return response;
644 }
645
646 /**
647 * The YARN Router will forward the request to all the YARN RMs in parallel,
648 * after that it will group all the ApplicationReports by the ApplicationId.
649 * <p>
650 * Possible failure:
651 * <p>
652 * Client: identical behavior as {@code RMWebServices}.
653 * <p>
654 * Router: the Client will timeout and resubmit the request.
655 * <p>
656 * ResourceManager: the Router calls each YARN RM in parallel by using one
657 * thread for each YARN RM. In case a YARN RM fails, a single call will
658 * timeout. However the Router will merge the ApplicationReports it got, and
659 * provides a partial list to the client.
660 * <p>
661 * State Store: the Router will timeout and it will retry depending on the
662 * FederationFacade settings - if the failure happened before the select
663 * operation.
664 */
665 @Override
666 public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
667 Set<String> statesQuery, String finalStatusQuery, String userQuery,
668 String queueQuery, String count, String startedBegin, String startedEnd,
669 String finishBegin, String finishEnd, Set<String> applicationTypes,
670 Set<String> applicationTags, Set<String> unselectedFields) {
671 AppsInfo apps = new AppsInfo();
672 long startTime = clock.getTime();
673
674 Map<SubClusterId, SubClusterInfo> subClustersActive = null;
675 try {
676 subClustersActive = federationFacade.getSubClusters(true);
677 } catch (YarnException e) {
678 routerMetrics.incrMultipleAppsFailedRetrieved();
679 return null;
680 }
681
682 // Send the requests in parallel
683 CompletionService<AppsInfo> compSvc =
684 new ExecutorCompletionService<>(this.threadpool);
685
686 // HttpServletRequest does not work with ExecutorCompletionService.
687 // Create a duplicate hsr.
688 final HttpServletRequest hsrCopy = clone(hsr);
689 for (final SubClusterInfo info : subClustersActive.values()) {
690 compSvc.submit(new Callable<AppsInfo>() {
691 @Override
692 public AppsInfo call() {
693 DefaultRequestInterceptorREST interceptor =
694 getOrCreateInterceptorForSubCluster(
695 info.getSubClusterId(), info.getRMWebServiceAddress());
696 AppsInfo rmApps = interceptor.getApps(hsrCopy, stateQuery,
697 statesQuery, finalStatusQuery, userQuery, queueQuery, count,
698 startedBegin, startedEnd, finishBegin, finishEnd,
699 applicationTypes, applicationTags, unselectedFields);
700
701 if (rmApps == null) {
702 routerMetrics.incrMultipleAppsFailedRetrieved();
703 LOG.error("Subcluster {} failed to return appReport.",
704 info.getSubClusterId());
705 return null;
706 }
707 return rmApps;
708 }
709 });
710 }
711
712 // Collect all the responses in parallel
713 for (int i = 0; i < subClustersActive.size(); i++) {
714 try {
715 Future<AppsInfo> future = compSvc.take();
716 AppsInfo appsResponse = future.get();
717
718 long stopTime = clock.getTime();
719 routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime);
720
721 if (appsResponse != null) {
722 apps.addAll(appsResponse.getApps());
723 }
724 } catch (Throwable e) {
725 routerMetrics.incrMultipleAppsFailedRetrieved();
726 LOG.warn("Failed to get application report", e);
727 }
728 }
729
730 if (apps.getApps().isEmpty()) {
731 return null;
732 }
733
734 // Merge all the application reports got from all the available YARN RMs
735 return RouterWebServiceUtil.mergeAppsInfo(
736 apps.getApps(), returnPartialReport);
737 }
738
739 /**
740 * Get a copy of a HTTP request. This is for thread safety.
741 * @param hsr HTTP servlet request to copy.
742 * @return Copy of the HTTP request.
743 */
744 private HttpServletRequestWrapper clone(final HttpServletRequest hsr) {
745 if (hsr == null) {
746 return null;
747 }
748 @SuppressWarnings("unchecked")
749 final Map<String, String[]> parameterMap =
750 (Map<String, String[]>) hsr.getParameterMap();
751 final String pathInfo = hsr.getPathInfo();
752 final String user = hsr.getRemoteUser();
753 final Principal principal = hsr.getUserPrincipal();
754 final String mediaType =
755 RouterWebServiceUtil.getMediaTypeFromHttpServletRequest(
756 hsr, AppsInfo.class);
757 return new HttpServletRequestWrapper(hsr) {
758 public Map<String, String[]> getParameterMap() {
759 return parameterMap;
760 }
761 public String getPathInfo() {
762 return pathInfo;
763 }
764 public String getRemoteUser() {
765 return user;
766 }
767 public Principal getUserPrincipal() {
768 return principal;
769 }
770 public String getHeader(String value) {
771 // we override only Accept
772 if (value.equals(HttpHeaders.ACCEPT)) {
773 return mediaType;
774 }
775 return null;
776 }
777 };
778 }
779
780 /**
781 * The YARN Router will forward the request to all the SubClusters to find
782 * where the node is running.
783 * <p>
784 * Possible failure:
785 * <p>
786 * Client: identical behavior as {@code RMWebServices}.
787 * <p>
788 * Router: the Client will timeout and resubmit the request.
789 * <p>
790 * ResourceManager: the Router will timeout and the call will fail.
791 * <p>
792 * State Store: the Router will timeout and it will retry depending on the
793 * FederationFacade settings - if the failure happened before the select
794 * operation.
795 */
796 @Override
797 public NodeInfo getNode(String nodeId) {
798 Map<SubClusterId, SubClusterInfo> subClustersActive = null;
799 try {
800 subClustersActive = federationFacade.getSubClusters(true);
801 } catch (YarnException e) {
802 throw new NotFoundException(e.getMessage());
803 }
804
805 if (subClustersActive.isEmpty()) {
806 throw new NotFoundException(
807 FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
808 }
809
810 // Send the requests in parallel
811 CompletionService<NodeInfo> compSvc =
812 new ExecutorCompletionService<NodeInfo>(this.threadpool);
813
814 for (final SubClusterInfo info : subClustersActive.values()) {
815 compSvc.submit(new Callable<NodeInfo>() {
816 @Override
817 public NodeInfo call() {
818 DefaultRequestInterceptorREST interceptor =
819 getOrCreateInterceptorForSubCluster(
820 info.getSubClusterId(), info.getRMWebServiceAddress());
821 try {
822 NodeInfo nodeInfo = interceptor.getNode(nodeId);
823 return nodeInfo;
824 } catch (Exception e) {
825 LOG.error("Subcluster {} failed to return nodeInfo.",
826 info.getSubClusterId());
827 return null;
828 }
829 }
830 });
831 }
832
833 // Collect all the responses in parallel
834 NodeInfo nodeInfo = null;
835 for (int i = 0; i < subClustersActive.size(); i++) {
836 try {
837 Future<NodeInfo> future = compSvc.take();
838 NodeInfo nodeResponse = future.get();
839
840 // Check if the node was found in this SubCluster
841 if (nodeResponse != null) {
842 // Check if the node was already found in a different SubCluster and
843 // it has an old health report
844 if (nodeInfo == null || nodeInfo.getLastHealthUpdate() <
845 nodeResponse.getLastHealthUpdate()) {
846 nodeInfo = nodeResponse;
847 }
848 }
849 } catch (Throwable e) {
850 LOG.warn("Failed to get node report ", e);
851 }
852 }
853 if (nodeInfo == null) {
854 throw new NotFoundException("nodeId, " + nodeId + ", is not found");
855 }
856 return nodeInfo;
857 }
858
859 /**
860 * The YARN Router will forward the request to all the YARN RMs in parallel,
861 * after that it will remove all the duplicated NodeInfo by using the NodeId.
862 * <p>
863 * Possible failure:
864 * <p>
865 * Client: identical behavior as {@code RMWebServices}.
866 * <p>
867 * Router: the Client will timeout and resubmit the request.
868 * <p>
869 * ResourceManager: the Router calls each YARN RM in parallel by using one
870 * thread for each YARN RM. In case a YARN RM fails, a single call will
871 * timeout. However the Router will use the NodesInfo it got, and provides a
872 * partial list to the client.
873 * <p>
874 * State Store: the Router will timeout and it will retry depending on the
875 * FederationFacade settings - if the failure happened before the select
876 * operation.
877 */
878 @Override
879 public NodesInfo getNodes(String states) {
880
881 NodesInfo nodes = new NodesInfo();
882
883 Map<SubClusterId, SubClusterInfo> subClustersActive = null;
884 try {
885 subClustersActive = federationFacade.getSubClusters(true);
886 } catch (YarnException e) {
887 LOG.error("Cannot get nodes: {}", e.getMessage());
888 return new NodesInfo();
889 }
890
891 // Send the requests in parallel
892 CompletionService<NodesInfo> compSvc =
893 new ExecutorCompletionService<NodesInfo>(this.threadpool);
894
895 for (final SubClusterInfo info : subClustersActive.values()) {
896 compSvc.submit(new Callable<NodesInfo>() {
897 @Override
898 public NodesInfo call() {
899 DefaultRequestInterceptorREST interceptor =
900 getOrCreateInterceptorForSubCluster(
901 info.getSubClusterId(), info.getRMWebServiceAddress());
902 try {
903 NodesInfo nodesInfo = interceptor.getNodes(states);
904 return nodesInfo;
905 } catch (Exception e) {
906 LOG.error("Subcluster {} failed to return nodesInfo.",
907 info.getSubClusterId());
908 return null;
909 }
910 }
911 });
912 }
913
914 // Collect all the responses in parallel
915
916 for (int i = 0; i < subClustersActive.size(); i++) {
917 try {
918 Future<NodesInfo> future = compSvc.take();
919 NodesInfo nodesResponse = future.get();
920
921 if (nodesResponse != null) {
922 nodes.addAll(nodesResponse.getNodes());
923 }
924 } catch (Throwable e) {
925 LOG.warn("Failed to get nodes report ", e);
926 }
927 }
928
929 // Delete duplicate from all the node reports got from all the available
930 // YARN RMs. Nodes can be moved from one subclusters to another. In this
931 // operation they result LOST/RUNNING in the previous SubCluster and
932 // NEW/RUNNING in the new one.
933
934 return RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
935 }
936
937 @Override
938 public ClusterMetricsInfo getClusterMetricsInfo() {
939 ClusterMetricsInfo metrics = new ClusterMetricsInfo();
940
941 Map<SubClusterId, SubClusterInfo> subClustersActive = null;
942 try {
943 subClustersActive = federationFacade.getSubClusters(true);
944 } catch (YarnException e) {
945 LOG.error(e.getLocalizedMessage());
946 return metrics;
947 }
948
949 // Send the requests in parallel
950 CompletionService<ClusterMetricsInfo> compSvc =
951 new ExecutorCompletionService<ClusterMetricsInfo>(this.threadpool);
952
953 for (final SubClusterInfo info : subClustersActive.values()) {
954 compSvc.submit(new Callable<ClusterMetricsInfo>() {
955 @Override
956 public ClusterMetricsInfo call() {
957 DefaultRequestInterceptorREST interceptor =
958 getOrCreateInterceptorForSubCluster(
959 info.getSubClusterId(), info.getRMWebServiceAddress());
960 try {
961 ClusterMetricsInfo metrics = interceptor.getClusterMetricsInfo();
962 return metrics;
963 } catch (Exception e) {
964 LOG.error("Subcluster {} failed to return Cluster Metrics.",
965 info.getSubClusterId());
966 return null;
967 }
968 }
969 });
970 }
971
972 // Collect all the responses in parallel
973
974 for (int i = 0; i < subClustersActive.size(); i++) {
975 try {
976 Future<ClusterMetricsInfo> future = compSvc.take();
977 ClusterMetricsInfo metricsResponse = future.get();
978
979 if (metricsResponse != null) {
980 RouterWebServiceUtil.mergeMetrics(metrics, metricsResponse);
981 }
982 } catch (Throwable e) {
983 LOG.warn("Failed to get nodes report ", e);
984 }
985 }
986
987 return metrics;
988 }
989
990 @Override
991 public ClusterInfo get() {
992 return getClusterInfo();
993 }
994
995 @Override
996 public ClusterInfo getClusterInfo() {
997 throw new NotImplementedException();
998 }
999
1000 @Override
1001 public SchedulerTypeInfo getSchedulerInfo() {
1002 throw new NotImplementedException();
1003 }
1004
1005 @Override
1006 public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
1007 throws IOException {
1008 throw new NotImplementedException();
1009 }
1010
1011 @Override
1012 public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId) {
1013 throw new NotImplementedException();
1014 }
1015
1016 @Override
1017 public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
1018 String appId, String time) {
1019 throw new NotImplementedException();
1020 }
1021
1022 @Override
1023 public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr,
1024 Set<String> stateQueries, Set<String> typeQueries) {
1025 throw new NotImplementedException();
1026 }
1027
1028 @Override
1029 public AppState getAppState(HttpServletRequest hsr, String appId)
1030 throws AuthorizationException {
1031 throw new NotImplementedException();
1032 }
1033
1034 @Override
1035 public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr)
1036 throws IOException {
1037 throw new NotImplementedException();
1038 }
1039
1040 @Override
1041 public LabelsToNodesInfo getLabelsToNodes(Set<String> labels)
1042 throws IOException {
1043 throw new NotImplementedException();
1044 }
1045
1046 @Override
1047 public Response replaceLabelsOnNodes(NodeToLabelsEntryList newNodeToLabels,
1048 HttpServletRequest hsr) throws IOException {
1049 throw new NotImplementedException();
1050 }
1051
1052 @Override
1053 public Response replaceLabelsOnNode(Set<String> newNodeLabelsName,
1054 HttpServletRequest hsr, String nodeId) throws Exception {
1055 throw new NotImplementedException();
1056 }
1057
1058 @Override
1059 public NodeLabelsInfo getClusterNodeLabels(HttpServletRequest hsr)
1060 throws IOException {
1061 throw new NotImplementedException();
1062 }
1063
1064 @Override
1065 public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
1066 HttpServletRequest hsr) throws Exception {
1067 throw new NotImplementedException();
1068 }
1069
1070 @Override
1071 public Response removeFromCluserNodeLabels(Set<String> oldNodeLabels,
1072 HttpServletRequest hsr) throws Exception {
1073 throw new NotImplementedException();
1074 }
1075
1076 @Override
1077 public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId)
1078 throws IOException {
1079 throw new NotImplementedException();
1080 }
1081
1082 @Override
1083 public AppPriority getAppPriority(HttpServletRequest hsr, String appId)
1084 throws AuthorizationException {
1085 throw new NotImplementedException();
1086 }
1087
1088 @Override
1089 public Response updateApplicationPriority(AppPriority targetPriority,
1090 HttpServletRequest hsr, String appId) throws AuthorizationException,
1091 YarnException, InterruptedException, IOException {
1092 throw new NotImplementedException();
1093 }
1094
1095 @Override
1096 public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
1097 throws AuthorizationException {
1098 throw new NotImplementedException();
1099 }
1100
1101 @Override
1102 public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
1103 String appId) throws AuthorizationException, YarnException,
1104 InterruptedException, IOException {
1105 throw new NotImplementedException();
1106 }
1107
1108 @Override
1109 public Response postDelegationToken(DelegationToken tokenData,
1110 HttpServletRequest hsr) throws AuthorizationException, IOException,
1111 InterruptedException, Exception {
1112 throw new NotImplementedException();
1113 }
1114
1115 @Override
1116 public Response postDelegationTokenExpiration(HttpServletRequest hsr)
1117 throws AuthorizationException, IOException, InterruptedException,
1118 Exception {
1119 throw new NotImplementedException();
1120 }
1121
1122 @Override
1123 public Response cancelDelegationToken(HttpServletRequest hsr)
1124 throws AuthorizationException, IOException, InterruptedException,
1125 Exception {
1126 throw new NotImplementedException();
1127 }
1128
1129 @Override
1130 public Response createNewReservation(HttpServletRequest hsr)
1131 throws AuthorizationException, IOException, InterruptedException {
1132 throw new NotImplementedException();
1133 }
1134
1135 @Override
1136 public Response submitReservation(ReservationSubmissionRequestInfo resContext,
1137 HttpServletRequest hsr)
1138 throws AuthorizationException, IOException, InterruptedException {
1139 throw new NotImplementedException();
1140 }
1141
1142 @Override
1143 public Response updateReservation(ReservationUpdateRequestInfo resContext,
1144 HttpServletRequest hsr)
1145 throws AuthorizationException, IOException, InterruptedException {
1146 throw new NotImplementedException();
1147 }
1148
1149 @Override
1150 public Response deleteReservation(ReservationDeleteRequestInfo resContext,
1151 HttpServletRequest hsr)
1152 throws AuthorizationException, IOException, InterruptedException {
1153 throw new NotImplementedException();
1154 }
1155
1156 @Override
1157 public Response listReservation(String queue, String reservationId,
1158 long startTime, long endTime, boolean includeResourceAllocations,
1159 HttpServletRequest hsr) throws Exception {
1160 throw new NotImplementedException();
1161 }
1162
1163 @Override
1164 public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId,
1165 String type) throws AuthorizationException {
1166 throw new NotImplementedException();
1167 }
1168
1169 @Override
1170 public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
1171 throws AuthorizationException {
1172 throw new NotImplementedException();
1173 }
1174
1175 @Override
1176 public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
1177 HttpServletRequest hsr, String appId) throws AuthorizationException,
1178 YarnException, InterruptedException, IOException {
1179 throw new NotImplementedException();
1180 }
1181
1182 @Override
1183 public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
1184 throw new NotImplementedException();
1185 }
1186
1187 @Override
1188 public RMQueueAclInfo checkUserAccessToQueue(String queue, String username,
1189 String queueAclType, HttpServletRequest hsr) {
1190 throw new NotImplementedException();
1191 }
1192
1193 @Override
1194 public AppAttemptInfo getAppAttempt(HttpServletRequest req,
1195 HttpServletResponse res, String appId, String appAttemptId) {
1196 throw new NotImplementedException();
1197 }
1198
1199 @Override
1200 public ContainersInfo getContainers(HttpServletRequest req,
1201 HttpServletResponse res, String appId, String appAttemptId) {
1202 throw new NotImplementedException();
1203 }
1204
1205 @Override
1206 public ContainerInfo getContainer(HttpServletRequest req,
1207 HttpServletResponse res, String appId, String appAttemptId,
1208 String containerId) {
1209 throw new NotImplementedException();
1210 }
1211
1212 @Override
1213 public void setNextInterceptor(RESTRequestInterceptor next) {
1214 throw new YarnRuntimeException("setNextInterceptor is being called on "
1215 + "FederationInterceptorREST, which should be the last one "
1216 + "in the chain. Check if the interceptor pipeline configuration "
1217 + "is correct");
1218 }
1219
1220 @Override
1221 public void shutdown() {
1222 if (threadpool != null) {
1223 threadpool.shutdown();
1224 }
1225 }
1226 }