diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index d3cd7b6c8832b..71a73831a5f4e 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -783,10 +783,12 @@ impl CatalogManager { let core = &mut *self.core.lock().await; let database_core = &mut core.database; + let user_core = &mut core.user; for table in &tables { for relation_id in &table.dependent_relations { database_core.decrease_ref_count(*relation_id); } + user_core.decrease_ref(table.owner); } let tables = &mut database_core.tables; @@ -844,7 +846,7 @@ impl CatalogManager { &self, table: &Table, fragment_manager: FragmentManagerRef, - ) { + ) -> MetaResult<()> { let core = &mut *self.core.lock().await; let database_core = &mut core.database; let user_core = &mut core.user; @@ -854,7 +856,22 @@ impl CatalogManager { database_core.decrease_ref_count(dependent_relation_id); } user_core.decrease_ref(table.owner); - self.clean_dirty_tables(fragment_manager).await.unwrap(); + + let mut table_ids = vec![table.id]; + let fragment = fragment_manager + .select_table_fragments_by_table_id(&table.id.into()) + .await?; + let internal_table_ids = fragment.internal_table_ids(); + table_ids.extend(internal_table_ids); + + let tables = &mut database_core.tables; + let mut tables = BTreeMapTransaction::new(tables); + for table_id in table_ids { + let table = tables.remove(table_id); + assert!(table.is_some()) + } + commit_meta!(self, tables)?; + Ok(()) } /// return id of streaming jobs in the database which need to be dropped by stream manager. 
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index a411f7b892815..29a2158ef6d2f 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -455,7 +455,7 @@ impl DdlController { let (ctx, table_fragments) = match result { Ok(r) => r, Err(e) => { - self.cancel_stream_job(&stream_job, internal_tables).await; + self.cancel_stream_job(&stream_job, internal_tables).await?; return Err(e); } }; @@ -512,7 +512,7 @@ impl DdlController { .create_streaming_job(table_fragments, ctx) .await; if let Err(e) = result { - self.cancel_stream_job(&stream_job, internal_tables).await; + self.cancel_stream_job(&stream_job, internal_tables).await?; return Err(e); }; self.finish_stream_job(stream_job, internal_tables).await @@ -724,7 +724,11 @@ impl DdlController { } /// `cancel_stream_job` cancels a stream job and clean some states. - async fn cancel_stream_job(&self, stream_job: &StreamingJob, internal_tables: Vec<Table>) { + async fn cancel_stream_job( + &self, + stream_job: &StreamingJob, + internal_tables: Vec<Table>
, + ) -> MetaResult<()> { let mut creating_internal_table_ids = internal_tables.into_iter().map(|t| t.id).collect_vec(); // 1. cancel create procedure. @@ -733,7 +737,7 @@ impl DdlController { creating_internal_table_ids.push(table.id); self.catalog_manager .cancel_create_table_procedure(table, self.fragment_manager.clone()) - .await; + .await?; } StreamingJob::Sink(sink) => { self.catalog_manager @@ -749,7 +753,7 @@ impl DdlController { } else { self.catalog_manager .cancel_create_table_procedure(table, self.fragment_manager.clone()) - .await; + .await?; } } StreamingJob::Index(index, table) => { @@ -763,6 +767,7 @@ impl DdlController { self.catalog_manager .unmark_creating_tables(&creating_internal_table_ids, true) .await; + Ok(()) } /// `finish_stream_job` finishes a stream job and clean some states.