From 94496a836e641488ceb5485e16215b3057955d93 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sun, 8 Oct 2023 10:44:40 +0000 Subject: [PATCH] refactor: refactor cache invalidator --- src/catalog/src/kvbackend/manager.rs | 12 ++---- src/common/meta/src/cache_invalidator.rs | 39 +++++++++++-------- .../handler/invalidate_table_cache.rs | 12 ++---- 3 files changed, 31 insertions(+), 32 deletions(-) diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index c434f1c91841..0f3ef83b87f5 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -18,9 +18,7 @@ use std::sync::{Arc, Weak}; use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID}; use common_error::ext::BoxedError; -use common_meta::cache_invalidator::{ - CacheInvalidator, Context, KvCacheInvalidatorRef, TableMetadataCacheInvalidator, -}; +use common_meta::cache_invalidator::{CacheInvalidator, Context, TableMetadataCacheInvalidatorRef}; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::error::Result as MetaResult; use common_meta::key::catalog_name::CatalogNameKey; @@ -54,7 +52,7 @@ pub struct KvBackendCatalogManager { // TODO(LFC): Maybe use a real implementation for Standalone mode. // Now we use `NoopKvCacheInvalidator` for Standalone mode. In Standalone mode, the KV backend // is implemented by RaftEngine. Maybe we need a cache for it? - table_metadata_cache_invalidator: TableMetadataCacheInvalidator, + table_metadata_cache_invalidator: TableMetadataCacheInvalidatorRef, partition_manager: PartitionRuleManagerRef, table_metadata_manager: TableMetadataManagerRef, datanode_manager: DatanodeManagerRef, @@ -80,15 +78,13 @@ impl CacheInvalidator for KvBackendCatalogManager { impl KvBackendCatalogManager { pub fn new( backend: KvBackendRef, - backend_cache_invalidator: KvCacheInvalidatorRef, + table_metadata_cache_invalidator: TableMetadataCacheInvalidatorRef, datanode_manager: DatanodeManagerRef, ) -> Arc { Arc::new_cyclic(|me| Self { partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())), table_metadata_manager: Arc::new(TableMetadataManager::new(backend)), - table_metadata_cache_invalidator: TableMetadataCacheInvalidator::new( - backend_cache_invalidator.clone(), - ), + table_metadata_cache_invalidator, datanode_manager, system_catalog: SystemCatalog { catalog_manager: me.clone(), diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index 21cb06ec7ee1..09f826583a73 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -55,6 +55,8 @@ pub trait CacheInvalidator: Send + Sync { pub type CacheInvalidatorRef = Arc; +pub type TableMetadataCacheInvalidatorRef = Arc; + pub struct DummyCacheInvalidator; #[async_trait::async_trait] @@ -68,37 +70,42 @@ impl CacheInvalidator for DummyCacheInvalidator { } } -#[derive(Clone)] -pub struct TableMetadataCacheInvalidator(KvCacheInvalidatorRef); - -impl TableMetadataCacheInvalidator { - pub fn new(kv_cache_invalidator: KvCacheInvalidatorRef) -> Self { - Self(kv_cache_invalidator) - } - - pub async fn invalidate_schema(&self, catalog: &str, schema: &str) { - let key = SchemaNameKey::new(catalog, schema).as_raw_key(); - self.0.invalidate_key(&key).await; - } +#[async_trait::async_trait] +pub trait TableMetadataCacheInvalidator: CacheInvalidator { + async fn invalidate_schema(&self, _catalog: &str, _schema: &str) {} } #[async_trait::async_trait] -impl CacheInvalidator for TableMetadataCacheInvalidator { +impl CacheInvalidator for T +where + T: KvCacheInvalidator, +{ async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> Result<()> { let key: TableNameKey = (&table_name).into(); - self.0.invalidate_key(&key.as_raw_key()).await; + self.invalidate_key(&key.as_raw_key()).await; Ok(()) } async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> Result<()> { let key = TableInfoKey::new(table_id); - self.0.invalidate_key(&key.as_raw_key()).await; + self.invalidate_key(&key.as_raw_key()).await; let key = &TableRouteKey { table_id }; - self.0.invalidate_key(&key.as_raw_key()).await; + self.invalidate_key(&key.as_raw_key()).await; Ok(()) } } + +#[async_trait::async_trait] +impl TableMetadataCacheInvalidator for T +where + T: KvCacheInvalidator, +{ + async fn invalidate_schema(&self, catalog: &str, schema: &str) { + let key = SchemaNameKey::new(catalog, schema).as_raw_key(); + self.invalidate_key(&key).await; + } +} diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index 409a7409e63b..edd1502c4172 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -13,9 +13,7 @@ // limitations under the License. use async_trait::async_trait; -use common_meta::cache_invalidator::{ - CacheInvalidator, Context, KvCacheInvalidatorRef, TableMetadataCacheInvalidator, -}; +use common_meta::cache_invalidator::{Context, TableMetadataCacheInvalidatorRef}; use common_meta::error::Result as MetaResult; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, @@ -26,7 +24,7 @@ use futures::future::Either; #[derive(Clone)] pub struct InvalidateTableCacheHandler { - table_metadata_cache_invalidator: TableMetadataCacheInvalidator, + table_metadata_cache_invalidator: TableMetadataCacheInvalidatorRef, } #[async_trait] @@ -86,11 +84,9 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { } impl InvalidateTableCacheHandler { - pub fn new(backend_cache_invalidator: KvCacheInvalidatorRef) -> Self { + pub fn new(table_metadata_cache_invalidator: TableMetadataCacheInvalidatorRef) -> Self { Self { - table_metadata_cache_invalidator: TableMetadataCacheInvalidator::new( - backend_cache_invalidator, - ), + table_metadata_cache_invalidator, } } }