diff --git a/crates/counter-agg/src/lib.rs b/crates/counter-agg/src/lib.rs index 889247c6..6ad0a9a7 100644 --- a/crates/counter-agg/src/lib.rs +++ b/crates/counter-agg/src/lib.rs @@ -1,11 +1,10 @@ - use time_series::TSPoint; use stats_agg::{XYPair, stats2d::StatsSummary2D}; -use serde::{Deserialize, Serialize}; use std::fmt; pub mod range; +pub mod stable; #[cfg(test)] mod tests; @@ -23,7 +22,7 @@ pub enum CounterError{ // nonsensical results rather than unsound behavior, garbage in garbage out. // But much better if we can validate at deserialization. We can do that in // the builder if we want. -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct MetricSummary { // TODO invariants? pub first: TSPoint, @@ -43,7 +42,7 @@ pub struct MetricSummary { // out upon garbage in. pub stats: StatsSummary2D, // TODO See TODOs in I64Range about protecting from deserialization. - pub bounds: Option, + pub bounds: range::I64Range, } // Note that this can lose fidelity with the timestamp, but it would only lose it in the microseconds, @@ -66,8 +65,8 @@ fn to_seconds(t: f64)-> f64{ /// it is treated as a reset of the counter and the previous value is added to the "true value" of the /// counter at that timestamp. impl MetricSummary { - pub fn new(pt: &TSPoint, bounds:Option) -> MetricSummary { - let mut n = MetricSummary{ + pub fn new(pt: &TSPoint, bounds: range::I64Range) -> MetricSummary { + let mut n = MetricSummary { first: *pt, second: *pt, penultimate: *pt, @@ -150,7 +149,7 @@ impl MetricSummary { self.num_changes += incoming.num_changes; self.stats = self.stats.combine(stats).unwrap(); - self.bounds_extend(incoming.bounds); + self.bounds_extend(incoming.bounds.clone()); Ok(()) } @@ -204,31 +203,45 @@ impl MetricSummary { } pub fn bounds_valid(&self) -> bool { - match self.bounds{ - None => true, // unbounded contains everything - Some(b) => b.contains(self.last.ts) && b.contains(self.first.ts) - } + self.bounds.contains(self.last.ts) && self.bounds.contains(self.first.ts) } - fn bounds_extend(&mut self, in_bounds:Option){ - match (self.bounds, in_bounds) { - (None, _) => {self.bounds = in_bounds}, - (_, None) => {}, - (Some(mut a), Some(b)) => { - a.extend(&b); - self.bounds = Some(a); - } - }; + fn bounds_extend(&mut self, in_bounds: range::I64Range) { + // TODO(epg): This preserves existing behavior, which seems odd to me: + // match (self.bounds, in_bounds) { + // If we're unbounded, narrow to in_bounds (that's not "extend"). + // (None, _) => self.bounds = in_bounds, + if self.bounds.is_infinite() { + self.bounds = in_bounds; + } + // Else if in_bounds is infinite, ignore it! + // (_, None) => {} + else if in_bounds.is_infinite() { + } + // Else, widen or narrow at both ends as necessary (makes sense but inconsistent!). + // (Some(mut a), Some(b)) => { + else { + self.bounds.extend(&in_bounds); + } } // based on: https://github.com/timescale/promscale_extension/blob/d51a0958442f66cb78d38b584a10100f0d278298/src/lib.rs#L208, // which is based on: // https://github.com/prometheus/prometheus/blob/e5ffa8c9a08a5ee4185271c8c26051ddc1388b7a/promql/functions.go#L59 pub fn prometheus_delta(&self) -> Result, CounterError>{ - if self.bounds.is_none() || !self.bounds_valid() || self.bounds.unwrap().has_infinite() { + if !self.bounds_valid() { return Err(CounterError::BoundsInvalid); } + let (left, right); + match self.bounds.both() { + None => return Err(CounterError::BoundsInvalid), + Some((l, r)) => { + left = l; + right = r; + } + } //must have at least 2 values - if self.single_value() || self.bounds.unwrap().is_singleton() { //technically, the is_singleton check is redundant, it's included for clarity (any singleton bound that is valid can only be one point) + if self.single_value() || self.bounds.is_singleton() { + //technically, the is_singleton check is redundant, it's included for clarity (any singleton bound that is valid can only be one point) return Ok(None); } @@ -236,10 +249,10 @@ impl MetricSummary { // all calculated durations in seconds in Prom implementation, so we'll do that here. // we can unwrap all of the bounds accesses as they are guaranteed to be there from the checks above - let mut duration_to_start = to_seconds((self.first.ts - self.bounds.unwrap().left.unwrap()) as f64); + let mut duration_to_start = to_seconds((self.first.ts - left) as f64); /* bounds stores [L,H), but Prom takes the duration using the inclusive range [L, H-1ms]. Subtract an extra ms, ours is in microseconds. */ - let duration_to_end = to_seconds((self.bounds.unwrap().right.unwrap() - self.last.ts - 1_000) as f64); + let duration_to_end = to_seconds((right - self.last.ts - 1_000) as f64); let sampled_interval = self.time_delta(); let avg_duration_between_samples = sampled_interval / (self.stats.n - 1) as f64; // don't have to worry about divide by zero because we know we have at least 2 values from the above. @@ -282,9 +295,9 @@ impl MetricSummary { return Ok(None); } let delta = delta.unwrap(); - let bounds = self.bounds.unwrap() ; // if we got through delta without error then we have bounds + let bounds = &self.bounds; // if we got through delta without error then we have bounds /* bounds stores [L,H), but Prom takes the duration using the inclusive range [L, H-1ms]. So subtract an extra ms from the duration*/ - let duration = bounds.duration().unwrap() - 1_000; + let duration = bounds.duration() - 1_000; if duration <= 0 { return Ok(None); // if we have a total duration under a ms, it's less than prom could deal with so we return none. } @@ -304,11 +317,11 @@ impl fmt::Display for CounterError { } } -#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +#[derive(Clone, Debug, PartialEq)] pub struct GaugeSummaryBuilder(MetricSummary); impl GaugeSummaryBuilder { - pub fn new(pt: &TSPoint, bounds: Option) -> Self { + pub fn new(pt: &TSPoint, bounds: range::I64Range) -> Self { Self(MetricSummary::new(pt, bounds)) } @@ -322,7 +335,7 @@ impl GaugeSummaryBuilder { self.0.combine(incoming) } - pub fn set_bounds(&mut self, bounds: Option) { + pub fn set_bounds(&mut self, bounds: range::I64Range) { self.0.bounds = bounds; } @@ -346,11 +359,11 @@ impl From for GaugeSummaryBuilder { } } -#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +#[derive(Clone, Debug, PartialEq)] pub struct CounterSummaryBuilder(MetricSummary); impl CounterSummaryBuilder { - pub fn new(pt: &TSPoint, bounds: Option) -> Self { + pub fn new(pt: &TSPoint, bounds: range::I64Range) -> Self { Self(MetricSummary::new(pt, bounds)) } @@ -366,7 +379,7 @@ impl CounterSummaryBuilder { self.0.combine(incoming) } - pub fn set_bounds(&mut self, bounds: Option) { + pub fn set_bounds(&mut self, bounds: range::I64Range) { self.0.bounds = bounds; } diff --git a/crates/counter-agg/src/range.rs b/crates/counter-agg/src/range.rs index e6782f30..5e2652c7 100644 --- a/crates/counter-agg/src/range.rs +++ b/crates/counter-agg/src/range.rs @@ -6,39 +6,77 @@ use serde::{Deserialize, Serialize}; // we are a discrete type so translating is simple [), this enforces equality // between ranges like [0, 10) and [0, 9] // None values denote infinite bounds on that side -#[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] #[repr(C)] pub struct I64Range { - pub left: Option, - pub right: Option + left: Option, + right: Option } - impl I64Range { - pub fn has_infinite(&self)-> bool{ - self.left.is_none() || self.right.is_none() + /// Panics if `left` > `right`. + pub fn new(left: Option, right: Option) -> Self { + let range = Self { left, right }; + assert!(range.is_valid()); + range } - // TODO See TODO below about range validity. Right now we don't care - // much. If we start to care, move the caring to `new` and `extend` - // methods. That will allow this crate to protect the integrity of - // MetricSummary and I64Range in the face of the extension needing to be - // able to construct them from raw (and therefore potentially - // corrupt) inputs. - fn is_valid(&self) -> bool { - match (self.left, self.right) { - (Some(a), Some(b)) => a <= b, - _ => true, + pub fn infinite() -> Self { + Self { + left: None, + right: None, } } - pub fn is_singleton(&self) -> bool{ + /// Return `Some([left]))` when it is finite, else `None`. + #[inline] + pub fn left(&self) -> Option { + self.left + } + + /// Return `Some([right]))` when it is finite, else `None`. + #[inline] + pub fn right(&self) -> Option { + self.right + } + + /// Return `Some(([left], [right]))` when both are finite, else `None`. + pub fn both(&self) -> Option<(i64, i64)> { match (self.left, self.right) { - (Some(a), Some(b)) => a == b, - _ => false, + (Some(left), Some(right)) => Some((left, right)), + _ => None, } } + pub fn is_infinite_either(&self) -> bool { + self.is_infinite_left() || self.is_infinite_right() + } + + pub fn is_infinite(&self) -> bool { + self.is_infinite_left() && self.is_infinite_right() + } + + pub fn is_infinite_left(&self) -> bool { + self.left.is_none() + } + + pub fn is_infinite_right(&self) -> bool { + self.right.is_none() + } + + // TODO See TODO below about range validity. + fn is_valid(&self) -> bool { + self.both() + .map(|(left, right)| left <= right) + .unwrap_or(true) + } + + pub fn is_singleton(&self) -> bool { + self.both() + .map(|(left, right)| left == right) + .unwrap_or(false) + } + pub fn extend(&mut self, other: &Self) { // TODO: What should extend do with invalid ranges on either side? right now it treats them as if they are real... self.left = match (self.left, other.left) { @@ -61,15 +99,11 @@ impl I64Range { (None, None) => true, } } - - // pub fn contains(&self, other: I64Range) -> bool{ - // unimplemented!() - // } - pub fn duration(&self) -> Option { - if self.has_infinite() || !self.is_valid() { - return None - } - Some(self.right.unwrap() - self.left.unwrap()) + + /// Panics if either `left` or `right` is infinite. + pub fn duration(&self) -> i64 { + let (left, right) = self.both().expect("infinite duration"); + right - left } } @@ -131,7 +165,6 @@ mod tests { let normal = I64Range{left:Some(2), right:Some(9)}; weird.extend(&normal); assert_eq!(weird, I64Range{left:Some(-6), right:Some(9)}); - } #[test] @@ -163,21 +196,33 @@ mod tests { #[test] fn test_duration(){ let a = I64Range{left:Some(3), right:Some(7)}; - assert_eq!(a.duration().unwrap(), 4); + assert_eq!(a.duration(), 4); let a = I64Range{left:Some(-3), right:Some(7)}; - assert_eq!(a.duration().unwrap(), 10); - let a = I64Range{left:None, right:Some(7)}; - assert_eq!(a.duration(), None); - let a = I64Range{left:Some(3), right:None}; - assert_eq!(a.duration(), None); - //invalid ranges return None durations as well - let a = I64Range{left:Some(3), right:Some(0)}; - assert_eq!(a.duration(), None); + assert_eq!(a.duration(), 10); + } + + #[test] + #[should_panic(expected = "infinite duration")] + fn duration_infinite_left() { + I64Range{left:None, right:Some(7)} + .duration(); + } + + #[test] + #[should_panic(expected = "infinite duration")] + fn duration_infinite_right() { + I64Range{left:Some(-1), right:None} + .duration(); } #[test] - fn test_checks(){ + #[should_panic(expected = "infinite duration")] + fn duration_infinite_both() { + I64Range::infinite().duration(); + } + #[test] + fn test_checks() { let a = I64Range{left:Some(2), right:Some(5)}; assert!(a.is_valid()); assert!(!a.is_singleton()); @@ -190,9 +235,30 @@ mod tests { let a = I64Range{left:Some(2), right:Some(2)}; assert!(a.is_valid()); assert!(a.is_singleton()); - assert_eq!(a.duration().unwrap(), 0); + assert_eq!(a.duration(), 0); let a = I64Range{left:Some(0), right:Some(-10)}; assert!(!a.is_valid()); assert!(!a.is_singleton()); } + + #[test] + fn infinite() { + let range = I64Range { left: None, right: None }; + assert!(range.contains(i64::MIN)); + assert!(range.contains(i64::MIN + 1)); + assert!(range.contains(i64::MAX)); + assert!(range.contains(i64::MAX - 1)); + } + + #[test] + fn exclude_i64_max() { + let range = I64Range { left: Some(i64::MIN), right: Some(i64::MAX) }; + assert!(range.contains(i64::MIN)); + // TODO If we don't need to exclude i64::MAX, we can simplify even + // further and make right non-Option (left already doesn't need to be + // Option as None and Some(i64::MIN) are handled the same way. + // How important is it that we draw the line at + // 9,223,372,036,854,775,807 rather than 9,223,372,036,854,775,806? + assert!(!range.contains(i64::MAX)); + } } diff --git a/crates/counter-agg/src/stable.rs b/crates/counter-agg/src/stable.rs new file mode 100644 index 00000000..c3b336cc --- /dev/null +++ b/crates/counter-agg/src/stable.rs @@ -0,0 +1,62 @@ +//! These data structures are stable, meaning they may not change even in +//! their layout in memory, as the raw bytes-in-memory are serialized and +//! exchanged by PostgreSQL. +//! +//! Note that [MetricSummary] is already in violation, as it does not lock in +//! a memory representation and the Rust project makes no guarantees to +//! preserve this across releases of the compiler. We should bump its +//! serialization version and repr(C) the new one. + +use serde::{Deserialize, Serialize}; + +use stats_agg::stats2d::StatsSummary2D; +use time_series::TSPoint; + +use crate::range::I64Range; + +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +// TODO Our one serialization test (counter_byte_io) passes, but is that just luck? +//#[repr(C)] +pub struct MetricSummary { + pub first: TSPoint, + pub second: TSPoint, + pub penultimate: TSPoint, + pub last: TSPoint, + pub reset_sum: f64, + pub num_resets: u64, + pub num_changes: u64, + pub stats: StatsSummary2D, + pub bounds: Option, +} + +impl From for MetricSummary { + fn from(range: super::MetricSummary) -> Self { + Self { + first: range.first, + second: range.second, + penultimate: range.penultimate, + last: range.last, + reset_sum: range.reset_sum, + num_resets: range.num_resets, + num_changes: range.num_changes, + stats: range.stats, + bounds: if range.bounds.is_infinite() { None } else { Some(range.bounds) }, + } + } +} + +impl From for super::MetricSummary { + fn from(stable: MetricSummary) -> Self { + Self { + first: stable.first, + second: stable.second, + penultimate: stable.penultimate, + last: stable.last, + reset_sum: stable.reset_sum, + num_resets: stable.num_resets, + num_changes: stable.num_changes, + stats: stable.stats, + bounds: stable.bounds.unwrap_or_else(I64Range::infinite), + } + } +} diff --git a/crates/counter-agg/src/tests.rs b/crates/counter-agg/src/tests.rs index 5393ef6b..8e207a85 100644 --- a/crates/counter-agg/src/tests.rs +++ b/crates/counter-agg/src/tests.rs @@ -26,7 +26,7 @@ #[test] fn create() { let testpt = TSPoint{ts: 0, val:0.0}; - let test = CounterSummaryBuilder::new(&testpt, None).build(); + let test = CounterSummaryBuilder::new(&testpt, I64Range::infinite()).build(); assert_eq!(test.first, testpt); assert_eq!(test.second, testpt); assert_eq!(test.penultimate, testpt); @@ -35,7 +35,7 @@ } #[test] fn adding_point() { - let mut test = CounterSummaryBuilder::new( &TSPoint{ts: 0, val:0.0}, None); + let mut test = CounterSummaryBuilder::new( &TSPoint{ts: 0, val:0.0}, I64Range::infinite()); let testpt = TSPoint{ts:5, val:10.0}; test.add_point(&testpt).unwrap(); @@ -54,7 +54,7 @@ #[test] fn adding_points_to_counter() { let startpt = TSPoint{ts: 0, val:0.0}; - let mut summary = CounterSummaryBuilder::new( &startpt, None); + let mut summary = CounterSummaryBuilder::new( &startpt, I64Range::infinite()); summary.add_point(&TSPoint{ts: 5, val:10.0}).unwrap(); summary.add_point(&TSPoint{ts: 10, val:20.0}).unwrap(); @@ -80,7 +80,7 @@ #[test] fn adding_out_of_order_counter(){ let startpt = TSPoint{ts: 0, val:0.0}; - let mut summary = CounterSummaryBuilder::new( &startpt, None); + let mut summary = CounterSummaryBuilder::new( &startpt, I64Range::infinite()); summary.add_point(&TSPoint{ts: 5, val:10.0}).unwrap(); assert_eq!(CounterError::OrderError, summary.add_point(&TSPoint{ts: 2, val:9.0}).unwrap_err()); @@ -90,7 +90,7 @@ #[test] fn test_counter_delta(){ let startpt = &TSPoint{ts: 0, val:10.0}; - let mut summary = CounterSummaryBuilder::new(&startpt, None); + let mut summary = CounterSummaryBuilder::new(&startpt, I64Range::infinite()); // with one point assert_relative_eq!(summary.clone().build().delta(), 0.0); @@ -106,7 +106,7 @@ #[test] fn test_combine(){ - let mut summary = CounterSummaryBuilder::new( &TSPoint{ts: 0, val:0.0}, None); + let mut summary = CounterSummaryBuilder::new( &TSPoint{ts: 0, val:0.0}, I64Range::infinite()); summary.add_point(&TSPoint{ts: 5, val:10.0}).unwrap(); summary.add_point(&TSPoint{ts: 10, val:20.0}).unwrap(); summary.add_point(&TSPoint{ts: 15, val:30.0}).unwrap(); @@ -115,11 +115,11 @@ summary.add_point(&TSPoint{ts: 30, val:40.0}).unwrap(); - let mut part1 = CounterSummaryBuilder::new(&TSPoint{ts: 0, val:0.0}, None); + let mut part1 = CounterSummaryBuilder::new(&TSPoint{ts: 0, val:0.0}, I64Range::infinite()); part1.add_point(&TSPoint{ts: 5, val:10.0}).unwrap(); part1.add_point(&TSPoint{ts: 10, val:20.0}).unwrap(); - let mut part2 = CounterSummaryBuilder::new(&TSPoint{ts: 15, val:30.0}, None); + let mut part2 = CounterSummaryBuilder::new(&TSPoint{ts: 15, val:30.0}, I64Range::infinite()); part2.add_point(&TSPoint{ts: 20, val:50.0}).unwrap(); part2.add_point(&TSPoint{ts: 25, val:10.0}).unwrap(); part2.add_point(&TSPoint{ts: 30, val:40.0}).unwrap(); @@ -135,13 +135,13 @@ #[test] fn test_combine_with_small_summary(){ - let mut summary = CounterSummaryBuilder::new( &TSPoint{ts: 0, val:50.0}, None); + let mut summary = CounterSummaryBuilder::new( &TSPoint{ts: 0, val:50.0}, I64Range::infinite()); summary.add_point(&TSPoint{ts: 25, val:10.0}).unwrap(); // also tests that a reset at the boundary works correctly - let part1 = CounterSummaryBuilder::new( &TSPoint{ts: 0, val:50.0}, None); - let part2 = CounterSummaryBuilder::new( &TSPoint{ts: 25, val:10.0}, None); + let part1 = CounterSummaryBuilder::new( &TSPoint{ts: 0, val:50.0}, I64Range::infinite()); + let part2 = CounterSummaryBuilder::new( &TSPoint{ts: 25, val:10.0}, I64Range::infinite()); let mut combined = part1.clone(); combined.combine(&part2.clone().build()).unwrap(); @@ -154,7 +154,7 @@ #[test] fn test_multiple_resets() { let startpt = TSPoint{ts: 0, val:0.0}; - let mut summary = CounterSummaryBuilder::new( &startpt, None); + let mut summary = CounterSummaryBuilder::new( &startpt, I64Range::infinite()); summary.add_point(&TSPoint{ts: 5, val:10.0}).unwrap(); summary.add_point(&TSPoint{ts: 10, val:20.0}).unwrap(); @@ -176,11 +176,11 @@ // non obvious one here, sy should be the sum of all values including the resets at the time they were added. assert_relative_eq!(summary.stats.sum().unwrap().y, 0.0 + 10.0 + 20.0 + 30.0 + 60.0 + 80.0 + 100.0); - let mut part1 = CounterSummaryBuilder::new(&TSPoint{ts: 0, val:0.0}, None); + let mut part1 = CounterSummaryBuilder::new(&TSPoint{ts: 0, val:0.0}, I64Range::infinite()); part1.add_point(&TSPoint{ts: 5, val:10.0}).unwrap(); part1.add_point(&TSPoint{ts: 10, val:20.0}).unwrap(); - let mut part2 = CounterSummaryBuilder::new(&TSPoint{ts: 15, val:10.0}, None); + let mut part2 = CounterSummaryBuilder::new(&TSPoint{ts: 15, val:10.0}, I64Range::infinite()); part2.add_point(&TSPoint{ts: 20, val:40.0}).unwrap(); part2.add_point(&TSPoint{ts: 25, val:20.0}).unwrap(); part2.add_point(&TSPoint{ts: 30, val:40.0}).unwrap(); @@ -197,7 +197,7 @@ #[test] fn test_extraction_single_point() { let startpt = TSPoint{ts: 20, val:10.0}; - let summary = CounterSummaryBuilder::new( &startpt, None).build(); + let summary = CounterSummaryBuilder::new( &startpt, I64Range::infinite()).build(); assert_relative_eq!(summary.delta(), 0.0); assert_eq!(summary.rate(), None); assert_relative_eq!(summary.idelta_left(), 0.0); @@ -210,7 +210,7 @@ #[test] fn test_extraction_simple(){ - let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 0, val:0.0}, None); + let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 0, val:0.0}, I64Range::infinite()); summary.add_point(&TSPoint{ts: 5, val:5.0}).unwrap(); summary.add_point(&TSPoint{ts: 10, val:20.0}).unwrap(); summary.add_point(&TSPoint{ts: 15, val: 30.0}).unwrap(); @@ -228,7 +228,7 @@ #[test] fn test_extraction_with_resets(){ - let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 0, val: 10.0}, None); + let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 0, val: 10.0}, I64Range::infinite()); summary.add_point(&TSPoint{ts: 5, val:5.0}).unwrap(); summary.add_point(&TSPoint{ts: 10, val:30.0}).unwrap(); summary.add_point(&TSPoint{ts: 15, val: 15.0}).unwrap(); @@ -246,14 +246,14 @@ #[test] fn test_bounds(){ - let summary = CounterSummaryBuilder::new(&TSPoint{ts: 0, val: 10.0}, None); + let summary = CounterSummaryBuilder::new(&TSPoint{ts: 0, val: 10.0}, I64Range::infinite()); assert!(summary.bounds_valid()); // no bound is fine. - let summary = CounterSummaryBuilder::new(&TSPoint{ts: 0, val: 10.0}, Some(I64Range{left:Some(5), right:Some(10)})); + let summary = CounterSummaryBuilder::new(&TSPoint{ts: 0, val: 10.0}, I64Range::new(Some(5), Some(10))); assert!(!summary.bounds_valid()); // wrong bound not // left bound inclusive - let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 0, val: 10.0}, Some(I64Range{left:Some(0), right:Some(10)})); + let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 0, val: 10.0}, I64Range::new(Some(0), Some(10))); assert!(summary.bounds_valid()); summary.add_point(&TSPoint{ts: 5, val:5.0}).unwrap(); assert!(summary.bounds_valid()); @@ -264,57 +264,57 @@ assert!(!summary.bounds_valid()); // slightly weird case here... two invalid bounds can produce a validly bounded object once the bounds are combined, this is a bit weird, but seems like it's the correct behavior - let summary2 = CounterSummaryBuilder::new(&TSPoint{ts: 15, val: 10.0}, Some(I64Range{left:Some(20), right:Some(30)})); + let summary2 = CounterSummaryBuilder::new(&TSPoint{ts: 15, val: 10.0}, I64Range::new(Some(20), Some(30))); summary.combine(&summary2.build()).unwrap(); assert!(summary.bounds_valid()); - assert_eq!(summary.clone().build().bounds.unwrap(), I64Range{left:Some(0), right:Some(30)}); + assert_eq!(summary.clone().build().bounds, I64Range::new(Some(0), Some(30))); // two of the same valid bounds remain the same and valid - let summary2 = CounterSummaryBuilder::new(&TSPoint{ts: 20, val: 10.0}, Some(I64Range{left:Some(0), right:Some(30)})); + let summary2 = CounterSummaryBuilder::new(&TSPoint{ts: 20, val: 10.0}, I64Range::new(Some(0), Some(30))); summary.combine(&summary2.build()).unwrap(); assert!(summary.bounds_valid()); - assert_eq!(summary.clone().build().bounds.unwrap(), I64Range{left:Some(0), right:Some(30)}); + assert_eq!(summary.clone().build().bounds, I64Range::new(Some(0), Some(30))); // combining with unbounded ones is fine, but the bounds survive - let summary2 = CounterSummaryBuilder::new(&TSPoint{ts: 25, val: 10.0}, None); + let summary2 = CounterSummaryBuilder::new(&TSPoint{ts: 25, val: 10.0}, I64Range::infinite()); summary.combine(&summary2.build()).unwrap(); assert!(summary.bounds_valid()); - assert_eq!(summary.clone().build().bounds.unwrap(), I64Range{left:Some(0), right:Some(30)}); + assert_eq!(summary.clone().build().bounds, I64Range::new(Some(0), Some(30))); // and combining bounds that do not span are still invalid - let summary2 = CounterSummaryBuilder::new(&TSPoint{ts: 35, val: 10.0}, Some(I64Range{left:Some(0), right:Some(32)})); + let summary2 = CounterSummaryBuilder::new(&TSPoint{ts: 35, val: 10.0}, I64Range::new(Some(0), Some(32))); summary.combine(&summary2.build()).unwrap(); assert!(!summary.bounds_valid()); - assert_eq!(summary.build().bounds.unwrap(), I64Range{left:Some(0), right:Some(32)}); + assert_eq!(summary.build().bounds, I64Range::new(Some(0), Some(32))); // combining unbounded with bounded ones is fine, but the bounds survive - let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 0, val: 10.0}, None); - let summary2 = CounterSummaryBuilder::new(&TSPoint{ts: 25, val: 10.0}, Some(I64Range{left:Some(0), right:Some(30)})); + let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 0, val: 10.0}, I64Range::infinite()); + let summary2 = CounterSummaryBuilder::new(&TSPoint{ts: 25, val: 10.0}, I64Range::new(Some(0), Some(30))); summary.combine(&summary2.build()).unwrap(); assert!(summary.bounds_valid()); - assert_eq!(summary.build().bounds.unwrap(), I64Range{left:Some(0), right:Some(30)}); + assert_eq!(summary.build().bounds, I64Range::new(Some(0), Some(30))); } #[test] fn test_prometheus_extrapolation_simple(){ //error on lack of bounds provided - let summary = CounterSummaryBuilder::new(&TSPoint{ts: 5000, val:15.0}, None); + let summary = CounterSummaryBuilder::new(&TSPoint{ts: 5000, val:15.0}, I64Range::infinite()); let summary = summary.build(); assert_eq!(summary.prometheus_delta().unwrap_err(), CounterError::BoundsInvalid); assert_eq!(summary.prometheus_rate().unwrap_err(), CounterError::BoundsInvalid); //error on infinite bounds - let summary = CounterSummaryBuilder::new(&TSPoint{ts: 5000, val:15.0}, Some(I64Range{left:None, right:Some(21000)})).build(); + let summary = CounterSummaryBuilder::new(&TSPoint{ts: 5000, val:15.0}, I64Range::new(None, Some(21000))).build(); assert_eq!(summary.prometheus_delta().unwrap_err(), CounterError::BoundsInvalid); assert_eq!(summary.prometheus_rate().unwrap_err(), CounterError::BoundsInvalid); //ranges less than 1ms are treated as zero by Prom - let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 300, val:15.0}, Some(I64Range{left:Some(0), right:Some(900)})); + let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 300, val:15.0}, I64Range::new(Some(0), Some(900))); summary.add_point(&TSPoint{ts: 600, val:20.0}).unwrap(); assert_eq!(summary.build().prometheus_rate().unwrap(), None); //ranges should go out an extra 1000 so that we account for the extra duration that prom subtracts (1 ms) - let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 5000, val:15.0}, Some(I64Range{left:Some(0), right:Some(21000)})); + let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 5000, val:15.0}, I64Range::new(Some(0), Some(21000))); // singletons should return none assert_eq!(summary.clone().build().prometheus_delta().unwrap(), None); assert_eq!(summary.clone().build().prometheus_rate().unwrap(), None); @@ -323,7 +323,7 @@ summary.add_point(&TSPoint{ts: 10000, val:20.0}).unwrap(); //ranges should go out an extra 1000 so that we account for the extra duration that prom subtracts (1 ms) - let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 5000, val:15.0}, Some(I64Range{left:Some(0), right:Some(21000)})); + let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 5000, val:15.0}, I64Range::new(Some(0), Some(21000))); // singletons should return none assert_eq!(summary.clone().build().prometheus_delta().unwrap(), None); assert_eq!(summary.clone().build().prometheus_rate().unwrap(), None); @@ -349,7 +349,7 @@ #[test] fn test_prometheus_extrapolation_bound_size(){ - let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 20000, val:40.0}, Some(I64Range{left:Some(10000), right:Some(51000)})); + let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 20000, val:40.0}, I64Range::new(Some(10000), Some(51000))); summary.add_point(&TSPoint{ts: 30000, val:20.0}).unwrap(); summary.add_point(&TSPoint{ts: 40000, val: 40.0}).unwrap(); let summary = summary.build(); @@ -362,7 +362,7 @@ // now lets push the bounds to be a bit bigger let mut summary = CounterSummaryBuilder::from(summary); - summary.set_bounds(Some(I64Range{left:Some(8000), right:Some(53000)})); + summary.set_bounds(I64Range::new(Some(8000), Some(53000))); // now because we're further than 1.1 out on each side, we end projecting out to half the avg distance on each side assert_relative_eq!(summary.clone().build().prometheus_delta().unwrap().unwrap(), 60.0); // but the rate is still divided by the full bound duration @@ -371,7 +371,7 @@ //this should all be the same as the last one in the first part. // The change occurs because we hit the zero boundary condition // so things change on the second bit because of where resets occur and our starting value - let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 20000, val:20.0}, Some(I64Range{left:Some(10000), right:Some(51000)})); + let mut summary = CounterSummaryBuilder::new(&TSPoint{ts: 20000, val:20.0}, I64Range::new(Some(10000), Some(51000))); summary.add_point(&TSPoint{ts: 30000, val:40.0}).unwrap(); summary.add_point(&TSPoint{ts: 40000, val: 20.0}).unwrap(); let summary = summary.build(); @@ -384,7 +384,7 @@ // now lets push the bounds to be a bit bigger let mut summary = CounterSummaryBuilder::from(summary); - summary.set_bounds(Some(I64Range{left:Some(8000), right:Some(53000)})); + summary.set_bounds(I64Range::new(Some(8000), Some(53000))); let summary = summary.build(); // now because we're further than 1.1 out on the right side, // we end projecting out to half the avg distance on that side, @@ -394,3 +394,59 @@ // but the rate is still divided by the full bound duration assert_relative_eq!(summary.prometheus_rate().unwrap().unwrap(), to_micro(70.0 / 44000.0)); } + +// TODO But don't move these +mod bounds_extend { + use super::*; + + const POINT: TSPoint = TSPoint { ts: 0, val: 0.0 }; + + // TODO A method called "extend" shouldn't narrow. + #[test] + fn on_unbounded_narrows_to_input() { + let input = I64Range::new(Some(11), Some(12)); + let expected = input.clone(); + let mut summary = MetricSummary::new(&POINT, I64Range::infinite()); + summary.bounds_extend(input); + assert_eq!(&expected, &summary.bounds); + } + + // TODO Should a method called "extend" ignore any inputs? + // - Why is `summary.bounds_extend(unbounded)` not the same as + // `summary.bounds_extend(unbounded_left); summary.bounds_extend(unbounded_right)`? + // - If for some reason it shouldn't, shouldn't it error, not silently ignore? + #[test] + fn on_bounded_with_input_unbounded_ignores_input() { + let input = I64Range::infinite(); + let expected = I64Range::new(Some(11), Some(12)); + let mut summary = MetricSummary::new(&POINT, expected.clone()); + summary.bounds_extend(input); + assert_eq!(&expected, &summary.bounds); + + // But extending to unbounded one side at a time DOES work + // (bounds:None is handled the same as bounds:I64Range{left:None,right:None}). + let expected = I64Range::new(None, None); + let mut summary = MetricSummary::new(&POINT, I64Range::new(Some(11), Some(12))); + summary.bounds_extend(I64Range::new(None, Some(0))); + summary.bounds_extend(I64Range::new(Some(0), None)); + assert_eq!(&expected, &summary.bounds); + } + + #[test] + fn on_bounded_with_input_unbounded_on_right_extends_right() { + let input = I64Range::new(Some(12), None); + let expected = I64Range::new(Some(11), None); + let mut summary = MetricSummary::new(&POINT, I64Range::new(Some(11), Some(15))); + summary.bounds_extend(input); + assert_eq!(&expected, &summary.bounds); + } + + #[test] + fn on_bounded_with_input_unbounded_on_left_extends_left() { + let input = I64Range::new(None, Some(14)); + let expected = I64Range::new(None, Some(15)); + let mut summary = MetricSummary::new(&POINT, I64Range::new(Some(11), Some(15))); + summary.bounds_extend(input); + assert_eq!(&expected, &summary.bounds); + } +} diff --git a/extension/src/accessors.rs b/extension/src/accessors.rs index 1f96aeaa..ab41c148 100644 --- a/extension/src/accessors.rs +++ b/extension/src/accessors.rs @@ -1001,11 +1001,11 @@ pub mod toolkit_experimental { match range { None => accessor.range_null = 1, Some(range) => { - if let Some(left) = range.left { + if let Some(left) = range.left() { accessor.lower_present = 1; accessor.lower = left; } - if let Some(right) = range.right { + if let Some(right) = range.right() { accessor.upper_present = 1; accessor.upper = right; } @@ -1015,15 +1015,15 @@ pub mod toolkit_experimental { } impl<'i> AccessorWithBounds<'i> { - pub fn bounds(&self) -> Option { + pub fn bounds(&self) -> I64Range { if self.range_null != 0{ - return None + return I64Range::infinite(); } - I64Range { - left: (self.lower_present != 0).then(|| self.lower), - right: (self.upper_present != 0).then(|| self.upper), - }.into() + I64Range::new( + (self.lower_present != 0).then(|| self.lower), + (self.upper_present != 0).then(|| self.upper), + ) } } -} \ No newline at end of file +} diff --git a/extension/src/counter_agg.rs b/extension/src/counter_agg.rs index 16e74c06..efe3bdaa 100644 --- a/extension/src/counter_agg.rs +++ b/extension/src/counter_agg.rs @@ -21,6 +21,7 @@ use counter_agg::{ MetricSummary, CounterSummaryBuilder, range::I64Range, + stable, }; use stats_agg::stats2d::StatsSummary2D; @@ -100,7 +101,7 @@ pub struct CounterSummaryTransState { // We have a summary buffer here in order to deal with the fact that when the cmobine function gets called it // must first build up a buffer of InternalMetricSummaries, then sort them, then call the combine function in // the correct order. - summary_buffer: Vec, + summary_buffer: Vec, } impl CounterSummaryTransState { @@ -126,7 +127,10 @@ impl CounterSummaryTransState { } self.point_buffer.sort_unstable_by_key(|p| p.ts); let mut iter = self.point_buffer.iter(); - let mut summary = CounterSummaryBuilder::new( iter.next().unwrap(), self.bounds); + let mut summary = CounterSummaryBuilder::new( + iter.next().unwrap(), + self.bounds.clone().unwrap_or_else(I64Range::infinite), + ); for p in iter { summary.add_point(p).unwrap_or_else(|e| pgx::error!("{}", e)); } @@ -136,7 +140,7 @@ impl CounterSummaryTransState { if !summary.bounds_valid() { panic!("counter bounds invalid") } - self.summary_buffer.push(summary.build()); + self.summary_buffer.push(summary.build().into()); } fn push_summary(&mut self, other: &CounterSummaryTransState) { @@ -150,16 +154,19 @@ impl CounterSummaryTransState { self.combine_points(); if self.summary_buffer.len() <= 1 { - return + return; } // TODO move much of this method to crate? self.summary_buffer.sort_unstable_by_key(|s| s.first.ts); - let mut sum_iter = self.summary_buffer.iter(); - let mut new_summary = CounterSummaryBuilder::from(sum_iter.next().unwrap().clone()); + let mut sum_iter = self.summary_buffer.drain(..); + let first = sum_iter.next().expect("already handled empty case"); + let mut new_summary = CounterSummaryBuilder::from(MetricSummary::from(first)); for sum in sum_iter { - new_summary.combine(sum).unwrap_or_else(|e| pgx::error!("{}", e)); + new_summary + .combine(&sum.into()) + .unwrap_or_else(|e| pgx::error!("{}", e)); } - self.summary_buffer = vec![new_summary.build()]; + self.summary_buffer.push(new_summary.build().into()); } } @@ -255,11 +262,11 @@ pub fn counter_agg_summary_trans_inner( (state, None) => state, (None, Some(value)) => { let mut state = CounterSummaryTransState::new(); - state.summary_buffer.push(value.to_internal_counter_summary()); + state.summary_buffer.push(stable::MetricSummary::from(value.to_internal_counter_summary())); Some(state.into()) } (Some(mut state), Some(value)) => { - state.summary_buffer.push(value.to_internal_counter_summary()); + state.summary_buffer.push(stable::MetricSummary::from(value.to_internal_counter_summary())); Some(state) } } @@ -323,6 +330,7 @@ fn counter_agg_final_inner( match state.summary_buffer.pop() { None => None, Some(st) => { + let st = MetricSummary::from(st); // there are some edge cases that this should prevent, but I'm not sure it's necessary, we do check the bounds in the functions that use them. if !st.bounds_valid() { panic!("counter bounds invalid") @@ -530,7 +538,7 @@ fn counter_agg_with_bounds( unsafe{ let ptr = bounds.0 as *mut pg_sys::varlena; let mut builder = CounterSummaryBuilder::from(summary.to_internal_counter_summary()); - builder.set_bounds(get_range(ptr)); + builder.set_bounds(get_range(ptr).unwrap_or_else(I64Range::infinite)); CounterSummary::from_internal_counter_summary(builder.build()) } } diff --git a/extension/src/gauge_agg.rs b/extension/src/gauge_agg.rs index 99d6b9aa..4194cc54 100644 --- a/extension/src/gauge_agg.rs +++ b/extension/src/gauge_agg.rs @@ -1,7 +1,7 @@ use pgx::*; use serde::{Deserialize, Serialize}; -use counter_agg::{range::I64Range, GaugeSummaryBuilder, MetricSummary}; +use counter_agg::{range::I64Range, stable, GaugeSummaryBuilder, MetricSummary}; use flat_serialize::FlatSerializable; use flat_serialize_macro::FlatSerializable; use stats_agg::stats2d::StatsSummary2D; @@ -60,7 +60,7 @@ struct GaugeSummaryTransState { // We have a summary buffer here in order to deal with the fact that when the cmobine function gets called it // must first build up a buffer of InternalMetricSummaries, then sort them, then call the combine function in // the correct order. - summary_buffer: Vec, + summary_buffer: Vec, } impl GaugeSummaryTransState { @@ -82,7 +82,10 @@ impl GaugeSummaryTransState { } self.point_buffer.sort_unstable_by_key(|p| p.ts); let mut iter = self.point_buffer.iter(); - let mut summary = GaugeSummaryBuilder::new(iter.next().unwrap(), self.bounds); + let mut summary = GaugeSummaryBuilder::new( + iter.next().unwrap(), + self.bounds.clone().unwrap_or_else(I64Range::infinite), + ); for p in iter { summary .add_point(p) @@ -94,7 +97,7 @@ impl GaugeSummaryTransState { if !summary.bounds_valid() { panic!("counter bounds invalid") } - self.summary_buffer.push(summary.build()); + self.summary_buffer.push(summary.build().into()); } fn push_summary(&mut self, other: &Self) { @@ -110,16 +113,17 @@ impl GaugeSummaryTransState { if self.summary_buffer.len() <= 1 { return; } + // TODO move much of this method to crate? self.summary_buffer.sort_unstable_by_key(|s| s.first.ts); let mut sum_iter = self.summary_buffer.drain(..); let first = sum_iter.next().expect("already handled empty case"); - let mut new_summary = GaugeSummaryBuilder::from(first); + let mut new_summary = GaugeSummaryBuilder::from(MetricSummary::from(first)); for sum in sum_iter { new_summary - .combine(&sum) + .combine(&sum.into()) .unwrap_or_else(|e| pgx::error!("{}", e)); } - self.summary_buffer.push(new_summary.build()); + self.summary_buffer.push(new_summary.build().into()); } } @@ -209,11 +213,15 @@ fn gauge_agg_summary_trans_inner( (state, None) => state, (None, Some(value)) => { let mut state = GaugeSummaryTransState::new(); - state.summary_buffer.push(value.into()); + state + .summary_buffer + .push(stable::MetricSummary::from(MetricSummary::from(value))); Some(state.into()) } (Some(mut state), Some(value)) => { - state.summary_buffer.push(value.into()); + state + .summary_buffer + .push(stable::MetricSummary::from(MetricSummary::from(value))); Some(state) } }) @@ -282,6 +290,7 @@ fn gauge_agg_final_inner( match state.summary_buffer.pop() { None => None, Some(st) => { + let st = MetricSummary::from(st); // there are some edge cases that this should prevent, but I'm not sure it's necessary, we do check the bounds in the functions that use them. if !st.bounds_valid() { panic!("counter bounds invalid") diff --git a/extension/src/range.rs b/extension/src/range.rs index dda45aa7..b2fd10dd 100644 --- a/extension/src/range.rs +++ b/extension/src/range.rs @@ -11,15 +11,13 @@ pub type tstzrange = *mut pg_sys::varlena; // Derived from Postgres' range_deserialize: https://github.com/postgres/postgres/blob/27e1f14563cf982f1f4d71e21ef247866662a052/src/backend/utils/adt/rangetypes.c#L1779 // but we modify because we only allow specific types of ranges, namely [) inclusive on left and exclusive on right, as this makes a lot of logic simpler, and allows for a standard way to represent a range. +/// Returns `None` to represent [I64Range::infinite]. #[allow(clippy::missing_safety_doc)] pub unsafe fn get_range(range: tstzrange) -> Option { let range_bytes = get_toasted_bytes(&*range); let mut range_bytes = &range_bytes[8..]; // don't care about the Header and Oid let flags = *range_bytes.last().unwrap(); - let mut range = I64Range{ - left: None, - right: None, - }; + let (mut range_left, mut range_right) = (None, None); if flags & RANGE_EMPTY != 0{ return None } @@ -30,7 +28,7 @@ pub unsafe fn get_range(range: tstzrange) -> Option { if !lbound_inclusive(flags) { left += 1; } - range.left = Some(left); + range_left = Some(left); } if range_has_rbound(flags){ let bytes = range_bytes[..8].try_into().unwrap(); @@ -38,10 +36,9 @@ pub unsafe fn get_range(range: tstzrange) -> Option { if rbound_inclusive(flags) { right += 1; } - range.right = Some(right); + range_right = Some(right); } - Some(range) - + Some(I64Range::new(range_left, range_right)) } unsafe fn get_toasted_bytes(ptr: &pg_sys::varlena) -> &[u8] { @@ -104,34 +101,32 @@ flat_serialize! { } } impl I64RangeWrapper { - pub fn to_i64range(&self) -> Option { + pub fn to_i64range(&self) -> I64Range { if self.is_present == 0 { - return None + return I64Range::infinite(); } - Some(I64Range{ - left: self.left, - right: self.right, - }) + I64Range::new(self.left, self.right) } - pub fn from_i64range(b: Option) -> Self { - match b { - Some(range) => Self { + pub fn from_i64range(range: I64Range) -> Self { + if !range.is_infinite() { + Self { is_present: 1, - has_left: range.left.is_some().into(), - has_right: range.right.is_some().into(), + has_left: !range.is_infinite_left() as u8, + has_right: !range.is_infinite_right() as u8, padding: [0; 5], - left: range.left, - right: range.right, - }, - None => Self { + left: range.left(), + right: range.right(), + } + } else { + Self { is_present: 0, has_left: 0, has_right: 0, padding: [0; 5], left: None, right: None, - }, + } } } } diff --git a/extension/src/time_series/pipeline/aggregation.rs b/extension/src/time_series/pipeline/aggregation.rs index cc5dd502..8480fe3a 100644 --- a/extension/src/time_series/pipeline/aggregation.rs +++ b/extension/src/time_series/pipeline/aggregation.rs @@ -478,7 +478,7 @@ pub fn arrow_run_pipeline_then_counter_agg( return None } let mut it = timevector.iter(); - let mut summary = CounterSummaryBuilder::new(&it.next().unwrap(), None); + let mut summary = CounterSummaryBuilder::new(&it.next().unwrap(), counter_agg::range::I64Range::infinite()); for point in it { summary.add_point(&point).expect("error while running counter_agg"); }