diff --git a/Cargo.lock b/Cargo.lock
index 9cb5773386..616ed26de1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -909,9 +909,9 @@ dependencies = [

 [[package]]
 name = "chrono"
-version = "0.4.31"
+version = "0.4.34"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
+checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b"
 dependencies = [
  "android-tzdata",
  "iana-time-zone",
@@ -919,7 +919,7 @@ dependencies = [
  "num-traits",
  "serde",
  "wasm-bindgen",
- "windows-targets 0.48.5",
+ "windows-targets 0.52.0",
 ]

 [[package]]
@@ -4009,16 +4009,6 @@ version = "0.1.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5"

-[[package]]
-name = "matrixmultiply"
-version = "0.3.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7574c1cf36da4798ab73da5b215bbf444f50718207754cb522201d78d1cd0ff2"
-dependencies = [
- "autocfg",
- "rawpointer",
-]
-
 [[package]]
 name = "maybe-uninit"
 version = "2.0.0"
@@ -4230,21 +4220,6 @@ dependencies = [
  "tempfile",
 ]

-[[package]]
-name = "ndarray"
-version = "0.15.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "adb12d4e967ec485a5f71c6311fe28158e9d6f4bc4a447b474184d0f91a8fa32"
-dependencies = [
- "matrixmultiply",
- "num-complex",
- "num-integer",
- "num-traits",
- "rawpointer",
- "rayon",
- "serde",
-]
-
 [[package]]
 name = "nested"
 version = "0.1.1"
@@ -5808,7 +5783,6 @@ dependencies = [
  "highway",
  "indexmap 2.2.3",
  "itertools 0.12.1",
- "ndarray",
  "omicron-common",
  "omicron-test-utils",
  "omicron-workspace-hack",
@@ -6958,12 +6932,6 @@ dependencies = [
  "unicode-width",
 ]

-[[package]]
-name = "rawpointer"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3"
-
 [[package]]
 name = "rayon"
 version = "1.8.0"
diff --git a/Cargo.toml b/Cargo.toml
index 27ad3e54d3..bffedbdf03 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -179,13 +177,11 @@ chrono = { version = "0.4", features = [ "serde" ] }
 clap = { version = "4.5", features = ["cargo", "derive", "env", "wrap_help"] }
 cookie = "0.18"
 criterion = { version = "0.5.1", features = [ "async_tokio" ] }
-crossbeam = "0.8"
 crossterm = { version = "0.27.0", features = ["event-stream"] }
 crucible-agent-client = { git = "https://github.com/oxidecomputer/crucible", rev = "796dce526dd7ed7b52a0429a486ccba4a9da1ce5" }
 crucible-pantry-client = { git = "https://github.com/oxidecomputer/crucible", rev = "796dce526dd7ed7b52a0429a486ccba4a9da1ce5" }
 crucible-smf = { git = "https://github.com/oxidecomputer/crucible", rev = "796dce526dd7ed7b52a0429a486ccba4a9da1ce5" }
 csv = "1.3.0"
-curve25519-dalek = "4"
 datatest-stable = "0.2.3"
 display-error-chain = "0.2.0"
 ddm-admin-client = { path = "clients/ddm-admin-client" }
@@ -252,7 +250,6 @@ mockall = "0.12"
 newtype_derive = "0.1.6"
 mg-admin-client = { path = "clients/mg-admin-client" }
 multimap = "0.8.1"
-ndarray = { version = "0.15.6", default-features = false, features = ["std", "rayon", "serde"] }
 nexus-blueprint-execution = { path = "nexus/blueprint-execution" }
 nexus-client = { path = "clients/nexus-client" }
 nexus-db-model = { path = "nexus/db-model" }
@@ -273,7 +270,6 @@ omicron-workspace-hack = "0.1.0"
 omicron-common = { path = "common" }
 omicron-gateway = { path = "gateway" }
 omicron-nexus = { path = "nexus" }
-omicron-package = { path = "package" }
path = "package" } omicron-rpaths = { path = "rpaths" } omicron-sled-agent = { path = "sled-agent" } omicron-test-utils = { path = "test-utils" } @@ -297,7 +293,6 @@ oximeter-collector = { path = "oximeter/collector" } oximeter-instruments = { path = "oximeter/instruments" } oximeter-macro-impl = { path = "oximeter/oximeter-macro-impl" } oximeter-producer = { path = "oximeter/producer" } -p256 = "0.13" parse-display = "0.9.0" partial-io = { version = "0.5.4", features = ["proptest1", "tokio1"] } parse-size = "1.0.0" @@ -338,7 +333,6 @@ schemars = "0.8.16" secrecy = "0.8.0" semver = { version = "1.0.21", features = ["std", "serde"] } serde = { version = "1.0", default-features = false, features = [ "derive" ] } -serde_derive = "1.0" serde_human_bytes = { git = "http://github.com/oxidecomputer/serde_human_bytes", branch = "main" } serde_json = "1.0.113" serde_path_to_error = "0.1.15" @@ -446,12 +440,6 @@ debug = "line-tables-only" # times, because it allows target and host dependencies to be unified. debug = "line-tables-only" -# `lalrpop` is used by `polar-core`'s build script; building it with -# optimizations makes that build script run ~20x faster, more than offsetting -# the additional build time added to `lalrpop` itself. -[profile.dev.package.lalrpop] -opt-level = 3 - # Password hashing is expensive by construction. Build the hashing libraries # with optimizations to significantly speed up tests. [profile.dev.package.argon2] diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index 030cfdd71b..9d40d43c61 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -37,10 +37,6 @@ features = [ "serde" ] workspace = true optional = true -[dependencies.ndarray] -workspace = true -optional = true - [dependencies.peg] workspace = true optional = true @@ -97,7 +93,7 @@ sql = [ "dep:sqlparser", "dep:tabled" ] -oxql = [ "dep:ndarray", "dep:peg", "dep:reedline", "dep:tabled" ] +oxql = ["dep:peg", "dep:reedline", "dep:tabled"] [[bin]] name = "oxdb" diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index f34c986aa0..19cef998b3 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -372,7 +372,7 @@ impl Client { // type of the timeseries. All timeseries support "timestamp". let ident = atom.ident.as_str(); if ident == "timestamp" { - if matches!(atom.expr, oxql::Literal::String(_)) { + if matches!(atom.expr, oxql::Literal::Timestamp(_)) { return Ok(Some(atom.to_db_safe_string())); } return Err(Error::from(oxql::Error::InvalidTimestampType)); @@ -434,6 +434,16 @@ impl Client { &self, query: impl AsRef, ) -> Result<(), Error> { + // TODO-security: Need a way to implement authz checks for things like + // viewing resources in another project or silo. + // + // I think one way to do that is look at the predicates and make sure + // they refer to things the user has access to. Another is to add some + // implicit predicates here, indicating the subset of fields that the + // query should be able to access. + // + // This probably means we'll need to parse the query in Nexus, so that + // we can attach the other filters ourselves. let query = oxql::Query::new(query.as_ref())?; let mut results = BTreeMap::new(); @@ -452,26 +462,36 @@ impl Client { .select_matching_timeseries_info(&all_fields_query, &schema) .await?; + // If there are no consistent keys, skip this entirely. + if info.is_empty() { + continue; + } + // Insert an empty result array into the results now. 
            // where there are no such results, the result of the
            // measurement-selection query will be empty, but we still want an
            // array here.
            for (key, (target, metric)) in info.iter() {
-                results.entry(key.clone()).or_insert_with(|| {
-                    oxql::Timeseries {
-                        fields: target
-                            .fields
-                            .iter()
-                            .chain(metric.fields.iter())
-                            .map(|field| {
-                                (field.name.clone(), field.value.clone())
-                            })
-                            .collect(),
-                        data: Vec::new(),
-                    }
-                });
+                if let Entry::Vacant(entry) = results.entry(key.clone()) {
+                    let new = oxql::Timeseries::new(
+                        target.fields.iter().chain(metric.fields.iter()).map(
+                            |field| (field.name.clone(), field.value.clone()),
+                        ),
+                        std::iter::once(metric.datum_type),
+                    )?;
+                    entry.insert(new);
+                }
            }

+            // First, let's check that we're not going to return an enormous
+            // amount of data here.
+            self.verify_measurement_query_limit(
+                &schema,
+                preds.as_ref(),
+                info.keys(),
+            )
+            .await?;
+
             // Fetch the consistent measurements for this timeseries.
             let measurements_query =
                 self.measurements_query(&schema, preds.as_ref(), info.keys())?;
@@ -483,14 +503,67 @@ impl Client {
                 results
                     .get_mut(&key)
                     .expect("All keys inserted above")
-                    .data
-                    .push(measurement);
-            }
-            for key in info.keys() {
-                println!("{key}: {:?}", results.get(key).unwrap());
+                    .values
+                    .push(vec![measurement])?;
             }
         }

+        for (key, timeseries) in results.into_iter() {
+            println!("{key} {}: {timeseries:#?}", timeseries.key());
+        }
+
+        // Ok, here are the relevant data structures
+        //
+        // // A single list of measurements, all with the same field names and
+        // // values. This is the thing we have above in `results`.
+        // struct Timeseries {
+        //     pub fields: BTreeMap<String, FieldValue>,
+        //     pub data: Vec<Vec<Measurement>>, // Could be Array2<Measurement>
+        // }
+        //
+        // // A list of timeseries, I think all with the same set of field
+        // _names_, though possibly not values. I.e., the result of a single
+        // `get` or `group_by`, or after merging >= 2 tables in a `join` or
+        // `union` operation.
+        // pub struct Table {
+        //     pub timeseries: Vec<Timeseries>,
+        // }
+        //
+        // And then the table operation / timeseries transformation trait looks
+        // like:
+        //
+        // pub trait TableOp {
+        //     fn apply(&self, tables: &[Table]) -> Vec<Table>;
+        // }
+        //
+        // A filter operation takes 1 table, and produces 1 table, filtering to
+        // those that match the set of predicates.
+        //
+        // A union operation takes 2 or more tables, and produces 1 table. I
+        // think all tables need the same set of field names (not values). It is
+        // effectively just a flatten operation, concatenating all the tables
+        // into 1 table.
+        //
+        // A join operation takes 2 or more tables, and produces 1 table. All
+        // the tables need the same set of field names. Instead of concatenating
+        // all the timeseries into one vec, it "stacks" the samples from
+        // timeseries with the same field names / values / timestamp. (Need
+        // alignment here.) So you get a data array where each sample has 2 or
+        // more datum values, one for each input table.
+
+        // We now have a map from key -> { fields, data }, for every timeseries
+        // referenced in the query. The key itself isn't really relevant, but we
+        // basically want to run the _values_ as a set of "tables" through the
+        // rest of the pipeline now.
+        //
+        // Something like:
+        //
+        // let mut tables: Vec<_> = results.into_values().collect();
+        // for transform in query.transformations {
+        //     tables = transform.apply(tables)?;
+        // }
+        // Ok(tables)
+
         // Get the list of referenced timeseries.
         //
         // For each of them, construct the all_fields query from its schema.
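Aside: as a minimal sketch only, the driver loop described in the comment above could look like the following, reusing the comment's own hypothetical `TableOp` and `Table` shapes (none of these names are an API in this diff):

    // Sketch, not part of the change: run every table operation in order,
    // starting from one table holding all of the selected timeseries.
    fn run_pipeline(
        transformations: Vec<Box<dyn TableOp>>,
        results: BTreeMap<TimeseriesKey, Timeseries>,
    ) -> Vec<Table> {
        let mut tables =
            vec![Table { timeseries: results.into_values().collect() }];
        for transform in transformations {
            // Each op maps tables to tables: a filter is 1 -> 1, while
            // union and join collapse 2 or more tables into 1.
            tables = transform.apply(&tables);
        }
        tables
    }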
@@ -510,12 +583,70 @@ impl Client {
         Ok(())
     }

+    #[cfg(any(feature = "oxql", test))]
+    async fn verify_measurement_query_limit<'keys>(
+        &self,
+        schema: &TimeseriesSchema,
+        preds: Option<&oxql::FilterItem>,
+        consistent_keys: impl ExactSizeIterator<Item = &'keys TimeseriesKey>,
+    ) -> Result<(), Error> {
+        const MAX_ROWS_PER_TIMESERIES_PER_QUERY: usize = 100_000;
+        let count_query =
+            self.count_measurements_query(&schema, preds, consistent_keys)?;
+        let res = self.execute_with_body(&count_query).await?;
+        let Ok(count) = res.trim().parse() else {
+            return Err(Error::Database(
+                "Failed to get measurement query row count".to_string(),
+            ));
+        };
+        if count > MAX_ROWS_PER_TIMESERIES_PER_QUERY {
+            return Err(Error::from(oxql::Error::QueryTooLarge {
+                count,
+                max: MAX_ROWS_PER_TIMESERIES_PER_QUERY,
+            }));
+        }
+        Ok(())
+    }
+
+    // Return a query that will select the count of measurements consistent with
+    // the provided predicates and keys.
+    #[cfg(any(feature = "oxql", test))]
+    fn count_measurements_query<'keys>(
+        &self,
+        schema: &TimeseriesSchema,
+        preds: Option<&oxql::FilterItem>,
+        consistent_keys: impl ExactSizeIterator<Item = &'keys TimeseriesKey>,
+    ) -> Result<String, Error> {
+        self.measurements_query_impl(
+            schema,
+            preds,
+            consistent_keys,
+            /* select_count = */ true,
+        )
+    }
+
     #[cfg(any(feature = "oxql", test))]
     fn measurements_query<'keys>(
         &self,
         schema: &TimeseriesSchema,
         preds: Option<&oxql::FilterItem>,
         consistent_keys: impl ExactSizeIterator<Item = &'keys TimeseriesKey>,
+    ) -> Result<String, Error> {
+        self.measurements_query_impl(
+            schema,
+            preds,
+            consistent_keys,
+            /* select_count = */ false,
+        )
+    }
+
+    #[cfg(any(feature = "oxql", test))]
+    fn measurements_query_impl<'keys>(
+        &self,
+        schema: &TimeseriesSchema,
+        preds: Option<&oxql::FilterItem>,
+        consistent_keys: impl ExactSizeIterator<Item = &'keys TimeseriesKey>,
+        select_count: bool,
     ) -> Result<String, Error> {
         use std::fmt::Write;

@@ -526,7 +657,8 @@ impl Client {
             .map(|p| Self::rewrite_predicate_for_measurements(schema, p))
             .transpose()?
             .flatten();
-        let mut query = self.measurements_query_raw(schema.datum_type);
+        let mut query =
+            self.measurements_query_raw(schema.datum_type, select_count);
         query.push_str(" WHERE ");
         if let Some(preds) = preds_for_measurements {
             query.push_str(&preds);
@@ -545,8 +677,13 @@ impl Client {
             );
             query.push_str(")");
         }
-        query.push_str(" FORMAT ");
-        query.push_str(crate::DATABASE_SELECT_FORMAT);
+        // Use JSON format if we're selecting data; otherwise use the default,
+        // TSV, which is easier to parse when checking the result of the
+        // count() query.
+        if !select_count {
+            query.push_str(" FORMAT ");
+            query.push_str(crate::DATABASE_SELECT_FORMAT);
+        }

         Ok(query)
     }
@@ -554,16 +691,21 @@ impl Client {
     fn measurements_query_raw(
         &self,
         datum_type: oximeter::DatumType,
+        select_count: bool,
     ) -> String {
-        let value_columns = if datum_type.is_histogram() {
-            "start_time, bins, counts"
-        } else if datum_type.is_cumulative() {
-            "start_time, datum"
+        let value_columns = if select_count {
+            "count()"
         } else {
-            "datum"
+            if datum_type.is_histogram() {
+                "timeseries_key, timestamp, start_time, bins, counts"
+            } else if datum_type.is_cumulative() {
+                "timeseries_key, timestamp, start_time, datum"
+            } else {
+                "timeseries_key, timestamp, datum"
+            }
         };
         format!(
-            "SELECT timeseries_key, timestamp, {} \
+            "SELECT {} \
             FROM {}",
             value_columns,
             crate::query::measurement_table_name(datum_type),
diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs
index 80e5eb09b9..13c0d29317 100644
--- a/oximeter/db/src/lib.rs
+++ b/oximeter/db/src/lib.rs
@@ -64,7 +64,7 @@ pub enum Error {
     BadMetadata { key: String, msg: String },

     /// An error interacting with the telemetry database
-    #[error("Error interacting with telemetry database")]
+    #[error("Error interacting with telemetry database: {0}")]
     Database(String),

     /// A schema provided when collecting samples did not match the expected schema
diff --git a/oximeter/db/src/oxql/mod.rs b/oximeter/db/src/oxql/mod.rs
index b4a6e95b27..a0e48cc151 100644
--- a/oximeter/db/src/oxql/mod.rs
+++ b/oximeter/db/src/oxql/mod.rs
@@ -4,6 +4,7 @@

 // Copyright 2024 Oxide Computer Company

+use oximeter::DatumType;
 use oximeter::FieldType;
 use peg::error::ParseError as PegError;
 use peg::str::LineCol;
@@ -25,11 +26,37 @@ pub enum Error {
     )]
     InvalidFieldType { field_name: String, expected: FieldType },

-    #[error("Timestamp expressions must evaluate to a string")]
+    #[error("Must provide a timestamp expression")]
     InvalidTimestampType,

     #[error("Expression type does not match datum type ({0})")]
     InvalidDatumType(oximeter::DatumType),
+
+    #[error("Timeseries must have at least one field")]
+    EmptyFields,
+
+    #[error("Timeseries must have at least one datum type")]
+    EmptyDatumTypes,
+
+    #[error(
+        "Value has incorrect type at measurement \
+        {value_index}, expected {expected_type}, \
+        found {actual_type}"
+    )]
+    WrongDatumType {
+        value_index: usize,
+        expected_type: DatumType,
+        actual_type: DatumType,
+    },
+
+    #[error(
+        "Value has incorrect dimensionality, \
+        expected {expected}, found {found}"
+    )]
+    WrongValueDimensionality { expected: usize, found: usize },

+    #[error("OxQL query result returns {count} rows, exceeding max of {max}")]
+    QueryTooLarge { count: usize, max: usize },
 }

 /// An error during OxQL processing.
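Aside: for concreteness, this is how two of the new error variants above would render through their `thiserror` format strings, assuming `oxql::Error` is reachable at this path (the values are made up for illustration):

    use oximeter_db::oxql::Error;

    // Display output follows the #[error(...)] attributes defined above.
    let err = Error::QueryTooLarge { count: 250_000, max: 100_000 };
    assert_eq!(
        err.to_string(),
        "OxQL query result returns 250000 rows, exceeding max of 100000",
    );

    let err = Error::WrongValueDimensionality { expected: 2, found: 1 };
    assert_eq!(
        err.to_string(),
        "Value has incorrect dimensionality, expected 2, found 1",
    );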
diff --git a/oximeter/db/src/oxql/query/ast.rs b/oximeter/db/src/oxql/query/ast.rs
index 05f0e1caad..f730f5adee 100644
--- a/oximeter/db/src/oxql/query/ast.rs
+++ b/oximeter/db/src/oxql/query/ast.rs
@@ -6,8 +6,13 @@

 // Copyright 2024 Oxide Computer Company

-use oximeter::{DatumType, FieldType, TimeseriesName};
+use chrono::DateTime;
+use chrono::Utc;
+use oximeter::DatumType;
+use oximeter::FieldType;
+use oximeter::TimeseriesName;
 use std::fmt;
+use std::net::IpAddr;
 use std::time::Duration;
 use uuid::Uuid;

@@ -52,6 +57,8 @@ pub enum Literal {
     Boolean(bool),
     Uuid(Uuid),
     Duration(Duration),
+    Timestamp(DateTime<Utc>),
+    IpAddr(IpAddr),
 }

 impl Literal {
@@ -66,6 +73,13 @@ impl Literal {
                 let (count, interval) = duration_to_db_interval(inner);
                 format!("INTERVAL {} {}", count, interval)
             }
+            Literal::Timestamp(inner) => {
+                format!("'{}'", inner.format(crate::DATABASE_TIMESTAMP_FORMAT))
+            }
+            Literal::IpAddr(inner) => {
+                let fn_name = if inner.is_ipv6() { "toIPv6" } else { "toIPv4" };
+                format!("{fn_name}('{inner}')")
+            }
         }
     }
 }
@@ -134,6 +148,8 @@ impl fmt::Display for Literal {
             Literal::Boolean(inner) => write!(f, "{inner}"),
             Literal::Uuid(inner) => write!(f, "\"{inner}\""),
             Literal::Duration(inner) => write!(f, "{inner:?}"),
+            Literal::Timestamp(inner) => write!(f, "@{inner}"),
+            Literal::IpAddr(inner) => write!(f, "{inner}"),
         }
     }
 }
@@ -184,6 +200,7 @@ impl fmt::Display for LogicalOp {
 }

 /// Comparison operators.
+// TODO-completeness: Operators for other types, like IP containment ('<<').
 #[derive(Clone, Copy, Debug, PartialEq)]
 pub enum Comparison {
     Eq,
@@ -194,6 +211,7 @@ pub enum Comparison {
     Le,
     Like,
 }
+
 impl Comparison {
     fn to_db_safe_string(&self) -> &'static str {
         match self {
@@ -276,6 +294,8 @@ impl FilterAtom {
             Literal::Boolean(_) => matches!(field_type, FieldType::Bool),
             Literal::Uuid(_) => matches!(field_type, FieldType::Uuid),
             Literal::Duration(_) => false,
+            Literal::Timestamp(_) => false,
+            Literal::IpAddr(_) => matches!(field_type, FieldType::IpAddr),
         }
     }
@@ -313,6 +333,8 @@ impl FilterAtom {
             Literal::Boolean(_) => matches!(datum_type, DatumType::Bool),
             Literal::Uuid(_) => false,
             Literal::Duration(_) => false,
+            Literal::Timestamp(_) => false,
+            Literal::IpAddr(_) => false,
         }
     }
 }
diff --git a/oximeter/db/src/oxql/query/grammar.rs b/oximeter/db/src/oxql/query/grammar.rs
index 306fd74d61..0e161ea168 100644
--- a/oximeter/db/src/oxql/query/grammar.rs
+++ b/oximeter/db/src/oxql/query/grammar.rs
@@ -24,6 +24,14 @@ peg::parser! {
         use oximeter::TimeseriesName;
         use std::time::Duration;
         use uuid::Uuid;
+        use chrono::Utc;
+        use chrono::DateTime;
+        use chrono::NaiveDateTime;
+        use chrono::NaiveDate;
+        use chrono::NaiveTime;
+        use std::net::IpAddr;
+        use std::net::Ipv4Addr;
+        use std::net::Ipv6Addr;

         rule _ = quiet!{[' ' | '\n' | '\t']+}

@@ -93,6 +101,59 @@ peg::parser! {
         pub rule duration_literal() -> Literal
             = d:duration_literal_impl() { Literal::Duration(d) }

+        /// Parse a literal timestamp.
+        ///
+        /// Timestamps are literals prefixed with `@`. They can be in one of
+        /// several formats:
+        ///
+        /// - YYYY-MM-DD
+        /// - HH:MM:SS[.f]
+        /// - RFC 3339, `YYYY-MM-DDTHH:MM:SS.f`
+        /// - The literal `now()`
+        ///
+        /// All timestamps are in UTC.
+        pub rule timestamp_literal() -> Literal
+            = t:timestamp_literal_impl() { Literal::Timestamp(t) }
+
+        rule timestamp_literal_impl() -> DateTime<Utc>
+            = timestamp_string()
+            / now_timestamp()
+
+        pub(in super) rule timestamp_string() -> DateTime<Utc>
+            = "@" s:$(['0'..='9' | '-' | 'T' | ':' | '.']+)
+            {?
+                if let Ok(t) = NaiveDate::parse_from_str(s, "%F") {
+                    return Ok(t.and_hms_opt(0, 0, 0).unwrap().and_utc());
+                }
+                if let Ok(t) = NaiveTime::parse_from_str(s, "%H:%M:%S%.f") {
+                    return Ok(NaiveDateTime::new(Utc::now().date_naive(), t).and_utc());
+                }
+                if let Ok(t) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
+                    return Ok(t.and_utc());
+                }
+                Err("a recognized timestamp format")
+            }
+
+        rule now_timestamp() -> DateTime<Utc> = "@now()" { Utc::now() }
+
+        /// Parse an IP address literal, either IPv4 or IPv6
+        pub rule ip_literal() -> Literal
+            = ip:ipv4_literal() { Literal::IpAddr(IpAddr::V4(ip)) }
+            / ip:ipv6_literal() { Literal::IpAddr(IpAddr::V6(ip)) }
+
+        pub(in super) rule ipv4_literal() -> Ipv4Addr
+            = s:$((['0'..='9']*<1,3>)**<4> ".")
+            {?
+                s.parse().map_err(|_| "an IPv4 address")
+            }
+
+        pub(in super) rule ipv6_literal() -> Ipv6Addr
+            = s:$(['a'..='f' | '0'..='9' | ':']+)
+            {?
+                s.parse().map_err(|_| "an IPv6 address")
+            }
+
         rule dashed_uuid_literal() -> Uuid
             = s:$(
                 "\""
@@ -217,7 +278,9 @@ peg::parser! {
                 integer_literal() /
                 double_literal() /
                 uuid_literal() /
-                string_literal()
+                ip_literal() /
+                string_literal() /
+                timestamp_literal()
             ) {
                 lit
             }
@@ -442,6 +505,14 @@ mod tests {
     use crate::oxql::query::ast::LogicalOp;
     use crate::oxql::query::ast::Reducer;
     use crate::oxql::query::grammar::recognize_escape_sequences;
+    use chrono::NaiveDate;
+    use chrono::NaiveDateTime;
+    use chrono::NaiveTime;
+    use chrono::TimeZone;
+    use chrono::Utc;
+    use std::net::IpAddr;
+    use std::net::Ipv4Addr;
+    use std::net::Ipv6Addr;
     use std::time::Duration;
     use uuid::Uuid;
@@ -839,4 +910,77 @@ mod tests {
         assert_eq!(query_parser::reducer("mean").unwrap(), Reducer::Mean);
         assert!(query_parser::reducer("foo").is_err());
     }
+
+    #[test]
+    fn test_parse_literal_timestamp_string() {
+        assert_eq!(
+            query_parser::timestamp_string("@2020-01-01").unwrap(),
+            Utc.with_ymd_and_hms(2020, 01, 01, 0, 0, 0).unwrap(),
+        );
+        assert_eq!(
+            query_parser::timestamp_string("@01:01:01").unwrap().time(),
+            NaiveTime::from_hms_opt(01, 01, 01).unwrap(),
+        );
+        assert_eq!(
+            query_parser::timestamp_string("@01:01:01.123456").unwrap().time(),
+            NaiveTime::from_hms_micro_opt(01, 01, 01, 123456).unwrap(),
+        );
+        assert_eq!(
+            query_parser::timestamp_string("@2020-01-01T01:01:01.123456")
+                .unwrap(),
+            NaiveDateTime::new(
+                NaiveDate::from_ymd_opt(2020, 01, 01).unwrap(),
+                NaiveTime::from_hms_micro_opt(01, 01, 01, 123456).unwrap(),
+            )
+            .and_utc(),
+        );
+    }
+
+    #[test]
+    fn test_parse_ipv4_literal() {
+        let check = |s: &str, addr: IpAddr| {
+            let Literal::IpAddr(ip) = query_parser::ip_literal(s).unwrap()
+            else {
+                panic!("expected '{}' to be parsed into {}", s, addr);
+            };
+            assert_eq!(ip, addr);
+        };
+        check("100.100.100.100", Ipv4Addr::new(100, 100, 100, 100).into());
+        check("1.2.3.4", Ipv4Addr::new(1, 2, 3, 4).into());
+        check("0.0.0.0", Ipv4Addr::UNSPECIFIED.into());
+
+        assert!(query_parser::ip_literal("abcd").is_err());
+        assert!(query_parser::ip_literal("1.1.1.").is_err());
+        assert!(query_parser::ip_literal("1.1.1.1.1.1").is_err());
+        assert!(query_parser::ip_literal("2555.1.1.1").is_err());
+    }
+
+    #[test]
+    fn test_parse_ipv6_literal() {
+        let check = |s: &str, addr: IpAddr| {
+            let Literal::IpAddr(ip) = query_parser::ip_literal(s).unwrap()
+            else {
+                panic!("expected '{}' to be parsed into {}", s, addr);
+            };
+            assert_eq!(ip, addr);
+        };
+
+        // IPv6 is nuts, let's just check a few common patterns.
+ check("::1", Ipv6Addr::LOCALHOST.into()); + check("::", Ipv6Addr::UNSPECIFIED.into()); + check("fd00::1", Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1).into()); + check( + "fd00:1:2:3:4:5:6:7", + Ipv6Addr::new(0xfd00, 1, 2, 3, 4, 5, 6, 7).into(), + ); + + // Don't currently support IPv6-mapped IPv4 addresses + assert!(query_parser::ip_literal("::ffff:127.0.0.1").is_err()); + + // Other obviously bad patterns. + assert!(query_parser::ip_literal("1").is_err()); + assert!(query_parser::ip_literal(":1::1::1").is_err()); + assert!(query_parser::ip_literal("::g").is_err()); + assert!(query_parser::ip_literal(":::").is_err()); + } } diff --git a/oximeter/db/src/oxql/transformation.rs b/oximeter/db/src/oxql/transformation.rs index 9f7f95d969..2ae056b59f 100644 --- a/oximeter/db/src/oxql/transformation.rs +++ b/oximeter/db/src/oxql/transformation.rs @@ -8,23 +8,129 @@ use super::Error; use crate::Client; -use ndarray::Array2; -use oximeter::Datum; +use crate::TimeseriesKey; +use highway::HighwayHasher; +use oximeter::DatumType; use oximeter::FieldValue; +use oximeter::Measurement; use std::collections::BTreeMap; +use std::hash::Hash; +use std::hash::Hasher; + +/// A list of data values for a timeseries. +#[derive(Clone, Debug)] +pub struct Values { + // The list of values. + // + // Each value contains one or more measurement. In the case where a single + // timeseries is selected from the database, each element of `measurements` + // has an array of length 1. + // + // As timeseries are joined together in various ways, their measurements + // may be stacked here, resulting in inner arrays of length 2 or greater. + // + // This is effectively a 2D array, where the first dimension indexes the + // time points, and the second the values at that time. + values: Vec>, + + // The type of each value. + // + // This is stored separately to keep track of: + // + // 1. The number of measurements in each value. + // 2. The types in the case where there are zero values. + types: Vec, +} + +impl Values { + /// Construct an empty array of values to hold points of the provided types. + pub fn new(types: impl Iterator) -> Result { + let types: Vec<_> = types.collect(); + if types.is_empty() { + return Err(Error::EmptyDatumTypes); + } + Ok(Self { values: Vec::new(), types }) + } + + /// Push a new value onto the list. + /// + /// This returns an error if the value has the wrong type, including the + /// wrong number of measurements. + pub fn push(&mut self, value: Vec) -> Result<(), Error> { + if value.len() != self.types.len() { + return Err(Error::WrongValueDimensionality { + expected: self.types.len(), + found: value.len(), + }); + } + for (i, (expected_type, actual_type)) in self + .types + .iter() + .zip(value.iter().map(|m| m.datum_type())) + .enumerate() + { + if expected_type != &actual_type { + return Err(Error::WrongDatumType { + value_index: i, + expected_type: *expected_type, + actual_type, + }); + } + } + self.values.push(value); + Ok(()) + } +} /// A timeseries is the result of an OxQL query. #[derive(Clone, Debug)] pub struct Timeseries { pub fields: BTreeMap, - //pub data: Array2, - pub data: Vec, + pub values: Values, +} + +impl Timeseries { + pub fn new<'a>( + fields: impl Iterator, + types: impl Iterator, + ) -> Result { + let fields: BTreeMap<_, _> = fields.collect(); + if fields.is_empty() { + return Err(Error::EmptyFields); + } + Ok(Self { fields, values: Values::new(types)? 
}) + } + + pub fn key(&self) -> TimeseriesKey { + // NOTE: The key here is _not_ stable, like the one used in the database + // itself to identify timeseries. That's OK, however, because we do not + // serialize this value anywhere -- it's used entirely for the lifetime + // of one query, and then thrown away, and only needs to be consistent + // for that long. + let mut hasher = HighwayHasher::default(); + + // Hash the field names and values. + for (name, value) in self.fields.iter() { + name.hash(&mut hasher); + value.hash(&mut hasher); + } + // Hash the expected datum _types_. + for dt in self.values.types.iter() { + dt.hash(&mut hasher); + } + hasher.finish() + } +} + +#[derive(Clone, Debug)] +pub struct Table { + pub timeseries: Vec, } -pub(crate) trait Transformation { +pub(crate) trait TableTransformation { fn apply( &self, client: &Client, - timeseries: &[Timeseries], - ) -> Result, Error>; + tables: &[Table], + ) -> Result, Error>; }