HIVE-19127: Concurrency fixes in QueryResultsCache (Jason Dere, reviewed by Deepak...
[hive.git] / ql / src / java / org / apache / hadoop / hive / ql / cache / results / QueryResultsCache.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.hive.ql.cache.results;
20
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.Preconditions;
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24
25 import java.io.IOException;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.LinkedHashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.UUID;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledFuture;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.ThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.concurrent.locks.Lock;
44 import java.util.concurrent.locks.ReadWriteLock;
45 import java.util.concurrent.locks.ReentrantReadWriteLock;
46
47 import org.apache.hadoop.fs.ContentSummary;
48 import org.apache.hadoop.fs.FileSystem;
49 import org.apache.hadoop.fs.Path;
50 import org.apache.hadoop.fs.permission.FsPermission;
51 import org.apache.hadoop.hive.common.metrics.common.Metrics;
52 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
53 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
54 import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
55 import org.apache.hadoop.hive.conf.HiveConf;
56 import org.apache.hadoop.hive.metastore.api.FieldSchema;
57 import org.apache.hadoop.hive.ql.exec.Utilities;
58 import org.apache.hadoop.hive.ql.hooks.Entity.Type;
59 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
60 import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;
61 import org.apache.hadoop.hive.ql.metadata.Table;
62 import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
63 import org.apache.hadoop.hive.ql.parse.TableAccessInfo;
64 import org.apache.hadoop.hive.ql.plan.FetchWork;
65 import org.apache.hadoop.hive.ql.plan.HiveOperation;
66
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 /**
71 * A class to handle management and lookup of cached Hive query results.
72 */
73 public final class QueryResultsCache {
74
75 private static final Logger LOG = LoggerFactory.getLogger(QueryResultsCache.class);
76
77 public static class LookupInfo {
78 private String queryText;
79
80 public LookupInfo(String queryText) {
81 super();
82 this.queryText = queryText;
83 }
84
85 public String getQueryText() {
86 return queryText;
87 }
88 }
89
  /**
   * Query metadata captured at compile time for a cacheable query: the lookup key,
   * operation type, result schema, table/column access info, and the query's inputs
   * (used by entryMatches() to reject temp-table reads).
   * Plain mutable holder; not thread-safe.
   */
  public static class QueryInfo {
    private LookupInfo lookupInfo;              // cache lookup key (query text)
    private HiveOperation hiveOperation;
    private List<FieldSchema> resultSchema;     // schema of the query's result set
    private TableAccessInfo tableAccessInfo;
    private ColumnAccessInfo columnAccessInfo;
    private Set<ReadEntity> inputs;             // read entities the query depends on

    public QueryInfo(
        LookupInfo lookupInfo,
        HiveOperation hiveOperation,
        List<FieldSchema> resultSchema,
        TableAccessInfo tableAccessInfo,
        ColumnAccessInfo columnAccessInfo,
        Set<ReadEntity> inputs) {
      this.lookupInfo = lookupInfo;
      this.hiveOperation = hiveOperation;
      this.resultSchema = resultSchema;
      this.tableAccessInfo = tableAccessInfo;
      this.columnAccessInfo = columnAccessInfo;
      this.inputs = inputs;
    }

    public LookupInfo getLookupInfo() {
      return lookupInfo;
    }

    public void setLookupInfo(LookupInfo lookupInfo) {
      this.lookupInfo = lookupInfo;
    }

    public HiveOperation getHiveOperation() {
      return hiveOperation;
    }

    public void setHiveOperation(HiveOperation hiveOperation) {
      this.hiveOperation = hiveOperation;
    }

    public List<FieldSchema> getResultSchema() {
      return resultSchema;
    }

    public void setResultSchema(List<FieldSchema> resultSchema) {
      this.resultSchema = resultSchema;
    }

    public TableAccessInfo getTableAccessInfo() {
      return tableAccessInfo;
    }

    public void setTableAccessInfo(TableAccessInfo tableAccessInfo) {
      this.tableAccessInfo = tableAccessInfo;
    }

    public ColumnAccessInfo getColumnAccessInfo() {
      return columnAccessInfo;
    }

    public void setColumnAccessInfo(ColumnAccessInfo columnAccessInfo) {
      this.columnAccessInfo = columnAccessInfo;
    }

    public Set<ReadEntity> getInputs() {
      return inputs;
    }

    public void setInputs(Set<ReadEntity> inputs) {
      this.inputs = inputs;
    }
  }
161
  /**
   * Lifecycle states of a cache entry: PENDING while the query is still running,
   * VALID once results are saved and usable, INVALID once expired/evicted/failed.
   */
  public enum CacheEntryStatus {
    VALID, INVALID, PENDING
  }
165
166 public static class CacheEntry {
167 private QueryInfo queryInfo;
168 private FetchWork fetchWork;
169 private Path cachedResultsPath;
170
171 // Cache administration
172 private long createTime;
173 private long size;
174 private AtomicInteger readers = new AtomicInteger(0);
175 private ScheduledFuture<?> invalidationFuture = null;
176 private volatile CacheEntryStatus status = CacheEntryStatus.PENDING;
177
178 public void releaseReader() {
179 int readerCount = 0;
180 synchronized (this) {
181 readerCount = readers.decrementAndGet();
182 }
183 LOG.debug("releaseReader: entry: {}, readerCount: {}", this, readerCount);
184
185 cleanupIfNeeded();
186 }
187
188 public String toString() {
189 return "CacheEntry query: [" + getQueryInfo().getLookupInfo().getQueryText()
190 + "], status: " + status + ", location: " + cachedResultsPath
191 + ", size: " + size;
192 }
193
194 public boolean addReader() {
195 boolean added = false;
196 int readerCount = 0;
197 synchronized (this) {
198 if (status == CacheEntryStatus.VALID) {
199 readerCount = readers.incrementAndGet();
200 added = true;
201 }
202 }
203 LOG.debug("addReader: entry: {}, readerCount: {}, added: {}", this, readerCount, added);
204 return added;
205 }
206
207 private int numReaders() {
208 return readers.get();
209 }
210
211 private void invalidate() {
212 LOG.info("Invalidating cache entry: {}", this);
213 CacheEntryStatus prevStatus = setStatus(CacheEntryStatus.INVALID);
214 if (prevStatus == CacheEntryStatus.VALID) {
215 if (invalidationFuture != null) {
216 // The cache entry has just been invalidated, no need for the scheduled invalidation.
217 invalidationFuture.cancel(false);
218 }
219 cleanupIfNeeded();
220 decrementMetric(MetricsConstant.QC_VALID_ENTRIES);
221 } else if (prevStatus == CacheEntryStatus.PENDING) {
222 // Need to notify any queries waiting on the change from pending status.
223 synchronized (this) {
224 this.notifyAll();
225 }
226 decrementMetric(MetricsConstant.QC_PENDING_FAILS);
227 }
228 }
229
230 public CacheEntryStatus getStatus() {
231 return status;
232 }
233
234 private CacheEntryStatus setStatus(CacheEntryStatus newStatus) {
235 synchronized (this) {
236 CacheEntryStatus oldStatus = status;
237 status = newStatus;
238 return oldStatus;
239 }
240 }
241
242 private void cleanupIfNeeded() {
243 if (status == CacheEntryStatus.INVALID && readers.get() <= 0) {
244 QueryResultsCache.cleanupEntry(this);
245 }
246 }
247
248 private String getQueryText() {
249 return getQueryInfo().getLookupInfo().getQueryText();
250 }
251
252 public FetchWork getFetchWork() {
253 // FetchWork's sink is used to hold results, so each query needs a separate copy of FetchWork
254 FetchWork fetch = new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
255 fetch.setCachedResult(true);
256 return fetch;
257 }
258
259 public QueryInfo getQueryInfo() {
260 return queryInfo;
261 }
262
263 public Path getCachedResultsPath() {
264 return cachedResultsPath;
265 }
266
267 /**
268 * Wait for the cache entry to go from PENDING to VALID status.
269 * @return true if the cache entry successfully changed to VALID status,
270 * false if the status changes from PENDING to INVALID
271 */
272 public boolean waitForValidStatus() {
273 LOG.info("Waiting on pending cacheEntry");
274 long timeout = 1000;
275
276 long startTime = System.nanoTime();
277 long endTime;
278
279 while (true) {
280 try {
281 switch (status) {
282 case VALID:
283 endTime = System.nanoTime();
284 incrementMetric(MetricsConstant.QC_PENDING_SUCCESS_WAIT_TIME,
285 TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS));
286 return true;
287 case INVALID:
288 endTime = System.nanoTime();
289 incrementMetric(MetricsConstant.QC_PENDING_FAILS_WAIT_TIME,
290 TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS));
291 return false;
292 case PENDING:
293 // Status has not changed, continue waiting.
294 break;
295 }
296
297 synchronized (this) {
298 this.wait(timeout);
299 }
300 } catch (InterruptedException err) {
301 Thread.currentThread().interrupt();
302 return false;
303 }
304 }
305 }
306 }
307
  // Allow lookup by query string
  private final Map<String, Set<CacheEntry>> queryMap = new HashMap<String, Set<CacheEntry>>();

  // LRU. Could also implement LRU as a doubly linked list if CacheEntry keeps its node.
  // Use synchronized map since even read actions cause the lru to get updated.
  private final Map<CacheEntry, CacheEntry> lru = Collections.synchronizedMap(
      new LinkedHashMap<CacheEntry, CacheEntry>(INITIAL_LRU_SIZE, LRU_LOAD_FACTOR, true));

  private final HiveConf conf;
  private Path cacheDirPath;    // per-process cache directory (results-<uuid>)
  private Path zeroRowsPath;    // sentinel path for 0-row results; never created on disk
  private long cacheSize = 0;   // total bytes held by entries currently in the cache
  private long maxCacheSize;    // negative means unbounded (see hasSpaceForCacheEntry)
  private long maxEntrySize;    // negative means no per-entry limit (see shouldEntryBeAdded)
  private long maxEntryLifetime; // ms; negative disables scheduled expiration
  // Guards queryMap/lru membership changes; read lock taken for lookups.
  private ReadWriteLock rwLock = new ReentrantReadWriteLock();
