From 8eacd3a79196450de6ea754cba81b114e3129fbb Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 11 Sep 2023 12:51:52 +0000 Subject: [PATCH] fix: fix standalone starts --- Cargo.lock | 3 +-- Cargo.toml | 3 ++- src/cmd/src/error.rs | 10 +++++++- src/cmd/src/standalone.rs | 18 ++++++++------ src/common/meta/src/ddl_manager.rs | 4 +-- src/common/meta/src/key.rs | 18 ++++++++++++-- src/datanode/src/datanode.rs | 2 +- src/frontend/src/instance.rs | 31 ++++++++++++++++-------- src/frontend/src/instance/distributed.rs | 21 ++++++++++------ 9 files changed, 75 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9a627e712aa..674e88b19c52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7284,8 +7284,7 @@ dependencies = [ [[package]] name = "raft-engine" version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e02bdc8cba47cb7062b433f56700a8edbc9fcd6d706389120d20aa1827e5ba7b" +source = "git+https://github.com/tikv/raft-engine.git?rev=2da7db2c3db29c517f7345b76a21aace34fb764e#2da7db2c3db29c517f7345b76a21aace34fb764e" dependencies = [ "byteorder", "crc32fast", diff --git a/Cargo.toml b/Cargo.toml index cbc256cf06cb..c9e61312cb91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -153,7 +153,8 @@ object-store = { path = "src/object-store" } partition = { path = "src/partition" } promql = { path = "src/promql" } query = { path = "src/query" } -raft-engine = { version = "0.4" } +# TODO(weny): waits for https://github.com/tikv/raft-engine/pull/335 +raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "2da7db2c3db29c517f7345b76a21aace34fb764e" } script = { path = "src/script" } servers = { path = "src/servers" } session = { path = "src/session" } diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 3c55361e40b1..a8cfb26d18e6 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -23,6 +23,12 @@ use snafu::{Location, Snafu}; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to create default catalog and schema, source: {}", source))] + InitMetadata { + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to iter stream, source: {}", source))] IterStream { location: Location, @@ -182,7 +188,9 @@ impl ErrorExt for Error { Error::ShutdownMetaServer { source, .. } => source.status_code(), Error::BuildMetaServer { source, .. } => source.status_code(), Error::UnsupportedSelectorType { source, .. } => source.status_code(), - Error::IterStream { source, .. } => source.status_code(), + Error::IterStream { source, .. } | Error::InitMetadata { source, .. } => { + source.status_code() + } Error::MissingConfig { .. } | Error::LoadLayeredConfig { .. } | Error::IllegalConfig { .. } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index a0ca94674133..15cb26c3a932 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -31,7 +31,6 @@ use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDat use frontend::service_config::{ GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions, }; -use query::QueryEngineRef; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; @@ -39,8 +38,8 @@ use servers::Mode; use snafu::ResultExt; use crate::error::{ - IllegalConfigSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, - StartFrontendSnafu, + IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, + StartDatanodeSnafu, StartFrontendSnafu, }; use crate::frontend::load_frontend_plugins; use crate::options::{MixOptions, Options, TopLevelOptions}; @@ -318,13 +317,18 @@ impl StartCommand { Arc::new(StandaloneDatanodeManager(region_server.clone())), )); + catalog_manager + .table_metadata_manager_ref() + .init() + .await + .context(InitMetadataSnafu)?; + // TODO: build frontend instance like in distributed mode let mut frontend = build_frontend( plugins, kv_store, procedure_manager, catalog_manager, - datanode.query_engine(), region_server, ) .await?; @@ -344,19 +348,17 @@ async fn build_frontend( kv_store: KvBackendRef, procedure_manager: ProcedureManagerRef, catalog_manager: CatalogManagerRef, - query_engine: QueryEngineRef, region_server: RegionServer, ) -> Result { - let mut frontend_instance = FeInstance::try_new_standalone( + let frontend_instance = FeInstance::try_new_standalone( kv_store, procedure_manager, catalog_manager, - query_engine, + plugins, region_server, ) .await .context(StartFrontendSnafu)?; - frontend_instance.set_plugins(plugins.clone()); Ok(frontend_instance) } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index b4787611d71e..dce827bf7cfc 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -360,9 +360,7 @@ impl DdlTaskExecutor for DdlManager { ctx: &ExecutorContext, request: SubmitDdlTaskRequest, ) -> Result { - let cluster_id = ctx.cluster_id.context(error::UnexpectedSnafu { - err_msg: "cluster_id not found", - })?; + let cluster_id = ctx.cluster_id.unwrap_or_default(); info!("Submitting Ddl task: {:?}", request.task); match request.task { CreateTable(create_table_task) => { diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 409568791e5d..9141f7e0fc2e 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -58,6 +58,7 @@ pub mod table_route; use std::collections::BTreeMap; use std::sync::Arc; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue}; use lazy_static::lazy_static; use regex::Regex; @@ -67,8 +68,8 @@ use table::metadata::{RawTableInfo, TableId}; use table_info::{TableInfoKey, TableInfoManager, TableInfoValue}; use table_name::{TableNameKey, TableNameManager, TableNameValue}; -use self::catalog_name::{CatalogManager, CatalogNameValue}; -use self::schema_name::{SchemaManager, SchemaNameValue}; +use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue}; +use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue}; use self::table_route::{TableRouteManager, TableRouteValue}; use crate::error::{self, Result, SerdeJsonSnafu}; use crate::kv_backend::txn::Txn; @@ -165,6 +166,19 @@ impl TableMetadataManager { } } + pub async fn init(&self) -> Result<()> { + let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME); + if !self.catalog_manager().exist(catalog_name).await? { + self.catalog_manager().create(catalog_name).await?; + } + let schema_name = SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + if !self.schema_manager().exist(schema_name).await? { + self.schema_manager().create(schema_name, None).await?; + } + + Ok(()) + } + pub fn table_name_manager(&self) -> &TableNameManager { &self.table_name_manager } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index fe9791ee60b4..6033c849269f 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -371,7 +371,7 @@ impl Default for DatanodeOptions { meta_client_options: None, wal: WalConfig::default(), storage: StorageConfig::default(), - region_engine: vec![], + region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())], logging: LoggingOptions::default(), heartbeat: HeartbeatOptions::default(), enable_telemetry: true, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 1d1e02fa18e2..d15d7cf7a60d 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -159,8 +159,12 @@ impl Instance { meta_backend.clone(), datanode_clients.clone(), )); + let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone())); - let region_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone()); + let region_request_handler = DistRegionRequestHandler::arc( + partition_manager.clone(), + catalog_manager.datanode_manager().clone(), + ); let query_engine = QueryEngineFactory::new_with_plugins( catalog_manager.clone(), @@ -170,8 +174,6 @@ impl Instance { ) .query_engine(); - let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone())); - let inserter = Arc::new(Inserter::new( catalog_manager.clone(), partition_manager.clone(), @@ -295,15 +297,28 @@ impl Instance { kv_backend: KvBackendRef, procedure_manager: ProcedureManagerRef, catalog_manager: CatalogManagerRef, - query_engine: QueryEngineRef, + plugins: Arc, region_server: RegionServer, ) -> Result { + let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone())); + let datanode_manager = Arc::new(StandaloneDatanodeManager(region_server)); + + let region_request_handler = + DistRegionRequestHandler::arc(partition_manager.clone(), datanode_manager.clone()); + + let query_engine = QueryEngineFactory::new_with_plugins( + catalog_manager.clone(), + Some(region_request_handler), + true, + plugins.clone(), + ) + .query_engine(); + let script_executor = Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - let datanode_manager = Arc::new(StandaloneDatanodeManager(region_server)); let cache_invalidator = Arc::new(DummyCacheInvalidator); let ddl_executor = Arc::new(DdlManager::new( procedure_manager, @@ -341,7 +356,7 @@ impl Instance { script_executor, statement_executor, query_engine, - plugins: Default::default(), + plugins, servers: Arc::new(HashMap::new()), heartbeat_task: None, inserter, @@ -360,10 +375,6 @@ impl Instance { &self.catalog_manager } - pub fn set_plugins(&mut self, map: Arc) { - self.plugins = map; - } - pub fn plugins(&self) -> Arc { self.plugins.clone() } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 4a99b9d2ef19..a14264a3ee36 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -19,20 +19,28 @@ use async_trait::async_trait; use client::error::{HandleRequestSnafu, Result as ClientResult}; use client::region_handler::RegionRequestHandler; use common_error::ext::BoxedError; +use common_meta::datanode_manager::DatanodeManagerRef; use common_recordbatch::SendableRecordBatchStream; +use partition::manager::PartitionRuleManagerRef; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; -use crate::catalog::FrontendCatalogManager; use crate::error::{FindDatanodeSnafu, FindTableRouteSnafu, RequestQuerySnafu, Result}; pub(crate) struct DistRegionRequestHandler { - catalog_manager: Arc, + partition_manager: PartitionRuleManagerRef, + datanode_manager: DatanodeManagerRef, } impl DistRegionRequestHandler { - pub fn arc(catalog_manager: Arc) -> Arc { - Arc::new(Self { catalog_manager }) + pub fn arc( + partition_manager: PartitionRuleManagerRef, + datanode_manager: DatanodeManagerRef, + ) -> Arc { + Arc::new(Self { + partition_manager, + datanode_manager, + }) } } @@ -51,8 +59,7 @@ impl DistRegionRequestHandler { let region_id = RegionId::from_u64(request.region_id); let table_route = self - .catalog_manager - .partition_manager() + .partition_manager .find_table_route(region_id.table_id()) .await .context(FindTableRouteSnafu { @@ -64,7 +71,7 @@ impl DistRegionRequestHandler { region: region_id.region_number(), })?; - let client = self.catalog_manager.datanode_manager().datanode(peer).await; + let client = self.datanode_manager.datanode(peer).await; client .handle_query(request)