Skip to content

Commit

Permalink
refactor: unify table metadata cache invalidator
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 19, 2023
1 parent 339e12c commit 2c04293
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 91 deletions.
17 changes: 0 additions & 17 deletions src/catalog/src/kvbackend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

pub use client::{CachedMetaKvBackend, MetaKvBackend};

mod client;
Expand All @@ -22,18 +20,3 @@ mod manager;
#[cfg(feature = "testing")]
pub mod mock;
pub use manager::KvBackendCatalogManager;

/// KvBackend cache invalidator
#[async_trait::async_trait]
pub trait KvCacheInvalidator: Send + Sync {
async fn invalidate_key(&self, key: &[u8]);
}

pub type KvCacheInvalidatorRef = Arc<dyn KvCacheInvalidator>;

pub struct DummyKvCacheInvalidator;

#[async_trait::async_trait]
impl KvCacheInvalidator for DummyKvCacheInvalidator {
async fn invalidate_key(&self, _key: &[u8]) {}
}
2 changes: 1 addition & 1 deletion src/catalog/src/kvbackend/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use std::time::Duration;

use common_error::ext::BoxedError;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::error::Error::{CacheNotGet, GetKvCache};
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, Result};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
Expand All @@ -33,7 +34,6 @@ use meta_client::client::MetaClient;
use moka::future::{Cache, CacheBuilder};
use snafu::{OptionExt, ResultExt};

use super::KvCacheInvalidator;
use crate::metrics::{METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET};

const CACHE_MAX_CAPACITY: u64 = 10000;
Expand Down
56 changes: 17 additions & 39 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@ use std::sync::{Arc, Weak};

