Skip to content

Commit

Permalink
refactor: use table id instead of table ident (#2233)
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu authored Aug 23, 2023
1 parent fdb5ad2 commit beb92ba
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 20 deletions.
36 changes: 22 additions & 14 deletions src/catalog/src/remote/region_alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use table::engine::manager::TableEngineManagerRef;
use table::engine::{CloseTableResult, EngineContext, TableEngineRef};
use table::metadata::TableId;
use table::requests::CloseTableRequest;
use table::TableRef;
use tokio::sync::{mpsc, oneshot, Mutex};
Expand All @@ -40,7 +41,7 @@ use crate::error::{Result, TableEngineNotFoundSnafu};
/// [RegionAliveKeepers] manages all [RegionAliveKeeper] in a scope of tables.
pub struct RegionAliveKeepers {
table_engine_manager: TableEngineManagerRef,
keepers: Arc<Mutex<HashMap<TableIdent, Arc<RegionAliveKeeper>>>>,
keepers: Arc<Mutex<HashMap<TableId, Arc<RegionAliveKeeper>>>>,
heartbeat_interval_millis: u64,
started: AtomicBool,

Expand All @@ -65,12 +66,13 @@ impl RegionAliveKeepers {
}
}

pub async fn find_keeper(&self, table_ident: &TableIdent) -> Option<Arc<RegionAliveKeeper>> {
self.keepers.lock().await.get(table_ident).cloned()
pub async fn find_keeper(&self, table_id: TableId) -> Option<Arc<RegionAliveKeeper>> {
self.keepers.lock().await.get(&table_id).cloned()
}

pub async fn register_table(&self, table_ident: TableIdent, table: TableRef) -> Result<()> {
let keeper = self.find_keeper(&table_ident).await;
let table_id = table_ident.table_id;
let keeper = self.find_keeper(table_id).await;
if keeper.is_some() {
return Ok(());
}
Expand All @@ -92,7 +94,7 @@ impl RegionAliveKeepers {
}

let mut keepers = self.keepers.lock().await;
let _ = keepers.insert(table_ident.clone(), keeper.clone());
let _ = keepers.insert(table_id, keeper.clone());

if self.started.load(Ordering::Relaxed) {
keeper.start().await;
Expand All @@ -108,15 +110,16 @@ impl RegionAliveKeepers {
&self,
table_ident: &TableIdent,
) -> Option<Arc<RegionAliveKeeper>> {
self.keepers.lock().await.remove(table_ident).map(|x| {
let table_id = table_ident.table_id;
self.keepers.lock().await.remove(&table_id).map(|x| {
info!("Deregister RegionAliveKeeper for table {table_ident}");
x
})
}

pub async fn register_region(&self, region_ident: &RegionIdent) {
let table_ident = &region_ident.table_ident;
let Some(keeper) = self.find_keeper(table_ident).await else {
let table_id = region_ident.table_ident.table_id;
let Some(keeper) = self.find_keeper(table_id).await else {
// Alive keeper could be affected by lagging msg, just warn and ignore.
warn!("Alive keeper for region {region_ident} is not found!");
return;
Expand All @@ -125,8 +128,8 @@ impl RegionAliveKeepers {
}

pub async fn deregister_region(&self, region_ident: &RegionIdent) {
let table_ident = &region_ident.table_ident;
let Some(keeper) = self.find_keeper(table_ident).await else {
let table_id = region_ident.table_ident.table_id;
let Some(keeper) = self.find_keeper(table_id).await else {
// Alive keeper could be affected by lagging msg, just warn and ignore.
warn!("Alive keeper for region {region_ident} is not found!");
return;
Expand Down Expand Up @@ -178,7 +181,8 @@ impl HeartbeatResponseHandler for RegionAliveKeepers {
}
};

let Some(keeper) = self.keepers.lock().await.get(&table_ident).cloned() else {
let table_id = table_ident.table_id;
let Some(keeper) = self.keepers.lock().await.get(&table_id).cloned() else {
// Alive keeper could be affected by lagging msg, just warn and ignore.
warn!("Alive keeper for table {table_ident} is not found!");
continue;
Expand Down Expand Up @@ -547,7 +551,11 @@ mod test {
.register_table(table_ident.clone(), table)
.await
.unwrap();
assert!(keepers.keepers.lock().await.contains_key(&table_ident));
assert!(keepers
.keepers
.lock()
.await
.contains_key(&table_ident.table_id));

(table_ident, keepers)
}
Expand Down Expand Up @@ -602,7 +610,7 @@ mod test {
.keepers
.lock()
.await
.get(&table_ident)
.get(&table_ident.table_id)
.cloned()
.unwrap();

Expand Down Expand Up @@ -649,7 +657,7 @@ mod test {
})
.await;
let mut regions = keepers
.find_keeper(&table_ident)
.find_keeper(table_ident.table_id)
.await
.unwrap()
.countdown_task_handles
Expand Down
6 changes: 3 additions & 3 deletions src/catalog/tests/remote_catalog_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ mod tests {
assert!(catalog_manager.register_table(request).await.unwrap());

let keeper = region_alive_keepers
.find_keeper(&table_before)
.find_keeper(table_before.table_id)
.await
.unwrap();
let deadline = keeper.deadline(0).await.unwrap();
Expand Down Expand Up @@ -435,15 +435,15 @@ mod tests {
assert!(catalog_manager.register_table(request).await.unwrap());

let keeper = region_alive_keepers
.find_keeper(&table_after)
.find_keeper(table_after.table_id)
.await
.unwrap();
let deadline = keeper.deadline(0).await.unwrap();
// assert countdown is started for the table registered after [RegionAliveKeepers] started
assert!(deadline <= Instant::now() + Duration::from_secs(20));

let keeper = region_alive_keepers
.find_keeper(&table_before)
.find_keeper(table_before.table_id)
.await
.unwrap();
let deadline = keeper.deadline(0).await.unwrap();
Expand Down
9 changes: 6 additions & 3 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ async fn test_open_region_handler() {
InstructionReply::OpenRegion(SimpleReply { result: true, .. })
);

let keeper = region_alive_keepers.find_keeper(table_ident).await.unwrap();
let keeper = region_alive_keepers
.find_keeper(table_ident.table_id)
.await
.unwrap();
let deadline = keeper.deadline(0).await.unwrap();
assert!(deadline <= Instant::now() + Duration::from_secs(20));

Expand Down Expand Up @@ -203,7 +206,7 @@ async fn test_open_region_handler() {
);

assert!(region_alive_keepers
.find_keeper(&non_exist_table_ident)
.find_keeper(non_exist_table_ident.table_id)
.await
.is_none());

Expand All @@ -222,7 +225,7 @@ async fn test_open_region_handler() {
assert_test_table_not_found(instance.inner()).await;

assert!(region_alive_keepers
.find_keeper(table_ident)
.find_keeper(table_ident.table_id)
.await
.is_none());

Expand Down

0 comments on commit beb92ba

Please sign in to comment.