YARN-7919. Refactor timelineservice-hbase module into submodules. Contributed by...
[hadoop.git] / hadoop-yarn-project / hadoop-yarn / hadoop-yarn-server / hadoop-yarn-server-timelineservice-hbase / hadoop-yarn-server-timelineservice-hbase-client / src / main / java / org / apache / hadoop / yarn / server / timelineservice / storage / reader / SubApplicationEntityReader.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
19
20 import java.io.IOException;
21 import java.util.EnumSet;
22 import java.util.Set;
23
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.client.Connection;
26 import org.apache.hadoop.hbase.client.Query;
27 import org.apache.hadoop.hbase.client.Result;
28 import org.apache.hadoop.hbase.client.ResultScanner;
29 import org.apache.hadoop.hbase.client.Scan;
30 import org.apache.hadoop.hbase.filter.BinaryComparator;
31 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
32 import org.apache.hadoop.hbase.filter.FamilyFilter;
33 import org.apache.hadoop.hbase.filter.FilterList;
34 import org.apache.hadoop.hbase.filter.FilterList.Operator;
35 import org.apache.hadoop.hbase.filter.PageFilter;
36 import org.apache.hadoop.hbase.filter.QualifierFilter;
37 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
38 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
39 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
40 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
41 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
42 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
43 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
44 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
45 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
46 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper;
47 import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
48 import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
49 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
50 import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn;
51 import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnFamily;
52 import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix;
53 import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
54 import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKeyPrefix;
55 import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTableRW;
56 import org.apache.hadoop.yarn.webapp.BadRequestException;
57
58 import com.google.common.base.Preconditions;
59
60 class SubApplicationEntityReader extends GenericEntityReader {
61 private static final SubApplicationTableRW SUB_APPLICATION_TABLE =
62 new SubApplicationTableRW();
63
64 SubApplicationEntityReader(TimelineReaderContext ctxt,
65 TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
66 super(ctxt, entityFilters, toRetrieve);
67 }
68
69 SubApplicationEntityReader(TimelineReaderContext ctxt,
70 TimelineDataToRetrieve toRetrieve) {
71 super(ctxt, toRetrieve);
72 }
73
74 /**
75 * Uses the {@link SubApplicationTableRW}.
76 */
77 protected BaseTableRW<?> getTable() {
78 return SUB_APPLICATION_TABLE;
79 }
80
81 @Override
82 protected FilterList constructFilterListBasedOnFilters() throws IOException {
83 // Filters here cannot be null for multiple entity reads as they are set in
84 // augmentParams if null.
85 FilterList listBasedOnFilters = new FilterList();
86 TimelineEntityFilters filters = getFilters();
87 // Create filter list based on created time range and add it to
88 // listBasedOnFilters.
89 long createdTimeBegin = filters.getCreatedTimeBegin();
90 long createdTimeEnd = filters.getCreatedTimeEnd();
91 if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
92 listBasedOnFilters.addFilter(TimelineFilterUtils
93 .createSingleColValueFiltersByRange(SubApplicationColumn.CREATED_TIME,
94 createdTimeBegin, createdTimeEnd));
95 }
96 // Create filter list based on metric filters and add it to
97 // listBasedOnFilters.
98 TimelineFilterList metricFilters = filters.getMetricFilters();
99 if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
100 listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
101 SubApplicationColumnPrefix.METRIC, metricFilters));
102 }
103 // Create filter list based on config filters and add it to
104 // listBasedOnFilters.
105 TimelineFilterList configFilters = filters.getConfigFilters();
106 if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
107 listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
108 SubApplicationColumnPrefix.CONFIG, configFilters));
109 }
110 // Create filter list based on info filters and add it to listBasedOnFilters
111 TimelineFilterList infoFilters = filters.getInfoFilters();
112 if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
113 listBasedOnFilters.addFilter(TimelineFilterUtils
114 .createHBaseFilterList(SubApplicationColumnPrefix.INFO, infoFilters));
115 }
116 return listBasedOnFilters;
117 }
118
119 /**
120 * Add {@link QualifierFilter} filters to filter list for each column of
121 * entity table.
122 *
123 * @param list filter list to which qualifier filters have to be added.
124 */
125 protected void updateFixedColumns(FilterList list) {
126 for (SubApplicationColumn column : SubApplicationColumn.values()) {
127 list.addFilter(new QualifierFilter(CompareOp.EQUAL,
128 new BinaryComparator(column.getColumnQualifierBytes())));
129 }
130 }
131
132 /**
133 * Creates a filter list which indicates that only some of the column
134 * qualifiers in the info column family will be returned in result.
135 *
136 * @param isApplication If true, it means operations are to be performed for
137 * application table, otherwise for entity table.
138 * @return filter list.
139 * @throws IOException if any problem occurs while creating filter list.
140 */
141 private FilterList createFilterListForColsOfInfoFamily() throws IOException {
142 FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
143 // Add filters for each column in entity table.
144 updateFixedColumns(infoFamilyColsFilter);
145 EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
146 // If INFO field has to be retrieved, add a filter for fetching columns
147 // with INFO column prefix.
148 if (hasField(fieldsToRetrieve, Field.INFO)) {
149 infoFamilyColsFilter.addFilter(
150 TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.EQUAL,
151 SubApplicationColumnPrefix.INFO));
152 }
153 TimelineFilterList relatesTo = getFilters().getRelatesTo();
154 if (hasField(fieldsToRetrieve, Field.RELATES_TO)) {
155 // If RELATES_TO field has to be retrieved, add a filter for fetching
156 // columns with RELATES_TO column prefix.
157 infoFamilyColsFilter.addFilter(
158 TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.EQUAL,
159 SubApplicationColumnPrefix.RELATES_TO));
160 } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
161 // Even if fields to retrieve does not contain RELATES_TO, we still
162 // need to have a filter to fetch some of the column qualifiers if
163 // relatesTo filters are specified. relatesTo filters will then be
164 // matched after fetching rows from HBase.
165 Set<String> relatesToCols =
166 TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
167 infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
168 SubApplicationColumnPrefix.RELATES_TO, relatesToCols));
169 }
170 TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
171 if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
172 // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
173 // columns with IS_RELATED_TO column prefix.
174 infoFamilyColsFilter.addFilter(
175 TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.EQUAL,
176 SubApplicationColumnPrefix.IS_RELATED_TO));
177 } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
178 // Even if fields to retrieve does not contain IS_RELATED_TO, we still
179 // need to have a filter to fetch some of the column qualifiers if
180 // isRelatedTo filters are specified. isRelatedTo filters will then be
181 // matched after fetching rows from HBase.
182 Set<String> isRelatedToCols =
183 TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
184 infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
185 SubApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols));
186 }
187 TimelineFilterList eventFilters = getFilters().getEventFilters();
188 if (hasField(fieldsToRetrieve, Field.EVENTS)) {
189 // If EVENTS field has to be retrieved, add a filter for fetching columns
190 // with EVENT column prefix.
191 infoFamilyColsFilter.addFilter(
192 TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.EQUAL,
193 SubApplicationColumnPrefix.EVENT));
194 } else if (eventFilters != null
195 && !eventFilters.getFilterList().isEmpty()) {
196 // Even if fields to retrieve does not contain EVENTS, we still need to
197 // have a filter to fetch some of the column qualifiers on the basis of
198 // event filters specified. Event filters will then be matched after
199 // fetching rows from HBase.
200 Set<String> eventCols =
201 TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
202 infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
203 SubApplicationColumnPrefix.EVENT, eventCols));
204 }
205 return infoFamilyColsFilter;
206 }
207
208 /**
209 * Exclude column prefixes via filters which are not required(based on fields
210 * to retrieve) from info column family. These filters are added to filter
211 * list which contains a filter for getting info column family.
212 *
213 * @param infoColFamilyList filter list for info column family.
214 */
215 private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
216 EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
217 // Events not required.
218 if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
219 infoColFamilyList.addFilter(
220 TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
221 SubApplicationColumnPrefix.EVENT));
222 }
223 // info not required.
224 if (!hasField(fieldsToRetrieve, Field.INFO)) {
225 infoColFamilyList.addFilter(
226 TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
227 SubApplicationColumnPrefix.INFO));
228 }
229 // is related to not required.
230 if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
231 infoColFamilyList.addFilter(
232 TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
233 SubApplicationColumnPrefix.IS_RELATED_TO));
234 }
235 // relates to not required.
236 if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
237 infoColFamilyList.addFilter(
238 TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
239 SubApplicationColumnPrefix.RELATES_TO));
240 }
241 }
242
243 /**
244 * Updates filter list based on fields for confs and metrics to retrieve.
245 *
246 * @param listBasedOnFields filter list based on fields.
247 * @throws IOException if any problem occurs while updating filter list.
248 */
249 private void updateFilterForConfsAndMetricsToRetrieve(
250 FilterList listBasedOnFields) throws IOException {
251 TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
252 // Please note that if confsToRetrieve is specified, we would have added
253 // CONFS to fields to retrieve in augmentParams() even if not specified.
254 if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
255 // Create a filter list for configs.
256 listBasedOnFields.addFilter(
257 TimelineFilterUtils.createFilterForConfsOrMetricsToRetrieve(
258 dataToRetrieve.getConfsToRetrieve(),
259 SubApplicationColumnFamily.CONFIGS,
260 SubApplicationColumnPrefix.CONFIG));
261 }
262
263 // Please note that if metricsToRetrieve is specified, we would have added
264 // METRICS to fields to retrieve in augmentParams() even if not specified.
265 if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
266 // Create a filter list for metrics.
267 listBasedOnFields.addFilter(
268 TimelineFilterUtils.createFilterForConfsOrMetricsToRetrieve(
269 dataToRetrieve.getMetricsToRetrieve(),
270 SubApplicationColumnFamily.METRICS,
271 SubApplicationColumnPrefix.METRIC));
272 }
273 }
274
275 @Override
276 protected FilterList constructFilterListBasedOnFields() throws IOException {
277 if (!needCreateFilterListBasedOnFields()) {
278 // Fetch all the columns. No need of a filter.
279 return null;
280 }
281 FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE);
282 FilterList infoColFamilyList = new FilterList();
283 // By default fetch everything in INFO column family.
284 FamilyFilter infoColumnFamily = new FamilyFilter(CompareOp.EQUAL,
285 new BinaryComparator(SubApplicationColumnFamily.INFO.getBytes()));
286 infoColFamilyList.addFilter(infoColumnFamily);
287 if (fetchPartialColsFromInfoFamily()) {
288 // We can fetch only some of the columns from info family.
289 infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily());
290 } else {
291 // Exclude column prefixes in info column family which are not required
292 // based on fields to retrieve.
293 excludeFieldsFromInfoColFamily(infoColFamilyList);
294 }
295 listBasedOnFields.addFilter(infoColFamilyList);
296 updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
297 return listBasedOnFields;
298 }
299
300 @Override
301 protected void validateParams() {
302 Preconditions.checkNotNull(getContext(), "context shouldn't be null");
303 Preconditions.checkNotNull(getDataToRetrieve(),
304 "data to retrieve shouldn't be null");
305 Preconditions.checkNotNull(getContext().getClusterId(),
306 "clusterId shouldn't be null");
307 Preconditions.checkNotNull(getContext().getDoAsUser(),
308 "DoAsUser shouldn't be null");
309 Preconditions.checkNotNull(getContext().getEntityType(),
310 "entityType shouldn't be null");
311 }
312
313 @Override
314 protected void augmentParams(Configuration hbaseConf, Connection conn)
315 throws IOException {
316 getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
317 createFiltersIfNull();
318 }
319
320 private void setMetricsTimeRange(Query query) {
321 // Set time range for metric values.
322 HBaseTimelineStorageUtils.setMetricsTimeRange(query,
323 SubApplicationColumnFamily.METRICS.getBytes(),
324 getDataToRetrieve().getMetricsTimeBegin(),
325 getDataToRetrieve().getMetricsTimeEnd());
326 }
327
328 @Override
329 protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
330 FilterList filterList) throws IOException {
331
332 // Scan through part of the table to find the entities belong to one app
333 // and one type
334 Scan scan = new Scan();
335 TimelineReaderContext context = getContext();
336 if (context.getDoAsUser() == null) {
337 throw new BadRequestException("Invalid user!");
338 }
339
340 RowKeyPrefix<SubApplicationRowKey> subApplicationRowKeyPrefix = null;
341 // default mode, will always scans from beginning of entity type.
342 if (getFilters() == null || getFilters().getFromId() == null) {
343 subApplicationRowKeyPrefix = new SubApplicationRowKeyPrefix(
344 context.getDoAsUser(), context.getClusterId(),
345 context.getEntityType(), null, null, null);
346 scan.setRowPrefixFilter(subApplicationRowKeyPrefix.getRowKeyPrefix());
347 } else { // pagination mode, will scan from given entityIdPrefix!enitityId
348
349 SubApplicationRowKey entityRowKey = null;
350 try {
351 entityRowKey = SubApplicationRowKey
352 .parseRowKeyFromString(getFilters().getFromId());
353 } catch (IllegalArgumentException e) {
354 throw new BadRequestException("Invalid filter fromid is provided.");
355 }
356 if (!context.getClusterId().equals(entityRowKey.getClusterId())) {
357 throw new BadRequestException(
358 "fromid doesn't belong to clusterId=" + context.getClusterId());
359 }
360
361 // set start row
362 scan.setStartRow(entityRowKey.getRowKey());
363
364 // get the bytes for stop row
365 subApplicationRowKeyPrefix = new SubApplicationRowKeyPrefix(
366 context.getDoAsUser(), context.getClusterId(),
367 context.getEntityType(), null, null, null);
368
369 // set stop row
370 scan.setStopRow(
371 HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
372 subApplicationRowKeyPrefix.getRowKeyPrefix()));
373
374 // set page filter to limit. This filter has to set only in pagination
375 // mode.
376 filterList.addFilter(new PageFilter(getFilters().getLimit()));
377 }
378 setMetricsTimeRange(scan);
379 scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
380 if (filterList != null && !filterList.getFilters().isEmpty()) {
381 scan.setFilter(filterList);
382 }
383 return getTable().getResultScanner(hbaseConf, conn, scan);
384 }
385
386 @Override
387 protected Result getResult(Configuration hbaseConf, Connection conn,
388 FilterList filterList) throws IOException {
389 throw new UnsupportedOperationException(
390 "we don't support a single entity query");
391 }
392
393 @Override
394 protected TimelineEntity parseEntity(Result result) throws IOException {
395 if (result == null || result.isEmpty()) {
396 return null;
397 }
398 TimelineEntity entity = new TimelineEntity();
399 SubApplicationRowKey parseRowKey =
400 SubApplicationRowKey.parseRowKey(result.getRow());
401 entity.setType(parseRowKey.getEntityType());
402 entity.setId(parseRowKey.getEntityId());
403 entity.setIdPrefix(parseRowKey.getEntityIdPrefix().longValue());
404
405 TimelineEntityFilters filters = getFilters();
406 // fetch created time
407 Long createdTime = (Long) ColumnRWHelper.readResult(result,
408 SubApplicationColumn.CREATED_TIME);
409 entity.setCreatedTime(createdTime);
410
411 EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
412 // fetch is related to entities and match isRelatedTo filter. If isRelatedTo
413 // filters do not match, entity would be dropped. We have to match filters
414 // locally as relevant HBase filters to filter out rows on the basis of
415 // isRelatedTo are not set in HBase scan.
416 boolean checkIsRelatedTo =
417 filters.getIsRelatedTo() != null
418 && filters.getIsRelatedTo().getFilterList().size() > 0;
419 if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
420 readRelationship(entity, result, SubApplicationColumnPrefix.IS_RELATED_TO,
421 true);
422 if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
423 filters.getIsRelatedTo())) {
424 return null;
425 }
426 if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
427 entity.getIsRelatedToEntities().clear();
428 }
429 }
430
431 // fetch relates to entities and match relatesTo filter. If relatesTo
432 // filters do not match, entity would be dropped. We have to match filters
433 // locally as relevant HBase filters to filter out rows on the basis of
434 // relatesTo are not set in HBase scan.
435 boolean checkRelatesTo =
436 !isSingleEntityRead() && filters.getRelatesTo() != null
437 && filters.getRelatesTo().getFilterList().size() > 0;
438 if (hasField(fieldsToRetrieve, Field.RELATES_TO) || checkRelatesTo) {
439 readRelationship(entity, result, SubApplicationColumnPrefix.RELATES_TO,
440 false);
441 if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
442 filters.getRelatesTo())) {
443 return null;
444 }
445 if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
446 entity.getRelatesToEntities().clear();
447 }
448 }
449
450 // fetch info if fieldsToRetrieve contains INFO or ALL.
451 if (hasField(fieldsToRetrieve, Field.INFO)) {
452 readKeyValuePairs(entity, result, SubApplicationColumnPrefix.INFO, false);
453 }
454
455 // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
456 if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
457 readKeyValuePairs(entity, result, SubApplicationColumnPrefix.CONFIG,
458 true);
459 }
460
461 // fetch events and match event filters if they exist. If event filters do
462 // not match, entity would be dropped. We have to match filters locally
463 // as relevant HBase filters to filter out rows on the basis of events
464 // are not set in HBase scan.
465 boolean checkEvents =
466 !isSingleEntityRead() && filters.getEventFilters() != null
467 && filters.getEventFilters().getFilterList().size() > 0;
468 if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) {
469 readEvents(entity, result, SubApplicationColumnPrefix.EVENT);
470 if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
471 filters.getEventFilters())) {
472 return null;
473 }
474 if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
475 entity.getEvents().clear();
476 }
477 }
478
479 // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
480 if (hasField(fieldsToRetrieve, Field.METRICS)) {
481 readMetrics(entity, result, SubApplicationColumnPrefix.METRIC);
482 }
483
484 entity.getInfo().put(TimelineReaderUtils.FROMID_KEY,
485 parseRowKey.getRowKeyAsString());
486 return entity;
487 }
488
489 }