Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix ddl client can not update leader addr #2205

Merged
merged 5 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ use crate::error::{ConvertMetaResponseSnafu, Result};

pub type Id = (u64, u64);

/// Default retry limit handed to the heartbeat client when the builder creates it.
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
/// Default retry limit handed to the DDL client when the builder creates it.
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;

#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
id: Id,
Expand Down Expand Up @@ -130,7 +133,12 @@ impl MetaClientBuilder {
let mgr = client.channel_manager.clone();

if self.enable_heartbeat {
client.heartbeat = Some(HeartbeatClient::new(self.id, self.role, mgr.clone()));
client.heartbeat = Some(HeartbeatClient::new(
self.id,
self.role,
mgr.clone(),
DEFAULT_ASK_LEADER_MAX_RETRY,
));
}
if self.enable_router {
client.router = Some(RouterClient::new(self.id, self.role, mgr.clone()));
Expand All @@ -143,7 +151,12 @@ impl MetaClientBuilder {
}
if self.enable_ddl {
let mgr = self.ddl_channel_manager.unwrap_or(mgr);
client.ddl = Some(DdlClient::new(self.id, self.role, mgr));
client.ddl = Some(DdlClient::new(
self.id,
self.role,
mgr,
DEFAULT_SUBMIT_DDL_MAX_RETRY,
));
}

client
Expand Down
27 changes: 26 additions & 1 deletion src/meta-client/src/client/ask_leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub struct AskLeader {
role: Role,
leadership_group: Arc<RwLock<LeadershipGroup>>,
channel_manager: ChannelManager,
max_retry: usize,
}

impl AskLeader {
Expand All @@ -46,6 +47,7 @@ impl AskLeader {
role: Role,
peers: impl Into<Vec<String>>,
channel_manager: ChannelManager,
max_retry: usize,
) -> Self {
let leadership_group = Arc::new(RwLock::new(LeadershipGroup {
leader: None,
Expand All @@ -56,14 +58,15 @@ impl AskLeader {
role,
leadership_group,
channel_manager,
max_retry,
}
}

/// Returns a clone of the leader address currently stored in the leadership group,
/// or `None` when no leader is recorded.
///
/// # Panics
///
/// Panics if taking the read lock on the leadership group returns an error.
pub fn get_leader(&self) -> Option<String> {
    let group = self.leadership_group.read().unwrap();
    group.leader.clone()
}

pub async fn ask_leader(&self) -> Result<String> {
async fn ask_leader_inner(&self) -> Result<String> {
let mut peers = {
let leadership_group = self.leadership_group.read().unwrap();
leadership_group.peers.clone()
Expand Down Expand Up @@ -99,6 +102,28 @@ impl AskLeader {
Ok(leader)
}

pub async fn ask_leader(&self) -> Result<String> {
let mut times = 0;
while times < self.max_retry {
match self.ask_leader_inner().await {
Ok(res) => {
return Ok(res);
}
Err(err) => {
warn!("Failed to ask leader, source: {err}, retry {times} times");
times += 1;
continue;
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

error::RetryTimesExceededSnafu {
msg: "Failed to ask leader",
times: self.max_retry,
}
.fail()
}

fn create_asker(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
Ok(HeartbeatClient::new(
self.channel_manager
Expand Down
67 changes: 51 additions & 16 deletions src/meta-client/src/client/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
use std::sync::Arc;

use api::v1::meta::ddl_task_client::DdlTaskClient;
use api::v1::meta::{ErrorCode, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use api::v1::meta::{ErrorCode, ResponseHeader, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use tonic::{Code, Status};

use crate::client::ask_leader::AskLeader;
use crate::client::Id;
Expand All @@ -32,12 +34,13 @@ pub struct Client {
}

impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
ask_leader: None,
max_retry,
}));

Self { inner }
Expand Down Expand Up @@ -73,6 +76,7 @@ struct Inner {
role: Role,
channel_manager: ChannelManager,
ask_leader: Option<AskLeader>,
max_retry: usize,
}

impl Inner {
Expand All @@ -98,6 +102,7 @@ impl Inner {
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
));

Ok(())
Expand Down Expand Up @@ -130,29 +135,59 @@ impl Inner {

req.set_header(self.id, self.role);
let ask_leader = self.ask_leader.as_ref().unwrap();
loop {
let mut times = 0;

while times < self.max_retry {
if let Some(leader) = &ask_leader.get_leader() {
let mut client = self.make_client(leader)?;
let res = client
.submit_ddl_task(req.clone())
.await
.map_err(error::Error::from)?;

let res = res.into_inner();

if let Some(header) = res.header.as_ref() {
if let Some(err) = header.error.as_ref() {
if err.code == ErrorCode::NotLeader as i32 {
let _ = ask_leader.ask_leader().await?;
match client.submit_ddl_task(req.clone()).await {
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
Ok(res) => {
let res = res.into_inner();
if is_not_leader(&res.header) {
warn!("Failed to submitting ddl to {leader}, not a leader");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
times += 1;
continue;
}
return Ok(res);
}
Err(status) => {
// The leader may be unreachable.
if is_unreachable(&status) {
warn!("Failed to submitting ddl to {leader}, source: {status}");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
times += 1;
continue;
} else {
return Err(error::Error::from(status));
}
}
}

return Ok(res);
} else if let Err(err) = ask_leader.ask_leader().await {
return Err(err);
}
}

error::RetryTimesExceededSnafu {
msg: "Failed to submit DDL task",
times: self.max_retry,
}
.fail()
}
}

/// Reports whether `status` carries a code that this client treats as
/// "the leader could not be reached".
///
/// Only `Code::Unavailable` and `Code::DeadlineExceeded` qualify; any other code yields `false`.
fn is_unreachable(status: &Status) -> bool {
    matches!(
        status.code(),
        Code::Unavailable | Code::DeadlineExceeded
    )
}

/// Reports whether a response header carries the `NotLeader` error code.
///
/// Returns `false` when the header is absent, when it has no error attached,
/// or when the attached error has any other code.
fn is_not_leader(header: &Option<ResponseHeader>) -> bool {
    header
        .as_ref()
        .and_then(|present| present.error.as_ref())
        .map_or(false, |failure| failure.code == ErrorCode::NotLeader as i32)
}
18 changes: 13 additions & 5 deletions src/meta-client/src/client/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,13 @@ pub struct Client {
}

impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager)));
pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
let inner = Arc::new(RwLock::new(Inner::new(
id,
role,
channel_manager,
max_retry,
)));
Self { inner }
}

Expand Down Expand Up @@ -130,15 +135,17 @@ struct Inner {
role: Role,
channel_manager: ChannelManager,
ask_leader: Option<AskLeader>,
max_retry: usize,
}

impl Inner {
fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
Self {
id,
role,
channel_manager,
ask_leader: None,
max_retry,
}
}

Expand All @@ -164,6 +171,7 @@ impl Inner {
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
));

Ok(())
Expand Down Expand Up @@ -251,7 +259,7 @@ mod test {

#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3);
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
Expand All @@ -262,7 +270,7 @@ mod test {

#[tokio::test]
async fn test_already_start() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
Expand Down
6 changes: 5 additions & 1 deletion src/meta-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ pub enum Error {
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Retry exceeded max times({}), message: {}", times, msg))]
RetryTimesExceeded { times: usize, msg: String },
}

#[allow(dead_code)]
Expand All @@ -83,7 +86,8 @@ impl ErrorExt for Error {
| Error::NotStarted { .. }
| Error::SendHeartbeat { .. }
| Error::CreateHeartbeatStream { .. }
| Error::CreateChannel { .. } => StatusCode::Internal,
| Error::CreateChannel { .. }
| Error::RetryTimesExceeded { .. } => StatusCode::Internal,

Error::MetaServer { code, .. } => *code,

Expand Down
Loading