From e59df96076efdce5721dc0fbde0191324d0eb2e3 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Fri, 28 Jun 2024 16:58:00 +0000 Subject: [PATCH] wip, wrong direction but useful --- nexus/db-model/src/schema.rs | 10 +- nexus/db-model/src/timeseries_schema.rs | 71 +-- .../src/db/datastore/timeseries_schema.rs | 460 ++++++++++++++---- schema/crdb/dbinit.sql | 30 +- 4 files changed, 433 insertions(+), 138 deletions(-) diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs index 0117cb8bcc2..62cad6e5509 100644 --- a/nexus/db-model/src/schema.rs +++ b/nexus/db-model/src/schema.rs @@ -1762,9 +1762,8 @@ table! { } table! { - timeseries_schema (timeseries_name, version) { + timeseries_schema (timeseries_name) { timeseries_name -> Text, - version -> Int2, authz_scope -> crate::TimeseriesAuthzScopeEnum, target_description -> Text, metric_description -> Text, @@ -1772,6 +1771,7 @@ table! { units -> crate::TimeseriesUnitsEnum, time_created -> Timestamptz, time_modified -> Timestamptz, + generation -> Int8, } } @@ -1784,14 +1784,16 @@ table! { description -> Text, time_created -> Timestamptz, time_modified -> Timestamptz, + generation -> Int8, } } table! { - timeseries_version_field (timeseries_name, version, field_name) { + timeseries_field_by_version (timeseries_name, version, field_name) { timeseries_name -> Text, version -> Int2, field_name -> Text, + generation -> Int8, } } @@ -1900,5 +1902,5 @@ joinable!(network_interface -> probe (parent_id)); allow_tables_to_appear_in_same_query!( timeseries_field, timeseries_schema, - timeseries_version_field + timeseries_field_by_version ); diff --git a/nexus/db-model/src/timeseries_schema.rs b/nexus/db-model/src/timeseries_schema.rs index 082547f2cc7..20e9d224c15 100644 --- a/nexus/db-model/src/timeseries_schema.rs +++ b/nexus/db-model/src/timeseries_schema.rs @@ -4,16 +4,18 @@ //! Types modeling the timeseries schema tables. 
+use std::collections::BTreeSet; +use std::num::NonZeroU8; + use crate::impl_enum_type; use crate::schema::timeseries_field; use crate::schema::timeseries_schema; -use crate::schema::timeseries_version_field; +use crate::schema::timeseries_field_by_version; +use crate::Generation; use crate::SqlU8; use chrono::DateTime; use chrono::Utc; use omicron_common::api::external::Error; -use std::collections::BTreeSet; -use std::num::NonZeroU8; impl_enum_type! { #[derive(SqlType, QueryId, Debug, Clone, Copy)] @@ -287,8 +289,6 @@ impl From for oximeter::schema::Units { pub struct TimeseriesSchema { /// The name of the timeseries. pub timeseries_name: String, - /// The version of the timeseries. - pub version: SqlU8, /// The authorization scope of the timeseries. pub authz_scope: TimeseriesAuthzScope, /// The description of the timeseries's target. @@ -301,40 +301,32 @@ pub struct TimeseriesSchema { pub units: TimeseriesUnits, pub time_created: DateTime, pub time_modified: DateTime, + /// Generation number of the timeseries schema, shared by all records for a + /// single schema. Used for OCC. + pub generation: Generation, } -impl TryFrom for oximeter::TimeseriesSchema { - type Error = Error; - - // NOTE: This converts _only_ the parts in the actual `timeseries_schema` - // table, which omits the field schema. those must be added later. 
- fn try_from(value: TimeseriesSchema) -> Result { - let Ok(timeseries_name) = value.timeseries_name.as_str().try_into() +impl TimeseriesSchema { + pub fn into_bare_schema(self, version: NonZeroU8) -> Result { + let Ok(timeseries_name) = self.timeseries_name.as_str().try_into() else { return Err(Error::internal_error(&format!( "Invalid timeseries name in database: '{}'", - value.timeseries_name - ))); - }; - let Some(version) = NonZeroU8::new(*value.version) else { - return Err(Error::internal_error(&format!( - "Found zero version number for \ - timeseries '{}' in database", - *value.version, + self.timeseries_name ))); }; - Ok(Self { + Ok(oximeter::TimeseriesSchema { timeseries_name, description: oximeter::schema::TimeseriesDescription { - target: value.target_description, - metric: value.metric_description, + target: self.target_description, + metric: self.metric_description, }, field_schema: BTreeSet::new(), - datum_type: value.datum_type.into(), + datum_type: self.datum_type.into(), version, - authz_scope: value.authz_scope.into(), - units: value.units.into(), - created: value.time_created, + authz_scope: self.authz_scope.into(), + units: self.units.into(), + created: self.time_created, }) } } @@ -357,11 +349,11 @@ impl TimeseriesSchemaUpdate { } } -impl From<&oximeter::TimeseriesSchema> for TimeseriesSchema { - fn from(schema: &oximeter::TimeseriesSchema) -> Self { +impl TimeseriesSchema { + /// Create a schema row with the provided generation. 
+ pub fn new(schema: &oximeter::TimeseriesSchema, generation: Generation) -> Self { Self { timeseries_name: schema.timeseries_name.to_string(), - version: schema.version.get().into(), authz_scope: schema.authz_scope.into(), target_description: schema.description.target.clone(), metric_description: schema.description.metric.clone(), @@ -369,6 +361,7 @@ impl From<&oximeter::TimeseriesSchema> for TimeseriesSchema { units: schema.units.into(), time_created: schema.created, time_modified: schema.created, + generation, } } } @@ -388,6 +381,9 @@ pub struct TimeseriesField { pub description: String, pub time_created: DateTime, pub time_modified: DateTime, + /// Generation number of the timeseries schema, shared by all records for a + /// single schema. Used for OCC. + pub generation: Generation, } impl From for oximeter::FieldSchema { @@ -402,7 +398,7 @@ impl From for oximeter::FieldSchema { } impl TimeseriesField { - pub fn for_schema(schema: &oximeter::TimeseriesSchema) -> Vec { + pub fn for_schema(schema: &oximeter::TimeseriesSchema, generation: Generation) -> Vec { schema .field_schema .iter() @@ -414,6 +410,7 @@ impl TimeseriesField { description: field.description.clone(), time_created: schema.created, time_modified: schema.created, + generation, }) .collect() } @@ -422,18 +419,21 @@ impl TimeseriesField { /// This type models the mapping from each version of a timeseries schema to a /// row it contains. #[derive(Insertable, Selectable, Queryable, Clone, Debug)] -#[diesel(table_name = timeseries_version_field)] -pub struct TimeseriesVersionField { +#[diesel(table_name = timeseries_field_by_version)] +pub struct TimeseriesFieldByVersion { /// The name of the timeseries this field belongs to. pub timeseries_name: String, /// The version of the timeseries this field belongs to. pub version: SqlU8, /// The name of the field. pub field_name: String, + /// Generation number of the timeseries schema, shared by all records for a + /// single schema. Used for OCC. 
+ pub generation: Generation, } -impl TimeseriesVersionField { - pub fn for_schema(schema: &oximeter::TimeseriesSchema) -> Vec { +impl TimeseriesFieldByVersion { + pub fn for_schema(schema: &oximeter::TimeseriesSchema, generation: Generation) -> Vec { schema .field_schema .iter() @@ -441,6 +441,7 @@ impl TimeseriesVersionField { timeseries_name: schema.timeseries_name.to_string(), version: schema.version.get().into(), field_name: field.name.clone(), + generation, }) .collect() } diff --git a/nexus/db-queries/src/db/datastore/timeseries_schema.rs b/nexus/db-queries/src/db/datastore/timeseries_schema.rs index 829aac95507..f6eb6da2e4b 100644 --- a/nexus/db-queries/src/db/datastore/timeseries_schema.rs +++ b/nexus/db-queries/src/db/datastore/timeseries_schema.rs @@ -7,19 +7,27 @@ use super::DataStore; use crate::db::error::public_error_from_diesel; use crate::db::error::ErrorHandler; +use crate::db::raw_query_builder::QueryBuilder; use async_bb8_diesel::AsyncRunQueryDsl; -use diesel::upsert::excluded; +use diesel::sql_types; use diesel::BoolExpressionMethods as _; use diesel::ExpressionMethods as _; use diesel::JoinOnDsl as _; use diesel::QueryDsl as _; use diesel::SelectableHelper as _; +use diesel::result::Error as DieselError; +use diesel::result::DatabaseErrorKind; use nexus_auth::authz; use nexus_auth::context::OpContext; +use nexus_db_model::Generation; +use nexus_db_model::TimeseriesAuthzScopeEnum; +use nexus_db_model::TimeseriesDatumTypeEnum; use nexus_db_model::TimeseriesField; +use nexus_db_model::TimeseriesFieldSourceEnum; +use nexus_db_model::TimeseriesFieldTypeEnum; use nexus_db_model::TimeseriesSchema; -use nexus_db_model::TimeseriesSchemaUpdate; -use nexus_db_model::TimeseriesVersionField; +use nexus_db_model::TimeseriesFieldByVersion; +use nexus_db_model::TimeseriesUnitsEnum; use omicron_common::api::external::CreateResult; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; @@ -27,6 +35,9 @@ use 
omicron_common::api::external::ListResultVec; use omicron_common::api::external::LookupResult; use std::collections::BTreeMap; use std::num::NonZeroU8; +use nexus_db_model::schema::timeseries_field::dsl as field_dsl; +use nexus_db_model::schema::timeseries_schema::dsl as schema_dsl; +use nexus_db_model::schema::timeseries_field_by_version::dsl as version_dsl; impl DataStore { /// Load timeseries schema from their static definitions. @@ -41,105 +52,73 @@ impl DataStore { Ok(()) } + // Insert or update a timeseries schema. + // + // This will only modify the description columns if a record already + // exists, leaving things like the actual datum type and fields alone. Those + // cannot be changed within a timeseries version. async fn upsert_timeseries_schema( &self, opctx: &OpContext, schema: &oximeter::TimeseriesSchema, ) -> CreateResult<()> { - use nexus_db_model::schema::timeseries_field::dsl as field_dsl; - use nexus_db_model::schema::timeseries_schema::dsl as schema_dsl; - use nexus_db_model::schema::timeseries_version_field::dsl as version_field_dsl; - // Insert the schema itself, possibly updating the descriptions. - let conn = self.pool_connection_authorized(opctx).await?; - let schema_row = TimeseriesSchema::from(schema); - let schema_updates = TimeseriesSchemaUpdate::new(&schema_row); - diesel::insert_into(schema_dsl::timeseries_schema) - .values(schema_row) - .on_conflict((schema_dsl::timeseries_name, schema_dsl::version)) - .do_update() - .set(schema_updates) - .execute_async(&*conn) - .await - .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; - - // Insert one record for each of the fields in the schema, also updating - // the description if needed. Importantly, we do not update the type. 
- let field_rows = TimeseriesField::for_schema(schema); - diesel::insert_into(field_dsl::timeseries_field) - .values(field_rows) - .on_conflict((field_dsl::timeseries_name, field_dsl::name)) - .do_update() - .set(( - // We currently support updating the description or the field - // source, which might happen if someone moved the field from - // the target to metric. - field_dsl::description.eq(excluded(field_dsl::description)), - field_dsl::source.eq(excluded(field_dsl::source)), - field_dsl::time_modified.eq(diesel::dsl::now), - )) - .execute_async(&*conn) - .await - .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; - - // Insert the mapping between the schema version and the fields it - // refers to. This uses the entire record as the PK, so conflicts can be - // ignored -- any mapping from a field name to a version of a timeseries - // schema that we already have is OK. - let field_mappings = TimeseriesVersionField::for_schema(schema); - diesel::insert_into(version_field_dsl::timeseries_version_field) - .values(field_mappings) - .on_conflict_do_nothing() - .execute_async(&*conn) - .await - .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) - .map(|_| ()) + // Fetch any existing timeseries schema at this version first, in order + // to correctly do OCC with updates. + let maybe_existing = self.fetch_timeseries_schema( + opctx, + &schema.timeseries_name, + schema.version, + ).await; + match maybe_existing { + Ok(existing_schema) => todo!(), + Err(Error::NotFound { .. }) => { + self.insert_new_timeseries_schema(opctx, schema).await + } + Err(e) => Err(e), + } } + /// Fetch a single timeseries schema by name and version. 
pub async fn fetch_timeseries_schema( &self, opctx: &OpContext, timeseries_name: &oximeter::TimeseriesName, version: NonZeroU8, ) -> LookupResult { - use nexus_db_model::schema::timeseries_field::dsl as field_dsl; - use nexus_db_model::schema::timeseries_schema::dsl as schema_dsl; - use nexus_db_model::schema::timeseries_version_field::dsl as version_field_dsl; // TODO-security: Correct authorization here. For now, must be a fleet // reader. opctx.authorize(authz::Action::Read, &authz::FLEET).await?; let conn = self.pool_connection_authorized(opctx).await?; - // See `list_timeseries_schema` for details on this JOIN query. + // 3-table JOIN between the schema, fields, and fields-by-version. All + // use the timeseries name. See `list_timeseries_schema()` for a more + // detailed explanation. let mut rows = schema_dsl::timeseries_schema .inner_join(field_dsl::timeseries_field.on( schema_dsl::timeseries_name.eq(field_dsl::timeseries_name), )) .inner_join( - version_field_dsl::timeseries_version_field.on( + version_dsl::timeseries_field_by_version.on( schema_dsl::timeseries_name - .eq(version_field_dsl::timeseries_name) - .and( - schema_dsl::version - .eq(version_field_dsl::version), - ) + .eq(version_dsl::timeseries_name) .and( field_dsl::name - .eq(version_field_dsl::field_name), + .eq(version_dsl::field_name), ), ), ) .select(<( TimeseriesSchema, TimeseriesField, - TimeseriesVersionField, + TimeseriesFieldByVersion, )>::as_select()) .filter( schema_dsl::timeseries_name .eq(timeseries_name.to_string()) - .and(schema_dsl::version.eq(i16::from(version.get()))), + .and(version_dsl::version.eq(i16::from(version.get()))), ) .load_async(&*conn) .await @@ -150,14 +129,12 @@ impl DataStore { // // This has no fields, but has all the other metadata for the schema. let Some((schema_row, field_row, version_row)) = rows.next() else { - // TODO-correctness: This "should" be a 404, but that all relies a - // bit on the authz story making more sense. 
- return Err(Error::invalid_request(format!( + return Err(Error::non_resourcetype_not_found(format!( "Timeseries '{}' version {} not found", timeseries_name, version, ))); }; - let mut schema = oximeter::TimeseriesSchema::try_from(schema_row)?; + let mut schema = schema_row.into_bare_schema(version)?; // Attach all field rows, including the one we already fetched above. let first_row = std::iter::once((field_row, version_row)); @@ -181,9 +158,6 @@ impl DataStore { opctx: &OpContext, pagparams: &DataPageParams<'_, (oximeter::TimeseriesName, NonZeroU8)>, ) -> ListResultVec { - use nexus_db_model::schema::timeseries_field::dsl as field_dsl; - use nexus_db_model::schema::timeseries_schema::dsl as schema_dsl; - use nexus_db_model::schema::timeseries_version_field::dsl as version_field_dsl; // For now, must be a fleet reader. // @@ -209,7 +183,7 @@ impl DataStore { // - `timeseries_version_field` // // We match up all tables on the timeseries name; and the versioned - // table additionally on the field name and timeseries version. + // table additionally on the field name. let query = schema_dsl::timeseries_schema // JOIN between the timeseries schema and field tables just using @@ -217,21 +191,17 @@ impl DataStore { .inner_join(field_dsl::timeseries_field.on( schema_dsl::timeseries_name.eq(field_dsl::timeseries_name), )) - // JOIN between that and the versioned field table using: - // - The timeseries name for the schema and versioned field table - // - The version the schema and versioned field table - // - The field name for the field and versioned field table + // JOIN between that and the field-by-version table using: + // - The timeseries name for the schema and field-by-version + // table. 
+ // - The field name for the field and field-by-version table .inner_join( - version_field_dsl::timeseries_version_field.on( + version_dsl::timeseries_field_by_version.on( schema_dsl::timeseries_name - .eq(version_field_dsl::timeseries_name) - .and( - schema_dsl::version - .eq(version_field_dsl::version), - ) + .eq(version_dsl::timeseries_name) .and( field_dsl::name - .eq(version_field_dsl::field_name), + .eq(version_dsl::field_name), ), ), ) @@ -239,20 +209,20 @@ impl DataStore { .select(<( TimeseriesSchema, TimeseriesField, - TimeseriesVersionField, + TimeseriesFieldByVersion, )>::as_select()) - // Filter to the pagination marker, and limit the results to its - // page size. + // Filter by the pagination marker, and limit the results to the + // requested page size. .filter( schema_dsl::timeseries_name .eq(timeseries_name.clone()) - .and(schema_dsl::version.gt(i16::from(version.get()))), + .and(version_dsl::version.gt(i16::from(version.get()))), ) .or_filter( schema_dsl::timeseries_name.gt(timeseries_name.clone()), ) .order(schema_dsl::timeseries_name.asc()) - .then_order_by(schema_dsl::version.asc()) + .then_order_by(version_dsl::version.asc()) .limit(i64::from(pagparams.limit.get())); // Select the actual rows from the JOIN result. @@ -282,7 +252,7 @@ impl DataStore { // Find the current schema, or add the one built from this current // row of the schema table. This will have an empty set of fields, // which will always be added to after looking up the schema. 
- let Some(version) = NonZeroU8::new(*schema_row.version) else { + let Some(version) = NonZeroU8::new(*versioned_row.version) else { return Err(Error::internal_error(&format!( "database contains invalid version \ number of 0 for timeseries '{}'", @@ -293,7 +263,8 @@ impl DataStore { let is_new = schema .entry(key) .or_insert_with(|| { - oximeter::TimeseriesSchema::try_from(schema_row) + schema_row + .into_bare_schema(version) .expect("fallible parts checked above (name, version)") }) .field_schema @@ -309,6 +280,216 @@ impl DataStore { Ok(schema.into_values().collect()) } + + // Helper to insert a timeseries schema assumed to be new. + // + // This will fail on any conflicts. + async fn insert_new_timeseries_schema( + &self, + opctx: &OpContext, + schema: &oximeter::TimeseriesSchema + ) -> CreateResult<()> { + + // Collect the data for each table derived from the new schema. These + // all have generation 1, since we're assuming we're inserting the first + // records. + let generation = Generation::new(); + let schema_row = TimeseriesSchema::new(schema, generation); + let field_rows = TimeseriesField::for_schema(schema, generation); + let fields_by_version = TimeseriesFieldByVersion::for_schema(schema, generation); + let conn = self.pool_connection_authorized(opctx).await?; + + // We'll build out a CTE to insert all the records at once. First, the + // (single) record of the timeseries schema itself. 
+ let mut builder = QueryBuilder::new() + .sql("WITH \ + inserted_schema AS (\ + INSERT INTO timeseries_schema \ + VALUES (") + .param() + .bind::(schema_row.timeseries_name) + .sql(", ") + .param() + .bind::(schema_row.authz_scope) + .sql(", ") + .param() + .bind::(schema_row.target_description) + .sql(", ") + .param() + .bind::(schema_row.metric_description) + .sql(", ") + .param() + .bind::(schema_row.datum_type) + .sql(", ") + .param() + .bind::(schema_row.units) + .sql(", ") + .param() + .bind::(schema_row.time_created) + .sql(", ") + .param() + .bind::(schema_row.time_modified) + .sql(", ") + .param() + .bind::(schema_row.generation) + .sql(") \ + RETURNING timeseries_schema.generation AS generation), \ + inserted_fields AS (\ + INSERT INTO timeseries_field \ + VALUES "); + + // Insert the row for each field. + let n_rows = field_rows.len(); + for (i, field_row) in field_rows.into_iter().enumerate() { + let tail = if i == n_rows - 1 { + ")" + } else { + "), " + }; + builder = builder + .sql("(") + .param() + .bind::(field_row.timeseries_name) + .sql(", ") + .param() + .bind::(field_row.name) + .sql(", ") + .param() + .bind::(field_row.source) + .sql(", ") + .param() + .bind::(field_row.type_) + .sql(", ") + .param() + .bind::(field_row.description) + .sql(", ") + .param() + .bind::(field_row.time_created) + .sql(", ") + .param() + .bind::(field_row.time_modified) + .sql(", ") + .param() + .bind::(field_row.generation) + .sql(tail); + } + + // And insert the row for each version / field. 
+ builder = builder.sql(" \ + RETURNING 1 AS dummy1), \ + inserted_versions AS (\ + INSERT INTO timeseries_field_by_version \ + VALUES "); + let n_rows = fields_by_version.len(); + for (i, row) in fields_by_version.into_iter().enumerate() { + let tail = if i == n_rows - 1 { + ")" + } else { + "), " + }; + builder = builder + .sql("(") + .param() + .bind::(row.timeseries_name) + .sql(", ") + .param() + .bind::(row.version) + .sql(", ") + .param() + .bind::(row.field_name) + .sql(", ") + .param() + .bind::(row.generation) + .sql(tail); + } + + // Close the last CTE and return the sanity-checking data. + builder = builder.sql(" \ + RETURNING 1 as dummy2) \ + SELECT COUNT(*) FROM inserted_fields" + ); + + // Actually run the main query. + builder + .query::() + .get_result_async(&*conn) + .await + .map_err(|e| match e { + DieselError::DatabaseError(DatabaseErrorKind::UniqueViolation, ..) => { + // Someone else inserted between when we first looked for a + // schema (and didn't find one), and now, so bail out with a + // conflict error. + Error::conflict(format!( + "Failed to insert schema for timeseries '{}'. \ + Records for it were added between the initial \ + check and our attempt to insert it", + schema.timeseries_name, + )) + } + e => public_error_from_diesel(e, ErrorHandler::Server) + }) + .and_then(|count: i64| { + let Ok(count) = usize::try_from(count) else { + return Err(Error::internal_error(&format!( + "Unable to convert sanity check count from i64 \ + to usize! 
Count of inserted rows in timeseries_field \ + table is {}", + count, + ))); + }; + if count == schema.field_schema.len() { + Ok(()) + } else { + Err(Error::internal_error(&format!( + "Expected to insert {} rows in the timeseries_field \ + table when inserting new schema, but looks like we \ + inserted {} instead", + schema.field_schema.len(), + count, + ))) + } + }) + + /* + CteBuilder::new() + .add_subquery(InsertSchemaRow::new(schema_row)) + // Next add the subquery to insert a record for each field in the + // schema. + .add_subquery( + diesel::insert_into(field_dsl::timeseries_field).values(field_rows) + ) + + // Finally, add the subquery to insert each field name for this + // version of the timeseries. + .add_subquery( + diesel::insert_into(version_dsl::timeseries_field_by_version) + .values(fields_by_version) + ) + // And then run the query! + // + // We need _some_ query fragment outside of the CTEs themselves, + // though we don't care what it is. `SELECT NOW()` is the simplest. + .build(Box::new(diesel::dsl::now)) + .into_boxed() + .load_async(&*conn) + .await + .map_err(|e| match e { + DieselError::DatabaseError(DatabaseErrorKind::UniqueViolation, ..) => { + // Someone else inserted between when we first looked for a + // schema (and didn't find one), and now, so bail out with a + // conflict error. + Error::conflict(format!( + "Failed to insert schema for timeseries '{}'. 
\ + Records for it were added between the initial \ + check and our attempt to insert it", + schema.timeseries_name, + )) + } + e => public_error_from_diesel(e, ErrorHandler::Server) + }) + .map(|_| ()) + */ + } } #[cfg(test)] @@ -318,6 +499,7 @@ mod tests { use dropshot::PaginationOrder; use nexus_test_utils::db::test_setup_database; use omicron_common::api::external::DataPageParams; + use omicron_common::api::external::Error; use omicron_test_utils::dev; use oximeter::schema::AuthzScope; use oximeter::schema::FieldSource; @@ -515,8 +697,104 @@ mod tests { logctx.cleanup_successful(); } + // What kinds of updates do we want to support? + // What kinds of updates do we want to disallow? + // + // When Nexus starts up, it may have an updated timeseries schema. "Updated" + // means two things: + // + // - A new version of the timeseries. + // - Updates to _descriptions_, which are shared by all versions. + // + // IMPORTANT: The timeseries_schema table should not have a version number + // in it. That's only in the timeseries_field_by_version table. All the other columns are + // _shared_ across all versions of the schema. + // + // Let's look at the first case first. + // + // # New version + // + // The timeseries_schema table should be unchanged. + // + // The field table.... + // + // + // Versioned field table: + // + // There are three cases: + // + // - nothing in the table at all (no previous record of this version) + // - something there already, but it's exactly the same as we're inserting + // - something there already, but it's _different_ than what we're + // inserting. + // + // + // ## Nothing + // + // I don't believe it's possible to catch two concurrent inserts, with + // completely different sets of fields. (This is really unlikely, but I would + // still prefer if we could catch it.) 
+ // + // Both clients would + // + // - Fetch the existing set of rows, which is empty + // - insert their rows with generation = 1 + // - that doesn't conflict, because they're inserting disjoint sets of rows + // + // So the current table structure cannot detect all the kinds of conflicts. + // It _can_ sort of detect overlapping but distinct rows, though it is + // definitely awkward to do that. + #[tokio::test] - async fn upsert_schema_fails_with_duplicate_field_names() { - todo!(); + async fn upsert_schema_fails_with_new_datum_type() { + let logctx = dev::test_setup_log("upsert_schema_fails_with_new_datum_type"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + let original_schema = TimeseriesSchema { + timeseries_name: "foo:bar".try_into().unwrap(), + description: TimeseriesDescription { + target: "a target".into(), + metric: "a metric".into(), + }, + field_schema: BTreeSet::from([ + FieldSchema { + name: "f0".into(), + field_type: FieldType::Uuid, + source: FieldSource::Target, + description: "target field 0".into(), + }, + ]), + datum_type: DatumType::HistogramI64, + version: 1.try_into().unwrap(), + authz_scope: AuthzScope::Fleet, + units: Units::Count, + created: Utc::now(), + }; + datastore + .upsert_timeseries_schema(&opctx, &original_schema) + .await + .expect("expected to insert valid schema"); + + // Upsert a schema with modified datum type. This should not be + // possible, given the way we construct timeseries schema, but we + // disallow it here in any case. + let modified_schema = oximeter::TimeseriesSchema { + datum_type: DatumType::F64, + ..original_schema.clone() + }; + let err = datastore + .upsert_timeseries_schema(&opctx, &modified_schema) + .await + .expect_err("expected to insert schema with only new descriptions"); + assert!( + matches!(err, Error::Conflict { .. 
}), + "Expected a conflict when attempting to insert \ + an existing timeseries schema with a new datum \ + type, but found: {err:#?}", + ); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); } } diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index 4cedd80bd9b..fc3a1bd0fc5 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -4173,9 +4173,7 @@ CREATE TABLE IF NOT EXISTS omicron.public.timeseries_schema ( -- -- Names are derived from the concatenation of the target and metric names, -- joined with a colon (':'). - timeseries_name STRING(128) NOT NULL, - -- The version number of this timeseries's schema. - version INT2 NOT NULL CHECK (version > 0), + timeseries_name STRING(128) PRIMARY KEY, -- The authorization scope of the timeseries. authz_scope omicron.public.timeseries_authz_scope NOT NULL, -- Textual description of the timeseries's target. @@ -4188,7 +4186,11 @@ CREATE TABLE IF NOT EXISTS omicron.public.timeseries_schema ( units omicron.public.timeseries_units NOT NULL, time_created TIMESTAMPTZ NOT NULL, time_modified TIMESTAMPTZ NOT NULL, - PRIMARY KEY (timeseries_name, version) + + -- Generation number. This is shared by all records in the + -- `timeseries_schema`, `timeseries_field`, and + -- `timeseries_field_by_version` tables, and is used for OCC. + generation INT8 NOT NULL ); -- Fields for each timeseries schema. @@ -4197,14 +4199,20 @@ CREATE TABLE IF NOT EXISTS omicron.public.timeseries_field ( timeseries_name STRING(128) NOT NULL, -- Name of the field. name STRING(128) NOT NULL, - -- Data type of the field. - type_ omicron.public.timeseries_field_type NOT NULL, -- Source of the field, either 'target' or 'metric'. source omicron.public.timeseries_field_source NOT NULL, + -- Data type of the field. + type_ omicron.public.timeseries_field_type NOT NULL, -- Textual description for the timeseries field. 
description STRING(512) NOT NULL, time_created TIMESTAMPTZ NOT NULL, time_modified TIMESTAMPTZ NOT NULL, + + -- Generation number. This is shared by all records in the + -- `timeseries_schema`, `timeseries_field`, and + -- `timeseries_field_by_version` tables, and is used for OCC. + generation INT8 NOT NULL, + -- This primary key enforces that there is exactly one field with a -- particular name in _all_ versions of a timeseries schema. In the future, -- this will likely need to be relaxed, to enforcing a unique name within a @@ -4214,10 +4222,16 @@ CREATE TABLE IF NOT EXISTS omicron.public.timeseries_field ( ); -- List of fields by name on each version of the timeseries schema -CREATE TABLE IF NOT EXISTS omicron.public.timeseries_version_field ( +CREATE TABLE IF NOT EXISTS omicron.public.timeseries_field_by_version ( timeseries_name STRING(128) NOT NULL, - version INT2 NOT NULL CHECK (version > 0), + version INT2 NOT NULL CHECK (version BETWEEN 1 AND 255), field_name STRING(128) NOT NULL, + + -- Generation number. This is shared by all records in the + -- `timeseries_schema`, `timeseries_field`, and + -- `timeseries_field_by_version` tables, and is used for OCC. + generation INT8 NOT NULL, + PRIMARY KEY (timeseries_name, version, field_name) );