use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, Context};
use common_meta::cache_invalidator::{
CacheInvalidator, Context, KvCacheInvalidatorRef, TableMetadataCacheInvalidator,
};
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::error::Result as MetaResult;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::{TableMetaKey, TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use common_telemetry::debug;
use futures_util::TryStreamExt;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use snafu::prelude::*;
Expand All @@ -43,7 +42,6 @@ use crate::error::{
TableMetadataManagerSnafu,
};
use crate::information_schema::{InformationSchemaProvider, COLUMNS, TABLES};
use crate::kvbackend::KvCacheInvalidatorRef;
use crate::CatalogManager;

/// Access all existing catalog, schema and tables.
Expand All @@ -57,6 +55,7 @@ pub struct KvBackendCatalogManager {
// Now we use `NoopKvCacheInvalidator` for Standalone mode. In Standalone mode, the KV backend
// is implemented by RaftEngine. Maybe we need a cache for it?
backend_cache_invalidator: KvCacheInvalidatorRef,
table_metadata_cache_invalidator: TableMetadataCacheInvalidator,
partition_manager: PartitionRuleManagerRef,
table_metadata_manager: TableMetadataManagerRef,
datanode_manager: DatanodeManagerRef,
Expand All @@ -66,40 +65,16 @@ pub struct KvBackendCatalogManager {

#[async_trait::async_trait]
impl CacheInvalidator for KvBackendCatalogManager {
async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> MetaResult<()> {
let key: TableNameKey = (&table_name).into();

self.backend_cache_invalidator
.invalidate_key(&key.as_raw_key())
.await;
debug!(
"invalidated cache key: {}",
String::from_utf8_lossy(&key.as_raw_key())
);

Ok(())
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
self.table_metadata_cache_invalidator
.invalidate_table_name(ctx, table_name)
.await
}

async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> MetaResult<()> {
let key = TableInfoKey::new(table_id);
self.backend_cache_invalidator
.invalidate_key(&key.as_raw_key())
.await;
debug!(
"invalidated cache key: {}",
String::from_utf8_lossy(&key.as_raw_key())
);

let key = &TableRouteKey { table_id };
self.backend_cache_invalidator
.invalidate_key(&key.as_raw_key())
.await;
debug!(
"invalidated cache key: {}",
String::from_utf8_lossy(&key.as_raw_key())
);

Ok(())
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> {
self.table_metadata_cache_invalidator
.invalidate_table_id(ctx, table_id)
.await
}
}

Expand All @@ -110,9 +85,12 @@ impl KvBackendCatalogManager {
datanode_manager: DatanodeManagerRef,
) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
backend_cache_invalidator,
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
table_metadata_cache_invalidator: TableMetadataCacheInvalidator::new(
backend_cache_invalidator.clone(),
),
backend_cache_invalidator,
datanode_manager,
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
Expand All @@ -134,7 +112,7 @@ impl KvBackendCatalogManager {

pub async fn invalidate_schema(&self, catalog: &str, schema: &str) {
let key = SchemaNameKey::new(catalog, schema).as_raw_key();

// TODO(weny): refactors to `TableMetadataCacheInvalidator`
self.backend_cache_invalidator.invalidate_key(&key).await;
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

use std::sync::Arc;

use catalog::kvbackend::{DummyKvCacheInvalidator, KvBackendCatalogManager};
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::CatalogManagerRef;
use clap::Parser;
use common_base::Plugins;
use common_config::{kv_store_dir, KvStoreConfig, WalConfig};
use common_meta::cache_invalidator::DummyKvCacheInvalidator;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
Expand Down
62 changes: 62 additions & 0 deletions src/common/meta/src/cache_invalidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,31 @@

use std::sync::Arc;

use common_telemetry::debug;
use table::metadata::TableId;

use crate::error::Result;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteKey;
use crate::key::TableMetaKey;
use crate::table_name::TableName;

/// KvBackend cache invalidator
#[async_trait::async_trait]
pub trait KvCacheInvalidator: Send + Sync {
async fn invalidate_key(&self, key: &[u8]);
}

pub type KvCacheInvalidatorRef = Arc<dyn KvCacheInvalidator>;

pub struct DummyKvCacheInvalidator;

#[async_trait::async_trait]
impl KvCacheInvalidator for DummyKvCacheInvalidator {
async fn invalidate_key(&self, _key: &[u8]) {}
}

/// Places context of invalidating cache. e.g., span id, trace id etc.
#[derive(Default)]
pub struct Context {
Expand Down Expand Up @@ -47,3 +67,45 @@ impl CacheInvalidator for DummyCacheInvalidator {
Ok(())
}
}

#[derive(Clone)]
pub struct TableMetadataCacheInvalidator(KvCacheInvalidatorRef);

impl TableMetadataCacheInvalidator {
pub fn new(kv_cache_invalidator: KvCacheInvalidatorRef) -> Self {
Self(kv_cache_invalidator)
}
}

#[async_trait::async_trait]
impl CacheInvalidator for TableMetadataCacheInvalidator {
async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> Result<()> {
let key: TableNameKey = (&table_name).into();

self.0.invalidate_key(&key.as_raw_key()).await;
debug!(
"invalidated cache key: {}",
String::from_utf8_lossy(&key.as_raw_key())
);

Ok(())
}

async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> Result<()> {
let key = TableInfoKey::new(table_id);
self.0.invalidate_key(&key.as_raw_key()).await;
debug!(
"invalidated cache key: {}",
String::from_utf8_lossy(&key.as_raw_key())
);

let key = &TableRouteKey { table_id };
self.0.invalidate_key(&key.as_raw_key()).await;
debug!(
"invalidated cache key: {}",
String::from_utf8_lossy(&key.as_raw_key())
);

Ok(())
}
}
51 changes: 20 additions & 31 deletions src/frontend/src/heartbeat/handler/invalidate_table_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,20 @@
// limitations under the License.

use async_trait::async_trait;
use catalog::kvbackend::KvCacheInvalidatorRef;
use common_meta::cache_invalidator::{
CacheInvalidator, Context, KvCacheInvalidatorRef, TableMetadataCacheInvalidator,
};
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::TableMetaKey;
use common_meta::table_name::TableName;
use common_telemetry::error;
use futures::future::Either;
use table::metadata::TableId;

#[derive(Clone)]
pub struct InvalidateTableCacheHandler {
backend_cache_invalidator: KvCacheInvalidatorRef,
table_metadata_cache_invalidator: TableMetadataCacheInvalidator,
}

#[async_trait]
Expand All @@ -45,24 +41,31 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {

async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
let mailbox = ctx.mailbox.clone();
let self_ref = self.clone();
let cache_invalidator = self.table_metadata_cache_invalidator.clone();

let (meta, invalidator) = match ctx.incoming_message.take() {
Some((meta, Instruction::InvalidateTableIdCache(table_id))) => (
meta,
Either::Left(async move { self_ref.invalidate_table_id_cache(table_id).await }),
Either::Left(async move {
cache_invalidator
.invalidate_table_id(&Context::default(), table_id)
.await
}),
),
Some((meta, Instruction::InvalidateTableNameCache(table_name))) => (
meta,
Either::Right(
async move { self_ref.invalidate_table_name_cache(table_name).await },
),
Either::Right(async move {
cache_invalidator
.invalidate_table_name(&Context::default(), table_name)
.await
}),
),
_ => unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"),
};

let _handle = common_runtime::spawn_bg(async move {
invalidator.await;
// Local cache invalidation always succeeds.
let _ = invalidator.await;

if let Err(e) = mailbox
.send((
Expand All @@ -85,23 +88,9 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
impl InvalidateTableCacheHandler {
pub fn new(backend_cache_invalidator: KvCacheInvalidatorRef) -> Self {
Self {
backend_cache_invalidator,
table_metadata_cache_invalidator: TableMetadataCacheInvalidator::new(
backend_cache_invalidator,
),
}
}

async fn invalidate_table_id_cache(&self, table_id: TableId) {
self.backend_cache_invalidator
.invalidate_key(&TableInfoKey::new(table_id).as_raw_key())
.await;

self.backend_cache_invalidator
.invalidate_key(&TableRouteKey { table_id }.as_raw_key())
.await;
}

async fn invalidate_table_name_cache(&self, table_name: TableName) {
self.backend_cache_invalidator
.invalidate_key(&TableNameKey::from(&table_name).as_raw_key())
.await;
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/heartbeat/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use api::v1::meta::HeartbeatResponse;
use catalog::kvbackend::KvCacheInvalidator;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
Expand Down
3 changes: 2 additions & 1 deletion tests-integration/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

use std::sync::Arc;

use catalog::kvbackend::{DummyKvCacheInvalidator, KvBackendCatalogManager};
use catalog::kvbackend::KvBackendCatalogManager;
use common_base::Plugins;
use common_config::KvStoreConfig;
use common_meta::cache_invalidator::DummyKvCacheInvalidator;
use common_procedure::options::ProcedureConfig;
use datanode::config::DatanodeOptions;
use datanode::datanode::DatanodeBuilder;
Expand Down

0 comments on commit 2c04293

Please sign in to comment.