diff --git a/e2e_test/batch/basic/logical_view.slt.part b/e2e_test/batch/basic/logical_view.slt.part index 4adc05b38606..9c6b4c0f7360 100644 --- a/e2e_test/batch/basic/logical_view.slt.part +++ b/e2e_test/batch/basic/logical_view.slt.part @@ -40,7 +40,7 @@ SELECT * FROM v3; statement error DROP TABLE t; -statement error other relation\(s\) depend on it +statement error Permission denied DROP VIEW v2; statement ok diff --git a/e2e_test/ddl/alter_rename.slt b/e2e_test/ddl/alter_rename.slt index 11b7007f72bf..5171f7f3cdad 100644 --- a/e2e_test/ddl/alter_rename.slt +++ b/e2e_test/ddl/alter_rename.slt @@ -229,7 +229,7 @@ DROP SCHEMA schema1; statement ok DROP SINK sink1; -statement error other relation\(s\) depend on it +statement error Permission denied DROP VIEW v5; statement ok diff --git a/e2e_test/ddl/dependency_check.slt b/e2e_test/ddl/dependency_check.slt index 88a89975d07e..413579bb13e7 100644 --- a/e2e_test/ddl/dependency_check.slt +++ b/e2e_test/ddl/dependency_check.slt @@ -16,7 +16,7 @@ create index i_b1 on b(b1); statement ok create materialized view mv1 as select * from a join b on a.a1 = b.b1; -statement error other relation\(s\) depend on it +statement error Permission denied drop index i_a1; statement ok @@ -25,7 +25,7 @@ drop materialized view mv1; statement ok create materialized view mv2 as with ctx as (select a1 from a) select b1 from b; -statement error other relation\(s\) depend on it +statement error Permission denied drop table a; statement ok @@ -48,13 +48,13 @@ create view v2 as select * from v; statement ok create materialized view mv3 as select * from v2; -statement error other relation\(s\) depend on it +statement error Permission denied drop source src; -statement error other relation\(s\) depend on it +statement error Permission denied drop view v; -statement error other relation\(s\) depend on it +statement error Permission denied drop view v2; statement ok diff --git a/e2e_test/source/basic/ddl.slt b/e2e_test/source/basic/ddl.slt index f959ce7fa49a..6e640e047d4c 100644 --- a/e2e_test/source/basic/ddl.slt +++ b/e2e_test/source/basic/ddl.slt @@ -134,7 +134,7 @@ create source s ( statement ok create materialized view mv_1 as select * from s -statement error other relation\(s\) depend on it +statement error Permission denied drop source s statement ok diff --git a/e2e_test/source/basic/old_row_format_syntax/ddl.slt b/e2e_test/source/basic/old_row_format_syntax/ddl.slt index 2ec7239e61d2..d5c41d4ded87 100644 --- a/e2e_test/source/basic/old_row_format_syntax/ddl.slt +++ b/e2e_test/source/basic/old_row_format_syntax/ddl.slt @@ -90,7 +90,7 @@ create source s ( statement ok create materialized view mv_1 as select * from s -statement error other relation\(s\) depend on it +statement error Permission denied drop source s statement ok diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs index 446c928718cf..f0d83cb2d847 100644 --- a/src/meta/model_v2/src/table.rs +++ b/src/meta/model_v2/src/table.rs @@ -24,7 +24,7 @@ use crate::{ SourceId, TableId, TableVersion, }; -#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[derive(Clone, Debug, PartialEq, Copy, Eq, EnumIter, DeriveActiveEnum)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum TableType { #[sea_orm(string_value = "TABLE")] diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index a89fe23b43e7..8d34e4076c1e 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -1131,10 +1131,6 @@ impl CatalogController { if obj.schema_id == Some(new_schema) { return Ok(IGNORED_NOTIFICATION_VERSION); } - - let mut obj = obj.into_active_model(); - obj.schema_id = Set(Some(new_schema)); - let obj = obj.update(&txn).await?; let database_id = obj.database_id.unwrap(); let mut relations = vec![]; @@ -1145,9 +1141,16 @@ impl CatalogController { .await? .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?; check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?; + let (associated_src_id, table_type) = + (table.optional_associated_source_id, table.table_type); + + let mut obj = obj.into_active_model(); + obj.schema_id = Set(Some(new_schema)); + let obj = obj.update(&txn).await?; + relations.push(PbRelationInfo::Table(ObjectModel(table, obj).into())); // associated source. - if let Some(associated_source_id) = table.optional_associated_source_id { + if let Some(associated_source_id) = associated_src_id { let src_obj = object::ActiveModel { oid: Set(associated_source_id as _), schema_id: Set(Some(new_schema)), @@ -1168,7 +1171,7 @@ impl CatalogController { let (index_ids, (index_names, mut table_ids)): ( Vec, (Vec, Vec), - ) = if table.table_type == TableType::Table { + ) = if table_type == TableType::Table { Index::find() .select_only() .columns([ @@ -1186,7 +1189,6 @@ impl CatalogController { } else { (vec![], (vec![], vec![])) }; - relations.push(PbRelationInfo::Table(ObjectModel(table, obj).into())); // internal tables. let internal_tables: Vec = Table::find() @@ -1220,18 +1222,6 @@ impl CatalogController { .await?; } - if !index_ids.is_empty() { - let index_objs = Index::find() - .find_also_related(Object) - .filter(index::Column::IndexId.is_in(index_ids)) - .all(&txn) - .await?; - for (index, index_obj) in index_objs { - relations.push(PbRelationInfo::Index( - ObjectModel(index, index_obj.unwrap()).into(), - )); - } - } if !table_ids.is_empty() { let table_objs = Table::find() .find_also_related(Object) @@ -1244,6 +1234,18 @@ impl CatalogController { )); } } + if !index_ids.is_empty() { + let index_objs = Index::find() + .find_also_related(Object) + .filter(index::Column::IndexId.is_in(index_ids)) + .all(&txn) + .await?; + for (index, index_obj) in index_objs { + relations.push(PbRelationInfo::Index( + ObjectModel(index, index_obj.unwrap()).into(), + )); + } + } } ObjectType::Source => { let source = Source::find_by_id(object_id) @@ -1251,6 +1253,10 @@ impl CatalogController { .await? .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?; check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?; + + let mut obj = obj.into_active_model(); + obj.schema_id = Set(Some(new_schema)); + let obj = obj.update(&txn).await?; relations.push(PbRelationInfo::Source(ObjectModel(source, obj).into())); } ObjectType::Sink => { @@ -1259,6 +1265,10 @@ impl CatalogController { .await? .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?; check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?; + + let mut obj = obj.into_active_model(); + obj.schema_id = Set(Some(new_schema)); + let obj = obj.update(&txn).await?; relations.push(PbRelationInfo::Sink(ObjectModel(sink, obj).into())); // internal tables. @@ -1298,6 +1308,10 @@ impl CatalogController { .await? .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?; check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?; + + let mut obj = obj.into_active_model(); + obj.schema_id = Set(Some(new_schema)); + let obj = obj.update(&txn).await?; relations.push(PbRelationInfo::View(ObjectModel(view, obj).into())); } ObjectType::Function => { @@ -1305,9 +1319,19 @@ impl CatalogController { .one(&txn) .await? .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?; - let pb_function: PbFunction = ObjectModel(function, obj).into(); + + let mut pb_function: PbFunction = ObjectModel(function, obj).into(); + pb_function.schema_id = new_schema as _; check_function_signature_duplicate(&pb_function, &txn).await?; + object::ActiveModel { + oid: Set(object_id), + schema_id: Set(Some(new_schema)), + ..Default::default() + } + .update(&txn) + .await?; + txn.commit().await?; let version = self .notify_frontend( @@ -1322,9 +1346,19 @@ impl CatalogController { .one(&txn) .await? .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?; - let pb_connection: PbConnection = ObjectModel(connection, obj).into(); + + let mut pb_connection: PbConnection = ObjectModel(connection, obj).into(); + pb_connection.schema_id = new_schema as _; check_connection_name_duplicate(&pb_connection, &txn).await?; + object::ActiveModel { + oid: Set(object_id), + schema_id: Set(Some(new_schema)), + ..Default::default() + } + .update(&txn) + .await?; + txn.commit().await?; let version = self .notify_frontend( @@ -1337,6 +1371,7 @@ impl CatalogController { _ => unreachable!("not supported object type: {:?}", object_type), } + txn.commit().await?; let version = self .notify_frontend( Operation::Update, @@ -1748,7 +1783,7 @@ impl CatalogController { let obj = obj.unwrap(); let old_name = relation.name.clone(); relation.name = object_name.into(); - if obj.obj_type != ObjectType::Index { + if obj.obj_type != ObjectType::View { relation.definition = alter_relation_rename(&relation.definition, object_name); } let active_model = $table::ActiveModel { @@ -1778,6 +1813,7 @@ impl CatalogController { .unwrap(); index.name = object_name.into(); let index_table_id = index.index_table_id; + let old_name = rename_relation!(Table, table, table_id, index_table_id); // the name of index and its associated table is the same. let active_model = index::ActiveModel { @@ -1791,7 +1827,7 @@ impl CatalogController { ObjectModel(index, obj.unwrap()).into(), )), }); - rename_relation!(Table, table, table_id, index_table_id) + old_name } _ => unreachable!("only relation name can be altered."), }; diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 86a527c6a0eb..7b9426ec7855 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -38,7 +38,7 @@ use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion use risingwave_pb::catalog::{PbCreateType, PbTable}; use risingwave_pb::meta::relation::PbRelationInfo; use risingwave_pb::meta::subscribe_response::{ - Info as NotificationInfo, Info, Operation as NotificationOperation, Operation, + Info as NotificationInfo, Operation as NotificationOperation, Operation, }; use risingwave_pb::meta::table_fragments::PbActorStatus; use risingwave_pb::meta::{ @@ -543,7 +543,7 @@ impl CatalogController { let table = table::ActiveModel::from(table).update(&txn).await?; - let old_fragment_mappings = get_fragment_mappings(&txn, job_id).await?; + // let old_fragment_mappings = get_fragment_mappings(&txn, job_id).await?; // 1. replace old fragments/actors with new ones. Fragment::delete_many() .filter(fragment::Column::JobId.eq(job_id)) @@ -701,8 +701,11 @@ impl CatalogController { txn.commit().await?; - self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings) - .await; + // FIXME: Do not notify frontend currently, because frontend nodes might refer to old table + // catalog and need to access the old fragment. Let frontend nodes delete the old fragment + // when they receive table catalog change. + // self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings) + // .await; self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping) .await; let version = self @@ -1200,13 +1203,8 @@ impl CatalogController { } txn.commit().await?; - - for mapping in fragment_mapping_to_notify { - self.env - .notification_manager() - .notify_frontend(Operation::Update, Info::ParallelUnitMapping(mapping)) - .await; - } + self.notify_fragment_mapping(Operation::Update, fragment_mapping_to_notify) + .await; Ok(()) }