From 64a36e9b3668e22e3267ab6f0aa2a811d6852bb4 Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Fri, 24 Nov 2023 16:16:21 +0800 Subject: [PATCH] refactor: start datanode more flexibly (#2800) * refactor: start datanode more flexibly * Update src/datanode/src/datanode.rs Co-authored-by: Weny Xu * fix: resolve PR comments * Apply suggestions from code review Co-authored-by: JeremyHi --------- Co-authored-by: Weny Xu Co-authored-by: JeremyHi --- src/cmd/src/datanode.rs | 26 ++- src/cmd/src/error.rs | 11 +- src/cmd/src/options.rs | 2 +- src/cmd/src/standalone.rs | 24 ++- src/datanode/Cargo.toml | 1 + src/datanode/src/datanode.rs | 242 ++++++++++++++++++---------- src/datanode/src/error.rs | 10 +- src/datanode/src/lib.rs | 1 - src/datanode/src/region_server.rs | 88 +++++++--- src/datanode/src/server.rs | 95 ----------- src/frontend/src/instance.rs | 6 +- src/frontend/src/server.rs | 16 +- src/servers/src/server.rs | 11 ++ tests-integration/src/cluster.rs | 8 +- tests-integration/src/standalone.rs | 10 +- 15 files changed, 297 insertions(+), 254 deletions(-) delete mode 100644 src/datanode/src/server.rs diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 9babe11aaf37..c007ef8a70e9 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -12,15 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; use std::time::Duration; +use catalog::kvbackend::MetaKvBackend; use clap::Parser; use common_telemetry::logging; use datanode::config::DatanodeOptions; use datanode::datanode::{Datanode, DatanodeBuilder}; use meta_client::MetaClientOptions; use servers::Mode; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use crate::error::{MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu}; use crate::options::{Options, TopLevelOptions}; @@ -177,7 +179,27 @@ impl StartCommand { logging::info!("Datanode start command: {:#?}", self); logging::info!("Datanode options: {:#?}", opts); - let datanode = DatanodeBuilder::new(opts, None, plugins) + let node_id = opts + .node_id + .context(MissingConfigSnafu { msg: "'node_id'" })?; + + let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu { + msg: "'meta_client_options'", + })?; + + let meta_client = datanode::heartbeat::new_metasrv_client(node_id, meta_config) + .await + .context(StartDatanodeSnafu)?; + + let meta_backend = Arc::new(MetaKvBackend { + client: Arc::new(meta_client.clone()), + }); + + let datanode = DatanodeBuilder::new(opts, plugins) + .with_meta_client(meta_client) + .with_kv_backend(meta_backend) + .enable_region_server_service() + .enable_http_service() .build() .await .context(StartDatanodeSnafu)?; diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 40cad2e44ec8..30eeff465c13 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -225,6 +225,13 @@ pub enum Error { #[snafu(source)] error: std::io::Error, }, + + #[snafu(display("Failed to parse address {}", addr))] + ParseAddr { + addr: String, + #[snafu(source)] + error: std::net::AddrParseError, + }, } pub type Result = std::result::Result; @@ -252,7 +259,9 @@ impl ErrorExt for Error { | Error::NotDataFromOutput { .. } | Error::CreateDir { .. } | Error::EmptyResult { .. } - | Error::InvalidDatabaseName { .. } => StatusCode::InvalidArguments, + | Error::InvalidDatabaseName { .. } + | Error::ParseAddr { .. } => StatusCode::InvalidArguments, + Error::StartProcedureManager { source, .. } | Error::StopProcedureManager { source, .. } => source.status_code(), Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal, diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 9127f4502698..bc340d588a3e 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -28,7 +28,7 @@ pub const ENV_VAR_SEP: &str = "__"; pub const ENV_LIST_SEP: &str = ","; /// Options mixed up from datanode, frontend and metasrv. -#[derive(Serialize)] +#[derive(Serialize, Debug)] pub struct MixOptions { pub data_home: String, pub procedure: ProcedureConfig, diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index c7adfd0834d9..5483c6c74724 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -169,9 +169,7 @@ pub struct Instance { impl Instance { pub async fn start(&mut self) -> Result<()> { - // Start datanode instance before starting services, to avoid requests come in before internal components are started. - self.datanode.start().await.context(StartDatanodeSnafu)?; - info!("Datanode instance started"); + self.datanode.start_telemetry(); self.procedure_manager .start() @@ -325,10 +323,8 @@ impl StartCommand { let dn_opts = opts.datanode.clone(); info!("Standalone start command: {:#?}", self); - info!( - "Standalone frontend options: {:#?}, datanode options: {:#?}", - fe_opts, dn_opts - ); + + info!("Building standalone instance with {opts:#?}"); // Ensure the data_home directory exists. fs::create_dir_all(path::Path::new(&opts.data_home)).context(CreateDirSnafu { @@ -344,14 +340,12 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - let datanode = DatanodeBuilder::new( - dn_opts.clone(), - Some(kv_backend.clone()), - Default::default(), - ) - .build() - .await - .context(StartDatanodeSnafu)?; + let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone()) + .with_kv_backend(kv_backend.clone()) + .build() + .await + .context(StartDatanodeSnafu)?; + let region_server = datanode.region_server(); let catalog_manager = KvBackendCatalogManager::new( diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 08773eaf98d5..323293ff0ee1 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -17,6 +17,7 @@ axum = "0.6" axum-macros = "0.3" bytes = "1.1" catalog.workspace = true +client.workspace = true common-base.workspace = true common-catalog.workspace = true common-config.workspace = true diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 9dc157f3f943..f8a658f55aed 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -14,10 +14,11 @@ //! Datanode implementation. +use std::collections::HashMap; +use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; -use catalog::kvbackend::MetaKvBackend; use catalog::memory::MemoryCatalogManager; use common_base::Plugins; use common_error::ext::BoxedError; @@ -26,8 +27,9 @@ use common_meta::key::datanode_table::DatanodeTableManager; use common_meta::kv_backend::KvBackendRef; pub use common_procedure::options::ProcedureConfig; use common_runtime::Runtime; -use common_telemetry::{error, info}; +use common_telemetry::{error, info, warn}; use file_engine::engine::FileRegionEngine; +use futures::future; use futures_util::future::try_join_all; use futures_util::StreamExt; use log_store::raft_engine::log_store::RaftEngineLogStore; @@ -36,6 +38,10 @@ use mito2::engine::MitoEngine; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::util::normalize_dir; use query::QueryEngineFactory; +use servers::grpc::{GrpcServer, GrpcServerConfig}; +use servers::http::HttpServerBuilder; +use servers::metrics_handler::MetricsHandler; +use servers::server::{start_server, ServerHandler, ServerHandlers}; use servers::Mode; use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; @@ -48,26 +54,26 @@ use tokio::sync::Notify; use crate::config::{DatanodeOptions, RegionEngineConfig}; use crate::error::{ - CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu, MissingMetaClientSnafu, - MissingMetasrvOptsSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, - ShutdownInstanceSnafu, + CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, + ParseAddrSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu, ShutdownServerSnafu, + StartServerSnafu, }; use crate::event_listener::{ new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver, }; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; -use crate::heartbeat::{new_metasrv_client, HeartbeatTask}; -use crate::region_server::RegionServer; -use crate::server::Services; +use crate::heartbeat::HeartbeatTask; +use crate::region_server::{DummyTableProviderFactory, RegionServer}; use crate::store; const OPEN_REGION_PARALLELISM: usize = 16; +const REGION_SERVER_SERVICE_NAME: &str = "REGION_SERVER_SERVICE"; +const DATANODE_HTTP_SERVICE_NAME: &str = "DATANODE_HTTP_SERVICE"; /// Datanode service. pub struct Datanode { - opts: DatanodeOptions, - services: Option, + services: ServerHandlers, heartbeat_task: Option, region_event_receiver: Option, region_server: RegionServer, @@ -83,10 +89,17 @@ impl Datanode { self.start_heartbeat().await?; self.wait_coordinated().await; - let _ = self.greptimedb_telemetry_task.start(); + self.start_telemetry(); + self.start_services().await } + pub fn start_telemetry(&self) { + if let Err(e) = self.greptimedb_telemetry_task.start() { + warn!(e; "Failed to start telemetry task!"); + } + } + pub async fn start_heartbeat(&mut self) -> Result<()> { if let Some(task) = &self.heartbeat_task { // Safety: The event_receiver must exist. @@ -106,19 +119,17 @@ impl Datanode { /// Start services of datanode. This method call will block until services are shutdown. pub async fn start_services(&mut self) -> Result<()> { - if let Some(service) = self.services.as_mut() { - service.start(&self.opts).await - } else { - Ok(()) - } + let _ = future::try_join_all(self.services.values().map(start_server)) + .await + .context(StartServerSnafu)?; + Ok(()) } async fn shutdown_services(&self) -> Result<()> { - if let Some(service) = self.services.as_ref() { - service.shutdown().await - } else { - Ok(()) - } + let _ = future::try_join_all(self.services.values().map(|server| server.0.shutdown())) + .await + .context(ShutdownServerSnafu)?; + Ok(()) } pub async fn shutdown(&self) -> Result<()> { @@ -150,17 +161,21 @@ pub struct DatanodeBuilder { plugins: Plugins, meta_client: Option, kv_backend: Option, + enable_region_server_service: bool, + enable_http_service: bool, } impl DatanodeBuilder { /// `kv_backend` is optional. If absent, the builder will try to build one /// by using the given `opts` - pub fn new(opts: DatanodeOptions, kv_backend: Option, plugins: Plugins) -> Self { + pub fn new(opts: DatanodeOptions, plugins: Plugins) -> Self { Self { opts, plugins, meta_client: None, - kv_backend, + kv_backend: None, + enable_region_server_service: false, + enable_http_service: false, } } @@ -171,76 +186,63 @@ impl DatanodeBuilder { } } + pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self { + Self { + kv_backend: Some(kv_backend), + ..self + } + } + + pub fn enable_region_server_service(self) -> Self { + Self { + enable_region_server_service: true, + ..self + } + } + + pub fn enable_http_service(self) -> Self { + Self { + enable_http_service: true, + ..self + } + } + pub async fn build(mut self) -> Result { let mode = &self.opts.mode; - // build meta client - let meta_client = match mode { - Mode::Distributed => { - let meta_client = if let Some(meta_client) = self.meta_client.take() { - meta_client - } else { - let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; - - let meta_config = self - .opts - .meta_client - .as_ref() - .context(MissingMetasrvOptsSnafu)?; - - new_metasrv_client(node_id, meta_config).await? - }; - Some(meta_client) - } - Mode::Standalone => None, - }; + let meta_client = self.meta_client.take(); - // build kv-backend - let kv_backend = match mode { - Mode::Distributed => Arc::new(MetaKvBackend { - client: Arc::new(meta_client.clone().context(MissingMetaClientSnafu)?), - }), - Mode::Standalone => self.kv_backend.clone().context(MissingKvBackendSnafu)?, - }; + // If metasrv client is provided, we will use it to control the region server. + // Otherwise the region server is self-controlled, meaning no heartbeat and immediately + // writable upon open. + let controlled_by_metasrv = meta_client.is_some(); + + let kv_backend = self.kv_backend.take().context(MissingKvBackendSnafu)?; // build and initialize region server let log_store = Self::build_log_store(&self.opts).await?; - let (region_event_listener, region_event_receiver) = match mode { - Mode::Distributed => { - let (tx, rx) = new_region_server_event_channel(); - (Box::new(tx) as RegionServerEventListenerRef, Some(rx)) - } - Mode::Standalone => ( - Box::new(NoopRegionServerEventListener) as RegionServerEventListenerRef, - None, - ), + + let (region_event_listener, region_event_receiver) = if controlled_by_metasrv { + let (tx, rx) = new_region_server_event_channel(); + (Box::new(tx) as _, Some(rx)) + } else { + (Box::new(NoopRegionServerEventListener) as _, None) }; - let region_server = Self::new_region_server( - &self.opts, - self.plugins.clone(), - log_store, - region_event_listener, - ) - .await?; - self.initialize_region_server(®ion_server, kv_backend, matches!(mode, Mode::Standalone)) + let region_server = self + .new_region_server(log_store, region_event_listener) .await?; - let heartbeat_task = match mode { - Mode::Distributed => { - let meta_client = meta_client.context(MissingMetaClientSnafu)?; + self.initialize_region_server(®ion_server, kv_backend, !controlled_by_metasrv) + .await?; - let heartbeat_task = - HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?; - Some(heartbeat_task) - } - Mode::Standalone => None, + let heartbeat_task = if let Some(meta_client) = meta_client { + Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?) + } else { + None }; - let services = match mode { - Mode::Distributed => Some(Services::try_new(region_server.clone(), &self.opts).await?), - Mode::Standalone => None, - }; + let services = self.create_datanode_services(®ion_server)?; let greptimedb_telemetry_task = get_greptimedb_telemetry_task( Some(self.opts.storage.data_home.clone()), @@ -257,7 +259,6 @@ impl DatanodeBuilder { }; Ok(Datanode { - opts: self.opts, services, heartbeat_task, region_server, @@ -268,6 +269,68 @@ impl DatanodeBuilder { }) } + fn create_datanode_services(&self, region_server: &RegionServer) -> Result { + let mut services = HashMap::new(); + + if self.enable_region_server_service { + services.insert( + REGION_SERVER_SERVICE_NAME.to_string(), + self.create_region_server_service(region_server)?, + ); + } + + if self.enable_http_service { + services.insert( + DATANODE_HTTP_SERVICE_NAME.to_string(), + self.create_http_service()?, + ); + } + + Ok(services) + } + + fn create_region_server_service(&self, region_server: &RegionServer) -> Result { + let opts = &self.opts; + + let config = GrpcServerConfig { + max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize, + max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize, + }; + + let server = Box::new(GrpcServer::new( + Some(config), + None, + None, + Some(Arc::new(region_server.clone()) as _), + Some(Arc::new(region_server.clone()) as _), + None, + region_server.runtime(), + )); + + let addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu { + addr: &opts.rpc_addr, + })?; + + Ok((server, addr)) + } + + fn create_http_service(&self) -> Result { + let opts = &self.opts; + + let server = Box::new( + HttpServerBuilder::new(opts.http.clone()) + .with_metrics_handler(MetricsHandler) + .with_greptime_config_options(opts.to_toml_string()) + .build(), + ); + + let addr = opts.http.addr.parse().context(ParseAddrSnafu { + addr: &opts.http.addr, + })?; + + Ok((server, addr)) + } + /// Open all regions belong to this datanode. async fn initialize_region_server( &self, @@ -329,18 +392,19 @@ impl DatanodeBuilder { } async fn new_region_server( - opts: &DatanodeOptions, - plugins: Plugins, + &self, log_store: Arc, event_listener: RegionServerEventListenerRef, ) -> Result { + let opts = &self.opts; + let query_engine_factory = QueryEngineFactory::new_with_plugins( // query engine in datanode only executes plan with resolved table source. MemoryCatalogManager::with_default_setup(), None, None, false, - plugins, + self.plugins.clone(), ); let query_engine = query_engine_factory.query_engine(); @@ -352,8 +416,15 @@ impl DatanodeBuilder { .context(RuntimeResourceSnafu)?, ); - let mut region_server = - RegionServer::new(query_engine.clone(), runtime.clone(), event_listener); + let table_provider_factory = Arc::new(DummyTableProviderFactory); + + let mut region_server = RegionServer::with_table_provider( + query_engine, + runtime, + event_listener, + table_provider_factory, + ); + let object_store = store::new_object_store(opts).await?; let object_store_manager = ObjectStoreManager::new( "default", // TODO: use a name which is set in the configuration when #919 is done. @@ -472,7 +543,6 @@ mod tests { node_id: Some(0), ..Default::default() }, - None, Plugins::default(), ); diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 1357570c3ac9..2f9786f6672d 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -212,9 +212,6 @@ pub enum Error { #[snafu(display("Expect KvBackend but not found"))] MissingKvBackend { location: Location }, - #[snafu(display("Expect MetaClient but not found"))] - MissingMetaClient { location: Location }, - #[snafu(display("Invalid SQL, error: {}", msg))] InvalidSql { msg: String }, @@ -295,9 +292,6 @@ pub enum Error { #[snafu(display("Missing node id in Datanode config"))] MissingNodeId { location: Location }, - #[snafu(display("Missing node id option in distributed mode"))] - MissingMetasrvOpts { location: Location }, - #[snafu(display("Missing required field: {}", name))] MissingRequiredField { name: String, location: Location }, @@ -477,13 +471,11 @@ impl ErrorExt for Error { | SchemaExists { .. } | DatabaseNotFound { .. } | MissingNodeId { .. } - | MissingMetasrvOpts { .. } | ColumnNoneDefaultValue { .. } | MissingWalDirConfig { .. } | PrepareImmutableTable { .. } | ColumnDataType { .. } - | MissingKvBackend { .. } - | MissingMetaClient { .. } => StatusCode::InvalidArguments, + | MissingKvBackend { .. } => StatusCode::InvalidArguments, EncodeJson { .. } | PayloadNotExist { .. } | Unexpected { .. } => { StatusCode::Unexpected diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 62a9bd9f309a..2a1265baa786 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -24,7 +24,6 @@ mod greptimedb_telemetry; pub mod heartbeat; pub mod metrics; pub mod region_server; -pub mod server; mod store; #[cfg(test)] #[allow(dead_code)] diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 7e2925af6f74..343891b5cc20 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -79,12 +79,27 @@ impl RegionServer { query_engine: QueryEngineRef, runtime: Arc, event_listener: RegionServerEventListenerRef, + ) -> Self { + Self::with_table_provider( + query_engine, + runtime, + event_listener, + Arc::new(DummyTableProviderFactory), + ) + } + + pub fn with_table_provider( + query_engine: QueryEngineRef, + runtime: Arc, + event_listener: RegionServerEventListenerRef, + table_provider_factory: TableProviderFactoryRef, ) -> Self { Self { inner: Arc::new(RegionServerInner::new( query_engine, runtime, event_listener, + table_provider_factory, )), } } @@ -233,6 +248,7 @@ struct RegionServerInner { query_engine: QueryEngineRef, runtime: Arc, event_listener: RegionServerEventListenerRef, + table_provider_factory: TableProviderFactoryRef, } impl RegionServerInner { @@ -240,6 +256,7 @@ impl RegionServerInner { query_engine: QueryEngineRef, runtime: Arc, event_listener: RegionServerEventListenerRef, + table_provider_factory: TableProviderFactoryRef, ) -> Self { Self { engines: RwLock::new(HashMap::new()), @@ -247,6 +264,7 @@ impl RegionServerInner { query_engine, runtime, event_listener, + table_provider_factory, } } @@ -346,7 +364,13 @@ impl RegionServerInner { .get(®ion_id) .with_context(|| RegionNotFoundSnafu { region_id })? .clone(); - let catalog_list = Arc::new(DummyCatalogList::new(region_id, engine).await?); + + let table_provider = self + .table_provider_factory + .create(region_id, engine) + .await?; + + let catalog_list = Arc::new(DummyCatalogList::with_table_provider(table_provider)); // decode substrait plan to logical plan and execute it let logical_plan = DFLogicalSubstraitConvertor @@ -407,31 +431,16 @@ struct DummyCatalogList { } impl DummyCatalogList { - pub async fn new(region_id: RegionId, engine: RegionEngineRef) -> Result { - let metadata = - engine - .get_metadata(region_id) - .await - .with_context(|_| GetRegionMetadataSnafu { - engine: engine.name(), - region_id, - })?; - let table_provider = DummyTableProvider { - region_id, - engine, - metadata, - scan_request: Default::default(), - }; + fn with_table_provider(table_provider: Arc) -> Self { let schema_provider = DummySchemaProvider { table: table_provider, }; let catalog_provider = DummyCatalogProvider { schema: schema_provider, }; - let catalog_list = Self { + Self { catalog: catalog_provider, - }; - Ok(catalog_list) + } } } @@ -480,7 +489,7 @@ impl CatalogProvider for DummyCatalogProvider { /// For [DummyCatalogList]. #[derive(Clone)] struct DummySchemaProvider { - table: DummyTableProvider, + table: Arc, } #[async_trait] @@ -494,7 +503,7 @@ impl SchemaProvider for DummySchemaProvider { } async fn table(&self, _name: &str) -> Option> { - Some(Arc::new(self.table.clone())) + Some(self.table.clone()) } fn table_exist(&self, _name: &str) -> bool { @@ -555,3 +564,40 @@ impl TableProvider for DummyTableProvider { Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) } } + +pub struct DummyTableProviderFactory; + +#[async_trait] +impl TableProviderFactory for DummyTableProviderFactory { + async fn create( + &self, + region_id: RegionId, + engine: RegionEngineRef, + ) -> Result> { + let metadata = + engine + .get_metadata(region_id) + .await + .with_context(|_| GetRegionMetadataSnafu { + engine: engine.name(), + region_id, + })?; + Ok(Arc::new(DummyTableProvider { + region_id, + engine, + metadata, + scan_request: Default::default(), + })) + } +} + +#[async_trait] +pub trait TableProviderFactory: Send + Sync { + async fn create( + &self, + region_id: RegionId, + engine: RegionEngineRef, + ) -> Result>; +} + +pub type TableProviderFactoryRef = Arc; diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs deleted file mode 100644 index ad198af81350..000000000000 --- a/src/datanode/src/server.rs +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::net::SocketAddr; -use std::sync::Arc; - -use futures::future; -use servers::grpc::{GrpcServer, GrpcServerConfig}; -use servers::http::{HttpServer, HttpServerBuilder}; -use servers::metrics_handler::MetricsHandler; -use servers::server::Server; -use snafu::ResultExt; - -use crate::config::DatanodeOptions; -use crate::error::{ - ParseAddrSnafu, Result, ShutdownServerSnafu, StartServerSnafu, WaitForGrpcServingSnafu, -}; -use crate::region_server::RegionServer; - -/// All rpc services. -pub struct Services { - grpc_server: GrpcServer, - http_server: HttpServer, -} - -impl Services { - pub async fn try_new(region_server: RegionServer, opts: &DatanodeOptions) -> Result { - let flight_handler = Some(Arc::new(region_server.clone()) as _); - let region_server_handler = Some(Arc::new(region_server.clone()) as _); - let runtime = region_server.runtime(); - let grpc_config = GrpcServerConfig { - max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize, - max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize, - }; - - Ok(Self { - grpc_server: GrpcServer::new( - Some(grpc_config), - None, - None, - flight_handler, - region_server_handler, - None, - runtime, - ), - http_server: HttpServerBuilder::new(opts.http.clone()) - .with_metrics_handler(MetricsHandler) - .with_greptime_config_options(opts.to_toml_string()) - .build(), - }) - } - - pub async fn start(&mut self, opts: &DatanodeOptions) -> Result<()> { - let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu { - addr: &opts.rpc_addr, - })?; - let http_addr = opts.http.addr.parse().context(ParseAddrSnafu { - addr: &opts.http.addr, - })?; - let grpc = self.grpc_server.start(grpc_addr); - let http = self.http_server.start(http_addr); - let _ = future::try_join_all(vec![grpc, http]) - .await - .context(StartServerSnafu)?; - - self.grpc_server - .wait_for_serve() - .await - .context(WaitForGrpcServingSnafu)?; - Ok(()) - } - - pub async fn shutdown(&self) -> Result<()> { - self.grpc_server - .shutdown() - .await - .context(ShutdownServerSnafu)?; - self.http_server - .shutdown() - .await - .context(ShutdownServerSnafu)?; - Ok(()) - } -} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 8f487dc782a9..2ba0daa0a202 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -19,7 +19,8 @@ mod otlp; mod prom_store; mod region_query; mod script; -mod standalone; +pub mod standalone; + use std::collections::HashMap; use std::sync::Arc; @@ -72,6 +73,7 @@ use servers::query_handler::{ InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler, PromStoreProtocolHandler, ScriptHandler, }; +use servers::server::{start_server, ServerHandlers}; use session::context::QueryContextRef; use snafu::prelude::*; use sql::dialect::Dialect; @@ -93,7 +95,7 @@ use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandl use crate::heartbeat::HeartbeatTask; use crate::metrics; use crate::script::ScriptExecutor; -use crate::server::{start_server, ServerHandlers, Services}; +use crate::server::Services; #[async_trait] pub trait FrontendInstance: diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index c29f2bf45387..5831a9ab3215 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -12,14 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use auth::UserProviderRef; use common_base::Plugins; use common_runtime::Builder as RuntimeBuilder; -use common_telemetry::info; use servers::error::InternalIoSnafu; use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::HttpServerBuilder; @@ -29,7 +27,7 @@ use servers::opentsdb::OpentsdbServer; use servers::postgres::PostgresServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; -use servers::server::Server; +use servers::server::{Server, ServerHandler, ServerHandlers}; use snafu::ResultExt; use crate::error::{self, Result, StartServerSnafu}; @@ -38,10 +36,6 @@ use crate::instance::FrontendInstance; pub(crate) struct Services; -pub type ServerHandlers = HashMap; - -pub type ServerHandler = (Box, SocketAddr); - impl Services { pub(crate) async fn build( opts: T, @@ -210,11 +204,3 @@ impl Services { fn parse_addr(addr: &str) -> Result { addr.parse().context(error::ParseAddrSnafu { addr }) } - -pub async fn start_server( - server_and_addr: &(Box, SocketAddr), -) -> servers::error::Result> { - let (server, addr) = server_and_addr; - info!("Starting {} at {}", server.name(), addr); - server.start(*addr).await.map(Some) -} diff --git a/src/servers/src/server.rs b/src/servers/src/server.rs index 69ee0918609a..95b142803234 100644 --- a/src/servers/src/server.rs +++ b/src/servers/src/server.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; @@ -28,6 +29,16 @@ use crate::error::{self, Result}; pub(crate) type AbortableStream = Abortable; +pub type ServerHandlers = HashMap; + +pub type ServerHandler = (Box, SocketAddr); + +pub async fn start_server(server_handler: &ServerHandler) -> Result> { + let (server, addr) = server_handler; + info!("Starting {} at {}", server.name(), addr); + server.start(*addr).await.map(Some) +} + #[async_trait] pub trait Server: Send + Sync { /// Shutdown the server gracefully. diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 2f84fd756d9c..c1862bcba6ed 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::Role; +use catalog::kvbackend::MetaKvBackend; use client::client_manager::DatanodeClients; use client::Client; use common_base::Plugins; @@ -220,7 +221,12 @@ impl GreptimeDbClusterBuilder { .build(); meta_client.start(&[&meta_srv.server_addr]).await.unwrap(); - let mut datanode = DatanodeBuilder::new(opts, None, Plugins::default()) + let meta_backend = Arc::new(MetaKvBackend { + client: Arc::new(meta_client.clone()), + }); + + let mut datanode = DatanodeBuilder::new(opts, Plugins::default()) + .with_kv_backend(meta_backend) .with_meta_client(meta_client) .build() .await diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 153ad46e0608..33afcb02d64a 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -82,11 +82,11 @@ impl GreptimeDbStandaloneBuilder { let plugins = self.plugin.unwrap_or_default(); - let datanode = - DatanodeBuilder::new(opts.clone(), Some(kv_backend.clone()), plugins.clone()) - .build() - .await - .unwrap(); + let datanode = DatanodeBuilder::new(opts.clone(), plugins.clone()) + .with_kv_backend(kv_backend.clone()) + .build() + .await + .unwrap(); let catalog_manager = KvBackendCatalogManager::new( kv_backend.clone(),