PHOENIX-4366 Rebuilding a local index fails sometimes
[phoenix.git] / phoenix-core / src / main / java / org / apache / phoenix / iterate / NonAggregateRegionScannerFactory.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.phoenix.iterate;
20
21 import com.google.common.collect.Lists;
22 import com.google.common.collect.Sets;
23
24 import org.apache.hadoop.hbase.Cell;
25 import org.apache.hadoop.hbase.KeyValue;
26 import org.apache.hadoop.hbase.client.Result;
27 import org.apache.hadoop.hbase.client.Scan;
28 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
29 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
30 import org.apache.hadoop.hbase.regionserver.Region;
31 import org.apache.hadoop.hbase.regionserver.RegionScanner;
32 import org.apache.hadoop.io.WritableUtils;
33 import org.apache.phoenix.cache.GlobalCache;
34 import org.apache.phoenix.cache.TenantCache;
35 import org.apache.phoenix.coprocessor.BaseRegionScanner;
36 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
37 import org.apache.phoenix.coprocessor.HashJoinRegionScanner;
38 import org.apache.phoenix.execute.MutationState;
39 import org.apache.phoenix.execute.TupleProjector;
40 import org.apache.phoenix.expression.Expression;
41 import org.apache.phoenix.expression.KeyValueColumnExpression;
42 import org.apache.phoenix.expression.OrderByExpression;
43 import org.apache.phoenix.expression.SingleCellColumnExpression;
44 import org.apache.phoenix.expression.function.ArrayIndexFunction;
45 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
46 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
47 import org.apache.phoenix.index.IndexMaintainer;
48 import org.apache.phoenix.join.HashJoinInfo;
49 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
50 import org.apache.phoenix.memory.MemoryManager;
51 import org.apache.phoenix.query.QueryConstants;
52 import org.apache.phoenix.schema.KeyValueSchema;
53 import org.apache.phoenix.schema.PTable;
54 import org.apache.phoenix.schema.ValueBitSet;
55 import org.apache.phoenix.schema.tuple.ResultTuple;
56 import org.apache.phoenix.schema.tuple.Tuple;
57 import org.apache.phoenix.schema.types.PInteger;
58 import org.apache.phoenix.transaction.PhoenixTransactionContext;
59 import org.apache.phoenix.util.EncodedColumnsUtil;
60 import org.apache.phoenix.util.IndexUtil;
61 import org.apache.phoenix.util.ScanUtil;
62 import org.apache.phoenix.util.ServerUtil;
63
64 import java.io.ByteArrayInputStream;
65 import java.io.DataInputStream;
66 import java.io.IOException;
67 import java.sql.SQLException;
68 import java.util.ArrayList;
69 import java.util.List;
70 import java.util.Set;
71
72 import static org.apache.phoenix.util.EncodedColumnsUtil.getMinMaxQualifiersFromScan;
73
74 public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
75
/**
 * Creates a factory bound to the given region coprocessor environment.
 *
 * @param env the environment stored on this factory; the other methods of this class read the
 *            region (and, for TopN scans, the tenant cache) through it
 */
