diff --git a/src/catalog/src/remote/region_alive_keeper.rs b/src/catalog/src/remote/region_alive_keeper.rs index c9106b286674..7f0c103abe28 100644 --- a/src/catalog/src/remote/region_alive_keeper.rs +++ b/src/catalog/src/remote/region_alive_keeper.rs @@ -29,6 +29,7 @@ use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionNumber; use table::engine::manager::TableEngineManagerRef; use table::engine::{CloseTableResult, EngineContext, TableEngineRef}; +use table::metadata::TableId; use table::requests::CloseTableRequest; use table::TableRef; use tokio::sync::{mpsc, oneshot, Mutex}; @@ -40,7 +41,7 @@ use crate::error::{Result, TableEngineNotFoundSnafu}; /// [RegionAliveKeepers] manages all [RegionAliveKeeper] in a scope of tables. pub struct RegionAliveKeepers { table_engine_manager: TableEngineManagerRef, - keepers: Arc>>>, + keepers: Arc>>>, heartbeat_interval_millis: u64, started: AtomicBool, @@ -65,12 +66,13 @@ impl RegionAliveKeepers { } } - pub async fn find_keeper(&self, table_ident: &TableIdent) -> Option> { - self.keepers.lock().await.get(table_ident).cloned() + pub async fn find_keeper(&self, table_id: TableId) -> Option> { + self.keepers.lock().await.get(&table_id).cloned() } pub async fn register_table(&self, table_ident: TableIdent, table: TableRef) -> Result<()> { - let keeper = self.find_keeper(&table_ident).await; + let table_id = table_ident.table_id; + let keeper = self.find_keeper(table_id).await; if keeper.is_some() { return Ok(()); } @@ -92,7 +94,7 @@ impl RegionAliveKeepers { } let mut keepers = self.keepers.lock().await; - let _ = keepers.insert(table_ident.clone(), keeper.clone()); + let _ = keepers.insert(table_id, keeper.clone()); if self.started.load(Ordering::Relaxed) { keeper.start().await; @@ -108,15 +110,16 @@ impl RegionAliveKeepers { &self, table_ident: &TableIdent, ) -> Option> { - self.keepers.lock().await.remove(table_ident).map(|x| { + let table_id = table_ident.table_id; + self.keepers.lock().await.remove(&table_id).map(|x| { info!("Deregister RegionAliveKeeper for table {table_ident}"); x }) } pub async fn register_region(&self, region_ident: &RegionIdent) { - let table_ident = ®ion_ident.table_ident; - let Some(keeper) = self.find_keeper(table_ident).await else { + let table_id = region_ident.table_ident.table_id; + let Some(keeper) = self.find_keeper(table_id).await else { // Alive keeper could be affected by lagging msg, just warn and ignore. warn!("Alive keeper for region {region_ident} is not found!"); return; @@ -125,8 +128,8 @@ impl RegionAliveKeepers { } pub async fn deregister_region(&self, region_ident: &RegionIdent) { - let table_ident = ®ion_ident.table_ident; - let Some(keeper) = self.find_keeper(table_ident).await else { + let table_id = region_ident.table_ident.table_id; + let Some(keeper) = self.find_keeper(table_id).await else { // Alive keeper could be affected by lagging msg, just warn and ignore. warn!("Alive keeper for region {region_ident} is not found!"); return; @@ -178,7 +181,8 @@ impl HeartbeatResponseHandler for RegionAliveKeepers { } }; - let Some(keeper) = self.keepers.lock().await.get(&table_ident).cloned() else { + let table_id = table_ident.table_id; + let Some(keeper) = self.keepers.lock().await.get(&table_id).cloned() else { // Alive keeper could be affected by lagging msg, just warn and ignore. warn!("Alive keeper for table {table_ident} is not found!"); continue; @@ -547,7 +551,11 @@ mod test { .register_table(table_ident.clone(), table) .await .unwrap(); - assert!(keepers.keepers.lock().await.contains_key(&table_ident)); + assert!(keepers + .keepers + .lock() + .await + .contains_key(&table_ident.table_id)); (table_ident, keepers) } @@ -602,7 +610,7 @@ mod test { .keepers .lock() .await - .get(&table_ident) + .get(&table_ident.table_id) .cloned() .unwrap(); @@ -649,7 +657,7 @@ mod test { }) .await; let mut regions = keepers - .find_keeper(&table_ident) + .find_keeper(table_ident.table_id) .await .unwrap() .countdown_task_handles diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index bc48b3c9a6ba..a6ad6761e2dc 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -396,7 +396,7 @@ mod tests { assert!(catalog_manager.register_table(request).await.unwrap()); let keeper = region_alive_keepers - .find_keeper(&table_before) + .find_keeper(table_before.table_id) .await .unwrap(); let deadline = keeper.deadline(0).await.unwrap(); @@ -435,7 +435,7 @@ mod tests { assert!(catalog_manager.register_table(request).await.unwrap()); let keeper = region_alive_keepers - .find_keeper(&table_after) + .find_keeper(table_after.table_id) .await .unwrap(); let deadline = keeper.deadline(0).await.unwrap(); @@ -443,7 +443,7 @@ mod tests { assert!(deadline <= Instant::now() + Duration::from_secs(20)); let keeper = region_alive_keepers - .find_keeper(&table_before) + .find_keeper(table_before.table_id) .await .unwrap(); let deadline = keeper.deadline(0).await.unwrap(); diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 3de50a8e6f13..45a19e525318 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -173,7 +173,10 @@ async fn test_open_region_handler() { InstructionReply::OpenRegion(SimpleReply { result: true, .. }) ); - let keeper = region_alive_keepers.find_keeper(table_ident).await.unwrap(); + let keeper = region_alive_keepers + .find_keeper(table_ident.table_id) + .await + .unwrap(); let deadline = keeper.deadline(0).await.unwrap(); assert!(deadline <= Instant::now() + Duration::from_secs(20)); @@ -203,7 +206,7 @@ async fn test_open_region_handler() { ); assert!(region_alive_keepers - .find_keeper(&non_exist_table_ident) + .find_keeper(non_exist_table_ident.table_id) .await .is_none()); @@ -222,7 +225,7 @@ async fn test_open_region_handler() { assert_test_table_not_found(instance.inner()).await; assert!(region_alive_keepers - .find_keeper(table_ident) + .find_keeper(table_ident.table_id) .await .is_none());