From 089a04bbf12965759991399a4ab33d7600e42bd6 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 19 Dec 2024 11:40:01 +0800 Subject: [PATCH 01/23] feat: init PgElection fix: release advisory lock fix: handle duplicate keys chore: update comments fix: unlock if acquired the lock chore: add TODO and avoid unwrap refactor: check both lock and expire time, add more comments chore: fmt fix: deal with multiple edge cases feat: init PgElection with candidate registration chore: fmt chore: remove --- src/common/meta/src/kv_backend/postgres.rs | 10 +- src/meta-srv/src/bootstrap.rs | 16 +- src/meta-srv/src/election.rs | 19 +- src/meta-srv/src/election/etcd.rs | 54 ++-- src/meta-srv/src/election/postgres.rs | 301 +++++++++++++++++++++ src/meta-srv/src/error.rs | 12 + 6 files changed, 380 insertions(+), 32 deletions(-) create mode 100644 src/meta-srv/src/election/postgres.rs diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index 8add65cd49c4..23c506761a9d 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -45,9 +45,9 @@ const METADKV_CREATION: &str = const FULL_TABLE_SCAN: &str = "SELECT k, v FROM greptime_metakv $1 ORDER BY K"; -const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1"; +pub const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1"; -const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K"; +pub const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K"; const RANGE_SCAN_LEFT_BOUNDED: &str = "SELECT k, v FROM greptime_metakv WHERE k >= $1 ORDER BY K"; @@ -56,7 +56,7 @@ const RANGE_SCAN_FULL_RANGE: &str = const FULL_TABLE_DELETE: &str = "DELETE FROM greptime_metakv RETURNING k,v"; -const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;"; +pub const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;"; const PREFIX_DELETE: &str = "DELETE FROM greptime_metakv WHERE k LIKE $1 RETURNING k,v;"; @@ -65,7 +65,7 @@ const RANGE_DELETE_LEFT_BOUNDED: &str = "DELETE FROM greptime_metakv WHERE k >= const RANGE_DELETE_FULL_RANGE: &str = "DELETE FROM greptime_metakv WHERE k >= $1 AND K < $2 RETURNING k,v;"; -const CAS: &str = r#" +pub const CAS: &str = r#" WITH prev AS ( SELECT k,v FROM greptime_metakv WHERE k = $1 AND v = $2 ), update AS ( @@ -79,7 +79,7 @@ WHERE SELECT k, v FROM prev; "#; -const PUT_IF_NOT_EXISTS: &str = r#" +pub const PUT_IF_NOT_EXISTS: &str = r#" WITH prev AS ( select k,v from greptime_metakv where k = $1 ), insert AS ( diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 85770e1f3d4d..4d12caa41fec 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -46,6 +46,8 @@ use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; #[cfg(feature = "pg_kvbackend")] +use crate::election::postgres::PgElection; +#[cfg(feature = "pg_kvbackend")] use crate::error::InvalidArgumentsSnafu; use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu}; use crate::metasrv::builder::MetasrvBuilder; @@ -224,9 +226,17 @@ pub async fn metasrv_builder( #[cfg(feature = "pg_kvbackend")] (None, BackendImpl::PostgresStore) => { let pg_client = create_postgres_client(opts).await?; - let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap(); - // TODO(jeremy, weny): implement election for postgres - (kv_backend, None) + let kv_backend = PgStore::with_pg_client(pg_client) + .await + .context(error::KvBackendSnafu)?; + let election_client = create_postgres_client(opts).await?; + let election = PgElection::with_pg_client( + opts.server_addr.clone(), + election_client, + opts.store_key_prefix.clone(), + ) + .await?; + (kv_backend, Some(election)) } }; diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index d73f453a88f0..4db7814a2846 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -13,11 +13,12 @@ // limitations under the License. pub mod etcd; +#[cfg(feature = "pg_kvbackend")] +pub mod postgres; -use std::fmt; +use std::fmt::{self, Debug}; use std::sync::Arc; -use etcd_client::LeaderKey; use tokio::sync::broadcast::Receiver; use crate::error::Result; @@ -26,10 +27,20 @@ use crate::metasrv::MetasrvNodeInfo; pub const ELECTION_KEY: &str = "__metasrv_election"; pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/"; +const CANDIDATE_LEASE_SECS: u64 = 600; +const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2; + #[derive(Debug, Clone)] pub enum LeaderChangeMessage { - Elected(Arc), - StepDown(Arc), + Elected(Arc), + StepDown(Arc), +} + +pub trait LeaderKey: Send + Sync + Debug { + fn name(&self) -> &[u8]; + fn key(&self) -> &[u8]; + fn rev(&self) -> i64; + fn lease(&self) -> i64; } impl fmt::Display for LeaderChangeMessage { diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index fef7e928a783..274a6a7e4792 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -18,18 +18,41 @@ use std::time::Duration; use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; use common_telemetry::{error, info, warn}; -use etcd_client::{Client, GetOptions, LeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions}; +use etcd_client::{ + Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions, +}; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; use tokio::time::{timeout, MissedTickBehavior}; -use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY}; +use crate::election::{ + Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, + KEEP_ALIVE_INTERVAL_SECS, +}; use crate::error; use crate::error::Result; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; +impl LeaderKey for EtcdLeaderKey { + fn name(&self) -> &[u8] { + self.name() + } + + fn key(&self) -> &[u8] { + self.key() + } + + fn rev(&self) -> i64 { + self.rev() + } + + fn lease(&self) -> i64 { + self.lease() + } +} + pub struct EtcdElection { leader_value: String, client: Client, @@ -75,14 +98,14 @@ impl EtcdElection { LeaderChangeMessage::Elected(key) => { info!( "[{leader_ident}] is elected as leader: {:?}, lease: {}", - key.name_str(), + String::from_utf8_lossy(key.name()), key.lease() ); } LeaderChangeMessage::StepDown(key) => { warn!( "[{leader_ident}] is stepping down: {:?}, lease: {}", - key.name_str(), + String::from_utf8_lossy(key.name()), key.lease() ); } @@ -133,9 +156,6 @@ impl Election for EtcdElection { } async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> { - const CANDIDATE_LEASE_SECS: u64 = 600; - const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2; - let mut lease_client = self.client.lease_client(); let res = lease_client .grant(CANDIDATE_LEASE_SECS as i64, None) @@ -239,7 +259,7 @@ impl Election for EtcdElection { // The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`. match timeout( keep_lease_duration, - self.keep_alive(&mut keeper, &mut receiver, leader), + self.keep_alive(&mut keeper, &mut receiver, leader.clone()), ) .await { @@ -262,12 +282,9 @@ impl Election for EtcdElection { .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) .is_ok() { - if let Err(e) = self - .leader_watcher + self.leader_watcher .send(LeaderChangeMessage::StepDown(Arc::new(leader.clone()))) - { - error!(e; "Failed to send leader change message"); - } + .context(error::SendLeaderChangeSnafu)?; } } @@ -303,7 +320,7 @@ impl EtcdElection { &self, keeper: &mut LeaseKeeper, receiver: &mut LeaseKeepAliveStream, - leader: &LeaderKey, + leader: EtcdLeaderKey, ) -> Result<()> { keeper.keep_alive().await.context(error::EtcdFailedSnafu)?; if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? { @@ -322,12 +339,9 @@ impl EtcdElection { { self.infancy.store(true, Ordering::Relaxed); - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::Elected(Arc::new(leader.clone()))) - { - error!(e; "Failed to send leader change message"); - } + self.leader_watcher + .send(LeaderChangeMessage::Elected(Arc::new(leader))) + .context(error::SendLeaderChangeSnafu)?; } } diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs new file mode 100644 index 000000000000..c1150e30e3f1 --- /dev/null +++ b/src/meta-srv/src/election/postgres.rs @@ -0,0 +1,301 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{self, Duration}; + +use common_meta::kv_backend::postgres::{ + CAS, POINT_DELETE, POINT_GET, PREFIX_SCAN, PUT_IF_NOT_EXISTS, +}; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use tokio::sync::broadcast; +use tokio_postgres::Client; + +use crate::election::{ + Election, LeaderChangeMessage, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, + KEEP_ALIVE_INTERVAL_SECS, +}; +use crate::error::{ + DeserializeFromJsonSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, UnexpectedSnafu, +}; +use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; + +/// Value with a expire time. The expire time is in seconds since UNIX epoch. +#[derive(Debug, Serialize, Deserialize, Default)] +struct ValueWithLease { + value: String, + expire_time: f64, +} + +/// PostgreSql implementation of Election. +pub struct PgElection { + leader_value: String, + client: Client, + is_leader: AtomicBool, + infancy: AtomicBool, + leader_watcher: broadcast::Sender, + store_key_prefix: String, +} + +impl PgElection { + pub async fn with_pg_client( + leader_value: String, + client: Client, + store_key_prefix: String, + ) -> Result { + let (tx, _) = broadcast::channel(100); + Ok(Arc::new(Self { + leader_value, + client, + is_leader: AtomicBool::new(false), + infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix, + })) + } + + fn _election_key(&self) -> String { + format!("{}{}", self.store_key_prefix, ELECTION_KEY) + } + + fn candidate_root(&self) -> String { + format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT) + } + + fn candidate_key(&self) -> String { + format!("{}{}", self.candidate_root(), self.leader_value) + } +} + +#[async_trait::async_trait] +impl Election for PgElection { + type Leader = LeaderValue; + + fn subscribe_leader_change(&self) -> broadcast::Receiver { + self.leader_watcher.subscribe() + } + + fn is_leader(&self) -> bool { + self.is_leader.load(Ordering::Relaxed) + } + + fn in_infancy(&self) -> bool { + self.infancy + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + } + + async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> { + let key = self.candidate_key().into_bytes(); + let node_info = + serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu { + input: format!("{node_info:?}"), + })?; + let value_with_lease = ValueWithLease { + value: node_info, + expire_time: time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + + CANDIDATE_LEASE_SECS as f64, + }; + let res = self.put_value_with_lease(&key, &value_with_lease).await?; + // May registered before, check if the lease expired. If so, delete and re-register. + if !res { + let prev = self.get_value_with_lease(&key).await?.unwrap_or_default(); + if prev.expire_time + <= time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + { + self.delete_value(&key).await?; + self.put_value_with_lease(&key, &value_with_lease).await?; + } + } + + // Check if the current lease has expired and renew the lease. + let mut keep_alive_interval = + tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS)); + loop { + let _ = keep_alive_interval.tick().await; + + let prev = self.get_value_with_lease(&key).await?.unwrap_or_default(); + let now = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + + ensure!( + prev.expire_time > now, + UnexpectedSnafu { + violated: format!( + "Candidate lease expired, key: {:?}", + String::from_utf8_lossy(&key) + ), + } + ); + + let updated = ValueWithLease { + value: prev.value.clone(), + expire_time: now + CANDIDATE_LEASE_SECS as f64, + }; + self.update_value_with_lease(&key, &prev, &updated).await?; + } + } + + async fn all_candidates(&self) -> Result> { + let key_prefix = self.candidate_root().into_bytes(); + let mut candidates = self.get_value_with_lease_by_prefix(&key_prefix).await?; + let now = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + // Remove expired candidates + candidates.retain(|c| c.expire_time > now); + let mut valid_candidates = Vec::with_capacity(candidates.len()); + for c in candidates { + let node_info: MetasrvNodeInfo = + serde_json::from_str(&c.value).with_context(|_| DeserializeFromJsonSnafu { + input: format!("{:?}", c.value), + })?; + valid_candidates.push(node_info); + } + Ok(valid_candidates) + } + + async fn campaign(&self) -> Result<()> { + todo!() + } + + async fn leader(&self) -> Result { + todo!() + } + + async fn resign(&self) -> Result<()> { + todo!() + } +} + +impl PgElection { + async fn get_value_with_lease(&self, key: &Vec) -> Result> { + let prev = self + .client + .query(POINT_GET, &[&key]) + .await + .context(PostgresExecutionSnafu)?; + + if let Some(row) = prev.first() { + let value: String = row.get(0); + let value_with_lease: ValueWithLease = + serde_json::from_str(&value).with_context(|_| DeserializeFromJsonSnafu { + input: format!("{value:?}"), + })?; + Ok(Some(value_with_lease)) + } else { + Ok(None) + } + } + + async fn get_value_with_lease_by_prefix( + &self, + key_prefix: &Vec, + ) -> Result> { + let prev = self + .client + .query(PREFIX_SCAN, &[key_prefix]) + .await + .context(PostgresExecutionSnafu)?; + + let mut res = Vec::new(); + for row in prev { + let value: String = row.get(0); + let value_with_lease: ValueWithLease = + serde_json::from_str(&value).with_context(|_| DeserializeFromJsonSnafu { + input: format!("{value:?}"), + })?; + res.push(value_with_lease); + } + + Ok(res) + } + + async fn update_value_with_lease( + &self, + key: &Vec, + prev: &ValueWithLease, + updated: &ValueWithLease, + ) -> Result<()> { + let prev = serde_json::to_string(prev) + .with_context(|_| SerializeToJsonSnafu { + input: format!("{prev:?}"), + })? + .into_bytes(); + + let updated = serde_json::to_string(updated) + .with_context(|_| SerializeToJsonSnafu { + input: format!("{updated:?}"), + })? + .into_bytes(); + + let res = self + .client + .query(CAS, &[key, &prev, &updated]) + .await + .context(PostgresExecutionSnafu)?; + + // CAS operation will return the updated value if the operation is successful + match res.is_empty() { + false => Ok(()), + true => UnexpectedSnafu { + violated: format!( + "Failed to update value from key: {:?}", + String::from_utf8_lossy(key) + ), + } + .fail(), + } + } + + /// Returns `true` if the insertion is successful + async fn put_value_with_lease(&self, key: &Vec, value: &ValueWithLease) -> Result { + let value = serde_json::to_string(value) + .with_context(|_| SerializeToJsonSnafu { + input: format!("{value:?}"), + })? + .into_bytes(); + + let res = self + .client + .query(PUT_IF_NOT_EXISTS, &[key, &value]) + .await + .context(PostgresExecutionSnafu)?; + + Ok(res.is_empty()) + } + + /// Returns `true` if the deletion is successful. + /// Caution: Should only delete the key if the lease is expired. + async fn delete_value(&self, key: &Vec) -> Result { + let res = self + .client + .query(POINT_DELETE, &[key]) + .await + .context(PostgresExecutionSnafu)?; + + Ok(res.len() == 1) + } +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 1c529f06d606..60f04eccff8a 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -24,6 +24,7 @@ use table::metadata::TableId; use tokio::sync::mpsc::error::SendError; use tonic::codegen::http; +use crate::election::LeaderChangeMessage; use crate::metasrv::SelectTarget; use crate::pubsub::Message; @@ -697,6 +698,8 @@ pub enum Error { #[cfg(feature = "pg_kvbackend")] #[snafu(display("Failed to execute via postgres"))] PostgresExecution { + #[snafu(source)] + error: tokio_postgres::Error, #[snafu(implicit)] location: Location, }, @@ -717,6 +720,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to send leader change message"))] + SendLeaderChange { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: tokio::sync::broadcast::error::SendError, + }, + #[snafu(display("Flow state handler error"))] FlowStateHandler { #[snafu(implicit)] @@ -765,6 +776,7 @@ impl ErrorExt for Error { | Error::StartGrpc { .. } | Error::NoEnoughAvailableNode { .. } | Error::PublishMessage { .. } + | Error::SendLeaderChange { .. } | Error::Join { .. } | Error::PeerUnavailable { .. } | Error::ExceededDeadline { .. } From 4b05bdbd2e7e398b154f4d0380d60a84a7dc42e0 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 20 Dec 2024 14:09:31 +0800 Subject: [PATCH 02/23] test: add unit test for pg candidate registration --- src/meta-srv/src/election/postgres.rs | 165 ++++++++++++++++++++++++++ 1 file changed, 165 insertions(+) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index c1150e30e3f1..81708b6f9289 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -299,3 +299,168 @@ impl PgElection { Ok(res.len() == 1) } } + +#[cfg(test)] +mod tests { + use std::env; + + use tokio_postgres::NoTls; + + use super::*; + + #[tokio::test] + async fn test_postgres_crud() { + let endpoint = env::var("GT_PG_ENDPOINTS").unwrap_or_default(); + let (client, connection) = tokio_postgres::connect(&endpoint, NoTls) + .await + .expect("Failed to connect to Postgres"); + + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + + let key = b"test_key".to_vec(); + let value = ValueWithLease { + value: "test_value".to_string(), + expire_time: 0.0, + }; + + let (tx, _) = broadcast::channel(100); + let pg_election = PgElection { + leader_value: "test_leader".to_string(), + client, + is_leader: AtomicBool::new(false), + infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: "test_prefix".to_string(), + }; + + let res = pg_election + .put_value_with_lease(&key, &value) + .await + .unwrap(); + assert!(res); + + let res = pg_election + .get_value_with_lease(&key) + .await + .unwrap() + .unwrap(); + assert_eq!(res.value, value.value); + + let res = pg_election.delete_value(&key).await.unwrap(); + assert!(res); + + let res = pg_election.get_value_with_lease(&key).await.unwrap(); + assert!(res.is_none()); + + for i in 0..10 { + let key = format!("test_key_{}", i).into_bytes(); + let value = ValueWithLease { + value: format!("test_value_{}", i), + expire_time: 0.0, + }; + pg_election + .put_value_with_lease(&key, &value) + .await + .unwrap(); + } + + let key_prefix = "test_key".to_string().into_bytes(); + let res = pg_election + .get_value_with_lease_by_prefix(&key_prefix) + .await + .unwrap(); + assert_eq!(res.len(), 10); + + for i in 0..10 { + let key = format!("test_key_{}", i).into_bytes(); + let res = pg_election.delete_value(&key).await.unwrap(); + assert!(res); + } + + let res = pg_election + .get_value_with_lease_by_prefix(&key_prefix) + .await + .unwrap(); + assert!(res.is_empty()); + } + + async fn candidate(leader_value: String) { + let endpoint = env::var("GT_PG_ENDPOINTS").unwrap_or_default(); + let (client, connection) = tokio_postgres::connect(&endpoint, NoTls) + .await + .expect("Failed to connect to Postgres"); + + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + + let (tx, _) = broadcast::channel(100); + let pg_election = PgElection { + leader_value, + client, + is_leader: AtomicBool::new(false), + infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: "test_prefix".to_string(), + }; + + let node_info = MetasrvNodeInfo { + addr: "test_addr".to_string(), + version: "test_version".to_string(), + git_commit: "test_git_commit".to_string(), + start_time_ms: 0, + }; + pg_election.register_candidate(&node_info).await.unwrap(); + } + + #[tokio::test] + async fn test_candidate_registration() { + let leader_value = "test_leader".to_string(); + let mut handles = vec![]; + for _ in 0..10 { + let handle = tokio::spawn(candidate(leader_value.clone())); + handles.push(handle); + } + // TODO: Reduce the sleep time by make the lease time configurable. + tokio::time::sleep(Duration::from_secs(10)).await; + + let endpoint = env::var("GT_PG_ENDPOINTS").unwrap_or_default(); + let (client, connection) = tokio_postgres::connect(&endpoint, NoTls) + .await + .expect("Failed to connect to Postgres"); + + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + + let (tx, _) = broadcast::channel(100); + let pg_election = PgElection { + leader_value, + client, + is_leader: AtomicBool::new(false), + infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: "test_prefix".to_string(), + }; + + let candidates = pg_election.all_candidates().await.unwrap(); + assert_eq!(candidates.len(), 10); + + for handle in handles { + handle.abort(); + } + + // Wait for the candidate lease to expire. + tokio::time::sleep(Duration::from_secs(10)).await; + let candidates = pg_election.all_candidates().await.unwrap(); + assert!(candidates.is_empty()); + } +} From 0bdedab19c997866bf432544b75f77b158ebe97a Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 20 Dec 2024 18:09:25 +0800 Subject: [PATCH 03/23] test: add unit test for pg candidate registration --- src/meta-srv/src/bootstrap.rs | 3 + src/meta-srv/src/election.rs | 13 +- src/meta-srv/src/election/postgres.rs | 181 +++++++++++++++----------- 3 files changed, 117 insertions(+), 80 deletions(-) diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 4d12caa41fec..b9804ad93cdb 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -57,6 +57,8 @@ use crate::selector::load_based::LoadBasedSelector; use crate::selector::round_robin::RoundRobinSelector; use crate::selector::SelectorType; use crate::service::admin; +#[cfg(feature = "pg_kvbackend")] +use crate::election::CANDIDATE_LEASE_SECS; use crate::{error, Result}; pub struct MetasrvInstance { @@ -234,6 +236,7 @@ pub async fn metasrv_builder( opts.server_addr.clone(), election_client, opts.store_key_prefix.clone(), + CANDIDATE_LEASE_SECS, ) .await?; (kv_backend, Some(election)) diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index 4db7814a2846..05fe6446b882 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -27,19 +27,30 @@ use crate::metasrv::MetasrvNodeInfo; pub const ELECTION_KEY: &str = "__metasrv_election"; pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/"; -const CANDIDATE_LEASE_SECS: u64 = 600; +pub(crate) const CANDIDATE_LEASE_SECS: u64 = 600; const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2; +/// Messages sent when the leader changes. #[derive(Debug, Clone)] pub enum LeaderChangeMessage { Elected(Arc), StepDown(Arc), } +/// LeaderKey is a key that represents the leader of metasrv. +/// The structure is correponding to [etcd_client::LeaderKey]. pub trait LeaderKey: Send + Sync + Debug { + /// The name in byte. name is the election identifier that corresponds to the leadership key. fn name(&self) -> &[u8]; + + /// The key in byte. key is an opaque key representing the ownership of the election. If the key + /// is deleted, then leadership is lost. fn key(&self) -> &[u8]; + + /// The creation revision of the key. fn rev(&self) -> i64; + + /// The lease ID of the election leader. fn lease(&self) -> i64; } diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 81708b6f9289..f9618c879261 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -26,7 +26,6 @@ use tokio_postgres::Client; use crate::election::{ Election, LeaderChangeMessage, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, - KEEP_ALIVE_INTERVAL_SECS, }; use crate::error::{ DeserializeFromJsonSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, UnexpectedSnafu, @@ -48,6 +47,7 @@ pub struct PgElection { infancy: AtomicBool, leader_watcher: broadcast::Sender, store_key_prefix: String, + candidate_lease_ttl: u64, } impl PgElection { @@ -55,6 +55,7 @@ impl PgElection { leader_value: String, client: Client, store_key_prefix: String, + candidate_lease_ttl: u64, ) -> Result { let (tx, _) = broadcast::channel(100); Ok(Arc::new(Self { @@ -64,6 +65,7 @@ impl PgElection { infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix, + candidate_lease_ttl, })) } @@ -99,7 +101,7 @@ impl Election for PgElection { } async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> { - let key = self.candidate_key().into_bytes(); + let key = self.candidate_key(); let node_info = serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu { input: format!("{node_info:?}"), @@ -110,26 +112,18 @@ impl Election for PgElection { .duration_since(time::UNIX_EPOCH) .unwrap_or_default() .as_secs_f64() - + CANDIDATE_LEASE_SECS as f64, + + self.candidate_lease_ttl as f64, }; let res = self.put_value_with_lease(&key, &value_with_lease).await?; - // May registered before, check if the lease expired. If so, delete and re-register. + // May registered before, just update the lease. if !res { - let prev = self.get_value_with_lease(&key).await?.unwrap_or_default(); - if prev.expire_time - <= time::SystemTime::now() - .duration_since(time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64() - { - self.delete_value(&key).await?; - self.put_value_with_lease(&key, &value_with_lease).await?; - } + self.delete_value(&key).await?; + self.put_value_with_lease(&key, &value_with_lease).await?; } // Check if the current lease has expired and renew the lease. let mut keep_alive_interval = - tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS)); + tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl / 2)); loop { let _ = keep_alive_interval.tick().await; @@ -144,7 +138,7 @@ impl Election for PgElection { UnexpectedSnafu { violated: format!( "Candidate lease expired, key: {:?}", - String::from_utf8_lossy(&key) + String::from_utf8_lossy(&key.into_bytes()) ), } ); @@ -158,7 +152,7 @@ impl Election for PgElection { } async fn all_candidates(&self) -> Result> { - let key_prefix = self.candidate_root().into_bytes(); + let key_prefix = self.candidate_root(); let mut candidates = self.get_value_with_lease_by_prefix(&key_prefix).await?; let now = time::SystemTime::now() .duration_since(time::UNIX_EPOCH) @@ -191,15 +185,15 @@ impl Election for PgElection { } impl PgElection { - async fn get_value_with_lease(&self, key: &Vec) -> Result> { + async fn get_value_with_lease(&self, key: &String) -> Result> { let prev = self .client - .query(POINT_GET, &[&key]) + .query(POINT_GET, &[key]) .await .context(PostgresExecutionSnafu)?; if let Some(row) = prev.first() { - let value: String = row.get(0); + let value: String = row.get(1); let value_with_lease: ValueWithLease = serde_json::from_str(&value).with_context(|_| DeserializeFromJsonSnafu { input: format!("{value:?}"), @@ -212,17 +206,18 @@ impl PgElection { async fn get_value_with_lease_by_prefix( &self, - key_prefix: &Vec, + key_prefix: &String, ) -> Result> { + let key_prefix = format!("{}%", key_prefix); let prev = self .client - .query(PREFIX_SCAN, &[key_prefix]) + .query(PREFIX_SCAN, &[&key_prefix]) .await .context(PostgresExecutionSnafu)?; let mut res = Vec::new(); for row in prev { - let value: String = row.get(0); + let value: String = row.get(1); let value_with_lease: ValueWithLease = serde_json::from_str(&value).with_context(|_| DeserializeFromJsonSnafu { input: format!("{value:?}"), @@ -235,21 +230,17 @@ impl PgElection { async fn update_value_with_lease( &self, - key: &Vec, + key: &String, prev: &ValueWithLease, updated: &ValueWithLease, ) -> Result<()> { - let prev = serde_json::to_string(prev) - .with_context(|_| SerializeToJsonSnafu { - input: format!("{prev:?}"), - })? - .into_bytes(); - - let updated = serde_json::to_string(updated) - .with_context(|_| SerializeToJsonSnafu { - input: format!("{updated:?}"), - })? - .into_bytes(); + let prev = serde_json::to_string(prev).with_context(|_| SerializeToJsonSnafu { + input: format!("{prev:?}"), + })?; + + let updated = serde_json::to_string(updated).with_context(|_| SerializeToJsonSnafu { + input: format!("{updated:?}"), + })?; let res = self .client @@ -262,8 +253,8 @@ impl PgElection { false => Ok(()), true => UnexpectedSnafu { violated: format!( - "Failed to update value from key: {:?}", - String::from_utf8_lossy(key) + "CAS operation failed, key: {:?}", + String::from_utf8_lossy(&key.clone().into_bytes()) ), } .fail(), @@ -271,12 +262,10 @@ impl PgElection { } /// Returns `true` if the insertion is successful - async fn put_value_with_lease(&self, key: &Vec, value: &ValueWithLease) -> Result { - let value = serde_json::to_string(value) - .with_context(|_| SerializeToJsonSnafu { - input: format!("{value:?}"), - })? - .into_bytes(); + async fn put_value_with_lease(&self, key: &String, value: &ValueWithLease) -> Result { + let value = serde_json::to_string(value).with_context(|_| SerializeToJsonSnafu { + input: format!("{value:?}"), + })?; let res = self .client @@ -289,7 +278,7 @@ impl PgElection { /// Returns `true` if the deletion is successful. /// Caution: Should only delete the key if the lease is expired. - async fn delete_value(&self, key: &Vec) -> Result { + async fn delete_value(&self, key: &String) -> Result { let res = self .client .query(POINT_DELETE, &[key]) @@ -308,20 +297,55 @@ mod tests { use super::*; - #[tokio::test] - async fn test_postgres_crud() { - let endpoint = env::var("GT_PG_ENDPOINTS").unwrap_or_default(); - let (client, connection) = tokio_postgres::connect(&endpoint, NoTls) + async fn create_postgres_client(addr: &str) -> Result { + if addr.is_empty() { + return UnexpectedSnafu { + violated: "Postgres address is empty".to_string(), + } + .fail(); + } + let (client, connection) = tokio_postgres::connect(&addr, NoTls) .await - .expect("Failed to connect to Postgres"); - + .context(PostgresExecutionSnafu)?; tokio::spawn(async move { if let Err(e) = connection.await { eprintln!("connection error: {}", e); } }); + Ok(client) + } + + #[tokio::test] + async fn temp_test() { + let endpoint = env::var("GT_PG_ADDR").unwrap_or_default(); + let client = create_postgres_client(&endpoint).await.unwrap(); + client + .execute( + "CREATE TABLE IF NOT EXISTS greptime_metakv(k varchar PRIMARY KEY, v varchar)", + &[], + ) + .await + .expect("Failed to create metadkv table"); + + client + .execute("DELETE FROM greptime_metakv", &[]) + .await + .expect("Failed to delete metakv table"); + } - let key = b"test_key".to_vec(); + #[tokio::test] + async fn test_postgres_crud() { + let endpoint = env::var("GT_PG_ADDR").unwrap_or_default(); + let client = create_postgres_client(&endpoint).await.unwrap(); + client + .execute( + "CREATE TABLE IF NOT EXISTS greptime_metakv(k varchar PRIMARY KEY, v varchar)", + &[], + ) + .await + .expect("Failed to create metadkv table"); + + let key = "test_key".to_string(); let value = ValueWithLease { value: "test_value".to_string(), expire_time: 0.0, @@ -335,6 +359,7 @@ mod tests { infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), + candidate_lease_ttl: 10, }; let res = pg_election @@ -357,7 +382,7 @@ mod tests { assert!(res.is_none()); for i in 0..10 { - let key = format!("test_key_{}", i).into_bytes(); + let key = format!("test_key_{}", i); let value = ValueWithLease { value: format!("test_value_{}", i), expire_time: 0.0, @@ -368,7 +393,7 @@ mod tests { .unwrap(); } - let key_prefix = "test_key".to_string().into_bytes(); + let key_prefix = "test_key".to_string(); let res = pg_election .get_value_with_lease_by_prefix(&key_prefix) .await @@ -376,7 +401,7 @@ mod tests { assert_eq!(res.len(), 10); for i in 0..10 { - let key = format!("test_key_{}", i).into_bytes(); + let key = format!("test_key_{}", i); let res = pg_election.delete_value(&key).await.unwrap(); assert!(res); } @@ -389,16 +414,8 @@ mod tests { } async fn candidate(leader_value: String) { - let endpoint = env::var("GT_PG_ENDPOINTS").unwrap_or_default(); - let (client, connection) = tokio_postgres::connect(&endpoint, NoTls) - .await - .expect("Failed to connect to Postgres"); - - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("connection error: {}", e); - } - }); + let endpoint = env::var("GT_PG_ADDR").unwrap_or_default(); + let client = create_postgres_client(&endpoint).await.unwrap(); let (tx, _) = broadcast::channel(100); let pg_election = PgElection { @@ -408,6 +425,7 @@ mod tests { infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), + candidate_lease_ttl: 10, }; let node_info = MetasrvNodeInfo { @@ -421,27 +439,21 @@ mod tests { #[tokio::test] async fn test_candidate_registration() { - let leader_value = "test_leader".to_string(); + let leader_value_prefix = "test_leader".to_string(); let mut handles = vec![]; - for _ in 0..10 { + for i in 0..10 { + let leader_value = format!("{}{}", leader_value_prefix, i); let handle = tokio::spawn(candidate(leader_value.clone())); handles.push(handle); } - // TODO: Reduce the sleep time by make the lease time configurable. - tokio::time::sleep(Duration::from_secs(10)).await; + // Wait for candidates registrating themselves. + tokio::time::sleep(Duration::from_secs(3)).await; - let endpoint = env::var("GT_PG_ENDPOINTS").unwrap_or_default(); - let (client, connection) = tokio_postgres::connect(&endpoint, NoTls) - .await - .expect("Failed to connect to Postgres"); - - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("connection error: {}", e); - } - }); + let endpoint = env::var("GT_PG_ADDR").unwrap_or_default(); + let client = create_postgres_client(&endpoint).await.unwrap(); let (tx, _) = broadcast::channel(100); + let leader_value = "test_leader".to_string(); let pg_election = PgElection { leader_value, client, @@ -449,6 +461,7 @@ mod tests { infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), + candidate_lease_ttl: 5, }; let candidates = pg_election.all_candidates().await.unwrap(); @@ -458,9 +471,19 @@ mod tests { handle.abort(); } - // Wait for the candidate lease to expire. + // Wait for the candidate leases to expire. tokio::time::sleep(Duration::from_secs(10)).await; let candidates = pg_election.all_candidates().await.unwrap(); assert!(candidates.is_empty()); + + // Garbage collection + for i in 0..10 { + let key = format!( + "{}{}{}{}", + "test_prefix", CANDIDATES_ROOT, leader_value_prefix, i + ); + let res = pg_election.delete_value(&key).await.unwrap(); + assert!(res); + } } } From 8ad864f3ee03907f91e3ddda3e82329ba37978c2 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 20 Dec 2024 18:13:15 +0800 Subject: [PATCH 04/23] chore: update pg env --- src/meta-srv/src/bootstrap.rs | 4 ++-- src/meta-srv/src/election/postgres.rs | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index b9804ad93cdb..78c28b6fbf96 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -48,6 +48,8 @@ use crate::election::etcd::EtcdElection; #[cfg(feature = "pg_kvbackend")] use crate::election::postgres::PgElection; #[cfg(feature = "pg_kvbackend")] +use crate::election::CANDIDATE_LEASE_SECS; +#[cfg(feature = "pg_kvbackend")] use crate::error::InvalidArgumentsSnafu; use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu}; use crate::metasrv::builder::MetasrvBuilder; @@ -57,8 +59,6 @@ use crate::selector::load_based::LoadBasedSelector; use crate::selector::round_robin::RoundRobinSelector; use crate::selector::SelectorType; use crate::service::admin; -#[cfg(feature = "pg_kvbackend")] -use crate::election::CANDIDATE_LEASE_SECS; use crate::{error, Result}; pub struct MetasrvInstance { diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index f9618c879261..52bf41825eca 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -317,7 +317,7 @@ mod tests { #[tokio::test] async fn temp_test() { - let endpoint = env::var("GT_PG_ADDR").unwrap_or_default(); + let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); let client = create_postgres_client(&endpoint).await.unwrap(); client .execute( @@ -335,7 +335,7 @@ mod tests { #[tokio::test] async fn test_postgres_crud() { - let endpoint = env::var("GT_PG_ADDR").unwrap_or_default(); + let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); let client = create_postgres_client(&endpoint).await.unwrap(); client .execute( @@ -414,7 +414,7 @@ mod tests { } async fn candidate(leader_value: String) { - let endpoint = env::var("GT_PG_ADDR").unwrap_or_default(); + let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); let client = create_postgres_client(&endpoint).await.unwrap(); let (tx, _) = broadcast::channel(100); @@ -449,7 +449,7 @@ mod tests { // Wait for candidates registrating themselves. tokio::time::sleep(Duration::from_secs(3)).await; - let endpoint = env::var("GT_PG_ADDR").unwrap_or_default(); + let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); let client = create_postgres_client(&endpoint).await.unwrap(); let (tx, _) = broadcast::channel(100); From 4ef916666ed14dbec4adb716fce79dc04af2f15e Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 20 Dec 2024 18:30:59 +0800 Subject: [PATCH 05/23] chore: make ci happy --- src/meta-srv/src/election/postgres.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 52bf41825eca..ff368aeb09d6 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -308,9 +308,7 @@ mod tests { .await .context(PostgresExecutionSnafu)?; tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("connection error: {}", e); - } + connection.await.context(PostgresExecutionSnafu).unwrap(); }); Ok(client) } From 30bab10438b7988adb28f4800f28470dc6da50db Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 20 Dec 2024 18:34:59 +0800 Subject: [PATCH 06/23] fix: spawn a background connection thread --- src/meta-srv/src/bootstrap.rs | 9 ++++++++- src/meta-srv/src/election/postgres.rs | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 78c28b6fbf96..5a4cd1284227 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -288,8 +288,15 @@ async fn create_postgres_client(opts: &MetasrvOptions) -> Result Date: Fri, 20 Dec 2024 19:16:01 +0800 Subject: [PATCH 07/23] chore: typo --- src/meta-srv/src/election.rs | 2 +- src/meta-srv/src/election/postgres.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index 05fe6446b882..2b8f011e986b 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -38,7 +38,7 @@ pub enum LeaderChangeMessage { } /// LeaderKey is a key that represents the leader of metasrv. -/// The structure is correponding to [etcd_client::LeaderKey]. +/// The structure is corresponding to [etcd_client::LeaderKey]. pub trait LeaderKey: Send + Sync + Debug { /// The name in byte. name is the election identifier that corresponds to the leadership key. fn name(&self) -> &[u8]; diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 9da2b1dd00dd..ce0b4b08f804 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -444,7 +444,7 @@ mod tests { let handle = tokio::spawn(candidate(leader_value.clone())); handles.push(handle); } - // Wait for candidates registrating themselves. + // Wait for candidates to registrate themselves. tokio::time::sleep(Duration::from_secs(3)).await; let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); From 12260fee7ea00553db4782669a1c787d80f8f4df Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 20 Dec 2024 19:36:49 +0800 Subject: [PATCH 08/23] fix: shadow the election client for now --- src/meta-srv/src/bootstrap.rs | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 5a4cd1284227..cdc9f579c2c2 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -45,13 +45,7 @@ use tonic::codec::CompressionEncoding; use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; -#[cfg(feature = "pg_kvbackend")] -use crate::election::postgres::PgElection; -#[cfg(feature = "pg_kvbackend")] -use crate::election::CANDIDATE_LEASE_SECS; -#[cfg(feature = "pg_kvbackend")] -use crate::error::InvalidArgumentsSnafu; -use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu}; +use crate::error::{InitExportMetricsTaskSnafu, InvalidArgumentsSnafu, TomlFormatSnafu}; use crate::metasrv::builder::MetasrvBuilder; use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef}; use crate::selector::lease_based::LeaseBasedSelector; @@ -231,15 +225,7 @@ pub async fn metasrv_builder( let kv_backend = PgStore::with_pg_client(pg_client) .await .context(error::KvBackendSnafu)?; - let election_client = create_postgres_client(opts).await?; - let election = PgElection::with_pg_client( - opts.server_addr.clone(), - election_client, - opts.store_key_prefix.clone(), - CANDIDATE_LEASE_SECS, - ) - .await?; - (kv_backend, Some(election)) + (kv_backend, None) } }; From ab945834fb0cde5d7fa35b5ab400bf55a24b46ae Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 20 Dec 2024 19:45:50 +0800 Subject: [PATCH 09/23] fix: fix ci --- src/meta-srv/src/bootstrap.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index cdc9f579c2c2..3dfa03d66774 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -45,7 +45,9 @@ use tonic::codec::CompressionEncoding; use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; -use crate::error::{InitExportMetricsTaskSnafu, InvalidArgumentsSnafu, TomlFormatSnafu}; +#[cfg(feature = "pg_kvbackend")] +use crate::error::InvalidArgumentsSnafu; +use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu}; use crate::metasrv::builder::MetasrvBuilder; use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef}; use crate::selector::lease_based::LeaseBasedSelector; From 5d263549e03519547e5febfdd7a268ba4e3b6eb1 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Sat, 21 Dec 2024 17:09:19 +0800 Subject: [PATCH 10/23] chore: readability --- src/meta-srv/src/election/postgres.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index ce0b4b08f804..36c6654b2d64 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -40,6 +40,7 @@ struct ValueWithLease { } /// PostgreSql implementation of Election. +/// TODO: Currently only support candidate registration. Add election logic. pub struct PgElection { leader_value: String, client: Client, @@ -47,7 +48,7 @@ pub struct PgElection { infancy: AtomicBool, leader_watcher: broadcast::Sender, store_key_prefix: String, - candidate_lease_ttl: u64, + candidate_lease_ttl_secs: u64, } impl PgElection { @@ -55,7 +56,7 @@ impl PgElection { leader_value: String, client: Client, store_key_prefix: String, - candidate_lease_ttl: u64, + candidate_lease_ttl_secs: u64, ) -> Result { let (tx, _) = broadcast::channel(100); Ok(Arc::new(Self { @@ -65,7 +66,7 @@ impl PgElection { infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix, - candidate_lease_ttl, + candidate_lease_ttl_secs, })) } @@ -112,7 +113,7 @@ impl Election for PgElection { .duration_since(time::UNIX_EPOCH) .unwrap_or_default() .as_secs_f64() - + self.candidate_lease_ttl as f64, + + self.candidate_lease_ttl_secs as f64, }; let res = self.put_value_with_lease(&key, &value_with_lease).await?; // May registered before, just update the lease. @@ -123,7 +124,7 @@ impl Election for PgElection { // Check if the current lease has expired and renew the lease. let mut keep_alive_interval = - tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl / 2)); + tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2)); loop { let _ = keep_alive_interval.tick().await; @@ -357,7 +358,7 @@ mod tests { infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), - candidate_lease_ttl: 10, + candidate_lease_ttl_secs: 10, }; let res = pg_election @@ -423,7 +424,7 @@ mod tests { infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), - candidate_lease_ttl: 10, + candidate_lease_ttl_secs: 10, }; let node_info = MetasrvNodeInfo { @@ -459,7 +460,7 @@ mod tests { infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), - candidate_lease_ttl: 5, + candidate_lease_ttl_secs: 5, }; let candidates = pg_election.all_candidates().await.unwrap(); From f66e9f6bd21fd4778864e94515eb0e4d8c2ce36f Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Mon, 23 Dec 2024 13:42:17 +0800 Subject: [PATCH 11/23] chore: follow review comments --- src/meta-srv/src/election.rs | 8 ++++---- src/meta-srv/src/election/etcd.rs | 24 +++++++++++++++--------- src/meta-srv/src/election/postgres.rs | 2 +- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index 2b8f011e986b..217f3abb1c3c 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -48,10 +48,10 @@ pub trait LeaderKey: Send + Sync + Debug { fn key(&self) -> &[u8]; /// The creation revision of the key. - fn rev(&self) -> i64; + fn rev_id(&self) -> i64; /// The lease ID of the election leader. - fn lease(&self) -> i64; + fn lease_id(&self) -> i64; } impl fmt::Display for LeaderChangeMessage { @@ -69,8 +69,8 @@ impl fmt::Display for LeaderChangeMessage { write!(f, "LeaderKey {{ ")?; write!(f, "name: {}", String::from_utf8_lossy(leader_key.name()))?; write!(f, ", key: {}", String::from_utf8_lossy(leader_key.key()))?; - write!(f, ", rev: {}", leader_key.rev())?; - write!(f, ", lease: {}", leader_key.lease())?; + write!(f, ", rev: {}", leader_key.rev_id())?; + write!(f, ", lease: {}", leader_key.lease_id())?; write!(f, " }})") } } diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index 274a6a7e4792..9d73cd3d3031 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -44,11 +44,11 @@ impl LeaderKey for EtcdLeaderKey { self.key() } - fn rev(&self) -> i64 { + fn rev_id(&self) -> i64 { self.rev() } - fn lease(&self) -> i64 { + fn lease_id(&self) -> i64 { self.lease() } } @@ -99,14 +99,14 @@ impl EtcdElection { info!( "[{leader_ident}] is elected as leader: {:?}, lease: {}", String::from_utf8_lossy(key.name()), - key.lease() + key.lease_id() ); } LeaderChangeMessage::StepDown(key) => { warn!( "[{leader_ident}] is stepping down: {:?}, lease: {}", String::from_utf8_lossy(key.name()), - key.lease() + key.lease_id() ); } }, @@ -282,9 +282,12 @@ impl Election for EtcdElection { .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) .is_ok() { - self.leader_watcher + if let Err(e) = self + .leader_watcher .send(LeaderChangeMessage::StepDown(Arc::new(leader.clone()))) - .context(error::SendLeaderChangeSnafu)?; + { + error!(e; "Failed to send leader change message"); + } } } @@ -339,9 +342,12 @@ impl EtcdElection { { self.infancy.store(true, Ordering::Relaxed); - self.leader_watcher - .send(LeaderChangeMessage::Elected(Arc::new(leader))) - .context(error::SendLeaderChangeSnafu)?; + if let Err(e) = self + .leader_watcher + .send(LeaderChangeMessage::Elected(Arc::new(leader.clone()))) + { + error!(e; "Failed to send leader change message"); + } } } diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 36c6654b2d64..19abe8f439d4 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -40,7 +40,7 @@ struct ValueWithLease { } /// PostgreSql implementation of Election. -/// TODO: Currently only support candidate registration. Add election logic. +/// TODO(CookiePie): Currently only support candidate registration. Add election logic. pub struct PgElection { leader_value: String, client: Client, From 4014428a0acbd700197dbef97921c63725ef6f95 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Mon, 23 Dec 2024 15:58:26 +0800 Subject: [PATCH 12/23] refactor: use kvbackend for pg election --- src/meta-srv/Cargo.toml | 2 +- src/meta-srv/src/election/postgres.rs | 132 +++++++++++--------------- src/meta-srv/src/error.rs | 10 -- 3 files changed, 59 insertions(+), 85 deletions(-) diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 71430ee992af..8b06126fe88e 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -6,7 +6,7 @@ license.workspace = true [features] mock = [] -pg_kvbackend = ["dep:tokio-postgres"] +pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"] [lints] workspace = true diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 19abe8f439d4..1dac4acbc642 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -16,19 +16,17 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{self, Duration}; -use common_meta::kv_backend::postgres::{ - CAS, POINT_DELETE, POINT_GET, PREFIX_SCAN, PUT_IF_NOT_EXISTS, -}; +use common_meta::kv_backend::KvBackendRef; +use common_meta::rpc::store::{CompareAndPutRequest, PutRequest, RangeRequest}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use tokio::sync::broadcast; -use tokio_postgres::Client; use crate::election::{ Election, LeaderChangeMessage, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, }; use crate::error::{ - DeserializeFromJsonSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, UnexpectedSnafu, + DeserializeFromJsonSnafu, KvBackendSnafu, Result, SerializeToJsonSnafu, UnexpectedSnafu, }; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; @@ -43,7 +41,7 @@ struct ValueWithLease { /// TODO(CookiePie): Currently only support candidate registration. Add election logic. pub struct PgElection { leader_value: String, - client: Client, + kv_backend: KvBackendRef, is_leader: AtomicBool, infancy: AtomicBool, leader_watcher: broadcast::Sender, @@ -54,14 +52,14 @@ pub struct PgElection { impl PgElection { pub async fn with_pg_client( leader_value: String, - client: Client, + kv_backend: KvBackendRef, store_key_prefix: String, candidate_lease_ttl_secs: u64, ) -> Result { let (tx, _) = broadcast::channel(100); Ok(Arc::new(Self { leader_value, - client, + kv_backend, is_leader: AtomicBool::new(false), infancy: AtomicBool::new(true), leader_watcher: tx, @@ -188,13 +186,13 @@ impl Election for PgElection { impl PgElection { async fn get_value_with_lease(&self, key: &String) -> Result> { let prev = self - .client - .query(POINT_GET, &[key]) + .kv_backend + .get(key.as_bytes()) .await - .context(PostgresExecutionSnafu)?; + .context(KvBackendSnafu)?; - if let Some(row) = prev.first() { - let value: String = row.get(1); + if let Some(kv) = prev { + let value: String = String::from_utf8_lossy(kv.value()).to_string(); let value_with_lease: ValueWithLease = serde_json::from_str(&value).with_context(|_| DeserializeFromJsonSnafu { input: format!("{value:?}"), @@ -209,24 +207,24 @@ impl PgElection { &self, key_prefix: &String, ) -> Result> { - let key_prefix = format!("{}%", key_prefix); - let prev = self - .client - .query(PREFIX_SCAN, &[&key_prefix]) + let range_request = RangeRequest::new().with_prefix(key_prefix.clone()); + let res = self + .kv_backend + .range(range_request) .await - .context(PostgresExecutionSnafu)?; + .context(KvBackendSnafu)?; - let mut res = Vec::new(); - for row in prev { - let value: String = row.get(1); + let mut value_with_leases = Vec::with_capacity(res.kvs.len()); + for kv in res.kvs { + let value: String = String::from_utf8_lossy(kv.value()).to_string(); let value_with_lease: ValueWithLease = serde_json::from_str(&value).with_context(|_| DeserializeFromJsonSnafu { input: format!("{value:?}"), })?; - res.push(value_with_lease); + value_with_leases.push(value_with_lease); } - Ok(res) + Ok(value_with_leases) } async fn update_value_with_lease( @@ -238,21 +236,23 @@ impl PgElection { let prev = serde_json::to_string(prev).with_context(|_| SerializeToJsonSnafu { input: format!("{prev:?}"), })?; - let updated = serde_json::to_string(updated).with_context(|_| SerializeToJsonSnafu { input: format!("{updated:?}"), })?; + let cas_request = CompareAndPutRequest::new() + .with_key(key.clone()) + .with_expect(updated) + .with_value(prev); let res = self - .client - .query(CAS, &[key, &prev, &updated]) + .kv_backend + .compare_and_put(cas_request) .await - .context(PostgresExecutionSnafu)?; + .context(KvBackendSnafu)?; - // CAS operation will return the updated value if the operation is successful - match res.is_empty() { - false => Ok(()), - true => UnexpectedSnafu { + match res.success { + true => Ok(()), + false => UnexpectedSnafu { violated: format!( "CAS operation failed, key: {:?}", String::from_utf8_lossy(&key.clone().into_bytes()) @@ -268,25 +268,26 @@ impl PgElection { input: format!("{value:?}"), })?; + let put_request = PutRequest::new().with_key(key.clone()).with_value(value); let res = self - .client - .query(PUT_IF_NOT_EXISTS, &[key, &value]) + .kv_backend + .put(put_request) .await - .context(PostgresExecutionSnafu)?; + .context(KvBackendSnafu)?; - Ok(res.is_empty()) + Ok(res.prev_kv.is_none()) } /// Returns `true` if the deletion is successful. /// Caution: Should only delete the key if the lease is expired. async fn delete_value(&self, key: &String) -> Result { let res = self - .client - .query(POINT_DELETE, &[key]) + .kv_backend + .delete(key.as_bytes(), false) .await - .context(PostgresExecutionSnafu)?; + .context(KvBackendSnafu)?; - Ok(res.len() == 1) + Ok(res.is_none()) } } @@ -294,18 +295,20 @@ impl PgElection { mod tests { use std::env; - use tokio_postgres::NoTls; + use common_meta::kv_backend::postgres::PgStore; + use tokio_postgres::{Client, NoTls}; use super::*; + use crate::error::PostgresExecutionSnafu; - async fn create_postgres_client(addr: &str) -> Result { - if addr.is_empty() { + async fn create_postgres_client(endpoint: &str) -> Result { + if endpoint.is_empty() { return UnexpectedSnafu { - violated: "Postgres address is empty".to_string(), + violated: "Postgres endpoint is empty".to_string(), } .fail(); } - let (client, connection) = tokio_postgres::connect(addr, NoTls) + let (client, connection) = tokio_postgres::connect(endpoint, NoTls) .await .context(PostgresExecutionSnafu)?; tokio::spawn(async move { @@ -314,35 +317,18 @@ mod tests { Ok(client) } - #[tokio::test] - async fn temp_test() { + async fn create_pg_kvbackend() -> Result { let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); - let client = create_postgres_client(&endpoint).await.unwrap(); - client - .execute( - "CREATE TABLE IF NOT EXISTS greptime_metakv(k varchar PRIMARY KEY, v varchar)", - &[], - ) - .await - .expect("Failed to create metadkv table"); - - client - .execute("DELETE FROM greptime_metakv", &[]) + let client = create_postgres_client(&endpoint).await?; + let kv_backend = PgStore::with_pg_client(client) .await - .expect("Failed to delete metakv table"); + .context(KvBackendSnafu)?; + Ok(kv_backend) } #[tokio::test] async fn test_postgres_crud() { - let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); - let client = create_postgres_client(&endpoint).await.unwrap(); - client - .execute( - "CREATE TABLE IF NOT EXISTS greptime_metakv(k varchar PRIMARY KEY, v varchar)", - &[], - ) - .await - .expect("Failed to create metadkv table"); + let kv_backend = create_pg_kvbackend().await.unwrap(); let key = "test_key".to_string(); let value = ValueWithLease { @@ -353,7 +339,7 @@ mod tests { let (tx, _) = broadcast::channel(100); let pg_election = PgElection { leader_value: "test_leader".to_string(), - client, + kv_backend, is_leader: AtomicBool::new(false), infancy: AtomicBool::new(true), leader_watcher: tx, @@ -413,13 +399,12 @@ mod tests { } async fn candidate(leader_value: String) { - let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); - let client = create_postgres_client(&endpoint).await.unwrap(); + let kv_backend = create_pg_kvbackend().await.unwrap(); let (tx, _) = broadcast::channel(100); let pg_election = PgElection { leader_value, - client, + kv_backend, is_leader: AtomicBool::new(false), infancy: AtomicBool::new(true), leader_watcher: tx, @@ -448,14 +433,13 @@ mod tests { // Wait for candidates to registrate themselves. tokio::time::sleep(Duration::from_secs(3)).await; - let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); - let client = create_postgres_client(&endpoint).await.unwrap(); + let kv_backend = create_pg_kvbackend().await.unwrap(); let (tx, _) = broadcast::channel(100); let leader_value = "test_leader".to_string(); let pg_election = PgElection { leader_value, - client, + kv_backend, is_leader: AtomicBool::new(false), infancy: AtomicBool::new(true), leader_watcher: tx, diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 60f04eccff8a..ddc9d3658bad 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -24,7 +24,6 @@ use table::metadata::TableId; use tokio::sync::mpsc::error::SendError; use tonic::codegen::http; -use crate::election::LeaderChangeMessage; use crate::metasrv::SelectTarget; use crate::pubsub::Message; @@ -720,14 +719,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to send leader change message"))] - SendLeaderChange { - #[snafu(implicit)] - location: Location, - #[snafu(source)] - error: tokio::sync::broadcast::error::SendError, - }, - #[snafu(display("Flow state handler error"))] FlowStateHandler { #[snafu(implicit)] @@ -776,7 +767,6 @@ impl ErrorExt for Error { | Error::StartGrpc { .. } | Error::NoEnoughAvailableNode { .. } | Error::PublishMessage { .. } - | Error::SendLeaderChange { .. } | Error::Join { .. } | Error::PeerUnavailable { .. } | Error::ExceededDeadline { .. } From b3d3ed836b0df65291ec3d24c6f0d02fd60f058e Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Mon, 23 Dec 2024 15:59:58 +0800 Subject: [PATCH 13/23] chore: rename --- src/meta-srv/src/election.rs | 4 ++-- src/meta-srv/src/election/etcd.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index 217f3abb1c3c..22222d6cc7bb 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -48,7 +48,7 @@ pub trait LeaderKey: Send + Sync + Debug { fn key(&self) -> &[u8]; /// The creation revision of the key. - fn rev_id(&self) -> i64; + fn revision(&self) -> i64; /// The lease ID of the election leader. fn lease_id(&self) -> i64; @@ -69,7 +69,7 @@ impl fmt::Display for LeaderChangeMessage { write!(f, "LeaderKey {{ ")?; write!(f, "name: {}", String::from_utf8_lossy(leader_key.name()))?; write!(f, ", key: {}", String::from_utf8_lossy(leader_key.key()))?; - write!(f, ", rev: {}", leader_key.rev_id())?; + write!(f, ", rev: {}", leader_key.revision())?; write!(f, ", lease: {}", leader_key.lease_id())?; write!(f, " }})") } diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index 9d73cd3d3031..edb72fddf8b7 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -44,7 +44,7 @@ impl LeaderKey for EtcdLeaderKey { self.key() } - fn rev_id(&self) -> i64 { + fn revision(&self) -> i64 { self.rev() } From b8ed37ecb1a1d597c0ffb189495d0f3b96ff46ea Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Mon, 23 Dec 2024 16:06:40 +0800 Subject: [PATCH 14/23] chore: make clippy happy --- src/meta-srv/src/election/postgres.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 1dac4acbc642..1c59103ca889 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -205,9 +205,9 @@ impl PgElection { async fn get_value_with_lease_by_prefix( &self, - key_prefix: &String, + key_prefix: &str, ) -> Result> { - let range_request = RangeRequest::new().with_prefix(key_prefix.clone()); + let range_request = RangeRequest::new().with_prefix(key_prefix); let res = self .kv_backend .range(range_request) @@ -229,7 +229,7 @@ impl PgElection { async fn update_value_with_lease( &self, - key: &String, + key: &str, prev: &ValueWithLease, updated: &ValueWithLease, ) -> Result<()> { @@ -241,7 +241,7 @@ impl PgElection { })?; let cas_request = CompareAndPutRequest::new() - .with_key(key.clone()) + .with_key(key) .with_expect(updated) .with_value(prev); let res = self @@ -255,7 +255,7 @@ impl PgElection { false => UnexpectedSnafu { violated: format!( "CAS operation failed, key: {:?}", - String::from_utf8_lossy(&key.clone().into_bytes()) + String::from_utf8_lossy(key.as_bytes()) ), } .fail(), @@ -263,12 +263,12 @@ impl PgElection { } /// Returns `true` if the insertion is successful - async fn put_value_with_lease(&self, key: &String, value: &ValueWithLease) -> Result { + async fn put_value_with_lease(&self, key: &str, value: &ValueWithLease) -> Result { let value = serde_json::to_string(value).with_context(|_| SerializeToJsonSnafu { input: format!("{value:?}"), })?; - let put_request = PutRequest::new().with_key(key.clone()).with_value(value); + let put_request = PutRequest::new().with_key(key).with_value(value); let res = self .kv_backend .put(put_request) From 1b77dd27f65290374fdd15d16cac8c691315f8a7 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Wed, 25 Dec 2024 17:44:05 +0800 Subject: [PATCH 15/23] refactor: use pg server time instead of local ones --- src/common/meta/src/kv_backend/postgres.rs | 6 +- src/meta-srv/Cargo.toml | 3 +- src/meta-srv/src/election/postgres.rs | 365 ++++++++++++--------- 3 files changed, 211 insertions(+), 163 deletions(-) diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index 23c506761a9d..aa95c0634f30 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -45,9 +45,9 @@ const METADKV_CREATION: &str = const FULL_TABLE_SCAN: &str = "SELECT k, v FROM greptime_metakv $1 ORDER BY K"; -pub const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1"; +const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1"; -pub const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K"; +const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K"; const RANGE_SCAN_LEFT_BOUNDED: &str = "SELECT k, v FROM greptime_metakv WHERE k >= $1 ORDER BY K"; @@ -56,7 +56,7 @@ const RANGE_SCAN_FULL_RANGE: &str = const FULL_TABLE_DELETE: &str = "DELETE FROM greptime_metakv RETURNING k,v"; -pub const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;"; +const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;"; const PREFIX_DELETE: &str = "DELETE FROM greptime_metakv WHERE k LIKE $1 RETURNING k,v;"; diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 8b06126fe88e..13975ff95091 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -14,6 +14,7 @@ workspace = true [dependencies] api.workspace = true async-trait = "0.1" +chrono.workspace = true clap.workspace = true client.workspace = true common-base.workspace = true @@ -55,7 +56,7 @@ snafu.workspace = true store-api.workspace = true table.workspace = true tokio.workspace = true -tokio-postgres = { workspace = true, optional = true } +tokio-postgres = { workspace = true, optional = true, features = ["with-chrono-0_4"] } tokio-stream = { workspace = true, features = ["net"] } toml.workspace = true tonic.workspace = true diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 1c59103ca889..0113327580a2 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -14,34 +14,79 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::{self, Duration}; +use std::time::Duration; -use common_meta::kv_backend::KvBackendRef; -use common_meta::rpc::store::{CompareAndPutRequest, PutRequest, RangeRequest}; -use serde::{Deserialize, Serialize}; +use common_time::Timestamp; +use itertools::Itertools; use snafu::{ensure, ResultExt}; use tokio::sync::broadcast; +use tokio_postgres::Client; -use crate::election::{ - Election, LeaderChangeMessage, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, -}; +use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY}; use crate::error::{ - DeserializeFromJsonSnafu, KvBackendSnafu, Result, SerializeToJsonSnafu, UnexpectedSnafu, + DeserializeFromJsonSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, UnexpectedSnafu, }; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; -/// Value with a expire time. The expire time is in seconds since UNIX epoch. -#[derive(Debug, Serialize, Deserialize, Default)] -struct ValueWithLease { - value: String, - expire_time: f64, +// Seperator between value and expire time. +const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#; + +// SQL to put a value with expire time. Parameters: key, value, lease_prefix, expire_time +const PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME: &str = r#" +WITH prev AS ( + SELECT k, v FROM greptime_metakv WHERE k = $1 +), insert AS ( + INSERT INTO greptime_metakv + VALUES($1, $2 || $3 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS')) + ON CONFLICT (k) DO NOTHING +) + +SELECT k, v FROM prev; +"#; + +// SQL to update a value with expire time. Parameters: key, prev_value_with_lease, updated_value, lease_prefix, expire_time +const CAS_WITH_EXPIRE_TIME: &str = r#" +UPDATE greptime_metakv +SET k=$1, +v=$3 || $4 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $5, 'YYYY-MM-DD HH24:MI:SS.MS') +WHERE + k=$1 AND v=$2 +"#; + +const GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM greptime_metakv WHERE k = $1"#; + +const PREFIX_GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM greptime_metakv WHERE k LIKE $1"#; + +const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE k = $1 RETURNING k,v;"; + +/// Parse the value and expire time from the given string. The value should be in the format "value || __metadata_lease_prefix || expire_time". +fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> { + if let Some((value, expire_time)) = value.split(LEASE_SEP).collect_tuple() { + // Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS' + let expire_time = match Timestamp::from_str(expire_time, None) { + Ok(ts) => ts, + Err(_) => UnexpectedSnafu { + violated: format!("Invalid timestamp: {}", expire_time), + } + .fail()?, + }; + Ok((value.to_string(), expire_time)) + } else { + UnexpectedSnafu { + violated: format!( + "Invalid value {}, expect node info || {} || expire time", + value, LEASE_SEP + ), + } + .fail() + } } /// PostgreSql implementation of Election. /// TODO(CookiePie): Currently only support candidate registration. Add election logic. pub struct PgElection { leader_value: String, - kv_backend: KvBackendRef, + client: Client, is_leader: AtomicBool, infancy: AtomicBool, leader_watcher: broadcast::Sender, @@ -52,14 +97,14 @@ pub struct PgElection { impl PgElection { pub async fn with_pg_client( leader_value: String, - kv_backend: KvBackendRef, + client: Client, store_key_prefix: String, candidate_lease_ttl_secs: u64, ) -> Result { let (tx, _) = broadcast::channel(100); Ok(Arc::new(Self { leader_value, - kv_backend, + client, is_leader: AtomicBool::new(false), infancy: AtomicBool::new(true), leader_watcher: tx, @@ -99,25 +144,18 @@ impl Election for PgElection { .is_ok() } + /// TODO(CookiePie): Split the candidate registration and keep alive logic into separate methods, so that upper layers can call them separately. async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> { let key = self.candidate_key(); let node_info = serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu { input: format!("{node_info:?}"), })?; - let value_with_lease = ValueWithLease { - value: node_info, - expire_time: time::SystemTime::now() - .duration_since(time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64() - + self.candidate_lease_ttl_secs as f64, - }; - let res = self.put_value_with_lease(&key, &value_with_lease).await?; + let res = self.put_value_with_lease(&key, &node_info).await?; // May registered before, just update the lease. if !res { self.delete_value(&key).await?; - self.put_value_with_lease(&key, &value_with_lease).await?; + self.put_value_with_lease(&key, &node_info).await?; } // Check if the current lease has expired and renew the lease. @@ -126,14 +164,13 @@ impl Election for PgElection { loop { let _ = keep_alive_interval.tick().await; - let prev = self.get_value_with_lease(&key).await?.unwrap_or_default(); - let now = time::SystemTime::now() - .duration_since(time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64(); + let (_, prev_expire_time, current_time, origin) = self + .get_value_with_lease(&key, true) + .await? + .unwrap_or_default(); ensure!( - prev.expire_time > now, + prev_expire_time > current_time, UnexpectedSnafu { violated: format!( "Candidate lease expired, key: {:?}", @@ -142,28 +179,23 @@ impl Election for PgElection { } ); - let updated = ValueWithLease { - value: prev.value.clone(), - expire_time: now + CANDIDATE_LEASE_SECS as f64, - }; - self.update_value_with_lease(&key, &prev, &updated).await?; + // Safety: origin is Some since we are using `get_value_with_lease` with `true`. + let origin = origin.unwrap(); + self.update_value_with_lease(&key, &origin, &node_info) + .await?; } } async fn all_candidates(&self) -> Result> { let key_prefix = self.candidate_root(); - let mut candidates = self.get_value_with_lease_by_prefix(&key_prefix).await?; - let now = time::SystemTime::now() - .duration_since(time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64(); + let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?; // Remove expired candidates - candidates.retain(|c| c.expire_time > now); + candidates.retain(|c| c.1 > current); let mut valid_candidates = Vec::with_capacity(candidates.len()); - for c in candidates { + for (c, _) in candidates { let node_info: MetasrvNodeInfo = - serde_json::from_str(&c.value).with_context(|_| DeserializeFromJsonSnafu { - input: format!("{:?}", c.value), + serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu { + input: format!("{:?}", c), })?; valid_candidates.push(node_info); } @@ -184,110 +216,132 @@ impl Election for PgElection { } impl PgElection { - async fn get_value_with_lease(&self, key: &String) -> Result> { - let prev = self - .kv_backend - .get(key.as_bytes()) + /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned. + async fn get_value_with_lease( + &self, + key: &String, + with_origin: bool, + ) -> Result)>> { + let res = self + .client + .query(GET_WITH_CURRENT_TIMESTAMP, &[&key]) .await - .context(KvBackendSnafu)?; + .context(PostgresExecutionSnafu)?; - if let Some(kv) = prev { - let value: String = String::from_utf8_lossy(kv.value()).to_string(); - let value_with_lease: ValueWithLease = - serde_json::from_str(&value).with_context(|_| DeserializeFromJsonSnafu { - input: format!("{value:?}"), - })?; - Ok(Some(value_with_lease)) - } else { + if res.is_empty() { Ok(None) + } else { + let current_time_str = res[0].get(1); + let current_time = match Timestamp::from_str(current_time_str, None) { + Ok(ts) => ts, + Err(_) => UnexpectedSnafu { + violated: format!("Invalid timestamp: {}", current_time_str), + } + .fail()?, + }; + + let value_and_expire_time = res[0].get(0); + let (value, expire_time) = parse_value_and_expire_time(value_and_expire_time)?; + + if with_origin { + Ok(Some(( + value, + expire_time, + current_time, + Some(value_and_expire_time.to_string()), + ))) + } else { + Ok(Some((value, expire_time, current_time, None))) + } } } + /// Returns all values and expire time with the given key prefix. Also returns the current time. async fn get_value_with_lease_by_prefix( &self, key_prefix: &str, - ) -> Result> { - let range_request = RangeRequest::new().with_prefix(key_prefix); + ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> { + let key_prefix = format!("{}%", key_prefix); let res = self - .kv_backend - .range(range_request) + .client + .query(PREFIX_GET_WITH_CURRENT_TIMESTAMP, &[&key_prefix]) .await - .context(KvBackendSnafu)?; - - let mut value_with_leases = Vec::with_capacity(res.kvs.len()); - for kv in res.kvs { - let value: String = String::from_utf8_lossy(kv.value()).to_string(); - let value_with_lease: ValueWithLease = - serde_json::from_str(&value).with_context(|_| DeserializeFromJsonSnafu { - input: format!("{value:?}"), - })?; - value_with_leases.push(value_with_lease); - } + .context(PostgresExecutionSnafu)?; + + let mut values_with_leases = vec![]; + let mut current = Timestamp::default(); + for row in res { + let current_time_str = row.get(1); + current = match Timestamp::from_str(current_time_str, None) { + Ok(ts) => ts, + Err(_) => UnexpectedSnafu { + violated: format!("Invalid timestamp: {}", current_time_str), + } + .fail()?, + }; + + let value_and_expire_time = row.get(0); + let (value, expire_time) = parse_value_and_expire_time(value_and_expire_time)?; - Ok(value_with_leases) + values_with_leases.push((value, expire_time)); + } + Ok((values_with_leases, current)) } - async fn update_value_with_lease( - &self, - key: &str, - prev: &ValueWithLease, - updated: &ValueWithLease, - ) -> Result<()> { - let prev = serde_json::to_string(prev).with_context(|_| SerializeToJsonSnafu { - input: format!("{prev:?}"), - })?; - let updated = serde_json::to_string(updated).with_context(|_| SerializeToJsonSnafu { - input: format!("{updated:?}"), - })?; - - let cas_request = CompareAndPutRequest::new() - .with_key(key) - .with_expect(updated) - .with_value(prev); + async fn update_value_with_lease(&self, key: &str, prev: &str, updated: &str) -> Result<()> { let res = self - .kv_backend - .compare_and_put(cas_request) + .client + .execute( + CAS_WITH_EXPIRE_TIME, + &[ + &key, + &prev, + &updated, + &LEASE_SEP, + &(self.candidate_lease_ttl_secs as f64), + ], + ) .await - .context(KvBackendSnafu)?; - - match res.success { - true => Ok(()), - false => UnexpectedSnafu { - violated: format!( - "CAS operation failed, key: {:?}", - String::from_utf8_lossy(key.as_bytes()) - ), + .context(PostgresExecutionSnafu)?; + + ensure!( + res == 1, + UnexpectedSnafu { + violated: format!("Failed to update key: {}", key), } - .fail(), - } + ); + + Ok(()) } /// Returns `true` if the insertion is successful - async fn put_value_with_lease(&self, key: &str, value: &ValueWithLease) -> Result { - let value = serde_json::to_string(value).with_context(|_| SerializeToJsonSnafu { - input: format!("{value:?}"), - })?; - - let put_request = PutRequest::new().with_key(key).with_value(value); + async fn put_value_with_lease(&self, key: &str, value: &str) -> Result { let res = self - .kv_backend - .put(put_request) + .client + .query( + PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME, + &[ + &key, + &value, + &LEASE_SEP, + &(self.candidate_lease_ttl_secs as f64), + ], + ) .await - .context(KvBackendSnafu)?; - - Ok(res.prev_kv.is_none()) + .context(PostgresExecutionSnafu)?; + Ok(res.is_empty()) } /// Returns `true` if the deletion is successful. /// Caution: Should only delete the key if the lease is expired. async fn delete_value(&self, key: &String) -> Result { let res = self - .kv_backend - .delete(key.as_bytes(), false) + .client + .query(POINT_DELETE, &[&key]) .await - .context(KvBackendSnafu)?; + .context(PostgresExecutionSnafu)?; - Ok(res.is_none()) + Ok(res.len() == 1) } } @@ -295,20 +349,20 @@ impl PgElection { mod tests { use std::env; - use common_meta::kv_backend::postgres::PgStore; use tokio_postgres::{Client, NoTls}; use super::*; use crate::error::PostgresExecutionSnafu; - async fn create_postgres_client(endpoint: &str) -> Result { + async fn create_postgres_client() -> Result { + let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); if endpoint.is_empty() { return UnexpectedSnafu { violated: "Postgres endpoint is empty".to_string(), } .fail(); } - let (client, connection) = tokio_postgres::connect(endpoint, NoTls) + let (client, connection) = tokio_postgres::connect(&endpoint, NoTls) .await .context(PostgresExecutionSnafu)?; tokio::spawn(async move { @@ -317,29 +371,17 @@ mod tests { Ok(client) } - async fn create_pg_kvbackend() -> Result { - let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); - let client = create_postgres_client(&endpoint).await?; - let kv_backend = PgStore::with_pg_client(client) - .await - .context(KvBackendSnafu)?; - Ok(kv_backend) - } - #[tokio::test] async fn test_postgres_crud() { - let kv_backend = create_pg_kvbackend().await.unwrap(); + let client = create_postgres_client().await.unwrap(); let key = "test_key".to_string(); - let value = ValueWithLease { - value: "test_value".to_string(), - expire_time: 0.0, - }; + let value = "test_value".to_string(); let (tx, _) = broadcast::channel(100); let pg_election = PgElection { leader_value: "test_leader".to_string(), - kv_backend, + client, is_leader: AtomicBool::new(false), infancy: AtomicBool::new(true), leader_watcher: tx, @@ -353,25 +395,28 @@ mod tests { .unwrap(); assert!(res); - let res = pg_election - .get_value_with_lease(&key) + let (value, _, _, prev) = pg_election + .get_value_with_lease(&key, true) .await .unwrap() .unwrap(); - assert_eq!(res.value, value.value); + assert_eq!(value, value); + + let prev = prev.unwrap(); + pg_election + .update_value_with_lease(&key, &prev, &value) + .await + .unwrap(); let res = pg_election.delete_value(&key).await.unwrap(); assert!(res); - let res = pg_election.get_value_with_lease(&key).await.unwrap(); + let res = pg_election.get_value_with_lease(&key, false).await.unwrap(); assert!(res.is_none()); for i in 0..10 { let key = format!("test_key_{}", i); - let value = ValueWithLease { - value: format!("test_value_{}", i), - expire_time: 0.0, - }; + let value = format!("test_value_{}", i); pg_election .put_value_with_lease(&key, &value) .await @@ -379,7 +424,7 @@ mod tests { } let key_prefix = "test_key".to_string(); - let res = pg_election + let (res, _) = pg_election .get_value_with_lease_by_prefix(&key_prefix) .await .unwrap(); @@ -391,25 +436,26 @@ mod tests { assert!(res); } - let res = pg_election + let (res, current) = pg_election .get_value_with_lease_by_prefix(&key_prefix) .await .unwrap(); assert!(res.is_empty()); + assert!(current == Timestamp::default()); } - async fn candidate(leader_value: String) { - let kv_backend = create_pg_kvbackend().await.unwrap(); + async fn candidate(leader_value: String, candidate_lease_ttl_secs: u64) { + let client = create_postgres_client().await.unwrap(); let (tx, _) = broadcast::channel(100); let pg_election = PgElection { leader_value, - kv_backend, + client, is_leader: AtomicBool::new(false), infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), - candidate_lease_ttl_secs: 10, + candidate_lease_ttl_secs, }; let node_info = MetasrvNodeInfo { @@ -424,27 +470,28 @@ mod tests { #[tokio::test] async fn test_candidate_registration() { let leader_value_prefix = "test_leader".to_string(); + let candidate_lease_ttl_secs = 5; let mut handles = vec![]; for i in 0..10 { let leader_value = format!("{}{}", leader_value_prefix, i); - let handle = tokio::spawn(candidate(leader_value.clone())); + let handle = tokio::spawn(candidate(leader_value, candidate_lease_ttl_secs)); handles.push(handle); } - // Wait for candidates to registrate themselves. - tokio::time::sleep(Duration::from_secs(3)).await; + // Wait for candidates to registrate themselves and renew their leases at least once. + tokio::time::sleep(Duration::from_secs(6)).await; - let kv_backend = create_pg_kvbackend().await.unwrap(); + let client = create_postgres_client().await.unwrap(); let (tx, _) = broadcast::channel(100); let leader_value = "test_leader".to_string(); let pg_election = PgElection { leader_value, - kv_backend, + client, is_leader: AtomicBool::new(false), infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), - candidate_lease_ttl_secs: 5, + candidate_lease_ttl_secs, }; let candidates = pg_election.all_candidates().await.unwrap(); @@ -455,7 +502,7 @@ mod tests { } // Wait for the candidate leases to expire. - tokio::time::sleep(Duration::from_secs(10)).await; + tokio::time::sleep(Duration::from_secs(5)).await; let candidates = pg_election.all_candidates().await.unwrap(); assert!(candidates.is_empty()); From ec5dac3125ad83bc6440f3ea9a6a8bb0eec656c3 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Wed, 25 Dec 2024 17:54:20 +0800 Subject: [PATCH 16/23] chore: typo --- src/meta-srv/src/election/postgres.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 0113327580a2..f9d80e221384 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -28,7 +28,7 @@ use crate::error::{ }; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; -// Seperator between value and expire time. +// Separator between value and expire time. const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#; // SQL to put a value with expire time. Parameters: key, value, lease_prefix, expire_time From 12d225103322479633015d13134141dd1aa9eb9f Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Wed, 25 Dec 2024 18:04:15 +0800 Subject: [PATCH 17/23] chore: rename infancy to leader_infancy for clarification --- src/meta-srv/src/election.rs | 2 +- src/meta-srv/src/election/etcd.rs | 2 +- src/meta-srv/src/election/postgres.rs | 14 +++++++------- .../src/handler/on_leader_start_handler.rs | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index 22222d6cc7bb..232b1481a921 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -87,7 +87,7 @@ pub trait Election: Send + Sync { /// initialization operations can be performed. /// /// note: a new leader will only return true on the first call. - fn in_infancy(&self) -> bool; + fn in_leader_infancy(&self) -> bool; /// Registers a candidate for the election. async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>; diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index edb72fddf8b7..617cb3248296 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -149,7 +149,7 @@ impl Election for EtcdElection { self.is_leader.load(Ordering::Relaxed) } - fn in_infancy(&self) -> bool { + fn in_leader_infancy(&self) -> bool { self.infancy .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) .is_ok() diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index f9d80e221384..9d68821db92d 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -88,7 +88,7 @@ pub struct PgElection { leader_value: String, client: Client, is_leader: AtomicBool, - infancy: AtomicBool, + leader_infancy: AtomicBool, leader_watcher: broadcast::Sender, store_key_prefix: String, candidate_lease_ttl_secs: u64, @@ -106,7 +106,7 @@ impl PgElection { leader_value, client, is_leader: AtomicBool::new(false), - infancy: AtomicBool::new(true), + leader_infancy: AtomicBool::new(false), leader_watcher: tx, store_key_prefix, candidate_lease_ttl_secs, @@ -138,8 +138,8 @@ impl Election for PgElection { self.is_leader.load(Ordering::Relaxed) } - fn in_infancy(&self) -> bool { - self.infancy + fn in_leader_infancy(&self) -> bool { + self.leader_infancy .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) .is_ok() } @@ -383,7 +383,7 @@ mod tests { leader_value: "test_leader".to_string(), client, is_leader: AtomicBool::new(false), - infancy: AtomicBool::new(true), + leader_infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), candidate_lease_ttl_secs: 10, @@ -452,7 +452,7 @@ mod tests { leader_value, client, is_leader: AtomicBool::new(false), - infancy: AtomicBool::new(true), + leader_infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), candidate_lease_ttl_secs, @@ -488,7 +488,7 @@ mod tests { leader_value, client, is_leader: AtomicBool::new(false), - infancy: AtomicBool::new(true), + leader_infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: "test_prefix".to_string(), candidate_lease_ttl_secs, diff --git a/src/meta-srv/src/handler/on_leader_start_handler.rs b/src/meta-srv/src/handler/on_leader_start_handler.rs index dccb8d3d60f9..97e1704343b8 100644 --- a/src/meta-srv/src/handler/on_leader_start_handler.rs +++ b/src/meta-srv/src/handler/on_leader_start_handler.rs @@ -36,7 +36,7 @@ impl HeartbeatHandler for OnLeaderStartHandler { return Ok(HandleControl::Continue); }; - if election.in_infancy() { + if election.in_leader_infancy() { ctx.is_infancy = true; // TODO(weny): Unifies the multiple leader state between Context and Metasrv. // we can't ensure the in-memory kv has already been reset in the outside loop. From 98eeaca1595fc4a8b42ea38cf21e0f58e766d7d5 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 26 Dec 2024 10:47:29 +0800 Subject: [PATCH 18/23] chore: clean up --- src/common/meta/src/kv_backend/postgres.rs | 4 ++-- src/meta-srv/src/election/etcd.rs | 2 +- src/meta-srv/src/election/postgres.rs | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index aa95c0634f30..8add65cd49c4 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -65,7 +65,7 @@ const RANGE_DELETE_LEFT_BOUNDED: &str = "DELETE FROM greptime_metakv WHERE k >= const RANGE_DELETE_FULL_RANGE: &str = "DELETE FROM greptime_metakv WHERE k >= $1 AND K < $2 RETURNING k,v;"; -pub const CAS: &str = r#" +const CAS: &str = r#" WITH prev AS ( SELECT k,v FROM greptime_metakv WHERE k = $1 AND v = $2 ), update AS ( @@ -79,7 +79,7 @@ WHERE SELECT k, v FROM prev; "#; -pub const PUT_IF_NOT_EXISTS: &str = r#" +const PUT_IF_NOT_EXISTS: &str = r#" WITH prev AS ( select k,v from greptime_metakv where k = $1 ), insert AS ( diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index 617cb3248296..5f2cf3342007 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -344,7 +344,7 @@ impl EtcdElection { if let Err(e) = self .leader_watcher - .send(LeaderChangeMessage::Elected(Arc::new(leader.clone()))) + .send(LeaderChangeMessage::Elected(Arc::new(leader))) { error!(e; "Failed to send leader change message"); } diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 9d68821db92d..1a2cf8f7e715 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -31,7 +31,7 @@ use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; // Separator between value and expire time. const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#; -// SQL to put a value with expire time. Parameters: key, value, lease_prefix, expire_time +// SQL to put a value with expire time. Parameters: key, value, LEASE_SEP, expire_time const PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME: &str = r#" WITH prev AS ( SELECT k, v FROM greptime_metakv WHERE k = $1 @@ -44,7 +44,7 @@ WITH prev AS ( SELECT k, v FROM prev; "#; -// SQL to update a value with expire time. Parameters: key, prev_value_with_lease, updated_value, lease_prefix, expire_time +// SQL to update a value with expire time. Parameters: key, prev_value_with_lease, updated_value, LEASE_SEP, expire_time const CAS_WITH_EXPIRE_TIME: &str = r#" UPDATE greptime_metakv SET k=$1, @@ -59,7 +59,7 @@ const PREFIX_GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIM const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE k = $1 RETURNING k,v;"; -/// Parse the value and expire time from the given string. The value should be in the format "value || __metadata_lease_prefix || expire_time". +/// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time". fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> { if let Some((value, expire_time)) = value.split(LEASE_SEP).collect_tuple() { // Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS' From c03ab3cebcc0635a7b5294ab484f529bbddac253 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 26 Dec 2024 13:27:34 +0800 Subject: [PATCH 19/23] chore: follow review comments --- src/meta-srv/src/election/postgres.rs | 37 +++++++++++++-------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 1a2cf8f7e715..e66e2f79d767 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -18,7 +18,7 @@ use std::time::Duration; use common_time::Timestamp; use itertools::Itertools; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::broadcast; use tokio_postgres::Client; @@ -61,25 +61,24 @@ const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE k = $1 RETURNING k /// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time". fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> { - if let Some((value, expire_time)) = value.split(LEASE_SEP).collect_tuple() { - // Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS' - let expire_time = match Timestamp::from_str(expire_time, None) { - Ok(ts) => ts, - Err(_) => UnexpectedSnafu { - violated: format!("Invalid timestamp: {}", expire_time), - } - .fail()?, - }; - Ok((value.to_string(), expire_time)) - } else { - UnexpectedSnafu { + let (value, expire_time) = value + .split(LEASE_SEP) + .collect_tuple() + .context(UnexpectedSnafu { violated: format!( "Invalid value {}, expect node info || {} || expire time", value, LEASE_SEP ), + })?; + // Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS' + let expire_time = match Timestamp::from_str(expire_time, None) { + Ok(ts) => ts, + Err(_) => UnexpectedSnafu { + violated: format!("Invalid timestamp: {}", expire_time), } - .fail() - } + .fail()?, + }; + Ok((value.to_string(), expire_time)) } /// PostgreSql implementation of Election. @@ -130,10 +129,6 @@ impl PgElection { impl Election for PgElection { type Leader = LeaderValue; - fn subscribe_leader_change(&self) -> broadcast::Receiver { - self.leader_watcher.subscribe() - } - fn is_leader(&self) -> bool { self.is_leader.load(Ordering::Relaxed) } @@ -213,6 +208,10 @@ impl Election for PgElection { async fn resign(&self) -> Result<()> { todo!() } + + fn subscribe_leader_change(&self) -> broadcast::Receiver { + self.leader_watcher.subscribe() + } } impl PgElection { From a82b7d9b41181e6f391a204adb70cbef292e9e15 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 26 Dec 2024 15:04:06 +0800 Subject: [PATCH 20/23] chore: follow review comments --- src/common/meta/src/kv_backend/postgres.rs | 7 ++++++- src/meta-srv/src/bootstrap.rs | 9 +++++---- src/meta-srv/src/election/postgres.rs | 3 ++- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index 8add65cd49c4..f67f527871ea 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -16,6 +16,7 @@ use std::any::Any; use std::borrow::Cow; use std::sync::Arc; +use common_telemetry::error; use snafu::ResultExt; use tokio_postgres::types::ToSql; use tokio_postgres::{Client, NoTls}; @@ -97,7 +98,11 @@ impl PgStore { let (client, conn) = tokio_postgres::connect(url, NoTls) .await .context(ConnectPostgresSnafu)?; - tokio::spawn(async move { conn.await.context(ConnectPostgresSnafu) }); + tokio::spawn(async move { + if let Err(e) = conn.await { + error!(e; "connection error"); + } + }); Self::with_pg_client(client).await } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 3dfa03d66774..b4408db7acde 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -26,6 +26,8 @@ use common_meta::kv_backend::memory::MemoryKvBackend; #[cfg(feature = "pg_kvbackend")] use common_meta::kv_backend::postgres::PgStore; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; +#[cfg(feature = "pg_kvbackend")] +use common_telemetry::error; use common_telemetry::info; use etcd_client::Client; use futures::future; @@ -281,10 +283,9 @@ async fn create_postgres_client(opts: &MetasrvOptions) -> Result ts, @@ -238,7 +239,7 @@ impl PgElection { } .fail()?, }; - + // Safety: Checked if res is empty above. let value_and_expire_time = res[0].get(0); let (value, expire_time) = parse_value_and_expire_time(value_and_expire_time)?; From c44e6b9cc886ccda3e4aadda6e5d22798f7d8290 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 26 Dec 2024 16:18:34 +0800 Subject: [PATCH 21/23] ci: unit test should test all features --- .github/workflows/develop.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 8939453f9dd9..e5c401f99b77 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -697,7 +697,7 @@ jobs: working-directory: tests-integration/fixtures/postgres run: docker compose -f docker-compose-standalone.yml up -d --wait - name: Run nextest cases - run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard + run: cargo llvm-cov nextest --features all --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard env: CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld" RUST_BACKTRACE: 1 From 9273f4c1b307dfee0146b7df708871c2db905558 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 26 Dec 2024 16:23:38 +0800 Subject: [PATCH 22/23] ci: fix --- .github/workflows/develop.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index e5c401f99b77..4995b3734217 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -697,7 +697,7 @@ jobs: working-directory: tests-integration/fixtures/postgres run: docker compose -f docker-compose-standalone.yml up -d --wait - name: Run nextest cases - run: cargo llvm-cov nextest --features all --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard + run: cargo llvm-cov nextest --features all-features --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard env: CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld" RUST_BACKTRACE: 1 From fe747c308080cbab38e1c0f5c59a1190dee738fe Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Thu, 26 Dec 2024 16:25:53 +0800 Subject: [PATCH 23/23] ci: just test pg --- .github/workflows/develop.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 4995b3734217..e901dcb721bd 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -697,7 +697,7 @@ jobs: working-directory: tests-integration/fixtures/postgres run: docker compose -f docker-compose-standalone.yml up -d --wait - name: Run nextest cases - run: cargo llvm-cov nextest --features all-features --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard + run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard -F pg_kvbackend env: CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld" RUST_BACKTRACE: 1