From 1a4fe9e23315bbd616347815a4dde1f4096cb3f7 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Sat, 21 Oct 2023 22:26:17 +0000 Subject: [PATCH] Sort fields when extracting timeseries schema - Fields are reported in a sample sorted within a target or metric, but not necessarily between them. When we derive a schema from a sample, this commit collects fields into a set to impose a total order. - Convert between a `DbFieldList` and `BTreeSet` when inserting / reading fields from the nested tables in ClickHouse. - Add sanity test that we're sorting field schema correctly, including on read from the database. - Errors for schema mismatches report entire schema, not just fields. - Logic around new schema was pretty complicated, and it was difficult to reason about its correctness. Make the lock async, and check both the internal cache and database when looking for a schema for a new sample. - Add test that we don't add a schema to the DB again when it exists in the DB, but not the internal cache. Instead, we update the cache. --- oximeter/db/src/client.rs | 161 ++++++++++++++++++++++++-------------- oximeter/db/src/lib.rs | 140 +++++++++++++++++++++++++++++++-- oximeter/db/src/model.rs | 18 +++-- oximeter/db/src/query.rs | 29 ++++--- 4 files changed, 264 insertions(+), 84 deletions(-) diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index ffa5d97d52e..df695bbf8ad 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -35,7 +35,7 @@ use std::collections::BTreeSet; use std::convert::TryFrom; use std::net::SocketAddr; use std::num::NonZeroU32; -use std::sync::Mutex; +use tokio::sync::Mutex; use uuid::Uuid; #[usdt::provider(provider = "clickhouse__client")] @@ -208,16 +208,12 @@ impl Client { &self, name: &TimeseriesName, ) -> Result, Error> { - { - let map = self.schema.lock().unwrap(); - if let Some(s) = map.get(name) { - return Ok(Some(s.clone())); - } + let mut schema = self.schema.lock().await; + if let Some(s) = schema.get(name) { + return Ok(Some(s.clone())); } - // `get_schema` acquires the lock internally, so the above scope is required to avoid - // deadlock. - self.get_schema().await?; - Ok(self.schema.lock().unwrap().get(name).map(Clone::clone)) + self.get_schema_locked(&mut schema).await?; + Ok(schema.get(name).map(Clone::clone)) } /// List timeseries schema, paginated. @@ -384,30 +380,47 @@ impl Client { &self, sample: &Sample, ) -> Result, Error> { - let schema = model::schema_for(sample); - let name = schema.timeseries_name.clone(); - let maybe_new_schema = match self.schema.lock().unwrap().entry(name) { - Entry::Vacant(entry) => Ok(Some(entry.insert(schema).clone())), + let sample_schema = model::schema_for(sample); + let name = sample_schema.timeseries_name.clone(); + let mut schema = self.schema.lock().await; + + // We need to possibly check that this schema is in the local cache, or + // in the database, all while we hold the lock to ensure there's no + // concurrent additions. This containment check is needed so that we + // check both the local cache and the database, to avoid adding a schema + // a second time. + if !schema.contains_key(&name) { + self.get_schema_locked(&mut schema).await?; + } + match schema.entry(name) { Entry::Occupied(entry) => { let existing_schema = entry.get(); - if existing_schema == &schema { + if existing_schema == &sample_schema { Ok(None) } else { - let err = - error_for_schema_mismatch(&schema, &existing_schema); error!( self.log, - "timeseries schema mismatch, sample will be skipped: {}", - err + "timeseries schema mismatch, sample will be skipped"; + "expected" => ?existing_schema, + "actual" => ?sample_schema, + "sample" => ?sample, ); - Err(err) + Err(Error::SchemaMismatch { + expected: existing_schema.clone(), + actual: sample_schema, + }) } } - }?; - Ok(maybe_new_schema.map(|schema| { - serde_json::to_string(&model::DbTimeseriesSchema::from(schema)) - .expect("Failed to convert schema to DB model") - })) + Entry::Vacant(entry) => { + entry.insert(sample_schema.clone()); + Ok(Some( + serde_json::to_string(&model::DbTimeseriesSchema::from( + sample_schema, + )) + .expect("Failed to convert schema to DB model"), + )) + } + } } // Select the timeseries, including keys and field values, that match the given field-selection @@ -503,10 +516,12 @@ impl Client { response } - async fn get_schema(&self) -> Result<(), Error> { + async fn get_schema_locked( + &self, + schema: &mut BTreeMap, + ) -> Result<(), Error> { debug!(self.log, "retrieving timeseries schema from database"); let sql = { - let schema = self.schema.lock().unwrap(); if schema.is_empty() { format!( "SELECT * FROM {db_name}.timeseries_schema FORMAT JSONEachRow;", @@ -545,7 +560,7 @@ impl Client { ); (schema.timeseries_name.clone(), schema) }); - self.schema.lock().unwrap().extend(new); + schema.extend(new); } Ok(()) } @@ -593,7 +608,7 @@ impl DbWrite for Client { } Ok(schema) => { if let Some(schema) = schema { - debug!(self.log, "new timeseries schema: {:?}", schema); + debug!(self.log, "new timeseries schema"; "schema" => ?schema); new_schema.push(schema); } } @@ -730,28 +745,6 @@ async fn handle_db_response( } } -// Generate an error describing a schema mismatch -fn error_for_schema_mismatch( - schema: &TimeseriesSchema, - existing_schema: &TimeseriesSchema, -) -> Error { - let expected = existing_schema - .field_schema - .iter() - .map(|field| (field.name.clone(), field.ty)) - .collect(); - let actual = schema - .field_schema - .iter() - .map(|field| (field.name.clone(), field.ty)) - .collect(); - Error::SchemaMismatch { - name: schema.timeseries_name.to_string(), - expected, - actual, - } -} - #[cfg(test)] mod tests { use super::*; @@ -1599,7 +1592,7 @@ mod tests { ); // Clear the internal caches of seen schema - client.schema.lock().unwrap().clear(); + client.schema.lock().await.clear(); // Insert the new sample client.insert_samples(&[sample.clone()]).await.unwrap(); @@ -1611,7 +1604,7 @@ mod tests { let expected_schema = client .schema .lock() - .unwrap() + .await .get(×eries_name) .expect( "After inserting a new sample, its schema should be included", @@ -2484,13 +2477,13 @@ mod tests { #[tokio::test] async fn test_get_schema_no_new_values() { let (mut db, client, _) = setup_filter_testcase().await; - let schema = &client.schema.lock().unwrap().clone(); - client.get_schema().await.expect("Failed to get timeseries schema"); - assert_eq!( - schema, - &*client.schema.lock().unwrap(), - "Schema shouldn't change" - ); + let original_schema = client.schema.lock().await.clone(); + let mut schema = client.schema.lock().await; + client + .get_schema_locked(&mut schema) + .await + .expect("Failed to get timeseries schema"); + assert_eq!(&original_schema, &*schema, "Schema shouldn't change"); db.cleanup().await.expect("Failed to cleanup database"); } @@ -2585,4 +2578,52 @@ mod tests { ); db.cleanup().await.expect("Failed to cleanup database"); } + + #[tokio::test] + async fn test_update_schema_cache_on_new_sample() { + usdt::register_probes().unwrap(); + let logctx = test_setup_log("test_update_schema_cache_on_new_sample"); + let log = &logctx.log; + + // Let the OS assign a port and discover it after ClickHouse starts + let mut db = ClickHouseInstance::new_single_node(0) + .await + .expect("Failed to start ClickHouse"); + let address = SocketAddr::new("::1".parse().unwrap(), db.port()); + + let client = Client::new(address, &log); + client + .init_single_node_db() + .await + .expect("Failed to initialize timeseries database"); + let samples = [test_util::make_sample()]; + client.insert_samples(&samples).await.unwrap(); + + // Get the count of schema directly from the DB, which should have just + // one. + let response = client.execute_with_body( + "SELECT COUNT() FROM oximeter.timeseries_schema FORMAT JSONEachRow; + ").await.unwrap(); + assert_eq!(response.lines().count(), 1, "Expected exactly 1 schema"); + assert_eq!(client.schema.lock().await.len(), 1); + + // Clear the internal cache, and insert the sample again. + // + // This should cause us to look up the schema in the DB again, but _not_ + // insert a new one. + client.schema.lock().await.clear(); + assert!(client.schema.lock().await.is_empty()); + + client.insert_samples(&samples).await.unwrap(); + + // Get the count of schema directly from the DB, which should still have + // only the one schema. + let response = client.execute_with_body( + "SELECT COUNT() FROM oximeter.timeseries_schema FORMAT JSONEachRow; + ").await.unwrap(); + assert_eq!(response.lines().count(), 1, "Expected exactly 1 schema again"); + assert_eq!(client.schema.lock().await.len(), 1); + db.cleanup().await.expect("Failed to cleanup ClickHouse server"); + logctx.cleanup_successful(); + } } diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index c878b8ff2a2..11ecbeddc87 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -4,7 +4,7 @@ //! Tools for interacting with the control plane telemetry database. -// Copyright 2021 Oxide Computer Company +// Copyright 2023 Oxide Computer Company use crate::query::StringFieldSelector; use chrono::{DateTime, Utc}; @@ -13,6 +13,7 @@ pub use oximeter::{DatumType, Field, FieldType, Measurement, Sample}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; +use std::collections::BTreeSet; use std::convert::TryFrom; use std::num::NonZeroU32; use thiserror::Error; @@ -36,12 +37,8 @@ pub enum Error { Database(String), /// A schema provided when collecting samples did not match the expected schema - #[error("Schema mismatch for timeseries '{name}', expected fields {expected:?} found fields {actual:?}")] - SchemaMismatch { - name: String, - expected: BTreeMap, - actual: BTreeMap, - }, + #[error("Schema mismatch for timeseries '{0}'", expected.timeseries_name)] + SchemaMismatch { expected: TimeseriesSchema, actual: TimeseriesSchema }, #[error("Timeseries not found for: {0}")] TimeseriesNotFound(String), @@ -153,6 +150,13 @@ impl std::convert::TryFrom for TimeseriesName { } } +impl std::str::FromStr for TimeseriesName { + type Err = Error; + fn from_str(s: &str) -> Result { + s.try_into() + } +} + impl PartialEq for TimeseriesName where T: AsRef, @@ -177,7 +181,7 @@ fn validate_timeseries_name(s: &str) -> Result<&str, Error> { #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] pub struct TimeseriesSchema { pub timeseries_name: TimeseriesName, - pub field_schema: Vec, + pub field_schema: BTreeSet, pub datum_type: DatumType, pub created: DateTime, } @@ -398,6 +402,8 @@ const TIMESERIES_NAME_REGEX: &str = #[cfg(test)] mod tests { use super::*; + use crate::model::DbFieldList; + use crate::model::DbTimeseriesSchema; use std::convert::TryFrom; use uuid::Uuid; @@ -505,4 +511,122 @@ mod tests { &output.join("\n"), ); } + + // Test that we correctly order field across a target and metric. + // + // In an earlier commit, we switched from storing fields in an unordered Vec + // to using a BTree{Map,Set} to ensure ordering by name. However, the + // `TimeseriesSchema` type stored all its fields by chaining the sorted + // fields from the target and metric, without then sorting _across_ them. + // + // This was exacerbated by the error reporting, where we did in fact sort + // all fields across the target and metric, making it difficult to tell how + // the derived schema was different, if at all. + // + // This test generates a sample with a schema where the target and metric + // fields are sorted within them, but not across them. We check that the + // derived schema are actually equal, which means we've imposed that + // ordering when deriving the schema. + #[test] + fn test_schema_field_ordering_across_target_metric() { + let target_field = FieldSchema { + name: String::from("later"), + ty: FieldType::U64, + source: FieldSource::Target, + }; + let metric_field = FieldSchema { + name: String::from("earlier"), + ty: FieldType::U64, + source: FieldSource::Metric, + }; + let timeseries_name: TimeseriesName = "foo:bar".parse().unwrap(); + let datum_type = DatumType::U64; + let field_schema = + [target_field.clone(), metric_field.clone()].into_iter().collect(); + let expected_schema = TimeseriesSchema { + timeseries_name, + field_schema, + datum_type, + created: Utc::now(), + }; + + #[derive(oximeter::Target)] + struct Foo { + later: u64, + } + #[derive(oximeter::Metric)] + struct Bar { + earlier: u64, + datum: u64, + } + + let target = Foo { later: 1 }; + let metric = Bar { earlier: 2, datum: 10 }; + let sample = Sample::new(&target, &metric).unwrap(); + let derived_schema = model::schema_for(&sample); + assert_eq!(derived_schema, expected_schema); + } + + #[test] + fn test_unsorted_db_fields_are_sorted_on_read() { + let target_field = FieldSchema { + name: String::from("later"), + ty: FieldType::U64, + source: FieldSource::Target, + }; + let metric_field = FieldSchema { + name: String::from("earlier"), + ty: FieldType::U64, + source: FieldSource::Metric, + }; + let timeseries_name: TimeseriesName = "foo:bar".parse().unwrap(); + let datum_type = DatumType::U64; + let field_schema = + [target_field.clone(), metric_field.clone()].into_iter().collect(); + let expected_schema = TimeseriesSchema { + timeseries_name: timeseries_name.clone(), + field_schema, + datum_type, + created: Utc::now(), + }; + + // The fields here are sorted by target and then metric, which is how we + // used to insert them into the DB. We're checking that they are totally + // sorted when we read them out of the DB, even though they are not in + // the extracted model type. + let db_fields = DbFieldList { + names: vec![target_field.name.clone(), metric_field.name.clone()], + types: vec![target_field.ty.into(), metric_field.ty.into()], + sources: vec![ + target_field.source.into(), + metric_field.source.into(), + ], + }; + let db_schema = DbTimeseriesSchema { + timeseries_name: timeseries_name.to_string(), + field_schema: db_fields, + datum_type: datum_type.into(), + created: expected_schema.created, + }; + assert_eq!(expected_schema, TimeseriesSchema::from(db_schema)); + } + + #[test] + fn test_field_schema_ordering() { + let mut fields = BTreeSet::new(); + fields.insert(FieldSchema { + name: String::from("second"), + ty: FieldType::U64, + source: FieldSource::Target, + }); + fields.insert(FieldSchema { + name: String::from("first"), + ty: FieldType::U64, + source: FieldSource::Target, + }); + let mut iter = fields.iter(); + assert_eq!(iter.next().unwrap().name, "first"); + assert_eq!(iter.next().unwrap().name, "second"); + assert!(iter.next().is_none()); + } } diff --git a/oximeter/db/src/model.rs b/oximeter/db/src/model.rs index 1314c5c6494..7f5b150b469 100644 --- a/oximeter/db/src/model.rs +++ b/oximeter/db/src/model.rs @@ -30,6 +30,7 @@ use oximeter::types::Sample; use serde::Deserialize; use serde::Serialize; use std::collections::BTreeMap; +use std::collections::BTreeSet; use std::convert::TryFrom; use std::net::IpAddr; use std::net::Ipv6Addr; @@ -107,7 +108,7 @@ pub(crate) struct DbFieldList { pub sources: Vec, } -impl From for Vec { +impl From for BTreeSet { fn from(list: DbFieldList) -> Self { list.names .into_iter() @@ -122,8 +123,8 @@ impl From for Vec { } } -impl From> for DbFieldList { - fn from(list: Vec) -> Self { +impl From> for DbFieldList { + fn from(list: BTreeSet) -> Self { let mut names = Vec::with_capacity(list.len()); let mut types = Vec::with_capacity(list.len()); let mut sources = Vec::with_capacity(list.len()); @@ -914,6 +915,9 @@ pub(crate) fn unroll_measurement_row(sample: &Sample) -> (String, String) { /// Return the schema for a `Sample`. pub(crate) fn schema_for(sample: &Sample) -> TimeseriesSchema { + // The fields are iterated through whatever order the `Target` or `Metric` + // impl chooses. We'll store in a set ordered by field name, to ignore the + // declaration order. let created = Utc::now(); let field_schema = sample .target_fields() @@ -1403,7 +1407,7 @@ mod tests { sources: vec![DbFieldSource::Target, DbFieldSource::Metric], }; - let list = vec![ + let list: BTreeSet<_> = [ FieldSchema { name: String::from("field0"), ty: FieldType::I64, @@ -1414,11 +1418,13 @@ mod tests { ty: FieldType::IpAddr, source: FieldSource::Metric, }, - ]; + ] + .into_iter() + .collect(); assert_eq!(DbFieldList::from(list.clone()), db_list); assert_eq!(db_list, list.clone().into()); - let round_trip: Vec = + let round_trip: BTreeSet = DbFieldList::from(list.clone()).into(); assert_eq!(round_trip, list); } diff --git a/oximeter/db/src/query.rs b/oximeter/db/src/query.rs index e9e16007393..6a55d3f5181 100644 --- a/oximeter/db/src/query.rs +++ b/oximeter/db/src/query.rs @@ -721,6 +721,7 @@ mod tests { use crate::FieldSource; use crate::TimeseriesName; use chrono::NaiveDateTime; + use std::collections::BTreeSet; use std::convert::TryFrom; #[test] @@ -774,7 +775,7 @@ mod tests { fn test_select_query_builder_filter_raw() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![ + field_schema: [ FieldSchema { name: "f0".to_string(), ty: FieldType::I64, @@ -785,7 +786,9 @@ mod tests { ty: FieldType::Bool, source: FieldSource::Target, }, - ], + ] + .into_iter() + .collect(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -905,7 +908,7 @@ mod tests { fn test_select_query_builder_no_fields() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![], + field_schema: BTreeSet::new(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -927,7 +930,7 @@ mod tests { fn test_select_query_builder_limit_offset() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![], + field_schema: BTreeSet::new(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -996,7 +999,7 @@ mod tests { fn test_select_query_builder_no_selectors() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![ + field_schema: [ FieldSchema { name: "f0".to_string(), ty: FieldType::I64, @@ -1007,7 +1010,9 @@ mod tests { ty: FieldType::Bool, source: FieldSource::Target, }, - ], + ] + .into_iter() + .collect(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -1057,7 +1062,7 @@ mod tests { fn test_select_query_builder_field_selectors() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![ + field_schema: [ FieldSchema { name: "f0".to_string(), ty: FieldType::I64, @@ -1068,7 +1073,9 @@ mod tests { ty: FieldType::Bool, source: FieldSource::Target, }, - ], + ] + .into_iter() + .collect(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -1106,7 +1113,7 @@ mod tests { fn test_select_query_builder_full() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![ + field_schema: [ FieldSchema { name: "f0".to_string(), ty: FieldType::I64, @@ -1117,7 +1124,9 @@ mod tests { ty: FieldType::Bool, source: FieldSource::Target, }, - ], + ] + .into_iter() + .collect(), datum_type: DatumType::I64, created: Utc::now(), };