Skip to content

Commit

Permalink
address review and fix up adaptive initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
zeeshanlakhani committed Jun 22, 2024
1 parent adb1136 commit fc729fe
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 94 deletions.
16 changes: 6 additions & 10 deletions oximeter/db/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,7 @@ impl Client {
let id = usdt::UniqueId::new();
probes::sql__query__start!(|| (&id, &sql));
let start = Instant::now();

// Submit the SQL request itself.
let response = self
.client
Expand Down Expand Up @@ -1071,6 +1072,7 @@ mod tests {
// explored or decided on these.
//
// TODO-robustness TODO-correctness: Figure out the ClickHouse options we need.

#[tokio::test]
async fn test_single_node() {
let logctx = test_setup_log("test_single_node");
Expand Down Expand Up @@ -3099,7 +3101,7 @@ mod tests {
let insert_sql = format!(
"INSERT INTO {measurement_table} FORMAT JSONEachRow {inserted_row}",
);
println!("Row to insert: {}", inserted_row);
println!("Inserted row: {}", inserted_row);
client
.execute(insert_sql)
.await
Expand All @@ -3112,17 +3114,11 @@ mod tests {
measurement.timestamp().format(crate::DATABASE_TIMESTAMP_FORMAT),
crate::DATABASE_SELECT_FORMAT,
);

println!("Selecting row: {}", select_sql);

let body = client
.execute_with_body(select_sql)
.await
.expect("Failed to select measurement row")
.1;

println!("Body: {}", body);

let (_, actual_row) = crate::model::parse_measurement_from_row(
&body,
measurement.datum_type(),
Expand Down Expand Up @@ -3675,12 +3671,12 @@ mod tests {

// Write out the upgrading SQL files.
//
// Note that all of these statements are going in the version 6 schema
// Note that all of these statements are going in the version 2 schema
// directory.
let (schema_dir, version_dirs) =
create_test_upgrade_schema_directory(replicated, &[NEXT_VERSION])
.await;
const NEXT_VERSION: u64 = OXIMETER_VERSION + 1;
const NEXT_VERSION: u64 = 2;
let first_sql =
format!("ALTER TABLE {test_name}.tbl ADD COLUMN `col1` UInt16;");
let second_sql =
Expand Down Expand Up @@ -3899,7 +3895,7 @@ mod tests {
// Write out the upgrading SQL files.
//
// Note that each statement goes into a different version.
const VERSIONS: [u64; 4] = [1, 2, 3, 4];
const VERSIONS: [u64; 3] = [1, 2, 3];
let (schema_dir, version_dirs) =
create_test_upgrade_schema_directory(replicated, &VERSIONS).await;
let first_sql = String::new();
Expand Down
15 changes: 7 additions & 8 deletions oximeter/db/src/client/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,14 +825,13 @@ impl Client {
datum_type: oximeter::DatumType,
) -> String {
let value_columns = if datum_type.is_histogram() {
r#"timeseries_key, start_time, timestamp, bins, counts, min, max,
sum_of_samples, squared_mean,
p50_marker_heights, p50_marker_positions,
p50_desired_marker_positions,
p90_marker_heights, p90_marker_positions,
p90_desired_marker_positions,
p99_marker_heights, p99_marker_positions,
p99_desired_marker_positions"#
concat!(
"timeseries_key, start_time, timestamp, bins, counts, min, max, ",
"sum_of_samples, squared_mean, p50_marker_heights, p50_marker_positions, ",
"p50_desired_marker_positions, p90_marker_heights, p90_marker_positions, ",
"p90_desired_marker_positions, p99_marker_heights, p99_marker_positions, ",
"p99_desired_marker_positions"
)
} else if datum_type.is_cumulative() {
"timeseries_key, start_time, timestamp, datum"
} else {
Expand Down
36 changes: 6 additions & 30 deletions oximeter/db/src/oxql/ast/table_ops/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,21 +518,9 @@ fn implicit_field_names(
MetricType::Gauge,
DataType::IntegerDistribution | DataType::DoubleDistribution,
) => {
out.insert(special_idents::BINS);
out.insert(special_idents::COUNTS);
out.insert(special_idents::MIN);
out.insert(special_idents::MAX);
out.insert(special_idents::SUM_OF_SAMPLES);
out.insert(special_idents::SQUARED_MEAN);
out.insert(special_idents::P50_MARKER_HEIGHTS);
out.insert(special_idents::P50_MARKER_POSITIONS);
out.insert(special_idents::P50_DESIRED_MARKER_POSITIONS);
out.insert(special_idents::P90_MARKER_HEIGHTS);
out.insert(special_idents::P90_MARKER_POSITIONS);
out.insert(special_idents::P90_DESIRED_MARKER_POSITIONS);
out.insert(special_idents::P99_MARKER_HEIGHTS);
out.insert(special_idents::P99_MARKER_POSITIONS);
out.insert(special_idents::P99_DESIRED_MARKER_POSITIONS);
special_idents::DISTRIBUTION_IDENTS.iter().for_each(|ident| {
out.insert(ident);
});
}
// Scalars, either delta or cumulatives.
(
Expand All @@ -547,21 +535,9 @@ fn implicit_field_names(
MetricType::Delta | MetricType::Cumulative,
DataType::IntegerDistribution | DataType::DoubleDistribution,
) => {
out.insert(special_idents::BINS);
out.insert(special_idents::COUNTS);
out.insert(special_idents::MIN);
out.insert(special_idents::MAX);
out.insert(special_idents::SUM_OF_SAMPLES);
out.insert(special_idents::SQUARED_MEAN);
out.insert(special_idents::P50_MARKER_HEIGHTS);
out.insert(special_idents::P50_MARKER_POSITIONS);
out.insert(special_idents::P50_DESIRED_MARKER_POSITIONS);
out.insert(special_idents::P90_MARKER_HEIGHTS);
out.insert(special_idents::P90_MARKER_POSITIONS);
out.insert(special_idents::P90_DESIRED_MARKER_POSITIONS);
out.insert(special_idents::P99_MARKER_HEIGHTS);
out.insert(special_idents::P99_MARKER_POSITIONS);
out.insert(special_idents::P99_DESIRED_MARKER_POSITIONS);
special_idents::DISTRIBUTION_IDENTS.iter().for_each(|ident| {
out.insert(ident);
});
out.insert(special_idents::START_TIME);
}
// Impossible combinations
Expand Down
37 changes: 21 additions & 16 deletions oximeter/db/src/oxql/point.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1553,27 +1553,22 @@ where
.collect::<Vec<_>>()
.join(", ");

let p50_estimate = self
.p50
.as_ref()
.map(|q| q.estimate().unwrap_or_default())
.unwrap_or_default();
let p90_estimate = self
.p90
.as_ref()
.map(|q| q.estimate().unwrap_or_default())
.unwrap_or_default();
let p99_estimate = self
.p99
.as_ref()
.map(|q| q.estimate().unwrap_or_default())
.unwrap_or_default();
let unwrap_estimate = |opt: Option<Quantile>| {
opt.map_or("None".to_string(), |v| match v.estimate() {
Ok(v) => v.to_string(),
Err(err) => err.to_string(),
})
};

let p50_estimate = unwrap_estimate(self.p50);
let p90_estimate = unwrap_estimate(self.p90);
let p99_estimate = unwrap_estimate(self.p99);

write!(
f,
"{}, min: {}, max: {}, mean: {}, std_dev: {}, p50: {}, p90: {}, p99: {}",
elems,
self.min.unwrap_or_default(),
self.min.map_or("none".to_string(), |m| m.to_string()),
self.max.unwrap_or_default(),
self.mean(),
self.std_dev().unwrap_or_default(),
Expand All @@ -1593,6 +1588,16 @@ where
/// Min and max values are returned as None, as they lose meaning
/// when subtracting distributions. The same is true for p50, p90, and p99
/// quantiles.
///
/// TODO: It's not clear how to compute the "difference" of two histograms
/// for values like min, max, and the quantile estimates (p50, p90, p99).
/// The operation is certainly not linear, and although we might be able to
/// make some estimates for min and max, we defer that for now and store
/// None for all of these values when computing the diff. They will be very
/// useful later, when we start generating distributions in OxQL itself from
/// a sequence of scalars (similar to a DTrace aggregation). We'll wait to
/// put that in place until we have more data that we want to aggregate that
/// way.
fn checked_sub(
&self,
rhs: &Distribution<T>,
Expand Down
33 changes: 18 additions & 15 deletions oximeter/db/src/oxql/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,31 @@ pub mod special_idents {
pub const MAX: &str = "max";
pub const SUM_OF_SAMPLES: &str = "sum_of_samples";
pub const SQUARED_MEAN: &str = "squared_mean";
pub const P50_MARKER_HEIGHTS: &str = gen_marker!("50", "marker_heights");
pub const P50_MARKER_POSITIONS: &str =
gen_marker!("50", "marker_positions");
pub const P50_DESIRED_MARKER_POSITIONS: &str =
gen_marker!("50", "desired_marker_positions");
pub const P90_MARKER_HEIGHTS: &str = gen_marker!("90", "marker_heights");
pub const P90_MARKER_POSITIONS: &str =
gen_marker!("90", "marker_positions");
pub const P90_DESIRED_MARKER_POSITIONS: &str =
gen_marker!("90", "desired_marker_positions");
pub const P99_MARKER_HEIGHTS: &str = gen_marker!("99", "marker_heights");
pub const P99_MARKER_POSITIONS: &str =
gen_marker!("99", "marker_positions");
pub const P99_DESIRED_MARKER_POSITIONS: &str =
gen_marker!("99", "desired_marker_positions");
pub const DATETIME64: &str = "DateTime64";
pub const ARRAYU64: &str = "Array[u64]";
pub const ARRAYFLOAT64: &str = "Array[f64]";
pub const ARRAYINT64: &str = "Array[i64]";
pub const FLOAT64: &str = "f64";
pub const UINT64: &str = "u64";

/// Special identifiers associated with distribution (histogram) data,
/// collected into one array so that callers can iterate over them rather
/// than naming each identifier individually.
///
/// The first six entries are written as string literals. The remaining nine
/// come from `gen_marker!`: one triple (marker heights, marker positions,
/// desired marker positions) for each of the "50", "90" and "99" quantiles.
/// NOTE(review): `gen_marker!` is defined outside this view; it presumably
/// expands to names of the form `p50_marker_heights` -- confirm.
///
/// The array length is part of the type, so the compiler rejects any
/// mismatch between the declared `15` and the number of entries.
pub const DISTRIBUTION_IDENTS: [&str; 15] = [
"bins",
"counts",
"min",
"max",
"sum_of_samples",
"squared_mean",
gen_marker!("50", "marker_heights"),
gen_marker!("50", "marker_positions"),
gen_marker!("50", "desired_marker_positions"),
gen_marker!("90", "marker_heights"),
gen_marker!("90", "marker_positions"),
gen_marker!("90", "desired_marker_positions"),
gen_marker!("99", "marker_heights"),
gen_marker!("99", "marker_positions"),
gen_marker!("99", "desired_marker_positions"),
];

pub fn array_type_name_from_histogram_type(
type_: DatumType,
) -> Option<String> {
Expand Down
2 changes: 1 addition & 1 deletion oximeter/db/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ impl RestrictedQuery {
//
// Scalar measurements have only a timestamp and datum. Cumulative counters
// have those plus a start_time. And histograms have those plus the bins,
// counts, min, max, sum of samples, sum of squares, and quantiile arrays.
// counts, min, max, sum of samples, sum of squares, and quantile arrays.
fn datum_type_to_columns(
datum_type: &DatumType,
) -> &'static [&'static str] {
Expand Down
51 changes: 37 additions & 14 deletions oximeter/oximeter/src/quantile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ impl Quantile {
p,
marker_heights: [0.; FILLED_MARKER_LEN],
// We start with a sample size of 0.
//marker_positions: [0, 1, 2, 3, 0],
marker_positions: [1, 2, 3, 4, 0],
// 1-indexed, which is like the paper, but
// used to keep track of the sample size without
Expand Down Expand Up @@ -250,7 +249,6 @@ impl Quantile {
// We've already checked that the value is finite.
let value_f = value.to_f64().unwrap();

// if !self.is_filled {
if self.len() < FILLED_MARKER_LEN as u64 {
self.marker_heights[self.len() as usize] = value_f;
self.marker_positions[4] += 1;
Expand Down Expand Up @@ -311,14 +309,11 @@ impl Quantile {
if value < self.marker_heights[0] {
None
} else {
let mut k = 0;
while k + 1 < FILLED_MARKER_LEN
&& value >= self.marker_heights[k + 1]
{
k += 1;
}

Some(k)
Some(
self.marker_heights
.partition_point(|&height| height <= value)
.saturating_sub(1),
)
}
}

Expand All @@ -333,7 +328,7 @@ impl Quantile {
if (d >= 1.
&& self.marker_positions[i + 1] > self.marker_positions[i] + 1)
|| (d <= -1.
&& self.marker_positions[i - 1] < self.marker_positions[i])
&& self.marker_positions[i - 1] < self.marker_positions[i] - 1)
{
let d_signum = d.signum();
let q_prime = self.parabolic(i, d_signum);
Expand Down Expand Up @@ -362,8 +357,8 @@ impl Quantile {
/// Read <https://aakinshin.net/posts/p2-quantile-estimator-initialization>
/// for more.
fn adaptive_init(&mut self) {
self.desired_marker_positions[1..FILLED_MARKER_LEN]
.copy_from_slice(&self.marker_heights[1..FILLED_MARKER_LEN]);
self.desired_marker_positions[..FILLED_MARKER_LEN]
.copy_from_slice(&self.marker_heights[..FILLED_MARKER_LEN]);

self.marker_positions[1] = (1. + 2. * self.p()).round() as u64;
self.marker_positions[2] = (1. + 4. * self.p()).round() as u64;
Expand Down Expand Up @@ -435,6 +430,33 @@ mod tests {
q
}

/// Checks the lower extreme: a quantile built with `p = 0` should report the
/// smallest appended observation as its estimate.
#[test]
fn test_min_p() {
    let observations = [3, 6, 7, 8, 8, 10, 13, 15, 16, 20];

    let mut q = Quantile::new(0.0).unwrap();
    // The quantile reports back the `p` it was constructed with.
    assert_eq!(q.p(), 0.);

    for &o in observations.iter() {
        q.append(o).unwrap();
    }

    // 3 is the smallest value in `observations`.
    assert_eq!(q.estimate().unwrap(), 3.);
}

/// Exercises the upper extreme, `p = 1`. The expected estimate was compared
/// against a C# implementation of the P² algorithm.
#[test]
fn test_max_p() {
    let samples = [3, 6, 7, 8, 8, 10, 13, 15, 16, 20];

    let mut quantile = Quantile::new(1.).unwrap();
    assert_eq!(quantile.p(), 1.);

    // Feed every sample, in order, into the estimator.
    samples.iter().copied().for_each(|sample| {
        quantile.append(sample).unwrap();
    });

    assert_eq!(quantile.estimate().unwrap(), 11.66543209876543);
}

/// Example observations from the P² paper.
#[test]
fn test_float_observations() {
Expand All @@ -447,8 +469,9 @@ mod tests {
q.append(o).unwrap();
}
assert_eq!(q.marker_positions, [1, 6, 10, 16, 20]);
assert_eq!(q.desired_marker_positions, [1., 5.75, 10.50, 15.25, 20.0]);
assert_eq!(q.desired_marker_positions, [0.02, 5.75, 10.5, 15.25, 20.0]);
assert_eq!(q.p(), 0.5);
assert_eq!(q.len(), 20);
assert_relative_eq!(q.estimate().unwrap(), 4.2462394088036435,);
}

Expand Down

0 comments on commit fc729fe

Please sign in to comment.