Skip to content

Commit

Permalink
feat: introduce the CacheRegistry (#3896)
Browse files Browse the repository at this point in the history
* feat: implement the `CacheRegistry`

* refactor: change `CacheInvalidator` signature

* feat: implement `CacheInvalidator`

* feat: add `get_or_register`

* fix: fmt toml

* feat: implement the `CacheRegistryBuilder`

* chore: apply suggestions from CR

* chore: fmt code
  • Loading branch information
WenyXu authored May 10, 2024
1 parent c91132b commit 9d36c31
Show file tree
Hide file tree
Showing 15 changed files with 192 additions and 31 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl CacheInvalidator for TableCacheInvalidator {
async fn invalidate(
&self,
_ctx: &Context,
caches: Vec<CacheIdent>,
caches: &[CacheIdent],
) -> common_meta::error::Result<()> {
for cache in caches {
if let CacheIdent::TableName(table_name) = cache {
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ testing = []
workspace = true

[dependencies]
anymap2 = "0.13.0"
api.workspace = true
async-recursion = "1.0"
async-trait.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

mod container;
mod flow;
mod registry;
mod table;

pub use container::{CacheContainer, Initializer, Invalidator, TokenFilter};
pub use flow::{new_table_flownode_set_cache, TableFlownodeSetCache};
pub use registry::{CacheRegistry, CacheRegistryBuilder, CacheRegistryRef};
pub use table::{
new_table_info_cache, new_table_name_cache, new_table_route_cache, TableInfoCache,
TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRouteCache, TableRouteCacheRef,
Expand Down
32 changes: 22 additions & 10 deletions src/common/meta/src/cache/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::borrow::Borrow;
use std::hash::Hash;
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::future::{join_all, BoxFuture};
use moka::future::Cache;
use snafu::{OptionExt, ResultExt};

Expand Down Expand Up @@ -68,6 +68,11 @@ where
token_filter,
}
}

/// Returns the `name`.
pub fn name(&self) -> &str {
&self.name
}
}

#[async_trait::async_trait]
Expand All @@ -76,13 +81,15 @@ where
K: Send + Sync,
V: Send + Sync,
{
async fn invalidate(&self, _ctx: &Context, caches: Vec<CacheIdent>) -> Result<()> {
for token in caches
.into_iter()
async fn invalidate(&self, _ctx: &Context, caches: &[CacheIdent]) -> Result<()> {
let tasks = caches
.iter()
.filter(|token| (self.token_filter)(token))
{
(self.invalidator)(&self.cache, &token).await?;
}
.map(|token| (self.invalidator)(&self.cache, token));
join_all(tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(())
}
}
Expand Down Expand Up @@ -120,9 +127,14 @@ where
{
/// Invalidates cache by [CacheToken].
pub async fn invalidate(&self, caches: &[CacheToken]) -> Result<()> {
for token in caches.iter().filter(|token| (self.token_filter)(token)) {
(self.invalidator)(&self.cache, token).await?;
}
let tasks = caches
.iter()
.filter(|token| (self.token_filter)(token))
.map(|token| (self.invalidator)(&self.cache, token));
join_all(tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(())
}

Expand Down
139 changes: 139 additions & 0 deletions src/common/meta/src/cache/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures::future::join_all;

use crate::cache_invalidator::{CacheInvalidator, Context};
use crate::error::Result;
use crate::instruction::CacheIdent;

pub type CacheRegistryRef = Arc<CacheRegistry>;

/// [CacheRegistryBuilder] provides ability of
/// - Register the `cache` which implements the [CacheInvalidator] trait into [CacheRegistry].
/// - Build a [CacheRegistry]
#[derive(Default)]
pub struct CacheRegistryBuilder {
registry: CacheRegistry,
}

impl CacheRegistryBuilder {
pub fn add_cache<T: CacheInvalidator + 'static>(mut self, cache: Arc<T>) -> Self {
self.registry.register(cache);
self
}

pub fn build(self) -> CacheRegistry {
self.registry
}
}

/// [CacheRegistry] provides ability of
/// - Get a specific `cache`.
#[derive(Default)]
pub struct CacheRegistry {
indexes: Vec<Arc<dyn CacheInvalidator>>,
registry: anymap2::SendSyncAnyMap,
}

#[async_trait::async_trait]
impl CacheInvalidator for Arc<CacheRegistry> {
async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()> {
let tasks = self
.indexes
.iter()
.map(|invalidator| invalidator.invalidate(ctx, caches));
join_all(tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(())
}
}

impl CacheRegistry {
/// Sets the value stored in the collection for the type `T`.
/// Returns true if the collection already had a value of type `T`
fn register<T: CacheInvalidator + 'static>(&mut self, cache: Arc<T>) -> bool {
self.indexes.push(cache.clone());
self.registry.insert(cache).is_some()
}

/// Returns __cloned__ the value stored in the collection for the type `T`, if it exists.
pub fn get<T: Send + Sync + Clone + 'static>(&self) -> Option<T> {
self.registry.get().cloned()
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;

use moka::future::{Cache, CacheBuilder};

use crate::cache::registry::CacheRegistryBuilder;
use crate::cache::*;
use crate::instruction::CacheIdent;

fn test_cache(name: &str) -> CacheContainer<String, String, CacheIdent> {
let cache: Cache<String, String> = CacheBuilder::new(128).build();
let filter: TokenFilter<CacheIdent> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<String, String> = Arc::new(move |_| {
moved_counter.fetch_add(1, Ordering::Relaxed);
Box::pin(async { Ok(Some("hi".to_string())) })
});
let invalidator: Invalidator<String, String, CacheIdent> =
Box::new(|_, _| Box::pin(async { Ok(()) }));

CacheContainer::new(name.to_string(), cache, invalidator, init, filter)
}

fn test_i32_cache(name: &str) -> CacheContainer<i32, String, CacheIdent> {
let cache: Cache<i32, String> = CacheBuilder::new(128).build();
let filter: TokenFilter<CacheIdent> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<i32, String> = Arc::new(move |_| {
moved_counter.fetch_add(1, Ordering::Relaxed);
Box::pin(async { Ok(Some("foo".to_string())) })
});
let invalidator: Invalidator<i32, String, CacheIdent> =
Box::new(|_, _| Box::pin(async { Ok(()) }));

CacheContainer::new(name.to_string(), cache, invalidator, init, filter)
}

#[tokio::test]
async fn test_register() {
let builder = CacheRegistryBuilder::default();
let i32_cache = Arc::new(test_i32_cache("i32_cache"));
let cache = Arc::new(test_cache("string_cache"));
let registry = builder.add_cache(i32_cache).add_cache(cache).build();

let cache = registry
.get::<Arc<CacheContainer<i32, String, CacheIdent>>>()
.unwrap();
assert_eq!(cache.name(), "i32_cache");

let cache = registry
.get::<Arc<CacheContainer<String, String, CacheIdent>>>()
.unwrap();
assert_eq!(cache.name(), "string_cache");
}
}
20 changes: 10 additions & 10 deletions src/common/meta/src/cache_invalidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct Context {

#[async_trait::async_trait]
pub trait CacheInvalidator: Send + Sync {
async fn invalidate(&self, ctx: &Context, caches: Vec<CacheIdent>) -> Result<()>;
async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()>;
}

pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
Expand All @@ -56,7 +56,7 @@ pub struct DummyCacheInvalidator;

#[async_trait::async_trait]
impl CacheInvalidator for DummyCacheInvalidator {
async fn invalidate(&self, _ctx: &Context, _caches: Vec<CacheIdent>) -> Result<()> {
async fn invalidate(&self, _ctx: &Context, _caches: &[CacheIdent]) -> Result<()> {
Ok(())
}
}
Expand All @@ -80,10 +80,10 @@ impl MultiCacheInvalidator {

#[async_trait::async_trait]
impl CacheInvalidator for MultiCacheInvalidator {
async fn invalidate(&self, ctx: &Context, caches: Vec<CacheIdent>) -> Result<()> {
async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()> {
let invalidators = self.invalidators.read().await;
for invalidator in invalidators.iter() {
invalidator.invalidate(ctx, caches.clone()).await?;
invalidator.invalidate(ctx, caches).await?;
}
Ok(())
}
Expand All @@ -94,25 +94,25 @@ impl<T> CacheInvalidator for T
where
T: KvCacheInvalidator,
{
async fn invalidate(&self, _ctx: &Context, caches: Vec<CacheIdent>) -> Result<()> {
async fn invalidate(&self, _ctx: &Context, caches: &[CacheIdent]) -> Result<()> {
for cache in caches {
match cache {
CacheIdent::TableId(table_id) => {
let key = TableInfoKey::new(table_id);
let key = TableInfoKey::new(*table_id);
self.invalidate_key(&key.to_bytes()).await;

let key = &TableRouteKey { table_id };
let key = TableRouteKey::new(*table_id);
self.invalidate_key(&key.to_bytes()).await;
}
CacheIdent::TableName(table_name) => {
let key: TableNameKey = (&table_name).into();
let key: TableNameKey = table_name.into();
self.invalidate_key(&key.to_bytes()).await
}
CacheIdent::SchemaName(schema_name) => {
let key: SchemaNameKey = (&schema_name).into();
let key: SchemaNameKey = schema_name.into();
self.invalidate_key(&key.to_bytes()).await;
}
CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
&CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
// TODO(weny): implements it
unimplemented!()
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/alter_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl AlterLogicalTablesProcedure {

self.context
.cache_invalidator
.invalidate(&ctx, to_invalidate)
.invalidate(&ctx, &to_invalidate)
.await?;
Ok(Status::done())
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl AlterTableProcedure {
};

cache_invalidator
.invalidate(&Context::default(), cache_keys)
.invalidate(&Context::default(), &cache_keys)
.await?;

Ok(Status::done())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl CreateLogicalTablesProcedure {
.cache_invalidator
.invalidate(
&Context::default(),
vec![
&[
CacheIdent::TableId(self.data.physical_table_id),
CacheIdent::TableName(physical_table_name),
],
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/drop_database/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl DropMetadataBroadcast {
cache_invalidator
.invalidate(
&ctx,
vec![CacheIdent::SchemaName(SchemaName {
&[CacheIdent::SchemaName(SchemaName {
catalog_name: db_ctx.catalog.clone(),
schema_name: db_ctx.schema.clone(),
})],
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/drop_table/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl DropTableExecutor {
cache_invalidator
.invalidate(
&ctx,
vec![
&[
CacheIdent::TableName(self.table.table_ref().into()),
CacheIdent::TableId(self.table_id),
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
// Invalidate local cache always success
let _ = self
.cache_invalidator
.invalidate(&Context::default(), caches)
.invalidate(&Context::default(), &caches)
.await?;

Ok(HandleControl::Done)
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/cache_invalidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ impl MetasrvCacheInvalidator {

#[async_trait]
impl CacheInvalidator for MetasrvCacheInvalidator {
async fn invalidate(&self, ctx: &Context, caches: Vec<CacheIdent>) -> MetaResult<()> {
let instruction = Instruction::InvalidateCaches(caches);
async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> MetaResult<()> {
let instruction = Instruction::InvalidateCaches(caches.to_vec());
self.broadcast(ctx, instruction).await
}
}
4 changes: 2 additions & 2 deletions src/operator/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ impl StatementExecutor {
self.cache_invalidator
.invalidate(
&Context::default(),
vec![
&[
CacheIdent::TableId(table_id),
CacheIdent::TableName(table_name.clone()),
],
Expand Down Expand Up @@ -619,7 +619,7 @@ impl StatementExecutor {

// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate(&Context::default(), invalidate_keys)
.invalidate(&Context::default(), &invalidate_keys)
.await
.context(error::InvalidateTableCacheSnafu)?;

Expand Down

0 comments on commit 9d36c31

Please sign in to comment.