From 49bc27d35b3968cc80a0180a186b7a48da349d65 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Fri, 5 Apr 2024 16:12:41 +0000 Subject: [PATCH] playing around --- Cargo.lock | 4 + oximeter/oximeter/Cargo.toml | 4 + .../physical_data_link_bytes_received.ron | 61 ++ oximeter/oximeter/src/foo.rs | 198 ++++++ oximeter/oximeter/src/lib.rs | 1 + oximeter/oximeter/src/schema.rs | 34 +- oximeter/oximeter/src/schemagen.rs | 577 ++++++++++++++++++ 7 files changed, 877 insertions(+), 2 deletions(-) create mode 100644 oximeter/oximeter/schema/physical_data_link_bytes_received.ron create mode 100644 oximeter/oximeter/src/foo.rs create mode 100644 oximeter/oximeter/src/schemagen.rs diff --git a/Cargo.lock b/Cargo.lock index 1dfaff0d779..b353363a43a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6139,20 +6139,24 @@ dependencies = [ name = "oximeter" version = "0.1.0" dependencies = [ + "anyhow", "approx", "bytes", "chrono", + "heck 0.5.0", "num", "omicron-common", "omicron-workspace-hack", "oximeter-macro-impl", "regex", + "ron 0.8.1", "rstest", "schemars", "serde", "serde_json", "strum", "thiserror", + "toml 0.8.13", "trybuild", "uuid", ] diff --git a/oximeter/oximeter/Cargo.toml b/oximeter/oximeter/Cargo.toml index 2445e0483a4..93dd51e65c7 100644 --- a/oximeter/oximeter/Cargo.toml +++ b/oximeter/oximeter/Cargo.toml @@ -9,8 +9,10 @@ license = "MPL-2.0" workspace = true [dependencies] +anyhow.workspace = true bytes = { workspace = true, features = [ "serde" ] } chrono.workspace = true +heck.workspace = true num.workspace = true omicron-common.workspace = true oximeter-macro-impl.workspace = true @@ -22,9 +24,11 @@ strum.workspace = true thiserror.workspace = true uuid.workspace = true omicron-workspace-hack.workspace = true +ron = "0.8.1" [dev-dependencies] approx.workspace = true rstest.workspace = true serde_json.workspace = true +toml.workspace = true trybuild.workspace = true diff --git a/oximeter/oximeter/schema/physical_data_link_bytes_received.ron b/oximeter/oximeter/schema/physical_data_link_bytes_received.ron new file mode 100644 index 00000000000..6974ec59593 --- /dev/null +++ b/oximeter/oximeter/schema/physical_data_link_bytes_received.ron @@ -0,0 +1,61 @@ +[ + ( + name: "physical_data_link", + description: "A physical link", + versions: { + 1: ( + ) + }, + embed_fields: [], + fields_by_version: { + 0: { + "sled_model": string, + "sled_revision": u32, + "sled_serial": string, + "link_name": string, + }, + } + metrics: [ + ( + name: "bytes_received", + description: "count of bytes received", + fields_by_version: {0: {}}, + datum_type: cumulative_u64, + units: bytes, + ), + ( + name: "bytes_sent", + description: "count of bytes sent", + fields_by_version: {0: {}}, + datum_type: cumulative_u64, + units: bytes, + ) + ] + ), + ( + name: "http_service", + description: "An Oxide HTTP service", + embed_fields: [], + fields: { + 0: { + "name": string, + "id": uuid, + }, + }, + metrics: [ + ( + name: "request_latency_histogram", + description: "Distribution of request latencies", + fields: { + 0: { + "route": string, + "method": string, + "status_code": u16, + }, + } + datum_type: histogram_f64, + units: seconds, + ) + ] + ) +] diff --git a/oximeter/oximeter/src/foo.rs b/oximeter/oximeter/src/foo.rs new file mode 100644 index 00000000000..77c9bc8288d --- /dev/null +++ b/oximeter/oximeter/src/foo.rs @@ -0,0 +1,198 @@ +use oximeter::Datum; +use oximeter::DatumType; +use oximeter::FieldType; +use oximeter::FieldValue; +use std::collections::BTreeMap; + +#[derive(Clone, Debug)] +struct TimeseriesSchema { + target: TargetSchema, + metric: MetricSchema, +} + +impl TimeseriesSchema { + fn new(target: TargetSchema, metric: MetricSchema) -> Result { + for tf in target.fields.keys() { + if metric.fields.contains_key(tf) { + return Err(()); + } + } + Ok(Self { target, metric }) + } +} + +#[derive(Clone, Debug)] +struct FieldBuilder { + name: String, + fields: BTreeMap, +} + +impl FieldBuilder { + fn new(name: &str) -> Result { + // validate name + Ok(Self { name: name.to_string(), fields: BTreeMap::new() }) + } + + fn field(mut self, field_name: &str, field_type: FieldType) -> Self { + let _ = self.fields.insert(field_name.to_string(), field_type); + self + } + + fn build_target(self) -> TargetSchema { + TargetSchema { name: self.name, fields: self.fields } + } + + fn build_metric(self, datum_type: DatumType) -> MetricSchema { + MetricSchema { name: self.name, fields: self.fields, datum_type } + } +} + +#[derive(Clone, Debug)] +struct TargetSchema { + name: String, + fields: BTreeMap, +} + +#[derive(Clone, Debug)] +struct MetricSchema { + name: String, + fields: BTreeMap, + datum_type: DatumType, +} + +#[derive(Clone, Debug, Default)] +struct Target { + name: String, + fields: BTreeMap, +} + +#[derive(Clone, Debug)] +struct Metric { + name: String, + fields: BTreeMap, + datum_type: DatumType, + datum: Option, +} + +#[derive(Clone, Debug)] +struct Timeseries { + target: Target, + metric: Metric, +} + +#[derive(Clone, Debug)] +struct TimeseriesBuilder<'a> { + schema: &'a TimeseriesSchema, + target_fields: BTreeMap, + metric_fields: BTreeMap, +} + +impl<'a> TimeseriesBuilder<'a> { + fn field>( + mut self, + field_name: &str, + field_value: V, + ) -> Result { + // Find the field in either the target or metric fields. + let (map, ty) = { + if let Some(ty) = self.schema.target.fields.get(field_name) { + (&mut self.target_fields, ty) + } else if let Some(ty) = self.schema.metric.fields.get(field_name) { + (&mut self.metric_fields, ty) + } else { + return Err(()); + } + }; + let field_value = field_value.into(); + if field_value.field_type() != *ty { + return Err(()); + } + map.insert(field_name.to_string(), field_value); + Ok(self) + } + + fn build(self) -> Result { + for tf in self.schema.target.fields.keys() { + if !self.target_fields.contains_key(tf) { + return Err(()); + } + } + for tf in self.schema.metric.fields.keys() { + if !self.metric_fields.contains_key(tf) { + return Err(()); + } + } + let target = Target { + name: self.schema.target.name.clone(), + fields: self.target_fields, + }; + let metric = Metric { + name: self.schema.metric.name.clone(), + fields: self.metric_fields, + datum_type: self.schema.metric.datum_type, + datum: None, + }; + Ok(Timeseries { target, metric }) + } +} + +impl TimeseriesSchema { + pub fn builder(&self) -> TimeseriesBuilder { + TimeseriesBuilder { + schema: self, + target_fields: BTreeMap::new(), + metric_fields: BTreeMap::new(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_foo() { + FieldBuilder::new("oxide.rack.switch.transceiver") + .field("part", FieldType::String) + .field("vendor", FieldType::String) + .field("serial", FieldType::String) + .build_metric("temperature", DatumType::F64) + .unwrap(); + let target = FieldBuilder::new("target") + .unwrap() + .field("first", FieldType::U8) + .field("second", FieldType::IpAddr) + .build_target(); + + let metric = FieldBuilder::new("metric") + .unwrap() + .field("third", FieldType::U8) + .field("fourth", FieldType::IpAddr) + .build_metric(DatumType::I32); + + let timeseries_schema = TimeseriesSchema::new(target, metric).unwrap(); + let mut timeseries = timeseries_schema + .builder() + .field::("first", 0) + .unwrap() + .field( + "second", + std::net::IpAddr::from(std::net::Ipv4Addr::LOCALHOST), + ) + .unwrap() + .field("third", 1u8) + .unwrap() + .field( + "fourth", + std::net::IpAddr::from(std::net::Ipv4Addr::LOCALHOST), + ) + .unwrap() + .build() + .unwrap(); + + timeseries.metric.datum.replace(0i64.into()); + + println!("{:#?}", timeseries_schema); + println!("{:#?}", timeseries); + } +} diff --git a/oximeter/oximeter/src/lib.rs b/oximeter/oximeter/src/lib.rs index 1855762abe3..7f1dad258c0 100644 --- a/oximeter/oximeter/src/lib.rs +++ b/oximeter/oximeter/src/lib.rs @@ -109,6 +109,7 @@ extern crate self as oximeter; pub mod histogram; pub mod schema; +pub mod schemagen; pub mod test_util; pub mod traits; pub mod types; diff --git a/oximeter/oximeter/src/schema.rs b/oximeter/oximeter/src/schema.rs index 2a577fc8f1b..a747056c8b5 100644 --- a/oximeter/oximeter/src/schema.rs +++ b/oximeter/oximeter/src/schema.rs @@ -162,9 +162,23 @@ pub struct TimeseriesSchema { pub timeseries_name: TimeseriesName, pub field_schema: BTreeSet, pub datum_type: DatumType, + #[serde(default = "crate::schemagen::Units::none")] + pub units: crate::schemagen::Units, + #[serde(default = "default_version")] + pub version: u8, pub created: DateTime, } +const fn default_version() -> u8 { + 1 +} + +// TODO(ben) Don't need to do this conversion at all. +// +// Instead, the plan will be to read these schema at startup, generate a sample +// which refers to the target / metric names (as it currently does) and a +// version. Then we'll lookup the schema in the static schema library, no +// conversion needed (or the "conversion" will just be a lookup). impl From<&Sample> for TimeseriesSchema { fn from(sample: &Sample) -> Self { let timeseries_name = sample.timeseries_name.parse().unwrap(); @@ -186,7 +200,14 @@ impl From<&Sample> for TimeseriesSchema { field_schema.insert(schema); } let datum_type = sample.measurement.datum_type(); - Self { timeseries_name, field_schema, datum_type, created: Utc::now() } + Self { + timeseries_name, + field_schema, + datum_type, + units: crate::schemagen::Units::None, + version: default_version(), + created: Utc::now(), + } } } @@ -218,7 +239,14 @@ impl TimeseriesSchema { field_schema.insert(schema); } let datum_type = metric.datum_type(); - Self { timeseries_name, field_schema, datum_type, created: Utc::now() } + Self { + timeseries_name, + field_schema, + datum_type, + units: crate::schemagen::Units::None, + version: default_version(), + created: Utc::now(), + } } /// Construct a timeseries schema from a sample @@ -600,6 +628,8 @@ mod tests { timeseries_name, field_schema, datum_type, + units: crate::schemagen::Units::None, + version: 1, created: Utc::now(), }; diff --git a/oximeter/oximeter/src/schemagen.rs b/oximeter/oximeter/src/schemagen.rs new file mode 100644 index 00000000000..71d7ce25c26 --- /dev/null +++ b/oximeter/oximeter/src/schemagen.rs @@ -0,0 +1,577 @@ +// What are we trying to do? +// +// - Make it possible / safe to update schema +// - Provide some measure of consistency in naming, e.g. "project_id" and not +// "project_uuid" +// - Try to avoid breaking everything or dropping data in the process +// +// # Consistency +// +// Can provide a list of common fields and their types. New schema can embed or +// use them by name. This does introduce a problem -- changing one of those +// breaks any downstream schema that uses it. Maybe unavoidable? +// +// # Updating schema +// +// Need to support multiple versions concurrently. +// Adding new fields should be easiest. Then removing them. +// It has to be possible to make breaking changes, though we could require that +// this means losing old data if that makes things easier. +// +// # How do we keep history? +// +// If it's just in some file, then the developer could always delete the whole +// thing. Compare against checked in file somehow. Ordered list of the set of +// fields for each timeseries target and metric. Expectoration. +// +// # ClickHouse +// +// Need an index or ID on the schema versions. +// Need description and unit columns +// Make the schema table use `ReplacingMergeTree` engine, and just insert every +// schema (replace based on (name, ID) or maybe (name, field arrays) if possible) +// +// # Collector itself +// +// Remove local cache probably, it's been a fucking headache +// Read schema file on startup, and insert all of them +// Maybe API endpoints for fetching / updating this? Or just restart the service +// after copying in a new file... +// Check samples against those schema, never insert new ones at collection time +// +// +// # SEQUENCE +// +// - Add tools to write schema in text file format. Maybe TOML or RON, whatever. +// This needs to include an ordered sequence of the field names for targets and +// metrics (zipped together). Can add or remove fields. Cannot change the type +// of field (or add a field that _used_ to exist with a different type). +// +// - Describe some subset of the current schema in this format for testing / +// development +// +// - Decide how to identify a version of the schema. Index of target / metric? +// Probably a BCS-style hash of the name + field names / types (sorted by name) +// +// - Make minimal set of ClickHouse schema updates to support this. Change +// schema table engine to be a `ReplacingMergeTree`. The sorting key is probably +// timeseries name and then either the schema ID _or_ the entire nested table of +// fields, if that's possible. Not sure if other changes are also needed. +// +// - CONCURRENT WITH ABOVE: Change the collector to: +// - read / validate / insert the schema on startup +// - validate new samples against this (or against DB) and never insert new +// schema derived from the samples +// +// - Querying changes -- need to handle multiple versions, probably picking +// subset of the available schema that are consistent with the query? Not sure, +// this one is going to be tricky. + +// These below are the serialized forms. Need to convert into one version of +// `TargetSchema` and `MetricSchema` (together make a timeseries schema) +// +// Do we actually need to convert into the existing `TimeseriesSchema`? + +use crate::schema::FieldSchema; +use crate::schema::FieldSource; +use crate::DatumType; +use crate::FieldType; +use crate::TimeseriesName; +use chrono::Utc; +use serde::Deserialize; +use serde::Serialize; +use schemars::JsonSchema; +use std::cell::RefCell; +use std::collections::BTreeMap; +use std::collections::BTreeSet; +use std::rc::Rc; + +#[derive(Debug, Deserialize)] +#[serde(untagged)] +enum FieldInfo { + Removed { removed_in: u8 }, + Added { added_in: u8, fields: BTreeMap }, + Version(VersionedFields), +} + +#[derive(Debug, Deserialize)] +struct VersionedFields { + version: u8, + fields: BTreeMap, +} + +#[derive(Debug, Deserialize)] +struct TargetSchemaHistory { + name: String, + description: String, + fields: Vec, +} + +#[derive(Debug, Deserialize)] +struct MetricSchemaHistory { + name: String, + description: String, + fields: Vec, + units: Units, + datum_type: DatumType, +} + +#[derive(Debug, Deserialize)] +struct TimeseriesSchemaHistory { + target: TargetSchemaHistory, + metrics: Vec, +} + +#[derive(Clone, Copy, Debug)] +enum CurrentVersion { + Active { version: u8 }, + Inactive { last_active_version: u8 }, +} + +impl TimeseriesSchemaHistory { + fn into_schema( + self, + ) -> anyhow::Result> { + let mut out = Vec::with_capacity(self.metrics.len()); + + // All fields we've ever seen for this target. + // + // Fields can be added or removed, possibly many times. However, once a + // field has been added, it can only be added back with exactly the same + // type. I.e., we disallow changing the type of a field. + let mut all_known_target_fields: BTreeMap<&str, FieldType> = + BTreeMap::new(); + + // First, create a _map_ from target version to the fields. This is so + // that we can do O(lg n) lookups from the metric version. + let mut target_fields_by_version = BTreeMap::new(); + for (expected_version, target_fields) in + (1u8..).zip(self.target.fields.iter()) + { + anyhow::ensure!( + expected_version == target_fields.version, + "Target versions should be sequential and \ + monotonically increasing (expected {}, found {})", + expected_version, + target_fields.version, + ); + + // Ensure that the fields of the target itself are not changing type + // either. + for (field_name, field_type) in target_fields.fields.iter() { + if let Some(original_type) = all_known_target_fields + .insert(field_name.as_str(), *field_type) + { + anyhow::ensure!( + original_type == *field_type, + "Target field '{}' changed type from '{}' to '{}'", + field_name, + original_type, + field_type, + ); + } + } + + // Store these fields, checking this version is unique. + if target_fields_by_version + .insert(expected_version, target_fields) + .is_some() + { + anyhow::bail!("Target version {} is duplicated", expected_version); + } + } + + // We start by looping over all the metrics in the history. + for metric in self.metrics.iter() { + let metric_name = metric.name.as_str(); + + // Store the current version of the metric itself. These need not be + // sequential, but the do have to be monotonic, and match up with + // the versions of the target. + let mut current_version: Option = None; + + // Store the last used version of the target. We use this to support + // skipping unchanged metric versions in their history. + let mut last_target_version = 1; + + // All fields we've seen for this metric. + // + // Similar to targets, metrics can be added or removed, potentially + // multiple times. We allow this so long as the fields are never in + // _conflict_, meaning the type of a field changes. + let all_known_metric_fields = Rc::new(RefCell::new(BTreeMap::new())); + + // Iterate over each version of the metric. + // + // In general, we expect metrics to be added in the first version, + // modified by adding / removing fields, and possibly removed at the + // end. However, we do allow them to be added and removed multiple + // times (or "undeleted"). + for metric_fields in metric.fields.iter() { + let (new_version, maybe_new_fields) = extract_metric_fields( + metric_name, + metric_fields, + current_version, + )?; + + // TODO(ben): The below needs to check if the current target + // version is _after_ the last metric version, and fill in using + // the latest active version. I.e., we need to support omitting + // an unchanged metric version in the "middle" of the history. + + // Store our version and grab the fields (or continue if there + // aren't any). + let _ = current_version.insert(new_version); + let Some(fields) = maybe_new_fields else { + continue; + }; + + // At this point, we've ensured we have a set of fields, new or + // modified, and so a meaningful version of the timeseries + // schema. Do a few final sanity checks and then append the + // schema with the matching target version. Note that we support + // this for any target version, to allow omitting unchanged + // metrics in the history. + // + // TODO(ben): This "back-fills" from the newest set of metric + // fields. We need to take the last metric fields and + // "back-fill" with those until `last_target_version == + // version`. + let CurrentVersion::Active { version } = &new_version else { + unreachable!(); + }; + while last_target_version <= *version { + let Some(target_version) = + target_fields_by_version.get(&last_target_version) + else { + anyhow::bail!( + "Metric '{}' version {} does not have a \ + matching version in the target '{}'", + metric_name, + last_target_version, + self.target.name, + ); + }; + + let field_schema = construct_field_schema( + &metric_name, + &target_version.fields, + fields, + &all_known_target_fields, + all_known_metric_fields.clone(), + )?; + out.push(crate::TimeseriesSchema { + timeseries_name: TimeseriesName::try_from(format!( + "{}:{}", + self.target.name, metric_name + )) + .unwrap(), + field_schema, + datum_type: metric.datum_type, + units: metric.units, + version: last_target_version, + created: Utc::now(), + }); + last_target_version += 1; + } + } + + // We allow omitting later versions of metrics if they are + // unchanged. A target must specify every version, even if it's the + // same, but metrics need only specify differences. + // + // Look for any target versions strictly later than the last metric + // version, and create a corresponding target/metric pair for it. + if let Some(last_metric_fields) = metric.fields.last() { + match last_metric_fields { + FieldInfo::Removed { .. } => {}, + FieldInfo::Added { added_in: last_metric_version, fields } | + FieldInfo::Version(VersionedFields { version: last_metric_version, fields }) => + { + let next_version = last_metric_version + 1; + for (_, target_version) in target_fields_by_version.range(next_version..) { + let field_schema = construct_field_schema( + &metric_name, + &target_version.fields, + fields, + &all_known_target_fields, + all_known_metric_fields.clone(), + )?; + out.push(crate::schema::TimeseriesSchema { + timeseries_name: TimeseriesName::try_from(format!( + "{}:{}", + self.target.name, metric_name + )) + .unwrap(), + field_schema, + datum_type: metric.datum_type, + units: metric.units, + version: target_version.version, + created: Utc::now(), + }); + } + } + } + } + } + Ok(out) + } +} + +fn extract_metric_fields<'a>( + metric_name: &'a str, + metric_fields: &'a FieldInfo, + current_version: Option, +) -> anyhow::Result<(CurrentVersion, Option<&'a BTreeMap>)> { + let (new_version, new_fields) = match metric_fields { + FieldInfo::Removed { removed_in } => { + match current_version { + Some(CurrentVersion::Active { version }) => { + // This metric was active, and is now being + // removed. Bump the version and mark it active, + // but there are no fields to return here. + anyhow::ensure!( + *removed_in > version, + "Version for removal of metric '{}' \ + is not monotonically increasing \ + (expected at least {}, found {})", + metric_name, + version + 1, + removed_in, + ); + ( + CurrentVersion::Inactive { + last_active_version: *removed_in, + }, + None, + ) + } + Some(CurrentVersion::Inactive { + last_active_version, + }) => { + anyhow::bail!( + "Metric '{}' was already removed \ + in version {}, it cannot be removed again", + metric_name, + last_active_version, + ); + } + None => { + anyhow::bail!( + "Metric '{metric_name}' has no previous \ + version, it cannot be removed." + ); + } + } + } + FieldInfo::Added { added_in, fields } => { + match current_version { + Some(CurrentVersion::Active { .. }) => { + anyhow::bail!( + "Metric '{metric_name}' is already active, \ + it cannot be added again" + ); + } + Some(CurrentVersion::Inactive { + last_active_version, + }) => { + // The metric is currently inactive, just check + // that the newly-active version is greater. + anyhow::ensure!( + *added_in > last_active_version, + "Re-added metric '{}' must appear in a later \ + version than the last active ({})", + metric_name, + last_active_version, + ); + ( + CurrentVersion::Active { + version: *added_in, + }, + Some(fields), + ) + } + None => { + // There was no previous version for this + // metric, just add it. + ( + CurrentVersion::Active { + version: *added_in, + }, + Some(fields), + ) + } + } + } + FieldInfo::Version(new_fields) => { + match current_version { + Some(CurrentVersion::Active { version }) => { + // The happy-path, we're stepping the version + // and possibly modifying the fields. + anyhow::ensure!( + new_fields.version > version, + "Metric '{}' version should increment, \ + expected at least {}, found {}", + metric_name, + version + 1, + new_fields.version, + ); + ( + CurrentVersion::Active { + version: new_fields.version, + }, + Some(&new_fields.fields), + ) + } + Some(CurrentVersion::Inactive { + last_active_version, + }) => { + // The metric has been removed in the past, it + // needs to be added again first. + anyhow::bail!( + "Metric '{}' was last active at version {}, \ + it must be added again first", + metric_name, + last_active_version, + ); + } + None => { + // The metric never existed, it must be added + // first. + anyhow::bail!( + "Metric '{}' must be added in at its first + version, and can then be modified", + metric_name, + ); + } + } + } + }; + Ok((new_version, new_fields)) +} + +fn construct_field_schema<'a>( + metric_name: &'a str, + target_fields: &'a BTreeMap, + metric_fields: &'a BTreeMap, + all_known_target_fields: &'a BTreeMap<&'a str, FieldType>, + all_known_metric_fields: Rc>> +) -> anyhow::Result> { + let mut field_schema: BTreeSet<_> = target_fields + .iter() + .map(|(field_name, field_type)| FieldSchema { + name: field_name.to_string(), + field_type: *field_type, + source: FieldSource::Target, + }) + .collect(); + for (field_name, field_type) in metric_fields.iter() { + // Ensure that the fields are all consistent with any past + // version of the same name. + if let Some(existing_type) = all_known_metric_fields + .borrow_mut() + .insert(field_name.as_str(), *field_type) + { + anyhow::ensure!( + existing_type == *field_type, + "Metric '{}' field '{}' changed \ + type from '{}' to '{}'", + metric_name, + field_name, + existing_type, + field_type, + ); + } + + // Targets and metrics must have unique field names. + if all_known_target_fields.contains_key(field_name.as_str()) + { + anyhow::bail!( + "Field '{}' cannot appear in the metric \ + '{}', it is already a target field", + field_name, + metric_name, + ); + } + + field_schema.insert(FieldSchema { + name: field_name.to_string(), + field_type: *field_type, + source: FieldSource::Metric, + }); + } + Ok(field_schema) +} + +#[derive(Clone, Debug)] +struct SchemaDiff<'src> { + pub first_version: u8, + pub second_version: u8, + pub added: BTreeSet<&'src FieldSchema>, + pub removed: BTreeSet<&'src FieldSchema>, +} + +impl<'src> SchemaDiff<'src> { + pub fn new( + first: &'src crate::schema::TimeseriesSchema, + second: &'src crate::schema::TimeseriesSchema, + ) -> Self { + assert_eq!(first.timeseries_name, second.timeseries_name); + assert!(first.version < second.version); + let added = + second.field_schema.difference(&first.field_schema).collect(); + let removed = + first.field_schema.difference(&second.field_schema).collect(); + Self { + first_version: first.version, + second_version: second.version, + added, + removed + } + } +} + +#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum Units { + #[default] + None, + Count, + Bytes, + Seconds, + Amps, + Volts, + DegreesCelsius, +} + +impl Units { + pub const fn none() -> Self { + Self::None + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_it() { + const FILE: &str = + concat!(env!("CARGO_MANIFEST_DIR"), "/../../foo.toml"); + println!("{}", FILE); + let contents = std::fs::read_to_string(FILE).unwrap(); + let out: TimeseriesSchemaHistory = toml::from_str(&contents).unwrap(); + let schema = out.into_schema().unwrap(); + let dl = schema.iter().filter(|s| s.timeseries_name == "physical_data_link:bytes_sent").collect::>(); + println!("{dl:#?}"); + println!(); + println!("{:#?}", dl.first().unwrap()); + for (first, second) in dl.iter().zip(dl.iter().skip(1)) { + let diff = SchemaDiff::new(first, second); + println!("{} -> {}", diff.first_version, diff.second_version); + println!("removed: {:#?}", diff.removed); + println!("added: {:#?}", diff.added); + println!(); + } + } +}