Skip to content

Commit

Permalink
feat(serverless compaction): add retry strategy for grpc proxy client (
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Sep 26, 2023
1 parent 31aa925 commit ea9b94c
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 19 deletions.
111 changes: 95 additions & 16 deletions src/rpc_client/src/compactor_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@ use risingwave_pb::meta::{GetSystemParamsRequest, GetSystemParamsResponse};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use tokio::sync::RwLock;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tonic::transport::{Channel, Endpoint};

use crate::error::Result;
use crate::retry_rpc;
const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;

const DEFAULT_RETRY_INTERVAL: u64 = 20;
const DEFAULT_RETRY_MAX_DELAY: Duration = Duration::from_secs(5);
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 3;
#[derive(Clone)]
pub struct CompactorClient {
pub monitor_client: MonitorServiceClient<Channel>,
Expand Down Expand Up @@ -82,56 +89,128 @@ impl GrpcCompactorProxyClientCore {
#[derive(Debug, Clone)]
pub struct GrpcCompactorProxyClient {
pub core: Arc<RwLock<GrpcCompactorProxyClientCore>>,
endpoint: String,
}

impl GrpcCompactorProxyClient {
pub fn new(channel: Channel) -> Self {
pub fn new(channel: Channel, endpoint: String) -> Self {
let core = Arc::new(RwLock::new(GrpcCompactorProxyClientCore::new(channel)));
Self { core }
Self { core, endpoint }
}

fn _recreate_core(&self, _channel: Channel) {
todo!()
async fn recreate_core(&self) {
tracing::info!("GrpcCompactorProxyClient rpc transfer failed, try to reconnect");
let channel = self.connect_to_endpoint().await;
let mut core = self.core.write().await;
*core = GrpcCompactorProxyClientCore::new(channel);
}

async fn connect_to_endpoint(&self) -> Channel {
let endpoint =
Endpoint::from_shared(self.endpoint.clone()).expect("Fail to construct tonic Endpoint");
endpoint
.http2_keep_alive_interval(Duration::from_secs(ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
.keep_alive_timeout(Duration::from_secs(ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
.connect_timeout(Duration::from_secs(5))
.connect()
.await
.expect("Failed to create channel via proxy rpc endpoint.")
}

pub async fn get_new_sst_ids(
&self,
request: GetNewSstIdsRequest,
) -> std::result::Result<tonic::Response<GetNewSstIdsResponse>, tonic::Status> {
let mut hummock_client = self.core.read().await.hummock_client.clone();
hummock_client.get_new_sst_ids(request).await
retry_rpc!(self, get_new_sst_ids, request, GetNewSstIdsResponse)
}

pub async fn report_compaction_task(
&self,
request: ReportCompactionTaskRequest,
) -> std::result::Result<tonic::Response<ReportCompactionTaskResponse>, tonic::Status> {
let mut hummock_client = self.core.read().await.hummock_client.clone();
hummock_client.report_compaction_task(request).await
retry_rpc!(
self,
report_compaction_task,
request,
ReportCompactionTaskResponse
)
}

pub async fn report_full_scan_task(
&self,
request: ReportFullScanTaskRequest,
) -> std::result::Result<tonic::Response<ReportFullScanTaskResponse>, tonic::Status> {
let mut hummock_client = self.core.read().await.hummock_client.clone();
hummock_client.report_full_scan_task(request).await
retry_rpc!(
self,
report_full_scan_task,
request,
ReportFullScanTaskResponse
)
}

pub async fn report_vacuum_task(
&self,
request: ReportVacuumTaskRequest,
) -> std::result::Result<tonic::Response<ReportVacuumTaskResponse>, tonic::Status> {
let mut hummock_client = self.core.read().await.hummock_client.clone();
hummock_client.report_vacuum_task(request).await
retry_rpc!(self, report_vacuum_task, request, ReportVacuumTaskResponse)
}

pub async fn get_system_params(
&self,
) -> std::result::Result<tonic::Response<GetSystemParamsResponse>, tonic::Status> {
let mut system_params_client = self.core.read().await.system_params_client.clone();
system_params_client
.get_system_params(GetSystemParamsRequest {})
.await
tokio_retry::RetryIf::spawn(
Self::get_retry_strategy(),
|| async {
let mut system_params_client = self.core.read().await.system_params_client.clone();
let rpc_res = system_params_client
.get_system_params(GetSystemParamsRequest {})
.await;
if rpc_res.is_err() {
self.recreate_core().await;
}
rpc_res
},
Self::should_retry,
)
.await
}

#[inline(always)]
fn get_retry_strategy() -> impl Iterator<Item = Duration> {
ExponentialBackoff::from_millis(DEFAULT_RETRY_INTERVAL)
.max_delay(DEFAULT_RETRY_MAX_DELAY)
.take(DEFAULT_RETRY_MAX_ATTEMPTS)
.map(jitter)
}

#[inline(always)]
fn should_retry(status: &tonic::Status) -> bool {
if status.code() == tonic::Code::Unavailable
|| status.code() == tonic::Code::Unknown
|| (status.code() == tonic::Code::Unauthenticated
&& status.message().contains("invalid auth token"))
{
return true;
}
false
}
}

#[macro_export]
macro_rules! retry_rpc {
($self:expr, $rpc_call:ident, $request:expr, $response:ty) => {
tokio_retry::RetryIf::spawn(
Self::get_retry_strategy(),
|| async {
let mut hummock_client = $self.core.read().await.hummock_client.clone();
let rpc_res = hummock_client.$rpc_call($request.clone()).await;
if rpc_res.is_err() {
$self.recreate_core().await;
}
rpc_res
},
Self::should_retry,
)
.await
};
}
7 changes: 4 additions & 3 deletions src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,16 +319,17 @@ pub async fn shared_compactor_serve(
);
info!("> version: {} ({})", RW_VERSION, GIT_SHA);

let endpoint: &'static str = Box::leak(opts.proxy_rpc_endpoint.clone().into_boxed_str());
let endpoint = Endpoint::from_static(endpoint);
let endpoint_str = opts.proxy_rpc_endpoint.clone().to_string();
let endpoint =
Endpoint::from_shared(opts.proxy_rpc_endpoint).expect("Fail to construct tonic Endpoint");
let channel = endpoint
.http2_keep_alive_interval(Duration::from_secs(ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
.keep_alive_timeout(Duration::from_secs(ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
.connect_timeout(Duration::from_secs(5))
.connect()
.await
.expect("Failed to create channel via proxy rpc endpoint.");
let grpc_proxy_client = GrpcCompactorProxyClient::new(channel);
let grpc_proxy_client = GrpcCompactorProxyClient::new(channel, endpoint_str);
let system_params_response = grpc_proxy_client
.get_system_params()
.await
Expand Down

0 comments on commit ea9b94c

Please sign in to comment.