From 6c4905da22ad5b3d5007f45f38a3fa8449f7f8e1 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 21 Aug 2024 21:03:12 -0700 Subject: [PATCH] Remove legacy Arrow interop APIs (#16590) Contributes to #15193. Authors: - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Kyle Edwards (https://github.com/KyleFromNVIDIA) - Robert (Bobby) Evans (https://github.com/revans2) - Bradley Dice (https://github.com/bdice) - David Wendt (https://github.com/davidwendt) - Yunsong Wang (https://github.com/PointKernel) URL: https://github.com/rapidsai/cudf/pull/16590 --- cpp/CMakeLists.txt | 3 - cpp/include/cudf/detail/interop.hpp | 101 +--- cpp/include/cudf/interop.hpp | 101 ---- cpp/src/interop/detail/arrow_allocator.cpp | 83 --- cpp/src/interop/detail/arrow_allocator.hpp | 31 -- cpp/src/interop/from_arrow.cu | 524 ------------------- cpp/src/interop/to_arrow.cu | 490 ----------------- cpp/tests/interop/arrow_utils.hpp | 64 ++- java/src/main/native/src/ColumnVectorJni.cpp | 76 ++- java/src/main/native/src/TableJni.cpp | 35 +- 10 files changed, 167 insertions(+), 1341 deletions(-) delete mode 100644 cpp/src/interop/detail/arrow_allocator.cpp delete mode 100644 cpp/src/interop/detail/arrow_allocator.hpp delete mode 100644 cpp/src/interop/from_arrow.cu delete mode 100644 cpp/src/interop/to_arrow.cu diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index ff00c484501..6b8bb26825b 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -363,17 +363,14 @@ add_library( src/hash/sha512_hash.cu src/hash/xxhash_64.cu src/interop/dlpack.cpp - src/interop/from_arrow.cu src/interop/arrow_utilities.cpp src/interop/decimal_conversion_utilities.cu - src/interop/to_arrow.cu src/interop/to_arrow_device.cu src/interop/to_arrow_host.cu src/interop/from_arrow_device.cu src/interop/from_arrow_host.cu src/interop/from_arrow_stream.cu src/interop/to_arrow_schema.cpp - src/interop/detail/arrow_allocator.cpp src/io/avro/avro.cpp src/io/avro/avro_gpu.cu src/io/avro/reader_impl.cu diff --git a/cpp/include/cudf/detail/interop.hpp b/cpp/include/cudf/detail/interop.hpp index 0b9319ba663..0d8f078c9d1 100644 --- a/cpp/include/cudf/detail/interop.hpp +++ b/cpp/include/cudf/detail/interop.hpp @@ -16,29 +16,13 @@ #pragma once -// We disable warning 611 because the `arrow::TableBatchReader` only partially -// override the `ReadNext` method of `arrow::RecordBatchReader::ReadNext` -// triggering warning 611-D from nvcc. -#ifdef __CUDACC__ -#pragma nv_diag_suppress 611 -#pragma nv_diag_suppress 2810 -#endif -#include - -#include -#ifdef __CUDACC__ -#pragma nv_diag_default 611 -#pragma nv_diag_default 2810 -#endif - #include #include #include #include #include - -#include +#include namespace CUDF_EXPORT cudf { namespace detail { @@ -61,89 +45,6 @@ DLManagedTensor* to_dlpack(table_view const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); -// Creating arrow as per given type_id and buffer arguments -template -std::shared_ptr to_arrow_array(cudf::type_id id, Ts&&... args) -{ - switch (id) { - case type_id::BOOL8: return std::make_shared(std::forward(args)...); - case type_id::INT8: return std::make_shared(std::forward(args)...); - case type_id::INT16: return std::make_shared(std::forward(args)...); - case type_id::INT32: return std::make_shared(std::forward(args)...); - case type_id::INT64: return std::make_shared(std::forward(args)...); - case type_id::UINT8: return std::make_shared(std::forward(args)...); - case type_id::UINT16: return std::make_shared(std::forward(args)...); - case type_id::UINT32: return std::make_shared(std::forward(args)...); - case type_id::UINT64: return std::make_shared(std::forward(args)...); - case type_id::FLOAT32: return std::make_shared(std::forward(args)...); - case type_id::FLOAT64: return std::make_shared(std::forward(args)...); - case type_id::TIMESTAMP_DAYS: - return std::make_shared(std::make_shared(), - std::forward(args)...); - case type_id::TIMESTAMP_SECONDS: - return std::make_shared(arrow::timestamp(arrow::TimeUnit::SECOND), - std::forward(args)...); - case type_id::TIMESTAMP_MILLISECONDS: - return std::make_shared(arrow::timestamp(arrow::TimeUnit::MILLI), - std::forward(args)...); - case type_id::TIMESTAMP_MICROSECONDS: - return std::make_shared(arrow::timestamp(arrow::TimeUnit::MICRO), - std::forward(args)...); - case type_id::TIMESTAMP_NANOSECONDS: - return std::make_shared(arrow::timestamp(arrow::TimeUnit::NANO), - std::forward(args)...); - case type_id::DURATION_SECONDS: - return std::make_shared(arrow::duration(arrow::TimeUnit::SECOND), - std::forward(args)...); - case type_id::DURATION_MILLISECONDS: - return std::make_shared(arrow::duration(arrow::TimeUnit::MILLI), - std::forward(args)...); - case type_id::DURATION_MICROSECONDS: - return std::make_shared(arrow::duration(arrow::TimeUnit::MICRO), - std::forward(args)...); - case type_id::DURATION_NANOSECONDS: - return std::make_shared(arrow::duration(arrow::TimeUnit::NANO), - std::forward(args)...); - default: CUDF_FAIL("Unsupported type_id conversion to arrow"); - } -} - -// Converting arrow type to cudf type -data_type arrow_to_cudf_type(arrow::DataType const& arrow_type); - -/** - * @copydoc cudf::to_arrow(table_view input, std::vector const& metadata, - * rmm::cuda_stream_view stream, arrow::MemoryPool* ar_mr) - */ -std::shared_ptr to_arrow(table_view input, - std::vector const& metadata, - rmm::cuda_stream_view stream, - arrow::MemoryPool* ar_mr); - -/** - * @copydoc cudf::to_arrow(cudf::scalar const& input, column_metadata const& metadata, - * rmm::cuda_stream_view stream, arrow::MemoryPool* ar_mr) - */ -std::shared_ptr to_arrow(cudf::scalar const& input, - column_metadata const& metadata, - rmm::cuda_stream_view stream, - arrow::MemoryPool* ar_mr); -/** - * @copydoc cudf::from_arrow(arrow::Table const& input_table, rmm::cuda_stream_view stream, - * rmm::device_async_resource_ref mr) - */ -std::unique_ptr from_arrow(arrow::Table const& input_table, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); - -/** - * @copydoc cudf::from_arrow(arrow::Scalar const& input, rmm::cuda_stream_view stream, - * rmm::device_async_resource_ref mr) - */ -std::unique_ptr from_arrow(arrow::Scalar const& input, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); - /** * @brief Return a maximum precision for a given type. * diff --git a/cpp/include/cudf/interop.hpp b/cpp/include/cudf/interop.hpp index 9a8f87b4a46..0f52b0f7b31 100644 --- a/cpp/include/cudf/interop.hpp +++ b/cpp/include/cudf/interop.hpp @@ -16,21 +16,6 @@ #pragma once -// We disable warning 611 because the `arrow::TableBatchReader` only partially -// override the `ReadNext` method of `arrow::RecordBatchReader::ReadNext` -// triggering warning 611-D from nvcc. -#ifdef __CUDACC__ -#pragma nv_diag_suppress 611 -#pragma nv_diag_suppress 2810 -#endif -#include - -#include -#ifdef __CUDACC__ -#pragma nv_diag_default 611 -#pragma nv_diag_default 2810 -#endif - #include #include #include @@ -131,59 +116,6 @@ struct column_metadata { column_metadata() = default; }; -/** - * @brief Create `arrow::Table` from cudf table `input` - * - * Converts the `cudf::table_view` to `arrow::Table` with the provided - * metadata `column_names`. - * - * @deprecated Since 24.08. Use cudf::to_arrow_host instead. - * - * @throws cudf::logic_error if `column_names` size doesn't match with number of columns. - * - * @param input table_view that needs to be converted to arrow Table - * @param metadata Contains hierarchy of names of columns and children - * @param stream CUDA stream used for device memory operations and kernel launches - * @param ar_mr arrow memory pool to allocate memory for arrow Table - * @return arrow Table generated from `input` - * - * @note For decimals, since the precision is not stored for them in libcudf, - * it will be converted to an Arrow decimal128 that has the widest-precision the cudf decimal type - * supports. For example, numeric::decimal32 will be converted to Arrow decimal128 of the precision - * 9 which is the maximum precision for 32-bit types. Similarly, numeric::decimal128 will be - * converted to Arrow decimal128 of the precision 38. - */ -[[deprecated("Use cudf::to_arrow_host")]] std::shared_ptr to_arrow( - table_view input, - std::vector const& metadata = {}, - rmm::cuda_stream_view stream = cudf::get_default_stream(), - arrow::MemoryPool* ar_mr = arrow::default_memory_pool()); - -/** - * @brief Create `arrow::Scalar` from cudf scalar `input` - * - * Converts the `cudf::scalar` to `arrow::Scalar`. - * - * @deprecated Since 24.08. - * - * @param input scalar that needs to be converted to arrow Scalar - * @param metadata Contains hierarchy of names of columns and children - * @param stream CUDA stream used for device memory operations and kernel launches - * @param ar_mr arrow memory pool to allocate memory for arrow Scalar - * @return arrow Scalar generated from `input` - * - * @note For decimals, since the precision is not stored for them in libcudf, - * it will be converted to an Arrow decimal128 that has the widest-precision the cudf decimal type - * supports. For example, numeric::decimal32 will be converted to Arrow decimal128 of the precision - * 9 which is the maximum precision for 32-bit types. Similarly, numeric::decimal128 will be - * converted to Arrow decimal128 of the precision 38. - */ -[[deprecated("Use cudf::to_arrow_host")]] std::shared_ptr to_arrow( - cudf::scalar const& input, - column_metadata const& metadata = {}, - rmm::cuda_stream_view stream = cudf::get_default_stream(), - arrow::MemoryPool* ar_mr = arrow::default_memory_pool()); - /** * @brief typedef for a unique_ptr to an ArrowSchema with custom deleter * @@ -386,39 +318,6 @@ unique_device_array_t to_arrow_host( rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); -/** - * @brief Create `cudf::table` from given arrow Table input - * - * @deprecated Since 24.08. Use cudf::from_arrow_host instead. - * - * @param input arrow:Table that needs to be converted to `cudf::table` - * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource used to allocate `cudf::table` - * @return cudf table generated from given arrow Table - */ -[[deprecated("Use cudf::from_arrow_host")]] std::unique_ptr
from_arrow( - arrow::Table const& input, - rmm::cuda_stream_view stream = cudf::get_default_stream(), - rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); - -/** - * @brief Create `cudf::scalar` from given arrow Scalar input - * - * @deprecated Since 24.08. Use arrow's `MakeArrayFromScalar` on the - * input, followed by `ExportArray` to obtain something that can be - * consumed by `from_arrow_host`. Then use `cudf::get_element` to - * extract a device scalar from the column. - * - * @param input `arrow::Scalar` that needs to be converted to `cudf::scalar` - * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource used to allocate `cudf::scalar` - * @return cudf scalar generated from given arrow Scalar - */ -[[deprecated("See docstring for migration strategies")]] std::unique_ptr from_arrow( - arrow::Scalar const& input, - rmm::cuda_stream_view stream = cudf::get_default_stream(), - rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); - /** * @brief Create `cudf::table` from given ArrowArray and ArrowSchema input * diff --git a/cpp/src/interop/detail/arrow_allocator.cpp b/cpp/src/interop/detail/arrow_allocator.cpp deleted file mode 100644 index 2a19a5360fe..00000000000 --- a/cpp/src/interop/detail/arrow_allocator.cpp +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include -#include - -#include - -namespace cudf { -namespace detail { - -/* - Enable Transparent Huge Pages (THP) for large (>4MB) allocations. - `buf` is returned untouched. - Enabling THP can improve performance of device-host memory transfers - significantly, see . -*/ -template -T enable_hugepage(T&& buf) -{ - if (buf->size() < (1u << 22u)) { // Smaller than 4 MB - return std::move(buf); - } - -#ifdef MADV_HUGEPAGE - auto const pagesize = sysconf(_SC_PAGESIZE); - void* addr = const_cast(buf->data()); - if (addr == nullptr) { return std::move(buf); } - auto length{static_cast(buf->size())}; - if (std::align(pagesize, pagesize, addr, length)) { - // Intentionally not checking for errors that may be returned by older kernel versions; - // optimistically tries enabling huge pages. - madvise(addr, length, MADV_HUGEPAGE); - } -#endif - return std::move(buf); -} - -std::unique_ptr allocate_arrow_buffer(int64_t const size, arrow::MemoryPool* ar_mr) -{ - /* - nvcc 11.0 generates Internal Compiler Error during codegen when arrow::AllocateBuffer - and `ValueOrDie` are used inside a CUDA compilation unit. - - To work around this issue we compile an allocation shim in C++ and use - that from our cuda sources - */ - arrow::Result> result = arrow::AllocateBuffer(size, ar_mr); - CUDF_EXPECTS(result.ok(), "Failed to allocate Arrow buffer"); - return enable_hugepage(std::move(result).ValueOrDie()); -} - -std::shared_ptr allocate_arrow_bitmap(int64_t const size, arrow::MemoryPool* ar_mr) -{ - /* - nvcc 11.0 generates Internal Compiler Error during codegen when arrow::AllocateBuffer - and `ValueOrDie` are used inside a CUDA compilation unit. - - To work around this issue we compile an allocation shim in C++ and use - that from our cuda sources - */ - arrow::Result> result = arrow::AllocateBitmap(size, ar_mr); - CUDF_EXPECTS(result.ok(), "Failed to allocate Arrow bitmap"); - return enable_hugepage(std::move(result).ValueOrDie()); -} - -} // namespace detail -} // namespace cudf diff --git a/cpp/src/interop/detail/arrow_allocator.hpp b/cpp/src/interop/detail/arrow_allocator.hpp deleted file mode 100644 index 75c1baa0dca..00000000000 --- a/cpp/src/interop/detail/arrow_allocator.hpp +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -namespace cudf { -namespace detail { - -// unique_ptr because that is what AllocateBuffer returns -std::unique_ptr allocate_arrow_buffer(int64_t const size, arrow::MemoryPool* ar_mr); - -// shared_ptr because that is what AllocateBitmap returns -std::shared_ptr allocate_arrow_bitmap(int64_t const size, arrow::MemoryPool* ar_mr); - -} // namespace detail -} // namespace cudf diff --git a/cpp/src/interop/from_arrow.cu b/cpp/src/interop/from_arrow.cu deleted file mode 100644 index 579820cbae3..00000000000 --- a/cpp/src/interop/from_arrow.cu +++ /dev/null @@ -1,524 +0,0 @@ -/* - * Copyright (c) 2020-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include - -namespace cudf { - -namespace detail { -data_type arrow_to_cudf_type(arrow::DataType const& arrow_type) -{ - switch (arrow_type.id()) { - case arrow::Type::NA: return data_type(type_id::EMPTY); - case arrow::Type::BOOL: return data_type(type_id::BOOL8); - case arrow::Type::INT8: return data_type(type_id::INT8); - case arrow::Type::INT16: return data_type(type_id::INT16); - case arrow::Type::INT32: return data_type(type_id::INT32); - case arrow::Type::INT64: return data_type(type_id::INT64); - case arrow::Type::UINT8: return data_type(type_id::UINT8); - case arrow::Type::UINT16: return data_type(type_id::UINT16); - case arrow::Type::UINT32: return data_type(type_id::UINT32); - case arrow::Type::UINT64: return data_type(type_id::UINT64); - case arrow::Type::FLOAT: return data_type(type_id::FLOAT32); - case arrow::Type::DOUBLE: return data_type(type_id::FLOAT64); - case arrow::Type::DATE32: return data_type(type_id::TIMESTAMP_DAYS); - case arrow::Type::TIMESTAMP: { - auto type = static_cast(&arrow_type); - switch (type->unit()) { - case arrow::TimeUnit::type::SECOND: return data_type(type_id::TIMESTAMP_SECONDS); - case arrow::TimeUnit::type::MILLI: return data_type(type_id::TIMESTAMP_MILLISECONDS); - case arrow::TimeUnit::type::MICRO: return data_type(type_id::TIMESTAMP_MICROSECONDS); - case arrow::TimeUnit::type::NANO: return data_type(type_id::TIMESTAMP_NANOSECONDS); - default: CUDF_FAIL("Unsupported timestamp unit in arrow"); - } - } - case arrow::Type::DURATION: { - auto type = static_cast(&arrow_type); - switch (type->unit()) { - case arrow::TimeUnit::type::SECOND: return data_type(type_id::DURATION_SECONDS); - case arrow::TimeUnit::type::MILLI: return data_type(type_id::DURATION_MILLISECONDS); - case arrow::TimeUnit::type::MICRO: return data_type(type_id::DURATION_MICROSECONDS); - case arrow::TimeUnit::type::NANO: return data_type(type_id::DURATION_NANOSECONDS); - default: CUDF_FAIL("Unsupported duration unit in arrow"); - } - } - case arrow::Type::STRING: return data_type(type_id::STRING); - case arrow::Type::LARGE_STRING: return data_type(type_id::STRING); - case arrow::Type::DICTIONARY: return data_type(type_id::DICTIONARY32); - case arrow::Type::LIST: return data_type(type_id::LIST); - case arrow::Type::DECIMAL: { - auto const type = static_cast(&arrow_type); - return data_type{type_id::DECIMAL128, -type->scale()}; - } - case arrow::Type::STRUCT: return data_type(type_id::STRUCT); - default: CUDF_FAIL("Unsupported type_id conversion to cudf"); - } -} - -namespace { -/** - * @brief Functor to return column for a corresponding arrow array. column - * is formed from buffer underneath the arrow array along with any offset and - * change in length that array has. - */ -struct dispatch_to_cudf_column { - /** - * @brief Returns mask from an array without any offsets. - */ - std::unique_ptr get_mask_buffer(arrow::Array const& array, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) - { - if (array.null_bitmap_data() == nullptr) { - return std::make_unique(0, stream, mr); - } - auto const null_bitmap_size = array.null_bitmap()->size(); - auto const allocation_size = - bitmask_allocation_size_bytes(static_cast(null_bitmap_size * CHAR_BIT)); - auto mask = std::make_unique(allocation_size, stream, mr); - auto mask_buffer = array.null_bitmap(); - CUDF_CUDA_TRY(cudaMemcpyAsync(mask->data(), - reinterpret_cast(mask_buffer->address()), - null_bitmap_size, - cudaMemcpyDefault, - stream.value())); - // Zero-initialize trailing padding bytes - auto const num_trailing_bytes = allocation_size - null_bitmap_size; - if (num_trailing_bytes > 0) { - auto trailing_bytes = static_cast(mask->data()) + null_bitmap_size; - CUDF_CUDA_TRY(cudaMemsetAsync(trailing_bytes, 0, num_trailing_bytes, stream.value())); - } - return mask; - } - - template ())> - std::unique_ptr operator()( - arrow::Array const&, data_type, bool, rmm::cuda_stream_view, rmm::device_async_resource_ref) - { - CUDF_FAIL("Unsupported type in from_arrow."); - } - - template ())> - std::unique_ptr operator()(arrow::Array const& array, - data_type type, - bool skip_mask, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) - { - auto data_buffer = array.data()->buffers[1]; - size_type const num_rows = array.length(); - auto const has_nulls = skip_mask ? false : array.null_bitmap_data() != nullptr; - auto col = make_fixed_width_column(type, num_rows, mask_state::UNALLOCATED, stream, mr); - auto mutable_column_view = col->mutable_view(); - CUDF_CUDA_TRY(cudaMemcpyAsync( - mutable_column_view.data(), - reinterpret_cast(data_buffer->address()) + array.offset() * sizeof(T), - sizeof(T) * num_rows, - cudaMemcpyDefault, - stream.value())); - if (has_nulls) { - auto tmp_mask = get_mask_buffer(array, stream, mr); - - // If array is sliced, we have to copy whole mask and then take copy. - auto out_mask = (num_rows == static_cast(data_buffer->size() / sizeof(T))) - ? std::move(*tmp_mask) - : cudf::detail::copy_bitmask(static_cast(tmp_mask->data()), - array.offset(), - array.offset() + num_rows, - stream, - mr); - - col->set_null_mask(std::move(out_mask), array.null_count()); - } - - return col; - } -}; - -std::unique_ptr get_empty_type_column(size_type size) -{ - // this abomination is required by cuDF Python, which needs to handle - // [PyArrow null arrays](https://arrow.apache.org/docs/python/generated/pyarrow.NullArray.html) - // of finite length - return std::make_unique( - data_type(type_id::EMPTY), size, rmm::device_buffer{}, rmm::device_buffer{}, size); -} - -/** - * @brief Returns cudf column formed from given arrow array - * This has been introduced to take care of compiler error "error: explicit specialization of - * function must precede its first use" - */ -std::unique_ptr get_column(arrow::Array const& array, - data_type type, - bool skip_mask, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); - -template <> -std::unique_ptr dispatch_to_cudf_column::operator()( - arrow::Array const& array, - data_type type, - bool skip_mask, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - using DeviceType = __int128_t; - - auto data_buffer = array.data()->buffers[1]; - auto const num_rows = static_cast(array.length()); - auto col = make_fixed_width_column(type, num_rows, mask_state::UNALLOCATED, stream, mr); - auto mutable_column_view = col->mutable_view(); - - CUDF_CUDA_TRY(cudaMemcpyAsync( - mutable_column_view.data(), - reinterpret_cast(data_buffer->address()) + array.offset() * sizeof(DeviceType), - sizeof(DeviceType) * num_rows, - cudaMemcpyDefault, - stream.value())); - - auto null_mask = [&] { - if (not skip_mask and array.null_bitmap_data()) { - auto temp_mask = get_mask_buffer(array, stream, mr); - // If array is sliced, we have to copy whole mask and then take copy. - return (num_rows == static_cast(data_buffer->size() / sizeof(DeviceType))) - ? std::move(*temp_mask.release()) - : cudf::detail::copy_bitmask(static_cast(temp_mask->data()), - array.offset(), - array.offset() + num_rows, - stream, - mr); - } - return rmm::device_buffer{}; - }(); - - col->set_null_mask(std::move(null_mask), array.null_count()); - return col; -} - -template <> -std::unique_ptr dispatch_to_cudf_column::operator()(arrow::Array const& array, - data_type, - bool skip_mask, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - auto data_buffer = array.data()->buffers[1]; - // mask-to-bools expects the mask to be bitmask_type aligned/padded - auto data = rmm::device_buffer( - cudf::bitmask_allocation_size_bytes(data_buffer->size() * CHAR_BIT), stream, mr); - CUDF_CUDA_TRY(cudaMemcpyAsync(data.data(), - reinterpret_cast(data_buffer->address()), - data_buffer->size(), - cudaMemcpyDefault, - stream.value())); - auto out_col = mask_to_bools(static_cast(data.data()), - array.offset(), - array.offset() + array.length(), - stream, - mr); - - auto const has_nulls = skip_mask ? false : array.null_bitmap_data() != nullptr; - if (has_nulls) { - auto out_mask = - detail::copy_bitmask(static_cast(get_mask_buffer(array, stream, mr)->data()), - array.offset(), - array.offset() + array.length(), - stream, - mr); - - out_col->set_null_mask(std::move(out_mask), array.null_count()); - } - - return out_col; -} - -template <> -std::unique_ptr dispatch_to_cudf_column::operator()( - arrow::Array const& array, - data_type, - bool, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - if (array.length() == 0) { return make_empty_column(type_id::STRING); } - - std::unique_ptr offsets_column; - std::unique_ptr char_array; - - if (array.type_id() == arrow::Type::LARGE_STRING) { - auto str_array = static_cast(&array); - auto offset_array = std::make_unique( - str_array->value_offsets()->size() / sizeof(int64_t), str_array->value_offsets(), nullptr); - offsets_column = dispatch_to_cudf_column{}.operator()( - *offset_array, data_type(type_id::INT64), true, stream, mr); - char_array = std::make_unique( - str_array->value_data()->size(), str_array->value_data(), nullptr); - } else if (array.type_id() == arrow::Type::STRING) { - auto str_array = static_cast(&array); - auto offset_array = std::make_unique( - str_array->value_offsets()->size() / sizeof(int32_t), str_array->value_offsets(), nullptr); - offsets_column = dispatch_to_cudf_column{}.operator()( - *offset_array, data_type(type_id::INT32), true, stream, mr); - char_array = std::make_unique( - str_array->value_data()->size(), str_array->value_data(), nullptr); - } else { - throw std::runtime_error("Unsupported array type"); - } - - rmm::device_buffer chars(char_array->length(), stream, mr); - auto data_buffer = char_array->data()->buffers[1]; - CUDF_CUDA_TRY(cudaMemcpyAsync(chars.data(), - reinterpret_cast(data_buffer->address()), - chars.size(), - cudaMemcpyDefault, - stream.value())); - - auto const num_rows = offsets_column->size() - 1; - auto out_col = make_strings_column(num_rows, - std::move(offsets_column), - std::move(chars), - array.null_count(), - std::move(*get_mask_buffer(array, stream, mr))); - - return num_rows == array.length() - ? std::move(out_col) - : std::make_unique( - cudf::detail::slice(out_col->view(), - static_cast(array.offset()), - static_cast(array.offset() + array.length()), - stream), - stream, - mr); -} - -template <> -std::unique_ptr dispatch_to_cudf_column::operator()( - arrow::Array const& array, - data_type, - bool, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - auto dict_array = static_cast(&array); - auto dict_type = arrow_to_cudf_type(*(dict_array->dictionary()->type())); - auto keys_column = get_column(*(dict_array->dictionary()), dict_type, true, stream, mr); - auto ind_type = arrow_to_cudf_type(*(dict_array->indices()->type())); - - auto indices_column = get_column(*(dict_array->indices()), ind_type, false, stream, mr); - // If index type is not of type uint32_t, then cast it to uint32_t - auto const dict_indices_type = data_type{type_id::UINT32}; - if (indices_column->type().id() != dict_indices_type.id()) - indices_column = cudf::detail::cast(indices_column->view(), dict_indices_type, stream, mr); - - // Child columns shouldn't have masks and we need the mask in main column - auto column_contents = indices_column->release(); - indices_column = std::make_unique(dict_indices_type, - static_cast(array.length()), - std::move(*(column_contents.data)), - rmm::device_buffer{}, - 0); - - return make_dictionary_column(std::move(keys_column), - std::move(indices_column), - std::move(*(column_contents.null_mask)), - array.null_count()); -} - -template <> -std::unique_ptr dispatch_to_cudf_column::operator()( - arrow::Array const& array, - data_type, - bool, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - auto struct_array = static_cast(&array); - std::vector> child_columns; - // Offsets have already been applied to child - arrow::ArrayVector array_children = struct_array->fields(); - std::transform(array_children.cbegin(), - array_children.cend(), - std::back_inserter(child_columns), - [&mr, &stream](auto const& child_array) { - auto type = arrow_to_cudf_type(*(child_array->type())); - return get_column(*child_array, type, false, stream, mr); - }); - - auto out_mask = std::move(*(get_mask_buffer(array, stream, mr))); - if (struct_array->null_bitmap_data() != nullptr) { - out_mask = detail::copy_bitmask(static_cast(out_mask.data()), - array.offset(), - array.offset() + array.length(), - stream, - mr); - } - - return make_structs_column( - array.length(), move(child_columns), array.null_count(), std::move(out_mask), stream, mr); -} - -template <> -std::unique_ptr dispatch_to_cudf_column::operator()( - arrow::Array const& array, - data_type, - bool, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - auto list_array = static_cast(&array); - auto offset_array = std::make_unique( - list_array->value_offsets()->size() / sizeof(int32_t), list_array->value_offsets(), nullptr); - auto offsets_column = dispatch_to_cudf_column{}.operator()( - *offset_array, data_type(type_id::INT32), true, stream, mr); - - auto child_type = arrow_to_cudf_type(*(list_array->values()->type())); - auto child_column = get_column(*(list_array->values()), child_type, false, stream, mr); - - auto const num_rows = offsets_column->size() - 1; - auto out_col = make_lists_column(num_rows, - std::move(offsets_column), - std::move(child_column), - array.null_count(), - std::move(*get_mask_buffer(array, stream, mr)), - stream, - mr); - - return num_rows == array.length() - ? std::move(out_col) - : std::make_unique( - cudf::detail::slice(out_col->view(), - static_cast(array.offset()), - static_cast(array.offset() + array.length()), - stream), - stream, - mr); -} - -std::unique_ptr get_column(arrow::Array const& array, - data_type type, - bool skip_mask, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - return type.id() != type_id::EMPTY - ? type_dispatcher(type, dispatch_to_cudf_column{}, array, type, skip_mask, stream, mr) - : get_empty_type_column(array.length()); -} - -} // namespace - -std::unique_ptr
from_arrow(arrow::Table const& input_table, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - if (input_table.num_columns() == 0) { return std::make_unique
(); } - std::vector> columns; - auto chunked_arrays = input_table.columns(); - std::transform(chunked_arrays.begin(), - chunked_arrays.end(), - std::back_inserter(columns), - [&mr, &stream](auto const& chunked_array) { - std::vector> concat_columns; - auto cudf_type = arrow_to_cudf_type(*(chunked_array->type())); - auto array_chunks = chunked_array->chunks(); - if (cudf_type.id() == type_id::EMPTY) { - return get_empty_type_column(chunked_array->length()); - } - std::transform(array_chunks.begin(), - array_chunks.end(), - std::back_inserter(concat_columns), - [&cudf_type, &mr, &stream](auto const& array_chunk) { - return get_column(*array_chunk, cudf_type, false, stream, mr); - }); - if (concat_columns.empty()) { - return std::make_unique( - cudf_type, 0, rmm::device_buffer{}, rmm::device_buffer{}, 0); - } else if (concat_columns.size() == 1) { - return std::move(concat_columns[0]); - } - - std::vector column_views; - std::transform(concat_columns.begin(), - concat_columns.end(), - std::back_inserter(column_views), - [](auto const& col) { return col->view(); }); - return cudf::detail::concatenate(column_views, stream, mr); - }); - - return std::make_unique
(std::move(columns)); -} - -std::unique_ptr from_arrow(arrow::Scalar const& input, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - auto maybe_array = arrow::MakeArrayFromScalar(input, 1); - if (!maybe_array.ok()) { CUDF_FAIL("Failed to create array"); } - auto array = *maybe_array; - - auto field = arrow::field("", input.type); - - auto table = arrow::Table::Make(arrow::schema({field}), {array}); - - auto cudf_table = detail::from_arrow(*table, stream, mr); - - auto cv = cudf_table->view().column(0); - return get_element(cv, 0, stream); -} - -} // namespace detail - -std::unique_ptr
from_arrow(arrow::Table const& input_table, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - CUDF_FUNC_RANGE(); - - return detail::from_arrow(input_table, stream, mr); -} - -std::unique_ptr from_arrow(arrow::Scalar const& input, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - CUDF_FUNC_RANGE(); - - return detail::from_arrow(input, stream, mr); -} -} // namespace cudf diff --git a/cpp/src/interop/to_arrow.cu b/cpp/src/interop/to_arrow.cu deleted file mode 100644 index a867d4adfa1..00000000000 --- a/cpp/src/interop/to_arrow.cu +++ /dev/null @@ -1,490 +0,0 @@ -/* - * Copyright (c) 2020-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "arrow_utilities.hpp" -#include "decimal_conversion_utilities.cuh" -#include "detail/arrow_allocator.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include -#include -#include - -namespace cudf { -namespace detail { -namespace { - -/** - * @brief Create arrow data buffer from given cudf column - */ -template -std::shared_ptr fetch_data_buffer(device_span input, - arrow::MemoryPool* ar_mr, - rmm::cuda_stream_view stream) -{ - int64_t const data_size_in_bytes = sizeof(T) * input.size(); - - auto data_buffer = allocate_arrow_buffer(data_size_in_bytes, ar_mr); - - CUDF_CUDA_TRY(cudaMemcpyAsync(data_buffer->mutable_data(), - input.data(), - data_size_in_bytes, - cudaMemcpyDefault, - stream.value())); - - return std::move(data_buffer); -} - -/** - * @brief Create arrow buffer of mask from given cudf column - */ -std::shared_ptr fetch_mask_buffer(column_view input_view, - arrow::MemoryPool* ar_mr, - rmm::cuda_stream_view stream) -{ - int64_t const mask_size_in_bytes = cudf::bitmask_allocation_size_bytes(input_view.size()); - - if (input_view.has_nulls()) { - auto mask_buffer = allocate_arrow_bitmap(static_cast(input_view.size()), ar_mr); - CUDF_CUDA_TRY(cudaMemcpyAsync( - mask_buffer->mutable_data(), - (input_view.offset() > 0) - ? cudf::detail::copy_bitmask(input_view, stream, rmm::mr::get_current_device_resource()) - .data() - : input_view.null_mask(), - mask_size_in_bytes, - cudaMemcpyDefault, - stream.value())); - - // Resets all padded bits to 0 - mask_buffer->ZeroPadding(); - - return mask_buffer; - } - - return nullptr; -} - -/** - * @brief Functor to convert cudf column to arrow array - */ -struct dispatch_to_arrow { - /** - * @brief Creates vector Arrays from given cudf column children - */ - std::vector> fetch_child_array( - column_view input_view, - std::vector const& metadata, - arrow::MemoryPool* ar_mr, - rmm::cuda_stream_view stream) - { - std::vector> child_arrays; - std::transform( - input_view.child_begin(), - input_view.child_end(), - metadata.begin(), - std::back_inserter(child_arrays), - [&ar_mr, &stream](auto const& child, auto const& meta) { - return type_dispatcher( - child.type(), dispatch_to_arrow{}, child, child.type().id(), meta, ar_mr, stream); - }); - return child_arrays; - } - - template ())> - std::shared_ptr operator()( - column_view, cudf::type_id, column_metadata const&, arrow::MemoryPool*, rmm::cuda_stream_view) - { - CUDF_FAIL("Unsupported type for to_arrow."); - } - - template ())> - std::shared_ptr operator()(column_view input_view, - cudf::type_id id, - column_metadata const&, - arrow::MemoryPool* ar_mr, - rmm::cuda_stream_view stream) - { - return to_arrow_array( - id, - static_cast(input_view.size()), - fetch_data_buffer( - device_span(input_view.data(), input_view.size()), ar_mr, stream), - fetch_mask_buffer(input_view, ar_mr, stream), - static_cast(input_view.null_count())); - } -}; - -// Convert decimal types from libcudf to arrow where those types are not -// directly supported by Arrow. These types must be fit into 128 bits, the -// smallest decimal resolution supported by Arrow. -template -std::shared_ptr unsupported_decimals_to_arrow(column_view input, - int32_t precision, - arrow::MemoryPool* ar_mr, - rmm::cuda_stream_view stream) -{ - auto buf = detail::convert_decimals_to_decimal128( - input, stream, rmm::mr::get_current_device_resource()); - - // Synchronize stream here to ensure the decimal128 buffer is ready. - stream.synchronize(); - - auto const buf_size_in_bytes = buf->size(); - auto data_buffer = allocate_arrow_buffer(buf_size_in_bytes, ar_mr); - - CUDF_CUDA_TRY(cudaMemcpyAsync(data_buffer->mutable_data(), - buf->data(), - buf_size_in_bytes, - cudaMemcpyDefault, - stream.value())); - - auto type = arrow::decimal(precision, -input.type().scale()); - auto mask = fetch_mask_buffer(input, ar_mr, stream); - auto buffers = std::vector>{mask, std::move(data_buffer)}; - auto data = std::make_shared(type, input.size(), buffers); - - return std::make_shared(data); -} - -template <> -std::shared_ptr dispatch_to_arrow::operator()( - column_view input, - cudf::type_id, - column_metadata const&, - arrow::MemoryPool* ar_mr, - rmm::cuda_stream_view stream) -{ - using DeviceType = int32_t; - return unsupported_decimals_to_arrow( - input, cudf::detail::max_precision(), ar_mr, stream); -} - -template <> -std::shared_ptr dispatch_to_arrow::operator()( - column_view input, - cudf::type_id, - column_metadata const&, - arrow::MemoryPool* ar_mr, - rmm::cuda_stream_view stream) -{ - using DeviceType = int64_t; - return unsupported_decimals_to_arrow( - input, cudf::detail::max_precision(), ar_mr, stream); -} - -template <> -std::shared_ptr dispatch_to_arrow::operator()( - column_view input, - cudf::type_id, - column_metadata const&, - arrow::MemoryPool* ar_mr, - rmm::cuda_stream_view stream) -{ - using DeviceType = __int128_t; - auto const max_precision = cudf::detail::max_precision(); - - rmm::device_uvector buf(input.size(), stream); - - thrust::copy(rmm::exec_policy(stream), // - input.begin(), - input.end(), - buf.begin()); - - auto const buf_size_in_bytes = buf.size() * sizeof(DeviceType); - auto data_buffer = allocate_arrow_buffer(buf_size_in_bytes, ar_mr); - - CUDF_CUDA_TRY(cudaMemcpyAsync( - data_buffer->mutable_data(), buf.data(), buf_size_in_bytes, cudaMemcpyDefault, stream.value())); - - auto type = arrow::decimal(max_precision, -input.type().scale()); - auto mask = fetch_mask_buffer(input, ar_mr, stream); - auto buffers = std::vector>{mask, std::move(data_buffer)}; - auto data = std::make_shared(type, input.size(), buffers); - - return std::make_shared(data); -} - -template <> -std::shared_ptr dispatch_to_arrow::operator()(column_view input, - cudf::type_id id, - column_metadata const&, - arrow::MemoryPool* ar_mr, - rmm::cuda_stream_view stream) -{ - auto bitmask = detail::bools_to_mask(input, stream, rmm::mr::get_current_device_resource()); - - auto data_buffer = allocate_arrow_buffer(static_cast(bitmask.first->size()), ar_mr); - - CUDF_CUDA_TRY(cudaMemcpyAsync(data_buffer->mutable_data(), - bitmask.first->data(), - bitmask.first->size(), - cudaMemcpyDefault, - stream.value())); - return to_arrow_array(id, - static_cast(input.size()), - std::move(data_buffer), - fetch_mask_buffer(input, ar_mr, stream), - static_cast(input.null_count())); -} - -template <> -std::shared_ptr dispatch_to_arrow::operator()( - column_view input, - cudf::type_id, - column_metadata const&, - arrow::MemoryPool* ar_mr, - rmm::cuda_stream_view stream) -{ - std::unique_ptr tmp_column = - ((input.offset() != 0) or - ((input.num_children() == 1) and (input.child(0).size() - 1 != input.size()))) - ? std::make_unique(input, stream) - : nullptr; - - column_view input_view = (tmp_column != nullptr) ? tmp_column->view() : input; - auto child_arrays = fetch_child_array(input_view, {{}, {}}, ar_mr, stream); - if (child_arrays.empty()) { - // Empty string will have only one value in offset of 4 bytes - auto tmp_offset_buffer = allocate_arrow_buffer(sizeof(int32_t), ar_mr); - auto tmp_data_buffer = allocate_arrow_buffer(0, ar_mr); - memset(tmp_offset_buffer->mutable_data(), 0, sizeof(int32_t)); - - return std::make_shared( - 0, std::move(tmp_offset_buffer), std::move(tmp_data_buffer)); - } - auto offset_buffer = child_arrays[strings_column_view::offsets_column_index]->data()->buffers[1]; - auto const sview = strings_column_view{input_view}; - auto data_buffer = fetch_data_buffer( - device_span{sview.chars_begin(stream), - static_cast(sview.chars_size(stream))}, - ar_mr, - stream); - if (sview.offsets().type().id() == cudf::type_id::INT64) { - return std::make_shared(static_cast(input_view.size()), - offset_buffer, - data_buffer, - fetch_mask_buffer(input_view, ar_mr, stream), - static_cast(input_view.null_count())); - } else { - return std::make_shared(static_cast(input_view.size()), - offset_buffer, - data_buffer, - fetch_mask_buffer(input_view, ar_mr, stream), - static_cast(input_view.null_count())); - } -} - -template <> -std::shared_ptr dispatch_to_arrow::operator()( - column_view input, - cudf::type_id, - column_metadata const& metadata, - arrow::MemoryPool* ar_mr, - rmm::cuda_stream_view stream) -{ - CUDF_EXPECTS(metadata.children_meta.size() == static_cast(input.num_children()), - "Number of field names and number of children doesn't match\n"); - std::unique_ptr tmp_column = nullptr; - - if (input.offset() != 0) { tmp_column = std::make_unique(input, stream); } - - column_view input_view = (tmp_column != nullptr) ? tmp_column->view() : input; - auto child_arrays = fetch_child_array(input_view, metadata.children_meta, ar_mr, stream); - auto mask = fetch_mask_buffer(input_view, ar_mr, stream); - - std::vector> fields; - std::transform(child_arrays.cbegin(), - child_arrays.cend(), - metadata.children_meta.cbegin(), - std::back_inserter(fields), - [](auto const array, auto const meta) { - return std::make_shared( - meta.name, array->type(), array->null_count() > 0); - }); - auto dtype = std::make_shared(fields); - - return std::make_shared(dtype, - static_cast(input_view.size()), - child_arrays, - mask, - static_cast(input_view.null_count())); -} - -template <> -std::shared_ptr dispatch_to_arrow::operator()( - column_view input, - cudf::type_id, - column_metadata const& metadata, - arrow::MemoryPool* ar_mr, - rmm::cuda_stream_view stream) -{ - CUDF_EXPECTS(metadata.children_meta.empty() || - metadata.children_meta.size() == static_cast(input.num_children()), - "Number of field names and number of children do not match\n"); - std::unique_ptr tmp_column = nullptr; - if ((input.offset() != 0) or - ((input.num_children() == 2) and (input.child(0).size() - 1 != input.size()))) { - tmp_column = std::make_unique(input, stream); - } - - column_view input_view = (tmp_column != nullptr) ? tmp_column->view() : input; - auto children_meta = - metadata.children_meta.empty() ? std::vector{{}, {}} : metadata.children_meta; - auto child_arrays = fetch_child_array(input_view, children_meta, ar_mr, stream); - if (child_arrays.empty() || child_arrays[0]->data()->length == 0) { - auto element_type = child_arrays.empty() ? arrow::null() : child_arrays[1]->type(); - auto result = arrow::MakeEmptyArray(arrow::list(element_type), ar_mr); - CUDF_EXPECTS(result.ok(), "Failed to construct empty arrow list array\n"); - return result.ValueUnsafe(); - } - - auto offset_buffer = child_arrays[0]->data()->buffers[1]; - auto data = child_arrays[1]; - return std::make_shared(arrow::list(data->type()), - static_cast(input_view.size()), - offset_buffer, - data, - fetch_mask_buffer(input_view, ar_mr, stream), - static_cast(input_view.null_count())); -} - -template <> -std::shared_ptr dispatch_to_arrow::operator()( - column_view input, - cudf::type_id, - column_metadata const& metadata, - arrow::MemoryPool* ar_mr, - rmm::cuda_stream_view stream) -{ - // Arrow dictionary requires indices to be signed integer - std::unique_ptr dict_indices = - detail::cast(cudf::dictionary_column_view(input).get_indices_annotated(), - cudf::data_type{type_id::INT32}, - stream, - rmm::mr::get_current_device_resource()); - auto indices = dispatch_to_arrow{}.operator()( - dict_indices->view(), dict_indices->type().id(), {}, ar_mr, stream); - auto dict_keys = cudf::dictionary_column_view(input).keys(); - auto dictionary = - type_dispatcher(dict_keys.type(), - dispatch_to_arrow{}, - dict_keys, - dict_keys.type().id(), - metadata.children_meta.empty() ? column_metadata{} : metadata.children_meta[0], - ar_mr, - stream); - - return std::make_shared( - arrow::dictionary(indices->type(), dictionary->type()), indices, dictionary); -} -} // namespace - -std::shared_ptr to_arrow(table_view input, - std::vector const& metadata, - rmm::cuda_stream_view stream, - arrow::MemoryPool* ar_mr) -{ - CUDF_EXPECTS((metadata.size() == static_cast(input.num_columns())), - "columns' metadata should be equal to number of columns in table"); - - std::vector> arrays; - std::vector> fields; - - std::transform( - input.begin(), - input.end(), - metadata.begin(), - std::back_inserter(arrays), - [&](auto const& c, auto const& meta) { - return c.type().id() != type_id::EMPTY - ? type_dispatcher( - c.type(), detail::dispatch_to_arrow{}, c, c.type().id(), meta, ar_mr, stream) - : std::make_shared(c.size()); - }); - - std::transform( - arrays.begin(), - arrays.end(), - metadata.begin(), - std::back_inserter(fields), - [](auto const& array, auto const& meta) { return arrow::field(meta.name, array->type()); }); - - auto result = arrow::Table::Make(arrow::schema(fields), arrays); - - // synchronize the stream because after the return the data may be accessed from the host before - // the above `cudaMemcpyAsync` calls have completed their copies (especially if pinned host - // memory is used). - stream.synchronize(); - - return result; -} - -std::shared_ptr to_arrow(cudf::scalar const& input, - column_metadata const& metadata, - rmm::cuda_stream_view stream, - arrow::MemoryPool* ar_mr) -{ - auto const column = cudf::make_column_from_scalar(input, 1, stream); - cudf::table_view const tv{{column->view()}}; - auto const arrow_table = detail::to_arrow(tv, {metadata}, stream, ar_mr); - auto const ac = arrow_table->column(0); - auto const maybe_scalar = ac->GetScalar(0); - if (!maybe_scalar.ok()) { CUDF_FAIL("Failed to produce a scalar"); } - return maybe_scalar.ValueOrDie(); -} -} // namespace detail - -std::shared_ptr to_arrow(table_view input, - std::vector const& metadata, - rmm::cuda_stream_view stream, - arrow::MemoryPool* ar_mr) -{ - CUDF_FUNC_RANGE(); - return detail::to_arrow(input, metadata, stream, ar_mr); -} - -std::shared_ptr to_arrow(cudf::scalar const& input, - column_metadata const& metadata, - rmm::cuda_stream_view stream, - arrow::MemoryPool* ar_mr) -{ - CUDF_FUNC_RANGE(); - return detail::to_arrow(input, metadata, stream, ar_mr); -} -} // namespace cudf diff --git a/cpp/tests/interop/arrow_utils.hpp b/cpp/tests/interop/arrow_utils.hpp index 08eada632a5..70a9fe64d70 100644 --- a/cpp/tests/interop/arrow_utils.hpp +++ b/cpp/tests/interop/arrow_utils.hpp @@ -14,6 +14,8 @@ * limitations under the License. */ +#pragma once + #include #include #include @@ -30,11 +32,65 @@ #include #include +#include #include -#include - -#pragma once +// Creating arrow as per given type_id and buffer arguments +template +std::shared_ptr to_arrow_array(cudf::type_id id, Ts&&... args) +{ + switch (id) { + case cudf::type_id::BOOL8: + return std::make_shared(std::forward(args)...); + case cudf::type_id::INT8: return std::make_shared(std::forward(args)...); + case cudf::type_id::INT16: + return std::make_shared(std::forward(args)...); + case cudf::type_id::INT32: + return std::make_shared(std::forward(args)...); + case cudf::type_id::INT64: + return std::make_shared(std::forward(args)...); + case cudf::type_id::UINT8: + return std::make_shared(std::forward(args)...); + case cudf::type_id::UINT16: + return std::make_shared(std::forward(args)...); + case cudf::type_id::UINT32: + return std::make_shared(std::forward(args)...); + case cudf::type_id::UINT64: + return std::make_shared(std::forward(args)...); + case cudf::type_id::FLOAT32: + return std::make_shared(std::forward(args)...); + case cudf::type_id::FLOAT64: + return std::make_shared(std::forward(args)...); + case cudf::type_id::TIMESTAMP_DAYS: + return std::make_shared(std::make_shared(), + std::forward(args)...); + case cudf::type_id::TIMESTAMP_SECONDS: + return std::make_shared(arrow::timestamp(arrow::TimeUnit::SECOND), + std::forward(args)...); + case cudf::type_id::TIMESTAMP_MILLISECONDS: + return std::make_shared(arrow::timestamp(arrow::TimeUnit::MILLI), + std::forward(args)...); + case cudf::type_id::TIMESTAMP_MICROSECONDS: + return std::make_shared(arrow::timestamp(arrow::TimeUnit::MICRO), + std::forward(args)...); + case cudf::type_id::TIMESTAMP_NANOSECONDS: + return std::make_shared(arrow::timestamp(arrow::TimeUnit::NANO), + std::forward(args)...); + case cudf::type_id::DURATION_SECONDS: + return std::make_shared(arrow::duration(arrow::TimeUnit::SECOND), + std::forward(args)...); + case cudf::type_id::DURATION_MILLISECONDS: + return std::make_shared(arrow::duration(arrow::TimeUnit::MILLI), + std::forward(args)...); + case cudf::type_id::DURATION_MICROSECONDS: + return std::make_shared(arrow::duration(arrow::TimeUnit::MICRO), + std::forward(args)...); + case cudf::type_id::DURATION_NANOSECONDS: + return std::make_shared(arrow::duration(arrow::TimeUnit::NANO), + std::forward(args)...); + default: CUDF_FAIL("Unsupported type_id conversion to arrow"); + } +} template std::enable_if_t() and !std::is_same_v, @@ -50,7 +106,7 @@ get_arrow_array(std::vector const& data, std::vector const& mask = { std::shared_ptr mask_buffer = mask.empty() ? nullptr : arrow::internal::BytesToBits(mask).ValueOrDie(); - return cudf::detail::to_arrow_array(cudf::type_to_id(), data.size(), data_buffer, mask_buffer); + return to_arrow_array(cudf::type_to_id(), data.size(), data_buffer, mask_buffer); } template diff --git a/java/src/main/native/src/ColumnVectorJni.cpp b/java/src/main/native/src/ColumnVectorJni.cpp index cdc5aa41abe..9b718b2ed83 100644 --- a/java/src/main/native/src/ColumnVectorJni.cpp +++ b/java/src/main/native/src/ColumnVectorJni.cpp @@ -38,12 +38,70 @@ #include #include +#include #include using cudf::jni::ptr_as_jlong; using cudf::jni::release_as_jlong; +// Creating arrow as per given type_id and buffer arguments +template +std::shared_ptr to_arrow_array(cudf::type_id id, Ts&&... args) +{ + switch (id) { + case cudf::type_id::BOOL8: + return std::make_shared(std::forward(args)...); + case cudf::type_id::INT8: return std::make_shared(std::forward(args)...); + case cudf::type_id::INT16: + return std::make_shared(std::forward(args)...); + case cudf::type_id::INT32: + return std::make_shared(std::forward(args)...); + case cudf::type_id::INT64: + return std::make_shared(std::forward(args)...); + case cudf::type_id::UINT8: + return std::make_shared(std::forward(args)...); + case cudf::type_id::UINT16: + return std::make_shared(std::forward(args)...); + case cudf::type_id::UINT32: + return std::make_shared(std::forward(args)...); + case cudf::type_id::UINT64: + return std::make_shared(std::forward(args)...); + case cudf::type_id::FLOAT32: + return std::make_shared(std::forward(args)...); + case cudf::type_id::FLOAT64: + return std::make_shared(std::forward(args)...); + case cudf::type_id::TIMESTAMP_DAYS: + return std::make_shared(std::make_shared(), + std::forward(args)...); + case cudf::type_id::TIMESTAMP_SECONDS: + return std::make_shared(arrow::timestamp(arrow::TimeUnit::SECOND), + std::forward(args)...); + case cudf::type_id::TIMESTAMP_MILLISECONDS: + return std::make_shared(arrow::timestamp(arrow::TimeUnit::MILLI), + std::forward(args)...); + case cudf::type_id::TIMESTAMP_MICROSECONDS: + return std::make_shared(arrow::timestamp(arrow::TimeUnit::MICRO), + std::forward(args)...); + case cudf::type_id::TIMESTAMP_NANOSECONDS: + return std::make_shared(arrow::timestamp(arrow::TimeUnit::NANO), + std::forward(args)...); + case cudf::type_id::DURATION_SECONDS: + return std::make_shared(arrow::duration(arrow::TimeUnit::SECOND), + std::forward(args)...); + case cudf::type_id::DURATION_MILLISECONDS: + return std::make_shared(arrow::duration(arrow::TimeUnit::MILLI), + std::forward(args)...); + case cudf::type_id::DURATION_MICROSECONDS: + return std::make_shared(arrow::duration(arrow::TimeUnit::MICRO), + std::forward(args)...); + case cudf::type_id::DURATION_NANOSECONDS: + return std::make_shared(arrow::duration(arrow::TimeUnit::NANO), + std::forward(args)...); + default: CUDF_FAIL("Unsupported type_id conversion to arrow"); + } +} + extern "C" { JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ColumnVector_sequence( @@ -141,15 +199,27 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ColumnVector_fromArrow(JNIEnv* env, break; default: // this handles the primitive types - arrow_array = cudf::detail::to_arrow_array( - n_type, j_col_length, data_buffer, null_buffer, j_null_count); + arrow_array = to_arrow_array(n_type, j_col_length, data_buffer, null_buffer, j_null_count); } auto name_and_type = arrow::field("col", arrow_array->type()); std::vector> fields = {name_and_type}; std::shared_ptr schema = std::make_shared(fields); auto arrow_table = arrow::Table::Make(schema, std::vector>{arrow_array}); - auto retCols = cudf::from_arrow(*(arrow_table))->release(); + + ArrowSchema sch; + if (!arrow::ExportSchema(*arrow_table->schema(), &sch).ok()) { + JNI_THROW_NEW(env, "java/lang/RuntimeException", "Unable to produce an ArrowSchema", 0) + } + auto batch = arrow_table->CombineChunksToBatch().ValueOrDie(); + ArrowArray arr; + if (!arrow::ExportRecordBatch(*batch, &arr).ok()) { + JNI_THROW_NEW(env, "java/lang/RuntimeException", "Unable to produce an ArrowArray", 0) + } + auto retCols = cudf::from_arrow(&sch, &arr)->release(); + arr.release(&arr); + sch.release(&sch); + if (retCols.size() != 1) { JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "Must result in one column", 0); } diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index ecc551f1143..c749c8c84bf 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -54,6 +54,8 @@ #include +#include +#include #include #include @@ -1069,6 +1071,15 @@ void append_flattened_child_names(cudf::io::column_name_info const& info, } } +// Recursively make schema and its children nullable +void set_nullable(ArrowSchema* schema) +{ + schema->flags |= ARROW_FLAG_NULLABLE; + for (int i = 0; i < schema->n_children; ++i) { + set_nullable(schema->children[i]); + } +} + } // namespace } // namespace jni @@ -2635,7 +2646,13 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Table_convertCudfToArrowTable(JNIEnv // The pointer to the shared_ptr<> is returned as a jlong. using result_t = std::shared_ptr; - auto result = cudf::to_arrow(*tview, state->get_column_metadata(*tview)); + auto got_arrow_schema = cudf::to_arrow_schema(*tview, state->get_column_metadata(*tview)); + cudf::jni::set_nullable(got_arrow_schema.get()); + auto got_arrow_array = cudf::to_arrow_host(*tview); + auto batch = + arrow::ImportRecordBatch(&got_arrow_array->array, got_arrow_schema.get()).ValueOrDie(); + auto result = arrow::Table::FromRecordBatches({batch}).ValueOrDie(); + return ptr_as_jlong(new result_t{result}); } CATCH_STD(env, 0) @@ -2746,7 +2763,21 @@ Java_ai_rapids_cudf_Table_convertArrowTableToCudf(JNIEnv* env, jclass, jlong arr try { cudf::jni::auto_set_device(env); - return convert_table_for_return(env, cudf::from_arrow(*(handle->get()))); + + ArrowSchema sch; + if (!arrow::ExportSchema(*handle->get()->schema(), &sch).ok()) { + JNI_THROW_NEW(env, "java/lang/RuntimeException", "Unable to produce an ArrowSchema", 0) + } + auto batch = handle->get()->CombineChunksToBatch().ValueOrDie(); + ArrowArray arr; + if (!arrow::ExportRecordBatch(*batch, &arr).ok()) { + JNI_THROW_NEW(env, "java/lang/RuntimeException", "Unable to produce an ArrowArray", 0) + } + auto ret = cudf::from_arrow(&sch, &arr); + arr.release(&arr); + sch.release(&sch); + + return convert_table_for_return(env, ret); } CATCH_STD(env, 0) }