Skip to content

Commit

Permalink
Insert samples using the native client
Browse files Browse the repository at this point in the history
- Remove unneeded and error-prone `n_columns` and `n_rows` fields from
  the `Block` type. These are now methods that delegate to the actual
  column arrays, which are the source of truth anyway.
- Add methods for extracting fields from a sample as a set of data
  blocks, one per field table.
- Add methods to extract measurements from a sample as a block destined
  for one table. Take care to correctly construct missing samples,
  especially for histograms. Also implement the `FromBlock` trait for
  samples, to ensure we can extract the raw data as well. This is only
  used in tests, in this commit.
- Insert fields and measurements from a sample as a data block, using
  the native interface.
- Fix serde of UUIDs in native format, which doesn't match the
  documentation at least for our current version of ClickHouse.
- Remove code serializing fields and measurements into JSON.
- Closes #6884
  • Loading branch information
bnaecker committed Nov 7, 2024
1 parent 8d73079 commit 9acd3a6
Show file tree
Hide file tree
Showing 18 changed files with 2,306 additions and 1,104 deletions.
63 changes: 39 additions & 24 deletions oximeter/db/src/client/dbwrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,22 @@
use crate::client::Client;
use crate::model;
use crate::model::to_block::ToBlock as _;
use crate::native::block::Block;
use crate::Error;
use camino::Utf8PathBuf;
use oximeter::Sample;
use oximeter::TimeseriesSchema;
use slog::debug;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::collections::BTreeSet;

#[derive(Debug)]
pub(super) struct UnrolledSampleRows {
/// The timeseries schema rows.
pub new_schema: Vec<TimeseriesSchema>,
/// The rows to insert in all the other tables, keyed by the table name.
pub rows: BTreeMap<String, Vec<String>>,
/// The blocks to insert in all the other tables, keyed by the table name.
pub blocks: BTreeMap<String, Block>,
}

/// A trait allowing a [`Client`] to write data into the timeseries database.
Expand Down Expand Up @@ -54,10 +56,10 @@ impl DbWrite for Client {
/// Insert the given samples into the database.
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> {
debug!(self.log, "unrolling {} total samples", samples.len());
let UnrolledSampleRows { new_schema, rows } =
let UnrolledSampleRows { new_schema, blocks } =
self.unroll_samples(samples).await;
self.save_new_schema_or_remove(new_schema).await?;
self.insert_unrolled_samples(rows).await
self.insert_unrolled_samples(blocks).await
}

/// Initialize the replicated telemetry database, creating tables as needed.
Expand Down Expand Up @@ -172,7 +174,7 @@ impl Client {
samples: &[Sample],
) -> UnrolledSampleRows {
let mut seen_timeseries = BTreeSet::new();
let mut rows = BTreeMap::new();
let mut table_blocks = BTreeMap::new();
let mut new_schema = BTreeMap::new();

for sample in samples.iter() {
Expand Down Expand Up @@ -200,48 +202,61 @@ impl Client {
crate::timeseries_key(sample),
);
if !seen_timeseries.contains(&key) {
for (table_name, table_rows) in model::unroll_field_rows(sample)
for (table_name, block) in
model::fields::extract_fields_as_block(sample)
{
rows.entry(table_name)
.or_insert_with(Vec::new)
.extend(table_rows);
match table_blocks.entry(table_name) {
Entry::Vacant(entry) => {
entry.insert(block);
}
Entry::Occupied(mut entry) => entry
.get_mut()
.concat(block)
.expect("All blocks for a table must match"),
}
}
}

let (table_name, measurement_row) =
model::unroll_measurement_row(sample);

rows.entry(table_name)
.or_insert_with(Vec::new)
.push(measurement_row);
let (table_name, measurement_block) =
model::measurements::extract_measurement_as_block(sample);
match table_blocks.entry(table_name) {
Entry::Vacant(entry) => {
entry.insert(measurement_block);
}
Entry::Occupied(mut entry) => entry
.get_mut()
.concat(measurement_block)
.expect("All blocks for a table must match"),
}

seen_timeseries.insert(key);
}

let new_schema = new_schema.into_values().collect();
UnrolledSampleRows { new_schema, rows }
UnrolledSampleRows { new_schema, blocks: table_blocks }
}

// Insert unrolled sample rows into the corresponding tables.
async fn insert_unrolled_samples(
&self,
rows: BTreeMap<String, Vec<String>>,
blocks: BTreeMap<String, Block>,
) -> Result<(), Error> {
for (table_name, rows) in rows {
let body = format!(
"INSERT INTO {table_name} FORMAT JSONEachRow\n{row_data}\n",
for (table_name, block) in blocks {
let n_rows = block.n_rows();
let query = format!(
"INSERT INTO {db_name}.{table_name} FORMAT Native",
db_name = crate::DATABASE_NAME,
table_name = table_name,
row_data = rows.join("\n")
);
// TODO-robustness We've verified the schema, so this is likely a transient failure.
// But we may want to check the actual error condition, and, if possible, continue
// inserting any remaining data.
self.execute(body).await?;
self.insert_native(&query, block).await?;
debug!(
self.log,
"inserted rows into table";
"n_rows" => rows.len(),
"table_name" => table_name,
"n_rows" => n_rows,
"table_name" => &table_name,
);
}

Expand Down
56 changes: 24 additions & 32 deletions oximeter/db/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ impl Client {
returned an empty data block",
))
})
.map(|block| block.n_rows > 0)
.map(|block| block.n_rows() > 0)
})
}

