Skip to content

Commit

Permalink
refactor: simplify match statement
Browse files Browse the repository at this point in the history
  • Loading branch information
jetjinser committed May 16, 2024
1 parent 2f7b839 commit d4686b2
Showing 1 changed file with 28 additions and 35 deletions.
63 changes: 28 additions & 35 deletions src/connector/src/sink/dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,53 +323,46 @@ fn map_data_type(
let Some(scalar_ref) = scalar_ref else {
return Ok(AttributeValue::Null(true));
};
let attr = match (data_type, scalar_ref) {
(DataType::Int16, ScalarRefImpl::Int16(_))
| (DataType::Int32, ScalarRefImpl::Int32(_))
| (DataType::Int64, ScalarRefImpl::Int64(_))
| (DataType::Int256, ScalarRefImpl::Int256(_))
| (DataType::Float32, ScalarRefImpl::Float32(_))
| (DataType::Float64, ScalarRefImpl::Float64(_))
| (DataType::Decimal, ScalarRefImpl::Decimal(_))
| (DataType::Serial, ScalarRefImpl::Serial(_)) => {
AttributeValue::N(scalar_ref.to_text_with_type(data_type))
}
let attr = match data_type {
DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Int256
| DataType::Float32
| DataType::Float64
| DataType::Decimal
| DataType::Serial => AttributeValue::N(scalar_ref.to_text_with_type(data_type)),
// TODO: jsonb as dynamic type (https://github.com/risingwavelabs/risingwave/issues/11699)
(DataType::Varchar, ScalarRefImpl::Utf8(_))
| (DataType::Interval, ScalarRefImpl::Interval(_))
| (DataType::Date, ScalarRefImpl::Date(_))
| (DataType::Time, ScalarRefImpl::Time(_))
| (DataType::Timestamp, ScalarRefImpl::Timestamp(_))
| (DataType::Timestamptz, ScalarRefImpl::Timestamptz(_))
| (DataType::Jsonb, ScalarRefImpl::Jsonb(_)) => {
AttributeValue::S(scalar_ref.to_text_with_type(data_type))
}
(DataType::Boolean, ScalarRefImpl::Bool(v)) => AttributeValue::Bool(v),
(DataType::Bytea, ScalarRefImpl::Bytea(v)) => AttributeValue::B(Blob::new(v)),
(DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
let list_attr = list_ref
DataType::Varchar
| DataType::Interval
| DataType::Date
| DataType::Time
| DataType::Timestamp
| DataType::Timestamptz
| DataType::Jsonb => AttributeValue::S(scalar_ref.to_text_with_type(data_type)),
DataType::Boolean => AttributeValue::Bool(scalar_ref.into_bool()),
DataType::Bytea => AttributeValue::B(Blob::new(scalar_ref.into_bytea())),
DataType::List(datatype) => {
let list_attr = scalar_ref
.into_list()
.iter()
.map(|x| map_data_type(x, datatype))
.collect::<Result<Vec<_>>>()?;
AttributeValue::L(list_attr)
}
(DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
DataType::Struct(st) => {
let mut map = HashMap::with_capacity(st.len());
for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
st.iter()
.map(|(name, dt)| Field::with_name(dt.clone(), name)),
) {
for (sub_datum_ref, sub_field) in
scalar_ref.into_struct().iter_fields_ref().zip_eq_debug(
st.iter()
.map(|(name, dt)| Field::with_name(dt.clone(), name)),
)
{
let attr = map_data_type(sub_datum_ref, &sub_field.data_type())?;
map.insert(sub_field.name.clone(), attr);
}
AttributeValue::M(map)
}
(data_type, scalar_ref) => {
return Err(SinkError::DynamoDb(anyhow!(format!(
"map_data_type: unsupported data type: logical type: {:?}, physical type: {:?}",
data_type, scalar_ref
),)));
}
};
Ok(attr)
}

0 comments on commit d4686b2

Please sign in to comment.