Skip to content

Commit

Permalink
fix: handle duplicate keys
Browse files Browse the repository at this point in the history
  • Loading branch information
CookiePieWw committed Dec 19, 2024
1 parent 903865d commit 7dbfb5a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/common/meta/src/kv_backend/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const RANGE_SCAN_FULL_RANGE: &str =

const FULL_TABLE_DELETE: &str = "DELETE FROM greptime_metakv RETURNING k,v";

const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;";
pub const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;";

const PREFIX_DELETE: &str = "DELETE FROM greptime_metakv WHERE k LIKE $1 RETURNING k,v;";

Expand Down
36 changes: 33 additions & 3 deletions src/meta-srv/src/election/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use std::sync::Arc;
use std::time::{self, Duration};

use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_meta::kv_backend::postgres::{CAS, POINT_GET, PREFIX_SCAN, PUT_IF_NOT_EXISTS};
use common_meta::kv_backend::postgres::{
CAS, POINT_DELETE, POINT_GET, PREFIX_SCAN, PUT_IF_NOT_EXISTS,
};
use common_telemetry::{error, info, warn};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
Expand Down Expand Up @@ -170,8 +172,22 @@ impl Election for PgElection {
.as_secs_f64()
+ CANDIDATE_LEASE_SECS as f64,
};
self.put_value_with_lease(&key, &value_with_lease).await?;
let res = self.put_value_with_lease(&key, &value_with_lease).await?;
// May registered before, check if the lease is expired. If not, just renew the lease.
if !res {
let prev = self.get_value_with_lease(&key).await?;
if prev.expire_time
< time::SystemTime::now()
.duration_since(time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64()
{
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &value_with_lease).await?;
}
}

// Renew the lease
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS));

Expand Down Expand Up @@ -430,7 +446,7 @@ impl PgElection {
}
}

// Returns `true` if the insertion is successful
/// Returns `true` if the insertion is successful
async fn put_value_with_lease(&self, key: &Vec<u8>, value: &ValueWithLease) -> Result<bool> {
let value = serde_json::to_string(value)
.with_context(|_| SerializeToJsonSnafu {
Expand All @@ -447,6 +463,18 @@ impl PgElection {
Ok(res.is_empty())
}

/// Returns `true` if the deletion is successful.
/// Caution: Should only delete the key if the lease is expired.
async fn delete_value(&self, key: &Vec<u8>) -> Result<bool> {
let res = self
.client
.query(POINT_DELETE, &[key])
.await
.context(PostgresExecutionSnafu)?;

Ok(res.len() == 1)
}

async fn keep_alive(
&self,
key: &Vec<u8>,
Expand Down Expand Up @@ -507,6 +535,8 @@ impl PgElection {
.unwrap_or_default()
.as_secs_f64();
if leader_value_with_lease.expire_time <= now {
// Invalidate preivous leader

Check warning on line 538 in src/meta-srv/src/election/postgres.rs

View workflow job for this annotation

GitHub Actions / Check typos and docs

"preivous" should be "previous".
self.delete_value(key).await?;
return Ok(());
}
}
Expand Down

0 comments on commit 7dbfb5a

Please sign in to comment.