Skip to content

Commit

Permalink
refactor: refactor cache invalidator
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Oct 8, 2023
1 parent 17b385a commit 94496a8
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 32 deletions.
12 changes: 4 additions & 8 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ use std::sync::{Arc, Weak};

use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{
CacheInvalidator, Context, KvCacheInvalidatorRef, TableMetadataCacheInvalidator,
};
use common_meta::cache_invalidator::{CacheInvalidator, Context, TableMetadataCacheInvalidatorRef};
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::error::Result as MetaResult;
use common_meta::key::catalog_name::CatalogNameKey;
Expand Down Expand Up @@ -54,7 +52,7 @@ pub struct KvBackendCatalogManager {
// TODO(LFC): Maybe use a real implementation for Standalone mode.
// Now we use `NoopKvCacheInvalidator` for Standalone mode. In Standalone mode, the KV backend
// is implemented by RaftEngine. Maybe we need a cache for it?
table_metadata_cache_invalidator: TableMetadataCacheInvalidator,
table_metadata_cache_invalidator: TableMetadataCacheInvalidatorRef,
partition_manager: PartitionRuleManagerRef,
table_metadata_manager: TableMetadataManagerRef,
datanode_manager: DatanodeManagerRef,
Expand All @@ -80,15 +78,13 @@ impl CacheInvalidator for KvBackendCatalogManager {
impl KvBackendCatalogManager {
pub fn new(
backend: KvBackendRef,
backend_cache_invalidator: KvCacheInvalidatorRef,
table_metadata_cache_invalidator: TableMetadataCacheInvalidatorRef,
datanode_manager: DatanodeManagerRef,
) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
table_metadata_cache_invalidator: TableMetadataCacheInvalidator::new(
backend_cache_invalidator.clone(),
),
table_metadata_cache_invalidator,
datanode_manager,
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
Expand Down
39 changes: 23 additions & 16 deletions src/common/meta/src/cache_invalidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub trait CacheInvalidator: Send + Sync {

pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;

pub type TableMetadataCacheInvalidatorRef = Arc<dyn TableMetadataCacheInvalidator>;

pub struct DummyCacheInvalidator;

#[async_trait::async_trait]
Expand All @@ -68,37 +70,42 @@ impl CacheInvalidator for DummyCacheInvalidator {
}
}

#[derive(Clone)]
pub struct TableMetadataCacheInvalidator(KvCacheInvalidatorRef);

impl TableMetadataCacheInvalidator {
pub fn new(kv_cache_invalidator: KvCacheInvalidatorRef) -> Self {
Self(kv_cache_invalidator)
}

pub async fn invalidate_schema(&self, catalog: &str, schema: &str) {
let key = SchemaNameKey::new(catalog, schema).as_raw_key();
self.0.invalidate_key(&key).await;
}
#[async_trait::async_trait]
pub trait TableMetadataCacheInvalidator: CacheInvalidator {
async fn invalidate_schema(&self, _catalog: &str, _schema: &str) {}
}

#[async_trait::async_trait]
impl CacheInvalidator for TableMetadataCacheInvalidator {
impl<T> CacheInvalidator for T
where
T: KvCacheInvalidator,
{
async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> Result<()> {
let key: TableNameKey = (&table_name).into();

self.0.invalidate_key(&key.as_raw_key()).await;
self.invalidate_key(&key.as_raw_key()).await;

Ok(())
}

async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> Result<()> {
let key = TableInfoKey::new(table_id);
self.0.invalidate_key(&key.as_raw_key()).await;
self.invalidate_key(&key.as_raw_key()).await;

let key = &TableRouteKey { table_id };
self.0.invalidate_key(&key.as_raw_key()).await;
self.invalidate_key(&key.as_raw_key()).await;

Ok(())
}
}

#[async_trait::async_trait]
impl<T> TableMetadataCacheInvalidator for T
where
T: KvCacheInvalidator,
{
async fn invalidate_schema(&self, catalog: &str, schema: &str) {
let key = SchemaNameKey::new(catalog, schema).as_raw_key();
self.invalidate_key(&key).await;
}
}
12 changes: 4 additions & 8 deletions src/frontend/src/heartbeat/handler/invalidate_table_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
// limitations under the License.

use async_trait::async_trait;
use common_meta::cache_invalidator::{
CacheInvalidator, Context, KvCacheInvalidatorRef, TableMetadataCacheInvalidator,
};
use common_meta::cache_invalidator::{Context, TableMetadataCacheInvalidatorRef};
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
Expand All @@ -26,7 +24,7 @@ use futures::future::Either;

#[derive(Clone)]
pub struct InvalidateTableCacheHandler {
table_metadata_cache_invalidator: TableMetadataCacheInvalidator,
table_metadata_cache_invalidator: TableMetadataCacheInvalidatorRef,
}

#[async_trait]
Expand Down Expand Up @@ -86,11 +84,9 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
}

impl InvalidateTableCacheHandler {
pub fn new(backend_cache_invalidator: KvCacheInvalidatorRef) -> Self {
pub fn new(table_metadata_cache_invalidator: TableMetadataCacheInvalidatorRef) -> Self {
Self {
table_metadata_cache_invalidator: TableMetadataCacheInvalidator::new(
backend_cache_invalidator,
),
table_metadata_cache_invalidator,
}
}
}

0 comments on commit 94496a8

Please sign in to comment.