Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Aug 21, 2023
1 parent ad97a01 commit 8e48010
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 16 deletions.
17 changes: 15 additions & 2 deletions src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ use crate::error::{ConvertMetaResponseSnafu, Result};

pub type Id = (u64, u64);

const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;

#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
id: Id,
Expand Down Expand Up @@ -130,7 +133,12 @@ impl MetaClientBuilder {
let mgr = client.channel_manager.clone();

if self.enable_heartbeat {
client.heartbeat = Some(HeartbeatClient::new(self.id, self.role, mgr.clone()));
client.heartbeat = Some(HeartbeatClient::new(
self.id,
self.role,
mgr.clone(),
DEFAULT_ASK_LEADER_MAX_RETRY,
));
}
if self.enable_router {
client.router = Some(RouterClient::new(self.id, self.role, mgr.clone()));
Expand All @@ -143,7 +151,12 @@ impl MetaClientBuilder {
}
if self.enable_ddl {
let mgr = self.ddl_channel_manager.unwrap_or(mgr);
client.ddl = Some(DdlClient::new(self.id, self.role, mgr));
client.ddl = Some(DdlClient::new(
self.id,
self.role,
mgr,
DEFAULT_SUBMIT_DDL_MAX_RETRY,
));
}

client
Expand Down
23 changes: 22 additions & 1 deletion src/meta-client/src/client/ask_leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub struct AskLeader {
role: Role,
leadership_group: Arc<RwLock<LeadershipGroup>>,
channel_manager: ChannelManager,
max_retry: usize,
}

impl AskLeader {
Expand All @@ -46,6 +47,7 @@ impl AskLeader {
role: Role,
peers: impl Into<Vec<String>>,
channel_manager: ChannelManager,
max_retry: usize,
) -> Self {
let leadership_group = Arc::new(RwLock::new(LeadershipGroup {
leader: None,
Expand All @@ -56,14 +58,15 @@ impl AskLeader {
role,
leadership_group,
channel_manager,
max_retry,
}
}

pub fn get_leader(&self) -> Option<String> {
self.leadership_group.read().unwrap().leader.clone()
}

pub async fn ask_leader(&self) -> Result<String> {
async fn ask_leader_inner(&self) -> Result<String> {
let mut peers = {
let leadership_group = self.leadership_group.read().unwrap();
leadership_group.peers.clone()
Expand Down Expand Up @@ -99,6 +102,24 @@ impl AskLeader {
Ok(leader)
}

pub async fn ask_leader(&self) -> Result<String> {
let mut times = 0;
while times < self.max_retry {
match self.ask_leader_inner().await {
Ok(res) => {
return Ok(res);
}
Err(err) => {
warn!("Failed to ask leader, source: {err}, retry {times} times");
times += 1;
continue;
}
}
}

error::RetryTimesExceededSnafu {}.fail()
}

fn create_asker(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
Ok(HeartbeatClient::new(
self.channel_manager
Expand Down
21 changes: 14 additions & 7 deletions src/meta-client/src/client/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use tonic::Code;

use crate::client::ask_leader::AskLeader;
use crate::client::Id;
Expand All @@ -34,12 +33,13 @@ pub struct Client {
}

impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
ask_leader: None,
max_retry,
}));

Self { inner }
Expand Down Expand Up @@ -75,6 +75,7 @@ struct Inner {
role: Role,
channel_manager: ChannelManager,
ask_leader: Option<AskLeader>,
max_retry: usize,
}

impl Inner {
Expand All @@ -100,6 +101,7 @@ impl Inner {
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
));

Ok(())
Expand Down Expand Up @@ -132,7 +134,8 @@ impl Inner {

req.set_header(self.id, self.role);
let ask_leader = self.ask_leader.as_ref().unwrap();
loop {
let mut times = 0;
while times < self.max_retry {
if let Some(leader) = &ask_leader.get_leader() {
let mut client = self.make_client(leader)?;

Expand All @@ -147,7 +150,10 @@ impl Inner {
if let Some(header) = res.header.as_ref() {
if let Some(err) = header.error.as_ref() {
if err.code == ErrorCode::NotLeader as i32 {
let _ = ask_leader.ask_leader().await?;
warn!("Failed to submitting ddl to {leader}, not a leader");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
times += 1;
continue;
}
}
Expand All @@ -156,17 +162,18 @@ impl Inner {
}
// The leader may be unreachable.
Err(err) => {
warn!(
"Submitting ddl to {leader} is unreachable, try to update leader addr"
);
warn!("Failed to submitting ddl to {leader}, source: {err}");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
times += 1;
continue;
}
}
} else if let Err(err) = ask_leader.ask_leader().await {
return Err(err);
}
}

error::RetryTimesExceededSnafu.fail()
}
}
18 changes: 13 additions & 5 deletions src/meta-client/src/client/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,13 @@ pub struct Client {
}

impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager)));
pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
let inner = Arc::new(RwLock::new(Inner::new(
id,
role,
channel_manager,
max_retry,
)));
Self { inner }
}

Expand Down Expand Up @@ -130,15 +135,17 @@ struct Inner {
role: Role,
channel_manager: ChannelManager,
ask_leader: Option<AskLeader>,
max_retry: usize,
}

impl Inner {
fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
Self {
id,
role,
channel_manager,
ask_leader: None,
max_retry,
}
}

Expand All @@ -164,6 +171,7 @@ impl Inner {
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
));

Ok(())
Expand Down Expand Up @@ -251,7 +259,7 @@ mod test {

#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3);
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
Expand All @@ -262,7 +270,7 @@ mod test {

#[tokio::test]
async fn test_already_start() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
Expand Down
6 changes: 5 additions & 1 deletion src/meta-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ pub enum Error {
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Retry exceeded max times"))]
RetryTimesExceeded {},
}

#[allow(dead_code)]
Expand All @@ -83,7 +86,8 @@ impl ErrorExt for Error {
| Error::NotStarted { .. }
| Error::SendHeartbeat { .. }
| Error::CreateHeartbeatStream { .. }
| Error::CreateChannel { .. } => StatusCode::Internal,
| Error::CreateChannel { .. }
| Error::RetryTimesExceeded { .. } => StatusCode::Internal,

Error::MetaServer { code, .. } => *code,

Expand Down

0 comments on commit 8e48010

Please sign in to comment.