Skip to content

Commit

Permalink
fix: deregister table after keeper closes table (GreptimeTeam#2278)
Browse files Browse the repository at this point in the history
* fix: deregister table after keeper closes table

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored and paomian committed Oct 19, 2023
1 parent ac9a952 commit 31c73af
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 45 deletions.
92 changes: 60 additions & 32 deletions src/catalog/src/local/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,26 +97,7 @@ impl CatalogManager for MemoryCatalogManager {
}

async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> {
let mut catalogs = self.catalogs.write().unwrap();
let schema = catalogs
.get_mut(&request.catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?
.get_mut(&request.schema)
.with_context(|| SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?;
let result = schema.remove(&request.table_name);
if result.is_some() {
decrement_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(&request.catalog, &request.schema)],
);
}
Ok(())
self.deregister_table_sync(request)
}

async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
Expand Down Expand Up @@ -157,15 +138,7 @@ impl CatalogManager for MemoryCatalogManager {
}

async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool> {
Ok(self
.catalogs
.read()
.unwrap()
.get(catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: catalog,
})?
.contains_key(schema))
self.schema_exist_sync(catalog, schema)
}

async fn table(
Expand All @@ -187,7 +160,7 @@ impl CatalogManager for MemoryCatalogManager {
}

async fn catalog_exist(&self, catalog: &str) -> Result<bool> {
Ok(self.catalogs.read().unwrap().get(catalog).is_some())
self.catalog_exist_sync(catalog)
}

async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
Expand Down Expand Up @@ -267,6 +240,22 @@ impl MemoryCatalogManager {
manager
}

/// Synchronously checks whether `schema` is registered under `catalog`.
///
/// # Errors
///
/// Returns a `CatalogNotFound` error when `catalog` is not registered.
///
/// # Panics
///
/// Panics if acquiring the read lock on the catalog map fails (the lock
/// result is unwrapped).
fn schema_exist_sync(&self, catalog: &str, schema: &str) -> Result<bool> {
    let catalogs = self.catalogs.read().unwrap();
    // A missing catalog is reported as an error rather than as `Ok(false)`.
    let schemas = catalogs.get(catalog).with_context(|| CatalogNotFoundSnafu {
        catalog_name: catalog,
    })?;
    Ok(schemas.contains_key(schema))
}

/// Synchronously checks whether `catalog` is registered.
///
/// Always returns `Ok`; the `Result` wrapper mirrors the signature of the
/// other existence checks on this type.
///
/// # Panics
///
/// Panics if acquiring the read lock on the catalog map fails (the lock
/// result is unwrapped).
fn catalog_exist_sync(&self, catalog: &str) -> Result<bool> {
    // `contains_key` expresses the membership test directly instead of
    // fetching the value only to discard it.
    Ok(self.catalogs.read().unwrap().contains_key(catalog))
}

/// Registers a catalog if it does not exist and returns false if the catalog already exists.
pub fn register_catalog_sync(self: &Arc<Self>, name: String) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
Expand All @@ -282,6 +271,29 @@ impl MemoryCatalogManager {
}
}

/// Synchronously removes the table named in `request` from the in-memory catalog.
///
/// Removing a table that is not present is not an error: the call still
/// returns `Ok(())`, and the table-count gauge is only decremented when a
/// table was actually removed.
///
/// # Errors
///
/// Returns `CatalogNotFound` if `request.catalog` is not registered, or
/// `SchemaNotFound` if `request.schema` is not registered in that catalog.
///
/// # Panics
///
/// Panics if acquiring the write lock on the catalog map fails (the lock
/// result is unwrapped).
pub fn deregister_table_sync(&self, request: DeregisterTableRequest) -> Result<()> {
    let catalog_name = &request.catalog;
    let schema_name = &request.schema;

    let mut catalogs = self.catalogs.write().unwrap();

    let schemas = catalogs
        .get_mut(catalog_name)
        .with_context(|| CatalogNotFoundSnafu { catalog_name })?;

    let tables = schemas
        .get_mut(schema_name)
        .with_context(|| SchemaNotFoundSnafu {
            catalog: catalog_name,
            schema: schema_name,
        })?;

    // Keep the removed entry bound until the end of the function so it is
    // dropped after the metric update, exactly as before.
    let removed = tables.remove(&request.table_name);
    if removed.is_some() {
        decrement_gauge!(
            crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
            1.0,
            &[crate::metrics::db_label(catalog_name, schema_name)],
        );
    }

    Ok(())
}

/// Registers a schema if it does not exist.
/// It returns an error if the catalog does not exist,
/// and returns false if the schema exists.
Expand Down Expand Up @@ -345,9 +357,25 @@ impl MemoryCatalogManager {
#[cfg(any(test, feature = "testing"))]
pub fn new_with_table(table: TableRef) -> Arc<Self> {
let manager = Self::with_default_setup();
let catalog = &table.table_info().catalog_name;
let schema = &table.table_info().schema_name;

if !manager.catalog_exist_sync(catalog).unwrap() {
manager.register_catalog_sync(catalog.to_string()).unwrap();
}

if !manager.schema_exist_sync(catalog, schema).unwrap() {
manager
.register_schema_sync(RegisterSchemaRequest {
catalog: catalog.to_string(),
schema: schema.to_string(),
})
.unwrap();
}

let request = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
catalog: catalog.to_string(),
schema: schema.to_string(),
table_name: table.table_info().name.clone(),
table_id: table.table_info().ident.table_id,
table,
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ async fn register_table(
engine: table_info.meta.engine.clone(),
};
region_alive_keepers
.register_table(table_ident, table)
.register_table(table_ident, table, memory_catalog_manager.clone())
.await?;
}

Expand Down
55 changes: 44 additions & 11 deletions src/catalog/src/remote/region_alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};

use crate::error::{Result, TableEngineNotFoundSnafu};
use crate::local::MemoryCatalogManager;
use crate::DeregisterTableRequest;

/// [RegionAliveKeepers] manages all [RegionAliveKeeper] in a scope of tables.
pub struct RegionAliveKeepers {
Expand Down Expand Up @@ -70,7 +72,12 @@ impl RegionAliveKeepers {
self.keepers.lock().await.get(&table_id).cloned()
}

pub async fn register_table(&self, table_ident: TableIdent, table: TableRef) -> Result<()> {
pub async fn register_table(
&self,
table_ident: TableIdent,
table: TableRef,
catalog_manager: Arc<MemoryCatalogManager>,
) -> Result<()> {
let table_id = table_ident.table_id;
let keeper = self.find_keeper(table_id).await;
if keeper.is_some() {
Expand All @@ -86,6 +93,7 @@ impl RegionAliveKeepers {

let keeper = Arc::new(RegionAliveKeeper::new(
table_engine,
catalog_manager,
table_ident.clone(),
self.heartbeat_interval_millis,
));
Expand Down Expand Up @@ -203,6 +211,7 @@ impl HeartbeatResponseHandler for RegionAliveKeepers {
/// Datanode, it will "extend" the region's "lease", with a deadline for [RegionAliveKeeper] to
/// countdown.
pub struct RegionAliveKeeper {
catalog_manager: Arc<MemoryCatalogManager>,
table_engine: TableEngineRef,
table_ident: TableIdent,
countdown_task_handles: Arc<Mutex<HashMap<RegionNumber, Arc<CountdownTaskHandle>>>>,
Expand All @@ -213,10 +222,12 @@ pub struct RegionAliveKeeper {
impl RegionAliveKeeper {
fn new(
table_engine: TableEngineRef,
catalog_manager: Arc<MemoryCatalogManager>,
table_ident: TableIdent,
heartbeat_interval_millis: u64,
) -> Self {
Self {
catalog_manager,
table_engine,
table_ident,
countdown_task_handles: Arc::new(Mutex::new(HashMap::new())),
Expand Down Expand Up @@ -244,11 +255,29 @@ impl RegionAliveKeeper {
let _ = x.lock().await.remove(&region);
} // Else the countdown task handles map could be dropped because the keeper is dropped.
};
let catalog_manager = self.catalog_manager.clone();
let ident = self.table_ident.clone();
let handle = Arc::new(CountdownTaskHandle::new(
self.table_engine.clone(),
self.table_ident.clone(),
region,
|| on_task_finished,
move |result: Option<CloseTableResult>| {
if matches!(result, Some(CloseTableResult::Released(_))) {
let result = catalog_manager.deregister_table_sync(DeregisterTableRequest {
catalog: ident.catalog.to_string(),
schema: ident.schema.to_string(),
table_name: ident.table.to_string(),
});

info!(
"Deregister table: {} after countdown task finished, result: {result:?}",
ident.table_id
);
} else {
debug!("Countdown task returns: {result:?}");
}
on_task_finished
},
));

let mut handles = self.countdown_task_handles.lock().await;
Expand Down Expand Up @@ -347,7 +376,7 @@ impl CountdownTaskHandle {
table_engine: TableEngineRef,
table_ident: TableIdent,
region: RegionNumber,
on_task_finished: impl FnOnce() -> Fut + Send + 'static,
on_task_finished: impl FnOnce(Option<CloseTableResult>) -> Fut + Send + 'static,
) -> Self
where
Fut: Future<Output = ()> + Send,
Expand All @@ -361,8 +390,8 @@ impl CountdownTaskHandle {
rx,
};
let handler = common_runtime::spawn_bg(async move {
countdown_task.run().await;
on_task_finished().await;
let result = countdown_task.run().await;
on_task_finished(result).await;
});

Self {
Expand Down Expand Up @@ -414,7 +443,8 @@ struct CountdownTask {
}

impl CountdownTask {
async fn run(&mut self) {
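// Returns `Some(result)` with the close result once the region has been closed by this task,
// or `None` if the task ends without closing the region.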
async fn run(&mut self) -> Option<CloseTableResult> {
// 30 years. See `Instant::far_future`.
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);

Expand Down Expand Up @@ -468,10 +498,11 @@ impl CountdownTask {
"Region {region} of table {table_ident} is closed, result: {result:?}. \
RegionAliveKeeper out.",
);
break;
return Some(result);
}
}
}
None
}

async fn close_region(&self) -> CloseTableResult {
Expand Down Expand Up @@ -547,8 +578,9 @@ mod test {
table_options: TableOptions::default(),
engine: "MockTableEngine".to_string(),
}));
let catalog_manager = MemoryCatalogManager::new_with_table(table.clone());
keepers
.register_table(table_ident.clone(), table)
.register_table(table_ident.clone(), table, catalog_manager)
.await
.unwrap();
assert!(keepers
Expand Down Expand Up @@ -684,7 +716,8 @@ mod test {
table_id: 1024,
engine: "mito".to_string(),
};
let keeper = RegionAliveKeeper::new(table_engine, table_ident, 1000);
let catalog_manager = MemoryCatalogManager::with_default_setup();
let keeper = RegionAliveKeeper::new(table_engine, catalog_manager, table_ident, 1000);

let region = 1;
assert!(keeper.find_handle(&region).await.is_none());
Expand Down Expand Up @@ -727,7 +760,7 @@ mod test {
table_engine.clone(),
table_ident.clone(),
1,
|| async move { finished_clone.store(true, Ordering::Relaxed) },
|_| async move { finished_clone.store(true, Ordering::Relaxed) },
);
let tx = handle.tx.clone();

Expand All @@ -749,7 +782,7 @@ mod test {

let finished = Arc::new(AtomicBool::new(false));
let finished_clone = finished.clone();
let handle = CountdownTaskHandle::new(table_engine, table_ident, 1, || async move {
let handle = CountdownTaskHandle::new(table_engine, table_ident, 1, |_| async move {
finished_clone.store(true, Ordering::Relaxed)
});
handle.tx.send(CountdownCommand::Start(100)).await.unwrap();
Expand Down
5 changes: 4 additions & 1 deletion src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use api::v1::greptime_request::Request as GrpcRequest;
use api::v1::meta::HeartbeatResponse;
use api::v1::query_request::Query;
use api::v1::QueryRequest;
use catalog::local::MemoryCatalogManager;
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::CatalogManagerRef;
use common_meta::heartbeat::handler::{
Expand Down Expand Up @@ -160,8 +161,10 @@ async fn test_open_region_handler() {
let table_ident = &region_ident.table_ident;

let table = prepare_table(instance.inner()).await;

let dummy_catalog_manager = MemoryCatalogManager::with_default_setup();
region_alive_keepers
.register_table(table_ident.clone(), table)
.register_table(table_ident.clone(), table, dummy_catalog_manager)
.await
.unwrap();

Expand Down

0 comments on commit 31c73af

Please sign in to comment.