Skip to content

Commit

Permalink
feat: init PgElection
Browse files Browse the repository at this point in the history
fix: release advisory lock

fix: handle duplicate keys

chore: update comments

fix: unlock if acquired the lock

chore: add TODO and avoid unwrap

refactor: check both lock and expire time, add more comments

chore: fmt

fix: deal with multiple edge cases

feat: init PgElection with candidate registration

chore: fmt

chore: remove
  • Loading branch information
CookiePieWw committed Dec 20, 2024
1 parent a578eea commit 089a04b
Show file tree
Hide file tree
Showing 6 changed files with 380 additions and 32 deletions.
10 changes: 5 additions & 5 deletions src/common/meta/src/kv_backend/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ const METADKV_CREATION: &str =

const FULL_TABLE_SCAN: &str = "SELECT k, v FROM greptime_metakv $1 ORDER BY K";

const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1";
pub const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1";

const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K";
pub const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K";

const RANGE_SCAN_LEFT_BOUNDED: &str = "SELECT k, v FROM greptime_metakv WHERE k >= $1 ORDER BY K";

Expand All @@ -56,7 +56,7 @@ const RANGE_SCAN_FULL_RANGE: &str =

const FULL_TABLE_DELETE: &str = "DELETE FROM greptime_metakv RETURNING k,v";

const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;";
pub const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;";

const PREFIX_DELETE: &str = "DELETE FROM greptime_metakv WHERE k LIKE $1 RETURNING k,v;";

Expand All @@ -65,7 +65,7 @@ const RANGE_DELETE_LEFT_BOUNDED: &str = "DELETE FROM greptime_metakv WHERE k >=
const RANGE_DELETE_FULL_RANGE: &str =
"DELETE FROM greptime_metakv WHERE k >= $1 AND K < $2 RETURNING k,v;";

const CAS: &str = r#"
pub const CAS: &str = r#"
WITH prev AS (
SELECT k,v FROM greptime_metakv WHERE k = $1 AND v = $2
), update AS (
Expand All @@ -79,7 +79,7 @@ WHERE
SELECT k, v FROM prev;
"#;

const PUT_IF_NOT_EXISTS: &str = r#"
pub const PUT_IF_NOT_EXISTS: &str = r#"
WITH prev AS (
select k,v from greptime_metakv where k = $1
), insert AS (
Expand Down
16 changes: 13 additions & 3 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ use tonic::transport::server::{Router, TcpIncoming};

use crate::election::etcd::EtcdElection;
#[cfg(feature = "pg_kvbackend")]
use crate::election::postgres::PgElection;
#[cfg(feature = "pg_kvbackend")]
use crate::error::InvalidArgumentsSnafu;
use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu};
use crate::metasrv::builder::MetasrvBuilder;
Expand Down Expand Up @@ -224,9 +226,17 @@ pub async fn metasrv_builder(
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
let pg_client = create_postgres_client(opts).await?;
let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap();
// TODO(jeremy, weny): implement election for postgres
(kv_backend, None)
let kv_backend = PgStore::with_pg_client(pg_client)
.await
.context(error::KvBackendSnafu)?;
let election_client = create_postgres_client(opts).await?;
let election = PgElection::with_pg_client(
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
)
.await?;
(kv_backend, Some(election))
}
};

Expand Down
19 changes: 15 additions & 4 deletions src/meta-srv/src/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
// limitations under the License.

pub mod etcd;
#[cfg(feature = "pg_kvbackend")]
pub mod postgres;

use std::fmt;
use std::fmt::{self, Debug};
use std::sync::Arc;

use etcd_client::LeaderKey;
use tokio::sync::broadcast::Receiver;

use crate::error::Result;
Expand All @@ -26,10 +27,20 @@ use crate::metasrv::MetasrvNodeInfo;
pub const ELECTION_KEY: &str = "__metasrv_election";
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

/// Lease duration, in seconds, requested for a candidate registration; the etcd
/// election passes this value to the lease `grant` call in `register_candidate`.
const CANDIDATE_LEASE_SECS: u64 = 600;
/// Half of `CANDIDATE_LEASE_SECS`. Presumably the interval between keep-alive
/// requests for the candidate lease — confirm against the call sites.
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;

