Skip to content

Commit

Permalink
Enable direct ingestion and production of Arrow scalars (#14121)
Browse files Browse the repository at this point in the history
This PR adds overloads of `from_arrow` and `to_arrow` for scalars to enable interoperability on par with Arrow Arrays. The new public APIs accept streams, and for consistency streams have also been added to the corresponding column APIs, so this PR contributes to #925.

Authors:
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Bradley Dice (https://github.com/bdice)

URL: #14121
  • Loading branch information
vyasr authored Sep 23, 2023
1 parent 517d123 commit 71f30be
Show file tree
Hide file tree
Showing 8 changed files with 540 additions and 29 deletions.
80 changes: 74 additions & 6 deletions cpp/include/cudf/detail/interop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,27 +104,95 @@ std::shared_ptr<arrow::Array> to_arrow_array(cudf::type_id id, Ts&&... args)
}
}

/**
* @brief Invokes an `operator()` template with the type instantiation based on
* the specified `arrow::DataType`'s `id()`.
*
* This function is analogous to libcudf's type_dispatcher, but instead applies
* to Arrow functions. Its primary use case is to leverage Arrow's
* metaprogramming facilities like arrow::TypeTraits that require translating
* the runtime dtype information into compile-time types.
*/
template <typename Functor, typename... Ts>
constexpr decltype(auto) arrow_type_dispatcher(arrow::DataType const& dtype,
Functor f,
Ts&&... args)
{
switch (dtype.id()) {
case arrow::Type::INT8:
return f.template operator()<arrow::Int8Type>(std::forward<Ts>(args)...);
case arrow::Type::INT16:
return f.template operator()<arrow::Int16Type>(std::forward<Ts>(args)...);
case arrow::Type::INT32:
return f.template operator()<arrow::Int32Type>(std::forward<Ts>(args)...);
case arrow::Type::INT64:
return f.template operator()<arrow::Int64Type>(std::forward<Ts>(args)...);
case arrow::Type::UINT8:
return f.template operator()<arrow::UInt8Type>(std::forward<Ts>(args)...);
case arrow::Type::UINT16:
return f.template operator()<arrow::UInt16Type>(std::forward<Ts>(args)...);
case arrow::Type::UINT32:
return f.template operator()<arrow::UInt32Type>(std::forward<Ts>(args)...);
case arrow::Type::UINT64:
return f.template operator()<arrow::UInt64Type>(std::forward<Ts>(args)...);
case arrow::Type::FLOAT:
return f.template operator()<arrow::FloatType>(std::forward<Ts>(args)...);
case arrow::Type::DOUBLE:
return f.template operator()<arrow::DoubleType>(std::forward<Ts>(args)...);
case arrow::Type::BOOL:
return f.template operator()<arrow::BooleanType>(std::forward<Ts>(args)...);
case arrow::Type::TIMESTAMP:
return f.template operator()<arrow::TimestampType>(std::forward<Ts>(args)...);
case arrow::Type::DURATION:
return f.template operator()<arrow::DurationType>(std::forward<Ts>(args)...);
case arrow::Type::STRING:
return f.template operator()<arrow::StringType>(std::forward<Ts>(args)...);
case arrow::Type::LIST:
return f.template operator()<arrow::ListType>(std::forward<Ts>(args)...);
case arrow::Type::DECIMAL128:
return f.template operator()<arrow::Decimal128Type>(std::forward<Ts>(args)...);
case arrow::Type::STRUCT:
return f.template operator()<arrow::StructType>(std::forward<Ts>(args)...);
default: {
CUDF_FAIL("Invalid type.");
}
}
}

// Converting arrow type to cudf type
data_type arrow_to_cudf_type(arrow::DataType const& arrow_type);

/**
* @copydoc cudf::to_arrow
*
* @param stream CUDA stream used for device memory operations and kernel launches.
* @copydoc cudf::to_arrow(table_view input, std::vector<column_metadata> const& metadata,
* rmm::cuda_stream_view stream, arrow::MemoryPool* ar_mr)
*/
std::shared_ptr<arrow::Table> to_arrow(table_view input,
std::vector<column_metadata> const& metadata,
rmm::cuda_stream_view stream,
arrow::MemoryPool* ar_mr);

/**
* @copydoc cudf::arrow_to_cudf
*
* @param stream CUDA stream used for device memory operations and kernel launches.
* @copydoc cudf::to_arrow(cudf::scalar const& input, column_metadata const& metadata,
* rmm::cuda_stream_view stream, arrow::MemoryPool* ar_mr)
*/
std::shared_ptr<arrow::Scalar> to_arrow(cudf::scalar const& input,
column_metadata const& metadata,
rmm::cuda_stream_view stream,
arrow::MemoryPool* ar_mr);
/**
* @copydoc cudf::from_arrow(arrow::Table const& input_table, rmm::cuda_stream_view stream,
* rmm::mr::device_memory_resource* mr)
*/
std::unique_ptr<table> from_arrow(arrow::Table const& input_table,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @copydoc cudf::from_arrow(arrow::Scalar const& input, rmm::cuda_stream_view stream,
* rmm::mr::device_memory_resource* mr)
*/
std::unique_ptr<cudf::scalar> from_arrow(arrow::Scalar const& input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);
} // namespace detail
} // namespace cudf
35 changes: 34 additions & 1 deletion cpp/include/cudf/interop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,56 @@ struct column_metadata {
*
* @param input table_view that needs to be converted to arrow Table
* @param metadata Contains hierarchy of names of columns and children
* @param stream CUDA stream used for device memory operations and kernel launches
* @param ar_mr arrow memory pool to allocate memory for arrow Table
* @return arrow Table generated from `input`
*/
std::shared_ptr<arrow::Table> to_arrow(table_view input,
std::vector<column_metadata> const& metadata = {},
arrow::MemoryPool* ar_mr = arrow::default_memory_pool());
rmm::cuda_stream_view stream = cudf::get_default_stream(),
arrow::MemoryPool* ar_mr = arrow::default_memory_pool());

