Skip to content

Commit

Permalink
feat(batch): support decimal type for iceberg type (#15298)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Feb 28, 2024
1 parent 495ed83 commit c2734d0
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 20 deletions.
32 changes: 32 additions & 0 deletions src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,14 @@ macro_rules! converts_generic {
.unwrap()
.try_into()?,
)),
// This arrow decimal type is used by iceberg source to read iceberg decimal into RW decimal.
Decimal128(_, _) => Ok(ArrayImpl::Decimal(
array
.as_any()
.downcast_ref::<arrow_array::Decimal128Array>()
.unwrap()
.try_into()?,
)),
t => Err(ArrayError::from_arrow(format!("unsupported data type: {t:?}"))),
}
}
Expand Down Expand Up @@ -506,6 +514,30 @@ impl From<&DecimalArray> for arrow_array::LargeBinaryArray {
}
}

// This arrow decimal type is used by iceberg source to read iceberg decimal into RW decimal.
impl TryFrom<&arrow_array::Decimal128Array> for DecimalArray {
    type Error = ArrayError;

    /// Converts an arrow `Decimal128Array` into a RisingWave `DecimalArray`.
    ///
    /// Three reserved `i128` values encode the non-finite RW decimal variants:
    /// `i128::MIN + 1` -> `Decimal::NaN`, `i128::MAX` -> `Decimal::PositiveInf`,
    /// `i128::MIN` -> `Decimal::NegativeInf`. Every other value is interpreted as
    /// a normalized decimal with the array's scale. Nulls are preserved.
    ///
    /// # Errors
    ///
    /// Returns an error if the arrow array has a negative scale, which RW does
    /// not support.
    fn try_from(array: &arrow_array::Decimal128Array) -> Result<Self, Self::Error> {
        // A negative scale means digits to the left of the decimal point
        // (e.g. scale -2 stores multiples of 100); RW decimals cannot express that.
        if array.scale() < 0 {
            bail!("negative scale is not supported for arrow decimal")
        }
        let from_arrow = |value| {
            const NAN: i128 = i128::MIN + 1;
            match value {
                NAN => Decimal::NaN,
                i128::MAX => Decimal::PositiveInf,
                i128::MIN => Decimal::NegativeInf,
                // NOTE(review): `rust_decimal::Decimal::from_i128_with_scale`
                // panics when scale exceeds 28 (rust_decimal's maximum), while
                // iceberg allows scales up to 38 — TODO confirm callers guarantee
                // the scale fits, or switch to the checked variant.
                _ => Decimal::Normalized(rust_decimal::Decimal::from_i128_with_scale(
                    value,
                    // Safe cast: scale was verified non-negative above.
                    array.scale() as u32,
                )),
            }
        };
        Ok(array.iter().map(|o| o.map(from_arrow)).collect())
    }
}

impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray {
type Error = ArrayError;

Expand Down
55 changes: 36 additions & 19 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ impl IcebergSink {
.try_into()
.map_err(|err: icelake::Error| SinkError::Iceberg(anyhow!(err)))?;

try_matches_arrow_schema(&sink_schema, &iceberg_schema)?;
try_matches_arrow_schema(&sink_schema, &iceberg_schema, false)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

Ok(table)
}
Expand Down Expand Up @@ -961,13 +962,18 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
}

/// Try to match our schema with iceberg schema.
pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
/// `for_source` = true means the schema is used for source, otherwise it's used for sink.
pub fn try_matches_arrow_schema(
rw_schema: &Schema,
arrow_schema: &ArrowSchema,
for_source: bool,
) -> anyhow::Result<()> {
if rw_schema.fields.len() != arrow_schema.fields().len() {
return Err(SinkError::Iceberg(anyhow!(
"Schema length not match, ours is {}, and iceberg is {}",
bail!(
"Schema length not match, risingwave is {}, and iceberg is {}",
rw_schema.fields.len(),
arrow_schema.fields.len()
)));
);
}

let mut schema_fields = HashMap::new();
Expand All @@ -978,26 +984,37 @@ pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema)
});

for arrow_field in &arrow_schema.fields {
let our_field_type = schema_fields.get(arrow_field.name()).ok_or_else(|| {
SinkError::Iceberg(anyhow!(
"Field {} not found in our schema",
arrow_field.name()
))
})?;

let converted_arrow_data_type =
ArrowDataType::try_from(*our_field_type).map_err(|e| SinkError::Iceberg(anyhow!(e)))?;
let our_field_type = schema_fields
.get(arrow_field.name())
.ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;

// Iceberg source should be able to read iceberg decimal type.
// Since the arrow type default conversion is used by udf, in udf, decimal is converted to
// large binary type which is not compatible with iceberg decimal type,
// so we need to convert it to decimal type manually.
let converted_arrow_data_type = if for_source
&& matches!(our_field_type, risingwave_common::types::DataType::Decimal)
{
// RisingWave decimal type cannot specify precision and scale, so we use the default value.
ArrowDataType::Decimal128(38, 0)
} else {
ArrowDataType::try_from(*our_field_type).map_err(|e| anyhow!(e))?
};

let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
(ArrowDataType::Decimal128(p1, s1), ArrowDataType::Decimal128(p2, s2)) => {
*p1 >= *p2 && *s1 >= *s2
if for_source {
true
} else {
*p1 >= *p2 && *s1 >= *s2
}
}
(left, right) => left == right,
};
if !compatible {
return Err(SinkError::Iceberg(anyhow!("Field {}'s type not compatible, ours converted data type {}, iceberg's data type: {}",
bail!("Field {}'s type not compatible, risingwave converted data type {}, iceberg's data type: {}",
arrow_field.name(), converted_arrow_data_type, arrow_field.data_type()
)));
);
}
}

Expand Down Expand Up @@ -1029,7 +1046,7 @@ mod test {
ArrowField::new("c", ArrowDataType::Int32, false),
]);

try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
try_matches_arrow_schema(&risingwave_schema, &arrow_schema, false).unwrap();

let risingwave_schema = Schema::new(vec![
Field::with_name(DataType::Int32, "d"),
Expand All @@ -1043,7 +1060,7 @@ mod test {
ArrowField::new("d", ArrowDataType::Int32, false),
ArrowField::new("c", ArrowDataType::Int32, false),
]);
try_matches_arrow_schema(&risingwave_schema, &arrow_schema).unwrap();
try_matches_arrow_schema(&risingwave_schema, &arrow_schema, false).unwrap();
}

#[test]
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1188,7 +1188,11 @@ pub async fn check_iceberg_source(
.collect::<Vec<_>>();
let new_iceberg_schema = arrow_schema::Schema::new(new_iceberg_field);

risingwave_connector::sink::iceberg::try_matches_arrow_schema(&schema, &new_iceberg_schema)?;
risingwave_connector::sink::iceberg::try_matches_arrow_schema(
&schema,
&new_iceberg_schema,
true,
)?;

Ok(())
}
Expand Down

0 comments on commit c2734d0

Please sign in to comment.