From 31c73afddefacc1a005420206b8560f5acf68d7c Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 30 Aug 2023 11:43:04 +0800 Subject: [PATCH] fix: deregister table after keeper closes table (#2278) * fix: deregister table after keeper closes table * chore: apply suggestions from CR --- src/catalog/src/local/memory.rs | 92 ++++++++++++------- src/catalog/src/remote/manager.rs | 2 +- src/catalog/src/remote/region_alive_keeper.rs | 55 ++++++++--- src/datanode/src/tests.rs | 5 +- 4 files changed, 109 insertions(+), 45 deletions(-) diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index a47de669afbd..565224ff68b1 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -97,26 +97,7 @@ impl CatalogManager for MemoryCatalogManager { } async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> { - let mut catalogs = self.catalogs.write().unwrap(); - let schema = catalogs - .get_mut(&request.catalog) - .with_context(|| CatalogNotFoundSnafu { - catalog_name: &request.catalog, - })? - .get_mut(&request.schema) - .with_context(|| SchemaNotFoundSnafu { - catalog: &request.catalog, - schema: &request.schema, - })?; - let result = schema.remove(&request.table_name); - if result.is_some() { - decrement_gauge!( - crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT, - 1.0, - &[crate::metrics::db_label(&request.catalog, &request.schema)], - ); - } - Ok(()) + self.deregister_table_sync(request) } async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { @@ -157,15 +138,7 @@ impl CatalogManager for MemoryCatalogManager { } async fn schema_exist(&self, catalog: &str, schema: &str) -> Result { - Ok(self - .catalogs - .read() - .unwrap() - .get(catalog) - .with_context(|| CatalogNotFoundSnafu { - catalog_name: catalog, - })? 
- .contains_key(schema)) + self.schema_exist_sync(catalog, schema) } async fn table( @@ -187,7 +160,7 @@ impl CatalogManager for MemoryCatalogManager { } async fn catalog_exist(&self, catalog: &str) -> Result { - Ok(self.catalogs.read().unwrap().get(catalog).is_some()) + self.catalog_exist_sync(catalog) } async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result { @@ -267,6 +240,22 @@ impl MemoryCatalogManager { manager } + fn schema_exist_sync(&self, catalog: &str, schema: &str) -> Result { + Ok(self + .catalogs + .read() + .unwrap() + .get(catalog) + .with_context(|| CatalogNotFoundSnafu { + catalog_name: catalog, + })? + .contains_key(schema)) + } + + fn catalog_exist_sync(&self, catalog: &str) -> Result { + Ok(self.catalogs.read().unwrap().get(catalog).is_some()) + } + /// Registers a catalog if it does not exist and returns false if the schema exists. pub fn register_catalog_sync(self: &Arc, name: String) -> Result { let mut catalogs = self.catalogs.write().unwrap(); @@ -282,6 +271,29 @@ impl MemoryCatalogManager { } } + pub fn deregister_table_sync(&self, request: DeregisterTableRequest) -> Result<()> { + let mut catalogs = self.catalogs.write().unwrap(); + let schema = catalogs + .get_mut(&request.catalog) + .with_context(|| CatalogNotFoundSnafu { + catalog_name: &request.catalog, + })? + .get_mut(&request.schema) + .with_context(|| SchemaNotFoundSnafu { + catalog: &request.catalog, + schema: &request.schema, + })?; + let result = schema.remove(&request.table_name); + if result.is_some() { + decrement_gauge!( + crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT, + 1.0, + &[crate::metrics::db_label(&request.catalog, &request.schema)], + ); + } + Ok(()) + } + /// Registers a schema if it does not exist. /// It returns an error if the catalog does not exist, /// and returns false if the schema exists. 
@@ -345,9 +357,25 @@ impl MemoryCatalogManager { #[cfg(any(test, feature = "testing"))] pub fn new_with_table(table: TableRef) -> Arc { let manager = Self::with_default_setup(); + let catalog = &table.table_info().catalog_name; + let schema = &table.table_info().schema_name; + + if !manager.catalog_exist_sync(catalog).unwrap() { + manager.register_catalog_sync(catalog.to_string()).unwrap(); + } + + if !manager.schema_exist_sync(catalog, schema).unwrap() { + manager + .register_schema_sync(RegisterSchemaRequest { + catalog: catalog.to_string(), + schema: schema.to_string(), + }) + .unwrap(); + } + let request = RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), + catalog: catalog.to_string(), + schema: schema.to_string(), table_name: table.table_info().name.clone(), table_id: table.table_info().ident.table_id, table, diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index bc9e296d671f..ddbd9f71ec11 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -226,7 +226,7 @@ async fn register_table( engine: table_info.meta.engine.clone(), }; region_alive_keepers - .register_table(table_ident, table) + .register_table(table_ident, table, memory_catalog_manager.clone()) .await?; } diff --git a/src/catalog/src/remote/region_alive_keeper.rs b/src/catalog/src/remote/region_alive_keeper.rs index 7f0c103abe28..f643487ec773 100644 --- a/src/catalog/src/remote/region_alive_keeper.rs +++ b/src/catalog/src/remote/region_alive_keeper.rs @@ -37,6 +37,8 @@ use tokio::task::JoinHandle; use tokio::time::{Duration, Instant}; use crate::error::{Result, TableEngineNotFoundSnafu}; +use crate::local::MemoryCatalogManager; +use crate::DeregisterTableRequest; /// [RegionAliveKeepers] manages all [RegionAliveKeeper] in a scope of tables. 
pub struct RegionAliveKeepers { @@ -70,7 +72,12 @@ impl RegionAliveKeepers { self.keepers.lock().await.get(&table_id).cloned() } - pub async fn register_table(&self, table_ident: TableIdent, table: TableRef) -> Result<()> { + pub async fn register_table( + &self, + table_ident: TableIdent, + table: TableRef, + catalog_manager: Arc<MemoryCatalogManager>, + ) -> Result<()> { let table_id = table_ident.table_id; let keeper = self.find_keeper(table_id).await; if keeper.is_some() { @@ -86,6 +93,7 @@ impl RegionAliveKeepers { let keeper = Arc::new(RegionAliveKeeper::new( table_engine, + catalog_manager, table_ident.clone(), self.heartbeat_interval_millis, )); @@ -203,6 +211,7 @@ impl HeartbeatResponseHandler for RegionAliveKeepers { /// Datanode, it will "extend" the region's "lease", with a deadline for [RegionAliveKeeper] to /// countdown. pub struct RegionAliveKeeper { + catalog_manager: Arc<MemoryCatalogManager>, table_engine: TableEngineRef, table_ident: TableIdent, countdown_task_handles: Arc<Mutex<HashMap<RegionNumber, Arc<CountdownTaskHandle>>>>, @@ -213,10 +222,12 @@ pub struct RegionAliveKeeper { impl RegionAliveKeeper { fn new( table_engine: TableEngineRef, + catalog_manager: Arc<MemoryCatalogManager>, table_ident: TableIdent, heartbeat_interval_millis: u64, ) -> Self { Self { + catalog_manager, table_engine, table_ident, countdown_task_handles: Arc::new(Mutex::new(HashMap::new())), @@ -244,11 +255,29 @@ impl RegionAliveKeeper { let _ = x.lock().await.remove(&region); } // Else the countdown task handles map could be dropped because the keeper is dropped.
}; + let catalog_manager = self.catalog_manager.clone(); + let ident = self.table_ident.clone(); let handle = Arc::new(CountdownTaskHandle::new( self.table_engine.clone(), self.table_ident.clone(), region, - || on_task_finished, + move |result: Option<CloseTableResult>| { + if matches!(result, Some(CloseTableResult::Released(_))) { + let result = catalog_manager.deregister_table_sync(DeregisterTableRequest { + catalog: ident.catalog.to_string(), + schema: ident.schema.to_string(), + table_name: ident.table.to_string(), + }); + + info!( + "Deregister table: {} after countdown task finished, result: {result:?}", + ident.table_id + ); + } else { + debug!("Countdown task returns: {result:?}"); + } + on_task_finished + }, )); let mut handles = self.countdown_task_handles.lock().await; @@ -347,7 +376,7 @@ impl CountdownTaskHandle { table_engine: TableEngineRef, table_ident: TableIdent, region: RegionNumber, - on_task_finished: impl FnOnce() -> Fut + Send + 'static, + on_task_finished: impl FnOnce(Option<CloseTableResult>) -> Fut + Send + 'static, ) -> Self where Fut: Future<Output = ()> + Send, @@ -361,8 +390,8 @@ impl CountdownTaskHandle { rx, }; let handler = common_runtime::spawn_bg(async move { - countdown_task.run().await; - on_task_finished().await; + let result = countdown_task.run().await; + on_task_finished(result).await; }); Self { @@ -414,7 +443,8 @@ struct CountdownTask { } impl CountdownTask { - async fn run(&mut self) { + // Returns `Some(result)` once the countdown has closed the region, or `None` if the loop exits without closing it. + async fn run(&mut self) -> Option<CloseTableResult> { // 30 years. See `Instant::far_future`. let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30); @@ -468,10 +498,11 @@ impl CountdownTask { "Region {region} of table {table_ident} is closed, result: {result:?}.
\ RegionAliveKeeper out.", ); - break; + return Some(result); } } } + None } async fn close_region(&self) -> CloseTableResult { @@ -547,8 +578,9 @@ mod test { table_options: TableOptions::default(), engine: "MockTableEngine".to_string(), })); + let catalog_manager = MemoryCatalogManager::new_with_table(table.clone()); keepers - .register_table(table_ident.clone(), table) + .register_table(table_ident.clone(), table, catalog_manager) .await .unwrap(); assert!(keepers @@ -684,7 +716,8 @@ mod test { table_id: 1024, engine: "mito".to_string(), }; - let keeper = RegionAliveKeeper::new(table_engine, table_ident, 1000); + let catalog_manager = MemoryCatalogManager::with_default_setup(); + let keeper = RegionAliveKeeper::new(table_engine, catalog_manager, table_ident, 1000); let region = 1; assert!(keeper.find_handle(&region).await.is_none()); @@ -727,7 +760,7 @@ mod test { table_engine.clone(), table_ident.clone(), 1, - || async move { finished_clone.store(true, Ordering::Relaxed) }, + |_| async move { finished_clone.store(true, Ordering::Relaxed) }, ); let tx = handle.tx.clone(); @@ -749,7 +782,7 @@ mod test { let finished = Arc::new(AtomicBool::new(false)); let finished_clone = finished.clone(); - let handle = CountdownTaskHandle::new(table_engine, table_ident, 1, || async move { + let handle = CountdownTaskHandle::new(table_engine, table_ident, 1, |_| async move { finished_clone.store(true, Ordering::Relaxed) }); handle.tx.send(CountdownCommand::Start(100)).await.unwrap(); diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index f220981e9cee..00a256b5e445 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -20,6 +20,7 @@ use api::v1::greptime_request::Request as GrpcRequest; use api::v1::meta::HeartbeatResponse; use api::v1::query_request::Query; use api::v1::QueryRequest; +use catalog::local::MemoryCatalogManager; use catalog::remote::region_alive_keeper::RegionAliveKeepers; use catalog::CatalogManagerRef; use
common_meta::heartbeat::handler::{ @@ -160,8 +161,10 @@ async fn test_open_region_handler() { let table_ident = &region_ident.table_ident; let table = prepare_table(instance.inner()).await; + + let dummy_catalog_manager = MemoryCatalogManager::with_default_setup(); region_alive_keepers - .register_table(table_ident.clone(), table) + .register_table(table_ident.clone(), table, dummy_catalog_manager) .await .unwrap();