Skip to content

Commit

Permalink
fix: fix start issues under standalone mode (#2352)
Browse files Browse the repository at this point in the history
* fix: fix standalone starts

* chore: bump raft-engine to 571462e

* refactor: remove MetadataService
  • Loading branch information
WenyXu authored and waynexia committed Sep 12, 2023
1 parent 80c5d52 commit 912341e
Show file tree
Hide file tree
Showing 19 changed files with 102 additions and 73 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ object-store = { path = "src/object-store" }
partition = { path = "src/partition" }
promql = { path = "src/promql" }
query = { path = "src/query" }
raft-engine = { version = "0.4" }
# TODO(weny): waits for https://github.com/tikv/raft-engine/pull/335
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "571462e36621407b9920465a1a15b8b01b929a7f" }
script = { path = "src/script" }
servers = { path = "src/servers" }
session = { path = "src/session" }
Expand Down
10 changes: 9 additions & 1 deletion src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ use snafu::{Location, Snafu};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to create default catalog and schema, source: {}", source))]
InitMetadata {
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Failed to iter stream, source: {}", source))]
IterStream {
location: Location,
Expand Down Expand Up @@ -182,7 +188,9 @@ impl ErrorExt for Error {
Error::ShutdownMetaServer { source, .. } => source.status_code(),
Error::BuildMetaServer { source, .. } => source.status_code(),
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
Error::IterStream { source, .. } => source.status_code(),
Error::IterStream { source, .. } | Error::InitMetadata { source, .. } => {
source.status_code()
}
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }
Expand Down
18 changes: 10 additions & 8 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@ use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDat
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
};
use query::QueryEngineRef;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;

use crate::error::{
IllegalConfigSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu,
StartFrontendSnafu,
IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu,
StartDatanodeSnafu, StartFrontendSnafu,
};
use crate::frontend::load_frontend_plugins;
use crate::options::{MixOptions, Options, TopLevelOptions};
Expand Down Expand Up @@ -318,13 +317,18 @@ impl StartCommand {
Arc::new(StandaloneDatanodeManager(region_server.clone())),
));

catalog_manager
.table_metadata_manager_ref()
.init()
.await
.context(InitMetadataSnafu)?;

// TODO: build frontend instance like in distributed mode
let mut frontend = build_frontend(
plugins,
kv_store,
procedure_manager,
catalog_manager,
datanode.query_engine(),
region_server,
)
.await?;
Expand All @@ -344,19 +348,17 @@ async fn build_frontend(
kv_store: KvBackendRef,
procedure_manager: ProcedureManagerRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
region_server: RegionServer,
) -> Result<FeInstance> {
let mut frontend_instance = FeInstance::try_new_standalone(
let frontend_instance = FeInstance::try_new_standalone(
kv_store,
procedure_manager,
catalog_manager,
query_engine,
plugins,
region_server,
)
.await
.context(StartFrontendSnafu)?;
frontend_instance.set_plugins(plugins.clone());
Ok(frontend_instance)
}

Expand Down
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ etcd-client.workspace = true
futures.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
metrics.workspace = true
prost.workspace = true
regex.workspace = true
serde.workspace = true
Expand Down
4 changes: 1 addition & 3 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,7 @@ impl DdlTaskExecutor for DdlManager {
ctx: &ExecutorContext,
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
let cluster_id = ctx.cluster_id.context(error::UnexpectedSnafu {
err_msg: "cluster_id not found",
})?;
let cluster_id = ctx.cluster_id.unwrap_or_default();
info!("Submitting Ddl task: {:?}", request.task);
match request.task {
CreateTable(create_table_task) => {
Expand Down
18 changes: 16 additions & 2 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub mod table_route;
use std::collections::BTreeMap;
use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
use lazy_static::lazy_static;
use regex::Regex;
Expand All @@ -67,8 +68,8 @@ use table::metadata::{RawTableInfo, TableId};
use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
use table_name::{TableNameKey, TableNameManager, TableNameValue};

use self::catalog_name::{CatalogManager, CatalogNameValue};
use self::schema_name::{SchemaManager, SchemaNameValue};
use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
use self::table_route::{TableRouteManager, TableRouteValue};
use crate::error::{self, Result, SerdeJsonSnafu};
use crate::kv_backend::txn::Txn;
Expand Down Expand Up @@ -165,6 +166,19 @@ impl TableMetadataManager {
}
}

pub async fn init(&self) -> Result<()> {
let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
if !self.catalog_manager().exist(catalog_name).await? {
self.catalog_manager().create(catalog_name).await?;
}
let schema_name = SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
if !self.schema_manager().exist(schema_name).await? {
self.schema_manager().create(schema_name, None).await?;
}

Ok(())
}

pub fn table_name_manager(&self) -> &TableNameManager {
&self.table_name_manager
}
Expand Down
4 changes: 4 additions & 0 deletions src/common/meta/src/key/catalog_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ use std::fmt::Display;
use std::sync::Arc;

use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_telemetry::timer;
use futures::stream::BoxStream;
use futures::StreamExt;
use metrics::increment_counter;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

Expand Down Expand Up @@ -103,11 +105,13 @@ impl CatalogManager {
/// Creates `CatalogNameKey`.
pub async fn create(&self, catalog: CatalogNameKey<'_>) -> Result<()> {
let raw_key = catalog.as_raw_key();
let _timer = timer!(crate::metrics::METRIC_META_CREATE_CATALOG);

let req = PutRequest::new()
.with_key(raw_key)
.with_value(CatalogNameValue.try_as_raw_value()?);
self.kv_backend.put(req).await?;
increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG);

Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions src/common/meta/src/key/schema_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ use std::sync::Arc;
use std::time::Duration;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::timer;
use futures::stream::BoxStream;
use futures::StreamExt;
use humantime_serde::re::humantime;
use metrics::increment_counter;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

Expand Down Expand Up @@ -143,12 +145,15 @@ impl SchemaManager {
schema: SchemaNameKey<'_>,
value: Option<SchemaNameValue>,
) -> Result<()> {
let _timer = timer!(crate::metrics::METRIC_META_CREATE_SCHEMA);

let raw_key = schema.as_raw_key();
let req = PutRequest::new()
.with_key(raw_key)
.with_value(value.unwrap_or_default().try_as_raw_value()?);

self.kv_backend.put(req).await?;
increment_counter!(crate::metrics::METRIC_META_CREATE_SCHEMA);

Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

pub const METRIC_META_TXN_REQUEST: &str = "meta.txn_request";

pub(crate) const METRIC_META_CREATE_CATALOG: &str = "meta.create_catalog";
pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema";
pub(crate) const METRIC_META_PROCEDURE_CREATE_TABLE: &str = "meta.procedure.create_table";
pub(crate) const METRIC_META_PROCEDURE_DROP_TABLE: &str = "meta.procedure.drop_table";
pub(crate) const METRIC_META_PROCEDURE_ALTER_TABLE: &str = "meta.procedure.alter_table";
31 changes: 21 additions & 10 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,12 @@ impl Instance {
meta_backend.clone(),
datanode_clients.clone(),
));
let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone()));

let region_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone());
let region_request_handler = DistRegionRequestHandler::arc(
partition_manager.clone(),
catalog_manager.datanode_manager().clone(),
);

let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Expand All @@ -170,8 +174,6 @@ impl Instance {
)
.query_engine();

let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone()));