public NonAggregateRegionScannerFactory(RegionCoprocessorEnvironment env) {
    this.env = env;
}
79
80 @Override
81 public RegionScanner getRegionScanner(final Scan scan, final RegionScanner s) throws Throwable {
82 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
83 int offset = 0;
84 if (ScanUtil.isLocalIndex(scan)) {
85 /*
86 * For local indexes, we need to set an offset on row key expressions to skip
87 * the region start key.
88 */
89 Region region = getRegion();
90 offset = region.getRegionInfo().getStartKey().length != 0 ?
91 region.getRegionInfo().getStartKey().length :
92 region.getRegionInfo().getEndKey().length;
93 ScanUtil.setRowKeyOffset(scan, offset);
94 }
95 byte[] scanOffsetBytes = scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET);
96 Integer scanOffset = null;
97 if (scanOffsetBytes != null) {
98 scanOffset = (Integer)PInteger.INSTANCE.toObject(scanOffsetBytes);
99 }
100 RegionScanner innerScanner = s;
101 PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
102 boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
103
104 Set<KeyValueColumnExpression> arrayKVRefs = Sets.newHashSet();
105 Expression[] arrayFuncRefs = deserializeArrayPositionalExpressionInfoFromScan(scan, innerScanner, arrayKVRefs);
106 KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0);
107 for (Expression expression : arrayFuncRefs) {
108 builder.addField(expression);
109 }
110 KeyValueSchema kvSchema = builder.build();
111 ValueBitSet kvSchemaBitSet = ValueBitSet.newInstance(kvSchema);
112 TupleProjector tupleProjector = null;
113 Region dataRegion = null;
114 IndexMaintainer indexMaintainer = null;
115 byte[][] viewConstants = null;
116 PhoenixTransactionContext tx = null;
117 ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
118 if (dataColumns != null) {
119 tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
120 dataRegion = env.getRegion();
121 boolean useProto = false;
122 byte[] localIndexBytes = scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO);
123 useProto = localIndexBytes != null;
124 if (localIndexBytes == null) {
125 localIndexBytes = scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD);
126 }
127 List<IndexMaintainer> indexMaintainers =
128 localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
129 indexMaintainer = indexMaintainers.get(0);
130 viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
131 byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
132 tx = MutationState.decodeTransaction(txState);
133 }
134
135 final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
136 final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
137 boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(getMinMaxQualifiersFromScan(scan))
138 && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null;
139 // setting dataRegion in case of a non-coprocessor environment
140 if (dataRegion == null &&
141 env.getConfiguration().get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY) != null) {
142 dataRegion = env.getRegion();
143 }
144 innerScanner = getWrappedScanner(env, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, dataColumns,
145 tupleProjector, dataRegion, indexMaintainer, tx, viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null,
146 ptr, useQualifierAsIndex);
147
148 final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);
149 if (j != null) {
150 innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, env, useQualifierAsIndex,
151 useNewValueColumnQualifier);
152 }
153 if (scanOffset != null) {
154 innerScanner = getOffsetScanner(innerScanner, new OffsetResultIterator(
155 new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme), scanOffset),
156 scan.getAttribute(QueryConstants.LAST_SCAN) != null);
157 }
158 final OrderedResultIterator iterator = deserializeFromScan(scan, innerScanner);
159 if (iterator == null) {
160 return innerScanner;
161 }
162 // TODO:the above wrapped scanner should be used here also
163 return getTopNScanner(env, innerScanner, iterator, tenantId);
164 }
165
166 private static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) {
167 byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN);
168 if (topN == null) {
169 return null;
170 }
171 ByteArrayInputStream stream = new ByteArrayInputStream(topN); // TODO: size?
172 try {
173 DataInputStream input = new DataInputStream(stream);
174 int thresholdBytes = WritableUtils.readVInt(input);
175 int limit = WritableUtils.readVInt(input);
176 int estimatedRowSize = WritableUtils.readVInt(input);
177 int size = WritableUtils.readVInt(input);
178 List<OrderByExpression> orderByExpressions = Lists.newArrayListWithExpectedSize(size);
179 for (int i = 0; i < size; i++) {
180 OrderByExpression orderByExpression = new OrderByExpression();
181 orderByExpression.readFields(input);
182 orderByExpressions.add(orderByExpression);
183 }
184 PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
185 ResultIterator inner = new RegionScannerResultIterator(s, EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
186 return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, null,
187 estimatedRowSize);
188 } catch (IOException e) {
189 throw new RuntimeException(e);
190 } finally {
191 try {
192 stream.close();
193 } catch (IOException e) {
194 throw new RuntimeException(e);
195 }
196 }
197 }
198
199 private Expression[] deserializeArrayPositionalExpressionInfoFromScan(Scan scan, RegionScanner s,
200 Set<KeyValueColumnExpression> arrayKVRefs) {
201 byte[] specificArrayIdx = scan.getAttribute(BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX);
202 if (specificArrayIdx == null) {
203 return null;
204 }
205 ByteArrayInputStream stream = new ByteArrayInputStream(specificArrayIdx);
206 try {
207 DataInputStream input = new DataInputStream(stream);
208 int arrayKVRefSize = WritableUtils.readVInt(input);
209 for (int i = 0; i < arrayKVRefSize; i++) {
210 PTable.ImmutableStorageScheme scheme = EncodedColumnsUtil.getImmutableStorageScheme(scan);
211 KeyValueColumnExpression kvExp = scheme != PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN ? new SingleCellColumnExpression(scheme)
212 : new KeyValueColumnExpression();
213 kvExp.readFields(input);
214 arrayKVRefs.add(kvExp);
215 }
216 int arrayKVFuncSize = WritableUtils.readVInt(input);
217 Expression[] arrayFuncRefs = new Expression[arrayKVFuncSize];
218 for (int i = 0; i < arrayKVFuncSize; i++) {
219 ArrayIndexFunction arrayIdxFunc = new ArrayIndexFunction();
220 arrayIdxFunc.readFields(input);
221 arrayFuncRefs[i] = arrayIdxFunc;
222 }
223 return arrayFuncRefs;
224 } catch (IOException e) {
225 throw new RuntimeException(e);
226 } finally {
227 try {
228 stream.close();
229 } catch (IOException e) {
230 throw new RuntimeException(e);
231 }
232 }
233 }
234
235
236 private RegionScanner getOffsetScanner(final RegionScanner s,
237 final OffsetResultIterator iterator, final boolean isLastScan) throws IOException {
238 final Tuple firstTuple;
239 final Region region = getRegion();
240 region.startRegionOperation();
241 try {
242 Tuple tuple = iterator.next();
243 if (tuple == null && !isLastScan) {
244 List<KeyValue> kvList = new ArrayList<KeyValue>(1);
245 KeyValue kv = new KeyValue(QueryConstants.OFFSET_ROW_KEY_BYTES, QueryConstants.OFFSET_FAMILY,
246 QueryConstants.OFFSET_COLUMN, PInteger.INSTANCE.toBytes(iterator.getRemainingOffset()));
247 kvList.add(kv);
248 Result r = new Result(kvList);
249 firstTuple = new ResultTuple(r);
250 } else {
251 firstTuple = tuple;
252 }
253 } catch (Throwable t) {
254 ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
255 return null;
256 } finally {
257 region.closeRegionOperation();
258 }
259 return new BaseRegionScanner(s) {
260 private Tuple tuple = firstTuple;
261
262 @Override
263 public boolean isFilterDone() {
264 return tuple == null;
265 }
266
267 @Override
268 public boolean next(List<Cell> results) throws IOException {
269 try {
270 if (isFilterDone()) { return false; }
271 for (int i = 0; i < tuple.size(); i++) {
272 results.add(tuple.getValue(i));
273 }
274 tuple = iterator.next();
275 return !isFilterDone();
276 } catch (Throwable t) {
277 ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
278 return false;
279 }
280 }
281
282 @Override
283 public void close() throws IOException {
284 try {
285 s.close();
286 } finally {
287 try {
288 if (iterator != null) {
289 iterator.close();
290 }
291 } catch (SQLException e) {
292 ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), e);
293 }
294 }
295 }
296 };
297 }
298
299 /**
300 * Return region scanner that does TopN.
301 * We only need to call startRegionOperation and closeRegionOperation when
302 * getting the first Tuple (which forces running through the entire region)
303 * since after this everything is held in memory
304 */
305 private RegionScanner getTopNScanner(RegionCoprocessorEnvironment env, final RegionScanner s,
306 final OrderedResultIterator iterator, ImmutableBytesPtr tenantId) throws Throwable {
307
308 final Tuple firstTuple;
309 TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId);
310 long estSize = iterator.getEstimatedByteSize();
311 final MemoryManager.MemoryChunk chunk = tenantCache.getMemoryManager().allocate(estSize);
312 final Region region = getRegion();
313 region.startRegionOperation();
314 try {
315 // Once we return from the first call to next, we've run through and cached
316 // the topN rows, so we no longer need to start/stop a region operation.
317 firstTuple = iterator.next();
318 // Now that the topN are cached, we can resize based on the real size
319 long actualSize = iterator.getByteSize();
320 chunk.resize(actualSize);
321 } catch (Throwable t) {
322 ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
323 return null;
324 } finally {
325 region.closeRegionOperation();
326 }
327 return new BaseRegionScanner(s) {
328 private Tuple tuple = firstTuple;
329
330 @Override
331 public boolean isFilterDone() {
332 return tuple == null;
333 }
334
335 @Override
336 public boolean next(List<Cell> results) throws IOException {
337 try {
338 if (isFilterDone()) {
339 return false;
340 }
341
342 for (int i = 0; i < tuple.size(); i++) {
343 results.add(tuple.getValue(i));
344 }
345
346 tuple = iterator.next();
347 return !isFilterDone();
348 } catch (Throwable t) {
349 ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
350 return false;
351 }
352 }
353
354 @Override
355 public void close() throws IOException {
356 try {
357 s.close();
358 } finally {
359 try {
360 if(iterator != null) {
361 iterator.close();
362 }
363 } catch (SQLException e) {
364 ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), e);
365 } finally {
366 chunk.close();
367 }
368 }
369 }
370 };
371 }
372 }