Skip to content

Commit

Permalink
fix: fix the issue where the sql election follower hangs (#16614)
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky authored May 7, 2024
1 parent cfedc2a commit c429309
Showing 1 changed file with 73 additions and 7 deletions.
80 changes: 73 additions & 7 deletions src/meta/src/rpc/election/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ pub trait SqlDriver: Send + Sync + 'static {
async fn candidates(&self, service_name: &str) -> MetaResult<Vec<ElectionRow>>;

async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()>;

async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()>;
}

pub trait SqlDriverCommon {
Expand Down Expand Up @@ -263,6 +265,23 @@ DO

Ok(())
}

async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()> {
self.conn
.execute(Statement::from_sql_and_values(
DatabaseBackend::Sqlite,
format!(
r#"
DELETE FROM {table} WHERE service = $1 AND DATETIME({table}.last_heartbeat, '+' || $2 || ' second') < CURRENT_TIMESTAMP;
"#,
table = Self::member_table_name()
),
vec![Value::from(service_name), Value::from(timeout)],
))
.await?;

Ok(())
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -421,6 +440,23 @@ impl SqlDriver for MySqlDriver {

Ok(())
}

async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()> {
self.conn
.execute(Statement::from_sql_and_values(
DatabaseBackend::MySql,
format!(
r#"
DELETE FROM {table} WHERE service = ? AND last_heartbeat < NOW() - INTERVAL ? SECOND;
"#,
table = Self::member_table_name()
),
vec![Value::from(service_name), Value::from(timeout)],
))
.await?;

Ok(())
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -579,6 +615,23 @@ impl SqlDriver for PostgresDriver {

Ok(())
}

async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()> {
self.conn
.execute(Statement::from_sql_and_values(
DatabaseBackend::Postgres,
format!(
r#"
DELETE FROM {table} WHERE {table}.service = $1 AND {table}.last_heartbeat < NOW() - $2::INTERVAL;
"#,
table = Self::member_table_name()
),
vec![Value::from(service_name), Value::from(timeout.to_string())],
))
.await?;

Ok(())
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -638,28 +691,41 @@ where

let mut election_ticker = time::interval(Duration::from_secs(1));

let mut prev_leader = "".to_string();

loop {
tokio::select! {
_ = election_ticker.tick() => {
let election_row = self
.driver
.try_campaign(META_ELECTION_KEY, self.id.as_str(), ttl)
.await?;
_ = election_ticker.tick() => {
let election_row = self
.driver
.try_campaign(META_ELECTION_KEY, self.id.as_str(), ttl)
.await?;

assert_eq!(election_row.service, META_ELECTION_KEY);

if election_row.id.eq_ignore_ascii_case(self.id.as_str()) {
if !is_leader{
self.is_leader_sender.send_replace(true);
is_leader = true;
} else {
self.is_leader_sender.send_replace(false);
}
} else if is_leader {
tracing::warn!("leader has been changed to {}", election_row.id);
break;
} else if prev_leader != election_row.id {
tracing::info!("leader is {}", election_row.id);
prev_leader = election_row.id.clone();
}

timeout_ticker.reset();
}
timeout_ticker.reset();

if is_leader {
if let Err(e) = self.driver.trim_candidates(META_ELECTION_KEY, ttl * 2).await {
tracing::warn!(error = %e.as_report(), "trim candidates failed");
}
}
}
_ = timeout_ticker.tick() => {
tracing::error!("member {} election timeout", self.id);
break;
Expand Down

0 comments on commit c429309

Please sign in to comment.