324
  /**
   * Creates the cache and its backing directory under the configured root cache
   * directory. The directory is private to this process (0700, random UUID name)
   * and is registered for deletion at JVM exit.
   * @param configuration Hive configuration supplying cache dir and size limits
   * @throws IOException if the cache directory cannot be created
   */
  private QueryResultsCache(HiveConf configuration) throws IOException {
    this.conf = configuration;

    // Set up cache directory
    Path rootCacheDir = new Path(conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY));
    LOG.info("Initializing query results cache at {}", rootCacheDir);
    Utilities.ensurePathIsWritable(rootCacheDir, conf);

    String currentCacheDirName = "results-" + UUID.randomUUID().toString();
    cacheDirPath = new Path(rootCacheDir, currentCacheDirName);
    FileSystem fs = cacheDirPath.getFileSystem(conf);
    FsPermission fsPermission = new FsPermission("700");
    fs.mkdirs(cacheDirPath, fsPermission);

    // Create non-existent path for 0-row results
    zeroRowsPath = new Path(cacheDirPath, "dummy_zero_rows");

    // Results cache directory should be cleaned up at process termination.
    fs.deleteOnExit(cacheDirPath);

    maxCacheSize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE);
    maxEntrySize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE);
    maxEntryLifetime = conf.getTimeVar(
        HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_LIFETIME,
        TimeUnit.MILLISECONDS);

    LOG.info("Query results cache: cacheDirectory {}, maxCacheSize {}, maxEntrySize {}, maxEntryLifetime {}",
        cacheDirPath, maxCacheSize, maxEntrySize, maxEntryLifetime);
  }
