Skip to content

Commit

Permalink
Add functions to catch timeseries schema changes (#4602)
Browse files Browse the repository at this point in the history
- Move schema types from the `oximeter-db` crate to `oximeter` proper
- Add a `SchemaSet` for storing a unique set of timeseries schema, and
comparing against a set deserialized from file. This works like
expectorate, with a few tweaks, and allows developers to catch any
changes. If we want to relax this to catching _compatible_ changes, for
some definition of that, we can do that pretty easily.
  • Loading branch information
bnaecker authored Dec 11, 2023
1 parent 4ad7325 commit 0c5c559
Show file tree
Hide file tree
Showing 14 changed files with 881 additions and 368 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions openapi/nexus-internal.json
Original file line number Diff line number Diff line change
Expand Up @@ -4231,6 +4231,20 @@
"content",
"type"
]
},
{
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": [
"invalid_timeseries_name"
]
}
},
"required": [
"type"
]
}
]
},
Expand Down
14 changes: 14 additions & 0 deletions openapi/sled-agent.json
Original file line number Diff line number Diff line change
Expand Up @@ -5209,6 +5209,20 @@
"content",
"type"
]
},
{
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": [
"invalid_timeseries_name"
]
}
},
"required": [
"type"
]
}
]
},
Expand Down
53 changes: 53 additions & 0 deletions oximeter/collector/src/self_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,15 @@ impl CollectionTaskStats {

#[cfg(test)]
mod tests {
use super::Collections;
use super::Cumulative;
use super::FailedCollections;
use super::FailureReason;
use super::OximeterCollector;
use super::StatusCode;
use oximeter::schema::SchemaSet;
use std::net::IpAddr;
use std::net::Ipv6Addr;

#[test]
fn test_failure_reason_serialization() {
Expand All @@ -168,4 +175,50 @@ mod tests {
assert_eq!(variant.to_string(), *as_str);
}
}

const fn collector() -> OximeterCollector {
OximeterCollector {
collector_id: uuid::uuid!("cfebaa5f-3ba9-4bb5-9145-648d287df78a"),
collector_ip: IpAddr::V6(Ipv6Addr::LOCALHOST),
collector_port: 12345,
}
}

fn collections() -> Collections {
Collections {
producer_id: uuid::uuid!("718452ab-7cca-42f6-b8b1-1aaaa1b09104"),
producer_ip: IpAddr::V6(Ipv6Addr::LOCALHOST),
producer_port: 12345,
base_route: String::from("/"),
datum: Cumulative::new(0),
}
}

fn failed_collections() -> FailedCollections {
FailedCollections {
producer_id: uuid::uuid!("718452ab-7cca-42f6-b8b1-1aaaa1b09104"),
producer_ip: IpAddr::V6(Ipv6Addr::LOCALHOST),
producer_port: 12345,
base_route: String::from("/"),
reason: FailureReason::Unreachable.to_string(),
datum: Cumulative::new(0),
}
}

// Check that the self-stat timeseries schema have not changed.
#[test]
fn test_no_schema_changes() {
let collector = collector();
let collections = collections();
let failed = failed_collections();
let mut set = SchemaSet::default();
assert!(set.insert_checked(&collector, &collections).is_none());
assert!(set.insert_checked(&collector, &failed).is_none());

const PATH: &'static str = concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/output/self-stat-schema.json"
);
set.assert_contents(PATH);
}
}
91 changes: 91 additions & 0 deletions oximeter/collector/tests/output/self-stat-schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
{
"oximeter_collector:collections": {
"timeseries_name": "oximeter_collector:collections",
"field_schema": [
{
"name": "base_route",
"field_type": "string",
"source": "metric"
},
{
"name": "collector_id",
"field_type": "uuid",
"source": "target"
},
{
"name": "collector_ip",
"field_type": "ip_addr",
"source": "target"
},
{
"name": "collector_port",
"field_type": "u16",
"source": "target"
},
{
"name": "producer_id",
"field_type": "uuid",
"source": "metric"
},
{
"name": "producer_ip",
"field_type": "ip_addr",
"source": "metric"
},
{
"name": "producer_port",
"field_type": "u16",
"source": "metric"
}
],
"datum_type": "cumulative_u64",
"created": "2023-12-04T17:49:47.797495948Z"
},
"oximeter_collector:failed_collections": {
"timeseries_name": "oximeter_collector:failed_collections",
"field_schema": [
{
"name": "base_route",
"field_type": "string",
"source": "metric"
},
{
"name": "collector_id",
"field_type": "uuid",
"source": "target"
},
{
"name": "collector_ip",
"field_type": "ip_addr",
"source": "target"
},
{
"name": "collector_port",
"field_type": "u16",
"source": "target"
},
{
"name": "producer_id",
"field_type": "uuid",
"source": "metric"
},
{
"name": "producer_ip",
"field_type": "ip_addr",
"source": "metric"
},
{
"name": "producer_port",
"field_type": "u16",
"source": "metric"
},
{
"name": "reason",
"field_type": "string",
"source": "metric"
}
],
"datum_type": "cumulative_u64",
"created": "2023-12-04T17:49:47.799970009Z"
}
}
4 changes: 2 additions & 2 deletions oximeter/db/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ impl Client {
&self,
sample: &Sample,
) -> Result<Option<(TimeseriesName, String)>, Error> {
let sample_schema = model::schema_for(sample);
let sample_schema = TimeseriesSchema::from(sample);
let name = sample_schema.timeseries_name.clone();
let mut schema = self.schema.lock().await;

Expand Down Expand Up @@ -1873,7 +1873,7 @@ mod tests {
client.insert_samples(&[sample.clone()]).await.unwrap();

// The internal map should now contain both the new timeseries schema
let actual_schema = model::schema_for(&sample);
let actual_schema = TimeseriesSchema::from(&sample);
let timeseries_name =
TimeseriesName::try_from(sample.timeseries_name.as_str()).unwrap();
let expected_schema = client
Expand Down
Loading

0 comments on commit 0c5c559

Please sign in to comment.