diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs index 866412055bcd2..e8321a40443ea 100644 --- a/src/connector/src/sink/dynamodb.rs +++ b/src/connector/src/sink/dynamodb.rs @@ -323,53 +323,46 @@ fn map_data_type( let Some(scalar_ref) = scalar_ref else { return Ok(AttributeValue::Null(true)); }; - let attr = match (data_type, scalar_ref) { - (DataType::Int16, ScalarRefImpl::Int16(_)) - | (DataType::Int32, ScalarRefImpl::Int32(_)) - | (DataType::Int64, ScalarRefImpl::Int64(_)) - | (DataType::Int256, ScalarRefImpl::Int256(_)) - | (DataType::Float32, ScalarRefImpl::Float32(_)) - | (DataType::Float64, ScalarRefImpl::Float64(_)) - | (DataType::Decimal, ScalarRefImpl::Decimal(_)) - | (DataType::Serial, ScalarRefImpl::Serial(_)) => { - AttributeValue::N(scalar_ref.to_text_with_type(data_type)) - } + let attr = match data_type { + DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Int256 + | DataType::Float32 + | DataType::Float64 + | DataType::Decimal + | DataType::Serial => AttributeValue::N(scalar_ref.to_text_with_type(data_type)), // TODO: jsonb as dynamic type (https://github.com/risingwavelabs/risingwave/issues/11699) - (DataType::Varchar, ScalarRefImpl::Utf8(_)) - | (DataType::Interval, ScalarRefImpl::Interval(_)) - | (DataType::Date, ScalarRefImpl::Date(_)) - | (DataType::Time, ScalarRefImpl::Time(_)) - | (DataType::Timestamp, ScalarRefImpl::Timestamp(_)) - | (DataType::Timestamptz, ScalarRefImpl::Timestamptz(_)) - | (DataType::Jsonb, ScalarRefImpl::Jsonb(_)) => { - AttributeValue::S(scalar_ref.to_text_with_type(data_type)) - } - (DataType::Boolean, ScalarRefImpl::Bool(v)) => AttributeValue::Bool(v), - (DataType::Bytea, ScalarRefImpl::Bytea(v)) => AttributeValue::B(Blob::new(v)), - (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => { - let list_attr = list_ref + DataType::Varchar + | DataType::Interval + | DataType::Date + | DataType::Time + | DataType::Timestamp + | DataType::Timestamptz + | DataType::Jsonb => AttributeValue::S(scalar_ref.to_text_with_type(data_type)), + DataType::Boolean => AttributeValue::Bool(scalar_ref.into_bool()), + DataType::Bytea => AttributeValue::B(Blob::new(scalar_ref.into_bytea())), + DataType::List(datatype) => { + let list_attr = scalar_ref + .into_list() .iter() .map(|x| map_data_type(x, datatype)) .collect::>>()?; AttributeValue::L(list_attr) } - (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => { + DataType::Struct(st) => { let mut map = HashMap::with_capacity(st.len()); - for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug( - st.iter() - .map(|(name, dt)| Field::with_name(dt.clone(), name)), - ) { + for (sub_datum_ref, sub_field) in + scalar_ref.into_struct().iter_fields_ref().zip_eq_debug( + st.iter() + .map(|(name, dt)| Field::with_name(dt.clone(), name)), + ) + { let attr = map_data_type(sub_datum_ref, &sub_field.data_type())?; map.insert(sub_field.name.clone(), attr); } AttributeValue::M(map) } - (data_type, scalar_ref) => { - return Err(SinkError::DynamoDb(anyhow!(format!( - "map_data_type: unsupported data type: logical type: {:?}, physical type: {:?}", - data_type, scalar_ref - ),))); - } }; Ok(attr) }