Skip to content

Commit

Permalink
feat: introduce TableRouteCache to PartitionRuleManager (#3922)
Browse files Browse the repository at this point in the history
* chore: add `CompositeTableRouteCacheRef` to `PartitionRuleManager`

* chore: update comments

* fix: add metrics for `get`

* chore: apply suggestions from CR

* chore: correct cache name

* feat: implement `LayeredCacheRegistry`

* fix: invalidate logical tables by physical table id

* refactor: replace `CacheRegistry` with `LayeredCacheRegistry`

* chore: update comments

* chore: apply suggestions from CR

* chore: fix fmt

* refactor: use `TableRouteCache` instead

* chore: apply suggestions from CR

* chore: fix clippy
  • Loading branch information
WenyXu authored May 13, 2024
1 parent be1eb4e commit e15294d
Show file tree
Hide file tree
Showing 22 changed files with 459 additions and 361 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ license.workspace = true

[dependencies]
catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
moka.workspace = true
snafu.workspace = true
44 changes: 44 additions & 0 deletions src/cache/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};

#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to get cache from cache registry: {}", name))]
CacheRequired {
#[snafu(implicit)]
location: Location,
name: String,
},
}

pub type Result<T> = std::result::Result<T, Error>;

impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::CacheRequired { .. } => StatusCode::Internal,
}
}

fn as_any(&self) -> &dyn std::any::Any {
self
}
}
54 changes: 45 additions & 9 deletions src/cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod error;

use std::sync::Arc;
use std::time::Duration;

use catalog::kvbackend::new_table_cache;
use common_meta::cache::{
new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache, CacheRegistryBuilder,
new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
new_table_route_cache, CacheRegistry, CacheRegistryBuilder, LayeredCacheRegistryBuilder,
};
use common_meta::kv_backend::KvBackendRef;
use moka::future::CacheBuilder;
use snafu::OptionExt;

use crate::error::Result;

const DEFAULT_CACHE_MAX_CAPACITY: u64 = 65536;
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
Expand All @@ -30,9 +36,9 @@ pub const TABLE_INFO_CACHE_NAME: &str = "table_info_cache";
pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache";
pub const TABLE_CACHE_NAME: &str = "table_cache";
pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache";
pub const TABLE_ROUTE_CACHE_NAME: &str = "table_route_cache";

// TODO(weny): Make the cache configurable.
pub fn default_cache_registry_builder(kv_backend: KvBackendRef) -> CacheRegistryBuilder {
pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry {
// Builds table info cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
Expand All @@ -55,16 +61,15 @@ pub fn default_cache_registry_builder(kv_backend: KvBackendRef) -> CacheRegistry
kv_backend.clone(),
));

// Builds table cache
// Builds table route cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let table_cache = Arc::new(new_table_cache(
TABLE_CACHE_NAME.to_string(),
let table_route_cache = Arc::new(new_table_route_cache(
TABLE_ROUTE_CACHE_NAME.to_string(),
cache,
table_info_cache.clone(),
table_name_cache.clone(),
kv_backend.clone(),
));

// Builds table flownode set cache
Expand All @@ -81,6 +86,37 @@ pub fn default_cache_registry_builder(kv_backend: KvBackendRef) -> CacheRegistry
CacheRegistryBuilder::default()
.add_cache(table_info_cache)
.add_cache(table_name_cache)
.add_cache(table_cache)
.add_cache(table_route_cache)
.add_cache(table_flownode_set_cache)
.build()
}

// TODO(weny): Make the cache configurable.
pub fn with_default_composite_cache_registry(
builder: LayeredCacheRegistryBuilder,
) -> Result<LayeredCacheRegistryBuilder> {
let table_info_cache = builder.get().context(error::CacheRequiredSnafu {
name: TABLE_INFO_CACHE_NAME,
})?;
let table_name_cache = builder.get().context(error::CacheRequiredSnafu {
name: TABLE_NAME_CACHE_NAME,
})?;

// Builds table cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let table_cache = Arc::new(new_table_cache(
TABLE_CACHE_NAME.to_string(),
cache,
table_info_cache,
table_name_cache,
));

let registry = CacheRegistryBuilder::default()
.add_cache(table_cache)
.build();

Ok(builder.add_cache_registry(registry))
}
7 changes: 6 additions & 1 deletion src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_catalog::consts::{
};
use common_config::Mode;
use common_error::ext::BoxedError;
use common_meta::cache::TableRouteCacheRef;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoValue;
Expand Down Expand Up @@ -71,11 +72,15 @@ impl KvBackendCatalogManager {
meta_client: Option<Arc<MetaClient>>,
backend: KvBackendRef,
table_cache: TableCacheRef,
table_route_cache: TableRouteCacheRef,
) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
mode,
meta_client,
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
partition_manager: Arc::new(PartitionRuleManager::new(
backend.clone(),
table_route_cache,
)),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
Expand Down
9 changes: 8 additions & 1 deletion src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,13 @@ pub enum Error {
location: Location,
name: String,
},

