From a85786b80389f7896055dcc75e8f758ad96a432b Mon Sep 17 00:00:00 2001 From: luofucong Date: Fri, 12 Jan 2024 22:33:30 +0800 Subject: [PATCH] refactor: make grpc service able to be added dynamically --- Cargo.lock | 1 + Cargo.toml | 1 + benchmarks/Cargo.toml | 2 +- src/cmd/Cargo.toml | 2 +- src/cmd/src/datanode.rs | 16 +++- src/cmd/src/lib.rs | 5 ++ src/datanode/src/datanode.rs | 99 ++--------------------- src/datanode/src/lib.rs | 1 + src/datanode/src/service.rs | 107 +++++++++++++++++++++++++ src/frontend/src/server.rs | 7 +- src/servers/src/grpc.rs | 115 ++++---------------------- src/servers/src/grpc/builder.rs | 124 +++++++++++++++++------------ tests-integration/Cargo.toml | 1 + tests-integration/src/cluster.rs | 17 ++-- tests-integration/src/test_util.rs | 6 +- tests/runner/Cargo.toml | 2 +- 16 files changed, 245 insertions(+), 261 deletions(-) create mode 100644 src/datanode/src/service.rs diff --git a/Cargo.lock b/Cargo.lock index 84e879bf8639..9c1c71356e6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9487,6 +9487,7 @@ name = "tests-integration" version = "0.6.0" dependencies = [ "api", + "arrow-flight", "async-trait", "auth", "axum", diff --git a/Cargo.toml b/Cargo.toml index e8d08b7a3549..8e1e9b07f830 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ bitflags = "2.4.1" bytemuck = "1.12" bytes = { version = "1.5", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] } +clap = { version = "4.4", features = ["derive"] } dashmap = "5.4" datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 07d33930cf72..d56524fe36a9 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -7,7 +7,7 @@ license.workspace = true [dependencies] arrow.workspace = true chrono.workspace = true -clap = { version = "4.0", features = ["derive"] } +clap.workspace = true client.workspace = true futures-util.workspace = true indicatif = "0.17.1" diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index d9d63d74dff4..648e92852ba9 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -18,7 +18,7 @@ async-trait.workspace = true auth.workspace = true catalog.workspace = true chrono.workspace = true -clap = { version = "4.4", features = ["derive"] } +clap.workspace = true client.workspace = true common-base.workspace = true common-catalog.workspace = true diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index b3f0cbf8a111..883241d8ba07 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -22,6 +22,7 @@ use common_config::WalConfig; use common_telemetry::{info, logging}; use datanode::config::DatanodeOptions; use datanode::datanode::{Datanode, DatanodeBuilder}; +use datanode::service::DatanodeServiceBuilder; use meta_client::MetaClientOptions; use servers::Mode; use snafu::{OptionExt, ResultExt}; @@ -38,6 +39,10 @@ impl Instance { fn new(datanode: Datanode) -> Self { Self { datanode } } + + pub fn datanode_mut(&mut self) -> &mut Datanode { + &mut self.datanode + } } #[async_trait] @@ -219,15 +224,20 @@ impl StartCommand { client: Arc::new(meta_client.clone()), }); - let datanode = DatanodeBuilder::new(opts, plugins) + let mut datanode = DatanodeBuilder::new(opts.clone(), plugins) .with_meta_client(meta_client) .with_kv_backend(meta_backend) - .enable_region_server_service() - .enable_http_service() .build() .await .context(StartDatanodeSnafu)?; + let services = DatanodeServiceBuilder::new(&opts) + .with_default_grpc_server(&datanode.region_server()) + .enable_http_service() + .build() + .context(StartDatanodeSnafu)?; + datanode.setup_services(services); + Ok(Instance::new(datanode)) } } diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index dfa2b3a119d5..08bc2e66f313 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -35,6 +35,11 @@ lazy_static::lazy_static! { pub trait App { fn name(&self) -> &str; + /// A hook for implementor to make something happened before actual startup. Defaults to no-op. + fn pre_start(&mut self) -> error::Result<()> { + Ok(()) + } + async fn start(&mut self) -> error::Result<()>; async fn stop(&self) -> error::Result<()>; diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 4e4d16610e69..2d40ad81245e 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -15,7 +15,6 @@ //! Datanode implementation. use std::collections::HashMap; -use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; @@ -45,11 +44,7 @@ use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::util::{join_dir, normalize_dir}; use query::QueryEngineFactory; use servers::export_metrics::ExportMetricsTask; -use servers::grpc::builder::GrpcServerBuilder; -use servers::grpc::GrpcServerConfig; -use servers::http::HttpServerBuilder; -use servers::metrics_handler::MetricsHandler; -use servers::server::{start_server, ServerHandler, ServerHandlers}; +use servers::server::{start_server, ServerHandlers}; use servers::Mode; use snafu::{OptionExt, ResultExt}; use store_api::path_utils::{region_dir, WAL_DIR}; @@ -62,8 +57,8 @@ use tokio::sync::Notify; use crate::config::{DatanodeOptions, RegionEngineConfig}; use crate::error::{ BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu, - MissingNodeIdSnafu, OpenLogStoreSnafu, ParseAddrSnafu, Result, RuntimeResourceSnafu, - ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu, + MissingNodeIdSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu, + ShutdownServerSnafu, StartServerSnafu, }; use crate::event_listener::{ new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef, @@ -75,8 +70,6 @@ use crate::region_server::{DummyTableProviderFactory, RegionServer}; use crate::store; const OPEN_REGION_PARALLELISM: usize = 16; -const REGION_SERVER_SERVICE_NAME: &str = "REGION_SERVER_SERVICE"; -const DATANODE_HTTP_SERVICE_NAME: &str = "DATANODE_HTTP_SERVICE"; /// Datanode service. pub struct Datanode { @@ -129,6 +122,10 @@ impl Datanode { } } + pub fn setup_services(&mut self, services: ServerHandlers) { + self.services = services; + } + /// Start services of datanode. This method call will block until services are shutdown. pub async fn start_services(&mut self) -> Result<()> { let _ = future::try_join_all(self.services.values().map(start_server)) @@ -173,8 +170,6 @@ pub struct DatanodeBuilder { plugins: Plugins, meta_client: Option, kv_backend: Option, - enable_region_server_service: bool, - enable_http_service: bool, } impl DatanodeBuilder { @@ -186,8 +181,6 @@ impl DatanodeBuilder { plugins, meta_client: None, kv_backend: None, - enable_region_server_service: false, - enable_http_service: false, } } @@ -205,20 +198,6 @@ impl DatanodeBuilder { } } - pub fn enable_region_server_service(self) -> Self { - Self { - enable_region_server_service: true, - ..self - } - } - - pub fn enable_http_service(self) -> Self { - Self { - enable_http_service: true, - ..self - } - } - pub async fn build(mut self) -> Result { let mode = &self.opts.mode; let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; @@ -269,8 +248,6 @@ impl DatanodeBuilder { None }; - let services = self.create_datanode_services(®ion_server)?; - let greptimedb_telemetry_task = get_greptimedb_telemetry_task( Some(self.opts.storage.data_home.clone()), mode, @@ -290,7 +267,7 @@ impl DatanodeBuilder { .context(StartServerSnafu)?; Ok(Datanode { - services, + services: HashMap::new(), heartbeat_task, region_server, greptimedb_telemetry_task, @@ -301,66 +278,6 @@ impl DatanodeBuilder { }) } - fn create_datanode_services(&self, region_server: &RegionServer) -> Result { - let mut services = HashMap::new(); - - if self.enable_region_server_service { - services.insert( - REGION_SERVER_SERVICE_NAME.to_string(), - self.create_region_server_service(region_server)?, - ); - } - - if self.enable_http_service { - services.insert( - DATANODE_HTTP_SERVICE_NAME.to_string(), - self.create_http_service()?, - ); - } - - Ok(services) - } - - fn create_region_server_service(&self, region_server: &RegionServer) -> Result { - let opts = &self.opts; - - let config = GrpcServerConfig { - max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize, - max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize, - }; - - let server = Box::new( - GrpcServerBuilder::new(region_server.runtime()) - .config(config) - .flight_handler(Arc::new(region_server.clone())) - .region_server_handler(Arc::new(region_server.clone())) - .build(), - ); - - let addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu { - addr: &opts.rpc_addr, - })?; - - Ok((server, addr)) - } - - fn create_http_service(&self) -> Result { - let opts = &self.opts; - - let server = Box::new( - HttpServerBuilder::new(opts.http.clone()) - .with_metrics_handler(MetricsHandler) - .with_greptime_config_options(opts.to_toml_string()) - .build(), - ); - - let addr = opts.http.addr.parse().context(ParseAddrSnafu { - addr: &opts.http.addr, - })?; - - Ok((server, addr)) - } - #[cfg(test)] /// Open all regions belong to this datanode. async fn initialize_region_server( diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 0e036f2bd540..dae3eef76c79 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -23,6 +23,7 @@ mod greptimedb_telemetry; pub mod heartbeat; pub mod metrics; pub mod region_server; +pub mod service; mod store; #[cfg(any(test, feature = "testing"))] pub mod tests; diff --git a/src/datanode/src/service.rs b/src/datanode/src/service.rs new file mode 100644 index 000000000000..5e203f53da61 --- /dev/null +++ b/src/datanode/src/service.rs @@ -0,0 +1,107 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; + +use servers::grpc::builder::GrpcServerBuilder; +use servers::grpc::{GrpcServer, GrpcServerConfig}; +use servers::http::HttpServerBuilder; +use servers::metrics_handler::MetricsHandler; +use servers::server::{ServerHandler, ServerHandlers}; +use snafu::ResultExt; + +use crate::config::DatanodeOptions; +use crate::error::{ParseAddrSnafu, Result}; +use crate::region_server::RegionServer; + +const DATANODE_GRPC_SERVICE_NAME: &str = "DATANODE_GRPC_SERVICE"; +const DATANODE_HTTP_SERVICE_NAME: &str = "DATANODE_HTTP_SERVICE"; + +pub struct DatanodeServiceBuilder<'a> { + opts: &'a DatanodeOptions, + grpc_server: Option, + enable_http_service: bool, +} + +impl<'a> DatanodeServiceBuilder<'a> { + pub fn new(opts: &'a DatanodeOptions) -> Self { + Self { + opts, + grpc_server: None, + enable_http_service: false, + } + } + + pub fn with_grpc_server(self, grpc_server: GrpcServer) -> Self { + Self { + grpc_server: Some(grpc_server), + ..self + } + } + + pub fn with_default_grpc_server(mut self, region_server: &RegionServer) -> Self { + let grpc_server = Self::grpc_server_builder(self.opts, region_server).build(); + self.grpc_server = Some(grpc_server); + self + } + + pub fn enable_http_service(self) -> Self { + Self { + enable_http_service: true, + ..self + } + } + + pub fn build(mut self) -> Result { + let mut services = HashMap::new(); + + if let Some(grpc_server) = self.grpc_server.take() { + let addr: SocketAddr = self.opts.rpc_addr.parse().context(ParseAddrSnafu { + addr: &self.opts.rpc_addr, + })?; + let handler: ServerHandler = (Box::new(grpc_server), addr); + services.insert(DATANODE_GRPC_SERVICE_NAME.to_string(), handler); + } + + if self.enable_http_service { + let http_server = HttpServerBuilder::new(self.opts.http.clone()) + .with_metrics_handler(MetricsHandler) + .with_greptime_config_options(self.opts.to_toml_string()) + .build(); + let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu { + addr: &self.opts.http.addr, + })?; + let handler: ServerHandler = (Box::new(http_server), addr); + services.insert(DATANODE_HTTP_SERVICE_NAME.to_string(), handler); + } + + Ok(services) + } + + pub fn grpc_server_builder( + opts: &DatanodeOptions, + region_server: &RegionServer, + ) -> GrpcServerBuilder { + let config = GrpcServerConfig { + max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize, + max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize, + }; + + GrpcServerBuilder::new(config, region_server.runtime()) + .flight_handler(Arc::new(region_server.clone())) + .region_server_handler(Arc::new(region_server.clone())) + } +} diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 42079aab99bf..9e9480556f92 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -60,7 +60,7 @@ impl Services { max_send_message_size: opts.max_send_message_size.as_bytes() as usize, }; - Ok(GrpcServerBuilder::new(grpc_runtime).config(grpc_config)) + Ok(GrpcServerBuilder::new(grpc_config, grpc_runtime)) } pub async fn build(&self, opts: T, instance: Arc) -> Result @@ -102,9 +102,8 @@ impl Services { ); let grpc_server = builder .database_handler(greptime_request_handler.clone()) - .prometheus_handler(instance.clone()) - .otlp_handler(instance.clone()) - .user_provider(user_provider.clone()) + .prometheus_handler(instance.clone(), user_provider.clone()) + .otlp_handler(instance.clone(), user_provider.clone()) .flight_handler(Arc::new(greptime_request_handler)) .build(); diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index d8bc6fc387c6..9b667a173263 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -23,70 +23,38 @@ pub mod region_server; use std::net::SocketAddr; -use api::v1::greptime_database_server::GreptimeDatabaseServer; use api::v1::health_check_server::{HealthCheck, HealthCheckServer}; -use api::v1::prometheus_gateway_server::{PrometheusGateway, PrometheusGatewayServer}; -#[cfg(feature = "testing")] -use api::v1::region::region_server::Region; -use api::v1::region::region_server::RegionServer; use api::v1::{HealthCheckRequest, HealthCheckResponse}; -#[cfg(feature = "testing")] -use arrow_flight::flight_service_server::FlightService; -use arrow_flight::flight_service_server::FlightServiceServer; use async_trait::async_trait; -use auth::UserProviderRef; use common_grpc::channel_manager::{ DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, }; use common_telemetry::logging::info; use common_telemetry::{error, warn}; use futures::FutureExt; -use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer; -use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer; use snafu::{ensure, OptionExt, ResultExt}; use tokio::net::TcpListener; use tokio::sync::oneshot::{self, Receiver, Sender}; use tokio::sync::Mutex; -use tonic::transport::server::TcpIncoming; +use tonic::transport::server::{Routes, TcpIncoming}; use tonic::{Request, Response, Status}; use tonic_reflection::server::{ServerReflection, ServerReflectionServer}; -use tower::ServiceBuilder; -use self::authorize::AuthMiddlewareLayer; -use self::flight::{FlightCraftRef, FlightCraftWrapper}; -use self::otlp::OtlpService; -use self::prom_query_gateway::PrometheusGatewayService; -use self::region_server::RegionServerRequestHandler; use crate::error::{ AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu, }; -use crate::grpc::database::DatabaseService; -use crate::grpc::greptime_handler::GreptimeRequestHandler; -use crate::prometheus_handler::PrometheusHandlerRef; -use crate::query_handler::OpenTelemetryProtocolHandlerRef; use crate::server::Server; type TonicResult = std::result::Result; pub struct GrpcServer { - config: GrpcServerConfig, // states shutdown_tx: Mutex>>, /// gRPC serving state receiver. Only present if the gRPC server is started. /// Used to wait for the server to stop, performing the old blocking fashion. serve_state: Mutex>>>, - user_provider: Option, // handlers - /// Handler for [DatabaseService] service. - database_handler: Option, - /// Handler for Prometheus-compatible PromQL queries ([PrometheusGateway]). Only present for frontend server. - prometheus_handler: Option, - /// Handler for [FlightService](arrow_flight::flight_service_server::FlightService). - flight_handler: Option, - /// Handler for [RegionServer]. - region_server_handler: Option, - /// Handler for OpenTelemetry Protocol (OTLP) requests. - otlp_handler: Option, + routes: Mutex>, } /// Grpc Server configuration @@ -108,16 +76,6 @@ impl Default for GrpcServerConfig { } impl GrpcServer { - #[cfg(feature = "testing")] - pub fn create_flight_service(&self) -> FlightServiceServer { - FlightServiceServer::new(FlightCraftWrapper(self.flight_handler.clone().unwrap())) - } - - #[cfg(feature = "testing")] - pub fn create_region_service(&self) -> RegionServer { - RegionServer::new(self.region_server_handler.clone().unwrap()) - } - pub fn create_healthcheck_service(&self) -> HealthCheckServer { HealthCheckServer::new(HealthCheckHandler) } @@ -132,16 +90,6 @@ impl GrpcServer { .unwrap() } - pub fn create_prom_query_gateway_service( - &self, - handler: PrometheusHandlerRef, - ) -> PrometheusGatewayServer { - PrometheusGatewayServer::new(PrometheusGatewayService::new( - handler, - self.user_provider.clone(), - )) - } - pub async fn wait_for_serve(&self) -> Result<()> { let mut serve_state = self.serve_state.lock().await; let rx = serve_state.take().context(InternalSnafu { @@ -188,8 +136,17 @@ impl Server for GrpcServer { } async fn start(&self, addr: SocketAddr) -> Result { - let max_recv_message_size = self.config.max_recv_message_size; - let max_send_message_size = self.config.max_send_message_size; + let routes = { + let mut routes = self.routes.lock().await; + let Some(routes) = routes.take() else { + return AlreadyStartedSnafu { + server: self.name(), + } + .fail(); + }; + routes + }; + let (tx, rx) = oneshot::channel(); let (incoming, addr) = { let mut shutdown_tx = self.shutdown_tx.lock().await; @@ -211,52 +168,10 @@ impl Server for GrpcServer { (incoming, addr) }; - let mut builder = tonic::transport::Server::builder() + let builder = tonic::transport::Server::builder() + .add_routes(routes) .add_service(self.create_healthcheck_service()) .add_service(self.create_reflection_service()); - if let Some(database_handler) = &self.database_handler { - builder = builder.add_service( - GreptimeDatabaseServer::new(DatabaseService::new(database_handler.clone())) - .max_decoding_message_size(max_recv_message_size) - .max_encoding_message_size(max_send_message_size), - ) - } - if let Some(prometheus_handler) = &self.prometheus_handler { - builder = builder - .add_service(self.create_prom_query_gateway_service(prometheus_handler.clone())) - } - - if let Some(otlp_handler) = &self.otlp_handler { - let trace_server = ServiceBuilder::new() - .layer(AuthMiddlewareLayer::with(self.user_provider.clone())) - .service(TraceServiceServer::new(OtlpService::new( - otlp_handler.clone(), - ))); - builder = builder.add_service(trace_server); - - let metrics_server = ServiceBuilder::new() - .layer(AuthMiddlewareLayer::with(self.user_provider.clone())) - .service(MetricsServiceServer::new(OtlpService::new( - otlp_handler.clone(), - ))); - - builder = builder.add_service(metrics_server); - } - - if let Some(flight_handler) = &self.flight_handler { - builder = builder.add_service( - FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone())) - .max_decoding_message_size(max_recv_message_size) - .max_encoding_message_size(max_send_message_size), - ) - } - if let Some(region_server_handler) = &self.region_server_handler { - builder = builder.add_service( - RegionServer::new(region_server_handler.clone()) - .max_decoding_message_size(max_recv_message_size) - .max_encoding_message_size(max_send_message_size), - ); - } let (serve_state_tx, serve_state_rx) = oneshot::channel(); let mut serve_state = self.serve_state.lock().await; diff --git a/src/servers/src/grpc/builder.rs b/src/servers/src/grpc/builder.rs index 33e49f9be8bc..7a5baa40ea72 100644 --- a/src/servers/src/grpc/builder.rs +++ b/src/servers/src/grpc/builder.rs @@ -14,101 +14,127 @@ use std::sync::Arc; +use api::v1::greptime_database_server::GreptimeDatabaseServer; +use api::v1::prometheus_gateway_server::PrometheusGatewayServer; +use api::v1::region::region_server::RegionServer; +use arrow_flight::flight_service_server::FlightServiceServer; use auth::UserProviderRef; use common_runtime::Runtime; +use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer; +use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer; use tokio::sync::Mutex; +use tonic::transport::server::RoutesBuilder; +use tower::ServiceBuilder; -use super::flight::FlightCraftRef; +use super::flight::{FlightCraftRef, FlightCraftWrapper}; use super::region_server::{RegionServerHandlerRef, RegionServerRequestHandler}; use super::{GrpcServer, GrpcServerConfig}; +use crate::grpc::authorize::AuthMiddlewareLayer; +use crate::grpc::database::DatabaseService; use crate::grpc::greptime_handler::GreptimeRequestHandler; +use crate::grpc::otlp::OtlpService; +use crate::grpc::prom_query_gateway::PrometheusGatewayService; use crate::prometheus_handler::PrometheusHandlerRef; use crate::query_handler::OpenTelemetryProtocolHandlerRef; +macro_rules! add_service { + ($builder: ident, $service: expr) => { + $builder.routes_builder.add_service( + $service + .max_decoding_message_size($builder.config.max_recv_message_size) + .max_encoding_message_size($builder.config.max_send_message_size), + ) + }; +} + pub struct GrpcServerBuilder { - config: Option, - database_handler: Option, - prometheus_handler: Option, - flight_handler: Option, - region_server_handler: Option, - otlp_handler: Option, - user_provider: Option, + config: GrpcServerConfig, runtime: Arc, + routes_builder: RoutesBuilder, } impl GrpcServerBuilder { - pub fn new(runtime: Arc) -> Self { + pub fn new(config: GrpcServerConfig, runtime: Arc) -> Self { Self { - config: None, - database_handler: None, - prometheus_handler: None, - flight_handler: None, - region_server_handler: None, - otlp_handler: None, - user_provider: None, + config, runtime, + routes_builder: RoutesBuilder::default(), } } - pub fn config(mut self, config: GrpcServerConfig) -> Self { - self.config = Some(config); - self - } - - pub fn option_config(mut self, config: Option) -> Self { - self.config = config; - self - } - pub fn runtime(&self) -> &Arc { &self.runtime } + /// Add handler for [DatabaseService] service. pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self { - self.database_handler = Some(database_handler); + add_service!( + self, + GreptimeDatabaseServer::new(DatabaseService::new(database_handler)) + ); self } - pub fn prometheus_handler(mut self, prometheus_handler: PrometheusHandlerRef) -> Self { - self.prometheus_handler = Some(prometheus_handler); + /// Add handler for Prometheus-compatible PromQL queries ([PrometheusGateway]). + pub fn prometheus_handler( + mut self, + prometheus_handler: PrometheusHandlerRef, + user_provider: Option, + ) -> Self { + add_service!( + self, + PrometheusGatewayServer::new(PrometheusGatewayService::new( + prometheus_handler, + user_provider, + )) + ); self } + /// Add handler for [FlightService](arrow_flight::flight_service_server::FlightService). pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self { - self.flight_handler = Some(flight_handler); + add_service!( + self, + FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone())) + ); self } + /// Add handler for [RegionServer]. pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self { - self.region_server_handler = Some(region_server_handler); + let handler = RegionServerRequestHandler::new(region_server_handler, self.runtime.clone()); + add_service!(self, RegionServer::new(handler)); self } - pub fn otlp_handler(mut self, otlp_handler: OpenTelemetryProtocolHandlerRef) -> Self { - self.otlp_handler = Some(otlp_handler); + /// Add handler for OpenTelemetry Protocol (OTLP) requests. + pub fn otlp_handler( + mut self, + otlp_handler: OpenTelemetryProtocolHandlerRef, + user_provider: Option, + ) -> Self { + let trace_server = ServiceBuilder::new() + .layer(AuthMiddlewareLayer::with(user_provider.clone())) + .service(TraceServiceServer::new(OtlpService::new( + otlp_handler.clone(), + ))); + self.routes_builder.add_service(trace_server); + + let metrics_server = ServiceBuilder::new() + .layer(AuthMiddlewareLayer::with(user_provider)) + .service(MetricsServiceServer::new(OtlpService::new(otlp_handler))); + self.routes_builder.add_service(metrics_server); + self } - pub fn user_provider(mut self, user_provider: Option) -> Self { - self.user_provider = user_provider; - self + pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder { + &mut self.routes_builder } pub fn build(self) -> GrpcServer { - let config = self.config.unwrap_or_default(); - let runtime = self.runtime; - let region_server_handler = self - .region_server_handler - .map(|handler| RegionServerRequestHandler::new(handler, runtime)); - GrpcServer { - config, - prometheus_handler: self.prometheus_handler, - flight_handler: self.flight_handler, - region_server_handler, - database_handler: self.database_handler, - otlp_handler: self.otlp_handler, - user_provider: self.user_provider, + routes: Mutex::new(Some(self.routes_builder.routes())), shutdown_tx: Mutex::new(None), serve_state: Mutex::new(None), } diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 1ba6f8e05d36..b32cb5856510 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -9,6 +9,7 @@ dashboard = [] [dependencies] api.workspace = true +arrow-flight.workspace = true async-trait = "0.1" auth.workspace = true axum = "0.6" diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index eb55a8621b56..40a98dd040b0 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -18,6 +18,8 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::Role; +use api::v1::region::region_server::RegionServer; +use arrow_flight::flight_service_server::FlightServiceServer; use catalog::kvbackend::{CachedMetaKvBackend, MetaKvBackend}; use client::client_manager::DatanodeClients; use client::Client; @@ -45,7 +47,8 @@ use meta_client::client::MetaClientBuilder; use meta_srv::cluster::MetaPeerClientRef; use meta_srv::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; use meta_srv::mocks::MockInfo; -use servers::grpc::builder::GrpcServerBuilder; +use servers::grpc::flight::FlightCraftWrapper; +use servers::grpc::region_server::RegionServerRequestHandler; use servers::heartbeat_options::HeartbeatOptions; use servers::Mode; use tempfile::TempDir; @@ -373,15 +376,15 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) { .unwrap(), ); - let grpc_server = GrpcServerBuilder::new(runtime) - .flight_handler(Arc::new(datanode.region_server())) - .region_server_handler(Arc::new(datanode.region_server())) - .build(); + let flight_handler = FlightCraftWrapper(datanode.region_server()); + + let region_server_handler = + RegionServerRequestHandler::new(Arc::new(datanode.region_server()), runtime); let _handle = tokio::spawn(async move { Server::builder() - .add_service(grpc_server.create_flight_service()) - .add_service(grpc_server.create_region_service()) + .add_service(FlightServiceServer::new(flight_handler)) + .add_service(RegionServer::new(region_server_handler)) .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await }); diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 830bb23fad81..43eae9b4759e 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -511,12 +511,10 @@ pub async fn setup_grpc_server_with( let flight_handler = Arc::new(greptime_request_handler.clone()); let fe_grpc_server = Arc::new( - GrpcServerBuilder::new(runtime) - .option_config(grpc_config) + GrpcServerBuilder::new(grpc_config.unwrap_or_default(), runtime) .database_handler(greptime_request_handler) .flight_handler(flight_handler) - .prometheus_handler(fe_instance_ref.clone()) - .user_provider(user_provider) + .prometheus_handler(fe_instance_ref.clone(), user_provider) .build(), ); diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index b2757f479dd6..e6ad0bd84de2 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -6,7 +6,7 @@ license.workspace = true [dependencies] async-trait = "0.1" -clap = { version = "4.0", features = ["derive"] } +clap.workspace = true client.workspace = true common-base.workspace = true common-error.workspace = true