PHOENIX-4366 Rebuilding a local index fails sometimes
[phoenix.git] / phoenix-core / src / main / java / org / apache / phoenix / iterate / SnapshotScanner.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.phoenix.iterate;
20
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FileSystem;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.hbase.*;
28 import org.apache.hadoop.hbase.client.*;
29
30 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
31 import org.apache.hadoop.hbase.regionserver.*;
32 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
33 import org.apache.phoenix.schema.PTable;
34 import org.apache.phoenix.util.*;
35
36 import java.io.IOException;
37 import java.util.ArrayList;
38 import java.util.List;
39 import java.util.concurrent.ConcurrentMap;
40 import java.util.concurrent.ExecutorService;
41
42 public class SnapshotScanner extends AbstractClientScanner {
43
44 private static final Log LOG = LogFactory.getLog(SnapshotScanner.class);
45
46 private RegionScanner scanner = null;
47 private HRegion region;
48 List<Cell> values;
49
50 public SnapshotScanner(Configuration conf, FileSystem fs, Path rootDir,
51 HTableDescriptor htd, HRegionInfo hri, Scan scan) throws Throwable{
52
53 scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
54 values = new ArrayList<>();
55 this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
56
57 RegionCoprocessorEnvironment snapshotEnv = getSnapshotContextEnvironment(conf);
58
59 RegionScannerFactory regionScannerFactory;
60 if (scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null) {
61 regionScannerFactory = new NonAggregateRegionScannerFactory(snapshotEnv);
62 } else {
63 /* future work : Snapshot M/R jobs for aggregate queries*/
64 throw new UnsupportedOperationException("Snapshot M/R jobs not available for aggregate queries");
65 }
66
67 this.scanner = regionScannerFactory.getRegionScanner(scan, region.getScanner(scan));
68 region.startRegionOperation();
69 }
70
71
72 @Override
73 public Result next() throws IOException {
74 values.clear();
75 scanner.nextRaw(values);
76 if (values.isEmpty()) {
77 //we are done
78 return null;
79 }
80
81 return Result.create(values);
82 }
83
84 @Override
85 public void close() {
86 if (this.scanner != null) {
87 try {
88 this.scanner.close();
89 this.scanner = null;
90 } catch (IOException e) {
91 LOG.warn("Exception while closing scanner", e);
92 }
93 }
94 if (this.region != null) {
95 try {
96 this.region.closeRegionOperation();
97 this.region.close(true);
98 this.region = null;
99 } catch (IOException e) {
100 LOG.warn("Exception while closing scanner", e);
101 }
102 }
103 }
104
105 @Override
106 public boolean renewLease() {
107 return false;
108 }
109
110 private RegionCoprocessorEnvironment getSnapshotContextEnvironment(final Configuration conf) {
111 return new RegionCoprocessorEnvironment() {
112 @Override
113 public Region getRegion() {
114 return region;
115 }
116
117 @Override
118 public HRegionInfo getRegionInfo() {
119 return region.getRegionInfo();
120 }
121
122 @Override
123 public RegionServerServices getRegionServerServices() {
124 throw new UnsupportedOperationException();
125 }
126
127 @Override
128 public ConcurrentMap<String, Object> getSharedData() {
129 throw new UnsupportedOperationException();
130 }
131
132 @Override
133 public int getVersion() {
134 throw new UnsupportedOperationException();
135 }
136
137 @Override
138 public String getHBaseVersion() {
139 throw new UnsupportedOperationException();
140 }
141
142 @Override
143 public Coprocessor getInstance() {
144 throw new UnsupportedOperationException();
145 }
146
147 @Override
148 public int getPriority() {
149 throw new UnsupportedOperationException();
150 }
151
152 @Override
153 public int getLoadSequence() {
154 throw new UnsupportedOperationException();
155 }
156
157 @Override
158 public Configuration getConfiguration() {
159 return conf;
160 }
161
162 @Override
163 public HTableInterface getTable(TableName tableName) throws IOException {
164 throw new UnsupportedOperationException();
165 }
166
167 @Override
168 public HTableInterface getTable(TableName tableName, ExecutorService executorService)
169 throws IOException {
170 throw new UnsupportedOperationException();
171 }
172
173 @Override
174 public ClassLoader getClassLoader() {
175 throw new UnsupportedOperationException();
176 }
177 };
178 }
179 }