diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index b63c1ede24fbc..8a0af601562ae 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -979,46 +979,24 @@ impl CommandContext { tracing::debug!(id = ?table_fragments.table_id(), "cancelling stream job"); self.clean_up(table_fragments.actor_ids()).await?; - // NOTE(kwannoel): At this point, meta has already registered the table ids. - // We should unregister them. - // This is required for background ddl, for foreground ddl this is a no-op. - // Foreground ddl is handled entirely by stream manager, so it will unregister - // the table ids on failure. - // On the other hand background ddl could be handled by barrier manager. - // It won't clean the tables on failure, - // since the failure could be recoverable. - // As such it needs to be handled here. - self.barrier_manager_context - .hummock_manager - .unregister_table_ids(table_fragments.all_table_ids().map(TableId::new)) - .await?; - match &self.barrier_manager_context.metadata_manager { MetadataManager::V1(mgr) => { // NOTE(kwannoel): At this point, catalog manager has persisted the tables already. // We need to cleanup the table state. So we can do it here. - // The logic is the same as above, for hummock_manager.unregister_table_ids. - if let Err(e) = mgr - .catalog_manager + mgr.catalog_manager .cancel_create_materialized_view_procedure( table_fragments.table_id().table_id, table_fragments.internal_table_ids(), ) .await - { - let table_id = table_fragments.table_id().table_id; - tracing::warn!( - table_id, - error = %e.as_report(), - "cancel_create_table_procedure failed for CancelStreamingJob", - ); - // If failed, check that table is not in meta store. - // If any table is, just panic, let meta do bootstrap recovery. - // Otherwise our persisted state is dirty. - let mut table_ids = table_fragments.internal_table_ids(); - table_ids.push(table_id); - mgr.catalog_manager.assert_tables_deleted(table_ids).await; - } + .inspect_err(|e| { + let table_id = table_fragments.table_id().table_id; + tracing::warn!( + table_id, + error = %e.as_report(), + "cancel_create_table_procedure failed for CancelStreamingJob", + ); + })?; // We need to drop table fragments here, // since this is not done in stream manager (foreground ddl) @@ -1038,6 +1016,20 @@ impl CommandContext { .await?; } } + + // NOTE(kwannoel): At this point, meta has already registered the table ids. + // We should unregister them. + // This is required for background ddl, for foreground ddl this is a no-op. + // Foreground ddl is handled entirely by stream manager, so it will unregister + // the table ids on failure. + // On the other hand background ddl could be handled by barrier manager. + // It won't clean the tables on failure, + // since the failure could be recoverable. + // As such it needs to be handled here. + self.barrier_manager_context + .hummock_manager + .unregister_table_ids(table_fragments.all_table_ids().map(TableId::new)) + .await?; } Command::CreateStreamingJob { info, job_type } => { diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 7c457e4f056db..65ea2a5ba2fc4 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -1132,14 +1132,6 @@ impl CatalogManager { }; } - pub async fn assert_tables_deleted(&self, table_ids: Vec) { - let core = self.core.lock().await; - let tables = &core.database.tables; - for id in table_ids { - assert_eq!(tables.get(&id), None,) - } - } - /// We clean the following tables: /// 1. Those which belonged to incomplete Foreground jobs. /// 2. Those which did not persist their table fragments, we can't recover these.