diff --git a/Cargo.lock b/Cargo.lock index a9a627e712aa..9edc6dcc7045 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1859,6 +1859,7 @@ dependencies = [ "humantime-serde", "hyper", "lazy_static", + "metrics", "prost", "regex", "serde", @@ -7284,8 +7285,7 @@ dependencies = [ [[package]] name = "raft-engine" version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e02bdc8cba47cb7062b433f56700a8edbc9fcd6d706389120d20aa1827e5ba7b" +source = "git+https://github.com/tikv/raft-engine.git?rev=571462e36621407b9920465a1a15b8b01b929a7f#571462e36621407b9920465a1a15b8b01b929a7f" dependencies = [ "byteorder", "crc32fast", diff --git a/Cargo.toml b/Cargo.toml index cbc256cf06cb..c26db0e06854 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -153,7 +153,8 @@ object-store = { path = "src/object-store" } partition = { path = "src/partition" } promql = { path = "src/promql" } query = { path = "src/query" } -raft-engine = { version = "0.4" } +# TODO(weny): waits for https://github.com/tikv/raft-engine/pull/335 +raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "571462e36621407b9920465a1a15b8b01b929a7f" } script = { path = "src/script" } servers = { path = "src/servers" } session = { path = "src/session" } diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 3c55361e40b1..a8cfb26d18e6 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -23,6 +23,12 @@ use snafu::{Location, Snafu}; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to create default catalog and schema, source: {}", source))] + InitMetadata { + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to iter stream, source: {}", source))] IterStream { location: Location, @@ -182,7 +188,9 @@ impl ErrorExt for Error { Error::ShutdownMetaServer { source, .. } => source.status_code(), Error::BuildMetaServer { source, .. } => source.status_code(), Error::UnsupportedSelectorType { source, .. } => source.status_code(), - Error::IterStream { source, .. } => source.status_code(), + Error::IterStream { source, .. } | Error::InitMetadata { source, .. } => { + source.status_code() + } Error::MissingConfig { .. } | Error::LoadLayeredConfig { .. } | Error::IllegalConfig { .. } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index a0ca94674133..15cb26c3a932 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -31,7 +31,6 @@ use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDat use frontend::service_config::{ GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions, }; -use query::QueryEngineRef; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; @@ -39,8 +38,8 @@ use servers::Mode; use snafu::ResultExt; use crate::error::{ - IllegalConfigSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, - StartFrontendSnafu, + IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, + StartDatanodeSnafu, StartFrontendSnafu, }; use crate::frontend::load_frontend_plugins; use crate::options::{MixOptions, Options, TopLevelOptions}; @@ -318,13 +317,18 @@ impl StartCommand { Arc::new(StandaloneDatanodeManager(region_server.clone())), )); + catalog_manager + .table_metadata_manager_ref() + .init() + .await + .context(InitMetadataSnafu)?; + // TODO: build frontend instance like in distributed mode let mut frontend = build_frontend( plugins, kv_store, procedure_manager, catalog_manager, - datanode.query_engine(), region_server, ) .await?; @@ -344,19 +348,17 @@ async fn build_frontend( kv_store: KvBackendRef, procedure_manager: ProcedureManagerRef, catalog_manager: CatalogManagerRef, - query_engine: QueryEngineRef, region_server: RegionServer, ) -> Result { - let mut frontend_instance = FeInstance::try_new_standalone( + let frontend_instance = FeInstance::try_new_standalone( kv_store, procedure_manager, catalog_manager, - query_engine, + plugins, region_server, ) .await .context(StartFrontendSnafu)?; - frontend_instance.set_plugins(plugins.clone()); Ok(frontend_instance) } diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 2bd4a56ab49a..566e49f05bea 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -22,6 +22,7 @@ etcd-client.workspace = true futures.workspace = true humantime-serde.workspace = true lazy_static.workspace = true +metrics.workspace = true prost.workspace = true regex.workspace = true serde.workspace = true diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index b4787611d71e..dce827bf7cfc 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -360,9 +360,7 @@ impl DdlTaskExecutor for DdlManager { ctx: &ExecutorContext, request: SubmitDdlTaskRequest, ) -> Result { - let cluster_id = ctx.cluster_id.context(error::UnexpectedSnafu { - err_msg: "cluster_id not found", - })?; + let cluster_id = ctx.cluster_id.unwrap_or_default(); info!("Submitting Ddl task: {:?}", request.task); match request.task { CreateTable(create_table_task) => { diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 409568791e5d..9141f7e0fc2e 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -58,6 +58,7 @@ pub mod table_route; use std::collections::BTreeMap; use std::sync::Arc; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue}; use lazy_static::lazy_static; use regex::Regex; @@ -67,8 +68,8 @@ use table::metadata::{RawTableInfo, TableId}; use table_info::{TableInfoKey, TableInfoManager, TableInfoValue}; use table_name::{TableNameKey, TableNameManager, TableNameValue}; -use self::catalog_name::{CatalogManager, CatalogNameValue}; -use self::schema_name::{SchemaManager, SchemaNameValue}; +use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue}; +use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue}; use self::table_route::{TableRouteManager, TableRouteValue}; use crate::error::{self, Result, SerdeJsonSnafu}; use crate::kv_backend::txn::Txn; @@ -165,6 +166,19 @@ impl TableMetadataManager { } } + pub async fn init(&self) -> Result<()> { + let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME); + if !self.catalog_manager().exist(catalog_name).await? { + self.catalog_manager().create(catalog_name).await?; + } + let schema_name = SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + if !self.schema_manager().exist(schema_name).await? { + self.schema_manager().create(schema_name, None).await?; + } + + Ok(()) + } + pub fn table_name_manager(&self) -> &TableNameManager { &self.table_name_manager } diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs index c6c727c0af39..6debb5af9356 100644 --- a/src/common/meta/src/key/catalog_name.rs +++ b/src/common/meta/src/key/catalog_name.rs @@ -16,8 +16,10 @@ use std::fmt::Display; use std::sync::Arc; use common_catalog::consts::DEFAULT_CATALOG_NAME; +use common_telemetry::timer; use futures::stream::BoxStream; use futures::StreamExt; +use metrics::increment_counter; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -103,11 +105,13 @@ impl CatalogManager { /// Creates `CatalogNameKey`. pub async fn create(&self, catalog: CatalogNameKey<'_>) -> Result<()> { let raw_key = catalog.as_raw_key(); + let _timer = timer!(crate::metrics::METRIC_META_CREATE_CATALOG); let req = PutRequest::new() .with_key(raw_key) .with_value(CatalogNameValue.try_as_raw_value()?); self.kv_backend.put(req).await?; + increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG); Ok(()) } diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index c162e31b1617..d65b08bc2899 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -18,9 +18,11 @@ use std::sync::Arc; use std::time::Duration; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_telemetry::timer; use futures::stream::BoxStream; use futures::StreamExt; use humantime_serde::re::humantime; +use metrics::increment_counter; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -143,12 +145,15 @@ impl SchemaManager { schema: SchemaNameKey<'_>, value: Option, ) -> Result<()> { + let _timer = timer!(crate::metrics::METRIC_META_CREATE_SCHEMA); + let raw_key = schema.as_raw_key(); let req = PutRequest::new() .with_key(raw_key) .with_value(value.unwrap_or_default().try_as_raw_value()?); self.kv_backend.put(req).await?; + increment_counter!(crate::metrics::METRIC_META_CREATE_SCHEMA); Ok(()) } diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index ed672a772133..49535607af72 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -14,6 +14,8 @@ pub const METRIC_META_TXN_REQUEST: &str = "meta.txn_request"; +pub(crate) const METRIC_META_CREATE_CATALOG: &str = "meta.create_catalog"; +pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema"; pub(crate) const METRIC_META_PROCEDURE_CREATE_TABLE: &str = "meta.procedure.create_table"; pub(crate) const METRIC_META_PROCEDURE_DROP_TABLE: &str = "meta.procedure.drop_table"; pub(crate) const METRIC_META_PROCEDURE_ALTER_TABLE: &str = "meta.procedure.alter_table"; diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index fe9791ee60b4..6033c849269f 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -371,7 +371,7 @@ impl Default for DatanodeOptions { meta_client_options: None, wal: WalConfig::default(), storage: StorageConfig::default(), - region_engine: vec![], + region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())], logging: LoggingOptions::default(), heartbeat: HeartbeatOptions::default(), enable_telemetry: true, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 1d1e02fa18e2..d15d7cf7a60d 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -159,8 +159,12 @@ impl Instance { meta_backend.clone(), datanode_clients.clone(), )); + let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone())); - let region_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone()); + let region_request_handler = DistRegionRequestHandler::arc( + partition_manager.clone(), + catalog_manager.datanode_manager().clone(), + ); let query_engine = QueryEngineFactory::new_with_plugins( catalog_manager.clone(), @@ -170,8 +174,6 @@ impl Instance { ) .query_engine(); - let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone())); - let inserter = Arc::new(Inserter::new( catalog_manager.clone(), partition_manager.clone(), @@ -295,15 +297,28 @@ impl Instance { kv_backend: KvBackendRef, procedure_manager: ProcedureManagerRef, catalog_manager: CatalogManagerRef, - query_engine: QueryEngineRef, + plugins: Arc, region_server: RegionServer, ) -> Result { + let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone())); + let datanode_manager = Arc::new(StandaloneDatanodeManager(region_server)); + + let region_request_handler = + DistRegionRequestHandler::arc(partition_manager.clone(), datanode_manager.clone()); + + let query_engine = QueryEngineFactory::new_with_plugins( + catalog_manager.clone(), + Some(region_request_handler), + true, + plugins.clone(), + ) + .query_engine(); + let script_executor = Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - let datanode_manager = Arc::new(StandaloneDatanodeManager(region_server)); let cache_invalidator = Arc::new(DummyCacheInvalidator); let ddl_executor = Arc::new(DdlManager::new( procedure_manager, @@ -341,7 +356,7 @@ impl Instance { script_executor, statement_executor, query_engine, - plugins: Default::default(), + plugins, servers: Arc::new(HashMap::new()), heartbeat_task: None, inserter, @@ -360,10 +375,6 @@ impl Instance { &self.catalog_manager } - pub fn set_plugins(&mut self, map: Arc) { - self.plugins = map; - } - pub fn plugins(&self) -> Arc { self.plugins.clone() } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 4a99b9d2ef19..a14264a3ee36 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -19,20 +19,28 @@ use async_trait::async_trait; use client::error::{HandleRequestSnafu, Result as ClientResult}; use client::region_handler::RegionRequestHandler; use common_error::ext::BoxedError; +use common_meta::datanode_manager::DatanodeManagerRef; use common_recordbatch::SendableRecordBatchStream; +use partition::manager::PartitionRuleManagerRef; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; -use crate::catalog::FrontendCatalogManager; use crate::error::{FindDatanodeSnafu, FindTableRouteSnafu, RequestQuerySnafu, Result}; pub(crate) struct DistRegionRequestHandler { - catalog_manager: Arc, + partition_manager: PartitionRuleManagerRef, + datanode_manager: DatanodeManagerRef, } impl DistRegionRequestHandler { - pub fn arc(catalog_manager: Arc) -> Arc { - Arc::new(Self { catalog_manager }) + pub fn arc( + partition_manager: PartitionRuleManagerRef, + datanode_manager: DatanodeManagerRef, + ) -> Arc { + Arc::new(Self { + partition_manager, + datanode_manager, + }) } } @@ -51,8 +59,7 @@ impl DistRegionRequestHandler { let region_id = RegionId::from_u64(request.region_id); let table_route = self - .catalog_manager - .partition_manager() + .partition_manager .find_table_route(region_id.table_id()) .await .context(FindTableRouteSnafu { @@ -64,7 +71,7 @@ impl DistRegionRequestHandler { region: region_id.region_number(), })?; - let client = self.catalog_manager.datanode_manager().datanode(peer).await; + let client = self.datanode_manager.datanode(peer).await; client .handle_query(request) diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 37b835c49609..9d96b333ad47 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -26,6 +26,12 @@ use crate::pubsub::Message; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to create default catalog and schema, source: {}", source))] + InitMetadata { + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to allocate next sequence number: {}", source))] NextSequence { location: Location, @@ -612,6 +618,8 @@ impl ErrorExt for Error { | Error::ConvertEtcdTxnObject { source, .. } | Error::GetFullTableInfo { source, .. } => source.status_code(), + Error::InitMetadata { source, .. } => source.status_code(), + Error::Other { source, .. } => source.status_code(), } } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 4e5b26e1f532..f2a49e7452a5 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -25,6 +25,7 @@ pub mod handler; pub mod keys; pub mod lease; pub mod lock; + pub mod metadata_service; pub mod metasrv; mod metrics; diff --git a/src/meta-srv/src/metadata_service.rs b/src/meta-srv/src/metadata_service.rs index 09efdbe6d308..8816abb8bc65 100644 --- a/src/meta-srv/src/metadata_service.rs +++ b/src/meta-srv/src/metadata_service.rs @@ -18,8 +18,7 @@ use async_trait::async_trait; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; use common_meta::key::TableMetadataManagerRef; -use common_telemetry::{info, timer}; -use metrics::increment_counter; +use common_telemetry::info; use snafu::{ensure, ResultExt}; use crate::error; @@ -62,15 +61,12 @@ impl MetadataService for DefaultMetadataService { schema_name: &str, if_not_exist: bool, ) -> Result<()> { - let _timer = timer!(crate::metrics::METRIC_META_CREATE_SCHEMA); - self.table_metadata_manager .catalog_manager() .create(CatalogNameKey::new(catalog_name)) .await .context(error::TableMetadataManagerSnafu)?; - increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG); info!("Successfully created a catalog: {}", catalog_name); let schema = SchemaNameKey::new(catalog_name, schema_name); diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index fb320b8afb05..727f6a0c4187 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::Peer; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; use common_meta::ddl::DdlTaskExecutorRef; @@ -36,10 +35,9 @@ use tokio::sync::broadcast::error::RecvError; use crate::cluster::MetaPeerClientRef; use crate::election::{Election, LeaderChangeMessage}; -use crate::error::{RecoverProcedureSnafu, Result}; +use crate::error::{InitMetadataSnafu, RecoverProcedureSnafu, Result}; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; -use crate::metadata_service::MetadataServiceRef; use crate::pubsub::{PublishRef, SubscribeManagerRef}; use crate::selector::{Selector, SelectorType}; use crate::service::mailbox::MailboxRef; @@ -196,7 +194,6 @@ pub struct MetaSrv { election: Option, lock: DistLockRef, procedure_manager: ProcedureManagerRef, - metadata_service: MetadataServiceRef, mailbox: MailboxRef, ddl_executor: DdlTaskExecutorRef, table_metadata_manager: TableMetadataManagerRef, @@ -296,9 +293,10 @@ impl MetaSrv { } async fn create_default_schema_if_not_exist(&self) -> Result<()> { - self.metadata_service - .create_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, true) + self.table_metadata_manager + .init() .await + .context(InitMetadataSnafu) } pub fn shutdown(&self) { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 1c630a30ab59..39768817c5bd 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -43,7 +43,6 @@ use crate::handler::response_header_handler::ResponseHeaderHandler; use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pushers}; use crate::lock::memory::MemLock; use crate::lock::DistLockRef; -use crate::metadata_service::{DefaultMetadataService, MetadataServiceRef}; use crate::metasrv::{ ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ, }; @@ -66,7 +65,6 @@ pub struct MetaSrvBuilder { election: Option, meta_peer_client: Option, lock: Option, - metadata_service: Option, datanode_clients: Option>, pubsub: Option<(PublishRef, SubscribeManagerRef)>, } @@ -82,7 +80,6 @@ impl MetaSrvBuilder { election: None, options: None, lock: None, - metadata_service: None, datanode_clients: None, pubsub: None, } @@ -128,11 +125,6 @@ impl MetaSrvBuilder { self } - pub fn metadata_service(mut self, metadata_service: MetadataServiceRef) -> Self { - self.metadata_service = Some(metadata_service); - self - } - pub fn datanode_clients(mut self, clients: Arc) -> Self { self.datanode_clients = Some(clients); self @@ -155,7 +147,6 @@ impl MetaSrvBuilder { selector, handler_group, lock, - metadata_service, datanode_clients, pubsub, } = self; @@ -174,10 +165,7 @@ impl MetaSrvBuilder { let kv_backend = KvBackendAdapter::wrap(kv_store.clone()); let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_backend.clone())); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - let metadata_service = metadata_service - .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(table_metadata_manager))); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); - let table_metadata_manager = build_table_metadata_manager(&kv_store); let ctx = SelectorContext { datanode_lease_secs: options.datanode_lease_secs, server_addr: options.server_addr.clone(), @@ -275,7 +263,6 @@ impl MetaSrvBuilder { election, lock, procedure_manager, - metadata_service, mailbox, ddl_executor: ddl_manager, table_metadata_manager, @@ -333,12 +320,6 @@ fn build_procedure_manager(options: &MetaSrvOptions, kv_store: &KvStoreRef) -> P Arc::new(LocalManager::new(manager_config, state_store)) } -fn build_table_metadata_manager(kv_store: &KvStoreRef) -> TableMetadataManagerRef { - Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( - kv_store.clone(), - ))) -} - fn build_ddl_manager( options: &MetaSrvOptions, datanode_clients: Option>, diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index 709e410b0b02..483fccfff8df 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) const METRIC_META_CREATE_CATALOG: &str = "meta.create_catalog"; -pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema"; pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request"; pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request"; pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num"; diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index d7393ba04863..0ce0b4230fa8 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -20,12 +20,10 @@ use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::router_server::RouterServer; use api::v1::meta::store_server::StoreServer; use client::client_manager::DatanodeClients; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::key::TableMetadataManager; use tower::service_fn; -use crate::metadata_service::{DefaultMetadataService, MetadataService}; use crate::metasrv::builder::MetaSrvBuilder; use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; use crate::service::store::etcd::EtcdStore; @@ -64,12 +62,8 @@ pub async fn mock( let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( kv_store.clone(), ))); - let metadata_service = DefaultMetadataService::new(table_metadata_manager); - metadata_service - .create_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, true) - .await - .unwrap(); + table_metadata_manager.init().await.unwrap(); let builder = MetaSrvBuilder::new().options(opts).kv_store(kv_store);