354
355 private static final AtomicBoolean inited = new AtomicBoolean(false);
356 private static QueryResultsCache instance;
357
358 public static void initialize(HiveConf conf) throws IOException {
359 if (!inited.getAndSet(true)) {
360 try {
361 instance = new QueryResultsCache(conf);
362
363 Metrics metrics = MetricsFactory.getInstance();
364 if (metrics != null) {
365 registerMetrics(metrics, instance);
366 }
367 } catch (IOException err) {
368 inited.set(false);
369 throw err;
370 }
371 }
372 }
373
374 public static QueryResultsCache getInstance() {
375 return instance;
376 }
377
378 /**
379 * Check if the cache contains an entry for the requested LookupInfo.
380 * @param request
381 * @param addReader Should the reader count be incremented during the lookup.
382 * This will ensure the returned entry can be used after the lookup.
383 * If true, the caller will be responsible for decrementing the reader count
384 * using CacheEntry.releaseReader().
385 * @return The cached result if there is a match in the cache, or null if no match is found.
386 */
387 public CacheEntry lookup(LookupInfo request) {
388 CacheEntry result = null;
389
390 LOG.debug("QueryResultsCache lookup for query: {}", request.queryText);
391
392 boolean foundPending = false;
393 Lock readLock = rwLock.readLock();
394 try {
395 readLock.lock();
396 Set<CacheEntry> candidates = queryMap.get(request.queryText);
397 if (candidates != null) {
398 CacheEntry pendingResult = null;
399 for (CacheEntry candidate : candidates) {
400 if (entryMatches(request, candidate)) {
401 CacheEntryStatus entryStatus = candidate.status;
402 if (entryStatus == CacheEntryStatus.VALID) {
403 result = candidate;
404 break;
405 } else if (entryStatus == CacheEntryStatus.PENDING && pendingResult == null) {
406 pendingResult = candidate;
407 }
408 }
409 }
410
411 // Try to find valid entry, but settle for pending entry if that is all we have.
412 if (result == null && pendingResult != null) {
413 result = pendingResult;
414 foundPending = true;
415 }
416
417 if (result != null) {
418 lru.get(result); // Update LRU
419 }
420 }
421 } finally {
422 readLock.unlock();
423 }
424
425 LOG.debug("QueryResultsCache lookup result: {}", result);
426 incrementMetric(MetricsConstant.QC_LOOKUPS);
427 if (result != null) {
428 if (foundPending) {
429 incrementMetric(MetricsConstant.QC_PENDING_HITS);
430 } else {
431 incrementMetric(MetricsConstant.QC_VALID_HITS);
432 }
433 }
434
435 return result;
436 }
437
438 /**
439 * Add an entry to the cache.
440 * The new entry will be in PENDING state and not usable setEntryValid() is called on the entry.
441 * @param queryInfo
442 * @return
443 */
444 public CacheEntry addToCache(QueryInfo queryInfo) {
445 // Create placeholder entry with PENDING state.
446 String queryText = queryInfo.getLookupInfo().getQueryText();
447 CacheEntry addedEntry = new CacheEntry();
448 addedEntry.queryInfo = queryInfo;
449
450 Lock writeLock = rwLock.writeLock();
451 try {
452 writeLock.lock();
453
454 LOG.info("Adding placeholder cache entry for query '{}'", queryText);
455
456 // Add the entry to the cache structures while under write lock.
457 Set<CacheEntry> entriesForQuery = queryMap.get(queryText);
458 if (entriesForQuery == null) {
459 entriesForQuery = new HashSet<CacheEntry>();
460 queryMap.put(queryText, entriesForQuery);
461 }
462 entriesForQuery.add(addedEntry);
463 lru.put(addedEntry, addedEntry);
464 } finally {
465 writeLock.unlock();
466 }
467
468 return addedEntry;
469 }
470
471 /**
472 * Updates a pending cache entry with a FetchWork result from a finished query.
473 * If successful the cache entry will be set to valid status and be usable for cached queries.
474 * Important: Adding the entry to the cache will increment the reader count for the cache entry.
475 * CacheEntry.releaseReader() should be called when the caller is done with the cache entry.
476 * @param cacheEntry
477 * @param fetchWork
478 * @return
479 */
480 public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
481 String queryText = cacheEntry.getQueryText();
482 boolean dataDirMoved = false;
483 Path queryResultsPath = null;
484 Path cachedResultsPath = null;
485
486 try {
487 boolean requiresMove = true;
488 queryResultsPath = fetchWork.getTblDir();
489 FileSystem resultsFs = queryResultsPath.getFileSystem(conf);
490 long resultSize;
491 if (resultsFs.exists(queryResultsPath)) {
492 ContentSummary cs = resultsFs.getContentSummary(queryResultsPath);
493 resultSize = cs.getLength();
494 } else {
495 // No actual result directory, no need to move anything.
496 cachedResultsPath = zeroRowsPath;
497 resultSize = 0;
498 requiresMove = false;
499 }
500
501 if (!shouldEntryBeAdded(cacheEntry, resultSize)) {
502 return false;
503 }
504
505 // Synchronize on the cache entry so that no one else can invalidate this entry
506 // while we are in the process of setting it to valid.
507 synchronized (cacheEntry) {
508 if (cacheEntry.getStatus() == CacheEntryStatus.INVALID) {
509 // Entry either expired, or was invalidated due to table updates
510 return false;
511 }
512
513 if (requiresMove) {
514 // Move the query results to the query cache directory.
515 cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
516 dataDirMoved = true;
517 }
518 LOG.info("Moved query results from {} to {} (size {}) for query '{}'",
519 queryResultsPath, cachedResultsPath, resultSize, queryText);
520
521 // Create a new FetchWork to reference the new cache location.
522 FetchWork fetchWorkForCache =
523 new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
524 fetchWorkForCache.setCachedResult(true);
525 cacheEntry.fetchWork = fetchWorkForCache;
526 cacheEntry.cachedResultsPath = cachedResultsPath;
527 cacheEntry.size = resultSize;
528 this.cacheSize += resultSize;
529 cacheEntry.createTime = System.currentTimeMillis();
530
531 cacheEntry.setStatus(CacheEntryStatus.VALID);
532 // Mark this entry as being in use. Caller will need to release later.
533 cacheEntry.addReader();
534
535 scheduleEntryInvalidation(cacheEntry);
536
537 // Notify any queries waiting on this cacheEntry to become valid.
538 cacheEntry.notifyAll();
539 }
540
541 incrementMetric(MetricsConstant.QC_VALID_ENTRIES);
542 incrementMetric(MetricsConstant.QC_TOTAL_ENTRIES_ADDED);
543 } catch (Exception err) {
544 LOG.error("Failed to create cache entry for query results for query: " + queryText, err);
545
546 if (dataDirMoved) {
547 // If data was moved from original location to cache directory, we need to move it back!
548 LOG.info("Restoring query results from {} back to {}", cachedResultsPath, queryResultsPath);
549 try {
550 FileSystem fs = cachedResultsPath.getFileSystem(conf);
551 fs.rename(cachedResultsPath, queryResultsPath);
552 cacheEntry.size = 0;
553 cacheEntry.cachedResultsPath = null;
554 } catch (Exception err2) {
555 String errMsg = "Failed cleanup during failed attempt to cache query: " + queryText;
556 LOG.error(errMsg);
557 throw new RuntimeException(errMsg);
558 }
559 }
560
561 // Invalidate the entry. Rely on query cleanup to remove from lookup.
562 cacheEntry.invalidate();
563 return false;
564 }
565
566 return true;
567 }
568
569 public void clear() {
570 Lock writeLock = rwLock.writeLock();
571 try {
572 writeLock.lock();
573 LOG.info("Clearing the results cache");
574 CacheEntry[] allEntries = null;
575 synchronized (lru) {
576 allEntries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY);
577 }
578 for (CacheEntry entry : allEntries) {
579 try {
580 removeEntry(entry);
581 } catch (Exception err) {
582 LOG.error("Error removing cache entry " + entry, err);
583 }
584 }
585 } finally {
586 writeLock.unlock();
587 }
588 }
589
590 public long getSize() {
591 Lock readLock = rwLock.readLock();
592 try {
593 readLock.lock();
594 return cacheSize;
595 } finally {
596 readLock.unlock();
597 }
598 }
599
  // Initial capacity and load factor for the access-ordered LinkedHashMap backing the LRU.
  private static final int INITIAL_LRU_SIZE = 16;
  private static final float LRU_LOAD_FACTOR = 0.75f;
  // Shared empty array used by clear() when snapshotting the LRU key set.
  private static final CacheEntry[] EMPTY_CACHEENTRY_ARRAY = {};
