refactor(meta): only retry on connection error when registering worker node #18061

Merged · 8 commits · Aug 19, 2024

Changes from all commits
3 changes: 0 additions & 3 deletions proto/meta.proto
@@ -320,10 +320,7 @@
   common.WorkerNode.Resource resource = 5;
 }

 message AddWorkerNodeResponse {
-  reserved 3;
-  reserved "system_params";
-  common.Status status = 1;
   optional uint32 node_id = 2;
   string cluster_id = 4;
 }

Check failure on line 323 in proto/meta.proto (GitHub Actions / Check breaking changes in Protobuf files):
  * Previously present field "1" with name "status" on message "AddWorkerNodeResponse" was deleted without reserving the name "status".
  * Previously present field "1" with name "status" on message "AddWorkerNodeResponse" was deleted without reserving the number "1".
  * Previously present reserved name "system_params" on message "AddWorkerNodeResponse" was deleted.
  * Previously present reserved range "[3]" on message "AddWorkerNodeResponse" is missing values: [3] were removed.
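For context on these annotations: Protobuf's compatibility rule is that a deleted field should leave both its number and its name reserved, so that neither can be reused with a different meaning by older readers. A hypothetical compatibility-preserving variant of this message would look like the sketch below; the PR instead removes the field and the old reservations outright, which is exactly what the checker flags.

    // Hypothetical variant that would satisfy the breaking-change checker:
    // keep the freed number and name reserved alongside the old reservations.
    message AddWorkerNodeResponse {
      reserved 1, 3;
      reserved "status", "system_params";
      optional uint32 node_id = 2;
      string cluster_id = 4;
    }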
3 changes: 1 addition & 2 deletions src/compute/src/server.rs
@@ -130,8 +130,7 @@ pub async fn compute_node_serve(
         },
         &config.meta,
     )
-    .await
-    .unwrap();
+    .await;

     let state_store_url = system_params.state_store();

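The call-site change here repeats in several files below: `register_new` no longer returns a `Result`, because it now retries connection errors internally and exits the process on any other failure, so callers drop their `.unwrap()` or `?`. A minimal standalone sketch of this infallible-wrapper pattern (names are hypothetical, not RisingWave's API):

    // Stand-in for the fallible registration routine; kept private.
    async fn register_inner() -> Result<u32, std::io::Error> {
        Ok(42) // pretend the RPCs succeeded and a worker id was assigned
    }

    // Public entry point: infallible signature; failure becomes process exit.
    pub async fn register() -> u32 {
        match register_inner().await {
            Ok(worker_id) => worker_id,
            Err(err) => {
                eprintln!("failed to register worker: {err}, exiting...");
                std::process::exit(1);
            }
        }
    }

    #[tokio::main]
    async fn main() {
        // Call sites need neither `?` nor `.unwrap()`:
        let worker_id = register().await;
        println!("worker id: {worker_id}");
    }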
2 changes: 1 addition & 1 deletion src/ctl/src/common/meta_service.rs
@@ -62,7 +62,7 @@ Note: the default value of `RW_META_ADDR` is 'http://127.0.0.1:5690'.";
             Property::default(),
             &MetaConfig::default(),
         )
-        .await?;
+        .await;
         let worker_id = client.worker_id();
         tracing::info!("registered as RiseCtl worker, worker_id = {}", worker_id);
         Ok(client)
2 changes: 1 addition & 1 deletion src/frontend/src/session.rs
@@ -267,7 +267,7 @@ impl FrontendEnv {
             Default::default(),
             &config.meta,
         )
-        .await?;
+        .await;

         let worker_id = meta_client.worker_id();
         info!("Assigned worker node id {}", worker_id);
30 changes: 7 additions & 23 deletions src/meta/service/src/cluster_service.rs
@@ -25,7 +25,6 @@ use risingwave_pb::meta::{
     ListAllNodesResponse, UpdateWorkerNodeSchedulabilityRequest,
     UpdateWorkerNodeSchedulabilityResponse,
 };
-use thiserror_ext::AsReport;
 use tonic::{Request, Response, Status};

 use crate::MetaError;
@@ -58,31 +57,16 @@ impl ClusterService for ClusterServiceImpl {
             .property
             .ok_or_else(|| MetaError::invalid_parameter("worker node property is not provided"))?;
         let resource = req.resource.unwrap_or_default();
-        let result = self
+        let worker_id = self
             .metadata_manager
             .add_worker_node(worker_type, host, property, resource)
-            .await;
+            .await?;
         let cluster_id = self.metadata_manager.cluster_id().to_string();
-        match result {
-            Ok(worker_id) => Ok(Response::new(AddWorkerNodeResponse {
-                status: None,
-                node_id: Some(worker_id),
-                cluster_id,
-            })),
-            Err(e) => {
-                if e.is_invalid_worker() {
-                    return Ok(Response::new(AddWorkerNodeResponse {
-                        status: Some(risingwave_pb::common::Status {
-                            code: risingwave_pb::common::status::Code::UnknownWorker as i32,
-                            message: e.to_report_string(),
-                        }),
-                        node_id: None,
-                        cluster_id,
-                    }));
-                }
-                Err(e.into())
-            }
-        }
+
+        Ok(Response::new(AddWorkerNodeResponse {
+            node_id: Some(worker_id),
+            cluster_id,
+        }))
     }

     /// Update schedulability of a compute node. Will not affect actors which are already running on
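With the in-band `status` field gone from the response, an invalid worker is now reported as an ordinary gRPC error: the handler propagates `MetaError` with `?`, which relies on a conversion into `tonic::Status`. A minimal standalone sketch of that pattern (the error type and handler are stand-ins, not the RisingWave definitions):

    use tonic::{Response, Status};

    // Hypothetical domain error standing in for MetaError.
    #[derive(Debug)]
    struct DomainError(String);

    // This single From impl is what lets `?` turn a domain failure into a
    // gRPC status instead of an in-band field on the success response.
    impl From<DomainError> for Status {
        fn from(e: DomainError) -> Self {
            Status::invalid_argument(e.0)
        }
    }

    fn add_worker(valid: bool) -> Result<u32, DomainError> {
        if valid {
            Ok(7) // assigned worker id
        } else {
            Err(DomainError("invalid worker".into()))
        }
    }

    // The handler body stays linear; `?` performs the conversion.
    fn handler(valid: bool) -> Result<Response<u32>, Status> {
        let worker_id = add_worker(valid)?;
        Ok(Response::new(worker_id))
    }

    fn main() {
        assert!(handler(true).is_ok());
        assert!(handler(false).is_err());
    }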
20 changes: 20 additions & 0 deletions src/rpc_client/src/error.rs
@@ -77,3 +77,23 @@
 }

 impl_from_status!(stream, batch, meta, compute, compactor, connector);
+
+impl RpcError {
+    /// Returns `true` if the error is a connection error. Typically used to determine if
+    /// the error is transient and can be retried.
+    pub fn is_connection_error(&self) -> bool {
+        match self {
+            RpcError::TransportError(_) => true,
+            RpcError::GrpcStatus(status) => matches!(
+                status.inner().code(),
+                tonic::Code::Unavailable // server not started
+                    | tonic::Code::Unknown // could be transport error
+                    | tonic::Code::Unimplemented // meta leader service not started
+            ),
+            RpcError::MetaAddressParse(_) => false,
+            RpcError::Internal(anyhow) => anyhow
+                .downcast_ref::<Self>() // this skips all contexts attached to the error
+                .map_or(false, Self::is_connection_error),
+        }
+    }
+}
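The `RpcError::Internal` arm is the subtle part of this classifier: the retry closure attaches human-readable context via `anyhow`, and classification must still see through those layers, which is exactly what `anyhow::Error::downcast_ref` does. A standalone sketch of the pattern with a stand-in error type (not the RisingWave definitions):

    // Demonstrates classifying an error through `anyhow` context layers.
    #[derive(Debug, thiserror::Error)]
    enum DemoError {
        #[error("transport error")]
        Transport,
        #[error("internal error: {0}")]
        Internal(#[from] anyhow::Error),
    }

    impl DemoError {
        fn is_connection_error(&self) -> bool {
            match self {
                DemoError::Transport => true,
                DemoError::Internal(e) => e
                    .downcast_ref::<Self>() // skips attached contexts
                    .map_or(false, Self::is_connection_error),
            }
        }
    }

    fn main() {
        // Wrap a transport error in context, as the retry closure does.
        let inner = anyhow::Error::new(DemoError::Transport)
            .context("failed to add worker node");
        let wrapped = DemoError::Internal(inner);
        // Still classified as a connection error through the context layer.
        assert!(wrapped.is_connection_error());
    }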
130 changes: 80 additions & 50 deletions src/rpc_client/src/meta_client.rs
@@ -36,6 +36,7 @@ use risingwave_common::util::meta_addr::MetaAddressStrategy;
 use risingwave_common::util::resource_util::cpu::total_cpu_available;
 use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
 use risingwave_common::RW_VERSION;
+use risingwave_error::bail;
 use risingwave_hummock_sdk::compaction_group::StateTableId;
 use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
 use risingwave_hummock_sdk::{
@@ -220,12 +221,33 @@
     }

     /// Register the current node to the cluster and set the corresponding worker id.
+    ///
+    /// Retry if there's a connection issue with the meta node. Exit the process if the registration fails.
     pub async fn register_new(
         addr_strategy: MetaAddressStrategy,
         worker_type: WorkerType,
         addr: &HostAddr,
         property: Property,
         meta_config: &MetaConfig,
+    ) -> (Self, SystemParamsReader) {
+        let ret =
+            Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;
+
+        match ret {
+            Ok(ret) => ret,
+            Err(err) => {
+                tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
+                std::process::exit(1);
+            }
+        }
+    }
+
+    async fn register_new_inner(
+        addr_strategy: MetaAddressStrategy,
+        worker_type: WorkerType,
+        addr: &HostAddr,
+        property: Property,
+        meta_config: &MetaConfig,
     ) -> Result<(Self, SystemParamsReader)> {
         tracing::info!("register meta client using strategy: {}", addr_strategy);

@@ -238,34 +260,35 @@
         if property.is_unschedulable {
             tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
         }
-        let init_result: Result<_> = tokio_retry::Retry::spawn(retry_strategy, || async {
-            let grpc_meta_client = GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;
-
-            let add_worker_resp = grpc_meta_client
-                .add_worker_node(AddWorkerNodeRequest {
-                    worker_type: worker_type as i32,
-                    host: Some(addr.to_protobuf()),
-                    property: Some(property),
-                    resource: Some(risingwave_pb::common::worker_node::Resource {
-                        rw_version: RW_VERSION.to_string(),
-                        total_memory_bytes: system_memory_available_bytes() as _,
-                        total_cpu_cores: total_cpu_available() as _,
-                    }),
-                })
-                .await?;
-            if let Some(status) = &add_worker_resp.status
-                && status.code() == risingwave_pb::common::status::Code::UnknownWorker
-            {
-                tracing::error!("invalid worker: {}", status.message);
-                std::process::exit(1);
-            }
-
-            let system_params_resp = grpc_meta_client
-                .get_system_params(GetSystemParamsRequest {})
-                .await?;
-
-            Ok((add_worker_resp, system_params_resp, grpc_meta_client))
-        })
+        let init_result: Result<_> = tokio_retry::RetryIf::spawn(
+            retry_strategy,
+            || async {
+                let grpc_meta_client =
+                    GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;
+
+                let add_worker_resp = grpc_meta_client
+                    .add_worker_node(AddWorkerNodeRequest {
+                        worker_type: worker_type as i32,
+                        host: Some(addr.to_protobuf()),
+                        property: Some(property),
+                        resource: Some(risingwave_pb::common::worker_node::Resource {
+                            rw_version: RW_VERSION.to_string(),
+                            total_memory_bytes: system_memory_available_bytes() as _,
+                            total_cpu_cores: total_cpu_available() as _,
+                        }),
+                    })
+                    .await
+                    .context("failed to add worker node")?;
+
+                let system_params_resp = grpc_meta_client
+                    .get_system_params(GetSystemParamsRequest {})
+                    .await
+                    .context("failed to get initial system params")?;
+
+                Ok((add_worker_resp, system_params_resp, grpc_meta_client))
+            },
+            RpcError::is_connection_error,
+        )
         .await;

         let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
@@ -1708,38 +1731,40 @@ impl MetaMemberManagement {
             let mut fetched_members = None;

             for (addr, client) in &mut member_group.members {
-                let client: Result<MetaMemberClient> = try {
-                    match client {
+                let members: Result<_> = try {
+                    let mut client = match client {
                         Some(cached_client) => cached_client.to_owned(),
                         None => {
                             let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
-                            let channel = GrpcMetaClient::connect_to_endpoint(endpoint).await?;
+                            let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
+                                .await
+                                .context("failed to create client")?;
                             let new_client: MetaMemberClient =
                                 MetaMemberServiceClient::new(channel);
                             *client = Some(new_client.clone());

                             new_client
                         }
-                    }
-                };
-                if let Err(err) = client {
-                    tracing::warn!(%addr, error = %err.as_report(), "failed to create client");
-                    continue;
-                }
-                match client.unwrap().members(MembersRequest {}).await {
-                    Err(err) => {
-                        tracing::warn!(%addr, error = %err.as_report(), "failed to fetch members");
-                        continue;
-                    }
-                    Ok(resp) => {
-                        fetched_members = Some(resp.into_inner().members);
-                        break;
-                    }
-                }
+                    };
+
+                    let resp = client
+                        .members(MembersRequest {})
+                        .await
+                        .context("failed to fetch members")?;
+
+                    resp.into_inner().members
+                };
+
+                let fetched = members.is_ok();
+                fetched_members = Some(members);
+                if fetched {
+                    break;
+                }
             }

-            let members =
-                fetched_members.ok_or_else(|| anyhow!("could not refresh members"))?;
+            let members = fetched_members
+                .context("no member available in the list")?
+                .context("could not refresh members")?;

             // find new leader
             let mut leader = None;
@@ -1916,7 +1941,7 @@
             .map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
             .collect();

-        let endpoints = endpoints.clone();
+        let mut last_error = None;

         for (endpoint, addr) in endpoints {
             match Self::connect_to_endpoint(endpoint).await {
@@ -1929,14 +1954,19 @@
                         error = %e.as_report(),
                         "Failed to connect to meta server {}, trying again",
                         addr,
-                    )
+                    );
+                    last_error = Some(e);
                 }
             }
         }

-        Err(RpcError::Internal(anyhow!(
-            "Failed to connect to meta server"
-        )))
+        if let Some(last_error) = last_error {
+            Err(anyhow::anyhow!(last_error)
+                .context("failed to connect to all meta servers")
+                .into())
+        } else {
+            bail!("no meta server address provided")
+        }
     }

     async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
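The switch from `Retry::spawn` to `tokio_retry::RetryIf::spawn` above is the core of the PR: `RetryIf` takes a third argument, a predicate over the error, and retries only while it returns true, so a non-connection error (such as an invalid worker) surfaces immediately instead of being retried. A self-contained sketch of the same shape (the error type and registration function are stand-ins):

    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;
    use tokio_retry::strategy::{jitter, ExponentialBackoff};
    use tokio_retry::RetryIf;

    #[derive(Debug)]
    #[allow(dead_code)]
    enum RegisterError {
        Connection,    // transient: worth retrying
        InvalidWorker, // permanent: fail immediately
    }

    static ATTEMPTS: AtomicU32 = AtomicU32::new(0);

    // Stand-in for the registration RPCs: fails twice with a connection
    // error, then succeeds with a worker id.
    async fn try_register() -> Result<u32, RegisterError> {
        match ATTEMPTS.fetch_add(1, Ordering::SeqCst) {
            0 | 1 => Err(RegisterError::Connection),
            _ => Ok(42),
        }
    }

    #[tokio::main]
    async fn main() {
        let strategy = ExponentialBackoff::from_millis(10)
            .max_delay(Duration::from_secs(1))
            .map(jitter)
            .take(5);

        // Retry only while the predicate holds; an InvalidWorker error
        // would be returned to the caller on the first attempt.
        let worker_id = RetryIf::spawn(
            strategy,
            try_register,
            |e: &RegisterError| matches!(e, RegisterError::Connection),
        )
        .await
        .expect("registration failed");

        println!("registered with worker id {worker_id}");
    }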
3 changes: 1 addition & 2 deletions src/storage/compactor/src/server.rs
@@ -197,8 +197,7 @@ pub async fn compactor_serve(
         Default::default(),
         &config.meta,
     )
-    .await
-    .unwrap();
+    .await;

     info!("Assigned compactor id {}", meta_client.worker_id());

8 changes: 4 additions & 4 deletions src/tests/compaction_test/src/compaction_test_runner.rs
@@ -238,7 +238,7 @@ async fn init_metadata_for_replay(
             std::process::exit(0);
         },
         ret = MetaClient::register_new(cluster_meta_endpoint.parse()?, WorkerType::RiseCtl, advertise_addr, Default::default(), &meta_config) => {
-            (meta_client, _) = ret.unwrap();
+            (meta_client, _) = ret;
         },
     }
     let worker_id = meta_client.worker_id();
Expand All @@ -254,7 +254,7 @@ async fn init_metadata_for_replay(
Default::default(),
&meta_config,
)
.await?;
.await;
new_meta_client.activate(advertise_addr).await.unwrap();
if ci_mode {
let table_to_check = tables.iter().find(|t| t.name == "nexmark_q7").unwrap();
@@ -286,7 +286,7 @@
         Default::default(),
         &MetaConfig::default(),
     )
-    .await?;
+    .await;
     let worker_id = meta_client.worker_id();
     tracing::info!("Assigned pull worker id {}", worker_id);
     meta_client.activate(advertise_addr).await.unwrap();
@@ -335,7 +335,7 @@
         Default::default(),
         &config.meta,
     )
-    .await?;
+    .await;
     let worker_id = meta_client.worker_id();
     tracing::info!("Assigned replay worker id {}", worker_id);
     meta_client.activate(&advertise_addr).await.unwrap();