diff --git a/src/meta/src/rpc/election/etcd.rs b/src/meta/src/rpc/election/etcd.rs index 78e066bc49669..f30d8253cb95d 100644 --- a/src/meta/src/rpc/election/etcd.rs +++ b/src/meta/src/rpc/election/etcd.rs @@ -349,7 +349,8 @@ mod tests { use tokio::sync::watch::Sender; use tokio::time; - use crate::rpc::election_client::{ElectionClient, EtcdElectionClient, META_ELECTION_KEY}; + use crate::rpc::election::etcd::EtcdElectionClient; + use crate::rpc::election::{ElectionClient, META_ELECTION_KEY}; type ElectionHandle = (Sender<()>, Arc); diff --git a/src/meta/src/rpc/election/sql.rs b/src/meta/src/rpc/election/sql.rs index 6567857d857ec..4d4124036bda9 100644 --- a/src/meta/src/rpc/election/sql.rs +++ b/src/meta/src/rpc/election/sql.rs @@ -52,10 +52,10 @@ pub(crate) trait SqlDriverCommon { const ELECTION_LEADER_TABLE_NAME: &'static str = "election_leader"; const ELECTION_MEMBER_TABLE_NAME: &'static str = "election_members"; - fn election_table_name(&self) -> &'static str { + fn election_table_name() -> &'static str { Self::ELECTION_LEADER_TABLE_NAME } - fn member_table_name(&self) -> &'static str { + fn member_table_name() -> &'static str { Self::ELECTION_MEMBER_TABLE_NAME } } @@ -88,7 +88,7 @@ ON CONFLICT (id, service) DO UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat; "#, - table = self.member_table_name() + table = Self::member_table_name() )) .bind(id) .bind(service_name) @@ -120,7 +120,7 @@ ON CONFLICT (service) END RETURNING service, id, last_heartbeat; "#, - table = self.election_table_name() + table = Self::election_table_name() )) .bind(service_name) .bind(id) @@ -134,7 +134,7 @@ RETURNING service, id, last_heartbeat; async fn leader(&self, service_name: &str) -> MetaResult> { let row = sqlx::query_as::<_, ElectionRow>(&format!( r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, - table = self.election_table_name() + table = Self::election_table_name() )) .bind(service_name) .fetch_optional(&self.pool) @@ -146,7 +146,7 @@ RETURNING service, id, last_heartbeat; async fn candidates(&self, service_name: &str) -> MetaResult> { let row = sqlx::query_as::<_, ElectionRow>(&format!( r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, - table = self.member_table_name() + table = Self::member_table_name() )) .bind(service_name) .fetch_all(&self.pool) @@ -161,7 +161,7 @@ RETURNING service, id, last_heartbeat; r#" DELETE FROM {table} WHERE service = $1 AND id = $2; "#, - table = self.election_table_name() + table = Self::election_table_name() )) .bind(service_name) .bind(id) @@ -172,7 +172,7 @@ RETURNING service, id, last_heartbeat; r#" DELETE FROM {table} WHERE service = $1 AND id = $2; "#, - table = self.member_table_name() + table = Self::member_table_name() )) .bind(service_name) .bind(id) @@ -194,7 +194,7 @@ VALUES(?, ?, NOW()) ON duplicate KEY UPDATE last_heartbeat = VALUES(last_heartbeat); "#, - table = self.member_table_name() + table = Self::member_table_name() )) .bind(id) .bind(service_name) @@ -210,7 +210,7 @@ ON duplicate KEY id: &str, ttl: i64, ) -> MetaResult { - let row = sqlx::query::(&format!( + let _ = sqlx::query::(&format!( r#"INSERT IGNORE INTO {table} (service, id, last_heartbeat) @@ -221,7 +221,7 @@ ON duplicate KEY last_heartbeat = if(id = VALUES(id), VALUES(last_heartbeat), last_heartbeat);"#, - table = self.election_table_name() + table = Self::election_table_name() )) .bind(service_name) .bind(id) @@ -229,11 +229,9 @@ ON duplicate KEY .execute(&self.pool) .await?; - println!("row {:?}", row); - let row = sqlx::query_as::(&format!( r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#, - table = self.election_table_name(), + table = Self::election_table_name(), )) .bind(service_name) .fetch_one(&self.pool) @@ -245,7 +243,7 @@ ON duplicate KEY async fn leader(&self, service_name: &str) -> MetaResult> { let row = sqlx::query_as::(&format!( r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#, - table = self.election_table_name() + table = Self::election_table_name() )) .bind(service_name) .fetch_optional(&self.pool) @@ -257,7 +255,7 @@ ON duplicate KEY async fn candidates(&self, service_name: &str) -> MetaResult> { let row = sqlx::query_as::(&format!( r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#, - table = self.member_table_name() + table = Self::member_table_name() )) .bind(service_name) .fetch_all(&self.pool) @@ -272,7 +270,7 @@ ON duplicate KEY r#" DELETE FROM {table} WHERE service = ? AND id = ?; "#, - table = self.election_table_name() + table = Self::election_table_name() )) .bind(service_name) .bind(id) @@ -283,7 +281,7 @@ ON duplicate KEY r#" DELETE FROM {table} WHERE service = ? AND id = ?; "#, - table = self.member_table_name() + table = Self::member_table_name() )) .bind(service_name) .bind(id) @@ -306,7 +304,7 @@ ON CONFLICT (id, service) DO UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat; "#, - table = self.member_table_name() + table = Self::member_table_name() )) .bind(id) .bind(service_name) @@ -338,7 +336,7 @@ ON CONFLICT (service) END RETURNING service, id, last_heartbeat; "#, - table = self.election_table_name() + table = Self::election_table_name() )) .bind(service_name) .bind(id) @@ -346,15 +344,13 @@ RETURNING service, id, last_heartbeat; .fetch_one(&self.pool) .await?; - println!("row {:?}", row); - Ok(row) } async fn leader(&self, service_name: &str) -> MetaResult> { let row = sqlx::query_as::(&format!( r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, - table = self.election_table_name() + table = Self::election_table_name() )) .bind(service_name) .fetch_optional(&self.pool) @@ -366,7 +362,7 @@ RETURNING service, id, last_heartbeat; async fn candidates(&self, service_name: &str) -> MetaResult> { let row = sqlx::query_as::(&format!( r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, - table = self.member_table_name() + table = Self::member_table_name() )) .bind(service_name) .fetch_all(&self.pool) @@ -381,7 +377,7 @@ RETURNING service, id, last_heartbeat; r#" DELETE FROM {table} WHERE service = $1 AND id = $2; "#, - table = self.election_table_name() + table = Self::election_table_name() )) .bind(service_name) .bind(id) @@ -392,7 +388,7 @@ RETURNING service, id, last_heartbeat; r#" DELETE FROM {table} WHERE service = $1 AND id = $2; "#, - table = self.member_table_name() + table = Self::member_table_name() )) .bind(service_name) .bind(id) @@ -417,8 +413,6 @@ where async fn run_once(&self, ttl: i64, stop: Receiver<()>) -> MetaResult<()> { let stop = stop.clone(); - let mut election_ticker = tokio::time::interval(Duration::from_secs(1)); - let member_refresh_driver = self.driver.clone(); let id = self.id.clone(); @@ -431,6 +425,7 @@ where loop { tokio::select! { _ = ticker.tick() => { + if let Err(e) = member_refresh_driver .update_heartbeat(META_ELECTION_KEY, id.as_str()) .await { @@ -456,13 +451,15 @@ where let mut is_leader = false; + let mut election_ticker = time::interval(Duration::from_secs(1)); + loop { tokio::select! { - _ = election_ticker.tick() => { - let election_row = self - .driver - .try_campaign(META_ELECTION_KEY, self.id.as_str(), ttl) - .await?; + _ = election_ticker.tick() => { + let election_row = self + .driver + .try_campaign(META_ELECTION_KEY, self.id.as_str(), ttl) + .await?; assert_eq!(election_row.service, META_ELECTION_KEY); @@ -472,13 +469,12 @@ where is_leader = true; } } else if is_leader { - tracing::warn!("leader has been changed to {}", election_row.id); - break; - + tracing::warn!("leader has been changed to {}", election_row.id); + break; } - timeout_ticker.reset(); - } + timeout_ticker.reset(); + } _ = timeout_ticker.tick() => { tracing::error!("member {} election timeout", self.id); break; @@ -497,7 +493,6 @@ where } } } - self.is_leader_sender.send_replace(false); return Ok(()); @@ -539,3 +534,91 @@ where *self.is_leader_sender.borrow() } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use sqlx::sqlite::SqlitePoolOptions; + use sqlx::SqlitePool; + use tokio::sync::watch; + + use crate::rpc::election::sql::{SqlBackendElectionClient, SqlDriverCommon, SqliteDriver}; + use crate::{ElectionClient, MetaResult}; + + async fn prepare_sqlite_env() -> MetaResult { + let pool = SqlitePoolOptions::new().connect("sqlite::memory:").await?; + let _ = sqlx::query( + &format!("CREATE TABLE {table} (service VARCHAR(256) PRIMARY KEY, id VARCHAR(256), last_heartbeat DATETIME)", + table = SqliteDriver::election_table_name())) + .execute(&pool).await?; + + let _ = sqlx::query( + &format!("CREATE TABLE {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service, id))", + table = SqliteDriver::member_table_name())) + .execute(&pool).await?; + + Ok(pool) + } + + #[tokio::test] + async fn test_sql_election() { + let id = "test_id".to_string(); + let pool = prepare_sqlite_env().await.unwrap(); + + let provider = SqliteDriver { pool }; + let (sender, _) = watch::channel(false); + let sql_election_client: Arc = Arc::new(SqlBackendElectionClient { + id, + driver: Arc::new(provider), + is_leader_sender: sender, + }); + let (stop_sender, _) = watch::channel(()); + + let stop_receiver = stop_sender.subscribe(); + + let mut receiver = sql_election_client.subscribe(); + let client_ = sql_election_client.clone(); + tokio::spawn(async move { client_.run_once(10, stop_receiver).await.unwrap() }); + + loop { + receiver.changed().await.unwrap(); + if *receiver.borrow() { + assert!(sql_election_client.is_leader().await); + break; + } + } + } + + #[tokio::test] + async fn test_sql_election_multi() { + let (stop_sender, _) = watch::channel(()); + + let mut clients = vec![]; + + let pool = prepare_sqlite_env().await.unwrap(); + for i in 1..3 { + let id = format!("test_id_{}", i); + let provider = SqliteDriver { pool: pool.clone() }; + let (sender, _) = watch::channel(false); + let sql_election_client: Arc = Arc::new(SqlBackendElectionClient { + id, + driver: Arc::new(provider), + is_leader_sender: sender, + }); + + let stop_receiver = stop_sender.subscribe(); + let client_ = sql_election_client.clone(); + tokio::spawn(async move { client_.run_once(10, stop_receiver).await.unwrap() }); + clients.push(sql_election_client); + } + + let mut is_leaders = vec![]; + + for client in clients { + is_leaders.push(client.is_leader().await); + } + + assert!(is_leaders.iter().filter(|&x| *x).count() <= 1); + } +}