Sort fields when extracting timeseries schema
- Fields in a sample are reported sorted within a target or metric, but
  not necessarily across them. When we derive a schema from a sample,
  this commit collects the fields into a set to impose a total order
  (see the sketch below).
- Convert between a `DbFieldList` and a `BTreeSet` when inserting /
  reading fields from the nested tables in ClickHouse (a read-path
  sketch follows the lib.rs diff below).
- Add a sanity test that we sort field schema correctly, including on
  read from the database.
- Errors for schema mismatches now report the entire schema, not just
  the fields.
- The logic around new schema was fairly complicated, and it was
  difficult to reason about its correctness. Make the lock async, and
  check both the internal cache and the database when looking up the
  schema for a new sample (see the locking sketch after the client.rs
  diff below).
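
A minimal sketch of the sorting idea, outside the oximeter codebase: the `FieldSchema` and `FieldSource` types below are simplified stand-ins for oximeter's, and the derived `Ord` sorts by `name` first because it is the leading struct field.

```rust
use std::collections::BTreeSet;

// Simplified stand-ins for oximeter's field-schema types.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum FieldSource {
    Target,
    Metric,
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct FieldSchema {
    name: String,
    ty: &'static str,
    source: FieldSource,
}

fn main() {
    // Each per-source list is sorted on its own, but chaining target
    // fields before metric fields does not give a total order by name.
    let target_fields = vec![FieldSchema {
        name: String::from("later"),
        ty: "u64",
        source: FieldSource::Target,
    }];
    let metric_fields = vec![FieldSchema {
        name: String::from("earlier"),
        ty: "u64",
        source: FieldSource::Metric,
    }];

    // Collecting the chained fields into a `BTreeSet` imposes a single
    // total order across both sources.
    let field_schema: BTreeSet<FieldSchema> =
        target_fields.into_iter().chain(metric_fields).collect();
    let names: Vec<&str> =
        field_schema.iter().map(|f| f.name.as_str()).collect();
    assert_eq!(names, ["earlier", "later"]);
}
```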
bnaecker committed Oct 23, 2023
1 parent 1cd3314 commit 24fddc9
Showing 4 changed files with 206 additions and 83 deletions.
102 changes: 43 additions & 59 deletions oximeter/db/src/client.rs
@@ -35,7 +35,7 @@ use std::collections::BTreeSet;
 use std::convert::TryFrom;
 use std::net::SocketAddr;
 use std::num::NonZeroU32;
-use std::sync::Mutex;
+use tokio::sync::Mutex;
 use uuid::Uuid;

 #[usdt::provider(provider = "clickhouse__client")]
@@ -208,16 +208,12 @@ impl Client {
         &self,
         name: &TimeseriesName,
     ) -> Result<Option<TimeseriesSchema>, Error> {
-        {
-            let map = self.schema.lock().unwrap();
-            if let Some(s) = map.get(name) {
-                return Ok(Some(s.clone()));
-            }
+        let mut schema = self.schema.lock().await;
+        if let Some(s) = schema.get(name) {
+            return Ok(Some(s.clone()));
         }
-        // `get_schema` acquires the lock internally, so the above scope is required to avoid
-        // deadlock.
-        self.get_schema().await?;
-        Ok(self.schema.lock().unwrap().get(name).map(Clone::clone))
+        self.get_schema_locked(&mut schema).await?;
+        Ok(schema.get(name).map(Clone::clone))
     }

     /// List timeseries schema, paginated.
@@ -384,30 +380,38 @@ impl Client {
         &self,
         sample: &Sample,
     ) -> Result<Option<String>, Error> {
-        let schema = model::schema_for(sample);
-        let name = schema.timeseries_name.clone();
-        let maybe_new_schema = match self.schema.lock().unwrap().entry(name) {
-            Entry::Vacant(entry) => Ok(Some(entry.insert(schema).clone())),
+        let sample_schema = model::schema_for(sample);
+        let name = sample_schema.timeseries_name.clone();
+        let mut schema = self.schema.lock().await;
+        match schema.entry(name) {
             Entry::Occupied(entry) => {
                 let existing_schema = entry.get();
-                if existing_schema == &schema {
+                if existing_schema == &sample_schema {
                     Ok(None)
                 } else {
-                    let err =
-                        error_for_schema_mismatch(&schema, &existing_schema);
                     error!(
                         self.log,
-                        "timeseries schema mismatch, sample will be skipped: {}",
-                        err
+                        "timeseries schema mismatch, sample will be skipped";
+                        "expected" => ?existing_schema,
+                        "actual" => ?sample_schema,
+                        "sample" => ?sample,
                     );
-                    Err(err)
+                    Err(Error::SchemaMismatch {
+                        expected: existing_schema.clone(),
+                        actual: sample_schema,
+                    })
                 }
             }
-        }?;
-        Ok(maybe_new_schema.map(|schema| {
-            serde_json::to_string(&model::DbTimeseriesSchema::from(schema))
-                .expect("Failed to convert schema to DB model")
-        }))
+            Entry::Vacant(entry) => {
+                entry.insert(sample_schema.clone());
+                Ok(Some(
+                    serde_json::to_string(&model::DbTimeseriesSchema::from(
+                        sample_schema,
+                    ))
+                    .expect("Failed to convert schema to DB model"),
+                ))
+            }
+        }
     }

     // Select the timeseries, including keys and field values, that match the given field-selection
@@ -503,10 +507,12 @@ impl Client {
         response
     }

-    async fn get_schema(&self) -> Result<(), Error> {
+    async fn get_schema_locked(
+        &self,
+        schema: &mut BTreeMap<TimeseriesName, TimeseriesSchema>,
+    ) -> Result<(), Error> {
         debug!(self.log, "retrieving timeseries schema from database");
         let sql = {
-            let schema = self.schema.lock().unwrap();
             if schema.is_empty() {
                 format!(
                     "SELECT * FROM {db_name}.timeseries_schema FORMAT JSONEachRow;",
@@ -545,7 +551,7 @@ impl Client {
                 );
                 (schema.timeseries_name.clone(), schema)
             });
-            self.schema.lock().unwrap().extend(new);
+            schema.extend(new);
         }
         Ok(())
     }
@@ -730,28 +736,6 @@ async fn handle_db_response(
     }
 }

-// Generate an error describing a schema mismatch
-fn error_for_schema_mismatch(
-    schema: &TimeseriesSchema,
-    existing_schema: &TimeseriesSchema,
-) -> Error {
-    let expected = existing_schema
-        .field_schema
-        .iter()
-        .map(|field| (field.name.clone(), field.ty))
-        .collect();
-    let actual = schema
-        .field_schema
-        .iter()
-        .map(|field| (field.name.clone(), field.ty))
-        .collect();
-    Error::SchemaMismatch {
-        name: schema.timeseries_name.to_string(),
-        expected,
-        actual,
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
Expand Down Expand Up @@ -1599,7 +1583,7 @@ mod tests {
);

// Clear the internal caches of seen schema
client.schema.lock().unwrap().clear();
client.schema.lock().await.clear();

// Insert the new sample
client.insert_samples(&[sample.clone()]).await.unwrap();
@@ -1611,7 +1595,7 @@
         let expected_schema = client
             .schema
             .lock()
-            .unwrap()
+            .await
             .get(&timeseries_name)
             .expect(
                 "After inserting a new sample, its schema should be included",
@@ -2484,13 +2468,13 @@ mod tests {
     #[tokio::test]
     async fn test_get_schema_no_new_values() {
         let (mut db, client, _) = setup_filter_testcase().await;
-        let schema = &client.schema.lock().unwrap().clone();
-        client.get_schema().await.expect("Failed to get timeseries schema");
-        assert_eq!(
-            schema,
-            &*client.schema.lock().unwrap(),
-            "Schema shouldn't change"
-        );
+        let original_schema = client.schema.lock().await.clone();
+        let mut schema = client.schema.lock().await;
+        client
+            .get_schema_locked(&mut schema)
+            .await
+            .expect("Failed to get timeseries schema");
+        assert_eq!(&original_schema, &*schema, "Schema shouldn't change");
         db.cleanup().await.expect("Failed to cleanup database");
     }

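The `client.rs` changes above follow a common async-locking pattern: take the `tokio::sync::Mutex` once at the top of the call, then pass the guarded map into a `*_locked` helper so nothing further down tries to re-acquire the lock. A toy model of that pattern, using illustrative names rather than oximeter's actual API (assumes the `tokio` crate with the `sync`, `macros`, and `rt-multi-thread` features enabled):

```rust
use std::collections::BTreeMap;
use tokio::sync::Mutex;

struct Client {
    schema: Mutex<BTreeMap<String, String>>,
}

impl Client {
    // Take the async lock once, then thread the guarded map through the
    // helper so the refresh step never re-locks `self.schema`.
    async fn schema_for_timeseries(&self, name: &str) -> Option<String> {
        let mut schema = self.schema.lock().await;
        if let Some(s) = schema.get(name) {
            return Some(s.clone());
        }
        Self::refresh_locked(&mut schema).await;
        schema.get(name).cloned()
    }

    // Operates on the already-locked map; calling `self.schema.lock()`
    // here while the caller still holds the guard would deadlock.
    async fn refresh_locked(schema: &mut BTreeMap<String, String>) {
        // Stand-in for reading the schema table from the database.
        schema.insert(String::from("foo:bar"), String::from("u64 datum"));
    }
}

#[tokio::main]
async fn main() {
    let client = Client { schema: Mutex::new(BTreeMap::new()) };
    let found = client.schema_for_timeseries("foo:bar").await;
    assert_eq!(found.as_deref(), Some("u64 datum"));
}
```

Unlike a `std::sync::Mutex` guard, the tokio guard can be held across the `.await` on the database call, which is what lets the cache check and the refresh happen under a single lock acquisition.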
140 changes: 132 additions & 8 deletions oximeter/db/src/lib.rs
@@ -4,7 +4,7 @@

 //! Tools for interacting with the control plane telemetry database.
-// Copyright 2021 Oxide Computer Company
+// Copyright 2023 Oxide Computer Company

 use crate::query::StringFieldSelector;
 use chrono::{DateTime, Utc};
@@ -13,6 +13,7 @@ pub use oximeter::{DatumType, Field, FieldType, Measurement, Sample};
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 use std::collections::BTreeMap;
+use std::collections::BTreeSet;
 use std::convert::TryFrom;
 use std::num::NonZeroU32;
 use thiserror::Error;
@@ -36,12 +37,8 @@ pub enum Error {
     Database(String),

     /// A schema provided when collecting samples did not match the expected schema
-    #[error("Schema mismatch for timeseries '{name}', expected fields {expected:?} found fields {actual:?}")]
-    SchemaMismatch {
-        name: String,
-        expected: BTreeMap<String, FieldType>,
-        actual: BTreeMap<String, FieldType>,
-    },
+    #[error("Schema mismatch for timeseries '{0}'", .expected.timeseries_name)]
+    SchemaMismatch { expected: TimeseriesSchema, actual: TimeseriesSchema },

     #[error("Timeseries not found for: {0}")]
     TimeseriesNotFound(String),
@@ -153,6 +150,13 @@ impl std::convert::TryFrom<String> for TimeseriesName {
     }
 }

+impl std::str::FromStr for TimeseriesName {
+    type Err = Error;
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        s.try_into()
+    }
+}
+
 impl<T> PartialEq<T> for TimeseriesName
 where
     T: AsRef<str>,
@@ -177,7 +181,7 @@ fn validate_timeseries_name(s: &str) -> Result<&str, Error> {
 #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
 pub struct TimeseriesSchema {
     pub timeseries_name: TimeseriesName,
-    pub field_schema: Vec<FieldSchema>,
+    pub field_schema: BTreeSet<FieldSchema>,
     pub datum_type: DatumType,
     pub created: DateTime<Utc>,
 }
@@ -398,6 +402,8 @@ const TIMESERIES_NAME_REGEX: &str =
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::model::DbFieldList;
+    use crate::model::DbTimeseriesSchema;
     use std::convert::TryFrom;
     use uuid::Uuid;

@@ -505,4 +511,122 @@ mod tests {
             &output.join("\n"),
         );
     }
+
+    // Test that we correctly order fields across a target and metric.
+    //
+    // In an earlier commit, we switched from storing fields in an unordered
+    // Vec to using a BTree{Map,Set} to ensure ordering by name. However, the
+    // `TimeseriesSchema` type stored all its fields by chaining the sorted
+    // fields from the target and metric, without then sorting _across_ them.
+    //
+    // This was exacerbated by the error reporting, where we did in fact sort
+    // all fields across the target and metric, making it difficult to tell
+    // how the derived schema was different, if at all.
+    //
+    // This test generates a sample with a schema where the target and metric
+    // fields are sorted within them, but not across them. We check that the
+    // derived schema are actually equal, which means we've imposed that
+    // ordering when deriving the schema.
+    #[test]
+    fn test_schema_field_ordering_across_target_metric() {
+        let target_field = FieldSchema {
+            name: String::from("later"),
+            ty: FieldType::U64,
+            source: FieldSource::Target,
+        };
+        let metric_field = FieldSchema {
+            name: String::from("earlier"),
+            ty: FieldType::U64,
+            source: FieldSource::Metric,
+        };
+        let timeseries_name: TimeseriesName = "foo:bar".parse().unwrap();
+        let datum_type = DatumType::U64;
+        let field_schema =
+            [target_field.clone(), metric_field.clone()].into_iter().collect();
+        let expected_schema = TimeseriesSchema {
+            timeseries_name,
+            field_schema,
+            datum_type,
+            created: Utc::now(),
+        };
+
+        #[derive(oximeter::Target)]
+        struct Foo {
+            later: u64,
+        }
+        #[derive(oximeter::Metric)]
+        struct Bar {
+            earlier: u64,
+            datum: u64,
+        }
+
+        let target = Foo { later: 1 };
+        let metric = Bar { earlier: 2, datum: 10 };
+        let sample = Sample::new(&target, &metric).unwrap();
+        let derived_schema = model::schema_for(&sample);
+        assert_eq!(derived_schema, expected_schema);
+    }
+
+    #[test]
+    fn test_unsorted_db_fields_are_sorted_on_read() {
+        let target_field = FieldSchema {
+            name: String::from("later"),
+            ty: FieldType::U64,
+            source: FieldSource::Target,
+        };
+        let metric_field = FieldSchema {
+            name: String::from("earlier"),
+            ty: FieldType::U64,
+            source: FieldSource::Metric,
+        };
+        let timeseries_name: TimeseriesName = "foo:bar".parse().unwrap();
+        let datum_type = DatumType::U64;
+        let field_schema =
+            [target_field.clone(), metric_field.clone()].into_iter().collect();
+        let expected_schema = TimeseriesSchema {
+            timeseries_name: timeseries_name.clone(),
+            field_schema,
+            datum_type,
+            created: Utc::now(),
+        };
+
+        // The fields here are sorted by target and then metric, which is how
+        // we used to insert them into the DB. We're checking that they are
+        // totally sorted when we read them out of the DB, even though they
+        // are not in the extracted model type.
+        let db_fields = DbFieldList {
+            names: vec![target_field.name.clone(), metric_field.name.clone()],
+            types: vec![target_field.ty.into(), metric_field.ty.into()],
+            sources: vec![
+                target_field.source.into(),
+                metric_field.source.into(),
+            ],
+        };
+        let db_schema = DbTimeseriesSchema {
+            timeseries_name: timeseries_name.to_string(),
+            field_schema: db_fields,
+            datum_type: datum_type.into(),
+            created: expected_schema.created,
+        };
+        assert_eq!(expected_schema, TimeseriesSchema::from(db_schema));
+    }
+
+    #[test]
+    fn test_field_schema_ordering() {
+        let mut fields = BTreeSet::new();
+        fields.insert(FieldSchema {
+            name: String::from("second"),
+            ty: FieldType::U64,
+            source: FieldSource::Target,
+        });
+        fields.insert(FieldSchema {
+            name: String::from("first"),
+            ty: FieldType::U64,
+            source: FieldSource::Target,
+        });
+        let mut iter = fields.iter();
+        assert_eq!(iter.next().unwrap().name, "first");
+        assert_eq!(iter.next().unwrap().name, "second");
+        assert!(iter.next().is_none());
+    }
 }
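
As a companion to the `DbFieldList`/`BTreeSet` conversion the commit message mentions, here is a simplified sketch of the read path. The types are hypothetical stand-ins, but they mirror the shape of the nested-table columns in the test above: parallel `names`/`types`/`sources` vectors.

```rust
use std::collections::BTreeSet;

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct FieldSchema {
    name: String,
    ty: String,
    source: String,
}

// ClickHouse nested tables come back as parallel column vectors.
struct DbFieldList {
    names: Vec<String>,
    types: Vec<String>,
    sources: Vec<String>,
}

impl From<DbFieldList> for BTreeSet<FieldSchema> {
    fn from(list: DbFieldList) -> Self {
        // Zip the parallel columns back into field schema; collecting
        // into a `BTreeSet` sorts them regardless of row order in the DB.
        list.names
            .into_iter()
            .zip(list.types)
            .zip(list.sources)
            .map(|((name, ty), source)| FieldSchema { name, ty, source })
            .collect()
    }
}

fn main() {
    // Rows stored target-then-metric, i.e. not sorted by name.
    let db_list = DbFieldList {
        names: vec![String::from("later"), String::from("earlier")],
        types: vec![String::from("u64"), String::from("u64")],
        sources: vec![String::from("target"), String::from("metric")],
    };
    let fields = BTreeSet::from(db_list);
    let names: Vec<&str> = fields.iter().map(|f| f.name.as_str()).collect();
    assert_eq!(names, ["earlier", "later"]);
}
```

Collecting into a `BTreeSet` on read means the in-memory schema comes out sorted even for rows written before this change.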