Skip to content

Commit

Permalink
fix(cdc): only allow ADD and DROP in auto schema change (#18245)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored and StrikeW committed Aug 28, 2024
1 parent 612955d commit d415ede
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 6 deletions.
52 changes: 52 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_change_mysql.slt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ distribution key id NULL NULL
table description rw_customers NULL NULL


# add column
system ok
mysql -e "
USE mytest;
Expand All @@ -64,6 +65,57 @@ primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

# rename column on upstream will not be replicated, since we do not support rename column
system ok
mysql -e "
USE mytest;
ALTER TABLE customers RENAME COLUMN v1 TO v11;
ALTER TABLE customers CHANGE COLUMN v2 v22 decimal(5,2);
"

sleep 3s

# table schema unchanges, since we reject rename column
query TTTT
describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
custinfo jsonb false NULL
v1 character varying false NULL
v2 double precision false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

# revert column rename on upstream
system ok
mysql -e "
USE mytest;
ALTER TABLE customers RENAME COLUMN v11 TO v1;
ALTER TABLE customers CHANGE COLUMN v22 v2 double(5,2);
"

# drop columns
system ok
mysql -e "
USE mytest;
ALTER TABLE customers DROP COLUMN modified;
ALTER TABLE customers DROP COLUMN v1;
ALTER TABLE customers DROP COLUMN v2;
"

sleep 3s

# modified column should be dropped
query TTTT
describe rw_customers;
----
id bigint false NULL
custinfo jsonb false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

statement ok
drop source mysql_source cascade;
2 changes: 1 addition & 1 deletion src/connector/src/parser/unified/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub fn parse_schema_change(
) -> AccessResult<SchemaChangeEnvelope> {
let mut schema_changes = vec![];

let upstream_ddl = accessor
let upstream_ddl: String = accessor
.access(&[UPSTREAM_DDL], &DataType::Varchar)?
.to_owned_datum()
.unwrap()
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub async fn replace_table_with_definition(
Ok(())
}

/// Used in auto schema change process
pub async fn get_new_table_definition_for_cdc_table(
session: &Arc<SessionImpl>,
table_name: ObjectName,
Expand Down
22 changes: 18 additions & 4 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ pub(super) async fn handle_create_table_plan(
&constraints,
connect_properties.clone(),
wildcard_idx.is_some(),
None,
)
.await?;

Expand Down Expand Up @@ -1123,6 +1124,8 @@ async fn derive_schema_for_cdc_table(
constraints: &Vec<TableConstraint>,
connect_properties: WithOptionsSecResolved,
need_auto_schema_map: bool,
// original table catalog available in auto schema change process
original_catalog: Option<Arc<TableCatalog>>,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
// read cdc table schema from external db or parsing the schema from SQL definitions
if need_auto_schema_map {
Expand Down Expand Up @@ -1154,10 +1157,20 @@ async fn derive_schema_for_cdc_table(
table.pk_names().clone(),
))
} else {
Ok((
bind_sql_columns(column_defs)?,
bind_sql_pk_names(column_defs, constraints)?,
))
let columns = bind_sql_columns(column_defs)?;
// For table created by `create table t (*)` the constraint is empty, we need to
// retrieve primary key names from original table catalog if available
let pk_names = if let Some(original_catalog) = original_catalog {
original_catalog
.pk
.iter()
.map(|x| original_catalog.columns[x.column_index].name().to_string())
.collect()
} else {
bind_sql_pk_names(column_defs, constraints)?
};

Ok((columns, pk_names))
}
}

Expand Down Expand Up @@ -1328,6 +1341,7 @@ pub async fn generate_stream_graph_for_table(
&constraints,
connect_properties.clone(),
false,
Some(original_catalog.clone()),
)
.await?;

Expand Down
43 changes: 42 additions & 1 deletion src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use rand::seq::SliceRandom;
use rand::thread_rng;
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
Expand Down Expand Up @@ -966,6 +967,46 @@ impl DdlService for DdlServiceImpl {
.await?;

for table in tables {
// Since we only support `ADD` and `DROP` column, we check whether the new columns and the original columns
// is a subset of the other.
let original_column_names: HashSet<String> = HashSet::from_iter(
table
.columns
.iter()
.map(|col| ColumnCatalog::from(col.clone()).column_desc.name),
);
let new_column_names: HashSet<String> = HashSet::from_iter(
table_change
.columns
.iter()
.map(|col| ColumnCatalog::from(col.clone()).column_desc.name),
);
if !(original_column_names.is_subset(&new_column_names)
|| original_column_names.is_superset(&new_column_names))
{
tracing::warn!(target: "auto_schema_change",
table_id = table.id,
cdc_table_id = table.cdc_table_id,
upstraem_ddl = table_change.upstream_ddl,
original_columns = ?original_column_names,
new_columns = ?new_column_names,
"New columns should be a subset or superset of the original columns, since only `ADD COLUMN` and `DROP COLUMN` is supported");
return Err(Status::invalid_argument(
"New columns should be a subset or superset of the original columns",
));
}
// skip the schema change if there is no change to original columns
if original_column_names == new_column_names {
tracing::warn!(target: "auto_schema_change",
table_id = table.id,
cdc_table_id = table.cdc_table_id,
upstraem_ddl = table_change.upstream_ddl,
original_columns = ?original_column_names,
new_columns = ?new_column_names,
"No change to columns, skipping the schema change");
continue;
}

let latency_timer = self
.meta_metrics
.auto_schema_change_latency
Expand Down

0 comments on commit d415ede

Please sign in to comment.