diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs
index 7274bb3f34e4d..6d4e66ef6280a 100644
--- a/src/common/src/array/arrow/arrow_impl.rs
+++ b/src/common/src/array/arrow/arrow_impl.rs
@@ -160,6 +160,14 @@ macro_rules! converts_generic {
                     .unwrap()
                     .try_into()?,
             )),
+            // This arrow decimal type is used by the iceberg source to read iceberg decimal into RW decimal.
+            Decimal128(_, _) => Ok(ArrayImpl::Decimal(
+                array
+                    .as_any()
+                    .downcast_ref::<arrow_array::Decimal128Array>()
+                    .unwrap()
+                    .try_into()?,
+            )),
             t => Err(ArrayError::from_arrow(format!("unsupported data type: {t:?}"))),
         }
     }
@@ -506,6 +514,30 @@ impl From<&DecimalArray> for arrow_array::LargeBinaryArray {
     }
 }
 
+// This arrow decimal type is used by the iceberg source to read iceberg decimal into RW decimal.
+impl TryFrom<&arrow_array::Decimal128Array> for DecimalArray {
+    type Error = ArrayError;
+
+    fn try_from(array: &arrow_array::Decimal128Array) -> Result<Self, Self::Error> {
+        if array.scale() < 0 {
+            bail!("negative scale for arrow decimal is not supported")
+        }
+        let from_arrow = |value| {
+            const NAN: i128 = i128::MIN + 1;
+            match value {
+                NAN => Decimal::NaN,
+                i128::MAX => Decimal::PositiveInf,
+                i128::MIN => Decimal::NegativeInf,
+                _ => Decimal::Normalized(rust_decimal::Decimal::from_i128_with_scale(
+                    value,
+                    array.scale() as u32,
+                )),
+            }
+        };
+        Ok(array.iter().map(|o| o.map(from_arrow)).collect())
+    }
+}
+
 impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray {
     type Error = ArrayError;
 
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index ce4269851cec7..374f8a925b1f0 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -473,7 +473,8 @@ impl IcebergSink {
             .try_into()
             .map_err(|err: icelake::Error| SinkError::Iceberg(anyhow!(err)))?;
 
-        try_matches_arrow_schema(&sink_schema, &iceberg_schema)?;
+        try_matches_arrow_schema(&sink_schema, &iceberg_schema, false)
+            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
 
         Ok(table)
     }
@@ -961,13 +962,18 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
 }
 
 /// Try to match our schema with iceberg schema.
-pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
+/// `for_source` = true means the schema is used for a source, otherwise it is used for a sink.
+pub fn try_matches_arrow_schema(
+    rw_schema: &Schema,
+    arrow_schema: &ArrowSchema,
+    for_source: bool,
+) -> anyhow::Result<()> {
     if rw_schema.fields.len() != arrow_schema.fields().len() {
-        return Err(SinkError::Iceberg(anyhow!(
-            "Schema length not match, ours is {}, and iceberg is {}",
+        bail!(
+            "Schema length mismatch, risingwave is {} and iceberg is {}",
             rw_schema.fields.len(),
             arrow_schema.fields.len()
-        )));
+        );
     }
 
     let mut schema_fields = HashMap::new();
@@ -978,26 +984,37 @@ pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema)
     });
 
     for arrow_field in &arrow_schema.fields {
-        let our_field_type = schema_fields.get(arrow_field.name()).ok_or_else(|| {
-            SinkError::Iceberg(anyhow!(
-                "Field {} not found in our schema",
-                arrow_field.name()
-            ))
-        })?;
-
-        let converted_arrow_data_type =
-            ArrowDataType::try_from(*our_field_type).map_err(|e| SinkError::Iceberg(anyhow!(e)))?;
+        let our_field_type = schema_fields
+            .get(arrow_field.name())
+            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
+
+        // The iceberg source should be able to read the iceberg decimal type.
+        // Since the default arrow type conversion is used by UDFs, where decimal is converted to
+        // a large binary type that is not compatible with the iceberg decimal type,
+        // we need to convert it to a decimal type manually here.
+        let converted_arrow_data_type = if for_source
+            && matches!(our_field_type, risingwave_common::types::DataType::Decimal)
+        {
+            // The RisingWave decimal type cannot specify precision and scale, so we use the default value.
+            ArrowDataType::Decimal128(38, 0)
+        } else {
+            ArrowDataType::try_from(*our_field_type).map_err(|e| anyhow!(e))?
+        };
         let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
             (ArrowDataType::Decimal128(p1, s1), ArrowDataType::Decimal128(p2, s2)) => {
-                *p1 >= *p2 && *s1 >= *s2
+                if for_source {
+                    true
+                } else {
+                    *p1 >= *p2 && *s1 >= *s2
+                }
             }
             (left, right) => left == right,
         };
         if !compatible {
-            return Err(SinkError::Iceberg(anyhow!("Field {}'s type not compatible, ours converted data type {}, iceberg's data type: {}",
+            bail!("Field {}'s type is not compatible, risingwave converted data type {}, iceberg's data type: {}",
                 arrow_field.name(),
                 converted_arrow_data_type,
                 arrow_field.data_type()
-            )));
+            );
         }
     }
@@ -1029,7 +1046,7 @@ mod test {
             ArrowField::new("c", ArrowDataType::Int32, false),
         ]);
 
-        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
+        try_matches_arrow_schema(&risingwave_schema, &arrow_schema, false).unwrap();
 
         let risingwave_schema = Schema::new(vec![
             Field::with_name(DataType::Int32, "d"),
@@ -1043,7 +1060,7 @@
             ArrowField::new("d", ArrowDataType::Int32, false),
             ArrowField::new("c", ArrowDataType::Int32, false),
         ]);
-        try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
+        try_matches_arrow_schema(&risingwave_schema, &arrow_schema, false).unwrap();
     }
 
     #[test]
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index e77876a636f16..c8e31bdab081e 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -1188,7 +1188,11 @@ pub async fn check_iceberg_source(
         .collect::<Vec<_>>();
     let new_iceberg_schema = arrow_schema::Schema::new(new_iceberg_field);
 
-    risingwave_connector::sink::iceberg::try_matches_arrow_schema(&schema, &new_iceberg_schema)?;
+    risingwave_connector::sink::iceberg::try_matches_arrow_schema(
+        &schema,
+        &new_iceberg_schema,
+        true,
+    )?;
 
     Ok(())
 }
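
As a quick illustration of the sentinel scheme used by the new `TryFrom<&arrow_array::Decimal128Array> for DecimalArray` impl in this patch, the sketch below maps a raw Decimal128 value to a RisingWave-style decimal. It is a minimal sketch, not part of the patch: `RwDecimal` is a stand-in enum for `risingwave_common::types::Decimal`, `from_arrow_decimal128` is a hypothetical helper, and the `rust_decimal` crate is assumed as a dependency.

    // Minimal sketch: i128::MIN + 1, i128::MAX and i128::MIN are reserved as sentinels
    // for NaN, +Inf and -Inf; every other value is a normalized decimal interpreted at
    // the array's scale.
    use rust_decimal::Decimal as RustDecimal;

    #[derive(Debug)]
    enum RwDecimal {
        Normalized(RustDecimal),
        NaN,
        PositiveInf,
        NegativeInf,
    }

    fn from_arrow_decimal128(value: i128, scale: u32) -> RwDecimal {
        const NAN: i128 = i128::MIN + 1;
        match value {
            NAN => RwDecimal::NaN,
            i128::MAX => RwDecimal::PositiveInf,
            i128::MIN => RwDecimal::NegativeInf,
            // `from_i128_with_scale` places the decimal point `scale` digits from the right.
            _ => RwDecimal::Normalized(RustDecimal::from_i128_with_scale(value, scale)),
        }
    }

    fn main() {
        // Raw value 12345 with scale 2 is the decimal 123.45.
        println!("{:?}", from_arrow_decimal128(12345, 2)); // Normalized(123.45)
        // Sentinel values ignore the scale entirely.
        println!("{:?}", from_arrow_decimal128(i128::MIN + 1, 2)); // NaN
    }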
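
The `for_source` flag only changes the decimal compatibility rule in `try_matches_arrow_schema`: a sink must not lose precision or scale when writing, while a source can accept any iceberg decimal because the RisingWave decimal carries no fixed precision/scale and the conversion falls back to `Decimal128(38, 0)`. The sketch below restates that rule as a hypothetical free function `decimal_compatible` (not the function in the patch), assuming the `arrow_schema` crate as a dependency.

    use arrow_schema::DataType as ArrowDataType;

    // Hypothetical helper mirroring the decimal compatibility check in the patch.
    fn decimal_compatible(rw: &ArrowDataType, iceberg: &ArrowDataType, for_source: bool) -> bool {
        match (rw, iceberg) {
            (ArrowDataType::Decimal128(p1, s1), ArrowDataType::Decimal128(p2, s2)) => {
                // Source: always accept, the value is read into RW's decimal.
                // Sink: the converted type must cover iceberg's precision and scale.
                for_source || (p1 >= p2 && s1 >= s2)
            }
            (left, right) => left == right,
        }
    }

    fn main() {
        let rw = ArrowDataType::Decimal128(38, 0); // default conversion used for sources
        let iceberg = ArrowDataType::Decimal128(10, 2);
        assert!(decimal_compatible(&rw, &iceberg, true)); // source: accepted
        assert!(!decimal_compatible(&rw, &iceberg, false)); // sink: scale 0 < 2, rejected
    }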