diff --git a/src/meta-client/src/client/cluster.rs b/src/meta-client/src/client/cluster.rs index b1c7ff1089a1..72b2307790fa 100644 --- a/src/meta-client/src/client/cluster.rs +++ b/src/meta-client/src/client/cluster.rs @@ -31,6 +31,7 @@ use common_meta::rpc::store::{ use common_telemetry::{info, warn}; use snafu::{ensure, ResultExt}; use tokio::sync::RwLock; +use tonic::codec::CompressionEncoding; use tonic::transport::Channel; use tonic::Status; @@ -173,7 +174,10 @@ impl Inner { fn make_client(&self, addr: impl AsRef) -> Result> { let channel = self.channel_manager.get(addr).context(CreateChannelSnafu)?; - Ok(ClusterClient::new(channel)) + Ok(ClusterClient::new(channel) + .accept_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Zstd)) } #[inline] diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index 47984360b44b..b1214d72df6d 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -23,6 +23,7 @@ use common_telemetry::tracing_context::TracingContext; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::{mpsc, RwLock}; use tokio_stream::wrappers::ReceiverStream; +use tonic::codec::CompressionEncoding; use tonic::transport::Channel; use tonic::Streaming; @@ -249,7 +250,10 @@ impl Inner { .get(addr) .context(error::CreateChannelSnafu)?; - Ok(HeartbeatClient::new(channel)) + Ok(HeartbeatClient::new(channel) + .accept_compressed(CompressionEncoding::Zstd) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Zstd)) } #[inline] diff --git a/src/meta-client/src/client/procedure.rs b/src/meta-client/src/client/procedure.rs index eccfe90dff93..2f310ab85d30 100644 --- a/src/meta-client/src/client/procedure.rs +++ b/src/meta-client/src/client/procedure.rs @@ -27,6 +27,7 @@ use common_telemetry::tracing_context::TracingContext; use common_telemetry::{info, warn}; use snafu::{ensure, ResultExt}; use tokio::sync::RwLock; +use tonic::codec::CompressionEncoding; use tonic::transport::Channel; use tonic::Status; @@ -141,7 +142,10 @@ impl Inner { .get(addr) .context(error::CreateChannelSnafu)?; - Ok(ProcedureServiceClient::new(channel)) + Ok(ProcedureServiceClient::new(channel) + .accept_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Zstd)) } #[inline] diff --git a/src/meta-client/src/client/store.rs b/src/meta-client/src/client/store.rs index e63f7ade1f5a..4f0fea7e0f23 100644 --- a/src/meta-client/src/client/store.rs +++ b/src/meta-client/src/client/store.rs @@ -25,6 +25,7 @@ use common_grpc::channel_manager::ChannelManager; use common_telemetry::tracing_context::TracingContext; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::RwLock; +use tonic::codec::CompressionEncoding; use tonic::transport::Channel; use crate::client::{load_balance as lb, Id}; @@ -236,7 +237,10 @@ impl Inner { .get(addr) .context(error::CreateChannelSnafu)?; - Ok(StoreClient::new(channel)) + Ok(StoreClient::new(channel) + .accept_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Zstd)) } #[inline] diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index d5f369a3488d..47afa0ab416b 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -41,6 +41,7 @@ use tokio::net::TcpListener; use tokio::sync::mpsc::{self, Receiver, Sender}; #[cfg(feature = "pg_kvbackend")] use tokio_postgres::NoTls; +use tonic::codec::CompressionEncoding; use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; @@ -178,14 +179,26 @@ pub async fn bootstrap_metasrv_with_router( Ok(()) } +#[macro_export] +macro_rules! add_compressed_service { + ($builder:expr, $server:expr) => { + $builder.add_service( + $server + .accept_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Zstd), + ) + }; +} + pub fn router(metasrv: Arc) -> Router { - tonic::transport::Server::builder() - .accept_http1(true) // for admin services - .add_service(HeartbeatServer::from_arc(metasrv.clone())) - .add_service(StoreServer::from_arc(metasrv.clone())) - .add_service(ClusterServer::from_arc(metasrv.clone())) - .add_service(ProcedureServiceServer::from_arc(metasrv.clone())) - .add_service(admin::make_admin_service(metasrv)) + let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services + let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone())); + let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone())); + let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone())); + let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone())); + router.add_service(admin::make_admin_service(metasrv)) } pub async fn metasrv_builder( diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index c84e1755cab7..0c5c34370400 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -38,6 +38,7 @@ pub mod selector; pub mod service; pub mod state; pub mod table_meta_alloc; + pub use crate::error::Result; mod greptimedb_telemetry; diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index 318fd16c87f2..4991c2ab8a4d 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -24,8 +24,10 @@ use common_meta::key::TableMetadataManager; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; +use tonic::codec::CompressionEncoding; use tower::service_fn; +use crate::add_compressed_service; use crate::metasrv::builder::MetasrvBuilder; use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef}; @@ -80,11 +82,14 @@ pub async fn mock( let (client, server) = tokio::io::duplex(1024); let metasrv = Arc::new(metasrv); let service = metasrv.clone(); + let _handle = tokio::spawn(async move { - tonic::transport::Server::builder() - .add_service(HeartbeatServer::from_arc(service.clone())) - .add_service(StoreServer::from_arc(service.clone())) - .add_service(ProcedureServiceServer::from_arc(service.clone())) + let mut router = tonic::transport::Server::builder(); + let router = add_compressed_service!(router, HeartbeatServer::from_arc(service.clone())); + let router = add_compressed_service!(router, StoreServer::from_arc(service.clone())); + let router = + add_compressed_service!(router, ProcedureServiceServer::from_arc(service.clone())); + router .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await });