Skip to content

Commit

Permalink
Select samples from ClickHouse using the native client (#7093)
Browse files Browse the repository at this point in the history
- Add `FromBlock` trait implementations for extracting the results of
the queries we use to fetch data with OxQL and the older interface
- Convert field-value insert-select roundtrip tests to use native client
- Convert expungement tests to native client
- Remove model code for reading / writing JSON rows to the database
- Remove code for executing SQL as JSON and returning result as a string
- Remove `JSONEachRows` database select format entirely
- Closes #6886
  • Loading branch information
bnaecker authored Nov 19, 2024
1 parent 9696009 commit 48790e5
Show file tree
Hide file tree
Showing 13 changed files with 971 additions and 1,829 deletions.
654 changes: 304 additions & 350 deletions oximeter/db/src/client/mod.rs

Large diffs are not rendered by default.

36 changes: 24 additions & 12 deletions oximeter/db/src/client/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

use super::query_summary::QuerySummary;
use crate::client::Client;
use crate::model;
use crate::model::columns;
use crate::model::from_block::FromBlock as _;
use crate::oxql;
use crate::oxql::ast::table_ops::filter;
use crate::oxql::ast::table_ops::filter::Filter;
Expand All @@ -19,6 +20,7 @@ use crate::Error;
use crate::Metric;
use crate::Target;
use oximeter::schema::TimeseriesKey;
use oximeter::Measurement;
use oximeter::TimeseriesSchema;
use slog::debug;
use slog::trace;
Expand Down Expand Up @@ -596,12 +598,28 @@ impl Client {
limit,
total_rows_fetched,
)?;
let (summary, body) =
self.execute_with_body(&measurements_query).await?;
let result = self.execute_with_block(&measurements_query).await?;
let summary = result.query_summary();
summaries.push(summary);
for line in body.lines() {
let (key, measurement) =
model::parse_measurement_from_row(line, schema.datum_type);
let Some(block) = result.data.as_ref() else {
return Err(Error::QueryMissingData {
query: measurements_query,
});
};
let timeseries_keys = block
.column_values(columns::TIMESERIES_KEY)?
.as_u64()
.map_err(|_| {
crate::native::Error::unexpected_column_type(
block,
columns::TIMESERIES_KEY,
"UInt64",
)
})?;
let measurements = Measurement::from_block(block, &())?;
for (key, measurement) in
timeseries_keys.iter().copied().zip(measurements)
{
measurements_by_key.entry(key).or_default().push(measurement);
n_measurements += 1;
}
Expand Down Expand Up @@ -831,10 +849,6 @@ impl Client {
let remainder = MAX_DATABASE_ROWS - *total_rows_fetched;
query.push_str(" LIMIT ");
write!(query, "{}", remainder + 1).unwrap();

// Finally, use JSON format.
query.push_str(" FORMAT ");
query.push_str(crate::DATABASE_SELECT_FORMAT);
Ok(query)
}

Expand Down Expand Up @@ -887,8 +901,6 @@ impl Client {
}
query.push_str(&preds);
}
query.push_str(" FORMAT ");
query.push_str(crate::DATABASE_SELECT_FORMAT);
Ok(query)
}

Expand Down
2 changes: 2 additions & 0 deletions oximeter/db/src/client/query_summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ pub struct QuerySummary {
pub io_summary: IoSummary,
}

// TODO-remove: https://github.com/oxidecomputer/omicron/issues/7094
#[allow(dead_code)]
impl QuerySummary {
/// Construct a SQL query summary from the headers received from the DB.
pub(crate) fn from_headers(
Expand Down
129 changes: 9 additions & 120 deletions oximeter/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ pub use oximeter::Field;
pub use oximeter::FieldType;
pub use oximeter::Measurement;
pub use oximeter::Sample;
use parse_display::Display;
use parse_display::FromStr;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use slog::Logger;
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::num::NonZeroU32;
Expand Down Expand Up @@ -172,6 +169,9 @@ pub enum Error {

#[error("Native protocol error")]
Native(#[from] crate::native::Error),

#[error("Query unexpectedly contained no data: '{query}'")]
QueryMissingData { query: String },
}

#[cfg(any(feature = "oxql", test))]
Expand All @@ -181,26 +181,6 @@ impl From<crate::oxql::Error> for Error {
}
}

impl From<model::DbTimeseriesSchema> for TimeseriesSchema {
fn from(schema: model::DbTimeseriesSchema) -> TimeseriesSchema {
TimeseriesSchema {
timeseries_name: TimeseriesName::try_from(
schema.timeseries_name.as_str(),
)
.expect("Invalid timeseries name in database"),
// TODO-cleanup: Fill these in from the values in the database. See
// https://github.com/oxidecomputer/omicron/issues/5942.
description: Default::default(),
version: oximeter::schema::default_schema_version(),
authz_scope: oximeter::schema::AuthzScope::Fleet,
units: oximeter::schema::Units::Count,
field_schema: schema.field_schema.into(),
datum_type: schema.datum_type.into(),
created: schema.created,
}
}
}

/// The target identifies the resource or component about which metric data is produced.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Target {
Expand All @@ -225,41 +205,6 @@ pub struct Timeseries {
pub measurements: Vec<Measurement>,
}

#[derive(
Clone,
Copy,
Debug,
PartialEq,
Eq,
PartialOrd,
Ord,
Deserialize,
Serialize,
FromStr,
Display,
)]
pub enum DbFieldSource {
Target,
Metric,
}

