diff --git a/src/client/src/client.rs b/src/client/src/client.rs index ada1ae92c56a..c5457e58741f 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -138,8 +138,12 @@ impl Client { Ok((addr, channel)) } - fn max_grpc_message_size(&self) -> usize { - self.inner.channel_manager.config().max_message_size + fn max_grpc_recv_message_size(&self) -> usize { + self.inner.channel_manager.config().max_recv_message_size + } + + fn max_grpc_send_message_size(&self) -> usize { + self.inner.channel_manager.config().max_send_message_size } pub(crate) fn make_flight_client(&self) -> Result { @@ -147,7 +151,8 @@ impl Client { Ok(FlightClient { addr, client: FlightServiceClient::new(channel) - .max_decoding_message_size(self.max_grpc_message_size()), + .max_decoding_message_size(self.max_grpc_recv_message_size()) + .max_encoding_message_size(self.max_grpc_send_message_size()), }) } @@ -155,13 +160,16 @@ impl Client { let (_, channel) = self.find_channel()?; Ok(DatabaseClient { inner: GreptimeDatabaseClient::new(channel) - .max_decoding_message_size(self.max_grpc_message_size()), + .max_decoding_message_size(self.max_grpc_recv_message_size()) + .max_encoding_message_size(self.max_grpc_send_message_size()), }) } pub(crate) fn raw_region_client(&self) -> Result> { let (_, channel) = self.find_channel()?; - Ok(PbRegionClient::new(channel).max_decoding_message_size(self.max_grpc_message_size())) + Ok(PbRegionClient::new(channel) + .max_decoding_message_size(self.max_grpc_recv_message_size()) + .max_encoding_message_size(self.max_grpc_send_message_size())) } pub fn make_prometheus_gateway_client(&self) -> Result> { diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index 1102ac0fd303..f52177e2890d 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -31,7 +31,8 @@ use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsCon const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60; pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10; pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 10; -pub const DEFAULT_MAX_GRPC_MESSAGE_SIZE: usize = 512 * 1024 * 1024; +pub const DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE: usize = 512 * 1024 * 1024; +pub const DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE: usize = 512 * 1024 * 1024; lazy_static! { static ref ID: AtomicU64 = AtomicU64::new(0); @@ -248,9 +249,10 @@ pub struct ChannelConfig { pub tcp_keepalive: Option, pub tcp_nodelay: bool, pub client_tls: Option, - // Max gRPC message size - // TODO(dennis): make it configurable - pub max_message_size: usize, + // Max gRPC receiving(decoding) message size + pub max_recv_message_size: usize, + // Max gRPC sending(encoding) message size + pub max_send_message_size: usize, } impl Default for ChannelConfig { @@ -269,7 +271,8 @@ impl Default for ChannelConfig { tcp_keepalive: None, tcp_nodelay: true, client_tls: None, - max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE, + max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, + max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, } } } @@ -534,7 +537,8 @@ mod tests { tcp_keepalive: None, tcp_nodelay: true, client_tls: None, - max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE, + max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, + max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, }, default_cfg ); @@ -577,7 +581,8 @@ mod tests { client_cert_path: "some_cert_path".to_string(), client_key_path: "some_key_path".to_string(), }), - max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE, + max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, + max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, }, cfg ); diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 61001f2e39d1..b3835e6e2e84 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -18,6 +18,9 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use common_config::WalConfig; +use common_grpc::channel_manager::{ + DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, +}; pub use common_procedure::options::ProcedureConfig; use common_telemetry::logging::LoggingOptions; use file_engine::config::EngineConfig as FileEngineConfig; @@ -324,6 +327,10 @@ pub struct DatanodeOptions { pub rpc_addr: String, pub rpc_hostname: Option, pub rpc_runtime_size: usize, + // Max gRPC receiving(decoding) message size + pub rpc_max_recv_message_size: usize, + // Max gRPC sending(encoding) message size + pub rpc_max_send_message_size: usize, pub heartbeat: HeartbeatOptions, pub http: HttpOptions, pub meta_client: Option, @@ -344,6 +351,8 @@ impl Default for DatanodeOptions { rpc_addr: "127.0.0.1:3001".to_string(), rpc_hostname: None, rpc_runtime_size: 8, + rpc_max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, + rpc_max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, http: HttpOptions::default(), meta_client: None, wal: WalConfig::default(), diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 71c050dc0bbc..1847dc4c992a 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -16,7 +16,7 @@ use std::net::SocketAddr; use std::sync::Arc; use futures::future; -use servers::grpc::GrpcServer; +use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::server::Server; @@ -39,9 +39,14 @@ impl Services { let flight_handler = Some(Arc::new(region_server.clone()) as _); let region_server_handler = Some(Arc::new(region_server.clone()) as _); let runtime = region_server.runtime(); + let grpc_config = GrpcServerConfig { + max_recv_message_size: opts.rpc_max_recv_message_size, + max_send_message_size: opts.rpc_max_send_message_size, + }; Ok(Self { grpc_server: GrpcServer::new( + Some(grpc_config), None, None, flight_handler, diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index d96178f1d3ad..5a61c3b48834 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -22,7 +22,7 @@ use common_runtime::Builder as RuntimeBuilder; use common_telemetry::info; use servers::configurator::ConfiguratorRef; use servers::error::Error::InternalIo; -use servers::grpc::GrpcServer; +use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::HttpServerBuilder; use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; @@ -69,7 +69,12 @@ impl Services { .context(error::RuntimeResourceSnafu)?, ); + let grpc_config = GrpcServerConfig { + max_recv_message_size: opts.max_recv_message_size, + max_send_message_size: opts.max_send_message_size, + }; let grpc_server = GrpcServer::new( + Some(grpc_config), Some(ServerGrpcQueryHandlerAdaptor::arc(instance.clone())), Some(instance.clone()), None, diff --git a/src/frontend/src/service_config/grpc.rs b/src/frontend/src/service_config/grpc.rs index 92d6ea771710..e0a64565015d 100644 --- a/src/frontend/src/service_config/grpc.rs +++ b/src/frontend/src/service_config/grpc.rs @@ -12,12 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_grpc::channel_manager::{ + DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, +}; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct GrpcOptions { pub addr: String, pub runtime_size: usize, + // Max gRPC receiving(decoding) message size + pub max_recv_message_size: usize, + // Max gRPC sending(encoding) message size + pub max_send_message_size: usize, } impl Default for GrpcOptions { @@ -25,6 +32,8 @@ impl Default for GrpcOptions { Self { addr: "127.0.0.1:4001".to_string(), runtime_size: 8, + max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, + max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, } } } diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 5d5582de7faf..69ea1943a6fa 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -33,7 +33,9 @@ use arrow_flight::flight_service_server::FlightService; use arrow_flight::flight_service_server::FlightServiceServer; use async_trait::async_trait; use auth::UserProviderRef; -use common_grpc::channel_manager::DEFAULT_MAX_GRPC_MESSAGE_SIZE; +use common_grpc::channel_manager::{ + DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, +}; use common_runtime::Runtime; use common_telemetry::logging::info; use common_telemetry::{error, warn}; @@ -82,21 +84,24 @@ pub struct GrpcServer { /// Grpc Server configuration #[derive(Debug, Clone)] pub struct GrpcServerConfig { - // Max gRPC message size - // TODO(dennis): make it configurable - pub max_message_size: usize, + // Max gRPC receiving(decoding) message size + pub max_recv_message_size: usize, + // Max gRPC sending(encoding) message size + pub max_send_message_size: usize, } impl Default for GrpcServerConfig { fn default() -> Self { Self { - max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE, + max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, + max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, } } } impl GrpcServer { pub fn new( + config: Option, query_handler: Option, prometheus_handler: Option, flight_handler: Option, @@ -110,7 +115,7 @@ impl GrpcServer { let region_server_handler = region_server_handler .map(|handler| RegionServerRequestHandler::new(handler, runtime.clone())); Self { - config: GrpcServerConfig::default(), + config: config.unwrap_or_default(), shutdown_tx: Mutex::new(None), user_provider, serve_state: Mutex::new(None), @@ -201,7 +206,8 @@ impl Server for GrpcServer { } async fn start(&self, addr: SocketAddr) -> Result { - let max_message_size = self.config.max_message_size; + let max_recv_message_size = self.config.max_recv_message_size; + let max_send_message_size = self.config.max_send_message_size; let (tx, rx) = oneshot::channel(); let (listener, addr) = { let mut shutdown_tx = self.shutdown_tx.lock().await; @@ -227,7 +233,8 @@ impl Server for GrpcServer { if let Some(database_handler) = &self.database_handler { builder = builder.add_service( GreptimeDatabaseServer::new(DatabaseService::new(database_handler.clone())) - .max_decoding_message_size(max_message_size), + .max_decoding_message_size(max_recv_message_size) + .max_encoding_message_size(max_send_message_size), ) } if let Some(prometheus_handler) = &self.prometheus_handler { @@ -237,18 +244,24 @@ impl Server for GrpcServer { if let Some(flight_handler) = &self.flight_handler { builder = builder.add_service( FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone())) - .max_decoding_message_size(max_message_size), + .max_decoding_message_size(max_recv_message_size) + .max_encoding_message_size(max_send_message_size), ) } else { // TODO(ruihang): this is a temporary workaround before region server is ready. - builder = builder.add_service(FlightServiceServer::new(FlightCraftWrapper( - self.database_handler.clone().unwrap(), - ))) + builder = builder.add_service( + FlightServiceServer::new(FlightCraftWrapper( + self.database_handler.clone().unwrap(), + )) + .max_decoding_message_size(max_recv_message_size) + .max_encoding_message_size(max_send_message_size), + ) } if let Some(region_server_handler) = &self.region_server_handler { builder = builder.add_service( RegionServer::new(region_server_handler.clone()) - .max_decoding_message_size(max_message_size), + .max_decoding_message_size(max_recv_message_size) + .max_encoding_message_size(max_send_message_size), ); } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 9bdb11f90b00..013eeb681ec6 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -272,6 +272,7 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) { let flight_handler = Some(Arc::new(datanode.region_server()) as _); let region_server_handler = Some(Arc::new(datanode.region_server()) as _); let grpc_server = GrpcServer::new( + None, None, None, flight_handler, diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index a6c0cf72fb65..4cf63c042878 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -448,6 +448,7 @@ pub async fn setup_grpc_server_with_user_provider( runtime.clone(), )); let fe_grpc_server = Arc::new(GrpcServer::new( + None, Some(ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone())), Some(fe_instance_ref.clone()), Some(flight_handler),