From b9fa803cfd71c0573acafd4d2673c121015a0418 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Wed, 3 Jul 2024 04:02:29 +0000 Subject: [PATCH] better, more progress --- .../src/db/datastore/timeseries_schema.rs | 800 +++++++++++++----- oximeter/impl/src/schema/mod.rs | 2 + 2 files changed, 580 insertions(+), 222 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/timeseries_schema.rs b/nexus/db-queries/src/db/datastore/timeseries_schema.rs index f6eb6da2e4b..764fb37e9b1 100644 --- a/nexus/db-queries/src/db/datastore/timeseries_schema.rs +++ b/nexus/db-queries/src/db/datastore/timeseries_schema.rs @@ -8,25 +8,30 @@ use super::DataStore; use crate::db::error::public_error_from_diesel; use crate::db::error::ErrorHandler; use crate::db::raw_query_builder::QueryBuilder; +use crate::db::DbConnection; use async_bb8_diesel::AsyncRunQueryDsl; +use async_bb8_diesel::Connection; +use diesel::result::DatabaseErrorKind; +use diesel::result::Error as DieselError; use diesel::sql_types; use diesel::BoolExpressionMethods as _; use diesel::ExpressionMethods as _; use diesel::JoinOnDsl as _; use diesel::QueryDsl as _; use diesel::SelectableHelper as _; -use diesel::result::Error as DieselError; -use diesel::result::DatabaseErrorKind; use nexus_auth::authz; use nexus_auth::context::OpContext; +use nexus_db_model::schema::timeseries_field::dsl as field_dsl; +use nexus_db_model::schema::timeseries_field_by_version::dsl as version_dsl; +use nexus_db_model::schema::timeseries_schema::dsl as schema_dsl; use nexus_db_model::Generation; use nexus_db_model::TimeseriesAuthzScopeEnum; use nexus_db_model::TimeseriesDatumTypeEnum; use nexus_db_model::TimeseriesField; +use nexus_db_model::TimeseriesFieldByVersion; use nexus_db_model::TimeseriesFieldSourceEnum; use nexus_db_model::TimeseriesFieldTypeEnum; use nexus_db_model::TimeseriesSchema; -use nexus_db_model::TimeseriesFieldByVersion; use nexus_db_model::TimeseriesUnitsEnum; use omicron_common::api::external::CreateResult; use omicron_common::api::external::DataPageParams; @@ -35,9 +40,29 @@ use omicron_common::api::external::ListResultVec; use omicron_common::api::external::LookupResult; use std::collections::BTreeMap; use std::num::NonZeroU8; -use nexus_db_model::schema::timeseries_field::dsl as field_dsl; -use nexus_db_model::schema::timeseries_schema::dsl as schema_dsl; -use nexus_db_model::schema::timeseries_field_by_version::dsl as version_dsl; + +/// Helper used to select a timeseries version. +#[derive(Clone, Copy, Debug)] +enum VersionCheck { + /// Select a timeseries no greater than this version. + NoGreater(NonZeroU8), + /// Select a timeseries with exactly this version. + Exact(NonZeroU8), +} + +impl VersionCheck { + const fn version(&self) -> NonZeroU8 { + match self { + VersionCheck::NoGreater(v) => *v, + VersionCheck::Exact(v) => *v, + } + } +} + +/// Type alias for a tuple of database records selected from all three +/// timeseries schema tables, such as during a JOIN. +type SchemaTuple = + (TimeseriesSchema, TimeseriesField, TimeseriesFieldByVersion); impl DataStore { /// Load timeseries schema from their static definitions. @@ -52,113 +77,12 @@ impl DataStore { Ok(()) } - // Insert or update a timeseries schema. - // - // This will only modify the description columns if a record already - // exists, leaving things like the actual datum type and fields alone. Those - // cannot be changed within a timeseries version. 
- async fn upsert_timeseries_schema( - &self, - opctx: &OpContext, - schema: &oximeter::TimeseriesSchema, - ) -> CreateResult<()> { - - // Fetch any existing timeseries schema at this version first, in order - // to correctly do OCC with updates. - let maybe_existing = self.fetch_timeseries_schema( - opctx, - &schema.timeseries_name, - schema.version, - ).await; - match maybe_existing { - Ok(existing_schema) => todo!(), - Err(Error::NotFound { .. }) => { - self.insert_new_timeseries_schema(opctx, schema).await - } - Err(e) => Err(e), - } - } - - /// Fetch a single timeseries schema by name and version. - pub async fn fetch_timeseries_schema( - &self, - opctx: &OpContext, - timeseries_name: &oximeter::TimeseriesName, - version: NonZeroU8, - ) -> LookupResult { - - // TODO-security: Correct authorization here. For now, must be a fleet - // reader. - opctx.authorize(authz::Action::Read, &authz::FLEET).await?; - let conn = self.pool_connection_authorized(opctx).await?; - - // 3-table JOIN between the schema, fields, and fields-by-version. All - // use the timeseries name. See `list_timeseries_schema()` for a more - // detailed explanation. - let mut rows = - schema_dsl::timeseries_schema - .inner_join(field_dsl::timeseries_field.on( - schema_dsl::timeseries_name.eq(field_dsl::timeseries_name), - )) - .inner_join( - version_dsl::timeseries_field_by_version.on( - schema_dsl::timeseries_name - .eq(version_dsl::timeseries_name) - .and( - field_dsl::name - .eq(version_dsl::field_name), - ), - ), - ) - .select(<( - TimeseriesSchema, - TimeseriesField, - TimeseriesFieldByVersion, - )>::as_select()) - .filter( - schema_dsl::timeseries_name - .eq(timeseries_name.to_string()) - .and(version_dsl::version.eq(i16::from(version.get()))), - ) - .load_async(&*conn) - .await - .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))? - .into_iter(); - - // Extract just the schema itself from the JOIN result. - // - // This has no fields, but has all the other metadata for the schema. - let Some((schema_row, field_row, version_row)) = rows.next() else { - return Err(Error::non_resourcetype_not_found(format!( - "Timeseries '{}' version {} not found", - timeseries_name, version, - ))); - }; - let mut schema = schema_row.into_bare_schema(version)?; - - // Attach all field rows, including the one we already fetched above. - let first_row = std::iter::once((field_row, version_row)); - let remaining_rows = rows.into_iter().map(|(_, f, v)| (f, v)); - for (field_row, version_row) in first_row.chain(remaining_rows) { - let is_new = schema.field_schema.insert(field_row.into()); - if !is_new { - return Err(Error::internal_error(&format!( - "while fetching the schema for timeseries '{}' \ - version {}, the field '{}' appears duplicated", - timeseries_name, version, version_row.field_name, - ))); - } - } - Ok(schema) - } - /// Fetch a page of timeseries schema from the database. pub async fn list_timeseries_schema( &self, opctx: &OpContext, pagparams: &DataPageParams<'_, (oximeter::TimeseriesName, NonZeroU8)>, ) -> ListResultVec { - // For now, must be a fleet reader. // // TODO-correctness: Relax this, possibly to the set of things the @@ -184,46 +108,38 @@ impl DataStore { // // We match up all tables on the timeseries name; and the versioned // table additionally on the field name. 
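        // As a rough sketch (an illustration, not the literal SQL Diesel
        // generates; aliases and the selected column list will differ), the
        // paginated query built below corresponds to something like:
        //
        //     SELECT s.*, f.*, v.*
        //     FROM timeseries_schema AS s
        //     INNER JOIN timeseries_field AS f
        //         ON s.timeseries_name = f.timeseries_name
        //     INNER JOIN timeseries_field_by_version AS v
        //         ON s.timeseries_name = v.timeseries_name
        //         AND f.name = v.field_name
        //     WHERE (s.timeseries_name = $1 AND v.version > $2)
        //         OR s.timeseries_name > $1
        //     ORDER BY s.timeseries_name ASC, v.version ASC
        //     LIMIT $3
        //
        // where $1 and $2 are the pagination marker values and $3 is the
        // requested page size.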
- let query = - schema_dsl::timeseries_schema - // JOIN between the timeseries schema and field tables just using - // the timeseries name - .inner_join(field_dsl::timeseries_field.on( - schema_dsl::timeseries_name.eq(field_dsl::timeseries_name), - )) - // JOIN between that and the field-by-version table using: - // - The timeseries name for the schema and field-by-version - // table. - // - The field name for the field and field-by-version table - .inner_join( - version_dsl::timeseries_field_by_version.on( - schema_dsl::timeseries_name - .eq(version_dsl::timeseries_name) - .and( - field_dsl::name - .eq(version_dsl::field_name), - ), - ), - ) - // Select the record type from each table. - .select(<( - TimeseriesSchema, - TimeseriesField, - TimeseriesFieldByVersion, - )>::as_select()) - // Filter by the pagination marker, and limit the results to the - // requested page size. - .filter( + let query = schema_dsl::timeseries_schema + // JOIN between the timeseries schema and field tables just using + // the timeseries name + .inner_join( + field_dsl::timeseries_field + .on(schema_dsl::timeseries_name + .eq(field_dsl::timeseries_name)), + ) + // JOIN between that and the field-by-version table using: + // - The timeseries name for the schema and field-by-version + // table. + // - The field name for the field and field-by-version table + .inner_join( + version_dsl::timeseries_field_by_version.on( schema_dsl::timeseries_name - .eq(timeseries_name.clone()) - .and(version_dsl::version.gt(i16::from(version.get()))), - ) - .or_filter( - schema_dsl::timeseries_name.gt(timeseries_name.clone()), - ) - .order(schema_dsl::timeseries_name.asc()) - .then_order_by(version_dsl::version.asc()) - .limit(i64::from(pagparams.limit.get())); + .eq(version_dsl::timeseries_name) + .and(field_dsl::name.eq(version_dsl::field_name)), + ), + ) + // Select the record type from each table. + .select(SchemaTuple::as_select()) + // Filter by the pagination marker, and limit the results to the + // requested page size. + .filter( + schema_dsl::timeseries_name + .eq(timeseries_name.clone()) + .and(version_dsl::version.gt(i16::from(version.get()))), + ) + .or_filter(schema_dsl::timeseries_name.gt(timeseries_name.clone())) + .order(schema_dsl::timeseries_name.asc()) + .then_order_by(version_dsl::version.asc()) + .limit(i64::from(pagparams.limit.get())); // Select the actual rows from the JOIN result. let rows = query @@ -281,31 +197,512 @@ impl DataStore { Ok(schema.into_values().collect()) } - // Helper to insert a timeseries schema assumed to be new. - // - // This will fail on any conflicts. - async fn insert_new_timeseries_schema( + /// Fetch a single timeseries schema by name and version. + pub async fn fetch_timeseries_schema( + &self, + opctx: &OpContext, + timeseries_name: &oximeter::TimeseriesName, + version: NonZeroU8, + ) -> LookupResult { + self.fetch_timeseries_schema_impl( + opctx, + timeseries_name, + VersionCheck::Exact(version), + ) + .await + .map(|(schema, _generation)| schema) + } + + /// Insert or update a timeseries schema. + /// + /// This will only modify the description columns if a record already + /// exists, leaving things like the actual datum type and fields alone. Those + /// cannot be changed within a timeseries version. + async fn upsert_timeseries_schema( + &self, + opctx: &OpContext, + schema: &oximeter::TimeseriesSchema, + ) -> CreateResult<()> { + // We first fetch the latest version of the timeseries no greater than + // this one. 
+ // + // The goal here is to handle three cases: + // + // - We are inserting a brand new schema + // - We are adding a new _version_ of a schema, of which we've seen + // previous versions + // - We are updating the exact same version of the schema. + // + // Each of these cases is distinguished by fetching the latest schema + // not greater than the current version. The return value of that for + // each of the above situations is, correspondingly: + // + // - an `Error::NotFound` + // - a schema with a previous version + // - a schema with the exact same version + // + // And in each of these cases, we take the following actions: + // + // # Brand new schema + // + // In this case, we have found no records for the schema at all. We + // insert a new record in the `timeseries_schema` table, and new records + // in both the `timeseries_field` table and + // `timeseries_field_by_version` table. + let maybe_existing = self + .fetch_latest_timeseries_schema( + opctx, + &schema.timeseries_name, + schema.version, + ) + .await; + match maybe_existing { + Ok((existing_schema, generation)) => { + if existing_schema.version == schema.version { + self.upsert_existing_timeseries_schema( + opctx, + schema, + Some(existing_schema), + generation, + ) + .await + } else if existing_schema.version < schema.version { + self.upsert_existing_timeseries_schema( + opctx, schema, None, generation, + ) + .await + } else { + todo!(); + } + } + Err(Error::NotFound { .. }) => { + self.insert_new_timeseries_schema(opctx, schema).await + } + Err(e) => Err(e), + } + } + + /// Fetch the timeseries schema by name with a version no greater than + /// `version`. + async fn fetch_latest_timeseries_schema( + &self, + opctx: &OpContext, + timeseries_name: &oximeter::TimeseriesName, + version: NonZeroU8, + ) -> LookupResult<(oximeter::TimeseriesSchema, Generation)> { + self.fetch_timeseries_schema_impl( + opctx, + timeseries_name, + VersionCheck::NoGreater(version), + ) + .await + } + + /// Internal implementation to fetch records from the DB and reconstitute + /// the `oximeter::TimeseriesSchema` from them, if possible. + async fn fetch_timeseries_schema_impl( + &self, + opctx: &OpContext, + timeseries_name: &oximeter::TimeseriesName, + version_check: VersionCheck, + ) -> LookupResult<(oximeter::TimeseriesSchema, Generation)> { + // TODO-security: Correct authorization here. For now, must be a fleet + // reader. + opctx.authorize(authz::Action::Read, &authz::FLEET).await?; + let conn = self.pool_connection_authorized(opctx).await?; + + // 3-table JOIN between the schema, fields, and fields-by-version. All + // use the timeseries name. See `list_timeseries_schema()` for a more + // detailed explanation. + let rows = match version_check { + VersionCheck::NoGreater(version) => { + self.fetch_timeseries_schema_no_greater_than_version( + &conn, + timeseries_name, + version, + ) + .await? + } + VersionCheck::Exact(version) => { + self.fetch_timeseries_schema_exact_version( + &conn, + timeseries_name, + version, + ) + .await? + } + }; + + // We may have selected more than one version of the schema, if we + // looked up the schema no greater than our current version. Filter + // things down so that we're only looking at the latest version of it. 
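        //
        // For example, if versions 1 and 2 of this timeseries are in the
        // database and the caller asked for `VersionCheck::NoGreater(3)`, the
        // fetch above returns the rows for both versions; below we find the
        // maximum returned version (2) and keep only its rows.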
+ let Some(extracted_version) = + rows.iter().map(|(_, _, version_row)| version_row.version).max() + else { + return Err(Error::non_resourcetype_not_found(format!( + "Timeseries '{}' version {} not found", + timeseries_name, + version_check.version(), + ))); + }; + let mut rows = rows.into_iter().filter(|(_, _, version_row)| { + version_row.version == extracted_version + }); + let Some(extracted_version) = NonZeroU8::new(*extracted_version) else { + return Err(Error::internal_error( + "while fetching the schema for timeseries '{}' \ + version {}, version of 0 was returned from the DB", + )); + }; + + // Extract just the schema itself from the JOIN result. + // + // This has no fields, but has all the other metadata for the schema. + let Some((schema_row, field_row, version_row)) = rows.next() else { + // Safety: We just found the max version, or returned an error if + // the list of rows was empty, and then filtered down the rows to + // those with the max version. That came from the rows themselves, + // so the iterator cannot be empty. + unreachable!(); + }; + + // Construct the bare schem, using the _retrieved_ version number. We + // may have selected something older depending on how this method was + // called. + let generation = schema_row.generation; + let mut schema = schema_row.into_bare_schema(extracted_version)?; + + // Attach all field rows, including the one we already fetched above. + let first_row = std::iter::once((field_row, version_row)); + let remaining_rows = rows.into_iter().map(|(_, f, v)| (f, v)); + for (field_row, version_row) in first_row.chain(remaining_rows) { + if field_row.generation != generation + || version_row.generation != generation + { + return Err(Error::internal_error(&format!( + "while fetching the schema for timeseries '{}' \ + version {}, generation numbers do not all match, \ + schema generation = {}, field generation = {}, \ + versioned field generation = {}", + timeseries_name, + extracted_version, + generation.0, + field_row.generation.0, + version_row.generation.0, + ))); + } + let is_new = schema.field_schema.insert(field_row.into()); + if !is_new { + return Err(Error::internal_error(&format!( + "while fetching the schema for timeseries '{}' \ + version {}, the field '{}' appears duplicated", + timeseries_name, extracted_version, version_row.field_name, + ))); + } + } + Ok((schema, generation)) + } + + /// Select a timeseries schema from the database by name, whose version is + /// less than or equal to the provided version. + /// + /// NOTE: This and `fetch_timeseries_schema_exact_version` are different + /// functions to avoid descending into diesel trait bound hell, which is + /// what happens if we try to specify just the comparison on the version + /// column itself (e.g., version = 1 or version <= 1). 
+ async fn fetch_timeseries_schema_no_greater_than_version( + &self, + conn: &Connection, + timeseries_name: &oximeter::TimeseriesName, + version: NonZeroU8, + ) -> Result, Error> { + schema_dsl::timeseries_schema + .inner_join( + field_dsl::timeseries_field + .on(schema_dsl::timeseries_name + .eq(field_dsl::timeseries_name)), + ) + .inner_join( + version_dsl::timeseries_field_by_version.on( + schema_dsl::timeseries_name + .eq(version_dsl::timeseries_name) + .and(field_dsl::name.eq(version_dsl::field_name)), + ), + ) + .select(SchemaTuple::as_select()) + .filter( + schema_dsl::timeseries_name + .eq(timeseries_name.to_string()) + .and(version_dsl::version.le(i16::from(version.get()))), + ) + .load_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// Select a timeseries schema from the database by name, whose version is + /// exactly equal to the provided version. + async fn fetch_timeseries_schema_exact_version( + &self, + conn: &Connection, + timeseries_name: &oximeter::TimeseriesName, + version: NonZeroU8, + ) -> Result, Error> { + schema_dsl::timeseries_schema + .inner_join( + field_dsl::timeseries_field + .on(schema_dsl::timeseries_name + .eq(field_dsl::timeseries_name)), + ) + .inner_join( + version_dsl::timeseries_field_by_version.on( + schema_dsl::timeseries_name + .eq(version_dsl::timeseries_name) + .and(field_dsl::name.eq(version_dsl::field_name)), + ), + ) + .select(SchemaTuple::as_select()) + .filter( + schema_dsl::timeseries_name + .eq(timeseries_name.to_string()) + .and(version_dsl::version.eq(i16::from(version.get()))), + ) + .load_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// Upsert a timeseries schema where we already have the same version. + /// + /// There are two flavors of this function, depending on whether the + /// `existing_schema` is Some or None. + /// + /// # `existing_schema` is Some + /// + /// This is intended to operrate when the following conditions obtain: + /// + /// - We have a matching record in the `timeseries_schema` table; + /// - We have matching records in the `timeseries_field` table; and + /// - We have matching records in the `timeseries_field_by_version` table. + /// + /// In other words, we've already seen this exact timeseries schema. + /// + /// In this case we update: + /// + /// - the `timeseries_schema` columns `description`, `time_modified` and + /// `generation` + /// - the `timeseries_field` columns `description`, `time_modified` and + /// `generation` + /// + /// Nothing else is inserted. + /// + /// # `existing_schema` is None + /// + /// This is intended to operate when the following conditions obtain: + /// + /// - We have a matching record in the `timeseries_schema` table; + /// - We have matching records in the `timeseries_field` table; and + /// - We have **no** matching records in the `timeseries_field_by_version` + /// table. + /// + /// In other words, we have seen _some_ previous version of the schema, but + /// not this one. + /// + /// In this case, we update: + /// + /// - the `timeseries_schema` columns `description`, `time_modified` and + /// `generation` + /// - the `timeseries_field` columns `description`, `time_modified` and + /// `generation` + /// + /// And we insert the field version rows themselves, with the same + /// `generation` as the other tables. This upsert is conditional on the + /// generation number of the first two tables not changing. 
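    ///
    /// For example: if version 1 of `foo:bar` is already fully recorded and
    /// we see version 1 again with only changed descriptions, that is the
    /// `Some` case, and only descriptions, modification times, and
    /// generations are bumped. If we then see version 2 of `foo:bar` for the
    /// first time, that is the `None` case, and the version-2 rows in
    /// `timeseries_field_by_version` are inserted here as well. (`foo:bar` is
    /// just an illustrative timeseries name.)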
+ async fn upsert_existing_timeseries_schema( &self, opctx: &OpContext, - schema: &oximeter::TimeseriesSchema + schema: &oximeter::TimeseriesSchema, + existing_schema: Option, + existing_generation: Generation, ) -> CreateResult<()> { + if let Some(existing_schema) = &existing_schema { + if schema != existing_schema { + return Err(Error::invalid_request(&format!( + "Timeseries '{}' version {} appears to already \ + have a record in the database, but whose datum type, \ + units, authz scope, or field schema differ", + schema.timeseries_name, schema.version, + ))); + } + } + + // Collect the data for each table derived from the new schema, using + // the next generation for OCC. + let generation = Generation::from(existing_generation.next()); + let field_rows = TimeseriesField::for_schema(schema, generation); + let fields_by_version = + TimeseriesFieldByVersion::for_schema(schema, generation); + let conn = self.pool_connection_authorized(opctx).await?; + + // Build out a CTE to update the descriptions, modification times, and + // generations only. Nothing else changes. + let mut builder = QueryBuilder::new() + .sql("WITH \ + updated_schema AS (\ + UPDATE timeseries_schema \ + SET (\ + target_description, \ + metric_description, \ + time_modified, \ + generation \ + ) = (" + ) + .param() + .bind::(schema.description.target.clone()) + .sql(", ") + .param() + .bind::(schema.description.metric.clone()) + .sql(", NOW(), timeseries_schema.generation + 1) WHERE timeseries_name = ") + .param() + .bind::(schema.timeseries_name.to_string()) + .sql(" AND generation = ") + .param() + .bind::(existing_generation) + .sql(" RETURNING generation), "); + + // Update description, modification time, and generation of each field. + // + // Because this requires updating different rows with different values, + // it's written as an `INSERT ... ON CONFLICT ... DO UPDATE` + builder = builder + .sql("updated_fields AS (INSERT INTO timeseries_field VALUES "); + let n_rows = field_rows.len(); + for (i, field_row) in field_rows.into_iter().enumerate() { + let tail = if i == n_rows - 1 { ")" } else { "), " }; + builder = builder + .sql("(") + .param() + .bind::(field_row.timeseries_name) + .sql(", ") + .param() + .bind::(field_row.name) + .sql(", ") + .param() + .bind::(field_row.source) + .sql(", ") + .param() + .bind::(field_row.type_) + .sql(", ") + .param() + .bind::(field_row.description) + .sql(", ") + .param() + .bind::(field_row.time_created) + .sql(", ") + .param() + .bind::(field_row.time_modified) + .sql(", ") + .param() + .bind::(field_row.generation) + .sql(tail); + } + builder = builder.sql( + " \ + ON CONFLICT (timeseries_name, name) \ + DO UPDATE SET \ + (description, time_modified, generation) = \ + (excluded.description, NOW(), timeseries_field.generation + 1) \ + RETURNING 1 AS dummy1)", + ); + // And insert the row for each version / field. This works differently + // if we're inserting a new version or updating an existing version. If + // an existing one, then we model it similar to above: an `INSERT ... ON + // CONFLICT ... DO UPDATE`. Otherwise, we just insert the rows directly. 
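        //
        // Assembled, the full statement built by this function looks roughly
        // like the following (bind parameters and per-row values elided; this
        // is a sketch of the shape, not the exact SQL):
        //
        //     WITH updated_schema AS (
        //         UPDATE timeseries_schema
        //         SET (target_description, metric_description,
        //              time_modified, generation)
        //             = ($1, $2, NOW(), timeseries_schema.generation + 1)
        //         WHERE timeseries_name = $3 AND generation = $4
        //         RETURNING generation
        //     ),
        //     updated_fields AS (
        //         INSERT INTO timeseries_field VALUES (...), (...)
        //         ON CONFLICT (timeseries_name, name) DO UPDATE SET
        //             (description, time_modified, generation)
        //             = (excluded.description, NOW(),
        //                timeseries_field.generation + 1)
        //         RETURNING 1 AS dummy1
        //     ),
        //     inserted_versions AS (
        //         INSERT INTO timeseries_field_by_version VALUES (...), (...)
        //         [ ON CONFLICT (timeseries_name, field_name, version)
        //           DO UPDATE SET (time_modified, generation)
        //               = (NOW(), timeseries_field_by_version.generation + 1) ]
        //         RETURNING 2 AS dummy2
        //     )
        //     SELECT COUNT(*) FROM updated_fields;
        //
        // The bracketed ON CONFLICT clause is present only when
        // `existing_schema` is Some, i.e. when the version rows already exist.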
+ builder = builder.sql( + ", \ + inserted_versions AS (\ + INSERT INTO timeseries_field_by_version \ + VALUES ", + ); + let n_rows = fields_by_version.len(); + for (i, row) in fields_by_version.into_iter().enumerate() { + let tail = if i == n_rows - 1 { ")" } else { "), " }; + builder = builder + .sql("(") + .param() + .bind::(row.timeseries_name) + .sql(", ") + .param() + .bind::(row.version) + .sql(", ") + .param() + .bind::(row.field_name) + .sql(", ") + .param() + .bind::(row.generation) + .sql(tail); + } + + // Close the last CTE and return the sanity-checking data. + let maybe_on_conflict_clause = if existing_schema.is_some() { + "\ + ON CONFLICT (timeseries_name, field_name, version) \ + DO UPDATE SET \ + (time_modified, generation) = \ + (NOW(), timeseries_field_by_version.generation + 1) " + } else { + "" + }; + builder = builder + .sql(maybe_on_conflict_clause) + .sql("RETURNING 2 as dummy2) SELECT COUNT(*) FROM updated_fields"); + + // Actually run the main query. + builder + .query::() + .get_result_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + .and_then(|count: i64| { + if count == 0 { + todo!("return conflict error"); + } else { + Ok(()) + } + }) + } + + /// Insert a brand new timeseries schema, where we have no previous + /// versions. + /// + /// In this case, we insert records in all three related tables with + /// generation = 1. We detect and log conflicts, but return success in that + /// case. + async fn insert_new_timeseries_schema( + &self, + opctx: &OpContext, + schema: &oximeter::TimeseriesSchema, + ) -> CreateResult<()> { // Collect the data for each table derived from the new schema. These // all have generation 1, since we're assuming we're inserting the first // records. let generation = Generation::new(); let schema_row = TimeseriesSchema::new(schema, generation); let field_rows = TimeseriesField::for_schema(schema, generation); - let fields_by_version = TimeseriesFieldByVersion::for_schema(schema, generation); + let fields_by_version = + TimeseriesFieldByVersion::for_schema(schema, generation); let conn = self.pool_connection_authorized(opctx).await?; // We'll build out a CTE to insert all the records at once. First, the // (single) record of the timeseries schema itself. let mut builder = QueryBuilder::new() - .sql("WITH \ + .sql( + "WITH \ inserted_schema AS (\ INSERT INTO timeseries_schema \ - VALUES (") + VALUES (", + ) .param() .bind::(schema_row.timeseries_name) .sql(", ") @@ -332,20 +729,18 @@ impl DataStore { .sql(", ") .param() .bind::(schema_row.generation) - .sql(") \ + .sql( + ") \ RETURNING timeseries_schema.generation AS generation), \ inserted_fields AS (\ INSERT INTO timeseries_field \ - VALUES "); + VALUES ", + ); // Insert the row for each field. let n_rows = field_rows.len(); for (i, field_row) in field_rows.into_iter().enumerate() { - let tail = if i == n_rows - 1 { - ")" - } else { - "), " - }; + let tail = if i == n_rows - 1 { ")" } else { "), " }; builder = builder .sql("(") .param() @@ -375,18 +770,16 @@ impl DataStore { } // And insert the row for each version / field. 
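        //
        // The complete statement for this brand-new case has the shape
        // (values elided, sketch only):
        //
        //     WITH inserted_schema AS (
        //         INSERT INTO timeseries_schema VALUES (...)
        //         RETURNING timeseries_schema.generation AS generation
        //     ),
        //     inserted_fields AS (
        //         INSERT INTO timeseries_field VALUES (...), (...)
        //         RETURNING 1 AS dummy1
        //     ),
        //     inserted_versions AS (
        //         INSERT INTO timeseries_field_by_version VALUES (...), (...)
        //         RETURNING 1 AS dummy2
        //     )
        //     SELECT COUNT(*) FROM inserted_fields;
        //
        // with every row written at generation 1.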
- builder = builder.sql(" \ + builder = builder.sql( + " \ RETURNING 1 AS dummy1), \ inserted_versions AS (\ INSERT INTO timeseries_field_by_version \ - VALUES "); + VALUES ", + ); let n_rows = fields_by_version.len(); for (i, row) in fields_by_version.into_iter().enumerate() { - let tail = if i == n_rows - 1 { - ")" - } else { - "), " - }; + let tail = if i == n_rows - 1 { ")" } else { "), " }; builder = builder .sql("(") .param() @@ -404,10 +797,11 @@ impl DataStore { } // Close the last CTE and return the sanity-checking data. - builder = builder.sql(" \ + builder = builder.sql( + " \ RETURNING 1 as dummy2) \ - SELECT COUNT(*) FROM inserted_fields" - ); + SELECT COUNT(*) FROM inserted_fields", + ); // Actually run the main query. builder @@ -415,7 +809,10 @@ impl DataStore { .get_result_async(&*conn) .await .map_err(|e| match e { - DieselError::DatabaseError(DatabaseErrorKind::UniqueViolation, ..) => { + DieselError::DatabaseError( + DatabaseErrorKind::UniqueViolation, + .., + ) => { // Someone else inserted between when we first looked for a // schema (and didn't find one), and now, so bail out with a // conflict error. @@ -426,7 +823,7 @@ impl DataStore { schema.timeseries_name, )) } - e => public_error_from_diesel(e, ErrorHandler::Server) + e => public_error_from_diesel(e, ErrorHandler::Server), }) .and_then(|count: i64| { let Ok(count) = usize::try_from(count) else { @@ -449,46 +846,6 @@ impl DataStore { ))) } }) - - /* - CteBuilder::new() - .add_subquery(InsertSchemaRow::new(schema_row)) - // Next add the subquery to insert a record for each fields in the - // schema. - .add_subquery( - diesel::insert_into(field_dsl::timeseries_field).values(field_rows) - ) - - // Finally, add the subquery to insert each field name for this - // version of the timeseries. - .add_subquery( - diesel::insert_into(version_dsl::timeseries_field_by_version) - .values(fields_by_version) - ) - // And then run the query! - // - // We need _some_ query fragment outside of the CTEs themselves, - // though we don't care what it is. `SELECT NOW()` is the simplest. - .build(Box::new(diesel::dsl::now)) - .into_boxed() - .load_async(&*conn) - .await - .map_err(|e| match e { - DieselError::DatabaseError(DatabaseErrorKind::UniqueViolation, ..) => { - // Someone else inserted between when we first looked for a - // schema (and didn't find one), and now, so bail out with a - // conflict error. - Error::conflict(format!( - "Failed to insert schema for timeseries '{}'. 
\ - Records for it were added between the initial \ - check and our attempt to insert it", - schema.timeseries_name, - )) - } - e => public_error_from_diesel(e, ErrorHandler::Server) - }) - .map(|_| ()) - */ } } @@ -747,7 +1104,8 @@ mod tests { #[tokio::test] async fn upsert_schema_fails_with_new_datum_type() { - let logctx = dev::test_setup_log("upsert_schema_fails_with_new_datum_type"); + let logctx = + dev::test_setup_log("upsert_schema_fails_with_new_datum_type"); let mut db = test_setup_database(&logctx.log).await; let (opctx, datastore) = datastore_test(&logctx, &db).await; @@ -757,14 +1115,12 @@ mod tests { target: "a target".into(), metric: "a metric".into(), }, - field_schema: BTreeSet::from([ - FieldSchema { - name: "f0".into(), - field_type: FieldType::Uuid, - source: FieldSource::Target, - description: "target field 0".into(), - }, - ]), + field_schema: BTreeSet::from([FieldSchema { + name: "f0".into(), + field_type: FieldType::Uuid, + source: FieldSource::Target, + description: "target field 0".into(), + }]), datum_type: DatumType::HistogramI64, version: 1.try_into().unwrap(), authz_scope: AuthzScope::Fleet, diff --git a/oximeter/impl/src/schema/mod.rs b/oximeter/impl/src/schema/mod.rs index 28dbf38ab85..ec08420bb04 100644 --- a/oximeter/impl/src/schema/mod.rs +++ b/oximeter/impl/src/schema/mod.rs @@ -319,7 +319,9 @@ impl PartialEq for TimeseriesSchema { fn eq(&self, other: &TimeseriesSchema) -> bool { self.timeseries_name == other.timeseries_name && self.version == other.version + && self.authz_scope == other.authz_scope && self.datum_type == other.datum_type + && self.units == other.units && self.field_schema == other.field_schema } }
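The `PartialEq` change just above is what allows `upsert_existing_timeseries_schema` to reject a same-version schema whose authz scope or units were changed, not only one with a different datum type or field set. A minimal sketch of that behavior, as it might be appended to a test like `upsert_schema_fails_with_new_datum_type` (this assumes the test's schema value is bound to `schema` and that `TimeseriesSchema` derives `Clone`; both are assumptions, not shown in this patch):

        // Hypothetical addition to the test above; `schema` is the value
        // built there with `authz_scope: AuthzScope::Fleet`.
        let mut changed = schema.clone();
        changed.authz_scope = AuthzScope::Silo;
        // Before this patch the two would have compared equal, since
        // `authz_scope` and `units` were not part of `PartialEq`.
        assert_ne!(schema, changed);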