fix(rpc): workaround for tonic hanging on large error message #18639

Merged 8 commits on Sep 23, 2024
29 changes: 15 additions & 14 deletions Cargo.lock

Generated file; diff not rendered by default.

2 changes: 2 additions & 0 deletions clippy.toml
@@ -13,6 +13,8 @@ disallowed-methods = [
{ path = "speedate::DateTime::parse_bytes_with_config", reason = "Please use `parse_bytes_rfc3339_with_config` instead." },
{ path = "speedate::Date::parse_str", reason = "Please use `parse_str_rfc3339` instead." },
{ path = "speedate::Date::parse_bytes", reason = "Please use `parse_bytes_rfc3339` instead." },
{ path = "tonic::transport::Endpoint::connect", reason = "Please use `EndpointExt::monitored_connect` instead." },
{ path = "tonic::transport::Endpoint::connect_lazy", reason = "Please use `EndpointExt::monitored_connect_lazy` instead." },
]
disallowed-types = [
{ path = "num_traits::AsPrimitive", reason = "Please use `From` or `TryFrom` with `OrderedFloat` instead." },
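
These lint entries push every call site toward the monitored wrappers defined in src/common/metrics below. A minimal sketch of the call-site pattern the lint enforces — the endpoint address and connection label are placeholders, and it assumes `TcpConfig` is exported from `risingwave_common::monitor` alongside `EndpointExt` (only the latter import appears in this diff):

use risingwave_common::monitor::{EndpointExt, TcpConfig};
use tonic::transport::{Channel, Endpoint};

async fn connect_example(addr: &str) -> Result<Channel, tonic::transport::Error> {
    // `Endpoint::connect()` is now rejected by clippy; the monitored variant
    // records connection metrics and picks up the HTTP/2 workaround below.
    Endpoint::from_shared(format!("http://{addr}"))?
        .monitored_connect("grpc-example-client", TcpConfig::default())
        .await
}
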
1 change: 1 addition & 0 deletions src/common/metrics/Cargo.toml
@@ -17,6 +17,7 @@ normal = ["workspace-hack"]
[dependencies]
auto_impl = "1"
bytes = "1"
cfg-or-panic = "0.2"
clap = { workspace = true }
easy-ext = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
56 changes: 40 additions & 16 deletions src/common/metrics/src/monitor/connection.rs
@@ -22,6 +22,7 @@ use std::sync::LazyLock;
use std::task::{Context, Poll};
use std::time::Duration;

use cfg_or_panic::cfg_or_panic;
use futures::FutureExt;
use http::Uri;
use hyper_util::client::legacy::connect::dns::{GaiAddrs, GaiFuture, GaiResolver, Name};
@@ -482,6 +483,16 @@ pub struct TcpConfig {
pub keepalive_duration: Option<Duration>,
}

#[allow(clippy::derivable_impls)]
impl Default for TcpConfig {
fn default() -> Self {
Self {
tcp_nodelay: false,
keepalive_duration: None,
}
}
}

pub fn monitor_connector<C>(
connector: C,
connection_type: impl Into<String>,
@@ -581,23 +592,40 @@ impl Service<Name> for MonitoredGaiResolver {
}
}

#[cfg_or_panic(not(madsim))]
fn monitored_http_connector(
connection_type: impl Into<String>,
config: TcpConfig,
) -> MonitoredConnection<HttpConnector<MonitoredGaiResolver>, MonitorNewConnectionImpl> {
let resolver = MonitoredGaiResolver::default();
let mut http = HttpConnector::new_with_resolver(resolver);

http.enforce_http(false);
http.set_nodelay(config.tcp_nodelay);
http.set_keepalive(config.keepalive_duration);

monitor_connector(http, connection_type)
}

/// Attach general configurations to the endpoint.
#[cfg_or_panic(not(madsim))]
fn configure_endpoint(endpoint: Endpoint) -> Endpoint {
// This is to mitigate https://github.com/risingwavelabs/risingwave/issues/18039.
// TODO: remove this after https://github.com/hyperium/hyper/issues/3724 gets resolved.
endpoint.http2_max_header_list_size(16 * 1024 * 1024)
}

#[easy_ext::ext(EndpointExt)]
impl Endpoint {
pub async fn monitored_connect(
self,
mut self,
connection_type: impl Into<String>,
config: TcpConfig,
) -> Result<Channel, tonic::transport::Error> {
#[cfg(not(madsim))]
{
let resolver = MonitoredGaiResolver::default();
let mut http = HttpConnector::new_with_resolver(resolver);

http.enforce_http(false);
http.set_nodelay(config.tcp_nodelay);
http.set_keepalive(config.keepalive_duration);

let connector = monitor_connector(http, connection_type);
self = configure_endpoint(self);
let connector = monitored_http_connector(connection_type, config);
self.connect_with_connector(connector).await
}
#[cfg(madsim)]
@@ -608,16 +636,12 @@

#[cfg(not(madsim))]
pub fn monitored_connect_lazy(
self,
mut self,
connection_type: impl Into<String>,
config: TcpConfig,
) -> Channel {
let mut http = HttpConnector::new();
http.enforce_http(false);
http.set_nodelay(config.tcp_nodelay);
http.set_keepalive(config.keepalive_duration);

let connector = monitor_connector(http, connection_type);
self = configure_endpoint(self);
let connector = monitored_http_connector(connection_type, config);
self.connect_with_connector_lazy(connector)
}
}
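
The core of the workaround is `configure_endpoint`: every monitored connection now raises tonic's HTTP/2 max header list size, so a large gRPC error message carried in response headers/trailers no longer stalls the stream, per the issues linked in the code comment. For reference, a minimal standalone sketch of the same setting on a bare tonic endpoint — the address is a placeholder and the 16 MiB value simply mirrors the one chosen here:

use tonic::transport::Endpoint;

fn large_header_endpoint(addr: &'static str) -> Endpoint {
    // Mirror `configure_endpoint`: accept up to 16 MiB of HTTP/2 headers so an
    // oversized `grpc-message` / `grpc-status-details-bin` trailer no longer
    // hangs the connection (see hyperium/hyper#3724 linked above).
    Endpoint::from_static(addr).http2_max_header_list_size(16 * 1024 * 1024)
}
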
3 changes: 2 additions & 1 deletion src/ctl/src/cmd_impl/hummock/tiered_cache_tracing.rs
@@ -16,6 +16,7 @@ use std::time::Duration;

use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::monitor::EndpointExt;
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::TieredCacheTracingRequest;
use tonic::transport::Endpoint;
@@ -40,7 +41,7 @@ pub async fn tiered_cache_tracing(
let addr = worker_node.get_host().unwrap();
let channel = Endpoint::from_shared(format!("http://{}:{}", addr.host, addr.port))?
.connect_timeout(Duration::from_secs(5))
.connect()
.monitored_connect("grpc-tiered-cache-tracing-client", Default::default())
.await?;
let mut client = MonitorServiceClient::new(channel);
client
15 changes: 8 additions & 7 deletions src/rpc_client/src/compactor_client.rs
@@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::Duration;

use risingwave_common::monitor::EndpointExt;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::{
@@ -47,7 +48,7 @@ impl CompactorClient {
pub async fn new(host_addr: HostAddr) -> Result<Self> {
let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
.connect_timeout(Duration::from_secs(5))
.connect()
.monitored_connect("grpc-compactor-client", Default::default())
.await?;
Ok(Self {
monitor_client: MonitorServiceClient::new(channel),
@@ -94,26 +95,26 @@ pub struct GrpcCompactorProxyClient {
}

impl GrpcCompactorProxyClient {
pub fn new(channel: Channel, endpoint: String) -> Self {
pub async fn new(endpoint: String) -> Self {
let channel = Self::connect_to_endpoint(endpoint.clone()).await;
let core = Arc::new(RwLock::new(GrpcCompactorProxyClientCore::new(channel)));
Self { core, endpoint }
}

async fn recreate_core(&self) {
tracing::info!("GrpcCompactorProxyClient rpc transfer failed, try to reconnect");
let channel = self.connect_to_endpoint().await;
let channel = Self::connect_to_endpoint(self.endpoint.clone()).await;
let mut core = self.core.write().await;
*core = GrpcCompactorProxyClientCore::new(channel);
}

async fn connect_to_endpoint(&self) -> Channel {
let endpoint =
Endpoint::from_shared(self.endpoint.clone()).expect("Fail to construct tonic Endpoint");
async fn connect_to_endpoint(endpoint: String) -> Channel {
let endpoint = Endpoint::from_shared(endpoint).expect("Fail to construct tonic Endpoint");
endpoint
.http2_keep_alive_interval(Duration::from_secs(ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
.keep_alive_timeout(Duration::from_secs(ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
.connect_timeout(Duration::from_secs(5))
.connect()
.monitored_connect("grpc-compactor-proxy-client", Default::default())
.await
.expect("Failed to create channel via proxy rpc endpoint.")
}
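
With this change `GrpcCompactorProxyClient::new` is async and owns connection setup: callers hand over the endpoint string, and the client builds the monitored channel itself (and rebuilds it in `recreate_core` on RPC failure). A sketch of the new call shape — the URL is a placeholder, actual call sites are outside this diff, and the import path is assumed:

// Assumed re-export path; the actual location is outside this diff.
use risingwave_rpc_client::GrpcCompactorProxyClient;

async fn build_proxy_client(url: String) -> GrpcCompactorProxyClient {
    // Before: the caller connected via `Endpoint::connect` and passed the channel in.
    // After: only the endpoint string is needed; keep-alive, connect timeout and
    // the monitored connector are all applied inside `new`.
    GrpcCompactorProxyClient::new(url).await
}
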
2 changes: 1 addition & 1 deletion src/rpc_client/src/compute_client.rs
@@ -73,7 +73,7 @@ impl ComputeClient {
"grpc-compute-client",
TcpConfig {
tcp_nodelay: true,
keepalive_duration: None,
..Default::default()
},
)
.await?;