Skip to content

Commit

Permalink
check in meta
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Aug 28, 2024
1 parent 93472b3 commit e9ef4d8
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 17 deletions.
16 changes: 0 additions & 16 deletions src/connector/src/parser/unified/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,22 +172,6 @@ pub fn parse_schema_change(
.as_utf8()
.to_string();

// Currently only accept ADD COLUMN and DROP COLUMN,
// and we assumes each schema change message only contains one DDL statement.
let allowed_ddl = ["ADD COLUMN", "DROP COLUMN"];
let upper_upstream_ddl = upstream_ddl.to_uppercase();
let is_allowed = allowed_ddl
.iter()
.any(|&allowed_ddl| upper_upstream_ddl.contains(allowed_ddl));
if !is_allowed {
Err(AccessError::Uncategorized {
message: format!(
"skip unsupported table schema change for upstream DDL: {}",
upstream_ddl
),
})?;
}

if let Some(ScalarRefImpl::List(table_changes)) = accessor
.access(&[TABLE_CHANGES], &DataType::List(Box::new(DataType::Jsonb)))?
.to_datum_ref()
Expand Down
43 changes: 42 additions & 1 deletion src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use rand::seq::SliceRandom;
use rand::thread_rng;
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
Expand Down Expand Up @@ -966,6 +967,46 @@ impl DdlService for DdlServiceImpl {
.await?;

for table in tables {
// Since we only support `ADD` and `DROP` column, we check whether the new columns and the original columns
// is a subset of the other.
let original_column_names: HashSet<String> = HashSet::from_iter(
table
.columns
.iter()
.map(|col| ColumnCatalog::from(col.clone()).column_desc.name),
);
let new_column_names: HashSet<String> = HashSet::from_iter(
table_change
.columns
.iter()
.map(|col| ColumnCatalog::from(col.clone()).column_desc.name),
);
if !(original_column_names.is_subset(&new_column_names)
|| original_column_names.is_superset(&new_column_names))
{
tracing::warn!(target: "auto_schema_change",
table_id = table.id,
cdc_table_id = table.cdc_table_id,
upstraem_ddl = table_change.upstream_ddl,
original_columns = ?original_column_names,
new_columns = ?new_column_names,
"New columns should be a subset or superset of the original columns, since only `ADD COLUMN` and `DROP COLUMN` is supported");
return Err(Status::invalid_argument(
"New columns should be a subset or superset of the original columns",
));
}
// skip the schema change if there is no change to original columns
if original_column_names == new_column_names {
tracing::warn!(target: "auto_schema_change",
table_id = table.id,
cdc_table_id = table.cdc_table_id,
upstraem_ddl = table_change.upstream_ddl,
original_columns = ?original_column_names,
new_columns = ?new_column_names,
"No change to columns, skipping the schema change");
continue;
}

let latency_timer = self
.meta_metrics
.auto_schema_change_latency
Expand Down

0 comments on commit e9ef4d8

Please sign in to comment.