Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce TableRouteCache to PartitionRuleManager #3922

Merged
merged 14 commits into from
May 13, 2024
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ license.workspace = true

[dependencies]
catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
moka.workspace = true
snafu.workspace = true
44 changes: 44 additions & 0 deletions src/cache/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};

#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to get cache from cache registry: {}", name))]
CacheRequired {
#[snafu(implicit)]
location: Location,
name: String,
},
}

pub type Result<T> = std::result::Result<T, Error>;

impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::CacheRequired { .. } => StatusCode::Internal,
}
}

fn as_any(&self) -> &dyn std::any::Any {
self
}
}
71 changes: 62 additions & 9 deletions src/cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod error;

use std::sync::Arc;
use std::time::Duration;

use catalog::kvbackend::new_table_cache;
use common_meta::cache::{
new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache, CacheRegistryBuilder,
new_composite_table_route_cache, new_table_flownode_set_cache, new_table_info_cache,
new_table_name_cache, new_table_route_cache, CacheRegistry, CacheRegistryBuilder,
LayeredCacheRegistryBuilder,
};
use common_meta::kv_backend::KvBackendRef;
use moka::future::CacheBuilder;
use snafu::OptionExt;

use crate::error::Result;

const DEFAULT_CACHE_MAX_CAPACITY: u64 = 65536;
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
Expand All @@ -30,9 +37,10 @@ pub const TABLE_INFO_CACHE_NAME: &str = "table_info_cache";
pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache";
pub const TABLE_CACHE_NAME: &str = "table_cache";
pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache";
pub const TABLE_ROUTE_CACHE_NAME: &str = "table_route_cache";
pub const COMPOSITE_TABLE_ROUTE_CACHE: &str = "composite_table_route_cache";

// TODO(weny): Make the cache configurable.
pub fn default_cache_registry_builder(kv_backend: KvBackendRef) -> CacheRegistryBuilder {
pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry {
// Builds table info cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
Expand All @@ -55,16 +63,15 @@ pub fn default_cache_registry_builder(kv_backend: KvBackendRef) -> CacheRegistry
kv_backend.clone(),
));

// Builds table cache
// Builds table route cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let table_cache = Arc::new(new_table_cache(
TABLE_CACHE_NAME.to_string(),
let table_route_cache = Arc::new(new_table_route_cache(
TABLE_ROUTE_CACHE_NAME.to_string(),
cache,
table_info_cache.clone(),
table_name_cache.clone(),
kv_backend.clone(),
));

// Builds table flownode set cache
Expand All @@ -81,6 +88,52 @@ pub fn default_cache_registry_builder(kv_backend: KvBackendRef) -> CacheRegistry
CacheRegistryBuilder::default()
.add_cache(table_info_cache)
.add_cache(table_name_cache)
.add_cache(table_cache)
.add_cache(table_route_cache)
.add_cache(table_flownode_set_cache)
.build()
}

// TODO(weny): Make the cache configurable.
pub fn with_default_composite_cache_registry(
builder: LayeredCacheRegistryBuilder,
) -> Result<LayeredCacheRegistryBuilder> {
let table_info_cache = builder.get().context(error::CacheRequiredSnafu {
name: TABLE_INFO_CACHE_NAME,
})?;
let table_name_cache = builder.get().context(error::CacheRequiredSnafu {
name: TABLE_NAME_CACHE_NAME,
})?;
let table_route_cache = builder.get().context(error::CacheRequiredSnafu {
name: TABLE_ROUTE_CACHE_NAME,
})?;

// Builds table cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let table_cache = Arc::new(new_table_cache(
TABLE_CACHE_NAME.to_string(),
cache,
table_info_cache,
table_name_cache,
));

// Builds composite table route cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let composite_table_route_cache = Arc::new(new_composite_table_route_cache(
COMPOSITE_TABLE_ROUTE_CACHE.to_string(),
cache,
table_route_cache,
));

let registry = CacheRegistryBuilder::default()
.add_cache(table_cache)
.add_cache(composite_table_route_cache)
.build();

Ok(builder.add_cache_registry(registry))
}
7 changes: 6 additions & 1 deletion src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_catalog::consts::{
};
use common_config::Mode;
use common_error::ext::BoxedError;
use common_meta::cache::CompositeTableRouteCacheRef;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoValue;
Expand Down Expand Up @@ -71,11 +72,15 @@ impl KvBackendCatalogManager {
meta_client: Option<Arc<MetaClient>>,
backend: KvBackendRef,
table_cache: TableCacheRef,
composite_table_route_cache: CompositeTableRouteCacheRef,
) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
mode,
meta_client,
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
partition_manager: Arc::new(PartitionRuleManager::new(
backend.clone(),
composite_table_route_cache,
)),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
Expand Down
9 changes: 8 additions & 1 deletion src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,13 @@ pub enum Error {
location: Location,
name: String,
},

#[snafu(display("Failed to build cache registry"))]
BuildCacheRegistry {
#[snafu(implicit)]
location: Location,
source: cache::error::Error,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -359,7 +366,7 @@ impl ErrorExt for Error {

Error::SerdeJson { .. } | Error::FileIo { .. } => StatusCode::Unexpected,

Error::CacheRequired { .. } => StatusCode::Internal,
Error::CacheRequired { .. } | Error::BuildCacheRegistry { .. } => StatusCode::Internal,

Error::Other { source, .. } => source.status_code(),

Expand Down
47 changes: 36 additions & 11 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use cache::{default_cache_registry_builder, TABLE_CACHE_NAME};
use cache::{
build_fundamental_cache_registry, with_default_composite_cache_registry,
COMPOSITE_TABLE_ROUTE_CACHE, TABLE_CACHE_NAME,
};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::DatanodeClients;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::info;
Expand Down Expand Up @@ -242,27 +246,48 @@ impl StartCommand {
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
let cache_registry_builder =
default_cache_registry_builder(Arc::new(MetaKvBackend::new(meta_client.clone())));
let cache_registry = Arc::new(
cache_registry_builder

// Builds cache registry
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
CacheRegistryBuilder::default()
.add_cache(cached_meta_backend.clone())
.build(),
);
let table_cache = cache_registry.get().context(error::CacheRequiredSnafu {
name: TABLE_CACHE_NAME,
})?;
let fundamental_cache_registry =
build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
)
.context(error::BuildCacheRegistrySnafu)?
.build(),
);

let table_cache = layered_cache_registry
.get()
.context(error::CacheRequiredSnafu {
name: TABLE_CACHE_NAME,
})?;
let composite_table_route_cache =
layered_cache_registry
.get()
.context(error::CacheRequiredSnafu {
name: COMPOSITE_TABLE_ROUTE_CACHE,
})?;
let catalog_manager = KvBackendCatalogManager::new(
opts.mode,
Some(meta_client.clone()),
cached_meta_backend.clone(),
table_cache,
composite_table_route_cache,
)
.await;

let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(cache_registry.clone())),
Arc::new(InvalidateTableCacheHandler::new(
layered_cache_registry.clone(),
)),
]);

let heartbeat_task = HeartbeatTask::new(
Expand All @@ -274,13 +299,13 @@ impl StartCommand {

let mut instance = FrontendBuilder::new(
cached_meta_backend.clone(),
cache_registry.clone(),
layered_cache_registry.clone(),
catalog_manager,
Arc::new(DatanodeClients::default()),
meta_client,
)
.with_plugin(plugins.clone())
.with_local_cache_invalidator(cache_registry)
.with_local_cache_invalidator(layered_cache_registry)
.with_heartbeat_task(heartbeat_task)
.try_build()
.await
Expand Down
Loading