From b95c5bfe1d201aacbc2088cec3968335f24f6d44 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Fri, 25 Oct 2024 11:55:43 -0700 Subject: [PATCH] Read / write timeseries schema with the native client - Add methods for converting an array of objects into a data block, or deserializing an array out of one. - Use new to/from block methods to read and write timeseries schema in the native format - Add native `Connection` method for inserting data in a block. - Implement deserialization of the `TableColumns` server message type, which is sent to client when it tries to insert data, and is used to check that the block to be inserted matches the structure of the table into which the insertion is directed. - Add a build script, which reads the `db-init.sql` file to extract the enum data type definitions into constants. This is used in the serialization of timeseries schema rows into a block. - Closes #6942 --- Cargo.lock | 4 + nexus/src/app/metrics.rs | 6 +- nexus/src/app/oximeter.rs | 3 +- oximeter/db/Cargo.toml | 7 + oximeter/db/build.rs | 100 ++++++ oximeter/db/src/client/dbwrite.rs | 38 ++- oximeter/db/src/client/mod.rs | 146 ++++----- oximeter/db/src/lib.rs | 14 +- oximeter/db/src/model/from_block.rs | 142 +++++++++ oximeter/db/src/{model.rs => model/mod.rs} | 13 +- oximeter/db/src/model/to_block.rs | 115 +++++++ oximeter/db/src/native/block.rs | 337 ++++++++++++++------- oximeter/db/src/native/connection.rs | 130 +++++++- oximeter/db/src/native/io/mod.rs | 1 + oximeter/db/src/native/io/packet/client.rs | 3 +- oximeter/db/src/native/io/packet/server.rs | 14 + oximeter/db/src/native/io/table_columns.rs | 202 ++++++++++++ oximeter/db/src/native/mod.rs | 26 +- oximeter/db/src/native/packets/server.rs | 26 +- oximeter/types/Cargo.toml | 1 + oximeter/types/src/schema.rs | 5 + oximeter/types/src/types.rs | 19 +- 22 files changed, 1132 insertions(+), 220 deletions(-) create mode 100644 oximeter/db/build.rs create mode 100644 oximeter/db/src/model/from_block.rs rename 
oximeter/db/src/{model.rs => model/mod.rs} (99%) create mode 100644 oximeter/db/src/model/to_block.rs create mode 100644 oximeter/db/src/native/io/table_columns.rs diff --git a/Cargo.lock b/Cargo.lock index bc5f53f759a..d3370fe34ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7502,6 +7502,7 @@ dependencies = [ "chrono-tz", "clap", "clickward", + "const_format", "criterion", "crossterm", "debug-ignore", @@ -7523,8 +7524,10 @@ dependencies = [ "oximeter", "oximeter-test-utils", "oxql-types", + "parse-display", "peg", "qorb", + "quote", "reedline", "regex", "reqwest 0.12.8", @@ -7669,6 +7672,7 @@ dependencies = [ "omicron-common", "omicron-workspace-hack", "oximeter-macro-impl", + "parse-display", "rand", "rand_distr", "regex", diff --git a/nexus/src/app/metrics.rs b/nexus/src/app/metrics.rs index ba76f87392e..40f7882281c 100644 --- a/nexus/src/app/metrics.rs +++ b/nexus/src/app/metrics.rs @@ -115,7 +115,8 @@ impl super::Nexus { .timeseries_schema_list(&pagination.page, limit) .await .map_err(|e| match e { - oximeter_db::Error::DatabaseUnavailable(_) => { + oximeter_db::Error::DatabaseUnavailable(_) + | oximeter_db::Error::Connection(_) => { Error::ServiceUnavailable { internal_message: e.to_string(), } @@ -150,7 +151,8 @@ impl super::Nexus { result.tables }) .map_err(|e| match e { - oximeter_db::Error::DatabaseUnavailable(_) => { + oximeter_db::Error::DatabaseUnavailable(_) + | oximeter_db::Error::Connection(_) => { Error::ServiceUnavailable { internal_message: e.to_string(), } diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index 6a4a81a47ae..708b0828c9b 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -241,7 +241,8 @@ pub(crate) async fn unassign_producer( fn map_oximeter_err(error: oximeter_db::Error) -> Error { match error { - oximeter_db::Error::DatabaseUnavailable(_) => { + oximeter_db::Error::DatabaseUnavailable(_) + | oximeter_db::Error::Connection(_) => { Error::ServiceUnavailable { internal_message: 
error.to_string() } } _ => Error::InternalError { internal_message: error.to_string() }, diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index b0afdfbb073..df4bc7d703c 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -17,6 +17,7 @@ camino.workspace = true chrono.workspace = true chrono-tz.workspace = true clap.workspace = true +const_format.workspace = true clickward.workspace = true debug-ignore.workspace = true dropshot.workspace = true @@ -32,6 +33,7 @@ omicron-common.workspace = true omicron-workspace-hack.workspace = true oximeter.workspace = true oxql-types.workspace = true +parse-display.workspace = true qorb.workspace = true regex.workspace = true serde.workspace = true @@ -93,6 +95,11 @@ optional = true workspace = true features = [ "rt-multi-thread", "macros" ] +[build-dependencies] +anyhow.workspace = true +nom.workspace = true +quote.workspace = true + [dev-dependencies] camino-tempfile.workspace = true criterion = { workspace = true, features = [ "async_tokio" ] } diff --git a/oximeter/db/build.rs b/oximeter/db/build.rs new file mode 100644 index 00000000000..b166c528976 --- /dev/null +++ b/oximeter/db/build.rs @@ -0,0 +1,100 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright 2024 Oxide Computer Company + +use anyhow::Context as _; +use nom::IResult; + +/// Build script for generating native type representations from the +/// ground-truth SQL definitions. 
+fn main() -> anyhow::Result<()> { + const INIT_FILE: &str = + concat!(env!("CARGO_MANIFEST_DIR"), "/schema/single-node/db-init.sql"); + let contents = std::fs::read_to_string(INIT_FILE) + .with_context(|| format!("Failed to read SQL file: '{INIT_FILE}'"))?; + let field_type_enum = + find_enum(&contents, "type").context("failed to find column 'type'")?; + let field_source_enum = find_enum(&contents, "source") + .context("failed to find column 'source'")?; + let datum_type_enum = find_enum(&contents, "datum_type") + .context("failed to find column 'datum_type'")?; + std::fs::write( + format!("{}/enum_defs.rs", std::env::var("OUT_DIR")?), + [field_type_enum, field_source_enum, datum_type_enum].join("\n"), + ) + .context("writing output file")?; + Ok(()) +} + +// Find an enum in the `timeseries_schema` table definition for the named +// column, and return the corresponding `DataType::Enum8()` definition for it. +fn find_enum(contents: &str, column: &str) -> Option { + let needle = format!("{column} Enum(\n"); + let start = contents.find(&needle)? + needle.len(); + let s = &contents[start..].trim(); + let (variants, names): (Vec, Vec) = + variant_list(s).ok()?.1.into_iter().unzip(); + let enum_map = quote::format_ident!("{}_ENUM_MAP", column.to_uppercase()); + let enum_rev_map = + quote::format_ident!("{}_ENUM_REV_MAP", column.to_uppercase()); + let enum_type = + quote::format_ident!("{}_ENUM_DATA_TYPE", column.to_uppercase()); + let parsed_type = if column == "type" { + quote::quote! { ::oximeter::FieldType } + } else if column == "source" { + quote::quote! { ::oximeter::FieldSource } + } else if column == "datum_type" { + quote::quote! { ::oximeter::DatumType } + } else { + unreachable!(); + }; + Some(quote::quote! { + /// Mapping from the variant index to the string form. 
+ #[allow(dead_code)] + static #enum_map: ::std::sync::LazyLock<::indexmap::IndexMap> = ::std::sync::LazyLock::new(|| { + ::indexmap::IndexMap::from([ + #((#variants, String::from(#names))),* + ]) + }); + /// Reverse mapping, from the _parsed_ form to the variant index. + #[allow(dead_code)] + static #enum_rev_map: ::std::sync::LazyLock<::indexmap::IndexMap<#parsed_type, i8>> = ::std::sync::LazyLock::new(|| { + ::indexmap::IndexMap::from([ + #((<#parsed_type as ::std::str::FromStr>::from_str(#names).unwrap(), #variants)),* + ]) + }); + /// Actual DataType::Enum8(_) with the contained variant-to-name mapping. + #[allow(dead_code)] + static #enum_type: ::std::sync::LazyLock = ::std::sync::LazyLock::new(|| { + crate::native::block::DataType::Enum8( + ::indexmap::IndexMap::from([ + #((#variants, String::from(#names))),* + ]) + ) + }); + }.to_string()) +} + +fn variant_list(s: &str) -> IResult<&str, Vec<(i8, String)>> { + nom::multi::separated_list1( + nom::bytes::complete::is_a(" ,\n"), + single_variant, + )(s) +} + +fn single_variant(s: &str) -> IResult<&str, (i8, String)> { + nom::combinator::map( + nom::sequence::separated_pair( + nom::sequence::delimited( + nom::bytes::complete::tag("'"), + nom::character::complete::alphanumeric1, + nom::bytes::complete::tag("'"), + ), + nom::bytes::complete::tag(" = "), + nom::character::complete::i8, + ), + |(name, variant): (&str, i8)| (variant, name.to_string()), + )(s) +} diff --git a/oximeter/db/src/client/dbwrite.rs b/oximeter/db/src/client/dbwrite.rs index 3559374e0dd..d37d95deb9d 100644 --- a/oximeter/db/src/client/dbwrite.rs +++ b/oximeter/db/src/client/dbwrite.rs @@ -8,18 +8,19 @@ use crate::client::Client; use crate::model; +use crate::model::to_block::ToBlock as _; use crate::Error; use camino::Utf8PathBuf; use oximeter::Sample; -use oximeter::TimeseriesName; +use oximeter::TimeseriesSchema; use slog::debug; use std::collections::BTreeMap; use std::collections::BTreeSet; #[derive(Debug)] pub(super) struct 
UnrolledSampleRows { - /// The timeseries schema rows, keyed by timeseries name. - pub new_schema: BTreeMap, + /// The timeseries schema rows. + pub new_schema: Vec, /// The rows to insert in all the other tables, keyed by the table name. pub rows: BTreeMap>, } @@ -182,14 +183,14 @@ impl Client { continue; } Ok(None) => {} - Ok(Some((name, schema))) => { + Ok(Some(schema)) => { debug!( self.log, "new timeseries schema"; - "timeseries_name" => %name, - "schema" => %schema + "timeseries_name" => %schema.timeseries_name, + "schema" => ?schema, ); - new_schema.insert(name, schema); + new_schema.insert(schema.timeseries_name.clone(), schema); } } @@ -217,6 +218,7 @@ impl Client { seen_timeseries.insert(key); } + let new_schema = new_schema.into_values().collect(); UnrolledSampleRows { new_schema, rows } } @@ -268,7 +270,7 @@ impl Client { // receive a sample with a new schema, and both would then try to insert that schema. pub(super) async fn save_new_schema_or_remove( &self, - new_schema: BTreeMap, + new_schema: Vec, ) -> Result<(), Error> { if !new_schema.is_empty() { debug!( @@ -276,17 +278,11 @@ impl Client { "inserting {} new timeseries schema", new_schema.len() ); - const APPROX_ROW_SIZE: usize = 64; - let mut body = String::with_capacity( - APPROX_ROW_SIZE + APPROX_ROW_SIZE * new_schema.len(), + let body = const_format::formatcp!( + "INSERT INTO {}.timeseries_schema FORMAT Native", + crate::DATABASE_NAME ); - body.push_str("INSERT INTO "); - body.push_str(crate::DATABASE_NAME); - body.push_str(".timeseries_schema FORMAT JSONEachRow\n"); - for row_data in new_schema.values() { - body.push_str(row_data); - body.push('\n'); - } + let block = TimeseriesSchema::to_block(&new_schema)?; // Try to insert the schema. // @@ -294,16 +290,16 @@ impl Client { // internal cache. Since we check the internal cache first for // schema, if we fail here but _don't_ remove the schema, we'll // never end up inserting the schema, but we will insert samples. 
- if let Err(e) = self.execute(body).await { + if let Err(e) = self.insert_native(&body, block).await { debug!( self.log, "failed to insert new schema, removing from cache"; "error" => ?e, ); let mut schema = self.schema.lock().await; - for name in new_schema.keys() { + for schema_to_remove in new_schema.iter() { schema - .remove(name) + .remove(&schema_to_remove.timeseries_name) .expect("New schema should have been cached"); } return Err(e); diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index 625ed7fafbe..44e3c19089d 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -17,7 +17,9 @@ pub use self::dbwrite::DbWrite; pub use self::dbwrite::TestDbWrite; use crate::client::query_summary::QuerySummary; use crate::model; +use crate::model::from_block::FromBlock; use crate::native; +use crate::native::block::Block; use crate::native::block::ValueArray; use crate::native::QueryResult; use crate::query; @@ -428,7 +430,7 @@ impl Client { "FROM {}.timeseries_schema ", "ORDER BY timeseries_name ", "LIMIT {} ", - "FORMAT JSONEachRow;", + "FORMAT Native;", ), crate::DATABASE_NAME, limit.get(), @@ -441,7 +443,7 @@ impl Client { "WHERE timeseries_name > '{}' ", "ORDER BY timeseries_name ", "LIMIT {} ", - "FORMAT JSONEachRow;", + "FORMAT Native;", ), crate::DATABASE_NAME, last_timeseries, @@ -449,18 +451,19 @@ impl Client { ) } }; - let body = self.execute_with_body(sql).await?.1; - let schema = body - .lines() - .map(|line| { - TimeseriesSchema::from( - serde_json::from_str::(line) - .expect( - "Failed to deserialize TimeseriesSchema from database", - ), - ) - }) - .collect::>(); + let result = self.execute_with_block(&sql).await?; + let Some(block) = result.data.as_ref() else { + error!( + self.log, + "query listing timeseries schema did not contain \ + a data block" + ); + return Err(Error::Database(String::from( + "Query listing timeseries schema did not contain \ + any data from the database", + ))); + }; + let schema 
= TimeseriesSchema::from_block(block)?; ResultsPage::new(schema, &dropshot::EmptyScanParams {}, |schema, _| { schema.timeseries_name.clone() }) @@ -649,7 +652,7 @@ impl Client { "path" => path.display(), "filename" => &name, ); - match self.execute_native(sql).await { + match self.execute_native(&sql).await { Ok(_) => debug!( self.log, "successfully applied schema upgrade file"; @@ -857,11 +860,11 @@ impl Client { /// Read the latest version applied in the database. pub async fn read_latest_version(&self) -> Result { const ALIAS: &str = "max_version"; - let sql = format!( + const QUERY: &str = const_format::formatcp!( "SELECT MAX(value) AS {ALIAS} FROM {db_name}.version;", db_name = crate::DATABASE_NAME, ); - match self.execute_with_result_native(sql).await { + match self.execute_with_block(QUERY).await { Ok(result) => { let Some(data) = &result.data else { error!( @@ -957,13 +960,14 @@ impl Client { "INSERT INTO {db_name}.version (*) VALUES ({version}, now());", db_name = crate::DATABASE_NAME, ); - self.execute_native(sql).await + self.execute_native(&sql).await } /// Verifies if instance is part of oximeter_cluster pub async fn is_oximeter_cluster(&self) -> Result { - let sql = format!("SHOW CLUSTER {}", crate::CLUSTER_NAME); - self.execute_with_result_native(sql).await.and_then(|result| { + const QUERY: &str = + const_format::formatcp!("SHOW CLUSTER {}", crate::CLUSTER_NAME); + self.execute_with_block(QUERY).await.and_then(|result| { result .data .ok_or_else(|| { @@ -989,7 +993,7 @@ impl Client { async fn verify_or_cache_sample_schema( &self, sample: &Sample, - ) -> Result, Error> { + ) -> Result, Error> { let sample_schema = TimeseriesSchema::from(sample); let name = sample_schema.timeseries_name.clone(); let mut schema = self.schema.lock().await; @@ -1023,15 +1027,8 @@ impl Client { } } Entry::Vacant(entry) => { - let name = entry.key().clone(); entry.insert(sample_schema.clone()); - Ok(Some(( - name, - serde_json::to_string(&model::DbTimeseriesSchema::from( - 
sample_schema, - )) - .expect("Failed to convert schema to DB model"), - ))) + Ok(Some(sample_schema)) } } } @@ -1090,36 +1087,48 @@ impl Client { Ok(timeseries_by_key.into_values().collect()) } + // Insert data using the native TCP interface. + async fn insert_native( + &self, + sql: &str, + block: Block, + ) -> Result { + trace!( + self.log, + "executing SQL query"; + "sql" => sql, + ); + let mut handle = self.native_pool.claim().await?; + let id = usdt::UniqueId::new(); + probes::sql__query__start!(|| (&id, &sql)); + let result = handle.insert(sql, block).await.map_err(Error::from); + probes::sql__query__done!(|| (&id)); + result + } + // Execute a generic SQL statement, using the native TCP interface. - async fn execute_native(&self, sql: S) -> Result<(), Error> - where - S: Into, - { - self.execute_with_result_native(sql).await.map(|_| ()) + async fn execute_native(&self, sql: &str) -> Result<(), Error> { + self.execute_with_block(sql).await.map(|_| ()) } // Execute a generic SQL statement, returning the query result as a data // block. // // TODO-robustness This currently does no validation of the statement. 
- async fn execute_with_result_native( + async fn execute_with_block( &self, - sql: S, - ) -> Result - where - S: Into, - { - let sql = sql.into(); + sql: &str, + ) -> Result { trace!( self.log, "executing SQL query"; - "sql" => &sql, + "sql" => sql, ); let mut handle = self.native_pool.claim().await?; let id = usdt::UniqueId::new(); probes::sql__query__start!(|| (&id, &sql)); - let result = handle.query(sql.as_str()).await.map_err(Error::from); + let result = handle.query(sql).await.map_err(Error::from); probes::sql__query__done!(|| (&id)); result } @@ -1218,7 +1227,7 @@ impl Client { let sql = { if schema.is_empty() { format!( - "SELECT * FROM {db_name}.timeseries_schema FORMAT JSONEachRow;", + "SELECT * FROM {db_name}.timeseries_schema FORMAT Native;", db_name = crate::DATABASE_NAME, ) } else { @@ -1229,7 +1238,7 @@ impl Client { "FROM {db_name}.timeseries_schema ", "WHERE timeseries_name NOT IN ", "({current_keys}) ", - "FORMAT JSONEachRow;", + "FORMAT Native;", ), db_name = crate::DATABASE_NAME, current_keys = schema @@ -1240,21 +1249,22 @@ impl Client { ) } }; - let body = self.execute_with_body(sql).await?.1; - if body.is_empty() { + let body = self.execute_with_block(&sql).await?; + let Some(data) = body.data.as_ref() else { trace!(self.log, "no new timeseries schema in database"); - } else { - trace!(self.log, "extracting new timeseries schema"); - let new = body.lines().map(|line| { - let schema = TimeseriesSchema::from( - serde_json::from_str::(line) - .expect( - "Failed to deserialize TimeseriesSchema from database", - ), - ); - (schema.timeseries_name.clone(), schema) - }); - schema.extend(new); + return Ok(()); + }; + if data.is_empty() { + trace!(self.log, "no new timeseries schema in database"); + return Ok(()); + } + trace!( + self.log, + "retrieved new timeseries schema"; + "n_schema" => data.n_rows, + ); + for new_schema in TimeseriesSchema::from_block(data)?.into_iter() { + schema.insert(new_schema.timeseries_name.clone(), new_schema); } Ok(()) 
} @@ -1348,7 +1358,7 @@ impl Client { "table_name" => table, "n_timeseries" => chunk.len(), ); - self.execute_native(sql).await?; + self.execute_native(&sql).await?; } } Ok(()) @@ -3126,7 +3136,7 @@ mod tests { "INSERT INTO oximeter.{field_table} FORMAT JSONEachRow {row}" ); client - .execute_native(insert_sql) + .execute_native(&insert_sql) .await .expect("Failed to insert field row"); @@ -3137,7 +3147,7 @@ mod tests { crate::DATABASE_SELECT_FORMAT, ); let body = client - .execute_with_body(select_sql) + .execute_with_body(&select_sql) .await .expect("Failed to select field row") .1; @@ -3466,7 +3476,7 @@ mod tests { ); println!("Inserted row: {}", inserted_row); client - .execute_native(insert_sql) + .execute_native(&insert_sql) .await .expect("Failed to insert measurement row"); @@ -4037,11 +4047,11 @@ mod tests { // table, to version 2, which adds two columns to that table in // different SQL files. client - .execute_native(format!("CREATE DATABASE {test_name};")) + .execute_native(&format!("CREATE DATABASE {test_name};")) .await .unwrap(); client - .execute_native(format!( + .execute_native(&format!( "\ CREATE TABLE {test_name}.tbl (\ `col0` UInt8 \ @@ -4086,7 +4096,7 @@ mod tests { // Check that it actually worked! let body = client - .execute_with_body(format!( + .execute_with_body(&format!( "\ SELECT name, type FROM system.columns \ WHERE database = '{test_name}' AND table = 'tbl' \ @@ -4249,11 +4259,11 @@ mod tests { // modifications over two versions, rather than as multiple schema // upgrades in one version bump. 
client - .execute_native(format!("CREATE DATABASE {test_name};")) + .execute_native(&format!("CREATE DATABASE {test_name};")) .await .unwrap(); client - .execute_native(format!( + .execute_native(&format!( "\ CREATE TABLE {test_name}.tbl (\ `col0` UInt8 \ diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index 36fff6056fc..3712771595e 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -20,6 +20,8 @@ pub use oximeter::Field; pub use oximeter::FieldType; pub use oximeter::Measurement; pub use oximeter::Sample; +use parse_display::Display; +use parse_display::FromStr; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; @@ -224,7 +226,17 @@ pub struct Timeseries { } #[derive( - Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Deserialize, + Serialize, + FromStr, + Display, )] pub enum DbFieldSource { Target, diff --git a/oximeter/db/src/model/from_block.rs b/oximeter/db/src/model/from_block.rs new file mode 100644 index 00000000000..83e0f0a0e74 --- /dev/null +++ b/oximeter/db/src/model/from_block.rs @@ -0,0 +1,142 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright 2024 Oxide Computer Company + +//! Trait for deserializing an array of values from a `Block`. + +use crate::native::block::Block; +use crate::native::block::DataType; +use crate::native::block::ValueArray; +use crate::native::Error; +use oximeter::AuthzScope; +use oximeter::FieldSchema; +use oximeter::TimeseriesDescription; +use oximeter::TimeseriesSchema; +use oximeter::Units; +use std::collections::BTreeSet; +use std::num::NonZeroU8; + +/// Trait for deserializing an array of items from a ClickHouse data block. +pub trait FromBlock: Sized { + /// Deserialize an array of `Self`s from a block. 
+ fn from_block(block: &Block) -> Result, Error>; +} + +// TODO-cleanup: This is probably a good candidate for a derive-macro, which +// expands to the code that checks that names / types in the block match those +// of the fields in the struct itself. +impl FromBlock for TimeseriesSchema { + fn from_block(block: &Block) -> Result, Error> { + if block.is_empty() { + return Ok(vec![]); + } + let n_rows = + usize::try_from(block.n_rows).map_err(|_| Error::BlockTooLarge)?; + let mut out = Vec::with_capacity(n_rows); + let ValueArray::String(timeseries_names) = + block.column_values("timeseries_name")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::Array { + values: field_names, + inner_type: DataType::String, + } = block.column_values("fields.name")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::Array { + values: field_types, + inner_type: DataType::Enum8(field_type_variants), + } = block.column_values("fields.type")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::Array { + values: field_sources, + inner_type: DataType::Enum8(field_source_variants), + } = block.column_values("fields.source")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::Enum8 { + variants: datum_type_variants, + values: datum_types, + } = block.column_values("datum_type")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::DateTime64 { values: created, .. } = + block.column_values("created")? + else { + return Err(Error::UnexpectedColumnType); + }; + + for row in 0..n_rows { + let ValueArray::String(names) = &field_names[row] else { + unreachable!(); + }; + let ValueArray::Enum8 { values: row_field_types, .. } = + &field_types[row] + else { + unreachable!(); + }; + let ValueArray::Enum8 { values: row_field_sources, .. 
} = + &field_sources[row] + else { + unreachable!(); + }; + let mut field_schema = BTreeSet::new(); + let n_fields = names.len(); + for field in 0..n_fields { + let schema = FieldSchema { + name: names[field].clone(), + field_type: field_type_variants[&row_field_types[field]] + .parse() + .map_err(|_| { + Error::Serde(format!( + "Failed to deserialize field type from database: {:?}", + field_type_variants[&row_field_types[field]] + )) + })?, + source: field_source_variants[&row_field_sources[field]] + .parse() + .map_err(|_| { + Error::Serde(format!( + "Failed to deserialize field source from database: {:?}", + field_source_variants[&row_field_sources[field]])) + })?, + description: String::new(), + }; + field_schema.insert(schema); + } + let schema = TimeseriesSchema { + timeseries_name: + timeseries_names[row].clone().parse().map_err(|_| { + Error::Serde(format!( + "Failed to deserialize timeseries name from database: {:?}", + ×eries_names[row] + )) + })?, + description: TimeseriesDescription::default(), + field_schema, + datum_type: datum_type_variants[&datum_types[row]] + .parse() + .map_err(|_| { + Error::Serde(format!( + "Failed to deserialize datum type from database: {:?}", + &datum_type_variants[&datum_types[row]] + )) + })?, + version: unsafe { NonZeroU8::new_unchecked(1) }, + authz_scope: AuthzScope::Fleet, + units: Units::None, + created: created[row].to_utc(), + }; + out.push(schema); + } + Ok(out) + } +} diff --git a/oximeter/db/src/model.rs b/oximeter/db/src/model/mod.rs similarity index 99% rename from oximeter/db/src/model.rs rename to oximeter/db/src/model/mod.rs index d57819b0d01..eba2333ae5e 100644 --- a/oximeter/db/src/model.rs +++ b/oximeter/db/src/model/mod.rs @@ -29,6 +29,8 @@ use oximeter::types::Measurement; use oximeter::types::MissingDatum; use oximeter::types::Sample; use oximeter::Quantile; +use parse_display::Display; +use parse_display::FromStr; use serde::Deserialize; use serde::Serialize; use std::collections::BTreeMap; @@ -38,6 
+40,9 @@ use std::net::IpAddr; use std::net::Ipv6Addr; use uuid::Uuid; +pub mod from_block; +pub mod to_block; + /// Describes the version of the Oximeter database. /// /// For usage and details see: @@ -170,7 +175,9 @@ impl From for DbTimeseriesSchema { } } -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +#[derive( + Clone, Copy, Debug, Serialize, Deserialize, PartialEq, FromStr, Display, +)] pub enum DbFieldType { String, I8, @@ -223,7 +230,9 @@ impl From for DbFieldType { } } -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +#[derive( + Clone, Copy, Debug, Serialize, Deserialize, PartialEq, FromStr, Display, +)] pub enum DbDatumType { Bool, I8, diff --git a/oximeter/db/src/model/to_block.rs b/oximeter/db/src/model/to_block.rs new file mode 100644 index 00000000000..5a6bb917668 --- /dev/null +++ b/oximeter/db/src/model/to_block.rs @@ -0,0 +1,115 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright 2024 Oxide Computer Company + +//! Trait for serializing an array of values into a `Block`. + +use crate::native::block::Block; +use crate::native::block::Column; +use crate::native::block::DataType; +use crate::native::block::Precision; +use crate::native::block::ValueArray; +use crate::native::Error; +use chrono::TimeZone as _; +use chrono_tz::Tz; +use indexmap::IndexMap; +use oximeter::TimeseriesSchema; + +include!(concat!(env!("OUT_DIR"), "/enum_defs.rs")); + +/// Trait for serializing an array of items to a ClickHouse data block. +pub trait ToBlock: Sized { + /// Serialize an array of `Self`s to a block. + fn to_block(items: &[Self]) -> Result; +} + +// TODO-cleanup: This is probably a good candidate for a derive-macro, which +// expands to the code that checks that names / types in the block match those +// of the fields in the struct itself. 
+impl ToBlock for TimeseriesSchema { + fn to_block(items: &[Self]) -> Result { + let n_items = items.len(); + let mut timeseries_names = Vec::with_capacity(n_items); + let mut field_names = Vec::with_capacity(n_items); + let mut field_types = Vec::with_capacity(n_items); + let mut field_sources = Vec::with_capacity(n_items); + let mut datum_types = Vec::with_capacity(n_items); + let mut created = Vec::with_capacity(n_items); + for item in items.iter() { + timeseries_names.push(item.timeseries_name.to_string()); + let n_fields = item.field_schema.len(); + let mut row_field_names = Vec::with_capacity(n_fields); + let mut row_field_types = Vec::with_capacity(n_fields); + let mut row_field_sources = Vec::with_capacity(n_fields); + for field in item.field_schema.iter() { + row_field_names.push(field.name.clone()); + let ty = TYPE_ENUM_REV_MAP.get(&field.field_type).unwrap(); + row_field_types.push(*ty); + let src = SOURCE_ENUM_REV_MAP.get(&field.source).unwrap(); + row_field_sources.push(*src); + } + field_names.push(ValueArray::String(row_field_names)); + field_types.push(ValueArray::Enum8 { + variants: TYPE_ENUM_MAP.clone(), + values: row_field_types, + }); + field_sources.push(ValueArray::Enum8 { + variants: SOURCE_ENUM_MAP.clone(), + values: row_field_sources, + }); + datum_types + .push(*DATUM_TYPE_ENUM_REV_MAP.get(&item.datum_type).unwrap()); + created.push(Tz::UTC.from_utc_datetime(&item.created.naive_utc())); + } + Ok(Block { + name: String::new(), + info: Default::default(), + n_columns: 6, + n_rows: u64::try_from(n_items).map_err(|_| Error::BlockTooLarge)?, + columns: IndexMap::from([ + ( + String::from("timeseries_name"), + Column::from(ValueArray::String(timeseries_names)), + ), + ( + String::from("fields.name"), + Column::from(ValueArray::Array { + inner_type: DataType::String, + values: field_names, + }), + ), + ( + String::from("fields.type"), + Column::from(ValueArray::Array { + inner_type: TYPE_ENUM_DATA_TYPE.clone(), + values: field_types, + }), + ), + 
( + String::from("fields.source"), + Column::from(ValueArray::Array { + inner_type: SOURCE_ENUM_DATA_TYPE.clone(), + values: field_sources, + }), + ), + ( + String::from("datum_type"), + Column::from(ValueArray::Enum8 { + variants: DATUM_TYPE_ENUM_MAP.clone(), + values: datum_types, + }), + ), + ( + String::from("created"), + Column::from(ValueArray::DateTime64 { + values: created, + precision: Precision::new(9).unwrap(), + tz: Tz::UTC, + }), + ), + ]), + }) + } +} diff --git a/oximeter/db/src/native/block.rs b/oximeter/db/src/native/block.rs index 10727b65322..4b0405e0236 100644 --- a/oximeter/db/src/native/block.rs +++ b/oximeter/db/src/native/block.rs @@ -6,20 +6,28 @@ //! Types for working with actual blocks and columns of data. +use super::packets::server::ColumnDescription; use super::Error; use chrono::DateTime; use chrono::NaiveDate; use chrono_tz::Tz; use indexmap::IndexMap; +use nom::branch::alt; use nom::bytes::complete::tag; use nom::bytes::complete::take_while1; +use nom::character::complete::alphanumeric1; +use nom::character::complete::i8 as nom_i8; use nom::character::complete::u8 as nom_u8; +use nom::combinator::all_consuming; use nom::combinator::eof; use nom::combinator::map; use nom::combinator::map_opt; use nom::combinator::opt; +use nom::combinator::value; +use nom::multi::separated_list1; use nom::sequence::delimited; use nom::sequence::preceded; +use nom::sequence::separated_pair; use nom::sequence::tuple; use nom::IResult; use std::fmt; @@ -67,26 +75,15 @@ impl Block { self.n_columns == 0 && self.n_rows == 0 } - /// Create an empty block with the provided column names and types - pub fn empty<'a>( - types: impl IntoIterator, - ) -> Result { - let mut columns = IndexMap::new(); - let mut n_columns = 0; - for (name, type_) in types.into_iter() { - if !type_.is_supported() { - return Err(Error::UnsupportedDataType(type_.to_string())); - } - n_columns += 1; - columns.insert(name.to_string(), Column::empty(type_)); - } - Ok(Self { + /// Create 
an empty block. + pub fn empty() -> Self { + Self { name: String::new(), info: BlockInfo::default(), - n_columns, + n_columns: 0, n_rows: 0, - columns, - }) + columns: IndexMap::new(), + } } /// Concatenate this data block with another. @@ -106,7 +103,10 @@ impl Block { Ok(()) } - fn matches_structure(&self, block: &Block) -> bool { + /// Return true if this block matches the structure of the other. + /// + /// This means it has the same column names and types. + pub fn matches_structure(&self, block: &Block) -> bool { if self.n_columns != block.n_columns { return false; } @@ -120,6 +120,32 @@ impl Block { } true } + + /// Return the values of the named column, if it exists. + pub fn column_values(&self, name: &str) -> Result<&ValueArray, Error> { + self.columns + .get(name) + .map(|col| &col.values) + .ok_or_else(|| Error::NoSuchColumn(name.to_string())) + } + + pub(crate) fn matches_table_description( + &self, + columns: &[ColumnDescription], + ) -> bool { + if self.n_columns != columns.len() as u64 { + return false; + } + for (our_col, their_col) in self.columns.iter().zip(columns.iter()) { + if our_col.0 != &their_col.name { + return false; + } + if our_col.1.data_type != their_col.data_type { + return false; + } + } + true + } } /// Details about the block. @@ -177,6 +203,13 @@ pub struct Column { pub data_type: DataType, } +impl From for Column { + fn from(values: ValueArray) -> Self { + let data_type = values.data_type(); + Self { values, data_type } + } +} + impl Column { /// Create an empty column of the provided type. pub fn empty(data_type: DataType) -> Self { @@ -194,6 +227,16 @@ impl Column { self.values.concat(rhs.values); Ok(()) } + + /// Return true if the column is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Return the number of elements in the column. + pub fn len(&self) -> usize { + self.values.len() + } } /// An array of singly-typed data values from the server. 
@@ -252,7 +295,7 @@ impl ValueArray { } /// Return an empty value array of the provided type. - fn empty(data_type: &DataType) -> ValueArray { + pub fn empty(data_type: &DataType) -> ValueArray { match data_type { DataType::UInt8 => ValueArray::UInt8(vec![]), DataType::UInt16 => ValueArray::UInt16(vec![]), @@ -589,6 +632,39 @@ impl DataType { pub(crate) fn is_nullable(&self) -> bool { matches!(self, DataType::Nullable(_)) } + + /// Parse out a data type from a string. + /// + /// This is a `nom`-based function, so that the method can be used in other + /// contexts. The `DataType::from_str()` implementation is a thin wrapper + /// around this. + pub(super) fn nom_parse(s: &str) -> IResult<&str, Self> { + alt(( + value(DataType::UInt8, tag("UInt8")), + value(DataType::UInt16, tag("UInt16")), + value(DataType::UInt32, tag("UInt32")), + value(DataType::UInt64, tag("UInt64")), + value(DataType::UInt128, tag("UInt128")), + value(DataType::Int8, tag("Int8")), + value(DataType::Int16, tag("Int16")), + value(DataType::Int32, tag("Int32")), + value(DataType::Int64, tag("Int64")), + value(DataType::Int128, tag("Int128")), + value(DataType::Float32, tag("Float32")), + value(DataType::Float64, tag("Float64")), + value(DataType::String, tag("String")), + value(DataType::Uuid, tag("UUID")), + value(DataType::Ipv4, tag("IPv4")), + value(DataType::Ipv6, tag("IPv6")), + // IMPORTANT: This needs to consume all its input, otherwise we may + // parse something like `DateTime(UTC)` as `Date`, which is + // incorrect. + value(DataType::Date, all_consuming(tag("Date"))), + // These need to be nested because `alt` supports a max of 21 + // parsers, and we have 22 data types. 
+ alt((datetime, datetime64, enum8, nullable, array)), + ))(s) + } } impl fmt::Display for DataType { @@ -632,9 +708,23 @@ impl fmt::Display for DataType { } // Parse a quoted timezone, like `'UTC'` or `'America/Los_Angeles'` +// +// Note that the quotes may optionally be escaped, like `\'UTC\'`, which is +// needed to support deserializing table descriptions, where the types for each +// column are serialized as an escaped string. fn quoted_timezone(s: &str) -> IResult<&str, Tz> { map( - delimited(tag("'"), take_while1(|c| c != '\''), tag("'")), + delimited( + preceded(opt(tag("\\")), tag("'")), + take_while1(|c: char| { + c.is_ascii_alphanumeric() + || c == '/' + || c == '+' + || c == '-' + || c == '_' + }), + preceded(opt(tag("\\")), tag("'")), + ), parse_timezone, )(s) } @@ -687,104 +777,68 @@ fn parse_timezone(s: &str) -> Tz { s.parse().unwrap_or_else(|_| *DEFAULT_TIMEZONE) } -impl std::str::FromStr for DataType { - type Err = Error; - - fn from_str(s: &str) -> Result { - // Simple scalar types. 
- if s == "UInt8" { - return Ok(DataType::UInt8); - } else if s == "UInt16" { - return Ok(DataType::UInt16); - } else if s == "UInt32" { - return Ok(DataType::UInt32); - } else if s == "UInt64" { - return Ok(DataType::UInt64); - } else if s == "UInt128" { - return Ok(DataType::UInt128); - } else if s == "Int8" { - return Ok(DataType::Int8); - } else if s == "Int16" { - return Ok(DataType::Int16); - } else if s == "Int32" { - return Ok(DataType::Int32); - } else if s == "Int64" { - return Ok(DataType::Int64); - } else if s == "Int128" { - return Ok(DataType::Int128); - } else if s == "Float32" { - return Ok(DataType::Float32); - } else if s == "Float64" { - return Ok(DataType::Float64); - } else if s == "String" { - return Ok(DataType::String); - } else if s == "UUID" { - return Ok(DataType::Uuid); - } else if s == "IPv4" { - return Ok(DataType::Ipv4); - } else if s == "IPv6" { - return Ok(DataType::Ipv6); - } else if s == "Date" { - return Ok(DataType::Date); - } - - // Check for datetime, possibly with a timezone. - if let Ok((_, dt)) = datetime(s) { - return Ok(dt); - }; +/// Parse an enum variant name. +fn variant_name(s: &str) -> IResult<&str, &str> { + delimited( + preceded(opt(tag("\\")), tag("'")), + alphanumeric1, + preceded(opt(tag("\\")), tag("'")), + )(s) +} - // Check for DateTime64 with precision, and possibly a timezone. - if let Ok((_, dt)) = datetime64(s) { - return Ok(dt); - }; +/// Parse a single enum variant, like `'Foo' = 1`. +/// +/// Note that the single-quotes may be escaped, which is required for parsing +/// the `ColumnDescription` type from a `TableColumns` server packet. +fn enum_variant(s: &str) -> IResult<&str, (i8, &str)> { + map(separated_pair(variant_name, tag(" = "), nom_i8), |(name, variant)| { + (variant, name) + })(s) +} - // Check for Enum8s. 
- // - // These are written like "Enum8('foo' = 1, 'bar' = 2)" - if let Some(suffix) = s.strip_prefix("Enum8(") { - let Some(inner) = suffix.strip_suffix(")") else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; +/// Parse an `Enum8` data type from a string. +pub(super) fn enum8(s: &str) -> IResult<&str, DataType> { + map( + delimited( + tag("Enum8("), + separated_list1(tag(", "), enum_variant), + tag(")"), + ), + |variants| { let mut map = IndexMap::new(); - for each in inner.split(',') { - let Some((name, value)) = each.split_once(" = ") else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; - let Ok(value) = value.parse() else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; - // Trim whitespace from the name and strip any single-quotes. - let name = name.trim().trim_matches('\'').to_string(); - map.insert(value, name.to_string()); - } - return Ok(DataType::Enum8(map)); - } + for (variant, name) in variants.into_iter() { + map.insert(variant, name.to_string()); + } + DataType::Enum8(map) + }, + )(s) +} - // Recurse for nullable types. - if let Some(suffix) = s.strip_prefix("Nullable(") { - let Some(inner) = suffix.strip_suffix(')') else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; - return inner - .parse() - .map(|inner| DataType::Nullable(Box::new(inner))); - } +fn nullable(s: &str) -> IResult<&str, DataType> { + map(delimited(tag("Nullable("), DataType::nom_parse, tag(")")), |inner| { + DataType::Nullable(Box::new(inner)) + })(s) +} - // And for arrays. 
- if let Some(suffix) = s.strip_prefix("Array(") { - let Some(inner) = suffix.strip_suffix(')') else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; - return inner.parse().map(|inner| DataType::Array(Box::new(inner))); - } +fn array(s: &str) -> IResult<&str, DataType> { + map(delimited(tag("Array("), DataType::nom_parse, tag(")")), |inner| { + DataType::Array(Box::new(inner)) + })(s) +} + +impl std::str::FromStr for DataType { + type Err = Error; - // Anything else is unsupported for now. - Err(Error::UnsupportedDataType(s.to_string())) + fn from_str(s: &str) -> Result { + Self::nom_parse(s) + .map(|(_, parsed)| parsed) + .map_err(|_| Error::UnsupportedDataType(s.to_string())) } } #[cfg(test)] mod tests { + use super::enum8; use super::Block; use super::BlockInfo; use super::Column; @@ -794,6 +848,8 @@ mod tests { use super::DEFAULT_TIMEZONE; use crate::native::block::datetime; use crate::native::block::datetime64; + use crate::native::block::enum_variant; + use crate::native::block::quoted_timezone; use chrono::SubsecRound as _; use chrono::Utc; use chrono_tz::Tz; @@ -933,6 +989,14 @@ mod tests { assert!(datetime64("DateTime64(1,'UTC')").is_err()); } + #[test] + fn parse_escaped_date_time64() { + assert_eq!( + DataType::DateTime64(Precision(1), Tz::UTC), + datetime64(r#"DateTime64(1, \'UTC\')"#).unwrap().1 + ); + } + #[test] fn concat_blocks() { let data = vec![0, 1]; @@ -955,4 +1019,63 @@ mod tests { ValueArray::UInt64([data.as_slice(), data.as_slice()].concat()) ); } + + #[test] + fn test_parse_enum_variant() { + assert_eq!(enum_variant("'Foo' = 1'").unwrap().1, (1, "Foo"),); + assert_eq!(enum_variant("\\'Foo\\' = 1'").unwrap().1, (1, "Foo"),); + + enum_variant("'Foo'").unwrap_err(); + enum_variant("'Foo' = ").unwrap_err(); + enum_variant("'Foo' = x").unwrap_err(); + enum_variant("\"Foo\" = 1").unwrap_err(); + } + + #[test] + fn test_parse_enum8() { + let parsed = enum8("Enum8('Foo' = 1, 'Bar' = 2)").unwrap().1; + let DataType::Enum8(map) = 
parsed else { + panic!("Expected DataType::Enum8, found {parsed:#?}"); + }; + assert_eq!(map.len(), 2); + assert_eq!(map.get(&1).unwrap(), "Foo"); + assert_eq!(map.get(&2).unwrap(), "Bar"); + } + + #[test] + fn test_parse_array_enum8_with_escapes() { + const INPUT: &str = r#"Array(Enum8(\'Bool\' = 1, \'I64\' = 2))"#; + let parsed = DataType::nom_parse(INPUT).unwrap().1; + let DataType::Array(inner) = parsed else { + panic!("Expected a `DataType::Array(_)`, found {parsed:#?}"); + }; + let DataType::Enum8(map) = &*inner else { + panic!("Expected a `DataType::Enum8(_)`, found {inner:#?}"); + }; + assert_eq!(map.len(), 2); + assert_eq!(map.get(&1).unwrap(), "Bool"); + assert_eq!(map.get(&2).unwrap(), "I64"); + } + + #[test] + fn test_parse_all_known_timezones() { + for tz in chrono_tz::TZ_VARIANTS.iter() { + let quoted = format!("'{}'", tz); + let Ok(out) = quoted_timezone(&quoted) else { + panic!("Failed to parse quoted timezone: {quoted}"); + }; + assert_eq!(&out.1, tz, "Failed to parse quoted timezone: {quoted}"); + + let escape_quoted = format!("\\'{}\\'", tz); + let Ok(out) = quoted_timezone(&escape_quoted) else { + panic!( + "Failed to parse escaped quoted timezone: {escape_quoted}" + ); + }; + assert_eq!( + &out.1, tz, + "Failed to parse escaped quoted timezone: {escape_quoted}" + ); + } + } } diff --git a/oximeter/db/src/native/connection.rs b/oximeter/db/src/native/connection.rs index 911788a91fe..e5e353d1f2f 100644 --- a/oximeter/db/src/native/connection.rs +++ b/oximeter/db/src/native/connection.rs @@ -6,6 +6,7 @@ //! A connection and pool for talking to the ClickHouse server. +use super::block::Block; use super::io::packet::client::Encoder; use super::io::packet::server::Decoder; use super::packets::client::Packet as ClientPacket; @@ -208,8 +209,34 @@ impl Connection { Ok(false) } - /// Send a SQL query, possibly with data. + /// Send a SQL query that inserts data.
+ pub async fn insert( + &mut self, + query: &str, + block: Block, + ) -> Result { + self.query_inner(query, Some(block)).await + } + + /// Send a SQL query, without any data. pub async fn query(&mut self, query: &str) -> Result { + self.query_inner(query, None).await + } + + // Send a SQL query, possibly with data. + // + // If data is present, it is sent after the SQL query itself. An error is + // returned if the server indicates that the block structure required by the + // insert query doesn't match that of the provided block. + // + // IMPORTANT: We do not currently validate that data is provided iff the + // query is an INSERT statement! Callers are required to ensure that they + // provide data if and only if the query requires it. + async fn query_inner( + &mut self, + query: &str, + maybe_data: Option, + ) -> Result { let mut query_result = QueryResult { id: Uuid::new_v4(), progress: Progress::default(), @@ -221,6 +248,101 @@ impl Connection { self.writer.send(ClientPacket::Query(query)).await?; probes::packet__sent!(|| "Query"); self.outstanding_query = true; + + // If we have data to send, wait for the server to send an empty block + // that describes its structure. + if let Some(block_to_insert) = maybe_data { + let res: Result<(), Error> = loop { + match self.reader.next().await { + Some(Ok(packet)) => match packet { + ServerPacket::Hello(_) => { + probes::unexpected__server__packet!(|| "Hello"); + break Err(Error::UnexpectedPacket("Hello")); + } + ServerPacket::Data(block) => { + probes::data__packet__received!(|| { + ( + block.n_columns, + block.n_rows, + block + .columns + .iter() + .map(|(name, col)| { + ( + name.clone(), + col.data_type.to_string(), + ) + }) + .collect::>(), + ) + }); + // Similar to when selecting data, the server sends + // an empty block (zero rows) that describes the + // table structure. Any block we receive should be + // empty here. 
+ if block.n_rows != 0 { + break Err(Error::ExpectedEmptyDataBlock); + } + // Don't concatenate the block, but check that its + // structure matches what we're about to insert. + if !block.matches_structure(&block_to_insert) { + break Err(Error::MismatchedBlockStructure); + } + if let Err(e) = self + .writer + .send(ClientPacket::Data(block_to_insert)) + .await + { + break Err(e); + } + break self + .writer + .send(ClientPacket::Data(Block::empty())) + .await; + } + ServerPacket::Exception(exceptions) => { + break Err(Error::Exception { exceptions }) + } + ServerPacket::Progress(progress) => { + query_result.progress += progress + } + ServerPacket::Pong => { + probes::unexpected__server__packet!(|| "Pong"); + break Err(Error::UnexpectedPacket("Pong")); + } + ServerPacket::EndOfStream => { + // The server should only send this after we've + // inserted our data. + probes::unexpected__server__packet!(|| { + "EndOfStream" + }); + break Err(Error::UnexpectedPacket("EndOfStream")); + } + ServerPacket::ProfileInfo(info) => { + let _ = query_result.profile_info.replace(info); + } + ServerPacket::TableColumns(columns) => { + if !block_to_insert + .matches_table_description(&columns) + { + break Err(Error::MismatchedBlockStructure); + } + } + ServerPacket::ProfileEvents(block) => { + let _ = query_result.profile_events.replace(block); + } + }, + Some(Err(e)) => break Err(e), + None => break Err(Error::Disconnected), + } + }; + if let Err(e) = res { + self.outstanding_query = false; + return Err(e); + } + } + + // Now wait for the remainder of the query to execute.
let res = loop { match self.reader.next().await { Some(Ok(packet)) => match packet { @@ -264,13 +386,17 @@ impl Connection { query_result.progress += progress } ServerPacket::Pong => { - probes::unexpected__server__packet!(|| "Hello"); + probes::unexpected__server__packet!(|| "Pong"); break Err(Error::UnexpectedPacket("Pong")); } ServerPacket::EndOfStream => break Ok(query_result), ServerPacket::ProfileInfo(info) => { let _ = query_result.profile_info.replace(info); } + ServerPacket::TableColumns(_) => { + probes::unexpected__server__packet!(|| "TableColumns"); + break Err(Error::UnexpectedPacket("TableColumns")); + } ServerPacket::ProfileEvents(block) => { let _ = query_result.profile_events.replace(block); } diff --git a/oximeter/db/src/native/io/mod.rs b/oximeter/db/src/native/io/mod.rs index 999e90f4f6d..1d54b13969a 100644 --- a/oximeter/db/src/native/io/mod.rs +++ b/oximeter/db/src/native/io/mod.rs @@ -13,4 +13,5 @@ pub mod packet; pub mod profile_info; pub mod progress; pub mod string; +pub mod table_columns; pub mod varuint; diff --git a/oximeter/db/src/native/io/packet/client.rs b/oximeter/db/src/native/io/packet/client.rs index c8397d68a29..78e9fd09df3 100644 --- a/oximeter/db/src/native/io/packet/client.rs +++ b/oximeter/db/src/native/io/packet/client.rs @@ -67,8 +67,7 @@ impl Encoder { io::string::encode("", &mut dst); // Send an empty block to signal the end of data transfer. - self.encode_block(Block::empty(std::iter::empty()).unwrap(), &mut dst) - .unwrap(); + self.encode_block(Block::empty(), &mut dst).unwrap(); } /// Encode a ClientInfo into the buffer. 
diff --git a/oximeter/db/src/native/io/packet/server.rs b/oximeter/db/src/native/io/packet/server.rs index 0ef6d96d4be..1fd9e2cac45 100644 --- a/oximeter/db/src/native/io/packet/server.rs +++ b/oximeter/db/src/native/io/packet/server.rs @@ -161,6 +161,20 @@ impl tokio_util::codec::Decoder for Decoder { }; Packet::ProfileInfo(info) } + Packet::TABLE_COLUMNS => { + match io::table_columns::decode(&mut buf) { + Ok(Some(columns)) => Packet::TableColumns(columns), + Ok(None) => return Ok(None), + Err(e) => { + probes::invalid__packet!(|| ( + "TableColumns", + src.len() + )); + src.clear(); + return Err(e); + } + } + } Packet::PROFILE_EVENTS => match io::block::decode(&mut buf) { // Profile events are encoded as a data block. Ok(Some(block)) => Packet::ProfileEvents(block), diff --git a/oximeter/db/src/native/io/table_columns.rs b/oximeter/db/src/native/io/table_columns.rs new file mode 100644 index 00000000000..7fb8ad55eb9 --- /dev/null +++ b/oximeter/db/src/native/io/table_columns.rs @@ -0,0 +1,202 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright 2024 Oxide Computer Company + +//! Decoding table column descriptions from the database. +//! +//! As is helpfully noted in the ClickHouse sources at +//! , +//! +//! > NOTE: Serialization format is insane. +//! +//! The server serializes an array of `ColumnDescription`s to help the client +//! validate data before inserting it. This is a pure-text format that includes +//! column names and types, but also comments, codecs, and TTL values. The +//! format is indeed pretty wonky, so we're using `nom` to help parse it +//! robustly. For now, we only care about the names and data types; the +//! remainder is collected as-is, without interpretation, into a generic +//! `details` field. 
+ +use super::string; +use crate::native::block::DataType; +use crate::native::packets::server::ColumnDescription; +use crate::native::Error; +use nom::bytes::streaming::is_not; +use nom::bytes::streaming::tag; +use nom::character::streaming::u64 as nom_u64; +use nom::error::ErrorKind; +use nom::sequence::delimited; +use nom::sequence::separated_pair; +use nom::sequence::terminated; +use nom::Err as NomErr; +use nom::IResult; + +/// Decode an array of `ColumnDescription`s from a buffer. +pub fn decode( + src: &mut &[u8], +) -> Result>, Error> { + // See `src/Storages/ColumnDescription.cpp` for details of the encoding + // here, but briefly: + // + // - The packet starts with a "header" describing the table name (usually + // empty), the version, the number of columns. + // - Each column is serialized as a string, with its name; type; comment; + // compression codec; settings; statistics; and TTL + // - These are all newline delimited, with tabs prefixing each of the items + // listed above. + // + // See https://github.com/ClickHouse/ClickHouse/blob/c82cf25b3e5864bcc153cbe45adb8c6527e1ec6e/src/Server/TCPHandler.cpp#L2433 + // for more details, which is where the encoding of these values starts. + let Some(_table_name) = string::decode(src)? else { + return Ok(None); + }; + + // The column description itself is serialized as a ClickHouse + // varuint-prefixed string. Deserialize this, and then parse out the pieces + // of that parsed string. + let Some(text) = string::decode(src)? else { + return Ok(None); + }; + column_descriptions(&text) +} + +fn column_descriptions( + text: &str, +) -> Result>, Error> { + // Try to match the version "header" + let text = match tag::<_, _, (_, ErrorKind)>("columns format version: 1\n")( + text, + ) { + Ok((text, _match)) => text, + Err(NomErr::Incomplete(_)) => return Ok(None), + Err(_) => return Err(Error::InvalidPacket("TableColumns (header)")), + }; + + // Match the number of columns. 
+ let (text, n_columns) = match column_count(text) { + Ok(input) => input, + Err(NomErr::Incomplete(_)) => return Ok(None), + Err(_) => { + return Err(Error::InvalidPacket("TableColumns (column count)")) + } + }; + + // Extract each column, each of which is on a separate line. + let mut out = Vec::with_capacity( + usize::try_from(n_columns).map_err(|_| Error::BlockTooLarge)?, + ); + for line in text.lines() { + let col = match column_description(line) { + Ok(out) => out.1, + Err(NomErr::Incomplete(_)) => return Ok(None), + Err(_) => { + return Err(Error::InvalidPacket("TableColumns (description)")) + } + }; + out.push(col); + } + assert_eq!(out.len(), n_columns as usize); + Ok(Some(out)) +} + +// Parse out the column count from a header line like: "3 columns:\n". +fn column_count(s: &str) -> IResult<&str, u64> { + terminated(nom_u64, tag(" columns:\n"))(s) +} + +// Parse a backtick-quoted column name like: "`foo.bar`". +fn backtick_quoted_column_name(s: &str) -> IResult<&str, &str> { + delimited(tag("`"), is_not("`"), tag("`"))(s) +} + +// Parse a full column description from one line. +// +// Note that this must _not_ end in a newline, so one should use something like +// `str::lines()` and pass the result to this method. +fn column_description(s: &str) -> IResult<&str, ColumnDescription> { + let (s, (name, data_type)) = separated_pair( + backtick_quoted_column_name, + tag(" "), + DataType::nom_parse, + )(s)?; + + // At this point, we take any remaining output as the details, which may be + // empty. 
+ Ok(( + "", + ColumnDescription { + name: name.to_string(), + data_type, + details: s.to_string(), + }, + )) +} + +#[cfg(test)] +mod tests { + use super::backtick_quoted_column_name; + use super::column_count; + use super::column_description; + use super::column_descriptions; + use super::NomErr; + use crate::native::block::DataType; + + #[test] + fn test_backtick_quoted_column_name() { + assert_eq!(backtick_quoted_column_name("`foo`").unwrap().1, "foo"); + assert_eq!( + backtick_quoted_column_name("`foo.bar`").unwrap().1, + "foo.bar" + ); + assert!(matches!( + backtick_quoted_column_name("`foo").unwrap_err(), + NomErr::Incomplete(_) + )); + } + + #[test] + fn test_column_count() { + assert_eq!(column_count("4 columns:\n").unwrap().1, 4,); + } + + #[test] + fn test_column_description_only_required_parts() { + let desc = column_description("`timeseries_name` String").unwrap().1; + assert_eq!(desc.name, "timeseries_name"); + assert_eq!(desc.data_type, DataType::String); + assert!(desc.details.is_empty()); + } + + #[test] + fn test_column_descriptions() { + static INPUT: &str = r#"columns format version: 1 +2 columns: +`timeseries_name` String +`fields.name` Array(String) +"#; + let columns = column_descriptions(&mut &*INPUT) + .expect("failed to decode column descriptions") + .expect("expected Some(_) column description"); + assert_eq!(columns.len(), 2); + assert_eq!(columns[0].name, "timeseries_name"); + assert_eq!(columns[0].data_type, DataType::String); + assert_eq!(columns[1].name, "fields.name"); + assert_eq!( + columns[1].data_type, + DataType::Array(Box::new(DataType::String)) + ); + } + + #[test] + fn test_column_description_with_default() { + static INPUT: &str = r#"`timeseries_name` String\tDEFAULT "foo""#; + let column = column_description(&mut &*INPUT) + .expect("failed to decode column description") + .1; + assert_eq!(column.name, "timeseries_name"); + assert_eq!(column.data_type, DataType::String); + assert_eq!(column.details, "\\tDEFAULT \"foo\""); + 
} +} diff --git a/oximeter/db/src/native/mod.rs b/oximeter/db/src/native/mod.rs index 02cd485569f..77cbd934fe7 100644 --- a/oximeter/db/src/native/mod.rs +++ b/oximeter/db/src/native/mod.rs @@ -171,6 +171,9 @@ pub enum Error { #[error("Unrecognized server packet, kind = {0}")] UnrecognizedServerPacket(u8), + #[error("Invalid data packet of kind '{0}'")] + InvalidPacket(&'static str), + #[error("Encountered non-UTF8 string")] NonUtf8String, @@ -205,11 +208,32 @@ pub enum Error { )] Exception { exceptions: Vec }, - #[error("Cannot concatenate blocks with mismatched structure")] + #[error( + "Mismatched data block structure when concatenating blocks or \ + inserting data blocks into the database" + )] MismatchedBlockStructure, #[error("Value out of range for corresponding ClickHouse type")] OutOfRange { type_name: String, min: String, max: String, value: String }, + + #[error("Failed to serialize / deserialize value from the database")] + Serde(String), + + #[error("No column with name '{0}'")] + NoSuchColumn(String), + + #[error("Too many rows to create block")] + TooManyRows, + + #[error("Column has unexpected type")] + UnexpectedColumnType, + + #[error("Data block is too large")] + BlockTooLarge, + + #[error("Expected an empty data block")] + ExpectedEmptyDataBlock, } /// Error codes and related constants. diff --git a/oximeter/db/src/native/packets/server.rs b/oximeter/db/src/native/packets/server.rs index 2e6be64a9d0..0acf6144966 100644 --- a/oximeter/db/src/native/packets/server.rs +++ b/oximeter/db/src/native/packets/server.rs @@ -6,10 +6,30 @@ //! Packets sent from the server. +use crate::native::block::Block; +use crate::native::block::DataType; use std::fmt; use std::time::Duration; -use crate::native::block::Block; +/// Description of a single column in a table. +/// +/// This is used during insertion queries. 
When the client sends a request to +/// insert data, the server responds with a description of all the columns in +/// the table, which the client is supposed to use to verify the data block +/// being inserted. +#[derive(Clone, Debug, PartialEq)] +pub struct ColumnDescription { + /// The name of the column. + pub name: String, + /// The type of the column. + pub data_type: DataType, + /// Other details for the column. + /// + /// This is collected as a string, but otherwise unparsed or processed. We + /// don't care about these details at this point, and do nothing with them + /// for now. + pub details: String, +} /// A packet sent from the ClickHouse server to the client. #[derive(Clone, Debug, PartialEq)] @@ -30,6 +50,8 @@ pub enum Packet { EndOfStream, /// Profiling data for a query. ProfileInfo(ProfileInfo), + /// Metadata about the columns in a table. + TableColumns(Vec), /// A data block containing profiling events during a query. ProfileEvents(Block), } @@ -42,6 +64,7 @@ impl Packet { pub const PONG: u8 = 4; pub const END_OF_STREAM: u8 = 5; pub const PROFILE_INFO: u8 = 6; + pub const TABLE_COLUMNS: u8 = 11; pub const PROFILE_EVENTS: u8 = 14; /// Return the kind of the packet as a string. 
@@ -54,6 +77,7 @@ impl Packet { Packet::Pong => "Pong", Packet::EndOfStream => "EndOfStream", Packet::ProfileInfo(_) => "ProfileInfo", + Packet::TableColumns(_) => "TableColumns", Packet::ProfileEvents(_) => "ProfileEvents", } } diff --git a/oximeter/types/Cargo.toml b/oximeter/types/Cargo.toml index 6d6bbc07e6b..3f4ab66c2e3 100644 --- a/oximeter/types/Cargo.toml +++ b/oximeter/types/Cargo.toml @@ -14,6 +14,7 @@ float-ord.workspace = true num.workspace = true omicron-common.workspace = true omicron-workspace-hack.workspace = true +parse-display.workspace = true regex.workspace = true schemars = { workspace = true, features = [ "uuid1", "bytes", "chrono" ] } serde.workspace = true diff --git a/oximeter/types/src/schema.rs b/oximeter/types/src/schema.rs index 135c77462a7..3f0e8beb5c9 100644 --- a/oximeter/types/src/schema.rs +++ b/oximeter/types/src/schema.rs @@ -14,6 +14,8 @@ use crate::Metric; use crate::Target; use chrono::DateTime; use chrono::Utc; +use parse_display::Display; +use parse_display::FromStr; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; @@ -63,11 +65,14 @@ impl FieldSchema { Debug, PartialEq, Eq, + Hash, PartialOrd, Ord, Deserialize, Serialize, JsonSchema, + FromStr, + Display, )] #[serde(rename_all = "snake_case")] pub enum FieldSource { diff --git a/oximeter/types/src/types.rs b/oximeter/types/src/types.rs index 60260e36490..cea48f4477c 100644 --- a/oximeter/types/src/types.rs +++ b/oximeter/types/src/types.rs @@ -15,6 +15,8 @@ use chrono::DateTime; use chrono::Utc; use num::traits::One; use num::traits::Zero; +use parse_display::Display; +use parse_display::FromStr; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; @@ -40,12 +42,15 @@ use uuid::Uuid; Debug, PartialEq, Eq, + Hash, PartialOrd, Ord, JsonSchema, Serialize, Deserialize, strum::EnumIter, + FromStr, + Display, )] #[serde(rename_all = "snake_case")] pub enum FieldType { @@ -86,12 +91,6 @@ impl FieldType { } } -impl std::fmt::Display for 
FieldType { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{:?}", self) - } -} - macro_rules! impl_field_type_from { ($ty:ty, $variant:path) => { impl From<&$ty> for FieldType { @@ -313,6 +312,8 @@ pub struct Field { Serialize, Deserialize, strum::EnumIter, + FromStr, + Display, )] #[serde(rename_all = "snake_case")] pub enum DatumType { @@ -390,12 +391,6 @@ impl DatumType { } } -impl std::fmt::Display for DatumType { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{:?}", self) - } -} - /// A `Datum` is a single sampled data point from a metric. #[derive(Clone, Debug, PartialEq, JsonSchema, Serialize, Deserialize)] #[serde(tag = "type", content = "datum", rename_all = "snake_case")]