diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 3be4d701857b..c3b1deec613b 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -28,8 +28,7 @@ use snafu::ResultExt; use tokio::net::TcpListener; use tokio::select; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio_stream::wrappers::TcpListenerStream; -use tonic::transport::server::Router; +use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; use crate::lock::etcd::EtcdLock; @@ -121,10 +120,12 @@ pub async fn bootstrap_meta_srv_with_router( let listener = TcpListener::bind(bind_addr) .await .context(error::TcpBindSnafu { addr: bind_addr })?; - let listener = TcpListenerStream::new(listener); + + let incoming = + TcpIncoming::from_listener(listener, true, None).context(error::TcpIncomingSnafu)?; router - .serve_with_incoming_shutdown(listener, async { + .serve_with_incoming_shutdown(incoming, async { let _ = signal.recv().await; }) .await diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 16ee839ce928..0835039d12bf 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -150,6 +150,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to convert to TcpIncoming"))] + TcpIncoming { + #[snafu(source)] + error: Box, + }, + #[snafu(display("Failed to start gRPC server"))] StartGrpc { #[snafu(source)] @@ -546,6 +552,7 @@ impl ErrorExt for Error { Error::EtcdFailed { .. } | Error::ConnectEtcd { .. } | Error::TcpBind { .. } + | Error::TcpIncoming { .. } | Error::SerializeToJson { .. } | Error::DeserializeFromJson { .. } | Error::DecodeTableRoute { .. } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 4ff5b903a545..8647b60cddeb 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -78,6 +78,12 @@ pub enum Error { error: std::io::Error, }, + #[snafu(display("Failed to convert to TcpIncoming"))] + TcpIncoming { + #[snafu(source)] + error: Box, + }, + #[snafu(display("Failed to execute query, query: {}", query))] ExecuteQuery { query: String, @@ -391,6 +397,7 @@ impl ErrorExt for Error { | AlreadyStarted { .. } | InvalidPromRemoteReadQueryResult { .. } | TcpBind { .. } + | TcpIncoming { .. } | CatalogError { .. } | GrpcReflectionService { .. } | BuildHttpResponse { .. } => StatusCode::Internal, diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 69ea1943a6fa..baa0a73d8938 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -44,14 +44,16 @@ use snafu::{ensure, OptionExt, ResultExt}; use tokio::net::TcpListener; use tokio::sync::oneshot::{self, Receiver, Sender}; use tokio::sync::Mutex; -use tokio_stream::wrappers::TcpListenerStream; +use tonic::transport::server::TcpIncoming; use tonic::{Request, Response, Status}; use tonic_reflection::server::{ServerReflection, ServerReflectionServer}; use self::flight::{FlightCraftRef, FlightCraftWrapper}; use self::prom_query_gateway::PrometheusGatewayService; use self::region_server::{RegionServerHandlerRef, RegionServerRequestHandler}; -use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu}; +use crate::error::{ + AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu, +}; use crate::grpc::database::DatabaseService; use crate::grpc::greptime_handler::GreptimeRequestHandler; use crate::prometheus_handler::PrometheusHandlerRef; @@ -209,7 +211,7 @@ impl Server for GrpcServer { let max_recv_message_size = self.config.max_recv_message_size; let max_send_message_size = self.config.max_send_message_size; let (tx, rx) = oneshot::channel(); - let (listener, addr) = { + let (incoming, addr) = { let mut shutdown_tx = self.shutdown_tx.lock().await; ensure!( shutdown_tx.is_none(), @@ -220,11 +222,13 @@ impl Server for GrpcServer { .await .context(TcpBindSnafu { addr })?; let addr = listener.local_addr().context(TcpBindSnafu { addr })?; + let incoming = + TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?; info!("gRPC server is bound to {}", addr); *shutdown_tx = Some(tx); - (listener, addr) + (incoming, addr) }; let mut builder = tonic::transport::Server::builder() @@ -271,7 +275,7 @@ impl Server for GrpcServer { let _handle = common_runtime::spawn_bg(async move { let result = builder - .serve_with_incoming_shutdown(TcpListenerStream::new(listener), rx.map(drop)) + .serve_with_incoming_shutdown(incoming, rx.map(drop)) .await .context(StartGrpcSnafu); serve_state_tx.send(result)