fix(cdc): only allow `ADD` and `DROP` in auto schema change #18245
Changes from 7 commits: c950cb8 · 599d50e · 50ea3e0 · 0f956c8 · c5d605d · ae91540 · 93472b3 · e9ef4d8 · 816f07a
```diff
@@ -165,13 +165,29 @@ pub fn parse_schema_change(
 ) -> AccessResult<SchemaChangeEnvelope> {
     let mut schema_changes = vec![];
 
-    let upstream_ddl = accessor
+    let upstream_ddl: String = accessor
         .access(&[UPSTREAM_DDL], &DataType::Varchar)?
         .to_owned_datum()
         .unwrap()
         .as_utf8()
         .to_string();
 
+    // Currently we only accept ADD COLUMN and DROP COLUMN, and we assume each
+    // schema change message contains only one DDL statement.
+    let allowed_ddl = ["ADD COLUMN", "DROP COLUMN"];
+    let upper_upstream_ddl = upstream_ddl.to_uppercase();
+    let is_allowed = allowed_ddl
+        .iter()
+        .any(|&allowed_ddl| upper_upstream_ddl.contains(allowed_ddl));
+
+    if !is_allowed {
+        Err(AccessError::Uncategorized {
+            message: format!(
+                "skip unsupported table schema change for upstream DDL: {}",
+                upstream_ddl
+            ),
+        })?;
+    }
+
     if let Some(ScalarRefImpl::List(table_changes)) = accessor
         .access(&[TABLE_CHANGES], &DataType::List(Box::new(DataType::Jsonb)))?
         .to_datum_ref()
```

Review thread (on the single-DDL-per-message assumption):

- +1.
- I am aware of this statement, but I cannot construct this case even with a transaction, and it seems MySQL doesn't support DDL transactions. So in practice the parser assumes each message contains only a single DDL.
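To make the new filter's behavior concrete, here is a minimal, self-contained sketch of the same uppercase-plus-substring matching. The standalone `is_allowed_ddl` helper and the sample DDL strings are illustrative only; in the PR the check lives inline in `parse_schema_change`.

```rust
/// Illustrative stand-alone version of the allow-list check (hypothetical helper,
/// not part of the PR itself).
fn is_allowed_ddl(upstream_ddl: &str) -> bool {
    const ALLOWED_DDL: [&str; 2] = ["ADD COLUMN", "DROP COLUMN"];
    let upper = upstream_ddl.to_uppercase();
    ALLOWED_DDL.iter().any(|&allowed| upper.contains(allowed))
}

fn main() {
    // Accepted: the DDL text contains "ADD COLUMN" or "DROP COLUMN" (case-insensitive).
    assert!(is_allowed_ddl("ALTER TABLE t1 ADD COLUMN v2 varchar(255)"));
    assert!(is_allowed_ddl("alter table t1 drop column v2"));

    // Rejected: any other ALTER is reported as an unsupported schema change.
    assert!(!is_allowed_ddl("ALTER TABLE t1 RENAME COLUMN v2 TO v3"));
    assert!(!is_allowed_ddl("ALTER TABLE t1 MODIFY COLUMN v2 BIGINT"));
}
```

Because this is a plain substring match, a single statement that mixes an allowed and an unsupported change would still pass the filter; the code leans on the assumption discussed above that each message carries exactly one DDL.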
```diff
@@ -1015,6 +1015,7 @@ pub(super) async fn handle_create_table_plan(
         &constraints,
         connect_properties.clone(),
         wildcard_idx.is_some(),
+        None,
     )
     .await?;
```

```diff
@@ -1123,6 +1124,7 @@ async fn derive_schema_for_cdc_table(
     constraints: &Vec<TableConstraint>,
     connect_properties: WithOptionsSecResolved,
     need_auto_schema_map: bool,
+    original_catalog: Option<Arc<TableCatalog>>,
 ) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
     // read cdc table schema from external db or parse the schema from SQL definitions
     if need_auto_schema_map {
```

Review thread (on the new `original_catalog` parameter):

- Please add some comments for this parameter.
- fixed
```diff
@@ -1154,10 +1156,20 @@ async fn derive_schema_for_cdc_table(
             table.pk_names().clone(),
         ))
     } else {
-        Ok((
-            bind_sql_columns(column_defs)?,
-            bind_sql_pk_names(column_defs, constraints)?,
-        ))
+        let columns = bind_sql_columns(column_defs)?;
+        // For a table created by `create table t (*)` the constraint is empty, so we need to
+        // retrieve the primary key names from the original table catalog if it is available.
+        let pk_names = if let Some(original_catalog) = original_catalog {
+            original_catalog
+                .pk
+                .iter()
+                .map(|x| original_catalog.columns[x.column_index].name().to_string())
+                .collect()
+        } else {
+            bind_sql_pk_names(column_defs, constraints)?
+        };
+
+        Ok((columns, pk_names))
     }
 }
```
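As a rough illustration of the primary-key lookup above (mapping each pk entry's column index to a column name, with a fallback to the SQL-bound pk names), the sketch below uses made-up `MiniCatalog`/`MiniColumn`/`MiniPkEntry` types in place of RisingWave's `TableCatalog`; it only mirrors the shape of the logic, not the real catalog API.

```rust
// Simplified stand-ins for TableCatalog / ColumnCatalog, just to show the
// index-to-name mapping used in the diff above.
struct MiniColumn {
    name: String,
}

struct MiniPkEntry {
    column_index: usize,
}

struct MiniCatalog {
    columns: Vec<MiniColumn>,
    pk: Vec<MiniPkEntry>,
}

/// If an original catalog is available (e.g. the table was created with
/// `CREATE TABLE t (*)`, where SQL constraints are empty), read the pk column
/// names from it; otherwise fall back to a caller-provided list.
fn pk_names(original_catalog: Option<&MiniCatalog>, fallback: Vec<String>) -> Vec<String> {
    if let Some(catalog) = original_catalog {
        catalog
            .pk
            .iter()
            .map(|x| catalog.columns[x.column_index].name.clone())
            .collect()
    } else {
        fallback
    }
}

fn main() {
    let catalog = MiniCatalog {
        columns: vec![
            MiniColumn { name: "id".into() },
            MiniColumn { name: "v1".into() },
        ],
        pk: vec![MiniPkEntry { column_index: 0 }],
    };
    assert_eq!(pk_names(Some(&catalog), vec![]), vec!["id".to_string()]);
    assert_eq!(pk_names(None, vec!["id".into()]), vec!["id".to_string()]);
}
```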
```diff
@@ -1328,6 +1340,7 @@ pub async fn generate_stream_graph_for_table(
         &constraints,
         connect_properties.clone(),
         false,
+        Some(original_catalog.clone()),
     )
     .await?;
```
Review thread (on skipping unsupported upstream schema changes):

- This looks fragile. Shall we instead compare the previous and current schema to check whether one is a subset of the other?
- I don't quite get it. The goal here is to skip upstream messages generated by unsupported `ALTER` statements.
- Because we only support `ADD` and `DROP`, in which cases we must have...
- Sounds OK to me, given that we only support `ADD` and `DROP`.
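For reference, here is a minimal sketch of the alternative the reviewer floats: deciding whether a change is a pure `ADD`/`DROP` by checking whether one column set is a subset of the other, rather than string-matching the DDL text. The `HashMap<String, String>` column representation and the function names are purely illustrative; this is not what the PR implements.

```rust
use std::collections::HashMap;

/// Returns true if every column of `small` appears in `big` with the same type.
fn is_subset(small: &HashMap<String, String>, big: &HashMap<String, String>) -> bool {
    small
        .iter()
        .all(|(name, ty)| big.get(name).map_or(false, |t| t == ty))
}

/// A pure ADD COLUMN or DROP COLUMN change means one schema is a subset of the other.
fn is_add_or_drop_only(prev: &HashMap<String, String>, curr: &HashMap<String, String>) -> bool {
    is_subset(prev, curr) || is_subset(curr, prev)
}

fn main() {
    let prev = HashMap::from([("id".to_string(), "int".to_string())]);
    let added = HashMap::from([
        ("id".to_string(), "int".to_string()),
        ("v1".to_string(), "varchar".to_string()),
    ]);
    let renamed = HashMap::from([("pk".to_string(), "int".to_string())]);

    assert!(is_add_or_drop_only(&prev, &added));    // ADD COLUMN v1
    assert!(is_add_or_drop_only(&added, &prev));    // DROP COLUMN v1
    assert!(!is_add_or_drop_only(&prev, &renamed)); // a rename is not a subset either way
}
```

Note that a statement which both adds and drops columns in one go would not be a subset in either direction, which is one way this variant behaves differently from the substring check the PR adopts.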