From 423b0d9b2063953879a9f7d726adfb712d8e9a55 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 8 Mar 2024 17:31:23 +0800 Subject: [PATCH] feat(Sink): Enhanced integration with StarRocks (#14363) (#15555) Co-authored-by: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> --- src/connector/src/sink/doris.rs | 2 +- .../src/sink/doris_starrocks_connector.rs | 8 +++++ src/connector/src/sink/encoder/json.rs | 31 ++++++++++++++++-- src/connector/src/sink/encoder/mod.rs | 2 ++ src/connector/src/sink/starrocks.rs | 32 +++++++++---------- src/connector/with_options_sink.yaml | 3 ++ 6 files changed, 58 insertions(+), 20 deletions(-) diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index dbe87b4cb51ca..808396df09022 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -170,7 +170,7 @@ impl DorisSink { Ok(doris_data_type.contains("DATETIME")) } risingwave_common::types::DataType::Timestamptz => Err(SinkError::Doris( - "doris can not support Timestamptz".to_string(), + "TIMESTAMP WITH TIMEZONE is not supported for Doris sink as Doris doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_string(), )), risingwave_common::types::DataType::Interval => Err(SinkError::Doris( "doris can not support Interval".to_string(), diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 550572a2b4bcc..147aeb6610526 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -140,6 +140,14 @@ impl HeaderBuilder { self } + pub fn set_partial_update(mut self, partial_update: Option) -> Self { + self.header.insert( + "partial_update".to_string(), + partial_update.unwrap_or_else(|| "false".to_string()), + ); + self + } + pub fn build(self) -> HashMap { self.header } diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index b4d2de84c0069..05c81c9043881 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -96,6 +96,24 @@ impl JsonEncoder { } } + pub fn new_with_starrocks( + schema: Schema, + col_indices: Option>, + timestamp_handling_mode: TimestampHandlingMode, + map: HashMap, + ) -> Self { + Self { + schema, + col_indices, + time_handling_mode: TimeHandlingMode::Milli, + date_handling_mode: DateHandlingMode::String, + timestamp_handling_mode, + timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, + custom_json_type: CustomJsonType::StarRocks(map), + kafka_connect: None, + } + } + pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self { Self { kafka_connect: Some(Arc::new(kafka_connect)), @@ -203,7 +221,7 @@ fn datum_to_json_object( json!(v) } (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match custom_json_type { - CustomJsonType::Doris(map) => { + CustomJsonType::Doris(map) | CustomJsonType::StarRocks(map) => { if !matches!(v, Decimal::Normalized(_)) { return Err(ArrayError::internal( "doris/starrocks can't support decimal Inf, -Inf, Nan".to_string(), @@ -270,8 +288,10 @@ fn datum_to_json_object( json!(v.as_iso_8601()) } (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type { - CustomJsonType::Es => JsonbVal::from(jsonb_ref).take(), - CustomJsonType::Doris(_) | CustomJsonType::None => json!(jsonb_ref.to_string()), + CustomJsonType::Es | CustomJsonType::StarRocks(_) => JsonbVal::from(jsonb_ref).take(), + CustomJsonType::Doris(_) | CustomJsonType::None => { + json!(jsonb_ref.to_string()) + } }, (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => { let elems = list_ref.iter(); @@ -315,6 +335,11 @@ fn datum_to_json_object( ArrayError::internal(format!("Json to string err{:?}", err)) })?) } + CustomJsonType::StarRocks(_) => { + return Err(ArrayError::internal( + "starrocks can't support struct".to_string(), + )); + } CustomJsonType::Es | CustomJsonType::None => { let mut map = Map::with_capacity(st.len()); for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug( diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index 83c28ce4b4a5a..3c76803c8e0f1 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -142,6 +142,8 @@ pub enum CustomJsonType { Doris(HashMap), // Es's json need jsonb is struct Es, + // starrocks' need jsonb is struct + StarRocks(HashMap), None, } diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index a33e8aa149f2e..e904e42784fb5 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -68,6 +68,8 @@ pub struct StarrocksCommon { /// The StarRocks table you want to sink data to. #[serde(rename = "starrocks.table")] pub table: String, + #[serde(rename = "starrocks.partial_update")] + pub partial_update: Option, } #[serde_as] @@ -125,8 +127,8 @@ impl StarrocksSink { starrocks_columns_desc: HashMap, ) -> Result<()> { let rw_fields_name = self.schema.fields(); - if rw_fields_name.len().ne(&starrocks_columns_desc.len()) { - return Err(SinkError::Starrocks("The length of the RisingWave column must be equal to the length of the starrocks column".to_string())); + if rw_fields_name.len() > starrocks_columns_desc.len() { + return Err(SinkError::Starrocks("The length of the RisingWave column must be equal or less to the length of the starrocks column".to_string())); } for i in rw_fields_name { @@ -178,7 +180,7 @@ impl StarrocksSink { Ok(starrocks_data_type.contains("datetime")) } risingwave_common::types::DataType::Timestamptz => Err(SinkError::Starrocks( - "starrocks can not support Timestamptz".to_string(), + "TIMESTAMP WITH TIMEZONE is not supported for Starrocks sink as Starrocks doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_string(), )), risingwave_common::types::DataType::Interval => Err(SinkError::Starrocks( "INTERVAL is not supported for Starrocks sink. Please convert to VARCHAR or other supported types.".to_string(), @@ -193,9 +195,7 @@ impl StarrocksSink { risingwave_common::types::DataType::Bytea => Err(SinkError::Starrocks( "BYTEA is not supported for Starrocks sink. Please convert to VARCHAR or other supported types.".to_string(), )), - risingwave_common::types::DataType::Jsonb => Err(SinkError::Starrocks( - "starrocks can not support import json".to_string(), - )), + risingwave_common::types::DataType::Jsonb => Ok(starrocks_data_type.contains("json")), risingwave_common::types::DataType::Serial => { Ok(starrocks_data_type.contains("bigint")) } @@ -332,18 +332,18 @@ impl StarrocksSinkWriter { decimal_map.insert(name.to_string(), (length, scale)); } } + let mut fields_name = schema.names_str(); + if !is_append_only { + fields_name.push(STARROCKS_DELETE_SIGN); + }; - let builder = HeaderBuilder::new() + let header = HeaderBuilder::new() .add_common_header() .set_user_password(config.common.user.clone(), config.common.password.clone()) - .add_json_format(); - let header = if !is_append_only { - let mut fields_name = schema.names_str(); - fields_name.push(STARROCKS_DELETE_SIGN); - builder.set_columns_name(fields_name).build() - } else { - builder.build() - }; + .add_json_format() + .set_partial_update(config.common.partial_update.clone()) + .set_columns_name(fields_name) + .build(); let starrocks_insert_builder = InserterInnerBuilder::new( format!("http://{}:{}", config.common.host, config.common.http_port), @@ -358,7 +358,7 @@ impl StarrocksSinkWriter { inserter_innet_builder: starrocks_insert_builder, is_append_only, client: None, - row_encoder: JsonEncoder::new_with_doris( + row_encoder: JsonEncoder::new_with_starrocks( schema, None, TimestampHandlingMode::String, diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index d79bf450581a6..eb24e6afa7e4f 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -485,6 +485,9 @@ StarrocksConfig: field_type: String comments: The StarRocks table you want to sink data to. required: true + - name: starrocks.partial_update + field_type: String + required: false - name: r#type field_type: String required: true