603
604 private boolean entryMatches(LookupInfo lookupInfo, CacheEntry entry) {
605 QueryInfo queryInfo = entry.getQueryInfo();
606 for (ReadEntity readEntity : queryInfo.getInputs()) {
607 // Check that the tables used do not resolve to temp tables.
608 if (readEntity.getType() == Type.TABLE) {
609 Table tableUsed = readEntity.getTable();
610 Map<String, Table> tempTables =
611 SessionHiveMetaStoreClient.getTempTablesForDatabase(tableUsed.getDbName());
612 if (tempTables != null && tempTables.containsKey(tableUsed.getTableName())) {
613 LOG.info("{} resolves to a temporary table in the current session. This query cannot use the cache.",
614 tableUsed.getTableName());
615 return false;
616 }
617 }
618 }
619
620 return true;
621 }
622
623 public void removeEntry(CacheEntry entry) {
624 entry.invalidate();
625 rwLock.writeLock().lock();
626 try {
627 removeFromLookup(entry);
628 lru.remove(entry);
629 // Should the cache size be updated here, or after the result data has actually been deleted?
630 cacheSize -= entry.size;
631 } finally {
632 rwLock.writeLock().unlock();
633 }
634 }
635
636 private void removeFromLookup(CacheEntry entry) {
637 String queryString = entry.getQueryText();
638 Set<CacheEntry> entries = queryMap.get(queryString);
639 if (entries == null) {
640 LOG.warn("ResultsCache: no entry for {}", queryString);
641 return;
642 }
643 boolean deleted = entries.remove(entry);
644 if (!deleted) {
645 LOG.warn("ResultsCache: Attempted to remove entry but it was not in the cache: {}", entry);
646 }
647 if (entries.isEmpty()) {
648 queryMap.remove(queryString);
649 }
650 }
651
  /**
   * Computes the on-disk size of the results described by {@code fetchWork} and
   * stores it in {@code entry.size}.
   * NOTE(review): not referenced anywhere in this file — possibly dead code or used
   * by callers outside this view; confirm before removing.
   * @throws IOException if the filesystem cannot be queried
   */
  private void calculateEntrySize(CacheEntry entry, FetchWork fetchWork) throws IOException {
    Path queryResultsPath = fetchWork.getTblDir();
    FileSystem resultsFs = queryResultsPath.getFileSystem(conf);
    ContentSummary cs = resultsFs.getContentSummary(queryResultsPath);
    entry.size = cs.getLength();
  }
