From 5074686e811f00f83a17556792b7e8e0f7b688d2 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 19 Dec 2024 20:13:19 +0800 Subject: [PATCH] refactor: check both lock and expire time, add more comments --- src/meta-srv/src/election/etcd.rs | 14 +- src/meta-srv/src/election/postgres.rs | 362 +++++++++++--------------- src/meta-srv/src/error.rs | 10 + 3 files changed, 172 insertions(+), 214 deletions(-) diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index 60dcadc65f66..274a6a7e4792 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -282,12 +282,9 @@ impl Election for EtcdElection { .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) .is_ok() { - if let Err(e) = self - .leader_watcher + self.leader_watcher .send(LeaderChangeMessage::StepDown(Arc::new(leader.clone()))) - { - error!(e; "Failed to send leader change message"); - } + .context(error::SendLeaderChangeSnafu)?; } } @@ -342,12 +339,9 @@ impl EtcdElection { { self.infancy.store(true, Ordering::Relaxed); - if let Err(e) = self - .leader_watcher + self.leader_watcher .send(LeaderChangeMessage::Elected(Arc::new(leader))) - { - error!(e; "Failed to send leader change message"); - } + .context(error::SendLeaderChangeSnafu)?; } } diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 61d96717c035..85c03b721603 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -20,12 +20,11 @@ use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, MET use common_meta::kv_backend::postgres::{ CAS, POINT_DELETE, POINT_GET, PREFIX_SCAN, PUT_IF_NOT_EXISTS, }; -use common_telemetry::{error, info, warn}; +use common_telemetry::{info, warn}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; -use tokio::time::{timeout, MissedTickBehavior}; use tokio_postgres::Client; use crate::election::{ @@ -33,24 +32,28 @@ use crate::election::{ KEEP_ALIVE_INTERVAL_SECS, }; use crate::error::{ - DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, - UnexpectedSnafu, + DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SendLeaderChangeSnafu, + SerializeToJsonSnafu, UnexpectedSnafu, }; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; -// TODO: make key id configurable +// TODO: make key id and idle session timeout configurable. const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(1)"; const UNLOCK: &str = "SELECT pg_advisory_unlock(1)"; +const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_in_transaction_session_timeout = $1"; +// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive. +// Either the leader reconnects and step down or the session expires and the lock is released. +const IDLE_SESSION_TIMEOUT: &str = "10s"; /// Value with a expire time. The expire time is in seconds since UNIX epoch. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Default)] struct ValueWithLease { value: String, expire_time: f64, } /// Leader key for PostgreSql. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] struct PgLeaderKey { name: Vec, key: Vec, @@ -92,6 +95,12 @@ impl PgElection { client: Client, store_key_prefix: String, ) -> Result { + // Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock. + client + .execute(SET_IDLE_SESSION_TIMEOUT, &[&IDLE_SESSION_TIMEOUT]) + .await + .context(PostgresExecutionSnafu)?; + let (tx, mut rx) = broadcast::channel(100); let leader_ident = leader_value.clone(); let _handle = common_runtime::spawn_global(async move { @@ -177,11 +186,11 @@ impl Election for PgElection { + CANDIDATE_LEASE_SECS as f64, }; let res = self.put_value_with_lease(&key, &value_with_lease).await?; - // May registered before, check if the lease is expired. If not, just renew the lease. + // May registered before, check if the lease expired. If so, delete and re-register. if !res { - let prev = self.get_value_with_lease(&key).await?; + let prev = self.get_value_with_lease(&key).await?.unwrap_or_default(); if prev.expire_time - < time::SystemTime::now() + <= time::SystemTime::now() .duration_since(time::UNIX_EPOCH) .unwrap_or_default() .as_secs_f64() @@ -191,29 +200,34 @@ impl Election for PgElection { } } - // Renew the lease + // Check if the current lease has expired and renew the lease. let mut keep_alive_interval = tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS)); - - let leader_key = PgLeaderKey { - name: self.leader_value.clone().into_bytes(), - key: key.clone(), - // TODO: Add rev and lease information - rev: 0, - lease: 0, - }; loop { let _ = keep_alive_interval.tick().await; - match self.keep_alive(&key, leader_key.clone(), false).await { - Ok(_) => {} - Err(e) => { - warn!(e; "Candidate lease expired, key: {}", self.candidate_key()); - break; + + let prev = self.get_value_with_lease(&key).await?.unwrap_or_default(); + let now = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + + ensure!( + prev.expire_time > now, + UnexpectedSnafu { + violated: format!( + "Candidate lease expired, key: {:?}", + String::from_utf8_lossy(&key) + ), } - } - } + ); - Ok(()) + let updated = ValueWithLease { + value: prev.value.clone(), + expire_time: now + CANDIDATE_LEASE_SECS as f64, + }; + self.update_value_with_lease(&key, &prev, &updated).await?; + } } async fn all_candidates(&self) -> Result> { @@ -225,7 +239,6 @@ impl Election for PgElection { .as_secs_f64(); // Remove expired candidates candidates.retain(|c| c.expire_time > now); - let mut valid_candidates = Vec::with_capacity(candidates.len()); for c in candidates { let node_info: MetasrvNodeInfo = @@ -237,129 +250,86 @@ impl Election for PgElection { Ok(valid_candidates) } + /// Campaign loop, every metasrv node will: + /// - Try to acquire the advisory lock. + /// - If successful, the current node is the leader. Leader should check the lease: + /// - If newly elected, put the leader key with the lease and notify the leader watcher. + /// - If the lease is not expired, keep the lock and renew the lease. + /// - If the lease expired, delete the key, unlock, step down, try to initiate a new campaign. + /// - If not successful, the current node is a follower. Follower should check the lease: + /// - If the lease expired, delete the key and return, try to initiate a new campaign. + /// Caution: The leader may still hold the advisory lock while the lease is expired. The leader should step down in this case. async fn campaign(&self) -> Result<()> { let key = self.election_key().into_bytes(); - let res = self - .client - .query(CAMPAIGN, &[]) - .await - .context(PostgresExecutionSnafu)?; - let leader_key = PgLeaderKey { - name: self.leader_value.clone().into_bytes(), - key: key.clone(), - // TODO: Add rev and lease information - rev: 0, - lease: 0, - }; - - if let Some(row) = res.first() { - let is_locked: bool = row.get(0); - // If the lock is acquired, then the current node is the leader - if is_locked { - let leader_value_with_lease = ValueWithLease { - value: self.leader_value.clone(), - expire_time: time::SystemTime::now() + let mut keep_alive_interval = + tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)); + loop { + let _ = keep_alive_interval.tick().await; + let res = self + .client + .query(CAMPAIGN, &[]) + .await + .context(PostgresExecutionSnafu)?; + if let Some(row) = res.first() { + // Successfully acquired the advisory lock, leader branch: + if row.get(0) { + let now = time::SystemTime::now() .duration_since(time::UNIX_EPOCH) .unwrap_or_default() - .as_secs_f64() - + META_LEASE_SECS as f64, - }; - // If the leader value is successfully put, then we start the keep alive loop - if self - .put_value_with_lease(&key, &leader_value_with_lease) - .await? - { - let res = self - .client - .query(UNLOCK, &[]) - .await - .context(PostgresExecutionSnafu)?; - match res.first() { - Some(row) => { - let unlocked: bool = row.get(0); - if !unlocked { - return UnexpectedSnafu { - violated: "Failed to unlock the advisory lock".to_string(), + .as_secs_f64(); + let new_leader_value_with_lease = ValueWithLease { + value: self.leader_value.clone(), + expire_time: now + META_LEASE_SECS as f64, + }; + if self.is_leader() { + // Old leader, renew the lease + match self.get_value_with_lease(&key).await? { + Some(prev) => { + if prev.expire_time <= now { + self.step_down().await?; } - .fail(); + self.update_value_with_lease( + &key, + &prev, + &new_leader_value_with_lease, + ) + .await?; } - } - None => { - return UnexpectedSnafu { - violated: "Failed to unlock the advisory lock".to_string(), + // Deleted by other followers since the lease expired, but still hold the lock. Just step down. + None => { + warn!("Leader lease expired, but still hold the lock. Now stepping down."); + self.step_down().await?; } - .fail(); - } - } - let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS); - let mut keep_alive_interval = tokio::time::interval(keep_lease_duration); - keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - loop { - match timeout( - keep_lease_duration, - self.keep_alive(&key, leader_key.clone(), true), - ) - .await - { - Ok(Ok(())) => { - let _ = keep_alive_interval.tick().await; - } - Ok(Err(err)) => { - error!(err; "Failed to keep alive"); - break; - } - Err(_) => { - error!("Refresh lease timeout"); - break; - } - } - } - - // Step down - if self - .is_leader - .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) - .is_ok() - { - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) - { - error!(e; "Failed to send leader change message"); } + } else { + // Newly elected + self.elected().await?; } + // Follower branch } else { - let res = self - .client - .query(UNLOCK, &[]) - .await - .context(PostgresExecutionSnafu)?; - match res.first() { - Some(row) => { - let unlocked: bool = row.get(0); - if !unlocked { - return UnexpectedSnafu { - violated: "Failed to unlock the advisory lock".to_string(), - } - .fail(); - } - } - None => { - return UnexpectedSnafu { - violated: "Failed to unlock the advisory lock".to_string(), - } - .fail(); + let prev = self.get_value_with_lease(&key).await?.ok_or_else(|| { + UnexpectedSnafu { + violated: "Advisory lock held but leader key not found", } + .build() + })?; + if prev.expire_time + <= time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + { + self.delete_value(&key).await?; + return Ok(()); } - // Failed to put the leader value, we fall back to the candidate and check the leadership - self.check_leadership(&key).await?; } } else { - self.check_leadership(&key).await?; + return UnexpectedSnafu { + violated: "Failed to acquire the advisory lock".to_string(), + } + .fail(); } } - - Ok(()) } async fn leader(&self) -> Result { @@ -367,7 +337,10 @@ impl Election for PgElection { Ok(self.leader_value.as_bytes().into()) } else { let key = self.election_key().into_bytes(); - let value_with_lease = self.get_value_with_lease(&key).await?; + let value_with_lease = self + .get_value_with_lease(&key) + .await? + .ok_or_else(|| NoLeaderSnafu {}.build())?; if value_with_lease.expire_time > time::SystemTime::now() .duration_since(time::UNIX_EPOCH) @@ -387,7 +360,7 @@ impl Election for PgElection { } impl PgElection { - async fn get_value_with_lease(&self, key: &Vec) -> Result { + async fn get_value_with_lease(&self, key: &Vec) -> Result> { let prev = self .client .query(POINT_GET, &[&key]) @@ -400,15 +373,9 @@ impl PgElection { serde_json::from_str(&value).with_context(|_| DeserializeFromJsonSnafu { input: format!("{value:?}"), })?; - Ok(value_with_lease) + Ok(Some(value_with_lease)) } else { - UnexpectedSnafu { - violated: format!( - "Failed to get value from key: {:?}", - String::from_utf8_lossy(key) - ), - } - .fail() + Ok(None) } } @@ -501,70 +468,57 @@ impl PgElection { Ok(res.len() == 1) } - async fn keep_alive( - &self, - key: &Vec, - leader: PgLeaderKey, - keep_leader: bool, - ) -> Result<()> { - // Check if the current lease has expired - let prev = self.get_value_with_lease(key).await?; - let now = time::SystemTime::now() - .duration_since(time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64(); - - ensure!( - prev.expire_time > now, - UnexpectedSnafu { - violated: format!( - "Failed to renew lease. Lease expired, key: {:?}", - String::from_utf8_lossy(key) - ), - } - ); - - // Renew the lease - let updated = ValueWithLease { - value: prev.value.clone(), - expire_time: now + CANDIDATE_LEASE_SECS as f64, + /// Step down the leader. The leader should delete the key and notify the leader watcher. + /// Do not check if the deletion is successful, since the key may be deleted by other followers. + /// Caution: Should only step down while holding the advisory lock. + async fn step_down(&self) -> Result<()> { + let key = self.election_key().into_bytes(); + let leader_key = PgLeaderKey { + name: self.leader_value.clone().into_bytes(), + key: key.clone(), + ..Default::default() }; - self.update_value_with_lease(key, &prev, &updated).await?; - - if keep_leader - && self - .is_leader - .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) - .is_ok() + if self + .is_leader + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() { - self.infancy.store(true, Ordering::Relaxed); - - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::Elected(Arc::new(leader))) - { - error!(e; "Failed to send leader change message"); - } + self.delete_value(&key).await?; + self.client + .query(UNLOCK, &[]) + .await + .context(PostgresExecutionSnafu)?; + self.leader_watcher + .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) + .context(SendLeaderChangeSnafu)?; } Ok(()) } - // Check if the leader is still valid - async fn check_leadership(&self, key: &Vec) -> Result<()> { - let check_interval = Duration::from_secs(META_LEASE_SECS); - let mut check_interval = tokio::time::interval(check_interval); - loop { - let _ = check_interval.tick().await; - let leader_value_with_lease = self.get_value_with_lease(key).await?; - let now = time::SystemTime::now() - .duration_since(time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64(); - if leader_value_with_lease.expire_time <= now { - // Invalidate previous leader - self.delete_value(key).await?; - return Ok(()); - } - } + /// Elected as leader. The leader should put the key and notify the leader watcher. + /// Caution: Should only elected while holding the advisory lock. + async fn elected(&self) -> Result<()> { + let key = self.election_key().into_bytes(); + let leader_key = PgLeaderKey { + name: self.leader_value.clone().into_bytes(), + key: key.clone(), + ..Default::default() + }; + self.put_value_with_lease( + &key, + &ValueWithLease { + value: self.leader_value.clone(), + expire_time: time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + + META_LEASE_SECS as f64, + }, + ) + .await?; + self.leader_watcher + .send(LeaderChangeMessage::Elected(Arc::new(leader_key))) + .context(SendLeaderChangeSnafu)?; + Ok(()) } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 2c99c5e1b235..31a63ce5293c 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -24,6 +24,7 @@ use table::metadata::TableId; use tokio::sync::mpsc::error::SendError; use tonic::codegen::http; +use crate::election::LeaderChangeMessage; use crate::metasrv::SelectTarget; use crate::pubsub::Message; @@ -718,6 +719,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to send leader change message"))] + SendLeaderChange { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: tokio::sync::broadcast::error::SendError, + }, } impl Error { @@ -760,6 +769,7 @@ impl ErrorExt for Error { | Error::StartGrpc { .. } | Error::NoEnoughAvailableNode { .. } | Error::PublishMessage { .. } + | Error::SendLeaderChange { .. } | Error::Join { .. } | Error::PeerUnavailable { .. } | Error::ExceededDeadline { .. }