From f82737bd6f3c00c8126e28f6bf8456b33b26f83d Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 12 Sep 2023 08:53:47 +0000 Subject: [PATCH] refactor: remove MetadataService --- Cargo.lock | 1 + src/common/meta/Cargo.toml | 1 + src/common/meta/src/key/catalog_name.rs | 2 + src/common/meta/src/key/schema_name.rs | 2 + src/common/meta/src/metrics.rs | 2 + src/meta-srv/src/error.rs | 8 ++ src/meta-srv/src/lib.rs | 1 - src/meta-srv/src/metadata_service.rs | 157 ------------------------ src/meta-srv/src/metasrv.rs | 10 +- src/meta-srv/src/metasrv/builder.rs | 19 --- src/meta-srv/src/metrics.rs | 2 - src/meta-srv/src/mocks.rs | 8 +- 12 files changed, 21 insertions(+), 192 deletions(-) delete mode 100644 src/meta-srv/src/metadata_service.rs diff --git a/Cargo.lock b/Cargo.lock index 2eb4852db6ce..9edc6dcc7045 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1859,6 +1859,7 @@ dependencies = [ "humantime-serde", "hyper", "lazy_static", + "metrics", "prost", "regex", "serde", diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 2bd4a56ab49a..566e49f05bea 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -22,6 +22,7 @@ etcd-client.workspace = true futures.workspace = true humantime-serde.workspace = true lazy_static.workspace = true +metrics.workspace = true prost.workspace = true regex.workspace = true serde.workspace = true diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs index c6c727c0af39..bdb29cbba041 100644 --- a/src/common/meta/src/key/catalog_name.rs +++ b/src/common/meta/src/key/catalog_name.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use common_catalog::consts::DEFAULT_CATALOG_NAME; use futures::stream::BoxStream; use futures::StreamExt; +use metrics::increment_counter; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -108,6 +109,7 @@ impl CatalogManager { .with_key(raw_key) .with_value(CatalogNameValue.try_as_raw_value()?); self.kv_backend.put(req).await?; + increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG); Ok(()) } diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index c162e31b1617..17a86ba69fb5 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -21,6 +21,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use futures::stream::BoxStream; use futures::StreamExt; use humantime_serde::re::humantime; +use metrics::increment_counter; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -149,6 +150,7 @@ impl SchemaManager { .with_value(value.unwrap_or_default().try_as_raw_value()?); self.kv_backend.put(req).await?; + increment_counter!(crate::metrics::METRIC_META_CREATE_SCHEMA); Ok(()) } diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index ed672a772133..49535607af72 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -14,6 +14,8 @@ pub const METRIC_META_TXN_REQUEST: &str = "meta.txn_request"; +pub(crate) const METRIC_META_CREATE_CATALOG: &str = "meta.create_catalog"; +pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema"; pub(crate) const METRIC_META_PROCEDURE_CREATE_TABLE: &str = "meta.procedure.create_table"; pub(crate) const METRIC_META_PROCEDURE_DROP_TABLE: &str = "meta.procedure.drop_table"; pub(crate) const METRIC_META_PROCEDURE_ALTER_TABLE: &str = "meta.procedure.alter_table"; diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 37b835c49609..9d96b333ad47 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -26,6 +26,12 @@ use crate::pubsub::Message; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to create default catalog and schema, source: {}", source))] + InitMetadata { + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to allocate next sequence number: {}", source))] NextSequence { location: Location, @@ -612,6 +618,8 @@ impl ErrorExt for Error { | Error::ConvertEtcdTxnObject { source, .. } | Error::GetFullTableInfo { source, .. } => source.status_code(), + Error::InitMetadata { source, .. } => source.status_code(), + Error::Other { source, .. } => source.status_code(), } } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 4e5b26e1f532..9bea19c48fce 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -25,7 +25,6 @@ pub mod handler; pub mod keys; pub mod lease; pub mod lock; -pub mod metadata_service; pub mod metasrv; mod metrics; #[cfg(feature = "mock")] diff --git a/src/meta-srv/src/metadata_service.rs b/src/meta-srv/src/metadata_service.rs deleted file mode 100644 index 09efdbe6d308..000000000000 --- a/src/meta-srv/src/metadata_service.rs +++ /dev/null @@ -1,157 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use async_trait::async_trait; -use common_meta::key::catalog_name::CatalogNameKey; -use common_meta::key::schema_name::SchemaNameKey; -use common_meta::key::TableMetadataManagerRef; -use common_telemetry::{info, timer}; -use metrics::increment_counter; -use snafu::{ensure, ResultExt}; - -use crate::error; -use crate::error::Result; - -/// This trait defines some methods of metadata -#[async_trait] -pub trait MetadataService: Send + Sync { - // An error occurs if the schema exists and "if_not_exist" == false. - async fn create_schema( - &self, - catalog_name: &str, - schema_name: &str, - if_not_exist: bool, - ) -> Result<()>; - - async fn delete_schema(&self, catalog_name: &str, schema_name: &str) -> Result<()>; -} - -pub type MetadataServiceRef = Arc; - -#[derive(Clone)] -pub struct DefaultMetadataService { - table_metadata_manager: TableMetadataManagerRef, -} - -impl DefaultMetadataService { - pub fn new(table_metadata_manager: TableMetadataManagerRef) -> Self { - Self { - table_metadata_manager, - } - } -} - -#[async_trait] -impl MetadataService for DefaultMetadataService { - async fn create_schema( - &self, - catalog_name: &str, - schema_name: &str, - if_not_exist: bool, - ) -> Result<()> { - let _timer = timer!(crate::metrics::METRIC_META_CREATE_SCHEMA); - - self.table_metadata_manager - .catalog_manager() - .create(CatalogNameKey::new(catalog_name)) - .await - .context(error::TableMetadataManagerSnafu)?; - - increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG); - info!("Successfully created a catalog: {}", catalog_name); - - let schema = SchemaNameKey::new(catalog_name, schema_name); - - let exist = self - .table_metadata_manager - .schema_manager() - .exist(schema) - .await - .context(error::TableMetadataManagerSnafu)?; - - ensure!( - !exist || if_not_exist, - error::SchemaAlreadyExistsSnafu { schema_name } - ); - - if !exist { - self.table_metadata_manager - .schema_manager() - .create(schema, None) - .await - .context(error::TableMetadataManagerSnafu)?; - - info!("Successfully created a schema: {}", schema_name); - } - - Ok(()) - } - - async fn delete_schema(&self, _catalog_name: &str, _schema_name: &str) -> Result<()> { - unimplemented!() - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use common_meta::key::catalog_name::CatalogNameKey; - use common_meta::key::schema_name::SchemaNameKey; - use common_meta::key::{TableMetaKey, TableMetadataManager}; - - use super::{DefaultMetadataService, MetadataService}; - use crate::service::store::kv::{KvBackendAdapter, KvStoreRef}; - use crate::service::store::memory::MemStore; - - #[tokio::test] - async fn test_create_schema() { - let kv_store = Arc::new(MemStore::default()); - let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( - kv_store.clone(), - ))); - let service = DefaultMetadataService::new(table_metadata_manager); - - service - .create_schema("catalog", "public", false) - .await - .unwrap(); - verify_result(kv_store.clone()).await; - - let result = service.create_schema("catalog", "public", false).await; - assert!(result.is_err()); - - service - .create_schema("catalog", "public", true) - .await - .unwrap(); - verify_result(kv_store.clone()).await; - } - - async fn verify_result(kv_store: KvStoreRef) { - let key = CatalogNameKey::new("catalog").as_raw_key(); - - let result = kv_store.get(&key).await.unwrap(); - let kv = result.unwrap(); - assert_eq!(key, kv.key()); - - let key = SchemaNameKey::new("catalog", "public").as_raw_key(); - - let result = kv_store.get(&key).await.unwrap(); - let kv = result.unwrap(); - assert_eq!(key, kv.key()); - } -} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index fb320b8afb05..727f6a0c4187 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::Peer; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; use common_meta::ddl::DdlTaskExecutorRef; @@ -36,10 +35,9 @@ use tokio::sync::broadcast::error::RecvError; use crate::cluster::MetaPeerClientRef; use crate::election::{Election, LeaderChangeMessage}; -use crate::error::{RecoverProcedureSnafu, Result}; +use crate::error::{InitMetadataSnafu, RecoverProcedureSnafu, Result}; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; -use crate::metadata_service::MetadataServiceRef; use crate::pubsub::{PublishRef, SubscribeManagerRef}; use crate::selector::{Selector, SelectorType}; use crate::service::mailbox::MailboxRef; @@ -196,7 +194,6 @@ pub struct MetaSrv { election: Option, lock: DistLockRef, procedure_manager: ProcedureManagerRef, - metadata_service: MetadataServiceRef, mailbox: MailboxRef, ddl_executor: DdlTaskExecutorRef, table_metadata_manager: TableMetadataManagerRef, @@ -296,9 +293,10 @@ impl MetaSrv { } async fn create_default_schema_if_not_exist(&self) -> Result<()> { - self.metadata_service - .create_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, true) + self.table_metadata_manager + .init() .await + .context(InitMetadataSnafu) } pub fn shutdown(&self) { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 1c630a30ab59..39768817c5bd 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -43,7 +43,6 @@ use crate::handler::response_header_handler::ResponseHeaderHandler; use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pushers}; use crate::lock::memory::MemLock; use crate::lock::DistLockRef; -use crate::metadata_service::{DefaultMetadataService, MetadataServiceRef}; use crate::metasrv::{ ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ, }; @@ -66,7 +65,6 @@ pub struct MetaSrvBuilder { election: Option, meta_peer_client: Option, lock: Option, - metadata_service: Option, datanode_clients: Option>, pubsub: Option<(PublishRef, SubscribeManagerRef)>, } @@ -82,7 +80,6 @@ impl MetaSrvBuilder { election: None, options: None, lock: None, - metadata_service: None, datanode_clients: None, pubsub: None, } @@ -128,11 +125,6 @@ impl MetaSrvBuilder { self } - pub fn metadata_service(mut self, metadata_service: MetadataServiceRef) -> Self { - self.metadata_service = Some(metadata_service); - self - } - pub fn datanode_clients(mut self, clients: Arc) -> Self { self.datanode_clients = Some(clients); self @@ -155,7 +147,6 @@ impl MetaSrvBuilder { selector, handler_group, lock, - metadata_service, datanode_clients, pubsub, } = self; @@ -174,10 +165,7 @@ impl MetaSrvBuilder { let kv_backend = KvBackendAdapter::wrap(kv_store.clone()); let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_backend.clone())); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - let metadata_service = metadata_service - .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(table_metadata_manager))); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); - let table_metadata_manager = build_table_metadata_manager(&kv_store); let ctx = SelectorContext { datanode_lease_secs: options.datanode_lease_secs, server_addr: options.server_addr.clone(), @@ -275,7 +263,6 @@ impl MetaSrvBuilder { election, lock, procedure_manager, - metadata_service, mailbox, ddl_executor: ddl_manager, table_metadata_manager, @@ -333,12 +320,6 @@ fn build_procedure_manager(options: &MetaSrvOptions, kv_store: &KvStoreRef) -> P Arc::new(LocalManager::new(manager_config, state_store)) } -fn build_table_metadata_manager(kv_store: &KvStoreRef) -> TableMetadataManagerRef { - Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( - kv_store.clone(), - ))) -} - fn build_ddl_manager( options: &MetaSrvOptions, datanode_clients: Option>, diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index 709e410b0b02..483fccfff8df 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) const METRIC_META_CREATE_CATALOG: &str = "meta.create_catalog"; -pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema"; pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request"; pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request"; pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num"; diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index d7393ba04863..0ce0b4230fa8 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -20,12 +20,10 @@ use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::router_server::RouterServer; use api::v1::meta::store_server::StoreServer; use client::client_manager::DatanodeClients; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::key::TableMetadataManager; use tower::service_fn; -use crate::metadata_service::{DefaultMetadataService, MetadataService}; use crate::metasrv::builder::MetaSrvBuilder; use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; use crate::service::store::etcd::EtcdStore; @@ -64,12 +62,8 @@ pub async fn mock( let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( kv_store.clone(), ))); - let metadata_service = DefaultMetadataService::new(table_metadata_manager); - metadata_service - .create_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, true) - .await - .unwrap(); + table_metadata_manager.init().await.unwrap(); let builder = MetaSrvBuilder::new().options(opts).kv_store(kv_store);