/**
* @brief Create `arrow::Scalar` from cudf scalar `input`
*
* Converts the `cudf::scalar` to `arrow::Scalar`.
*
* @param input scalar that needs to be converted to arrow Scalar
* @param metadata Contains hierarchy of names of columns and children
* @param stream CUDA stream used for device memory operations and kernel launches
* @param ar_mr arrow memory pool to allocate memory for arrow Scalar
* @return arrow Scalar generated from `input`
*/
std::shared_ptr<arrow::Scalar> to_arrow(cudf::scalar const& input,
column_metadata const& metadata = {},
rmm::cuda_stream_view stream = cudf::get_default_stream(),
arrow::MemoryPool* ar_mr = arrow::default_memory_pool());
/**
* @brief Create `cudf::table` from given arrow Table input
*
* @param input arrow:Table that needs to be converted to `cudf::table`
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate `cudf::table`
* @return cudf table generated from given arrow Table
*/

std::unique_ptr<table> from_arrow(
arrow::Table const& input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Create `cudf::scalar` from given arrow Scalar input
*
* @param input `arrow::Scalar` that needs to be converted to `cudf::scalar`
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate `cudf::scalar`
* @return cudf scalar generated from given arrow Scalar
*/

std::unique_ptr<cudf::scalar> from_arrow(
arrow::Scalar const& input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down
88 changes: 87 additions & 1 deletion cpp/src/interop/from_arrow.cu
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,52 @@ std::unique_ptr<column> get_column(arrow::Array const& array,
: get_empty_type_column(array.length());
}

struct BuilderGenerator {
template <typename T,
CUDF_ENABLE_IF(!std::is_same_v<T, arrow::ListType> &&
!std::is_same_v<T, arrow::StructType>)>
std::shared_ptr<arrow::ArrayBuilder> operator()(std::shared_ptr<arrow::DataType> const& type)
{
return std::make_shared<typename arrow::TypeTraits<T>::BuilderType>(
type, arrow::default_memory_pool());
}

template <typename T,
CUDF_ENABLE_IF(std::is_same_v<T, arrow::ListType> ||
std::is_same_v<T, arrow::StructType>)>
std::shared_ptr<arrow::ArrayBuilder> operator()(std::shared_ptr<arrow::DataType> const& type)
{
CUDF_FAIL("Type not supported by BuilderGenerator");
}
};

std::shared_ptr<arrow::ArrayBuilder> make_builder(std::shared_ptr<arrow::DataType> const& type)
{
switch (type->id()) {
case arrow::Type::STRUCT: {
std::vector<std::shared_ptr<arrow::ArrayBuilder>> field_builders;

for (auto field : type->fields()) {
auto const vt = field->type();
if (vt->id() == arrow::Type::STRUCT || vt->id() == arrow::Type::LIST) {
field_builders.push_back(make_builder(vt));
} else {
field_builders.push_back(arrow_type_dispatcher(*vt, BuilderGenerator{}, vt));
}
}
return std::make_shared<arrow::StructBuilder>(
type, arrow::default_memory_pool(), field_builders);
}
case arrow::Type::LIST: {
return std::make_shared<arrow::ListBuilder>(arrow::default_memory_pool(),
make_builder(type->field(0)->type()));
}
default: {
return arrow_type_dispatcher(*type, BuilderGenerator{}, type);
}
}
}

} // namespace

std::unique_ptr<table> from_arrow(arrow::Table const& input_table,
Expand Down Expand Up @@ -462,14 +508,54 @@ std::unique_ptr<table> from_arrow(arrow::Table const& input_table,
return std::make_unique<table>(std::move(columns));
}

std::unique_ptr<cudf::scalar> from_arrow(arrow::Scalar const& input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// Get a builder for the scalar type
auto builder = detail::make_builder(input.type);

auto status = builder->AppendScalar(input);
if (status != arrow::Status::OK()) {
if (status.IsNotImplemented()) {
// The only known failure case here is for nulls
CUDF_FAIL("Cannot create untyped null scalars or nested types with untyped null leaf nodes",
std::invalid_argument);
}
CUDF_FAIL("Arrow ArrayBuilder::AppendScalar failed");
}

auto maybe_array = builder->Finish();
if (!maybe_array.ok()) { CUDF_FAIL("Arrow ArrayBuilder::Finish failed"); }
auto array = *maybe_array;

auto field = arrow::field("", input.type);

auto table = arrow::Table::Make(arrow::schema({field}), {array});

auto cudf_table = detail::from_arrow(*table, stream, mr);

auto cv = cudf_table->view().column(0);
return get_element(cv, 0, stream);
}

} // namespace detail

std::unique_ptr<table> from_arrow(arrow::Table const& input_table,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

return detail::from_arrow(input_table, cudf::get_default_stream(), mr);
return detail::from_arrow(input_table, stream, mr);
}

std::unique_ptr<cudf::scalar> from_arrow(arrow::Scalar const& input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

return detail::from_arrow(input, stream, mr);
}
} // namespace cudf
Loading

0 comments on commit 71f30be

Please sign in to comment.