fix: decimal convert in deltalake #16216

Merged · 1 commit, merged on Apr 10, 2024

4 changes: 2 additions & 2 deletions ci/scripts/e2e-deltalake-sink-rust-test.sh
@@ -48,7 +48,7 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \
--conf 'spark.hadoop.fs.s3a.secret.key=hummockadmin' \
--conf 'spark.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301' \
--conf 'spark.hadoop.fs.s3a.path.style.access=true' \
- --S --e 'create table delta.`s3a://deltalake/deltalake-test`(v1 int, v2 short, v3 long, v4 float, v5 double, v6 string, v7 date, v8 Timestamp, v9 boolean) using delta;'
+ --S --e 'create table delta.`s3a://deltalake/deltalake-test`(v1 int, v2 short, v3 long, v4 float, v5 double, v6 string, v7 date, v8 Timestamp, v9 boolean, v10 decimal) using delta;'


echo "--- testing sinks"
@@ -67,7 +67,7 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \

# check sink destination using shell
if cat ./spark-output/*.csv | sort | awk -F "," '{
- exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01T01:01:01.000Z" && $9 == "false"); }'; then
+ exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01T01:01:01.000Z" && $9 == "false" && $10 == 1); }'; then
echo "DeltaLake sink check passed"
else
cat ./spark-output/*.csv
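
A note on the Spark side (my reading of Spark's type docs, not stated in the diff): a bare `decimal` column defaults to `decimal(10, 0)`, so the sink targets an Arrow `Decimal128(10, 0)`. That precision bounds the new +/-Inf clamping introduced in the Rust change below; a tiny standalone sketch of that rule:

```rust
// Hypothetical helper mirroring the Inf-clamping rule in decimal_to_arrow:
// +/-Inf map to the largest magnitude representable at the target precision.
fn clamp_inf(precision: u8, negative: bool) -> i128 {
    let max = 10_i128.pow(precision as u32) - 1;
    if negative { -max } else { max }
}

fn main() {
    // For decimal(10, 0): +Inf -> 9_999_999_999 and -Inf -> -9_999_999_999.
    assert_eq!(clamp_inf(10, false), 9_999_999_999);
    assert_eq!(clamp_inf(10, true), -9_999_999_999);
}
```
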
4 changes: 2 additions & 2 deletions e2e_test/sink/deltalake_rust_sink.slt
@@ -1,5 +1,5 @@
statement ok
- CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean);
+ CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean, v10 decimal);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
@@ -17,7 +17,7 @@ with (
);

statement ok
- INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false);
+ INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false, 1);

statement ok
FLUSH;
94 changes: 93 additions & 1 deletion src/common/src/array/arrow/arrow_deltalake.rs
@@ -17,12 +17,104 @@
//!
//! The corresponding version of arrow is currently used by `deltalake` sink.

- pub use arrow_impl::to_record_batch_with_schema as to_deltalake_record_batch_with_schema;
use std::ops::{Div, Mul};
use std::sync::Arc;

use arrow_array::ArrayRef;
use itertools::Itertools;
use num_traits::abs;
use {
    arrow_array_deltalake as arrow_array, arrow_buffer_deltalake as arrow_buffer,
    arrow_cast_deltalake as arrow_cast, arrow_schema_deltalake as arrow_schema,
};

use self::arrow_impl::ToArrowArrayWithTypeConvert;
use crate::array::{Array, ArrayError, DataChunk, DecimalArray};
use crate::util::iter_util::ZipEqFast;
#[expect(clippy::duplicate_mod)]
#[path = "./arrow_impl.rs"]
mod arrow_impl;

struct DeltaLakeConvert;

impl arrow_impl::ToArrowArrayWithTypeConvert for DeltaLakeConvert {
    fn decimal_to_arrow(
        &self,
        data_type: &arrow_schema::DataType,
        array: &DecimalArray,
    ) -> Result<arrow_array::ArrayRef, ArrayError> {
        let (precision, max_scale) = match data_type {
            arrow_schema::DataType::Decimal128(precision, scale) => (*precision, *scale),
            _ => return Err(ArrayError::to_arrow("Invalid decimal type")),
        };

        // Convert Decimal to i128:
        let values: Vec<Option<i128>> = array
            .iter()
            .map(|e| {
                e.and_then(|e| match e {
                    crate::array::Decimal::Normalized(e) => {
                        let value = e.mantissa();
                        let scale = e.scale() as i8;
                        let diff_scale = abs(max_scale - scale);
                        let value = match scale {
                            _ if scale < max_scale => {
                                value.mul(10_i32.pow(diff_scale as u32) as i128)
                            }
                            _ if scale > max_scale => {
                                value.div(10_i32.pow(diff_scale as u32) as i128)
                            }
                            _ => value,
                        };
                        Some(value)
                    }
                    // For Inf, we replace them with the max/min value within the precision.
                    crate::array::Decimal::PositiveInf => {
                        let max_value = 10_i128.pow(precision as u32) - 1;
                        Some(max_value)
                    }
                    crate::array::Decimal::NegativeInf => {
                        let max_value = 10_i128.pow(precision as u32) - 1;
                        Some(-max_value)
                    }
                    crate::array::Decimal::NaN => None,
                })
            })
            .collect();

        let array = arrow_array::Decimal128Array::from(values)
            .with_precision_and_scale(precision, max_scale)
            .map_err(ArrayError::from_arrow)?;
        Ok(Arc::new(array) as ArrayRef)
    }
}
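
The rescaling above moves the mantissa to the Arrow column's scale, truncating (not rounding) when narrowing. A standalone sketch of the same rule, assuming the `rust_decimal` crate (which `Decimal::Normalized` appears to wrap, given the `mantissa()`/`scale()` calls); I use `10_i128.pow` here so a large scale gap cannot overflow an intermediate `i32`:

```rust
use rust_decimal::Decimal;

// Rescale a decimal's mantissa to `max_scale`, as decimal_to_arrow does.
fn rescale(value: Decimal, max_scale: i8) -> i128 {
    let mantissa = value.mantissa(); // e.g. 1.2 -> 12
    let scale = value.scale() as i8; // e.g. 1.2 -> 1
    let diff = (max_scale - scale).unsigned_abs() as u32;
    if scale < max_scale {
        mantissa * 10_i128.pow(diff) // widen: pad with trailing zeros
    } else if scale > max_scale {
        mantissa / 10_i128.pow(diff) // narrow: truncate toward zero
    } else {
        mantissa
    }
}

fn main() {
    // 1.2 stored as Decimal128(_, 4) becomes mantissa 12000.
    assert_eq!(rescale(Decimal::new(12, 1), 4), 12_000);
    // 1.2345 narrowed to scale 2 truncates to 123, i.e. 1.23.
    assert_eq!(rescale(Decimal::new(12345, 4), 2), 123);
}
```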

/// Converts a RisingWave array to an Arrow array with the given schema.
/// This function will try to convert the array if its type is not the same as the schema's.
pub fn to_deltalake_record_batch_with_schema(
    schema: arrow_schema::SchemaRef,
    chunk: &DataChunk,
) -> Result<arrow_array::RecordBatch, ArrayError> {
    if !chunk.is_compacted() {
        let c = chunk.clone();
        return to_deltalake_record_batch_with_schema(schema, &c.compact());
    }
    let columns: Vec<_> = chunk
        .columns()
        .iter()
        .zip_eq_fast(schema.fields().iter())
        .map(|(column, field)| {
            let column: arrow_array::ArrayRef =
                DeltaLakeConvert.to_arrow_with_type(field.data_type(), column)?;
            if column.data_type() == field.data_type() {
                Ok(column)
            } else {
                arrow_cast::cast(&column, field.data_type()).map_err(ArrayError::from_arrow)
            }
        })
        .try_collect::<_, _, ArrayError>()?;

    let opts = arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
    arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
        .map_err(ArrayError::to_arrow)
}
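
Two details here are easy to miss: the function compacts the chunk first so rows hidden by the visibility bitmap never reach the batch, and the explicit row count keeps the batch valid even with zero columns. A minimal sketch of the row-count option, written against the upstream `arrow-array`/`arrow-schema` crates rather than the `*_deltalake` renames used above:

```rust
use std::sync::Arc;

use arrow_array::{RecordBatch, RecordBatchOptions};
use arrow_schema::Schema;

fn main() -> Result<(), arrow_schema::ArrowError> {
    // With no columns there is nothing to infer the length from, so the
    // row count must travel in the options, as the sink code does.
    let schema = Arc::new(Schema::empty());
    let opts = RecordBatchOptions::default().with_row_count(Some(3));
    let batch = RecordBatch::try_new_with_options(schema, vec![], &opts)?;
    assert_eq!(batch.num_rows(), 3);
    Ok(())
}
```
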
1 change: 1 addition & 0 deletions src/common/src/array/arrow/arrow_impl.rs
@@ -52,6 +52,7 @@ use crate::util::iter_util::ZipEqFast;

/// Converts a RisingWave array to an Arrow array with the given schema.
/// This function will try to convert the array if its type is not the same as the schema's.
+ #[allow(dead_code)]
pub fn to_record_batch_with_schema(
    schema: arrow_schema::SchemaRef,
    chunk: &DataChunk,