658
659 /**
660 * Determines if the cache entry should be added to the results cache.
661 */
662 private boolean shouldEntryBeAdded(CacheEntry entry, long size) {
663 // Assumes the cache lock has already been taken.
664 if (maxEntrySize >= 0 && size > maxEntrySize) {
665 LOG.debug("Cache entry size {} larger than max entry size ({})", size, maxEntrySize);
666 incrementMetric(MetricsConstant.QC_REJECTED_TOO_LARGE);
667 return false;
668 }
669
670 if (!clearSpaceForCacheEntry(entry, size)) {
671 return false;
672 }
673
674 return true;
675 }
676
677 private Path moveResultsToCacheDirectory(Path queryResultsPath) throws IOException {
678 String dirName = UUID.randomUUID().toString();
679 Path cachedResultsPath = new Path(cacheDirPath, dirName);
680 FileSystem fs = cachedResultsPath.getFileSystem(conf);
681 fs.rename(queryResultsPath, cachedResultsPath);
682 return cachedResultsPath;
683 }
684
685 private boolean hasSpaceForCacheEntry(CacheEntry entry, long size) {
686 if (maxCacheSize >= 0) {
687 return (cacheSize + size) <= maxCacheSize;
688 }
689 // Negative max cache size means unbounded.
690 return true;
691 }
692
693 private CacheEntry findEntryToRemove() {
694 // Entries should be in LRU order in the keyset iterator.
695 Set<CacheEntry> entries = lru.keySet();
696 synchronized (lru) {
697 for (CacheEntry removalCandidate : entries) {
698 if (removalCandidate.getStatus() != CacheEntryStatus.VALID) {
699 continue;
700 }
701 return removalCandidate;
702 }
703 }
704 return null;
705 }
706
707 private boolean clearSpaceForCacheEntry(CacheEntry entry, long size) {
708 if (hasSpaceForCacheEntry(entry, size)) {
709 return true;
710 }
711
712 LOG.info("Clearing space for cache entry for query: [{}] with size {}",
713 entry.getQueryText(), size);
714
715 CacheEntry removalCandidate;
716 while ((removalCandidate = findEntryToRemove()) != null) {
717 LOG.info("Removing entry: {}", removalCandidate);
718 removeEntry(removalCandidate);
719 // TODO: Should we wait for the entry to actually be deleted from HDFS? Would have to
720 // poll the reader count, waiting for it to reach 0, at which point cleanup should occur.
721 if (hasSpaceForCacheEntry(entry, size)) {
722 return true;
723 }
724 }
725
726 LOG.info("Could not free enough space for cache entry for query: [{}] withe size {}",
727 entry.getQueryText(), size);
728 return false;
729 }
730
731
732 @VisibleForTesting
733 public static void cleanupInstance() {
734 // This should only ever be called in testing scenarios.
735 // There should not be any other users of the cache or its entries or this may mess up cleanup.
736 if (inited.get()) {
737 getInstance().clear();
738 instance = null;
739 inited.set(false);
740 }
741 }
742
  // Single-threaded background executors: one for scheduled entry expiration, one for
  // deleting cached result directories off the caller's thread. Daemon threads so they
  // do not block JVM shutdown.
  private static ScheduledExecutorService invalidationExecutor = null;
  private static ExecutorService deletionExecutor = null;

  static {
    ThreadFactory threadFactory =
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryResultsCache %d").build();
    invalidationExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
    deletionExecutor = Executors.newSingleThreadExecutor(threadFactory);
  }