/// Leadership transition event that an election sends through its leader watcher
/// channel: `Elected` when the node becomes leader, `StepDown` when it stops being
/// leader. Each variant carries the shared leader key the transition refers to.
#[derive(Debug, Clone)]
pub enum LeaderChangeMessage {
Elected(Arc<LeaderKey>),
StepDown(Arc<LeaderKey>),
Elected(Arc<dyn LeaderKey>),
StepDown(Arc<dyn LeaderKey>),
}

/// Backend-agnostic view of the key that identifies an election leader.
///
/// `LeaderChangeMessage` carries values of this trait as `Arc<dyn LeaderKey>`, so
/// implementors must be shareable across threads and printable for diagnostics.
/// The accessor set mirrors the one exposed by `etcd_client::LeaderKey`, which is
/// one of the implementors.
pub trait LeaderKey: Debug + Send + Sync {
    /// Raw bytes of the election name. Callers that log it decode it lossily, so
    /// it is not required to be valid UTF-8.
    fn name(&self) -> &[u8];

    /// Raw bytes of the leader key itself.
    fn key(&self) -> &[u8];

    /// Revision associated with the key.
    /// NOTE(review): presumably the creation revision, as in etcd — confirm for
    /// other backends.
    fn rev(&self) -> i64;

    /// Identifier of the lease associated with the key.
    fn lease(&self) -> i64;
}

impl fmt::Display for LeaderChangeMessage {
Expand Down
54 changes: 34 additions & 20 deletions src/meta-srv/src/election/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,41 @@ use std::time::Duration;

use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, info, warn};
use etcd_client::{Client, GetOptions, LeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions};
use etcd_client::{
Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions,
};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio::time::{timeout, MissedTickBehavior};

use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
use crate::election::{
Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY,
KEEP_ALIVE_INTERVAL_SECS,
};
use crate::error;
use crate::error::Result;
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};

impl LeaderKey for EtcdLeaderKey {
fn name(&self) -> &[u8] {
self.name()
}

fn key(&self) -> &[u8] {
self.key()
}

fn rev(&self) -> i64 {
self.rev()
}

fn lease(&self) -> i64 {
self.lease()
}
}

pub struct EtcdElection {
leader_value: String,
client: Client,
Expand Down Expand Up @@ -75,14 +98,14 @@ impl EtcdElection {
LeaderChangeMessage::Elected(key) => {
info!(
"[{leader_ident}] is elected as leader: {:?}, lease: {}",
key.name_str(),
String::from_utf8_lossy(key.name()),
key.lease()
);
}
LeaderChangeMessage::StepDown(key) => {
warn!(
"[{leader_ident}] is stepping down: {:?}, lease: {}",
key.name_str(),
String::from_utf8_lossy(key.name()),
key.lease()
);
}
Expand Down Expand Up @@ -133,9 +156,6 @@ impl Election for EtcdElection {
}

async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
const CANDIDATE_LEASE_SECS: u64 = 600;
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;

let mut lease_client = self.client.lease_client();
let res = lease_client
.grant(CANDIDATE_LEASE_SECS as i64, None)
Expand Down Expand Up @@ -239,7 +259,7 @@ impl Election for EtcdElection {
// The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`.
match timeout(
keep_lease_duration,
self.keep_alive(&mut keeper, &mut receiver, leader),
self.keep_alive(&mut keeper, &mut receiver, leader.clone()),
)
.await
{
Expand All @@ -262,12 +282,9 @@ impl Election for EtcdElection {
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
if let Err(e) = self
.leader_watcher
self.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
.context(error::SendLeaderChangeSnafu)?;
}
}

Expand Down Expand Up @@ -303,7 +320,7 @@ impl EtcdElection {
&self,
keeper: &mut LeaseKeeper,
receiver: &mut LeaseKeepAliveStream,
leader: &LeaderKey,
leader: EtcdLeaderKey,
) -> Result<()> {
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
Expand All @@ -322,12 +339,9 @@ impl EtcdElection {
{
self.infancy.store(true, Ordering::Relaxed);

if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
self.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader)))
.context(error::SendLeaderChangeSnafu)?;
}
}

Expand Down
Loading

0 comments on commit 089a04b

Please sign in to comment.