From 7c19eb2d303b586732d8f53fc461ed03acd66d34 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Tue, 25 Jun 2024 23:52:57 +0000 Subject: [PATCH] WIP: Add timeseries schema to CockroachDB on Nexus startup --- Cargo.lock | 2 + nexus/auth/src/authz/mod.rs | 4 +- nexus/db-model/Cargo.toml | 7 +- nexus/db-model/src/lib.rs | 2 + nexus/db-model/src/schema.rs | 60 +- nexus/db-model/src/schema_versions.rs | 3 +- nexus/db-model/src/timeseries_schema.rs | 447 +++++++++++++++ nexus/db-queries/src/db/datastore/mod.rs | 1 + .../src/db/datastore/timeseries_schema.rs | 522 ++++++++++++++++++ nexus/db-queries/src/db/pagination.rs | 4 +- nexus/src/populate.rs | 19 +- oximeter/impl/src/schema/codegen.rs | 19 +- oximeter/oximeter/Cargo.toml | 7 + oximeter/oximeter/build.rs | 56 ++ .../oximeter/schema/physical-data-link.toml | 14 +- oximeter/oximeter/src/lib.rs | 3 + .../add-timeseries-schema-tables/up01.sql | 6 + .../add-timeseries-schema-tables/up02.sql | 4 + .../add-timeseries-schema-tables/up03.sql | 14 + .../add-timeseries-schema-tables/up04.sql | 29 + .../add-timeseries-schema-tables/up05.sql | 4 + .../add-timeseries-schema-tables/up06.sql | 12 + .../add-timeseries-schema-tables/up07.sql | 10 + .../add-timeseries-schema-tables/up08.sql | 6 + schema/crdb/dbinit.sql | 136 ++++- 25 files changed, 1347 insertions(+), 44 deletions(-) create mode 100644 nexus/db-model/src/timeseries_schema.rs create mode 100644 nexus/db-queries/src/db/datastore/timeseries_schema.rs create mode 100644 oximeter/oximeter/build.rs create mode 100644 schema/crdb/add-timeseries-schema-tables/up01.sql create mode 100644 schema/crdb/add-timeseries-schema-tables/up02.sql create mode 100644 schema/crdb/add-timeseries-schema-tables/up03.sql create mode 100644 schema/crdb/add-timeseries-schema-tables/up04.sql create mode 100644 schema/crdb/add-timeseries-schema-tables/up05.sql create mode 100644 schema/crdb/add-timeseries-schema-tables/up06.sql create mode 100644 
schema/crdb/add-timeseries-schema-tables/up07.sql create mode 100644 schema/crdb/add-timeseries-schema-tables/up08.sql diff --git a/Cargo.lock b/Cargo.lock index f9ae4c29c16..35ad05ea90a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4594,6 +4594,7 @@ dependencies = [ "omicron-uuid-kinds", "omicron-workspace-hack", "once_cell", + "oximeter", "oxnet", "parse-display", "pq-sys", @@ -6233,6 +6234,7 @@ dependencies = [ "oximeter-macro-impl", "oximeter-timeseries-macro", "prettyplease", + "quote", "syn 2.0.68", "toml 0.8.13", "uuid", diff --git a/nexus/auth/src/authz/mod.rs b/nexus/auth/src/authz/mod.rs index 1c666d2296b..cffb09a63ae 100644 --- a/nexus/auth/src/authz/mod.rs +++ b/nexus/auth/src/authz/mod.rs @@ -106,7 +106,7 @@ //! | | | +---------------+-----------+-------------+-------------+---+ | //! | | | | resource_type | role_name | resource_id | identity_id |...| | //! | | | +---------------+-----------+-------------+-------------+---+ | -//! | | | | "project " | "viewer" | 234 | 123|...| | +//! | | | | "project " | "viewer" | 123 | 234|...| | //! | | | +--^------------+--^--------+----------^--+-----------^-+---+ | //! | | | | | | | | //! +-|-|----+ | | +------------+ @@ -120,7 +120,7 @@ //! how we find these records and make them available for the authz check. //! //! Built-in users are only one possible target for role assignments. IdP users -//! (Silo users) an also be assigned roles. This all works the same way, except +//! (Silo users) can also be assigned roles. This all works the same way, except //! that in that case `role_assignment.identity_id` refers to an entry in the //! `silo_user` table rather than `user_builtin`. How do we know the //! difference? 
There's also an `identity_type` column in the "role_assignment" diff --git a/nexus/db-model/Cargo.toml b/nexus/db-model/Cargo.toml index a7b6cd9de1f..6e754b72045 100644 --- a/nexus/db-model/Cargo.toml +++ b/nexus/db-model/Cargo.toml @@ -41,14 +41,15 @@ tokio.workspace = true uuid.workspace = true db-macros.workspace = true -omicron-certificates.workspace = true -omicron-common.workspace = true nexus-config.workspace = true nexus-defaults.workspace = true nexus-types.workspace = true +omicron-certificates.workspace = true +omicron-common.workspace = true omicron-passwords.workspace = true -sled-agent-client.workspace = true omicron-workspace-hack.workspace = true +oximeter.workspace = true +sled-agent-client.workspace = true [dev-dependencies] camino-tempfile.workspace = true diff --git a/nexus/db-model/src/lib.rs b/nexus/db-model/src/lib.rs index 30dc82965d7..c7d9ee98b31 100644 --- a/nexus/db-model/src/lib.rs +++ b/nexus/db-model/src/lib.rs @@ -92,6 +92,7 @@ mod sled_underlay_subnet_allocation; mod snapshot; mod ssh_key; mod switch; +mod timeseries_schema; mod tuf_repo; mod typed_uuid; mod unsigned; @@ -195,6 +196,7 @@ pub use ssh_key::*; pub use switch::*; pub use switch_interface::*; pub use switch_port::*; +pub use timeseries_schema::*; pub use tuf_repo::*; pub use typed_uuid::to_db_typed_uuid; pub use upstairs_repair::*; diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs index d08a51edd40..0117cb8bcc2 100644 --- a/nexus/db-model/src/schema.rs +++ b/nexus/db-model/src/schema.rs @@ -1745,16 +1745,6 @@ table! { } } -table! { - db_metadata (singleton) { - singleton -> Bool, - time_created -> Timestamptz, - time_modified -> Timestamptz, - version -> Text, - target_version -> Nullable, - } -} - table! { migration (id) { id -> Uuid, @@ -1771,6 +1761,50 @@ table! { } } +table! 
{ + timeseries_schema (timeseries_name, version) { + timeseries_name -> Text, + version -> Int2, + authz_scope -> crate::TimeseriesAuthzScopeEnum, + target_description -> Text, + metric_description -> Text, + datum_type -> crate::TimeseriesDatumTypeEnum, + units -> crate::TimeseriesUnitsEnum, + time_created -> Timestamptz, + time_modified -> Timestamptz, + } +} + +table! { + timeseries_field (timeseries_name, name) { + timeseries_name -> Text, + name -> Text, + type_ -> crate::TimeseriesFieldTypeEnum, + source -> crate::TimeseriesFieldSourceEnum, + description -> Text, + time_created -> Timestamptz, + time_modified -> Timestamptz, + } +} + +table! { + timeseries_version_field (timeseries_name, version, field_name) { + timeseries_name -> Text, + version -> Int2, + field_name -> Text, + } +} + +table! { + db_metadata (singleton) { + singleton -> Bool, + time_created -> Timestamptz, + time_modified -> Timestamptz, + version -> Text, + target_version -> Nullable, + } +} + allow_tables_to_appear_in_same_query!(instance, migration); allow_tables_to_appear_in_same_query!(migration, vmm); joinable!(instance -> migration (migration_id)); @@ -1862,3 +1896,9 @@ joinable!(instance_ssh_key -> instance (instance_id)); allow_tables_to_appear_in_same_query!(sled, sled_instance); joinable!(network_interface -> probe (parent_id)); + +allow_tables_to_appear_in_same_query!( + timeseries_field, + timeseries_schema, + timeseries_version_field +); diff --git a/nexus/db-model/src/schema_versions.rs b/nexus/db-model/src/schema_versions.rs index 04fafe4f938..bb53d468309 100644 --- a/nexus/db-model/src/schema_versions.rs +++ b/nexus/db-model/src/schema_versions.rs @@ -17,7 +17,7 @@ use std::collections::BTreeMap; /// /// This must be updated when you change the database schema. Refer to /// schema/crdb/README.adoc in the root of this repository for details. 
-pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(77, 0, 0); +pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(78, 0, 0); /// List of all past database schema versions, in *reverse* order /// @@ -29,6 +29,7 @@ static KNOWN_VERSIONS: Lazy> = Lazy::new(|| { // | leaving the first copy as an example for the next person. // v // KnownVersion::new(next_int, "unique-dirname-with-the-sql-files"), + KnownVersion::new(78, "add-timeseries-schema-tables"), KnownVersion::new(77, "remove-view-for-v2p-mappings"), KnownVersion::new(76, "lookup-region-snapshot-by-snapshot-id"), KnownVersion::new(75, "add-cockroach-zone-id-to-node-id"), diff --git a/nexus/db-model/src/timeseries_schema.rs b/nexus/db-model/src/timeseries_schema.rs new file mode 100644 index 00000000000..082547f2cc7 --- /dev/null +++ b/nexus/db-model/src/timeseries_schema.rs @@ -0,0 +1,447 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Types modeling the timeseries schema tables. + +use crate::impl_enum_type; +use crate::schema::timeseries_field; +use crate::schema::timeseries_schema; +use crate::schema::timeseries_version_field; +use crate::SqlU8; +use chrono::DateTime; +use chrono::Utc; +use omicron_common::api::external::Error; +use std::collections::BTreeSet; +use std::num::NonZeroU8; + +impl_enum_type! 
{ + #[derive(SqlType, QueryId, Debug, Clone, Copy)] + #[diesel(postgres_type(name = "timeseries_authz_scope", schema = "public"))] + pub struct TimeseriesAuthzScopeEnum; + + #[derive(Clone, Copy, Debug, AsExpression, FromSqlRow, PartialEq)] + #[diesel(sql_type = TimeseriesAuthzScopeEnum)] + pub enum TimeseriesAuthzScope; + + Fleet => b"fleet" + Silo => b"silo" + Project => b"project" + ViewableToAll => b"viewable_to_all" +} + +impl From<oximeter::schema::AuthzScope> for TimeseriesAuthzScope { + fn from(value: oximeter::schema::AuthzScope) -> Self { + match value { + oximeter::schema::AuthzScope::Fleet => Self::Fleet, + oximeter::schema::AuthzScope::Silo => Self::Silo, + oximeter::schema::AuthzScope::Project => Self::Project, + oximeter::schema::AuthzScope::ViewableToAll => Self::ViewableToAll, + } + } +} + +impl From<TimeseriesAuthzScope> for oximeter::schema::AuthzScope { + fn from(value: TimeseriesAuthzScope) -> Self { + match value { + TimeseriesAuthzScope::Fleet => Self::Fleet, + TimeseriesAuthzScope::Silo => Self::Silo, + TimeseriesAuthzScope::Project => Self::Project, + TimeseriesAuthzScope::ViewableToAll => Self::ViewableToAll, + } + } +} + +impl_enum_type! { + #[derive(SqlType, QueryId, Debug, Clone, Copy)] + #[diesel(postgres_type(name = "timeseries_field_source", schema = "public"))] + pub struct TimeseriesFieldSourceEnum; + + #[derive(Clone, Copy, Debug, AsExpression, FromSqlRow, PartialEq)] + #[diesel(sql_type = TimeseriesFieldSourceEnum)] + pub enum TimeseriesFieldSource; + + Target => b"target" + Metric => b"metric" +} + +impl From<oximeter::schema::FieldSource> for TimeseriesFieldSource { + fn from(value: oximeter::schema::FieldSource) -> Self { + match value { + oximeter::schema::FieldSource::Target => Self::Target, + oximeter::schema::FieldSource::Metric => Self::Metric, + } + } +} + +impl From<TimeseriesFieldSource> for oximeter::schema::FieldSource { + fn from(value: TimeseriesFieldSource) -> Self { + match value { + TimeseriesFieldSource::Target => Self::Target, + TimeseriesFieldSource::Metric => Self::Metric, + } + } +} + +impl_enum_type! 
{ + #[derive(SqlType, QueryId, Debug, Clone, Copy)] + #[diesel(postgres_type(name = "timeseries_field_type", schema = "public"))] + pub struct TimeseriesFieldTypeEnum; + + #[derive(Clone, Copy, Debug, AsExpression, FromSqlRow, PartialEq)] + #[diesel(sql_type = TimeseriesFieldTypeEnum)] + pub enum TimeseriesFieldType; + + String => b"string" + I8 => b"i8" + U8 => b"u8" + I16 => b"i16" + U16 => b"u16" + I32 => b"i32" + U32 => b"u32" + I64 => b"i64" + U64 => b"u64" + IpAddr => b"ip_addr" + Uuid => b"uuid" + Bool => b"bool" +} + +impl From<oximeter::FieldType> for TimeseriesFieldType { + fn from(value: oximeter::FieldType) -> Self { + match value { + oximeter::FieldType::String => Self::String, + oximeter::FieldType::I8 => Self::I8, + oximeter::FieldType::U8 => Self::U8, + oximeter::FieldType::I16 => Self::I16, + oximeter::FieldType::U16 => Self::U16, + oximeter::FieldType::I32 => Self::I32, + oximeter::FieldType::U32 => Self::U32, + oximeter::FieldType::I64 => Self::I64, + oximeter::FieldType::U64 => Self::U64, + oximeter::FieldType::IpAddr => Self::IpAddr, + oximeter::FieldType::Uuid => Self::Uuid, + oximeter::FieldType::Bool => Self::Bool, + } + } +} + +impl From<TimeseriesFieldType> for oximeter::FieldType { + fn from(value: TimeseriesFieldType) -> Self { + match value { + TimeseriesFieldType::String => Self::String, + TimeseriesFieldType::I8 => Self::I8, + TimeseriesFieldType::U8 => Self::U8, + TimeseriesFieldType::I16 => Self::I16, + TimeseriesFieldType::U16 => Self::U16, + TimeseriesFieldType::I32 => Self::I32, + TimeseriesFieldType::U32 => Self::U32, + TimeseriesFieldType::I64 => Self::I64, + TimeseriesFieldType::U64 => Self::U64, + TimeseriesFieldType::IpAddr => Self::IpAddr, + TimeseriesFieldType::Uuid => Self::Uuid, + TimeseriesFieldType::Bool => Self::Bool, + } + } +} + +impl_enum_type! 
{ + #[derive(SqlType, QueryId, Debug, Clone, Copy)] + #[diesel(postgres_type(name = "timeseries_datum_type", schema = "public"))] + pub struct TimeseriesDatumTypeEnum; + + #[derive(Clone, Copy, Debug, AsExpression, FromSqlRow, PartialEq)] + #[diesel(sql_type = TimeseriesDatumTypeEnum)] + pub enum TimeseriesDatumType; + + Bool => b"bool" + I8 => b"i8" + U8 => b"u8" + I16 => b"i16" + U16 => b"u16" + I32 => b"i32" + U32 => b"u32" + I64 => b"i64" + U64 => b"u64" + F32 => b"f32" + F64 => b"f64" + String => b"string" + Bytes => b"bytes" + CumulativeI64 => b"cumulative_i64" + CumulativeU64 => b"cumulative_u64" + CumulativeF32 => b"cumulative_f32" + CumulativeF64 => b"cumulative_f64" + HistogramI8 => b"histogram_i8" + HistogramU8 => b"histogram_u8" + HistogramI16 => b"histogram_i16" + HistogramU16 => b"histogram_u16" + HistogramI32 => b"histogram_i32" + HistogramU32 => b"histogram_u32" + HistogramI64 => b"histogram_i64" + HistogramU64 => b"histogram_u64" + HistogramF32 => b"histogram_f32" + HistogramF64 => b"histogram_f64" +} + +impl From<oximeter::DatumType> for TimeseriesDatumType { + fn from(value: oximeter::DatumType) -> Self { + match value { + oximeter::DatumType::Bool => Self::Bool, + oximeter::DatumType::I8 => Self::I8, + oximeter::DatumType::U8 => Self::U8, + oximeter::DatumType::I16 => Self::I16, + oximeter::DatumType::U16 => Self::U16, + oximeter::DatumType::I32 => Self::I32, + oximeter::DatumType::U32 => Self::U32, + oximeter::DatumType::I64 => Self::I64, + oximeter::DatumType::U64 => Self::U64, + oximeter::DatumType::F32 => Self::F32, + oximeter::DatumType::F64 => Self::F64, + oximeter::DatumType::String => Self::String, + oximeter::DatumType::Bytes => Self::Bytes, + oximeter::DatumType::CumulativeI64 => Self::CumulativeI64, + oximeter::DatumType::CumulativeU64 => Self::CumulativeU64, + oximeter::DatumType::CumulativeF32 => Self::CumulativeF32, + oximeter::DatumType::CumulativeF64 => Self::CumulativeF64, + oximeter::DatumType::HistogramI8 => Self::HistogramI8, + 
oximeter::DatumType::HistogramU8 => Self::HistogramU8, + oximeter::DatumType::HistogramI16 => Self::HistogramI16, + oximeter::DatumType::HistogramU16 => Self::HistogramU16, + oximeter::DatumType::HistogramI32 => Self::HistogramI32, + oximeter::DatumType::HistogramU32 => Self::HistogramU32, + oximeter::DatumType::HistogramI64 => Self::HistogramI64, + oximeter::DatumType::HistogramU64 => Self::HistogramU64, + oximeter::DatumType::HistogramF32 => Self::HistogramF32, + oximeter::DatumType::HistogramF64 => Self::HistogramF64, + } + } +} + +impl From<TimeseriesDatumType> for oximeter::DatumType { + fn from(value: TimeseriesDatumType) -> Self { + match value { + TimeseriesDatumType::Bool => Self::Bool, + TimeseriesDatumType::I8 => Self::I8, + TimeseriesDatumType::U8 => Self::U8, + TimeseriesDatumType::I16 => Self::I16, + TimeseriesDatumType::U16 => Self::U16, + TimeseriesDatumType::I32 => Self::I32, + TimeseriesDatumType::U32 => Self::U32, + TimeseriesDatumType::I64 => Self::I64, + TimeseriesDatumType::U64 => Self::U64, + TimeseriesDatumType::F32 => Self::F32, + TimeseriesDatumType::F64 => Self::F64, + TimeseriesDatumType::String => Self::String, + TimeseriesDatumType::Bytes => Self::Bytes, + TimeseriesDatumType::CumulativeI64 => Self::CumulativeI64, + TimeseriesDatumType::CumulativeU64 => Self::CumulativeU64, + TimeseriesDatumType::CumulativeF32 => Self::CumulativeF32, + TimeseriesDatumType::CumulativeF64 => Self::CumulativeF64, + TimeseriesDatumType::HistogramI8 => Self::HistogramI8, + TimeseriesDatumType::HistogramU8 => Self::HistogramU8, + TimeseriesDatumType::HistogramI16 => Self::HistogramI16, + TimeseriesDatumType::HistogramU16 => Self::HistogramU16, + TimeseriesDatumType::HistogramI32 => Self::HistogramI32, + TimeseriesDatumType::HistogramU32 => Self::HistogramU32, + TimeseriesDatumType::HistogramI64 => Self::HistogramI64, + TimeseriesDatumType::HistogramU64 => Self::HistogramU64, + TimeseriesDatumType::HistogramF32 => Self::HistogramF32, + TimeseriesDatumType::HistogramF64 => 
Self::HistogramF64, + } + } +} + +impl_enum_type! { + #[derive(SqlType, QueryId, Debug, Clone, Copy)] + #[diesel(postgres_type(name = "timeseries_units", schema = "public"))] + pub struct TimeseriesUnitsEnum; + + #[derive(Clone, Copy, Debug, AsExpression, FromSqlRow, PartialEq)] + #[diesel(sql_type = TimeseriesUnitsEnum)] + pub enum TimeseriesUnits; + + Count => b"count" + Bytes => b"bytes" +} + +impl From<oximeter::schema::Units> for TimeseriesUnits { + fn from(value: oximeter::schema::Units) -> Self { + match value { + oximeter::schema::Units::Count => Self::Count, + oximeter::schema::Units::Bytes => Self::Bytes, + } + } +} + +impl From<TimeseriesUnits> for oximeter::schema::Units { + fn from(value: TimeseriesUnits) -> Self { + match value { + TimeseriesUnits::Count => Self::Count, + TimeseriesUnits::Bytes => Self::Bytes, + } + } +} + +/// The schema of a timeseries. +#[derive(Insertable, Selectable, Queryable, Clone, Debug)] +#[diesel(table_name = timeseries_schema)] +pub struct TimeseriesSchema { + /// The name of the timeseries. + pub timeseries_name: String, + /// The version of the timeseries. + pub version: SqlU8, + /// The authorization scope of the timeseries. + pub authz_scope: TimeseriesAuthzScope, + /// The description of the timeseries's target. + pub target_description: String, + /// The description of the timeseries's metric. + pub metric_description: String, + /// The type of the timeseries's datum. + pub datum_type: TimeseriesDatumType, + /// The units of the timeseries's datum. + pub units: TimeseriesUnits, + pub time_created: DateTime<Utc>, + pub time_modified: DateTime<Utc>, +} + +impl TryFrom<TimeseriesSchema> for oximeter::TimeseriesSchema { + type Error = Error; + + // NOTE: This converts _only_ the parts in the actual `timeseries_schema` + // table, which omits the field schema. Those must be added later. 
+ fn try_from(value: TimeseriesSchema) -> Result<Self, Self::Error> { + let Ok(timeseries_name) = value.timeseries_name.as_str().try_into() + else { + return Err(Error::internal_error(&format!( + "Invalid timeseries name in database: '{}'", + value.timeseries_name + ))); + }; + let Some(version) = NonZeroU8::new(*value.version) else { + return Err(Error::internal_error(&format!( + "Found zero version number for \ + timeseries '{}' in database", + value.timeseries_name, + ))); + }; + Ok(Self { + timeseries_name, + description: oximeter::schema::TimeseriesDescription { + target: value.target_description, + metric: value.metric_description, + }, + field_schema: BTreeSet::new(), + datum_type: value.datum_type.into(), + version, + authz_scope: value.authz_scope.into(), + units: value.units.into(), + created: value.time_created, + }) + } +} + +#[derive(AsChangeset)] +#[diesel(table_name = timeseries_schema)] +pub struct TimeseriesSchemaUpdate { + pub target_description: String, + pub metric_description: String, + pub time_modified: DateTime<Utc>, +} + +impl TimeseriesSchemaUpdate { + pub fn new(schema: &TimeseriesSchema) -> Self { + Self { + target_description: schema.target_description.clone(), + metric_description: schema.metric_description.clone(), + time_modified: Utc::now(), + } + } +} + +impl From<&oximeter::TimeseriesSchema> for TimeseriesSchema { + fn from(schema: &oximeter::TimeseriesSchema) -> Self { + Self { + timeseries_name: schema.timeseries_name.to_string(), + version: schema.version.get().into(), + authz_scope: schema.authz_scope.into(), + target_description: schema.description.target.clone(), + metric_description: schema.description.metric.clone(), + datum_type: schema.datum_type.into(), + units: schema.units.into(), + time_created: schema.created, + time_modified: schema.created, + } + } +} + +#[derive(Insertable, Selectable, Queryable, Clone, Debug)] +#[diesel(table_name = timeseries_field)] +pub struct TimeseriesField { + /// The name of the timeseries this field belongs to. 
+ pub timeseries_name: String, + /// The name of the field. + pub name: String, + /// The source of the field. + pub source: TimeseriesFieldSource, + /// The type of the field. + pub type_: TimeseriesFieldType, + /// The description of the field. + pub description: String, + pub time_created: DateTime<Utc>, + pub time_modified: DateTime<Utc>, +} + +impl From<TimeseriesField> for oximeter::FieldSchema { + fn from(value: TimeseriesField) -> Self { + Self { + name: value.name, + field_type: value.type_.into(), + source: value.source.into(), + description: value.description, + } + } +} + +impl TimeseriesField { + pub fn for_schema(schema: &oximeter::TimeseriesSchema) -> Vec<Self> { + schema + .field_schema + .iter() + .map(|field| Self { + timeseries_name: schema.timeseries_name.to_string(), + name: field.name.clone(), + source: field.source.into(), + type_: field.field_type.into(), + description: field.description.clone(), + time_created: schema.created, + time_modified: schema.created, + }) + .collect() + } +} + +/// This type models the mapping from each version of a timeseries schema to a +/// field it contains. +#[derive(Insertable, Selectable, Queryable, Clone, Debug)] +#[diesel(table_name = timeseries_version_field)] +pub struct TimeseriesVersionField { + /// The name of the timeseries this field belongs to. + pub timeseries_name: String, + /// The version of the timeseries this field belongs to. + pub version: SqlU8, + /// The name of the field. 
+ pub field_name: String, +} + +impl TimeseriesVersionField { + pub fn for_schema(schema: &oximeter::TimeseriesSchema) -> Vec<Self> { + schema + .field_schema + .iter() + .map(|field| Self { + timeseries_name: schema.timeseries_name.to_string(), + version: schema.version.get().into(), + field_name: field.name.clone(), + }) + .collect() + } +} diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index ca7c76c0ae4..70ebee1ab8a 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -97,6 +97,7 @@ mod switch_interface; mod switch_port; #[cfg(test)] pub(crate) mod test_utils; +mod timeseries_schema; mod update; mod utilization; mod v2p_mapping; diff --git a/nexus/db-queries/src/db/datastore/timeseries_schema.rs b/nexus/db-queries/src/db/datastore/timeseries_schema.rs new file mode 100644 index 00000000000..829aac95507 --- /dev/null +++ b/nexus/db-queries/src/db/datastore/timeseries_schema.rs @@ -0,0 +1,522 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! [`DataStore`] methods on timeseries schema. 
+ +use super::DataStore; +use crate::db::error::public_error_from_diesel; +use crate::db::error::ErrorHandler; +use async_bb8_diesel::AsyncRunQueryDsl; +use diesel::upsert::excluded; +use diesel::BoolExpressionMethods as _; +use diesel::ExpressionMethods as _; +use diesel::JoinOnDsl as _; +use diesel::QueryDsl as _; +use diesel::SelectableHelper as _; +use nexus_auth::authz; +use nexus_auth::context::OpContext; +use nexus_db_model::TimeseriesField; +use nexus_db_model::TimeseriesSchema; +use nexus_db_model::TimeseriesSchemaUpdate; +use nexus_db_model::TimeseriesVersionField; +use omicron_common::api::external::CreateResult; +use omicron_common::api::external::DataPageParams; +use omicron_common::api::external::Error; +use omicron_common::api::external::ListResultVec; +use omicron_common::api::external::LookupResult; +use std::collections::BTreeMap; +use std::num::NonZeroU8; + +impl DataStore { + /// Load timeseries schema from their static definitions. + pub async fn load_timeseries_schema( + &self, + opctx: &OpContext, + ) -> CreateResult<()> { + opctx.authorize(authz::Action::Modify, &authz::DATABASE).await?; + for schema in oximeter::all_timeseries_schema() { + self.upsert_timeseries_schema(opctx, schema).await? + } + Ok(()) + } + + async fn upsert_timeseries_schema( + &self, + opctx: &OpContext, + schema: &oximeter::TimeseriesSchema, + ) -> CreateResult<()> { + use nexus_db_model::schema::timeseries_field::dsl as field_dsl; + use nexus_db_model::schema::timeseries_schema::dsl as schema_dsl; + use nexus_db_model::schema::timeseries_version_field::dsl as version_field_dsl; + + // Insert the schema itself, possibly updating the descriptions. 
+ let conn = self.pool_connection_authorized(opctx).await?; + let schema_row = TimeseriesSchema::from(schema); + let schema_updates = TimeseriesSchemaUpdate::new(&schema_row); + diesel::insert_into(schema_dsl::timeseries_schema) + .values(schema_row) + .on_conflict((schema_dsl::timeseries_name, schema_dsl::version)) + .do_update() + .set(schema_updates) + .execute_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + + // Insert one record for each of the fields in the schema, also updating + // the description if needed. Importantly, we do not update the type. + let field_rows = TimeseriesField::for_schema(schema); + diesel::insert_into(field_dsl::timeseries_field) + .values(field_rows) + .on_conflict((field_dsl::timeseries_name, field_dsl::name)) + .do_update() + .set(( + // We currently support updating the description or the field + // source, which might happen if someone moved the field from + // the target to metric. + field_dsl::description.eq(excluded(field_dsl::description)), + field_dsl::source.eq(excluded(field_dsl::source)), + field_dsl::time_modified.eq(diesel::dsl::now), + )) + .execute_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + + // Insert the mapping between the schema version and the fields it + // refers to. This uses the entire record as the PK, so conflicts can be + // ignored -- any mapping from a field name to a version of a timeseries + // schema that we already have is OK. 
+ let field_mappings = TimeseriesVersionField::for_schema(schema); + diesel::insert_into(version_field_dsl::timeseries_version_field) + .values(field_mappings) + .on_conflict_do_nothing() + .execute_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + .map(|_| ()) + } + + pub async fn fetch_timeseries_schema( + &self, + opctx: &OpContext, + timeseries_name: &oximeter::TimeseriesName, + version: NonZeroU8, + ) -> LookupResult<oximeter::TimeseriesSchema> { + use nexus_db_model::schema::timeseries_field::dsl as field_dsl; + use nexus_db_model::schema::timeseries_schema::dsl as schema_dsl; + use nexus_db_model::schema::timeseries_version_field::dsl as version_field_dsl; + + // TODO-security: Correct authorization here. For now, must be a fleet + // reader. + opctx.authorize(authz::Action::Read, &authz::FLEET).await?; + let conn = self.pool_connection_authorized(opctx).await?; + + // See `list_timeseries_schema` for details on this JOIN query. + let mut rows = + schema_dsl::timeseries_schema + .inner_join(field_dsl::timeseries_field.on( + schema_dsl::timeseries_name.eq(field_dsl::timeseries_name), + )) + .inner_join( + version_field_dsl::timeseries_version_field.on( + schema_dsl::timeseries_name + .eq(version_field_dsl::timeseries_name) + .and( + schema_dsl::version + .eq(version_field_dsl::version), + ) + .and( + field_dsl::name + .eq(version_field_dsl::field_name), + ), + ), + ) + .select(<( + TimeseriesSchema, + TimeseriesField, + TimeseriesVersionField, + )>::as_select()) + .filter( + schema_dsl::timeseries_name + .eq(timeseries_name.to_string()) + .and(schema_dsl::version.eq(i16::from(version.get()))), + ) + .load_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))? + .into_iter(); + + // Extract just the schema itself from the JOIN result. + // + // This has no fields, but has all the other metadata for the schema. 
+ let Some((schema_row, field_row, version_row)) = rows.next() else { + // TODO-correctness: This "should" be a 404, but that all relies a + // bit on the authz story making more sense. + return Err(Error::invalid_request(format!( + "Timeseries '{}' version {} not found", + timeseries_name, version, + ))); + }; + let mut schema = oximeter::TimeseriesSchema::try_from(schema_row)?; + + // Attach all field rows, including the one we already fetched above. + let first_row = std::iter::once((field_row, version_row)); + let remaining_rows = rows.into_iter().map(|(_, f, v)| (f, v)); + for (field_row, version_row) in first_row.chain(remaining_rows) { + let is_new = schema.field_schema.insert(field_row.into()); + if !is_new { + return Err(Error::internal_error(&format!( + "while fetching the schema for timeseries '{}' \ + version {}, the field '{}' appears duplicated", + timeseries_name, version, version_row.field_name, + ))); + } + } + Ok(schema) + } + + /// Fetch a page of timeseries schema from the database. + pub async fn list_timeseries_schema( + &self, + opctx: &OpContext, + pagparams: &DataPageParams<'_, (oximeter::TimeseriesName, NonZeroU8)>, + ) -> ListResultVec<oximeter::TimeseriesSchema> { + use nexus_db_model::schema::timeseries_field::dsl as field_dsl; + use nexus_db_model::schema::timeseries_schema::dsl as schema_dsl; + use nexus_db_model::schema::timeseries_version_field::dsl as version_field_dsl; + + // For now, must be a fleet reader. + // + // TODO-correctness: Relax this, possibly to the set of things the + // particular user can see, though that might not be easily expressed. + opctx.authorize(authz::Action::Read, &authz::FLEET).await?; + let conn = self.pool_connection_authorized(opctx).await?; + + // Create the pagination marker. + // + // Note that we always need _something_ to put in the WHERE clause, to + // avoid tripping the full-table-scan check. 
+ let (timeseries_name, version) = match pagparams.marker { + Some((n, v)) => (n.to_string(), *v), + None => (String::new(), NonZeroU8::MIN), + }; + + // Selecting the schema involves a 3-table JOIN, matching up rows from + // the following tables: + // + // - `timeseries_schema` + // - `timeseries_field` + // - `timeseries_version_field` + // + // We match up all tables on the timeseries name; and the versioned + // table additionally on the field name and timeseries version. + let query = + schema_dsl::timeseries_schema + // JOIN between the timeseries schema and field tables just using + // the timeseries name + .inner_join(field_dsl::timeseries_field.on( + schema_dsl::timeseries_name.eq(field_dsl::timeseries_name), + )) + // JOIN between that and the versioned field table using: + // - The timeseries name for the schema and versioned field table + // - The version for the schema and versioned field table + // - The field name for the field and versioned field table + .inner_join( + version_field_dsl::timeseries_version_field.on( + schema_dsl::timeseries_name + .eq(version_field_dsl::timeseries_name) + .and( + schema_dsl::version + .eq(version_field_dsl::version), + ) + .and( + field_dsl::name + .eq(version_field_dsl::field_name), + ), + ), + ) + // Select the record type from each table. + .select(<( + TimeseriesSchema, + TimeseriesField, + TimeseriesVersionField, + )>::as_select()) + // Filter to the pagination marker, and limit the results to its + // page size. NOTE(review): LIMIT counts JOIN rows (one per field), not schema. + .filter( + schema_dsl::timeseries_name + .eq(timeseries_name.clone()) + .and(schema_dsl::version.gt(i16::from(version.get()))), + ) + .or_filter( + schema_dsl::timeseries_name.gt(timeseries_name.clone()), + ) + .order(schema_dsl::timeseries_name.asc()) + .then_order_by(schema_dsl::version.asc()) + .limit(i64::from(pagparams.limit.get())); + + // Select the actual rows from the JOIN result. 
+ let rows = query + .load_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + + // Rebuild the list of schema, by merging all the fields from each + // version with the row from the `timeseries_schema` table. We'll do + // this by building a mapping from name/version to the current schema, + // and add fields into it as we process the JOIN result. + let mut schema = BTreeMap::< + (oximeter::TimeseriesName, NonZeroU8), + oximeter::TimeseriesSchema, + >::new(); + for (schema_row, field_row, versioned_row) in rows.into_iter() { + let Ok(timeseries_name) = oximeter::TimeseriesName::try_from( + schema_row.timeseries_name.as_str(), + ) else { + return Err(Error::internal_error(&format!( + "found invalid timeseries name in the database: '{}'", + schema_row.timeseries_name, + ))); + }; + + // Find the current schema, or add the one built from this current + // row of the schema table. This will have an empty set of fields, + // which will always be added to after looking up the schema. 
+ let Some(version) = NonZeroU8::new(*schema_row.version) else { + return Err(Error::internal_error(&format!( + "database contains invalid version \ + number of 0 for timeseries '{}'", + timeseries_name, + ))); + }; + let key = (timeseries_name.clone(), version); + let is_new = schema + .entry(key) + .or_insert_with(|| { + oximeter::TimeseriesSchema::try_from(schema_row) + .expect("fallible parts checked above (name, version)") + }) + .field_schema + .insert(field_row.into()); + if !is_new { + return Err(Error::internal_error(&format!( + "while fetching the schema for timeseries '{}' \ + version {}, the field '{}' appears duplicated", + timeseries_name, version, versioned_row.field_name, + ))); + } + } + + Ok(schema.into_values().collect()) + } +} + +#[cfg(test)] +mod tests { + use crate::db::datastore::test_utils::datastore_test; + use chrono::Utc; + use dropshot::PaginationOrder; + use nexus_test_utils::db::test_setup_database; + use omicron_common::api::external::DataPageParams; + use omicron_test_utils::dev; + use oximeter::schema::AuthzScope; + use oximeter::schema::FieldSource; + use oximeter::schema::TimeseriesDescription; + use oximeter::schema::TimeseriesSchema; + use oximeter::schema::Units; + use oximeter::DatumType; + use oximeter::FieldSchema; + use oximeter::FieldType; + use std::collections::BTreeSet; + use std::num::NonZeroU32; + + #[tokio::test] + async fn correctly_recall_timeseries_schema() { + let logctx = dev::test_setup_log("correctly_recall_timeseries_schema"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + let mut expected_schema = vec![ + TimeseriesSchema { + timeseries_name: "foo:bar".try_into().unwrap(), + description: TimeseriesDescription { + target: "a target".into(), + metric: "a metric".into(), + }, + field_schema: BTreeSet::from([ + FieldSchema { + name: "f0".into(), + field_type: FieldType::Uuid, + source: FieldSource::Target, + description: "target field 
0".into(), + }, + FieldSchema { + name: "f1".into(), + field_type: FieldType::Bool, + source: FieldSource::Metric, + description: "metric field 1".into(), + }, + ]), + datum_type: DatumType::HistogramI64, + version: 1.try_into().unwrap(), + authz_scope: AuthzScope::Fleet, + units: Units::Count, + created: Utc::now(), + }, + TimeseriesSchema { + timeseries_name: "foo:bar".try_into().unwrap(), + description: TimeseriesDescription { + target: "a target".into(), + metric: "a metric".into(), + }, + field_schema: BTreeSet::from([ + FieldSchema { + name: "f0".into(), + field_type: FieldType::Uuid, + source: FieldSource::Target, + description: "target field 0".into(), + }, + FieldSchema { + name: "f1".into(), + field_type: FieldType::Bool, + source: FieldSource::Metric, + description: "metric field 1".into(), + }, + ]), + datum_type: DatumType::HistogramI64, + version: 2.try_into().unwrap(), + authz_scope: AuthzScope::Fleet, + units: Units::Count, + created: Utc::now(), + }, + TimeseriesSchema { + timeseries_name: "baz:quux".try_into().unwrap(), + description: TimeseriesDescription { + target: "a target".into(), + metric: "a metric".into(), + }, + field_schema: BTreeSet::from([ + FieldSchema { + name: "f0".into(), + field_type: FieldType::String, + source: FieldSource::Target, + description: "target field 0".into(), + }, + FieldSchema { + name: "f1".into(), + field_type: FieldType::IpAddr, + source: FieldSource::Metric, + description: "metric field 1".into(), + }, + ]), + datum_type: DatumType::U64, + version: 1.try_into().unwrap(), + authz_scope: AuthzScope::ViewableToAll, + units: Units::Count, + created: Utc::now(), + }, + ]; + for schema in expected_schema.iter() { + datastore + .upsert_timeseries_schema(&opctx, schema) + .await + .expect("expected to insert valid schema"); + } + let pagparams = DataPageParams { + marker: None, + direction: PaginationOrder::Ascending, + limit: NonZeroU32::new(u32::MAX).unwrap(), + }; + let schema = datastore + 
.list_timeseries_schema(&opctx, &pagparams) + .await + .expect("expected to list previously-inserted schema"); + + // The DB will always return them sorted by name / version, so we'll do + // that to our inputs to compare easily. + expected_schema.sort_by(|a, b| { + a.timeseries_name + .cmp(&b.timeseries_name) + .then(a.version.cmp(&b.version)) + }); + assert_eq!( + expected_schema, schema, + "Failed to recall expected timeseries schema from database", + ); + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn upsert_schema_updates_descriptions() { + let logctx = dev::test_setup_log("upsert_schema_updates_descriptions"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + let original_schema = TimeseriesSchema { + timeseries_name: "foo:bar".try_into().unwrap(), + description: TimeseriesDescription { + target: "a target".into(), + metric: "a metric".into(), + }, + field_schema: BTreeSet::from([ + FieldSchema { + name: "f0".into(), + field_type: FieldType::Uuid, + source: FieldSource::Target, + description: "target field 0".into(), + }, + FieldSchema { + name: "f1".into(), + field_type: FieldType::Bool, + source: FieldSource::Metric, + description: "metric field 1".into(), + }, + ]), + datum_type: DatumType::HistogramI64, + version: 1.try_into().unwrap(), + authz_scope: AuthzScope::Fleet, + units: Units::Count, + created: Utc::now(), + }; + datastore + .upsert_timeseries_schema(&opctx, &original_schema) + .await + .expect("expected to insert valid schema"); + + // Upsert a schema with modified descriptions. 
+ let mut modified_schema = original_schema.clone(); + modified_schema.description.target = "a new target".into(); + modified_schema.description.metric = "a new metric".into(); + let field = modified_schema.field_schema.pop_first().unwrap(); + modified_schema.field_schema.replace(FieldSchema { + description: "new target field 0".into(), + ..field + }); + datastore + .upsert_timeseries_schema(&opctx, &modified_schema) + .await + .expect("expected to insert schema with only new descriptions"); + + let schema = datastore + .fetch_timeseries_schema( + &opctx, + &original_schema.timeseries_name, + original_schema.version, + ) + .await + .expect("expected to fetch updated timeseries schema"); + assert_eq!( + schema, modified_schema, + "Failed to upsert new schema with modified descriptions", + ); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn upsert_schema_fails_with_duplicate_field_names() { + todo!(); + } +} diff --git a/nexus/db-queries/src/db/pagination.rs b/nexus/db-queries/src/db/pagination.rs index 4fc1cf59669..94b0f556210 100644 --- a/nexus/db-queries/src/db/pagination.rs +++ b/nexus/db-queries/src/db/pagination.rs @@ -70,8 +70,8 @@ where } } -/// Uses `pagparams` to list a subset of rows in `table`, ordered by `c1, and -/// then by `c2. +/// Uses `pagparams` to list a subset of rows in `table`, ordered by `c1`, and +/// then by `c2`. /// /// This is a two-column variation of the [`paginated`] function. 
// NOTE: This function could probably be made generic over an arbitrary number diff --git a/nexus/src/populate.rs b/nexus/src/populate.rs index 724b25162d3..8c71c3bcb87 100644 --- a/nexus/src/populate.rs +++ b/nexus/src/populate.rs @@ -334,7 +334,23 @@ impl Populator for PopulateRack { } } -const ALL_POPULATORS: [&dyn Populator; 10] = [ +#[derive(Debug)] +struct PopulateTimeseriesSchema; +impl Populator for PopulateTimeseriesSchema { + fn populate<'a, 'b>( + &self, + opctx: &'a OpContext, + datastore: &'a DataStore, + _args: &'a PopulateArgs, + ) -> BoxFuture<'b, Result<(), Error>> + where + 'a: 'b, + { + async { datastore.load_timeseries_schema(opctx).await }.boxed() + } +} + +const ALL_POPULATORS: [&dyn Populator; 11] = [ &PopulateBuiltinUsers {}, &PopulateBuiltinRoles {}, &PopulateBuiltinRoleAssignments {}, @@ -345,6 +361,7 @@ const ALL_POPULATORS: [&dyn Populator; 10] = [ &PopulateSiloUserRoleAssignments {}, &PopulateFleet {}, &PopulateRack {}, + &PopulateTimeseriesSchema {}, ]; #[cfg(test)] diff --git a/oximeter/impl/src/schema/codegen.rs b/oximeter/impl/src/schema/codegen.rs index 4aa09cf1361..7fa39744377 100644 --- a/oximeter/impl/src/schema/codegen.rs +++ b/oximeter/impl/src/schema/codegen.rs @@ -24,37 +24,22 @@ use quote::quote; /// Emit types for using one timeseries definition. /// /// Provided with a TOML-formatted schema definition, this emits Rust types for -/// the target and metric from the latest version; and a function that returns -/// the `TimeseriesSchema` for _all_ versions of the timeseries. +/// the target and metric from the latest version. /// -/// Both of these items are emitted in a module with the same name as the +/// All of these items are emitted in a module with the same name as the /// target. 
pub fn use_timeseries(contents: &str) -> Result { let schema = load_schema(contents)?; let latest = find_schema_version(schema.iter().cloned(), None); let mod_name = quote::format_ident!("{}", latest[0].target_name()); let types = emit_schema_types(latest); - let func = emit_schema_function(schema.into_iter()); Ok(quote! { pub mod #mod_name { #types - #func } }) } -fn emit_schema_function( - list: impl Iterator, -) -> TokenStream { - quote! { - pub fn timeseries_schema() -> Vec<::oximeter::schema::TimeseriesSchema> { - vec![ - #(#list),* - ] - } - } -} - fn emit_schema_types(list: Vec) -> TokenStream { let first_schema = list.first().expect("load_schema ensures non-empty"); let target_def = emit_target(first_schema); diff --git a/oximeter/oximeter/Cargo.toml b/oximeter/oximeter/Cargo.toml index c04d1bd3ae6..23a9c351fb8 100644 --- a/oximeter/oximeter/Cargo.toml +++ b/oximeter/oximeter/Cargo.toml @@ -20,3 +20,10 @@ prettyplease.workspace = true syn.workspace = true toml.workspace = true uuid.workspace = true + +[build-dependencies] +anyhow.workspace = true +oximeter-impl.workspace = true +prettyplease.workspace = true +quote.workspace = true +syn.workspace = true diff --git a/oximeter/oximeter/build.rs b/oximeter/oximeter/build.rs new file mode 100644 index 00000000000..559c0932ef5 --- /dev/null +++ b/oximeter/oximeter/build.rs @@ -0,0 +1,69 @@ +use anyhow::Context; +use anyhow::Result; +use oximeter_impl::schema::ir::load_schema; +use oximeter_impl::schema::SCHEMA_DIRECTORY; +use std::env; +use std::fs; +use std::path; + +/// Generate `all_schema.rs` in `OUT_DIR`, defining `all_timeseries_schema()` +/// from every schema file in `SCHEMA_DIRECTORY`. +fn main() -> Result<()> { + // Rebuild when files are added to or removed from the schema directory, + // not only when an already-known schema file is modified. + println!("cargo::rerun-if-changed={SCHEMA_DIRECTORY}"); + + // `read_dir` yields entries in a platform-dependent order, so sort the + // paths to keep the generated code identical from build to build. + let mut paths = Vec::new(); + for entry in fs::read_dir(SCHEMA_DIRECTORY).with_context(|| { + format!("failed to read schema directory '{SCHEMA_DIRECTORY}'") + })?
{ + let entry = entry.with_context(|| { + format!( + "failed to read entry in schema directory '{SCHEMA_DIRECTORY}'" + ) + })?; + paths.push(entry.path()); + } + paths.sort(); + + let mut all_schema = Vec::new(); + for path in paths { + let contents = fs::read_to_string(&path).with_context(|| { + format!("failed to read schema file '{}'", path.display()) + })?; + let schema = load_schema(&contents).with_context(|| { + format!("failed to load schema from '{}'", path.display()) + })?; + all_schema.extend(schema); + println!("cargo::rerun-if-changed={}", path.display()); + } + + let len = all_schema.len(); + let tokens = quote::quote! { + /// Return all timeseries schema known at build time. + pub fn all_timeseries_schema() -> &'static [::oximeter::TimeseriesSchema] { + static SCHEMA: + ::std::sync::OnceLock<[::oximeter::TimeseriesSchema; #len]> + = ::std::sync::OnceLock::new(); + SCHEMA.get_or_init(|| { + [ + #( #all_schema, )* + ] + }) + } + }; + + let file = syn::parse_file(&tokens.to_string()) + .context("failed to parse schema function as syn::File")?; + let contents = prettyplease::unparse(&file); + let out_dir = env::var("OUT_DIR").expect("Cargo should set OUT_DIR"); + let out_file = path::Path::new(&out_dir).join("all_schema.rs"); + fs::write(&out_file, contents).with_context(|| { + format!( + "failed to write all schema function to '{}'", + out_file.display() + ) + }) +} diff --git a/oximeter/oximeter/schema/physical-data-link.toml b/oximeter/oximeter/schema/physical-data-link.toml index d526aa6af19..ee16b9953dd 100644 --- a/oximeter/oximeter/schema/physical-data-link.toml +++ b/oximeter/oximeter/schema/physical-data-link.toml @@ -14,31 +14,31 @@ versions = [ [fields.rack_id] type = "uuid" -description = "UUID for the link's sled" +description = "UUID for the rack hosting the sled" [fields.sled_id] type = "uuid" -description = "UUID for the link's sled" +description = "UUID for the sled hosting the link" [fields.hostname] type = "string" -description = "Hostname of the link's sled" +description = "Hostname of the sled hosting the link"
[fields.model] type = "string" -description = "Model number of the link's sled" +description = "Model number of the sled hosting the link" [fields.revision] type = "u32" -description = "Revision number of the sled" +description = "Revision number of the sled hosting the link" [fields.serial] type = "string" -description = "Serial number of the sled" +description = "Serial number of the sled hosting the link" [fields.link_name] type = "string" -description = "Name of the physical data link" +description = "Name of the data link" [[metrics]] name = "bytes_sent" diff --git a/oximeter/oximeter/src/lib.rs b/oximeter/oximeter/src/lib.rs index 9dd8fab47a8..c1c03fc029a 100644 --- a/oximeter/oximeter/src/lib.rs +++ b/oximeter/oximeter/src/lib.rs @@ -188,6 +188,9 @@ pub use oximeter_impl::*; pub use oximeter_timeseries_macro::use_timeseries; +extern crate self as oximeter; +include!(concat!(env!("OUT_DIR"), "/all_schema.rs")); + #[cfg(test)] mod test { use oximeter_impl::schema::ir::load_schema; diff --git a/schema/crdb/add-timeseries-schema-tables/up01.sql b/schema/crdb/add-timeseries-schema-tables/up01.sql new file mode 100644 index 00000000000..44d2a4e28de --- /dev/null +++ b/schema/crdb/add-timeseries-schema-tables/up01.sql @@ -0,0 +1,6 @@ +CREATE TYPE IF NOT EXISTS omicron.public.timeseries_authz_scope AS ENUM ( + 'fleet', + 'silo', + 'project', + 'viewable_to_all' +); diff --git a/schema/crdb/add-timeseries-schema-tables/up02.sql b/schema/crdb/add-timeseries-schema-tables/up02.sql new file mode 100644 index 00000000000..cc4dfa44e0e --- /dev/null +++ b/schema/crdb/add-timeseries-schema-tables/up02.sql @@ -0,0 +1,4 @@ +CREATE TYPE IF NOT EXISTS omicron.public.timeseries_field_source AS ENUM ( + 'target', + 'metric' +); diff --git a/schema/crdb/add-timeseries-schema-tables/up03.sql b/schema/crdb/add-timeseries-schema-tables/up03.sql new file mode 100644 index 00000000000..cdeaa3e6995 --- /dev/null +++ b/schema/crdb/add-timeseries-schema-tables/up03.sql @@ -0,0 +1,14 @@ +CREATE TYPE
IF NOT EXISTS omicron.public.timeseries_field_type AS ENUM ( + 'string', + 'i8', + 'u8', + 'i16', + 'u16', + 'i32', + 'u32', + 'i64', + 'u64', + 'ip_addr', + 'uuid', + 'bool' +); diff --git a/schema/crdb/add-timeseries-schema-tables/up04.sql b/schema/crdb/add-timeseries-schema-tables/up04.sql new file mode 100644 index 00000000000..1b04203ae38 --- /dev/null +++ b/schema/crdb/add-timeseries-schema-tables/up04.sql @@ -0,0 +1,29 @@ +CREATE TYPE IF NOT EXISTS omicron.public.timeseries_datum_type AS ENUM ( + 'bool', + 'i8', + 'u8', + 'i16', + 'u16', + 'i32', + 'u32', + 'i64', + 'u64', + 'f32', + 'f64', + 'string', + 'bytes', + 'cumulative_i64', + 'cumulative_u64', + 'cumulative_f32', + 'cumulative_f64', + 'histogram_i8', + 'histogram_u8', + 'histogram_i16', + 'histogram_u16', + 'histogram_i32', + 'histogram_u32', + 'histogram_i64', + 'histogram_u64', + 'histogram_f32', + 'histogram_f64' +); diff --git a/schema/crdb/add-timeseries-schema-tables/up05.sql b/schema/crdb/add-timeseries-schema-tables/up05.sql new file mode 100644 index 00000000000..9fac0518ffe --- /dev/null +++ b/schema/crdb/add-timeseries-schema-tables/up05.sql @@ -0,0 +1,4 @@ +CREATE TYPE IF NOT EXISTS omicron.public.timeseries_units AS ENUM ( + 'count', + 'bytes' +); diff --git a/schema/crdb/add-timeseries-schema-tables/up06.sql b/schema/crdb/add-timeseries-schema-tables/up06.sql new file mode 100644 index 00000000000..ad42166a24d --- /dev/null +++ b/schema/crdb/add-timeseries-schema-tables/up06.sql @@ -0,0 +1,12 @@ +CREATE TABLE IF NOT EXISTS omicron.public.timeseries_schema ( + timeseries_name STRING(128) NOT NULL, + version INT2 NOT NULL CHECK (version > 0), + authz_scope omicron.public.timeseries_authz_scope NOT NULL, + target_description STRING(512) NOT NULL, + metric_description STRING(512) NOT NULL, + datum_type omicron.public.timeseries_datum_type NOT NULL, + units omicron.public.timeseries_units NOT NULL, + time_created TIMESTAMPTZ NOT NULL, + time_modified TIMESTAMPTZ NOT NULL, + PRIMARY KEY
(timeseries_name, version) +); diff --git a/schema/crdb/add-timeseries-schema-tables/up07.sql b/schema/crdb/add-timeseries-schema-tables/up07.sql new file mode 100644 index 00000000000..eeedacd24e5 --- /dev/null +++ b/schema/crdb/add-timeseries-schema-tables/up07.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS omicron.public.timeseries_field ( + timeseries_name STRING(128) NOT NULL, + name STRING(128) NOT NULL, + type_ omicron.public.timeseries_field_type NOT NULL, + source omicron.public.timeseries_field_source NOT NULL, + description STRING(512) NOT NULL, + time_created TIMESTAMPTZ NOT NULL, + time_modified TIMESTAMPTZ NOT NULL, + PRIMARY KEY (timeseries_name, name) +); diff --git a/schema/crdb/add-timeseries-schema-tables/up08.sql b/schema/crdb/add-timeseries-schema-tables/up08.sql new file mode 100644 index 00000000000..7e21b8e4453 --- /dev/null +++ b/schema/crdb/add-timeseries-schema-tables/up08.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS omicron.public.timeseries_version_field ( + timeseries_name STRING(128) NOT NULL, + version INT2 NOT NULL CHECK (version > 0), + field_name STRING(128) NOT NULL, + PRIMARY KEY (timeseries_name, version, field_name) +); diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index 905fd111c18..4cedd80bd9b 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -4087,6 +4087,140 @@ CREATE INDEX IF NOT EXISTS lookup_region_snapshot_by_snapshot_id on omicron.publ snapshot_id ); + +/* + * Timeseries schema tables. + */ + +-- The authorization scope indicates the level at which we check a user's +-- authorization when running a timeseries query. +CREATE TYPE IF NOT EXISTS omicron.public.timeseries_authz_scope AS ENUM ( + -- Timeseries data is only visible to a fleet reader. + 'fleet', + -- Timeseries data is limited to the silo of the user. + 'silo', + -- Timeseries data is limited to the project of the user. + 'project', + -- Timeseries data is viewable to all without limitation.
+ 'viewable_to_all' +); + +-- Indicates whether a field of a timeseries derives from the target or metric. +CREATE TYPE IF NOT EXISTS omicron.public.timeseries_field_source AS ENUM ( + 'target', + 'metric' +); + +-- The data type of a timeseries field. +CREATE TYPE IF NOT EXISTS omicron.public.timeseries_field_type AS ENUM ( + 'string', + 'i8', + 'u8', + 'i16', + 'u16', + 'i32', + 'u32', + 'i64', + 'u64', + 'ip_addr', + 'uuid', + 'bool' +); + +-- The data type of a timeseries's datum, the actual value it measures. +CREATE TYPE IF NOT EXISTS omicron.public.timeseries_datum_type AS ENUM ( + 'bool', + 'i8', + 'u8', + 'i16', + 'u16', + 'i32', + 'u32', + 'i64', + 'u64', + 'f32', + 'f64', + 'string', + 'bytes', + 'cumulative_i64', + 'cumulative_u64', + 'cumulative_f32', + 'cumulative_f64', + 'histogram_i8', + 'histogram_u8', + 'histogram_i16', + 'histogram_u16', + 'histogram_i32', + 'histogram_u32', + 'histogram_i64', + 'histogram_u64', + 'histogram_f32', + 'histogram_f64' +); + +-- Units of a timeseries's datum. +CREATE TYPE IF NOT EXISTS omicron.public.timeseries_units AS ENUM ( + 'count', + 'bytes' +); + +-- Timeseries schema describe the structure of a timeseries. +-- +-- These are defined in TOML in `oximeter/oximeter/schema`, and loaded into the +-- database on startup. +CREATE TABLE IF NOT EXISTS omicron.public.timeseries_schema ( + -- The name of the timeseries. + -- + -- Names are derived from the concatenation of the target and metric names, + -- joined with a colon (':'). + timeseries_name STRING(128) NOT NULL, + -- The version number of this timeseries's schema. + version INT2 NOT NULL CHECK (version > 0), + -- The authorization scope of the timeseries. + authz_scope omicron.public.timeseries_authz_scope NOT NULL, + -- Textual description of the timeseries's target. + target_description STRING(512) NOT NULL, + -- Textual description of the timeseries's metric. + metric_description STRING(512) NOT NULL, + -- Type of the actual datum the timeseries measures.
+ datum_type omicron.public.timeseries_datum_type NOT NULL, + -- Units of the timeseries's datum. + units omicron.public.timeseries_units NOT NULL, + time_created TIMESTAMPTZ NOT NULL, + time_modified TIMESTAMPTZ NOT NULL, + PRIMARY KEY (timeseries_name, version) +); + +-- Fields for each timeseries schema. +CREATE TABLE IF NOT EXISTS omicron.public.timeseries_field ( + -- Name of the timeseries. + timeseries_name STRING(128) NOT NULL, + -- Name of the field. + name STRING(128) NOT NULL, + -- Data type of the field. + type_ omicron.public.timeseries_field_type NOT NULL, + -- Source of the field, either 'target' or 'metric'. + source omicron.public.timeseries_field_source NOT NULL, + -- Textual description for the timeseries field. + description STRING(512) NOT NULL, + time_created TIMESTAMPTZ NOT NULL, + time_modified TIMESTAMPTZ NOT NULL, + -- This primary key enforces that there is exactly one field with a + -- particular name in _all_ versions of a timeseries schema. In the future, + -- this will likely need to be relaxed, to enforce a unique name within a + -- single version of the timeseries only. At that point, we can inline the + -- `timeseries_version_field` table as well. + PRIMARY KEY (timeseries_name, name) +); + +-- List of fields by name on each version of the timeseries schema. +CREATE TABLE IF NOT EXISTS omicron.public.timeseries_version_field ( + timeseries_name STRING(128) NOT NULL, + version INT2 NOT NULL CHECK (version > 0), + field_name STRING(128) NOT NULL, + PRIMARY KEY (timeseries_name, version, field_name) +); + /* * Keep this at the end of file so that the database does not contain a version * until it is fully populated. @@ -4098,7 +4232,7 @@ INSERT INTO omicron.public.db_metadata ( version, target_version ) VALUES - (TRUE, NOW(), NOW(), '77.0.0', NULL) + (TRUE, NOW(), NOW(), '78.0.0', NULL) ON CONFLICT DO NOTHING; COMMIT;