Read / write timeseries schema with the native client
- Add methods for converting an array of objects into a data block, or
  deserializing an array out of one.
- Use the new to/from block methods to read and write timeseries schema in
  the native format.
- Add a native `Connection` method for inserting data in a block (a rough
  sketch of the combined flow follows this list).
- Implement deserialization of the `TableColumns` server message type,
  which the server sends to the client when it tries to insert data, and
  which is used to check that the block to be inserted matches the
  structure of the table into which the insertion is directed.
- Add a build script that reads the `db-init.sql` file and extracts the
  enum data type definitions into constants, which are used when
  serializing timeseries schema rows into a block.
- Closes #6942
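
A rough sketch of how the new pieces compose (names are taken from the diffs below; the method name `insert_schema_as_block` is hypothetical, and the exact signatures of `to_block` and `insert_native` are assumptions rather than the committed API):

// Sketch only: mirrors the flow in oximeter/db/src/client/dbwrite.rs below.
use crate::client::Client;
use crate::model::to_block::ToBlock as _;
use oximeter::TimeseriesSchema;

impl Client {
    async fn insert_schema_as_block(
        &self,
        new_schema: &[TimeseriesSchema],
    ) -> Result<(), crate::Error> {
        // Convert the schema rows into a single native-format data block...
        let block = TimeseriesSchema::to_block(new_schema)?;
        // ...and send it, with the INSERT statement, over the native protocol.
        let sql = format!(
            "INSERT INTO {}.timeseries_schema FORMAT Native",
            crate::DATABASE_NAME
        );
        self.insert_native(&sql, block).await
    }
}

The committed version of this flow lives in `save_new_schema_or_remove` in dbwrite.rs below, which also removes the schema from the client's cache if the insert fails.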
bnaecker committed Oct 30, 2024
1 parent 2c51979 commit b95c5bf
Showing 22 changed files with 1,132 additions and 220 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock


6 changes: 4 additions & 2 deletions nexus/src/app/metrics.rs
@@ -115,7 +115,8 @@ impl super::Nexus {
.timeseries_schema_list(&pagination.page, limit)
.await
.map_err(|e| match e {
oximeter_db::Error::DatabaseUnavailable(_) => {
oximeter_db::Error::DatabaseUnavailable(_)
| oximeter_db::Error::Connection(_) => {
Error::ServiceUnavailable {
internal_message: e.to_string(),
}
@@ -150,7 +151,8 @@ impl super::Nexus {
result.tables
})
.map_err(|e| match e {
oximeter_db::Error::DatabaseUnavailable(_) => {
oximeter_db::Error::DatabaseUnavailable(_)
| oximeter_db::Error::Connection(_) => {
Error::ServiceUnavailable {
internal_message: e.to_string(),
}
3 changes: 2 additions & 1 deletion nexus/src/app/oximeter.rs
@@ -241,7 +241,8 @@ pub(crate) async fn unassign_producer(

fn map_oximeter_err(error: oximeter_db::Error) -> Error {
match error {
oximeter_db::Error::DatabaseUnavailable(_) => {
oximeter_db::Error::DatabaseUnavailable(_)
| oximeter_db::Error::Connection(_) => {
Error::ServiceUnavailable { internal_message: error.to_string() }
}
_ => Error::InternalError { internal_message: error.to_string() },
7 changes: 7 additions & 0 deletions oximeter/db/Cargo.toml
@@ -17,6 +17,7 @@ camino.workspace = true
chrono.workspace = true
chrono-tz.workspace = true
clap.workspace = true
const_format.workspace = true
clickward.workspace = true
debug-ignore.workspace = true
dropshot.workspace = true
@@ -32,6 +33,7 @@ omicron-common.workspace = true
omicron-workspace-hack.workspace = true
oximeter.workspace = true
oxql-types.workspace = true
parse-display.workspace = true
qorb.workspace = true
regex.workspace = true
serde.workspace = true
@@ -93,6 +95,11 @@ optional = true
workspace = true
features = [ "rt-multi-thread", "macros" ]

[build-dependencies]
anyhow.workspace = true
nom.workspace = true
quote.workspace = true

[dev-dependencies]
camino-tempfile.workspace = true
criterion = { workspace = true, features = [ "async_tokio" ] }
100 changes: 100 additions & 0 deletions oximeter/db/build.rs
@@ -0,0 +1,100 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright 2024 Oxide Computer Company

use anyhow::Context as _;
use nom::IResult;

/// Build script for generating native type representations from the
/// ground-truth SQL definitions.
fn main() -> anyhow::Result<()> {
const INIT_FILE: &str =
concat!(env!("CARGO_MANIFEST_DIR"), "/schema/single-node/db-init.sql");
let contents = std::fs::read_to_string(INIT_FILE)
.with_context(|| format!("Failed to read SQL file: '{INIT_FILE}'"))?;
let field_type_enum =
find_enum(&contents, "type").context("failed to find column 'type'")?;
let field_source_enum = find_enum(&contents, "source")
.context("failed to find column 'source'")?;
let datum_type_enum = find_enum(&contents, "datum_type")
.context("failed to find column 'datum_type'")?;
std::fs::write(
format!("{}/enum_defs.rs", std::env::var("OUT_DIR")?),
[field_type_enum, field_source_enum, datum_type_enum].join("\n"),
)
.context("writing output file")?;
Ok(())
}

// Find an enum in the `timeseries_schema` table definition for the named
// column, and return the corresponding `DataType::Enum8()` definition for it.
fn find_enum(contents: &str, column: &str) -> Option<String> {
let needle = format!("{column} Enum(\n");
let start = contents.find(&needle)? + needle.len();
let s = &contents[start..].trim();
let (variants, names): (Vec<i8>, Vec<String>) =
variant_list(s).ok()?.1.into_iter().unzip();
let enum_map = quote::format_ident!("{}_ENUM_MAP", column.to_uppercase());
let enum_rev_map =
quote::format_ident!("{}_ENUM_REV_MAP", column.to_uppercase());
let enum_type =
quote::format_ident!("{}_ENUM_DATA_TYPE", column.to_uppercase());
let parsed_type = if column == "type" {
quote::quote! { ::oximeter::FieldType }
} else if column == "source" {
quote::quote! { ::oximeter::FieldSource }
} else if column == "datum_type" {
quote::quote! { ::oximeter::DatumType }
} else {
unreachable!();
};
Some(quote::quote! {
/// Mapping from the variant index to the string form.
#[allow(dead_code)]
static #enum_map: ::std::sync::LazyLock<::indexmap::IndexMap<i8, String>> = ::std::sync::LazyLock::new(|| {
::indexmap::IndexMap::from([
#((#variants, String::from(#names))),*
])
});
/// Reverse mapping, from the _parsed_ form to the variant index.
#[allow(dead_code)]
static #enum_rev_map: ::std::sync::LazyLock<::indexmap::IndexMap<#parsed_type, i8>> = ::std::sync::LazyLock::new(|| {
::indexmap::IndexMap::from([
#((<#parsed_type as ::std::str::FromStr>::from_str(#names).unwrap(), #variants)),*
])
});
/// Actual DataType::Enum8(_) with the contained variant-to-name mapping.
#[allow(dead_code)]
static #enum_type: ::std::sync::LazyLock<crate::native::block::DataType> = ::std::sync::LazyLock::new(|| {
crate::native::block::DataType::Enum8(
::indexmap::IndexMap::from([
#((#variants, String::from(#names))),*
])
)
});
}.to_string())
}

fn variant_list(s: &str) -> IResult<&str, Vec<(i8, String)>> {
nom::multi::separated_list1(
nom::bytes::complete::is_a(" ,\n"),
single_variant,
)(s)
}

fn single_variant(s: &str) -> IResult<&str, (i8, String)> {
nom::combinator::map(
nom::sequence::separated_pair(
nom::sequence::delimited(
nom::bytes::complete::tag("'"),
nom::character::complete::alphanumeric1,
nom::bytes::complete::tag("'"),
),
nom::bytes::complete::tag(" = "),
nom::character::complete::i8,
),
|(name, variant): (&str, i8)| (variant, name.to_string()),
)(s)
}
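
The generated `enum_defs.rs` is presumably consumed with the conventional build-script include pattern; the exact include site is an assumption, but it would look roughly like:

// Assumed include site (somewhere in the native block/serialization code);
// pulling OUT_DIR output in with include! is the standard pattern.
include!(concat!(env!("OUT_DIR"), "/enum_defs.rs"));

After the include, the generated statics (`TYPE_ENUM_MAP`, `TYPE_ENUM_REV_MAP`, `TYPE_ENUM_DATA_TYPE`, and their `SOURCE_` and `DATUM_TYPE_` counterparts) are available when serializing `timeseries_schema` rows into `Enum8` columns.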
38 changes: 17 additions & 21 deletions oximeter/db/src/client/dbwrite.rs
@@ -8,18 +8,19 @@

use crate::client::Client;
use crate::model;
use crate::model::to_block::ToBlock as _;
use crate::Error;
use camino::Utf8PathBuf;
use oximeter::Sample;
use oximeter::TimeseriesName;
use oximeter::TimeseriesSchema;
use slog::debug;
use std::collections::BTreeMap;
use std::collections::BTreeSet;

#[derive(Debug)]
pub(super) struct UnrolledSampleRows {
/// The timeseries schema rows, keyed by timeseries name.
pub new_schema: BTreeMap<TimeseriesName, String>,
/// The timeseries schema rows.
pub new_schema: Vec<TimeseriesSchema>,
/// The rows to insert in all the other tables, keyed by the table name.
pub rows: BTreeMap<String, Vec<String>>,
}
@@ -182,14 +183,14 @@ impl Client {
continue;
}
Ok(None) => {}
Ok(Some((name, schema))) => {
Ok(Some(schema)) => {
debug!(
self.log,
"new timeseries schema";
"timeseries_name" => %name,
"schema" => %schema
"timeseries_name" => %schema.timeseries_name,
"schema" => ?schema,
);
new_schema.insert(name, schema);
new_schema.insert(schema.timeseries_name.clone(), schema);
}
}

@@ -217,6 +218,7 @@ impl Client {
seen_timeseries.insert(key);
}

let new_schema = new_schema.into_values().collect();
UnrolledSampleRows { new_schema, rows }
}

@@ -268,42 +270,36 @@ impl Client {
// receive a sample with a new schema, and both would then try to insert that schema.
pub(super) async fn save_new_schema_or_remove(
&self,
new_schema: BTreeMap<TimeseriesName, String>,
new_schema: Vec<TimeseriesSchema>,
) -> Result<(), Error> {
if !new_schema.is_empty() {
debug!(
self.log,
"inserting {} new timeseries schema",
new_schema.len()
);
const APPROX_ROW_SIZE: usize = 64;
let mut body = String::with_capacity(
APPROX_ROW_SIZE + APPROX_ROW_SIZE * new_schema.len(),
let body = const_format::formatcp!(
"INSERT INTO {}.timeseries_schema FORMAT Native",
crate::DATABASE_NAME
);
body.push_str("INSERT INTO ");
body.push_str(crate::DATABASE_NAME);
body.push_str(".timeseries_schema FORMAT JSONEachRow\n");
for row_data in new_schema.values() {
body.push_str(row_data);
body.push('\n');
}
let block = TimeseriesSchema::to_block(&new_schema)?;

// Try to insert the schema.
//
// If this fails, be sure to remove the schema we've added from the
// internal cache. Since we check the internal cache first for
// schema, if we fail here but _don't_ remove the schema, we'll
// never end up inserting the schema, but we will insert samples.
if let Err(e) = self.execute(body).await {
if let Err(e) = self.insert_native(&body, block).await {
debug!(
self.log,
"failed to insert new schema, removing from cache";
"error" => ?e,
);
let mut schema = self.schema.lock().await;
for name in new_schema.keys() {
for schema_to_remove in new_schema.iter() {
schema
.remove(name)
.remove(&schema_to_remove.timeseries_name)
.expect("New schema should have been cached");
}
return Err(e);
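
The `TableColumns` handling described in the commit message is not shown in the hunks above, but the check it enables amounts to comparing the column layout the server advertises against the block the client built. A minimal sketch of that idea, with hypothetical types standing in for the real message and block representations:

// Hypothetical shapes; the real TableColumns message and block types live in
// the native client module and carry richer type information.
struct ColumnDescription {
    name: String,
    data_type: String,
}

// Return an error if the block's columns don't line up with the columns the
// server reported for the target table.
fn check_block_matches_table(
    table: &[ColumnDescription],
    block: &[ColumnDescription],
) -> Result<(), String> {
    if table.len() != block.len() {
        return Err(format!(
            "column count mismatch: table has {}, block has {}",
            table.len(),
            block.len(),
        ));
    }
    for (t, b) in table.iter().zip(block.iter()) {
        if t.name != b.name || t.data_type != b.data_type {
            return Err(format!(
                "column mismatch: table column '{} {}' vs block column '{} {}'",
                t.name, t.data_type, b.name, b.data_type,
            ));
        }
    }
    Ok(())
}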