Skip to content

Commit

Permalink
fix: fix standalone starts
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 12, 2023
1 parent eb48426 commit 8eacd3a
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 35 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ object-store = { path = "src/object-store" }
partition = { path = "src/partition" }
promql = { path = "src/promql" }
query = { path = "src/query" }
raft-engine = { version = "0.4" }
# TODO(weny): waits for https://github.com/tikv/raft-engine/pull/335
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "2da7db2c3db29c517f7345b76a21aace34fb764e" }
script = { path = "src/script" }
servers = { path = "src/servers" }
session = { path = "src/session" }
Expand Down
10 changes: 9 additions & 1 deletion src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ use snafu::{Location, Snafu};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to create default catalog and schema, source: {}", source))]
InitMetadata {
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Failed to iter stream, source: {}", source))]
IterStream {
location: Location,
Expand Down Expand Up @@ -182,7 +188,9 @@ impl ErrorExt for Error {
Error::ShutdownMetaServer { source, .. } => source.status_code(),
Error::BuildMetaServer { source, .. } => source.status_code(),
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
Error::IterStream { source, .. } => source.status_code(),
Error::IterStream { source, .. } | Error::InitMetadata { source, .. } => {
source.status_code()
}
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }
Expand Down
18 changes: 10 additions & 8 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@ use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDat
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
};
use query::QueryEngineRef;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;

use crate::error::{
IllegalConfigSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu,
StartFrontendSnafu,
IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu,
StartDatanodeSnafu, StartFrontendSnafu,
};
use crate::frontend::load_frontend_plugins;
use crate::options::{MixOptions, Options, TopLevelOptions};
Expand Down Expand Up @@ -318,13 +317,18 @@ impl StartCommand {
Arc::new(StandaloneDatanodeManager(region_server.clone())),
));

catalog_manager
.table_metadata_manager_ref()
.init()
.await
.context(InitMetadataSnafu)?;

// TODO: build frontend instance like in distributed mode
let mut frontend = build_frontend(
plugins,
kv_store,
procedure_manager,
catalog_manager,
datanode.query_engine(),
region_server,
)
.await?;
Expand All @@ -344,19 +348,17 @@ async fn build_frontend(
kv_store: KvBackendRef,
procedure_manager: ProcedureManagerRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
region_server: RegionServer,
) -> Result<FeInstance> {
let mut frontend_instance = FeInstance::try_new_standalone(
let frontend_instance = FeInstance::try_new_standalone(
kv_store,
procedure_manager,
catalog_manager,
query_engine,
plugins,
region_server,
)
.await
.context(StartFrontendSnafu)?;
frontend_instance.set_plugins(plugins.clone());
Ok(frontend_instance)
}

Expand Down
4 changes: 1 addition & 3 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,7 @@ impl DdlTaskExecutor for DdlManager {
ctx: &ExecutorContext,
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
let cluster_id = ctx.cluster_id.context(error::UnexpectedSnafu {
err_msg: "cluster_id not found",
})?;
let cluster_id = ctx.cluster_id.unwrap_or_default();
info!("Submitting Ddl task: {:?}", request.task);
match request.task {
CreateTable(create_table_task) => {
Expand Down
18 changes: 16 additions & 2 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub mod table_route;
use std::collections::BTreeMap;
use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
use lazy_static::lazy_static;
use regex::Regex;
Expand All @@ -67,8 +68,8 @@ use table::metadata::{RawTableInfo, TableId};
use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
use table_name::{TableNameKey, TableNameManager, TableNameValue};

use self::catalog_name::{CatalogManager, CatalogNameValue};
use self::schema_name::{SchemaManager, SchemaNameValue};
use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
use self::table_route::{TableRouteManager, TableRouteValue};
use crate::error::{self, Result, SerdeJsonSnafu};
use crate::kv_backend::txn::Txn;
Expand Down Expand Up @@ -165,6 +166,19 @@ impl TableMetadataManager {
}
}

pub async fn init(&self) -> Result<()> {
let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
if !self.catalog_manager().exist(catalog_name).await? {
self.catalog_manager().create(catalog_name).await?;
}
let schema_name = SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
if !self.schema_manager().exist(schema_name).await? {
self.schema_manager().create(schema_name, None).await?;
}

Ok(())
}

pub fn table_name_manager(&self) -> &TableNameManager {
&self.table_name_manager
}
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ impl Default for DatanodeOptions {
meta_client_options: None,
wal: WalConfig::default(),
storage: StorageConfig::default(),
region_engine: vec![],
region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())],
logging: LoggingOptions::default(),
heartbeat: HeartbeatOptions::default(),
enable_telemetry: true,
Expand Down
31 changes: 21 additions & 10 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,12 @@ impl Instance {
meta_backend.clone(),
datanode_clients.clone(),
));
let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone()));

