diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 92f187465a32..bb3161d349dd 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -45,6 +45,9 @@ use crate::error::{ConvertMetaResponseSnafu, Result}; pub type Id = (u64, u64); +const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3; +const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3; + #[derive(Clone, Debug, Default)] pub struct MetaClientBuilder { id: Id, @@ -130,7 +133,12 @@ impl MetaClientBuilder { let mgr = client.channel_manager.clone(); if self.enable_heartbeat { - client.heartbeat = Some(HeartbeatClient::new(self.id, self.role, mgr.clone())); + client.heartbeat = Some(HeartbeatClient::new( + self.id, + self.role, + mgr.clone(), + DEFAULT_ASK_LEADER_MAX_RETRY, + )); } if self.enable_router { client.router = Some(RouterClient::new(self.id, self.role, mgr.clone())); @@ -143,7 +151,12 @@ impl MetaClientBuilder { } if self.enable_ddl { let mgr = self.ddl_channel_manager.unwrap_or(mgr); - client.ddl = Some(DdlClient::new(self.id, self.role, mgr)); + client.ddl = Some(DdlClient::new( + self.id, + self.role, + mgr, + DEFAULT_SUBMIT_DDL_MAX_RETRY, + )); } client diff --git a/src/meta-client/src/client/ask_leader.rs b/src/meta-client/src/client/ask_leader.rs index 529c8f94a039..d1dfb81b3b05 100644 --- a/src/meta-client/src/client/ask_leader.rs +++ b/src/meta-client/src/client/ask_leader.rs @@ -38,6 +38,7 @@ pub struct AskLeader { role: Role, leadership_group: Arc>, channel_manager: ChannelManager, + max_retry: usize, } impl AskLeader { @@ -46,6 +47,7 @@ impl AskLeader { role: Role, peers: impl Into>, channel_manager: ChannelManager, + max_retry: usize, ) -> Self { let leadership_group = Arc::new(RwLock::new(LeadershipGroup { leader: None, @@ -56,6 +58,7 @@ impl AskLeader { role, leadership_group, channel_manager, + max_retry, } } @@ -63,7 +66,7 @@ impl AskLeader { self.leadership_group.read().unwrap().leader.clone() } - pub async fn ask_leader(&self) -> Result { + async fn ask_leader_inner(&self) -> Result { let mut peers = { let leadership_group = self.leadership_group.read().unwrap(); leadership_group.peers.clone() @@ -99,6 +102,24 @@ impl AskLeader { Ok(leader) } + pub async fn ask_leader(&self) -> Result { + let mut times = 0; + while times < self.max_retry { + match self.ask_leader_inner().await { + Ok(res) => { + return Ok(res); + } + Err(err) => { + warn!("Failed to ask leader, source: {err}, retry {times} times"); + times += 1; + continue; + } + } + } + + error::RetryTimesExceededSnafu {}.fail() + } + fn create_asker(&self, addr: impl AsRef) -> Result> { Ok(HeartbeatClient::new( self.channel_manager diff --git a/src/meta-client/src/client/ddl.rs b/src/meta-client/src/client/ddl.rs index a92d89f039e4..a4b5c81f897d 100644 --- a/src/meta-client/src/client/ddl.rs +++ b/src/meta-client/src/client/ddl.rs @@ -21,7 +21,6 @@ use common_telemetry::{info, warn}; use snafu::{ensure, ResultExt}; use tokio::sync::RwLock; use tonic::transport::Channel; -use tonic::Code; use crate::client::ask_leader::AskLeader; use crate::client::Id; @@ -34,12 +33,13 @@ pub struct Client { } impl Client { - pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { + pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self { let inner = Arc::new(RwLock::new(Inner { id, role, channel_manager, ask_leader: None, + max_retry, })); Self { inner } @@ -75,6 +75,7 @@ struct Inner { role: Role, channel_manager: ChannelManager, ask_leader: Option, + max_retry: usize, } impl Inner { @@ -100,6 +101,7 @@ impl Inner { self.role, peers, self.channel_manager.clone(), + self.max_retry, )); Ok(()) @@ -132,7 +134,8 @@ impl Inner { req.set_header(self.id, self.role); let ask_leader = self.ask_leader.as_ref().unwrap(); - loop { + let mut times = 0; + while times < self.max_retry { if let Some(leader) = &ask_leader.get_leader() { let mut client = self.make_client(leader)?; @@ -147,7 +150,10 @@ impl Inner { if let Some(header) = res.header.as_ref() { if let Some(err) = header.error.as_ref() { if err.code == ErrorCode::NotLeader as i32 { - let _ = ask_leader.ask_leader().await?; + warn!("Failed to submitting ddl to {leader}, not a leader"); + let leader = ask_leader.ask_leader().await?; + info!("DDL client updated to new leader addr: {leader}"); + times += 1; continue; } } @@ -156,11 +162,10 @@ impl Inner { } // The leader may be unreachable. Err(err) => { - warn!( - "Submitting ddl to {leader} is unreachable, try to update leader addr" - ); + warn!("Failed to submitting ddl to {leader}, source: {err}"); let leader = ask_leader.ask_leader().await?; info!("DDL client updated to new leader addr: {leader}"); + times += 1; continue; } } @@ -168,5 +173,7 @@ impl Inner { return Err(err); } } + + error::RetryTimesExceededSnafu.fail() } } diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index c1dabfdcf061..a66950fa9fec 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -93,8 +93,13 @@ pub struct Client { } impl Client { - pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { - let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager))); + pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self { + let inner = Arc::new(RwLock::new(Inner::new( + id, + role, + channel_manager, + max_retry, + ))); Self { inner } } @@ -130,15 +135,17 @@ struct Inner { role: Role, channel_manager: ChannelManager, ask_leader: Option, + max_retry: usize, } impl Inner { - fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { + fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self { Self { id, role, channel_manager, ask_leader: None, + max_retry, } } @@ -164,6 +171,7 @@ impl Inner { self.role, peers, self.channel_manager.clone(), + self.max_retry, )); Ok(()) @@ -251,7 +259,7 @@ mod test { #[tokio::test] async fn test_start_client() { - let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3); assert!(!client.is_started().await); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) @@ -262,7 +270,7 @@ mod test { #[tokio::test] async fn test_already_start() { - let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await diff --git a/src/meta-client/src/error.rs b/src/meta-client/src/error.rs index b33496c0704f..c30edcecc2f7 100644 --- a/src/meta-client/src/error.rs +++ b/src/meta-client/src/error.rs @@ -65,6 +65,9 @@ pub enum Error { location: Location, source: common_meta::error::Error, }, + + #[snafu(display("Retry exceeded max times"))] + RetryTimesExceeded {}, } #[allow(dead_code)] @@ -83,7 +86,8 @@ impl ErrorExt for Error { | Error::NotStarted { .. } | Error::SendHeartbeat { .. } | Error::CreateHeartbeatStream { .. } - | Error::CreateChannel { .. } => StatusCode::Internal, + | Error::CreateChannel { .. } + | Error::RetryTimesExceeded { .. } => StatusCode::Internal, Error::MetaServer { code, .. } => *code,