Skip to content

Commit

Permalink
fix: decimal convert in deltalake (#16216)
Browse files Browse the repository at this point in the history
Co-authored-by: ZENOTME <[email protected]>
  • Loading branch information
ZENOTME and ZENOTME authored Apr 10, 2024
1 parent f82821d commit f126d38
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 5 deletions.
4 changes: 2 additions & 2 deletions ci/scripts/e2e-deltalake-sink-rust-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \
--conf 'spark.hadoop.fs.s3a.secret.key=hummockadmin' \
--conf 'spark.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301' \
--conf 'spark.hadoop.fs.s3a.path.style.access=true' \
--S --e 'create table delta.`s3a://deltalake/deltalake-test`(v1 int, v2 short, v3 long, v4 float, v5 double, v6 string, v7 date, v8 Timestamp, v9 boolean) using delta;'
--S --e 'create table delta.`s3a://deltalake/deltalake-test`(v1 int, v2 short, v3 long, v4 float, v5 double, v6 string, v7 date, v8 Timestamp, v9 boolean, v10 decimal) using delta;'


echo "--- testing sinks"
Expand All @@ -67,7 +67,7 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \

# check sink destination using shell
if cat ./spark-output/*.csv | sort | awk -F "," '{
exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01T01:01:01.000Z" && $9 == "false"); }'; then
exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01T01:01:01.000Z" && $9 == "false" && $10 == 1); }'; then
echo "DeltaLake sink check passed"
else
cat ./spark-output/*.csv
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/sink/deltalake_rust_sink.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean);
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean, v10 decimal);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
Expand All @@ -17,7 +17,7 @@ with (
);

statement ok
INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false);
INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false, 1);

statement ok
FLUSH;
Expand Down
94 changes: 93 additions & 1 deletion src/common/src/array/arrow/arrow_deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,104 @@
//!
//! The corresponding version of arrow is currently used by `deltalake` sink.
pub use arrow_impl::to_record_batch_with_schema as to_deltalake_record_batch_with_schema;
use std::ops::{Div, Mul};
use std::sync::Arc;

use arrow_array::ArrayRef;
use itertools::Itertools;
use num_traits::abs;
use {
arrow_array_deltalake as arrow_array, arrow_buffer_deltalake as arrow_buffer,
arrow_cast_deltalake as arrow_cast, arrow_schema_deltalake as arrow_schema,
};

use self::arrow_impl::ToArrowArrayWithTypeConvert;
use crate::array::{Array, ArrayError, DataChunk, DecimalArray};
use crate::util::iter_util::ZipEqFast;
#[expect(clippy::duplicate_mod)]
#[path = "./arrow_impl.rs"]
mod arrow_impl;

/// Converter used when turning RisingWave arrays into Arrow arrays for the `deltalake` sink.
///
/// It overrides the decimal conversion so that decimals are emitted as Arrow `Decimal128`
/// values using the precision and scale declared by the target schema.
struct DeltaLakeConvert;

impl arrow_impl::ToArrowArrayWithTypeConvert for DeltaLakeConvert {
    /// Converts a `DecimalArray` into an Arrow `Decimal128Array`.
    ///
    /// `data_type` must be `DataType::Decimal128(precision, scale)`; any other type yields an
    /// `ArrayError`. Each value is rescaled to the target scale:
    ///
    /// * a value with a smaller scale has its mantissa multiplied by a power of ten;
    /// * a value with a larger scale has its mantissa divided by a power of ten (integer
    ///   division, so the extra fractional digits are truncated);
    /// * `PositiveInf` / `NegativeInf` become `10^precision - 1` and its negation;
    /// * `NaN` and null entries become null.
    ///
    /// # Errors
    ///
    /// Returns an error if `data_type` is not `Decimal128`, or if Arrow rejects the requested
    /// precision and scale.
    fn decimal_to_arrow(
        &self,
        data_type: &arrow_schema::DataType,
        array: &DecimalArray,
    ) -> Result<arrow_array::ArrayRef, ArrayError> {
        let (precision, max_scale) = match data_type {
            arrow_schema::DataType::Decimal128(precision, scale) => (*precision, *scale),
            _ => return Err(ArrayError::to_arrow("Invalid decimal type")),
        };

        // Convert Decimal to i128:
        let values: Vec<Option<i128>> = array
            .iter()
            .map(|e| {
                e.and_then(|e| match e {
                    crate::array::Decimal::Normalized(e) => {
                        let value = e.mantissa();
                        let scale = e.scale() as i8;
                        let diff_scale = abs(max_scale - scale);
                        // The factor must be computed in `i128`: `10_i32.pow(n)` overflows
                        // as soon as the scale difference reaches 10, while `10_i128.pow(n)`
                        // is representable for every difference up to 38.
                        let factor = 10_i128.pow(diff_scale as u32);
                        let value = match scale {
                            _ if scale < max_scale => value.mul(factor),
                            _ if scale > max_scale => value.div(factor),
                            _ => value,
                        };
                        Some(value)
                    }
                    // For Inf, we replace them with the max/min value within the precision.
                    crate::array::Decimal::PositiveInf => {
                        let max_value = 10_i128.pow(precision as u32) - 1;
                        Some(max_value)
                    }
                    crate::array::Decimal::NegativeInf => {
                        let max_value = 10_i128.pow(precision as u32) - 1;
                        Some(-max_value)
                    }
                    // NaN has no Decimal128 representation, so it is written as null.
                    crate::array::Decimal::NaN => None,
                })
            })
            .collect();

        let array = arrow_array::Decimal128Array::from(values)
            .with_precision_and_scale(precision, max_scale)
            .map_err(ArrayError::from_arrow)?;
        Ok(Arc::new(array) as ArrayRef)
    }
}

/// Builds an Arrow `RecordBatch` from a RisingWave `DataChunk`, following `schema`.
///
/// Every column is converted through `DeltaLakeConvert` using the data type of the matching
/// schema field. If the converted array still has a different data type than the field, it is
/// cast to the field's type. A chunk that is not compacted is cloned and compacted first.
pub fn to_deltalake_record_batch_with_schema(
    schema: arrow_schema::SchemaRef,
    chunk: &DataChunk,
) -> Result<arrow_array::RecordBatch, ArrayError> {
    // Work on a compacted copy when the incoming chunk is not compacted yet.
    if !chunk.is_compacted() {
        let compacted = chunk.clone().compact();
        return to_deltalake_record_batch_with_schema(schema, &compacted);
    }

    let columns: Vec<_> = chunk
        .columns()
        .iter()
        .zip_eq_fast(schema.fields().iter())
        .map(|(column, field)| {
            let target_type = field.data_type();
            let converted: arrow_array::ArrayRef =
                DeltaLakeConvert.to_arrow_with_type(target_type, column)?;
            // Fall back to an Arrow cast when the conversion did not produce the schema type.
            if converted.data_type() != target_type {
                return arrow_cast::cast(&converted, target_type).map_err(ArrayError::from_arrow);
            }
            Ok(converted)
        })
        .try_collect::<_, _, ArrayError>()?;

    // Pass the row count explicitly through the batch options.
    let row_count = chunk.capacity();
    let options = arrow_array::RecordBatchOptions::default().with_row_count(Some(row_count));
    arrow_array::RecordBatch::try_new_with_options(schema, columns, &options)
        .map_err(ArrayError::to_arrow)
}
1 change: 1 addition & 0 deletions src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use crate::util::iter_util::ZipEqFast;

/// Converts RisingWave array to Arrow array with the schema.
/// This function will try to convert the array if its type is not the same as the type in the schema.
#[allow(dead_code)]
pub fn to_record_batch_with_schema(
schema: arrow_schema::SchemaRef,
chunk: &DataChunk,
Expand Down

0 comments on commit f126d38

Please sign in to comment.