diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 92f187465a32..bb3161d349dd 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -45,6 +45,9 @@ use crate::error::{ConvertMetaResponseSnafu, Result}; pub type Id = (u64, u64); +const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3; +const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3; + #[derive(Clone, Debug, Default)] pub struct MetaClientBuilder { id: Id, @@ -130,7 +133,12 @@ impl MetaClientBuilder { let mgr = client.channel_manager.clone(); if self.enable_heartbeat { - client.heartbeat = Some(HeartbeatClient::new(self.id, self.role, mgr.clone())); + client.heartbeat = Some(HeartbeatClient::new( + self.id, + self.role, + mgr.clone(), + DEFAULT_ASK_LEADER_MAX_RETRY, + )); } if self.enable_router { client.router = Some(RouterClient::new(self.id, self.role, mgr.clone())); @@ -143,7 +151,12 @@ impl MetaClientBuilder { } if self.enable_ddl { let mgr = self.ddl_channel_manager.unwrap_or(mgr); - client.ddl = Some(DdlClient::new(self.id, self.role, mgr)); + client.ddl = Some(DdlClient::new( + self.id, + self.role, + mgr, + DEFAULT_SUBMIT_DDL_MAX_RETRY, + )); } client diff --git a/src/meta-client/src/client/ask_leader.rs b/src/meta-client/src/client/ask_leader.rs index 529c8f94a039..e79294bf1fb5 100644 --- a/src/meta-client/src/client/ask_leader.rs +++ b/src/meta-client/src/client/ask_leader.rs @@ -38,6 +38,7 @@ pub struct AskLeader { role: Role, leadership_group: Arc>, channel_manager: ChannelManager, + max_retry: usize, } impl AskLeader { @@ -46,6 +47,7 @@ impl AskLeader { role: Role, peers: impl Into>, channel_manager: ChannelManager, + max_retry: usize, ) -> Self { let leadership_group = Arc::new(RwLock::new(LeadershipGroup { leader: None, @@ -56,6 +58,7 @@ impl AskLeader { role, leadership_group, channel_manager, + max_retry, } } @@ -63,7 +66,7 @@ impl AskLeader { self.leadership_group.read().unwrap().leader.clone() } - pub async fn ask_leader(&self) -> Result { + async fn ask_leader_inner(&self) -> Result { let mut peers = { let leadership_group = self.leadership_group.read().unwrap(); leadership_group.peers.clone() @@ -99,6 +102,28 @@ impl AskLeader { Ok(leader) } + pub async fn ask_leader(&self) -> Result { + let mut times = 0; + while times < self.max_retry { + match self.ask_leader_inner().await { + Ok(res) => { + return Ok(res); + } + Err(err) => { + warn!("Failed to ask leader, source: {err}, retry {times} times"); + times += 1; + continue; + } + } + } + + error::RetryTimesExceededSnafu { + msg: "Failed to ask leader", + times: self.max_retry, + } + .fail() + } + fn create_asker(&self, addr: impl AsRef) -> Result> { Ok(HeartbeatClient::new( self.channel_manager diff --git a/src/meta-client/src/client/ddl.rs b/src/meta-client/src/client/ddl.rs index 0714f6252a96..7150d1808dfa 100644 --- a/src/meta-client/src/client/ddl.rs +++ b/src/meta-client/src/client/ddl.rs @@ -15,11 +15,13 @@ use std::sync::Arc; use api::v1::meta::ddl_task_client::DdlTaskClient; -use api::v1::meta::{ErrorCode, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; +use api::v1::meta::{ErrorCode, ResponseHeader, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_grpc::channel_manager::ChannelManager; +use common_telemetry::{info, warn}; use snafu::{ensure, ResultExt}; use tokio::sync::RwLock; use tonic::transport::Channel; +use tonic::{Code, Status}; use crate::client::ask_leader::AskLeader; use crate::client::Id; @@ -32,12 +34,13 @@ pub struct Client { } impl Client { - pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { + pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self { let inner = Arc::new(RwLock::new(Inner { id, role, channel_manager, ask_leader: None, + max_retry, })); Self { inner } @@ -73,6 +76,7 @@ struct Inner { role: Role, channel_manager: ChannelManager, ask_leader: Option, + max_retry: usize, } impl Inner { @@ -98,6 +102,7 @@ impl Inner { self.role, peers, self.channel_manager.clone(), + self.max_retry, )); Ok(()) @@ -130,29 +135,59 @@ impl Inner { req.set_header(self.id, self.role); let ask_leader = self.ask_leader.as_ref().unwrap(); - loop { + let mut times = 0; + + while times < self.max_retry { if let Some(leader) = &ask_leader.get_leader() { let mut client = self.make_client(leader)?; - let res = client - .submit_ddl_task(req.clone()) - .await - .map_err(error::Error::from)?; - - let res = res.into_inner(); - - if let Some(header) = res.header.as_ref() { - if let Some(err) = header.error.as_ref() { - if err.code == ErrorCode::NotLeader as i32 { - let _ = ask_leader.ask_leader().await?; + match client.submit_ddl_task(req.clone()).await { + Ok(res) => { + let res = res.into_inner(); + if is_not_leader(&res.header) { + warn!("Failed to submitting ddl to {leader}, not a leader"); + let leader = ask_leader.ask_leader().await?; + info!("DDL client updated to new leader addr: {leader}"); + times += 1; + continue; + } + return Ok(res); + } + Err(status) => { + // The leader may be unreachable. + if is_unreachable(&status) { + warn!("Failed to submitting ddl to {leader}, source: {status}"); + let leader = ask_leader.ask_leader().await?; + info!("DDL client updated to new leader addr: {leader}"); + times += 1; continue; + } else { + return Err(error::Error::from(status)); } } } - - return Ok(res); } else if let Err(err) = ask_leader.ask_leader().await { return Err(err); } } + + error::RetryTimesExceededSnafu { + msg: "Failed to submit DDL task", + times: self.max_retry, + } + .fail() + } +} + +fn is_unreachable(status: &Status) -> bool { + status.code() == Code::Unavailable || status.code() == Code::DeadlineExceeded +} + +fn is_not_leader(header: &Option) -> bool { + if let Some(header) = header { + if let Some(err) = header.error.as_ref() { + return err.code == ErrorCode::NotLeader as i32; + } } + + false } diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index c1dabfdcf061..a66950fa9fec 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -93,8 +93,13 @@ pub struct Client { } impl Client { - pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { - let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager))); + pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self { + let inner = Arc::new(RwLock::new(Inner::new( + id, + role, + channel_manager, + max_retry, + ))); Self { inner } } @@ -130,15 +135,17 @@ struct Inner { role: Role, channel_manager: ChannelManager, ask_leader: Option, + max_retry: usize, } impl Inner { - fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { + fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self { Self { id, role, channel_manager, ask_leader: None, + max_retry, } } @@ -164,6 +171,7 @@ impl Inner { self.role, peers, self.channel_manager.clone(), + self.max_retry, )); Ok(()) @@ -251,7 +259,7 @@ mod test { #[tokio::test] async fn test_start_client() { - let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3); assert!(!client.is_started().await); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) @@ -262,7 +270,7 @@ mod test { #[tokio::test] async fn test_already_start() { - let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await diff --git a/src/meta-client/src/error.rs b/src/meta-client/src/error.rs index b33496c0704f..862a5b3b7316 100644 --- a/src/meta-client/src/error.rs +++ b/src/meta-client/src/error.rs @@ -65,6 +65,9 @@ pub enum Error { location: Location, source: common_meta::error::Error, }, + + #[snafu(display("Retry exceeded max times({}), message: {}", times, msg))] + RetryTimesExceeded { times: usize, msg: String }, } #[allow(dead_code)] @@ -83,7 +86,8 @@ impl ErrorExt for Error { | Error::NotStarted { .. } | Error::SendHeartbeat { .. } | Error::CreateHeartbeatStream { .. } - | Error::CreateChannel { .. } => StatusCode::Internal, + | Error::CreateChannel { .. } + | Error::RetryTimesExceeded { .. } => StatusCode::Internal, Error::MetaServer { code, .. } => *code,