Skip to content

Commit

Permalink
feat: enable tcp no_delay by default for internal services (#2527)
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu authored Oct 7, 2023
1 parent 201acd1 commit 00fe7d1
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 9 deletions.
9 changes: 5 additions & 4 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use snafu::ResultExt;
use tokio::net::TcpListener;
use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::server::Router;
use tonic::transport::server::{Router, TcpIncoming};

use crate::election::etcd::EtcdElection;
use crate::lock::etcd::EtcdLock;
Expand Down Expand Up @@ -121,10 +120,12 @@ pub async fn bootstrap_meta_srv_with_router(
let listener = TcpListener::bind(bind_addr)
.await
.context(error::TcpBindSnafu { addr: bind_addr })?;
let listener = TcpListenerStream::new(listener);

let incoming =
TcpIncoming::from_listener(listener, true, None).context(error::TcpIncomingSnafu)?;

router
.serve_with_incoming_shutdown(listener, async {
.serve_with_incoming_shutdown(incoming, async {
let _ = signal.recv().await;
})
.await
Expand Down
7 changes: 7 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to convert to TcpIncoming"))]
TcpIncoming {
#[snafu(source)]
error: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display("Failed to start gRPC server"))]
StartGrpc {
#[snafu(source)]
Expand Down Expand Up @@ -546,6 +552,7 @@ impl ErrorExt for Error {
Error::EtcdFailed { .. }
| Error::ConnectEtcd { .. }
| Error::TcpBind { .. }
| Error::TcpIncoming { .. }
| Error::SerializeToJson { .. }
| Error::DeserializeFromJson { .. }
| Error::DecodeTableRoute { .. }
Expand Down
7 changes: 7 additions & 0 deletions src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ pub enum Error {
error: std::io::Error,
},

#[snafu(display("Failed to convert to TcpIncoming"))]
TcpIncoming {
#[snafu(source)]
error: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display("Failed to execute query, query: {}", query))]
ExecuteQuery {
query: String,
Expand Down Expand Up @@ -391,6 +397,7 @@ impl ErrorExt for Error {
| AlreadyStarted { .. }
| InvalidPromRemoteReadQueryResult { .. }
| TcpBind { .. }
| TcpIncoming { .. }
| CatalogError { .. }
| GrpcReflectionService { .. }
| BuildHttpResponse { .. } => StatusCode::Internal,
Expand Down
14 changes: 9 additions & 5 deletions src/servers/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ use snafu::{ensure, OptionExt, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::oneshot::{self, Receiver, Sender};
use tokio::sync::Mutex;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::server::TcpIncoming;
use tonic::{Request, Response, Status};
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};

use self::flight::{FlightCraftRef, FlightCraftWrapper};
use self::prom_query_gateway::PrometheusGatewayService;
use self::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
use crate::error::{
AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu,
};
use crate::grpc::database::DatabaseService;
use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::prometheus_handler::PrometheusHandlerRef;
Expand Down Expand Up @@ -209,7 +211,7 @@ impl Server for GrpcServer {
let max_recv_message_size = self.config.max_recv_message_size;
let max_send_message_size = self.config.max_send_message_size;
let (tx, rx) = oneshot::channel();
let (listener, addr) = {
let (incoming, addr) = {
let mut shutdown_tx = self.shutdown_tx.lock().await;
ensure!(
shutdown_tx.is_none(),
Expand All @@ -220,11 +222,13 @@ impl Server for GrpcServer {
.await
.context(TcpBindSnafu { addr })?;
let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
let incoming =
TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
info!("gRPC server is bound to {}", addr);

*shutdown_tx = Some(tx);

(listener, addr)
(incoming, addr)
};

let mut builder = tonic::transport::Server::builder()
Expand Down Expand Up @@ -271,7 +275,7 @@ impl Server for GrpcServer {

let _handle = common_runtime::spawn_bg(async move {
let result = builder
.serve_with_incoming_shutdown(TcpListenerStream::new(listener), rx.map(drop))
.serve_with_incoming_shutdown(incoming, rx.map(drop))
.await
.context(StartGrpcSnafu);
serve_state_tx.send(result)
Expand Down

0 comments on commit 00fe7d1

Please sign in to comment.