diff --git a/Cargo.lock b/Cargo.lock index 8188a1feb7..0421ec6653 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1147,6 +1147,26 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "const_format" +version = "0.2.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a214c7af3d04997541b18d432afaff4c455e79e2029079647e72fc2bd27673" +dependencies = [ + "const_format_proc_macros", +] + +[[package]] +name = "const_format_proc_macros" +version = "0.2.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7f6ff08fd20f4f299298a28e2dfa8a8ba1036e6cd2460ac1de7b425d76f2500" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + [[package]] name = "constant_time_eq" version = "0.2.6" @@ -4590,6 +4610,7 @@ dependencies = [ "camino", "camino-tempfile", "chrono", + "const_format", "cookie 0.18.0", "db-macros", "diesel", diff --git a/Cargo.toml b/Cargo.toml index 018941f081..6546100e3f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -188,6 +188,7 @@ cfg-if = "1.0" chrono = { version = "0.4", features = [ "serde" ] } clap = { version = "4.5", features = ["cargo", "derive", "env", "wrap_help"] } colored = "2.1" +const_format = "0.2.32" cookie = "0.18" criterion = { version = "0.5.1", features = [ "async_tokio" ] } crossbeam = "0.8" diff --git a/nexus/db-model/src/queries/mod.rs b/nexus/db-model/src/queries/mod.rs index 7724d48bab..e138508f84 100644 --- a/nexus/db-model/src/queries/mod.rs +++ b/nexus/db-model/src/queries/mod.rs @@ -4,5 +4,4 @@ //! Subqueries used in CTEs. -pub mod region_allocation; pub mod virtual_provisioning_collection_update; diff --git a/nexus/db-model/src/queries/region_allocation.rs b/nexus/db-model/src/queries/region_allocation.rs deleted file mode 100644 index a1b9e0373a..0000000000 --- a/nexus/db-model/src/queries/region_allocation.rs +++ /dev/null @@ -1,195 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. - -//! Describes subqueries which may be issues as a part of CTEs. -//! -//! When possible, it's preferable to define subqueries close to their -//! usage. However, certain Diesel traits (such as those enabling joins) -//! require the table structures to be defined in the same crate. - -// TODO: We're currently piggy-backing on the table macro for convenience. -// We actually do not want to generate an entire table for each subquery - we'd -// like to have a query source (which we can use to generate SELECT statements, -// JOIN, etc), but we don't want this to be an INSERT/UPDATE/DELETE target. -// -// Similarly, we don't want to force callers to supply a "primary key". -// -// I've looked into Diesel's `alias!` macro for this purpose, but unfortunately -// that implementation is too opinionated about the output QueryFragment. -// It expects to use the form: -// -// " as ", which is actually the opposite of what we want in -// a CTE (where we want the alias name to come first). - -use crate::schema::dataset; -use crate::schema::sled; -use crate::schema::zpool; - -table! { - old_regions { - id -> Uuid, - time_created -> Timestamptz, - time_modified -> Timestamptz, - - dataset_id -> Uuid, - volume_id -> Uuid, - - block_size -> Int8, - blocks_per_extent -> Int8, - extent_count -> Int8, - } -} - -table! { - candidate_datasets { - id -> Uuid, - pool_id -> Uuid, - } -} - -table! { - shuffled_candidate_datasets { - id -> Uuid, - pool_id -> Uuid, - } -} - -table! { - candidate_regions { - id -> Uuid, - time_created -> Timestamptz, - time_modified -> Timestamptz, - - dataset_id -> Uuid, - volume_id -> Uuid, - - block_size -> Int8, - blocks_per_extent -> Int8, - extent_count -> Int8, - } -} - -table! { - proposed_dataset_changes { - id -> Uuid, - pool_id -> Uuid, - size_used_delta -> Int8, - } -} - -table! { - old_zpool_usage (pool_id) { - pool_id -> Uuid, - size_used -> Numeric, - } -} - -table! { - candidate_zpools (pool_id) { - pool_id -> Uuid - } -} - -table! { - do_insert (insert) { - insert -> Bool, - } -} - -table! { - one_zpool_per_sled (pool_id) { - pool_id -> Uuid - } -} - -table! { - one_dataset_per_zpool { - id -> Uuid, - pool_id -> Uuid - } -} - -table! { - inserted_regions { - id -> Uuid, - time_created -> Timestamptz, - time_modified -> Timestamptz, - - dataset_id -> Uuid, - volume_id -> Uuid, - - block_size -> Int8, - blocks_per_extent -> Int8, - extent_count -> Int8, - } -} - -table! { - updated_datasets (id) { - id -> Uuid, - time_created -> Timestamptz, - time_modified -> Timestamptz, - time_deleted -> Nullable, - rcgen -> Int8, - - pool_id -> Uuid, - - ip -> Inet, - port -> Int4, - - kind -> crate::DatasetKindEnum, - size_used -> Nullable, - } -} - -diesel::allow_tables_to_appear_in_same_query!( - proposed_dataset_changes, - dataset, -); - -diesel::allow_tables_to_appear_in_same_query!( - do_insert, - candidate_regions, - dataset, - zpool, -); - -diesel::allow_tables_to_appear_in_same_query!( - old_zpool_usage, - zpool, - sled, - proposed_dataset_changes, -); - -diesel::allow_tables_to_appear_in_same_query!(old_regions, dataset,); -diesel::allow_tables_to_appear_in_same_query!(old_regions, zpool,); - -diesel::allow_tables_to_appear_in_same_query!( - inserted_regions, - updated_datasets, -); - -diesel::allow_tables_to_appear_in_same_query!(candidate_zpools, dataset,); -diesel::allow_tables_to_appear_in_same_query!(candidate_zpools, zpool,); -diesel::allow_tables_to_appear_in_same_query!(candidate_datasets, dataset); - -// == Needed for random region allocation == - -pub mod cockroach_md5 { - pub mod functions { - use diesel::sql_types::*; - diesel::sql_function!(fn md5(x: Bytea) -> Bytea); - } - - pub mod helper_types { - pub type Md5 = super::functions::md5::HelperType; - } - - pub mod dsl { - pub use super::functions::*; - pub use super::helper_types::*; - } -} - -// == End random region allocation dependencies == diff --git a/nexus/db-queries/Cargo.toml b/nexus/db-queries/Cargo.toml index 595280780e..5f99b904fc 100644 --- a/nexus/db-queries/Cargo.toml +++ b/nexus/db-queries/Cargo.toml @@ -15,6 +15,7 @@ base64.workspace = true bb8.workspace = true camino.workspace = true chrono.workspace = true +const_format.workspace = true cookie.workspace = true diesel.workspace = true diesel-dtrace.workspace = true diff --git a/nexus/db-queries/src/db/cast_uuid_as_bytea.rs b/nexus/db-queries/src/db/cast_uuid_as_bytea.rs deleted file mode 100644 index c50c88971f..0000000000 --- a/nexus/db-queries/src/db/cast_uuid_as_bytea.rs +++ /dev/null @@ -1,62 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. - -//! Cast UUID to BYTES - -use diesel::expression::ValidGrouping; -use diesel::pg::Pg; -use diesel::query_builder::AstPass; -use diesel::query_builder::QueryFragment; -use diesel::query_builder::QueryId; -use diesel::Expression; -use diesel::SelectableExpression; - -/// Cast an expression which evaluates to a Uuid and cast it to a Bytea. It's -/// that simple! -#[derive(ValidGrouping, QueryId)] -pub struct CastUuidToBytea { - expression: E, -} - -impl CastUuidToBytea -where - E: Expression, -{ - pub const fn new(expression: E) -> Self { - Self { expression } - } -} - -impl Expression for CastUuidToBytea -where - E: Expression, -{ - type SqlType = diesel::sql_types::Bytea; -} - -impl diesel::AppearsOnTable for CastUuidToBytea where - E: diesel::AppearsOnTable -{ -} - -impl SelectableExpression for CastUuidToBytea where - E: SelectableExpression -{ -} - -impl QueryFragment for CastUuidToBytea -where - E: QueryFragment, -{ - fn walk_ast<'a>( - &'a self, - mut out: AstPass<'_, 'a, Pg>, - ) -> diesel::QueryResult<()> { - out.push_sql("CAST("); - self.expression.walk_ast(out.reborrow())?; - out.push_sql(" as BYTEA)"); - - Ok(()) - } -} diff --git a/nexus/db-queries/src/db/column_walker.rs b/nexus/db-queries/src/db/column_walker.rs index 64c3b450c8..cace2ba5fb 100644 --- a/nexus/db-queries/src/db/column_walker.rs +++ b/nexus/db-queries/src/db/column_walker.rs @@ -4,6 +4,7 @@ //! CTE utility for iterating over all columns in a table. +use crate::db::raw_query_builder::TrustedStr; use diesel::prelude::*; use std::marker::PhantomData; @@ -17,14 +18,30 @@ pub(crate) struct ColumnWalker { remaining: PhantomData, } +pub type AllColumnsOf = ColumnWalker<::AllColumns>; + impl ColumnWalker { - pub fn new() -> Self { + pub const fn new() -> Self { Self { remaining: PhantomData } } } macro_rules! impl_column_walker { ( $len:literal $($column:ident)+ ) => ( + #[allow(dead_code)] + impl<$($column: Column),+> ColumnWalker<($($column,)+)> { + pub fn with_prefix(prefix: &'static str) -> TrustedStr { + // This string is derived from: + // - The "table" type, with associated columns, which + // are not controlled by an arbitrary user, and + // - The "prefix" type, which is a "&'static str" (AKA, + // hopefully known at compile-time, and not leaked). + TrustedStr::i_take_responsibility_for_validating_this_string( + [$([prefix, $column::NAME].join("."),)+].join(", ") + ) + } + } + impl<$($column: Column),+> IntoIterator for ColumnWalker<($($column,)+)> { type Item = &'static str; type IntoIter = std::array::IntoIter; @@ -109,4 +126,12 @@ mod test { assert_eq!(iter.next(), Some("value")); assert_eq!(iter.next(), None); } + + #[test] + fn test_all_columns_with_prefix() { + assert_eq!( + AllColumnsOf::::with_prefix("foo").as_str(), + "foo.id, foo.value, foo.time_deleted" + ); + } } diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 2a3f969183..0020cf99b3 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -949,10 +949,21 @@ mod test { assert_eq!(expected_region_count, dataset_and_regions.len()); let mut disk_datasets = HashSet::new(); let mut disk_zpools = HashSet::new(); + let mut regions = HashSet::new(); for (dataset, region) in dataset_and_regions { // Must be 3 unique datasets assert!(disk_datasets.insert(dataset.id())); + // All regions should be unique + assert!(regions.insert(region.id())); + + // Check there's no cross contamination between returned UUIDs + // + // This is a little goofy, but it catches a bug that has + // happened before. The returned columns share names (like + // "id"), so we need to process them in-order. + assert!(regions.get(&dataset.id()).is_none()); + assert!(disk_datasets.get(®ion.id()).is_none()); // Dataset must not be eligible for provisioning. if let Some(kind) = diff --git a/nexus/db-queries/src/db/datastore/region.rs b/nexus/db-queries/src/db/datastore/region.rs index 52e0ce4d88..ad89a9ca93 100644 --- a/nexus/db-queries/src/db/datastore/region.rs +++ b/nexus/db-queries/src/db/datastore/region.rs @@ -128,7 +128,7 @@ impl DataStore { let (blocks_per_extent, extent_count) = Self::get_crucible_allocation(&block_size, size); - let query = crate::db::queries::region_allocation::RegionAllocate::new( + let query = crate::db::queries::region_allocation::allocation_query( volume_id, block_size.to_bytes() as u64, blocks_per_extent, @@ -141,6 +141,12 @@ impl DataStore { crate::db::queries::region_allocation::from_diesel(e) })?; + info!( + self.log, + "Allocated regions for volume"; + "volume_id" => %volume_id, + "datasets_and_regions" => ?dataset_and_regions, + ); Ok(dataset_and_regions) } diff --git a/nexus/db-queries/src/db/mod.rs b/nexus/db-queries/src/db/mod.rs index d5262166ee..9b3d71970c 100644 --- a/nexus/db-queries/src/db/mod.rs +++ b/nexus/db-queries/src/db/mod.rs @@ -5,7 +5,6 @@ //! Facilities for working with the Omicron database pub(crate) mod alias; -pub(crate) mod cast_uuid_as_bytea; // This is not intended to be public, but this is necessary to use it from // doctests pub mod collection_attach; @@ -29,6 +28,7 @@ mod pool_connection; // This is marked public because the error types are used elsewhere, e.g., in // sagas. pub mod queries; +mod raw_query_builder; mod saga_recovery; mod sec_store; pub mod subquery; diff --git a/nexus/db-queries/src/db/queries/region_allocation.rs b/nexus/db-queries/src/db/queries/region_allocation.rs index a657d21c97..2e4f4cd776 100644 --- a/nexus/db-queries/src/db/queries/region_allocation.rs +++ b/nexus/db-queries/src/db/queries/region_allocation.rs @@ -4,35 +4,22 @@ //! Implementation of queries for provisioning regions. -use crate::db::alias::ExpressionAlias; -use crate::db::cast_uuid_as_bytea::CastUuidToBytea; +use crate::db::column_walker::AllColumnsOf; use crate::db::datastore::REGION_REDUNDANCY_THRESHOLD; -use crate::db::model::{Dataset, DatasetKind, Region}; -use crate::db::pool::DbConnection; -use crate::db::subquery::{AsQuerySource, Cte, CteBuilder, CteQuery}; -use crate::db::true_or_cast_error::{matches_sentinel, TrueOrCastError}; -use db_macros::Subquery; +use crate::db::model::{Dataset, Region}; +use crate::db::raw_query_builder::{QueryBuilder, TypedSqlQuery}; +use crate::db::schema; +use crate::db::true_or_cast_error::matches_sentinel; +use const_format::concatcp; use diesel::pg::Pg; -use diesel::query_builder::{AstPass, Query, QueryFragment, QueryId}; use diesel::result::Error as DieselError; -use diesel::PgBinaryExpressionMethods; -use diesel::{ - sql_types, BoolExpressionMethods, CombineDsl, ExpressionMethods, - Insertable, IntoSql, JoinOnDsl, NullableExpressionMethods, QueryDsl, - RunQueryDsl, -}; +use diesel::sql_types; use nexus_config::RegionAllocationStrategy; -use nexus_db_model::queries::region_allocation::{ - candidate_datasets, candidate_regions, candidate_zpools, cockroach_md5, - do_insert, inserted_regions, old_regions, old_zpool_usage, - proposed_dataset_changes, shuffled_candidate_datasets, updated_datasets, -}; -use nexus_db_model::schema; -use nexus_db_model::to_db_sled_policy; -use nexus_db_model::SledState; -use nexus_types::external_api::views::SledPolicy; use omicron_common::api::external; +type AllColumnsOfRegion = AllColumnsOf; +type AllColumnsOfDataset = AllColumnsOf; + const NOT_ENOUGH_DATASETS_SENTINEL: &'static str = "Not enough datasets"; const NOT_ENOUGH_ZPOOL_SPACE_SENTINEL: &'static str = "Not enough space"; const NOT_ENOUGH_UNIQUE_ZPOOLS_SENTINEL: &'static str = @@ -77,611 +64,345 @@ pub fn from_diesel(e: DieselError) -> external::Error { error::public_error_from_diesel(e, error::ErrorHandler::Server) } -/// A subquery to find all old regions associated with a particular volume. -#[derive(Subquery, QueryId)] -#[subquery(name = old_regions)] -struct OldRegions { - query: Box>, -} - -impl OldRegions { - fn new(volume_id: uuid::Uuid) -> Self { - use crate::db::schema::region::dsl; - Self { - query: Box::new(dsl::region.filter(dsl::volume_id.eq(volume_id))), - } - } -} - -/// A subquery to find datasets which could be used for provisioning regions. -/// -/// We only consider datasets which are already allocated as "Crucible". -/// This implicitly distinguishes between "M.2s" and "U.2s" -- Nexus needs to -/// determine during dataset provisioning which devices should be considered for -/// usage as Crucible storage. -/// -/// We select only one dataset from each zpool. -#[derive(Subquery, QueryId)] -#[subquery(name = candidate_datasets)] -struct CandidateDatasets { - query: Box>, -} - -impl CandidateDatasets { - fn new(candidate_zpools: &CandidateZpools, seed: u128) -> Self { - use crate::db::schema::dataset::dsl as dataset_dsl; - use candidate_zpools::dsl as candidate_zpool_dsl; - - let seed_bytes = seed.to_le_bytes(); - - let query: Box> = - Box::new( - dataset_dsl::dataset - .inner_join(candidate_zpools.query_source().on( - dataset_dsl::pool_id.eq(candidate_zpool_dsl::pool_id), - )) - .filter(dataset_dsl::time_deleted.is_null()) - .filter(dataset_dsl::size_used.is_not_null()) - .filter(dataset_dsl::kind.eq(DatasetKind::Crucible)) - .distinct_on(dataset_dsl::pool_id) - .order_by(( - dataset_dsl::pool_id, - cockroach_md5::dsl::md5( - CastUuidToBytea::new(dataset_dsl::id) - .concat(seed_bytes.to_vec()), - ), - )) - .select((dataset_dsl::id, dataset_dsl::pool_id)), - ); - Self { query } - } -} - -/// Shuffle the candidate datasets, and select REGION_REDUNDANCY_THRESHOLD -/// regions from it. -#[derive(Subquery, QueryId)] -#[subquery(name = shuffled_candidate_datasets)] -struct ShuffledCandidateDatasets { - query: Box>, -} - -impl ShuffledCandidateDatasets { - fn new(candidate_datasets: &CandidateDatasets, seed: u128) -> Self { - use candidate_datasets::dsl as candidate_datasets_dsl; - - let seed_bytes = seed.to_le_bytes(); - - let query: Box> = - Box::new( - candidate_datasets - .query_source() - // We order by md5 to shuffle the ordering of the datasets. - // md5 has a uniform output distribution so it does the job. - .order(cockroach_md5::dsl::md5( - CastUuidToBytea::new(candidate_datasets_dsl::id) - .concat(seed_bytes.to_vec()), - )) - .select(( - candidate_datasets_dsl::id, - candidate_datasets_dsl::pool_id, - )) - .limit(REGION_REDUNDANCY_THRESHOLD.try_into().unwrap()), - ); - Self { query } - } -} - -/// A subquery to create the regions-to-be-inserted for the volume. -#[derive(Subquery, QueryId)] -#[subquery(name = candidate_regions)] -struct CandidateRegions { - query: Box>, -} - -diesel::sql_function!(fn gen_random_uuid() -> Uuid); -diesel::sql_function!(fn now() -> Timestamptz); - -impl CandidateRegions { - fn new( - shuffled_candidate_datasets: &ShuffledCandidateDatasets, - volume_id: uuid::Uuid, - block_size: u64, - blocks_per_extent: u64, - extent_count: u64, - ) -> Self { - use schema::region; - use shuffled_candidate_datasets::dsl as shuffled_candidate_datasets_dsl; - - let volume_id = volume_id.into_sql::(); - let block_size = (block_size as i64).into_sql::(); - let blocks_per_extent = - (blocks_per_extent as i64).into_sql::(); - let extent_count = - (extent_count as i64).into_sql::(); - Self { - query: Box::new(shuffled_candidate_datasets.query_source().select( - ( - ExpressionAlias::new::(gen_random_uuid()), - ExpressionAlias::new::(now()), - ExpressionAlias::new::(now()), - ExpressionAlias::new::( - shuffled_candidate_datasets_dsl::id, - ), - ExpressionAlias::new::(volume_id), - ExpressionAlias::new::(block_size), - ExpressionAlias::new::( - blocks_per_extent, - ), - ExpressionAlias::new::(extent_count), - ), - )), - } - } -} - -/// A subquery which summarizes the changes we intend to make, showing: -/// -/// 1. Which datasets will have size adjustments -/// 2. Which pools those datasets belong to -/// 3. The delta in size-used -#[derive(Subquery, QueryId)] -#[subquery(name = proposed_dataset_changes)] -struct ProposedChanges { - query: Box>, -} - -impl ProposedChanges { - fn new(candidate_regions: &CandidateRegions) -> Self { - use crate::db::schema::dataset::dsl as dataset_dsl; - use candidate_regions::dsl as candidate_regions_dsl; - Self { - query: Box::new( - candidate_regions.query_source() - .inner_join( - dataset_dsl::dataset.on(dataset_dsl::id.eq(candidate_regions_dsl::dataset_id)) - ) - .select(( - ExpressionAlias::new::(candidate_regions_dsl::dataset_id), - ExpressionAlias::new::(dataset_dsl::pool_id), - ExpressionAlias::new::( - candidate_regions_dsl::block_size * - candidate_regions_dsl::blocks_per_extent * - candidate_regions_dsl::extent_count - ), - )) - ), - } - } -} - -/// A subquery which calculates the old size being used by zpools -/// under consideration as targets for region allocation. -#[derive(Subquery, QueryId)] -#[subquery(name = old_zpool_usage)] -struct OldPoolUsage { - query: Box>, -} - -impl OldPoolUsage { - fn new() -> Self { - use crate::db::schema::dataset::dsl as dataset_dsl; - Self { - query: Box::new( - dataset_dsl::dataset - .group_by(dataset_dsl::pool_id) - .filter(dataset_dsl::size_used.is_not_null()) - .filter(dataset_dsl::time_deleted.is_null()) - .select(( - dataset_dsl::pool_id, - ExpressionAlias::new::( - diesel::dsl::sum(dataset_dsl::size_used) - .assume_not_null(), - ), - )), - ), - } - } -} - -/// A subquery which identifies zpools with enough space for a region allocation. -#[derive(Subquery, QueryId)] -#[subquery(name = candidate_zpools)] -struct CandidateZpools { - query: Box>, -} - -impl CandidateZpools { - fn new( - old_zpool_usage: &OldPoolUsage, - zpool_size_delta: u64, - seed: u128, - distinct_sleds: bool, - ) -> Self { - use schema::sled::dsl as sled_dsl; - use schema::zpool::dsl as zpool_dsl; - - // Why are we using raw `diesel::dsl::sql` here? - // - // When SQL performs the "SUM" operation on "bigint" type, the result - // is promoted to "numeric" (see: old_zpool_usage::dsl::size_used). - // - // However, we'd like to compare that value with a different value - // (zpool_dsl::total_size) which is still a "bigint". This comparison - // is safe (after all, we basically want to promote "total_size" to a - // Numeric too) but Diesel demands that the input and output SQL types - // of expression methods like ".le" match exactly. - // - // For similar reasons, we use `diesel::dsl::sql` with zpool_size_delta. - // We would like to add it, but diesel only permits us to `to_sql()` it - // into a BigInt, not a Numeric. I welcome a better solution. - let it_will_fit = (old_zpool_usage::dsl::size_used - + diesel::dsl::sql(&zpool_size_delta.to_string())) - .le(diesel::dsl::sql( - "(SELECT total_size FROM omicron.public.inv_zpool WHERE - inv_zpool.id = old_zpool_usage.pool_id - ORDER BY inv_zpool.time_collected DESC LIMIT 1)", - )); - - // We need to join on the sled table to access provision_state. - let with_sled = sled_dsl::sled.on(zpool_dsl::sled_id.eq(sled_dsl::id)); - let with_zpool = zpool_dsl::zpool - .on(zpool_dsl::id.eq(old_zpool_usage::dsl::pool_id)) - .inner_join(with_sled); - - let sled_is_provisionable = sled_dsl::sled_policy - .eq(to_db_sled_policy(SledPolicy::provisionable())); - let sled_is_active = sled_dsl::sled_state.eq(SledState::Active); - - let base_query = old_zpool_usage - .query_source() - .inner_join(with_zpool) - .filter(it_will_fit) - .filter(sled_is_provisionable) - .filter(sled_is_active) - .select((old_zpool_usage::dsl::pool_id,)); - - let query = if distinct_sleds { - let seed_bytes = seed.to_le_bytes(); - - let query: Box> = - Box::new( - base_query - .order_by(( - zpool_dsl::sled_id, - cockroach_md5::dsl::md5( - CastUuidToBytea::new(zpool_dsl::id) - .concat(seed_bytes.to_vec()), - ), - )) - .distinct_on(zpool_dsl::sled_id), - ); - - query - } else { - let query: Box> = - Box::new(base_query); +type SelectableSql = < + >::SelectExpression as diesel::Expression +>::SqlType; - query +pub fn allocation_query( + volume_id: uuid::Uuid, + block_size: u64, + blocks_per_extent: u64, + extent_count: u64, + allocation_strategy: &RegionAllocationStrategy, +) -> TypedSqlQuery<(SelectableSql, SelectableSql)> { + let (seed, distinct_sleds) = { + let (input_seed, distinct_sleds) = match allocation_strategy { + RegionAllocationStrategy::Random { seed } => (seed, false), + RegionAllocationStrategy::RandomWithDistinctSleds { seed } => { + (seed, true) + } }; - - Self { query } - } -} - -diesel::sql_function! { - #[aggregate] - fn bool_and(b: sql_types::Bool) -> sql_types::Bool; -} - -/// A subquery which confirms whether or not the insertion and updates should -/// occur. -/// -/// This subquery additionally exits the CTE early with an error if either: -/// 1. Not enough datasets exist to provision regions with our required -/// redundancy, or -/// 2. Not enough space exists on zpools to perform the provisioning. -#[derive(Subquery, QueryId)] -#[subquery(name = do_insert)] -struct DoInsert { - query: Box>, -} - -impl DoInsert { - fn new( - old_regions: &OldRegions, - candidate_regions: &CandidateRegions, - candidate_zpools: &CandidateZpools, - ) -> Self { - let redundancy = REGION_REDUNDANCY_THRESHOLD as i64; - let not_allocated_yet = old_regions - .query_source() - .count() - .single_value() - .assume_not_null() - .lt(redundancy); - - let enough_candidate_zpools = candidate_zpools - .query_source() - .count() - .single_value() - .assume_not_null() - .ge(redundancy); - - let enough_candidate_regions = candidate_regions - .query_source() - .count() - .single_value() - .assume_not_null() - .ge(redundancy); - - // We want to ensure that we do not allocate on two datasets in the same - // zpool, for two reasons - // - Data redundancy: If a drive fails it should only take one of the 3 - // regions with it - // - Risk of overallocation: We only check that each zpool as enough - // room for one region, so we should not allocate more than one region - // to it. - // - // Selecting two datasets on the same zpool will not initially be - // possible, as at the time of writing each zpool only has one dataset. - // Additionally, we intend to modify the allocation strategy to select - // from 3 distinct sleds, removing the possibility entirely. But, if we - // introduce a change that adds another crucible dataset to zpools - // before we improve the allocation strategy, this check will make sure - // we don't violate drive redundancy, and generate an error instead. - use crate::db::schema::dataset::dsl as dataset_dsl; - use candidate_regions::dsl as candidate_dsl; - let enough_unique_candidate_zpools = candidate_regions - .query_source() - .inner_join( - dataset_dsl::dataset - .on(candidate_dsl::dataset_id.eq(dataset_dsl::id)), - ) - .select(diesel::dsl::count_distinct(dataset_dsl::pool_id)) - .single_value() - .assume_not_null() - .ge(redundancy); - - Self { - query: Box::new(diesel::select((ExpressionAlias::new::< - do_insert::insert, - >( - not_allocated_yet - .and(TrueOrCastError::new( - enough_candidate_zpools, - NOT_ENOUGH_ZPOOL_SPACE_SENTINEL, - )) - .and(TrueOrCastError::new( - enough_candidate_regions, - NOT_ENOUGH_DATASETS_SENTINEL, - )) - .and(TrueOrCastError::new( - enough_unique_candidate_zpools, - NOT_ENOUGH_UNIQUE_ZPOOLS_SENTINEL, - )), - ),))), - } - } -} - -/// A subquery which actually inserts the regions. -#[derive(Subquery, QueryId)] -#[subquery(name = inserted_regions)] -struct InsertRegions { - query: Box>, -} - -impl InsertRegions { - fn new(do_insert: &DoInsert, candidate_regions: &CandidateRegions) -> Self { - use crate::db::schema::region; - - Self { - query: Box::new( - candidate_regions - .query_source() - .select(candidate_regions::all_columns) - .filter( - do_insert - .query_source() - .select(do_insert::insert) - .single_value() - .assume_not_null(), - ) - .insert_into(region::table) - .returning(region::all_columns), + ( + input_seed.map_or_else( + || { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + }, + |seed| seed as u128, ), - } - } -} - -/// A subquery which updates dataset size usage based on inserted regions. -#[derive(Subquery, QueryId)] -#[subquery(name = updated_datasets)] -struct UpdateDatasets { - query: Box>, -} - -impl UpdateDatasets { - fn new( - do_insert: &DoInsert, - proposed_dataset_changes: &ProposedChanges, - ) -> Self { - use crate::db::schema::dataset::dsl as dataset_dsl; - - let datasets_with_updates = proposed_dataset_changes - .query_source() - .select(proposed_dataset_changes::columns::id) - .into_boxed(); - - Self { - query: Box::new( - diesel::update( - dataset_dsl::dataset.filter( - dataset_dsl::id.eq_any(datasets_with_updates) - ) - ) - .filter( - do_insert.query_source() - .select(do_insert::insert) - .single_value() - .assume_not_null() - ) - .set( - dataset_dsl::size_used.eq( - dataset_dsl::size_used + proposed_dataset_changes.query_source() - .filter(proposed_dataset_changes::columns::id.eq(dataset_dsl::id)) - .select(proposed_dataset_changes::columns::size_used_delta) - .single_value() - ) - ) - .returning(crate::db::schema::dataset::all_columns) - ) - } + distinct_sleds, + ) + }; + + let seed = seed.to_le_bytes().to_vec(); + + let size_delta = block_size * blocks_per_extent * extent_count; + let redundancy: i64 = i64::try_from(REGION_REDUNDANCY_THRESHOLD).unwrap(); + + let builder = QueryBuilder::new().sql( + // Find all old regions associated with a particular volume +"WITH + old_regions AS ( + SELECT ").sql(AllColumnsOfRegion::with_prefix("region")).sql(" + FROM region WHERE (region.volume_id = ").param().sql(")),") + .bind::(volume_id) + + // Calculates the old size being used by zpools under consideration as targets for region + // allocation. + .sql(" + old_zpool_usage AS ( + SELECT + dataset.pool_id, + sum(dataset.size_used) AS size_used + FROM dataset WHERE ((dataset.size_used IS NOT NULL) AND (dataset.time_deleted IS NULL)) GROUP BY dataset.pool_id),") + .sql(" + candidate_zpools AS ("); + + // Identifies zpools with enough space for region allocation. + // + // NOTE: 'distinct_sleds' changes the format of the underlying SQL query, as it uses + // distinct bind parameters depending on the conditional branch. + let builder = if distinct_sleds { + builder.sql("SELECT DISTINCT ON (zpool.sled_id) ") + } else { + builder.sql("SELECT ") + }; + let builder = builder.sql(" + old_zpool_usage.pool_id + FROM ( + old_zpool_usage + INNER JOIN + (zpool INNER JOIN sled ON (zpool.sled_id = sled.id)) ON (zpool.id = old_zpool_usage.pool_id) + ) + WHERE ( + ((old_zpool_usage.size_used + ").param().sql(" ) <= + (SELECT total_size FROM omicron.public.inv_zpool WHERE + inv_zpool.id = old_zpool_usage.pool_id + ORDER BY inv_zpool.time_collected DESC LIMIT 1) + ) + AND + (sled.sled_policy = 'in_service') + AND + (sled.sled_state = 'active') + )" + ).bind::(size_delta as i64); + + let builder = if distinct_sleds { + builder + .sql("ORDER BY zpool.sled_id, md5((CAST(zpool.id as BYTEA) || ") + .param() + .sql("))") + .bind::(seed.clone()) + } else { + builder } + .sql("),"); + + // Find datasets which could be used for provisioning regions. + // + // We only consider datasets which are already allocated as "Crucible". + // This implicitly distinguishes between "M.2s" and "U.2s" -- Nexus needs to + // determine during dataset provisioning which devices should be considered for + // usage as Crucible storage. + // + // We select only one dataset from each zpool. + builder.sql(" + candidate_datasets AS ( + SELECT DISTINCT ON (dataset.pool_id) + dataset.id, + dataset.pool_id + FROM (dataset INNER JOIN candidate_zpools ON (dataset.pool_id = candidate_zpools.pool_id)) + WHERE ( + ((dataset.time_deleted IS NULL) AND + (dataset.size_used IS NOT NULL)) AND + (dataset.kind = 'crucible') + ) + ORDER BY dataset.pool_id, md5((CAST(dataset.id as BYTEA) || ").param().sql(")) + ),") + .bind::(seed.clone()) + // We order by md5 to shuffle the ordering of the datasets. + // md5 has a uniform output distribution so it does the job. + .sql(" + shuffled_candidate_datasets AS ( + SELECT + candidate_datasets.id, + candidate_datasets.pool_id + FROM candidate_datasets + ORDER BY md5((CAST(candidate_datasets.id as BYTEA) || ").param().sql(")) LIMIT ").param().sql(" + ),") + .bind::(seed) + .bind::(redundancy) + // Create the regions-to-be-inserted for the volume. + .sql(" + candidate_regions AS ( + SELECT + gen_random_uuid() AS id, + now() AS time_created, + now() AS time_modified, + shuffled_candidate_datasets.id AS dataset_id, + ").param().sql(" AS volume_id, + ").param().sql(" AS block_size, + ").param().sql(" AS blocks_per_extent, + ").param().sql(" AS extent_count + FROM shuffled_candidate_datasets + ),") + .bind::(volume_id) + .bind::(block_size as i64) + .bind::(blocks_per_extent as i64) + .bind::(extent_count as i64) + // A subquery which summarizes the changes we intend to make, showing: + // + // 1. Which datasets will have size adjustments + // 2. Which pools those datasets belong to + // 3. The delta in size-used + .sql(" + proposed_dataset_changes AS ( + SELECT + candidate_regions.dataset_id AS id, + dataset.pool_id AS pool_id, + ((candidate_regions.block_size * candidate_regions.blocks_per_extent) * candidate_regions.extent_count) AS size_used_delta + FROM (candidate_regions INNER JOIN dataset ON (dataset.id = candidate_regions.dataset_id)) + ),") + // Confirms whether or not the insertion and updates should + // occur. + // + // This subquery additionally exits the CTE early with an error if either: + // 1. Not enough datasets exist to provision regions with our required + // redundancy, or + // 2. Not enough space exists on zpools to perform the provisioning. + // + // We want to ensure that we do not allocate on two datasets in the same + // zpool, for two reasons + // - Data redundancy: If a drive fails it should only take one of the 3 + // regions with it + // - Risk of overallocation: We only check that each zpool as enough + // room for one region, so we should not allocate more than one region + // to it. + // + // Selecting two datasets on the same zpool will not initially be + // possible, as at the time of writing each zpool only has one dataset. + // Additionally, provide a configuration option ("distinct_sleds") to modify + // the allocation strategy to select from 3 distinct sleds, removing the + // possibility entirely. But, if we introduce a change that adds another + // crucible dataset to zpools before we improve the allocation strategy, + // this check will make sure we don't violate drive redundancy, and generate + // an error instead. + .sql(" + do_insert AS ( + SELECT ((( + ((SELECT COUNT(*) FROM old_regions LIMIT 1) < ").param().sql(") AND + CAST(IF(((SELECT COUNT(*) FROM candidate_zpools LIMIT 1) >= ").param().sql(concatcp!("), 'TRUE', '", NOT_ENOUGH_ZPOOL_SPACE_SENTINEL, "') AS BOOL)) AND + CAST(IF(((SELECT COUNT(*) FROM candidate_regions LIMIT 1) >= ")).param().sql(concatcp!("), 'TRUE', '", NOT_ENOUGH_DATASETS_SENTINEL, "') AS BOOL)) AND + CAST(IF(((SELECT COUNT(DISTINCT dataset.pool_id) FROM (candidate_regions INNER JOIN dataset ON (candidate_regions.dataset_id = dataset.id)) LIMIT 1) >= ")).param().sql(concatcp!("), 'TRUE', '", NOT_ENOUGH_UNIQUE_ZPOOLS_SENTINEL, "') AS BOOL) + ) AS insert + ),")) + .bind::(redundancy) + .bind::(redundancy) + .bind::(redundancy) + .bind::(redundancy) + .sql(" + inserted_regions AS ( + INSERT INTO region + (id, time_created, time_modified, dataset_id, volume_id, block_size, blocks_per_extent, extent_count) + SELECT ").sql(AllColumnsOfRegion::with_prefix("candidate_regions")).sql(" + FROM candidate_regions + WHERE + (SELECT do_insert.insert FROM do_insert LIMIT 1) + RETURNING ").sql(AllColumnsOfRegion::with_prefix("region")).sql(" + ), + updated_datasets AS ( + UPDATE dataset SET + size_used = (dataset.size_used + (SELECT proposed_dataset_changes.size_used_delta FROM proposed_dataset_changes WHERE (proposed_dataset_changes.id = dataset.id) LIMIT 1)) + WHERE ( + (dataset.id = ANY(SELECT proposed_dataset_changes.id FROM proposed_dataset_changes)) AND + (SELECT do_insert.insert FROM do_insert LIMIT 1)) + RETURNING ").sql(AllColumnsOfDataset::with_prefix("dataset")).sql(" + ) +( + SELECT ") + .sql(AllColumnsOfDataset::with_prefix("dataset")) + .sql(", ") + .sql(AllColumnsOfRegion::with_prefix("old_regions")).sql(" + FROM + (old_regions INNER JOIN dataset ON (old_regions.dataset_id = dataset.id)) +) +UNION +( + SELECT ") + .sql(AllColumnsOfDataset::with_prefix("updated_datasets")) + .sql(", ") + .sql(AllColumnsOfRegion::with_prefix("inserted_regions")).sql(" + FROM (inserted_regions INNER JOIN updated_datasets ON (inserted_regions.dataset_id = updated_datasets.id)) +)" + ).query() } -/// Constructs a CTE for allocating new regions, and updating the datasets to -/// which those regions belong. -#[derive(QueryId)] -pub struct RegionAllocate { - cte: Cte, -} - -impl RegionAllocate { - pub fn new( - volume_id: uuid::Uuid, - block_size: u64, - blocks_per_extent: u64, - extent_count: u64, - allocation_strategy: &RegionAllocationStrategy, - ) -> Self { - let (seed, distinct_sleds) = { - let (input_seed, distinct_sleds) = match allocation_strategy { - RegionAllocationStrategy::Random { seed } => (seed, false), - RegionAllocationStrategy::RandomWithDistinctSleds { seed } => { - (seed, true) - } - }; - ( - input_seed.map_or_else( - || { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_nanos() - }, - |seed| seed as u128, - ), - distinct_sleds, - ) - }; - - let size_delta = block_size * blocks_per_extent * extent_count; - - let old_regions = OldRegions::new(volume_id); - - let old_pool_usage = OldPoolUsage::new(); - let candidate_zpools = CandidateZpools::new( - &old_pool_usage, - size_delta, - seed, - distinct_sleds, +#[cfg(test)] +mod test { + use super::*; + use crate::db::explain::ExplainableAsync; + use nexus_test_utils::db::test_setup_database; + use omicron_test_utils::dev; + use uuid::Uuid; + + // This test is a bit of a "change detector", but it's here to help with + // debugging too. If you change this query, it can be useful to see exactly + // how the output SQL has been altered. + #[tokio::test] + async fn expectorate_query() { + let volume_id = Uuid::nil(); + let block_size = 512; + let blocks_per_extent = 4; + let extent_count = 8; + + // First structure: "RandomWithDistinctSleds" + + let region_allocate = allocation_query( + volume_id, + block_size, + blocks_per_extent, + extent_count, + &RegionAllocationStrategy::RandomWithDistinctSleds { + seed: Some(1), + }, + ); + let s = dev::db::format_sql( + &diesel::debug_query::(®ion_allocate).to_string(), + ) + .await + .unwrap(); + expectorate::assert_contents( + "tests/output/region_allocate_distinct_sleds.sql", + &s, ); - let candidate_datasets = - CandidateDatasets::new(&candidate_zpools, seed); - - let shuffled_candidate_datasets = - ShuffledCandidateDatasets::new(&candidate_datasets, seed); + // Second structure: "Random" - let candidate_regions = CandidateRegions::new( - &shuffled_candidate_datasets, + let region_allocate = allocation_query( volume_id, block_size, blocks_per_extent, extent_count, + &RegionAllocationStrategy::Random { seed: Some(1) }, ); - let proposed_changes = ProposedChanges::new(&candidate_regions); - let do_insert = - DoInsert::new(&old_regions, &candidate_regions, &candidate_zpools); - let insert_regions = InsertRegions::new(&do_insert, &candidate_regions); - let updated_datasets = - UpdateDatasets::new(&do_insert, &proposed_changes); - - // Gather together all "(dataset, region)" rows for all regions which - // are allocated to the volume. - // - // This roughly translates to: - // - // old_regions INNER JOIN old_datasets - // UNION - // new_regions INNER JOIN updated_datasets - // - // Note that we cannot simply JOIN the old + new regions, and query for - // their associated datasets: doing so would return the pre-UPDATE - // values of datasets that are updated by this CTE. - let final_select = Box::new( - old_regions - .query_source() - .inner_join( - crate::db::schema::dataset::dsl::dataset - .on(old_regions::dataset_id - .eq(crate::db::schema::dataset::dsl::id)), - ) - .select(( - crate::db::schema::dataset::all_columns, - old_regions::all_columns, - )) - .union( - insert_regions - .query_source() - .inner_join( - updated_datasets::dsl::updated_datasets - .on(inserted_regions::dataset_id - .eq(updated_datasets::id)), - ) - .select(( - updated_datasets::all_columns, - inserted_regions::all_columns, - )), - ), + let s = dev::db::format_sql( + &diesel::debug_query::(®ion_allocate).to_string(), + ) + .await + .unwrap(); + expectorate::assert_contents( + "tests/output/region_allocate_random_sleds.sql", + &s, ); - - let cte = CteBuilder::new() - .add_subquery(old_regions) - .add_subquery(old_pool_usage) - .add_subquery(candidate_zpools) - .add_subquery(candidate_datasets) - .add_subquery(shuffled_candidate_datasets) - .add_subquery(candidate_regions) - .add_subquery(proposed_changes) - .add_subquery(do_insert) - .add_subquery(insert_regions) - .add_subquery(updated_datasets) - .build(final_select); - - Self { cte } } -} -impl QueryFragment for RegionAllocate { - fn walk_ast<'a>( - &'a self, - mut out: AstPass<'_, 'a, Pg>, - ) -> diesel::QueryResult<()> { - out.unsafe_to_cache_prepared(); + // Explain the possible forms of the SQL query to ensure that it + // creates a valid SQL string. + #[tokio::test] + async fn explainable() { + let logctx = dev::test_setup_log("explainable"); + let log = logctx.log.new(o!()); + let mut db = test_setup_database(&log).await; + let cfg = crate::db::Config { url: db.pg_config().clone() }; + let pool = crate::db::Pool::new(&logctx.log, &cfg); + let conn = pool.pool().get().await.unwrap(); + + let volume_id = Uuid::new_v4(); + let block_size = 512; + let blocks_per_extent = 4; + let extent_count = 8; + + // First structure: Explain the query with "RandomWithDistinctSleds" + + let region_allocate = allocation_query( + volume_id, + block_size, + blocks_per_extent, + extent_count, + &RegionAllocationStrategy::RandomWithDistinctSleds { seed: None }, + ); + let _ = region_allocate + .explain_async(&conn) + .await + .expect("Failed to explain query - is it valid SQL?"); - self.cte.walk_ast(out.reborrow())?; - Ok(()) - } -} + // Second structure: Explain the query with "Random" -type SelectableSql = < - >::SelectExpression as diesel::Expression ->::SqlType; + let region_allocate = allocation_query( + volume_id, + block_size, + blocks_per_extent, + extent_count, + &RegionAllocationStrategy::Random { seed: None }, + ); + let _ = region_allocate + .explain_async(&conn) + .await + .expect("Failed to explain query - is it valid SQL?"); -impl Query for RegionAllocate { - type SqlType = (SelectableSql, SelectableSql); + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } } - -impl RunQueryDsl for RegionAllocate {} diff --git a/nexus/db-queries/src/db/raw_query_builder.rs b/nexus/db-queries/src/db/raw_query_builder.rs new file mode 100644 index 0000000000..5c803e20ac --- /dev/null +++ b/nexus/db-queries/src/db/raw_query_builder.rs @@ -0,0 +1,195 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Utilities for building string-based queries. +//! +//! These largely side-step Diesel's type system, +//! and are recommended for more complex CTE + +use crate::db::pool::DbConnection; +use diesel::pg::Pg; +use diesel::query_builder::{AstPass, Query, QueryFragment, QueryId}; +use diesel::sql_types; +use diesel::RunQueryDsl; +use std::cell::Cell; +use std::marker::PhantomData; + +// Keeps a counter to "how many bind parameters have been used" to +// aid in the construction of the query string. +struct BindParamCounter(Cell); +impl BindParamCounter { + fn new() -> Self { + Self(0.into()) + } + fn next(&self) -> i32 { + self.0.set(self.0.get() + 1); + self.0.get() + } +} + +/// A "trusted" string, which can be used to construct SQL queries even +/// though it isn't static. We use "trust" to refer to "protection from +/// SQL injections". +/// +/// This is basically a workaround for cases where we haven't yet been +/// able to construct a query at compile-time. +pub struct TrustedStr(TrustedStrVariants); + +impl TrustedStr { + /// Explicitly constructs a string, with a name that hopefully + /// gives callers some pause when calling this API. + /// + /// If arbitrary user input is provided here, this string COULD + /// cause SQL injection attacks, so each call-site should have a + /// justification for "why it's safe". + pub fn i_take_responsibility_for_validating_this_string(s: String) -> Self { + Self(TrustedStrVariants::ValidatedExplicitly(s)) + } + + #[cfg(test)] + pub fn as_str(&self) -> &str { + match &self.0 { + TrustedStrVariants::Static(s) => s, + TrustedStrVariants::ValidatedExplicitly(s) => s.as_str(), + } + } +} + +impl From<&'static str> for TrustedStr { + fn from(s: &'static str) -> Self { + Self(TrustedStrVariants::Static(s)) + } +} + +// This enum should be kept non-pub to make it harder to accidentally +// construct a "ValidatedExplicitly" variant. +enum TrustedStrVariants { + Static(&'static str), + ValidatedExplicitly(String), +} + +trait SqlQueryBinds { + fn add_bind(self, bind_counter: &BindParamCounter) -> Self; +} + +impl<'a, Query> SqlQueryBinds + for diesel::query_builder::BoxedSqlQuery<'a, Pg, Query> +{ + fn add_bind(self, bind_counter: &BindParamCounter) -> Self { + self.sql("$").sql(bind_counter.next().to_string()) + } +} + +type BoxedQuery = diesel::query_builder::BoxedSqlQuery< + 'static, + Pg, + diesel::query_builder::SqlQuery, +>; + +/// A small wrapper around [diesel::query_builder::BoxedSqlQuery] which +/// assists with counting bind parameters and recommends avoiding the usage of +/// any non-static strings in query construction. +// NOTE: I'd really like to eventually be able to construct SQL statements +// entirely at compile-time, but the combination of "const generics" and "const +// fns" in stable Rust just isn't there yet. +// +// It's definitely possible to create static string builders that operate +// entirely at compile-time, like: +// https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=26d0276648c3315f285372a19d0d492f +// +// But this relies on nightly features. +pub struct QueryBuilder { + query: BoxedQuery, + bind_counter: BindParamCounter, +} + +impl QueryBuilder { + pub fn new() -> Self { + Self { + query: diesel::sql_query("").into_boxed(), + bind_counter: BindParamCounter::new(), + } + } + + /// Identifies that a bind parameter should exist in this location within + /// the SQL string. + /// + /// This should be called the same number of times as [Self::bind]. It is, + /// however, a distinct method, as "identifying bind params" should be + /// decoupled from "using bind parameters" to have an efficient statement + /// cache. + pub fn param(self) -> Self { + Self { + query: self + .query + .sql("$") + .sql(self.bind_counter.next().to_string()), + bind_counter: self.bind_counter, + } + } + + /// Slightly more strict than the "sql" method of Diesel's SqlQuery. + /// Only permits strings which have been validated intentionally to limit + /// susceptibility to SQL injection. + /// + /// See the documentation of [TrustedStr] for more details. + pub fn sql>(self, s: S) -> Self { + let query = match s.into().0 { + TrustedStrVariants::Static(s) => self.query.sql(s), + TrustedStrVariants::ValidatedExplicitly(s) => self.query.sql(s), + }; + Self { query, bind_counter: self.bind_counter } + } + + /// A call-through function to [diesel::query_builder::BoxedSqlQuery]. + pub fn bind(self, b: Value) -> Self + where + Pg: sql_types::HasSqlType, + Value: diesel::serialize::ToSql + Send + 'static, + BindSt: Send + 'static, + { + Self { query: self.query.bind(b), bind_counter: self.bind_counter } + } + + /// Takes the final boxed query + pub fn query(self) -> TypedSqlQuery { + TypedSqlQuery { inner: self.query, _phantom: PhantomData } + } +} + +/// Diesel's [diesel::query_builder::BoxedSqlQuery] has a few drawbacks that +/// make this wrapper more palatable: +/// +/// - It always implements "Query" with SqlType = Untyped, so a caller could try to +/// execute this query and get back any type. +/// - It forces the usage of "QueryableByName", which acts wrong if we're +/// returning multiple columns with the same name (this is normal! If you want +/// to UNION two objects that both have "id" columns, this happens). +#[derive(QueryId)] +pub struct TypedSqlQuery { + inner: diesel::query_builder::BoxedSqlQuery< + 'static, + Pg, + diesel::query_builder::SqlQuery, + >, + _phantom: PhantomData, +} + +impl QueryFragment for TypedSqlQuery { + fn walk_ast<'a>( + &'a self, + mut out: AstPass<'_, 'a, Pg>, + ) -> diesel::QueryResult<()> { + out.unsafe_to_cache_prepared(); + + self.inner.walk_ast(out.reborrow())?; + Ok(()) + } +} + +impl RunQueryDsl for TypedSqlQuery {} + +impl Query for TypedSqlQuery { + type SqlType = T; +} diff --git a/nexus/db-queries/tests/output/region_allocate_distinct_sleds.sql b/nexus/db-queries/tests/output/region_allocate_distinct_sleds.sql new file mode 100644 index 0000000000..7aa85458a6 --- /dev/null +++ b/nexus/db-queries/tests/output/region_allocate_distinct_sleds.sql @@ -0,0 +1,267 @@ +WITH + old_regions + AS ( + SELECT + region.id, + region.time_created, + region.time_modified, + region.dataset_id, + region.volume_id, + region.block_size, + region.blocks_per_extent, + region.extent_count + FROM + region + WHERE + region.volume_id = $1 + ), + old_zpool_usage + AS ( + SELECT + dataset.pool_id, sum(dataset.size_used) AS size_used + FROM + dataset + WHERE + (dataset.size_used IS NOT NULL) AND (dataset.time_deleted IS NULL) + GROUP BY + dataset.pool_id + ), + candidate_zpools + AS ( + SELECT + DISTINCT ON (zpool.sled_id) old_zpool_usage.pool_id + FROM + old_zpool_usage + INNER JOIN (zpool INNER JOIN sled ON zpool.sled_id = sled.id) ON + zpool.id = old_zpool_usage.pool_id + WHERE + (old_zpool_usage.size_used + $2) + <= ( + SELECT + total_size + FROM + omicron.public.inv_zpool + WHERE + inv_zpool.id = old_zpool_usage.pool_id + ORDER BY + inv_zpool.time_collected DESC + LIMIT + 1 + ) + AND sled.sled_policy = 'in_service' + AND sled.sled_state = 'active' + ORDER BY + zpool.sled_id, md5(CAST(zpool.id AS BYTES) || $3) + ), + candidate_datasets + AS ( + SELECT + DISTINCT ON (dataset.pool_id) dataset.id, dataset.pool_id + FROM + dataset INNER JOIN candidate_zpools ON dataset.pool_id = candidate_zpools.pool_id + WHERE + ((dataset.time_deleted IS NULL) AND (dataset.size_used IS NOT NULL)) + AND dataset.kind = 'crucible' + ORDER BY + dataset.pool_id, md5(CAST(dataset.id AS BYTES) || $4) + ), + shuffled_candidate_datasets + AS ( + SELECT + candidate_datasets.id, candidate_datasets.pool_id + FROM + candidate_datasets + ORDER BY + md5(CAST(candidate_datasets.id AS BYTES) || $5) + LIMIT + $6 + ), + candidate_regions + AS ( + SELECT + gen_random_uuid() AS id, + now() AS time_created, + now() AS time_modified, + shuffled_candidate_datasets.id AS dataset_id, + $7 AS volume_id, + $8 AS block_size, + $9 AS blocks_per_extent, + $10 AS extent_count + FROM + shuffled_candidate_datasets + ), + proposed_dataset_changes + AS ( + SELECT + candidate_regions.dataset_id AS id, + dataset.pool_id AS pool_id, + candidate_regions.block_size + * candidate_regions.blocks_per_extent + * candidate_regions.extent_count + AS size_used_delta + FROM + candidate_regions INNER JOIN dataset ON dataset.id = candidate_regions.dataset_id + ), + do_insert + AS ( + SELECT + ( + ( + (SELECT count(*) FROM old_regions LIMIT 1) < $11 + AND CAST( + IF( + ((SELECT count(*) FROM candidate_zpools LIMIT 1) >= $12), + 'TRUE', + 'Not enough space' + ) + AS BOOL + ) + ) + AND CAST( + IF( + ((SELECT count(*) FROM candidate_regions LIMIT 1) >= $13), + 'TRUE', + 'Not enough datasets' + ) + AS BOOL + ) + ) + AND CAST( + IF( + ( + ( + SELECT + count(DISTINCT dataset.pool_id) + FROM + candidate_regions + INNER JOIN dataset ON candidate_regions.dataset_id = dataset.id + LIMIT + 1 + ) + >= $14 + ), + 'TRUE', + 'Not enough unique zpools selected' + ) + AS BOOL + ) + AS insert + ), + inserted_regions + AS ( + INSERT + INTO + region + ( + id, + time_created, + time_modified, + dataset_id, + volume_id, + block_size, + blocks_per_extent, + extent_count + ) + SELECT + candidate_regions.id, + candidate_regions.time_created, + candidate_regions.time_modified, + candidate_regions.dataset_id, + candidate_regions.volume_id, + candidate_regions.block_size, + candidate_regions.blocks_per_extent, + candidate_regions.extent_count + FROM + candidate_regions + WHERE + (SELECT do_insert.insert FROM do_insert LIMIT 1) + RETURNING + region.id, + region.time_created, + region.time_modified, + region.dataset_id, + region.volume_id, + region.block_size, + region.blocks_per_extent, + region.extent_count + ), + updated_datasets + AS ( + UPDATE + dataset + SET + size_used + = dataset.size_used + + ( + SELECT + proposed_dataset_changes.size_used_delta + FROM + proposed_dataset_changes + WHERE + proposed_dataset_changes.id = dataset.id + LIMIT + 1 + ) + WHERE + dataset.id = ANY (SELECT proposed_dataset_changes.id FROM proposed_dataset_changes) + AND (SELECT do_insert.insert FROM do_insert LIMIT 1) + RETURNING + dataset.id, + dataset.time_created, + dataset.time_modified, + dataset.time_deleted, + dataset.rcgen, + dataset.pool_id, + dataset.ip, + dataset.port, + dataset.kind, + dataset.size_used + ) +( + SELECT + dataset.id, + dataset.time_created, + dataset.time_modified, + dataset.time_deleted, + dataset.rcgen, + dataset.pool_id, + dataset.ip, + dataset.port, + dataset.kind, + dataset.size_used, + old_regions.id, + old_regions.time_created, + old_regions.time_modified, + old_regions.dataset_id, + old_regions.volume_id, + old_regions.block_size, + old_regions.blocks_per_extent, + old_regions.extent_count + FROM + old_regions INNER JOIN dataset ON old_regions.dataset_id = dataset.id +) +UNION + ( + SELECT + updated_datasets.id, + updated_datasets.time_created, + updated_datasets.time_modified, + updated_datasets.time_deleted, + updated_datasets.rcgen, + updated_datasets.pool_id, + updated_datasets.ip, + updated_datasets.port, + updated_datasets.kind, + updated_datasets.size_used, + inserted_regions.id, + inserted_regions.time_created, + inserted_regions.time_modified, + inserted_regions.dataset_id, + inserted_regions.volume_id, + inserted_regions.block_size, + inserted_regions.blocks_per_extent, + inserted_regions.extent_count + FROM + inserted_regions + INNER JOIN updated_datasets ON inserted_regions.dataset_id = updated_datasets.id + ) diff --git a/nexus/db-queries/tests/output/region_allocate_random_sleds.sql b/nexus/db-queries/tests/output/region_allocate_random_sleds.sql new file mode 100644 index 0000000000..0918c8f2d1 --- /dev/null +++ b/nexus/db-queries/tests/output/region_allocate_random_sleds.sql @@ -0,0 +1,265 @@ +WITH + old_regions + AS ( + SELECT + region.id, + region.time_created, + region.time_modified, + region.dataset_id, + region.volume_id, + region.block_size, + region.blocks_per_extent, + region.extent_count + FROM + region + WHERE + region.volume_id = $1 + ), + old_zpool_usage + AS ( + SELECT + dataset.pool_id, sum(dataset.size_used) AS size_used + FROM + dataset + WHERE + (dataset.size_used IS NOT NULL) AND (dataset.time_deleted IS NULL) + GROUP BY + dataset.pool_id + ), + candidate_zpools + AS ( + SELECT + old_zpool_usage.pool_id + FROM + old_zpool_usage + INNER JOIN (zpool INNER JOIN sled ON zpool.sled_id = sled.id) ON + zpool.id = old_zpool_usage.pool_id + WHERE + (old_zpool_usage.size_used + $2) + <= ( + SELECT + total_size + FROM + omicron.public.inv_zpool + WHERE + inv_zpool.id = old_zpool_usage.pool_id + ORDER BY + inv_zpool.time_collected DESC + LIMIT + 1 + ) + AND sled.sled_policy = 'in_service' + AND sled.sled_state = 'active' + ), + candidate_datasets + AS ( + SELECT + DISTINCT ON (dataset.pool_id) dataset.id, dataset.pool_id + FROM + dataset INNER JOIN candidate_zpools ON dataset.pool_id = candidate_zpools.pool_id + WHERE + ((dataset.time_deleted IS NULL) AND (dataset.size_used IS NOT NULL)) + AND dataset.kind = 'crucible' + ORDER BY + dataset.pool_id, md5(CAST(dataset.id AS BYTES) || $3) + ), + shuffled_candidate_datasets + AS ( + SELECT + candidate_datasets.id, candidate_datasets.pool_id + FROM + candidate_datasets + ORDER BY + md5(CAST(candidate_datasets.id AS BYTES) || $4) + LIMIT + $5 + ), + candidate_regions + AS ( + SELECT + gen_random_uuid() AS id, + now() AS time_created, + now() AS time_modified, + shuffled_candidate_datasets.id AS dataset_id, + $6 AS volume_id, + $7 AS block_size, + $8 AS blocks_per_extent, + $9 AS extent_count + FROM + shuffled_candidate_datasets + ), + proposed_dataset_changes + AS ( + SELECT + candidate_regions.dataset_id AS id, + dataset.pool_id AS pool_id, + candidate_regions.block_size + * candidate_regions.blocks_per_extent + * candidate_regions.extent_count + AS size_used_delta + FROM + candidate_regions INNER JOIN dataset ON dataset.id = candidate_regions.dataset_id + ), + do_insert + AS ( + SELECT + ( + ( + (SELECT count(*) FROM old_regions LIMIT 1) < $10 + AND CAST( + IF( + ((SELECT count(*) FROM candidate_zpools LIMIT 1) >= $11), + 'TRUE', + 'Not enough space' + ) + AS BOOL + ) + ) + AND CAST( + IF( + ((SELECT count(*) FROM candidate_regions LIMIT 1) >= $12), + 'TRUE', + 'Not enough datasets' + ) + AS BOOL + ) + ) + AND CAST( + IF( + ( + ( + SELECT + count(DISTINCT dataset.pool_id) + FROM + candidate_regions + INNER JOIN dataset ON candidate_regions.dataset_id = dataset.id + LIMIT + 1 + ) + >= $13 + ), + 'TRUE', + 'Not enough unique zpools selected' + ) + AS BOOL + ) + AS insert + ), + inserted_regions + AS ( + INSERT + INTO + region + ( + id, + time_created, + time_modified, + dataset_id, + volume_id, + block_size, + blocks_per_extent, + extent_count + ) + SELECT + candidate_regions.id, + candidate_regions.time_created, + candidate_regions.time_modified, + candidate_regions.dataset_id, + candidate_regions.volume_id, + candidate_regions.block_size, + candidate_regions.blocks_per_extent, + candidate_regions.extent_count + FROM + candidate_regions + WHERE + (SELECT do_insert.insert FROM do_insert LIMIT 1) + RETURNING + region.id, + region.time_created, + region.time_modified, + region.dataset_id, + region.volume_id, + region.block_size, + region.blocks_per_extent, + region.extent_count + ), + updated_datasets + AS ( + UPDATE + dataset + SET + size_used + = dataset.size_used + + ( + SELECT + proposed_dataset_changes.size_used_delta + FROM + proposed_dataset_changes + WHERE + proposed_dataset_changes.id = dataset.id + LIMIT + 1 + ) + WHERE + dataset.id = ANY (SELECT proposed_dataset_changes.id FROM proposed_dataset_changes) + AND (SELECT do_insert.insert FROM do_insert LIMIT 1) + RETURNING + dataset.id, + dataset.time_created, + dataset.time_modified, + dataset.time_deleted, + dataset.rcgen, + dataset.pool_id, + dataset.ip, + dataset.port, + dataset.kind, + dataset.size_used + ) +( + SELECT + dataset.id, + dataset.time_created, + dataset.time_modified, + dataset.time_deleted, + dataset.rcgen, + dataset.pool_id, + dataset.ip, + dataset.port, + dataset.kind, + dataset.size_used, + old_regions.id, + old_regions.time_created, + old_regions.time_modified, + old_regions.dataset_id, + old_regions.volume_id, + old_regions.block_size, + old_regions.blocks_per_extent, + old_regions.extent_count + FROM + old_regions INNER JOIN dataset ON old_regions.dataset_id = dataset.id +) +UNION + ( + SELECT + updated_datasets.id, + updated_datasets.time_created, + updated_datasets.time_modified, + updated_datasets.time_deleted, + updated_datasets.rcgen, + updated_datasets.pool_id, + updated_datasets.ip, + updated_datasets.port, + updated_datasets.kind, + updated_datasets.size_used, + inserted_regions.id, + inserted_regions.time_created, + inserted_regions.time_modified, + inserted_regions.dataset_id, + inserted_regions.volume_id, + inserted_regions.block_size, + inserted_regions.blocks_per_extent, + inserted_regions.extent_count + FROM + inserted_regions + INNER JOIN updated_datasets ON inserted_regions.dataset_id = updated_datasets.id + ) diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs index f1d1a2bd02..290868aae2 100644 --- a/nexus/src/app/sagas/snapshot_create.rs +++ b/nexus/src/app/sagas/snapshot_create.rs @@ -1175,7 +1175,7 @@ async fn ssc_start_running_snapshot( ); let snapshot_id = sagactx.lookup::("snapshot_id")?; - info!(log, "starting running snapshot for {snapshot_id}"); + info!(log, "starting running snapshot"; "snapshot_id" => %snapshot_id); let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore()) .disk_id(params.disk_id) @@ -1198,7 +1198,13 @@ async fn ssc_start_running_snapshot( let url = format!("http://{}", dataset.address()); let client = CrucibleAgentClient::new(&url); - info!(log, "dataset {:?} region {:?} url {}", dataset, region, url); + info!( + log, + "contacting crucible agent to confirm region exists"; + "dataset" => ?dataset, + "region" => ?region, + "url" => url, + ); // Validate with the Crucible agent that the snapshot exists let crucible_region = retry_until_known_result(log, || async { @@ -1208,7 +1214,11 @@ async fn ssc_start_running_snapshot( .map_err(|e| e.to_string()) .map_err(ActionError::action_failed)?; - info!(log, "crucible region {:?}", crucible_region); + info!( + log, + "confirmed the region exists with crucible agent"; + "crucible region" => ?crucible_region + ); let crucible_snapshot = retry_until_known_result(log, || async { client @@ -1222,7 +1232,11 @@ async fn ssc_start_running_snapshot( .map_err(|e| e.to_string()) .map_err(ActionError::action_failed)?; - info!(log, "crucible snapshot {:?}", crucible_snapshot); + info!( + log, + "successfully accessed crucible snapshot"; + "crucible snapshot" => ?crucible_snapshot + ); // Start the snapshot running let crucible_running_snapshot = @@ -1238,7 +1252,11 @@ async fn ssc_start_running_snapshot( .map_err(|e| e.to_string()) .map_err(ActionError::action_failed)?; - info!(log, "crucible running snapshot {:?}", crucible_running_snapshot); + info!( + log, + "successfully started running region snapshot"; + "crucible running snapshot" => ?crucible_running_snapshot + ); // Map from the region to the snapshot let region_addr = format!( diff --git a/test-utils/src/dev/db.rs b/test-utils/src/dev/db.rs index c148a60e1c..d8b15520a4 100644 --- a/test-utils/src/dev/db.rs +++ b/test-utils/src/dev/db.rs @@ -21,6 +21,7 @@ use std::time::Duration; use tempfile::tempdir; use tempfile::TempDir; use thiserror::Error; +use tokio::io::AsyncWriteExt; use tokio_postgres::config::Host; use tokio_postgres::config::SslMode; @@ -497,6 +498,15 @@ pub enum CockroachStartError { )] TimedOut { pid: u32, time_waited: Duration }, + #[error("failed to write input to cockroachdb")] + FailedToWrite(#[source] std::io::Error), + + #[error("failed to await cockroachdb completing")] + FailedToWait(#[source] std::io::Error), + + #[error("Invalid cockroachdb output")] + InvalidOutput(#[from] std::string::FromUtf8Error), + #[error("unknown error waiting for cockroach to start")] Unknown { #[source] @@ -653,6 +663,45 @@ impl Drop for CockroachInstance { } } +/// Uses cockroachdb to run the "sqlfmt" command. +pub async fn format_sql(input: &str) -> Result { + let mut cmd = tokio::process::Command::new(COCKROACHDB_BIN); + let mut child = cmd + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .args(&[ + "sqlfmt", + "--tab-width", + "2", + "--use-spaces", + "true", + "--print-width", + "100", + ]) + .spawn() + .map_err(|source| CockroachStartError::BadCmd { + cmd: COCKROACHDB_BIN.to_string(), + source, + })?; + let stdin = child.stdin.as_mut().unwrap(); + stdin + .write_all(input.as_bytes()) + .await + .map_err(CockroachStartError::FailedToWrite)?; + let output = child + .wait_with_output() + .await + .map_err(CockroachStartError::FailedToWait)?; + + if !output.status.success() { + return Err(CockroachStartError::Exited { + exit_code: output.status.code().unwrap_or_else(|| -1), + }); + } + + Ok(String::from_utf8(output.stdout)?) +} + /// Verify that CockroachDB has the correct version pub async fn check_db_version() -> Result<(), CockroachStartError> { let mut cmd = tokio::process::Command::new(COCKROACHDB_BIN);