Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support compression on gRPC server #3961

Merged
merged 14 commits into from
May 20, 2024
Merged
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ tokio = { version = "1.36", features = ["full"] }
tokio-stream = { version = "0.1" }
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.11", features = ["tls"] }
tonic = { version = "0.11", features = ["tls", "gzip", "zstd"] }
uuid = { version = "1.7", features = ["serde", "v4", "fast-rng"] }
zstd = "0.13"

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ config-docs: ## Generate configuration documentation from toml files.
docker run --rm \
-v ${PWD}:/greptimedb \
-w /greptimedb/config \
toml2docs/toml2docs:latest \
toml2docs/toml2docs:v0.1.1 \
-p '##' \
-t ./config-docs-template.md \
-o ./config.md
Expand Down
36 changes: 28 additions & 8 deletions src/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use arrow_flight::flight_service_client::FlightServiceClient;
use common_grpc::channel_manager::ChannelManager;
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt};
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;

use crate::load_balance::{LoadBalance, Loadbalancer};
Expand Down Expand Up @@ -149,21 +150,40 @@ impl Client {
.as_bytes() as usize
}

pub fn accept_compressed(&self) -> &Vec<CompressionEncoding> {
self.inner
.channel_manager
.config()
.accept_compressed
.as_ref()
}

pub fn make_flight_client(&self) -> Result<FlightClient> {
let (addr, channel) = self.find_channel()?;
Ok(FlightClient {
addr,
client: FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size()),
})

let mut client = FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size());

for encoding in self.accept_compressed() {
client = client
.accept_compressed(*encoding)
.send_compressed(*encoding);
}
Ok(FlightClient { addr, client })
}

pub(crate) fn raw_region_client(&self) -> Result<PbRegionClient<Channel>> {
let (_, channel) = self.find_channel()?;
Ok(PbRegionClient::new(channel)
let mut client = PbRegionClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size()))
.max_encoding_message_size(self.max_grpc_send_message_size());
for encoding in self.accept_compressed() {
client = client
.accept_compressed(*encoding)
.send_compressed(*encoding);
}
Ok(client)
}

pub fn make_prometheus_gateway_client(&self) -> Result<PrometheusGatewayClient<Channel>> {
Expand Down
19 changes: 18 additions & 1 deletion src/common/grpc/src/channel_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use lazy_static::lazy_static;
use snafu::{OptionExt, ResultExt};
use tonic::codec::CompressionEncoding;
use tonic::transport::{
Certificate, Channel as InnerChannel, ClientTlsConfig, Endpoint, Identity, Uri,
};
Expand Down Expand Up @@ -261,6 +262,8 @@ pub struct ChannelConfig {
pub max_recv_message_size: ReadableSize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: ReadableSize,
// Supported compression encoding for gRPC, e.g: gzip, zstd.
pub accept_compressed: Vec<CompressionEncoding>,
}

impl Default for ChannelConfig {
Expand All @@ -281,6 +284,7 @@ impl Default for ChannelConfig {
client_tls: None,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
accept_compressed: vec![],
}
}
}
Expand Down Expand Up @@ -381,6 +385,16 @@ impl ChannelConfig {
self.client_tls = Some(client_tls_option);
self
}

/// Enable compression for gRPC
///
/// Disabled by default.
pub fn accept_compressed(self, compression: Vec<CompressionEncoding>) -> Self {
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
Self {
accept_compressed: compression,
..self
}
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -521,6 +535,7 @@ mod tests {
client_tls: None,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
accept_compressed: vec![]
},
default_cfg
);
Expand All @@ -542,7 +557,8 @@ mod tests {
server_ca_cert_path: "some_server_path".to_string(),
client_cert_path: "some_cert_path".to_string(),
client_key_path: "some_key_path".to_string(),
});
})
.accept_compressed(vec![CompressionEncoding::Gzip]);

assert_eq!(
ChannelConfig {
Expand All @@ -565,6 +581,7 @@ mod tests {
}),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
accept_compressed: vec![CompressionEncoding::Gzip]
},
cfg
);
Expand Down
33 changes: 24 additions & 9 deletions src/servers/src/grpc/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_runtime::Runtime;
use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
use tokio::sync::Mutex;
use tonic::codec::CompressionEncoding;
use tonic::transport::server::RoutesBuilder;
use tower::ServiceBuilder;

