diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 550572a2b4bcc..147aeb6610526 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -140,6 +140,14 @@ impl HeaderBuilder { self } + pub fn set_partial_update(mut self, partial_update: Option) -> Self { + self.header.insert( + "partial_update".to_string(), + partial_update.unwrap_or_else(|| "false".to_string()), + ); + self + } + pub fn build(self) -> HashMap { self.header } diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 34286a3b6e991..b012fc6c8b798 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -60,6 +60,8 @@ pub struct StarrocksCommon { pub database: String, #[serde(rename = "starrocks.table")] pub table: String, + #[serde(rename = "starrocks.partial_update")] + pub partial_update: Option, } #[serde_as] @@ -117,8 +119,8 @@ impl StarrocksSink { starrocks_columns_desc: HashMap, ) -> Result<()> { let rw_fields_name = self.schema.fields(); - if rw_fields_name.len().ne(&starrocks_columns_desc.len()) { - return Err(SinkError::Starrocks("The length of the RisingWave column must be equal to the length of the starrocks column".to_string())); + if rw_fields_name.len().gt(&starrocks_columns_desc.len()) { + return Err(SinkError::Starrocks("The length of the RisingWave column must be equal or less to the length of the starrocks column".to_string())); } for i in rw_fields_name { @@ -324,18 +326,18 @@ impl StarrocksSinkWriter { decimal_map.insert(name.to_string(), (length, scale)); } } + let mut fields_name = schema.names_str(); + if !is_append_only { + fields_name.push(STARROCKS_DELETE_SIGN); + }; - let builder = HeaderBuilder::new() + let header = HeaderBuilder::new() .add_common_header() .set_user_password(config.common.user.clone(), config.common.password.clone()) - .add_json_format(); - let header = if !is_append_only { - let mut fields_name = schema.names_str(); - fields_name.push(STARROCKS_DELETE_SIGN); - builder.set_columns_name(fields_name).build() - } else { - builder.build() - }; + .add_json_format() + .set_partial_update(config.common.partial_update.clone()) + .set_columns_name(fields_name) + .build(); let starrocks_insert_builder = InserterInnerBuilder::new( format!("http://{}:{}", config.common.host, config.common.http_port),