From 7b29090576a985c87b0c17487395ea7804394d3c Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Wed, 27 Mar 2024 09:54:07 -0700 Subject: [PATCH] [db-queries] Decouples CTE usage from Diesel (RegionAllocation) (#5063) Diesel's complex type have been a pain point in the region allocation CTE in particular: - It relies on several JOINs, which requires explicitly adding Diesel macros to make the type system happy - With ongoing work by @jmpesp , it would require additional `alias!` calls to allow a table to appear in a `SELECT` clause in multiple spots - Generally, the disconnect between "what SQL do I want to write" and "what invocations will make Diesel happy" has been "not really worth it" in this space. This PR does the following: - It relies heavily on https://docs.diesel.rs/master/diesel/query_builder/struct.SqlQuery.html , which is Diesel's "you do whatever you want" query type. Although this is still using Diesel, this usage is actually pretty aligned with other simpler DB interfaces - other crates (see: [tokio_postgres](https://docs.rs/tokio-postgres/latest/tokio_postgres/struct.Client.html#method.query)) take arguments like "a String + list of bind parameters", in some form. - It adds support in `raw_query_builder.rs` for a wrapper around Diesel's `SqlQuery` object, to make SQL injections less possible and to track bind parameter counting. - It fully converts the `RegionAllocation` CTE to use this builder, interleaved with raw SQL. - Since a large portion of the CTE was rote "repeated columns", I also added a function, accessible as `AllColumnsOf::with_prefix(&'static str)`, to enumerate all the columns of a table as strings. - I also added a simple `EXPLAIN` test to the CTE, to quickly validate that CockroachDB thinks it's producing valid output. Here are my thoughts for future improvements: - [ ] I spent a while trying to make the "query construction" a compile-time operation. I think that this is possible with nightly features. I think this is extremely difficult with stable rust. However, I think this is a great direction for future work, as statically-known queries would be easier to cache and validate. - [ ] I'd like to encapsulate more type information about the constructed query, as an "input/output" object. Right now, we're relying on existing integration tests for validation, but it seems possible to send these "example" queries (like the ones I'm using in my `EXPLAIN` tests) to ask CockroachDB to validate type information for us. - [ ] I want to make this format as digestible as possible. If there's anything I can do to make this easier to read, write, and validate, I'm totally on-board. I have been debating adding macro support for SQL formatting the raw strings, but I'm on the fence about whether or not that would make interleaved code harder to parse by humans. - As a follow-up: I'm auto-formatting the output of these queries in the EXPECTORATE-d output files --- Cargo.lock | 21 + Cargo.toml | 1 + nexus/db-model/src/queries/mod.rs | 1 - .../db-model/src/queries/region_allocation.rs | 195 ---- nexus/db-queries/Cargo.toml | 1 + nexus/db-queries/src/db/cast_uuid_as_bytea.rs | 62 -- nexus/db-queries/src/db/column_walker.rs | 27 +- nexus/db-queries/src/db/datastore/mod.rs | 11 + nexus/db-queries/src/db/datastore/region.rs | 8 +- nexus/db-queries/src/db/mod.rs | 2 +- .../src/db/queries/region_allocation.rs | 941 ++++++------------ nexus/db-queries/src/db/raw_query_builder.rs | 195 ++++ .../output/region_allocate_distinct_sleds.sql | 267 +++++ .../output/region_allocate_random_sleds.sql | 265 +++++ nexus/src/app/sagas/snapshot_create.rs | 28 +- test-utils/src/dev/db.rs | 49 + 16 files changed, 1198 insertions(+), 876 deletions(-) delete mode 100644 nexus/db-model/src/queries/region_allocation.rs delete mode 100644 nexus/db-queries/src/db/cast_uuid_as_bytea.rs create mode 100644 nexus/db-queries/src/db/raw_query_builder.rs create mode 100644 nexus/db-queries/tests/output/region_allocate_distinct_sleds.sql create mode 100644 nexus/db-queries/tests/output/region_allocate_random_sleds.sql diff --git a/Cargo.lock b/Cargo.lock index 8188a1feb7..0421ec6653 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1147,6 +1147,26 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "const_format" +version = "0.2.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a214c7af3d04997541b18d432afaff4c455e79e2029079647e72fc2bd27673" +dependencies = [ + "const_format_proc_macros", +] + +[[package]] +name = "const_format_proc_macros" +version = "0.2.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7f6ff08fd20f4f299298a28e2dfa8a8ba1036e6cd2460ac1de7b425d76f2500" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + [[package]] name = "constant_time_eq" version = "0.2.6" @@ -4590,6 +4610,7 @@ dependencies = [ "camino", "camino-tempfile", "chrono", + "const_format", "cookie 0.18.0", "db-macros", "diesel", diff --git a/Cargo.toml b/Cargo.toml index 018941f081..6546100e3f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -188,6 +188,7 @@ cfg-if = "1.0" chrono = { version = "0.4", features = [ "serde" ] } clap = { version = "4.5", features = ["cargo", "derive", "env", "wrap_help"] } colored = "2.1" +const_format = "0.2.32" cookie = "0.18" criterion = { version = "0.5.1", features = [ "async_tokio" ] } crossbeam = "0.8" diff --git a/nexus/db-model/src/queries/mod.rs b/nexus/db-model/src/queries/mod.rs index 7724d48bab..e138508f84 100644 --- a/nexus/db-model/src/queries/mod.rs +++ b/nexus/db-model/src/queries/mod.rs @@ -4,5 +4,4 @@ //! Subqueries used in CTEs. -pub mod region_allocation; pub mod virtual_provisioning_collection_update; diff --git a/nexus/db-model/src/queries/region_allocation.rs b/nexus/db-model/src/queries/region_allocation.rs deleted file mode 100644 index a1b9e0373a..0000000000 --- a/nexus/db-model/src/queries/region_allocation.rs +++ /dev/null @@ -1,195 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. - -//! Describes subqueries which may be issues as a part of CTEs. -//! -//! When possible, it's preferable to define subqueries close to their -//! usage. However, certain Diesel traits (such as those enabling joins) -//! require the table structures to be defined in the same crate. - -// TODO: We're currently piggy-backing on the table macro for convenience. -// We actually do not want to generate an entire table for each subquery - we'd -// like to have a query source (which we can use to generate SELECT statements, -// JOIN, etc), but we don't want this to be an INSERT/UPDATE/DELETE target. -// -// Similarly, we don't want to force callers to supply a "primary key". -// -// I've looked into Diesel's `alias!` macro for this purpose, but unfortunately -// that implementation is too opinionated about the output QueryFragment. -// It expects to use the form: -// -// " as ", which is actually the opposite of what we want in -// a CTE (where we want the alias name to come first). - -use crate::schema::dataset; -use crate::schema::sled; -use crate::schema::zpool; - -table! { - old_regions { - id -> Uuid, - time_created -> Timestamptz, - time_modified -> Timestamptz, - - dataset_id -> Uuid, - volume_id -> Uuid, - - block_size -> Int8, - blocks_per_extent -> Int8, - extent_count -> Int8, - } -} - -table! { - candidate_datasets { - id -> Uuid, - pool_id -> Uuid, - } -} - -table! { - shuffled_candidate_datasets { - id -> Uuid, - pool_id -> Uuid, - } -} - -table! { - candidate_regions { - id -> Uuid, - time_created -> Timestamptz, - time_modified -> Timestamptz, - - dataset_id -> Uuid, - volume_id -> Uuid, - - block_size -> Int8, - blocks_per_extent -> Int8, - extent_count -> Int8, - } -} - -table! { - proposed_dataset_changes { - id -> Uuid, - pool_id -> Uuid, - size_used_delta -> Int8, - } -} - -table! { - old_zpool_usage (pool_id) { - pool_id -> Uuid, - size_used -> Numeric, - } -} - -table! { - candidate_zpools (pool_id) { - pool_id -> Uuid - } -} - -table! { - do_insert (insert) { - insert -> Bool, - } -} - -table! { - one_zpool_per_sled (pool_id) { - pool_id -> Uuid - } -} - -table! { - one_dataset_per_zpool { - id -> Uuid, - pool_id -> Uuid - } -} - -table! { - inserted_regions { - id -> Uuid, - time_created -> Timestamptz, - time_modified -> Timestamptz, - - dataset_id -> Uuid, - volume_id -> Uuid, - - block_size -> Int8, - blocks_per_extent -> Int8, - extent_count -> Int8, - } -} - -table! { - updated_datasets (id) { - id -> Uuid, - time_created -> Timestamptz, - time_modified -> Timestamptz, - time_deleted -> Nullable, - rcgen -> Int8, - - pool_id -> Uuid, - - ip -> Inet, - port -> Int4, - - kind -> crate::DatasetKindEnum, - size_used -> Nullable, - } -} - -diesel::allow_tables_to_appear_in_same_query!( - proposed_dataset_changes, - dataset, -); - -diesel::allow_tables_to_appear_in_same_query!( - do_insert, - candidate_regions, - dataset, - zpool, -); - -diesel::allow_tables_to_appear_in_same_query!( - old_zpool_usage, - zpool, - sled, - proposed_dataset_changes, -); - -diesel::allow_tables_to_appear_in_same_query!(old_regions, dataset,); -diesel::allow_tables_to_appear_in_same_query!(old_regions, zpool,); - -diesel::allow_tables_to_appear_in_same_query!( - inserted_regions, - updated_datasets, -); - -diesel::allow_tables_to_appear_in_same_query!(candidate_zpools, dataset,); -diesel::allow_tables_to_appear_in_same_query!(candidate_zpools, zpool,); -diesel::allow_tables_to_appear_in_same_query!(candidate_datasets, dataset); - -// == Needed for random region allocation == - -pub mod cockroach_md5 { - pub mod functions { - use diesel::sql_types::*; - diesel::sql_function!(fn md5(x: Bytea) -> Bytea); - } - - pub mod helper_types { - pub type Md5 = super::functions::md5::HelperType; - } - - pub mod dsl { - pub use super::functions::*; - pub use super::helper_types::*; - } -} - -// == End random region allocation dependencies == diff --git a/nexus/db-queries/Cargo.toml b/nexus/db-queries/Cargo.toml index 595280780e..5f99b904fc 100644 --- a/nexus/db-queries/Cargo.toml +++ b/nexus/db-queries/Cargo.toml @@ -15,6 +15,7 @@ base64.workspace = true bb8.workspace = true camino.workspace = true chrono.workspace = true +const_format.workspace = true cookie.workspace = true diesel.workspace = true diesel-dtrace.workspace = true diff --git a/nexus/db-queries/src/db/cast_uuid_as_bytea.rs b/nexus/db-queries/src/db/cast_uuid_as_bytea.rs deleted file mode 100644 index c50c88971f..0000000000 --- a/nexus/db-queries/src/db/cast_uuid_as_bytea.rs +++ /dev/null @@ -1,62 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. - -//! Cast UUID to BYTES - -use diesel::expression::ValidGrouping; -use diesel::pg::Pg; -use diesel::query_builder::AstPass; -use diesel::query_builder::QueryFragment; -use diesel::query_builder::QueryId; -use diesel::Expression; -use diesel::SelectableExpression; - -/// Cast an expression which evaluates to a Uuid and cast it to a Bytea. It's -/// that simple! -#[derive(ValidGrouping, QueryId)] -pub struct CastUuidToBytea { - expression: E, -} - -impl CastUuidToBytea -where - E: Expression, -{ - pub const fn new(expression: E) -> Self { - Self { expression } - } -} - -impl Expression for CastUuidToBytea -where - E: Expression, -{ - type SqlType = diesel::sql_types::Bytea; -} - -impl diesel::AppearsOnTable for CastUuidToBytea where - E: diesel::AppearsOnTable -{ -} - -impl SelectableExpression for CastUuidToBytea where - E: SelectableExpression -{ -} - -impl QueryFragment for CastUuidToBytea -where - E: QueryFragment, -{ - fn walk_ast<'a>( - &'a self, - mut out: AstPass<'_, 'a, Pg>, - ) -> diesel::QueryResult<()> { - out.push_sql("CAST("); - self.expression.walk_ast(out.reborrow())?; - out.push_sql(" as BYTEA)"); - - Ok(()) - } -} diff --git a/nexus/db-queries/src/db/column_walker.rs b/nexus/db-queries/src/db/column_walker.rs index 64c3b450c8..cace2ba5fb 100644 --- a/nexus/db-queries/src/db/column_walker.rs +++ b/nexus/db-queries/src/db/column_walker.rs @@ -4,6 +4,7 @@ //! CTE utility for iterating over all columns in a table. +use crate::db::raw_query_builder::TrustedStr; use diesel::prelude::*; use std::marker::PhantomData; @@ -17,14 +18,30 @@ pub(crate) struct ColumnWalker { remaining: PhantomData, } +pub type AllColumnsOf = ColumnWalker<::AllColumns>; + impl ColumnWalker { - pub fn new() -> Self { + pub const fn new() -> Self { Self { remaining: PhantomData } } } macro_rules! impl_column_walker { ( $len:literal $($column:ident)+ ) => ( + #[allow(dead_code)] + impl<$($column: Column),+> ColumnWalker<($($column,)+)> { + pub fn with_prefix(prefix: &'static str) -> TrustedStr { + // This string is derived from: + // - The "table" type, with associated columns, which + // are not controlled by an arbitrary user, and + // - The "prefix" type, which is a "&'static str" (AKA, + // hopefully known at compile-time, and not leaked). + TrustedStr::i_take_responsibility_for_validating_this_string( + [$([prefix, $column::NAME].join("."),)+].join(", ") + ) + } + } + impl<$($column: Column),+> IntoIterator for ColumnWalker<($($column,)+)> { type Item = &'static str; type IntoIter = std::array::IntoIter; @@ -109,4 +126,12 @@ mod test { assert_eq!(iter.next(), Some("value")); assert_eq!(iter.next(), None); } + + #[test] + fn test_all_columns_with_prefix() { + assert_eq!( + AllColumnsOf::::with_prefix("foo").as_str(), + "foo.id, foo.value, foo.time_deleted" + ); + } } diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 2a3f969183..0020cf99b3 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -949,10 +949,21 @@ mod test { assert_eq!(expected_region_count, dataset_and_regions.len()); let mut disk_datasets = HashSet::new(); let mut disk_zpools = HashSet::new(); + let mut regions = HashSet::new(); for (dataset, region) in dataset_and_regions { // Must be 3 unique datasets assert!(disk_datasets.insert(dataset.id())); + // All regions should be unique + assert!(regions.insert(region.id())); + + // Check there's no cross contamination between returned UUIDs + // + // This is a little goofy, but it catches a bug that has + // happened before. The returned columns share names (like + // "id"), so we need to process them in-order. + assert!(regions.get(&dataset.id()).is_none()); + assert!(disk_datasets.get(®ion.id()).is_none()); // Dataset must not be eligible for provisioning. if let Some(kind) = diff --git a/nexus/db-queries/src/db/datastore/region.rs b/nexus/db-queries/src/db/datastore/region.rs index 52e0ce4d88..ad89a9ca93 100644 --- a/nexus/db-queries/src/db/datastore/region.rs +++ b/nexus/db-queries/src/db/datastore/region.rs @@ -128,7 +128,7 @@ impl DataStore { let (blocks_per_extent, extent_count) = Self::get_crucible_allocation(&block_size, size); - let query = crate::db::queries::region_allocation::RegionAllocate::new( + let query = crate::db::queries::region_allocation::allocation_query( volume_id, block_size.to_bytes() as u64, blocks_per_extent, @@ -141,6 +141,12 @@ impl DataStore { crate::db::queries::region_allocation::from_diesel(e) })?; + info!( + self.log, + "Allocated regions for volume"; + "volume_id" => %volume_id, + "datasets_and_regions" => ?dataset_and_regions, + ); Ok(dataset_and_regions) } diff --git a/nexus/db-queries/src/db/mod.rs b/nexus/db-queries/src/db/mod.rs index d5262166ee..9b3d71970c 100644 --- a/nexus/db-queries/src/db/mod.rs +++ b/nexus/db-queries/src/db/mod.rs @@ -5,7 +5,6 @@ //! Facilities for working with the Omicron database pub(crate) mod alias; -pub(crate) mod cast_uuid_as_bytea; // This is not intended to be public, but this is necessary to use it from // doctests pub mod collection_attach; @@ -29,6 +28,7 @@ mod pool_connection; // This is marked public because the error types are used elsewhere, e.g., in // sagas. pub mod queries; +mod raw_query_builder; mod saga_recovery; mod sec_store; pub mod subquery; diff --git a/nexus/db-queries/src/db/queries/region_allocation.rs b/nexus/db-queries/src/db/queries/region_allocation.rs index a657d21c97..2e4f4cd776 100644 --- a/nexus/db-queries/src/db/queries/region_allocation.rs +++ b/nexus/db-queries/src/db/queries/region_allocation.rs @@ -4,35 +4,22 @@ //! Implementation of queries for provisioning regions. -use crate::db::alias::ExpressionAlias; -use crate::db::cast_uuid_as_bytea::CastUuidToBytea; +use crate::db::column_walker::AllColumnsOf; use crate::db::datastore::REGION_REDUNDANCY_THRESHOLD; -use crate::db::model::{Dataset, DatasetKind, Region}; -use crate::db::pool::DbConnection; -use crate::db::subquery::{AsQuerySource, Cte, CteBuilder, CteQuery}; -use crate::db::true_or_cast_error::{matches_sentinel, TrueOrCastError}; -use db_macros::Subquery; +use crate::db::model::{Dataset, Region}; +use crate::db::raw_query_builder::{QueryBuilder, TypedSqlQuery}; +use crate::db::schema; +use crate::db::true_or_cast_error::matches_sentinel; +use const_format::concatcp; use diesel::pg::Pg; -use diesel::query_builder::{AstPass, Query, QueryFragment, QueryId}; use diesel::result::Error as DieselError; -use diesel::PgBinaryExpressionMethods; -use diesel::{ - sql_types, BoolExpressionMethods, CombineDsl, ExpressionMethods, - Insertable, IntoSql, JoinOnDsl, NullableExpressionMethods, QueryDsl, - RunQueryDsl, -}; +use diesel::sql_types; use nexus_config::RegionAllocationStrategy; -use nexus_db_model::queries::region_allocation::{ - candidate_datasets, candidate_regions, candidate_zpools, cockroach_md5, - do_insert, inserted_regions, old_regions, old_zpool_usage, - proposed_dataset_changes, shuffled_candidate_datasets, updated_datasets, -}; -use nexus_db_model::schema; -use nexus_db_model::to_db_sled_policy; -use nexus_db_model::SledState; -use nexus_types::external_api::views::SledPolicy; use omicron_common::api::external; +type AllColumnsOfRegion = AllColumnsOf; +type AllColumnsOfDataset = AllColumnsOf; + const NOT_ENOUGH_DATASETS_SENTINEL: &'static str = "Not enough datasets"; const NOT_ENOUGH_ZPOOL_SPACE_SENTINEL: &'static str = "Not enough space"; const NOT_ENOUGH_UNIQUE_ZPOOLS_SENTINEL: &'static str = @@ -77,611 +64,345 @@ pub fn from_diesel(e: DieselError) -> external::Error { error::public_error_from_diesel(e, error::ErrorHandler::Server) } -/// A subquery to find all old regions associated with a particular volume. -#[derive(Subquery, QueryId)] -#[subquery(name = old_regions)] -struct OldRegions { - query: Box>, -} - -impl OldRegions { - fn new(volume_id: uuid::Uuid) -> Self { - use crate::db::schema::region::dsl; - Self { - query: Box::new(dsl::region.filter(dsl::volume_id.eq(volume_id))), - } - } -} - -/// A subquery to find datasets which could be used for provisioning regions. -/// -/// We only consider datasets which are already allocated as "Crucible". -/// This implicitly distinguishes between "M.2s" and "U.2s" -- Nexus needs to -/// determine during dataset provisioning which devices should be considered for -/// usage as Crucible storage. -/// -/// We select only one dataset from each zpool. -#[derive(Subquery, QueryId)] -#[subquery(name = candidate_datasets)] -struct CandidateDatasets { - query: Box>, -} - -impl CandidateDatasets { - fn new(candidate_zpools: &CandidateZpools, seed: u128) -> Self { - use crate::db::schema::dataset::dsl as dataset_dsl; - use candidate_zpools::dsl as candidate_zpool_dsl; - - let seed_bytes = seed.to_le_bytes(); - - let query: Box> = - Box::new( - dataset_dsl::dataset - .inner_join(candidate_zpools.query_source().on( - dataset_dsl::pool_id.eq(candidate_zpool_dsl::pool_id), - )) - .filter(dataset_dsl::time_deleted.is_null()) - .filter(dataset_dsl::size_used.is_not_null()) - .filter(dataset_dsl::kind.eq(DatasetKind::Crucible)) - .distinct_on(dataset_dsl::pool_id) - .order_by(( - dataset_dsl::pool_id, - cockroach_md5::dsl::md5( - CastUuidToBytea::new(dataset_dsl::id) - .concat(seed_bytes.to_vec()), - ), - )) - .select((dataset_dsl::id, dataset_dsl::pool_id)), - ); - Self { query } - } -} - -/// Shuffle the candidate datasets, and select REGION_REDUNDANCY_THRESHOLD -/// regions from it. -#[derive(Subquery, QueryId)] -#[subquery(name = shuffled_candidate_datasets)] -struct ShuffledCandidateDatasets { - query: Box>, -} - -impl ShuffledCandidateDatasets { - fn new(candidate_datasets: &CandidateDatasets, seed: u128) -> Self { - use candidate_datasets::dsl as candidate_datasets_dsl; - - let seed_bytes = seed.to_le_bytes(); - - let query: Box> = - Box::new( - candidate_datasets - .query_source() - // We order by md5 to shuffle the ordering of the datasets. - // md5 has a uniform output distribution so it does the job. - .order(cockroach_md5::dsl::md5( - CastUuidToBytea::new(candidate_datasets_dsl::id) - .concat(seed_bytes.to_vec()), - )) - .select(( - candidate_datasets_dsl::id, - candidate_datasets_dsl::pool_id, - )) - .limit(REGION_REDUNDANCY_THRESHOLD.try_into().unwrap()), - ); - Self { query } - } -} - -/// A subquery to create the regions-to-be-inserted for the volume. -#[derive(Subquery, QueryId)] -#[subquery(name = candidate_regions)] -struct CandidateRegions { - query: Box>, -} - -diesel::sql_function!(fn gen_random_uuid() -> Uuid); -diesel::sql_function!(fn now() -> Timestamptz); - -impl CandidateRegions { - fn new( - shuffled_candidate_datasets: &ShuffledCandidateDatasets, - volume_id: uuid::Uuid, - block_size: u64, - blocks_per_extent: u64, - extent_count: u64, - ) -> Self { - use schema::region; - use shuffled_candidate_datasets::dsl as shuffled_candidate_datasets_dsl; - - let volume_id = volume_id.into_sql::(); - let block_size = (block_size as i64).into_sql::(); - let blocks_per_extent = - (blocks_per_extent as i64).into_sql::(); - let extent_count = - (extent_count as i64).into_sql::(); - Self { - query: Box::new(shuffled_candidate_datasets.query_source().select( - ( - ExpressionAlias::new::(gen_random_uuid()), - ExpressionAlias::new::(now()), - ExpressionAlias::new::(now()), - ExpressionAlias::new::( - shuffled_candidate_datasets_dsl::id, - ), - ExpressionAlias::new::(volume_id), - ExpressionAlias::new::(block_size), - ExpressionAlias::new::( - blocks_per_extent, - ), - ExpressionAlias::new::(extent_count), - ), - )), - } - } -} - -/// A subquery which summarizes the changes we intend to make, showing: -/// -/// 1. Which datasets will have size adjustments -/// 2. Which pools those datasets belong to -/// 3. The delta in size-used -#[derive(Subquery, QueryId)] -#[subquery(name = proposed_dataset_changes)] -struct ProposedChanges { - query: Box>, -} - -impl ProposedChanges { - fn new(candidate_regions: &CandidateRegions) -> Self { - use crate::db::schema::dataset::dsl as dataset_dsl; - use candidate_regions::dsl as candidate_regions_dsl; - Self { - query: Box::new( - candidate_regions.query_source() - .inner_join( - dataset_dsl::dataset.on(dataset_dsl::id.eq(candidate_regions_dsl::dataset_id)) - ) - .select(( - ExpressionAlias::new::(candidate_regions_dsl::dataset_id), - ExpressionAlias::new::(dataset_dsl::pool_id), - ExpressionAlias::new::( - candidate_regions_dsl::block_size * - candidate_regions_dsl::blocks_per_extent * - candidate_regions_dsl::extent_count - ), - )) - ), - } - } -} - -/// A subquery which calculates the old size being used by zpools -/// under consideration as targets for region allocation. -#[derive(Subquery, QueryId)] -#[subquery(name = old_zpool_usage)] -struct OldPoolUsage { - query: Box>, -} - -impl OldPoolUsage { - fn new() -> Self { - use crate::db::schema::dataset::dsl as dataset_dsl; - Self { - query: Box::new( - dataset_dsl::dataset - .group_by(dataset_dsl::pool_id) - .filter(dataset_dsl::size_used.is_not_null()) - .filter(dataset_dsl::time_deleted.is_null()) - .select(( - dataset_dsl::pool_id, - ExpressionAlias::new::( - diesel::dsl::sum(dataset_dsl::size_used) - .assume_not_null(), - ), - )), - ), - } - } -} - -/// A subquery which identifies zpools with enough space for a region allocation. -#[derive(Subquery, QueryId)] -#[subquery(name = candidate_zpools)] -struct CandidateZpools { - query: Box>, -} - -impl CandidateZpools { - fn new( - old_zpool_usage: &OldPoolUsage, - zpool_size_delta: u64, - seed: u128, - distinct_sleds: bool, - ) -> Self { - use schema::sled::dsl as sled_dsl; - use schema::zpool::dsl as zpool_dsl; - - // Why are we using raw `diesel::dsl::sql` here? - // - // When SQL performs the "SUM" operation on "bigint" type, the result - // is promoted to "numeric" (see: old_zpool_usage::dsl::size_used). - // - // However, we'd like to compare that value with a different value - // (zpool_dsl::total_size) which is still a "bigint". This comparison - // is safe (after all, we basically want to promote "total_size" to a - // Numeric too) but Diesel demands that the input and output SQL types - // of expression methods like ".le" match exactly. - // - // For similar reasons, we use `diesel::dsl::sql` with zpool_size_delta. - // We would like to add it, but diesel only permits us to `to_sql()` it - // into a BigInt, not a Numeric. I welcome a better solution. - let it_will_fit = (old_zpool_usage::dsl::size_used - + diesel::dsl::sql(&zpool_size_delta.to_string())) - .le(diesel::dsl::sql( - "(SELECT total_size FROM omicron.public.inv_zpool WHERE - inv_zpool.id = old_zpool_usage.pool_id - ORDER BY inv_zpool.time_collected DESC LIMIT 1)", - )); - - // We need to join on the sled table to access provision_state. - let with_sled = sled_dsl::sled.on(zpool_dsl::sled_id.eq(sled_dsl::id)); - let with_zpool = zpool_dsl::zpool - .on(zpool_dsl::id.eq(old_zpool_usage::dsl::pool_id)) - .inner_join(with_sled); - - let sled_is_provisionable = sled_dsl::sled_policy - .eq(to_db_sled_policy(SledPolicy::provisionable())); - let sled_is_active = sled_dsl::sled_state.eq(SledState::Active); - - let base_query = old_zpool_usage - .query_source() - .inner_join(with_zpool) - .filter(it_will_fit) - .filter(sled_is_provisionable) - .filter(sled_is_active) - .select((old_zpool_usage::dsl::pool_id,)); - - let query = if distinct_sleds { - let seed_bytes = seed.to_le_bytes(); - - let query: Box> = - Box::new( - base_query - .order_by(( - zpool_dsl::sled_id, - cockroach_md5::dsl::md5( - CastUuidToBytea::new(zpool_dsl::id) - .concat(seed_bytes.to_vec()), - ), - )) - .distinct_on(zpool_dsl::sled_id), - ); - - query - } else { - let query: Box> = - Box::new(base_query); +type SelectableSql = < + >::SelectExpression as diesel::Expression +>::SqlType; - query +pub fn allocation_query( + volume_id: uuid::Uuid, + block_size: u64, + blocks_per_extent: u64, + extent_count: u64, + allocation_strategy: &RegionAllocationStrategy, +) -> TypedSqlQuery<(SelectableSql, SelectableSql)> { + let (seed, distinct_sleds) = { + let (input_seed, distinct_sleds) = match allocation_strategy { + RegionAllocationStrategy::Random { seed } => (seed, false), + RegionAllocationStrategy::RandomWithDistinctSleds { seed } => { + (seed, true) + } }; - - Self { query } - } -} - -diesel::sql_function! { - #[aggregate] - fn bool_and(b: sql_types::Bool) -> sql_types::Bool; -} - -/// A subquery which confirms whether or not the insertion and updates should -/// occur. -/// -/// This subquery additionally exits the CTE early with an error if either: -/// 1. Not enough datasets exist to provision regions with our required -/// redundancy, or -/// 2. Not enough space exists on zpools to perform the provisioning. -#[derive(Subquery, QueryId)] -#[subquery(name = do_insert)] -struct DoInsert { - query: Box>, -} - -impl DoInsert { - fn new( - old_regions: &OldRegions, - candidate_regions: &CandidateRegions, - candidate_zpools: &CandidateZpools, - ) -> Self { - let redundancy = REGION_REDUNDANCY_THRESHOLD as i64; - let not_allocated_yet = old_regions - .query_source() - .count() - .single_value() - .assume_not_null() - .lt(redundancy); - - let enough_candidate_zpools = candidate_zpools - .query_source() - .count() - .single_value() - .assume_not_null() - .ge(redundancy); - - let enough_candidate_regions = candidate_regions - .query_source() - .count() - .single_value() - .assume_not_null() - .ge(redundancy); - - // We want to ensure that we do not allocate on two datasets in the same - // zpool, for two reasons - // - Data redundancy: If a drive fails it should only take one of the 3 - // regions with it - // - Risk of overallocation: We only check that each zpool as enough - // room for one region, so we should not allocate more than one region - // to it. - // - // Selecting two datasets on the same zpool will not initially be - // possible, as at the time of writing each zpool only has one dataset. - // Additionally, we intend to modify the allocation strategy to select - // from 3 distinct sleds, removing the possibility entirely. But, if we - // introduce a change that adds another crucible dataset to zpools - // before we improve the allocation strategy, this check will make sure - // we don't violate drive redundancy, and generate an error instead. - use crate::db::schema::dataset::dsl as dataset_dsl; - use candidate_regions::dsl as candidate_dsl; - let enough_unique_candidate_zpools = candidate_regions - .query_source() - .inner_join( - dataset_dsl::dataset - .on(candidate_dsl::dataset_id.eq(dataset_dsl::id)), - ) - .select(diesel::dsl::count_distinct(dataset_dsl::pool_id)) - .single_value() - .assume_not_null() - .ge(redundancy); - - Self { - query: Box::new(diesel::select((ExpressionAlias::new::< - do_insert::insert, - >( - not_allocated_yet - .and(TrueOrCastError::new( - enough_candidate_zpools, - NOT_ENOUGH_ZPOOL_SPACE_SENTINEL, - )) - .and(TrueOrCastError::new( - enough_candidate_regions, - NOT_ENOUGH_DATASETS_SENTINEL, - )) - .and(TrueOrCastError::new( - enough_unique_candidate_zpools, - NOT_ENOUGH_UNIQUE_ZPOOLS_SENTINEL, - )), - ),))), - } - } -} - -/// A subquery which actually inserts the regions. -#[derive(Subquery, QueryId)] -#[subquery(name = inserted_regions)] -struct InsertRegions { - query: Box>, -} - -impl InsertRegions { - fn new(do_insert: &DoInsert, candidate_regions: &CandidateRegions) -> Self { - use crate::db::schema::region; - - Self { - query: Box::new( - candidate_regions - .query_source() - .select(candidate_regions::all_columns) - .filter( - do_insert - .query_source() - .select(do_insert::insert) - .single_value() - .assume_not_null(), - ) - .insert_into(region::table) - .returning(region::all_columns), + ( + input_seed.map_or_else( + || { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + }, + |seed| seed as u128, ), - } - } -} - -/// A subquery which updates dataset size usage based on inserted regions. -#[derive(Subquery, QueryId)] -#[subquery(name = updated_datasets)] -struct UpdateDatasets { - query: Box>, -} - -impl UpdateDatasets { - fn new( - do_insert: &DoInsert, - proposed_dataset_changes: &ProposedChanges, - ) -> Self { - use crate::db::schema::dataset::dsl as dataset_dsl; - - let datasets_with_updates = proposed_dataset_changes - .query_source() - .select(proposed_dataset_changes::columns::id) - .into_boxed(); - - Self { - query: Box::new( - diesel::update( - dataset_dsl::dataset.filter( - dataset_dsl::id.eq_any(datasets_with_updates) - ) - ) - .filter( - do_insert.query_source() - .select(do_insert::insert) - .single_value() - .assume_not_null() - ) - .set( - dataset_dsl::size_used.eq( - dataset_dsl::size_used + proposed_dataset_changes.query_source() - .filter(proposed_dataset_changes::columns::id.eq(dataset_dsl::id)) - .select(proposed_dataset_changes::columns::size_used_delta) - .single_value() - ) - ) - .returning(crate::db::schema::dataset::all_columns) - ) - } + distinct_sleds, + ) + }; + + let seed = seed.to_le_bytes().to_vec(); + + let size_delta = block_size * blocks_per_extent * extent_count; + let redundancy: i64 = i64::try_from(REGION_REDUNDANCY_THRESHOLD).unwrap(); + + let builder = QueryBuilder::new().sql( + // Find all old regions associated with a particular volume +"WITH + old_regions AS ( + SELECT ").sql(AllColumnsOfRegion::with_prefix("region")).sql(" + FROM region WHERE (region.volume_id = ").param().sql(")),") + .bind::(volume_id) + + // Calculates the old size being used by zpools under consideration as targets for region + // allocation. + .sql(" + old_zpool_usage AS ( + SELECT + dataset.pool_id, + sum(dataset.size_used) AS size_used + FROM dataset WHERE ((dataset.size_used IS NOT NULL) AND (dataset.time_deleted IS NULL)) GROUP BY dataset.pool_id),") + .sql(" + candidate_zpools AS ("); + + // Identifies zpools with enough space for region allocation. + // + // NOTE: 'distinct_sleds' changes the format of the underlying SQL query, as it uses + // distinct bind parameters depending on the conditional branch. + let builder = if distinct_sleds { + builder.sql("SELECT DISTINCT ON (zpool.sled_id) ") + } else { + builder.sql("SELECT ") + }; + let builder = builder.sql(" + old_zpool_usage.pool_id + FROM ( + old_zpool_usage + INNER JOIN + (zpool INNER JOIN sled ON (zpool.sled_id = sled.id)) ON (zpool.id = old_zpool_usage.pool_id) + ) + WHERE ( + ((old_zpool_usage.size_used + ").param().sql(" ) <= + (SELECT total_size FROM omicron.public.inv_zpool WHERE + inv_zpool.id = old_zpool_usage.pool_id + ORDER BY inv_zpool.time_collected DESC LIMIT 1) + ) + AND + (sled.sled_policy = 'in_service') + AND + (sled.sled_state = 'active') + )" + ).bind::(size_delta as i64); + + let builder = if distinct_sleds { + builder + .sql("ORDER BY zpool.sled_id, md5((CAST(zpool.id as BYTEA) || ") + .param() + .sql("))") + .bind::(seed.clone()) + } else { + builder } + .sql("),"); + + // Find datasets which could be used for provisioning regions. + // + // We only consider datasets which are already allocated as "Crucible". + // This implicitly distinguishes between "M.2s" and "U.2s" -- Nexus needs to + // determine during dataset provisioning which devices should be considered for + // usage as Crucible storage. + // + // We select only one dataset from each zpool. + builder.sql(" + candidate_datasets AS ( + SELECT DISTINCT ON (dataset.pool_id) + dataset.id, + dataset.pool_id + FROM (dataset INNER JOIN candidate_zpools ON (dataset.pool_id = candidate_zpools.pool_id)) + WHERE ( + ((dataset.time_deleted IS NULL) AND + (dataset.size_used IS NOT NULL)) AND + (dataset.kind = 'crucible') + ) + ORDER BY dataset.pool_id, md5((CAST(dataset.id as BYTEA) || ").param().sql(")) + ),") + .bind::(seed.clone()) + // We order by md5 to shuffle the ordering of the datasets. + // md5 has a uniform output distribution so it does the job. + .sql(" + shuffled_candidate_datasets AS ( + SELECT + candidate_datasets.id, + candidate_datasets.pool_id + FROM candidate_datasets + ORDER BY md5((CAST(candidate_datasets.id as BYTEA) || ").param().sql(")) LIMIT ").param().sql(" + ),") + .bind::(seed) + .bind::(redundancy) + // Create the regions-to-be-inserted for the volume. + .sql(" + candidate_regions AS ( + SELECT + gen_random_uuid() AS id, + now() AS time_created, + now() AS time_modified, + shuffled_candidate_datasets.id AS dataset_id, + ").param().sql(" AS volume_id, + ").param().sql(" AS block_size, + ").param().sql(" AS blocks_per_extent, + ").param().sql(" AS extent_count + FROM shuffled_candidate_datasets + ),") + .bind::(volume_id) + .bind::(block_size as i64) + .bind::(blocks_per_extent as i64) + .bind::(extent_count as i64) + // A subquery which summarizes the changes we intend to make, showing: + // + // 1. Which datasets will have size adjustments + // 2. Which pools those datasets belong to + // 3. The delta in size-used + .sql(" + proposed_dataset_changes AS ( + SELECT + candidate_regions.dataset_id AS id, + dataset.pool_id AS pool_id, + ((candidate_regions.block_size * candidate_regions.blocks_per_extent) * candidate_regions.extent_count) AS size_used_delta + FROM (candidate_regions INNER JOIN dataset ON (dataset.id = candidate_regions.dataset_id)) + ),") + // Confirms whether or not the insertion and updates should + // occur. + // + // This subquery additionally exits the CTE early with an error if either: + // 1. Not enough datasets exist to provision regions with our required + // redundancy, or + // 2. Not enough space exists on zpools to perform the provisioning. + // + // We want to ensure that we do not allocate on two datasets in the same + // zpool, for two reasons + // - Data redundancy: If a drive fails it should only take one of the 3 + // regions with it + // - Risk of overallocation: We only check that each zpool as enough + // room for one region, so we should not allocate more than one region + // to it. + // + // Selecting two datasets on the same zpool will not initially be + // possible, as at the time of writing each zpool only has one dataset. + // Additionally, provide a configuration option ("distinct_sleds") to modify + // the allocation strategy to select from 3 distinct sleds, removing the + // possibility entirely. But, if we introduce a change that adds another + // crucible dataset to zpools before we improve the allocation strategy, + // this check will make sure we don't violate drive redundancy, and generate + // an error instead. + .sql(" + do_insert AS ( + SELECT ((( + ((SELECT COUNT(*) FROM old_regions LIMIT 1) < ").param().sql(") AND + CAST(IF(((SELECT COUNT(*) FROM candidate_zpools LIMIT 1) >= ").param().sql(concatcp!("), 'TRUE', '", NOT_ENOUGH_ZPOOL_SPACE_SENTINEL, "') AS BOOL)) AND + CAST(IF(((SELECT COUNT(*) FROM candidate_regions LIMIT 1) >= ")).param().sql(concatcp!("), 'TRUE', '", NOT_ENOUGH_DATASETS_SENTINEL, "') AS BOOL)) AND + CAST(IF(((SELECT COUNT(DISTINCT dataset.pool_id) FROM (candidate_regions INNER JOIN dataset ON (candidate_regions.dataset_id = dataset.id)) LIMIT 1) >= ")).param().sql(concatcp!("), 'TRUE', '", NOT_ENOUGH_UNIQUE_ZPOOLS_SENTINEL, "') AS BOOL) + ) AS insert + ),")) + .bind::(redundancy) + .bind::(redundancy) + .bind::(redundancy) + .bind::(redundancy) + .sql(" + inserted_regions AS ( + INSERT INTO region + (id, time_created, time_modified, dataset_id, volume_id, block_size, blocks_per_extent, extent_count) + SELECT ").sql(AllColumnsOfRegion::with_prefix("candidate_regions")).sql(" + FROM candidate_regions + WHERE + (SELECT do_insert.insert FROM do_insert LIMIT 1) + RETURNING ").sql(AllColumnsOfRegion::with_prefix("region")).sql(" + ), + updated_datasets AS ( + UPDATE dataset SET + size_used = (dataset.size_used + (SELECT proposed_dataset_changes.size_used_delta FROM proposed_dataset_changes WHERE (proposed_dataset_changes.id = dataset.id) LIMIT 1)) + WHERE ( + (dataset.id = ANY(SELECT proposed_dataset_changes.id FROM proposed_dataset_changes)) AND + (SELECT do_insert.insert FROM do_insert LIMIT 1)) + RETURNING ").sql(AllColumnsOfDataset::with_prefix("dataset")).sql(" + ) +( + SELECT ") + .sql(AllColumnsOfDataset::with_prefix("dataset")) + .sql(", ") + .sql(AllColumnsOfRegion::with_prefix("old_regions")).sql(" + FROM + (old_regions INNER JOIN dataset ON (old_regions.dataset_id = dataset.id)) +) +UNION +( + SELECT ") + .sql(AllColumnsOfDataset::with_prefix("updated_datasets")) + .sql(", ") + .sql(AllColumnsOfRegion::with_prefix("inserted_regions")).sql(" + FROM (inserted_regions INNER JOIN updated_datasets ON (inserted_regions.dataset_id = updated_datasets.id)) +)" + ).query() } -/// Constructs a CTE for allocating new regions, and updating the datasets to -/// which those regions belong. -#[derive(QueryId)] -pub struct RegionAllocate { - cte: Cte, -} - -impl RegionAllocate { - pub fn new( - volume_id: uuid::Uuid, - block_size: u64, - blocks_per_extent: u64, - extent_count: u64, - allocation_strategy: &RegionAllocationStrategy, - ) -> Self { - let (seed, distinct_sleds) = { - let (input_seed, distinct_sleds) = match allocation_strategy { - RegionAllocationStrategy::Random { seed } => (seed, false), - RegionAllocationStrategy::RandomWithDistinctSleds { seed } => { - (seed, true) - } - }; - ( - input_seed.map_or_else( - || { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_nanos() - }, - |seed| seed as u128, - ), - distinct_sleds, - ) - }; - - let size_delta = block_size * blocks_per_extent * extent_count; - - let old_regions = OldRegions::new(volume_id); - - let old_pool_usage = OldPoolUsage::new(); - let candidate_zpools = CandidateZpools::new( - &old_pool_usage, - size_delta, - seed, - distinct_sleds, +#[cfg(test)] +mod test { + use super::*; + use crate::db::explain::ExplainableAsync; + use nexus_test_utils::db::test_setup_database; + use omicron_test_utils::dev; + use uuid::Uuid; + + // This test is a bit of a "change detector", but it's here to help with + // debugging too. If you change this query, it can be useful to see exactly + // how the output SQL has been altered. + #[tokio::test] + async fn expectorate_query() { + let volume_id = Uuid::nil(); + let block_size = 512; + let blocks_per_extent = 4; + let extent_count = 8; + + // First structure: "RandomWithDistinctSleds" + + let region_allocate = allocation_query( + volume_id, + block_size, + blocks_per_extent, + extent_count, + &RegionAllocationStrategy::RandomWithDistinctSleds { + seed: Some(1), + }, + ); + let s = dev::db::format_sql( + &diesel::debug_query::(®ion_allocate).to_string(), + ) + .await + .unwrap(); + expectorate::assert_contents( + "tests/output/region_allocate_distinct_sleds.sql", + &s, ); - let candidate_datasets = - CandidateDatasets::new(&candidate_zpools, seed); - - let shuffled_candidate_datasets = - ShuffledCandidateDatasets::new(&candidate_datasets, seed); + // Second structure: "Random" - let candidate_regions = CandidateRegions::new( - &shuffled_candidate_datasets, + let region_allocate = allocation_query( volume_id, block_size, blocks_per_extent, extent_count, + &RegionAllocationStrategy::Random { seed: Some(1) }, ); - let proposed_changes = ProposedChanges::new(&candidate_regions); - let do_insert = - DoInsert::new(&old_regions, &candidate_regions, &candidate_zpools); - let insert_regions = InsertRegions::new(&do_insert, &candidate_regions); - let updated_datasets = - UpdateDatasets::new(&do_insert, &proposed_changes); - - // Gather together all "(dataset, region)" rows for all regions which - // are allocated to the volume. - // - // This roughly translates to: - // - // old_regions INNER JOIN old_datasets - // UNION - // new_regions INNER JOIN updated_datasets - // - // Note that we cannot simply JOIN the old + new regions, and query for - // their associated datasets: doing so would return the pre-UPDATE - // values of datasets that are updated by this CTE. - let final_select = Box::new( - old_regions - .query_source() - .inner_join( - crate::db::schema::dataset::dsl::dataset - .on(old_regions::dataset_id - .eq(crate::db::schema::dataset::dsl::id)), - ) - .select(( - crate::db::schema::dataset::all_columns, - old_regions::all_columns, - )) - .union( - insert_regions - .query_source() - .inner_join( - updated_datasets::dsl::updated_datasets - .on(inserted_regions::dataset_id - .eq(updated_datasets::id)), - ) - .select(( - updated_datasets::all_columns, - inserted_regions::all_columns, - )), - ), + let s = dev::db::format_sql( + &diesel::debug_query::(®ion_allocate).to_string(), + ) + .await + .unwrap(); + expectorate::assert_contents( + "tests/output/region_allocate_random_sleds.sql", + &s, ); - - let cte = CteBuilder::new() - .add_subquery(old_regions) - .add_subquery(old_pool_usage) - .add_subquery(candidate_zpools) - .add_subquery(candidate_datasets) - .add_subquery(shuffled_candidate_datasets) - .add_subquery(candidate_regions) - .add_subquery(proposed_changes) - .add_subquery(do_insert) - .add_subquery(insert_regions) - .add_subquery(updated_datasets) - .build(final_select); - - Self { cte } } -} -impl QueryFragment for RegionAllocate { - fn walk_ast<'a>( - &'a self, - mut out: AstPass<'_, 'a, Pg>, - ) -> diesel::QueryResult<()> { - out.unsafe_to_cache_prepared(); + // Explain the possible forms of the SQL query to ensure that it + // creates a valid SQL string. + #[tokio::test] + async fn explainable() { + let logctx = dev::test_setup_log("explainable"); + let log = logctx.log.new(o!()); + let mut db = test_setup_database(&log).await; + let cfg = crate::db::Config { url: db.pg_config().clone() }; + let pool = crate::db::Pool::new(&logctx.log, &cfg); + let conn = pool.pool().get().await.unwrap(); + + let volume_id = Uuid::new_v4(); + let block_size = 512; + let blocks_per_extent = 4; + let extent_count = 8; + + // First structure: Explain the query with "RandomWithDistinctSleds" + + let region_allocate = allocation_query( + volume_id, + block_size, + blocks_per_extent, + extent_count, + &RegionAllocationStrategy::RandomWithDistinctSleds { seed: None }, + ); + let _ = region_allocate + .explain_async(&conn) + .await + .expect("Failed to explain query - is it valid SQL?"); - self.cte.walk_ast(out.reborrow())?; - Ok(()) - } -} + // Second structure: Explain the query with "Random" -type SelectableSql = < - >::SelectExpression as diesel::Expression ->::SqlType; + let region_allocate = allocation_query( + volume_id, + block_size, + blocks_per_extent, + extent_count, + &RegionAllocationStrategy::Random { seed: None }, + ); + let _ = region_allocate + .explain_async(&conn) + .await + .expect("Failed to explain query - is it valid SQL?"); -impl Query for RegionAllocate { - type SqlType = (SelectableSql, SelectableSql); + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } } - -impl RunQueryDsl for RegionAllocate {} diff --git a/nexus/db-queries/src/db/raw_query_builder.rs b/nexus/db-queries/src/db/raw_query_builder.rs new file mode 100644 index 0000000000..5c803e20ac --- /dev/null +++ b/nexus/db-queries/src/db/raw_query_builder.rs @@ -0,0 +1,195 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Utilities for building string-based queries. +//! +//! These largely side-step Diesel's type system, +//! and are recommended for more complex CTE + +use crate::db::pool::DbConnection; +use diesel::pg::Pg; +use diesel::query_builder::{AstPass, Query, QueryFragment, QueryId}; +use diesel::sql_types; +use diesel::RunQueryDsl; +use std::cell::Cell; +use std::marker::PhantomData; + +// Keeps a counter to "how many bind parameters have been used" to +// aid in the construction of the query string. +struct BindParamCounter(Cell); +impl BindParamCounter { + fn new() -> Self { + Self(0.into()) + } + fn next(&self) -> i32 { + self.0.set(self.0.get() + 1); + self.0.get() + } +} + +/// A "trusted" string, which can be used to construct SQL queries even +/// though it isn't static. We use "trust" to refer to "protection from +/// SQL injections". +/// +/// This is basically a workaround for cases where we haven't yet been +/// able to construct a query at compile-time. +pub struct TrustedStr(TrustedStrVariants); + +impl TrustedStr { + /// Explicitly constructs a string, with a name that hopefully + /// gives callers some pause when calling this API. + /// + /// If arbitrary user input is provided here, this string COULD + /// cause SQL injection attacks, so each call-site should have a + /// justification for "why it's safe". + pub fn i_take_responsibility_for_validating_this_string(s: String) -> Self { + Self(TrustedStrVariants::ValidatedExplicitly(s)) + } + + #[cfg(test)] + pub fn as_str(&self) -> &str { + match &self.0 { + TrustedStrVariants::Static(s) => s, + TrustedStrVariants::ValidatedExplicitly(s) => s.as_str(), + } + } +} + +impl From<&'static str> for TrustedStr { + fn from(s: &'static str) -> Self { + Self(TrustedStrVariants::Static(s)) + } +} + +// This enum should be kept non-pub to make it harder to accidentally +// construct a "ValidatedExplicitly" variant. +enum TrustedStrVariants { + Static(&'static str), + ValidatedExplicitly(String), +} + +trait SqlQueryBinds { + fn add_bind(self, bind_counter: &BindParamCounter) -> Self; +} + +impl<'a, Query> SqlQueryBinds + for diesel::query_builder::BoxedSqlQuery<'a, Pg, Query> +{ + fn add_bind(self, bind_counter: &BindParamCounter) -> Self { + self.sql("$").sql(bind_counter.next().to_string()) + } +} + +type BoxedQuery = diesel::query_builder::BoxedSqlQuery< + 'static, + Pg, + diesel::query_builder::SqlQuery, +>; + +/// A small wrapper around [diesel::query_builder::BoxedSqlQuery] which +/// assists with counting bind parameters and recommends avoiding the usage of +/// any non-static strings in query construction. +// NOTE: I'd really like to eventually be able to construct SQL statements +// entirely at compile-time, but the combination of "const generics" and "const +// fns" in stable Rust just isn't there yet. +// +// It's definitely possible to create static string builders that operate +// entirely at compile-time, like: +// https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=26d0276648c3315f285372a19d0d492f +// +// But this relies on nightly features. +pub struct QueryBuilder { + query: BoxedQuery, + bind_counter: BindParamCounter, +} + +impl QueryBuilder { + pub fn new() -> Self { + Self { + query: diesel::sql_query("").into_boxed(), + bind_counter: BindParamCounter::new(), + } + } + + /// Identifies that a bind parameter should exist in this location within + /// the SQL string. + /// + /// This should be called the same number of times as [Self::bind]. It is, + /// however, a distinct method, as "identifying bind params" should be + /// decoupled from "using bind parameters" to have an efficient statement + /// cache. + pub fn param(self) -> Self { + Self { + query: self + .query + .sql("$") + .sql(self.bind_counter.next().to_string()), + bind_counter: self.bind_counter, + } + } + + /// Slightly more strict than the "sql" method of Diesel's SqlQuery. + /// Only permits strings which have been validated intentionally to limit + /// susceptibility to SQL injection. + /// + /// See the documentation of [TrustedStr] for more details. + pub fn sql>(self, s: S) -> Self { + let query = match s.into().0 { + TrustedStrVariants::Static(s) => self.query.sql(s), + TrustedStrVariants::ValidatedExplicitly(s) => self.query.sql(s), + }; + Self { query, bind_counter: self.bind_counter } + } + + /// A call-through function to [diesel::query_builder::BoxedSqlQuery]. + pub fn bind(self, b: Value) -> Self + where + Pg: sql_types::HasSqlType, + Value: diesel::serialize::ToSql + Send + 'static, + BindSt: Send + 'static, + { + Self { query: self.query.bind(b), bind_counter: self.bind_counter } + } + + /// Takes the final boxed query + pub fn query(self) -> TypedSqlQuery { + TypedSqlQuery { inner: self.query, _phantom: PhantomData } + } +} + +/// Diesel's [diesel::query_builder::BoxedSqlQuery] has a few drawbacks that +/// make this wrapper more palatable: +/// +/// - It always implements "Query" with SqlType = Untyped, so a caller could try to +/// execute this query and get back any type. +/// - It forces the usage of "QueryableByName", which acts wrong if we're +/// returning multiple columns with the same name (this is normal! If you want +/// to UNION two objects that both have "id" columns, this happens). +#[derive(QueryId)] +pub struct TypedSqlQuery { + inner: diesel::query_builder::BoxedSqlQuery< + 'static, + Pg, + diesel::query_builder::SqlQuery, + >, + _phantom: PhantomData, +} + +impl QueryFragment for TypedSqlQuery { + fn walk_ast<'a>( + &'a self, + mut out: AstPass<'_, 'a, Pg>, + ) -> diesel::QueryResult<()> { + out.unsafe_to_cache_prepared(); + + self.inner.walk_ast(out.reborrow())?; + Ok(()) + } +} + +impl RunQueryDsl for TypedSqlQuery {} + +impl Query for TypedSqlQuery { + type SqlType = T; +} diff --git a/nexus/db-queries/tests/output/region_allocate_distinct_sleds.sql b/nexus/db-queries/tests/output/region_allocate_distinct_sleds.sql new file mode 100644 index 0000000000..7aa85458a6 --- /dev/null +++ b/nexus/db-queries/tests/output/region_allocate_distinct_sleds.sql @@ -0,0 +1,267 @@ +WITH + old_regions + AS ( + SELECT + region.id, + region.time_created, + region.time_modified, + region.dataset_id, + region.volume_id, + region.block_size, + region.blocks_per_extent, + region.extent_count + FROM + region + WHERE + region.volume_id = $1 + ), + old_zpool_usage + AS ( + SELECT + dataset.pool_id, sum(dataset.size_used) AS size_used + FROM + dataset + WHERE + (dataset.size_used IS NOT NULL) AND (dataset.time_deleted IS NULL) + GROUP BY + dataset.pool_id + ), + candidate_zpools + AS ( + SELECT + DISTINCT ON (zpool.sled_id) old_zpool_usage.pool_id + FROM + old_zpool_usage + INNER JOIN (zpool INNER JOIN sled ON zpool.sled_id = sled.id) ON + zpool.id = old_zpool_usage.pool_id + WHERE + (old_zpool_usage.size_used + $2) + <= ( + SELECT + total_size + FROM + omicron.public.inv_zpool + WHERE + inv_zpool.id = old_zpool_usage.pool_id + ORDER BY + inv_zpool.time_collected DESC + LIMIT + 1 + ) + AND sled.sled_policy = 'in_service' + AND sled.sled_state = 'active' + ORDER BY + zpool.sled_id, md5(CAST(zpool.id AS BYTES) || $3) + ), + candidate_datasets + AS ( + SELECT + DISTINCT ON (dataset.pool_id) dataset.id, dataset.pool_id + FROM + dataset INNER JOIN candidate_zpools ON dataset.pool_id = candidate_zpools.pool_id + WHERE + ((dataset.time_deleted IS NULL) AND (dataset.size_used IS NOT NULL)) + AND dataset.kind = 'crucible' + ORDER BY + dataset.pool_id, md5(CAST(dataset.id AS BYTES) || $4) + ), + shuffled_candidate_datasets + AS ( + SELECT + candidate_datasets.id, candidate_datasets.pool_id + FROM + candidate_datasets + ORDER BY + md5(CAST(candidate_datasets.id AS BYTES) || $5) + LIMIT + $6 + ), + candidate_regions + AS ( + SELECT + gen_random_uuid() AS id, + now() AS time_created, + now() AS time_modified, + shuffled_candidate_datasets.id AS dataset_id, + $7 AS volume_id, + $8 AS block_size, + $9 AS blocks_per_extent, + $10 AS extent_count + FROM + shuffled_candidate_datasets + ), + proposed_dataset_changes + AS ( + SELECT + candidate_regions.dataset_id AS id, + dataset.pool_id AS pool_id, + candidate_regions.block_size + * candidate_regions.blocks_per_extent + * candidate_regions.extent_count + AS size_used_delta + FROM + candidate_regions INNER JOIN dataset ON dataset.id = candidate_regions.dataset_id + ), + do_insert + AS ( + SELECT + ( + ( + (SELECT count(*) FROM old_regions LIMIT 1) < $11 + AND CAST( + IF( + ((SELECT count(*) FROM candidate_zpools LIMIT 1) >= $12), + 'TRUE', + 'Not enough space' + ) + AS BOOL + ) + ) + AND CAST( + IF( + ((SELECT count(*) FROM candidate_regions LIMIT 1) >= $13), + 'TRUE', + 'Not enough datasets' + ) + AS BOOL + ) + ) + AND CAST( + IF( + ( + ( + SELECT + count(DISTINCT dataset.pool_id) + FROM + candidate_regions + INNER JOIN dataset ON candidate_regions.dataset_id = dataset.id + LIMIT + 1 + ) + >= $14 + ), + 'TRUE', + 'Not enough unique zpools selected' + ) + AS BOOL + ) + AS insert + ), + inserted_regions + AS ( + INSERT + INTO + region + ( + id, + time_created, + time_modified, + dataset_id, + volume_id, + block_size, + blocks_per_extent, + extent_count + ) + SELECT + candidate_regions.id, + candidate_regions.time_created, + candidate_regions.time_modified, + candidate_regions.dataset_id, + candidate_regions.volume_id, + candidate_regions.block_size, + candidate_regions.blocks_per_extent, + candidate_regions.extent_count + FROM + candidate_regions + WHERE + (SELECT do_insert.insert FROM do_insert LIMIT 1) + RETURNING + region.id, + region.time_created, + region.time_modified, + region.dataset_id, + region.volume_id, + region.block_size, + region.blocks_per_extent, + region.extent_count + ), + updated_datasets + AS ( + UPDATE + dataset + SET + size_used + = dataset.size_used + + ( + SELECT + proposed_dataset_changes.size_used_delta + FROM + proposed_dataset_changes + WHERE + proposed_dataset_changes.id = dataset.id + LIMIT + 1 + ) + WHERE + dataset.id = ANY (SELECT proposed_dataset_changes.id FROM proposed_dataset_changes) + AND (SELECT do_insert.insert FROM do_insert LIMIT 1) + RETURNING + dataset.id, + dataset.time_created, + dataset.time_modified, + dataset.time_deleted, + dataset.rcgen, + dataset.pool_id, + dataset.ip, + dataset.port, + dataset.kind, + dataset.size_used + ) +( + SELECT + dataset.id, + dataset.time_created, + dataset.time_modified, + dataset.time_deleted, + dataset.rcgen, + dataset.pool_id, + dataset.ip, + dataset.port, + dataset.kind, + dataset.size_used, + old_regions.id, + old_regions.time_created, + old_regions.time_modified, + old_regions.dataset_id, + old_regions.volume_id, + old_regions.block_size, + old_regions.blocks_per_extent, + old_regions.extent_count + FROM + old_regions INNER JOIN dataset ON old_regions.dataset_id = dataset.id +) +UNION + ( + SELECT + updated_datasets.id, + updated_datasets.time_created, + updated_datasets.time_modified, + updated_datasets.time_deleted, + updated_datasets.rcgen, + updated_datasets.pool_id, + updated_datasets.ip, + updated_datasets.port, + updated_datasets.kind, + updated_datasets.size_used, + inserted_regions.id, + inserted_regions.time_created, + inserted_regions.time_modified, + inserted_regions.dataset_id, + inserted_regions.volume_id, + inserted_regions.block_size, + inserted_regions.blocks_per_extent, + inserted_regions.extent_count + FROM + inserted_regions + INNER JOIN updated_datasets ON inserted_regions.dataset_id = updated_datasets.id + ) diff --git a/nexus/db-queries/tests/output/region_allocate_random_sleds.sql b/nexus/db-queries/tests/output/region_allocate_random_sleds.sql new file mode 100644 index 0000000000..0918c8f2d1 --- /dev/null +++ b/nexus/db-queries/tests/output/region_allocate_random_sleds.sql @@ -0,0 +1,265 @@ +WITH + old_regions + AS ( + SELECT + region.id, + region.time_created, + region.time_modified, + region.dataset_id, + region.volume_id, + region.block_size, + region.blocks_per_extent, + region.extent_count + FROM + region + WHERE + region.volume_id = $1 + ), + old_zpool_usage + AS ( + SELECT + dataset.pool_id, sum(dataset.size_used) AS size_used + FROM + dataset + WHERE + (dataset.size_used IS NOT NULL) AND (dataset.time_deleted IS NULL) + GROUP BY + dataset.pool_id + ), + candidate_zpools + AS ( + SELECT + old_zpool_usage.pool_id + FROM + old_zpool_usage + INNER JOIN (zpool INNER JOIN sled ON zpool.sled_id = sled.id) ON + zpool.id = old_zpool_usage.pool_id + WHERE + (old_zpool_usage.size_used + $2) + <= ( + SELECT + total_size + FROM + omicron.public.inv_zpool + WHERE + inv_zpool.id = old_zpool_usage.pool_id + ORDER BY + inv_zpool.time_collected DESC + LIMIT + 1 + ) + AND sled.sled_policy = 'in_service' + AND sled.sled_state = 'active' + ), + candidate_datasets + AS ( + SELECT + DISTINCT ON (dataset.pool_id) dataset.id, dataset.pool_id + FROM + dataset INNER JOIN candidate_zpools ON dataset.pool_id = candidate_zpools.pool_id + WHERE + ((dataset.time_deleted IS NULL) AND (dataset.size_used IS NOT NULL)) + AND dataset.kind = 'crucible' + ORDER BY + dataset.pool_id, md5(CAST(dataset.id AS BYTES) || $3) + ), + shuffled_candidate_datasets + AS ( + SELECT + candidate_datasets.id, candidate_datasets.pool_id + FROM + candidate_datasets + ORDER BY + md5(CAST(candidate_datasets.id AS BYTES) || $4) + LIMIT + $5 + ), + candidate_regions + AS ( + SELECT + gen_random_uuid() AS id, + now() AS time_created, + now() AS time_modified, + shuffled_candidate_datasets.id AS dataset_id, + $6 AS volume_id, + $7 AS block_size, + $8 AS blocks_per_extent, + $9 AS extent_count + FROM + shuffled_candidate_datasets + ), + proposed_dataset_changes + AS ( + SELECT + candidate_regions.dataset_id AS id, + dataset.pool_id AS pool_id, + candidate_regions.block_size + * candidate_regions.blocks_per_extent + * candidate_regions.extent_count + AS size_used_delta + FROM + candidate_regions INNER JOIN dataset ON dataset.id = candidate_regions.dataset_id + ), + do_insert + AS ( + SELECT + ( + ( + (SELECT count(*) FROM old_regions LIMIT 1) < $10 + AND CAST( + IF( + ((SELECT count(*) FROM candidate_zpools LIMIT 1) >= $11), + 'TRUE', + 'Not enough space' + ) + AS BOOL + ) + ) + AND CAST( + IF( + ((SELECT count(*) FROM candidate_regions LIMIT 1) >= $12), + 'TRUE', + 'Not enough datasets' + ) + AS BOOL + ) + ) + AND CAST( + IF( + ( + ( + SELECT + count(DISTINCT dataset.pool_id) + FROM + candidate_regions + INNER JOIN dataset ON candidate_regions.dataset_id = dataset.id + LIMIT + 1 + ) + >= $13 + ), + 'TRUE', + 'Not enough unique zpools selected' + ) + AS BOOL + ) + AS insert + ), + inserted_regions + AS ( + INSERT + INTO + region + ( + id, + time_created, + time_modified, + dataset_id, + volume_id, + block_size, + blocks_per_extent, + extent_count + ) + SELECT + candidate_regions.id, + candidate_regions.time_created, + candidate_regions.time_modified, + candidate_regions.dataset_id, + candidate_regions.volume_id, + candidate_regions.block_size, + candidate_regions.blocks_per_extent, + candidate_regions.extent_count + FROM + candidate_regions + WHERE + (SELECT do_insert.insert FROM do_insert LIMIT 1) + RETURNING + region.id, + region.time_created, + region.time_modified, + region.dataset_id, + region.volume_id, + region.block_size, + region.blocks_per_extent, + region.extent_count + ), + updated_datasets + AS ( + UPDATE + dataset + SET + size_used + = dataset.size_used + + ( + SELECT + proposed_dataset_changes.size_used_delta + FROM + proposed_dataset_changes + WHERE + proposed_dataset_changes.id = dataset.id + LIMIT + 1 + ) + WHERE + dataset.id = ANY (SELECT proposed_dataset_changes.id FROM proposed_dataset_changes) + AND (SELECT do_insert.insert FROM do_insert LIMIT 1) + RETURNING + dataset.id, + dataset.time_created, + dataset.time_modified, + dataset.time_deleted, + dataset.rcgen, + dataset.pool_id, + dataset.ip, + dataset.port, + dataset.kind, + dataset.size_used + ) +( + SELECT + dataset.id, + dataset.time_created, + dataset.time_modified, + dataset.time_deleted, + dataset.rcgen, + dataset.pool_id, + dataset.ip, + dataset.port, + dataset.kind, + dataset.size_used, + old_regions.id, + old_regions.time_created, + old_regions.time_modified, + old_regions.dataset_id, + old_regions.volume_id, + old_regions.block_size, + old_regions.blocks_per_extent, + old_regions.extent_count + FROM + old_regions INNER JOIN dataset ON old_regions.dataset_id = dataset.id +) +UNION + ( + SELECT + updated_datasets.id, + updated_datasets.time_created, + updated_datasets.time_modified, + updated_datasets.time_deleted, + updated_datasets.rcgen, + updated_datasets.pool_id, + updated_datasets.ip, + updated_datasets.port, + updated_datasets.kind, + updated_datasets.size_used, + inserted_regions.id, + inserted_regions.time_created, + inserted_regions.time_modified, + inserted_regions.dataset_id, + inserted_regions.volume_id, + inserted_regions.block_size, + inserted_regions.blocks_per_extent, + inserted_regions.extent_count + FROM + inserted_regions + INNER JOIN updated_datasets ON inserted_regions.dataset_id = updated_datasets.id + ) diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs index f1d1a2bd02..290868aae2 100644 --- a/nexus/src/app/sagas/snapshot_create.rs +++ b/nexus/src/app/sagas/snapshot_create.rs @@ -1175,7 +1175,7 @@ async fn ssc_start_running_snapshot( ); let snapshot_id = sagactx.lookup::("snapshot_id")?; - info!(log, "starting running snapshot for {snapshot_id}"); + info!(log, "starting running snapshot"; "snapshot_id" => %snapshot_id); let (.., disk) = LookupPath::new(&opctx, &osagactx.datastore()) .disk_id(params.disk_id) @@ -1198,7 +1198,13 @@ async fn ssc_start_running_snapshot( let url = format!("http://{}", dataset.address()); let client = CrucibleAgentClient::new(&url); - info!(log, "dataset {:?} region {:?} url {}", dataset, region, url); + info!( + log, + "contacting crucible agent to confirm region exists"; + "dataset" => ?dataset, + "region" => ?region, + "url" => url, + ); // Validate with the Crucible agent that the snapshot exists let crucible_region = retry_until_known_result(log, || async { @@ -1208,7 +1214,11 @@ async fn ssc_start_running_snapshot( .map_err(|e| e.to_string()) .map_err(ActionError::action_failed)?; - info!(log, "crucible region {:?}", crucible_region); + info!( + log, + "confirmed the region exists with crucible agent"; + "crucible region" => ?crucible_region + ); let crucible_snapshot = retry_until_known_result(log, || async { client @@ -1222,7 +1232,11 @@ async fn ssc_start_running_snapshot( .map_err(|e| e.to_string()) .map_err(ActionError::action_failed)?; - info!(log, "crucible snapshot {:?}", crucible_snapshot); + info!( + log, + "successfully accessed crucible snapshot"; + "crucible snapshot" => ?crucible_snapshot + ); // Start the snapshot running let crucible_running_snapshot = @@ -1238,7 +1252,11 @@ async fn ssc_start_running_snapshot( .map_err(|e| e.to_string()) .map_err(ActionError::action_failed)?; - info!(log, "crucible running snapshot {:?}", crucible_running_snapshot); + info!( + log, + "successfully started running region snapshot"; + "crucible running snapshot" => ?crucible_running_snapshot + ); // Map from the region to the snapshot let region_addr = format!( diff --git a/test-utils/src/dev/db.rs b/test-utils/src/dev/db.rs index c148a60e1c..d8b15520a4 100644 --- a/test-utils/src/dev/db.rs +++ b/test-utils/src/dev/db.rs @@ -21,6 +21,7 @@ use std::time::Duration; use tempfile::tempdir; use tempfile::TempDir; use thiserror::Error; +use tokio::io::AsyncWriteExt; use tokio_postgres::config::Host; use tokio_postgres::config::SslMode; @@ -497,6 +498,15 @@ pub enum CockroachStartError { )] TimedOut { pid: u32, time_waited: Duration }, + #[error("failed to write input to cockroachdb")] + FailedToWrite(#[source] std::io::Error), + + #[error("failed to await cockroachdb completing")] + FailedToWait(#[source] std::io::Error), + + #[error("Invalid cockroachdb output")] + InvalidOutput(#[from] std::string::FromUtf8Error), + #[error("unknown error waiting for cockroach to start")] Unknown { #[source] @@ -653,6 +663,45 @@ impl Drop for CockroachInstance { } } +/// Uses cockroachdb to run the "sqlfmt" command. +pub async fn format_sql(input: &str) -> Result { + let mut cmd = tokio::process::Command::new(COCKROACHDB_BIN); + let mut child = cmd + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .args(&[ + "sqlfmt", + "--tab-width", + "2", + "--use-spaces", + "true", + "--print-width", + "100", + ]) + .spawn() + .map_err(|source| CockroachStartError::BadCmd { + cmd: COCKROACHDB_BIN.to_string(), + source, + })?; + let stdin = child.stdin.as_mut().unwrap(); + stdin + .write_all(input.as_bytes()) + .await + .map_err(CockroachStartError::FailedToWrite)?; + let output = child + .wait_with_output() + .await + .map_err(CockroachStartError::FailedToWait)?; + + if !output.status.success() { + return Err(CockroachStartError::Exited { + exit_code: output.status.code().unwrap_or_else(|| -1), + }); + } + + Ok(String::from_utf8(output.stdout)?) +} + /// Verify that CockroachDB has the correct version pub async fn check_db_version() -> Result<(), CockroachStartError> { let mut cmd = tokio::process::Command::new(COCKROACHDB_BIN);