Skip to content

Commit

Permalink
fix: fix ddl client can not update leader addr (GreptimeTeam#2205)
Browse files Browse the repository at this point in the history
* fix: fix ddl client can not update leader addr

* chore: apply suggestions from CR

* feat: add message to context

* fix: only retry if unavailable or deadline exceeded

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored and paomian committed Oct 19, 2023
1 parent 091da78 commit 02b379a
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 25 deletions.
17 changes: 15 additions & 2 deletions src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ use crate::error::{ConvertMetaResponseSnafu, Result};

pub type Id = (u64, u64);

/// Retry limit passed to the heartbeat client when the builder enables heartbeats.
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
/// Retry limit passed to the DDL client when the builder enables DDL.
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;

#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
id: Id,
Expand Down Expand Up @@ -130,7 +133,12 @@ impl MetaClientBuilder {
let mgr = client.channel_manager.clone();

if self.enable_heartbeat {
client.heartbeat = Some(HeartbeatClient::new(self.id, self.role, mgr.clone()));
client.heartbeat = Some(HeartbeatClient::new(
self.id,
self.role,
mgr.clone(),
DEFAULT_ASK_LEADER_MAX_RETRY,
));
}
if self.enable_router {
client.router = Some(RouterClient::new(self.id, self.role, mgr.clone()));
Expand All @@ -143,7 +151,12 @@ impl MetaClientBuilder {
}
if self.enable_ddl {
let mgr = self.ddl_channel_manager.unwrap_or(mgr);
client.ddl = Some(DdlClient::new(self.id, self.role, mgr));
client.ddl = Some(DdlClient::new(
self.id,
self.role,
mgr,
DEFAULT_SUBMIT_DDL_MAX_RETRY,
));
}

client
Expand Down
27 changes: 26 additions & 1 deletion src/meta-client/src/client/ask_leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub struct AskLeader {
role: Role,
leadership_group: Arc<RwLock<LeadershipGroup>>,
channel_manager: ChannelManager,
max_retry: usize,
}

impl AskLeader {
Expand All @@ -46,6 +47,7 @@ impl AskLeader {
role: Role,
peers: impl Into<Vec<String>>,
channel_manager: ChannelManager,
max_retry: usize,
) -> Self {
let leadership_group = Arc::new(RwLock::new(LeadershipGroup {
leader: None,
Expand All @@ -56,14 +58,15 @@ impl AskLeader {
role,
leadership_group,
channel_manager,
max_retry,
}
}

/// Returns a copy of the leader currently stored in the leadership group,
/// or `None` when no leader has been recorded yet.
///
/// # Panics
///
/// Panics if acquiring the read lock on the leadership group fails.
pub fn get_leader(&self) -> Option<String> {
    let group = self.leadership_group.read().unwrap();
    group.leader.clone()
}

pub async fn ask_leader(&self) -> Result<String> {
async fn ask_leader_inner(&self) -> Result<String> {
let mut peers = {
let leadership_group = self.leadership_group.read().unwrap();
leadership_group.peers.clone()
Expand Down Expand Up @@ -99,6 +102,28 @@ impl AskLeader {
Ok(leader)
}

/// Asks for the current leader, retrying on failure.
///
/// Calls `ask_leader_inner` at most `self.max_retry` times and returns the
/// first successful result. Every failed attempt is logged as a warning.
///
/// # Errors
///
/// Returns a `RetryTimesExceeded` error when every attempt fails, which
/// includes the case where `self.max_retry` is zero and no attempt is made.
pub async fn ask_leader(&self) -> Result<String> {
    for times in 0..self.max_retry {
        let err = match self.ask_leader_inner().await {
            Ok(leader) => return Ok(leader),
            Err(err) => err,
        };
        // `times` is the zero-based index of the attempt that just failed.
        warn!("Failed to ask leader, source: {err}, retry {times} times");
    }

    error::RetryTimesExceededSnafu {
        msg: "Failed to ask leader",
        times: self.max_retry,
    }
    .fail()
}

fn create_asker(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
Ok(HeartbeatClient::new(
self.channel_manager
Expand Down
67 changes: 51 additions & 16 deletions src/meta-client/src/client/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
use std::sync::Arc;

use api::v1::meta::ddl_task_client::DdlTaskClient;
use api::v1::meta::{ErrorCode, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use api::v1::meta::{ErrorCode, ResponseHeader, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use tonic::{Code, Status};

use crate::client::ask_leader::AskLeader;
use crate::client::Id;
Expand All @@ -32,12 +34,13 @@ pub struct Client {
}

impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
ask_leader: None,
max_retry,
}));

Self { inner }
Expand Down Expand Up @@ -73,6 +76,7 @@ struct Inner {
role: Role,
channel_manager: ChannelManager,
ask_leader: Option<AskLeader>,
max_retry: usize,
}

impl Inner {
Expand All @@ -98,6 +102,7 @@ impl Inner {
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
));

Ok(())
Expand Down Expand Up @@ -130,29 +135,59 @@ impl Inner {

req.set_header(self.id, self.role);
let ask_leader = self.ask_leader.as_ref().unwrap();
loop {
let mut times = 0;

while times < self.max_retry {
if let Some(leader) = &ask_leader.get_leader() {
let mut client = self.make_client(leader)?;
let res = client
.submit_ddl_task(req.clone())
.await
.map_err(error::Error::from)?;

let res = res.into_inner();

if let Some(header) = res.header.as_ref() {
if let Some(err) = header.error.as_ref() {
if err.code == ErrorCode::NotLeader as i32 {
let _ = ask_leader.ask_leader().await?;
match client.submit_ddl_task(req.clone()).await {
Ok(res) => {
let res = res.into_inner();
if is_not_leader(&res.header) {
warn!("Failed to submitting ddl to {leader}, not a leader");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
times += 1;
continue;
}
return Ok(res);
}
Err(status) => {
// The leader may be unreachable.
if is_unreachable(&status) {
warn!("Failed to submitting ddl to {leader}, source: {status}");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
times += 1;
continue;
} else {
return Err(error::Error::from(status));
}
}
}

return Ok(res);
} else if let Err(err) = ask_leader.ask_leader().await {
return Err(err);
}
}

error::RetryTimesExceededSnafu {
msg: "Failed to submit DDL task",
times: self.max_retry,
}
.fail()
}
}

/// Returns `true` when the gRPC status code is `Unavailable` or
/// `DeadlineExceeded`; any other code yields `false`.
fn is_unreachable(status: &Status) -> bool {
    matches!(status.code(), Code::Unavailable | Code::DeadlineExceeded)
}

/// Returns `true` only when the response header is present, carries an
/// error, and that error's code equals `ErrorCode::NotLeader`.
///
/// A missing header or a header without an error yields `false`.
fn is_not_leader(header: &Option<ResponseHeader>) -> bool {
    header
        .as_ref()
        .and_then(|header| header.error.as_ref())
        .map_or(false, |err| err.code == ErrorCode::NotLeader as i32)
}
18 changes: 13 additions & 5 deletions src/meta-client/src/client/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,13 @@ pub struct Client {
}

impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager)));
/// Creates a client whose state is a freshly built `Inner` wrapped in
/// `Arc<RwLock<_>>`.
///
/// All four arguments are forwarded unchanged to `Inner::new`.
pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
    let state = Inner::new(id, role, channel_manager, max_retry);
    Self {
        inner: Arc::new(RwLock::new(state)),
    }
}

Expand Down Expand Up @@ -130,15 +135,17 @@ struct Inner {
role: Role,
channel_manager: ChannelManager,
ask_leader: Option<AskLeader>,
max_retry: usize,
}

impl Inner {
fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
/// Builds the inner state from the given identity, role, channel manager
/// and retry limit.
///
/// `ask_leader` starts out as `None`.
fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
    Self {
        ask_leader: None,
        max_retry,
        channel_manager,
        role,
        id,
    }
}

Expand All @@ -164,6 +171,7 @@ impl Inner {
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
));

Ok(())
Expand Down Expand Up @@ -251,7 +259,7 @@ mod test {

#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3);
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
Expand All @@ -262,7 +270,7 @@ mod test {

#[tokio::test]
async fn test_already_start() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
Expand Down
6 changes: 5 additions & 1 deletion src/meta-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ pub enum Error {
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Retry exceeded max times({}), message: {}", times, msg))]
RetryTimesExceeded { times: usize, msg: String },
}

#[allow(dead_code)]
Expand All @@ -83,7 +86,8 @@ impl ErrorExt for Error {
| Error::NotStarted { .. }
| Error::SendHeartbeat { .. }
| Error::CreateHeartbeatStream { .. }
| Error::CreateChannel { .. } => StatusCode::Internal,
| Error::CreateChannel { .. }
| Error::RetryTimesExceeded { .. } => StatusCode::Internal,

Error::MetaServer { code, .. } => *code,

Expand Down

0 comments on commit 02b379a

Please sign in to comment.