Expand All @@ -45,11 +46,15 @@ macro_rules! add_service {
let max_recv_message_size = $builder.config().max_recv_message_size;
let max_send_message_size = $builder.config().max_send_message_size;

$builder.routes_builder_mut().add_service(
$service
.max_decoding_message_size(max_recv_message_size)
.max_encoding_message_size(max_send_message_size),
)
let service_builder = $service
.max_decoding_message_size(max_recv_message_size)
.max_encoding_message_size(max_send_message_size)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);

$builder.routes_builder_mut().add_service(service_builder);
};
}

Expand Down Expand Up @@ -123,16 +128,26 @@ impl GrpcServerBuilder {
otlp_handler: OpenTelemetryProtocolHandlerRef,
user_provider: Option<UserProviderRef>,
) -> Self {
let tracing_service = TraceServiceServer::new(OtlpService::new(otlp_handler.clone()))
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);

let trace_server = ServiceBuilder::new()
.layer(AuthMiddlewareLayer::with(user_provider.clone()))
.service(TraceServiceServer::new(OtlpService::new(
otlp_handler.clone(),
)));
.service(tracing_service);
self.routes_builder.add_service(trace_server);

let metrics_service = MetricsServiceServer::new(OtlpService::new(otlp_handler))
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);

let metrics_server = ServiceBuilder::new()
.layer(AuthMiddlewareLayer::with(user_provider))
.service(MetricsServiceServer::new(OtlpService::new(otlp_handler)));
.service(metrics_service);
self.routes_builder.add_service(metrics_server);

self
Expand Down
53 changes: 53 additions & 0 deletions tests-integration/tests/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use api::v1::{
use auth::user_provider_from_option;
use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::MITO_ENGINE;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_query::Output;
use common_recordbatch::RecordBatches;
use servers::grpc::GrpcServerConfig;
Expand Down Expand Up @@ -70,6 +71,8 @@ macro_rules! grpc_tests {
test_insert_and_select,
test_dbname,
test_grpc_message_size_ok,
test_grpc_gzip_compression,
test_grpc_client_plain_with_server_compression,
test_grpc_message_size_limit_recv,
test_grpc_message_size_limit_send,
test_grpc_auth,
Expand Down Expand Up @@ -142,6 +145,56 @@ pub async fn test_grpc_message_size_ok(store_type: StorageType) {
guard.remove_all().await;
}

pub async fn test_grpc_gzip_compression(store_type: StorageType) {
// server and client both support gzip
let config = GrpcServerConfig {
max_recv_message_size: 1024,
max_send_message_size: 1024,
};
let (addr, mut guard, fe_grpc_server) =
setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await;

let cm = ChannelManager::with_config(ChannelConfig {
..Default::default()
});

let grpc_client = Client::with_manager_and_urls(cm, vec![addr]);
let db = Database::new_with_dbname(
format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),
grpc_client,
);
db.sql("show tables;").await.unwrap();
let _ = fe_grpc_server.shutdown().await;
guard.remove_all().await;
}

pub async fn test_grpc_client_plain_with_server_compression(store_type: StorageType) {
// set server compression to gzip
let config = GrpcServerConfig {
max_recv_message_size: 1024,
max_send_message_size: 1024,
};
let (addr, mut guard, fe_grpc_server) =
setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await;

// set client compression to true
let cm = ChannelManager::with_config(ChannelConfig {
accept_compressed: vec![],
..Default::default()
});

let grpc_client = Client::with_manager_and_urls(cm, vec![addr]);
let db = Database::new_with_dbname(
format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),
grpc_client,
);
let re = db.sql("show tables;").await;
assert!(re.is_ok());

let _ = fe_grpc_server.shutdown().await;
guard.remove_all().await;
}

pub async fn test_grpc_message_size_limit_send(store_type: StorageType) {
let config = GrpcServerConfig {
max_recv_message_size: 1024,
Expand Down
Loading