From 28df971b8e931a13723a1d1099caf9a63a0a5db9 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Wed, 13 Sep 2023 16:13:49 +0800 Subject: [PATCH] fix: gRPC max mesage size limitation (#2375) * fix: gRPC max mesage size limitation * chore: don't set max_encoding_message_size --- src/client/src/client.rs | 12 ++++++-- src/common/grpc/src/channel_manager.rs | 7 +++++ src/servers/src/grpc.rs | 39 +++++++++++++++++++++----- 3 files changed, 48 insertions(+), 10 deletions(-) diff --git a/src/client/src/client.rs b/src/client/src/client.rs index 2af1d8ae8f5f..ada1ae92c56a 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -138,24 +138,30 @@ impl Client { Ok((addr, channel)) } + fn max_grpc_message_size(&self) -> usize { + self.inner.channel_manager.config().max_message_size + } + pub(crate) fn make_flight_client(&self) -> Result { let (addr, channel) = self.find_channel()?; Ok(FlightClient { addr, - client: FlightServiceClient::new(channel), + client: FlightServiceClient::new(channel) + .max_decoding_message_size(self.max_grpc_message_size()), }) } pub(crate) fn make_database_client(&self) -> Result { let (_, channel) = self.find_channel()?; Ok(DatabaseClient { - inner: GreptimeDatabaseClient::new(channel), + inner: GreptimeDatabaseClient::new(channel) + .max_decoding_message_size(self.max_grpc_message_size()), }) } pub(crate) fn raw_region_client(&self) -> Result> { let (_, channel) = self.find_channel()?; - Ok(PbRegionClient::new(channel)) + Ok(PbRegionClient::new(channel).max_decoding_message_size(self.max_grpc_message_size())) } pub fn make_prometheus_gateway_client(&self) -> Result> { diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index c563f5d078f0..1102ac0fd303 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -31,6 +31,7 @@ use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsCon const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60; pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10; pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 10; +pub const DEFAULT_MAX_GRPC_MESSAGE_SIZE: usize = 512 * 1024 * 1024; lazy_static! { static ref ID: AtomicU64 = AtomicU64::new(0); @@ -247,6 +248,9 @@ pub struct ChannelConfig { pub tcp_keepalive: Option, pub tcp_nodelay: bool, pub client_tls: Option, + // Max gRPC message size + // TODO(dennis): make it configurable + pub max_message_size: usize, } impl Default for ChannelConfig { @@ -265,6 +269,7 @@ impl Default for ChannelConfig { tcp_keepalive: None, tcp_nodelay: true, client_tls: None, + max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE, } } } @@ -529,6 +534,7 @@ mod tests { tcp_keepalive: None, tcp_nodelay: true, client_tls: None, + max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE, }, default_cfg ); @@ -571,6 +577,7 @@ mod tests { client_cert_path: "some_cert_path".to_string(), client_key_path: "some_key_path".to_string(), }), + max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE, }, cfg ); diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 4689647c4b1b..44fce8b4a275 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -33,6 +33,7 @@ use arrow_flight::flight_service_server::FlightService; use arrow_flight::flight_service_server::FlightServiceServer; use async_trait::async_trait; use auth::UserProviderRef; +use common_grpc::channel_manager::DEFAULT_MAX_GRPC_MESSAGE_SIZE; use common_runtime::Runtime; use common_telemetry::logging::info; use common_telemetry::{error, warn}; @@ -58,6 +59,7 @@ use crate::server::Server; type TonicResult = std::result::Result; pub struct GrpcServer { + config: GrpcServerConfig, // states shutdown_tx: Mutex>>, user_provider: Option, @@ -77,6 +79,22 @@ pub struct GrpcServer { region_server_handler: Option, } +/// Grpc Server configuration +#[derive(Debug, Clone)] +pub struct GrpcServerConfig { + // Max gRPC message size + // TODO(dennis): make it configurable + pub max_message_size: usize, +} + +impl Default for GrpcServerConfig { + fn default() -> Self { + Self { + max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE, + } + } +} + impl GrpcServer { pub fn new( query_handler: Option, @@ -92,6 +110,7 @@ impl GrpcServer { let region_server_handler = region_server_handler .map(|handler| RegionServerRequestHandler::new(handler, runtime.clone())); Self { + config: GrpcServerConfig::default(), shutdown_tx: Mutex::new(None), user_provider, serve_state: Mutex::new(None), @@ -182,6 +201,7 @@ impl Server for GrpcServer { } async fn start(&self, addr: SocketAddr) -> Result { + let max_message_size = self.config.max_message_size; let (tx, rx) = oneshot::channel(); let (listener, addr) = { let mut shutdown_tx = self.shutdown_tx.lock().await; @@ -205,18 +225,20 @@ impl Server for GrpcServer { .add_service(self.create_healthcheck_service()) .add_service(self.create_reflection_service()); if let Some(database_handler) = &self.database_handler { - builder = builder.add_service(GreptimeDatabaseServer::new(DatabaseService::new( - database_handler.clone(), - ))) + builder = builder.add_service( + GreptimeDatabaseServer::new(DatabaseService::new(database_handler.clone())) + .max_decoding_message_size(max_message_size), + ) } if let Some(prometheus_handler) = &self.prometheus_handler { builder = builder .add_service(self.create_prom_query_gateway_service(prometheus_handler.clone())) } if let Some(flight_handler) = &self.flight_handler { - builder = builder.add_service(FlightServiceServer::new(FlightCraftWrapper( - flight_handler.clone(), - ))) + builder = builder.add_service( + FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone())) + .max_decoding_message_size(max_message_size), + ) } else { // TODO(ruihang): this is a temporary workaround before region server is ready. builder = builder.add_service(FlightServiceServer::new(FlightCraftWrapper( @@ -224,7 +246,10 @@ impl Server for GrpcServer { ))) } if let Some(region_server_handler) = &self.region_server_handler { - builder = builder.add_service(RegionServer::new(region_server_handler.clone())) + builder = builder.add_service( + RegionServer::new(region_server_handler.clone()) + .max_decoding_message_size(max_message_size), + ); } let (serve_state_tx, serve_state_rx) = oneshot::channel();