Skip to content

Commit

Permalink
WIP quota check CTE
Browse files Browse the repository at this point in the history
  • Loading branch information
zephraph committed Dec 6, 2023
1 parent caf371b commit 392549c
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,36 @@ table! {
}
}

table! {
quotas (silo_id) {
silo_id -> Uuid,
cpus -> Int8,
memory -> Int8,
storage -> Int8,
}
}

table! {
silo_provisioned {
id -> Uuid,
virtual_disk_bytes_provisioned -> Int8,
cpus_provisioned -> Int8,
ram_provisioned -> Int8,
}
}

table! {
quota_check (passed) {
passed -> Bool,
}
}

diesel::allow_tables_to_appear_in_same_query!(silo, parent_silo,);

diesel::allow_tables_to_appear_in_same_query!(
virtual_provisioning_collection,
parent_silo,
all_collections,
do_update,
quotas
);
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,51 @@ use crate::db::pool::DbConnection;
use crate::db::schema::virtual_provisioning_collection;
use crate::db::schema::virtual_provisioning_resource;
use crate::db::subquery::{AsQuerySource, Cte, CteBuilder, CteQuery};
use crate::db::true_or_cast_error::matches_sentinel;
use crate::db::true_or_cast_error::TrueOrCastError;
use db_macros::Subquery;
use diesel::pg::Pg;
use diesel::query_builder::{AstPass, Query, QueryFragment, QueryId};
use diesel::result::Error as DieselError;
use diesel::{
sql_types, CombineDsl, ExpressionMethods, IntoSql,
NullableExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper,
};
use nexus_db_model::queries::virtual_provisioning_collection_update::{
all_collections, do_update, parent_silo,
all_collections, do_update, parent_silo, quota_check, quotas,
silo_provisioned,
};
use omicron_common::api::external;

const NOT_ENOUGH_CPUS_SENTINEL: &'static str = "Not enough cpus";
const NOT_ENOUGH_MEMORY_SENTINEL: &'static str = "Not enough memory";
const NOT_ENOUGH_STORAGE_SENTINEL: &'static str = "Not enough storage";

pub fn from_diesel(e: DieselError) -> external::Error {
use crate::db::error;

let sentinels = [
NOT_ENOUGH_CPUS_SENTINEL,
NOT_ENOUGH_MEMORY_SENTINEL,
NOT_ENOUGH_STORAGE_SENTINEL,
];
if let Some(sentinel) = matches_sentinel(&e, &sentinels) {
match sentinel {
NOT_ENOUGH_CPUS_SENTINEL => {
return external::Error::InvalidRequest { message: "Insufficient Capacity: Not enough CPUs to complete request. Either stop unused instances to free up resources or contact the rack operator to request a capacity increase.".to_string() }
}
NOT_ENOUGH_MEMORY_SENTINEL => {
return external::Error::InvalidRequest { message: "Insufficient Capacity: Not enough memory to complete request. Either stop unused instances to free up resources or contact the rack operator to request a capacity increase.".to_string() }
}
NOT_ENOUGH_STORAGE_SENTINEL => {
return external::Error::InvalidRequest { message: "Insufficient Capacity: Not enough storage to complete request. Either remove unneeded disks and snapshots to free up resources or contact the rack operator to request a capacity increase.".to_string() }
}
_ => {}
}

error::public_error_from_diesel(e, error::ErrorHandler::Server)
}
}

#[derive(Subquery, QueryId)]
#[subquery(name = parent_silo)]
Expand Down Expand Up @@ -161,6 +196,67 @@ impl UpdatedProvisions {
}
}

#[derive(Subquery, QueryId)]
#[subquery(name = quotas)]
struct Quotas {
query: Box<dyn CteQuery<SqlType = quotas::SqlType>>,
}

impl Quotas {
fn new(parent_silo: &ParentSilo, update_kind: UpdateKind) -> Self {
use crate::db::schema::silo_quotas::dsl;
Self {
query: Box::new(
dsl::silo_quotas.filter(dsl::silo_id.eq(parent_silo::id)),
),
}
}
}

#[derive(Subquery, QueryId)]
#[subquery(name = silo_provisioned)]
struct SiloProvisioned {
query: Box<dyn CteQuery<SqlType = silo_provisioned::SqlType>>,
}

impl SiloProvisioned {
fn new(parent_silo: &ParentSilo) -> Self {
use virtual_provisioning_collection::dsl;
Self {
query: Box::new(
dsl::virtual_provisioning_collection
.filter(dsl::id.eq(parent_silo::id)),
),
}
}
}

#[derive(Subquery, QueryId)]
#[subquery(name = quota_check)]
struct QuotaCheck {
query: Box<dyn CteQuery<SqlType = quota_check::SqlType>>,
}

