YARN-7574. Add support for Node Labels on Auto Created Leaf Queue Template. Contribut...
[hadoop.git] / hadoop-yarn-project / hadoop-yarn / hadoop-yarn-server / hadoop-yarn-server-resourcemanager / src / test / java / org / apache / hadoop / yarn / server / resourcemanager / MockNM.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.yarn.server.resourcemanager;
20
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29
30 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
31 import org.apache.hadoop.yarn.api.records.ApplicationId;
32 import org.apache.hadoop.yarn.api.records.Container;
33 import org.apache.hadoop.yarn.api.records.ContainerId;
34 import org.apache.hadoop.yarn.api.records.ContainerState;
35 import org.apache.hadoop.yarn.api.records.ContainerStatus;
36 import org.apache.hadoop.yarn.api.records.NodeId;
37 import org.apache.hadoop.yarn.api.records.NodeLabel;
38 import org.apache.hadoop.yarn.api.records.Resource;
39 import org.apache.hadoop.yarn.conf.YarnConfiguration;
40 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
41 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
42 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
43 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
44 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
45 import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
46 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
47 import org.apache.hadoop.yarn.server.api.records.MasterKey;
48 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
49 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
50 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
51 import org.apache.hadoop.yarn.util.Records;
52 import org.apache.hadoop.yarn.util.YarnVersionInfo;
53 import org.apache.hadoop.yarn.util.resource.Resources;
54 import org.eclipse.jetty.util.log.Log;
55
56 public class MockNM {
57
58 private int responseId;
59 private NodeId nodeId;
60 private Resource capability;
61 private ResourceTrackerService resourceTracker;
62 private int httpPort = 2;
63 private MasterKey currentContainerTokenMasterKey;
64 private MasterKey currentNMTokenMasterKey;
65 private String version;
66 private Map<ContainerId, ContainerStatus> containerStats =
67 new HashMap<ContainerId, ContainerStatus>();
68 private Map<ApplicationId, AppCollectorData> registeringCollectors
69 = new ConcurrentHashMap<>();
70 private Set<NodeLabel> nodeLabels;
71
72 public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
73 // scale vcores based on the requested memory
74 this(nodeIdStr, memory,
75 Math.max(1, (memory * YarnConfiguration.DEFAULT_NM_VCORES) /
76 YarnConfiguration.DEFAULT_NM_PMEM_MB),
77 resourceTracker);
78 }
79
80 public MockNM(String nodeIdStr, int memory, int vcores,
81 ResourceTrackerService resourceTracker) {
82 this(nodeIdStr, memory, vcores, resourceTracker,
83 YarnVersionInfo.getVersion());
84 }
85
86 public MockNM(String nodeIdStr, int memory, int vcores,
87 ResourceTrackerService resourceTracker, String version) {
88 this(nodeIdStr, Resource.newInstance(memory, vcores), resourceTracker,
89 version);
90 }
91
92 public MockNM(String nodeIdStr, Resource capability,
93 ResourceTrackerService resourceTracker) {
94 this(nodeIdStr, capability, resourceTracker,
95 YarnVersionInfo.getVersion());
96 }
97
98 public MockNM(String nodeIdStr, Resource capability,
99 ResourceTrackerService resourceTracker, String version) {
100 this.capability = capability;
101 this.resourceTracker = resourceTracker;
102 this.version = version;
103 String[] splits = nodeIdStr.split(":");
104 nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1]));
105 }
106
107 public MockNM(String nodeIdStr, Resource capability,
108 ResourceTrackerService resourceTracker, String version, Set<NodeLabel>
109 nodeLabels) {
110 this(nodeIdStr, capability, resourceTracker, version);
111 this.nodeLabels = nodeLabels;
112 }
113
114 public NodeId getNodeId() {
115 return nodeId;
116 }
117
118 public int getHttpPort() {
119 return httpPort;
120 }
121
122 public void setHttpPort(int port) {
123 httpPort = port;
124 }
125
126 public void setResourceTrackerService(ResourceTrackerService resourceTracker) {
127 this.resourceTracker = resourceTracker;
128 }
129
130 public void containerStatus(ContainerStatus containerStatus) throws Exception {
131 Map<ApplicationId, List<ContainerStatus>> conts =
132 new HashMap<ApplicationId, List<ContainerStatus>>();
133 conts.put(containerStatus.getContainerId().getApplicationAttemptId().getApplicationId(),
134 Arrays.asList(new ContainerStatus[] { containerStatus }));
135 nodeHeartbeat(conts, true);
136 }
137
138 public void containerIncreaseStatus(Container container) throws Exception {
139 ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
140 container.getId(), ContainerState.RUNNING, "Success", 0,
141 container.getResource());
142 List<Container> increasedConts = Collections.singletonList(container);
143 nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts,
144 true, responseId);
145 }
146
147 public void addRegisteringCollector(ApplicationId appId,
148 AppCollectorData data) {
149 this.registeringCollectors.put(appId, data);
150 }
151
152 public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() {
153 return this.registeringCollectors;
154 }
155
156 public void unRegisterNode() throws Exception {
157 UnRegisterNodeManagerRequest request = Records
158 .newRecord(UnRegisterNodeManagerRequest.class);
159 request.setNodeId(nodeId);
160 resourceTracker.unRegisterNodeManager(request);
161 }
162
163 public RegisterNodeManagerResponse registerNode() throws Exception {
164 return registerNode(null, null);
165 }
166
167 public RegisterNodeManagerResponse registerNode(
168 List<ApplicationId> runningApplications) throws Exception {
169 return registerNode(null, runningApplications);
170 }
171
172 public RegisterNodeManagerResponse registerNode(
173 List<NMContainerStatus> containerReports,
174 List<ApplicationId> runningApplications) throws Exception {
175 RegisterNodeManagerRequest req = Records.newRecord(
176 RegisterNodeManagerRequest.class);
177
178 req.setNodeId(nodeId);
179 req.setHttpPort(httpPort);
180 req.setResource(capability);
181 req.setContainerStatuses(containerReports);
182 req.setNMVersion(version);
183 req.setRunningApplications(runningApplications);
184 if ( nodeLabels != null && nodeLabels.size() > 0) {
185 req.setNodeLabels(nodeLabels);
186 }
187
188 RegisterNodeManagerResponse registrationResponse =
189 resourceTracker.registerNodeManager(req);
190 this.currentContainerTokenMasterKey =
191 registrationResponse.getContainerTokenMasterKey();
192 this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
193 Resource newResource = registrationResponse.getResource();
194 if (newResource != null) {
195 capability = Resources.clone(newResource);
196 }
197 containerStats.clear();
198 if (containerReports != null) {
199 for (NMContainerStatus report : containerReports) {
200 if (report.getContainerState() != ContainerState.COMPLETE) {
201 containerStats.put(report.getContainerId(),
202 ContainerStatus.newInstance(report.getContainerId(),
203 report.getContainerState(), report.getDiagnostics(),
204 report.getContainerExitStatus()));
205 }
206 }
207 }
208 responseId = 0;
209 return registrationResponse;
210 }
211
212 public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
213 return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
214 Collections.<Container>emptyList(), isHealthy, responseId);
215 }
216
217 public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
218 long containerId, ContainerState containerState) throws Exception {
219 ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
220 BuilderUtils.newContainerId(attemptId, containerId), containerState,
221 "Success", 0, capability);
222 ArrayList<ContainerStatus> containerStatusList =
223 new ArrayList<ContainerStatus>(1);
224 containerStatusList.add(containerStatus);
225 Log.getLog().info("ContainerStatus: " + containerStatus);
226 return nodeHeartbeat(containerStatusList,
227 Collections.<Container>emptyList(), true, responseId);
228 }
229
230 public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
231 List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
232 return nodeHeartbeat(conts, isHealthy, responseId);
233 }
234
235 public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
236 List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
237 ArrayList<ContainerStatus> updatedStats = new ArrayList<ContainerStatus>();
238 for (List<ContainerStatus> stats : conts.values()) {
239 updatedStats.addAll(stats);
240 }
241 return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
242 isHealthy, resId);
243 }
244
245 public NodeHeartbeatResponse nodeHeartbeat(
246 List<ContainerStatus> updatedStats, boolean isHealthy) throws Exception {
247 return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
248 isHealthy, responseId);
249 }
250
251 public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
252 List<Container> increasedConts, boolean isHealthy, int resId)
253 throws Exception {
254 NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
255 NodeStatus status = Records.newRecord(NodeStatus.class);
256 status.setResponseId(resId);
257 status.setNodeId(nodeId);
258 ArrayList<ContainerId> completedContainers = new ArrayList<ContainerId>();
259 for (ContainerStatus stat : updatedStats) {
260 if (stat.getState() == ContainerState.COMPLETE) {
261 completedContainers.add(stat.getContainerId());
262 }
263 containerStats.put(stat.getContainerId(), stat);
264 }
265 status.setContainersStatuses(
266 new ArrayList<ContainerStatus>(containerStats.values()));
267 for (ContainerId cid : completedContainers) {
268 containerStats.remove(cid);
269 }
270 status.setIncreasedContainers(increasedConts);
271 NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
272 healthStatus.setHealthReport("");
273 healthStatus.setIsNodeHealthy(isHealthy);
274 healthStatus.setLastHealthReportTime(1);
275 status.setNodeHealthStatus(healthStatus);
276 req.setNodeStatus(status);
277 req.setLastKnownContainerTokenMasterKey(this.currentContainerTokenMasterKey);
278 req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey);
279
280 req.setRegisteringCollectors(this.registeringCollectors);
281
282 NodeHeartbeatResponse heartbeatResponse =
283 resourceTracker.nodeHeartbeat(req);
284 responseId = heartbeatResponse.getResponseId();
285
286 MasterKey masterKeyFromRM = heartbeatResponse.getContainerTokenMasterKey();
287 if (masterKeyFromRM != null
288 && masterKeyFromRM.getKeyId() != this.currentContainerTokenMasterKey
289 .getKeyId()) {
290 this.currentContainerTokenMasterKey = masterKeyFromRM;
291 }
292
293 masterKeyFromRM = heartbeatResponse.getNMTokenMasterKey();
294 if (masterKeyFromRM != null
295 && masterKeyFromRM.getKeyId() != this.currentNMTokenMasterKey
296 .getKeyId()) {
297 this.currentNMTokenMasterKey = masterKeyFromRM;
298 }
299
300 Resource newResource = heartbeatResponse.getResource();
301 if (newResource != null) {
302 capability = Resources.clone(newResource);
303 }
304
305 return heartbeatResponse;
306 }
307
308 public long getMemory() {
309 return capability.getMemorySize();
310 }
311
312 public int getvCores() {
313 return capability.getVirtualCores();
314 }
315
316 public Resource getCapability() {
317 return capability;
318 }
319
320 public String getVersion() {
321 return version;
322 }
323
324 public void setResponseId(int id) {
325 this.responseId = id;
326 }
327 }