From e9ef4d8c5d74dfd167dbe4b1345ea591fd21e1b2 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Wed, 28 Aug 2024 17:23:55 +0800 Subject: [PATCH] check in meta --- src/connector/src/parser/unified/debezium.rs | 16 -------- src/meta/service/src/ddl_service.rs | 43 +++++++++++++++++++- 2 files changed, 42 insertions(+), 17 deletions(-) diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index 8bbbcf29c0c6..2dbe78cf32e2 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -172,22 +172,6 @@ pub fn parse_schema_change( .as_utf8() .to_string(); - // Currently only accept ADD COLUMN and DROP COLUMN, - // and we assumes each schema change message only contains one DDL statement. - let allowed_ddl = ["ADD COLUMN", "DROP COLUMN"]; - let upper_upstream_ddl = upstream_ddl.to_uppercase(); - let is_allowed = allowed_ddl - .iter() - .any(|&allowed_ddl| upper_upstream_ddl.contains(allowed_ddl)); - if !is_allowed { - Err(AccessError::Uncategorized { - message: format!( - "skip unsupported table schema change for upstream DDL: {}", - upstream_ddl - ), - })?; - } - if let Some(ScalarRefImpl::List(table_changes)) = accessor .access(&[TABLE_CHANGES], &DataType::List(Box::new(DataType::Jsonb)))? .to_datum_ref() diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 782ac8bfe1cd..1e8e1c9641d8 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use anyhow::anyhow; use rand::seq::SliceRandom; use rand::thread_rng; +use risingwave_common::catalog::ColumnCatalog; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_connector::sink::catalog::SinkId; use risingwave_meta::manager::{EventLogManagerRef, MetadataManager}; @@ -966,6 +967,46 @@ impl DdlService for DdlServiceImpl { .await?; for table in tables { + // Since we only support `ADD` and `DROP` column, we check whether the new columns and the original columns + // is a subset of the other. + let original_column_names: HashSet = HashSet::from_iter( + table + .columns + .iter() + .map(|col| ColumnCatalog::from(col.clone()).column_desc.name), + ); + let new_column_names: HashSet = HashSet::from_iter( + table_change + .columns + .iter() + .map(|col| ColumnCatalog::from(col.clone()).column_desc.name), + ); + if !(original_column_names.is_subset(&new_column_names) + || original_column_names.is_superset(&new_column_names)) + { + tracing::warn!(target: "auto_schema_change", + table_id = table.id, + cdc_table_id = table.cdc_table_id, + upstraem_ddl = table_change.upstream_ddl, + original_columns = ?original_column_names, + new_columns = ?new_column_names, + "New columns should be a subset or superset of the original columns, since only `ADD COLUMN` and `DROP COLUMN` is supported"); + return Err(Status::invalid_argument( + "New columns should be a subset or superset of the original columns", + )); + } + // skip the schema change if there is no change to original columns + if original_column_names == new_column_names { + tracing::warn!(target: "auto_schema_change", + table_id = table.id, + cdc_table_id = table.cdc_table_id, + upstraem_ddl = table_change.upstream_ddl, + original_columns = ?original_column_names, + new_columns = ?new_column_names, + "No change to columns, skipping the schema change"); + continue; + } + let latency_timer = self .meta_metrics .auto_schema_change_latency