Skip to content

Commit

Permalink
fix: gRPC max mesage size limitation (#2375)
Browse files Browse the repository at this point in the history
* fix: gRPC max mesage size limitation

* chore: don't set max_encoding_message_size
  • Loading branch information
killme2008 authored Sep 13, 2023
1 parent 60bdf96 commit 3f97a0d
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 10 deletions.
12 changes: 9 additions & 3 deletions src/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,24 +138,30 @@ impl Client {
Ok((addr, channel))
}

fn max_grpc_message_size(&self) -> usize {
self.inner.channel_manager.config().max_message_size
}

pub(crate) fn make_flight_client(&self) -> Result<FlightClient> {
let (addr, channel) = self.find_channel()?;
Ok(FlightClient {
addr,
client: FlightServiceClient::new(channel),
client: FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_message_size()),
})
}

pub(crate) fn make_database_client(&self) -> Result<DatabaseClient> {
let (_, channel) = self.find_channel()?;
Ok(DatabaseClient {
inner: GreptimeDatabaseClient::new(channel),
inner: GreptimeDatabaseClient::new(channel)
.max_decoding_message_size(self.max_grpc_message_size()),
})
}

pub(crate) fn raw_region_client(&self) -> Result<PbRegionClient<Channel>> {
let (_, channel) = self.find_channel()?;
Ok(PbRegionClient::new(channel))
Ok(PbRegionClient::new(channel).max_decoding_message_size(self.max_grpc_message_size()))
}

pub fn make_prometheus_gateway_client(&self) -> Result<PrometheusGatewayClient<Channel>> {
Expand Down
7 changes: 7 additions & 0 deletions src/common/grpc/src/channel_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsCon
const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60;
pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10;
pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 10;
pub const DEFAULT_MAX_GRPC_MESSAGE_SIZE: usize = 512 * 1024 * 1024;

lazy_static! {
static ref ID: AtomicU64 = AtomicU64::new(0);
Expand Down Expand Up @@ -247,6 +248,9 @@ pub struct ChannelConfig {
pub tcp_keepalive: Option<Duration>,
pub tcp_nodelay: bool,
pub client_tls: Option<ClientTlsOption>,
// Max gRPC message size
// TODO(dennis): make it configurable
pub max_message_size: usize,
}

impl Default for ChannelConfig {
Expand All @@ -265,6 +269,7 @@ impl Default for ChannelConfig {
tcp_keepalive: None,
tcp_nodelay: true,
client_tls: None,
max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE,
}
}
}
Expand Down Expand Up @@ -529,6 +534,7 @@ mod tests {
tcp_keepalive: None,
tcp_nodelay: true,
client_tls: None,
max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE,
},
default_cfg
);
Expand Down Expand Up @@ -571,6 +577,7 @@ mod tests {
client_cert_path: "some_cert_path".to_string(),
client_key_path: "some_key_path".to_string(),
}),
max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE,
},
cfg
);
Expand Down
39 changes: 32 additions & 7 deletions src/servers/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use arrow_flight::flight_service_server::FlightService;
use arrow_flight::flight_service_server::FlightServiceServer;
use async_trait::async_trait;
use auth::UserProviderRef;
use common_grpc::channel_manager::DEFAULT_MAX_GRPC_MESSAGE_SIZE;
use common_runtime::Runtime;
use common_telemetry::logging::info;
use common_telemetry::{error, warn};
Expand All @@ -58,6 +59,7 @@ use crate::server::Server;
type TonicResult<T> = std::result::Result<T, Status>;

pub struct GrpcServer {
config: GrpcServerConfig,
// states
shutdown_tx: Mutex<Option<Sender<()>>>,
user_provider: Option<UserProviderRef>,
Expand All @@ -77,6 +79,22 @@ pub struct GrpcServer {
region_server_handler: Option<RegionServerRequestHandler>,
}

/// Grpc Server configuration
#[derive(Debug, Clone)]
pub struct GrpcServerConfig {
// Max gRPC message size
// TODO(dennis): make it configurable
pub max_message_size: usize,
}

impl Default for GrpcServerConfig {
fn default() -> Self {
Self {
max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE,
}
}
}

impl GrpcServer {
pub fn new(
query_handler: Option<ServerGrpcQueryHandlerRef>,
Expand All @@ -92,6 +110,7 @@ impl GrpcServer {
let region_server_handler = region_server_handler
.map(|handler| RegionServerRequestHandler::new(handler, runtime.clone()));
Self {
config: GrpcServerConfig::default(),
shutdown_tx: Mutex::new(None),
user_provider,
serve_state: Mutex::new(None),
Expand Down Expand Up @@ -182,6 +201,7 @@ impl Server for GrpcServer {
}

async fn start(&self, addr: SocketAddr) -> Result<SocketAddr> {
let max_message_size = self.config.max_message_size;
let (tx, rx) = oneshot::channel();
let (listener, addr) = {
let mut shutdown_tx = self.shutdown_tx.lock().await;
Expand All @@ -205,26 +225,31 @@ impl Server for GrpcServer {
.add_service(self.create_healthcheck_service())
.add_service(self.create_reflection_service());
if let Some(database_handler) = &self.database_handler {
builder = builder.add_service(GreptimeDatabaseServer::new(DatabaseService::new(
database_handler.clone(),
)))
builder = builder.add_service(
GreptimeDatabaseServer::new(DatabaseService::new(database_handler.clone()))
.max_decoding_message_size(max_message_size),
)
}
if let Some(prometheus_handler) = &self.prometheus_handler {
builder = builder
.add_service(self.create_prom_query_gateway_service(prometheus_handler.clone()))
}
if let Some(flight_handler) = &self.flight_handler {
builder = builder.add_service(FlightServiceServer::new(FlightCraftWrapper(
flight_handler.clone(),
)))
builder = builder.add_service(
FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
.max_decoding_message_size(max_message_size),
)
} else {
// TODO(ruihang): this is a temporary workaround before region server is ready.
builder = builder.add_service(FlightServiceServer::new(FlightCraftWrapper(
self.database_handler.clone().unwrap(),
)))
}
if let Some(region_server_handler) = &self.region_server_handler {
builder = builder.add_service(RegionServer::new(region_server_handler.clone()))
builder = builder.add_service(
RegionServer::new(region_server_handler.clone())
.max_decoding_message_size(max_message_size),
);
}

let (serve_state_tx, serve_state_rx) = oneshot::channel();
Expand Down

0 comments on commit 3f97a0d

Please sign in to comment.