diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index 19cef998b3..a3a98506da 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -319,11 +319,11 @@ impl Client { #[cfg(any(feature = "oxql", test))] fn rewrite_predicate_for_fields( schema: &TimeseriesSchema, - preds: &oxql::FilterItem, + preds: &oxql::Filter, ) -> Result, Error> { // Walk the set of predicates, keeping those which apply to this schema. match preds { - oxql::FilterItem::Atom(atom) => { + oxql::Filter::Atom(atom) => { // If the predicate names a field in this timeseries schema, // return that predicate printed as a string. If not, we return // None. @@ -342,7 +342,7 @@ impl Client { } Ok(Some(atom.to_db_safe_string())) } - oxql::FilterItem::Expr(expr) => { + oxql::Filter::Expr(expr) => { let left_pred = Self::rewrite_predicate_for_fields(schema, &expr.left)?; let right_pred = @@ -363,11 +363,11 @@ impl Client { #[cfg(any(feature = "oxql", test))] fn rewrite_predicate_for_measurements( schema: &TimeseriesSchema, - preds: &oxql::FilterItem, + preds: &oxql::Filter, ) -> Result, Error> { // Walk the set of predicates, keeping those which apply to this schema. match preds { - oxql::FilterItem::Atom(atom) => { + oxql::Filter::Atom(atom) => { // The relevant columns on which we filter depend on the datum // type of the timeseries. All timeseries support "timestamp". let ident = atom.ident.as_str(); @@ -408,7 +408,7 @@ impl Client { } Ok(None) } - oxql::FilterItem::Expr(expr) => { + oxql::Filter::Expr(expr) => { let left_pred = Self::rewrite_predicate_for_measurements( schema, &expr.left, )?; @@ -446,7 +446,8 @@ impl Client { // we can attach the other filters ourselves. let query = oxql::Query::new(query.as_ref())?; - let mut results = BTreeMap::new(); + // We'll collect each of the tables here, one per timeseries _schema_. + let mut tables: BTreeMap = BTreeMap::new(); let preds = query.coalesced_predicates(); for name in query.timeseries_names() { let Some(schema) = self.schema_for_timeseries(name).await? else { @@ -467,21 +468,22 @@ impl Client { continue; } - // Insert an empty result array into the results now. In the case - // where there are no such results, the result of the - // measurement-selection query will be empty, but we still want an - // array here. - for (key, (target, metric)) in info.iter() { - if let Entry::Vacant(entry) = results.entry(key.clone()) { - let new = oxql::Timeseries::new( + // Collect the list of timeseries for this schema. We'll append + // measurements here, and then tack it onto the list of tables at + // the end. Do this now, so we have an empty timeseries even if + // there are no measurements for it. + let mut this_table = info + .iter() + .map(|(key, (target, metric))| { + oxql::Timeseries::new( target.fields.iter().chain(metric.fields.iter()).map( |field| (field.name.clone(), field.value.clone()), ), std::iter::once(metric.datum_type), - )?; - entry.insert(new); - } - } + ) + .map(|timeseries| (key, timeseries)) + }) + .collect::, _>>()?; // First, let's check that we're not going to return an enormous // amount of data here.
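To make the predicate-splitting above concrete, here is a minimal, self-contained sketch of the same recursive walk: atoms naming a field of the schema are kept and rendered, everything else is dropped. The `Filter`/`Atom`/`Expr` types here are illustrative stand-ins rather than the real `oxql` AST, and the recombination policy for an expression where only one side survives is an assumption, not something this hunk shows.

```rust
use std::collections::BTreeSet;

// Illustrative stand-ins for the oxql filter AST, not the real types.
enum Filter {
    Atom { ident: String, rendered: String },
    Expr { left: Box<Filter>, op: &'static str, right: Box<Filter> },
}

// Keep only the atoms that name a field of the schema, recombining surviving
// children of an expression with its original operator. Keeping a lone
// surviving side is just one plausible policy.
fn rewrite_for_fields(fields: &BTreeSet<&str>, filter: &Filter) -> Option<String> {
    match filter {
        Filter::Atom { ident, rendered } => {
            fields.contains(ident.as_str()).then(|| rendered.clone())
        }
        Filter::Expr { left, op, right } => {
            let left = rewrite_for_fields(fields, left);
            let right = rewrite_for_fields(fields, right);
            match (left, right) {
                (Some(l), Some(r)) => Some(format!("({l} {op} {r})")),
                (Some(x), None) | (None, Some(x)) => Some(x),
                (None, None) => None,
            }
        }
    }
}

fn main() {
    let fields: BTreeSet<_> = ["hostname", "id"].into_iter().collect();
    let filter = Filter::Expr {
        left: Box::new(Filter::Atom {
            ident: "hostname".into(),
            rendered: "hostname = 'foo'".into(),
        }),
        op: "AND",
        right: Box::new(Filter::Atom {
            ident: "datum".into(),
            rendered: "datum > 10".into(),
        }),
    };
    // Only the field predicate survives; `datum > 10` is left for the
    // measurement-side rewrite.
    assert_eq!(
        rewrite_for_fields(&fields, &filter),
        Some("hostname = 'foo'".to_string()),
    );
}
```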
@@ -500,17 +502,26 @@ impl Client { { let (key, measurement) = model::parse_measurement_from_row(line, schema.datum_type); - results + this_table .get_mut(&key) .expect("All keys inserted above") .values .push(vec![measurement])?; } + + tables + .entry(schema.timeseries_name.clone()) + .or_default() + .extend(this_table.into_values()); } - for (key, timeseries) in results.into_iter() { - println!("{key} {}: {timeseries:#?}", timeseries.key()); + // At this point, let's construct a set of tables and run the results + // through the transformation pipeline. + let out = query.apply_transformations(tables.into_values())?; + for each in out { + println!("{each:#?}"); } + Ok(()) // Ok, here are the relevant data structures // @@ -563,31 +574,13 @@ impl Client { // tables = transform.apply(tables)?; // } // Ok(tables) - - // Get the list of referenced timeseries. - // - // For each of them, construct the all_fields query from its schema. - // - // Construct the coalesced / reduced filter (need to separate those - // applying to fields and those to data) - // - // SELECT the consistent fields for each (timeseries_name, - // timeseries_key) - // - // SELECT the consistent data for each (timeseries_name, timeseries_key) - // - // Staple them together. - // - // Run through the remainder of the actual processing pipeline. - - Ok(()) } #[cfg(any(feature = "oxql", test))] async fn verify_measurement_query_limit<'keys>( &self, schema: &TimeseriesSchema, - preds: Option<&oxql::FilterItem>, + preds: Option<&oxql::Filter>, consistent_keys: impl ExactSizeIterator, ) -> Result<(), Error> { const MAX_ROWS_PER_TIMESERIES_PER_QUERY: usize = 100_000; @@ -614,7 +607,7 @@ impl Client { fn count_measurements_query<'keys>( &self, schema: &TimeseriesSchema, - preds: Option<&oxql::FilterItem>, + preds: Option<&oxql::Filter>, consistent_keys: impl ExactSizeIterator, ) -> Result { self.measurements_query_impl( @@ -629,7 +622,7 @@ impl Client { fn measurements_query<'keys>( &self, schema: &TimeseriesSchema, - preds: Option<&oxql::FilterItem>, + preds: Option<&oxql::Filter>, consistent_keys: impl ExactSizeIterator, ) -> Result { self.measurements_query_impl( @@ -644,7 +637,7 @@ impl Client { fn measurements_query_impl<'keys>( &self, schema: &TimeseriesSchema, - preds: Option<&oxql::FilterItem>, + preds: Option<&oxql::Filter>, consistent_keys: impl ExactSizeIterator, select_count: bool, ) -> Result { @@ -716,7 +709,7 @@ impl Client { fn all_fields_query( &self, schema: &TimeseriesSchema, - preds: Option<&oxql::FilterItem>, + preds: Option<&oxql::Filter>, ) -> Result { // Filter down the fields to those which apply to this timeseries // itself, and rewrite as a DB-safe WHERE clause. diff --git a/oximeter/db/src/oxql/mod.rs b/oximeter/db/src/oxql/mod.rs index a0e48cc151..2402d81d77 100644 --- a/oximeter/db/src/oxql/mod.rs +++ b/oximeter/db/src/oxql/mod.rs @@ -57,6 +57,12 @@ pub enum Error { #[error("OxQL query result returns {count} rows, exceeding max of {max}")] QueryTooLarge { count: usize, max: usize }, + + #[error( + "OxQL table transformation '{op}' expected {expected} \ + tables, but found {found}" + )] + InvalidTableCount { op: &'static str, expected: usize, found: usize }, } /// An error during OxQL processing. 
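The collection step above appends each schema's timeseries to a table keyed by timeseries name, creating the table on first use via `or_default`. A small self-contained sketch of that idiom, with a simplified stand-in `Table`:

```rust
use std::collections::BTreeMap;

// Simplified stand-in for oxql::Table.
#[derive(Default)]
struct Table {
    timeseries: Vec<String>,
}

fn main() {
    let mut tables: BTreeMap<String, Table> = BTreeMap::new();
    let per_schema_batches = [
        ("a:b".to_string(), vec!["ts0".to_string(), "ts1".to_string()]),
        ("a:b".to_string(), vec!["ts2".to_string()]),
        ("c:d".to_string(), vec!["ts3".to_string()]),
    ];
    for (name, this_table) in per_schema_batches {
        // `or_default()` creates an empty table the first time a schema is
        // seen, so a schema with no measurements still yields a table.
        tables.entry(name).or_default().timeseries.extend(this_table);
    }
    assert_eq!(tables["a:b"].timeseries.len(), 3);
    assert_eq!(tables["c:d"].timeseries.len(), 1);
}
```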
@@ -101,7 +107,9 @@ impl fmt::Display for ParseError { mod query; mod transformation; +pub use crate::oxql::transformation::Table; +pub use crate::oxql::transformation::TableTransformation; pub use crate::oxql::transformation::Timeseries; -pub use query::ast::FilterItem; +pub use query::ast::Filter; pub use query::ast::Literal; pub use query::Query; diff --git a/oximeter/db/src/oxql/query/ast.rs b/oximeter/db/src/oxql/query/ast.rs index f730f5adee..35a4ede4e4 100644 --- a/oximeter/db/src/oxql/query/ast.rs +++ b/oximeter/db/src/oxql/query/ast.rs @@ -6,10 +6,17 @@ // Copyright 2024 Oxide Computer Company +use crate::oxql::transformation::Values; +use crate::oxql::Error; +use crate::oxql::Table; +use crate::oxql::TableTransformation; +use crate::oxql::Timeseries; use chrono::DateTime; use chrono::Utc; +use oximeter::Datum; use oximeter::DatumType; use oximeter::FieldType; +use oximeter::FieldValue; use oximeter::TimeseriesName; use std::fmt; use std::net::IpAddr; @@ -24,17 +31,33 @@ pub struct Get { #[derive(Clone, Debug, PartialEq)] pub enum TimeseriesTransformation { - Filter(FilterItem), + Filter(Filter), GroupBy(GroupBy), Join, } +impl TimeseriesTransformation { + pub(crate) fn apply(&self, tables: &[Table]) -> Result, Error> { + match self { + TimeseriesTransformation::Filter(inner) => inner.apply(tables), + TimeseriesTransformation::GroupBy(inner) => inner.apply(tables), + TimeseriesTransformation::Join => Join.apply(tables), + } + } +} + #[derive(Clone, Debug, PartialEq)] pub struct GroupBy { pub identifiers: Vec, pub reducer: Reducer, } +impl TableTransformation for GroupBy { + fn apply(&self, tables: &[Table]) -> Result, Error> { + Err(Error::Unsupported("group_by is not yet implemented".to_string())) + } +} + #[derive(Clone, Debug, Default, PartialEq)] pub enum Reducer { #[default] @@ -48,10 +71,76 @@ pub struct Query { pub transformations: Vec, } +#[derive(Clone, Debug, PartialEq)] +pub enum BasicTableOp { + Get(TimeseriesName), + Filter(Filter), + GroupBy(GroupBy), + Join, +} + +#[derive(Clone, Debug, PartialEq)] +pub struct GroupedTableOp { + pub ops: Vec, +} + +#[derive(Clone, Debug, PartialEq)] +pub enum TableOp { + Basic(BasicTableOp), + Grouped(GroupedTableOp), +} + +impl TableOp { + // Is this a merging table operation, one which takes 2 or more tables, and + // produces 1. + fn is_merge(&self) -> bool { + matches!(self, TableOp::Basic(BasicTableOp::Join)) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct Query2 { + pub ops: Vec, +} + +impl Query2 { + // Check that this query (and any subqueries) start with a get table op. + pub(crate) fn starts_with_get(&self) -> bool { + let Some(op) = self.ops.first() else { + return false; + }; + match op { + TableOp::Basic(basic) => matches!(basic, BasicTableOp::Get(_)), + TableOp::Grouped(GroupedTableOp { ops }) => { + ops.iter().all(Query2::starts_with_get) + } + } + } + + // Check that all subqueries (grouped table ops) are followed by a merging + // operation. + pub(crate) fn merge_ops_follow_subqueries(&self) -> bool { + let (mut ops, mut next_ops) = + (self.ops.iter(), self.ops.iter().skip(1)); + while let (Some(op), Some(next_op)) = (ops.next(), next_ops.next()) { + if let TableOp::Grouped(GroupedTableOp { ops: subq_ops }) = op { + if !next_op.is_merge() { + return false; + } + if !subq_ops.iter().all(Query2::merge_ops_follow_subqueries) { + return false; + } + } + } + true + } +} + /// A literal value. 
#[derive(Clone, Debug, PartialEq)] pub enum Literal { - Integer(i64), + // TODO-performance: An i128 here is a bit gratuitous. + Integer(i128), Double(f64), String(String), Boolean(bool), @@ -82,6 +171,276 @@ impl Literal { } } } + + fn is_compatible_with_datum(&self, datum_type: DatumType) -> bool { + match self { + Literal::Integer(_) => matches!( + datum_type, + DatumType::U8 + | DatumType::I8 + | DatumType::U16 + | DatumType::I16 + | DatumType::U32 + | DatumType::I32 + | DatumType::U64 + | DatumType::I64 + | DatumType::CumulativeI64 + | DatumType::CumulativeU64 + | DatumType::CumulativeF32 + | DatumType::CumulativeF64 + ), + Literal::Double(_) => { + matches!(datum_type, DatumType::F32 | DatumType::F64) + } + Literal::String(_) => matches!(datum_type, DatumType::String), + Literal::Boolean(_) => matches!(datum_type, DatumType::Bool), + Literal::Uuid(_) => false, + Literal::Duration(_) => false, + Literal::Timestamp(_) => false, + Literal::IpAddr(_) => false, + } + } + + fn is_compatible_with_field(&self, field_type: FieldType) -> bool { + match self { + Literal::Integer(_) => matches!( + field_type, + FieldType::U8 + | FieldType::I8 + | FieldType::U16 + | FieldType::I16 + | FieldType::U32 + | FieldType::I32 + | FieldType::U64 + | FieldType::I64 + ), + Literal::Double(_) => false, + Literal::String(_) => matches!(field_type, FieldType::String), + Literal::Boolean(_) => matches!(field_type, FieldType::Bool), + Literal::Uuid(_) => matches!(field_type, FieldType::Uuid), + Literal::Duration(_) => false, + Literal::Timestamp(_) => false, + Literal::IpAddr(_) => matches!(field_type, FieldType::IpAddr), + } + } + + // Apply the comparison op between self and the provided field. + // + // Return None if the comparison cannot be applied, either because the type + // is not compatible or the comparison doesn't make sense. + fn compare_field( + &self, + value: &FieldValue, + cmp: Comparison, + ) -> Option { + if !self.is_compatible_with_field(value.field_type()) { + return None; + } + macro_rules! 
generate_cmp_match { + ($lhs:ident, $rhs:ident) => { + match cmp { + Comparison::Eq => Some($lhs == $rhs), + Comparison::Ne => Some($lhs != $rhs), + Comparison::Gt => Some($lhs > $rhs), + Comparison::Ge => Some($lhs >= $rhs), + Comparison::Lt => Some($lhs < $rhs), + Comparison::Le => Some($lhs <= $rhs), + Comparison::Like => None, + } + }; + } + // Filters are written like `column == literal`, so the RHS is self, and + // the LHS is the field value + match (self, value) { + (Literal::Boolean(rhs), FieldValue::Bool(lhs)) => { + generate_cmp_match!(rhs, lhs) + } + (Literal::String(rhs), FieldValue::String(lhs)) => match cmp { + Comparison::Eq => Some(lhs == rhs), + Comparison::Ne => Some(lhs != rhs), + Comparison::Gt => Some(lhs > rhs), + Comparison::Ge => Some(lhs >= rhs), + Comparison::Lt => Some(lhs < rhs), + Comparison::Le => Some(lhs <= rhs), + Comparison::Like => todo!(), + }, + (Literal::IpAddr(rhs), FieldValue::IpAddr(lhs)) => { + generate_cmp_match!(rhs, lhs) + } + (Literal::Uuid(rhs), FieldValue::Uuid(lhs)) => { + generate_cmp_match!(rhs, lhs) + } + (Literal::Integer(rhs), FieldValue::I8(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), FieldValue::U8(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), FieldValue::I16(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), FieldValue::U16(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), FieldValue::U32(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), FieldValue::I32(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), FieldValue::U64(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), FieldValue::I64(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (_, _) => unreachable!(), + } + } + + fn compare_datum( + &self, + value: &oximeter::Measurement, + cmp: Comparison, + ) -> Option { + if !self.is_compatible_with_datum(value.datum_type()) { + return None; + } + macro_rules! generate_cmp_match { + ($lhs:ident, $rhs:ident) => { + match cmp { + Comparison::Eq => Some($lhs == $rhs), + Comparison::Ne => Some($lhs != $rhs), + Comparison::Gt => Some($lhs > $rhs), + Comparison::Ge => Some($lhs >= $rhs), + Comparison::Lt => Some($lhs < $rhs), + Comparison::Le => Some($lhs <= $rhs), + Comparison::Like => None, + } + }; + } + // Expressions are written like `column == literal`, which means the RHS + // is self, and the LHS is the column's value. 
+ match (self, value.datum()) { + (Literal::Boolean(rhs), Datum::Bool(lhs)) => { + generate_cmp_match!(lhs, rhs) + } + (Literal::String(rhs), Datum::String(lhs)) => match cmp { + Comparison::Eq => Some(lhs == rhs), + Comparison::Ne => Some(lhs != rhs), + Comparison::Gt => Some(lhs > rhs), + Comparison::Ge => Some(lhs >= rhs), + Comparison::Lt => Some(lhs < rhs), + Comparison::Le => Some(lhs <= rhs), + Comparison::Like => todo!(), + }, + (Literal::Integer(rhs), Datum::I8(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), Datum::U8(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), Datum::I16(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), Datum::U16(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), Datum::U32(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), Datum::I32(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), Datum::U64(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), Datum::I64(lhs)) => { + let lhs = i128::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), Datum::CumulativeI64(lhs)) => { + let lhs = i128::from(lhs.value()); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Integer(rhs), Datum::CumulativeU64(lhs)) => { + let lhs = i128::from(lhs.value()); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Double(rhs), Datum::F32(lhs)) => { + let lhs = f64::from(*lhs); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Double(rhs), Datum::F64(lhs)) => { + generate_cmp_match!(lhs, rhs) + } + (Literal::Double(rhs), Datum::CumulativeF32(lhs)) => { + let lhs = f64::from(lhs.value()); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (Literal::Double(rhs), Datum::CumulativeF64(lhs)) => { + let lhs = lhs.value(); + let rhs = *rhs; + generate_cmp_match!(lhs, rhs) + } + (_, _) => unreachable!(), + } + } + + fn compare_timestamp( + &self, + value: &oximeter::Measurement, + cmp: Comparison, + ) -> Option { + let Literal::Timestamp(t) = self else { + return None; + }; + let ts = value.timestamp(); + match cmp { + Comparison::Eq => Some(&ts == t), + Comparison::Ne => Some(&ts != t), + Comparison::Gt => Some(&ts > t), + Comparison::Ge => Some(&ts >= t), + Comparison::Lt => Some(&ts < t), + Comparison::Le => Some(&ts <= t), + Comparison::Like => None, + } + } } pub(crate) mod duration_consts { @@ -250,9 +609,30 @@ impl fmt::Display for Comparison { // NOTE: This should really be extended to a generic binary op expression. #[derive(Clone, Debug, PartialEq)] pub struct FilterExpr { - pub left: Box, + pub left: Box, pub op: LogicalOp, - pub right: Box, + pub right: Box, +} + +impl FilterExpr { + fn filter_field(&self, name: &str, value: &FieldValue) -> Option { + let left = self.left.filter_field(name, value)?; + let right = self.right.filter_field(name, value)?; + match self.op { + LogicalOp::And => Some(left && right), + LogicalOp::Or => Some(left || right), + } + } + + fn filter_values(&self, values: &Values) -> Result { + // TODO: need to take each value depending onthe logical op. Not sure + // how to do that. 
Seems like rather than returning the _values_ we'd + // need to return a boolean array indicating if it was filtered. Then + // compress with the logical op. + Err(Error::Unsupported( + "filtering expressions not yet implemented".to_string(), + )) + } } impl fmt::Display for FilterExpr { @@ -273,30 +653,37 @@ pub struct FilterAtom { } impl FilterAtom { + // Apply this filter to the provided field. + // + // If the field name differs from the ident, then return `Some(true)` + // because `self` does not filter out the field. + // + // If the name matches and the type of self is compatible, then return + // `Some(x)` where `x` is the logical application of the filter to the + // field. + // + // If the field matches the name, but the type is not compatible, return + // None. + fn filter_field(&self, name: &str, value: &FieldValue) -> Option { + // If the name does not match, this filter does _not_ apply, and so we + // do not filter the field. + if self.ident.as_str() != name { + return Some(true); + } + self.expr.compare_field(value, self.cmp).map(|res| { + if self.negated { + !res + } else { + res + } + }) + } + pub(crate) fn expr_type_is_compatible_with_field( &self, field_type: FieldType, ) -> bool { - match self.expr { - Literal::Integer(_) => matches!( - field_type, - FieldType::U8 - | FieldType::I8 - | FieldType::U16 - | FieldType::I16 - | FieldType::U32 - | FieldType::I32 - | FieldType::U64 - | FieldType::I64 - ), - Literal::Double(_) => false, - Literal::String(_) => matches!(field_type, FieldType::String), - Literal::Boolean(_) => matches!(field_type, FieldType::Bool), - Literal::Uuid(_) => matches!(field_type, FieldType::Uuid), - Literal::Duration(_) => false, - Literal::Timestamp(_) => false, - Literal::IpAddr(_) => matches!(field_type, FieldType::IpAddr), - } + self.expr.is_compatible_with_field(field_type) } pub(crate) fn to_db_safe_string(&self) -> String { @@ -310,32 +697,39 @@ impl FilterAtom { &self, datum_type: DatumType, ) -> bool { - match self.expr { - Literal::Integer(_) => matches!( - datum_type, - DatumType::U8 - | DatumType::I8 - | DatumType::U16 - | DatumType::I16 - | DatumType::U32 - | DatumType::I32 - | DatumType::U64 - | DatumType::I64 - | DatumType::CumulativeI64 - | DatumType::CumulativeU64 - | DatumType::CumulativeF32 - | DatumType::CumulativeF64 - ), - Literal::Double(_) => { - matches!(datum_type, DatumType::F32 | DatumType::F64) + self.expr.is_compatible_with_datum(datum_type) + } + + fn filter_values(&self, values: &Values) -> Result { + if values.types().len() > 1 { + return Err(Error::Unsupported(String::from( + "Filtering values with more than \ + one point is not yet supported", + ))); + } + let mut out = Values::new(values.types().iter().copied())?; + for value in values.iter().map(|v| v.first().unwrap()) { + // Check if this filter applies to the timestamps or the values. + let keep = if self.ident.as_str() == "timestamp" { + self.expr + .compare_timestamp(value, self.cmp) + .map(|res| if self.negated { !res } else { res }) + .ok_or_else(|| Error::InvalidTimestampType)? + } else if self.ident.as_str() == "datum" { + self.expr + .compare_datum(value, self.cmp) + .map(|res| if self.negated { !res } else { res }) + .ok_or_else(|| { + Error::InvalidDatumType(value.datum_type()) + })?
+ } else { + true + }; + if keep { + out.push(vec![value.clone()])?; } - Literal::String(_) => matches!(datum_type, DatumType::String), - Literal::Boolean(_) => matches!(datum_type, DatumType::Bool), - Literal::Uuid(_) => false, - Literal::Duration(_) => false, - Literal::Timestamp(_) => false, - Literal::IpAddr(_) => false, } + Ok(out) } } @@ -352,20 +746,83 @@ impl fmt::Display for FilterAtom { /// expression, such as: `filter hostname == "foo" || (hostname == "bar" /// && id == "baz")`. #[derive(Clone, Debug, PartialEq)] -pub enum FilterItem { +pub enum Filter { Atom(FilterAtom), Expr(FilterExpr), } -impl fmt::Display for FilterItem { +impl Filter { + fn filter_field(&self, name: &str, value: &FieldValue) -> Option { + match self { + Filter::Atom(atom) => atom.filter_field(name, value), + Filter::Expr(expr) => expr.filter_field(name, value), + } + } + + fn filter_values(&self, values: &Values) -> Result { + match self { + Filter::Atom(atom) => atom.filter_values(values), + Filter::Expr(expr) => expr.filter_values(values), + } + } +} + +impl fmt::Display for Filter { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - FilterItem::Atom(inner) => write!(f, "{}", inner), - FilterItem::Expr(inner) => write!(f, "{}", inner), + Filter::Atom(inner) => write!(f, "{}", inner), + Filter::Expr(inner) => write!(f, "{}", inner), } } } +impl TableTransformation for Filter { + fn apply(&self, tables: &[Table]) -> Result, Error> { + if tables.len() != 1 { + return Err(Error::InvalidTableCount { + op: "filter", + expected: 1, + found: tables.len(), + }); + } + let table = tables.get(0).unwrap(); + let mut out = Vec::with_capacity(table.timeseries.len()); + 'timeseries: for timeseries in table.timeseries.iter() { + // Apply the filter to all the fields of the timeseries. + for (name, value) in timeseries.fields.iter() { + match self.filter_field(name, value) { + Some(true) => continue, + Some(false) => continue 'timeseries, + None => { + return Err(Error::InvalidFieldType { + field_name: name.to_string(), + expected: value.field_type(), + }) + } + } + } + + // And also to the measurements. + let new_values = self.filter_values(&timeseries.values)?; + out.push(Timeseries { + fields: timeseries.fields.clone(), + values: new_values, + }); + } + Ok(vec![Table { timeseries: out }]) + } } + +/// A `join` table operation. +#[derive(Clone, Copy, Debug, PartialEq)] +pub struct Join; + +impl TableTransformation for Join { + fn apply(&self, tables: &[Table]) -> Result, Error> { + Err(Error::Unsupported("join is not yet implemented".to_string())) + } +} + #[cfg(test)] mod tests { use super::duration_consts::*; diff --git a/oximeter/db/src/oxql/query/grammar.rs b/oximeter/db/src/oxql/query/grammar.rs index 0e161ea168..f44fd8d631 100644 --- a/oximeter/db/src/oxql/query/grammar.rs +++ b/oximeter/db/src/oxql/query/grammar.rs @@ -11,13 +11,17 @@ peg::parser!
{ use crate::oxql::query::ast::Comparison; use crate::oxql::query::ast::FilterAtom; use crate::oxql::query::ast::FilterExpr; - use crate::oxql::query::ast::FilterItem; + use crate::oxql::query::ast::Filter; use crate::oxql::query::ast::Get; use crate::oxql::query::ast::GroupBy; use crate::oxql::query::ast::Ident; use crate::oxql::query::ast::Literal; use crate::oxql::query::ast::LogicalOp; use crate::oxql::query::ast::Query; + use crate::oxql::query::ast::Query2; + use crate::oxql::query::ast::GroupedTableOp; + use crate::oxql::query::ast::BasicTableOp; + use crate::oxql::query::ast::TableOp; use crate::oxql::query::ast::Reducer; use crate::oxql::query::ast::TimeseriesTransformation; use crate::oxql::query::ast::duration_consts; @@ -247,8 +251,20 @@ peg::parser! { pub rule string_literal() -> Literal = s:string_literal_impl() { Literal::String(s) } - pub(in super) rule integer_literal_impl() -> i64 - = n:$("-"? ['0'..='9']+ !['e' | 'E' | '.']) {? n.parse().or(Err("integer literal")) } + pub(in super) rule integer_literal_impl() -> i128 + = n:$("-"? ['0'..='9']+ !['e' | 'E' | '.']) + {? + let Ok(x) = n.parse() else { + return Err("integer literal"); + }; + if x < i128::from(i64::MIN) { + Err("negative overflow") + } else if x > i128::from(u64::MAX) { + Err("positive overflow") + } else { + Ok(x) + } + } /// Parse integer literals. pub rule integer_literal() -> Literal @@ -292,11 +308,11 @@ peg::parser! { // Parse a filter item, which is one logical expression used in a filter // operation. #[cache_left_rec] - pub(in super) rule filter_item() -> FilterItem = precedence! { + pub(in super) rule filter_item() -> Filter = precedence! { // Note: We need to separate the logical operations into different // levels of precedence. left:(@) _? "||" _? right:@ { - FilterItem::Expr(FilterExpr { + Filter::Expr(FilterExpr { left: Box::new(left), op: LogicalOp::Or, right: Box::new(right), @@ -304,14 +320,14 @@ peg::parser! { } -- left:(@) _? "&&" _? right:@ { - FilterItem::Expr(FilterExpr { + Filter::Expr(FilterExpr { left: Box::new(left), op: LogicalOp::And, right: Box::new(right), }) } -- - a:filter_atom() { FilterItem::Atom(a) } + a:filter_atom() { Filter::Atom(a) } "(" e:filter_item() ")" { e } } @@ -334,7 +350,7 @@ peg::parser! { } /// Parse a "filter" table operation. - pub rule filter() -> FilterItem + pub rule filter() -> Filter = "filter" _ item:filter_item() { item @@ -409,6 +425,29 @@ peg::parser! { Query { gets, transformations } } + pub(in super) rule basic_table_op() -> TableOp + = g:"get" _ t:timeseries_name() { TableOp::Basic(BasicTableOp::Get(t)) } + / f:filter() { TableOp::Basic(BasicTableOp::Filter(f)) } + / g:group_by() { TableOp::Basic(BasicTableOp::GroupBy(g)) } + / join() { TableOp::Basic(BasicTableOp::Join) } + + pub(in super) rule grouped_table_op() -> TableOp + = "{" _? ops:(query2() ++ grouped_table_op_delim()) _? "}" + { + TableOp::Grouped(GroupedTableOp { ops }) + } + + /// TODO(ben) This is the way. + pub rule query2() -> Query2 + = ops:(basic_table_op() / grouped_table_op()) **<1,> query_delim() + { + Query2 { ops } + } + + rule grouped_table_op_delim() = quiet!{ _? ";" _? } + rule query_delim() = quiet!{ _? "|" _? } + + // TODO-completeness: We need a way to disambiguate which timeseries a // particular filter (or other component which uses an identifier) // applies to. 
For example, suppose you have: @@ -497,9 +536,9 @@ fn recognize_escape_sequences(s: &str) -> Option { mod tests { use super::query_parser; use crate::oxql::query::ast::Comparison; + use crate::oxql::query::ast::Filter; use crate::oxql::query::ast::FilterAtom; use crate::oxql::query::ast::FilterExpr; - use crate::oxql::query::ast::FilterItem; use crate::oxql::query::ast::Ident; use crate::oxql::query::ast::Literal; use crate::oxql::query::ast::LogicalOp; @@ -715,7 +754,7 @@ mod tests { #[test] fn test_filter_item_single_atom() { - let atom = FilterItem::Atom(FilterAtom { + let atom = Filter::Atom(FilterAtom { negated: false, ident: Ident("a".to_string()), cmp: Comparison::Eq, @@ -731,7 +770,7 @@ mod tests { fn test_filter_item_single_negated_atom() { assert_eq!( query_parser::filter_item("!(a > 1.)").unwrap(), - FilterItem::Atom(FilterAtom { + Filter::Atom(FilterAtom { negated: true, ident: Ident("a".to_string()), cmp: Comparison::Gt, @@ -744,13 +783,13 @@ mod tests { #[test] fn test_filter_item_two_atoms() { - let left = FilterItem::Atom(FilterAtom { + let left = Filter::Atom(FilterAtom { negated: false, ident: Ident("a".to_string()), cmp: Comparison::Eq, expr: Literal::Boolean(true), }); - let right = FilterItem::Atom(FilterAtom { + let right = Filter::Atom(FilterAtom { negated: false, ident: Ident("a".to_string()), cmp: Comparison::Eq, @@ -758,7 +797,7 @@ mod tests { }); for op in [LogicalOp::And, LogicalOp::Or] { - let expected = FilterItem::Expr(FilterExpr { + let expected = Filter::Expr(FilterExpr { left: Box::new(left.clone()), op, right: Box::new(right.clone()), @@ -773,7 +812,7 @@ mod tests { #[test] fn test_filter_atom_precedence() { - let atom = FilterItem::Atom(FilterAtom { + let atom = Filter::Atom(FilterAtom { negated: false, ident: Ident("a".to_string()), cmp: Comparison::Eq, @@ -783,14 +822,14 @@ mod tests { let parsed = query_parser::filter_item(&as_str).unwrap(); // && should bind more tightly - let FilterItem::Expr(FilterExpr { left, op, right }) = parsed else { + let Filter::Expr(FilterExpr { left, op, right }) = parsed else { unreachable!(); }; assert_eq!(op, LogicalOp::Or); assert_eq!(atom, *left); // Destructure the RHS, and check it. - let FilterItem::Expr(FilterExpr { left, op, right }) = *right else { + let Filter::Expr(FilterExpr { left, op, right }) = *right else { unreachable!(); }; assert_eq!(op, LogicalOp::And); @@ -800,7 +839,7 @@ mod tests { #[test] fn test_filter_atom_overridden_precedence() { - let atom = FilterItem::Atom(FilterAtom { + let atom = Filter::Atom(FilterAtom { negated: false, ident: Ident("a".to_string()), cmp: Comparison::Eq, @@ -811,14 +850,14 @@ mod tests { // Now, || should bind more tightly, so we should have (a && b) at the // top-level, where b is the test atom. - let FilterItem::Expr(FilterExpr { left, op, right }) = parsed else { + let Filter::Expr(FilterExpr { left, op, right }) = parsed else { unreachable!(); }; assert_eq!(op, LogicalOp::And); assert_eq!(atom, *right); // Destructure the LHS and check it. 
- let FilterItem::Expr(FilterExpr { left, op, right }) = *left else { + let Filter::Expr(FilterExpr { left, op, right }) = *left else { unreachable!(); }; assert_eq!(op, LogicalOp::Or); @@ -983,4 +1022,25 @@ mod tests { assert!(query_parser::ip_literal("::g").is_err()); assert!(query_parser::ip_literal(":::").is_err()); } + + #[test] + fn test_query2() { + println!("{:#?}", query_parser::query2("get foo:bar")); + println!("{:#?}", query_parser::query2("get foo:bar | filter x == 0")); + println!( + "{:#?}", + query_parser::query2("{ get foo:bar | filter x == 0 }") + ); + println!( + "{:#?}", + query_parser::query2("{ get foo:bar | filter x == 0; get x:y }") + ); + println!( + "{:#?}", + query_parser::query2( + "{ get foo:bar | filter x == 0; get x:y } | join" + ) + ); + println!("{:#?}", query_parser::query2("{ get foo:bar | filter x == 0; get x:y } | join | group_by [a, b, c]")); + } } diff --git a/oximeter/db/src/oxql/query/mod.rs b/oximeter/db/src/oxql/query/mod.rs index 98fb7cc01a..8d05d0fc33 100644 --- a/oximeter/db/src/oxql/query/mod.rs +++ b/oximeter/db/src/oxql/query/mod.rs @@ -13,7 +13,9 @@ use crate::oxql::Error; use crate::oxql::ParseError; use crate::TimeseriesName; -use self::ast::{FilterExpr, FilterItem, LogicalOp, TimeseriesTransformation}; +use self::ast::{Filter, FilterExpr, LogicalOp, TimeseriesTransformation}; + +use super::transformation::Table; #[derive(Clone, Debug)] pub struct Query { @@ -61,7 +63,36 @@ impl Query { /// /// Note that this may return `None`, in the case where there are zero /// predicates of any kind. - pub fn coalesced_predicates(&self) -> Option { + // + // NOTE: This is too aggressive. + // + // For example, we can't pull predicates through a group_by necessarily. + // Consider: + // + // `get a:b | group_by [c] | filter d == 0` + // + // If we run the query with the group_by first, then the aggregation will + // have already applied to `d`. So it's "mixed in" with the groups. + // + // If we run the filter first, then it's not included in the group_by. + // + // Specifically, I think we can only pull filter predicates through a + // group_by if the filter applies to (1) the timestamps or (2) one of the + // columns named in the group_by itself. + // + // Let's make this easier: + // + // A filter can be pushed through a group_by if every column it refers to is + // in the group_by as well. If it refers to any fields _not_ in the group_by + // list, that's an error -- pushing it through would change the results, + // and keeping it after is a no-op. So let's just fail it. + // + // What about value filters? + // + // - timestamp: I guess it depends on how we reduce across time, but let's + // just fail it now. + // - datum: same thing. + pub fn coalesced_predicates(&self) -> Option { self.parsed .transformations .iter() @@ -73,13 +104,25 @@ impl Query { } }) .reduce(|accum, elem| { - FilterItem::Expr(FilterExpr { + Filter::Expr(FilterExpr { left: Box::new(accum), op: LogicalOp::And, right: Box::new(elem), }) }) } + + /// Apply each transformation in `self`'s pipeline. + pub fn apply_transformations( + &self, + tables: impl Iterator, + ) -> Result, Error> { + let mut tables: Vec<_> = tables.collect(); + for tr in self.parsed.transformations.iter() { + tables = tr.apply(&tables)?; + } + Ok(tables) + } } // What are we doing here? 
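A hypothetical end-to-end sketch of how these pieces are meant to compose: parse the query, pull out the coalesced predicates for the database-side SELECTs, then push the fetched tables through `apply_transformations`. The calls (`Query::new`, `coalesced_predicates`, `apply_transformations`) appear in this change, but the exact paths, visibility, and error conversions here are assumptions rather than something the diff confirms.

```rust
use crate::oxql;

// Assumes `Query::new` is callable from this module and that its error
// converts into `oxql::Error`; adjust to the real signatures as needed.
fn run_pipeline(
    raw: &str,
    fetched: Vec<oxql::Table>,
) -> Result<Vec<oxql::Table>, oxql::Error> {
    let query = oxql::Query::new(raw)?;
    // Predicates that can be pushed down into the field / measurement
    // queries issued by the client.
    let _preds = query.coalesced_predicates();
    // Everything else is applied in memory, one table operation at a time.
    query.apply_transformations(fetched.into_iter())
}
```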
diff --git a/oximeter/db/src/oxql/transformation.rs b/oximeter/db/src/oxql/transformation.rs index 2ae056b59f..9dc3c6426d 100644 --- a/oximeter/db/src/oxql/transformation.rs +++ b/oximeter/db/src/oxql/transformation.rs @@ -7,7 +7,6 @@ // Copyright 2024 Oxide Computer use super::Error; -use crate::Client; use crate::TimeseriesKey; use highway::HighwayHasher; use oximeter::DatumType; @@ -80,6 +79,14 @@ impl Values { self.values.push(value); Ok(()) } + + pub(crate) fn types(&self) -> &[DatumType] { + &self.types + } + + pub(crate) fn iter(&self) -> impl Iterator> { + self.values.iter() + } } /// A timeseries is the result of an OxQL query. @@ -127,10 +134,22 @@ pub struct Table { pub timeseries: Vec, } -pub(crate) trait TableTransformation { - fn apply( - &self, - client: &Client, - tables: &[Table], - ) -> Result, Error>; +impl Table { + pub fn push(&mut self, timeseries: Timeseries) { + self.timeseries.push(timeseries) + } + + pub fn extend(&mut self, timeseries: impl Iterator) { + self.timeseries.extend(timeseries) + } +} + +impl Default for Table { + fn default() -> Self { + Self { timeseries: Vec::new() } + } +} + +pub trait TableTransformation { + fn apply(&self, tables: &[Table]) -> Result, Error>; }
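For reference, a hypothetical implementation of the reshaped `TableTransformation` trait, using the same single-input guard as `filter`. The `Identity` op is illustrative only and is not part of this change; it also assumes `Table` is (or will be) `Clone`, which this diff does not show.

```rust
use crate::oxql::{Error, Table, TableTransformation};

/// Hypothetical pass-through operation, for illustration only.
struct Identity;

impl TableTransformation for Identity {
    fn apply(&self, tables: &[Table]) -> Result<Vec<Table>, Error> {
        // Like `filter`, operate on exactly one input table.
        if tables.len() != 1 {
            return Err(Error::InvalidTableCount {
                op: "identity",
                expected: 1,
                found: tables.len(),
            });
        }
        // Assumes `Table: Clone`; otherwise the output would be rebuilt
        // timeseries-by-timeseries, the way `filter` does above.
        Ok(vec![tables[0].clone()])
    }
}
```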