Expand Down Expand Up @@ -1161,17 +1161,6 @@ impl Client {
result
}

// Execute a generic SQL statement.
//
// TODO-robustness This currently does no validation of the statement.
async fn execute<S>(&self, sql: S) -> Result<(), Error>
where
S: Into<String>,
{
self.execute_with_body(sql).await?;
Ok(())
}

// Execute a generic SQL statement, awaiting the response as text
//
// TODO-robustness This currently does no validation of the statement.
Expand Down Expand Up @@ -1281,14 +1270,14 @@ impl Client {
trace!(self.log, "no new timeseries schema in database");
return Ok(());
};
if data.n_rows == 0 {
if data.n_rows() == 0 {
trace!(self.log, "no new timeseries schema in database");
return Ok(());
}
trace!(
self.log,
"retrieved new timeseries schema";
"n_schema" => data.n_rows,
"n_schema" => data.n_rows(),
);
for new_schema in TimeseriesSchema::from_block(data)?.into_iter() {
schema.insert(new_schema.timeseries_name.clone(), new_schema);
Expand Down Expand Up @@ -3492,41 +3481,44 @@ mod tests {
// Insert a record from this datum.
const TIMESERIES_NAME: &str = "foo:bar";
const TIMESERIES_KEY: u64 = 101;
let (measurement_table, inserted_row) =
crate::model::unroll_measurement_row_impl(
let (measurement_table, inserted_block) =
crate::model::measurements::extract_measurement_as_block_impl(
TIMESERIES_NAME.to_string(),
TIMESERIES_KEY,
&measurement,
);
let insert_sql = format!(
"INSERT INTO {measurement_table} FORMAT JSONEachRow {inserted_row}",
"INSERT INTO {}.{} FORMAT Native ",
crate::DATABASE_NAME,
measurement_table,
);
println!("Inserted row: {}", inserted_row);
println!("Expected measurement: {:#?}", measurement);
println!("Inserted block: {:#?}", inserted_block);
client
.execute_native(&insert_sql)
.insert_native(&insert_sql, inserted_block)
.await
.expect("Failed to insert measurement row");
.expect("Failed to insert measurement block");

// Select it exactly back out.
let select_sql = format!(
"SELECT * FROM {} WHERE timestamp = '{}' FORMAT {};",
"SELECT * FROM {}.{} WHERE timestamp = '{}'",
crate::DATABASE_NAME,
measurement_table,
measurement.timestamp().format(crate::DATABASE_TIMESTAMP_FORMAT),
crate::DATABASE_SELECT_FORMAT,
);
let body = client
.execute_with_body(select_sql)
let selected_block = client
.execute_with_block(&select_sql)
.await
.expect("Failed to select measurement row")
.1;
let (_, actual_row) = crate::model::parse_measurement_from_row(
&body,
measurement.datum_type(),
);
println!("Actual row: {actual_row:?}");
.data
.expect("Should have selected some data block");
let actual_measurements = Measurement::from_block(&selected_block)
.expect("Failed to extract measurement from block");
println!("Actual measurements: {actual_measurements:#?}");
assert_eq!(actual_measurements.len(), 1);
assert_eq!(
actual_row, measurement,
"Actual and expected measurement rows do not match"
actual_measurements[0], measurement,
"Actual and expected measurements do not match"
);
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion oximeter/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::path::PathBuf;
use thiserror::Error;

mod client;
pub mod model;
pub(crate) mod model;
pub mod native;
#[cfg(any(feature = "oxql", test))]
pub mod oxql;
Expand Down
Loading

0 comments on commit 9acd3a6

Please sign in to comment.