diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index 19af05ccc74..9a2b7b1bd30 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -866,6 +866,7 @@ impl Client { let id = usdt::UniqueId::new(); probes::sql__query__start!(|| (&id, &sql)); let start = Instant::now(); + // Submit the SQL request itself. let response = self .client @@ -1071,6 +1072,7 @@ mod tests { // explored or decided on these. // // TODO-robustness TODO-correctness: Figure out the ClickHouse options we need. + #[tokio::test] async fn test_single_node() { let logctx = test_setup_log("test_single_node"); @@ -3099,7 +3101,7 @@ mod tests { let insert_sql = format!( "INSERT INTO {measurement_table} FORMAT JSONEachRow {inserted_row}", ); - println!("Row to insert: {}", inserted_row); + println!("Inserted row: {}", inserted_row); client .execute(insert_sql) .await @@ -3112,17 +3114,11 @@ mod tests { measurement.timestamp().format(crate::DATABASE_TIMESTAMP_FORMAT), crate::DATABASE_SELECT_FORMAT, ); - - println!("Selecting row: {}", select_sql); - let body = client .execute_with_body(select_sql) .await .expect("Failed to select measurement row") .1; - - println!("Body: {}", body); - let (_, actual_row) = crate::model::parse_measurement_from_row( &body, measurement.datum_type(), @@ -3675,12 +3671,12 @@ mod tests { // Write out the upgrading SQL files. // - // Note that all of these statements are going in the version 6 schema + // Note that all of these statements are going in the version 2 schema // directory. let (schema_dir, version_dirs) = create_test_upgrade_schema_directory(replicated, &[NEXT_VERSION]) .await; - const NEXT_VERSION: u64 = OXIMETER_VERSION + 1; + const NEXT_VERSION: u64 = 2; let first_sql = format!("ALTER TABLE {test_name}.tbl ADD COLUMN `col1` UInt16;"); let second_sql = @@ -3899,7 +3895,7 @@ mod tests { // Write out the upgrading SQL files. // // Note that each statement goes into a different version. - const VERSIONS: [u64; 4] = [1, 2, 3, 4]; + const VERSIONS: [u64; 3] = [1, 2, 3]; let (schema_dir, version_dirs) = create_test_upgrade_schema_directory(replicated, &VERSIONS).await; let first_sql = String::new(); diff --git a/oximeter/db/src/client/oxql.rs b/oximeter/db/src/client/oxql.rs index b389179b7aa..29586b8189b 100644 --- a/oximeter/db/src/client/oxql.rs +++ b/oximeter/db/src/client/oxql.rs @@ -825,14 +825,13 @@ impl Client { datum_type: oximeter::DatumType, ) -> String { let value_columns = if datum_type.is_histogram() { - r#"timeseries_key, start_time, timestamp, bins, counts, min, max, - sum_of_samples, squared_mean, - p50_marker_heights, p50_marker_positions, - p50_desired_marker_positions, - p90_marker_heights, p90_marker_positions, - p90_desired_marker_positions, - p99_marker_heights, p99_marker_positions, - p99_desired_marker_positions"# + concat!( + "timeseries_key, start_time, timestamp, bins, counts, min, max, ", + "sum_of_samples, squared_mean, p50_marker_heights, p50_marker_positions, ", + "p50_desired_marker_positions, p90_marker_heights, p90_marker_positions, ", + "p90_desired_marker_positions, p99_marker_heights, p99_marker_positions, ", + "p99_desired_marker_positions" + ) } else if datum_type.is_cumulative() { "timeseries_key, start_time, timestamp, datum" } else { diff --git a/oximeter/db/src/oxql/ast/table_ops/filter.rs b/oximeter/db/src/oxql/ast/table_ops/filter.rs index 1d425761e68..9e796bc730a 100644 --- a/oximeter/db/src/oxql/ast/table_ops/filter.rs +++ b/oximeter/db/src/oxql/ast/table_ops/filter.rs @@ -518,21 +518,9 @@ fn implicit_field_names( MetricType::Gauge, DataType::IntegerDistribution | DataType::DoubleDistribution, ) => { - out.insert(special_idents::BINS); - out.insert(special_idents::COUNTS); - out.insert(special_idents::MIN); - out.insert(special_idents::MAX); - out.insert(special_idents::SUM_OF_SAMPLES); - out.insert(special_idents::SQUARED_MEAN); - out.insert(special_idents::P50_MARKER_HEIGHTS); - out.insert(special_idents::P50_MARKER_POSITIONS); - out.insert(special_idents::P50_DESIRED_MARKER_POSITIONS); - out.insert(special_idents::P90_MARKER_HEIGHTS); - out.insert(special_idents::P90_MARKER_POSITIONS); - out.insert(special_idents::P90_DESIRED_MARKER_POSITIONS); - out.insert(special_idents::P99_MARKER_HEIGHTS); - out.insert(special_idents::P99_MARKER_POSITIONS); - out.insert(special_idents::P99_DESIRED_MARKER_POSITIONS); + special_idents::DISTRIBUTION_IDENTS.iter().for_each(|ident| { + out.insert(ident); + }); } // Scalars, either delta or cumulatives. ( @@ -547,21 +535,9 @@ fn implicit_field_names( MetricType::Delta | MetricType::Cumulative, DataType::IntegerDistribution | DataType::DoubleDistribution, ) => { - out.insert(special_idents::BINS); - out.insert(special_idents::COUNTS); - out.insert(special_idents::MIN); - out.insert(special_idents::MAX); - out.insert(special_idents::SUM_OF_SAMPLES); - out.insert(special_idents::SQUARED_MEAN); - out.insert(special_idents::P50_MARKER_HEIGHTS); - out.insert(special_idents::P50_MARKER_POSITIONS); - out.insert(special_idents::P50_DESIRED_MARKER_POSITIONS); - out.insert(special_idents::P90_MARKER_HEIGHTS); - out.insert(special_idents::P90_MARKER_POSITIONS); - out.insert(special_idents::P90_DESIRED_MARKER_POSITIONS); - out.insert(special_idents::P99_MARKER_HEIGHTS); - out.insert(special_idents::P99_MARKER_POSITIONS); - out.insert(special_idents::P99_DESIRED_MARKER_POSITIONS); + special_idents::DISTRIBUTION_IDENTS.iter().for_each(|ident| { + out.insert(ident); + }); out.insert(special_idents::START_TIME); } // Impossible combinations diff --git a/oximeter/db/src/oxql/point.rs b/oximeter/db/src/oxql/point.rs index cfc8671973e..eabe370e347 100644 --- a/oximeter/db/src/oxql/point.rs +++ b/oximeter/db/src/oxql/point.rs @@ -1553,27 +1553,22 @@ where .collect::>() .join(", "); - let p50_estimate = self - .p50 - .as_ref() - .map(|q| q.estimate().unwrap_or_default()) - .unwrap_or_default(); - let p90_estimate = self - .p90 - .as_ref() - .map(|q| q.estimate().unwrap_or_default()) - .unwrap_or_default(); - let p99_estimate = self - .p99 - .as_ref() - .map(|q| q.estimate().unwrap_or_default()) - .unwrap_or_default(); + let unwrap_estimate = |opt: Option| { + opt.map_or("None".to_string(), |v| match v.estimate() { + Ok(v) => v.to_string(), + Err(err) => err.to_string(), + }) + }; + + let p50_estimate = unwrap_estimate(self.p50); + let p90_estimate = unwrap_estimate(self.p90); + let p99_estimate = unwrap_estimate(self.p99); write!( f, "{}, min: {}, max: {}, mean: {}, std_dev: {}, p50: {}, p90: {}, p99: {}", elems, - self.min.unwrap_or_default(), + self.min.map_or("none".to_string(), |m| m.to_string()), self.max.unwrap_or_default(), self.mean(), self.std_dev().unwrap_or_default(), @@ -1593,6 +1588,16 @@ where /// Min and max values are returned as None, as they lose meaning /// when subtracting distributions. The same is true for p50, p90, and p99 /// quantiles. + /// + /// TODO: It's not really clear how to compute the "difference" of two + /// histograms for items like min, max, p*'s. It's certainly not linear, and + /// although we might be able to make some estimates in the case of min and + /// max, we'll defer it for now. Instead, we'll store None for all these + /// values when computing the diff. They will be very useful later, when we + /// start generating distributions in OxQL itself, from a sequence of + /// scalars (similar to a DTrace aggregation). We'll wait to put that in + /// place until we have more data that we want to start aggregating that + /// way. fn checked_sub( &self, rhs: &Distribution, diff --git a/oximeter/db/src/oxql/query/mod.rs b/oximeter/db/src/oxql/query/mod.rs index 5e9c748bfd9..40a6c82f93f 100644 --- a/oximeter/db/src/oxql/query/mod.rs +++ b/oximeter/db/src/oxql/query/mod.rs @@ -44,21 +44,6 @@ pub mod special_idents { pub const MAX: &str = "max"; pub const SUM_OF_SAMPLES: &str = "sum_of_samples"; pub const SQUARED_MEAN: &str = "squared_mean"; - pub const P50_MARKER_HEIGHTS: &str = gen_marker!("50", "marker_heights"); - pub const P50_MARKER_POSITIONS: &str = - gen_marker!("50", "marker_positions"); - pub const P50_DESIRED_MARKER_POSITIONS: &str = - gen_marker!("50", "desired_marker_positions"); - pub const P90_MARKER_HEIGHTS: &str = gen_marker!("90", "marker_heights"); - pub const P90_MARKER_POSITIONS: &str = - gen_marker!("90", "marker_positions"); - pub const P90_DESIRED_MARKER_POSITIONS: &str = - gen_marker!("90", "desired_marker_positions"); - pub const P99_MARKER_HEIGHTS: &str = gen_marker!("99", "marker_heights"); - pub const P99_MARKER_POSITIONS: &str = - gen_marker!("99", "marker_positions"); - pub const P99_DESIRED_MARKER_POSITIONS: &str = - gen_marker!("99", "desired_marker_positions"); pub const DATETIME64: &str = "DateTime64"; pub const ARRAYU64: &str = "Array[u64]"; pub const ARRAYFLOAT64: &str = "Array[f64]"; @@ -66,6 +51,24 @@ pub mod special_idents { pub const FLOAT64: &str = "f64"; pub const UINT64: &str = "u64"; + pub const DISTRIBUTION_IDENTS: [&str; 15] = [ + "bins", + "counts", + "min", + "max", + "sum_of_samples", + "squared_mean", + gen_marker!("50", "marker_heights"), + gen_marker!("50", "marker_positions"), + gen_marker!("50", "desired_marker_positions"), + gen_marker!("90", "marker_heights"), + gen_marker!("90", "marker_positions"), + gen_marker!("90", "desired_marker_positions"), + gen_marker!("99", "marker_heights"), + gen_marker!("99", "marker_positions"), + gen_marker!("99", "desired_marker_positions"), + ]; + pub fn array_type_name_from_histogram_type( type_: DatumType, ) -> Option { diff --git a/oximeter/db/src/sql/mod.rs b/oximeter/db/src/sql/mod.rs index e742637503b..e434608b1c1 100644 --- a/oximeter/db/src/sql/mod.rs +++ b/oximeter/db/src/sql/mod.rs @@ -611,7 +611,7 @@ impl RestrictedQuery { // // Scalar measurements have only a timestamp and datum. Cumulative counters // have those plus a start_time. And histograms have those plus the bins, - // counts, min, max, sum of samples, sum of squares, and quantiile arrays. + // counts, min, max, sum of samples, sum of squares, and quantile arrays. fn datum_type_to_columns( datum_type: &DatumType, ) -> &'static [&'static str] { diff --git a/oximeter/oximeter/src/quantile.rs b/oximeter/oximeter/src/quantile.rs index 310ed525c0e..8bc144bb0a4 100644 --- a/oximeter/oximeter/src/quantile.rs +++ b/oximeter/oximeter/src/quantile.rs @@ -93,7 +93,6 @@ impl Quantile { p, marker_heights: [0.; FILLED_MARKER_LEN], // We start with a sample size of 0. - //marker_positions: [0, 1, 2, 3, 0], marker_positions: [1, 2, 3, 4, 0], // 1-indexed, which is like the paper, but // used to keep track of the sample size without @@ -250,7 +249,6 @@ impl Quantile { // We've already checked that the value is finite. let value_f = value.to_f64().unwrap(); - // if !self.is_filled { if self.len() < FILLED_MARKER_LEN as u64 { self.marker_heights[self.len() as usize] = value_f; self.marker_positions[4] += 1; @@ -311,14 +309,11 @@ impl Quantile { if value < self.marker_heights[0] { None } else { - let mut k = 0; - while k + 1 < FILLED_MARKER_LEN - && value >= self.marker_heights[k + 1] - { - k += 1; - } - - Some(k) + Some( + self.marker_heights + .partition_point(|&height| height <= value) + .saturating_sub(1), + ) } } @@ -333,7 +328,7 @@ impl Quantile { if (d >= 1. && self.marker_positions[i + 1] > self.marker_positions[i] + 1) || (d <= -1. - && self.marker_positions[i - 1] < self.marker_positions[i]) + && self.marker_positions[i - 1] < self.marker_positions[i] - 1) { let d_signum = d.signum(); let q_prime = self.parabolic(i, d_signum); @@ -362,8 +357,8 @@ impl Quantile { /// Read /// for more. fn adaptive_init(&mut self) { - self.desired_marker_positions[1..FILLED_MARKER_LEN] - .copy_from_slice(&self.marker_heights[1..FILLED_MARKER_LEN]); + self.desired_marker_positions[..FILLED_MARKER_LEN] + .copy_from_slice(&self.marker_heights[..FILLED_MARKER_LEN]); self.marker_positions[1] = (1. + 2. * self.p()).round() as u64; self.marker_positions[2] = (1. + 4. * self.p()).round() as u64; @@ -435,6 +430,33 @@ mod tests { q } + #[test] + fn test_min_p() { + let observations = [3, 6, 7, 8, 8, 10, 13, 15, 16, 20]; + + let mut q = Quantile::new(0.0).unwrap(); + //assert_eq!(q.p(), 0.1); + for &o in observations.iter() { + q.append(o).unwrap(); + } + assert_eq!(q.estimate().unwrap(), 3.); + } + + /// Compared with C# implementation of P² algorithm. + #[test] + fn test_max_p() { + let observations = [3, 6, 7, 8, 8, 10, 13, 15, 16, 20]; + + let mut q = Quantile::new(1.).unwrap(); + assert_eq!(q.p(), 1.); + + for &o in observations.iter() { + q.append(o).unwrap(); + } + + assert_eq!(q.estimate().unwrap(), 11.66543209876543); + } + /// Example observations from the P² paper. #[test] fn test_float_observations() { @@ -447,8 +469,9 @@ mod tests { q.append(o).unwrap(); } assert_eq!(q.marker_positions, [1, 6, 10, 16, 20]); - assert_eq!(q.desired_marker_positions, [1., 5.75, 10.50, 15.25, 20.0]); + assert_eq!(q.desired_marker_positions, [0.02, 5.75, 10.5, 15.25, 20.0]); assert_eq!(q.p(), 0.5); + assert_eq!(q.len(), 20); assert_relative_eq!(q.estimate().unwrap(), 4.2462394088036435,); }