let region_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone());
let region_request_handler = DistRegionRequestHandler::arc(
partition_manager.clone(),
catalog_manager.datanode_manager().clone(),
);

let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Expand All @@ -170,8 +174,6 @@ impl Instance {
)
.query_engine();

let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone()));

let inserter = Arc::new(Inserter::new(
catalog_manager.clone(),
partition_manager.clone(),
Expand Down Expand Up @@ -295,15 +297,28 @@ impl Instance {
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
plugins: Arc<Plugins>,
region_server: RegionServer,
) -> Result<Self> {
let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone()));
let datanode_manager = Arc::new(StandaloneDatanodeManager(region_server));

let region_request_handler =
DistRegionRequestHandler::arc(partition_manager.clone(), datanode_manager.clone());

let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Some(region_request_handler),
true,
plugins.clone(),
)
.query_engine();

let script_executor =
Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);

let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

let datanode_manager = Arc::new(StandaloneDatanodeManager(region_server));
let cache_invalidator = Arc::new(DummyCacheInvalidator);
let ddl_executor = Arc::new(DdlManager::new(
procedure_manager,
Expand Down Expand Up @@ -341,7 +356,7 @@ impl Instance {
script_executor,
statement_executor,
query_engine,
plugins: Default::default(),
plugins,
servers: Arc::new(HashMap::new()),
heartbeat_task: None,
inserter,
Expand All @@ -360,10 +375,6 @@ impl Instance {
&self.catalog_manager
}

pub fn set_plugins(&mut self, map: Arc<Plugins>) {
self.plugins = map;
}

pub fn plugins(&self) -> Arc<Plugins> {
self.plugins.clone()
}
Expand Down
21 changes: 14 additions & 7 deletions src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,28 @@ use async_trait::async_trait;
use client::error::{HandleRequestSnafu, Result as ClientResult};
use client::region_handler::RegionRequestHandler;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use partition::manager::PartitionRuleManagerRef;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;

use crate::catalog::FrontendCatalogManager;
use crate::error::{FindDatanodeSnafu, FindTableRouteSnafu, RequestQuerySnafu, Result};

pub(crate) struct DistRegionRequestHandler {
catalog_manager: Arc<FrontendCatalogManager>,
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
}

impl DistRegionRequestHandler {
pub fn arc(catalog_manager: Arc<FrontendCatalogManager>) -> Arc<Self> {
Arc::new(Self { catalog_manager })
pub fn arc(
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
) -> Arc<Self> {
Arc::new(Self {
partition_manager,
datanode_manager,
})
}
}

Expand All @@ -51,8 +59,7 @@ impl DistRegionRequestHandler {
let region_id = RegionId::from_u64(request.region_id);

let table_route = self
.catalog_manager
.partition_manager()
.partition_manager
.find_table_route(region_id.table_id())
.await
.context(FindTableRouteSnafu {
Expand All @@ -64,7 +71,7 @@ impl DistRegionRequestHandler {
region: region_id.region_number(),
})?;

let client = self.catalog_manager.datanode_manager().datanode(peer).await;
let client = self.datanode_manager.datanode(peer).await;

client
.handle_query(request)
Expand Down

0 comments on commit 8eacd3a

Please sign in to comment.