diff --git a/src/rpc_client/src/compactor_client.rs b/src/rpc_client/src/compactor_client.rs index 8574e76695740..77fd3e0a44700 100644 --- a/src/rpc_client/src/compactor_client.rs +++ b/src/rpc_client/src/compactor_client.rs @@ -27,10 +27,17 @@ use risingwave_pb::meta::{GetSystemParamsRequest, GetSystemParamsResponse}; use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient; use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse}; use tokio::sync::RwLock; +use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tonic::transport::{Channel, Endpoint}; use crate::error::Result; +use crate::retry_rpc; +const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60; +const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60; +const DEFAULT_RETRY_INTERVAL: u64 = 20; +const DEFAULT_RETRY_MAX_DELAY: Duration = Duration::from_secs(5); +const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 3; #[derive(Clone)] pub struct CompactorClient { pub monitor_client: MonitorServiceClient, @@ -82,56 +89,128 @@ impl GrpcCompactorProxyClientCore { #[derive(Debug, Clone)] pub struct GrpcCompactorProxyClient { pub core: Arc>, + endpoint: String, } impl GrpcCompactorProxyClient { - pub fn new(channel: Channel) -> Self { + pub fn new(channel: Channel, endpoint: String) -> Self { let core = Arc::new(RwLock::new(GrpcCompactorProxyClientCore::new(channel))); - Self { core } + Self { core, endpoint } } - fn _recreate_core(&self, _channel: Channel) { - todo!() + async fn recreate_core(&self) { + tracing::info!("GrpcCompactorProxyClient rpc transfer failed, try to reconnect"); + let channel = self.connect_to_endpoint().await; + let mut core = self.core.write().await; + *core = GrpcCompactorProxyClientCore::new(channel); + } + + async fn connect_to_endpoint(&self) -> Channel { + let endpoint = + Endpoint::from_shared(self.endpoint.clone()).expect("Fail to construct tonic Endpoint"); + endpoint + .http2_keep_alive_interval(Duration::from_secs(ENDPOINT_KEEP_ALIVE_INTERVAL_SEC)) + .keep_alive_timeout(Duration::from_secs(ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC)) + .connect_timeout(Duration::from_secs(5)) + .connect() + .await + .expect("Failed to create channel via proxy rpc endpoint.") } pub async fn get_new_sst_ids( &self, request: GetNewSstIdsRequest, ) -> std::result::Result, tonic::Status> { - let mut hummock_client = self.core.read().await.hummock_client.clone(); - hummock_client.get_new_sst_ids(request).await + retry_rpc!(self, get_new_sst_ids, request, GetNewSstIdsResponse) } pub async fn report_compaction_task( &self, request: ReportCompactionTaskRequest, ) -> std::result::Result, tonic::Status> { - let mut hummock_client = self.core.read().await.hummock_client.clone(); - hummock_client.report_compaction_task(request).await + retry_rpc!( + self, + report_compaction_task, + request, + ReportCompactionTaskResponse + ) } pub async fn report_full_scan_task( &self, request: ReportFullScanTaskRequest, ) -> std::result::Result, tonic::Status> { - let mut hummock_client = self.core.read().await.hummock_client.clone(); - hummock_client.report_full_scan_task(request).await + retry_rpc!( + self, + report_full_scan_task, + request, + ReportFullScanTaskResponse + ) } pub async fn report_vacuum_task( &self, request: ReportVacuumTaskRequest, ) -> std::result::Result, tonic::Status> { - let mut hummock_client = self.core.read().await.hummock_client.clone(); - hummock_client.report_vacuum_task(request).await + retry_rpc!(self, report_vacuum_task, request, ReportVacuumTaskResponse) } pub async fn get_system_params( &self, ) -> std::result::Result, tonic::Status> { - let mut system_params_client = self.core.read().await.system_params_client.clone(); - system_params_client - .get_system_params(GetSystemParamsRequest {}) - .await + tokio_retry::RetryIf::spawn( + Self::get_retry_strategy(), + || async { + let mut system_params_client = self.core.read().await.system_params_client.clone(); + let rpc_res = system_params_client + .get_system_params(GetSystemParamsRequest {}) + .await; + if rpc_res.is_err() { + self.recreate_core().await; + } + rpc_res + }, + Self::should_retry, + ) + .await } + + #[inline(always)] + fn get_retry_strategy() -> impl Iterator { + ExponentialBackoff::from_millis(DEFAULT_RETRY_INTERVAL) + .max_delay(DEFAULT_RETRY_MAX_DELAY) + .take(DEFAULT_RETRY_MAX_ATTEMPTS) + .map(jitter) + } + + #[inline(always)] + fn should_retry(status: &tonic::Status) -> bool { + if status.code() == tonic::Code::Unavailable + || status.code() == tonic::Code::Unknown + || (status.code() == tonic::Code::Unauthenticated + && status.message().contains("invalid auth token")) + { + return true; + } + false + } +} + +#[macro_export] +macro_rules! retry_rpc { + ($self:expr, $rpc_call:ident, $request:expr, $response:ty) => { + tokio_retry::RetryIf::spawn( + Self::get_retry_strategy(), + || async { + let mut hummock_client = $self.core.read().await.hummock_client.clone(); + let rpc_res = hummock_client.$rpc_call($request.clone()).await; + if rpc_res.is_err() { + $self.recreate_core().await; + } + rpc_res + }, + Self::should_retry, + ) + .await + }; } diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index e099ed16ffcbd..779eec208f72c 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -319,8 +319,9 @@ pub async fn shared_compactor_serve( ); info!("> version: {} ({})", RW_VERSION, GIT_SHA); - let endpoint: &'static str = Box::leak(opts.proxy_rpc_endpoint.clone().into_boxed_str()); - let endpoint = Endpoint::from_static(endpoint); + let endpoint_str = opts.proxy_rpc_endpoint.clone().to_string(); + let endpoint = + Endpoint::from_shared(opts.proxy_rpc_endpoint).expect("Fail to construct tonic Endpoint"); let channel = endpoint .http2_keep_alive_interval(Duration::from_secs(ENDPOINT_KEEP_ALIVE_INTERVAL_SEC)) .keep_alive_timeout(Duration::from_secs(ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC)) @@ -328,7 +329,7 @@ pub async fn shared_compactor_serve( .connect() .await .expect("Failed to create channel via proxy rpc endpoint."); - let grpc_proxy_client = GrpcCompactorProxyClient::new(channel); + let grpc_proxy_client = GrpcCompactorProxyClient::new(channel, endpoint_str); let system_params_response = grpc_proxy_client .get_system_params() .await