752
  /**
   * If a max entry lifetime is configured (maxEntryLifetime >= 0), schedules a task
   * to invalidate and remove {@code entry} after that many milliseconds. A negative
   * lifetime disables scheduled expiration. The future is stored on the entry so
   * invalidate() can cancel it if the entry is invalidated early.
   */
  private void scheduleEntryInvalidation(final CacheEntry entry) {
    if (maxEntryLifetime >= 0) {
      // Schedule task to invalidate cache entry and remove from lookup.
      ScheduledFuture<?> future = invalidationExecutor.schedule(new Runnable() {
        @Override
        public void run() {
          removeEntry(entry);
        }
      }, maxEntryLifetime, TimeUnit.MILLISECONDS);
      entry.invalidationFuture = future;
    }
  }
765
766 private static void cleanupEntry(final CacheEntry entry) {
767 Preconditions.checkState(entry.getStatus() == CacheEntryStatus.INVALID);
768 final HiveConf conf = getInstance().conf;
769
770 if (entry.cachedResultsPath != null &&
771 !getInstance().zeroRowsPath.equals(entry.cachedResultsPath)) {
772 deletionExecutor.execute(new Runnable() {
773 @Override
774 public void run() {
775 Path path = entry.cachedResultsPath;
776 LOG.info("Cache directory cleanup: deleting {}", path);
777 try {
778 FileSystem fs = entry.cachedResultsPath.getFileSystem(getInstance().conf);
779 fs.delete(entry.cachedResultsPath, true);
780 } catch (Exception err) {
781 LOG.error("Error while trying to delete " + path, err);
782 }
783 }
784 });
785 }
786 }
787
  /** Increments the named counter by {@code count}; no-op if metrics are disabled. */
  public static void incrementMetric(String name, long count) {
    Metrics metrics = MetricsFactory.getInstance();
    if (metrics != null) {
      metrics.incrementCounter(name, count);
    }
  }

  /** Decrements the named counter by {@code count}; no-op if metrics are disabled. */
  public static void decrementMetric(String name, long count) {
    Metrics metrics = MetricsFactory.getInstance();
    if (metrics != null) {
      metrics.decrementCounter(name, count);
    }
  }

  /** Increments the named counter by 1. */
  public static void incrementMetric(String name) {
    incrementMetric(name, 1);
  }

  /** Decrements the named counter by 1. */
  public static void decrementMetric(String name) {
    decrementMetric(name, 1);
  }
809
810 private static void registerMetrics(Metrics metrics, final QueryResultsCache cache) {
811 MetricsVariable<Long> maxCacheSize = new MetricsVariable<Long>() {
812 @Override
813 public Long getValue() {
814 return cache.maxCacheSize;
815 }
816 };
817
818 MetricsVariable<Long> curCacheSize = new MetricsVariable<Long>() {
819 @Override
820 public Long getValue() {
821 return cache.cacheSize;
822 }
823 };
824
825 metrics.addGauge(MetricsConstant.QC_MAX_SIZE, maxCacheSize);
826 metrics.addGauge(MetricsConstant.QC_CURRENT_SIZE, curCacheSize);
827 }
828 }