-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(cdc): only allow ADD
and DROP
in auto schema change
#18245
Changes from all commits
c950cb8
599d50e
50ea3e0
0f956c8
c5d605d
ae91540
93472b3
e9ef4d8
816f07a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,12 +12,13 @@ | |
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use std::collections::HashMap; | ||
use std::collections::{HashMap, HashSet}; | ||
use std::sync::Arc; | ||
|
||
use anyhow::anyhow; | ||
use rand::seq::SliceRandom; | ||
use rand::thread_rng; | ||
use risingwave_common::catalog::ColumnCatalog; | ||
use risingwave_common::util::column_index_mapping::ColIndexMapping; | ||
use risingwave_connector::sink::catalog::SinkId; | ||
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager}; | ||
|
@@ -966,6 +967,46 @@ impl DdlService for DdlServiceImpl { | |
.await?; | ||
|
||
for table in tables { | ||
// Since we only support `ADD` and `DROP` column, we check whether the new columns and the original columns | ||
// is a subset of the other. | ||
let original_column_names: HashSet<String> = HashSet::from_iter( | ||
table | ||
.columns | ||
.iter() | ||
.map(|col| ColumnCatalog::from(col.clone()).column_desc.name), | ||
); | ||
let new_column_names: HashSet<String> = HashSet::from_iter( | ||
table_change | ||
.columns | ||
.iter() | ||
.map(|col| ColumnCatalog::from(col.clone()).column_desc.name), | ||
); | ||
if !(original_column_names.is_subset(&new_column_names) | ||
|| original_column_names.is_superset(&new_column_names)) | ||
Comment on lines
+984
to
+985
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just realize that comparing the name sets is not sufficient, as users may alter the data type of a column, or even set a new default value. Those functionalities are not supported by us. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. So there is one more check in below.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if
Will we also accidentally apply the changes happened in step 1? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, that would be a problem. Since we apply the latest version column schema from upstream. Maybe we should also check column type besides the names. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
{ | ||
tracing::warn!(target: "auto_schema_change", | ||
table_id = table.id, | ||
cdc_table_id = table.cdc_table_id, | ||
upstraem_ddl = table_change.upstream_ddl, | ||
original_columns = ?original_column_names, | ||
new_columns = ?new_column_names, | ||
"New columns should be a subset or superset of the original columns, since only `ADD COLUMN` and `DROP COLUMN` is supported"); | ||
return Err(Status::invalid_argument( | ||
"New columns should be a subset or superset of the original columns", | ||
)); | ||
} | ||
// skip the schema change if there is no change to original columns | ||
if original_column_names == new_column_names { | ||
tracing::warn!(target: "auto_schema_change", | ||
table_id = table.id, | ||
cdc_table_id = table.cdc_table_id, | ||
upstraem_ddl = table_change.upstream_ddl, | ||
original_columns = ?original_column_names, | ||
new_columns = ?new_column_names, | ||
"No change to columns, skipping the schema change"); | ||
continue; | ||
} | ||
|
||
let latency_timer = self | ||
.meta_metrics | ||
.auto_schema_change_latency | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add some comments for this parameter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed