diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 72024ff98711c..cb7173943dcd5 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -1295,63 +1295,57 @@ impl CatalogManager { table_id: TableId, internal_table_ids: Vec, ) -> MetaResult { - let (table, internal_tables) = { - let core = &mut self.core.lock().await; - let database_core = &mut core.database; - let tables = &mut database_core.tables; - let Some(table) = tables.get(&table_id).cloned() else { - tracing::warn!( - "table_id {} missing when attempting to cancel job, could be cleaned on recovery", - table_id - ); - return Ok(false); - }; - let mut internal_tables = vec![]; - for internal_table_id in &internal_table_ids { - if let Some(table) = tables.get(internal_table_id) { - internal_tables.push(table.clone()); - } + let core = &mut self.core.lock().await; + let database_core = &mut core.database; + let tables = &mut database_core.tables; + let Some(table) = tables.get(&table_id).cloned() else { + tracing::warn!( + "table_id {} missing when attempting to cancel job, could be cleaned on recovery", + table_id + ); + return Ok(false); + }; + let mut internal_tables = vec![]; + for internal_table_id in &internal_table_ids { + if let Some(table) = tables.get(internal_table_id) { + internal_tables.push(table.clone()); } + } - // `Unspecified` maps to Created state, due to backwards compatibility. - // `Created` states should not be cancelled. - if table - .get_stream_job_status() - .unwrap_or(StreamJobStatus::Created) - != StreamJobStatus::Creating - { - return Err(MetaError::invalid_parameter(format!( - "table is not in creating state id={:#?}", - table_id - ))); - } + // `Unspecified` maps to Created state, due to backwards compatibility. + // `Created` states should not be cancelled. + if table + .get_stream_job_status() + .unwrap_or(StreamJobStatus::Created) + != StreamJobStatus::Creating + { + return Err(MetaError::invalid_parameter(format!( + "table is not in creating state id={:#?}", + table_id + ))); + } - tracing::trace!("cleanup tables for {}", table.id); - let mut table_ids = vec![table.id]; - table_ids.extend(internal_table_ids); + tracing::trace!("cleanup tables for {}", table.id); + let mut table_ids = vec![table.id]; + table_ids.extend(internal_table_ids); - let tables = &mut database_core.tables; - let mut tables = BTreeMapTransaction::new(tables); - for table_id in table_ids { - let res = tables.remove(table_id); - assert!(res.is_some(), "table_id {} missing", table_id); - } - commit_meta!(self, tables)?; - (table, internal_tables) - }; + let tables = &mut database_core.tables; + let mut tables = BTreeMapTransaction::new(tables); + for table_id in table_ids { + let res = tables.remove(table_id); + assert!(res.is_some(), "table_id {} missing", table_id); + } + commit_meta!(self, tables)?; { - let core = &mut self.core.lock().await; - { - let user_core = &mut core.user; - user_core.decrease_ref(table.owner); - } + let user_core = &mut core.user; + user_core.decrease_ref(table.owner); + } - { - let database_core = &mut core.database; - for &dependent_relation_id in &table.dependent_relations { - database_core.decrease_relation_ref_count(dependent_relation_id); - } + { + let database_core = &mut core.database; + for &dependent_relation_id in &table.dependent_relations { + database_core.decrease_relation_ref_count(dependent_relation_id); } } @@ -1371,6 +1365,7 @@ impl CatalogManager { }), ) .await; + Ok(true) }