diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index ffa5d97d52e..63a07f8f1a2 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -35,7 +35,7 @@ use std::collections::BTreeSet; use std::convert::TryFrom; use std::net::SocketAddr; use std::num::NonZeroU32; -use std::sync::Mutex; +use tokio::sync::Mutex; use uuid::Uuid; #[usdt::provider(provider = "clickhouse__client")] @@ -208,16 +208,12 @@ impl Client { &self, name: &TimeseriesName, ) -> Result, Error> { - { - let map = self.schema.lock().unwrap(); - if let Some(s) = map.get(name) { - return Ok(Some(s.clone())); - } + let mut schema = self.schema.lock().await; + if let Some(s) = schema.get(name) { + return Ok(Some(s.clone())); } - // `get_schema` acquires the lock internally, so the above scope is required to avoid - // deadlock. - self.get_schema().await?; - Ok(self.schema.lock().unwrap().get(name).map(Clone::clone)) + self.get_schema_locked(&mut schema).await?; + Ok(schema.get(name).map(Clone::clone)) } /// List timeseries schema, paginated. @@ -384,30 +380,38 @@ impl Client { &self, sample: &Sample, ) -> Result, Error> { - let schema = model::schema_for(sample); - let name = schema.timeseries_name.clone(); - let maybe_new_schema = match self.schema.lock().unwrap().entry(name) { - Entry::Vacant(entry) => Ok(Some(entry.insert(schema).clone())), + let sample_schema = model::schema_for(sample); + let name = sample_schema.timeseries_name.clone(); + let mut schema = self.schema.lock().await; + match schema.entry(name) { Entry::Occupied(entry) => { let existing_schema = entry.get(); - if existing_schema == &schema { + if existing_schema == &sample_schema { Ok(None) } else { - let err = - error_for_schema_mismatch(&schema, &existing_schema); error!( self.log, - "timeseries schema mismatch, sample will be skipped: {}", - err + "timeseries schema mismatch, sample will be skipped"; + "expected" => ?existing_schema, + "actual" => ?sample_schema, + "sample" => ?sample, ); - Err(err) + Err(Error::SchemaMismatch { + expected: existing_schema.clone(), + actual: sample_schema, + }) } } - }?; - Ok(maybe_new_schema.map(|schema| { - serde_json::to_string(&model::DbTimeseriesSchema::from(schema)) - .expect("Failed to convert schema to DB model") - })) + Entry::Vacant(entry) => { + entry.insert(sample_schema.clone()); + Ok(Some( + serde_json::to_string(&model::DbTimeseriesSchema::from( + sample_schema, + )) + .expect("Failed to convert schema to DB model"), + )) + } + } } // Select the timeseries, including keys and field values, that match the given field-selection @@ -503,10 +507,12 @@ impl Client { response } - async fn get_schema(&self) -> Result<(), Error> { + async fn get_schema_locked( + &self, + schema: &mut BTreeMap, + ) -> Result<(), Error> { debug!(self.log, "retrieving timeseries schema from database"); let sql = { - let schema = self.schema.lock().unwrap(); if schema.is_empty() { format!( "SELECT * FROM {db_name}.timeseries_schema FORMAT JSONEachRow;", @@ -545,7 +551,7 @@ impl Client { ); (schema.timeseries_name.clone(), schema) }); - self.schema.lock().unwrap().extend(new); + schema.extend(new); } Ok(()) } @@ -730,28 +736,6 @@ async fn handle_db_response( } } -// Generate an error describing a schema mismatch -fn error_for_schema_mismatch( - schema: &TimeseriesSchema, - existing_schema: &TimeseriesSchema, -) -> Error { - let expected = existing_schema - .field_schema - .iter() - .map(|field| (field.name.clone(), field.ty)) - .collect(); - let actual = schema - .field_schema - .iter() - .map(|field| (field.name.clone(), field.ty)) - .collect(); - Error::SchemaMismatch { - name: schema.timeseries_name.to_string(), - expected, - actual, - } -} - #[cfg(test)] mod tests { use super::*; @@ -1599,7 +1583,7 @@ mod tests { ); // Clear the internal caches of seen schema - client.schema.lock().unwrap().clear(); + client.schema.lock().await.clear(); // Insert the new sample client.insert_samples(&[sample.clone()]).await.unwrap(); @@ -1611,7 +1595,7 @@ mod tests { let expected_schema = client .schema .lock() - .unwrap() + .await .get(×eries_name) .expect( "After inserting a new sample, its schema should be included", @@ -2484,13 +2468,13 @@ mod tests { #[tokio::test] async fn test_get_schema_no_new_values() { let (mut db, client, _) = setup_filter_testcase().await; - let schema = &client.schema.lock().unwrap().clone(); - client.get_schema().await.expect("Failed to get timeseries schema"); - assert_eq!( - schema, - &*client.schema.lock().unwrap(), - "Schema shouldn't change" - ); + let original_schema = client.schema.lock().await.clone(); + let mut schema = client.schema.lock().await; + client + .get_schema_locked(&mut schema) + .await + .expect("Failed to get timeseries schema"); + assert_eq!(&original_schema, &*schema, "Schema shouldn't change"); db.cleanup().await.expect("Failed to cleanup database"); } diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index c878b8ff2a2..11ecbeddc87 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -4,7 +4,7 @@ //! Tools for interacting with the control plane telemetry database. -// Copyright 2021 Oxide Computer Company +// Copyright 2023 Oxide Computer Company use crate::query::StringFieldSelector; use chrono::{DateTime, Utc}; @@ -13,6 +13,7 @@ pub use oximeter::{DatumType, Field, FieldType, Measurement, Sample}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; +use std::collections::BTreeSet; use std::convert::TryFrom; use std::num::NonZeroU32; use thiserror::Error; @@ -36,12 +37,8 @@ pub enum Error { Database(String), /// A schema provided when collecting samples did not match the expected schema - #[error("Schema mismatch for timeseries '{name}', expected fields {expected:?} found fields {actual:?}")] - SchemaMismatch { - name: String, - expected: BTreeMap, - actual: BTreeMap, - }, + #[error("Schema mismatch for timeseries '{0}'", expected.timeseries_name)] + SchemaMismatch { expected: TimeseriesSchema, actual: TimeseriesSchema }, #[error("Timeseries not found for: {0}")] TimeseriesNotFound(String), @@ -153,6 +150,13 @@ impl std::convert::TryFrom for TimeseriesName { } } +impl std::str::FromStr for TimeseriesName { + type Err = Error; + fn from_str(s: &str) -> Result { + s.try_into() + } +} + impl PartialEq for TimeseriesName where T: AsRef, @@ -177,7 +181,7 @@ fn validate_timeseries_name(s: &str) -> Result<&str, Error> { #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] pub struct TimeseriesSchema { pub timeseries_name: TimeseriesName, - pub field_schema: Vec, + pub field_schema: BTreeSet, pub datum_type: DatumType, pub created: DateTime, } @@ -398,6 +402,8 @@ const TIMESERIES_NAME_REGEX: &str = #[cfg(test)] mod tests { use super::*; + use crate::model::DbFieldList; + use crate::model::DbTimeseriesSchema; use std::convert::TryFrom; use uuid::Uuid; @@ -505,4 +511,122 @@ mod tests { &output.join("\n"), ); } + + // Test that we correctly order field across a target and metric. + // + // In an earlier commit, we switched from storing fields in an unordered Vec + // to using a BTree{Map,Set} to ensure ordering by name. However, the + // `TimeseriesSchema` type stored all its fields by chaining the sorted + // fields from the target and metric, without then sorting _across_ them. + // + // This was exacerbated by the error reporting, where we did in fact sort + // all fields across the target and metric, making it difficult to tell how + // the derived schema was different, if at all. + // + // This test generates a sample with a schema where the target and metric + // fields are sorted within them, but not across them. We check that the + // derived schema are actually equal, which means we've imposed that + // ordering when deriving the schema. + #[test] + fn test_schema_field_ordering_across_target_metric() { + let target_field = FieldSchema { + name: String::from("later"), + ty: FieldType::U64, + source: FieldSource::Target, + }; + let metric_field = FieldSchema { + name: String::from("earlier"), + ty: FieldType::U64, + source: FieldSource::Metric, + }; + let timeseries_name: TimeseriesName = "foo:bar".parse().unwrap(); + let datum_type = DatumType::U64; + let field_schema = + [target_field.clone(), metric_field.clone()].into_iter().collect(); + let expected_schema = TimeseriesSchema { + timeseries_name, + field_schema, + datum_type, + created: Utc::now(), + }; + + #[derive(oximeter::Target)] + struct Foo { + later: u64, + } + #[derive(oximeter::Metric)] + struct Bar { + earlier: u64, + datum: u64, + } + + let target = Foo { later: 1 }; + let metric = Bar { earlier: 2, datum: 10 }; + let sample = Sample::new(&target, &metric).unwrap(); + let derived_schema = model::schema_for(&sample); + assert_eq!(derived_schema, expected_schema); + } + + #[test] + fn test_unsorted_db_fields_are_sorted_on_read() { + let target_field = FieldSchema { + name: String::from("later"), + ty: FieldType::U64, + source: FieldSource::Target, + }; + let metric_field = FieldSchema { + name: String::from("earlier"), + ty: FieldType::U64, + source: FieldSource::Metric, + }; + let timeseries_name: TimeseriesName = "foo:bar".parse().unwrap(); + let datum_type = DatumType::U64; + let field_schema = + [target_field.clone(), metric_field.clone()].into_iter().collect(); + let expected_schema = TimeseriesSchema { + timeseries_name: timeseries_name.clone(), + field_schema, + datum_type, + created: Utc::now(), + }; + + // The fields here are sorted by target and then metric, which is how we + // used to insert them into the DB. We're checking that they are totally + // sorted when we read them out of the DB, even though they are not in + // the extracted model type. + let db_fields = DbFieldList { + names: vec![target_field.name.clone(), metric_field.name.clone()], + types: vec![target_field.ty.into(), metric_field.ty.into()], + sources: vec![ + target_field.source.into(), + metric_field.source.into(), + ], + }; + let db_schema = DbTimeseriesSchema { + timeseries_name: timeseries_name.to_string(), + field_schema: db_fields, + datum_type: datum_type.into(), + created: expected_schema.created, + }; + assert_eq!(expected_schema, TimeseriesSchema::from(db_schema)); + } + + #[test] + fn test_field_schema_ordering() { + let mut fields = BTreeSet::new(); + fields.insert(FieldSchema { + name: String::from("second"), + ty: FieldType::U64, + source: FieldSource::Target, + }); + fields.insert(FieldSchema { + name: String::from("first"), + ty: FieldType::U64, + source: FieldSource::Target, + }); + let mut iter = fields.iter(); + assert_eq!(iter.next().unwrap().name, "first"); + assert_eq!(iter.next().unwrap().name, "second"); + assert!(iter.next().is_none()); + } } diff --git a/oximeter/db/src/model.rs b/oximeter/db/src/model.rs index 1314c5c6494..7f5b150b469 100644 --- a/oximeter/db/src/model.rs +++ b/oximeter/db/src/model.rs @@ -30,6 +30,7 @@ use oximeter::types::Sample; use serde::Deserialize; use serde::Serialize; use std::collections::BTreeMap; +use std::collections::BTreeSet; use std::convert::TryFrom; use std::net::IpAddr; use std::net::Ipv6Addr; @@ -107,7 +108,7 @@ pub(crate) struct DbFieldList { pub sources: Vec, } -impl From for Vec { +impl From for BTreeSet { fn from(list: DbFieldList) -> Self { list.names .into_iter() @@ -122,8 +123,8 @@ impl From for Vec { } } -impl From> for DbFieldList { - fn from(list: Vec) -> Self { +impl From> for DbFieldList { + fn from(list: BTreeSet) -> Self { let mut names = Vec::with_capacity(list.len()); let mut types = Vec::with_capacity(list.len()); let mut sources = Vec::with_capacity(list.len()); @@ -914,6 +915,9 @@ pub(crate) fn unroll_measurement_row(sample: &Sample) -> (String, String) { /// Return the schema for a `Sample`. pub(crate) fn schema_for(sample: &Sample) -> TimeseriesSchema { + // The fields are iterated through whatever order the `Target` or `Metric` + // impl chooses. We'll store in a set ordered by field name, to ignore the + // declaration order. let created = Utc::now(); let field_schema = sample .target_fields() @@ -1403,7 +1407,7 @@ mod tests { sources: vec![DbFieldSource::Target, DbFieldSource::Metric], }; - let list = vec![ + let list: BTreeSet<_> = [ FieldSchema { name: String::from("field0"), ty: FieldType::I64, @@ -1414,11 +1418,13 @@ mod tests { ty: FieldType::IpAddr, source: FieldSource::Metric, }, - ]; + ] + .into_iter() + .collect(); assert_eq!(DbFieldList::from(list.clone()), db_list); assert_eq!(db_list, list.clone().into()); - let round_trip: Vec = + let round_trip: BTreeSet = DbFieldList::from(list.clone()).into(); assert_eq!(round_trip, list); } diff --git a/oximeter/db/src/query.rs b/oximeter/db/src/query.rs index e9e16007393..6a55d3f5181 100644 --- a/oximeter/db/src/query.rs +++ b/oximeter/db/src/query.rs @@ -721,6 +721,7 @@ mod tests { use crate::FieldSource; use crate::TimeseriesName; use chrono::NaiveDateTime; + use std::collections::BTreeSet; use std::convert::TryFrom; #[test] @@ -774,7 +775,7 @@ mod tests { fn test_select_query_builder_filter_raw() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![ + field_schema: [ FieldSchema { name: "f0".to_string(), ty: FieldType::I64, @@ -785,7 +786,9 @@ mod tests { ty: FieldType::Bool, source: FieldSource::Target, }, - ], + ] + .into_iter() + .collect(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -905,7 +908,7 @@ mod tests { fn test_select_query_builder_no_fields() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![], + field_schema: BTreeSet::new(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -927,7 +930,7 @@ mod tests { fn test_select_query_builder_limit_offset() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![], + field_schema: BTreeSet::new(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -996,7 +999,7 @@ mod tests { fn test_select_query_builder_no_selectors() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![ + field_schema: [ FieldSchema { name: "f0".to_string(), ty: FieldType::I64, @@ -1007,7 +1010,9 @@ mod tests { ty: FieldType::Bool, source: FieldSource::Target, }, - ], + ] + .into_iter() + .collect(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -1057,7 +1062,7 @@ mod tests { fn test_select_query_builder_field_selectors() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![ + field_schema: [ FieldSchema { name: "f0".to_string(), ty: FieldType::I64, @@ -1068,7 +1073,9 @@ mod tests { ty: FieldType::Bool, source: FieldSource::Target, }, - ], + ] + .into_iter() + .collect(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -1106,7 +1113,7 @@ mod tests { fn test_select_query_builder_full() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![ + field_schema: [ FieldSchema { name: "f0".to_string(), ty: FieldType::I64, @@ -1117,7 +1124,9 @@ mod tests { ty: FieldType::Bool, source: FieldSource::Target, }, - ], + ] + .into_iter() + .collect(), datum_type: DatumType::I64, created: Utc::now(), };