impl From<DbFieldSource> for FieldSource {
fn from(src: DbFieldSource) -> Self {
match src {
DbFieldSource::Target => FieldSource::Target,
DbFieldSource::Metric => FieldSource::Metric,
}
}
}
impl From<FieldSource> for DbFieldSource {
fn from(src: FieldSource) -> Self {
match src {
FieldSource::Target => DbFieldSource::Target,
FieldSource::Metric => DbFieldSource::Metric,
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct TimeseriesScanParams {
pub timeseries_name: TimeseriesName,
Expand Down Expand Up @@ -360,17 +305,14 @@ const VERSION_TABLE_NAME: &str = "version";
// per line, in the file inside the schema version directory with this name.
const TIMESERIES_TO_DELETE_FILE: &str = "timeseries-to-delete.txt";

// The output format used for the result of select queries
//
// See https://clickhouse.com/docs/en/interfaces/formats/#jsoneachrow for details.
const DATABASE_SELECT_FORMAT: &str = "JSONEachRow";

#[cfg(test)]
mod tests {
use super::*;
use crate::model::DbFieldList;
use crate::model::DbTimeseriesSchema;
use crate::timeseries_key;
use crate::timeseries_key_for;
use oximeter::DatumType;
use oximeter::Sample;
use std::borrow::Cow;
use std::collections::BTreeMap;
use uuid::Uuid;

// Validates that the timeseries_key stability for a sample is stable.
Expand All @@ -391,7 +333,7 @@ mod tests {
let target = TestTarget { name: String::from("Hello"), num: 1337 };
let metric = TestMetric { id: Uuid::nil(), datum: 0x1de };
let sample = Sample::new(&target, &metric).unwrap();
let key = super::timeseries_key(&sample);
let key = timeseries_key(&sample);

expectorate::assert_contents(
"test-output/sample-timeseries-key.txt",
Expand Down Expand Up @@ -456,57 +398,4 @@ mod tests {
&output.join("\n"),
);
}

#[test]
fn test_unsorted_db_fields_are_sorted_on_read() {
let target_field = FieldSchema {
name: String::from("later"),
field_type: FieldType::U64,
source: FieldSource::Target,
description: String::new(),
};
let metric_field = FieldSchema {
name: String::from("earlier"),
field_type: FieldType::U64,
source: FieldSource::Metric,
description: String::new(),
};
let timeseries_name: TimeseriesName = "foo:bar".parse().unwrap();
let datum_type = DatumType::U64;
let field_schema =
[target_field.clone(), metric_field.clone()].into_iter().collect();
let expected_schema = TimeseriesSchema {
timeseries_name: timeseries_name.clone(),
description: Default::default(),
version: oximeter::schema::default_schema_version(),
authz_scope: oximeter::schema::AuthzScope::Fleet,
units: oximeter::schema::Units::Count,
field_schema,
datum_type,
created: Utc::now(),
};

// The fields here are sorted by target and then metric, which is how we
// used to insert them into the DB. We're checking that they are totally
// sorted when we read them out of the DB, even though they are not in
// the extracted model type.
let db_fields = DbFieldList {
names: vec![target_field.name.clone(), metric_field.name.clone()],
types: vec![
target_field.field_type.into(),
metric_field.field_type.into(),
],
sources: vec![
target_field.source.into(),
metric_field.source.into(),
],
};
let db_schema = DbTimeseriesSchema {
timeseries_name: timeseries_name.to_string(),
field_schema: db_fields,
datum_type: datum_type.into(),
created: expected_schema.created,
};
assert_eq!(expected_schema, TimeseriesSchema::from(db_schema));
}
}
Loading

0 comments on commit 48790e5

Please sign in to comment.