let inserter = Arc::new(Inserter::new(
catalog_manager.clone(),
partition_manager.clone(),
Expand Down Expand Up @@ -295,15 +297,28 @@ impl Instance {
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
plugins: Arc<Plugins>,
region_server: RegionServer,
) -> Result<Self> {
let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone()));
let datanode_manager = Arc::new(StandaloneDatanodeManager(region_server));

let region_request_handler =
DistRegionRequestHandler::arc(partition_manager.clone(), datanode_manager.clone());

let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Some(region_request_handler),
true,
plugins.clone(),
)
.query_engine();

let script_executor =
Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);

let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

let datanode_manager = Arc::new(StandaloneDatanodeManager(region_server));
let cache_invalidator = Arc::new(DummyCacheInvalidator);
let ddl_executor = Arc::new(DdlManager::new(
procedure_manager,
Expand Down Expand Up @@ -341,7 +356,7 @@ impl Instance {
script_executor,
statement_executor,
query_engine,
plugins: Default::default(),
plugins,
servers: Arc::new(HashMap::new()),
heartbeat_task: None,
inserter,
Expand All @@ -360,10 +375,6 @@ impl Instance {
&self.catalog_manager
}

pub fn set_plugins(&mut self, map: Arc<Plugins>) {
self.plugins = map;
}

pub fn plugins(&self) -> Arc<Plugins> {
self.plugins.clone()
}
Expand Down
21 changes: 14 additions & 7 deletions src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,28 @@ use async_trait::async_trait;
use client::error::{HandleRequestSnafu, Result as ClientResult};
use client::region_handler::RegionRequestHandler;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use partition::manager::PartitionRuleManagerRef;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;

use crate::catalog::FrontendCatalogManager;
use crate::error::{FindDatanodeSnafu, FindTableRouteSnafu, RequestQuerySnafu, Result};

pub(crate) struct DistRegionRequestHandler {
catalog_manager: Arc<FrontendCatalogManager>,
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
}

impl DistRegionRequestHandler {
pub fn arc(catalog_manager: Arc<FrontendCatalogManager>) -> Arc<Self> {
Arc::new(Self { catalog_manager })
pub fn arc(
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
) -> Arc<Self> {
Arc::new(Self {
partition_manager,
datanode_manager,
})
}
}

Expand All @@ -51,8 +59,7 @@ impl DistRegionRequestHandler {
let region_id = RegionId::from_u64(request.region_id);

let table_route = self
.catalog_manager
.partition_manager()
.partition_manager
.find_table_route(region_id.table_id())
.await
.context(FindTableRouteSnafu {
Expand All @@ -64,7 +71,7 @@ impl DistRegionRequestHandler {
region: region_id.region_number(),
})?;

let client = self.catalog_manager.datanode_manager().datanode(peer).await;
let client = self.datanode_manager.datanode(peer).await;

client
.handle_query(request)
Expand Down
8 changes: 8 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ use crate::pubsub::Message;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to create default catalog and schema, source: {}", source))]
InitMetadata {
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Failed to allocate next sequence number: {}", source))]
NextSequence {
location: Location,
Expand Down Expand Up @@ -612,6 +618,8 @@ impl ErrorExt for Error {
| Error::ConvertEtcdTxnObject { source, .. }
| Error::GetFullTableInfo { source, .. } => source.status_code(),

Error::InitMetadata { source, .. } => source.status_code(),

Error::Other { source, .. } => source.status_code(),
}
}
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod handler;
pub mod keys;
pub mod lease;
pub mod lock;

pub mod metadata_service;
pub mod metasrv;
mod metrics;
Expand Down
Loading

0 comments on commit 912341e

Please sign in to comment.