diff --git a/ci/scripts/e2e-deltalake-sink-rust-test.sh b/ci/scripts/e2e-deltalake-sink-rust-test.sh
index 0a96b4331dd9a..d4a4828687b63 100755
--- a/ci/scripts/e2e-deltalake-sink-rust-test.sh
+++ b/ci/scripts/e2e-deltalake-sink-rust-test.sh
@@ -48,7 +48,7 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \
     --conf 'spark.hadoop.fs.s3a.secret.key=hummockadmin' \
     --conf 'spark.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301' \
     --conf 'spark.hadoop.fs.s3a.path.style.access=true' \
-    --S --e 'create table delta.`s3a://deltalake/deltalake-test`(v1 int, v2 short, v3 long, v4 float, v5 double, v6 string, v7 date, v8 Timestamp, v9 boolean) using delta;'
+    --S --e 'create table delta.`s3a://deltalake/deltalake-test`(v1 int, v2 short, v3 long, v4 float, v5 double, v6 string, v7 date, v8 Timestamp, v9 boolean, v10 decimal) using delta;'
 
 echo "--- testing sinks"
 
@@ -67,7 +67,7 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \
 
 # check sink destination using shell
 if cat ./spark-output/*.csv | sort | awk -F "," '{
-exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01T01:01:01.000Z" && $9 == "false"); }'; then
+exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01T01:01:01.000Z" && $9 == "false" && $10 == 1); }'; then
   echo "DeltaLake sink check passed"
 else
   cat ./spark-output/*.csv
diff --git a/e2e_test/sink/deltalake_rust_sink.slt b/e2e_test/sink/deltalake_rust_sink.slt
index 9265940e01edf..1f983f262685d 100644
--- a/e2e_test/sink/deltalake_rust_sink.slt
+++ b/e2e_test/sink/deltalake_rust_sink.slt
@@ -1,5 +1,5 @@
 statement ok
-CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean);
+CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean, v10 decimal);
 
 statement ok
 CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
@@ -17,7 +17,7 @@ with (
 );
 
 statement ok
-INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false);
+INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false, 1);
 
 statement ok
 FLUSH;
diff --git a/src/common/src/array/arrow/arrow_deltalake.rs b/src/common/src/array/arrow/arrow_deltalake.rs
index c1583373d8ac0..c19deea059722 100644
--- a/src/common/src/array/arrow/arrow_deltalake.rs
+++ b/src/common/src/array/arrow/arrow_deltalake.rs
@@ -17,12 +17,104 @@
 //!
 //! The corresponding version of arrow is currently used by `deltalake` sink.
 
-pub use arrow_impl::to_record_batch_with_schema as to_deltalake_record_batch_with_schema;
+use std::ops::{Div, Mul};
+use std::sync::Arc;
+
+use arrow_array::ArrayRef;
+use itertools::Itertools;
+use num_traits::abs;
 use {
     arrow_array_deltalake as arrow_array, arrow_buffer_deltalake as arrow_buffer,
     arrow_cast_deltalake as arrow_cast, arrow_schema_deltalake as arrow_schema,
 };
+use self::arrow_impl::ToArrowArrayWithTypeConvert;
+use crate::array::{Array, ArrayError, DataChunk, DecimalArray};
+use crate::util::iter_util::ZipEqFast;
 
 #[expect(clippy::duplicate_mod)]
 #[path = "./arrow_impl.rs"]
 mod arrow_impl;
+
+struct DeltaLakeConvert;
+
+impl arrow_impl::ToArrowArrayWithTypeConvert for DeltaLakeConvert {
+    fn decimal_to_arrow(
+        &self,
+        data_type: &arrow_schema::DataType,
+        array: &DecimalArray,
+    ) -> Result<ArrayRef, ArrayError> {
+        let (precision, max_scale) = match data_type {
+            arrow_schema::DataType::Decimal128(precision, scale) => (*precision, *scale),
+            _ => return Err(ArrayError::to_arrow("Invalid decimal type")),
+        };
+
+        // Convert Decimal to i128:
+        let values: Vec<Option<i128>> = array
+            .iter()
+            .map(|e| {
+                e.and_then(|e| match e {
+                    crate::array::Decimal::Normalized(e) => {
+                        let value = e.mantissa();
+                        let scale = e.scale() as i8;
+                        let diff_scale = abs(max_scale - scale);
+                        let value = match scale {
+                            _ if scale < max_scale => {
+                                value.mul(10_i32.pow(diff_scale as u32) as i128)
+                            }
+                            _ if scale > max_scale => {
+                                value.div(10_i32.pow(diff_scale as u32) as i128)
+                            }
+                            _ => value,
+                        };
+                        Some(value)
+                    }
+                    // For Inf, we replace them with the max/min value within the precision.
+                    crate::array::Decimal::PositiveInf => {
+                        let max_value = 10_i128.pow(precision as u32) - 1;
+                        Some(max_value)
+                    }
+                    crate::array::Decimal::NegativeInf => {
+                        let max_value = 10_i128.pow(precision as u32) - 1;
+                        Some(-max_value)
+                    }
+                    crate::array::Decimal::NaN => None,
+                })
+            })
+            .collect();
+
+        let array = arrow_array::Decimal128Array::from(values)
+            .with_precision_and_scale(precision, max_scale)
+            .map_err(ArrayError::from_arrow)?;
+        Ok(Arc::new(array) as ArrayRef)
+    }
+}
+
+/// Converts RisingWave array to Arrow array with the schema.
+/// This function will try to convert the array if the type is not same with the schema.
+pub fn to_deltalake_record_batch_with_schema(
+    schema: arrow_schema::SchemaRef,
+    chunk: &DataChunk,
+) -> Result<arrow_array::RecordBatch, ArrayError> {
+    if !chunk.is_compacted() {
+        let c = chunk.clone();
+        return to_deltalake_record_batch_with_schema(schema, &c.compact());
+    }
+    let columns: Vec<_> = chunk
+        .columns()
+        .iter()
+        .zip_eq_fast(schema.fields().iter())
+        .map(|(column, field)| {
+            let column: arrow_array::ArrayRef =
+                DeltaLakeConvert.to_arrow_with_type(field.data_type(), column)?;
+            if column.data_type() == field.data_type() {
+                Ok(column)
+            } else {
+                arrow_cast::cast(&column, field.data_type()).map_err(ArrayError::from_arrow)
+            }
+        })
+        .try_collect::<_, _, ArrayError>()?;
+
+    let opts = arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
+    arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
+        .map_err(ArrayError::to_arrow)
+}
diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs
index f22ee55e9eece..e288a8f8fff67 100644
--- a/src/common/src/array/arrow/arrow_impl.rs
+++ b/src/common/src/array/arrow/arrow_impl.rs
@@ -52,6 +52,7 @@ use crate::util::iter_util::ZipEqFast;
 
 /// Converts RisingWave array to Arrow array with the schema.
 /// This function will try to convert the array if the type is not same with the schema.
+#[allow(dead_code)]
 pub fn to_record_batch_with_schema(
     schema: arrow_schema::SchemaRef,
     chunk: &DataChunk,
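
The subtle part of the change above is the scale adjustment inside `decimal_to_arrow`: a `Decimal::Normalized` value carries its own mantissa and scale, and the mantissa has to be shifted to the target Arrow `Decimal128` scale before it is stored. Below is a minimal, standalone sketch of that adjustment (plain Rust with no RisingWave or Arrow dependencies; the helper name `rescale_to` and the sample values are illustrative only, not part of the diff):

// Standalone sketch of the mantissa rescaling performed in decimal_to_arrow.
// rescale_to is a hypothetical helper used only for illustration.
fn rescale_to(mantissa: i128, scale: i8, max_scale: i8) -> i128 {
    let diff = (max_scale - scale).unsigned_abs() as u32;
    if scale < max_scale {
        // Target keeps more fractional digits: multiply the mantissa.
        mantissa * 10_i128.pow(diff)
    } else if scale > max_scale {
        // Target keeps fewer fractional digits: divide (truncating).
        mantissa / 10_i128.pow(diff)
    } else {
        mantissa
    }
}

fn main() {
    // 1.2 is stored as mantissa 12 with scale 1; a Decimal128(_, 4) column expects scale 4.
    assert_eq!(rescale_to(12, 1, 4), 12_000); // 1.2000
    // 1.23456 (mantissa 123456, scale 5) narrowed to scale 4 truncates the last digit.
    assert_eq!(rescale_to(123_456, 5, 4), 12_345); // 1.2345
    println!("rescale examples ok");
}

The same idea drives the PositiveInf/NegativeInf branches: they clamp to the largest magnitude representable at the given precision (10^precision - 1), while NaN maps to a null slot in the Decimal128 array.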