[SPARK-23388][SQL] Support for Parquet Binary DecimalType in VectorizedColumnReader
[spark.git] sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;
import java.util.TimeZone;

import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.*;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;

import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;

import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.ValuesReaderIntIterator;
import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.createRLEIterator;

/**
 * Decoder to return values from a single column.
 */
public class VectorizedColumnReader {
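  // A minimal usage sketch (hypothetical caller, for illustration only): the enclosing
  // vectorized record reader builds one of these per requested column and drains it in
  // batches, along the lines of
  //
  //   VectorizedColumnReader reader =
  //       new VectorizedColumnReader(descriptor, originalType, pageReader, convertTz);
  //   reader.readBatch(capacity, writableColumnVector);
  //
  // A single readBatch call may span multiple Parquet pages; the reader tracks page
  // boundaries internally via valuesRead/endOfPageValueCount below.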
  /**
   * Total number of values read.
   */
  private long valuesRead;

  /**
   * Value that indicates the end of the current page. That is,
   * if valuesRead == endOfPageValueCount, we are at the end of the page.
   */
  private long endOfPageValueCount;

  /**
   * The dictionary, if this column has dictionary encoding.
   */
  private final Dictionary dictionary;

  /**
   * If true, the current page is dictionary encoded.
   */
  private boolean isCurrentPageDictionaryEncoded;

  /**
   * Maximum definition level for this column.
   */
  private final int maxDefLevel;

  /**
   * Repetition/Definition/Value readers.
   */
  private SpecificParquetRecordReaderBase.IntIterator repetitionLevelColumn;
  private SpecificParquetRecordReaderBase.IntIterator definitionLevelColumn;
  private ValuesReader dataColumn;

  // Only set if vectorized decoding is true. This is used instead of the row by row decoding
  // with `definitionLevelColumn`.
  private VectorizedRleValuesReader defColumn;

  /**
   * Total number of values in this column (in this row group).
   */
  private final long totalValueCount;

  /**
   * Total values in the current page.
   */
  private int pageValueCount;

  private final PageReader pageReader;
  private final ColumnDescriptor descriptor;
  private final OriginalType originalType;
  // The timezone conversion to apply to int96 timestamps. Null if no conversion.
  private final TimeZone convertTz;
  private static final TimeZone UTC = DateTimeUtils.TimeZoneUTC();

  public VectorizedColumnReader(
      ColumnDescriptor descriptor,
      OriginalType originalType,
      PageReader pageReader,
      TimeZone convertTz) throws IOException {
    this.descriptor = descriptor;
    this.pageReader = pageReader;
    this.convertTz = convertTz;
    this.originalType = originalType;
    this.maxDefLevel = descriptor.getMaxDefinitionLevel();

    DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
    if (dictionaryPage != null) {
      try {
        this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
        this.isCurrentPageDictionaryEncoded = true;
      } catch (IOException e) {
        throw new IOException("could not decode the dictionary for " + descriptor, e);
      }
    } else {
      this.dictionary = null;
      this.isCurrentPageDictionaryEncoded = false;
    }
    this.totalValueCount = pageReader.getTotalValueCount();
    if (totalValueCount == 0) {
      throw new IOException("totalValueCount == 0");
    }
  }

  /**
   * Advances to the next value. Returns true if the value is non-null.
   */
  private boolean next() throws IOException {
    if (valuesRead >= endOfPageValueCount) {
      if (valuesRead >= totalValueCount) {
        // How do we get here? Throw end of stream exception?
        return false;
      }
      readPage();
    }
    ++valuesRead;
    // TODO: Don't read for flat schemas
    // repetitionLevel = repetitionLevelColumn.nextInt();
    return definitionLevelColumn.nextInt() == maxDefLevel;
  }

  /**
   * Reads `total` values from this columnReader into column.
   */
  void readBatch(int total, WritableColumnVector column) throws IOException {
    int rowId = 0;
    WritableColumnVector dictionaryIds = null;
    if (dictionary != null) {
      // SPARK-16334: We only maintain a single dictionary per row batch, so that it can be used to
      // decode all previous dictionary encoded pages if we ever encounter a non-dictionary encoded
      // page.
      dictionaryIds = column.reserveDictionaryIds(total);
    }
    while (total > 0) {
      // Compute the number of values we want to read in this page.
      int leftInPage = (int) (endOfPageValueCount - valuesRead);
      if (leftInPage == 0) {
        readPage();
        leftInPage = (int) (endOfPageValueCount - valuesRead);
      }
      int num = Math.min(total, leftInPage);
      if (isCurrentPageDictionaryEncoded) {
        // Read and decode dictionary ids.
        defColumn.readIntegers(
            num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);

        // TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post process
        // the values to add microseconds precision.
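        // (Spark's TimestampType stores microseconds since the epoch, so each INT64 millisecond
        // value has to be scaled by 1000 on the way in; see DateTimeUtils.fromMillis below.)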
        if (column.hasDictionary() || (rowId == 0 &&
            (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
            (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 &&
              originalType != OriginalType.TIMESTAMP_MILLIS) ||
            descriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT ||
            descriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE ||
            descriptor.getType() == PrimitiveType.PrimitiveTypeName.BINARY))) {
          // Column vector supports lazy decoding of dictionary values so just set the dictionary.
          // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
          // non-dictionary encoded values have already been added).
          column.setDictionary(new ParquetDictionary(dictionary));
        } else {
          decodeDictionaryIds(rowId, num, column, dictionaryIds);
        }
      } else {
        if (column.hasDictionary() && rowId != 0) {
          // This batch already has dictionary-encoded values, but this new page is not
          // dictionary encoded. The batch cannot mix dictionary-encoded and plain values,
          // so eagerly decode the dictionary ids accumulated so far.
          decodeDictionaryIds(0, rowId, column, column.getDictionaryIds());
        }
        column.setDictionary(null);
        switch (descriptor.getType()) {
          case BOOLEAN:
            readBooleanBatch(rowId, num, column);
            break;
          case INT32:
            readIntBatch(rowId, num, column);
            break;
          case INT64:
            readLongBatch(rowId, num, column);
            break;
          case INT96:
            readBinaryBatch(rowId, num, column);
            break;
          case FLOAT:
            readFloatBatch(rowId, num, column);
            break;
          case DOUBLE:
            readDoubleBatch(rowId, num, column);
            break;
          case BINARY:
            readBinaryBatch(rowId, num, column);
            break;
          case FIXED_LEN_BYTE_ARRAY:
            readFixedLenByteArrayBatch(rowId, num, column, descriptor.getTypeLength());
            break;
          default:
            throw new IOException("Unsupported type: " + descriptor.getType());
        }
      }

      valuesRead += num;
      rowId += num;
      total -= num;
    }
  }

  private boolean shouldConvertTimestamps() {
    return convertTz != null && !convertTz.equals(UTC);
  }

  /**
   * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
   */
  private void decodeDictionaryIds(
      int rowId,
      int num,
      WritableColumnVector column,
      WritableColumnVector dictionaryIds) {
    switch (descriptor.getType()) {
      case INT32:
        if (column.dataType() == DataTypes.IntegerType ||
            DecimalType.is32BitDecimalType(column.dataType())) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
            }
          }
        } else if (column.dataType() == DataTypes.ByteType) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
            }
          }
        } else if (column.dataType() == DataTypes.ShortType) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
            }
          }
        } else {
          throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
        }
        break;

      case INT64:
        if (column.dataType() == DataTypes.LongType ||
            DecimalType.is64BitDecimalType(column.dataType()) ||
            originalType == OriginalType.TIMESTAMP_MICROS) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
            }
          }
        } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              column.putLong(i,
                DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i))));
            }
          }
        } else {
          throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
        }
        break;

      case FLOAT:
        for (int i = rowId; i < rowId + num; ++i) {
          if (!column.isNullAt(i)) {
            column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i)));
          }
        }
        break;

      case DOUBLE:
        for (int i = rowId; i < rowId + num; ++i) {
          if (!column.isNullAt(i)) {
            column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i)));
          }
        }
        break;
      case INT96:
        if (column.dataType() == DataTypes.TimestampType) {
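          // INT96 is Parquet's legacy 12-byte timestamp layout (8 bytes nanos-of-day followed
          // by a 4-byte Julian day); binaryToSQLTimestamp converts it to microseconds since
          // the epoch.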
          if (!shouldConvertTimestamps()) {
            for (int i = rowId; i < rowId + num; ++i) {
              if (!column.isNullAt(i)) {
                Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
                column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
              }
            }
          } else {
            for (int i = rowId; i < rowId + num; ++i) {
              if (!column.isNullAt(i)) {
                Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
                long rawTime = ParquetRowConverter.binaryToSQLTimestamp(v);
                long adjTime = DateTimeUtils.convertTz(rawTime, convertTz, UTC);
                column.putLong(i, adjTime);
              }
            }
          }
        } else {
          throw new UnsupportedOperationException();
        }
        break;
      case BINARY:
        // TODO: this is incredibly inefficient as it blows up the dictionary right here. We
        // need to do this better. We should probably add the dictionary data to the ColumnVector
        // and reuse it across batches. This should mean adding a ByteArray would just update
        // the length and offset.
        for (int i = rowId; i < rowId + num; ++i) {
          if (!column.isNullAt(i)) {
            Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
            column.putByteArray(i, v.getBytes());
          }
        }
        break;
      case FIXED_LEN_BYTE_ARRAY:
        // DecimalType written in the legacy mode
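        // (With spark.sql.parquet.writeLegacyFormat enabled, decimals are written as
        // FIXED_LEN_BYTE_ARRAY regardless of precision, so small-precision decimals have to
        // be unpacked back into int/long vectors here.)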
        if (DecimalType.is32BitDecimalType(column.dataType())) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
              column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
            }
          }
        } else if (DecimalType.is64BitDecimalType(column.dataType())) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
              column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
            }
          }
        } else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
              column.putByteArray(i, v.getBytes());
            }
          }
        } else {
          throw new UnsupportedOperationException();
        }
        break;

      default:
        throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
    }
  }

  /**
   * For all the read*Batch functions, reads `num` values from this columnReader into column. It
   * is guaranteed that num is at most the number of values left in the current page.
   */

  private void readBooleanBatch(int rowId, int num, WritableColumnVector column) {
    assert(column.dataType() == DataTypes.BooleanType);
    defColumn.readBooleans(
        num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
  }

  private void readIntBatch(int rowId, int num, WritableColumnVector column) {
    // This is where we implement support for the valid type conversions.
    // TODO: implement remaining type conversions
    if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
        DecimalType.is32BitDecimalType(column.dataType())) {
      defColumn.readIntegers(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
    } else if (column.dataType() == DataTypes.ByteType) {
      defColumn.readBytes(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
    } else if (column.dataType() == DataTypes.ShortType) {
      defColumn.readShorts(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
    } else {
      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
    }
  }

  private void readLongBatch(int rowId, int num, WritableColumnVector column) {
    // This is where we implement support for the valid type conversions.
    if (column.dataType() == DataTypes.LongType ||
        DecimalType.is64BitDecimalType(column.dataType()) ||
        originalType == OriginalType.TIMESTAMP_MICROS) {
      defColumn.readLongs(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
    } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
      for (int i = 0; i < num; i++) {
        if (defColumn.readInteger() == maxDefLevel) {
          column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
        } else {
          column.putNull(rowId + i);
        }
      }
    } else {
      throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
    }
  }

  private void readFloatBatch(int rowId, int num, WritableColumnVector column) {
    // This is where we implement support for the valid type conversions.
    // TODO: support implicit cast to double?
    if (column.dataType() == DataTypes.FloatType) {
      defColumn.readFloats(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
    } else {
      throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
    }
  }

  private void readDoubleBatch(int rowId, int num, WritableColumnVector column) {
    // This is where we implement support for the valid type conversions.
    // TODO: implement remaining type conversions
    if (column.dataType() == DataTypes.DoubleType) {
      defColumn.readDoubles(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
    } else {
      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
    }
  }

  private void readBinaryBatch(int rowId, int num, WritableColumnVector column) {
    // This is where we implement support for the valid type conversions.
    // TODO: implement remaining type conversions
    VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
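    // SPARK-23388: DecimalType can also be backed by the BINARY physical type; such values
    // are read as variable-length byte arrays, exactly like StringType/BinaryType, and
    // interpreted as unscaled decimals when the column vector is read back.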
    if (column.dataType() == DataTypes.StringType || column.dataType() == DataTypes.BinaryType
        || DecimalType.isByteArrayDecimalType(column.dataType())) {
      defColumn.readBinarys(num, column, rowId, maxDefLevel, data);
    } else if (column.dataType() == DataTypes.TimestampType) {
      if (!shouldConvertTimestamps()) {
        for (int i = 0; i < num; i++) {
          if (defColumn.readInteger() == maxDefLevel) {
            // Read 12 bytes for INT96
            long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
            column.putLong(rowId + i, rawTime);
          } else {
            column.putNull(rowId + i);
          }
        }
      } else {
        for (int i = 0; i < num; i++) {
          if (defColumn.readInteger() == maxDefLevel) {
            // Read 12 bytes for INT96
            long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
            long adjTime = DateTimeUtils.convertTz(rawTime, convertTz, UTC);
            column.putLong(rowId + i, adjTime);
          } else {
            column.putNull(rowId + i);
          }
        }
      }
    } else {
      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
    }
  }

  private void readFixedLenByteArrayBatch(
      int rowId,
      int num,
      WritableColumnVector column,
      int arrayLen) {
    VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
    // This is where we implement support for the valid type conversions.
    // TODO: implement remaining type conversions
    if (DecimalType.is32BitDecimalType(column.dataType())) {
      for (int i = 0; i < num; i++) {
        if (defColumn.readInteger() == maxDefLevel) {
          column.putInt(rowId + i,
              (int) ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
        } else {
          column.putNull(rowId + i);
        }
      }
    } else if (DecimalType.is64BitDecimalType(column.dataType())) {
      for (int i = 0; i < num; i++) {
        if (defColumn.readInteger() == maxDefLevel) {
          column.putLong(rowId + i,
              ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
        } else {
          column.putNull(rowId + i);
        }
      }
    } else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
      for (int i = 0; i < num; i++) {
        if (defColumn.readInteger() == maxDefLevel) {
          column.putByteArray(rowId + i, data.readBinary(arrayLen).getBytes());
        } else {
          column.putNull(rowId + i);
        }
      }
    } else {
      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
    }
  }

  private void readPage() {
    DataPage page = pageReader.readPage();
    // TODO: Why is this a visitor?
    page.accept(new DataPage.Visitor<Void>() {
      @Override
      public Void visit(DataPageV1 dataPageV1) {
        try {
          readPageV1(dataPageV1);
          return null;
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public Void visit(DataPageV2 dataPageV2) {
        try {
          readPageV2(dataPageV2);
          return null;
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    });
  }

  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException {
    this.endOfPageValueCount = valuesRead + pageValueCount;
    if (dataEncoding.usesDictionary()) {
      this.dataColumn = null;
      if (dictionary == null) {
        throw new IOException(
            "could not read page in col " + descriptor +
            " as the dictionary was missing for encoding " + dataEncoding);
      }
      @SuppressWarnings("deprecation")
      Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
      if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
      }
      this.dataColumn = new VectorizedRleValuesReader();
      this.isCurrentPageDictionaryEncoded = true;
    } else {
      if (dataEncoding != Encoding.PLAIN) {
        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
      }
      this.dataColumn = new VectorizedPlainValuesReader();
      this.isCurrentPageDictionaryEncoded = false;
    }

    try {
      dataColumn.initFromPage(pageValueCount, bytes, offset);
    } catch (IOException e) {
      throw new IOException("could not read page in col " + descriptor, e);
    }
  }

  private void readPageV1(DataPageV1 page) throws IOException {
    this.pageValueCount = page.getValueCount();
    ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
    ValuesReader dlReader;

    // Initialize the decoders.
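    // Only RLE-encoded definition levels are supported by the vectorized path. When
    // maxDefLevel == 0 the column is required at every level, the levels carry no
    // information, and the declared encoding is irrelevant, hence the second clause below.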
    if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
      throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
    }
    int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
    this.defColumn = new VectorizedRleValuesReader(bitWidth);
    dlReader = this.defColumn;
    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
    try {
      byte[] bytes = page.getBytes().toByteArray();
      rlReader.initFromPage(pageValueCount, bytes, 0);
      int next = rlReader.getNextOffset();
      dlReader.initFromPage(pageValueCount, bytes, next);
      next = dlReader.getNextOffset();
      initDataReader(page.getValueEncoding(), bytes, next);
    } catch (IOException e) {
      throw new IOException("could not read page " + page + " in col " + descriptor, e);
    }
  }

  private void readPageV2(DataPageV2 page) throws IOException {
    this.pageValueCount = page.getValueCount();
    this.repetitionLevelColumn = createRLEIterator(descriptor.getMaxRepetitionLevel(),
        page.getRepetitionLevels(), descriptor);

    int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
    this.defColumn = new VectorizedRleValuesReader(bitWidth);
    this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
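    // In a V2 data page the definition levels live in their own section, RLE encoded without
    // the 4-byte length header that V1 pages carry, hence initFromBuffer rather than
    // initFromPage here.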
    this.defColumn.initFromBuffer(
        this.pageValueCount, page.getDefinitionLevels().toByteArray());
    try {
      initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
    } catch (IOException e) {
      throw new IOException("could not read page " + page + " in col " + descriptor, e);
    }
  }
}