From b84b69e50fe67f2751b117aa85e43e5171301355 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 5 Jul 2024 16:09:12 +0800 Subject: [PATCH 1/2] fix --- src/meta/src/manager/catalog/mod.rs | 37 +++++++++++++++-------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 72024ff98711c..1acf1f4638b99 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -1295,7 +1295,7 @@ impl CatalogManager { table_id: TableId, internal_table_ids: Vec, ) -> MetaResult { - let (table, internal_tables) = { + let table = { let core = &mut self.core.lock().await; let database_core = &mut core.database; let tables = &mut database_core.tables; @@ -1337,7 +1337,24 @@ impl CatalogManager { assert!(res.is_some(), "table_id {} missing", table_id); } commit_meta!(self, tables)?; - (table, internal_tables) + + // FIXME(kwannoel): Propagate version to fe + let _version = self + .notify_frontend( + Operation::Delete, + Info::RelationGroup(RelationGroup { + relations: vec![Relation { + relation_info: RelationInfo::Table(table.to_owned()).into(), + }] + .into_iter() + .chain(internal_tables.into_iter().map(|internal_table| Relation { + relation_info: RelationInfo::Table(internal_table).into(), + })) + .collect_vec(), + }), + ) + .await; + table }; { @@ -1355,22 +1372,6 @@ impl CatalogManager { } } - // FIXME(kwannoel): Propagate version to fe - let _version = self - .notify_frontend( - Operation::Delete, - Info::RelationGroup(RelationGroup { - relations: vec![Relation { - relation_info: RelationInfo::Table(table.to_owned()).into(), - }] - .into_iter() - .chain(internal_tables.into_iter().map(|internal_table| Relation { - relation_info: RelationInfo::Table(internal_table).into(), - })) - .collect_vec(), - }), - ) - .await; Ok(true) } From 5d2d9104a1caab546e981ae0b2c0bd683494476d Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 5 Jul 2024 17:12:35 +0800 Subject: [PATCH 2/2] dont separate core lock --- src/meta/src/manager/catalog/mod.rs | 128 +++++++++++++--------------- 1 file changed, 61 insertions(+), 67 deletions(-) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 1acf1f4638b99..cb7173943dcd5 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -1295,83 +1295,77 @@ impl CatalogManager { table_id: TableId, internal_table_ids: Vec, ) -> MetaResult { - let table = { - let core = &mut self.core.lock().await; - let database_core = &mut core.database; - let tables = &mut database_core.tables; - let Some(table) = tables.get(&table_id).cloned() else { - tracing::warn!( - "table_id {} missing when attempting to cancel job, could be cleaned on recovery", - table_id - ); - return Ok(false); - }; - let mut internal_tables = vec![]; - for internal_table_id in &internal_table_ids { - if let Some(table) = tables.get(internal_table_id) { - internal_tables.push(table.clone()); - } - } - - // `Unspecified` maps to Created state, due to backwards compatibility. - // `Created` states should not be cancelled. - if table - .get_stream_job_status() - .unwrap_or(StreamJobStatus::Created) - != StreamJobStatus::Creating - { - return Err(MetaError::invalid_parameter(format!( - "table is not in creating state id={:#?}", - table_id - ))); + let core = &mut self.core.lock().await; + let database_core = &mut core.database; + let tables = &mut database_core.tables; + let Some(table) = tables.get(&table_id).cloned() else { + tracing::warn!( + "table_id {} missing when attempting to cancel job, could be cleaned on recovery", + table_id + ); + return Ok(false); + }; + let mut internal_tables = vec![]; + for internal_table_id in &internal_table_ids { + if let Some(table) = tables.get(internal_table_id) { + internal_tables.push(table.clone()); } + } - tracing::trace!("cleanup tables for {}", table.id); - let mut table_ids = vec![table.id]; - table_ids.extend(internal_table_ids); + // `Unspecified` maps to Created state, due to backwards compatibility. + // `Created` states should not be cancelled. + if table + .get_stream_job_status() + .unwrap_or(StreamJobStatus::Created) + != StreamJobStatus::Creating + { + return Err(MetaError::invalid_parameter(format!( + "table is not in creating state id={:#?}", + table_id + ))); + } - let tables = &mut database_core.tables; - let mut tables = BTreeMapTransaction::new(tables); - for table_id in table_ids { - let res = tables.remove(table_id); - assert!(res.is_some(), "table_id {} missing", table_id); - } - commit_meta!(self, tables)?; + tracing::trace!("cleanup tables for {}", table.id); + let mut table_ids = vec![table.id]; + table_ids.extend(internal_table_ids); - // FIXME(kwannoel): Propagate version to fe - let _version = self - .notify_frontend( - Operation::Delete, - Info::RelationGroup(RelationGroup { - relations: vec![Relation { - relation_info: RelationInfo::Table(table.to_owned()).into(), - }] - .into_iter() - .chain(internal_tables.into_iter().map(|internal_table| Relation { - relation_info: RelationInfo::Table(internal_table).into(), - })) - .collect_vec(), - }), - ) - .await; - table - }; + let tables = &mut database_core.tables; + let mut tables = BTreeMapTransaction::new(tables); + for table_id in table_ids { + let res = tables.remove(table_id); + assert!(res.is_some(), "table_id {} missing", table_id); + } + commit_meta!(self, tables)?; { - let core = &mut self.core.lock().await; - { - let user_core = &mut core.user; - user_core.decrease_ref(table.owner); - } + let user_core = &mut core.user; + user_core.decrease_ref(table.owner); + } - { - let database_core = &mut core.database; - for &dependent_relation_id in &table.dependent_relations { - database_core.decrease_relation_ref_count(dependent_relation_id); - } + { + let database_core = &mut core.database; + for &dependent_relation_id in &table.dependent_relations { + database_core.decrease_relation_ref_count(dependent_relation_id); } } + // FIXME(kwannoel): Propagate version to fe + let _version = self + .notify_frontend( + Operation::Delete, + Info::RelationGroup(RelationGroup { + relations: vec![Relation { + relation_info: RelationInfo::Table(table.to_owned()).into(), + }] + .into_iter() + .chain(internal_tables.into_iter().map(|internal_table| Relation { + relation_info: RelationInfo::Table(internal_table).into(), + })) + .collect_vec(), + }), + ) + .await; + Ok(true) }