diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java index a6be8f7fc89c..8327893f6da9 100644 --- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java +++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java @@ -79,9 +79,10 @@ private static int getCorrespondingCassandraType(DataType dataType) { public static void checkSchema( List columnDescs, Map cassandraColumnDescMap) { - if (columnDescs.size() != cassandraColumnDescMap.size()) { + if (columnDescs.size() > cassandraColumnDescMap.size()) { throw Status.FAILED_PRECONDITION - .withDescription("Don't match in the number of columns in the table") + .withDescription( + "The columns of the sink must be equal to or a superset of the target table's columns.") .asRuntimeException(); } for (ColumnDesc columnDesc : columnDescs) { diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index c506f00e6d2c..42bbcf44e863 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -262,7 +262,7 @@ impl ClickHouseSink { .collect(); if rw_fields_name.len().gt(&clickhouse_columns_desc.len()) { - return Err(SinkError::ClickHouse("The nums of the RisingWave column must be greater than/equal to the length of the Clickhouse column".to_string())); + return Err(SinkError::ClickHouse("The columns of the sink must be equal to or a superset of the target table's columns.".to_string())); } for i in rw_fields_name { diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index 643ad9e3b7b3..f8c7b374d11c 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -54,6 +54,8 @@ pub struct DorisCommon { pub database: String, #[serde(rename = "doris.table")] pub table: String, + #[serde(rename = "doris.partial_update")] + pub partial_update: Option, } impl DorisCommon { @@ -125,8 +127,11 @@ impl DorisSink { .collect(); let rw_fields_name = self.schema.fields(); - if rw_fields_name.len().ne(&doris_columns_desc.len()) { - return Err(SinkError::Doris("The length of the RisingWave column must be equal to the length of the doris column".to_string())); + if rw_fields_name.len() > doris_columns_desc.len() { + return Err(SinkError::Doris( + "The columns of the sink must be equal to or a superset of the target table's columns." + .to_string(), + )); } for i in rw_fields_name { @@ -273,6 +278,7 @@ impl DorisSinkWriter { .add_common_header() .set_user_password(config.common.user.clone(), config.common.password.clone()) .add_json_format() + .set_partial_columns(config.common.partial_update.clone()) .add_read_json_by_line(); let header = if !is_append_only { header_builder.add_hidden_column().build() diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 3173b64389f2..fb0a37572710 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -142,6 +142,7 @@ impl HeaderBuilder { self } + /// Only use in Starrocks pub fn set_partial_update(mut self, partial_update: Option) -> Self { self.header.insert( "partial_update".to_string(), @@ -150,6 +151,15 @@ impl HeaderBuilder { self } + /// Only use in Doris + pub fn set_partial_columns(mut self, partial_columns: Option) -> Self { + self.header.insert( + "partial_columns".to_string(), + partial_columns.unwrap_or_else(|| "false".to_string()), + ); + self + } + /// Only used in Starrocks Transaction API pub fn set_db(mut self, db: String) -> Self { self.header.insert("db".to_string(), db); diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index bace71cd59e2..64dadf0b8986 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -168,7 +168,7 @@ impl StarrocksSink { ) -> Result<()> { let rw_fields_name = self.schema.fields(); if rw_fields_name.len() > starrocks_columns_desc.len() { - return Err(SinkError::Starrocks("The length of the RisingWave column must be equal or less to the length of the starrocks column".to_string())); + return Err(SinkError::Starrocks("The columns of the sink must be equal to or a superset of the target table's columns.".to_string())); } for i in rw_fields_name { diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 84e39d04be04..1326ed9b5578 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -136,6 +136,9 @@ DorisConfig: - name: doris.table field_type: String required: true + - name: doris.partial_update + field_type: String + required: false - name: r#type field_type: String required: true