diff --git a/Cargo.lock b/Cargo.lock index f0b0e9305728..cfb73999c47b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -206,6 +206,12 @@ version = "1.0.0-beta.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f1f8f5a6f3d50d89e3797d7593a50f96bb2aaa20ca0cc7be1fb673232c91d72" +[[package]] +name = "anymap2" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c" + [[package]] name = "api" version = "0.7.2" @@ -1906,6 +1912,7 @@ dependencies = [ name = "common-meta" version = "0.7.2" dependencies = [ + "anymap2", "api", "async-recursion", "async-trait", diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 4694d9aae8bb..ca2305899b22 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -82,7 +82,7 @@ impl CacheInvalidator for TableCacheInvalidator { async fn invalidate( &self, _ctx: &Context, - caches: Vec, + caches: &[CacheIdent], ) -> common_meta::error::Result<()> { for cache in caches { if let CacheIdent::TableName(table_name) = cache { diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 78fc193e6291..70faabb7ee75 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -11,6 +11,7 @@ testing = [] workspace = true [dependencies] +anymap2 = "0.13.0" api.workspace = true async-recursion = "1.0" async-trait.workspace = true diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs index 26a6ea2c580a..49c064ab9733 100644 --- a/src/common/meta/src/cache.rs +++ b/src/common/meta/src/cache.rs @@ -14,10 +14,12 @@ mod container; mod flow; +mod registry; mod table; pub use container::{CacheContainer, Initializer, Invalidator, TokenFilter}; pub use flow::{new_table_flownode_set_cache, TableFlownodeSetCache}; +pub use registry::{CacheRegistry, CacheRegistryBuilder, CacheRegistryRef}; pub use table::{ new_table_info_cache, new_table_name_cache, new_table_route_cache, TableInfoCache, TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRouteCache, TableRouteCacheRef, diff --git a/src/common/meta/src/cache/container.rs b/src/common/meta/src/cache/container.rs index 8c85b3851d7a..e0bf9c27043c 100644 --- a/src/common/meta/src/cache/container.rs +++ b/src/common/meta/src/cache/container.rs @@ -16,7 +16,7 @@ use std::borrow::Borrow; use std::hash::Hash; use std::sync::Arc; -use futures::future::BoxFuture; +use futures::future::{join_all, BoxFuture}; use moka::future::Cache; use snafu::{OptionExt, ResultExt}; @@ -68,6 +68,11 @@ where token_filter, } } + + /// Returns the `name`. + pub fn name(&self) -> &str { + &self.name + } } #[async_trait::async_trait] @@ -76,13 +81,15 @@ where K: Send + Sync, V: Send + Sync, { - async fn invalidate(&self, _ctx: &Context, caches: Vec) -> Result<()> { - for token in caches - .into_iter() + async fn invalidate(&self, _ctx: &Context, caches: &[CacheIdent]) -> Result<()> { + let tasks = caches + .iter() .filter(|token| (self.token_filter)(token)) - { - (self.invalidator)(&self.cache, &token).await?; - } + .map(|token| (self.invalidator)(&self.cache, token)); + join_all(tasks) + .await + .into_iter() + .collect::>>()?; Ok(()) } } @@ -120,9 +127,14 @@ where { /// Invalidates cache by [CacheToken]. pub async fn invalidate(&self, caches: &[CacheToken]) -> Result<()> { - for token in caches.iter().filter(|token| (self.token_filter)(token)) { - (self.invalidator)(&self.cache, token).await?; - } + let tasks = caches + .iter() + .filter(|token| (self.token_filter)(token)) + .map(|token| (self.invalidator)(&self.cache, token)); + join_all(tasks) + .await + .into_iter() + .collect::>>()?; Ok(()) } diff --git a/src/common/meta/src/cache/registry.rs b/src/common/meta/src/cache/registry.rs new file mode 100644 index 000000000000..e330eab9cdd5 --- /dev/null +++ b/src/common/meta/src/cache/registry.rs @@ -0,0 +1,139 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use futures::future::join_all; + +use crate::cache_invalidator::{CacheInvalidator, Context}; +use crate::error::Result; +use crate::instruction::CacheIdent; + +pub type CacheRegistryRef = Arc; + +/// [CacheRegistryBuilder] provides ability of +/// - Register the `cache` which implements the [CacheInvalidator] trait into [CacheRegistry]. +/// - Build a [CacheRegistry] +#[derive(Default)] +pub struct CacheRegistryBuilder { + registry: CacheRegistry, +} + +impl CacheRegistryBuilder { + pub fn add_cache(mut self, cache: Arc) -> Self { + self.registry.register(cache); + self + } + + pub fn build(self) -> CacheRegistry { + self.registry + } +} + +/// [CacheRegistry] provides ability of +/// - Get a specific `cache`. +#[derive(Default)] +pub struct CacheRegistry { + indexes: Vec>, + registry: anymap2::SendSyncAnyMap, +} + +#[async_trait::async_trait] +impl CacheInvalidator for Arc { + async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()> { + let tasks = self + .indexes + .iter() + .map(|invalidator| invalidator.invalidate(ctx, caches)); + join_all(tasks) + .await + .into_iter() + .collect::>>()?; + Ok(()) + } +} + +impl CacheRegistry { + /// Sets the value stored in the collection for the type `T`. + /// Returns true if the collection already had a value of type `T` + fn register(&mut self, cache: Arc) -> bool { + self.indexes.push(cache.clone()); + self.registry.insert(cache).is_some() + } + + /// Returns __cloned__ the value stored in the collection for the type `T`, if it exists. + pub fn get(&self) -> Option { + self.registry.get().cloned() + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicI32, Ordering}; + use std::sync::Arc; + + use moka::future::{Cache, CacheBuilder}; + + use crate::cache::registry::CacheRegistryBuilder; + use crate::cache::*; + use crate::instruction::CacheIdent; + + fn test_cache(name: &str) -> CacheContainer { + let cache: Cache = CacheBuilder::new(128).build(); + let filter: TokenFilter = Box::new(|_| true); + let counter = Arc::new(AtomicI32::new(0)); + let moved_counter = counter.clone(); + let init: Initializer = Arc::new(move |_| { + moved_counter.fetch_add(1, Ordering::Relaxed); + Box::pin(async { Ok(Some("hi".to_string())) }) + }); + let invalidator: Invalidator = + Box::new(|_, _| Box::pin(async { Ok(()) })); + + CacheContainer::new(name.to_string(), cache, invalidator, init, filter) + } + + fn test_i32_cache(name: &str) -> CacheContainer { + let cache: Cache = CacheBuilder::new(128).build(); + let filter: TokenFilter = Box::new(|_| true); + let counter = Arc::new(AtomicI32::new(0)); + let moved_counter = counter.clone(); + let init: Initializer = Arc::new(move |_| { + moved_counter.fetch_add(1, Ordering::Relaxed); + Box::pin(async { Ok(Some("foo".to_string())) }) + }); + let invalidator: Invalidator = + Box::new(|_, _| Box::pin(async { Ok(()) })); + + CacheContainer::new(name.to_string(), cache, invalidator, init, filter) + } + + #[tokio::test] + async fn test_register() { + let builder = CacheRegistryBuilder::default(); + let i32_cache = Arc::new(test_i32_cache("i32_cache")); + let cache = Arc::new(test_cache("string_cache")); + let registry = builder.add_cache(i32_cache).add_cache(cache).build(); + + let cache = registry + .get::>>() + .unwrap(); + assert_eq!(cache.name(), "i32_cache"); + + let cache = registry + .get::>>() + .unwrap(); + assert_eq!(cache.name(), "string_cache"); + } +} diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index f547040b9a5c..3281971c1a4f 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -47,7 +47,7 @@ pub struct Context { #[async_trait::async_trait] pub trait CacheInvalidator: Send + Sync { - async fn invalidate(&self, ctx: &Context, caches: Vec) -> Result<()>; + async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()>; } pub type CacheInvalidatorRef = Arc; @@ -56,7 +56,7 @@ pub struct DummyCacheInvalidator; #[async_trait::async_trait] impl CacheInvalidator for DummyCacheInvalidator { - async fn invalidate(&self, _ctx: &Context, _caches: Vec) -> Result<()> { + async fn invalidate(&self, _ctx: &Context, _caches: &[CacheIdent]) -> Result<()> { Ok(()) } } @@ -80,10 +80,10 @@ impl MultiCacheInvalidator { #[async_trait::async_trait] impl CacheInvalidator for MultiCacheInvalidator { - async fn invalidate(&self, ctx: &Context, caches: Vec) -> Result<()> { + async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()> { let invalidators = self.invalidators.read().await; for invalidator in invalidators.iter() { - invalidator.invalidate(ctx, caches.clone()).await?; + invalidator.invalidate(ctx, caches).await?; } Ok(()) } @@ -94,25 +94,25 @@ impl CacheInvalidator for T where T: KvCacheInvalidator, { - async fn invalidate(&self, _ctx: &Context, caches: Vec) -> Result<()> { + async fn invalidate(&self, _ctx: &Context, caches: &[CacheIdent]) -> Result<()> { for cache in caches { match cache { CacheIdent::TableId(table_id) => { - let key = TableInfoKey::new(table_id); + let key = TableInfoKey::new(*table_id); self.invalidate_key(&key.to_bytes()).await; - let key = &TableRouteKey { table_id }; + let key = TableRouteKey::new(*table_id); self.invalidate_key(&key.to_bytes()).await; } CacheIdent::TableName(table_name) => { - let key: TableNameKey = (&table_name).into(); + let key: TableNameKey = table_name.into(); self.invalidate_key(&key.to_bytes()).await } CacheIdent::SchemaName(schema_name) => { - let key: SchemaNameKey = (&schema_name).into(); + let key: SchemaNameKey = schema_name.into(); self.invalidate_key(&key.to_bytes()).await; } - CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => { + &CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => { // TODO(weny): implements it unimplemented!() } diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index abec47764780..48d34b4307c3 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -175,7 +175,7 @@ impl AlterLogicalTablesProcedure { self.context .cache_invalidator - .invalidate(&ctx, to_invalidate) + .invalidate(&ctx, &to_invalidate) .await?; Ok(Status::done()) } diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index 209ed3812203..fdb063d17681 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -185,7 +185,7 @@ impl AlterTableProcedure { }; cache_invalidator - .invalidate(&Context::default(), cache_keys) + .invalidate(&Context::default(), &cache_keys) .await?; Ok(Status::done()) diff --git a/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs b/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs index 103ae4907142..61ec611f850f 100644 --- a/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs +++ b/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs @@ -69,7 +69,7 @@ impl CreateLogicalTablesProcedure { .cache_invalidator .invalidate( &Context::default(), - vec![ + &[ CacheIdent::TableId(self.data.physical_table_id), CacheIdent::TableName(physical_table_name), ], diff --git a/src/common/meta/src/ddl/drop_database/metadata.rs b/src/common/meta/src/ddl/drop_database/metadata.rs index a9a64f9eca95..005806146013 100644 --- a/src/common/meta/src/ddl/drop_database/metadata.rs +++ b/src/common/meta/src/ddl/drop_database/metadata.rs @@ -68,7 +68,7 @@ impl DropMetadataBroadcast { cache_invalidator .invalidate( &ctx, - vec![CacheIdent::SchemaName(SchemaName { + &[CacheIdent::SchemaName(SchemaName { catalog_name: db_ctx.catalog.clone(), schema_name: db_ctx.schema.clone(), })], diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 0c0f2ddc9cb9..aa41d03c6597 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -154,7 +154,7 @@ impl DropTableExecutor { cache_invalidator .invalidate( &ctx, - vec![ + &[ CacheIdent::TableName(self.table.table_ref().into()), CacheIdent::TableId(self.table_id), ], diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index 3cda489035b1..9b949b4ea32a 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -48,7 +48,7 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { // Invalidate local cache always success let _ = self .cache_invalidator - .invalidate(&Context::default(), caches) + .invalidate(&Context::default(), &caches) .await?; Ok(HandleControl::Done) diff --git a/src/meta-srv/src/cache_invalidator.rs b/src/meta-srv/src/cache_invalidator.rs index f4085b6bf9fd..c14a20965a02 100644 --- a/src/meta-srv/src/cache_invalidator.rs +++ b/src/meta-srv/src/cache_invalidator.rs @@ -63,8 +63,8 @@ impl MetasrvCacheInvalidator { #[async_trait] impl CacheInvalidator for MetasrvCacheInvalidator { - async fn invalidate(&self, ctx: &Context, caches: Vec) -> MetaResult<()> { - let instruction = Instruction::InvalidateCaches(caches); + async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> MetaResult<()> { + let instruction = Instruction::InvalidateCaches(caches.to_vec()); self.broadcast(ctx, instruction).await } } diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 599dbe51386f..6ca5bb54b74d 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -404,7 +404,7 @@ impl StatementExecutor { self.cache_invalidator .invalidate( &Context::default(), - vec![ + &[ CacheIdent::TableId(table_id), CacheIdent::TableName(table_name.clone()), ], @@ -619,7 +619,7 @@ impl StatementExecutor { // Invalidates local cache ASAP. self.cache_invalidator - .invalidate(&Context::default(), invalidate_keys) + .invalidate(&Context::default(), &invalidate_keys) .await .context(error::InvalidateTableCacheSnafu)?;