Skip to content

Commit

Permalink
refactor sql queries use diesel dsl
Browse files Browse the repository at this point in the history
  • Loading branch information
internet-diglett committed Nov 14, 2023
1 parent 9858397 commit 92df045
Showing 1 changed file with 52 additions and 113 deletions.
165 changes: 52 additions & 113 deletions nexus/db-queries/src/db/datastore/ipv4_nat_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ use crate::db::model::{Ipv4NatEntry, Ipv4NatValues};
use async_bb8_diesel::AsyncRunQueryDsl;
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use diesel::sql_query;
use diesel::sql_types::BigInt;
use diesel::sql_types::Inet;
use diesel::sql_types::Integer;
use diesel::sql_types::Uuid;
use nexus_db_model::ExternalIp;
use nexus_db_model::Ipv4NatEntryView;
use nexus_db_model::SqlU32;
Expand Down Expand Up @@ -84,19 +80,17 @@ impl DataStore {
opctx: &OpContext,
nat_entry: &Ipv4NatEntry,
) -> DeleteResult {
// We use pure SQL here so we can call nextval() to increment the sequence number in the
// database.
let updated_rows = sql_query(
"
UPDATE omicron.public.ipv4_nat_entry
SET
version_removed = nextval('omicron.public.ipv4_nat_version'),
time_deleted = now()
WHERE time_deleted IS NULL AND version_removed IS NULL AND id = $1 AND version_added = $2
",
)
.bind::<Uuid, _>(nat_entry.id)
.bind::<BigInt, _>(nat_entry.version_added)
use db::schema::ipv4_nat_entry::dsl;

let updated_rows = diesel::update(dsl::ipv4_nat_entry)
.set((
dsl::version_removed.eq(ipv4_nat_next_version().nullable()),
dsl::time_deleted.eq(Utc::now()),
))
.filter(dsl::time_deleted.is_null())
.filter(dsl::version_removed.is_null())
.filter(dsl::id.eq(nat_entry.id))
.filter(dsl::version_added.eq(nat_entry.version_added))
.execute_async(&*self.pool_connection_authorized(opctx).await?)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;
Expand All @@ -112,41 +106,6 @@ impl DataStore {
Ok(())
}

pub async fn ipv4_nat_cancel_delete(
&self,
opctx: &OpContext,
nat_entry: &Ipv4NatEntry,
) -> DeleteResult {
let version_removed =
nat_entry.version_removed.ok_or(Error::InvalidRequest {
message: "supplied nat entry does not have version_deleted"
.to_string(),
})?;

let updated_rows = sql_query(
"
UPDATE omicron.public.ipv4_nat_entry
SET
version_removed = NULL,
time_deleted = NULL,
version_added = nextval('omicron.public.ipv4_nat_version')
WHERE version_removed IS NOT NULL AND id = $1 AND version_removed = $2
",
)
.bind::<Uuid, _>(nat_entry.id)
.bind::<BigInt, _>(version_removed)
.execute_async(&*self.pool_connection_authorized(opctx).await?)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;

if updated_rows == 0 {
return Err(Error::InvalidRequest {
message: "no matching records".to_string(),
});
}
Ok(())
}

pub async fn ipv4_nat_find_by_id(
&self,
opctx: &OpContext,
Expand Down Expand Up @@ -176,25 +135,21 @@ impl DataStore {
opctx: &OpContext,
external_ip: &ExternalIp,
) -> DeleteResult {
let updated_rows = sql_query(
"
UPDATE omicron.public.ipv4_nat_entry
SET
version_removed = nextval('omicron.public.ipv4_nat_version'),
time_deleted = now()
WHERE time_deleted IS NULL
AND version_removed IS NULL
AND external_address = $1
AND first_port = $2
AND last_port = $3
",
)
.bind::<Inet, _>(external_ip.ip)
.bind::<Integer, _>(external_ip.first_port)
.bind::<Integer, _>(external_ip.last_port)
.execute_async(&*self.pool_connection_authorized(opctx).await?)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;
use db::schema::ipv4_nat_entry::dsl;

let updated_rows = diesel::update(dsl::ipv4_nat_entry)
.set((
dsl::version_removed.eq(ipv4_nat_next_version().nullable()),
dsl::time_deleted.eq(Utc::now()),
))
.filter(dsl::time_deleted.is_null())
.filter(dsl::version_removed.is_null())
.filter(dsl::external_address.eq(external_ip.ip))
.filter(dsl::first_port.eq(external_ip.first_port))
.filter(dsl::last_port.eq(external_ip.last_port))
.execute_async(&*self.pool_connection_authorized(opctx).await?)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;

if updated_rows == 0 {
return Err(Error::ObjectNotFound {
Expand Down Expand Up @@ -313,6 +268,10 @@ impl DataStore {
}
}

fn ipv4_nat_next_version() -> diesel::expression::SqlLiteral<BigInt> {
diesel::dsl::sql::<BigInt>("nextval('omicron.public.ipv4_nat_version')")
}

#[cfg(test)]
mod test {
use std::str::FromStr;
Expand Down Expand Up @@ -407,6 +366,23 @@ mod test {
2
);

// Test Cleanup logic
// Cleanup should only perma-delete entries that are older than a
// specified version number and whose `time_deleted` field is
// older than a specified age.
let time_cutoff = Utc::now();
datastore.ipv4_nat_cleanup(&opctx, 2, time_cutoff).await.unwrap();

// Nothing should have changed (no records currently marked for deletion)
let nat_entries =
datastore.ipv4_nat_list_since_version(&opctx, 0, 10).await.unwrap();

assert_eq!(nat_entries.len(), 2);
assert_eq!(
datastore.ipv4_nat_current_version(&opctx).await.unwrap(),
2
);

// Delete the first nat entry. It should show up as a later version number.
datastore.ipv4_nat_delete(&opctx, &first_entry).await.unwrap();
let nat_entries =
Expand All @@ -426,51 +402,14 @@ mod test {
3
);

// Cancel deletion of NAT entry
datastore.ipv4_nat_cancel_delete(&opctx, nat_entry).await.unwrap();
let nat_entries =
datastore.ipv4_nat_list_since_version(&opctx, 0, 10).await.unwrap();

// The NAT table has undergone four changes.
let nat_entry =
datastore.ipv4_nat_find_by_id(&opctx, nat_entry.id).await.unwrap();
assert_eq!(nat_entries.len(), 2);
assert_eq!(nat_entry.version_added(), 4);
assert_eq!(nat_entry.id, first_entry.id);
assert_eq!(
datastore.ipv4_nat_current_version(&opctx).await.unwrap(),
4
);

// Test Cleanup logic
// Cleanup should only perma-delete entries that are older than a
// specified version number and whose `time_deleted` field is
// older than a specified age.
let time_cutoff = Utc::now();
datastore.ipv4_nat_cleanup(&opctx, 4, time_cutoff).await.unwrap();

// Nothing should have changed (no records currently marked for deletion)
let nat_entries =
datastore.ipv4_nat_list_since_version(&opctx, 0, 10).await.unwrap();

assert_eq!(nat_entries.len(), 2);
assert_eq!(
datastore.ipv4_nat_current_version(&opctx).await.unwrap(),
4
);

// Soft delete a record
let nat_entry = nat_entries.last().unwrap();
datastore.ipv4_nat_delete(&opctx, nat_entry).await.unwrap();

// Try cleaning up with the old version and time cutoff values
datastore.ipv4_nat_cleanup(&opctx, 4, time_cutoff).await.unwrap();
datastore.ipv4_nat_cleanup(&opctx, 2, time_cutoff).await.unwrap();

// Try cleaning up with a greater version and old time cutoff values
datastore.ipv4_nat_cleanup(&opctx, 6, time_cutoff).await.unwrap();

// Try cleaning up with a older version and newer time cutoff values
datastore.ipv4_nat_cleanup(&opctx, 4, Utc::now()).await.unwrap();
datastore.ipv4_nat_cleanup(&opctx, 2, Utc::now()).await.unwrap();

// Both records should still exist (soft deleted record is newer than cutoff
// values )
Expand All @@ -480,11 +419,11 @@ mod test {
assert_eq!(nat_entries.len(), 2);
assert_eq!(
datastore.ipv4_nat_current_version(&opctx).await.unwrap(),
5
3
);

// Try cleaning up with a both cutoff values increased
datastore.ipv4_nat_cleanup(&opctx, 6, Utc::now()).await.unwrap();
datastore.ipv4_nat_cleanup(&opctx, 4, Utc::now()).await.unwrap();

// Soft deleted NAT entry should be removed from the table
let nat_entries =
Expand All @@ -495,7 +434,7 @@ mod test {
// version should be unchanged
assert_eq!(
datastore.ipv4_nat_current_version(&opctx).await.unwrap(),
5
3
);

db.cleanup().await.unwrap();
Expand Down

0 comments on commit 92df045

Please sign in to comment.