From bb91eb52478a6f486c4a542cf847bb45ad47772a Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 19 Dec 2024 11:40:01 +0800 Subject: [PATCH 1/9] feat: init PgElection --- src/common/meta/src/kv_backend/postgres.rs | 8 +- src/meta-srv/src/bootstrap.rs | 12 +- src/meta-srv/src/election.rs | 19 +- src/meta-srv/src/election/etcd.rs | 40 +- src/meta-srv/src/election/postgres.rs | 492 +++++++++++++++++++++ src/meta-srv/src/error.rs | 2 + 6 files changed, 553 insertions(+), 20 deletions(-) create mode 100644 src/meta-srv/src/election/postgres.rs diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index 8add65cd49c4..d55e0b8a1505 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -45,9 +45,9 @@ const METADKV_CREATION: &str = const FULL_TABLE_SCAN: &str = "SELECT k, v FROM greptime_metakv $1 ORDER BY K"; -const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1"; +pub const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1"; -const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K"; +pub const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K"; const RANGE_SCAN_LEFT_BOUNDED: &str = "SELECT k, v FROM greptime_metakv WHERE k >= $1 ORDER BY K"; @@ -65,7 +65,7 @@ const RANGE_DELETE_LEFT_BOUNDED: &str = "DELETE FROM greptime_metakv WHERE k >= const RANGE_DELETE_FULL_RANGE: &str = "DELETE FROM greptime_metakv WHERE k >= $1 AND K < $2 RETURNING k,v;"; -const CAS: &str = r#" +pub const CAS: &str = r#" WITH prev AS ( SELECT k,v FROM greptime_metakv WHERE k = $1 AND v = $2 ), update AS ( @@ -79,7 +79,7 @@ WHERE SELECT k, v FROM prev; "#; -const PUT_IF_NOT_EXISTS: &str = r#" +pub const PUT_IF_NOT_EXISTS: &str = r#" WITH prev AS ( select k,v from greptime_metakv where k = $1 ), insert AS ( diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 85770e1f3d4d..fdc75ec330ed 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -46,6 +46,8 @@ use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; #[cfg(feature = "pg_kvbackend")] +use crate::election::postgres::PgElection; +#[cfg(feature = "pg_kvbackend")] use crate::error::InvalidArgumentsSnafu; use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu}; use crate::metasrv::builder::MetasrvBuilder; @@ -225,8 +227,14 @@ pub async fn metasrv_builder( (None, BackendImpl::PostgresStore) => { let pg_client = create_postgres_client(opts).await?; let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap(); - // TODO(jeremy, weny): implement election for postgres - (kv_backend, None) + let election_client = create_postgres_client(opts).await?; + let election = PgElection::with_pg_client( + opts.server_addr.clone(), + election_client, + opts.store_key_prefix.clone(), + ) + .await?; + (kv_backend, Some(election)) } }; diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index d73f453a88f0..4db7814a2846 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -13,11 +13,12 @@ // limitations under the License. 
pub mod etcd; +#[cfg(feature = "pg_kvbackend")] +pub mod postgres; -use std::fmt; +use std::fmt::{self, Debug}; use std::sync::Arc; -use etcd_client::LeaderKey; use tokio::sync::broadcast::Receiver; use crate::error::Result; @@ -26,10 +27,20 @@ use crate::metasrv::MetasrvNodeInfo; pub const ELECTION_KEY: &str = "__metasrv_election"; pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/"; +const CANDIDATE_LEASE_SECS: u64 = 600; +const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2; + #[derive(Debug, Clone)] pub enum LeaderChangeMessage { - Elected(Arc), - StepDown(Arc), + Elected(Arc), + StepDown(Arc), +} + +pub trait LeaderKey: Send + Sync + Debug { + fn name(&self) -> &[u8]; + fn key(&self) -> &[u8]; + fn rev(&self) -> i64; + fn lease(&self) -> i64; } impl fmt::Display for LeaderChangeMessage { diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index fef7e928a783..60dcadc65f66 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -18,18 +18,41 @@ use std::time::Duration; use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; use common_telemetry::{error, info, warn}; -use etcd_client::{Client, GetOptions, LeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions}; +use etcd_client::{ + Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions, +}; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; use tokio::time::{timeout, MissedTickBehavior}; -use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY}; +use crate::election::{ + Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, + KEEP_ALIVE_INTERVAL_SECS, +}; use crate::error; use crate::error::Result; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; +impl LeaderKey for EtcdLeaderKey { + fn name(&self) -> &[u8] { + self.name() + } + + fn key(&self) -> &[u8] { + self.key() + } + + fn rev(&self) -> i64 { + self.rev() + } + + fn lease(&self) -> i64 { + self.lease() + } +} + pub struct EtcdElection { leader_value: String, client: Client, @@ -75,14 +98,14 @@ impl EtcdElection { LeaderChangeMessage::Elected(key) => { info!( "[{leader_ident}] is elected as leader: {:?}, lease: {}", - key.name_str(), + String::from_utf8_lossy(key.name()), key.lease() ); } LeaderChangeMessage::StepDown(key) => { warn!( "[{leader_ident}] is stepping down: {:?}, lease: {}", - key.name_str(), + String::from_utf8_lossy(key.name()), key.lease() ); } @@ -133,9 +156,6 @@ impl Election for EtcdElection { } async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> { - const CANDIDATE_LEASE_SECS: u64 = 600; - const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2; - let mut lease_client = self.client.lease_client(); let res = lease_client .grant(CANDIDATE_LEASE_SECS as i64, None) @@ -239,7 +259,7 @@ impl Election for EtcdElection { // The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`. 
match timeout( keep_lease_duration, - self.keep_alive(&mut keeper, &mut receiver, leader), + self.keep_alive(&mut keeper, &mut receiver, leader.clone()), ) .await { @@ -303,7 +323,7 @@ impl EtcdElection { &self, keeper: &mut LeaseKeeper, receiver: &mut LeaseKeepAliveStream, - leader: &LeaderKey, + leader: EtcdLeaderKey, ) -> Result<()> { keeper.keep_alive().await.context(error::EtcdFailedSnafu)?; if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? { @@ -324,7 +344,7 @@ impl EtcdElection { if let Err(e) = self .leader_watcher - .send(LeaderChangeMessage::Elected(Arc::new(leader.clone()))) + .send(LeaderChangeMessage::Elected(Arc::new(leader))) { error!(e; "Failed to send leader change message"); } diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs new file mode 100644 index 000000000000..12510eb84b10 --- /dev/null +++ b/src/meta-srv/src/election/postgres.rs @@ -0,0 +1,492 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{self, Duration}; + +use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; +use common_meta::kv_backend::postgres::{CAS, POINT_GET, PREFIX_SCAN, PUT_IF_NOT_EXISTS}; +use common_telemetry::{error, info, warn}; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use tokio::sync::broadcast; +use tokio::sync::broadcast::error::RecvError; +use tokio::time::{timeout, MissedTickBehavior}; +use tokio_postgres::Client; + +use crate::election::{ + Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, + KEEP_ALIVE_INTERVAL_SECS, +}; +use crate::error::{ + DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, + UnexpectedSnafu, +}; +use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; + +const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(1)"; + +#[derive(Debug, Serialize, Deserialize)] +struct ValueWithLease { + value: String, + expire_time: f64, +} + +#[derive(Debug, Clone)] +struct PgLeaderKey { + name: Vec, + key: Vec, + rev: i64, + lease: i64, +} + +impl LeaderKey for PgLeaderKey { + fn name(&self) -> &[u8] { + &self.name + } + + fn key(&self) -> &[u8] { + &self.key + } + + fn rev(&self) -> i64 { + self.rev + } + + fn lease(&self) -> i64 { + self.lease + } +} + +pub struct PgElection { + leader_value: String, + client: Client, + is_leader: AtomicBool, + infancy: AtomicBool, + leader_watcher: broadcast::Sender, + store_key_prefix: String, +} + +impl PgElection { + pub async fn with_pg_client( + leader_value: String, + client: Client, + store_key_prefix: String, + ) -> Result { + let (tx, mut rx) = broadcast::channel(100); + let leader_ident = leader_value.clone(); + let _handle = common_runtime::spawn_global(async move { + loop { + match rx.recv().await { + Ok(msg) => match msg { + LeaderChangeMessage::Elected(key) => { + info!( + 
"[{leader_ident}] is elected as leader: {:?}, lease: {}", + key.name(), + key.lease() + ); + } + LeaderChangeMessage::StepDown(key) => { + warn!( + "[{leader_ident}] is stepping down: {:?}, lease: {}", + key.name(), + key.lease() + ); + } + }, + Err(RecvError::Lagged(_)) => { + warn!("Log printing is too slow or leader changed too fast!"); + } + Err(RecvError::Closed) => break, + } + } + }); + + Ok(Arc::new(Self { + leader_value, + client, + is_leader: AtomicBool::new(false), + infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix, + })) + } + + fn election_key(&self) -> String { + format!("{}{}", self.store_key_prefix, ELECTION_KEY) + } + + fn candidate_root(&self) -> String { + format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT) + } + + fn candidate_key(&self) -> String { + format!("{}{}", self.candidate_root(), self.leader_value) + } +} + +#[async_trait::async_trait] +impl Election for PgElection { + type Leader = LeaderValue; + + fn subscribe_leader_change(&self) -> broadcast::Receiver { + self.leader_watcher.subscribe() + } + + fn is_leader(&self) -> bool { + self.is_leader.load(Ordering::Relaxed) + } + + fn in_infancy(&self) -> bool { + self.infancy + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + } + + async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> { + let key = self.candidate_key().into_bytes(); + let node_info = + serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu { + input: format!("{node_info:?}"), + })?; + let value_with_lease = ValueWithLease { + value: node_info, + expire_time: time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + + CANDIDATE_LEASE_SECS as f64, + }; + self.put_value_with_lease(&key, &value_with_lease).await?; + + let mut keep_alive_interval = + tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS)); + + let leader_key = PgLeaderKey { + name: self.leader_value.clone().into_bytes(), + key: key.clone(), + // TODO: Add rev and lease information + rev: 0, + lease: 0, + }; + loop { + let _ = keep_alive_interval.tick().await; + match self.keep_alive(&key, leader_key.clone(), false).await { + Ok(_) => {} + Err(e) => { + warn!(e; "Candidate lease expired, key: {}", self.candidate_key()); + break; + } + } + } + + Ok(()) + } + + async fn all_candidates(&self) -> Result> { + let key_prefix = self.candidate_root().into_bytes(); + let mut candidates = self.get_value_with_lease_by_prefix(&key_prefix).await?; + let now = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + // Remove expired candidates + candidates.retain(|c| c.expire_time > now); + + let mut valid_candidates = Vec::with_capacity(candidates.len()); + for c in candidates { + let node_info: MetasrvNodeInfo = + serde_json::from_str(&c.value).with_context(|_| DeserializeFromJsonSnafu { + input: format!("{:?}", c.value), + })?; + valid_candidates.push(node_info); + } + Ok(valid_candidates) + } + + async fn campaign(&self) -> Result<()> { + let key = self.election_key().into_bytes(); + let res = self + .client + .query(CAMPAIGN, &[]) + .await + .context(PostgresExecutionSnafu)?; + let leader_key = PgLeaderKey { + name: self.leader_value.clone().into_bytes(), + key: key.clone(), + // TODO: Add rev and lease information + rev: 0, + lease: 0, + }; + + if let Some(row) = res.first() { + let is_locked: bool = row.get(0); + // If the lock is acquired, then the current node is the leader + if is_locked { 
+ let leader_value_with_lease = ValueWithLease { + value: self.leader_value.clone(), + expire_time: time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + + META_LEASE_SECS as f64, + }; + self.put_value_with_lease(&key, &leader_value_with_lease) + .await?; + + let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS); + let mut keep_alive_interval = tokio::time::interval(keep_lease_duration); + keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + loop { + match timeout( + keep_lease_duration, + self.keep_alive(&key, leader_key.clone(), true), + ) + .await + { + Ok(Ok(())) => { + let _ = keep_alive_interval.tick().await; + } + Ok(Err(err)) => { + error!(err; "Failed to keep alive"); + break; + } + Err(_) => { + error!("Refresh lease timeout"); + break; + } + } + } + + // Step down + if self + .is_leader + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + if let Err(e) = self + .leader_watcher + .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) + { + error!(e; "Failed to send leader change message"); + } + } + } else { + // Not the leader, we check if the leader is still alive + let check_interval = Duration::from_secs(META_LEASE_SECS); + let mut check_interval = tokio::time::interval(check_interval); + loop { + let _ = check_interval.tick().await; + let leader_value_with_lease = self.get_value_with_lease(&key).await?; + let now = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + // If the leader is expired, we re-initiate the campaign + if leader_value_with_lease.expire_time <= now { + break; + } + } + } + } + + Ok(()) + } + + async fn leader(&self) -> Result { + if self.is_leader.load(Ordering::Relaxed) { + Ok(self.leader_value.as_bytes().into()) + } else { + let key = self.election_key().into_bytes(); + let value_with_lease = self.get_value_with_lease(&key).await?; + if value_with_lease.expire_time + > time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + { + Ok(value_with_lease.value.into()) + } else { + return NoLeaderSnafu {}.fail(); + } + } + } + + async fn resign(&self) -> Result<()> { + todo!() + } +} + +impl PgElection { + async fn get_value_with_lease(&self, key: &Vec) -> Result { + let prev = self + .client + .query(POINT_GET, &[&key]) + .await + .context(PostgresExecutionSnafu)?; + + if let Some(row) = prev.first() { + let value: String = row.get(0); + let value_with_lease: ValueWithLease = + serde_json::from_str(&value).with_context(|_| DeserializeFromJsonSnafu { + input: format!("{value:?}"), + })?; + Ok(value_with_lease) + } else { + UnexpectedSnafu { + violated: format!( + "Failed to get value from key: {:?}", + String::from_utf8_lossy(key) + ), + } + .fail() + } + } + + async fn get_value_with_lease_by_prefix( + &self, + key_prefix: &Vec, + ) -> Result> { + let prev = self + .client + .query(PREFIX_SCAN, &[key_prefix]) + .await + .context(PostgresExecutionSnafu)?; + + let mut res = Vec::new(); + for row in prev { + let value: String = row.get(0); + let value_with_lease: ValueWithLease = + serde_json::from_str(&value).with_context(|_| DeserializeFromJsonSnafu { + input: format!("{value:?}"), + })?; + res.push(value_with_lease); + } + + Ok(res) + } + + async fn update_value_with_lease( + &self, + key: &Vec, + prev: &ValueWithLease, + updated: &ValueWithLease, + ) -> Result<()> { + let prev = serde_json::to_string(prev) + .with_context(|_| 
SerializeToJsonSnafu { + input: format!("{prev:?}"), + })? + .into_bytes(); + + let updated = serde_json::to_string(updated) + .with_context(|_| SerializeToJsonSnafu { + input: format!("{updated:?}"), + })? + .into_bytes(); + + let res = self + .client + .query(CAS, &[key, &prev, &updated]) + .await + .context(PostgresExecutionSnafu)?; + + // CAS operation will return the updated value if the operation is successful + match res.is_empty() { + false => Ok(()), + true => UnexpectedSnafu { + violated: format!( + "Failed to update value from key: {:?}", + String::from_utf8_lossy(key) + ), + } + .fail(), + } + } + + // Returns `true` if the insertion is successful + async fn put_value_with_lease(&self, key: &Vec, value: &ValueWithLease) -> Result<()> { + let value = serde_json::to_string(value) + .with_context(|_| SerializeToJsonSnafu { + input: format!("{value:?}"), + })? + .into_bytes(); + + let res = self + .client + .query(PUT_IF_NOT_EXISTS, &[key, &value]) + .await + .context(PostgresExecutionSnafu)?; + + ensure!( + res.is_empty(), + UnexpectedSnafu { + violated: format!( + "Failed to insert value from key: {:?}", + String::from_utf8_lossy(key) + ), + } + ); + + Ok(()) + } + + async fn keep_alive( + &self, + key: &Vec, + leader: PgLeaderKey, + keep_leader: bool, + ) -> Result<()> { + // Check if the current lease has expired + let prev = self.get_value_with_lease(key).await?; + let now = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + + ensure!( + prev.expire_time > now, + UnexpectedSnafu { + violated: format!( + "Failed to renew lease. Lease expired, key: {:?}", + String::from_utf8_lossy(key) + ), + } + ); + + // Renew the lease + let updated = ValueWithLease { + value: prev.value.clone(), + expire_time: now + CANDIDATE_LEASE_SECS as f64, + }; + self.update_value_with_lease(key, &prev, &updated).await?; + + if keep_leader + && self + .is_leader + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + self.infancy.store(true, Ordering::Relaxed); + + if let Err(e) = self + .leader_watcher + .send(LeaderChangeMessage::Elected(Arc::new(leader))) + { + error!(e; "Failed to send leader change message"); + } + } + Ok(()) + } +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 705f31ac49f4..2c99c5e1b235 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -697,6 +697,8 @@ pub enum Error { #[cfg(feature = "pg_kvbackend")] #[snafu(display("Failed to execute via postgres"))] PostgresExecution { + #[snafu(source)] + error: tokio_postgres::Error, #[snafu(implicit)] location: Location, }, From 903865dca1c0fae36bc75a44812d6638a2ce1cc3 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 19 Dec 2024 12:08:03 +0800 Subject: [PATCH 2/9] fix: release advisory lock --- src/meta-srv/src/election/postgres.rs | 140 +++++++++++++++----------- 1 file changed, 81 insertions(+), 59 deletions(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 12510eb84b10..2f88794d2b6c 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -37,6 +37,7 @@ use crate::error::{ use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(1)"; +const UNLOCK: &str = "SELECT pg_advisory_unlock(1)"; #[derive(Debug, Serialize, Deserialize)] struct ValueWithLease { @@ -243,62 +244,76 @@ impl Election for PgElection { .as_secs_f64() + META_LEASE_SECS as f64, }; - 
self.put_value_with_lease(&key, &leader_value_with_lease) - .await?; - - let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS); - let mut keep_alive_interval = tokio::time::interval(keep_lease_duration); - keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - loop { - match timeout( - keep_lease_duration, - self.keep_alive(&key, leader_key.clone(), true), - ) - .await - { - Ok(Ok(())) => { - let _ = keep_alive_interval.tick().await; + // If the leader value is successfully put, then we start the keep alive loop + if self + .put_value_with_lease(&key, &leader_value_with_lease) + .await? + { + let res = self + .client + .query(UNLOCK, &[]) + .await + .context(PostgresExecutionSnafu)?; + match res.first() { + Some(row) => { + let unlocked: bool = row.get(0); + if !unlocked { + return UnexpectedSnafu { + violated: "Failed to unlock the advisory lock".to_string(), + } + .fail(); + } } - Ok(Err(err)) => { - error!(err; "Failed to keep alive"); - break; + None => { + return UnexpectedSnafu { + violated: "Failed to unlock the advisory lock".to_string(), + } + .fail(); } - Err(_) => { - error!("Refresh lease timeout"); - break; + } + let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS); + let mut keep_alive_interval = tokio::time::interval(keep_lease_duration); + keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + loop { + match timeout( + keep_lease_duration, + self.keep_alive(&key, leader_key.clone(), true), + ) + .await + { + Ok(Ok(())) => { + let _ = keep_alive_interval.tick().await; + } + Ok(Err(err)) => { + error!(err; "Failed to keep alive"); + break; + } + Err(_) => { + error!("Refresh lease timeout"); + break; + } } } - } - // Step down - if self - .is_leader - .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) - .is_ok() - { - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) + // Step down + if self + .is_leader + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() { - error!(e; "Failed to send leader change message"); + if let Err(e) = self + .leader_watcher + .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) + { + error!(e; "Failed to send leader change message"); + } } + } else { + // Failed to put the leader value, we fall back to the candidate and check the leadership + self.check_leadership(&key).await?; } } else { - // Not the leader, we check if the leader is still alive - let check_interval = Duration::from_secs(META_LEASE_SECS); - let mut check_interval = tokio::time::interval(check_interval); - loop { - let _ = check_interval.tick().await; - let leader_value_with_lease = self.get_value_with_lease(&key).await?; - let now = time::SystemTime::now() - .duration_since(time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64(); - // If the leader is expired, we re-initiate the campaign - if leader_value_with_lease.expire_time <= now { - break; - } - } + self.check_leadership(&key).await?; } } @@ -416,7 +431,7 @@ impl PgElection { } // Returns `true` if the insertion is successful - async fn put_value_with_lease(&self, key: &Vec, value: &ValueWithLease) -> Result<()> { + async fn put_value_with_lease(&self, key: &Vec, value: &ValueWithLease) -> Result { let value = serde_json::to_string(value) .with_context(|_| SerializeToJsonSnafu { input: format!("{value:?}"), @@ -429,17 +444,7 @@ impl PgElection { .await .context(PostgresExecutionSnafu)?; - ensure!( - res.is_empty(), - UnexpectedSnafu { - 
violated: format!( - "Failed to insert value from key: {:?}", - String::from_utf8_lossy(key) - ), - } - ); - - Ok(()) + Ok(res.is_empty()) } async fn keep_alive( @@ -489,4 +494,21 @@ impl PgElection { } Ok(()) } + + // Check if the leader is still valid + async fn check_leadership(&self, key: &Vec) -> Result<()> { + let check_interval = Duration::from_secs(META_LEASE_SECS); + let mut check_interval = tokio::time::interval(check_interval); + loop { + let _ = check_interval.tick().await; + let leader_value_with_lease = self.get_value_with_lease(key).await?; + let now = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + if leader_value_with_lease.expire_time <= now { + return Ok(()); + } + } + } } From 7dbfb5a047af2b2d4eaefad66efacc614789b3d9 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 19 Dec 2024 12:24:46 +0800 Subject: [PATCH 3/9] fix: handle duplicate keys --- src/common/meta/src/kv_backend/postgres.rs | 2 +- src/meta-srv/src/election/postgres.rs | 36 ++++++++++++++++++++-- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index d55e0b8a1505..23c506761a9d 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -56,7 +56,7 @@ const RANGE_SCAN_FULL_RANGE: &str = const FULL_TABLE_DELETE: &str = "DELETE FROM greptime_metakv RETURNING k,v"; -const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;"; +pub const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;"; const PREFIX_DELETE: &str = "DELETE FROM greptime_metakv WHERE k LIKE $1 RETURNING k,v;"; diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 2f88794d2b6c..485e9ac0c042 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -17,7 +17,9 @@ use std::sync::Arc; use std::time::{self, Duration}; use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; -use common_meta::kv_backend::postgres::{CAS, POINT_GET, PREFIX_SCAN, PUT_IF_NOT_EXISTS}; +use common_meta::kv_backend::postgres::{ + CAS, POINT_DELETE, POINT_GET, PREFIX_SCAN, PUT_IF_NOT_EXISTS, +}; use common_telemetry::{error, info, warn}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -170,8 +172,22 @@ impl Election for PgElection { .as_secs_f64() + CANDIDATE_LEASE_SECS as f64, }; - self.put_value_with_lease(&key, &value_with_lease).await?; + let res = self.put_value_with_lease(&key, &value_with_lease).await?; + // May registered before, check if the lease is expired. If not, just renew the lease. 
+ if !res { + let prev = self.get_value_with_lease(&key).await?; + if prev.expire_time + < time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + { + self.delete_value(&key).await?; + self.put_value_with_lease(&key, &value_with_lease).await?; + } + } + // Renew the lease let mut keep_alive_interval = tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS)); @@ -430,7 +446,7 @@ impl PgElection { } } - // Returns `true` if the insertion is successful + /// Returns `true` if the insertion is successful async fn put_value_with_lease(&self, key: &Vec, value: &ValueWithLease) -> Result { let value = serde_json::to_string(value) .with_context(|_| SerializeToJsonSnafu { @@ -447,6 +463,18 @@ impl PgElection { Ok(res.is_empty()) } + /// Returns `true` if the deletion is successful. + /// Caution: Should only delete the key if the lease is expired. + async fn delete_value(&self, key: &Vec) -> Result { + let res = self + .client + .query(POINT_DELETE, &[key]) + .await + .context(PostgresExecutionSnafu)?; + + Ok(res.len() == 1) + } + async fn keep_alive( &self, key: &Vec, @@ -507,6 +535,8 @@ impl PgElection { .unwrap_or_default() .as_secs_f64(); if leader_value_with_lease.expire_time <= now { + // Invalidate preivous leader + self.delete_value(key).await?; return Ok(()); } } From a059fa11894397ba371b472b5e1d822134b93dab Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 19 Dec 2024 12:35:52 +0800 Subject: [PATCH 4/9] chore: update comments --- src/meta-srv/src/election/postgres.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 485e9ac0c042..daaeac44dd39 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -41,12 +41,14 @@ use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(1)"; const UNLOCK: &str = "SELECT pg_advisory_unlock(1)"; +/// Value with a expire time. The expire time is in seconds since UNIX epoch. #[derive(Debug, Serialize, Deserialize)] struct ValueWithLease { value: String, expire_time: f64, } +/// Leader key for PostgreSql. #[derive(Debug, Clone)] struct PgLeaderKey { name: Vec, @@ -73,6 +75,7 @@ impl LeaderKey for PgLeaderKey { } } +/// PostgreSql implementation of Election. 
pub struct PgElection { leader_value: String, client: Client, @@ -535,7 +538,7 @@ impl PgElection { .unwrap_or_default() .as_secs_f64(); if leader_value_with_lease.expire_time <= now { - // Invalidate preivous leader + // Invalidate previous leader self.delete_value(key).await?; return Ok(()); } From 0d8b8bcfc6e6a154f341e881f6b70d3456983c06 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 19 Dec 2024 12:44:26 +0800 Subject: [PATCH 5/9] fix: unlock if acquired the lock --- src/meta-srv/src/election/postgres.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index daaeac44dd39..1ee386718b00 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -328,6 +328,28 @@ impl Election for PgElection { } } } else { + let res = self + .client + .query(UNLOCK, &[]) + .await + .context(PostgresExecutionSnafu)?; + match res.first() { + Some(row) => { + let unlocked: bool = row.get(0); + if !unlocked { + return UnexpectedSnafu { + violated: "Failed to unlock the advisory lock".to_string(), + } + .fail(); + } + } + None => { + return UnexpectedSnafu { + violated: "Failed to unlock the advisory lock".to_string(), + } + .fail(); + } + } // Failed to put the leader value, we fall back to the candidate and check the leadership self.check_leadership(&key).await?; } From 22c628fca2a8ae832f99c74c6ae1c427484d6e29 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 19 Dec 2024 15:28:33 +0800 Subject: [PATCH 6/9] chore: add TODO and avoid unwrap --- src/meta-srv/src/bootstrap.rs | 4 +++- src/meta-srv/src/election/postgres.rs | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index fdc75ec330ed..4d12caa41fec 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -226,7 +226,9 @@ pub async fn metasrv_builder( #[cfg(feature = "pg_kvbackend")] (None, BackendImpl::PostgresStore) => { let pg_client = create_postgres_client(opts).await?; - let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap(); + let kv_backend = PgStore::with_pg_client(pg_client) + .await + .context(error::KvBackendSnafu)?; let election_client = create_postgres_client(opts).await?; let election = PgElection::with_pg_client( opts.server_addr.clone(), diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 1ee386718b00..61d96717c035 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -38,6 +38,7 @@ use crate::error::{ }; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; +// TODO: make key id configurable const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(1)"; const UNLOCK: &str = "SELECT pg_advisory_unlock(1)"; From 5074686e811f00f83a17556792b7e8e0f7b688d2 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 19 Dec 2024 20:13:19 +0800 Subject: [PATCH 7/9] refactor: check both lock and expire time, add more comments --- src/meta-srv/src/election/etcd.rs | 14 +- src/meta-srv/src/election/postgres.rs | 362 +++++++++++--------------- src/meta-srv/src/error.rs | 10 + 3 files changed, 172 insertions(+), 214 deletions(-) diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index 60dcadc65f66..274a6a7e4792 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -282,12 +282,9 @@ impl Election for EtcdElection { .compare_exchange(true, 
false, Ordering::Relaxed, Ordering::Relaxed) .is_ok() { - if let Err(e) = self - .leader_watcher + self.leader_watcher .send(LeaderChangeMessage::StepDown(Arc::new(leader.clone()))) - { - error!(e; "Failed to send leader change message"); - } + .context(error::SendLeaderChangeSnafu)?; } } @@ -342,12 +339,9 @@ impl EtcdElection { { self.infancy.store(true, Ordering::Relaxed); - if let Err(e) = self - .leader_watcher + self.leader_watcher .send(LeaderChangeMessage::Elected(Arc::new(leader))) - { - error!(e; "Failed to send leader change message"); - } + .context(error::SendLeaderChangeSnafu)?; } } diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 61d96717c035..85c03b721603 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -20,12 +20,11 @@ use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, MET use common_meta::kv_backend::postgres::{ CAS, POINT_DELETE, POINT_GET, PREFIX_SCAN, PUT_IF_NOT_EXISTS, }; -use common_telemetry::{error, info, warn}; +use common_telemetry::{info, warn}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; -use tokio::time::{timeout, MissedTickBehavior}; use tokio_postgres::Client; use crate::election::{ @@ -33,24 +32,28 @@ use crate::election::{ KEEP_ALIVE_INTERVAL_SECS, }; use crate::error::{ - DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, - UnexpectedSnafu, + DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SendLeaderChangeSnafu, + SerializeToJsonSnafu, UnexpectedSnafu, }; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; -// TODO: make key id configurable +// TODO: make key id and idle session timeout configurable. const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(1)"; const UNLOCK: &str = "SELECT pg_advisory_unlock(1)"; +const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_in_transaction_session_timeout = $1"; +// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive. +// Either the leader reconnects and step down or the session expires and the lock is released. +const IDLE_SESSION_TIMEOUT: &str = "10s"; /// Value with a expire time. The expire time is in seconds since UNIX epoch. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Default)] struct ValueWithLease { value: String, expire_time: f64, } /// Leader key for PostgreSql. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] struct PgLeaderKey { name: Vec, key: Vec, @@ -92,6 +95,12 @@ impl PgElection { client: Client, store_key_prefix: String, ) -> Result { + // Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock. + client + .execute(SET_IDLE_SESSION_TIMEOUT, &[&IDLE_SESSION_TIMEOUT]) + .await + .context(PostgresExecutionSnafu)?; + let (tx, mut rx) = broadcast::channel(100); let leader_ident = leader_value.clone(); let _handle = common_runtime::spawn_global(async move { @@ -177,11 +186,11 @@ impl Election for PgElection { + CANDIDATE_LEASE_SECS as f64, }; let res = self.put_value_with_lease(&key, &value_with_lease).await?; - // May registered before, check if the lease is expired. If not, just renew the lease. + // May registered before, check if the lease expired. If so, delete and re-register. 
if !res { - let prev = self.get_value_with_lease(&key).await?; + let prev = self.get_value_with_lease(&key).await?.unwrap_or_default(); if prev.expire_time - < time::SystemTime::now() + <= time::SystemTime::now() .duration_since(time::UNIX_EPOCH) .unwrap_or_default() .as_secs_f64() @@ -191,29 +200,34 @@ impl Election for PgElection { } } - // Renew the lease + // Check if the current lease has expired and renew the lease. let mut keep_alive_interval = tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS)); - - let leader_key = PgLeaderKey { - name: self.leader_value.clone().into_bytes(), - key: key.clone(), - // TODO: Add rev and lease information - rev: 0, - lease: 0, - }; loop { let _ = keep_alive_interval.tick().await; - match self.keep_alive(&key, leader_key.clone(), false).await { - Ok(_) => {} - Err(e) => { - warn!(e; "Candidate lease expired, key: {}", self.candidate_key()); - break; + + let prev = self.get_value_with_lease(&key).await?.unwrap_or_default(); + let now = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + + ensure!( + prev.expire_time > now, + UnexpectedSnafu { + violated: format!( + "Candidate lease expired, key: {:?}", + String::from_utf8_lossy(&key) + ), } - } - } + ); - Ok(()) + let updated = ValueWithLease { + value: prev.value.clone(), + expire_time: now + CANDIDATE_LEASE_SECS as f64, + }; + self.update_value_with_lease(&key, &prev, &updated).await?; + } } async fn all_candidates(&self) -> Result> { @@ -225,7 +239,6 @@ impl Election for PgElection { .as_secs_f64(); // Remove expired candidates candidates.retain(|c| c.expire_time > now); - let mut valid_candidates = Vec::with_capacity(candidates.len()); for c in candidates { let node_info: MetasrvNodeInfo = @@ -237,129 +250,86 @@ impl Election for PgElection { Ok(valid_candidates) } + /// Campaign loop, every metasrv node will: + /// - Try to acquire the advisory lock. + /// - If successful, the current node is the leader. Leader should check the lease: + /// - If newly elected, put the leader key with the lease and notify the leader watcher. + /// - If the lease is not expired, keep the lock and renew the lease. + /// - If the lease expired, delete the key, unlock, step down, try to initiate a new campaign. + /// - If not successful, the current node is a follower. Follower should check the lease: + /// - If the lease expired, delete the key and return, try to initiate a new campaign. + /// Caution: The leader may still hold the advisory lock while the lease is expired. The leader should step down in this case. 
async fn campaign(&self) -> Result<()> { let key = self.election_key().into_bytes(); - let res = self - .client - .query(CAMPAIGN, &[]) - .await - .context(PostgresExecutionSnafu)?; - let leader_key = PgLeaderKey { - name: self.leader_value.clone().into_bytes(), - key: key.clone(), - // TODO: Add rev and lease information - rev: 0, - lease: 0, - }; - - if let Some(row) = res.first() { - let is_locked: bool = row.get(0); - // If the lock is acquired, then the current node is the leader - if is_locked { - let leader_value_with_lease = ValueWithLease { - value: self.leader_value.clone(), - expire_time: time::SystemTime::now() + let mut keep_alive_interval = + tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)); + loop { + let _ = keep_alive_interval.tick().await; + let res = self + .client + .query(CAMPAIGN, &[]) + .await + .context(PostgresExecutionSnafu)?; + if let Some(row) = res.first() { + // Successfully acquired the advisory lock, leader branch: + if row.get(0) { + let now = time::SystemTime::now() .duration_since(time::UNIX_EPOCH) .unwrap_or_default() - .as_secs_f64() - + META_LEASE_SECS as f64, - }; - // If the leader value is successfully put, then we start the keep alive loop - if self - .put_value_with_lease(&key, &leader_value_with_lease) - .await? - { - let res = self - .client - .query(UNLOCK, &[]) - .await - .context(PostgresExecutionSnafu)?; - match res.first() { - Some(row) => { - let unlocked: bool = row.get(0); - if !unlocked { - return UnexpectedSnafu { - violated: "Failed to unlock the advisory lock".to_string(), + .as_secs_f64(); + let new_leader_value_with_lease = ValueWithLease { + value: self.leader_value.clone(), + expire_time: now + META_LEASE_SECS as f64, + }; + if self.is_leader() { + // Old leader, renew the lease + match self.get_value_with_lease(&key).await? { + Some(prev) => { + if prev.expire_time <= now { + self.step_down().await?; } - .fail(); + self.update_value_with_lease( + &key, + &prev, + &new_leader_value_with_lease, + ) + .await?; } - } - None => { - return UnexpectedSnafu { - violated: "Failed to unlock the advisory lock".to_string(), + // Deleted by other followers since the lease expired, but still hold the lock. Just step down. + None => { + warn!("Leader lease expired, but still hold the lock. 
Now stepping down."); + self.step_down().await?; } - .fail(); - } - } - let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS); - let mut keep_alive_interval = tokio::time::interval(keep_lease_duration); - keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - loop { - match timeout( - keep_lease_duration, - self.keep_alive(&key, leader_key.clone(), true), - ) - .await - { - Ok(Ok(())) => { - let _ = keep_alive_interval.tick().await; - } - Ok(Err(err)) => { - error!(err; "Failed to keep alive"); - break; - } - Err(_) => { - error!("Refresh lease timeout"); - break; - } - } - } - - // Step down - if self - .is_leader - .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) - .is_ok() - { - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) - { - error!(e; "Failed to send leader change message"); } + } else { + // Newly elected + self.elected().await?; } + // Follower branch } else { - let res = self - .client - .query(UNLOCK, &[]) - .await - .context(PostgresExecutionSnafu)?; - match res.first() { - Some(row) => { - let unlocked: bool = row.get(0); - if !unlocked { - return UnexpectedSnafu { - violated: "Failed to unlock the advisory lock".to_string(), - } - .fail(); - } - } - None => { - return UnexpectedSnafu { - violated: "Failed to unlock the advisory lock".to_string(), - } - .fail(); + let prev = self.get_value_with_lease(&key).await?.ok_or_else(|| { + UnexpectedSnafu { + violated: "Advisory lock held but leader key not found", } + .build() + })?; + if prev.expire_time + <= time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + { + self.delete_value(&key).await?; + return Ok(()); } - // Failed to put the leader value, we fall back to the candidate and check the leadership - self.check_leadership(&key).await?; } } else { - self.check_leadership(&key).await?; + return UnexpectedSnafu { + violated: "Failed to acquire the advisory lock".to_string(), + } + .fail(); } } - - Ok(()) } async fn leader(&self) -> Result { @@ -367,7 +337,10 @@ impl Election for PgElection { Ok(self.leader_value.as_bytes().into()) } else { let key = self.election_key().into_bytes(); - let value_with_lease = self.get_value_with_lease(&key).await?; + let value_with_lease = self + .get_value_with_lease(&key) + .await? 
+ .ok_or_else(|| NoLeaderSnafu {}.build())?; if value_with_lease.expire_time > time::SystemTime::now() .duration_since(time::UNIX_EPOCH) @@ -387,7 +360,7 @@ impl Election for PgElection { } impl PgElection { - async fn get_value_with_lease(&self, key: &Vec) -> Result { + async fn get_value_with_lease(&self, key: &Vec) -> Result> { let prev = self .client .query(POINT_GET, &[&key]) @@ -400,15 +373,9 @@ impl PgElection { serde_json::from_str(&value).with_context(|_| DeserializeFromJsonSnafu { input: format!("{value:?}"), })?; - Ok(value_with_lease) + Ok(Some(value_with_lease)) } else { - UnexpectedSnafu { - violated: format!( - "Failed to get value from key: {:?}", - String::from_utf8_lossy(key) - ), - } - .fail() + Ok(None) } } @@ -501,70 +468,57 @@ impl PgElection { Ok(res.len() == 1) } - async fn keep_alive( - &self, - key: &Vec, - leader: PgLeaderKey, - keep_leader: bool, - ) -> Result<()> { - // Check if the current lease has expired - let prev = self.get_value_with_lease(key).await?; - let now = time::SystemTime::now() - .duration_since(time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64(); - - ensure!( - prev.expire_time > now, - UnexpectedSnafu { - violated: format!( - "Failed to renew lease. Lease expired, key: {:?}", - String::from_utf8_lossy(key) - ), - } - ); - - // Renew the lease - let updated = ValueWithLease { - value: prev.value.clone(), - expire_time: now + CANDIDATE_LEASE_SECS as f64, + /// Step down the leader. The leader should delete the key and notify the leader watcher. + /// Do not check if the deletion is successful, since the key may be deleted by other followers. + /// Caution: Should only step down while holding the advisory lock. + async fn step_down(&self) -> Result<()> { + let key = self.election_key().into_bytes(); + let leader_key = PgLeaderKey { + name: self.leader_value.clone().into_bytes(), + key: key.clone(), + ..Default::default() }; - self.update_value_with_lease(key, &prev, &updated).await?; - - if keep_leader - && self - .is_leader - .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) - .is_ok() + if self + .is_leader + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() { - self.infancy.store(true, Ordering::Relaxed); - - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::Elected(Arc::new(leader))) - { - error!(e; "Failed to send leader change message"); - } + self.delete_value(&key).await?; + self.client + .query(UNLOCK, &[]) + .await + .context(PostgresExecutionSnafu)?; + self.leader_watcher + .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) + .context(SendLeaderChangeSnafu)?; } Ok(()) } - // Check if the leader is still valid - async fn check_leadership(&self, key: &Vec) -> Result<()> { - let check_interval = Duration::from_secs(META_LEASE_SECS); - let mut check_interval = tokio::time::interval(check_interval); - loop { - let _ = check_interval.tick().await; - let leader_value_with_lease = self.get_value_with_lease(key).await?; - let now = time::SystemTime::now() - .duration_since(time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64(); - if leader_value_with_lease.expire_time <= now { - // Invalidate previous leader - self.delete_value(key).await?; - return Ok(()); - } - } + /// Elected as leader. The leader should put the key and notify the leader watcher. + /// Caution: Should only elected while holding the advisory lock. 
+ async fn elected(&self) -> Result<()> { + let key = self.election_key().into_bytes(); + let leader_key = PgLeaderKey { + name: self.leader_value.clone().into_bytes(), + key: key.clone(), + ..Default::default() + }; + self.put_value_with_lease( + &key, + &ValueWithLease { + value: self.leader_value.clone(), + expire_time: time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + + META_LEASE_SECS as f64, + }, + ) + .await?; + self.leader_watcher + .send(LeaderChangeMessage::Elected(Arc::new(leader_key))) + .context(SendLeaderChangeSnafu)?; + Ok(()) } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 2c99c5e1b235..31a63ce5293c 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -24,6 +24,7 @@ use table::metadata::TableId; use tokio::sync::mpsc::error::SendError; use tonic::codegen::http; +use crate::election::LeaderChangeMessage; use crate::metasrv::SelectTarget; use crate::pubsub::Message; @@ -718,6 +719,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to send leader change message"))] + SendLeaderChange { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: tokio::sync::broadcast::error::SendError, + }, } impl Error { @@ -760,6 +769,7 @@ impl ErrorExt for Error { | Error::StartGrpc { .. } | Error::NoEnoughAvailableNode { .. } | Error::PublishMessage { .. } + | Error::SendLeaderChange { .. } | Error::Join { .. } | Error::PeerUnavailable { .. } | Error::ExceededDeadline { .. } From cb021ef0aff635601685bf5fbf3ef3be4db1ecf2 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 20 Dec 2024 10:46:05 +0800 Subject: [PATCH 8/9] chore: fmt --- src/meta-srv/src/election/postgres.rs | 1 + src/meta-srv/src/error.rs | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 85c03b721603..9a1f9c469561 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -504,6 +504,7 @@ impl PgElection { key: key.clone(), ..Default::default() }; + self.delete_value(&key).await?; self.put_value_with_lease( &key, &ValueWithLease { diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 28ee2bf8f52f..60f04eccff8a 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -720,7 +720,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to send leader change message"))] SendLeaderChange { #[snafu(implicit)] @@ -734,7 +733,6 @@ pub enum Error { #[snafu(implicit)] location: Location, source: common_meta::error::Error, - }, } From 8e70124543b9bb71e33ec5ee4764babf1bf3e439 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 20 Dec 2024 11:36:06 +0800 Subject: [PATCH 9/9] fix: deal with multiple edge cases --- src/meta-srv/src/election/postgres.rs | 133 ++++++++++++++++---------- 1 file changed, 83 insertions(+), 50 deletions(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 9a1f9c469561..237198a46451 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -260,7 +260,6 @@ impl Election for PgElection { /// - If the lease expired, delete the key and return, try to initiate a new campaign. /// Caution: The leader may still hold the advisory lock while the lease is expired. The leader should step down in this case. 
async fn campaign(&self) -> Result<()> { - let key = self.election_key().into_bytes(); let mut keep_alive_interval = tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)); loop { @@ -271,56 +270,14 @@ impl Election for PgElection { .await .context(PostgresExecutionSnafu)?; if let Some(row) = res.first() { - // Successfully acquired the advisory lock, leader branch: - if row.get(0) { - let now = time::SystemTime::now() - .duration_since(time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64(); - let new_leader_value_with_lease = ValueWithLease { - value: self.leader_value.clone(), - expire_time: now + META_LEASE_SECS as f64, - }; - if self.is_leader() { - // Old leader, renew the lease - match self.get_value_with_lease(&key).await? { - Some(prev) => { - if prev.expire_time <= now { - self.step_down().await?; - } - self.update_value_with_lease( - &key, - &prev, - &new_leader_value_with_lease, - ) - .await?; - } - // Deleted by other followers since the lease expired, but still hold the lock. Just step down. - None => { - warn!("Leader lease expired, but still hold the lock. Now stepping down."); - self.step_down().await?; - } - } - } else { - // Newly elected - self.elected().await?; - } - // Follower branch - } else { - let prev = self.get_value_with_lease(&key).await?.ok_or_else(|| { - UnexpectedSnafu { - violated: "Advisory lock held but leader key not found", + match row.try_get(0) { + Ok(true) => self.leader_action().await?, + Ok(false) => self.follower_action().await?, + Err(_) => { + return UnexpectedSnafu { + violated: "Failed to acquire the advisory lock".to_string(), } - .build() - })?; - if prev.expire_time - <= time::SystemTime::now() - .duration_since(time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64() - { - self.delete_value(&key).await?; - return Ok(()); + .fail(); } } } else { @@ -522,4 +479,80 @@ impl PgElection { .context(SendLeaderChangeSnafu)?; Ok(()) } + + /// Leader failed to acquire the advisory lock, just step down and tell the leader watcher. + async fn step_down_without_lock(&self) -> Result<()> { + let key = self.election_key().into_bytes(); + let leader_key = PgLeaderKey { + name: self.leader_value.clone().into_bytes(), + key: key.clone(), + ..Default::default() + }; + if self + .is_leader + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + self.leader_watcher + .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) + .context(SendLeaderChangeSnafu)?; + } + Ok(()) + } + + async fn leader_action(&self) -> Result<()> { + let key = self.election_key().into_bytes(); + let now = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + let new_leader_value_with_lease = ValueWithLease { + value: self.leader_value.clone(), + expire_time: now + META_LEASE_SECS as f64, + }; + if self.is_leader() { + // Old leader, renew the lease + match self.get_value_with_lease(&key).await? { + Some(prev) => { + if prev.value != self.leader_value || prev.expire_time <= now { + self.step_down().await?; + } + self.update_value_with_lease(&key, &prev, &new_leader_value_with_lease) + .await?; + } + None => { + warn!("Leader lease not found, but still hold the lock. Now stepping down."); + self.step_down().await?; + } + } + } else { + // Newly elected + self.elected().await?; + } + Ok(()) + } + + async fn follower_action(&self) -> Result<()> { + let key = self.election_key().into_bytes(); + // Previously held the advisory lock, but failed to acquire it. Step down. 
+ if self.is_leader() { + self.step_down_without_lock().await?; + } + let prev = self.get_value_with_lease(&key).await?.ok_or_else(|| { + UnexpectedSnafu { + violated: "Advisory lock held by others but leader key not found", + } + .build() + })?; + if prev.expire_time + <= time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + { + warn!("Leader lease expired, now re-init campaign."); + return Err(NoLeaderSnafu {}.build()); + } + Ok(()) + } }
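
For readers unfamiliar with the locking primitive these patches build on: the election above rests on PostgreSQL session-level advisory locks (`pg_try_advisory_lock` / `pg_advisory_unlock`), which are held by a database session rather than a transaction and are released automatically if that session terminates. The standalone sketch below is not part of the patch series — the connection string and lock id are placeholders, and it assumes only the `tokio` and `tokio-postgres` crates — it just shows the primitive in isolation, in the same shape the `CAMPAIGN` and `UNLOCK` statements use it.

    // Illustrative sketch only: session-level advisory locking with tokio-postgres.
    use tokio_postgres::NoTls;

    #[tokio::main]
    async fn main() -> Result<(), tokio_postgres::Error> {
        let (client, connection) =
            tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
        // The connection future must be polled for the client to make progress.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("connection error: {e}");
            }
        });

        // `pg_try_advisory_lock` returns immediately: true if this session now holds
        // lock id 1, false if another session (the current leader) already holds it.
        let row = client
            .query_one("SELECT pg_try_advisory_lock(1)", &[])
            .await?;
        let is_leader: bool = row.get(0);

        if is_leader {
            println!("acquired advisory lock: acting as leader");
            // ... leader work would go here; release the session-level lock explicitly
            // when stepping down, mirroring the UNLOCK statement in the patch.
            let row = client.query_one("SELECT pg_advisory_unlock(1)", &[]).await?;
            let released: bool = row.get(0);
            println!("lock released: {released}");
        } else {
            println!("another session holds the lock: acting as follower");
        }
        Ok(())
    }

Because the lock is tied to the session, a leader that crashes or loses its connection drops the lock without explicit cleanup. The patches pair this with a lease timestamp stored in `greptime_metakv` (the `ValueWithLease` expire time), so followers can also detect a leader whose session is still alive but whose lease has lapsed — the case the campaign loop's doc comment calls out, where the leader must step down even while it still holds the advisory lock.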