From c4293091c250a949739efd1deaa9083301ba7671 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 7 May 2024 19:02:31 +0800 Subject: [PATCH] fix: fix the issue where the sql election follower hangs (#16614) --- src/meta/src/rpc/election/sql.rs | 80 +++++++++++++++++++++++++++++--- 1 file changed, 73 insertions(+), 7 deletions(-) diff --git a/src/meta/src/rpc/election/sql.rs b/src/meta/src/rpc/election/sql.rs index 65c3ad613dde..49dd0474d497 100644 --- a/src/meta/src/rpc/election/sql.rs +++ b/src/meta/src/rpc/election/sql.rs @@ -64,6 +64,8 @@ pub trait SqlDriver: Send + Sync + 'static { async fn candidates(&self, service_name: &str) -> MetaResult>; async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()>; + + async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()>; } pub trait SqlDriverCommon { @@ -263,6 +265,23 @@ DO Ok(()) } + + async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()> { + self.conn + .execute(Statement::from_sql_and_values( + DatabaseBackend::Sqlite, + format!( + r#" + DELETE FROM {table} WHERE service = $1 AND DATETIME({table}.last_heartbeat, '+' || $2 || ' second') < CURRENT_TIMESTAMP; + "#, + table = Self::member_table_name() + ), + vec![Value::from(service_name), Value::from(timeout)], + )) + .await?; + + Ok(()) + } } #[async_trait::async_trait] @@ -421,6 +440,23 @@ impl SqlDriver for MySqlDriver { Ok(()) } + + async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()> { + self.conn + .execute(Statement::from_sql_and_values( + DatabaseBackend::MySql, + format!( + r#" + DELETE FROM {table} WHERE service = ? AND last_heartbeat < NOW() - INTERVAL ? SECOND; + "#, + table = Self::member_table_name() + ), + vec![Value::from(service_name), Value::from(timeout)], + )) + .await?; + + Ok(()) + } } #[async_trait::async_trait] @@ -579,6 +615,23 @@ impl SqlDriver for PostgresDriver { Ok(()) } + + async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()> { + self.conn + .execute(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + format!( + r#" + DELETE FROM {table} WHERE {table}.service = $1 AND {table}.last_heartbeat < NOW() - $2::INTERVAL; + "#, + table = Self::member_table_name() + ), + vec![Value::from(service_name), Value::from(timeout.to_string())], + )) + .await?; + + Ok(()) + } } #[async_trait::async_trait] @@ -638,13 +691,15 @@ where let mut election_ticker = time::interval(Duration::from_secs(1)); + let mut prev_leader = "".to_string(); + loop { tokio::select! { - _ = election_ticker.tick() => { - let election_row = self - .driver - .try_campaign(META_ELECTION_KEY, self.id.as_str(), ttl) - .await?; + _ = election_ticker.tick() => { + let election_row = self + .driver + .try_campaign(META_ELECTION_KEY, self.id.as_str(), ttl) + .await?; assert_eq!(election_row.service, META_ELECTION_KEY); @@ -652,14 +707,25 @@ where if !is_leader{ self.is_leader_sender.send_replace(true); is_leader = true; + } else { + self.is_leader_sender.send_replace(false); } } else if is_leader { tracing::warn!("leader has been changed to {}", election_row.id); break; + } else if prev_leader != election_row.id { + tracing::info!("leader is {}", election_row.id); + prev_leader = election_row.id.clone(); } - timeout_ticker.reset(); - } + timeout_ticker.reset(); + + if is_leader { + if let Err(e) = self.driver.trim_candidates(META_ELECTION_KEY, ttl * 2).await { + tracing::warn!(error = %e.as_report(), "trim candidates failed"); + } + } + } _ = timeout_ticker.tick() => { tracing::error!("member {} election timeout", self.id); break;