impl QuotaCheck {
fn new(silo_provisioned: &SiloProvisioned, quotas: &Quotas) -> Self {
Self {
query: Box::new(diesel::select(
(ExpressionAlias::new::<quota_check::passed>(
TrueOrCastError::new(enough_cpus, NOT_ENOUGH_CPUS_SENTINEL)
.and(TrueOrCastError::new(
enough_memory,
NOT_ENOUGH_MEMORY_SENTINEL,
))
.and(TrueOrCastError::new(
enough_storage,
NOT_ENOUGH_STORAGE_SENTINEL,
)),
)),
)),
}
}
}

// This structure wraps a query, such that it can be used within a CTE.
//
// It generates a name that can be used by the "CteBuilder", but does not
Expand Down Expand Up @@ -195,6 +291,13 @@ where
}
}

/// The virtual resource collection is only updated when a resource is inserted
/// or deleted from the resource provisioning table for idempotency.
enum UpdateKind {
Insert(VirtualProvisioningResource),
Delete(uuid::Uuid),
}

/// Constructs a CTE for updating resource provisioning information in all
/// collections for a particular object.
#[derive(QueryId)]
Expand All @@ -220,7 +323,7 @@ impl VirtualProvisioningCollectionUpdate {
// - values: The updated values to propagate through collections (iff
// "do_update" evaluates to "true").
fn apply_update<U, V>(
do_update: DoUpdate,
update_kind: UpdateKind,
update: U,
project_id: uuid::Uuid,
values: V,
Expand All @@ -231,14 +334,24 @@ impl VirtualProvisioningCollectionUpdate {
<V as diesel::AsChangeset>::Changeset:
QueryFragment<Pg> + Send + 'static,
{
let do_update = match update_kind {
UpdateKind::Insert(resource) => {
DoUpdate::new_for_insert(resource.id)
}
UpdateKind::Delete(id) => DoUpdate::new_for_delete(id),
};
let parent_silo = ParentSilo::new(project_id);
let all_collections = AllCollections::new(
project_id,
&parent_silo,
*crate::db::fixed_data::FLEET_ID,
);

let updated_collections =
UpdatedProvisions::new(&all_collections, &do_update, values);
let quotas = Quotas::new(&parent_silo, update_kind);
let silo_provisioned = SiloProvisioned::new(&parent_silo);
let quota_check = QuotaCheck::new(&quotas, &silo_provisioned);

// TODO: Do we want to select from "all_collections" instead? Seems more
// idempotent; it'll work even when we don't update anything...
Expand All @@ -251,6 +364,8 @@ impl VirtualProvisioningCollectionUpdate {
let cte = CteBuilder::new()
.add_subquery(parent_silo)
.add_subquery(all_collections)
.add_subquery(quotas)
.add_subquery(silo_provisioned)
.add_subquery(do_update)
.add_subquery(update)
.add_subquery(updated_collections)
Expand All @@ -273,8 +388,7 @@ impl VirtualProvisioningCollectionUpdate {
provision.virtual_disk_bytes_provisioned = disk_byte_diff;

Self::apply_update(
// We should insert the record if it does not already exist.
DoUpdate::new_for_insert(id),
UpdateKind::Insert(provision),
// The query to actually insert the record.
UnreferenceableSubquery(
diesel::insert_into(
Expand Down Expand Up @@ -305,8 +419,7 @@ impl VirtualProvisioningCollectionUpdate {
use virtual_provisioning_resource::dsl as resource_dsl;

Self::apply_update(
// We should delete the record if it exists.
DoUpdate::new_for_delete(id),
UpdateKind::Delete(id),
// The query to actually delete the record.
UnreferenceableSubquery(
diesel::delete(resource_dsl::virtual_provisioning_resource)
Expand Down Expand Up @@ -342,8 +455,7 @@ impl VirtualProvisioningCollectionUpdate {
provision.ram_provisioned = ram_diff;

Self::apply_update(
// We should insert the record if it does not already exist.
DoUpdate::new_for_insert(id),
UpdateKind::Insert(provision),
// The query to actually insert the record.
UnreferenceableSubquery(
diesel::insert_into(
Expand Down Expand Up @@ -378,8 +490,7 @@ impl VirtualProvisioningCollectionUpdate {
use virtual_provisioning_resource::dsl as resource_dsl;

Self::apply_update(
// We should delete the record if it exists.
DoUpdate::new_for_delete(id),
UpdateKind::Delete(id),
// The query to actually delete the record.
//
// The filter condition here ensures that the provisioning record is
Expand Down

0 comments on commit 392549c

Please sign in to comment.