#[snafu(display("Failed to build cache registry"))]
BuildCacheRegistry {
#[snafu(implicit)]
location: Location,
source: cache::error::Error,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -359,7 +366,7 @@ impl ErrorExt for Error {

Error::SerdeJson { .. } | Error::FileIo { .. } => StatusCode::Unexpected,

Error::CacheRequired { .. } => StatusCode::Internal,
Error::CacheRequired { .. } | Error::BuildCacheRegistry { .. } => StatusCode::Internal,

Error::Other { source, .. } => source.status_code(),

Expand Down
47 changes: 36 additions & 11 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use cache::{default_cache_registry_builder, TABLE_CACHE_NAME};
use cache::{
build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME,
TABLE_ROUTE_CACHE_NAME,
};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::DatanodeClients;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::info;
Expand Down Expand Up @@ -248,27 +252,48 @@ impl StartCommand {
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
let cache_registry_builder =
default_cache_registry_builder(Arc::new(MetaKvBackend::new(meta_client.clone())));
let cache_registry = Arc::new(
cache_registry_builder

// Builds cache registry
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
CacheRegistryBuilder::default()
.add_cache(cached_meta_backend.clone())
.build(),
);
let table_cache = cache_registry.get().context(error::CacheRequiredSnafu {
name: TABLE_CACHE_NAME,
})?;
let fundamental_cache_registry =
build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
)
.context(error::BuildCacheRegistrySnafu)?
.build(),
);

let table_cache = layered_cache_registry
.get()
.context(error::CacheRequiredSnafu {
name: TABLE_CACHE_NAME,
})?;
let table_route_cache =
layered_cache_registry
.get()
.context(error::CacheRequiredSnafu {
name: TABLE_ROUTE_CACHE_NAME,
})?;
let catalog_manager = KvBackendCatalogManager::new(
opts.mode,
Some(meta_client.clone()),
cached_meta_backend.clone(),
table_cache,
table_route_cache,
)
.await;

let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(cache_registry.clone())),
Arc::new(InvalidateTableCacheHandler::new(
layered_cache_registry.clone(),
)),
]);

let heartbeat_task = HeartbeatTask::new(
Expand All @@ -280,13 +305,13 @@ impl StartCommand {

let mut instance = FrontendBuilder::new(
cached_meta_backend.clone(),
cache_registry.clone(),
layered_cache_registry.clone(),
catalog_manager,
Arc::new(DatanodeClients::default()),
meta_client,
)
.with_plugin(plugins.clone())
.with_local_cache_invalidator(cache_registry)
.with_local_cache_invalidator(layered_cache_registry)
.with_heartbeat_task(heartbeat_task)
.try_build()
.await
Expand Down
45 changes: 34 additions & 11 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ use std::sync::Arc;
use std::{fs, path};

use async_trait::async_trait;
use cache::{default_cache_registry_builder, TABLE_CACHE_NAME};
use cache::{
build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME,
TABLE_ROUTE_CACHE_NAME,
};
use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef};
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
Expand Down Expand Up @@ -59,10 +63,10 @@ use servers::Mode;
use snafu::{OptionExt, ResultExt};

use crate::error::{
CacheRequiredSnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu,
InitTimezoneSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu,
StartFrontendSnafu, StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu,
StopProcedureManagerSnafu,
BuildCacheRegistrySnafu, CacheRequiredSnafu, CreateDirSnafu, IllegalConfigSnafu,
InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, Result, ShutdownDatanodeSnafu,
ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::options::{GlobalOptions, Options};
use crate::App;
Expand Down Expand Up @@ -388,12 +392,31 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

let cache_registry = Arc::new(default_cache_registry_builder(kv_backend.clone()).build());
let table_cache = cache_registry.get().context(CacheRequiredSnafu {
// Builds cache registry
let layered_cache_builder = LayeredCacheRegistryBuilder::default();
let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
)
.context(BuildCacheRegistrySnafu)?
.build(),
);

let table_cache = layered_cache_registry.get().context(CacheRequiredSnafu {
name: TABLE_CACHE_NAME,
})?;
let catalog_manager =
KvBackendCatalogManager::new(dn_opts.mode, None, kv_backend.clone(), table_cache).await;
let table_route_cache = layered_cache_registry.get().context(CacheRequiredSnafu {
name: TABLE_ROUTE_CACHE_NAME,
})?;
let catalog_manager = KvBackendCatalogManager::new(
dn_opts.mode,
None,
kv_backend.clone(),
table_cache,
table_route_cache,
)
.await;

let builder =
DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
Expand Down Expand Up @@ -431,7 +454,7 @@ impl StartCommand {
let ddl_task_executor = Self::create_ddl_task_executor(
procedure_manager.clone(),
node_manager.clone(),
cache_registry.clone(),
layered_cache_registry.clone(),
table_metadata_manager,
table_meta_allocator,
flow_metadata_manager,
Expand All @@ -441,7 +464,7 @@ impl StartCommand {

let mut frontend = FrontendBuilder::new(
kv_backend,
cache_registry,
layered_cache_registry,
catalog_manager,
node_manager,
ddl_task_executor,
Expand Down
Loading

0 comments on commit e15294d

Please sign in to comment.