diff --git a/src/catalog/src/kvbackend.rs b/src/catalog/src/kvbackend.rs index ecc1b72b488b..f899586e7ee6 100644 --- a/src/catalog/src/kvbackend.rs +++ b/src/catalog/src/kvbackend.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - pub use client::{CachedMetaKvBackend, MetaKvBackend}; mod client; @@ -22,18 +20,3 @@ mod manager; #[cfg(feature = "testing")] pub mod mock; pub use manager::KvBackendCatalogManager; - -/// KvBackend cache invalidator -#[async_trait::async_trait] -pub trait KvCacheInvalidator: Send + Sync { - async fn invalidate_key(&self, key: &[u8]); -} - -pub type KvCacheInvalidatorRef = Arc; - -pub struct DummyKvCacheInvalidator; - -#[async_trait::async_trait] -impl KvCacheInvalidator for DummyKvCacheInvalidator { - async fn invalidate_key(&self, _key: &[u8]) {} -} diff --git a/src/catalog/src/kvbackend/client.rs b/src/catalog/src/kvbackend/client.rs index aed8be54f100..ac63e5a0b2e9 100644 --- a/src/catalog/src/kvbackend/client.rs +++ b/src/catalog/src/kvbackend/client.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::time::Duration; use common_error::ext::BoxedError; +use common_meta::cache_invalidator::KvCacheInvalidator; use common_meta::error::Error::{CacheNotGet, GetKvCache}; use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, Result}; use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService}; @@ -28,12 +29,11 @@ use common_meta::rpc::store::{ RangeRequest, RangeResponse, }; use common_meta::rpc::KeyValue; -use common_telemetry::timer; +use common_telemetry::{debug, timer}; use meta_client::client::MetaClient; use moka::future::{Cache, CacheBuilder}; use snafu::{OptionExt, ResultExt}; -use super::KvCacheInvalidator; use crate::metrics::{METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET}; const CACHE_MAX_CAPACITY: u64 = 10000; @@ -197,7 +197,8 @@ impl KvBackend for CachedMetaKvBackend { #[async_trait::async_trait] impl KvCacheInvalidator for CachedMetaKvBackend { async fn invalidate_key(&self, key: &[u8]) { - self.cache.invalidate(key).await + self.cache.invalidate(key).await; + debug!("invalidated cache key: {}", String::from_utf8_lossy(key)); } } diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 80f9f22e3536..c434f1c91841 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -18,18 +18,17 @@ use std::sync::{Arc, Weak}; use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID}; use common_error::ext::BoxedError; -use common_meta::cache_invalidator::{CacheInvalidator, Context}; +use common_meta::cache_invalidator::{ + CacheInvalidator, Context, KvCacheInvalidatorRef, TableMetadataCacheInvalidator, +}; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::error::Result as MetaResult; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; -use common_meta::key::table_info::TableInfoKey; use common_meta::key::table_name::TableNameKey; -use common_meta::key::table_route::TableRouteKey; -use common_meta::key::{TableMetaKey, TableMetadataManager, TableMetadataManagerRef}; +use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::table_name::TableName; -use common_telemetry::debug; use futures_util::TryStreamExt; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use snafu::prelude::*; @@ -43,7 +42,6 @@ use crate::error::{ TableMetadataManagerSnafu, }; use crate::information_schema::{InformationSchemaProvider, COLUMNS, TABLES}; -use crate::kvbackend::KvCacheInvalidatorRef; use crate::CatalogManager; /// Access all existing catalog, schema and tables. @@ -56,7 +54,7 @@ pub struct KvBackendCatalogManager { // TODO(LFC): Maybe use a real implementation for Standalone mode. // Now we use `NoopKvCacheInvalidator` for Standalone mode. In Standalone mode, the KV backend // is implemented by RaftEngine. Maybe we need a cache for it? - backend_cache_invalidator: KvCacheInvalidatorRef, + table_metadata_cache_invalidator: TableMetadataCacheInvalidator, partition_manager: PartitionRuleManagerRef, table_metadata_manager: TableMetadataManagerRef, datanode_manager: DatanodeManagerRef, @@ -66,40 +64,16 @@ pub struct KvBackendCatalogManager { #[async_trait::async_trait] impl CacheInvalidator for KvBackendCatalogManager { - async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> MetaResult<()> { - let key: TableNameKey = (&table_name).into(); - - self.backend_cache_invalidator - .invalidate_key(&key.as_raw_key()) - .await; - debug!( - "invalidated cache key: {}", - String::from_utf8_lossy(&key.as_raw_key()) - ); - - Ok(()) + async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> { + self.table_metadata_cache_invalidator + .invalidate_table_name(ctx, table_name) + .await } - async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> MetaResult<()> { - let key = TableInfoKey::new(table_id); - self.backend_cache_invalidator - .invalidate_key(&key.as_raw_key()) - .await; - debug!( - "invalidated cache key: {}", - String::from_utf8_lossy(&key.as_raw_key()) - ); - - let key = &TableRouteKey { table_id }; - self.backend_cache_invalidator - .invalidate_key(&key.as_raw_key()) - .await; - debug!( - "invalidated cache key: {}", - String::from_utf8_lossy(&key.as_raw_key()) - ); - - Ok(()) + async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> { + self.table_metadata_cache_invalidator + .invalidate_table_id(ctx, table_id) + .await } } @@ -110,9 +84,11 @@ impl KvBackendCatalogManager { datanode_manager: DatanodeManagerRef, ) -> Arc { Arc::new_cyclic(|me| Self { - backend_cache_invalidator, partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())), table_metadata_manager: Arc::new(TableMetadataManager::new(backend)), + table_metadata_cache_invalidator: TableMetadataCacheInvalidator::new( + backend_cache_invalidator.clone(), + ), datanode_manager, system_catalog: SystemCatalog { catalog_manager: me.clone(), @@ -133,9 +109,9 @@ impl KvBackendCatalogManager { } pub async fn invalidate_schema(&self, catalog: &str, schema: &str) { - let key = SchemaNameKey::new(catalog, schema).as_raw_key(); - - self.backend_cache_invalidator.invalidate_key(&key).await; + self.table_metadata_cache_invalidator + .invalidate_schema(catalog, schema) + .await } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 15a8bc403aa7..b47e50c847c1 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -14,11 +14,12 @@ use std::sync::Arc; -use catalog::kvbackend::{DummyKvCacheInvalidator, KvBackendCatalogManager}; +use catalog::kvbackend::KvBackendCatalogManager; use catalog::CatalogManagerRef; use clap::Parser; use common_base::Plugins; use common_config::{kv_store_dir, KvStoreConfig, WalConfig}; +use common_meta::cache_invalidator::DummyKvCacheInvalidator; use common_meta::kv_backend::KvBackendRef; use common_procedure::ProcedureManagerRef; use common_telemetry::info; diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index 4326ec1da622..21cb06ec7ee1 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -17,8 +17,28 @@ use std::sync::Arc; use table::metadata::TableId; use crate::error::Result; +use crate::key::schema_name::SchemaNameKey; +use crate::key::table_info::TableInfoKey; +use crate::key::table_name::TableNameKey; +use crate::key::table_route::TableRouteKey; +use crate::key::TableMetaKey; use crate::table_name::TableName; +/// KvBackend cache invalidator +#[async_trait::async_trait] +pub trait KvCacheInvalidator: Send + Sync { + async fn invalidate_key(&self, key: &[u8]); +} + +pub type KvCacheInvalidatorRef = Arc; + +pub struct DummyKvCacheInvalidator; + +#[async_trait::async_trait] +impl KvCacheInvalidator for DummyKvCacheInvalidator { + async fn invalidate_key(&self, _key: &[u8]) {} +} + /// Places context of invalidating cache. e.g., span id, trace id etc. #[derive(Default)] pub struct Context { @@ -47,3 +67,38 @@ impl CacheInvalidator for DummyCacheInvalidator { Ok(()) } } + +#[derive(Clone)] +pub struct TableMetadataCacheInvalidator(KvCacheInvalidatorRef); + +impl TableMetadataCacheInvalidator { + pub fn new(kv_cache_invalidator: KvCacheInvalidatorRef) -> Self { + Self(kv_cache_invalidator) + } + + pub async fn invalidate_schema(&self, catalog: &str, schema: &str) { + let key = SchemaNameKey::new(catalog, schema).as_raw_key(); + self.0.invalidate_key(&key).await; + } +} + +#[async_trait::async_trait] +impl CacheInvalidator for TableMetadataCacheInvalidator { + async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> Result<()> { + let key: TableNameKey = (&table_name).into(); + + self.0.invalidate_key(&key.as_raw_key()).await; + + Ok(()) + } + + async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> Result<()> { + let key = TableInfoKey::new(table_id); + self.0.invalidate_key(&key.as_raw_key()).await; + + let key = &TableRouteKey { table_id }; + self.0.invalidate_key(&key.as_raw_key()).await; + + Ok(()) + } +} diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index 5df9cb0a112f..409a7409e63b 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -13,24 +13,20 @@ // limitations under the License. use async_trait::async_trait; -use catalog::kvbackend::KvCacheInvalidatorRef; +use common_meta::cache_invalidator::{ + CacheInvalidator, Context, KvCacheInvalidatorRef, TableMetadataCacheInvalidator, +}; use common_meta::error::Result as MetaResult; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; -use common_meta::key::table_info::TableInfoKey; -use common_meta::key::table_name::TableNameKey; -use common_meta::key::table_route::TableRouteKey; -use common_meta::key::TableMetaKey; -use common_meta::table_name::TableName; use common_telemetry::error; use futures::future::Either; -use table::metadata::TableId; #[derive(Clone)] pub struct InvalidateTableCacheHandler { - backend_cache_invalidator: KvCacheInvalidatorRef, + table_metadata_cache_invalidator: TableMetadataCacheInvalidator, } #[async_trait] @@ -45,24 +41,31 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { let mailbox = ctx.mailbox.clone(); - let self_ref = self.clone(); + let cache_invalidator = self.table_metadata_cache_invalidator.clone(); let (meta, invalidator) = match ctx.incoming_message.take() { Some((meta, Instruction::InvalidateTableIdCache(table_id))) => ( meta, - Either::Left(async move { self_ref.invalidate_table_id_cache(table_id).await }), + Either::Left(async move { + cache_invalidator + .invalidate_table_id(&Context::default(), table_id) + .await + }), ), Some((meta, Instruction::InvalidateTableNameCache(table_name))) => ( meta, - Either::Right( - async move { self_ref.invalidate_table_name_cache(table_name).await }, - ), + Either::Right(async move { + cache_invalidator + .invalidate_table_name(&Context::default(), table_name) + .await + }), ), _ => unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"), }; let _handle = common_runtime::spawn_bg(async move { - invalidator.await; + // Local cache invalidation always succeeds. + let _ = invalidator.await; if let Err(e) = mailbox .send(( @@ -85,23 +88,9 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { impl InvalidateTableCacheHandler { pub fn new(backend_cache_invalidator: KvCacheInvalidatorRef) -> Self { Self { - backend_cache_invalidator, + table_metadata_cache_invalidator: TableMetadataCacheInvalidator::new( + backend_cache_invalidator, + ), } } - - async fn invalidate_table_id_cache(&self, table_id: TableId) { - self.backend_cache_invalidator - .invalidate_key(&TableInfoKey::new(table_id).as_raw_key()) - .await; - - self.backend_cache_invalidator - .invalidate_key(&TableRouteKey { table_id }.as_raw_key()) - .await; - } - - async fn invalidate_table_name_cache(&self, table_name: TableName) { - self.backend_cache_invalidator - .invalidate_key(&TableNameKey::from(&table_name).as_raw_key()) - .await; - } } diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index abae860eff81..ba9e9541439b 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use api::v1::meta::HeartbeatResponse; -use catalog::kvbackend::KvCacheInvalidator; +use common_meta::cache_invalidator::KvCacheInvalidator; use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, }; diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 7f345d42ee58..a4748772bacd 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -14,9 +14,10 @@ use std::sync::Arc; -use catalog::kvbackend::{DummyKvCacheInvalidator, KvBackendCatalogManager}; +use catalog::kvbackend::KvBackendCatalogManager; use common_base::Plugins; use common_config::KvStoreConfig; +use common_meta::cache_invalidator::DummyKvCacheInvalidator; use common_procedure::options::ProcedureConfig; use datanode::config::DatanodeOptions; use datanode::datanode::DatanodeBuilder;