From d415edeea9142832c9d0afaddcd7b4904b55df29 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Wed, 28 Aug 2024 18:01:26 +0800 Subject: [PATCH] fix(cdc): only allow `ADD` and `DROP` in auto schema change (#18245) --- .../cdc_inline/auto_schema_change_mysql.slt | 52 +++++++++++++++++++ src/connector/src/parser/unified/debezium.rs | 2 +- .../src/handler/alter_table_column.rs | 1 + src/frontend/src/handler/create_table.rs | 22 ++++++-- src/meta/service/src/ddl_service.rs | 43 ++++++++++++++- 5 files changed, 114 insertions(+), 6 deletions(-) diff --git a/e2e_test/source/cdc_inline/auto_schema_change_mysql.slt b/e2e_test/source/cdc_inline/auto_schema_change_mysql.slt index 31bb9d1b0421b..3c386a2718479 100644 --- a/e2e_test/source/cdc_inline/auto_schema_change_mysql.slt +++ b/e2e_test/source/cdc_inline/auto_schema_change_mysql.slt @@ -42,6 +42,7 @@ distribution key id NULL NULL table description rw_customers NULL NULL +# add column system ok mysql -e " USE mytest; @@ -64,6 +65,57 @@ primary key id NULL NULL distribution key id NULL NULL table description rw_customers NULL NULL +# rename column on upstream will not be replicated, since we do not support rename column +system ok +mysql -e " + USE mytest; + ALTER TABLE customers RENAME COLUMN v1 TO v11; + ALTER TABLE customers CHANGE COLUMN v2 v22 decimal(5,2); +" + +sleep 3s + +# table schema is unchanged, since we reject rename column +query TTTT +describe rw_customers; +---- +id bigint false NULL +modified timestamp without time zone false NULL +custinfo jsonb false NULL +v1 character varying false NULL +v2 double precision false NULL +primary key id NULL NULL +distribution key id NULL NULL +table description rw_customers NULL NULL + +# revert column rename on upstream +system ok +mysql -e " + USE mytest; + ALTER TABLE customers RENAME COLUMN v11 TO v1; + ALTER TABLE customers CHANGE COLUMN v22 v2 double(5,2); +" + +# drop columns +system ok +mysql -e " + USE mytest; + ALTER TABLE customers DROP COLUMN modified; + ALTER TABLE 
customers DROP COLUMN v1; + ALTER TABLE customers DROP COLUMN v2; +" + +sleep 3s + +# modified column should be dropped +query TTTT +describe rw_customers; +---- +id bigint false NULL +custinfo jsonb false NULL +primary key id NULL NULL +distribution key id NULL NULL +table description rw_customers NULL NULL statement ok drop source mysql_source cascade; diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index 9af7ef359e250..2dbe78cf32e25 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -165,7 +165,7 @@ pub fn parse_schema_change( ) -> AccessResult { let mut schema_changes = vec![]; - let upstream_ddl = accessor + let upstream_ddl: String = accessor .access(&[UPSTREAM_DDL], &DataType::Varchar)? .to_owned_datum() .unwrap() diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 1e11e390edfd8..f00ff35992b43 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -69,6 +69,7 @@ pub async fn replace_table_with_definition( Ok(()) } +/// Used in auto schema change process pub async fn get_new_table_definition_for_cdc_table( session: &Arc, table_name: ObjectName, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 380212e1a92dd..825ee595b4b29 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -1015,6 +1015,7 @@ pub(super) async fn handle_create_table_plan( &constraints, connect_properties.clone(), wildcard_idx.is_some(), + None, ) .await?; @@ -1123,6 +1124,8 @@ async fn derive_schema_for_cdc_table( constraints: &Vec, connect_properties: WithOptionsSecResolved, need_auto_schema_map: bool, + // original table catalog available in auto schema change process + original_catalog: Option>, ) -> Result<(Vec, Vec)> { // read cdc table schema from 
external db or parsing the schema from SQL definitions if need_auto_schema_map { @@ -1154,10 +1157,20 @@ async fn derive_schema_for_cdc_table( table.pk_names().clone(), )) } else { - Ok(( - bind_sql_columns(column_defs)?, - bind_sql_pk_names(column_defs, constraints)?, - )) + let columns = bind_sql_columns(column_defs)?; + // For a table created by `create table t (*)` the constraint is empty, so we need to + // retrieve primary key names from the original table catalog if available + let pk_names = if let Some(original_catalog) = original_catalog { + original_catalog + .pk + .iter() + .map(|x| original_catalog.columns[x.column_index].name().to_string()) + .collect() + } else { + bind_sql_pk_names(column_defs, constraints)? + }; + + Ok((columns, pk_names)) } } @@ -1328,6 +1341,7 @@ pub async fn generate_stream_graph_for_table( &constraints, connect_properties.clone(), false, + Some(original_catalog.clone()), ) .await?; diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 782ac8bfe1cd9..1e8e1c9641d8a 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use anyhow::anyhow; use rand::seq::SliceRandom; use rand::thread_rng; +use risingwave_common::catalog::ColumnCatalog; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_connector::sink::catalog::SinkId; use risingwave_meta::manager::{EventLogManagerRef, MetadataManager}; @@ -966,6 +967,46 @@ impl DdlService for DdlServiceImpl { .await?; for table in tables { + // Since we only support `ADD` and `DROP` column, we check whether one of the new columns and the original columns + // is a subset of the other. 
+ let original_column_names: HashSet = HashSet::from_iter( + table + .columns + .iter() + .map(|col| ColumnCatalog::from(col.clone()).column_desc.name), + ); + let new_column_names: HashSet = HashSet::from_iter( + table_change + .columns + .iter() + .map(|col| ColumnCatalog::from(col.clone()).column_desc.name), + ); + if !(original_column_names.is_subset(&new_column_names) + || original_column_names.is_superset(&new_column_names)) + { + tracing::warn!(target: "auto_schema_change", + table_id = table.id, + cdc_table_id = table.cdc_table_id, + upstraem_ddl = table_change.upstream_ddl, + original_columns = ?original_column_names, + new_columns = ?new_column_names, + "New columns should be a subset or superset of the original columns, since only `ADD COLUMN` and `DROP COLUMN` is supported"); + return Err(Status::invalid_argument( + "New columns should be a subset or superset of the original columns", + )); + } + // skip the schema change if there is no change to original columns + if original_column_names == new_column_names { + tracing::warn!(target: "auto_schema_change", + table_id = table.id, + cdc_table_id = table.cdc_table_id, + upstraem_ddl = table_change.upstream_ddl, + original_columns = ?original_column_names, + new_columns = ?new_column_names, + "No change to columns, skipping the schema change"); + continue; + } + let latency_timer = self .meta_metrics .auto_schema_change_latency