Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cdc): only allow ADD and DROP in auto schema change #18245

Merged
merged 9 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_change_mysql.slt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ distribution key id NULL NULL
table description rw_customers NULL NULL


# add column
system ok
mysql -e "
USE mytest;
Expand All @@ -64,6 +65,57 @@ primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

# rename column on upstream will not be replicated, since we do not support rename column
system ok
mysql -e "
USE mytest;
ALTER TABLE customers RENAME COLUMN v1 TO v11;
ALTER TABLE customers CHANGE COLUMN v2 v22 decimal(5,2);
"

sleep 3s

# table schema unchanges, since we reject rename column
query TTTT
describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
custinfo jsonb false NULL
v1 character varying false NULL
v2 double precision false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

# revert column rename on upstream
system ok
mysql -e "
USE mytest;
ALTER TABLE customers RENAME COLUMN v11 TO v1;
ALTER TABLE customers CHANGE COLUMN v22 v2 double(5,2);
"

# drop columns
system ok
mysql -e "
USE mytest;
ALTER TABLE customers DROP COLUMN modified;
ALTER TABLE customers DROP COLUMN v1;
ALTER TABLE customers DROP COLUMN v2;
"

sleep 3s

# modified column should be dropped
query TTTT
describe rw_customers;
----
id bigint false NULL
custinfo jsonb false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

statement ok
drop source mysql_source cascade;
18 changes: 17 additions & 1 deletion src/connector/src/parser/unified/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,29 @@ pub fn parse_schema_change(
) -> AccessResult<SchemaChangeEnvelope> {
let mut schema_changes = vec![];

let upstream_ddl = accessor
let upstream_ddl: String = accessor
.access(&[UPSTREAM_DDL], &DataType::Varchar)?
.to_owned_datum()
.unwrap()
.as_utf8()
.to_string();

// Currently only accept ADD COLUMN and DROP COLUMN,
// and we assumes each schema change message only contains one DDL statement.
let allowed_ddl = ["ADD COLUMN", "DROP COLUMN"];
let upper_upstream_ddl = upstream_ddl.to_uppercase();
let is_allowed = allowed_ddl
.iter()
.any(|&allowed_ddl| upper_upstream_ddl.contains(allowed_ddl));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks fragile. Shall we instead compare the previous and current schema to check whether one is the subset of the other?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compare the previous and current schema to check whether one is the subset of the other?

Not quite get it. Here the goal is to skip upstream messages generated by unsupported ALTER statements.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we only support ADD and DROP, in which cases we must have...

the previous and current schema to check whether one is the subset of the other

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds ok to me, based on we only support ADD and DROP.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks fragile.

+1.
What if there are both ALTER and ADD/DROP COLUMN statements in upstream_ddl? From debezium's doc: "The ddl field can contain multiple DDL statements."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ddl field can contain multiple DDL statements.

I am aware of this statement, but I cannot construct this case even with transaction. And it seems mysql doesn't support DDL transactions. So actually the parser assumes each message only contains a single DDL.

if !is_allowed {
Err(AccessError::Uncategorized {
message: format!(
"skip unsupported table schema change for upstream DDL: {}",
upstream_ddl
),
})?;
}

if let Some(ScalarRefImpl::List(table_changes)) = accessor
.access(&[TABLE_CHANGES], &DataType::List(Box::new(DataType::Jsonb)))?
.to_datum_ref()
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub async fn replace_table_with_definition(
Ok(())
}

/// Used in auto schema change process
pub async fn get_new_table_definition_for_cdc_table(
session: &Arc<SessionImpl>,
table_name: ObjectName,
Expand Down
21 changes: 17 additions & 4 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ pub(super) async fn handle_create_table_plan(
&constraints,
connect_properties.clone(),
wildcard_idx.is_some(),
None,
)
.await?;

Expand Down Expand Up @@ -1123,6 +1124,7 @@ async fn derive_schema_for_cdc_table(
constraints: &Vec<TableConstraint>,
connect_properties: WithOptionsSecResolved,
need_auto_schema_map: bool,
original_catalog: Option<Arc<TableCatalog>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some comments for this parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
// read cdc table schema from external db or parsing the schema from SQL definitions
if need_auto_schema_map {
Expand Down Expand Up @@ -1154,10 +1156,20 @@ async fn derive_schema_for_cdc_table(
table.pk_names().clone(),
))
} else {
Ok((
bind_sql_columns(column_defs)?,
bind_sql_pk_names(column_defs, constraints)?,
))
let columns = bind_sql_columns(column_defs)?;
// For table created by `create table t (*)` the constraint is empty, we need to
// retrieve primary key names from original table catalog if available
let pk_names = if let Some(original_catalog) = original_catalog {
original_catalog
.pk
.iter()
.map(|x| original_catalog.columns[x.column_index].name().to_string())
.collect()
} else {
bind_sql_pk_names(column_defs, constraints)?
};

Ok((columns, pk_names))
}
}

Expand Down Expand Up @@ -1328,6 +1340,7 @@ pub async fn generate_stream_graph_for_table(
&constraints,
connect_properties.clone(),
false,
Some(original_catalog.clone()),
)
.await?;

Expand Down
Loading