From 07d8c8f6123a169157786d56438a1b67c6303b23 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 16 Aug 2024 12:19:57 +0800 Subject: [PATCH 1/8] retry only if connection error Signed-off-by: Bugen Zhao --- src/rpc_client/src/error.rs | 15 +++++++++ src/rpc_client/src/meta_client.rs | 55 +++++++++++++++---------------- 2 files changed, 42 insertions(+), 28 deletions(-) diff --git a/src/rpc_client/src/error.rs b/src/rpc_client/src/error.rs index c5c5613a32a4b..c7b0af96b9ada 100644 --- a/src/rpc_client/src/error.rs +++ b/src/rpc_client/src/error.rs @@ -77,3 +77,18 @@ macro_rules! impl_from_status { } impl_from_status!(stream, batch, meta, compute, compactor, connector); + +impl RpcError { + /// Returns `true` if the error is a connection error. Typically used to determine if + /// the error is transient and can be retried. + pub fn is_connection_error(&self) -> bool { + match self { + RpcError::TransportError(_) => true, + RpcError::GrpcStatus(status) => status.inner().code() == tonic::Code::Unavailable, + RpcError::MetaAddressParse(_) => false, + RpcError::Internal(anyhow) => anyhow + .downcast_ref::() + .map_or(false, Self::is_connection_error), + } + } +} diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 5a45e0752c9d8..1a1319d182689 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -238,34 +238,33 @@ impl MetaClient { if property.is_unschedulable { tracing::warn!("worker {:?} registered as unschedulable", addr.clone()); } - let init_result: Result<_> = tokio_retry::Retry::spawn(retry_strategy, || async { - let grpc_meta_client = GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?; - - let add_worker_resp = grpc_meta_client - .add_worker_node(AddWorkerNodeRequest { - worker_type: worker_type as i32, - host: Some(addr.to_protobuf()), - property: Some(property), - resource: Some(risingwave_pb::common::worker_node::Resource { - rw_version: RW_VERSION.to_string(), - total_memory_bytes: system_memory_available_bytes() as _, - total_cpu_cores: total_cpu_available() as _, - }), - }) - .await?; - if let Some(status) = &add_worker_resp.status - && status.code() == risingwave_pb::common::status::Code::UnknownWorker - { - tracing::error!("invalid worker: {}", status.message); - std::process::exit(1); - } - - let system_params_resp = grpc_meta_client - .get_system_params(GetSystemParamsRequest {}) - .await?; - - Ok((add_worker_resp, system_params_resp, grpc_meta_client)) - }) + let init_result: Result<_> = tokio_retry::RetryIf::spawn( + retry_strategy, + || async { + let grpc_meta_client = + GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?; + + let add_worker_resp = grpc_meta_client + .add_worker_node(AddWorkerNodeRequest { + worker_type: worker_type as i32, + host: Some(addr.to_protobuf()), + property: Some(property), + resource: Some(risingwave_pb::common::worker_node::Resource { + rw_version: RW_VERSION.to_string(), + total_memory_bytes: system_memory_available_bytes() as _, + total_cpu_cores: total_cpu_available() as _, + }), + }) + .await?; + + let system_params_resp = grpc_meta_client + .get_system_params(GetSystemParamsRequest {}) + .await?; + + Ok((add_worker_resp, system_params_resp, grpc_meta_client)) + }, + RpcError::is_connection_error, + ) .await; let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?; From 119564c51d50d329962a9fc9ac3261b1ea64ec9b Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 16 Aug 2024 12:22:15 +0800 Subject: [PATCH 2/8] remove status field Signed-off-by: Bugen Zhao --- proto/meta.proto | 3 --- src/meta/service/src/cluster_service.rs | 29 ++++++------------------- src/rpc_client/src/meta_client.rs | 6 +++-- 3 files changed, 11 insertions(+), 27 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index bcb6c331549f2..4f51522cc006d 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -321,9 +321,6 @@ message AddWorkerNodeRequest { } message AddWorkerNodeResponse { - reserved 3; - reserved "system_params"; - common.Status status = 1; optional uint32 node_id = 2; string cluster_id = 4; } diff --git a/src/meta/service/src/cluster_service.rs b/src/meta/service/src/cluster_service.rs index 39cd40ed37400..94c9fbf97774c 100644 --- a/src/meta/service/src/cluster_service.rs +++ b/src/meta/service/src/cluster_service.rs @@ -58,31 +58,16 @@ impl ClusterService for ClusterServiceImpl { .property .ok_or_else(|| MetaError::invalid_parameter("worker node property is not provided"))?; let resource = req.resource.unwrap_or_default(); - let result = self + let worker_id = self .metadata_manager .add_worker_node(worker_type, host, property, resource) - .await; + .await?; let cluster_id = self.metadata_manager.cluster_id().to_string(); - match result { - Ok(worker_id) => Ok(Response::new(AddWorkerNodeResponse { - status: None, - node_id: Some(worker_id), - cluster_id, - })), - Err(e) => { - if e.is_invalid_worker() { - return Ok(Response::new(AddWorkerNodeResponse { - status: Some(risingwave_pb::common::Status { - code: risingwave_pb::common::status::Code::UnknownWorker as i32, - message: e.to_report_string(), - }), - node_id: None, - cluster_id, - })); - } - Err(e.into()) - } - } + + Ok(Response::new(AddWorkerNodeResponse { + node_id: Some(worker_id), + cluster_id, + })) } /// Update schedulability of a compute node. Will not affect actors which are already running on diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 1a1319d182689..1bfd1d25042cd 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -255,11 +255,13 @@ impl MetaClient { total_cpu_cores: total_cpu_available() as _, }), }) - .await?; + .await + .context("failed to add worker node")?; let system_params_resp = grpc_meta_client .get_system_params(GetSystemParamsRequest {}) - .await?; + .await + .context("failed to get initial system params")?; Ok((add_worker_resp, system_params_resp, grpc_meta_client)) }, From ca8ce2a158cd40eb0bb0cfd6285c50d2ba7064a4 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 16 Aug 2024 12:26:13 +0800 Subject: [PATCH 3/8] add unimplemented Signed-off-by: Bugen Zhao --- src/rpc_client/src/error.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/rpc_client/src/error.rs b/src/rpc_client/src/error.rs index c7b0af96b9ada..73aef679f478f 100644 --- a/src/rpc_client/src/error.rs +++ b/src/rpc_client/src/error.rs @@ -84,7 +84,10 @@ impl RpcError { pub fn is_connection_error(&self) -> bool { match self { RpcError::TransportError(_) => true, - RpcError::GrpcStatus(status) => status.inner().code() == tonic::Code::Unavailable, + RpcError::GrpcStatus(status) => matches!( + status.inner().code(), + tonic::Code::Unavailable | tonic::Code::Unimplemented + ), RpcError::MetaAddressParse(_) => false, RpcError::Internal(anyhow) => anyhow .downcast_ref::() From 7cef3faa229dfde827dc6f0d49550b53ea1e5bb3 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 16 Aug 2024 13:48:16 +0800 Subject: [PATCH 4/8] exit on error Signed-off-by: Bugen Zhao --- src/compute/src/server.rs | 3 +-- src/ctl/src/common/meta_service.rs | 2 +- src/frontend/src/session.rs | 2 +- src/rpc_client/src/meta_client.rs | 21 +++++++++++++++++++ src/storage/compactor/src/server.rs | 3 +-- .../src/compaction_test_runner.rs | 8 +++---- 6 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 3270897c29d31..c850f839e620f 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -130,8 +130,7 @@ pub async fn compute_node_serve( }, &config.meta, ) - .await - .unwrap(); + .await; let state_store_url = system_params.state_store(); diff --git a/src/ctl/src/common/meta_service.rs b/src/ctl/src/common/meta_service.rs index ac539b9233ea6..6d70bdf942833 100644 --- a/src/ctl/src/common/meta_service.rs +++ b/src/ctl/src/common/meta_service.rs @@ -62,7 +62,7 @@ Note: the default value of `RW_META_ADDR` is 'http://127.0.0.1:5690'."; Property::default(), &MetaConfig::default(), ) - .await?; + .await; let worker_id = client.worker_id(); tracing::info!("registered as RiseCtl worker, worker_id = {}", worker_id); Ok(client) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index b970899ef080f..685000dbbe650 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -267,7 +267,7 @@ impl FrontendEnv { Default::default(), &config.meta, ) - .await?; + .await; let worker_id = meta_client.worker_id(); info!("Assigned worker node id {}", worker_id); diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 1bfd1d25042cd..e31b1f9d4fcf6 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -220,12 +220,33 @@ impl MetaClient { } /// Register the current node to the cluster and set the corresponding worker id. + /// + /// Retry if there's connection issue with the meta node. Exit the process if the registration fails. pub async fn register_new( addr_strategy: MetaAddressStrategy, worker_type: WorkerType, addr: &HostAddr, property: Property, meta_config: &MetaConfig, + ) -> (Self, SystemParamsReader) { + let ret = + Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await; + + match ret { + Ok(ret) => ret, + Err(err) => { + tracing::error!(error = %err.as_report(), "failed to register worker, exiting..."); + std::process::exit(1); + } + } + } + + async fn register_new_inner( + addr_strategy: MetaAddressStrategy, + worker_type: WorkerType, + addr: &HostAddr, + property: Property, + meta_config: &MetaConfig, ) -> Result<(Self, SystemParamsReader)> { tracing::info!("register meta client using strategy: {}", addr_strategy); diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index e139bc201cd48..0b086ee06fe1e 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -197,8 +197,7 @@ pub async fn compactor_serve( Default::default(), &config.meta, ) - .await - .unwrap(); + .await; info!("Assigned compactor id {}", meta_client.worker_id()); diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 0aa7d1d83c8d5..328c23f8fbe80 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -238,7 +238,7 @@ async fn init_metadata_for_replay( std::process::exit(0); }, ret = MetaClient::register_new(cluster_meta_endpoint.parse()?, WorkerType::RiseCtl, advertise_addr, Default::default(), &meta_config) => { - (meta_client, _) = ret.unwrap(); + (meta_client, _) = ret; }, } let worker_id = meta_client.worker_id(); @@ -254,7 +254,7 @@ async fn init_metadata_for_replay( Default::default(), &meta_config, ) - .await?; + .await; new_meta_client.activate(advertise_addr).await.unwrap(); if ci_mode { let table_to_check = tables.iter().find(|t| t.name == "nexmark_q7").unwrap(); @@ -286,7 +286,7 @@ async fn pull_version_deltas( Default::default(), &MetaConfig::default(), ) - .await?; + .await; let worker_id = meta_client.worker_id(); tracing::info!("Assigned pull worker id {}", worker_id); meta_client.activate(advertise_addr).await.unwrap(); @@ -335,7 +335,7 @@ async fn start_replay( Default::default(), &config.meta, ) - .await?; + .await; let worker_id = meta_client.worker_id(); tracing::info!("Assigned replay worker id {}", worker_id); meta_client.activate(&advertise_addr).await.unwrap(); From 6c07e5ededfdca5f6806018d7e3a1ef9c405b43d Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 16 Aug 2024 15:52:13 +0800 Subject: [PATCH 5/8] fix failed to connect error variant Signed-off-by: Bugen Zhao --- src/rpc_client/src/error.rs | 5 +++-- src/rpc_client/src/meta_client.rs | 16 +++++++++++----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/rpc_client/src/error.rs b/src/rpc_client/src/error.rs index 73aef679f478f..cd42974730401 100644 --- a/src/rpc_client/src/error.rs +++ b/src/rpc_client/src/error.rs @@ -86,11 +86,12 @@ impl RpcError { RpcError::TransportError(_) => true, RpcError::GrpcStatus(status) => matches!( status.inner().code(), - tonic::Code::Unavailable | tonic::Code::Unimplemented + tonic::Code::Unavailable // server not started + | tonic::Code::Unimplemented // meta leader service not started ), RpcError::MetaAddressParse(_) => false, RpcError::Internal(anyhow) => anyhow - .downcast_ref::() + .downcast_ref::() // this skips all contexts attached to the error .map_or(false, Self::is_connection_error), } } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index e31b1f9d4fcf6..8b28ee5f6df04 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -36,6 +36,7 @@ use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_common::util::resource_util::cpu::total_cpu_available; use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use risingwave_common::RW_VERSION; +use risingwave_error::bail; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ @@ -1938,7 +1939,7 @@ impl GrpcMetaClient { .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr)) .collect(); - let endpoints = endpoints.clone(); + let mut last_error = None; for (endpoint, addr) in endpoints { match Self::connect_to_endpoint(endpoint).await { @@ -1951,14 +1952,19 @@ impl GrpcMetaClient { error = %e.as_report(), "Failed to connect to meta server {}, trying again", addr, - ) + ); + last_error = Some(e); } } } - Err(RpcError::Internal(anyhow!( - "Failed to connect to meta server" - ))) + if let Some(last_error) = last_error { + Err(anyhow::anyhow!(last_error) + .context("failed to connect to all meta servers") + .into()) + } else { + bail!("no meta server address provided") + } } async fn connect_to_endpoint(endpoint: Endpoint) -> Result { From 0fdc78a4203d6c452ff69a57a91236052bd0997d Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 16 Aug 2024 15:54:47 +0800 Subject: [PATCH 6/8] fix check Signed-off-by: Bugen Zhao --- src/meta/service/src/cluster_service.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/meta/service/src/cluster_service.rs b/src/meta/service/src/cluster_service.rs index 94c9fbf97774c..6f1bfd68e5b31 100644 --- a/src/meta/service/src/cluster_service.rs +++ b/src/meta/service/src/cluster_service.rs @@ -25,7 +25,6 @@ use risingwave_pb::meta::{ ListAllNodesResponse, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse, }; -use thiserror_ext::AsReport; use tonic::{Request, Response, Status}; use crate::MetaError; From cf08b45cd05ec6ba19368510283cb606e00d8eff Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 16 Aug 2024 17:07:13 +0800 Subject: [PATCH 7/8] also tolerate no members Signed-off-by: Bugen Zhao --- src/rpc_client/src/meta_client.rs | 40 ++++++++++++++++--------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 8b28ee5f6df04..ae99c57afd7c9 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1731,38 +1731,40 @@ impl MetaMemberManagement { let mut fetched_members = None; for (addr, client) in &mut member_group.members { - let client: Result = try { - match client { + let members: Result<_> = try { + let mut client = match client { Some(cached_client) => cached_client.to_owned(), None => { let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone()); - let channel = GrpcMetaClient::connect_to_endpoint(endpoint).await?; + let channel = GrpcMetaClient::connect_to_endpoint(endpoint) + .await + .context("failed to create client")?; let new_client: MetaMemberClient = MetaMemberServiceClient::new(channel); *client = Some(new_client.clone()); new_client } - } + }; + + let resp = client + .members(MembersRequest {}) + .await + .context("failed to fetch members")?; + + resp.into_inner().members }; - if let Err(err) = client { - tracing::warn!(%addr, error = %err.as_report(), "failed to create client"); - continue; - } - match client.unwrap().members(MembersRequest {}).await { - Err(err) => { - tracing::warn!(%addr, error = %err.as_report(), "failed to fetch members"); - continue; - } - Ok(resp) => { - fetched_members = Some(resp.into_inner().members); - break; - } + + let fetched = members.is_ok(); + fetched_members = Some(members); + if fetched { + break; } } - let members = - fetched_members.ok_or_else(|| anyhow!("could not refresh members"))?; + let members = fetched_members + .context("no member available in the list")? + .context("could not refresh members")?; // find new leader let mut leader = None; From 4c8e678214a08773535ac9f669c3e981b741c85e Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 16 Aug 2024 17:19:57 +0800 Subject: [PATCH 8/8] also tolerate unknown error Signed-off-by: Bugen Zhao --- src/rpc_client/src/error.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rpc_client/src/error.rs b/src/rpc_client/src/error.rs index cd42974730401..26204cc1908c6 100644 --- a/src/rpc_client/src/error.rs +++ b/src/rpc_client/src/error.rs @@ -87,6 +87,7 @@ impl RpcError { RpcError::GrpcStatus(status) => matches!( status.inner().code(), tonic::Code::Unavailable // server not started + | tonic::Code::Unknown // could be transport error | tonic::Code::Unimplemented // meta leader service not started ), RpcError::MetaAddressParse(_) => false,