diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs
index caf478934b3d4..64ab8121aaaa7 100644
--- a/src/connector/src/sink/doris.rs
+++ b/src/connector/src/sink/doris.rs
@@ -171,7 +171,7 @@ impl DorisSink {
                 Ok(doris_data_type.contains("DATETIME"))
             }
             risingwave_common::types::DataType::Timestamptz => Err(SinkError::Doris(
-                "doris can not support Timestamptz".to_string(),
+                "TIMESTAMP WITH TIMEZONE is not supported for Doris sink as Doris doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_string(),
             )),
             risingwave_common::types::DataType::Interval => Err(SinkError::Doris(
                 "doris can not support Interval".to_string(),
diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs
index ce019dd186005..8a77f1a13cf1b 100644
--- a/src/connector/src/sink/doris_starrocks_connector.rs
+++ b/src/connector/src/sink/doris_starrocks_connector.rs
@@ -141,6 +141,14 @@ impl HeaderBuilder {
         self
     }
 
+    pub fn set_partial_update(mut self, partial_update: Option<String>) -> Self {
+        self.header.insert(
+            "partial_update".to_string(),
+            partial_update.unwrap_or_else(|| "false".to_string()),
+        );
+        self
+    }
+
     pub fn build(self) -> HashMap<String, String> {
         self.header
     }
diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs
index 22c41a18c002a..eb5c7b129385d 100644
--- a/src/connector/src/sink/encoder/json.rs
+++ b/src/connector/src/sink/encoder/json.rs
@@ -98,6 +98,24 @@ impl JsonEncoder {
         }
     }
 
+    pub fn new_with_starrocks(
+        schema: Schema,
+        col_indices: Option<Vec<usize>>,
+        timestamp_handling_mode: TimestampHandlingMode,
+        map: HashMap<String, (u8, u8)>,
+    ) -> Self {
+        Self {
+            schema,
+            col_indices,
+            time_handling_mode: TimeHandlingMode::Milli,
+            date_handling_mode: DateHandlingMode::String,
+            timestamp_handling_mode,
+            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
+            custom_json_type: CustomJsonType::StarRocks(map),
+            kafka_connect: None,
+        }
+    }
+
     pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
         Self {
             kafka_connect: Some(Arc::new(kafka_connect)),
@@ -205,7 +223,7 @@ fn datum_to_json_object(
             json!(v)
         }
         (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match custom_json_type {
-            CustomJsonType::Doris(map) => {
+            CustomJsonType::Doris(map) | CustomJsonType::StarRocks(map) => {
                 if !matches!(v, Decimal::Normalized(_)) {
                     return Err(ArrayError::internal(
                         "doris/starrocks can't support decimal Inf, -Inf, Nan".to_string(),
@@ -272,8 +290,10 @@ fn datum_to_json_object(
             json!(v.as_iso_8601())
         }
         (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type {
-            CustomJsonType::Es => JsonbVal::from(jsonb_ref).take(),
-            CustomJsonType::Doris(_) | CustomJsonType::None => json!(jsonb_ref.to_string()),
+            CustomJsonType::Es | CustomJsonType::StarRocks(_) => JsonbVal::from(jsonb_ref).take(),
+            CustomJsonType::Doris(_) | CustomJsonType::None => {
+                json!(jsonb_ref.to_string())
+            }
         },
         (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
             let elems = list_ref.iter();
@@ -317,6 +337,11 @@ fn datum_to_json_object(
                     serde_json::to_string(&map).context("failed to serialize into JSON")?,
                 )
             }
+            CustomJsonType::StarRocks(_) => {
+                return Err(ArrayError::internal(
+                    "starrocks can't support struct".to_string(),
+                ));
+            }
             CustomJsonType::Es | CustomJsonType::None => {
                 let mut map = Map::with_capacity(st.len());
                 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
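To make the new JSONB handling concrete: with `CustomJsonType::StarRocks` the encoder now passes the JSONB payload through as a native JSON value, while Doris still receives it re-serialized as a string. Below is a minimal standalone sketch (plain `serde_json`, not the RisingWave encoder types) of the two emission strategies; StarRocks can map the native value onto its JSON column type, which is why the type check in starrocks.rs further down starts accepting `Jsonb`.

```rust
use serde_json::{json, Value};

fn encode_jsonb_for_doris(v: &Value) -> Value {
    // Mirrors `json!(jsonb_ref.to_string())`: the whole JSONB document
    // becomes a single JSON string, e.g. "{\"a\":1}".
    json!(v.to_string())
}

fn encode_jsonb_for_starrocks(v: &Value) -> Value {
    // Mirrors `JsonbVal::from(jsonb_ref).take()`: the document is passed
    // through unchanged as a native JSON value.
    v.clone()
}

fn main() {
    let doc = json!({"a": 1});
    assert_eq!(encode_jsonb_for_doris(&doc), json!(r#"{"a":1}"#));
    assert_eq!(encode_jsonb_for_starrocks(&doc), json!({"a": 1}));
}
```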
diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs
index 83c28ce4b4a5a..3c76803c8e0f1 100644
--- a/src/connector/src/sink/encoder/mod.rs
+++ b/src/connector/src/sink/encoder/mod.rs
@@ -142,6 +142,8 @@ pub enum CustomJsonType {
     Doris(HashMap<String, (u8, u8)>),
     // Es's json need jsonb is struct
     Es,
+    // starrocks' need jsonb is struct
+    StarRocks(HashMap<String, (u8, u8)>),
     None,
 }
diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs
index 4c9460abc431d..11594133695d4 100644
--- a/src/connector/src/sink/starrocks.rs
+++ b/src/connector/src/sink/starrocks.rs
@@ -69,6 +69,8 @@ pub struct StarrocksCommon {
     /// The StarRocks table you want to sink data to.
     #[serde(rename = "starrocks.table")]
     pub table: String,
+    #[serde(rename = "starrocks.partial_update")]
+    pub partial_update: Option<String>,
 }
 
 #[serde_as]
@@ -126,8 +128,8 @@ impl StarrocksSink {
         starrocks_columns_desc: HashMap<String, String>,
     ) -> Result<()> {
         let rw_fields_name = self.schema.fields();
-        if rw_fields_name.len().ne(&starrocks_columns_desc.len()) {
-            return Err(SinkError::Starrocks("The length of the RisingWave column must be equal to the length of the starrocks column".to_string()));
+        if rw_fields_name.len() > starrocks_columns_desc.len() {
+            return Err(SinkError::Starrocks("The length of the RisingWave column must be equal or less to the length of the starrocks column".to_string()));
         }
 
         for i in rw_fields_name {
@@ -179,7 +181,7 @@ impl StarrocksSink {
                 Ok(starrocks_data_type.contains("datetime"))
             }
             risingwave_common::types::DataType::Timestamptz => Err(SinkError::Starrocks(
-                "starrocks can not support Timestamptz".to_string(),
+                "TIMESTAMP WITH TIMEZONE is not supported for Starrocks sink as Starrocks doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_string(),
             )),
             risingwave_common::types::DataType::Interval => Err(SinkError::Starrocks(
                 "starrocks can not support Interval".to_string(),
@@ -194,9 +196,7 @@ impl StarrocksSink {
             risingwave_common::types::DataType::Bytea => Err(SinkError::Starrocks(
                 "starrocks can not support Bytea".to_string(),
             )),
-            risingwave_common::types::DataType::Jsonb => Err(SinkError::Starrocks(
-                "starrocks can not support import json".to_string(),
-            )),
+            risingwave_common::types::DataType::Jsonb => Ok(starrocks_data_type.contains("json")),
            risingwave_common::types::DataType::Serial => {
                 Ok(starrocks_data_type.contains("bigint"))
             }
@@ -337,18 +337,18 @@ impl StarrocksSinkWriter {
                 decimal_map.insert(name.to_string(), (length, scale));
             }
         }
+        let mut fields_name = schema.names_str();
+        if !is_append_only {
+            fields_name.push(STARROCKS_DELETE_SIGN);
+        };
 
-        let builder = HeaderBuilder::new()
+        let header = HeaderBuilder::new()
             .add_common_header()
             .set_user_password(config.common.user.clone(), config.common.password.clone())
-            .add_json_format();
-        let header = if !is_append_only {
-            let mut fields_name = schema.names_str();
-            fields_name.push(STARROCKS_DELETE_SIGN);
-            builder.set_columns_name(fields_name).build()
-        } else {
-            builder.build()
-        };
+            .add_json_format()
+            .set_partial_update(config.common.partial_update.clone())
+            .set_columns_name(fields_name)
+            .build();
 
         let starrocks_insert_builder = InserterInnerBuilder::new(
             format!("http://{}:{}", config.common.host, config.common.http_port),
@@ -363,7 +363,7 @@ impl StarrocksSinkWriter {
             inserter_innet_builder: starrocks_insert_builder,
             is_append_only,
             client: None,
-            row_encoder: JsonEncoder::new_with_doris(
+            row_encoder: JsonEncoder::new_with_starrocks(
                 schema,
                 None,
                 TimestampHandlingMode::String,
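The writer now builds its stream-load headers unconditionally: the column list is always sent (with the delete-sign column appended only for upsert sinks), and `partial_update` defaults to `"false"` when `starrocks.partial_update` is not set. A simplified, self-contained sketch of that flow follows; the mini `HeaderBuilder` here is hypothetical, and the `__op` value for `STARROCKS_DELETE_SIGN` is an assumption, not taken from this diff.

```rust
use std::collections::HashMap;

// Assumption: StarRocks stream load uses the `__op` column as its delete sign.
const STARROCKS_DELETE_SIGN: &str = "__op";

#[derive(Default)]
struct HeaderBuilder {
    header: HashMap<String, String>,
}

impl HeaderBuilder {
    fn set_partial_update(mut self, partial_update: Option<String>) -> Self {
        // Same default as the real builder: absent option => "false".
        self.header.insert(
            "partial_update".to_string(),
            partial_update.unwrap_or_else(|| "false".to_string()),
        );
        self
    }

    fn set_columns_name(mut self, columns: Vec<&str>) -> Self {
        self.header.insert("columns".to_string(), columns.join(","));
        self
    }

    fn build(self) -> HashMap<String, String> {
        self.header
    }
}

fn main() {
    let is_append_only = false; // upsert sink
    let mut fields_name = vec!["id", "v1"];
    if !is_append_only {
        fields_name.push(STARROCKS_DELETE_SIGN);
    }
    let header = HeaderBuilder::default()
        .set_partial_update(None) // `starrocks.partial_update` not set
        .set_columns_name(fields_name)
        .build();
    assert_eq!(header["partial_update"], "false");
    assert_eq!(header["columns"], "id,v1,__op");
}
```

Relaxing the column-count check from `!=` to `>` in `check_column_name_and_type` fits the same feature: a partial update legitimately sinks fewer RisingWave columns than the StarRocks table has.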
diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml
index b2090d1dbf4bb..8035686d0987c 100644
--- a/src/connector/with_options_sink.yaml
+++ b/src/connector/with_options_sink.yaml
@@ -483,6 +483,9 @@ StarrocksConfig:
     field_type: String
     comments: The StarRocks table you want to sink data to.
     required: true
+  - name: starrocks.partial_update
+    field_type: String
+    required: false
   - name: r#type
     field_type: String
     required: true
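For completeness, a hedged sketch of how the flattened WITH option lands in the new `Option<String>` field. The trimmed-down struct below is for illustration only (it mirrors the renames on `StarrocksCommon` above) and requires `serde` with the derive feature plus `serde_json`.

```rust
use serde::Deserialize;
use std::collections::HashMap;

#[derive(Debug, Deserialize)]
struct StarrocksCommonSketch {
    #[serde(rename = "starrocks.table")]
    table: String,
    // Optional: leaving it out of the WITH clause yields `None`,
    // which the header builder then turns into "false".
    #[serde(rename = "starrocks.partial_update")]
    partial_update: Option<String>,
}

fn main() {
    // WITH-clause options arrive as a flat string map.
    let mut with_options = HashMap::new();
    with_options.insert("starrocks.table".to_string(), "demo_table".to_string());
    with_options.insert("starrocks.partial_update".to_string(), "true".to_string());

    let value = serde_json::to_value(&with_options).unwrap();
    let cfg: StarrocksCommonSketch = serde_json::from_value(value).unwrap();
    assert_eq!(cfg.table, "demo_table");
    assert_eq!(cfg.partial_update.as_deref(), Some("true"));
}
```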