From eb8791c48d9a78370ceead7e70c7688d30aaa9c0 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Wed, 6 Nov 2024 16:45:20 -0800 Subject: [PATCH] Read / write timeseries schema with the native client (#6943) - Add methods for converting an array of objects into a data block, or deserializing an array out of one. - Use new to/from block methods to read and write timeseries schema in the native format - Add native `Connection` method for inserting data in a block. - Implement deserialization of the `TableColumns` server message type, which is sent to client when it tries to insert data, and is used to check that the block to be inserted matches the structure of the table into which the insertion is directed. - Add a build script, which reads the `db-init.sql` file to extract the enum data type definitions into constants. This is used in the serialization of timeseries schema rows into a block. - Closes #6942 --- Cargo.lock | 4 + nexus-config/src/nexus_config.rs | 26 +- nexus/src/app/metrics.rs | 6 +- nexus/src/app/mod.rs | 33 +- nexus/src/app/oximeter.rs | 3 +- nexus/test-utils/src/lib.rs | 3 + oximeter/db/Cargo.toml | 7 + oximeter/db/build.rs | 103 ++++++ oximeter/db/src/client/dbwrite.rs | 38 +-- oximeter/db/src/client/mod.rs | 146 +++++---- oximeter/db/src/lib.rs | 14 +- oximeter/db/src/model/from_block.rs | 142 +++++++++ oximeter/db/src/{model.rs => model/mod.rs} | 13 +- oximeter/db/src/model/to_block.rs | 115 +++++++ oximeter/db/src/native/block.rs | 347 ++++++++++++++------- oximeter/db/src/native/connection.rs | 136 +++++++- oximeter/db/src/native/io/mod.rs | 1 + oximeter/db/src/native/io/packet/client.rs | 3 +- oximeter/db/src/native/io/packet/server.rs | 14 + oximeter/db/src/native/io/table_columns.rs | 213 +++++++++++++ oximeter/db/src/native/mod.rs | 26 +- oximeter/db/src/native/packets/server.rs | 26 +- oximeter/types/Cargo.toml | 1 + oximeter/types/src/schema.rs | 5 + oximeter/types/src/types.rs | 19 +- 25 files changed, 1207 insertions(+), 237 deletions(-) create mode 100644 oximeter/db/build.rs create mode 100644 oximeter/db/src/model/from_block.rs rename oximeter/db/src/{model.rs => model/mod.rs} (99%) create mode 100644 oximeter/db/src/model/to_block.rs create mode 100644 oximeter/db/src/native/io/table_columns.rs diff --git a/Cargo.lock b/Cargo.lock index bc6a0c15c9..77f6a20491 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7547,6 +7547,7 @@ dependencies = [ "chrono-tz", "clap", "clickward", + "const_format", "criterion", "crossterm", "debug-ignore", @@ -7568,8 +7569,10 @@ dependencies = [ "oximeter", "oximeter-test-utils", "oxql-types", + "parse-display", "peg", "qorb", + "quote", "reedline", "regex", "reqwest 0.12.8", @@ -7714,6 +7717,7 @@ dependencies = [ "omicron-common", "omicron-workspace-hack", "oximeter-macro-impl", + "parse-display", "rand", "rand_distr", "regex", diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index a0af65b6ec..82362f2f0d 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -250,8 +250,12 @@ pub struct SchemaConfig { /// Optional configuration for the timeseries database. #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)] pub struct TimeseriesDbConfig { + /// The HTTP address of the ClickHouse server. #[serde(default, skip_serializing_if = "Option::is_none")] pub address: Option, + /// The native TCP address of the ClickHouse server. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub native_address: Option, } /// Configuration for the `Dendrite` dataplane daemon. @@ -774,7 +778,9 @@ impl std::fmt::Display for SchemeName { mod test { use super::*; - use omicron_common::address::{Ipv6Subnet, RACK_PREFIX}; + use omicron_common::address::{ + Ipv6Subnet, CLICKHOUSE_HTTP_PORT, CLICKHOUSE_TCP_PORT, RACK_PREFIX, + }; use omicron_common::api::internal::shared::SwitchLocation; use camino::{Utf8Path, Utf8PathBuf}; @@ -784,7 +790,7 @@ mod test { use dropshot::ConfigLoggingLevel; use std::collections::HashMap; use std::fs; - use std::net::{Ipv6Addr, SocketAddr}; + use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; use std::str::FromStr; use std::time::Duration; @@ -889,6 +895,7 @@ mod test { if_exists = "fail" [timeseries_db] address = "[::1]:8123" + native_address = "[::1]:9000" [updates] trusted_root = "/path/to/root.json" [tunables] @@ -1007,7 +1014,20 @@ mod test { path: "/nonexistent/path".into() }, timeseries_db: TimeseriesDbConfig { - address: Some("[::1]:8123".parse().unwrap()) + address: Some(SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::LOCALHOST, + CLICKHOUSE_HTTP_PORT, + 0, + 0, + ))), + native_address: Some(SocketAddr::V6( + SocketAddrV6::new( + Ipv6Addr::LOCALHOST, + CLICKHOUSE_TCP_PORT, + 0, + 0, + ) + )), }, updates: Some(UpdatesConfig { trusted_root: Utf8PathBuf::from("/path/to/root.json"), diff --git a/nexus/src/app/metrics.rs b/nexus/src/app/metrics.rs index ba76f87392..40f7882281 100644 --- a/nexus/src/app/metrics.rs +++ b/nexus/src/app/metrics.rs @@ -115,7 +115,8 @@ impl super::Nexus { .timeseries_schema_list(&pagination.page, limit) .await .map_err(|e| match e { - oximeter_db::Error::DatabaseUnavailable(_) => { + oximeter_db::Error::DatabaseUnavailable(_) + | oximeter_db::Error::Connection(_) => { Error::ServiceUnavailable { internal_message: e.to_string(), } @@ -150,7 +151,8 @@ impl super::Nexus { result.tables }) .map_err(|e| match e { - oximeter_db::Error::DatabaseUnavailable(_) => { + oximeter_db::Error::DatabaseUnavailable(_) + | oximeter_db::Error::Connection(_) => { Error::ServiceUnavailable { internal_message: e.to_string(), } diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index fa9c2c69cf..e451119bfc 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -24,6 +24,7 @@ use nexus_db_queries::authn; use nexus_db_queries::authz; use nexus_db_queries::context::OpContext; use nexus_db_queries::db; +use omicron_common::address::CLICKHOUSE_HTTP_PORT; use omicron_common::address::CLICKHOUSE_TCP_PORT; use omicron_common::address::DENDRITE_PORT; use omicron_common::address::MGD_PORT; @@ -411,13 +412,12 @@ impl Nexus { .map_err(|e| e.to_string())?; // Client to the ClickHouse database. - let timeseries_client = - if let Some(http_address) = &config.pkg.timeseries_db.address { - let native_address = - SocketAddr::new(http_address.ip(), CLICKHOUSE_TCP_PORT); - oximeter_db::Client::new(*http_address, native_address, &log) - } else { - // TODO-cleanup: Remove this when we remove the HTTP client. + // TODO-cleanup: Simplify this when we remove the HTTP client. + let timeseries_client = match ( + &config.pkg.timeseries_db.address, + &config.pkg.timeseries_db.native_address, + ) { + (None, None) => { let http_resolver = qorb_resolver.for_service(ServiceName::Clickhouse); let native_resolver = @@ -427,7 +427,24 @@ impl Nexus { native_resolver, &log, ) - }; + } + (maybe_http, maybe_native) => { + let (http_address, native_address) = + match (maybe_http, maybe_native) { + (None, None) => unreachable!("handled above"), + (None, Some(native)) => ( + SocketAddr::new(native.ip(), CLICKHOUSE_HTTP_PORT), + *native, + ), + (Some(http), None) => ( + *http, + SocketAddr::new(http.ip(), CLICKHOUSE_TCP_PORT), + ), + (Some(http), Some(native)) => (*http, *native), + }; + oximeter_db::Client::new(http_address, native_address, &log) + } + }; // TODO-cleanup We may want to make the populator a first-class // background task. diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index 6a4a81a47a..708b0828c9 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -241,7 +241,8 @@ pub(crate) async fn unassign_producer( fn map_oximeter_err(error: oximeter_db::Error) -> Error { match error { - oximeter_db::Error::DatabaseUnavailable(_) => { + oximeter_db::Error::DatabaseUnavailable(_) + | oximeter_db::Error::Connection(_) => { Error::ServiceUnavailable { internal_message: error.to_string() } } _ => Error::InternalError { internal_message: error.to_string() }, diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index 22fa61685d..0c5df8c844 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -486,6 +486,7 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> { let dataset_id = Uuid::new_v4(); let http_address = clickhouse.http_address(); let http_port = http_address.port(); + let native_address = clickhouse.native_address(); self.rack_init_builder.add_clickhouse_dataset( zpool_id, dataset_id, @@ -504,6 +505,8 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> { .as_mut() .expect("Tests expect to set a port of Clickhouse") .set_port(http_port); + self.config.pkg.timeseries_db.native_address = + Some(native_address.into()); let pool_name = illumos_utils::zpool::ZpoolName::new_external(zpool_id) .to_string() diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index b0afdfbb07..df4bc7d703 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -17,6 +17,7 @@ camino.workspace = true chrono.workspace = true chrono-tz.workspace = true clap.workspace = true +const_format.workspace = true clickward.workspace = true debug-ignore.workspace = true dropshot.workspace = true @@ -32,6 +33,7 @@ omicron-common.workspace = true omicron-workspace-hack.workspace = true oximeter.workspace = true oxql-types.workspace = true +parse-display.workspace = true qorb.workspace = true regex.workspace = true serde.workspace = true @@ -93,6 +95,11 @@ optional = true workspace = true features = [ "rt-multi-thread", "macros" ] +[build-dependencies] +anyhow.workspace = true +nom.workspace = true +quote.workspace = true + [dev-dependencies] camino-tempfile.workspace = true criterion = { workspace = true, features = [ "async_tokio" ] } diff --git a/oximeter/db/build.rs b/oximeter/db/build.rs new file mode 100644 index 0000000000..398dfb7f69 --- /dev/null +++ b/oximeter/db/build.rs @@ -0,0 +1,103 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright 2024 Oxide Computer Company + +use anyhow::Context as _; +use nom::IResult; + +/// Build script for generating native type representations from the +/// ground-truth SQL definitions. +fn main() -> anyhow::Result<()> { + // We're only parsing data types, specifically the enum variant-to-name + // mappings. These are the same for single-node and replicated databases, so + // use the former for simplicity. + const INIT_FILE: &str = + concat!(env!("CARGO_MANIFEST_DIR"), "/schema/single-node/db-init.sql"); + let contents = std::fs::read_to_string(INIT_FILE) + .with_context(|| format!("Failed to read SQL file: '{INIT_FILE}'"))?; + let field_type_enum = + find_enum(&contents, "type").context("failed to find column 'type'")?; + let field_source_enum = find_enum(&contents, "source") + .context("failed to find column 'source'")?; + let datum_type_enum = find_enum(&contents, "datum_type") + .context("failed to find column 'datum_type'")?; + std::fs::write( + format!("{}/enum_defs.rs", std::env::var("OUT_DIR")?), + [field_type_enum, field_source_enum, datum_type_enum].join("\n"), + ) + .context("writing output file")?; + Ok(()) +} + +// Find an enum in the `timeseries_schema` table definition for the named +// column, and return the corresponding `DataType::Enum8()` definition for it. +fn find_enum(contents: &str, column: &str) -> Option { + let needle = format!("{column} Enum(\n"); + let start = contents.find(&needle)? + needle.len(); + let s = &contents[start..].trim(); + let (variants, names): (Vec, Vec) = + variant_list(s).ok()?.1.into_iter().unzip(); + let enum_map = quote::format_ident!("{}_ENUM_MAP", column.to_uppercase()); + let enum_rev_map = + quote::format_ident!("{}_ENUM_REV_MAP", column.to_uppercase()); + let enum_type = + quote::format_ident!("{}_ENUM_DATA_TYPE", column.to_uppercase()); + let parsed_type = if column == "type" { + quote::quote! { ::oximeter::FieldType } + } else if column == "source" { + quote::quote! { ::oximeter::FieldSource } + } else if column == "datum_type" { + quote::quote! { ::oximeter::DatumType } + } else { + unreachable!(); + }; + Some(quote::quote! { + /// Mapping from the variant index to the string form. + #[allow(dead_code)] + static #enum_map: ::std::sync::LazyLock<::indexmap::IndexMap> = ::std::sync::LazyLock::new(|| { + ::indexmap::IndexMap::from([ + #((#variants, String::from(#names))),* + ]) + }); + /// Reverse mapping, from the _parsed_ form to the variant index. + #[allow(dead_code)] + static #enum_rev_map: ::std::sync::LazyLock<::indexmap::IndexMap<#parsed_type, i8>> = ::std::sync::LazyLock::new(|| { + ::indexmap::IndexMap::from([ + #((<#parsed_type as ::std::str::FromStr>::from_str(#names).unwrap(), #variants)),* + ]) + }); + /// Actual DataType::Enum8(_) with the contained variant-to-name mapping. + #[allow(dead_code)] + static #enum_type: ::std::sync::LazyLock = ::std::sync::LazyLock::new(|| { + crate::native::block::DataType::Enum8( + ::indexmap::IndexMap::from([ + #((#variants, String::from(#names))),* + ]) + ) + }); + }.to_string()) +} + +fn variant_list(s: &str) -> IResult<&str, Vec<(i8, String)>> { + nom::multi::separated_list1( + nom::bytes::complete::is_a(" ,\n"), + single_variant, + )(s) +} + +fn single_variant(s: &str) -> IResult<&str, (i8, String)> { + nom::combinator::map( + nom::sequence::separated_pair( + nom::sequence::delimited( + nom::bytes::complete::tag("'"), + nom::character::complete::alphanumeric1, + nom::bytes::complete::tag("'"), + ), + nom::bytes::complete::tag(" = "), + nom::character::complete::i8, + ), + |(name, variant): (&str, i8)| (variant, name.to_string()), + )(s) +} diff --git a/oximeter/db/src/client/dbwrite.rs b/oximeter/db/src/client/dbwrite.rs index 3559374e0d..d37d95deb9 100644 --- a/oximeter/db/src/client/dbwrite.rs +++ b/oximeter/db/src/client/dbwrite.rs @@ -8,18 +8,19 @@ use crate::client::Client; use crate::model; +use crate::model::to_block::ToBlock as _; use crate::Error; use camino::Utf8PathBuf; use oximeter::Sample; -use oximeter::TimeseriesName; +use oximeter::TimeseriesSchema; use slog::debug; use std::collections::BTreeMap; use std::collections::BTreeSet; #[derive(Debug)] pub(super) struct UnrolledSampleRows { - /// The timeseries schema rows, keyed by timeseries name. - pub new_schema: BTreeMap, + /// The timeseries schema rows. + pub new_schema: Vec, /// The rows to insert in all the other tables, keyed by the table name. pub rows: BTreeMap>, } @@ -182,14 +183,14 @@ impl Client { continue; } Ok(None) => {} - Ok(Some((name, schema))) => { + Ok(Some(schema)) => { debug!( self.log, "new timeseries schema"; - "timeseries_name" => %name, - "schema" => %schema + "timeseries_name" => %schema.timeseries_name, + "schema" => ?schema, ); - new_schema.insert(name, schema); + new_schema.insert(schema.timeseries_name.clone(), schema); } } @@ -217,6 +218,7 @@ impl Client { seen_timeseries.insert(key); } + let new_schema = new_schema.into_values().collect(); UnrolledSampleRows { new_schema, rows } } @@ -268,7 +270,7 @@ impl Client { // receive a sample with a new schema, and both would then try to insert that schema. pub(super) async fn save_new_schema_or_remove( &self, - new_schema: BTreeMap, + new_schema: Vec, ) -> Result<(), Error> { if !new_schema.is_empty() { debug!( @@ -276,17 +278,11 @@ impl Client { "inserting {} new timeseries schema", new_schema.len() ); - const APPROX_ROW_SIZE: usize = 64; - let mut body = String::with_capacity( - APPROX_ROW_SIZE + APPROX_ROW_SIZE * new_schema.len(), + let body = const_format::formatcp!( + "INSERT INTO {}.timeseries_schema FORMAT Native", + crate::DATABASE_NAME ); - body.push_str("INSERT INTO "); - body.push_str(crate::DATABASE_NAME); - body.push_str(".timeseries_schema FORMAT JSONEachRow\n"); - for row_data in new_schema.values() { - body.push_str(row_data); - body.push('\n'); - } + let block = TimeseriesSchema::to_block(&new_schema)?; // Try to insert the schema. // @@ -294,16 +290,16 @@ impl Client { // internal cache. Since we check the internal cache first for // schema, if we fail here but _don't_ remove the schema, we'll // never end up inserting the schema, but we will insert samples. - if let Err(e) = self.execute(body).await { + if let Err(e) = self.insert_native(&body, block).await { debug!( self.log, "failed to insert new schema, removing from cache"; "error" => ?e, ); let mut schema = self.schema.lock().await; - for name in new_schema.keys() { + for schema_to_remove in new_schema.iter() { schema - .remove(name) + .remove(&schema_to_remove.timeseries_name) .expect("New schema should have been cached"); } return Err(e); diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index ee964b08f6..3fdee90858 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -17,7 +17,9 @@ pub use self::dbwrite::DbWrite; pub use self::dbwrite::TestDbWrite; use crate::client::query_summary::QuerySummary; use crate::model; +use crate::model::from_block::FromBlock; use crate::native; +use crate::native::block::Block; use crate::native::block::ValueArray; use crate::native::QueryResult; use crate::query; @@ -456,7 +458,7 @@ impl Client { "FROM {}.timeseries_schema ", "ORDER BY timeseries_name ", "LIMIT {} ", - "FORMAT JSONEachRow;", + "FORMAT Native;", ), crate::DATABASE_NAME, limit.get(), @@ -469,7 +471,7 @@ impl Client { "WHERE timeseries_name > '{}' ", "ORDER BY timeseries_name ", "LIMIT {} ", - "FORMAT JSONEachRow;", + "FORMAT Native;", ), crate::DATABASE_NAME, last_timeseries, @@ -477,18 +479,19 @@ impl Client { ) } }; - let body = self.execute_with_body(sql).await?.1; - let schema = body - .lines() - .map(|line| { - TimeseriesSchema::from( - serde_json::from_str::(line) - .expect( - "Failed to deserialize TimeseriesSchema from database", - ), - ) - }) - .collect::>(); + let result = self.execute_with_block(&sql).await?; + let Some(block) = result.data.as_ref() else { + error!( + self.log, + "query listing timeseries schema did not contain \ + a data block" + ); + return Err(Error::Database(String::from( + "Query listing timeseries schema did not contain \ + any data from the database", + ))); + }; + let schema = TimeseriesSchema::from_block(block)?; ResultsPage::new(schema, &dropshot::EmptyScanParams {}, |schema, _| { schema.timeseries_name.clone() }) @@ -677,7 +680,7 @@ impl Client { "path" => path.display(), "filename" => &name, ); - match self.execute_native(sql).await { + match self.execute_native(&sql).await { Ok(_) => debug!( self.log, "successfully applied schema upgrade file"; @@ -885,11 +888,11 @@ impl Client { /// Read the latest version applied in the database. pub async fn read_latest_version(&self) -> Result { const ALIAS: &str = "max_version"; - let sql = format!( + const QUERY: &str = const_format::formatcp!( "SELECT MAX(value) AS {ALIAS} FROM {db_name}.version;", db_name = crate::DATABASE_NAME, ); - match self.execute_with_result_native(sql).await { + match self.execute_with_block(QUERY).await { Ok(result) => { let Some(data) = &result.data else { error!( @@ -985,13 +988,14 @@ impl Client { "INSERT INTO {db_name}.version (*) VALUES ({version}, now());", db_name = crate::DATABASE_NAME, ); - self.execute_native(sql).await + self.execute_native(&sql).await } /// Verifies if instance is part of oximeter_cluster pub async fn is_oximeter_cluster(&self) -> Result { - let sql = format!("SHOW CLUSTER {}", crate::CLUSTER_NAME); - self.execute_with_result_native(sql).await.and_then(|result| { + const QUERY: &str = + const_format::formatcp!("SHOW CLUSTER {}", crate::CLUSTER_NAME); + self.execute_with_block(QUERY).await.and_then(|result| { result .data .ok_or_else(|| { @@ -1017,7 +1021,7 @@ impl Client { async fn verify_or_cache_sample_schema( &self, sample: &Sample, - ) -> Result, Error> { + ) -> Result, Error> { let sample_schema = TimeseriesSchema::from(sample); let name = sample_schema.timeseries_name.clone(); let mut schema = self.schema.lock().await; @@ -1051,15 +1055,8 @@ impl Client { } } Entry::Vacant(entry) => { - let name = entry.key().clone(); entry.insert(sample_schema.clone()); - Ok(Some(( - name, - serde_json::to_string(&model::DbTimeseriesSchema::from( - sample_schema, - )) - .expect("Failed to convert schema to DB model"), - ))) + Ok(Some(sample_schema)) } } } @@ -1118,36 +1115,48 @@ impl Client { Ok(timeseries_by_key.into_values().collect()) } + // Insert data using the native TCP interface. + async fn insert_native( + &self, + sql: &str, + block: Block, + ) -> Result { + trace!( + self.log, + "executing SQL query"; + "sql" => sql, + ); + let mut handle = self.native_pool.claim().await?; + let id = usdt::UniqueId::new(); + probes::sql__query__start!(|| (&id, &sql)); + let result = handle.insert(sql, block).await.map_err(Error::from); + probes::sql__query__done!(|| (&id)); + result + } + // Execute a generic SQL statement, using the native TCP interface. - async fn execute_native(&self, sql: S) -> Result<(), Error> - where - S: Into, - { - self.execute_with_result_native(sql).await.map(|_| ()) + async fn execute_native(&self, sql: &str) -> Result<(), Error> { + self.execute_with_block(sql).await.map(|_| ()) } // Execute a generic SQL statement, returning the query result as a data // block. // // TODO-robustness This currently does no validation of the statement. - async fn execute_with_result_native( + async fn execute_with_block( &self, - sql: S, - ) -> Result - where - S: Into, - { - let sql = sql.into(); + sql: &str, + ) -> Result { trace!( self.log, "executing SQL query"; - "sql" => &sql, + "sql" => sql, ); let mut handle = self.native_pool.claim().await?; let id = usdt::UniqueId::new(); probes::sql__query__start!(|| (&id, &sql)); - let result = handle.query(sql.as_str()).await.map_err(Error::from); + let result = handle.query(sql).await.map_err(Error::from); probes::sql__query__done!(|| (&id)); result } @@ -1245,7 +1254,7 @@ impl Client { let sql = { if schema.is_empty() { format!( - "SELECT * FROM {db_name}.timeseries_schema FORMAT JSONEachRow;", + "SELECT * FROM {db_name}.timeseries_schema FORMAT Native;", db_name = crate::DATABASE_NAME, ) } else { @@ -1256,7 +1265,7 @@ impl Client { "FROM {db_name}.timeseries_schema ", "WHERE timeseries_name NOT IN ", "({current_keys}) ", - "FORMAT JSONEachRow;", + "FORMAT Native;", ), db_name = crate::DATABASE_NAME, current_keys = schema @@ -1267,21 +1276,22 @@ impl Client { ) } }; - let body = self.execute_with_body(sql).await?.1; - if body.is_empty() { + let body = self.execute_with_block(&sql).await?; + let Some(data) = body.data.as_ref() else { trace!(self.log, "no new timeseries schema in database"); - } else { - trace!(self.log, "extracting new timeseries schema"); - let new = body.lines().map(|line| { - let schema = TimeseriesSchema::from( - serde_json::from_str::(line) - .expect( - "Failed to deserialize TimeseriesSchema from database", - ), - ); - (schema.timeseries_name.clone(), schema) - }); - schema.extend(new); + return Ok(()); + }; + if data.n_rows == 0 { + trace!(self.log, "no new timeseries schema in database"); + return Ok(()); + } + trace!( + self.log, + "retrieved new timeseries schema"; + "n_schema" => data.n_rows, + ); + for new_schema in TimeseriesSchema::from_block(data)?.into_iter() { + schema.insert(new_schema.timeseries_name.clone(), new_schema); } Ok(()) } @@ -1375,7 +1385,7 @@ impl Client { "table_name" => table, "n_timeseries" => chunk.len(), ); - self.execute_native(sql).await?; + self.execute_native(&sql).await?; } } Ok(()) @@ -3153,7 +3163,7 @@ mod tests { "INSERT INTO oximeter.{field_table} FORMAT JSONEachRow {row}" ); client - .execute_native(insert_sql) + .execute_native(&insert_sql) .await .expect("Failed to insert field row"); @@ -3164,7 +3174,7 @@ mod tests { crate::DATABASE_SELECT_FORMAT, ); let body = client - .execute_with_body(select_sql) + .execute_with_body(&select_sql) .await .expect("Failed to select field row") .1; @@ -3493,7 +3503,7 @@ mod tests { ); println!("Inserted row: {}", inserted_row); client - .execute_native(insert_sql) + .execute_native(&insert_sql) .await .expect("Failed to insert measurement row"); @@ -4061,11 +4071,11 @@ mod tests { // table, to version 2, which adds two columns to that table in // different SQL files. client - .execute_native(format!("CREATE DATABASE {test_name};")) + .execute_native(&format!("CREATE DATABASE {test_name};")) .await .unwrap(); client - .execute_native(format!( + .execute_native(&format!( "\ CREATE TABLE {test_name}.tbl (\ `col0` UInt8 \ @@ -4110,7 +4120,7 @@ mod tests { // Check that it actually worked! let body = client - .execute_with_body(format!( + .execute_with_body(&format!( "\ SELECT name, type FROM system.columns \ WHERE database = '{test_name}' AND table = 'tbl' \ @@ -4273,11 +4283,11 @@ mod tests { // modifications over two versions, rather than as multiple schema // upgrades in one version bump. client - .execute_native(format!("CREATE DATABASE {test_name};")) + .execute_native(&format!("CREATE DATABASE {test_name};")) .await .unwrap(); client - .execute_native(format!( + .execute_native(&format!( "\ CREATE TABLE {test_name}.tbl (\ `col0` UInt8 \ diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index 36fff6056f..3712771595 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -20,6 +20,8 @@ pub use oximeter::Field; pub use oximeter::FieldType; pub use oximeter::Measurement; pub use oximeter::Sample; +use parse_display::Display; +use parse_display::FromStr; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; @@ -224,7 +226,17 @@ pub struct Timeseries { } #[derive( - Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Deserialize, + Serialize, + FromStr, + Display, )] pub enum DbFieldSource { Target, diff --git a/oximeter/db/src/model/from_block.rs b/oximeter/db/src/model/from_block.rs new file mode 100644 index 0000000000..83e0f0a0e7 --- /dev/null +++ b/oximeter/db/src/model/from_block.rs @@ -0,0 +1,142 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright 2024 Oxide Computer Company + +//! Trait for deserializing an array of values from a `Block`. + +use crate::native::block::Block; +use crate::native::block::DataType; +use crate::native::block::ValueArray; +use crate::native::Error; +use oximeter::AuthzScope; +use oximeter::FieldSchema; +use oximeter::TimeseriesDescription; +use oximeter::TimeseriesSchema; +use oximeter::Units; +use std::collections::BTreeSet; +use std::num::NonZeroU8; + +/// Trait for deserializing an array of items from a ClickHouse data block. +pub trait FromBlock: Sized { + /// Deserialize an array of `Self`s from a block. + fn from_block(block: &Block) -> Result, Error>; +} + +// TODO-cleanup: This is probably a good candidate for a derive-macro, which +// expands to the code that checks that names / types in the block match those +// of the fields in the struct itself. +impl FromBlock for TimeseriesSchema { + fn from_block(block: &Block) -> Result, Error> { + if block.is_empty() { + return Ok(vec![]); + } + let n_rows = + usize::try_from(block.n_rows).map_err(|_| Error::BlockTooLarge)?; + let mut out = Vec::with_capacity(n_rows); + let ValueArray::String(timeseries_names) = + block.column_values("timeseries_name")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::Array { + values: field_names, + inner_type: DataType::String, + } = block.column_values("fields.name")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::Array { + values: field_types, + inner_type: DataType::Enum8(field_type_variants), + } = block.column_values("fields.type")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::Array { + values: field_sources, + inner_type: DataType::Enum8(field_source_variants), + } = block.column_values("fields.source")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::Enum8 { + variants: datum_type_variants, + values: datum_types, + } = block.column_values("datum_type")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::DateTime64 { values: created, .. } = + block.column_values("created")? + else { + return Err(Error::UnexpectedColumnType); + }; + + for row in 0..n_rows { + let ValueArray::String(names) = &field_names[row] else { + unreachable!(); + }; + let ValueArray::Enum8 { values: row_field_types, .. } = + &field_types[row] + else { + unreachable!(); + }; + let ValueArray::Enum8 { values: row_field_sources, .. } = + &field_sources[row] + else { + unreachable!(); + }; + let mut field_schema = BTreeSet::new(); + let n_fields = names.len(); + for field in 0..n_fields { + let schema = FieldSchema { + name: names[field].clone(), + field_type: field_type_variants[&row_field_types[field]] + .parse() + .map_err(|_| { + Error::Serde(format!( + "Failed to deserialize field type from database: {:?}", + field_type_variants[&row_field_types[field]] + )) + })?, + source: field_source_variants[&row_field_sources[field]] + .parse() + .map_err(|_| { + Error::Serde(format!( + "Failed to deserialize field source from database: {:?}", + field_source_variants[&row_field_sources[field]])) + })?, + description: String::new(), + }; + field_schema.insert(schema); + } + let schema = TimeseriesSchema { + timeseries_name: + timeseries_names[row].clone().parse().map_err(|_| { + Error::Serde(format!( + "Failed to deserialize timeseries name from database: {:?}", + ×eries_names[row] + )) + })?, + description: TimeseriesDescription::default(), + field_schema, + datum_type: datum_type_variants[&datum_types[row]] + .parse() + .map_err(|_| { + Error::Serde(format!( + "Failed to deserialize datum type from database: {:?}", + &datum_type_variants[&datum_types[row]] + )) + })?, + version: unsafe { NonZeroU8::new_unchecked(1) }, + authz_scope: AuthzScope::Fleet, + units: Units::None, + created: created[row].to_utc(), + }; + out.push(schema); + } + Ok(out) + } +} diff --git a/oximeter/db/src/model.rs b/oximeter/db/src/model/mod.rs similarity index 99% rename from oximeter/db/src/model.rs rename to oximeter/db/src/model/mod.rs index 5e3508a14f..93b857f34f 100644 --- a/oximeter/db/src/model.rs +++ b/oximeter/db/src/model/mod.rs @@ -29,6 +29,8 @@ use oximeter::types::Measurement; use oximeter::types::MissingDatum; use oximeter::types::Sample; use oximeter::Quantile; +use parse_display::Display; +use parse_display::FromStr; use serde::Deserialize; use serde::Serialize; use std::collections::BTreeMap; @@ -38,6 +40,9 @@ use std::net::IpAddr; use std::net::Ipv6Addr; use uuid::Uuid; +pub mod from_block; +pub mod to_block; + /// Describes the version of the Oximeter database. /// /// For usage and details see: @@ -170,7 +175,9 @@ impl From for DbTimeseriesSchema { } } -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +#[derive( + Clone, Copy, Debug, Serialize, Deserialize, PartialEq, FromStr, Display, +)] pub enum DbFieldType { String, I8, @@ -223,7 +230,9 @@ impl From for DbFieldType { } } -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +#[derive( + Clone, Copy, Debug, Serialize, Deserialize, PartialEq, FromStr, Display, +)] pub enum DbDatumType { Bool, I8, diff --git a/oximeter/db/src/model/to_block.rs b/oximeter/db/src/model/to_block.rs new file mode 100644 index 0000000000..5a6bb91766 --- /dev/null +++ b/oximeter/db/src/model/to_block.rs @@ -0,0 +1,115 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright 2024 Oxide Computer Company + +//! Trait for serializing an array of values into a `Block`. + +use crate::native::block::Block; +use crate::native::block::Column; +use crate::native::block::DataType; +use crate::native::block::Precision; +use crate::native::block::ValueArray; +use crate::native::Error; +use chrono::TimeZone as _; +use chrono_tz::Tz; +use indexmap::IndexMap; +use oximeter::TimeseriesSchema; + +include!(concat!(env!("OUT_DIR"), "/enum_defs.rs")); + +/// Trait for serializing an array of items to a ClickHouse data block. +pub trait ToBlock: Sized { + /// Serialize an array of `Self`s to a block. + fn to_block(items: &[Self]) -> Result; +} + +// TODO-cleanup: This is probably a good candidate for a derive-macro, which +// expands to the code that checks that names / types in the block match those +// of the fields in the struct itself. +impl ToBlock for TimeseriesSchema { + fn to_block(items: &[Self]) -> Result { + let n_items = items.len(); + let mut timeseries_names = Vec::with_capacity(n_items); + let mut field_names = Vec::with_capacity(n_items); + let mut field_types = Vec::with_capacity(n_items); + let mut field_sources = Vec::with_capacity(n_items); + let mut datum_types = Vec::with_capacity(n_items); + let mut created = Vec::with_capacity(n_items); + for item in items.iter() { + timeseries_names.push(item.timeseries_name.to_string()); + let n_fields = item.field_schema.len(); + let mut row_field_names = Vec::with_capacity(n_fields); + let mut row_field_types = Vec::with_capacity(n_fields); + let mut row_field_sources = Vec::with_capacity(n_fields); + for field in item.field_schema.iter() { + row_field_names.push(field.name.clone()); + let ty = TYPE_ENUM_REV_MAP.get(&field.field_type).unwrap(); + row_field_types.push(*ty); + let src = SOURCE_ENUM_REV_MAP.get(&field.source).unwrap(); + row_field_sources.push(*src); + } + field_names.push(ValueArray::String(row_field_names)); + field_types.push(ValueArray::Enum8 { + variants: TYPE_ENUM_MAP.clone(), + values: row_field_types, + }); + field_sources.push(ValueArray::Enum8 { + variants: SOURCE_ENUM_MAP.clone(), + values: row_field_sources, + }); + datum_types + .push(*DATUM_TYPE_ENUM_REV_MAP.get(&item.datum_type).unwrap()); + created.push(Tz::UTC.from_utc_datetime(&item.created.naive_utc())); + } + Ok(Block { + name: String::new(), + info: Default::default(), + n_columns: 6, + n_rows: u64::try_from(n_items).map_err(|_| Error::BlockTooLarge)?, + columns: IndexMap::from([ + ( + String::from("timeseries_name"), + Column::from(ValueArray::String(timeseries_names)), + ), + ( + String::from("fields.name"), + Column::from(ValueArray::Array { + inner_type: DataType::String, + values: field_names, + }), + ), + ( + String::from("fields.type"), + Column::from(ValueArray::Array { + inner_type: TYPE_ENUM_DATA_TYPE.clone(), + values: field_types, + }), + ), + ( + String::from("fields.source"), + Column::from(ValueArray::Array { + inner_type: SOURCE_ENUM_DATA_TYPE.clone(), + values: field_sources, + }), + ), + ( + String::from("datum_type"), + Column::from(ValueArray::Enum8 { + variants: DATUM_TYPE_ENUM_MAP.clone(), + values: datum_types, + }), + ), + ( + String::from("created"), + Column::from(ValueArray::DateTime64 { + values: created, + precision: Precision::new(9).unwrap(), + tz: Tz::UTC, + }), + ), + ]), + }) + } +} diff --git a/oximeter/db/src/native/block.rs b/oximeter/db/src/native/block.rs index 10727b6532..e9e2dd0056 100644 --- a/oximeter/db/src/native/block.rs +++ b/oximeter/db/src/native/block.rs @@ -6,20 +6,28 @@ //! Types for working with actual blocks and columns of data. +use super::packets::server::ColumnDescription; use super::Error; use chrono::DateTime; use chrono::NaiveDate; use chrono_tz::Tz; use indexmap::IndexMap; +use nom::branch::alt; use nom::bytes::complete::tag; use nom::bytes::complete::take_while1; +use nom::character::complete::alphanumeric1; +use nom::character::complete::i8 as nom_i8; use nom::character::complete::u8 as nom_u8; +use nom::combinator::all_consuming; use nom::combinator::eof; use nom::combinator::map; use nom::combinator::map_opt; use nom::combinator::opt; +use nom::combinator::value; +use nom::multi::separated_list1; use nom::sequence::delimited; use nom::sequence::preceded; +use nom::sequence::separated_pair; use nom::sequence::tuple; use nom::IResult; use std::fmt; @@ -67,26 +75,15 @@ impl Block { self.n_columns == 0 && self.n_rows == 0 } - /// Create an empty block with the provided column names and types - pub fn empty<'a>( - types: impl IntoIterator, - ) -> Result { - let mut columns = IndexMap::new(); - let mut n_columns = 0; - for (name, type_) in types.into_iter() { - if !type_.is_supported() { - return Err(Error::UnsupportedDataType(type_.to_string())); - } - n_columns += 1; - columns.insert(name.to_string(), Column::empty(type_)); - } - Ok(Self { + /// Create an empty block. + pub fn empty() -> Self { + Self { name: String::new(), info: BlockInfo::default(), - n_columns, + n_columns: 0, n_rows: 0, - columns, - }) + columns: IndexMap::new(), + } } /// Concatenate this data block with another. @@ -106,7 +103,10 @@ impl Block { Ok(()) } - fn matches_structure(&self, block: &Block) -> bool { + /// Return true if this block matches the structure of the other. + /// + /// This means it has the same column names and types. + pub fn matches_structure(&self, block: &Block) -> bool { if self.n_columns != block.n_columns { return false; } @@ -120,6 +120,32 @@ impl Block { } true } + + /// Return the values of the named column, if it exists. + pub fn column_values(&self, name: &str) -> Result<&ValueArray, Error> { + self.columns + .get(name) + .map(|col| &col.values) + .ok_or_else(|| Error::NoSuchColumn(name.to_string())) + } + + pub(crate) fn matches_table_description( + &self, + columns: &[ColumnDescription], + ) -> bool { + if self.n_columns != columns.len() as u64 { + return false; + } + for (our_col, their_col) in self.columns.iter().zip(columns.iter()) { + if our_col.0 != &their_col.name { + return false; + } + if our_col.1.data_type != their_col.data_type { + return false; + } + } + true + } } /// Details about the block. @@ -177,6 +203,13 @@ pub struct Column { pub data_type: DataType, } +impl From for Column { + fn from(values: ValueArray) -> Self { + let data_type = values.data_type(); + Self { values, data_type } + } +} + impl Column { /// Create an empty column of the provided type. pub fn empty(data_type: DataType) -> Self { @@ -194,6 +227,16 @@ impl Column { self.values.concat(rhs.values); Ok(()) } + + /// Return true if the column is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Return the number of elements in the column. + pub fn len(&self) -> usize { + self.values.len() + } } /// An array of singly-typed data values from the server. @@ -252,7 +295,7 @@ impl ValueArray { } /// Return an empty value array of the provided type. - fn empty(data_type: &DataType) -> ValueArray { + pub fn empty(data_type: &DataType) -> ValueArray { match data_type { DataType::UInt8 => ValueArray::UInt8(vec![]), DataType::UInt16 => ValueArray::UInt16(vec![]), @@ -589,6 +632,39 @@ impl DataType { pub(crate) fn is_nullable(&self) -> bool { matches!(self, DataType::Nullable(_)) } + + /// Parse out a data type from a string. + /// + /// This is a `nom`-based function, so that the method can be used in other + /// contexts. The `DataType::from_str()` implementation is a thin wrapper + /// around this. + pub(super) fn nom_parse(s: &str) -> IResult<&str, Self> { + alt(( + value(DataType::UInt8, tag("UInt8")), + value(DataType::UInt16, tag("UInt16")), + value(DataType::UInt32, tag("UInt32")), + value(DataType::UInt64, tag("UInt64")), + value(DataType::UInt128, tag("UInt128")), + value(DataType::Int8, tag("Int8")), + value(DataType::Int16, tag("Int16")), + value(DataType::Int32, tag("Int32")), + value(DataType::Int64, tag("Int64")), + value(DataType::Int128, tag("Int128")), + value(DataType::Float32, tag("Float32")), + value(DataType::Float64, tag("Float64")), + value(DataType::String, tag("String")), + value(DataType::Uuid, tag("UUID")), + value(DataType::Ipv4, tag("IPv4")), + value(DataType::Ipv6, tag("IPv6")), + // IMPORTANT: This needs to consume all its input, otherwise we may + // parse something like `DateTime(UTC)` as `Date`, which is + // incorrect. + value(DataType::Date, all_consuming(tag("Date"))), + // These need to be nested because `alt` supports a max of 21 + // parsers, and we have 22 data types. + alt((datetime, datetime64, enum8, nullable, array)), + ))(s) + } } impl fmt::Display for DataType { @@ -632,9 +708,23 @@ impl fmt::Display for DataType { } // Parse a quoted timezone, like `'UTC'` or `'America/Los_Angeles'` +// +// Note that the quotes may optionally be escaped, like `\'UTC\'`, which is +// needed to support deserializing table descriptions, where the types for each +// column are serialized as an escaped string. fn quoted_timezone(s: &str) -> IResult<&str, Tz> { map( - delimited(tag("'"), take_while1(|c| c != '\''), tag("'")), + delimited( + preceded(opt(tag("\\")), tag("'")), + take_while1(|c: char| { + c.is_ascii_alphanumeric() + || c == '/' + || c == '+' + || c == '-' + || c == '_' + }), + preceded(opt(tag("\\")), tag("'")), + ), parse_timezone, )(s) } @@ -687,104 +777,68 @@ fn parse_timezone(s: &str) -> Tz { s.parse().unwrap_or_else(|_| *DEFAULT_TIMEZONE) } -impl std::str::FromStr for DataType { - type Err = Error; - - fn from_str(s: &str) -> Result { - // Simple scalar types. - if s == "UInt8" { - return Ok(DataType::UInt8); - } else if s == "UInt16" { - return Ok(DataType::UInt16); - } else if s == "UInt32" { - return Ok(DataType::UInt32); - } else if s == "UInt64" { - return Ok(DataType::UInt64); - } else if s == "UInt128" { - return Ok(DataType::UInt128); - } else if s == "Int8" { - return Ok(DataType::Int8); - } else if s == "Int16" { - return Ok(DataType::Int16); - } else if s == "Int32" { - return Ok(DataType::Int32); - } else if s == "Int64" { - return Ok(DataType::Int64); - } else if s == "Int128" { - return Ok(DataType::Int128); - } else if s == "Float32" { - return Ok(DataType::Float32); - } else if s == "Float64" { - return Ok(DataType::Float64); - } else if s == "String" { - return Ok(DataType::String); - } else if s == "UUID" { - return Ok(DataType::Uuid); - } else if s == "IPv4" { - return Ok(DataType::Ipv4); - } else if s == "IPv6" { - return Ok(DataType::Ipv6); - } else if s == "Date" { - return Ok(DataType::Date); - } - - // Check for datetime, possibly with a timezone. - if let Ok((_, dt)) = datetime(s) { - return Ok(dt); - }; +/// Parse an enum variant name. +fn variant_name(s: &str) -> IResult<&str, &str> { + delimited( + preceded(opt(tag("\\")), tag("'")), + alphanumeric1, + preceded(opt(tag("\\")), tag("'")), + )(s) +} - // Check for DateTime64 with precision, and possibly a timezone. - if let Ok((_, dt)) = datetime64(s) { - return Ok(dt); - }; +/// Parse a single enum variant, like `'Foo' = 1`. +/// +/// Note that the single-quotes may be escaped, which is required for parsing +/// the `ColumnDescription` type from a `TableColumns` server packet. +fn enum_variant(s: &str) -> IResult<&str, (i8, &str)> { + map(separated_pair(variant_name, tag(" = "), nom_i8), |(name, variant)| { + (variant, name) + })(s) +} - // Check for Enum8s. - // - // These are written like "Enum8('foo' = 1, 'bar' = 2)" - if let Some(suffix) = s.strip_prefix("Enum8(") { - let Some(inner) = suffix.strip_suffix(")") else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; +/// Parse an `Enum8` data type from a string. +pub(super) fn enum8(s: &str) -> IResult<&str, DataType> { + map( + delimited( + tag("Enum8("), + separated_list1(tag(", "), enum_variant), + tag(")"), + ), + |variants| { let mut map = IndexMap::new(); - for each in inner.split(',') { - let Some((name, value)) = each.split_once(" = ") else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; - let Ok(value) = value.parse() else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; - // Trim whitespace from the name and strip any single-quotes. - let name = name.trim().trim_matches('\'').to_string(); - map.insert(value, name.to_string()); - } - return Ok(DataType::Enum8(map)); - } + for (variant, name) in variants.into_iter() { + map.insert(variant, name.to_string()); + } + DataType::Enum8(map) + }, + )(s) +} - // Recurse for nullable types. - if let Some(suffix) = s.strip_prefix("Nullable(") { - let Some(inner) = suffix.strip_suffix(')') else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; - return inner - .parse() - .map(|inner| DataType::Nullable(Box::new(inner))); - } +fn nullable(s: &str) -> IResult<&str, DataType> { + map(delimited(tag("Nullable("), DataType::nom_parse, tag(")")), |inner| { + DataType::Nullable(Box::new(inner)) + })(s) +} - // And for arrays. - if let Some(suffix) = s.strip_prefix("Array(") { - let Some(inner) = suffix.strip_suffix(')') else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; - return inner.parse().map(|inner| DataType::Array(Box::new(inner))); - } +fn array(s: &str) -> IResult<&str, DataType> { + map(delimited(tag("Array("), DataType::nom_parse, tag(")")), |inner| { + DataType::Array(Box::new(inner)) + })(s) +} - // Anything else is unsupported for now. - Err(Error::UnsupportedDataType(s.to_string())) +impl std::str::FromStr for DataType { + type Err = Error; + + fn from_str(s: &str) -> Result { + Self::nom_parse(s) + .map(|(_, parsed)| parsed) + .map_err(|_| Error::UnsupportedDataType(s.to_string())) } } #[cfg(test)] mod tests { + use super::enum8; use super::Block; use super::BlockInfo; use super::Column; @@ -794,6 +848,8 @@ mod tests { use super::DEFAULT_TIMEZONE; use crate::native::block::datetime; use crate::native::block::datetime64; + use crate::native::block::enum_variant; + use crate::native::block::quoted_timezone; use chrono::SubsecRound as _; use chrono::Utc; use chrono_tz::Tz; @@ -933,6 +989,14 @@ mod tests { assert!(datetime64("DateTime64(1,'UTC')").is_err()); } + #[test] + fn parse_escaped_date_time64() { + assert_eq!( + DataType::DateTime64(Precision(1), Tz::UTC), + datetime64(r#"DateTime64(1, \'UTC\')"#).unwrap().1 + ); + } + #[test] fn concat_blocks() { let data = vec![0, 1]; @@ -955,4 +1019,73 @@ mod tests { ValueArray::UInt64([data.as_slice(), data.as_slice()].concat()) ); } + + #[test] + fn test_parse_enum_variant() { + assert_eq!(enum_variant("'Foo' = 1'").unwrap().1, (1, "Foo"),); + assert_eq!(enum_variant("\\'Foo\\' = 1'").unwrap().1, (1, "Foo"),); + + enum_variant("'Foo'").unwrap_err(); + enum_variant("'Foo' = ").unwrap_err(); + enum_variant("'Foo' = x").unwrap_err(); + enum_variant("\"Foo\" = 1").unwrap_err(); + } + + #[test] + fn test_parse_enum8() { + let parsed = enum8("Enum8('Foo' = 1, 'Bar' = 2)").unwrap().1; + let DataType::Enum8(map) = parsed else { + panic!("Expected DataType::Enum8, found {parsed:#?}"); + }; + assert_eq!(map.len(), 2); + assert_eq!(map.get(&1).unwrap(), "Foo"); + assert_eq!(map.get(&2).unwrap(), "Bar"); + } + + #[test] + fn test_parse_array_enum8_with_escapes() { + const INPUT: &str = r#"Array(Enum8(\'Bool\' = 1, \'I64\' = 2))"#; + let parsed = DataType::nom_parse(INPUT).unwrap().1; + let DataType::Array(inner) = parsed else { + panic!("Expected a `DataType::Array(_)`, found {parsed:#?}"); + }; + let DataType::Enum8(map) = &*inner else { + panic!("Expected a `DataType::Enum8(_)`, found {inner:#?}"); + }; + assert_eq!(map.len(), 2); + assert_eq!(map.get(&1).unwrap(), "Bool"); + assert_eq!(map.get(&2).unwrap(), "I64"); + } + + #[test] + fn test_parse_array_enum8_with_bad_escapes() { + DataType::nom_parse(r#"Array(Enum8(\\'Bool\' = 1, \'I64\' = 2))"#) + .expect_err("Should fail to parse data type with bad escape"); + DataType::nom_parse(r#"Array(Enum8(\t\'Bool\' = 1, \'I64\' = 2))"#) + .expect_err("Should fail to parse data type with bad escape"); + DataType::nom_parse(r#"Array(Enum8(\"Bool\' = 1, \'I64\' = 2))"#) + .expect_err("Should fail to parse data type with bad escape"); + } + + #[test] + fn test_parse_all_known_timezones() { + for tz in chrono_tz::TZ_VARIANTS.iter() { + let quoted = format!("'{}'", tz); + let Ok(out) = quoted_timezone("ed) else { + panic!("Failed to parse quoted timezone: {quoted}"); + }; + assert_eq!(&out.1, tz, "Failed to parse quoted timezone: {quoted}"); + + let escape_quoted = format!("\\'{}\\'", tz); + let Ok(out) = quoted_timezone(&escape_quoted) else { + panic!( + "Failed to parse escaped quoted timezone: {escape_quoted}" + ); + }; + assert_eq!( + &out.1, tz, + "Failed to parse escaped quoted timezone: {escape_quoted}" + ); + } + } } diff --git a/oximeter/db/src/native/connection.rs b/oximeter/db/src/native/connection.rs index 911788a91f..706dbf3331 100644 --- a/oximeter/db/src/native/connection.rs +++ b/oximeter/db/src/native/connection.rs @@ -6,6 +6,7 @@ //! A connection and pool for talking to the ClickHouse server. +use super::block::Block; use super::io::packet::client::Encoder; use super::io::packet::server::Decoder; use super::packets::client::Packet as ClientPacket; @@ -208,8 +209,34 @@ impl Connection { Ok(false) } - /// Send a SQL query, possibly with data. + /// Send a SQL query that inserts data. + pub async fn insert( + &mut self, + query: &str, + block: Block, + ) -> Result { + self.query_inner(query, Some(block)).await + } + + /// Send a SQL query, without any data. pub async fn query(&mut self, query: &str) -> Result { + self.query_inner(query, None).await + } + + // Send a SQL query, possibly with data. + // + // If data is present, it is sent after the SQL query itself. An error is + // returned if the server indicates that the block structure required by the + // insert query doesn't match that of the provided block. + // + // IMPORTANT: We do not currently validate that data is provided iff the + // query is an INSERT statement! Callers are required to ensure that they + // provide data if and only if the query requires it. + async fn query_inner( + &mut self, + query: &str, + maybe_data: Option, + ) -> Result { let mut query_result = QueryResult { id: Uuid::new_v4(), progress: Progress::default(), @@ -221,12 +248,109 @@ impl Connection { self.writer.send(ClientPacket::Query(query)).await?; probes::packet__sent!(|| "Query"); self.outstanding_query = true; + + // If we have data to send, wait for the server to send an empty block + // that describes its structure. + if let Some(block_to_insert) = maybe_data { + let res: Result<(), Error> = loop { + match self.reader.next().await { + Some(Ok(packet)) => match packet { + ServerPacket::Hello(_) + | ServerPacket::Pong + // The server should only send this after we've + // inserted our data. + | ServerPacket::EndOfStream => + { + let kind = packet.kind(); + probes::unexpected__server__packet!(|| kind); + break Err(Error::UnexpectedPacket(kind)); + } + ServerPacket::Data(block) => { + probes::data__packet__received!(|| { + ( + block.n_columns, + block.n_rows, + block + .columns + .iter() + .map(|(name, col)| { + ( + name.clone(), + col.data_type.to_string(), + ) + }) + .collect::>(), + ) + }); + + // Similar to when selecting data, the server sends + // a block with zero rows that describes the table + // structure, so any block with a non-zero number of + // rows is an error here. + if block.n_rows != 0 { + break Err(Error::ExpectedEmptyDataBlock); + } + + // Don't concatenate the block, but check that its + // structure matches what we're about to insert. + if !block.matches_structure(&block_to_insert) { + break Err(Error::MismatchedBlockStructure); + } + + // Finally, send the actual data block and an empty + // block to tell the server we're finished. + if let Err(e) = self + .writer + .send(ClientPacket::Data(block_to_insert)) + .await + { + break Err(e); + } + break self + .writer + .send(ClientPacket::Data(Block::empty())) + .await; + } + ServerPacket::Exception(exceptions) => { + break Err(Error::Exception { exceptions }) + } + ServerPacket::Progress(progress) => { + query_result.progress += progress + } + ServerPacket::ProfileInfo(info) => { + let _ = query_result.profile_info.replace(info); + } + ServerPacket::TableColumns(columns) => { + if !block_to_insert + .matches_table_description(&columns) + { + break Err(Error::MismatchedBlockStructure); + } + } + ServerPacket::ProfileEvents(block) => { + let _ = query_result.profile_events.replace(block); + } + }, + Some(Err(e)) => break Err(e), + None => break Err(Error::Disconnected), + } + }; + if let Err(e) = res { + self.outstanding_query = false; + return Err(e); + } + } + + // Now wait for the remainder of the query to execute. let res = loop { match self.reader.next().await { Some(Ok(packet)) => match packet { - ServerPacket::Hello(_) => { - probes::unexpected__server__packet!(|| "Hello"); - break Err(Error::UnexpectedPacket("Hello")); + ServerPacket::Hello(_) + | ServerPacket::Pong + | ServerPacket::TableColumns(_) => { + let kind = packet.kind(); + probes::unexpected__server__packet!(|| kind); + break Err(Error::UnexpectedPacket(kind)); } ServerPacket::Data(block) => { probes::data__packet__received!(|| { @@ -263,10 +387,6 @@ impl Connection { ServerPacket::Progress(progress) => { query_result.progress += progress } - ServerPacket::Pong => { - probes::unexpected__server__packet!(|| "Hello"); - break Err(Error::UnexpectedPacket("Pong")); - } ServerPacket::EndOfStream => break Ok(query_result), ServerPacket::ProfileInfo(info) => { let _ = query_result.profile_info.replace(info); diff --git a/oximeter/db/src/native/io/mod.rs b/oximeter/db/src/native/io/mod.rs index 999e90f4f6..1d54b13969 100644 --- a/oximeter/db/src/native/io/mod.rs +++ b/oximeter/db/src/native/io/mod.rs @@ -13,4 +13,5 @@ pub mod packet; pub mod profile_info; pub mod progress; pub mod string; +pub mod table_columns; pub mod varuint; diff --git a/oximeter/db/src/native/io/packet/client.rs b/oximeter/db/src/native/io/packet/client.rs index c8397d68a2..78e9fd09df 100644 --- a/oximeter/db/src/native/io/packet/client.rs +++ b/oximeter/db/src/native/io/packet/client.rs @@ -67,8 +67,7 @@ impl Encoder { io::string::encode("", &mut dst); // Send an empty block to signal the end of data transfer. - self.encode_block(Block::empty(std::iter::empty()).unwrap(), &mut dst) - .unwrap(); + self.encode_block(Block::empty(), &mut dst).unwrap(); } /// Encode a ClientInfo into the buffer. diff --git a/oximeter/db/src/native/io/packet/server.rs b/oximeter/db/src/native/io/packet/server.rs index 0ef6d96d4b..1fd9e2cac4 100644 --- a/oximeter/db/src/native/io/packet/server.rs +++ b/oximeter/db/src/native/io/packet/server.rs @@ -161,6 +161,20 @@ impl tokio_util::codec::Decoder for Decoder { }; Packet::ProfileInfo(info) } + Packet::TABLE_COLUMNS => { + match io::table_columns::decode(&mut buf) { + Ok(Some(columns)) => Packet::TableColumns(columns), + Ok(None) => return Ok(None), + Err(e) => { + probes::invalid__packet!(|| ( + "TableColumns", + src.len() + )); + src.clear(); + return Err(e); + } + } + } Packet::PROFILE_EVENTS => match io::block::decode(&mut buf) { // Profile events are encoded as a data block. Ok(Some(block)) => Packet::ProfileEvents(block), diff --git a/oximeter/db/src/native/io/table_columns.rs b/oximeter/db/src/native/io/table_columns.rs new file mode 100644 index 0000000000..c25b2c88a9 --- /dev/null +++ b/oximeter/db/src/native/io/table_columns.rs @@ -0,0 +1,213 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright 2024 Oxide Computer Company + +//! Decoding table column descriptions from the database. +//! +//! As is helpfully noted in the ClickHouse sources at +//! , +//! +//! > NOTE: Serialization format is insane. +//! +//! The server serializes an array of `ColumnDescription`s to help the client +//! validate data before inserting it. This is a pure-text format that includes +//! column names and types, but also comments, codecs, and TTL values. The +//! format is indeed pretty wonky, so we're using `nom` to help parse it +//! robustly. For now, we only care about the names and data types; the +//! remainder is collected as-is, without interpretation, into a generic +//! `details` field. + +use super::string; +use crate::native::block::DataType; +use crate::native::packets::server::ColumnDescription; +use crate::native::Error; +use nom::bytes::streaming::is_not; +use nom::bytes::streaming::tag; +use nom::character::streaming::u64 as nom_u64; +use nom::error::ErrorKind; +use nom::sequence::delimited; +use nom::sequence::separated_pair; +use nom::sequence::terminated; +use nom::Err as NomErr; +use nom::IResult; + +/// Decode an array of `ColumnDescription`s from a buffer. +pub fn decode( + src: &mut &[u8], +) -> Result>, Error> { + // See `src/Storages/ColumnDescription.cpp` for details of the encoding + // here, but briefly: + // + // - The packet starts with a "header" describing the table name (usually + // empty), the version, the number of columns. + // - Each column is serialized as a string, with its name; type; comment; + // compression codec; settings; statistics; and TTL + // - These are all newline delimited, with tabs prefixing each of the items + // listed above. + // + // See https://github.com/ClickHouse/ClickHouse/blob/c82cf25b3e5864bcc153cbe45adb8c6527e1ec6e/src/Server/TCPHandler.cpp#L2433 + // for more details, which is where the encoding of these values starts. + let Some(_table_name) = string::decode(src)? else { + return Ok(None); + }; + + // The column description itself is serialized as a ClickHouse + // varuint-prefixed string. Deserialize this, and then parse out the pieces + // of that parsed string. + let Some(text) = string::decode(src)? else { + return Ok(None); + }; + column_descriptions(&text) +} + +fn column_descriptions( + text: &str, +) -> Result>, Error> { + // Try to match the version "header" + let text = match tag::<_, _, (_, ErrorKind)>("columns format version: 1\n")( + text, + ) { + Ok((text, _match)) => text, + Err(NomErr::Incomplete(_)) => return Ok(None), + Err(e) => { + return Err(Error::InvalidPacket { + kind: "TableColumns", + msg: format!("failed to parse header: {e}"), + }) + } + }; + + // Match the number of columns. + let (text, n_columns) = match column_count(text) { + Ok(input) => input, + Err(NomErr::Incomplete(_)) => return Ok(None), + Err(e) => { + return Err(Error::InvalidPacket { + kind: "TableColumns", + msg: format!("failed to parse column count: {e}"), + }); + } + }; + + // Extract each column, each of which is on a separate line. + let mut out = Vec::with_capacity( + usize::try_from(n_columns).map_err(|_| Error::BlockTooLarge)?, + ); + for line in text.lines() { + let col = match column_description(line) { + Ok(out) => out.1, + Err(NomErr::Incomplete(_)) => return Ok(None), + Err(e) => { + return Err(Error::InvalidPacket { + kind: "TableColumns", + msg: format!("failed to parse description: {e}"), + }) + } + }; + out.push(col); + } + assert_eq!(out.len(), n_columns as usize); + Ok(Some(out)) +} + +// Parse out the column count from a header line like: "3 columns:\n". +fn column_count(s: &str) -> IResult<&str, u64> { + terminated(nom_u64, tag(" columns:\n"))(s) +} + +// Parse a backtick-quoted column name like: "`foo.bar`". +fn backtick_quoted_column_name(s: &str) -> IResult<&str, &str> { + delimited(tag("`"), is_not("`"), tag("`"))(s) +} + +// Parse a full column description from one line. +// +// Note that this must _not_ end in a newline, so one should use something like +// `str::lines()` and pass the result to this method. +fn column_description(s: &str) -> IResult<&str, ColumnDescription> { + let (s, (name, data_type)) = separated_pair( + backtick_quoted_column_name, + tag(" "), + DataType::nom_parse, + )(s)?; + + // At this point, we take any remaining output as the details, which may be + // empty. + Ok(( + "", + ColumnDescription { + name: name.to_string(), + data_type, + details: s.to_string(), + }, + )) +} + +#[cfg(test)] +mod tests { + use super::backtick_quoted_column_name; + use super::column_count; + use super::column_description; + use super::column_descriptions; + use super::NomErr; + use crate::native::block::DataType; + + #[test] + fn test_backtick_quoted_column_name() { + assert_eq!(backtick_quoted_column_name("`foo`").unwrap().1, "foo"); + assert_eq!( + backtick_quoted_column_name("`foo.bar`").unwrap().1, + "foo.bar" + ); + assert!(matches!( + backtick_quoted_column_name("`foo").unwrap_err(), + NomErr::Incomplete(_) + )); + } + + #[test] + fn test_column_count() { + assert_eq!(column_count("4 columns:\n").unwrap().1, 4,); + } + + #[test] + fn test_column_description_only_required_parts() { + let desc = column_description("`timeseries_name` String").unwrap().1; + assert_eq!(desc.name, "timeseries_name"); + assert_eq!(desc.data_type, DataType::String); + assert!(desc.details.is_empty()); + } + + #[test] + fn test_column_descriptions() { + static INPUT: &str = r#"columns format version: 1 +2 columns: +`timeseries_name` String +`fields.name` Array(String) +"#; + let columns = column_descriptions(&mut &*INPUT) + .expect("failed to decode column descriptions") + .expect("expected Some(_) column description"); + assert_eq!(columns.len(), 2); + assert_eq!(columns[0].name, "timeseries_name"); + assert_eq!(columns[0].data_type, DataType::String); + assert_eq!(columns[1].name, "fields.name"); + assert_eq!( + columns[1].data_type, + DataType::Array(Box::new(DataType::String)) + ); + } + + #[test] + fn test_column_description_with_default() { + static INPUT: &str = r#"`timeseries_name` String\tDEFAULT "foo""#; + let column = column_description(&mut &*INPUT) + .expect("failed to decode column description") + .1; + assert_eq!(column.name, "timeseries_name"); + assert_eq!(column.data_type, DataType::String); + assert_eq!(column.details, "\\tDEFAULT \"foo\""); + } +} diff --git a/oximeter/db/src/native/mod.rs b/oximeter/db/src/native/mod.rs index 02cd485569..6c14988a86 100644 --- a/oximeter/db/src/native/mod.rs +++ b/oximeter/db/src/native/mod.rs @@ -171,6 +171,9 @@ pub enum Error { #[error("Unrecognized server packet, kind = {0}")] UnrecognizedServerPacket(u8), + #[error("Invalid data packet kind = '{kind}', msg = {msg}")] + InvalidPacket { kind: &'static str, msg: String }, + #[error("Encountered non-UTF8 string")] NonUtf8String, @@ -205,11 +208,32 @@ pub enum Error { )] Exception { exceptions: Vec }, - #[error("Cannot concatenate blocks with mismatched structure")] + #[error( + "Mismatched data block structure when concatenating blocks or \ + inserting data blocks into the database" + )] MismatchedBlockStructure, #[error("Value out of range for corresponding ClickHouse type")] OutOfRange { type_name: String, min: String, max: String, value: String }, + + #[error("Failed to serialize / deserialize value from the database")] + Serde(String), + + #[error("No column with name '{0}'")] + NoSuchColumn(String), + + #[error("Too many rows to create block")] + TooManyRows, + + #[error("Column has unexpected type")] + UnexpectedColumnType, + + #[error("Data block is too large")] + BlockTooLarge, + + #[error("Expected an empty data block")] + ExpectedEmptyDataBlock, } /// Error codes and related constants. diff --git a/oximeter/db/src/native/packets/server.rs b/oximeter/db/src/native/packets/server.rs index 2e6be64a9d..0acf614496 100644 --- a/oximeter/db/src/native/packets/server.rs +++ b/oximeter/db/src/native/packets/server.rs @@ -6,10 +6,30 @@ //! Packets sent from the server. +use crate::native::block::Block; +use crate::native::block::DataType; use std::fmt; use std::time::Duration; -use crate::native::block::Block; +/// Description of a single column in a table. +/// +/// This is used during insertion queries. When the client sends a request to +/// insert data, the server responds with a description of all the columns in +/// the table, which the client is supposed to use to verify the data block +/// being inserted. +#[derive(Clone, Debug, PartialEq)] +pub struct ColumnDescription { + /// The name of the column. + pub name: String, + /// The type of the column. + pub data_type: DataType, + /// Other details for the column. + /// + /// This is collected as a string, but otherwise unparsed or processed. We + /// don't care about these details at this point, and do nothing with them + /// for now. + pub details: String, +} /// A packet sent from the ClickHouse server to the client. #[derive(Clone, Debug, PartialEq)] @@ -30,6 +50,8 @@ pub enum Packet { EndOfStream, /// Profiling data for a query. ProfileInfo(ProfileInfo), + /// Metadata about the columns in a table. + TableColumns(Vec), /// A data block containing profiling events during a query. ProfileEvents(Block), } @@ -42,6 +64,7 @@ impl Packet { pub const PONG: u8 = 4; pub const END_OF_STREAM: u8 = 5; pub const PROFILE_INFO: u8 = 6; + pub const TABLE_COLUMNS: u8 = 11; pub const PROFILE_EVENTS: u8 = 14; /// Return the kind of the packet as a string. @@ -54,6 +77,7 @@ impl Packet { Packet::Pong => "Pong", Packet::EndOfStream => "EndOfStream", Packet::ProfileInfo(_) => "ProfileInfo", + Packet::TableColumns(_) => "TableColumns", Packet::ProfileEvents(_) => "ProfileEvents", } } diff --git a/oximeter/types/Cargo.toml b/oximeter/types/Cargo.toml index 6d6bbc07e6..3f4ab66c2e 100644 --- a/oximeter/types/Cargo.toml +++ b/oximeter/types/Cargo.toml @@ -14,6 +14,7 @@ float-ord.workspace = true num.workspace = true omicron-common.workspace = true omicron-workspace-hack.workspace = true +parse-display.workspace = true regex.workspace = true schemars = { workspace = true, features = [ "uuid1", "bytes", "chrono" ] } serde.workspace = true diff --git a/oximeter/types/src/schema.rs b/oximeter/types/src/schema.rs index 135c77462a..3f0e8beb5c 100644 --- a/oximeter/types/src/schema.rs +++ b/oximeter/types/src/schema.rs @@ -14,6 +14,8 @@ use crate::Metric; use crate::Target; use chrono::DateTime; use chrono::Utc; +use parse_display::Display; +use parse_display::FromStr; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; @@ -63,11 +65,14 @@ impl FieldSchema { Debug, PartialEq, Eq, + Hash, PartialOrd, Ord, Deserialize, Serialize, JsonSchema, + FromStr, + Display, )] #[serde(rename_all = "snake_case")] pub enum FieldSource { diff --git a/oximeter/types/src/types.rs b/oximeter/types/src/types.rs index 60260e3649..cea48f4477 100644 --- a/oximeter/types/src/types.rs +++ b/oximeter/types/src/types.rs @@ -15,6 +15,8 @@ use chrono::DateTime; use chrono::Utc; use num::traits::One; use num::traits::Zero; +use parse_display::Display; +use parse_display::FromStr; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; @@ -40,12 +42,15 @@ use uuid::Uuid; Debug, PartialEq, Eq, + Hash, PartialOrd, Ord, JsonSchema, Serialize, Deserialize, strum::EnumIter, + FromStr, + Display, )] #[serde(rename_all = "snake_case")] pub enum FieldType { @@ -86,12 +91,6 @@ impl FieldType { } } -impl std::fmt::Display for FieldType { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{:?}", self) - } -} - macro_rules! impl_field_type_from { ($ty:ty, $variant:path) => { impl From<&$ty> for FieldType { @@ -313,6 +312,8 @@ pub struct Field { Serialize, Deserialize, strum::EnumIter, + FromStr, + Display, )] #[serde(rename_all = "snake_case")] pub enum DatumType { @@ -390,12 +391,6 @@ impl DatumType { } } -impl std::fmt::Display for DatumType { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{:?}", self) - } -} - /// A `Datum` is a single sampled data point from a metric. #[derive(Clone, Debug, PartialEq, JsonSchema, Serialize, Deserialize)] #[serde(tag = "type", content = "datum", rename_all = "snake_case")]