From 80ec46d02f292186a5d8a7eb36169cc526a34448 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Fri, 13 Sep 2024 16:05:38 +0800
Subject: [PATCH] fix(sink): remove some todo!() with error

Signed-off-by: xxchan
---
 src/common/src/array/map_array.rs           |  8 ++++++++
 src/connector/codec/src/decoder/avro/mod.rs |  5 +----
 src/connector/src/sink/big_query.rs         | 10 ++++++++--
 src/connector/src/sink/clickhouse.rs        |  1 +
 src/connector/src/sink/dynamodb.rs          | 28 +++++++++++-----------------
 src/connector/src/sink/encoder/avro.rs      |  2 +-
 src/connector/src/sink/encoder/bson.rs      |  1 +
 src/connector/src/sink/encoder/json.rs      |  1 +
 src/connector/src/sink/encoder/proto.rs     |  2 +-
 src/connector/src/sink/encoder/template.rs  |  3 ++-
 src/connector/src/sink/encoder/text.rs      |  1 +
 src/expr/impl/src/scalar/array.rs           |  5 +----
 12 files changed, 39 insertions(+), 28 deletions(-)

diff --git a/src/common/src/array/map_array.rs b/src/common/src/array/map_array.rs
index 2f0da9bbf816f..f519c25981a56 100644
--- a/src/common/src/array/map_array.rs
+++ b/src/common/src/array/map_array.rs
@@ -337,6 +337,14 @@ mod scalar {
         pub fn to_owned(self) -> MapValue {
             MapValue(self.0.to_owned())
         }
+
+        pub fn len(&self) -> usize {
+            self.0.len()
+        }
+
+        pub fn is_empty(&self) -> bool {
+            self.0.is_empty()
+        }
     }
 
     impl Scalar for MapValue {
diff --git a/src/connector/codec/src/decoder/avro/mod.rs b/src/connector/codec/src/decoder/avro/mod.rs
index dc4dae49ca7c4..ebae0e1292fec 100644
--- a/src/connector/codec/src/decoder/avro/mod.rs
+++ b/src/connector/codec/src/decoder/avro/mod.rs
@@ -136,10 +136,7 @@ impl<'a> AvroParseOptions<'a> {
                 let expected_field_name = avro_schema_to_struct_field_name(variant_schema)?;
 
                 let mut fields = Vec::with_capacity(struct_type_info.len());
-                for (field_name, field_type) in struct_type_info
-                    .names()
-                    .zip_eq_fast(struct_type_info.types())
-                {
+                for (field_name, field_type) in struct_type_info.iter() {
                     if field_name == expected_field_name {
                         let datum = Self {
                             schema: Some(variant_schema),
diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs
index 235b1ff5b6539..85e8ba0187c99 100644
--- a/src/connector/src/sink/big_query.rs
+++ b/src/connector/src/sink/big_query.rs
@@ -342,7 +342,9 @@ impl BigQuerySink {
             DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
                 "Bigquery cannot support Int256"
             ))),
-            DataType::Map(_) => todo!(),
+            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
+                "Bigquery cannot support Map"
+            ))),
         }
     }
 
@@ -392,7 +394,11 @@ impl BigQuerySink {
                     "Bigquery cannot support Int256"
                 )))
             }
-            DataType::Map(_) => todo!(),
+            DataType::Map(_) => {
+                return Err(SinkError::BigQuery(anyhow::anyhow!(
+                    "Bigquery cannot support Map"
+                )))
+            }
         };
         Ok(tfs)
     }
diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs
index 07db42790f581..21a7c56dc8154 100644
--- a/src/connector/src/sink/clickhouse.rs
+++ b/src/connector/src/sink/clickhouse.rs
@@ -425,6 +425,7 @@ impl ClickHouseSink {
         fields_type: &DataType,
         ck_column: &SystemColumn,
     ) -> Result<()> {
+        // FIXME: the "contains" based implementation is wrong
        let is_match = match fields_type {
            risingwave_common::types::DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
            risingwave_common::types::DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
diff --git a/src/connector/src/sink/dynamodb.rs b/src/connector/src/sink/dynamodb.rs
index 6d73bf2d478c8..c8c3c598e6319 100644
--- a/src/connector/src/sink/dynamodb.rs
+++ b/src/connector/src/sink/dynamodb.rs
@@ -24,7 +24,7 @@ use dynamodb::types::{
 };
 use maplit::hashmap;
 use risingwave_common::array::{Op, RowRef, StreamChunk};
-use risingwave_common::catalog::{Field, Schema};
+use risingwave_common::catalog::Schema;
 use risingwave_common::row::Row as _;
 use risingwave_common::types::{DataType, ScalarRefImpl, ToText};
 use risingwave_common::util::iter_util::ZipEqDebug;
@@ -345,16 +345,13 @@ impl DynamoDbFormatter {
         row.iter()
             .zip_eq_debug((self.schema.clone()).into_fields())
             .map(|(scalar, field)| {
-                map_data_type(scalar, &field.data_type()).map(|attr| (field.name, attr))
+                map_data(scalar, &field.data_type()).map(|attr| (field.name, attr))
             })
             .collect()
     }
 }
 
-fn map_data_type(
-    scalar_ref: Option<ScalarRefImpl<'_>>,
-    data_type: &DataType,
-) -> Result<AttributeValue> {
+fn map_data(scalar_ref: Option<ScalarRefImpl<'_>>, data_type: &DataType) -> Result<AttributeValue> {
     let Some(scalar_ref) = scalar_ref else {
         return Ok(AttributeValue::Null(true));
     };
@@ -381,24 +378,25 @@ fn map_data_type(
             let list_attr = scalar_ref
                 .into_list()
                 .iter()
-                .map(|x| map_data_type(x, datatype))
+                .map(|x| map_data(x, datatype))
                 .collect::<Result<Vec<_>>>()?;
             AttributeValue::L(list_attr)
         }
         DataType::Struct(st) => {
             let mut map = HashMap::with_capacity(st.len());
-            for (sub_datum_ref, sub_field) in
-                scalar_ref.into_struct().iter_fields_ref().zip_eq_debug(
-                    st.iter()
-                        .map(|(name, dt)| Field::with_name(dt.clone(), name)),
-                )
+            for (sub_datum_ref, (name, data_type)) in scalar_ref
+                .into_struct()
+                .iter_fields_ref()
+                .zip_eq_debug(st.iter())
             {
-                let attr = map_data_type(sub_datum_ref, &sub_field.data_type())?;
-                map.insert(sub_field.name.clone(), attr);
+                let attr = map_data(sub_datum_ref, data_type)?;
+                map.insert(name.to_string(), attr);
             }
             AttributeValue::M(map)
         }
-        DataType::Map(_) => todo!(),
+        DataType::Map(_m) => {
+            return Err(SinkError::DynamoDb(anyhow!("map is not supported yet")));
+        }
     };
     Ok(attr)
 }
diff --git a/src/connector/src/sink/encoder/avro.rs b/src/connector/src/sink/encoder/avro.rs
index 4a2060f0a8c6c..1a9218572814f 100644
--- a/src/connector/src/sink/encoder/avro.rs
+++ b/src/connector/src/sink/encoder/avro.rs
@@ -455,7 +455,7 @@ fn encode_field(
             return no_match_err();
         }
         DataType::Map(_) => {
-            // TODO:
+            // TODO(map): support map
             return no_match_err();
         }
     };
diff --git a/src/connector/src/sink/encoder/bson.rs b/src/connector/src/sink/encoder/bson.rs
index c401d0575a12b..5f7908ed4cedd 100644
--- a/src/connector/src/sink/encoder/bson.rs
+++ b/src/connector/src/sink/encoder/bson.rs
@@ -188,6 +188,7 @@ fn datum_to_bson(field: &Field, datum: DatumRef<'_>) -> Bson {
             subtype: BinarySubtype::Generic,
             bytes: v.into(),
         }),
+        // TODO(map): support map
         _ => {
             if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
                 tracing::warn!(
diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs
index 6dc8809f42933..7691b3de5f447 100644
--- a/src/connector/src/sink/encoder/json.rs
+++ b/src/connector/src/sink/encoder/json.rs
@@ -346,6 +346,7 @@ fn datum_to_json_object(
                 }
             }
         }
+        // TODO(map): support map
         (data_type, scalar_ref) => {
             return Err(ArrayError::internal(
                 format!("datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}", field.name, data_type, scalar_ref),
diff --git a/src/connector/src/sink/encoder/proto.rs b/src/connector/src/sink/encoder/proto.rs
index ce6e8503b624e..88fb445b1c3ba 100644
--- a/src/connector/src/sink/encoder/proto.rs
+++ b/src/connector/src/sink/encoder/proto.rs
@@ -421,7 +421,7 @@ fn encode_field(
             return no_match_err();
         }
         DataType::Map(_) => {
-            // TODO:
+            // TODO(map): support map
             return no_match_err();
         }
     };
diff --git a/src/connector/src/sink/encoder/template.rs b/src/connector/src/sink/encoder/template.rs
index 1903be667d781..c34e75e4ad435 100644
--- a/src/connector/src/sink/encoder/template.rs
+++ b/src/connector/src/sink/encoder/template.rs
@@ -22,7 +22,8 @@ use risingwave_common::types::ToText;
 use super::{Result, RowEncoder};
 use crate::sink::SinkError;
 
-/// Encode a row according to a specified string template `user_id:{user_id}`
+/// Encode a row according to a specified string template `user_id:{user_id}`.
+/// Data is encoded to a string with [`ToText`].
 pub struct TemplateEncoder {
     schema: Schema,
     col_indices: Option<Vec<usize>>,
diff --git a/src/connector/src/sink/encoder/text.rs b/src/connector/src/sink/encoder/text.rs
index 734ac8bd6a425..369f4212fea6b 100644
--- a/src/connector/src/sink/encoder/text.rs
+++ b/src/connector/src/sink/encoder/text.rs
@@ -17,6 +17,7 @@ use risingwave_common::types::{DataType, ToText};
 
 use super::RowEncoder;
 
+/// Encode with [`ToText`]. Only used to encode the key.
 pub struct TextEncoder {
     pub schema: Schema,
     // the column must contain only one element
diff --git a/src/expr/impl/src/scalar/array.rs b/src/expr/impl/src/scalar/array.rs
index 7b7d272000597..863bd5eba62fe 100644
--- a/src/expr/impl/src/scalar/array.rs
+++ b/src/expr/impl/src/scalar/array.rs
@@ -153,10 +153,7 @@ fn map_contains(map: MapRef<'_>, key: ScalarRefImpl<'_>) -> Result<bool> {
 
 #[function("map_length(anymap) -> int4")]
 fn map_length<T: TryFrom<usize>>(map: MapRef<'_>) -> Result<T> {
-    map.inner()
-        .len()
-        .try_into()
-        .map_err(|_| ExprError::NumericOverflow)
+    map.len().try_into().map_err(|_| ExprError::NumericOverflow)
 }
 
 /// If both `m1` and `m2` have a value with the same key, then the output map contains the value from `m2`.
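
Note (illustrative sketch, not part of the patch): the snippet below shows the pattern this patch applies across the sinks, where an unsupported `DataType::Map` column is reported as a typed `SinkError` the caller can handle instead of a `todo!()` panic. The helper name `reject_map_columns` is hypothetical; `DataType`, `SinkError::BigQuery`, and the error message are taken from the big_query.rs hunk above, and the code assumes it lives inside the connector's sink module where those types and the `anyhow` crate are in scope.

    use risingwave_common::types::DataType;

    use crate::sink::SinkError;

    /// Hypothetical helper mirroring the big_query.rs change: map columns are
    /// rejected with a recoverable error rather than a panic inside the sink.
    fn reject_map_columns(data_type: &DataType) -> Result<(), SinkError> {
        match data_type {
            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "Bigquery cannot support Map"
            ))),
            _ => Ok(()),
        }
    }

With this shape, schema validation can propagate the error with `?` and surface it to the user, which is the behavior the replaced `todo!()` calls prevented.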