diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs index d00c0688cbf4..ee8937e9b1be 100644 --- a/src/common/meta/src/cache.rs +++ b/src/common/meta/src/cache.rs @@ -21,6 +21,8 @@ pub use container::{CacheContainer, Initializer, Invalidator, TokenFilter}; pub use flow::{new_table_flownode_set_cache, TableFlownodeSetCache, TableFlownodeSetCacheRef}; pub use registry::{CacheRegistry, CacheRegistryBuilder, CacheRegistryRef}; pub use table::{ - new_table_info_cache, new_table_name_cache, new_table_route_cache, TableInfoCache, - TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRouteCache, TableRouteCacheRef, + new_composite_table_route_cache, new_table_info_cache, new_table_name_cache, + new_table_route_cache, CompositeTableRoute, CompositeTableRouteCache, + CompositeTableRouteCacheRef, TableInfoCache, TableInfoCacheRef, TableNameCache, + TableNameCacheRef, TableRouteCache, TableRouteCacheRef, }; diff --git a/src/common/meta/src/cache/table.rs b/src/common/meta/src/cache/table.rs index 016275fa97c0..a595118c23f9 100644 --- a/src/common/meta/src/cache/table.rs +++ b/src/common/meta/src/cache/table.rs @@ -12,9 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod composite_table_route; mod table_info; mod table_name; mod table_route; +pub use composite_table_route::{ + new_composite_table_route_cache, CompositeTableRoute, CompositeTableRouteCache, + CompositeTableRouteCacheRef, +}; pub use table_info::{new_table_info_cache, TableInfoCache, TableInfoCacheRef}; pub use table_name::{new_table_name_cache, TableNameCache, TableNameCacheRef}; -pub use table_route::{new_table_route_cache, TableRouteCache, TableRouteCacheRef}; +pub use table_route::{new_table_route_cache, TableRoute, TableRouteCache, TableRouteCacheRef}; diff --git a/src/common/meta/src/cache/table/composite_table_route.rs b/src/common/meta/src/cache/table/composite_table_route.rs new file mode 100644 index 000000000000..c213bdd42ae5 --- /dev/null +++ b/src/common/meta/src/cache/table/composite_table_route.rs @@ -0,0 +1,274 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use futures::future::BoxFuture; +use moka::future::Cache; +use snafu::OptionExt; +use store_api::storage::TableId; + +use crate::cache::table::{TableRoute, TableRouteCacheRef}; +use crate::cache::{CacheContainer, Initializer}; +use crate::error; +use crate::error::Result; +use crate::instruction::CacheIdent; +use crate::key::table_route::{LogicalTableRouteValue, PhysicalTableRouteValue}; + +/// [CompositeTableRoute] stores all level routes of a table. +/// - Stores [PhysicalTableRouteValue] for logical table. +/// - Stores [LogicalTableRouteValue], [PhysicalTableRouteValue] for the logical table. +#[derive(Clone)] +pub enum CompositeTableRoute { + Physical(Arc), + Logical(Arc, Arc), +} + +impl CompositeTableRoute { + /// Returns true if it's physical table. + pub fn is_physical(&self) -> bool { + matches!(self, CompositeTableRoute::Physical(_)) + } + + /// Returns [PhysicalTableRouteValue] reference. + pub fn as_physical_table_route_ref(&self) -> &Arc { + match self { + CompositeTableRoute::Physical(route) => route, + CompositeTableRoute::Logical(_, route) => route, + } + } + + /// Returns [LogicalTableRouteValue] reference if it's [CompositeTableRoute::Logical]; Otherwise returns [None]. + pub fn as_logical_table_route_ref(&self) -> Option<&Arc> { + match self { + CompositeTableRoute::Physical(_) => None, + CompositeTableRoute::Logical(route, _) => Some(route), + } + } +} + +/// [CompositeTableRouteCache] caches the [TableId] to [CompositeTableRoute] mapping. +pub type CompositeTableRouteCache = CacheContainer, CacheIdent>; + +pub type CompositeTableRouteCacheRef = Arc; + +/// Constructs a [CompositeTableRouteCache]. +pub fn new_composite_table_route_cache( + name: String, + cache: Cache>, + table_route_cache: TableRouteCacheRef, +) -> CompositeTableRouteCache { + let init = init_factory(table_route_cache); + + CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) +} + +fn init_factory( + table_route_cache: TableRouteCacheRef, +) -> Initializer> { + Arc::new(move |table_id| { + let table_route_cache = table_route_cache.clone(); + Box::pin(async move { + let table_route_value = table_route_cache + .get(*table_id) + .await? + .context(error::ValueNotExistSnafu)?; + match table_route_value.as_ref() { + TableRoute::Physical(physical_table_route) => Ok(Some(Arc::new( + CompositeTableRoute::Physical(physical_table_route.clone()), + ))), + TableRoute::Logical(logical_table_route) => { + let physical_table_id = logical_table_route.physical_table_id(); + let physical_table_route = table_route_cache + .get(physical_table_id) + .await? + .context(error::ValueNotExistSnafu)?; + + let physical_table_route = physical_table_route + .as_physical_table_route_ref() + .with_context(|| error::UnexpectedSnafu { + err_msg: format!( + "Expected the physical table route, but got logical table route, table: {table_id}" + ), + })?; + + Ok(Some(Arc::new(CompositeTableRoute::Logical( + logical_table_route.clone(), + physical_table_route.clone(), + )))) + } + } + }) + }) +} + +fn invalidator<'a>( + cache: &'a Cache>, + ident: &'a CacheIdent, +) -> BoxFuture<'a, Result<()>> { + Box::pin(async move { + if let CacheIdent::TableId(table_id) = ident { + cache.invalidate(table_id).await + } + Ok(()) + }) +} + +fn filter(ident: &CacheIdent) -> bool { + matches!(ident, CacheIdent::TableId(_)) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use moka::future::CacheBuilder; + use store_api::storage::RegionId; + + use super::*; + use crate::cache::new_table_route_cache; + use crate::ddl::test_util::create_table::test_create_table_task; + use crate::ddl::test_util::test_create_logical_table_task; + use crate::key::table_route::TableRouteValue; + use crate::key::TableMetadataManager; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::peer::Peer; + use crate::rpc::router::{Region, RegionRoute}; + + #[tokio::test] + async fn test_cache_with_physical_table_route() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); + let cache = CacheBuilder::new(128).build(); + let table_route_cache = Arc::new(new_table_route_cache( + "test".to_string(), + cache, + mem_kv.clone(), + )); + let cache = CacheBuilder::new(128).build(); + let cache = + new_composite_table_route_cache("test".to_string(), cache, table_route_cache.clone()); + + let result = cache.get(1024).await.unwrap(); + assert!(result.is_none()); + let task = test_create_table_task("my_table", 1024); + let table_id = 10; + let region_id = RegionId::new(table_id, 1); + let peer = Peer::empty(1); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }]; + table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(region_routes.clone()), + HashMap::new(), + ) + .await + .unwrap(); + let table_route = cache.get(1024).await.unwrap().unwrap(); + assert_eq!( + (*table_route) + .clone() + .as_physical_table_route_ref() + .region_routes, + region_routes + ); + + assert!(table_route_cache.contains_key(&1024)); + assert!(cache.contains_key(&1024)); + cache + .invalidate(&[CacheIdent::TableId(1024)]) + .await + .unwrap(); + assert!(!cache.contains_key(&1024)); + } + + #[tokio::test] + async fn test_cache_with_logical_table_route() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); + let cache = CacheBuilder::new(128).build(); + let table_route_cache = Arc::new(new_table_route_cache( + "test".to_string(), + cache, + mem_kv.clone(), + )); + let cache = CacheBuilder::new(128).build(); + let cache = + new_composite_table_route_cache("test".to_string(), cache, table_route_cache.clone()); + + let result = cache.get(1024).await.unwrap(); + assert!(result.is_none()); + // Prepares table routes + let task = test_create_table_task("my_table", 1024); + let table_id = 10; + let region_id = RegionId::new(table_id, 1); + let peer = Peer::empty(1); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }]; + table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(region_routes.clone()), + HashMap::new(), + ) + .await + .unwrap(); + let mut task = test_create_logical_table_task("logical"); + task.table_info.ident.table_id = 1025; + table_metadata_manager + .create_logical_tables_metadata(vec![( + task.table_info, + TableRouteValue::logical(1024, vec![RegionId::new(1025, 0)]), + )]) + .await + .unwrap(); + + // Gets logical table route + let table_route = cache.get(1025).await.unwrap().unwrap(); + assert_eq!( + table_route + .as_logical_table_route_ref() + .unwrap() + .physical_table_id(), + 1024 + ); + assert_eq!( + table_route.as_physical_table_route_ref().region_routes, + region_routes + ); + + assert!(!cache.contains_key(&1024)); + // Gets physical table route + let table_route = cache.get(1024).await.unwrap().unwrap(); + assert_eq!( + table_route.as_physical_table_route_ref().region_routes, + region_routes + ); + assert!(table_route.is_physical()); + + cache + .invalidate(&[CacheIdent::TableId(1025)]) + .await + .unwrap(); + assert!(!cache.contains_key(&1025)); + } +} diff --git a/src/common/meta/src/cache/table/table_route.rs b/src/common/meta/src/cache/table/table_route.rs index d7667cf81e7a..2383a1ea13d0 100644 --- a/src/common/meta/src/cache/table/table_route.rs +++ b/src/common/meta/src/cache/table/table_route.rs @@ -23,18 +23,51 @@ use crate::cache::{CacheContainer, Initializer}; use crate::error; use crate::error::Result; use crate::instruction::CacheIdent; -use crate::key::table_route::{TableRouteManager, TableRouteManagerRef, TableRouteValue}; +use crate::key::table_route::{ + LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteManager, TableRouteManagerRef, + TableRouteValue, +}; use crate::kv_backend::KvBackendRef; -/// [TableRouteCache] caches the [TableId] to [TableRouteValue] mapping. -pub type TableRouteCache = CacheContainer, CacheIdent>; +/// [TableRoute] stores `Arc` wrapped table route. +#[derive(Clone)] +pub enum TableRoute { + Physical(Arc), + Logical(Arc), +} + +impl TableRoute { + /// Returns true if it's physical table. + pub fn is_physical(&self) -> bool { + matches!(self, TableRoute::Physical(_)) + } + + /// Returns [PhysicalTableRouteValue] reference if it's [TableRoute::Physical]; Otherwise it returns [None]. + pub fn as_physical_table_route_ref(&self) -> Option<&Arc> { + match self { + TableRoute::Physical(table_route) => Some(table_route), + TableRoute::Logical(_) => None, + } + } + + /// Returns [LogicalTableRouteValue] reference if it's [TableRoute::Logical]; Otherwise it returns [None]. + pub fn as_logical_table_route_ref(&self) -> Option<&Arc> { + match self { + TableRoute::Physical(_) => None, + TableRoute::Logical(table_route) => Some(table_route), + } + } +} + +/// [TableRouteCache] caches the [TableId] to [TableRoute] mapping. +pub type TableRouteCache = CacheContainer, CacheIdent>; pub type TableRouteCacheRef = Arc; /// Constructs a [TableRouteCache]. pub fn new_table_route_cache( name: String, - cache: Cache>, + cache: Cache>, kv_backend: KvBackendRef, ) -> TableRouteCache { let table_info_manager = Arc::new(TableRouteManager::new(kv_backend)); @@ -45,7 +78,7 @@ pub fn new_table_route_cache( fn init_factory( table_route_manager: TableRouteManagerRef, -) -> Initializer> { +) -> Initializer> { Arc::new(move |table_id| { let table_route_manager = table_route_manager.clone(); Box::pin(async move { @@ -55,13 +88,18 @@ fn init_factory( .await? .context(error::ValueNotExistSnafu {})?; - Ok(Some(Arc::new(table_route_value))) + let table_route = match table_route_value { + TableRouteValue::Physical(physical) => TableRoute::Physical(Arc::new(physical)), + TableRouteValue::Logical(logical) => TableRoute::Logical(Arc::new(logical)), + }; + + Ok(Some(Arc::new(table_route))) }) }) } fn invalidator<'a>( - cache: &'a Cache>, + cache: &'a Cache>, ident: &'a CacheIdent, ) -> BoxFuture<'a, Result<()>> { Box::pin(async move { @@ -122,7 +160,8 @@ mod tests { assert_eq!( (*table_route) .clone() - .into_physical_table_route() + .as_physical_table_route_ref() + .unwrap() .region_routes, region_routes );