Skip to content

Commit

Permalink
Deduplicate decimal32/decimal64 to decimal128 conversion function (ra…
Browse files Browse the repository at this point in the history
…pidsai#16236)

Closes rapidsai#16194

This PR deduplicates the `convert_data_to_decimal128` function from `to_arrow.cu`, `writer_impl.cu` and `to_arrow_device.cu` to a common location.

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Nghia Truong (https://github.com/ttnghia)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: rapidsai#16236
  • Loading branch information
mhaseeb123 authored and rjzamora committed Jul 30, 2024
1 parent 4893218 commit f9b2559
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 85 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ add_library(
src/interop/dlpack.cpp
src/interop/from_arrow.cu
src/interop/arrow_utilities.cpp
src/interop/decimal_conversion_utilities.cu
src/interop/to_arrow.cu
src/interop/to_arrow_device.cu
src/interop/to_arrow_host.cu
Expand Down
70 changes: 70 additions & 0 deletions cpp/src/interop/decimal_conversion_utilities.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "decimal_conversion_utilities.cuh"

#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/linked_column.hpp>
#include <cudf/fixed_point/fixed_point.hpp>

#include <rmm/exec_policy.hpp>

#include <thrust/for_each.h>

#include <type_traits>

namespace cudf {
namespace detail {

template <typename DecimalType>
std::unique_ptr<rmm::device_buffer> convert_decimals_to_decimal128(
cudf::column_view const& column, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr)
{
static_assert(std::is_same_v<DecimalType, int32_t> or std::is_same_v<DecimalType, int64_t>,
"Only int32 and int64 decimal types can be converted to decimal128.");

constexpr size_type BIT_WIDTH_RATIO = sizeof(__int128_t) / sizeof(DecimalType);
auto buf = std::make_unique<rmm::device_buffer>(column.size() * sizeof(__int128_t), stream, mr);

thrust::for_each(rmm::exec_policy_nosync(stream, mr),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(column.size()),
[in = column.begin<DecimalType>(),
out = reinterpret_cast<DecimalType*>(buf->data()),
BIT_WIDTH_RATIO] __device__(auto in_idx) {
auto const out_idx = in_idx * BIT_WIDTH_RATIO;
// the lowest order bits are the value, the remainder
// simply matches the sign bit to satisfy the two's
// complement integer representation of negative numbers.
out[out_idx] = in[in_idx];
#pragma unroll BIT_WIDTH_RATIO - 1
for (auto i = 1; i < BIT_WIDTH_RATIO; ++i) {
out[out_idx + i] = in[in_idx] < 0 ? -1 : 0;
}
});

return buf;
}

// Instantiate templates for int32_t and int64_t decimal types
template std::unique_ptr<rmm::device_buffer> convert_decimals_to_decimal128<int32_t>(
cudf::column_view const& column, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr);

template std::unique_ptr<rmm::device_buffer> convert_decimals_to_decimal128<int64_t>(
cudf::column_view const& column, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr);

} // namespace detail
} // namespace cudf
44 changes: 44 additions & 0 deletions cpp/src/interop/decimal_conversion_utilities.cuh
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/column/column_view.hpp>
#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/resource_ref.hpp>

#include <type_traits>

namespace cudf::detail {

/**
* @brief Convert decimal32 and decimal64 numeric data to decimal128 and return the device vector
*
* @tparam DecimalType to convert from
*
* @param column A view of the input columns
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*
* @return A device vector containing the converted decimal128 data
*/
template <typename DecimalType>
std::unique_ptr<rmm::device_buffer> convert_decimals_to_decimal128(
cudf::column_view const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr);

} // namespace cudf::detail
8 changes: 6 additions & 2 deletions cpp/src/interop/to_arrow.cu
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "arrow_utilities.hpp"
#include "decimal_conversion_utilities.cuh"
#include "detail/arrow_allocator.hpp"

#include <cudf/column/column.hpp>
Expand Down Expand Up @@ -158,8 +159,11 @@ std::shared_ptr<arrow::Array> unsupported_decimals_to_arrow(column_view input,
arrow::MemoryPool* ar_mr,
rmm::cuda_stream_view stream)
{
auto buf =
detail::decimals_to_arrow<DeviceType>(input, stream, rmm::mr::get_current_device_resource());
auto buf = detail::convert_decimals_to_decimal128<DeviceType>(
input, stream, rmm::mr::get_current_device_resource());

// Synchronize stream here to ensure the decimal128 buffer is ready.
stream.synchronize();

auto const buf_size_in_bytes = buf->size();
auto data_buffer = allocate_arrow_buffer(buf_size_in_bytes, ar_mr);
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/interop/to_arrow_device.cu
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "arrow_utilities.hpp"
#include "decimal_conversion_utilities.cuh"

#include <cudf/column/column.hpp>
#include <cudf/column/column_view.hpp>
Expand Down Expand Up @@ -141,7 +142,9 @@ int construct_decimals(cudf::column_view input,
nanoarrow::UniqueArray tmp;
NANOARROW_RETURN_NOT_OK(initialize_array(tmp.get(), NANOARROW_TYPE_DECIMAL128, input));

auto buf = detail::decimals_to_arrow<DeviceType>(input, stream, mr);
auto buf = detail::convert_decimals_to_decimal128<DeviceType>(input, stream, mr);
// Synchronize stream here to ensure the decimal128 buffer is ready.
stream.synchronize();
NANOARROW_RETURN_NOT_OK(set_buffer(std::move(buf), fixed_width_data_buffer_idx, tmp.get()));

ArrowArrayMove(tmp.get(), out);
Expand Down
40 changes: 4 additions & 36 deletions cpp/src/interop/to_arrow_host.cu
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "arrow_utilities.hpp"
#include "decimal_conversion_utilities.cuh"

#include <cudf/column/column_view.hpp>
#include <cudf/detail/interop.hpp>
Expand Down Expand Up @@ -50,41 +51,6 @@
namespace cudf {
namespace detail {

template <typename DeviceType>
std::unique_ptr<rmm::device_buffer> decimals_to_arrow(cudf::column_view input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
constexpr size_type BIT_WIDTH_RATIO = sizeof(__int128_t) / sizeof(DeviceType);
auto buf = std::make_unique<rmm::device_buffer>(input.size() * sizeof(__int128_t), stream, mr);

auto count = thrust::counting_iterator<size_type>(0);
thrust::for_each(rmm::exec_policy(stream, mr),
count,
count + input.size(),
[in = input.begin<DeviceType>(),
out = reinterpret_cast<DeviceType*>(buf->data()),
BIT_WIDTH_RATIO] __device__(auto in_idx) {
auto const out_idx = in_idx * BIT_WIDTH_RATIO;
// the lowest order bits are the value, the remainder
// simply matches the sign bit to satisfy the two's
// complement integer representation of negative numbers.
out[out_idx] = in[in_idx];
#pragma unroll BIT_WIDTH_RATIO - 1
for (auto i = 1; i < BIT_WIDTH_RATIO; ++i) {
out[out_idx + i] = in[in_idx] < 0 ? -1 : 0;
}
});

return buf;
}

template std::unique_ptr<rmm::device_buffer> decimals_to_arrow<int32_t>(
cudf::column_view input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr);

template std::unique_ptr<rmm::device_buffer> decimals_to_arrow<int64_t>(
cudf::column_view input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr);

namespace {

struct dispatch_to_arrow_host {
Expand Down Expand Up @@ -156,7 +122,9 @@ struct dispatch_to_arrow_host {
NANOARROW_RETURN_NOT_OK(initialize_array(tmp.get(), NANOARROW_TYPE_DECIMAL128, column));

NANOARROW_RETURN_NOT_OK(populate_validity_bitmap(ArrowArrayValidityBitmap(tmp.get())));
auto buf = detail::decimals_to_arrow<DeviceType>(column, stream, mr);
auto buf = detail::convert_decimals_to_decimal128<DeviceType>(column, stream, mr);
// No need to synchronize stream here as populate_data_buffer uses the same stream to copy data
// to host.
NANOARROW_RETURN_NOT_OK(
populate_data_buffer(device_span<__int128_t const>(
reinterpret_cast<const __int128_t*>(buf->data()), column.size()),
Expand Down
60 changes: 14 additions & 46 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "arrow_schema_writer.hpp"
#include "compact_protocol_reader.hpp"
#include "compact_protocol_writer.hpp"
#include "interop/decimal_conversion_utilities.cuh"
#include "io/comp/nvcomp_adapter.hpp"
#include "io/parquet/parquet.hpp"
#include "io/parquet/parquet_gpu.hpp"
Expand Down Expand Up @@ -1601,58 +1602,20 @@ size_t column_index_buffer_size(EncColumnChunk* ck,
return ck->ck_stat_size * num_pages + column_index_truncate_length + padding + size_struct_size;
}

/**
* @brief Convert decimal32 and decimal64 data to decimal128 and return the device vector
*
* @tparam DecimalType to convert from
*
* @param column A view of the input columns
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return A device vector containing the converted decimal128 data
*/
template <typename DecimalType>
rmm::device_uvector<__int128_t> convert_data_to_decimal128(column_view const& column,
rmm::cuda_stream_view stream)
{
size_type constexpr BIT_WIDTH_RATIO = sizeof(__int128_t) / sizeof(DecimalType);

rmm::device_uvector<__int128_t> d128_buffer(column.size(), stream);

thrust::for_each(rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(column.size()),
[in = column.begin<DecimalType>(),
out = reinterpret_cast<DecimalType*>(d128_buffer.data()),
BIT_WIDTH_RATIO] __device__(auto in_idx) {
auto const out_idx = in_idx * BIT_WIDTH_RATIO;
// The lowest order bits are the value, the remainder
// simply matches the sign bit to satisfy the two's
// complement integer representation of negative numbers.
out[out_idx] = in[in_idx];
#pragma unroll BIT_WIDTH_RATIO - 1
for (auto i = 1; i < BIT_WIDTH_RATIO; ++i) {
out[out_idx + i] = in[in_idx] < 0 ? -1 : 0;
}
});

return d128_buffer;
}

/**
* @brief Function to convert decimal32 and decimal64 columns to decimal128 data,
* update the input table metadata, and return a new vector of column views.
*
* @param[in,out] table_meta The table metadata
* @param[in,out] d128_vectors Vector containing the computed decimal128 data buffers.
* @param[in,out] d128_buffers Buffers containing the converted decimal128 data.
* @param input The input table
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return A device vector containing the converted decimal128 data
*/
std::vector<column_view> convert_decimal_columns_and_metadata(
table_input_metadata& table_meta,
std::vector<rmm::device_uvector<__int128_t>>& d128_vectors,
std::vector<std::unique_ptr<rmm::device_buffer>>& d128_buffers,
table_view const& table,
rmm::cuda_stream_view stream)
{
Expand All @@ -1673,28 +1636,30 @@ std::vector<column_view> convert_decimal_columns_and_metadata(
switch (column.type().id()) {
case type_id::DECIMAL32:
// Convert data to decimal128 type
d128_vectors.emplace_back(convert_data_to_decimal128<int32_t>(column, stream));
d128_buffers.emplace_back(cudf::detail::convert_decimals_to_decimal128<int32_t>(
column, stream, rmm::mr::get_current_device_resource()));
// Update metadata
metadata.set_decimal_precision(MAX_DECIMAL32_PRECISION);
metadata.set_type_length(size_of(data_type{type_id::DECIMAL128, column.type().scale()}));
// Create a new column view from the d128 data vector
return {data_type{type_id::DECIMAL128, column.type().scale()},
column.size(),
d128_vectors.back().data(),
d128_buffers.back()->data(),
column.null_mask(),
column.null_count(),
column.offset(),
converted_children};
case type_id::DECIMAL64:
// Convert data to decimal128 type
d128_vectors.emplace_back(convert_data_to_decimal128<int64_t>(column, stream));
d128_buffers.emplace_back(cudf::detail::convert_decimals_to_decimal128<int64_t>(
column, stream, rmm::mr::get_current_device_resource()));
// Update metadata
metadata.set_decimal_precision(MAX_DECIMAL64_PRECISION);
metadata.set_type_length(size_of(data_type{type_id::DECIMAL128, column.type().scale()}));
// Create a new column view from the d128 data vector
return {data_type{type_id::DECIMAL128, column.type().scale()},
column.size(),
d128_vectors.back().data(),
d128_buffers.back()->data(),
column.null_mask(),
column.null_count(),
column.offset(),
Expand Down Expand Up @@ -1722,6 +1687,9 @@ std::vector<column_view> convert_decimal_columns_and_metadata(
std::back_inserter(converted_column_views),
[&](auto elem) { return convert_column(thrust::get<0>(elem), thrust::get<1>(elem)); });

// Synchronize stream here to ensure all decimal128 buffers are ready.
stream.synchronize();

return converted_column_views;
}

Expand Down Expand Up @@ -1780,13 +1748,13 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
rmm::cuda_stream_view stream)
{
// Container to store decimal128 converted data if needed
std::vector<rmm::device_uvector<__int128_t>> d128_vectors;
std::vector<std::unique_ptr<rmm::device_buffer>> d128_buffers;

// Convert decimal32/decimal64 data to decimal128 if writing arrow schema
// and initialize LinkedColVector
auto vec = table_to_linked_columns(
(write_arrow_schema)
? table_view({convert_decimal_columns_and_metadata(table_meta, d128_vectors, input, stream)})
? table_view({convert_decimal_columns_and_metadata(table_meta, d128_buffers, input, stream)})
: input);

auto schema_tree = construct_parquet_schema_tree(
Expand Down
Loading

0 comments on commit f9b2559

Please sign in to comment.