ARROW-439: [Python] Add option in "to_pandas" conversions to yield Categorical from...
[arrow.git] / cpp / src / arrow / python / arrow_to_pandas.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 // Functions for pandas conversion via NumPy
19
20 #include "arrow/python/numpy_interop.h"
21
22 #include "arrow/python/arrow_to_pandas.h"
23
24 #include <algorithm>
25 #include <cmath>
26 #include <cstdint>
27 #include <memory>
28 #include <sstream>
29 #include <string>
30 #include <unordered_map>
31 #include <vector>
32
33 #include "arrow/array.h"
34 #include "arrow/status.h"
35 #include "arrow/table.h"
36 #include "arrow/type_fwd.h"
37 #include "arrow/type_traits.h"
38 #include "arrow/util/bit-util.h"
39 #include "arrow/util/decimal.h"
40 #include "arrow/util/logging.h"
41 #include "arrow/util/macros.h"
42 #include "arrow/util/parallel.h"
43 #include "arrow/visitor_inline.h"
44
45 #include "arrow/python/builtin_convert.h"
46 #include "arrow/python/common.h"
47 #include "arrow/python/config.h"
48 #include "arrow/python/helpers.h"
49 #include "arrow/python/numpy-internal.h"
50 #include "arrow/python/numpy_convert.h"
51 #include "arrow/python/type_traits.h"
52 #include "arrow/python/util/datetime.h"
53
54 namespace arrow {
55 namespace py {
56
57 using internal::kPandasTimestampNull;
58 using internal::kNanosecondsInDay;
59
60 // ----------------------------------------------------------------------
61 // Utility code
62
63 template <typename T>
64 struct WrapBytes {};
65
66 template <>
67 struct WrapBytes<StringArray> {
68 static inline PyObject* Wrap(const uint8_t* data, int64_t length) {
69 return PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(data), length);
70 }
71 };
72
73 template <>
74 struct WrapBytes<BinaryArray> {
75 static inline PyObject* Wrap(const uint8_t* data, int64_t length) {
76 return PyBytes_FromStringAndSize(reinterpret_cast<const char*>(data), length);
77 }
78 };
79
80 template <>
81 struct WrapBytes<FixedSizeBinaryArray> {
82 static inline PyObject* Wrap(const uint8_t* data, int64_t length) {
83 return PyBytes_FromStringAndSize(reinterpret_cast<const char*>(data), length);
84 }
85 };
86
87 static inline bool ListTypeSupported(const DataType& type) {
88 switch (type.id()) {
89 case Type::UINT8:
90 case Type::INT8:
91 case Type::UINT16:
92 case Type::INT16:
93 case Type::UINT32:
94 case Type::INT32:
95 case Type::INT64:
96 case Type::UINT64:
97 case Type::FLOAT:
98 case Type::DOUBLE:
99 case Type::STRING:
100 case Type::TIMESTAMP:
101 // The above types are all supported.
102 return true;
103 case Type::LIST: {
104 const ListType& list_type = static_cast<const ListType&>(type);
105 return ListTypeSupported(*list_type.value_type());
106 }
107 default:
108 break;
109 }
110 return false;
111 }
112
113 // ----------------------------------------------------------------------
114 // pandas 0.x DataFrame conversion internals
115
116 inline void set_numpy_metadata(int type, DataType* datatype, PyArray_Descr* out) {
117 if (type == NPY_DATETIME) {
118 auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(out->c_metadata);
119 if (datatype->id() == Type::TIMESTAMP) {
120 auto timestamp_type = static_cast<TimestampType*>(datatype);
121
122 switch (timestamp_type->unit()) {
123 case TimestampType::Unit::SECOND:
124 date_dtype->meta.base = NPY_FR_s;
125 break;
126 case TimestampType::Unit::MILLI:
127 date_dtype->meta.base = NPY_FR_ms;
128 break;
129 case TimestampType::Unit::MICRO:
130 date_dtype->meta.base = NPY_FR_us;
131 break;
132 case TimestampType::Unit::NANO:
133 date_dtype->meta.base = NPY_FR_ns;
134 break;
135 }
136 } else {
137 // datatype->type == Type::DATE64
138 date_dtype->meta.base = NPY_FR_D;
139 }
140 }
141 }
142
143 static inline PyArray_Descr* GetSafeNumPyDtype(int type) {
144 if (type == NPY_DATETIME) {
145 // It is not safe to mutate the result of DescrFromType
146 return PyArray_DescrNewFromType(type);
147 } else {
148 return PyArray_DescrFromType(type);
149 }
150 }
151 static inline PyObject* NewArray1DFromType(DataType* arrow_type, int type, int64_t length,
152 void* data) {
153 npy_intp dims[1] = {length};
154
155 PyArray_Descr* descr = GetSafeNumPyDtype(type);
156 if (descr == nullptr) {
157 // Error occurred, trust error state is set
158 return nullptr;
159 }
160
161 set_numpy_metadata(type, arrow_type, descr);
162 return PyArray_NewFromDescr(&PyArray_Type, descr, 1, dims, nullptr, data,
163 NPY_ARRAY_OWNDATA | NPY_ARRAY_CARRAY | NPY_ARRAY_WRITEABLE,
164 nullptr);
165 }
166
167 class PandasBlock {
168 public:
169 enum type {
170 OBJECT,
171 UINT8,
172 INT8,
173 UINT16,
174 INT16,
175 UINT32,
176 INT32,
177 UINT64,
178 INT64,
179 FLOAT,
180 DOUBLE,
181 BOOL,
182 DATETIME,
183 DATETIME_WITH_TZ,
184 CATEGORICAL
185 };
186
187 PandasBlock(PandasOptions options, int64_t num_rows, int num_columns)
188 : num_rows_(num_rows), num_columns_(num_columns), options_(options) {}
189 virtual ~PandasBlock() {}
190
191 virtual Status Allocate() = 0;
192 virtual Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
193 int64_t rel_placement) = 0;
194
195 PyObject* block_arr() const { return block_arr_.obj(); }
196
197 virtual Status GetPyResult(PyObject** output) {
198 PyObject* result = PyDict_New();
199 RETURN_IF_PYERROR();
200
201 PyDict_SetItemString(result, "block", block_arr_.obj());
202 PyDict_SetItemString(result, "placement", placement_arr_.obj());
203
204 *output = result;
205
206 return Status::OK();
207 }
208
209 protected:
210 Status AllocateNDArray(int npy_type, int ndim = 2) {
211 PyAcquireGIL lock;
212
213 PyArray_Descr* descr = GetSafeNumPyDtype(npy_type);
214
215 PyObject* block_arr;
216 if (ndim == 2) {
217 npy_intp block_dims[2] = {num_columns_, num_rows_};
218 block_arr = PyArray_SimpleNewFromDescr(2, block_dims, descr);
219 } else {
220 npy_intp block_dims[1] = {num_rows_};
221 block_arr = PyArray_SimpleNewFromDescr(1, block_dims, descr);
222 }
223
224 if (block_arr == NULL) {
225 // TODO(wesm): propagating Python exception
226 return Status::OK();
227 }
228
229 PyArray_ENABLEFLAGS(reinterpret_cast<PyArrayObject*>(block_arr), NPY_ARRAY_OWNDATA);
230
231 npy_intp placement_dims[1] = {num_columns_};
232 PyObject* placement_arr = PyArray_SimpleNew(1, placement_dims, NPY_INT64);
233 if (placement_arr == NULL) {
234 // TODO(wesm): propagating Python exception
235 return Status::OK();
236 }
237
238 block_arr_.reset(block_arr);
239 placement_arr_.reset(placement_arr);
240
241 block_data_ = reinterpret_cast<uint8_t*>(
242 PyArray_DATA(reinterpret_cast<PyArrayObject*>(block_arr)));
243
244 placement_data_ = reinterpret_cast<int64_t*>(
245 PyArray_DATA(reinterpret_cast<PyArrayObject*>(placement_arr)));
246
247 return Status::OK();
248 }
249
250 int64_t num_rows_;
251 int num_columns_;
252
253 OwnedRef block_arr_;
254 uint8_t* block_data_;
255
256 PandasOptions options_;
257
258 // ndarray<int32>
259 OwnedRef placement_arr_;
260 int64_t* placement_data_;
261
262 private:
263 DISALLOW_COPY_AND_ASSIGN(PandasBlock);
264 };
265
266 template <typename T>
267 inline void ConvertIntegerWithNulls(PandasOptions options, const ChunkedArray& data,
268 double* out_values) {
269 for (int c = 0; c < data.num_chunks(); c++) {
270 const auto& arr = static_cast<const PrimitiveArray&>(*data.chunk(c));
271 auto in_values = reinterpret_cast<const T*>(arr.raw_values());
272 // Upcast to double, set NaN as appropriate
273
274 for (int i = 0; i < arr.length(); ++i) {
275 *out_values++ = arr.IsNull(i) ? NAN : static_cast<double>(in_values[i]);
276 }
277 }
278 }
279
280 template <typename T>
281 inline void ConvertIntegerNoNullsSameType(PandasOptions options, const ChunkedArray& data,
282 T* out_values) {
283 for (int c = 0; c < data.num_chunks(); c++) {
284 const auto& arr = static_cast<const PrimitiveArray&>(*data.chunk(c));
285 auto in_values = reinterpret_cast<const T*>(arr.raw_values());
286 memcpy(out_values, in_values, sizeof(T) * arr.length());
287 out_values += arr.length();
288 }
289 }
290
291 template <typename InType, typename OutType>
292 inline void ConvertIntegerNoNullsCast(PandasOptions options, const ChunkedArray& data,
293 OutType* out_values) {
294 for (int c = 0; c < data.num_chunks(); c++) {
295 const auto& arr = static_cast<const PrimitiveArray&>(*data.chunk(c));
296 auto in_values = reinterpret_cast<const InType*>(arr.raw_values());
297 for (int64_t i = 0; i < arr.length(); ++i) {
298 *out_values = in_values[i];
299 }
300 }
301 }
302
303 static Status ConvertBooleanWithNulls(PandasOptions options, const ChunkedArray& data,
304 PyObject** out_values) {
305 PyAcquireGIL lock;
306 for (int c = 0; c < data.num_chunks(); c++) {
307 const std::shared_ptr<Array> arr = data.chunk(c);
308 auto bool_arr = static_cast<BooleanArray*>(arr.get());
309
310 for (int64_t i = 0; i < arr->length(); ++i) {
311 if (bool_arr->IsNull(i)) {
312 Py_INCREF(Py_None);
313 *out_values++ = Py_None;
314 } else if (bool_arr->Value(i)) {
315 // True
316 Py_INCREF(Py_True);
317 *out_values++ = Py_True;
318 } else {
319 // False
320 Py_INCREF(Py_False);
321 *out_values++ = Py_False;
322 }
323 }
324 }
325 return Status::OK();
326 }
327
328 static void ConvertBooleanNoNulls(PandasOptions options, const ChunkedArray& data,
329 uint8_t* out_values) {
330 for (int c = 0; c < data.num_chunks(); c++) {
331 const std::shared_ptr<Array> arr = data.chunk(c);
332 auto bool_arr = static_cast<BooleanArray*>(arr.get());
333 for (int64_t i = 0; i < arr->length(); ++i) {
334 *out_values++ = static_cast<uint8_t>(bool_arr->Value(i));
335 }
336 }
337 }
338
339 template <typename Type>
340 inline Status ConvertBinaryLike(PandasOptions options, const ChunkedArray& data,
341 PyObject** out_values) {
342 using ArrayType = typename TypeTraits<Type>::ArrayType;
343 PyAcquireGIL lock;
344 for (int c = 0; c < data.num_chunks(); c++) {
345 auto arr = static_cast<ArrayType*>(data.chunk(c).get());
346
347 const uint8_t* data_ptr;
348 int32_t length;
349 const bool has_nulls = data.null_count() > 0;
350 for (int64_t i = 0; i < arr->length(); ++i) {
351 if (has_nulls && arr->IsNull(i)) {
352 Py_INCREF(Py_None);
353 *out_values = Py_None;
354 } else {
355 data_ptr = arr->GetValue(i, &length);
356 *out_values = WrapBytes<ArrayType>::Wrap(data_ptr, length);
357 if (*out_values == nullptr) {
358 PyErr_Clear();
359 std::stringstream ss;
360 ss << "Wrapping "
361 << std::string(reinterpret_cast<const char*>(data_ptr), length) << " failed";
362 return Status::UnknownError(ss.str());
363 }
364 }
365 ++out_values;
366 }
367 }
368 return Status::OK();
369 }
370
371 inline Status ConvertNulls(PandasOptions options, const ChunkedArray& data,
372 PyObject** out_values) {
373 PyAcquireGIL lock;
374 for (int c = 0; c < data.num_chunks(); c++) {
375 std::shared_ptr<Array> arr = data.chunk(c);
376
377 for (int64_t i = 0; i < arr->length(); ++i) {
378 // All values are null
379 Py_INCREF(Py_None);
380 *out_values = Py_None;
381 ++out_values;
382 }
383 }
384 return Status::OK();
385 }
386
387 inline Status ConvertFixedSizeBinary(PandasOptions options, const ChunkedArray& data,
388 PyObject** out_values) {
389 PyAcquireGIL lock;
390 for (int c = 0; c < data.num_chunks(); c++) {
391 auto arr = static_cast<FixedSizeBinaryArray*>(data.chunk(c).get());
392
393 const uint8_t* data_ptr;
394 int32_t length =
395 std::dynamic_pointer_cast<FixedSizeBinaryType>(arr->type())->byte_width();
396 const bool has_nulls = data.null_count() > 0;
397 for (int64_t i = 0; i < arr->length(); ++i) {
398 if (has_nulls && arr->IsNull(i)) {
399 Py_INCREF(Py_None);
400 *out_values = Py_None;
401 } else {
402 data_ptr = arr->GetValue(i);
403 *out_values = WrapBytes<FixedSizeBinaryArray>::Wrap(data_ptr, length);
404 if (*out_values == nullptr) {
405 PyErr_Clear();
406 std::stringstream ss;
407 ss << "Wrapping "
408 << std::string(reinterpret_cast<const char*>(data_ptr), length) << " failed";
409 return Status::UnknownError(ss.str());
410 }
411 }
412 ++out_values;
413 }
414 }
415 return Status::OK();
416 }
417
418 inline Status ConvertStruct(PandasOptions options, const ChunkedArray& data,
419 PyObject** out_values) {
420 PyAcquireGIL lock;
421 if (data.num_chunks() <= 0) {
422 return Status::OK();
423 }
424 // ChunkedArray has at least one chunk
425 auto arr = static_cast<const StructArray*>(data.chunk(0).get());
426 // Use it to cache the struct type and number of fields for all chunks
427 int32_t num_fields = arr->num_fields();
428 auto array_type = arr->type();
429 std::vector<OwnedRef> fields_data(num_fields);
430 OwnedRef dict_item;
431 for (int c = 0; c < data.num_chunks(); c++) {
432 auto arr = static_cast<const StructArray*>(data.chunk(c).get());
433 // Convert the struct arrays first
434 for (int32_t i = 0; i < num_fields; i++) {
435 PyObject* numpy_array;
436 RETURN_NOT_OK(ConvertArrayToPandas(options, arr->field(static_cast<int>(i)),
437 nullptr, &numpy_array));
438 fields_data[i].reset(numpy_array);
439 }
440
441 // Construct a dictionary for each row
442 const bool has_nulls = data.null_count() > 0;
443 for (int64_t i = 0; i < arr->length(); ++i) {
444 if (has_nulls && arr->IsNull(i)) {
445 Py_INCREF(Py_None);
446 *out_values = Py_None;
447 } else {
448 // Build the new dict object for the row
449 dict_item.reset(PyDict_New());
450 RETURN_IF_PYERROR();
451 for (int32_t field_idx = 0; field_idx < num_fields; ++field_idx) {
452 OwnedRef field_value;
453 auto name = array_type->child(static_cast<int>(field_idx))->name();
454 if (!arr->field(static_cast<int>(field_idx))->IsNull(i)) {
455 // Value exists in child array, obtain it
456 auto array = reinterpret_cast<PyArrayObject*>(fields_data[field_idx].obj());
457 auto ptr = reinterpret_cast<const char*>(PyArray_GETPTR1(array, i));
458 field_value.reset(PyArray_GETITEM(array, ptr));
459 RETURN_IF_PYERROR();
460 } else {
461 // Translate the Null to a None
462 Py_INCREF(Py_None);
463 field_value.reset(Py_None);
464 }
465 // PyDict_SetItemString does not steal the value reference
466 auto setitem_result =
467 PyDict_SetItemString(dict_item.obj(), name.c_str(), field_value.obj());
468 RETURN_IF_PYERROR();
469 DCHECK_EQ(setitem_result, 0);
470 }
471 *out_values = dict_item.obj();
472 // Grant ownership to the resulting array
473 Py_INCREF(*out_values);
474 }
475 ++out_values;
476 }
477 }
478 return Status::OK();
479 }
480
481 template <typename ArrowType>
482 inline Status ConvertListsLike(PandasOptions options, const std::shared_ptr<Column>& col,
483 PyObject** out_values) {
484 const ChunkedArray& data = *col->data().get();
485 auto list_type = std::static_pointer_cast<ListType>(col->type());
486
487 // Get column of underlying value arrays
488 std::vector<std::shared_ptr<Array>> value_arrays;
489 for (int c = 0; c < data.num_chunks(); c++) {
490 auto arr = std::static_pointer_cast<ListArray>(data.chunk(c));
491 value_arrays.emplace_back(arr->values());
492 }
493 auto flat_column = std::make_shared<Column>(list_type->value_field(), value_arrays);
494 // TODO(ARROW-489): Currently we don't have a Python reference for single columns.
495 // Storing a reference to the whole Array would be to expensive.
496 PyObject* numpy_array;
497 RETURN_NOT_OK(ConvertColumnToPandas(options, flat_column, nullptr, &numpy_array));
498
499 PyAcquireGIL lock;
500
501 for (int c = 0; c < data.num_chunks(); c++) {
502 auto arr = std::static_pointer_cast<ListArray>(data.chunk(c));
503
504 const bool has_nulls = data.null_count() > 0;
505 for (int64_t i = 0; i < arr->length(); ++i) {
506 if (has_nulls && arr->IsNull(i)) {
507 Py_INCREF(Py_None);
508 *out_values = Py_None;
509 } else {
510 PyObject* start = PyLong_FromLong(arr->value_offset(i));
511 PyObject* end = PyLong_FromLong(arr->value_offset(i + 1));
512 PyObject* slice = PySlice_New(start, end, NULL);
513 *out_values = PyObject_GetItem(numpy_array, slice);
514 Py_DECREF(start);
515 Py_DECREF(end);
516 Py_DECREF(slice);
517 }
518 ++out_values;
519 }
520 }
521
522 Py_XDECREF(numpy_array);
523 return Status::OK();
524 }
525
526 template <typename T>
527 inline void ConvertNumericNullable(const ChunkedArray& data, T na_value, T* out_values) {
528 for (int c = 0; c < data.num_chunks(); c++) {
529 const auto& arr = static_cast<const PrimitiveArray&>(*data.chunk(c));
530 auto in_values = reinterpret_cast<const T*>(arr.raw_values());
531
532 const uint8_t* valid_bits = arr.null_bitmap_data();
533
534 if (arr.null_count() > 0) {
535 for (int64_t i = 0; i < arr.length(); ++i) {
536 *out_values++ = BitUtil::BitNotSet(valid_bits, i) ? na_value : in_values[i];
537 }
538 } else {
539 memcpy(out_values, in_values, sizeof(T) * arr.length());
540 out_values += arr.length();
541 }
542 }
543 }
544
545 template <typename InType, typename OutType>
546 inline void ConvertNumericNullableCast(const ChunkedArray& data, OutType na_value,
547 OutType* out_values) {
548 for (int c = 0; c < data.num_chunks(); c++) {
549 const auto& arr = static_cast<const PrimitiveArray&>(*data.chunk(c));
550 auto in_values = reinterpret_cast<const InType*>(arr.raw_values());
551
552 for (int64_t i = 0; i < arr.length(); ++i) {
553 *out_values++ = arr.IsNull(i) ? na_value : static_cast<OutType>(in_values[i]);
554 }
555 }
556 }
557
558 template <typename InType, int64_t SHIFT>
559 inline void ConvertDatetimeNanos(const ChunkedArray& data, int64_t* out_values) {
560 for (int c = 0; c < data.num_chunks(); c++) {
561 const auto& arr = static_cast<const PrimitiveArray&>(*data.chunk(c));
562 auto in_values = reinterpret_cast<const InType*>(arr.raw_values());
563
564 for (int64_t i = 0; i < arr.length(); ++i) {
565 *out_values++ = arr.IsNull(i) ? kPandasTimestampNull
566 : (static_cast<int64_t>(in_values[i]) * SHIFT);
567 }
568 }
569 }
570
571 template <typename TYPE>
572 static Status ConvertTimes(PandasOptions options, const ChunkedArray& data,
573 PyObject** out_values) {
574 using ArrayType = typename TypeTraits<TYPE>::ArrayType;
575
576 PyAcquireGIL lock;
577 OwnedRef time_ref;
578
579 PyDateTime_IMPORT;
580
581 for (int c = 0; c < data.num_chunks(); c++) {
582 const auto& arr = static_cast<const ArrayType&>(*data.chunk(c));
583 auto type = std::dynamic_pointer_cast<TYPE>(arr.type());
584 DCHECK(type);
585
586 const TimeUnit::type unit = type->unit();
587
588 for (int64_t i = 0; i < arr.length(); ++i) {
589 if (arr.IsNull(i)) {
590 Py_INCREF(Py_None);
591 *out_values++ = Py_None;
592 } else {
593 RETURN_NOT_OK(PyTime_from_int(arr.Value(i), unit, out_values++));
594 RETURN_IF_PYERROR();
595 }
596 }
597 }
598
599 return Status::OK();
600 }
601
602 template <typename T>
603 Status ValidateDecimalPrecision(int precision) {
604 constexpr static const int maximum_precision = decimal::DecimalPrecision<T>::maximum;
605 if (!(precision > 0 && precision <= maximum_precision)) {
606 std::stringstream ss;
607 ss << "Invalid precision: " << precision << ". Minimum is 1, maximum is "
608 << maximum_precision;
609 return Status::Invalid(ss.str());
610 }
611 return Status::OK();
612 }
613
614 template <typename T>
615 Status RawDecimalToString(const uint8_t* bytes, int precision, int scale,
616 std::string* result) {
617 DCHECK_NE(bytes, nullptr);
618 DCHECK_NE(result, nullptr);
619 RETURN_NOT_OK(ValidateDecimalPrecision<T>(precision));
620 decimal::Decimal<T> decimal;
621 FromBytes(bytes, &decimal);
622 *result = ToString(decimal, precision, scale);
623 return Status::OK();
624 }
625
626 template Status RawDecimalToString<int32_t>(const uint8_t*, int, int,
627 std::string* result);
628 template Status RawDecimalToString<int64_t>(const uint8_t*, int, int,
629 std::string* result);
630
631 Status RawDecimalToString(const uint8_t* bytes, int precision, int scale,
632 bool is_negative, std::string* result) {
633 DCHECK_NE(bytes, nullptr);
634 DCHECK_NE(result, nullptr);
635 RETURN_NOT_OK(ValidateDecimalPrecision<boost::multiprecision::int128_t>(precision));
636 decimal::Decimal128 decimal;
637 FromBytes(bytes, is_negative, &decimal);
638 *result = ToString(decimal, precision, scale);
639 return Status::OK();
640 }
641
642 static Status ConvertDecimals(PandasOptions options, const ChunkedArray& data,
643 PyObject** out_values) {
644 PyAcquireGIL lock;
645 OwnedRef decimal_ref;
646 OwnedRef Decimal_ref;
647 RETURN_NOT_OK(ImportModule("decimal", &decimal_ref));
648 RETURN_NOT_OK(ImportFromModule(decimal_ref, "Decimal", &Decimal_ref));
649 PyObject* Decimal = Decimal_ref.obj();
650
651 for (int c = 0; c < data.num_chunks(); c++) {
652 auto* arr(static_cast<arrow::DecimalArray*>(data.chunk(c).get()));
653 auto type(std::dynamic_pointer_cast<arrow::DecimalType>(arr->type()));
654 const int precision = type->precision();
655 const int scale = type->scale();
656 const int bit_width = type->bit_width();
657
658 for (int64_t i = 0; i < arr->length(); ++i) {
659 if (arr->IsNull(i)) {
660 Py_INCREF(Py_None);
661 *out_values++ = Py_None;
662 } else {
663 const uint8_t* raw_value = arr->GetValue(i);
664 std::string s;
665 switch (bit_width) {
666 case 32:
667 RETURN_NOT_OK(RawDecimalToString<int32_t>(raw_value, precision, scale, &s));
668 break;
669 case 64:
670 RETURN_NOT_OK(RawDecimalToString<int64_t>(raw_value, precision, scale, &s));
671 break;
672 case 128:
673 RETURN_NOT_OK(
674 RawDecimalToString(raw_value, precision, scale, arr->IsNegative(i), &s));
675 break;
676 default:
677 break;
678 }
679 RETURN_NOT_OK(DecimalFromString(Decimal, s, out_values++));
680 }
681 }
682 }
683
684 return Status::OK();
685 }
686
// Expands to a `case Type::ArrowEnum:` label that converts a list column whose
// value type is ArrowType via ConvertListsLike. The expansion site must have
// `options_`, `col` and `out_buffer` in scope.
#define CONVERTLISTSLIKE_CASE(ArrowType, ArrowEnum)                          \
  case Type::ArrowEnum: {                                                    \
    RETURN_NOT_OK((ConvertListsLike<ArrowType>(options_, col, out_buffer))); \
  } break;
