Skip to content

Commit

Permalink
fix(sink): remove some todo!() with error
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Sep 17, 2024
1 parent 9167768 commit ca90f22
Show file tree
Hide file tree
Showing 12 changed files with 39 additions and 28 deletions.
8 changes: 8 additions & 0 deletions src/common/src/array/map_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,14 @@ mod scalar {
pub fn to_owned(self) -> MapValue {
MapValue(self.0.to_owned())
}

pub fn len(&self) -> usize {
self.0.len()
}

pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}

impl Scalar for MapValue {
Expand Down
5 changes: 1 addition & 4 deletions src/connector/codec/src/decoder/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@ impl<'a> AvroParseOptions<'a> {
let expected_field_name = avro_schema_to_struct_field_name(variant_schema)?;

let mut fields = Vec::with_capacity(struct_type_info.len());
for (field_name, field_type) in struct_type_info
.names()
.zip_eq_fast(struct_type_info.types())
{
for (field_name, field_type) in struct_type_info.iter() {
if field_name == expected_field_name {
let datum = Self {
schema: Some(variant_schema),
Expand Down
10 changes: 8 additions & 2 deletions src/connector/src/sink/big_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,9 @@ impl BigQuerySink {
DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
"Bigquery cannot support Int256"
))),
DataType::Map(_) => todo!(),
DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
"Bigquery cannot support Map"
))),
}
}

Expand Down Expand Up @@ -392,7 +394,11 @@ impl BigQuerySink {
"Bigquery cannot support Int256"
)))
}
DataType::Map(_) => todo!(),
DataType::Map(_) => {
return Err(SinkError::BigQuery(anyhow::anyhow!(
"Bigquery cannot support Map"
)))
}
};
Ok(tfs)
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ impl ClickHouseSink {
fields_type: &DataType,
ck_column: &SystemColumn,
) -> Result<()> {
// FIXME: the "contains" based implementation is wrong
let is_match = match fields_type {
risingwave_common::types::DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
risingwave_common::types::DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
Expand Down
28 changes: 13 additions & 15 deletions src/connector/src/sink/dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use dynamodb::types::{
};
use maplit::hashmap;
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row as _;
use risingwave_common::types::{DataType, ScalarRefImpl, ToText};
use risingwave_common::util::iter_util::ZipEqDebug;
Expand Down Expand Up @@ -345,16 +345,13 @@ impl DynamoDbFormatter {
row.iter()
.zip_eq_debug((self.schema.clone()).into_fields())
.map(|(scalar, field)| {
map_data_type(scalar, &field.data_type()).map(|attr| (field.name, attr))
map_data(scalar, &field.data_type()).map(|attr| (field.name, attr))
})
.collect()
}
}

fn map_data_type(
scalar_ref: Option<ScalarRefImpl<'_>>,
data_type: &DataType,
) -> Result<AttributeValue> {
fn map_data(scalar_ref: Option<ScalarRefImpl<'_>>, data_type: &DataType) -> Result<AttributeValue> {
let Some(scalar_ref) = scalar_ref else {
return Ok(AttributeValue::Null(true));
};
Expand All @@ -381,24 +378,25 @@ fn map_data_type(
let list_attr = scalar_ref
.into_list()
.iter()
.map(|x| map_data_type(x, datatype))
.map(|x| map_data(x, datatype))
.collect::<Result<Vec<_>>>()?;
AttributeValue::L(list_attr)
}
DataType::Struct(st) => {
let mut map = HashMap::with_capacity(st.len());
for (sub_datum_ref, sub_field) in
scalar_ref.into_struct().iter_fields_ref().zip_eq_debug(
st.iter()
.map(|(name, dt)| Field::with_name(dt.clone(), name)),
)
for (sub_datum_ref, (name, data_type)) in scalar_ref
.into_struct()
.iter_fields_ref()
.zip_eq_debug(st.iter())
{
let attr = map_data_type(sub_datum_ref, &sub_field.data_type())?;
map.insert(sub_field.name.clone(), attr);
let attr = map_data(sub_datum_ref, &data_type)?;
map.insert(name.to_string(), attr);
}
AttributeValue::M(map)
}
DataType::Map(_) => todo!(),
DataType::Map(_m) => {
return Err(SinkError::DynamoDb(anyhow!("map is not supported yet")));
}
};
Ok(attr)
}
2 changes: 1 addition & 1 deletion src/connector/src/sink/encoder/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ fn encode_field<D: MaybeData>(
return no_match_err();
}
DataType::Map(_) => {
// TODO:
// TODO(map): support map
return no_match_err();
}
};
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/sink/encoder/bson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ fn datum_to_bson(field: &Field, datum: DatumRef<'_>) -> Bson {
subtype: BinarySubtype::Generic,
bytes: v.into(),
}),
// TODO(map): support map
_ => {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::warn!(
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/sink/encoder/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ fn datum_to_json_object(
}
}
}
// TODO(map): support map
(data_type, scalar_ref) => {
return Err(ArrayError::internal(
format!("datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}", field.name, data_type, scalar_ref),
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/encoder/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ fn encode_field<D: MaybeData>(
return no_match_err();
}
DataType::Map(_) => {
// TODO:
// TODO(map): support map
return no_match_err();
}
};
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/sink/encoder/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use risingwave_common::types::ToText;
use super::{Result, RowEncoder};
use crate::sink::SinkError;

/// Encode a row according to a specified string template `user_id:{user_id}`
/// Encode a row according to a specified string template `user_id:{user_id}`.
/// Data is encoded to string with [`ToText`].
pub struct TemplateEncoder {
schema: Schema,
col_indices: Option<Vec<usize>>,
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/sink/encoder/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use risingwave_common::types::{DataType, ToText};

use super::RowEncoder;

/// Encode with [`ToText`]. Only used to encode key.
pub struct TextEncoder {
pub schema: Schema,
// the column must contain only one element
Expand Down
5 changes: 1 addition & 4 deletions src/expr/impl/src/scalar/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,7 @@ fn map_contains(map: MapRef<'_>, key: ScalarRefImpl<'_>) -> Result<bool, ExprErr
/// ```
#[function("map_length(anymap) -> int4")]
fn map_length<T: TryFrom<usize>>(map: MapRef<'_>) -> Result<T, ExprError> {
map.inner()
.len()
.try_into()
.map_err(|_| ExprError::NumericOverflow)
map.len().try_into().map_err(|_| ExprError::NumericOverflow)
}

/// If both `m1` and `m2` have a value with the same key, then the output map contains the value from `m2`.
Expand Down

0 comments on commit ca90f22

Please sign in to comment.