diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 550572a2b4bcc..db0a3e795a92f 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -140,6 +140,14 @@ impl HeaderBuilder { self } + pub fn add_partial_update(mut self, partial_update: Option) -> Self { + if let Some(_partial_update) = partial_update { + self.header + .insert("partial_update".to_string(), "true".to_string()); + } + self + } + pub fn build(self) -> HashMap { self.header } diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 34286a3b6e991..773fb27b05072 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -60,6 +60,8 @@ pub struct StarrocksCommon { pub database: String, #[serde(rename = "starrocks.table")] pub table: String, + #[serde(rename = "starrocks.partial_update")] + pub partial_update: Option, } #[serde_as] @@ -324,18 +326,18 @@ impl StarrocksSinkWriter { decimal_map.insert(name.to_string(), (length, scale)); } } + let mut fields_name = schema.names_str(); + if !is_append_only { + fields_name.push(STARROCKS_DELETE_SIGN); + }; - let builder = HeaderBuilder::new() + let header = HeaderBuilder::new() .add_common_header() .set_user_password(config.common.user.clone(), config.common.password.clone()) - .add_json_format(); - let header = if !is_append_only { - let mut fields_name = schema.names_str(); - fields_name.push(STARROCKS_DELETE_SIGN); - builder.set_columns_name(fields_name).build() - } else { - builder.build() - }; + .add_json_format() + .add_partial_update(config.common.partial_update.clone()) + .set_columns_name(fields_name) + .build(); let starrocks_insert_builder = InserterInnerBuilder::new( format!("http://{}:{}", config.common.host, config.common.http_port),