Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Aug 21, 2023
1 parent bad8b6e commit 3c77074
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 20 deletions.
1 change: 1 addition & 0 deletions src/meta-client/src/client/ask_leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl AskLeader {

error::RetryTimesExceededSnafu {
msg: "Failed to ask leader",
times: self.max_retry,
}
.fail()
}
Expand Down
46 changes: 28 additions & 18 deletions src/meta-client/src/client/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
use std::sync::Arc;

use api::v1::meta::ddl_task_client::DdlTaskClient;
use api::v1::meta::{ErrorCode, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use api::v1::meta::{ErrorCode, ResponseHeader, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use tonic::Code;
use tonic::{Code, Status};

use crate::client::ask_leader::AskLeader;
use crate::client::Id;
Expand Down Expand Up @@ -136,37 +136,32 @@ impl Inner {
req.set_header(self.id, self.role);
let ask_leader = self.ask_leader.as_ref().unwrap();
let mut times = 0;

while times < self.max_retry {
if let Some(leader) = &ask_leader.get_leader() {
let mut client = self.make_client(leader)?;

match client.submit_ddl_task(req.clone()).await {
Ok(res) => {
let res = res.into_inner();

if let Some(header) = res.header.as_ref() {
if let Some(err) = header.error.as_ref() {
if err.code == ErrorCode::NotLeader as i32 {
warn!("Failed to submitting ddl to {leader}, not a leader");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
times += 1;
continue;
}
}
if is_not_leader(&res.header) {
warn!("Failed to submitting ddl to {leader}, not a leader");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
times += 1;
continue;
}
return Ok(res);
}
Err(err) => {
Err(status) => {
// The leader may be unreachable.
if err.code() == Code::Unavailable || err.code() == Code::DeadlineExceeded {
warn!("Failed to submitting ddl to {leader}, source: {err}");
if is_unreachable(&status) {
warn!("Failed to submitting ddl to {leader}, source: {status}");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
times += 1;
continue;
} else {
return Err(error::Error::from(err));
return Err(error::Error::from(status));
}
}
}
Expand All @@ -177,7 +172,22 @@ impl Inner {

error::RetryTimesExceededSnafu {
msg: "Failed to submit DDL task",
times: self.max_retry,
}
.fail()
}
}

fn is_unreachable(status: &Status) -> bool {
status.code() == Code::Unavailable || status.code() == Code::DeadlineExceeded
}

fn is_not_leader(header: &Option<ResponseHeader>) -> bool {
if let Some(header) = header {
if let Some(err) = header.error.as_ref() {
return err.code == ErrorCode::NotLeader as i32;
}
}

false
}
4 changes: 2 additions & 2 deletions src/meta-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ pub enum Error {
source: common_meta::error::Error,
},

#[snafu(display("Retry exceeded max times, message: {}", msg))]
RetryTimesExceeded { msg: String },
#[snafu(display("Retry exceeded max times({}), message: {}", times, msg))]
RetryTimesExceeded { times: usize, msg: String },
}

#[allow(dead_code)]
Expand Down

0 comments on commit 3c77074

Please sign in to comment.