
fix(cdc): only allow ADD and DROP in auto schema change #18245

Merged · 9 commits · Aug 28, 2024

Changes from all commits
52 changes: 52 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_change_mysql.slt
@@ -42,6 +42,7 @@ distribution key id NULL NULL
table description rw_customers NULL NULL


# add column
system ok
mysql -e "
USE mytest;
@@ -64,6 +65,57 @@ primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

# renaming a column upstream will not be replicated, since we do not support column rename
system ok
mysql -e "
USE mytest;
ALTER TABLE customers RENAME COLUMN v1 TO v11;
ALTER TABLE customers CHANGE COLUMN v2 v22 decimal(5,2);
"

sleep 3s

# table schema unchanged, since we reject column rename
query TTTT
describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
custinfo jsonb false NULL
v1 character varying false NULL
v2 double precision false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

# revert column rename on upstream
system ok
mysql -e "
USE mytest;
ALTER TABLE customers RENAME COLUMN v11 TO v1;
ALTER TABLE customers CHANGE COLUMN v22 v2 double(5,2);
"

# drop columns
system ok
mysql -e "
USE mytest;
ALTER TABLE customers DROP COLUMN modified;
ALTER TABLE customers DROP COLUMN v1;
ALTER TABLE customers DROP COLUMN v2;
"

sleep 3s

# modified column should be dropped
query TTTT
describe rw_customers;
----
id bigint false NULL
custinfo jsonb false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL

statement ok
drop source mysql_source cascade;
2 changes: 1 addition & 1 deletion src/connector/src/parser/unified/debezium.rs
@@ -165,7 +165,7 @@ pub fn parse_schema_change(
) -> AccessResult<SchemaChangeEnvelope> {
let mut schema_changes = vec![];

let upstream_ddl = accessor
let upstream_ddl: String = accessor
.access(&[UPSTREAM_DDL], &DataType::Varchar)?
.to_owned_datum()
.unwrap()
1 change: 1 addition & 0 deletions src/frontend/src/handler/alter_table_column.rs
@@ -69,6 +69,7 @@ pub async fn replace_table_with_definition(
Ok(())
}

/// Used in auto schema change process
pub async fn get_new_table_definition_for_cdc_table(
session: &Arc<SessionImpl>,
table_name: ObjectName,
22 changes: 18 additions & 4 deletions src/frontend/src/handler/create_table.rs
@@ -1015,6 +1015,7 @@ pub(super) async fn handle_create_table_plan(
&constraints,
connect_properties.clone(),
wildcard_idx.is_some(),
None,
)
.await?;

@@ -1123,6 +1124,8 @@ async fn derive_schema_for_cdc_table(
constraints: &Vec<TableConstraint>,
connect_properties: WithOptionsSecResolved,
need_auto_schema_map: bool,
// original table catalog available in auto schema change process
original_catalog: Option<Arc<TableCatalog>>,
Member:
Please add some comments for this parameter.

Contributor Author:
fixed

) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
// read cdc table schema from external db or parsing the schema from SQL definitions
if need_auto_schema_map {
@@ -1154,10 +1157,20 @@
table.pk_names().clone(),
))
} else {
Ok((
bind_sql_columns(column_defs)?,
bind_sql_pk_names(column_defs, constraints)?,
))
let columns = bind_sql_columns(column_defs)?;
// For a table created by `create table t (*)` the constraint is empty, so we need to
// retrieve primary key names from the original table catalog if available
let pk_names = if let Some(original_catalog) = original_catalog {
original_catalog
.pk
.iter()
.map(|x| original_catalog.columns[x.column_index].name().to_string())
.collect()
} else {
bind_sql_pk_names(column_defs, constraints)?
};

Ok((columns, pk_names))
}
}

@@ -1328,6 +1341,7 @@ pub async fn generate_stream_graph_for_table(
&constraints,
connect_properties.clone(),
false,
Some(original_catalog.clone()),
)
.await?;

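For readers without the surrounding codebase, the primary-key fallback added to `derive_schema_for_cdc_table` can be summarized with a small standalone sketch. The `SimpleColumn`/`SimpleCatalog` types and the `resolve_pk_names` helper below are simplified, hypothetical stand-ins for RisingWave's `ColumnCatalog`/`TableCatalog`, not the actual implementation.

```rust
// Minimal sketch of the pk-name fallback: prefer the original catalog when it
// is available (covers `create table t (*)`, whose SQL definition carries no
// constraints), otherwise use the names derived from the SQL constraints.
struct SimpleColumn {
    name: String,
}

struct SimpleCatalog {
    columns: Vec<SimpleColumn>,
    // Indices into `columns` that form the primary key.
    pk_indices: Vec<usize>,
}

fn resolve_pk_names(
    original_catalog: Option<&SimpleCatalog>,
    pk_names_from_sql: Vec<String>,
) -> Vec<String> {
    match original_catalog {
        Some(catalog) => catalog
            .pk_indices
            .iter()
            .map(|&i| catalog.columns[i].name.clone())
            .collect(),
        None => pk_names_from_sql,
    }
}

fn main() {
    let catalog = SimpleCatalog {
        columns: vec![
            SimpleColumn { name: "id".into() },
            SimpleColumn { name: "v1".into() },
        ],
        pk_indices: vec![0],
    };
    // With an original catalog, the pk comes from it, not from the (empty) SQL constraints.
    assert_eq!(resolve_pk_names(Some(&catalog), vec![]), vec!["id".to_string()]);
    // Without one, fall back to the constraint-derived names.
    assert_eq!(resolve_pk_names(None, vec!["id".into()]), vec!["id".to_string()]);
}
```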
43 changes: 42 additions & 1 deletion src/meta/service/src/ddl_service.rs
@@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use rand::seq::SliceRandom;
use rand::thread_rng;
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
@@ -966,6 +967,46 @@ impl DdlService for DdlServiceImpl {
.await?;

for table in tables {
// Since we only support `ADD` and `DROP` column, we check whether one of the new and
// original column sets is a subset of the other.
let original_column_names: HashSet<String> = HashSet::from_iter(
table
.columns
.iter()
.map(|col| ColumnCatalog::from(col.clone()).column_desc.name),
);
let new_column_names: HashSet<String> = HashSet::from_iter(
table_change
.columns
.iter()
.map(|col| ColumnCatalog::from(col.clone()).column_desc.name),
);
if !(original_column_names.is_subset(&new_column_names)
|| original_column_names.is_superset(&new_column_names))
Comment on lines +984 to +985

Member:
I just realized that comparing the name sets is not sufficient, as users may alter the data type of a column, or even set a new default value. Those functionalities are not supported by us.

Contributor Author:
Yes. So there is one more check below:

                // skip the schema change if there is no change to original columns
                if original_column_names == new_column_names {
                    continue;
                }

Member (@BugenZhao, Aug 30, 2024):
What if

  1. we alter a column type: we skip it, since there is no change to the names;
  2. then we add a column: we do the schema change.

Will we also accidentally apply the change that happened in step 1?

Contributor Author (@StrikeW, Aug 30, 2024):
> Will we also accidentally apply the change that happened in step 1?

Yes, that would be a problem, since we apply the latest version of the column schema from upstream. Maybe we should also check the column types besides the names.

Contributor Author:
Thanks for pointing out the issue; I submitted the patch in f112e6d (#18322).

{
tracing::warn!(target: "auto_schema_change",
table_id = table.id,
cdc_table_id = table.cdc_table_id,
upstream_ddl = table_change.upstream_ddl,
original_columns = ?original_column_names,
new_columns = ?new_column_names,
"New columns should be a subset or superset of the original columns, since only `ADD COLUMN` and `DROP COLUMN` are supported");
return Err(Status::invalid_argument(
"New columns should be a subset or superset of the original columns",
));
}
// skip the schema change if there is no change to original columns
if original_column_names == new_column_names {
tracing::warn!(target: "auto_schema_change",
table_id = table.id,
cdc_table_id = table.cdc_table_id,
upstream_ddl = table_change.upstream_ddl,
original_columns = ?original_column_names,
new_columns = ?new_column_names,
"No change to columns, skipping the schema change");
continue;
}

let latency_timer = self
.meta_metrics
.auto_schema_change_latency
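The review thread above notes that comparing name sets alone cannot catch an upstream column type change. As a rough illustration of the direction suggested there (and later addressed in f112e6d / #18322), the sketch below compares hypothetical (name, data type) signatures instead of bare names; the `ColumnSig` type and `is_add_or_drop_only` helper are illustrative stand-ins under that assumption, not RisingWave code or the actual follow-up patch.

```rust
// Sketch: treat a change as a pure ADD/DROP only if one set of
// (name, data type) signatures contains the other, so an upstream
// `ALTER COLUMN ... TYPE` is not silently treated as "no change".
use std::collections::HashSet;

#[derive(Hash, PartialEq, Eq)]
struct ColumnSig {
    name: String,
    // Data type rendered as a string for simplicity; real code would compare
    // the catalog's typed representation.
    data_type: String,
}

fn is_add_or_drop_only(original: &[ColumnSig], new: &[ColumnSig]) -> bool {
    let orig: HashSet<&ColumnSig> = original.iter().collect();
    let new_set: HashSet<&ColumnSig> = new.iter().collect();
    orig.is_subset(&new_set) || orig.is_superset(&new_set)
}

fn main() {
    let sig = |name: &str, ty: &str| ColumnSig {
        name: name.into(),
        data_type: ty.into(),
    };
    let original = vec![sig("id", "bigint"), sig("v1", "varchar")];

    // Pure ADD: accepted.
    let added = vec![sig("id", "bigint"), sig("v1", "varchar"), sig("v2", "double")];
    assert!(is_add_or_drop_only(&original, &added));

    // Type change on an existing column: the names still match, but the
    // (name, type) sets are no longer subset/superset of each other, so reject.
    let retyped = vec![sig("id", "bigint"), sig("v1", "decimal(5,2)")];
    assert!(!is_add_or_drop_only(&original, &retyped));
}
```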