Skip to content

Commit

Permalink
apacheGH-40061: [C++][Python] Basic conversion of RecordBatch to Arro…
Browse files Browse the repository at this point in the history
…w Tensor - add option to cast NULL to NaN (apache#40803)

### Rationale for this change

The conversion from `RecordBatch` to `Tensor` class exists but it doesn't support record batches with validity bitmaps. This PR adds support for an option to convert null values to NaN.

### What changes are included in this PR?

This PR adds a `null_to_nan` option in `RecordBatch::ToTensor` so that null values are converted to NaN in the resulting `Tensor`. For example, this works:

```python
>>> import pyarrow as pa
>>> batch = pa.record_batch(
...     [
...         pa.array([1, 2, 3, 4, None], type=pa.int32()),
...         pa.array([10, 20, 30, 40, None], type=pa.float32()),
...     ], names = ["a", "b"]
... )

>>> batch
pyarrow.RecordBatch
a: int32
b: float
----
a: [1,2,3,4,null]
b: [10,20,30,40,null]

>>> batch.to_tensor(null_to_nan=True)
<pyarrow.Tensor>
type: double
shape: (5, 2)
strides: (8, 40)

>>> batch.to_tensor(null_to_nan=True).to_numpy()
array([[ 1., 10.],
       [ 2., 20.],
       [ 3., 30.],
       [ 4., 40.],
       [nan, nan]])
```
but the default behavior would raise:

```python
>>> batch.to_tensor()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/table.pxi", line 3421, in pyarrow.lib.RecordBatch.to_tensor
    a: int32
  File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
    return check_status(status)
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
    raise convert_status(status)
pyarrow.lib.ArrowTypeError: Can only convert a RecordBatch with no nulls. Set null_to_nan to true to convert nulls to NaN
```

### Are these changes tested?

Yes.

### Are there any user-facing changes?

No.
* GitHub Issue: apache#40061

Lead-authored-by: AlenkaF <[email protected]>
Co-authored-by: Alenka Frim <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
  • Loading branch information
AlenkaF and jorisvandenbossche authored Mar 29, 2024
1 parent ed8c363 commit 96f686b
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 20 deletions.
47 changes: 34 additions & 13 deletions cpp/src/arrow/record_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "arrow/record_batch.h"

#include <algorithm>
#include <cmath>
#include <cstdlib>
#include <memory>
#include <sstream>
Expand Down Expand Up @@ -261,12 +262,19 @@ struct ConvertColumnsToTensorVisitor {
using In = typename T::c_type;
auto in_values = ArraySpan(in_data).GetSpan<In>(1, in_data.length);

if constexpr (std::is_same_v<In, Out>) {
memcpy(out_values, in_values.data(), in_values.size_bytes());
out_values += in_values.size();
if (in_data.null_count == 0) {
if constexpr (std::is_same_v<In, Out>) {
memcpy(out_values, in_values.data(), in_values.size_bytes());
out_values += in_values.size();
} else {
for (In in_value : in_values) {
*out_values++ = static_cast<Out>(in_value);
}
}
} else {
for (In in_value : in_values) {
*out_values++ = static_cast<Out>(in_value);
for (int64_t i = 0; i < in_data.length; ++i) {
*out_values++ =
in_data.IsNull(i) ? static_cast<Out>(NAN) : static_cast<Out>(in_values[i]);
}
}
return Status::OK();
Expand All @@ -286,16 +294,20 @@ inline void ConvertColumnsToTensor(const RecordBatch& batch, uint8_t* out) {
}
}

Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(bool null_to_nan,
MemoryPool* pool) const {
if (num_columns() == 0) {
return Status::TypeError(
"Conversion to Tensor for RecordBatches without columns/schema is not "
"supported.");
}
// Check for no validity bitmap of each field
// if null_to_nan conversion is set to false
for (int i = 0; i < num_columns(); ++i) {
if (column(i)->null_count() > 0) {
return Status::TypeError("Can only convert a RecordBatch with no nulls.");
if (column(i)->null_count() > 0 && !null_to_nan) {
return Status::TypeError(
"Can only convert a RecordBatch with no nulls. Set null_to_nan to true to "
"convert nulls to NaN");
}
}

Expand All @@ -308,12 +320,12 @@ Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
std::shared_ptr<Field> result_field = schema_->field(0);
std::shared_ptr<DataType> result_type = result_field->type();

if (num_columns() > 1) {
Field::MergeOptions options;
options.promote_integer_to_float = true;
options.promote_integer_sign = true;
options.promote_numeric_width = true;
Field::MergeOptions options;
options.promote_integer_to_float = true;
options.promote_integer_sign = true;
options.promote_numeric_width = true;

if (num_columns() > 1) {
for (int i = 1; i < num_columns(); ++i) {
if (!is_numeric(column(i)->type()->id())) {
return Status::TypeError("DataType is not supported: ",
Expand All @@ -334,6 +346,15 @@ Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
result_type = result_field->type();
}

// Check if result_type is signed or unsigned integer and null_to_nan is set to true
// Then all columns should be promoted to float type
if (is_integer(result_type->id()) && null_to_nan) {
ARROW_ASSIGN_OR_RAISE(
result_field,
result_field->MergeWith(field(result_field->name(), float32()), options));
result_type = result_field->type();
}

// Allocate memory
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<Buffer> result,
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/arrow/record_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,12 @@ class ARROW_EXPORT RecordBatch {
/// Create a Tensor object with shape (number of rows, number of columns) and
/// strides (type size in bytes, type size in bytes * number of rows).
/// Generated Tensor will have column-major layout.
///
/// \param[in] null_to_nan if true, convert nulls to NaN
/// \param[in] pool the memory pool to allocate the tensor buffer
/// \return the resulting Tensor
Result<std::shared_ptr<Tensor>> ToTensor(
MemoryPool* pool = default_memory_pool()) const;
bool null_to_nan = false, MemoryPool* pool = default_memory_pool()) const;

/// \brief Construct record batch from struct array
///
Expand Down
76 changes: 75 additions & 1 deletion cpp/src/arrow/record_batch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,8 @@ TEST_F(TestRecordBatch, ToTensorUnsupportedMissing) {
auto batch = RecordBatch::Make(schema, length, {a0, a1});

ASSERT_RAISES_WITH_MESSAGE(TypeError,
"Type error: Can only convert a RecordBatch with no nulls.",
"Type error: Can only convert a RecordBatch with no nulls. "
"Set null_to_nan to true to convert nulls to NaN",
batch->ToTensor());
}

Expand Down Expand Up @@ -740,6 +741,79 @@ TEST_F(TestRecordBatch, ToTensorSupportedNaN) {
CheckTensor<FloatType>(tensor, 18, shape, f_strides);
}

// Checks RecordBatch::ToTensor(/*null_to_nan=*/true): null slots are written
// as NaN in the resulting tensor, and integer columns are promoted to a
// floating-point result type wide enough for every column.
TEST_F(TestRecordBatch, ToTensorSupportedNullToNan) {
  const int length = 9;

  // int32 + float32 = float64
  auto f0 = field("f0", int32());
  auto f1 = field("f1", float32());

  std::vector<std::shared_ptr<Field>> fields = {f0, f1};
  auto schema = ::arrow::schema(fields);

  // One null in each column; both should surface as NaN in the tensor.
  auto a0 = ArrayFromJSON(int32(), "[null, 2, 3, 4, 5, 6, 7, 8, 9]");
  auto a1 = ArrayFromJSON(float32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");

  auto batch = RecordBatch::Make(schema, length, {a0, a1});

  ASSERT_OK_AND_ASSIGN(auto tensor, batch->ToTensor(/*null_to_nan=*/true));
  ASSERT_OK(tensor->Validate());

  // Column-major strides: {element size, element size * number of rows}.
  std::vector<int64_t> shape = {9, 2};
  const int64_t f64_size = sizeof(double);
  std::vector<int64_t> f_strides = {f64_size, f64_size * shape[0]};
  std::shared_ptr<Tensor> tensor_expected = TensorFromJSON(
      float64(), "[NaN, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, NaN, 60, 70, 80, 90]",
      shape, f_strides);

  // Plain Equals is false because NaN != NaN; element-wise equality must hold
  // once NaNs are treated as equal.
  EXPECT_FALSE(tensor_expected->Equals(*tensor));
  EXPECT_TRUE(tensor_expected->Equals(*tensor, EqualOptions().nans_equal(true)));

  CheckTensor<DoubleType>(tensor, 18, shape, f_strides);

  // int32 -> float64
  auto f2 = field("f2", int32());

  std::vector<std::shared_ptr<Field>> fields1 = {f0, f2};
  auto schema1 = ::arrow::schema(fields1);

  auto a2 = ArrayFromJSON(int32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
  auto batch1 = RecordBatch::Make(schema1, length, {a0, a2});

  ASSERT_OK_AND_ASSIGN(auto tensor1, batch1->ToTensor(/*null_to_nan=*/true));
  ASSERT_OK(tensor1->Validate());

  // Same expected tensor as above: the all-int32 batch is promoted to float64.
  EXPECT_FALSE(tensor_expected->Equals(*tensor1));
  EXPECT_TRUE(tensor_expected->Equals(*tensor1, EqualOptions().nans_equal(true)));

  CheckTensor<DoubleType>(tensor1, 18, shape, f_strides);

  // int8 -> float32
  auto f3 = field("f3", int8());
  auto f4 = field("f4", int8());

  std::vector<std::shared_ptr<Field>> fields2 = {f3, f4};
  auto schema2 = ::arrow::schema(fields2);

  auto a3 = ArrayFromJSON(int8(), "[null, 2, 3, 4, 5, 6, 7, 8, 9]");
  auto a4 = ArrayFromJSON(int8(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
  auto batch2 = RecordBatch::Make(schema2, length, {a3, a4});

  ASSERT_OK_AND_ASSIGN(auto tensor2, batch2->ToTensor(/*null_to_nan=*/true));
  ASSERT_OK(tensor2->Validate());

  // int8 columns are promoted only to float32, so strides use the float size.
  const int64_t f32_size = sizeof(float);
  std::vector<int64_t> f_strides_2 = {f32_size, f32_size * shape[0]};
  std::shared_ptr<Tensor> tensor_expected_2 = TensorFromJSON(
      float32(), "[NaN, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, NaN, 60, 70, 80, 90]",
      shape, f_strides_2);

  EXPECT_FALSE(tensor_expected_2->Equals(*tensor2));
  EXPECT_TRUE(tensor_expected_2->Equals(*tensor2, EqualOptions().nans_equal(true)));

  CheckTensor<FloatType>(tensor2, 18, shape, f_strides_2);
}

TEST_F(TestRecordBatch, ToTensorSupportedTypesMixed) {
const int length = 9;

Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CRecordBatch] Slice(int64_t offset)
shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)

CResult[shared_ptr[CTensor]] ToTensor() const
CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, CMemoryPool* pool) const

cdef cppclass CRecordBatchWithMetadata" arrow::RecordBatchWithMetadata":
shared_ptr[CRecordBatch] batch
Expand Down
49 changes: 46 additions & 3 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -3389,21 +3389,64 @@ cdef class RecordBatch(_Tabular):
<CResult[shared_ptr[CArray]]>deref(c_record_batch).ToStructArray())
return pyarrow_wrap_array(c_array)

def to_tensor(self):
def to_tensor(self, c_bool null_to_nan=False, MemoryPool memory_pool=None):
    """
    Convert to a :class:`~pyarrow.Tensor`.

    RecordBatches that can be converted have fields of type signed or unsigned
    integer or float, including all bit-widths. RecordBatches with a validity
    bitmask for any of the arrays can be converted with ``null_to_nan`` turned
    to ``True``. In this case null values are converted to NaN and signed or
    unsigned integer type arrays are promoted to an appropriate float type.

    Parameters
    ----------
    null_to_nan : bool, default False
        Whether to write null values in the result as ``NaN``.
    memory_pool : MemoryPool, default None
        For memory allocations, if required, otherwise use default pool

    Examples
    --------
    >>> import pyarrow as pa
    >>> batch = pa.record_batch(
    ...     [
    ...         pa.array([1, 2, 3, 4, None], type=pa.int32()),
    ...         pa.array([10, 20, 30, 40, None], type=pa.float32()),
    ...     ], names = ["a", "b"]
    ... )

    >>> batch
    pyarrow.RecordBatch
    a: int32
    b: float
    ----
    a: [1,2,3,4,null]
    b: [10,20,30,40,null]

    >>> batch.to_tensor(null_to_nan=True)
    <pyarrow.Tensor>
    type: double
    shape: (5, 2)
    strides: (8, 40)

    >>> batch.to_tensor(null_to_nan=True).to_numpy()
    array([[ 1., 10.],
           [ 2., 20.],
           [ 3., 30.],
           [ 4., 40.],
           [nan, nan]])
    """
    cdef:
        shared_ptr[CRecordBatch] c_record_batch
        shared_ptr[CTensor] c_tensor
        # Falls back to the default memory pool when memory_pool is None.
        CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)

    c_record_batch = pyarrow_unwrap_batch(self)
    # Release the GIL for the C++ conversion; a failed CResult is raised as a
    # Python exception by GetResultValue.
    with nogil:
        c_tensor = GetResultValue(
            <CResult[shared_ptr[CTensor]]>deref(c_record_batch).ToTensor(null_to_nan,
                                                                         pool))
    return pyarrow_wrap_tensor(c_tensor)

def _export_to_c(self, out_ptr, out_schema_ptr=0):
Expand Down
48 changes: 47 additions & 1 deletion python/pyarrow/tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,7 @@ def test_recordbatch_to_tensor_null():
arr2 = [10, 20, 30, 40, 50, 60, 70, None, 90]
batch = pa.RecordBatch.from_arrays(
[
pa.array(arr1, type=pa.float32()),
pa.array(arr1, type=pa.int32()),
pa.array(arr2, type=pa.float32()),
], ["a", "b"]
)
Expand All @@ -1071,6 +1071,52 @@ def test_recordbatch_to_tensor_null():
):
batch.to_tensor()

result = batch.to_tensor(null_to_nan=True)

x = np.array([arr1, arr2], np.float64).transpose()
expected = pa.Tensor.from_numpy(x)

np.testing.assert_equal(result.to_numpy(), x)
assert result.size == 18
assert result.type == pa.float64()
assert result.shape == expected.shape
assert result.strides == expected.strides

# int32 -> float64
batch = pa.RecordBatch.from_arrays(
[
pa.array(arr1, type=pa.int32()),
pa.array(arr2, type=pa.int32()),
], ["a", "b"]
)

result = batch.to_tensor(null_to_nan=True)

np.testing.assert_equal(result.to_numpy(), x)
assert result.size == 18
assert result.type == pa.float64()
assert result.shape == expected.shape
assert result.strides == expected.strides

# int8 -> float32
batch = pa.RecordBatch.from_arrays(
[
pa.array(arr1, type=pa.int8()),
pa.array(arr2, type=pa.int8()),
], ["a", "b"]
)

result = batch.to_tensor(null_to_nan=True)

x = np.array([arr1, arr2], np.float32).transpose()
expected = pa.Tensor.from_numpy(x)

np.testing.assert_equal(result.to_numpy(), x)
assert result.size == 18
assert result.type == pa.float32()
assert result.shape == expected.shape
assert result.strides == expected.strides


def test_recordbatch_to_tensor_empty():
batch = pa.RecordBatch.from_arrays(
Expand Down

0 comments on commit 96f686b

Please sign in to comment.