691
692 class ObjectBlock : public PandasBlock {
693 public:
694 using PandasBlock::PandasBlock;
695 Status Allocate() override { return AllocateNDArray(NPY_OBJECT); }
696
697 Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
698 int64_t rel_placement) override {
699 Type::type type = col->type()->id();
700
701 PyObject** out_buffer =
702 reinterpret_cast<PyObject**>(block_data_) + rel_placement * num_rows_;
703
704 const ChunkedArray& data = *col->data().get();
705
706 if (type == Type::BOOL) {
707 RETURN_NOT_OK(ConvertBooleanWithNulls(options_, data, out_buffer));
708 } else if (type == Type::BINARY) {
709 RETURN_NOT_OK(ConvertBinaryLike<BinaryType>(options_, data, out_buffer));
710 } else if (type == Type::STRING) {
711 RETURN_NOT_OK(ConvertBinaryLike<StringType>(options_, data, out_buffer));
712 } else if (type == Type::FIXED_SIZE_BINARY) {
713 RETURN_NOT_OK(ConvertFixedSizeBinary(options_, data, out_buffer));
714 } else if (type == Type::TIME32) {
715 RETURN_NOT_OK(ConvertTimes<Time32Type>(options_, data, out_buffer));
716 } else if (type == Type::TIME64) {
717 RETURN_NOT_OK(ConvertTimes<Time64Type>(options_, data, out_buffer));
718 } else if (type == Type::DECIMAL) {
719 RETURN_NOT_OK(ConvertDecimals(options_, data, out_buffer));
720 } else if (type == Type::NA) {
721 RETURN_NOT_OK(ConvertNulls(options_, data, out_buffer));
722 } else if (type == Type::LIST) {
723 auto list_type = std::static_pointer_cast<ListType>(col->type());
724 switch (list_type->value_type()->id()) {
725 CONVERTLISTSLIKE_CASE(UInt8Type, UINT8)
726 CONVERTLISTSLIKE_CASE(Int8Type, INT8)
727 CONVERTLISTSLIKE_CASE(UInt16Type, UINT16)
728 CONVERTLISTSLIKE_CASE(Int16Type, INT16)
729 CONVERTLISTSLIKE_CASE(UInt32Type, UINT32)
730 CONVERTLISTSLIKE_CASE(Int32Type, INT32)
731 CONVERTLISTSLIKE_CASE(UInt64Type, UINT64)
732 CONVERTLISTSLIKE_CASE(Int64Type, INT64)
733 CONVERTLISTSLIKE_CASE(TimestampType, TIMESTAMP)
734 CONVERTLISTSLIKE_CASE(FloatType, FLOAT)
735 CONVERTLISTSLIKE_CASE(DoubleType, DOUBLE)
736 CONVERTLISTSLIKE_CASE(StringType, STRING)
737 CONVERTLISTSLIKE_CASE(ListType, LIST)
738 default: {
739 std::stringstream ss;
740 ss << "Not implemented type for conversion from List to Pandas ObjectBlock: "
741 << list_type->value_type()->ToString();
742 return Status::NotImplemented(ss.str());
743 }
744 }
745 } else if (type == Type::STRUCT) {
746 RETURN_NOT_OK(ConvertStruct(options_, data, out_buffer));
747 } else {
748 std::stringstream ss;
749 ss << "Unsupported type for object array output: " << col->type()->ToString();
750 return Status::NotImplemented(ss.str());
751 }
752
753 placement_data_[rel_placement] = abs_placement;
754 return Status::OK();
755 }
756 };
757
758 template <int ARROW_TYPE, typename C_TYPE>
759 class IntBlock : public PandasBlock {
760 public:
761 using PandasBlock::PandasBlock;
762 Status Allocate() override {
763 return AllocateNDArray(internal::arrow_traits<ARROW_TYPE>::npy_type);
764 }
765
766 Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
767 int64_t rel_placement) override {
768 Type::type type = col->type()->id();
769
770 C_TYPE* out_buffer =
771 reinterpret_cast<C_TYPE*>(block_data_) + rel_placement * num_rows_;
772
773 const ChunkedArray& data = *col->data().get();
774
775 if (type != ARROW_TYPE) {
776 std::stringstream ss;
777 ss << "Cannot write Arrow data of type " << col->type()->ToString();
778 ss << " to a Pandas int" << sizeof(C_TYPE) << " block.";
779 return Status::NotImplemented(ss.str());
780 }
781
782 ConvertIntegerNoNullsSameType<C_TYPE>(options_, data, out_buffer);
783 placement_data_[rel_placement] = abs_placement;
784 return Status::OK();
785 }
786 };
787
788 using UInt8Block = IntBlock<Type::UINT8, uint8_t>;
789 using Int8Block = IntBlock<Type::INT8, int8_t>;
790 using UInt16Block = IntBlock<Type::UINT16, uint16_t>;
791 using Int16Block = IntBlock<Type::INT16, int16_t>;
792 using UInt32Block = IntBlock<Type::UINT32, uint32_t>;
793 using Int32Block = IntBlock<Type::INT32, int32_t>;
794 using UInt64Block = IntBlock<Type::UINT64, uint64_t>;
795 using Int64Block = IntBlock<Type::INT64, int64_t>;
796
797 class Float32Block : public PandasBlock {
798 public:
799 using PandasBlock::PandasBlock;
800 Status Allocate() override { return AllocateNDArray(NPY_FLOAT32); }
801
802 Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
803 int64_t rel_placement) override {
804 Type::type type = col->type()->id();
805
806 if (type != Type::FLOAT) {
807 std::stringstream ss;
808 ss << "Cannot write Arrow data of type " << col->type()->ToString();
809 ss << " to a Pandas float32 block.";
810 return Status::NotImplemented(ss.str());
811 }
812
813 float* out_buffer = reinterpret_cast<float*>(block_data_) + rel_placement * num_rows_;
814
815 ConvertNumericNullable<float>(*col->data().get(), NAN, out_buffer);
816 placement_data_[rel_placement] = abs_placement;
817 return Status::OK();
818 }
819 };
820
821 class Float64Block : public PandasBlock {
822 public:
823 using PandasBlock::PandasBlock;
824 Status Allocate() override { return AllocateNDArray(NPY_FLOAT64); }
825
826 Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
827 int64_t rel_placement) override {
828 Type::type type = col->type()->id();
829
830 double* out_buffer =
831 reinterpret_cast<double*>(block_data_) + rel_placement * num_rows_;
832
833 const ChunkedArray& data = *col->data().get();
834
835 #define INTEGER_CASE(IN_TYPE) \
836 ConvertIntegerWithNulls<IN_TYPE>(options_, data, out_buffer); \
837 break;
838
839 switch (type) {
840 case Type::UINT8:
841 INTEGER_CASE(uint8_t);
842 case Type::INT8:
843 INTEGER_CASE(int8_t);
844 case Type::UINT16:
845 INTEGER_CASE(uint16_t);
846 case Type::INT16:
847 INTEGER_CASE(int16_t);
848 case Type::UINT32:
849 INTEGER_CASE(uint32_t);
850 case Type::INT32:
851 INTEGER_CASE(int32_t);
852 case Type::UINT64:
853 INTEGER_CASE(uint64_t);
854 case Type::INT64:
855 INTEGER_CASE(int64_t);
856 case Type::FLOAT:
857 ConvertNumericNullableCast<float, double>(data, NAN, out_buffer);
858 break;
859 case Type::DOUBLE:
860 ConvertNumericNullable<double>(data, NAN, out_buffer);
861 break;
862 default:
863 std::stringstream ss;
864 ss << "Cannot write Arrow data of type " << col->type()->ToString();
865 ss << " to a Pandas float64 block.";
866 return Status::NotImplemented(ss.str());
867 }
868
869 #undef INTEGER_CASE
870
871 placement_data_[rel_placement] = abs_placement;
872 return Status::OK();
873 }
874 };
875
876 class BoolBlock : public PandasBlock {
877 public:
878 using PandasBlock::PandasBlock;
879 Status Allocate() override { return AllocateNDArray(NPY_BOOL); }
880
881 Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
882 int64_t rel_placement) override {
883 Type::type type = col->type()->id();
884
885 if (type != Type::BOOL) {
886 std::stringstream ss;
887 ss << "Cannot write Arrow data of type " << col->type()->ToString();
888 ss << " to a Pandas boolean block.";
889 return Status::NotImplemented(ss.str());
890 }
891
892 uint8_t* out_buffer =
893 reinterpret_cast<uint8_t*>(block_data_) + rel_placement * num_rows_;
894
895 ConvertBooleanNoNulls(options_, *col->data().get(), out_buffer);
896 placement_data_[rel_placement] = abs_placement;
897 return Status::OK();
898 }
899 };
900
901 class DatetimeBlock : public PandasBlock {
902 public:
903 using PandasBlock::PandasBlock;
904 Status AllocateDatetime(int ndim) {
905 RETURN_NOT_OK(AllocateNDArray(NPY_DATETIME, ndim));
906
907 PyAcquireGIL lock;
908 auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(
909 PyArray_DESCR(reinterpret_cast<PyArrayObject*>(block_arr_.obj()))->c_metadata);
910 date_dtype->meta.base = NPY_FR_ns;
911 return Status::OK();
912 }
913
914 Status Allocate() override { return AllocateDatetime(2); }
915
916 Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
917 int64_t rel_placement) override {
918 Type::type type = col->type()->id();
919
920 int64_t* out_buffer =
921 reinterpret_cast<int64_t*>(block_data_) + rel_placement * num_rows_;
922
923 const ChunkedArray& data = *col.get()->data();
924
925 if (type == Type::DATE32) {
926 // Convert from days since epoch to datetime64[ns]
927 ConvertDatetimeNanos<int32_t, kNanosecondsInDay>(data, out_buffer);
928 } else if (type == Type::DATE64) {
929 // Date64Type is millisecond timestamp stored as int64_t
930 // TODO(wesm): Do we want to make sure to zero out the milliseconds?
931 ConvertDatetimeNanos<int64_t, 1000000L>(data, out_buffer);
932 } else if (type == Type::TIMESTAMP) {
933 auto ts_type = static_cast<TimestampType*>(col->type().get());
934
935 if (ts_type->unit() == TimeUnit::NANO) {
936 ConvertNumericNullable<int64_t>(data, kPandasTimestampNull, out_buffer);
937 } else if (ts_type->unit() == TimeUnit::MICRO) {
938 ConvertDatetimeNanos<int64_t, 1000L>(data, out_buffer);
939 } else if (ts_type->unit() == TimeUnit::MILLI) {
940 ConvertDatetimeNanos<int64_t, 1000000L>(data, out_buffer);
941 } else if (ts_type->unit() == TimeUnit::SECOND) {
942 ConvertDatetimeNanos<int64_t, 1000000000L>(data, out_buffer);
943 } else {
944 return Status::NotImplemented("Unsupported time unit");
945 }
946 } else {
947 std::stringstream ss;
948 ss << "Cannot write Arrow data of type " << col->type()->ToString();
949 ss << " to a Pandas datetime block.";
950 return Status::NotImplemented(ss.str());
951 }
952
953 placement_data_[rel_placement] = abs_placement;
954 return Status::OK();
955 }
956 };
957
958 class DatetimeTZBlock : public DatetimeBlock {
959 public:
960 DatetimeTZBlock(PandasOptions options, const std::string& timezone, int64_t num_rows)
961 : DatetimeBlock(options, num_rows, 1), timezone_(timezone) {}
962
963 // Like Categorical, the internal ndarray is 1-dimensional
964 Status Allocate() override { return AllocateDatetime(1); }
965
966 Status GetPyResult(PyObject** output) override {
967 PyObject* result = PyDict_New();
968 RETURN_IF_PYERROR();
969
970 PyObject* py_tz = PyUnicode_FromStringAndSize(
971 timezone_.c_str(), static_cast<Py_ssize_t>(timezone_.size()));
972 RETURN_IF_PYERROR();
973
974 PyDict_SetItemString(result, "block", block_arr_.obj());
975 PyDict_SetItemString(result, "timezone", py_tz);
976 PyDict_SetItemString(result, "placement", placement_arr_.obj());
977
978 *output = result;
979
980 return Status::OK();
981 }
982
983 private:
984 std::string timezone_;
985 };
986
987 class CategoricalBlock : public PandasBlock {
988 public:
989 explicit CategoricalBlock(PandasOptions options, MemoryPool* pool, int64_t num_rows)
990 : PandasBlock(options, num_rows, 1), pool_(pool) {}
991
992 Status Allocate() override {
993 return Status::NotImplemented(
994 "CategoricalBlock allocation happens when calling Write");
995 }
996
997 template <int ARROW_INDEX_TYPE>
998 Status WriteIndices(const std::shared_ptr<Column>& col) {
999 using TRAITS = internal::arrow_traits<ARROW_INDEX_TYPE>;
1000 using T = typename TRAITS::T;
1001 constexpr int npy_type = TRAITS::npy_type;
1002 RETURN_NOT_OK(AllocateNDArray(npy_type, 1));
1003
1004 // No relative placement offset because a single column
1005 T* out_values = reinterpret_cast<T*>(block_data_);
1006
1007 const ChunkedArray& data = *col->data().get();
1008
1009 for (int c = 0; c < data.num_chunks(); c++) {
1010 const std::shared_ptr<Array> arr = data.chunk(c);
1011 const auto& dict_arr = static_cast<const DictionaryArray&>(*arr);
1012
1013 const auto& indices = static_cast<const PrimitiveArray&>(*dict_arr.indices());
1014 auto in_values = reinterpret_cast<const T*>(indices.raw_values());
1015
1016 // Null is -1 in CategoricalBlock
1017 for (int i = 0; i < arr->length(); ++i) {
1018 *out_values++ = indices.IsNull(i) ? -1 : in_values[i];
1019 }
1020 }
1021
1022 return Status::OK();
1023 }
1024
1025 Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
1026 int64_t rel_placement) override {
1027 std::shared_ptr<Column> converted_col;
1028 if (options_.strings_to_categorical &&
1029 (col->type()->id() == Type::STRING || col->type()->id() == Type::BINARY)) {
1030 RETURN_NOT_OK(EncodeColumnToDictionary(static_cast<const Column&>(*col), pool_,
1031 &converted_col));
1032 } else {
1033 converted_col = col;
1034 }
1035
1036 const auto& dict_type = static_cast<const DictionaryType&>(*converted_col->type());
1037
1038 switch (dict_type.index_type()->id()) {
1039 case Type::INT8:
1040 RETURN_NOT_OK(WriteIndices<Type::INT8>(converted_col));
1041 break;
1042 case Type::INT16:
1043 RETURN_NOT_OK(WriteIndices<Type::INT16>(converted_col));
1044 break;
1045 case Type::INT32:
1046 RETURN_NOT_OK(WriteIndices<Type::INT32>(converted_col));
1047 break;
1048 case Type::INT64:
1049 RETURN_NOT_OK(WriteIndices<Type::INT64>(converted_col));
1050 break;
1051 default: {
1052 std::stringstream ss;
1053 ss << "Categorical index type not supported: "
1054 << dict_type.index_type()->ToString();
1055 return Status::NotImplemented(ss.str());
1056 }
1057 }
1058
1059 placement_data_[rel_placement] = abs_placement;
1060 PyObject* dict;
1061 RETURN_NOT_OK(ConvertArrayToPandas(options_, dict_type.dictionary(), nullptr, &dict));
1062 dictionary_.reset(dict);
1063 ordered_ = dict_type.ordered();
1064
1065 return Status::OK();
1066 }
1067
1068 Status GetPyResult(PyObject** output) override {
1069 PyObject* result = PyDict_New();
1070 RETURN_IF_PYERROR();
1071
1072 PyDict_SetItemString(result, "block", block_arr_.obj());
1073 PyDict_SetItemString(result, "dictionary", dictionary_.obj());
1074 PyDict_SetItemString(result, "placement", placement_arr_.obj());
1075
1076 PyObject* py_ordered = ordered_ ? Py_True : Py_False;
1077 Py_INCREF(py_ordered);
1078 PyDict_SetItemString(result, "ordered", py_ordered);
1079
1080 *output = result;
1081
1082 return Status::OK();
1083 }
1084
1085 protected:
1086 MemoryPool* pool_;
1087 OwnedRef dictionary_;
1088 bool ordered_;
1089 };
1090
1091 Status MakeBlock(PandasOptions options, PandasBlock::type type, int64_t num_rows,
1092 int num_columns, std::shared_ptr<PandasBlock>* block) {
1093 #define BLOCK_CASE(NAME, TYPE) \
1094 case PandasBlock::NAME: \
1095 *block = std::make_shared<TYPE>(options, num_rows, num_columns); \
1096 break;
1097
1098 switch (type) {
1099 BLOCK_CASE(OBJECT, ObjectBlock);
1100 BLOCK_CASE(UINT8, UInt8Block);
1101 BLOCK_CASE(INT8, Int8Block);
1102 BLOCK_CASE(UINT16, UInt16Block);
1103 BLOCK_CASE(INT16, Int16Block);
1104 BLOCK_CASE(UINT32, UInt32Block);
1105 BLOCK_CASE(INT32, Int32Block);
1106 BLOCK_CASE(UINT64, UInt64Block);
1107 BLOCK_CASE(INT64, Int64Block);
1108 BLOCK_CASE(FLOAT, Float32Block);
1109 BLOCK_CASE(DOUBLE, Float64Block);
1110 BLOCK_CASE(BOOL, BoolBlock);
1111 BLOCK_CASE(DATETIME, DatetimeBlock);
1112 default:
1113 return Status::NotImplemented("Unsupported block type");
1114 }
1115
1116 #undef BLOCK_CASE
1117
1118 return (*block)->Allocate();
1119 }
1120
// Maps an integer key to a pandas block. DataFrameBlockCreator keys its
// consolidated blocks by block type id, and its categorical and datetimetz
// blocks by column index.
using BlockMap = std::unordered_map<int, std::shared_ptr<PandasBlock>>;
1122
1123 static Status GetPandasBlockType(const Column& col, const PandasOptions& options,
1124 PandasBlock::type* output_type) {
1125 switch (col.type()->id()) {
1126 case Type::BOOL:
1127 *output_type = col.null_count() > 0 ? PandasBlock::OBJECT : PandasBlock::BOOL;
1128 break;
1129 case Type::UINT8:
1130 *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT8;
1131 break;
1132 case Type::INT8:
1133 *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT8;
1134 break;
1135 case Type::UINT16:
1136 *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT16;
1137 break;
1138 case Type::INT16:
1139 *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT16;
1140 break;
1141 case Type::UINT32:
1142 *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT32;
1143 break;
1144 case Type::INT32:
1145 *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT32;
1146 break;
1147 case Type::INT64:
1148 *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT64;
1149 break;
1150 case Type::UINT64:
1151 *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT64;
1152 break;
1153 case Type::FLOAT:
1154 *output_type = PandasBlock::FLOAT;
1155 break;
1156 case Type::DOUBLE:
1157 *output_type = PandasBlock::DOUBLE;
1158 break;
1159 case Type::STRING:
1160 case Type::BINARY:
1161 if (options.strings_to_categorical) {
1162 *output_type = PandasBlock::CATEGORICAL;
1163 break;
1164 }
1165 case Type::NA:
1166 case Type::FIXED_SIZE_BINARY:
1167 case Type::STRUCT:
1168 case Type::TIME32:
1169 case Type::TIME64:
1170 case Type::DECIMAL:
1171 *output_type = PandasBlock::OBJECT;
1172 break;
1173 case Type::DATE32:
1174 *output_type = PandasBlock::DATETIME;
1175 break;
1176 case Type::DATE64:
1177 *output_type = PandasBlock::DATETIME;
1178 break;
1179 case Type::TIMESTAMP: {
1180 const auto& ts_type = static_cast<const TimestampType&>(*col.type());
1181 if (ts_type.timezone() != "") {
1182 *output_type = PandasBlock::DATETIME_WITH_TZ;
1183 } else {
1184 *output_type = PandasBlock::DATETIME;
1185 }
1186 } break;
1187 case Type::LIST: {
1188 auto list_type = std::static_pointer_cast<ListType>(col.type());
1189 if (!ListTypeSupported(*list_type->value_type())) {
1190 std::stringstream ss;
1191 ss << "Not implemented type for list in DataFrameBlock: "
1192 << list_type->value_type()->ToString();
1193 return Status::NotImplemented(ss.str());
1194 }
1195 *output_type = PandasBlock::OBJECT;
1196 } break;
1197 case Type::DICTIONARY:
1198 *output_type = PandasBlock::CATEGORICAL;
1199 break;
1200 default:
1201 std::stringstream ss;
1202 ss << "No known equivalent Pandas block for Arrow data of type ";
1203 ss << col.type()->ToString() << " is known.";
1204 return Status::NotImplemented(ss.str());
1205 }
1206 return Status::OK();
1207 }
1208
1209 // Construct the exact pandas 0.x "BlockManager" memory layout
1210 //
1211 // * For each column determine the correct output pandas type
1212 // * Allocate 2D blocks (ncols x nrows) for each distinct data type in output
1213 // * Allocate block placement arrays
1214 // * Write Arrow columns out into each slice of memory; populate block
1215 // * placement arrays as we go
1216 class DataFrameBlockCreator {
1217 public:
1218 explicit DataFrameBlockCreator(const PandasOptions& options,
1219 const std::shared_ptr<Table>& table, MemoryPool* pool)
1220 : table_(table), options_(options), pool_(pool) {}
1221
1222 Status Convert(int nthreads, PyObject** output) {
1223 column_types_.resize(table_->num_columns());
1224 column_block_placement_.resize(table_->num_columns());
1225 type_counts_.clear();
1226 blocks_.clear();
1227
1228 RETURN_NOT_OK(CreateBlocks());
1229 RETURN_NOT_OK(WriteTableToBlocks(nthreads));
1230
1231 return GetResultList(output);
1232 }
1233
1234 Status CreateBlocks() {
1235 for (int i = 0; i < table_->num_columns(); ++i) {
1236 std::shared_ptr<Column> col = table_->column(i);
1237 PandasBlock::type output_type;
1238 RETURN_NOT_OK(GetPandasBlockType(*col, options_, &output_type));
1239
1240 int block_placement = 0;
1241 std::shared_ptr<PandasBlock> block;
1242 if (output_type == PandasBlock::CATEGORICAL) {
1243 block = std::make_shared<CategoricalBlock>(options_, pool_, table_->num_rows());
1244 categorical_blocks_[i] = block;
1245 } else if (output_type == PandasBlock::DATETIME_WITH_TZ) {
1246 const auto& ts_type = static_cast<const TimestampType&>(*col->type());
1247 block = std::make_shared<DatetimeTZBlock>(options_, ts_type.timezone(),
1248 table_->num_rows());
1249 RETURN_NOT_OK(block->Allocate());
1250 datetimetz_blocks_[i] = block;
1251 } else {
1252 auto it = type_counts_.find(output_type);
1253 if (it != type_counts_.end()) {
1254 block_placement = it->second;
1255 // Increment count
1256 it->second += 1;
1257 } else {
1258 // Add key to map
1259 type_counts_[output_type] = 1;
1260 }
1261 }
1262 column_types_[i] = output_type;
1263 column_block_placement_[i] = block_placement;
1264 }
1265
1266 // Create normal non-categorical blocks
1267 for (const auto& it : this->type_counts_) {
1268 PandasBlock::type type = static_cast<PandasBlock::type>(it.first);
1269 std::shared_ptr<PandasBlock> block;
1270 RETURN_NOT_OK(
1271 MakeBlock(this->options_, type, this->table_->num_rows(), it.second, &block));
1272 this->blocks_[type] = block;
1273 }
1274 return Status::OK();
1275 }
1276
1277 Status GetBlock(int i, std::shared_ptr<PandasBlock>* block) {
1278 PandasBlock::type output_type = this->column_types_[i];
1279
1280 if (output_type == PandasBlock::CATEGORICAL) {
1281 auto it = this->categorical_blocks_.find(i);
1282 if (it == this->blocks_.end()) {
1283 return Status::KeyError("No categorical block allocated");
1284 }
1285 *block = it->second;
1286 } else if (output_type == PandasBlock::DATETIME_WITH_TZ) {
1287 auto it = this->datetimetz_blocks_.find(i);
1288 if (it == this->datetimetz_blocks_.end()) {
1289 return Status::KeyError("No datetimetz block allocated");
1290 }
1291 *block = it->second;
1292 } else {
1293 auto it = this->blocks_.find(output_type);
1294 if (it == this->blocks_.end()) {
1295 return Status::KeyError("No block allocated");
1296 }
1297 *block = it->second;
1298 }
1299 return Status::OK();
1300 }
1301
1302 Status WriteTableToBlocks(int nthreads) {
1303 auto WriteColumn = [this](int i) {
1304 std::shared_ptr<PandasBlock> block;
1305 RETURN_NOT_OK(this->GetBlock(i, &block));
1306 return block->Write(this->table_->column(i), i, this->column_block_placement_[i]);
1307 };
1308
1309 int num_tasks = table_->num_columns();
1310 nthreads = std::min<int>(nthreads, num_tasks);
1311 if (nthreads == 1) {
1312 for (int i = 0; i < num_tasks; ++i) {
1313 RETURN_NOT_OK(WriteColumn(i));
1314 }
1315 } else {
1316 RETURN_NOT_OK(ParallelFor(nthreads, num_tasks, WriteColumn));
1317 }
1318 return Status::OK();
1319 }
1320
1321 Status AppendBlocks(const BlockMap& blocks, PyObject* list) {
1322 for (const auto& it : blocks) {
1323 PyObject* item;
1324 RETURN_NOT_OK(it.second->GetPyResult(&item));
1325 if (PyList_Append(list, item) < 0) {
1326 RETURN_IF_PYERROR();
1327 }
1328
1329 // ARROW-1017; PyList_Append increments object refcount
1330 Py_DECREF(item);
1331 }
1332 return Status::OK();
1333 }
1334
1335 Status GetResultList(PyObject** out) {
1336 PyAcquireGIL lock;
1337
1338 PyObject* result = PyList_New(0);
1339 RETURN_IF_PYERROR();
1340
1341 RETURN_NOT_OK(AppendBlocks(blocks_, result));
1342 RETURN_NOT_OK(AppendBlocks(categorical_blocks_, result));
1343 RETURN_NOT_OK(AppendBlocks(datetimetz_blocks_, result));
1344
1345 *out = result;
1346 return Status::OK();
1347 }
1348
1349 private:
1350 std::shared_ptr<Table> table_;
1351
1352 // column num -> block type id
1353 std::vector<PandasBlock::type> column_types_;
1354
1355 // column num -> relative placement within internal block
1356 std::vector<int> column_block_placement_;
1357
1358 // block type -> type count
1359 std::unordered_map<int, int> type_counts_;
1360
1361 PandasOptions options_;
1362
1363 // Memory pool for dictionary encoding
1364 MemoryPool* pool_;
1365
1366 // block type -> block
1367 BlockMap blocks_;
1368
1369 // column number -> categorical block
1370 BlockMap categorical_blocks_;
1371
1372 // column number -> datetimetz block
1373 BlockMap datetimetz_blocks_;
1374 };
1375
1376 class ArrowDeserializer {
1377 public:
1378 ArrowDeserializer(PandasOptions options, const std::shared_ptr<Column>& col,
1379 PyObject* py_ref)
1380 : col_(col), data_(*col->data().get()), options_(options), py_ref_(py_ref) {}
1381
1382 Status AllocateOutput(int type) {
1383 PyAcquireGIL lock;
1384
1385 result_ = NewArray1DFromType(col_->type().get(), type, col_->length(), nullptr);
1386 arr_ = reinterpret_cast<PyArrayObject*>(result_);
1387 return Status::OK();
1388 }
1389
1390 template <int TYPE>
1391 Status ConvertValuesZeroCopy(PandasOptions options, int npy_type,
1392 std::shared_ptr<Array> arr) {
1393 typedef typename internal::arrow_traits<TYPE>::T T;
1394
1395 const auto& prim_arr = static_cast<const PrimitiveArray&>(*arr);
1396 auto in_values = reinterpret_cast<const T*>(prim_arr.raw_values());
1397
1398 // Zero-Copy. We can pass the data pointer directly to NumPy.
1399 void* data = const_cast<T*>(in_values);
1400
1401 PyAcquireGIL lock;
1402
1403 // Zero-Copy. We can pass the data pointer directly to NumPy.
1404 result_ = NewArray1DFromType(col_->type().get(), npy_type, col_->length(), data);
1405 arr_ = reinterpret_cast<PyArrayObject*>(result_);
1406
1407 if (arr_ == NULL) {
1408 // Error occurred, trust that error set
1409 return Status::OK();
1410 }
1411
1412 if (PyArray_SetBaseObject(arr_, py_ref_) == -1) {
1413 // Error occurred, trust that SetBaseObject set the error state
1414 return Status::OK();
1415 } else {
1416 // PyArray_SetBaseObject steals our reference to py_ref_
1417 Py_INCREF(py_ref_);
1418 }
1419
1420 // Arrow data is immutable.
1421 PyArray_CLEARFLAGS(arr_, NPY_ARRAY_WRITEABLE);
1422
1423 // Arrow data is owned by another
1424 PyArray_CLEARFLAGS(arr_, NPY_ARRAY_OWNDATA);
1425
1426 return Status::OK();
1427 }
1428
1429 // ----------------------------------------------------------------------
1430 // Allocate new array and deserialize. Can do a zero copy conversion for some
1431 // types
1432
1433 template <typename Type>
1434 typename std::enable_if<std::is_base_of<FloatingPoint, Type>::value, Status>::type
1435 Visit(const Type& type) {
1436 constexpr int TYPE = Type::type_id;
1437 using traits = internal::arrow_traits<TYPE>;
1438
1439 typedef typename traits::T T;
1440 int npy_type = traits::npy_type;
1441
1442 if (data_.num_chunks() == 1 && data_.null_count() == 0 && py_ref_ != nullptr) {
1443 return ConvertValuesZeroCopy<TYPE>(options_, npy_type, data_.chunk(0));
1444 }
1445
1446 RETURN_NOT_OK(AllocateOutput(npy_type));
1447 auto out_values = reinterpret_cast<T*>(PyArray_DATA(arr_));
1448 ConvertNumericNullable<T>(data_, traits::na_value, out_values);
1449
1450 return Status::OK();
1451 }
1452
1453 template <typename Type>
1454 typename std::enable_if<std::is_base_of<DateType, Type>::value ||
1455 std::is_base_of<TimestampType, Type>::value,
1456 Status>::type
1457 Visit(const Type& type) {
1458 constexpr int TYPE = Type::type_id;
1459 using traits = internal::arrow_traits<TYPE>;
1460
1461 typedef typename traits::T T;
1462
1463 RETURN_NOT_OK(AllocateOutput(traits::npy_type));
1464 auto out_values = reinterpret_cast<T*>(PyArray_DATA(arr_));
1465
1466 constexpr T na_value = traits::na_value;
1467 constexpr int64_t kShift = traits::npy_shift;
1468
1469 for (int c = 0; c < data_.num_chunks(); c++) {
1470 const auto& arr = static_cast<const PrimitiveArray&>(*data_.chunk(c));
1471 auto in_values = reinterpret_cast<const T*>(arr.raw_values());
1472
1473 for (int64_t i = 0; i < arr.length(); ++i) {
1474 *out_values++ = arr.IsNull(i) ? na_value : in_values[i] / kShift;
1475 }
1476 }
1477 return Status::OK();
1478 }
1479
1480 template <typename Type>
1481 typename std::enable_if<std::is_base_of<TimeType, Type>::value, Status>::type Visit(
1482 const Type& type) {
1483 return Status::NotImplemented("Don't know how to serialize Arrow time type to NumPy");
1484 }
1485
1486 // Integer specialization
1487 template <typename Type>
1488 typename std::enable_if<std::is_base_of<Integer, Type>::value, Status>::type Visit(
1489 const Type& type) {
1490 constexpr int TYPE = Type::type_id;
1491 using traits = internal::arrow_traits<TYPE>;
1492
1493 typedef typename traits::T T;
1494
1495 if (data_.num_chunks() == 1 && data_.null_count() == 0 && py_ref_ != nullptr) {
1496 return ConvertValuesZeroCopy<TYPE>(options_, traits::npy_type, data_.chunk(0));
1497 }
1498
1499 if (data_.null_count() > 0) {
1500 RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64));
1501 auto out_values = reinterpret_cast<double*>(PyArray_DATA(arr_));
1502 ConvertIntegerWithNulls<T>(options_, data_, out_values);
1503 } else {
1504 RETURN_NOT_OK(AllocateOutput(traits::npy_type));
1505 auto out_values = reinterpret_cast<T*>(PyArray_DATA(arr_));
1506 ConvertIntegerNoNullsSameType<T>(options_, data_, out_values);
1507 }
1508
1509 return Status::OK();
1510 }
1511
1512 template <typename FUNCTOR>
1513 inline Status VisitObjects(FUNCTOR func) {
1514 RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
1515 auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
1516 return func(options_, data_, out_values);
1517 }
1518
1519 // UTF8 strings
1520 template <typename Type>
1521 typename std::enable_if<std::is_base_of<BinaryType, Type>::value, Status>::type Visit(
1522 const Type& type) {
1523 return VisitObjects(ConvertBinaryLike<Type>);
1524 }
1525
1526 Status Visit(const NullType& type) { return VisitObjects(ConvertNulls); }
1527
1528 // Fixed length binary strings
1529 Status Visit(const FixedSizeBinaryType& type) {
1530 return VisitObjects(ConvertFixedSizeBinary);
1531 }
1532
1533 Status Visit(const DecimalType& type) { return VisitObjects(ConvertDecimals); }
1534
1535 Status Visit(const Time32Type& type) { return VisitObjects(ConvertTimes<Time32Type>); }
1536
1537 Status Visit(const Time64Type& type) { return VisitObjects(ConvertTimes<Time64Type>); }
1538
1539 Status Visit(const StructType& type) { return VisitObjects(ConvertStruct); }
1540
1541 // Boolean specialization
1542 Status Visit(const BooleanType& type) {
1543 if (data_.null_count() > 0) {
1544 return VisitObjects(ConvertBooleanWithNulls);
1545 } else {
1546 RETURN_NOT_OK(AllocateOutput(internal::arrow_traits<Type::BOOL>::npy_type));
1547 auto out_values = reinterpret_cast<uint8_t*>(PyArray_DATA(arr_));
1548 ConvertBooleanNoNulls(options_, data_, out_values);
1549 }
1550 return Status::OK();
1551 }
1552
1553 Status Visit(const ListType& type) {
1554 #define CONVERTVALUES_LISTSLIKE_CASE(ArrowType, ArrowEnum) \
1555 case Type::ArrowEnum: \
1556 return ConvertListsLike<ArrowType>(options_, col_, out_values);
1557
1558 RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
1559 auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
1560 auto list_type = std::static_pointer_cast<ListType>(col_->type());
1561 switch (list_type->value_type()->id()) {
1562 CONVERTVALUES_LISTSLIKE_CASE(UInt8Type, UINT8)
1563 CONVERTVALUES_LISTSLIKE_CASE(Int8Type, INT8)
1564 CONVERTVALUES_LISTSLIKE_CASE(UInt16Type, UINT16)
1565 CONVERTVALUES_LISTSLIKE_CASE(Int16Type, INT16)
1566 CONVERTVALUES_LISTSLIKE_CASE(UInt32Type, UINT32)
1567 CONVERTVALUES_LISTSLIKE_CASE(Int32Type, INT32)
1568 CONVERTVALUES_LISTSLIKE_CASE(UInt64Type, UINT64)
1569 CONVERTVALUES_LISTSLIKE_CASE(Int64Type, INT64)
1570 CONVERTVALUES_LISTSLIKE_CASE(TimestampType, TIMESTAMP)
1571 CONVERTVALUES_LISTSLIKE_CASE(FloatType, FLOAT)
1572 CONVERTVALUES_LISTSLIKE_CASE(DoubleType, DOUBLE)
1573 CONVERTVALUES_LISTSLIKE_CASE(StringType, STRING)
1574 CONVERTVALUES_LISTSLIKE_CASE(DecimalType, DECIMAL)
1575 CONVERTVALUES_LISTSLIKE_CASE(ListType, LIST)
1576 default: {
1577 std::stringstream ss;
1578 ss << "Not implemented type for lists: " << list_type->value_type()->ToString();
1579 return Status::NotImplemented(ss.str());
1580 }
1581 }
1582 #undef CONVERTVALUES_LISTSLIKE_CASE
1583 }
1584
1585 Status Visit(const DictionaryType& type) {
1586 auto block = std::make_shared<CategoricalBlock>(options_, nullptr, col_->length());
1587 RETURN_NOT_OK(block->Write(col_, 0, 0));
1588
1589 auto dict_type = static_cast<const DictionaryType*>(col_->type().get());
1590
1591 PyAcquireGIL lock;
1592 result_ = PyDict_New();
1593 RETURN_IF_PYERROR();
1594
1595 PyObject* dictionary;
1596
1597 // Release GIL before calling ConvertArrayToPandas, will be reacquired
1598 // there if needed
1599 lock.release();
1600 RETURN_NOT_OK(
1601 ConvertArrayToPandas(options_, dict_type->dictionary(), nullptr, &dictionary));
1602 lock.acquire();
1603
1604 PyDict_SetItemString(result_, "indices", block->block_arr());
1605 PyDict_SetItemString(result_, "dictionary", dictionary);
1606
1607 return Status::OK();
1608 }
1609
1610 Status Visit(const UnionType& type) { return Status::NotImplemented("union type"); }
1611
1612 Status Convert(PyObject** out) {
1613 RETURN_NOT_OK(VisitTypeInline(*col_->type(), this));
1614 *out = result_;
1615 return Status::OK();
1616 }
1617
1618 private:
1619 std::shared_ptr<Column> col_;
1620 const ChunkedArray& data_;
1621 PandasOptions options_;
1622 PyObject* py_ref_;
1623 PyArrayObject* arr_;
1624 PyObject* result_;
1625 };
1626
1627 Status ConvertArrayToPandas(PandasOptions options, const std::shared_ptr<Array>& arr,
1628 PyObject* py_ref, PyObject** out) {
1629 static std::string dummy_name = "dummy";
1630 auto field = std::make_shared<Field>(dummy_name, arr->type());
1631 auto col = std::make_shared<Column>(field, arr);
1632 return ConvertColumnToPandas(options, col, py_ref, out);
1633 }
1634
1635 Status ConvertColumnToPandas(PandasOptions options, const std::shared_ptr<Column>& col,
1636 PyObject* py_ref, PyObject** out) {
1637 ArrowDeserializer converter(options, col, py_ref);
1638 return converter.Convert(out);
1639 }
1640
1641 Status ConvertTableToPandas(PandasOptions options, const std::shared_ptr<Table>& table,
1642 int nthreads, MemoryPool* pool, PyObject** out) {
1643 DataFrameBlockCreator helper(options, table, pool);
1644 return helper.Convert(nthreads, out);
1645 }
1646
1647 } // namespace py
1648 } // namespace arrow