From 1e2dba3469efb57a3d33f085bae54aeed43ffd21 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Mon, 29 Jan 2024 17:21:51 +0000 Subject: [PATCH] WIP: Oximeter query language - Basic query grammar - Started shell sketch - Adding timeseries and transformation implementations --- Cargo.lock | 61 ++ Cargo.toml | 90 +- oximeter/db/Cargo.toml | 52 +- oximeter/db/src/bin/{oxdb.rs => oxdb/main.rs} | 305 +------ oximeter/db/src/bin/oxdb/oxql.rs | 288 ++++++ oximeter/db/src/bin/oxdb/sql.rs | 298 +++++++ oximeter/db/src/client.rs | 253 +++--- oximeter/db/src/lib.rs | 9 +- oximeter/db/src/oxql/ast.rs | 254 ++++++ oximeter/db/src/oxql/grammar.rs | 820 ++++++++++++++++++ oximeter/db/src/oxql/mod.rs | 30 + oximeter/db/src/oxql/transformation.rs | 23 + workspace-hack/Cargo.toml | 2 + 13 files changed, 1988 insertions(+), 497 deletions(-) rename oximeter/db/src/bin/{oxdb.rs => oxdb/main.rs} (50%) create mode 100644 oximeter/db/src/bin/oxdb/oxql.rs create mode 100644 oximeter/db/src/bin/oxdb/sql.rs create mode 100644 oximeter/db/src/oxql/ast.rs create mode 100644 oximeter/db/src/oxql/grammar.rs create mode 100644 oximeter/db/src/oxql/mod.rs create mode 100644 oximeter/db/src/oxql/transformation.rs diff --git a/Cargo.lock b/Cargo.lock index dcdc4b98327..f5b4daa58a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4048,6 +4048,16 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matrixmultiply" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7574c1cf36da4798ab73da5b215bbf444f50718207754cb522201d78d1cd0ff2" +dependencies = [ + "autocfg", + "rawpointer", +] + [[package]] name = "maybe-uninit" version = "2.0.0" @@ -4259,6 +4269,21 @@ dependencies = [ "tempfile", ] +[[package]] +name = "ndarray" +version = "0.15.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adb12d4e967ec485a5f71c6311fe28158e9d6f4bc4a447b474184d0f91a8fa32" +dependencies = [ + "matrixmultiply", + "num-complex", + "num-integer", + "num-traits", + "rawpointer", + "rayon", + "serde", +] + [[package]] name = "nested" version = "0.1.1" @@ -5427,6 +5452,7 @@ dependencies = [ "num-traits", "once_cell", "openapiv3", + "peg-runtime", "pem-rfc7468", "petgraph", "postgres-types", @@ -5804,10 +5830,12 @@ dependencies = [ "highway", "indexmap 2.2.2", "itertools 0.12.1", + "ndarray", "omicron-common", "omicron-test-utils", "omicron-workspace-hack", "oximeter", + "peg", "reedline", "regex", "reqwest", @@ -6100,6 +6128,33 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "peg" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "400bcab7d219c38abf8bd7cc2054eb9bbbd4312d66f6a5557d572a203f646f61" +dependencies = [ + "peg-macros", + "peg-runtime", +] + +[[package]] +name = "peg-macros" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46e61cce859b76d19090f62da50a9fe92bab7c2a5f09e183763559a2ac392c90" +dependencies = [ + "peg-runtime", + "proc-macro2", + "quote", +] + +[[package]] +name = "peg-runtime" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36bae92c60fa2398ce4678b98b2c4b5a7c61099961ca1fa305aec04a9ad28922" + [[package]] name = "pem" version = "3.0.2" @@ -6924,6 +6979,12 
@@ dependencies = [ "unicode-width", ] +[[package]] +name = "rawpointer" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" + [[package]] name = "rayon" version = "1.8.0" diff --git a/Cargo.toml b/Cargo.toml index 16f064d64dc..0d144bd37c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -162,7 +162,6 @@ backoff = { version = "0.4.0", features = [ "tokio" ] } base64 = "0.21.7" bb8 = "0.8.3" bcs = "0.1.6" -bincode = "1.3.3" bootstore = { path = "bootstore" } bootstrap-agent-client = { path = "clients/bootstrap-agent-client" } buf-list = { version = "1.0.3", features = ["tokio1"] } @@ -251,6 +250,7 @@ mockall = "0.12" newtype_derive = "0.1.6" mg-admin-client = { path = "clients/mg-admin-client" } multimap = "0.8.1" +ndarray = { version = "0.15.6", default-features = false, features = ["std", "rayon", "serde"] } nexus-blueprint-execution = { path = "nexus/blueprint-execution" } nexus-client = { path = "clients/nexus-client" } nexus-db-model = { path = "nexus/db-model" } @@ -258,16 +258,15 @@ nexus-db-queries = { path = "nexus/db-queries" } nexus-defaults = { path = "nexus/defaults" } nexus-deployment = { path = "nexus/deployment" } nexus-inventory = { path = "nexus/inventory" } -omicron-certificates = { path = "certificates" } -omicron-passwords = { path = "passwords" } -omicron-workspace-hack = "0.1.0" -oxlog = { path = "dev-tools/oxlog" } nexus-test-interface = { path = "nexus/test-interface" } nexus-test-utils-macros = { path = "nexus/test-utils-macros" } nexus-test-utils = { path = "nexus/test-utils" } nexus-types = { path = "nexus/types" } num-integer = "0.1.45" num = { version = "0.4.1", default-features = false, features = [ "libm" ] } +omicron-certificates = { path = "certificates" } +omicron-passwords = { path = "passwords" } +omicron-workspace-hack = "0.1.0" omicron-common = { path = "common" } omicron-gateway = { path = "gateway" } omicron-nexus = { path = "nexus" } @@ -278,13 +277,13 @@ omicron-test-utils = { path = "test-utils" } omicron-zone-package = "0.11.0" oxide-client = { path = "clients/oxide-client" } oxide-vpc = { git = "https://github.com/oxidecomputer/opte", rev = "1d29ef60a18179babfb44f0f7a3c2fe71034a2c1", features = [ "api", "std" ] } +oxlog = { path = "dev-tools/oxlog" } once_cell = "1.19.0" openapi-lint = { git = "https://github.com/oxidecomputer/openapi-lint", branch = "main" } openapiv3 = "2.0.0" # must match samael's crate! 
openssl = "0.10" openssl-sys = "0.9" -openssl-probe = "0.1.5" opte-ioctl = { git = "https://github.com/oxidecomputer/opte", rev = "1d29ef60a18179babfb44f0f7a3c2fe71034a2c1" } oso = "0.27" owo-colors = "4.0.0" @@ -300,10 +299,9 @@ parse-display = "0.9.0" partial-io = { version = "0.5.4", features = ["proptest1", "tokio1"] } parse-size = "1.0.0" paste = "1.0.14" -percent-encoding = "2.3.1" +peg = "0.8.2" pem = "3.0" petgraph = "0.6.4" -postgres-protocol = "0.6.6" predicates = "3.1.0" pretty_assertions = "1.4.0" pretty-hex = "0.4.1" @@ -363,9 +361,8 @@ slog-error-chain = { git = "https://github.com/oxidecomputer/slog-error-chain", slog-term = "2.9" smf = "0.2" sp-sim = { path = "sp-sim" } -sprockets-common = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" } -sprockets-host = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" } sprockets-rot = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" } +sqlformat = "0.2.3" sqlparser = { version = "0.43.1", features = [ "visitor" ] } static_assertions = "1.1.0" # Please do not change the Steno version to a Git dependency. It makes it @@ -438,23 +435,12 @@ debug = "line-tables-only" # times, because it allows target and host dependencies to be unified. debug = "line-tables-only" -# `bindgen` is used by `samael`'s build script; building it with optimizations -# makes that build script run ~5x faster, more than offsetting the additional -# build time added to `bindgen` itself. -[profile.dev.package.bindgen] -opt-level = 3 - # `lalrpop` is used by `polar-core`'s build script; building it with # optimizations makes that build script run ~20x faster, more than offsetting # the additional build time added to `lalrpop` itself. [profile.dev.package.lalrpop] opt-level = 3 -# `polar-core` is exercised heavily during the test suite and it's worthwhile to -# have it built with optimizations. -[profile.dev.package.polar-core] -opt-level = 3 - # Password hashing is expensive by construction. Build the hashing libraries # with optimizations to significantly speed up tests. 
[profile.dev.package.argon2] @@ -481,46 +467,12 @@ opt-level = 3 opt-level = 3 [profile.dev.package.chacha20poly1305] opt-level = 3 -[profile.dev.package.chacha20] -opt-level = 3 [profile.dev.package.vsss-rs] opt-level = 3 -[profile.dev.package.curve25519-dalek] -opt-level = 3 -[profile.dev.package.aead] -opt-level = 3 -[profile.dev.package.aes] -opt-level = 3 -[profile.dev.package.aes-gcm] -opt-level = 3 -[profile.dev.package.bcrypt-pbkdf] -opt-level = 3 -[profile.dev.package.blake2] -opt-level = 3 -[profile.dev.package.blake2b_simd] -opt-level = 3 -[profile.dev.package.block-buffer] -opt-level = 3 -[profile.dev.package.block-padding] -opt-level = 3 -[profile.dev.package.blowfish] -opt-level = 3 -[profile.dev.package.constant_time_eq] -opt-level = 3 -[profile.dev.package.crypto-bigint] -opt-level = 3 [profile.dev.package.crypto-common] opt-level = 3 -[profile.dev.package.ctr] -opt-level = 3 -[profile.dev.package.cbc] -opt-level = 3 [profile.dev.package.digest] opt-level = 3 -[profile.dev.package.ed25519] -opt-level = 3 -[profile.dev.package.ed25519-dalek] -opt-level = 3 [profile.dev.package.elliptic-curve] opt-level = 3 [profile.dev.package.generic-array] @@ -529,48 +481,20 @@ opt-level = 3 opt-level = 3 [profile.dev.package.hmac] opt-level = 3 -[profile.dev.package.lpc55_sign] -opt-level = 3 -[profile.dev.package.md5] -opt-level = 3 -[profile.dev.package.md-5] -opt-level = 3 [profile.dev.package.num-bigint] opt-level = 3 -[profile.dev.package.num-bigint-dig] -opt-level = 3 [profile.dev.package.rand] opt-level = 3 [profile.dev.package.rand_chacha] opt-level = 3 -[profile.dev.package.rand_core] -opt-level = 3 -[profile.dev.package.rand_hc] -opt-level = 3 -[profile.dev.package.rand_xorshift] -opt-level = 3 -[profile.dev.package.rsa] -opt-level = 3 -[profile.dev.package.salty] -opt-level = 3 -[profile.dev.package.signature] -opt-level = 3 [profile.dev.package.subtle] opt-level = 3 -[profile.dev.package.tiny-keccak] -opt-level = 3 [profile.dev.package.uuid] opt-level = 3 [profile.dev.package.cipher] opt-level = 3 -[profile.dev.package.cpufeatures] -opt-level = 3 -[profile.dev.package.poly1305] -opt-level = 3 [profile.dev.package.inout] opt-level = 3 -[profile.dev.package.keccak] -opt-level = 3 # # It's common during development to use a local copy of various complex diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index c4ee44acb61..9068b158a4c 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -15,21 +15,15 @@ clap.workspace = true dropshot.workspace = true futures.workspace = true highway.workspace = true -indexmap.workspace = true omicron-common.workspace = true omicron-workspace-hack.workspace = true oximeter.workspace = true -reedline.workspace = true regex.workspace = true -rustyline.workspace = true serde.workspace = true serde_json.workspace = true slog.workspace = true slog-async.workspace = true slog-term.workspace = true -sqlparser.workspace = true -sqlformat = "0.2.3" -tabled.workspace = true thiserror.workspace = true usdt.workspace = true uuid.workspace = true @@ -38,26 +32,72 @@ uuid.workspace = true workspace = true features = [ "serde" ] +[dependencies.indexmap] +workspace = true +optional = true + +[dependencies.ndarray] +workspace = true +optional = true + +[dependencies.peg] +workspace = true +optional = true + +[dependencies.reedline] +workspace = true +optional = true + [dependencies.reqwest] workspace = true features = [ "json" ] +[dependencies.rustyline] +workspace = true +optional = true + [dependencies.schemars] workspace = true features 
= [ "uuid1", "bytes", "chrono" ] +[dependencies.sqlformat] +workspace = true +optional = true + +[dependencies.sqlparser] +workspace = true +optional = true + [dependencies.tokio] workspace = true features = [ "rt-multi-thread", "macros" ] +[dependencies.tabled] +workspace = true +optional = true + [dev-dependencies] expectorate.workspace = true +indexmap.workspace = true itertools.workspace = true omicron-test-utils.workspace = true slog-dtrace.workspace = true +sqlparser.workspace = true strum.workspace = true tempfile.workspace = true +[features] +default = [ "oxql" ] +sql = [ + "dep:indexmap", + "dep:reedline", + "dep:rustyline", + "dep:sqlformat", + "dep:sqlparser", + "dep:tabled" +] +oxql = [ "dep:ndarray", "dep:peg", "dep:reedline", "dep:tabled" ] + [[bin]] name = "oxdb" doc = false diff --git a/oximeter/db/src/bin/oxdb.rs b/oximeter/db/src/bin/oxdb/main.rs similarity index 50% rename from oximeter/db/src/bin/oxdb.rs rename to oximeter/db/src/bin/oxdb/main.rs index 02a8054da07..b8855978508 100644 --- a/oximeter/db/src/bin/oxdb.rs +++ b/oximeter/db/src/bin/oxdb/main.rs @@ -4,31 +4,27 @@ //! Tool for developing against the Oximeter timeseries database, populating data and querying. -// Copyright 2023 Oxide Computer Company +// Copyright 2024 Oxide Computer Company use anyhow::{bail, Context}; use chrono::{DateTime, Utc}; use clap::{Args, Parser}; -use dropshot::EmptyScanParams; -use dropshot::WhichPage; use oximeter::{ types::{Cumulative, Sample}, Metric, Target, }; -use oximeter_db::sql::function_allow_list; -use oximeter_db::QueryMetadata; -use oximeter_db::QueryResult; -use oximeter_db::Table; use oximeter_db::{query, Client, DbWrite}; -use reedline::DefaultPrompt; -use reedline::DefaultPromptSegment; -use reedline::Reedline; -use reedline::Signal; use slog::{debug, info, o, Drain, Level, Logger}; use std::net::IpAddr; use std::net::SocketAddr; use uuid::Uuid; +#[cfg(feature = "sql")] +mod sql; + +#[cfg(feature = "oxql")] +mod oxql; + // Samples are inserted in chunks of this size, to avoid large allocations when inserting huge // numbers of timeseries. const INSERT_CHUNK_SIZE: usize = 100_000; @@ -151,10 +147,15 @@ enum Subcommand { }, /// Enter a SQL shell for interactive querying. + #[cfg(feature = "sql")] Sql { #[clap(flatten)] - opts: ShellOptions, + opts: crate::sql::ShellOptions, }, + + /// Enter the Oximeter Query Language shell for interactive querying. 
+ #[cfg(feature = "oxql")] + Oxql, } async fn make_client( @@ -312,281 +313,6 @@ async fn query( Ok(()) } -fn print_basic_commands() { - println!("Basic commands:"); - println!(" \\?, \\h, help - Print this help"); - println!(" \\q, quit, exit, ^D - Exit the shell"); - println!(" \\l - List tables"); - println!(" \\d - Describe a table"); - println!( - " \\f - List or describe ClickHouse SQL functions" - ); - println!(); - println!("Or try entering a SQL `SELECT` statement"); -} - -async fn list_virtual_tables(client: &Client) -> anyhow::Result<()> { - let mut page = WhichPage::First(EmptyScanParams {}); - let limit = 100.try_into().unwrap(); - loop { - let results = client.timeseries_schema_list(&page, limit).await?; - for schema in results.items.iter() { - println!("{}", schema.timeseries_name); - } - if results.next_page.is_some() { - if let Some(last) = results.items.last() { - page = WhichPage::Next(last.timeseries_name.clone()); - } else { - return Ok(()); - } - } else { - return Ok(()); - } - } -} - -async fn describe_virtual_table( - client: &Client, - table: &str, -) -> anyhow::Result<()> { - match table.parse() { - Err(_) => println!("Invalid timeseries name: {table}"), - Ok(name) => { - if let Some(schema) = client.schema_for_timeseries(&name).await? { - let mut cols = - Vec::with_capacity(schema.field_schema.len() + 2); - let mut types = cols.clone(); - for field in schema.field_schema.iter() { - cols.push(field.name.clone()); - types.push(field.field_type.to_string()); - } - cols.push("timestamp".into()); - types.push("DateTime64".into()); - - if schema.datum_type.is_histogram() { - cols.push("start_time".into()); - types.push("DateTime64".into()); - - cols.push("bins".into()); - types.push(format!( - "Array[{}]", - schema - .datum_type - .to_string() - .strip_prefix("Histogram") - .unwrap() - .to_lowercase(), - )); - - cols.push("counts".into()); - types.push("Array[u64]".into()); - } else if schema.datum_type.is_cumulative() { - cols.push("start_time".into()); - types.push("DateTime64".into()); - cols.push("datum".into()); - types.push(schema.datum_type.to_string()); - } else { - cols.push("datum".into()); - types.push(schema.datum_type.to_string()); - } - - let mut builder = tabled::builder::Builder::default(); - builder.push_record(cols); // first record is the header - builder.push_record(types); - println!( - "{}", - builder.build().with(tabled::settings::Style::psql()) - ); - } else { - println!("No such timeseries: {table}"); - } - } - } - Ok(()) -} - -#[derive(Clone, Debug, Args)] -struct ShellOptions { - /// Print query metadata. - #[clap(long = "metadata")] - print_metadata: bool, - /// Print the original SQL query. - #[clap(long = "original")] - print_original_query: bool, - /// Print the rewritten SQL query that is actually run on the DB. - #[clap(long = "rewritten")] - print_rewritten_query: bool, - /// Print the transformed query, but do not run it. 
- #[clap(long)] - transform: Option, -} - -impl Default for ShellOptions { - fn default() -> Self { - Self { - print_metadata: true, - print_original_query: false, - print_rewritten_query: false, - transform: None, - } - } -} - -fn list_supported_functions() { - println!("Subset of ClickHouse SQL functions currently supported"); - println!( - "See https://clickhouse.com/docs/en/sql-reference/functions for more" - ); - println!(); - for func in function_allow_list().iter() { - println!(" {func}"); - } -} - -fn show_supported_function(name: &str) { - if let Some(func) = function_allow_list().iter().find(|f| f.name == name) { - println!("{}", func.name); - println!(" {}", func.usage); - println!(" {}", func.description); - } else { - println!("No supported function '{name}'"); - } -} - -fn print_sql_query(query: &str) { - println!( - "{}", - sqlformat::format( - &query, - &sqlformat::QueryParams::None, - sqlformat::FormatOptions { uppercase: true, ..Default::default() } - ) - ); - println!(); -} - -fn print_query_metadata(table: &Table, metadata: &QueryMetadata) { - println!("Metadata"); - println!(" Query ID: {}", metadata.id); - println!(" Result rows: {}", table.rows.len()); - println!(" Time: {:?}", metadata.elapsed); - println!(" Read: {}\n", metadata.summary.read); -} - -async fn sql_shell( - address: IpAddr, - port: u16, - log: Logger, - opts: ShellOptions, -) -> anyhow::Result<()> { - let client = make_client(address, port, &log).await?; - - // A workaround to ensure the client has all available timeseries when the - // shell starts. - let dummy = "foo:bar".parse().unwrap(); - let _ = client.schema_for_timeseries(&dummy).await; - - // Possibly just transform the query, but do not execute it. - if let Some(query) = &opts.transform { - let transformed = client.transform_query(query).await?; - println!( - "{}", - sqlformat::format( - &transformed, - &sqlformat::QueryParams::None, - sqlformat::FormatOptions { - uppercase: true, - ..Default::default() - } - ) - ); - return Ok(()); - } - - let mut ed = Reedline::create(); - let prompt = DefaultPrompt::new( - DefaultPromptSegment::Basic("0x".to_string()), - DefaultPromptSegment::Empty, - ); - println!("Oximeter SQL shell"); - println!(); - print_basic_commands(); - loop { - let sig = ed.read_line(&prompt); - match sig { - Ok(Signal::Success(buf)) => { - let cmd = buf.as_str().trim(); - match cmd { - "" => continue, - "\\?" 
| "\\h" | "help" => print_basic_commands(), - "\\q" | "quit" | "exit" => return Ok(()), - "\\l" | "\\d" => list_virtual_tables(&client).await?, - _ => { - if let Some(table_name) = cmd.strip_prefix("\\d") { - if table_name.is_empty() { - list_virtual_tables(&client).await?; - } else { - describe_virtual_table( - &client, - table_name.trim().trim_end_matches(';'), - ) - .await?; - } - } else if let Some(func_name) = cmd.strip_prefix("\\f") - { - if func_name.is_empty() { - list_supported_functions(); - } else { - show_supported_function( - func_name.trim().trim_end_matches(';'), - ); - } - } else { - match client.query(&buf).await { - Err(e) => println!("Query failed: {e:#?}"), - Ok(QueryResult { - original_query, - rewritten_query, - metadata, - table, - }) => { - println!(); - let mut builder = - tabled::builder::Builder::default(); - builder.push_record(&table.column_names); // first record is the header - for row in table.rows.iter() { - builder.push_record( - row.iter().map(ToString::to_string), - ); - } - if opts.print_original_query { - print_sql_query(&original_query); - } - if opts.print_rewritten_query { - print_sql_query(&rewritten_query); - } - println!( - "{}\n", - builder.build().with( - tabled::settings::Style::psql() - ) - ); - if opts.print_metadata { - print_query_metadata(&table, &metadata); - } - } - } - } - } - } - } - Ok(Signal::CtrlD) => return Ok(()), - Ok(Signal::CtrlC) => continue, - err => println!("err: {err:?}"), - } - } -} - #[tokio::main] async fn main() -> anyhow::Result<()> { usdt::register_probes().context("Failed to register USDT probes")?; @@ -636,9 +362,12 @@ async fn main() -> anyhow::Result<()> { ) .await?; } + #[cfg(feature = "sql")] Subcommand::Sql { opts } => { - sql_shell(args.address, args.port, log, opts).await? + crate::sql::sql_shell(args.address, args.port, log, opts).await? } + #[cfg(feature = "oxql")] + Subcommand::Oxql => crate::oxql::oxql_shell(args.address, args.port, log).await? } Ok(()) } diff --git a/oximeter/db/src/bin/oxdb/oxql.rs b/oximeter/db/src/bin/oxdb/oxql.rs new file mode 100644 index 00000000000..9808acfdb31 --- /dev/null +++ b/oximeter/db/src/bin/oxdb/oxql.rs @@ -0,0 +1,288 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! OxQL shell. + +// Copyright 2024 Oxide Computer + +use std::net::IpAddr; +use dropshot::WhichPage; +use dropshot::EmptyScanParams; +use oximeter_db::Client; +use slog::Logger; +use crate::make_client; +use reedline::Signal; +use reedline::DefaultPrompt; +use reedline::DefaultPromptSegment; +use reedline::Reedline; +use oximeter_db::oxql::query_parser; + +// Print help for the basic OxQL commands. +fn print_basic_commands() { + println!("Basic commands:"); + println!(" \\?, \\h, help - Print this help"); + println!(" \\q, quit, exit, ^D - Exit the shell"); + println!(" \\l - List timeseries"); + println!(" \\d - Describe a timeseries"); + println!(" \\ql [] - Get OxQL help about an operation"); + println!(); + println!("Or try entering an OxQL `get` query"); +} + +// Print high-level information about OxQL. +fn print_general_oxql_help() { + const HELP: &str = r#"Oximeter Query Language + +The Oximeter Query Language (OxQL) implements queries as +as sequence of operations. Each of these takes zero or more +timeseries as inputs, and produces zero or more timeseries +as outputs. Operations are chained together with the pipe +operator, "|". 
+
+All queries start with a `get` operation, which selects a
+timeseries from the database, by name. For example:
+
+`get physical_data_link:bytes_received`
+
+The supported timeseries operations are:
+
+- get: Select a timeseries by name
+- filter: Filter timeseries by field or sample values
+- group_by: Group timeseries by fields, applying a reducer.
+
+Run `\ql <operation>` to get specific help about that operation.
+    "#;
+    println!("{HELP}");
+}
+
+// Print help for a specific OxQL operation.
+fn print_oxql_operation_help(op: &str) {
+    match op {
+        "get" => {
+            const HELP: &str = r#"get <timeseries name>
+
+Get instances of a timeseries by name"#;
+            println!("{HELP}");
+        }
+        "filter" => {
+            const HELP: &str = r#"filter <filter expression>
+
+Filter timeseries based on their attributes.
+<filter expression> can be a logical combination of filtering
+"atoms", such as `field_foo > 0`. Expressions
+may use any of the usual comparison operators, and
+can be nested and combined with && or ||.
+
+Expressions must refer to the name of a field
+for a timeseries at this time, and must compare
+against literals. For example, `some_field > 0`
+is supported, but `some_field > other_field` is not."#;
+            println!("{HELP}");
+        }
+        "group_by" => {
+            const HELP: &str = r#"group_by [<field name>, ... ]
+group_by [<field name>, ... ], <reducer>
+
+Group timeseries by the named fields, optionally
+specifying a reducer to use when aggregating the
+timeseries within each group. If no reducer is
+specified, `mean` is used, averaging the values
+within each group.
+
+Currently supported reducers:
+ - mean"#;
+            println!("{HELP}");
+        }
+        _ => eprintln!("unrecognized OxQL operation: '{op}'"),
+    }
+}
+
+// List the known timeseries.
+async fn list_timeseries(client: &Client) -> anyhow::Result<()> {
+    let mut page = WhichPage::First(EmptyScanParams {});
+    let limit = 100.try_into().unwrap();
+    loop {
+        let results = client.timeseries_schema_list(&page, limit).await?;
+        for schema in results.items.iter() {
+            println!("{}", schema.timeseries_name);
+        }
+        if results.next_page.is_some() {
+            if let Some(last) = results.items.last() {
+                page = WhichPage::Next(last.timeseries_name.clone());
+            } else {
+                return Ok(());
+            }
+        } else {
+            return Ok(());
+        }
+    }
+}
+
+// Describe a single timeseries.
+async fn describe_timeseries(
+    client: &Client,
+    timeseries: &str,
+) -> anyhow::Result<()> {
+    match timeseries.parse() {
+        Err(_) => println!("Invalid timeseries name: {timeseries}"),
+        Ok(name) => {
+            if let Some(schema) = client.schema_for_timeseries(&name).await?
{ + let mut cols = + Vec::with_capacity(schema.field_schema.len() + 2); + let mut types = cols.clone(); + for field in schema.field_schema.iter() { + cols.push(field.name.clone()); + types.push(field.field_type.to_string()); + } + cols.push("timestamp".into()); + types.push("DateTime64".into()); + + if schema.datum_type.is_histogram() { + cols.push("start_time".into()); + types.push("DateTime64".into()); + + cols.push("bins".into()); + types.push(format!( + "Array[{}]", + schema + .datum_type + .to_string() + .strip_prefix("Histogram") + .unwrap() + .to_lowercase(), + )); + + cols.push("counts".into()); + types.push("Array[u64]".into()); + } else if schema.datum_type.is_cumulative() { + cols.push("start_time".into()); + types.push("DateTime64".into()); + cols.push("datum".into()); + types.push(schema.datum_type.to_string()); + } else { + cols.push("datum".into()); + types.push(schema.datum_type.to_string()); + } + + let mut builder = tabled::builder::Builder::default(); + builder.push_record(cols); // first record is the header + builder.push_record(types); + println!( + "{}", + builder.build().with(tabled::settings::Style::psql()) + ); + } else { + println!("No such timeseries: {timeseries}"); + } + } + } + Ok(()) +} + +/// Run the OxQL shell. +pub async fn oxql_shell( + address: IpAddr, + port: u16, + log: Logger, +) -> anyhow::Result<()> { + let client = make_client(address, port, &log).await?; + + // A workaround to ensure the client has all available timeseries when the + // shell starts. + let dummy = "foo:bar".parse().unwrap(); + let _ = client.schema_for_timeseries(&dummy).await; + + // Create the line-editor. + let mut ed = Reedline::create(); + let prompt = DefaultPrompt::new( + DefaultPromptSegment::Basic("0x".to_string()), + DefaultPromptSegment::Empty, + ); + println!("Oximeter Query Language shell"); + println!(); + print_basic_commands(); + loop { + let sig = ed.read_line(&prompt); + match sig { + Ok(Signal::Success(buf)) => { + let cmd = buf.as_str().trim(); + match cmd { + "" => continue, + "\\?" 
| "\\h" | "help" => print_basic_commands(), + "\\q" | "quit" | "exit" => return Ok(()), + "\\l" | "\\d" => list_timeseries(&client).await?, + _ => { + if let Some(timeseries_name) = cmd.strip_prefix("\\d") { + if timeseries_name.is_empty() { + list_timeseries(&client).await?; + } else { + describe_timeseries( + &client, + timeseries_name.trim().trim_end_matches(';'), + ) + .await?; + } + } else if let Some(stmt) = cmd.strip_prefix("\\ql") { + let stmt = stmt.trim(); + if stmt.is_empty() { + print_general_oxql_help(); + } else { + print_oxql_operation_help(stmt); + } + } else { + match query_parser::query(cmd.trim().trim_end_matches(';')) { + Ok(q) => { + println!("{cmd}"); + println!("{q:#?}"); + } + Err(e) => { + eprintln!("Invalid OxQL query: {e}"); + } + } + /* + match client.query(&buf).await { + Err(e) => println!("Query failed: {e:#?}"), + Ok(QueryResult { + original_query, + rewritten_query, + metadata, + table, + }) => { + println!(); + let mut builder = + tabled::builder::Builder::default(); + builder.push_record(&table.column_names); // first record is the header + for row in table.rows.iter() { + builder.push_record( + row.iter().map(ToString::to_string), + ); + } + if opts.print_original_query { + print_sql_query(&original_query); + } + if opts.print_rewritten_query { + print_sql_query(&rewritten_query); + } + println!( + "{}\n", + builder.build().with( + tabled::settings::Style::psql() + ) + ); + if opts.print_metadata { + print_query_metadata(&table, &metadata); + } + } + } + */ + } + } + } + } + Ok(Signal::CtrlD) => return Ok(()), + Ok(Signal::CtrlC) => continue, + err => println!("err: {err:?}"), + } + } +} diff --git a/oximeter/db/src/bin/oxdb/sql.rs b/oximeter/db/src/bin/oxdb/sql.rs new file mode 100644 index 00000000000..88ef522777d --- /dev/null +++ b/oximeter/db/src/bin/oxdb/sql.rs @@ -0,0 +1,298 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! SQL shell subcommand for `oxdb`. + +// Copyright 2024 Oxide Computer Company + +use crate::make_client; +use clap::Args; +use dropshot::EmptyScanParams; +use dropshot::WhichPage; +use oximeter_db::sql::function_allow_list; +use oximeter_db::Client; +use oximeter_db::QueryMetadata; +use oximeter_db::QueryResult; +use oximeter_db::Table; +use reedline::DefaultPrompt; +use reedline::DefaultPromptSegment; +use reedline::Reedline; +use reedline::Signal; +use slog::Logger; +use std::net::IpAddr; + +fn print_basic_commands() { + println!("Basic commands:"); + println!(" \\?, \\h, help - Print this help"); + println!(" \\q, quit, exit, ^D - Exit the shell"); + println!(" \\l - List tables"); + println!(" \\d
- Describe a table"); + println!( + " \\f - List or describe ClickHouse SQL functions" + ); + println!(); + println!("Or try entering a SQL `SELECT` statement"); +} + +async fn list_virtual_tables(client: &Client) -> anyhow::Result<()> { + let mut page = WhichPage::First(EmptyScanParams {}); + let limit = 100.try_into().unwrap(); + loop { + let results = client.timeseries_schema_list(&page, limit).await?; + for schema in results.items.iter() { + println!("{}", schema.timeseries_name); + } + if results.next_page.is_some() { + if let Some(last) = results.items.last() { + page = WhichPage::Next(last.timeseries_name.clone()); + } else { + return Ok(()); + } + } else { + return Ok(()); + } + } +} + +async fn describe_virtual_table( + client: &Client, + table: &str, +) -> anyhow::Result<()> { + match table.parse() { + Err(_) => println!("Invalid timeseries name: {table}"), + Ok(name) => { + if let Some(schema) = client.schema_for_timeseries(&name).await? { + let mut cols = + Vec::with_capacity(schema.field_schema.len() + 2); + let mut types = cols.clone(); + for field in schema.field_schema.iter() { + cols.push(field.name.clone()); + types.push(field.field_type.to_string()); + } + cols.push("timestamp".into()); + types.push("DateTime64".into()); + + if schema.datum_type.is_histogram() { + cols.push("start_time".into()); + types.push("DateTime64".into()); + + cols.push("bins".into()); + types.push(format!( + "Array[{}]", + schema + .datum_type + .to_string() + .strip_prefix("Histogram") + .unwrap() + .to_lowercase(), + )); + + cols.push("counts".into()); + types.push("Array[u64]".into()); + } else if schema.datum_type.is_cumulative() { + cols.push("start_time".into()); + types.push("DateTime64".into()); + cols.push("datum".into()); + types.push(schema.datum_type.to_string()); + } else { + cols.push("datum".into()); + types.push(schema.datum_type.to_string()); + } + + let mut builder = tabled::builder::Builder::default(); + builder.push_record(cols); // first record is the header + builder.push_record(types); + println!( + "{}", + builder.build().with(tabled::settings::Style::psql()) + ); + } else { + println!("No such timeseries: {table}"); + } + } + } + Ok(()) +} + +#[derive(Clone, Debug, Args)] +pub struct ShellOptions { + /// Print query metadata. + #[clap(long = "metadata")] + print_metadata: bool, + /// Print the original SQL query. + #[clap(long = "original")] + print_original_query: bool, + /// Print the rewritten SQL query that is actually run on the DB. + #[clap(long = "rewritten")] + print_rewritten_query: bool, + /// Print the transformed query, but do not run it. 
+ #[clap(long)] + transform: Option, +} + +impl Default for ShellOptions { + fn default() -> Self { + Self { + print_metadata: true, + print_original_query: false, + print_rewritten_query: false, + transform: None, + } + } +} + +fn list_supported_functions() { + println!("Subset of ClickHouse SQL functions currently supported"); + println!( + "See https://clickhouse.com/docs/en/sql-reference/functions for more" + ); + println!(); + for func in function_allow_list().iter() { + println!(" {func}"); + } +} + +fn show_supported_function(name: &str) { + if let Some(func) = function_allow_list().iter().find(|f| f.name == name) { + println!("{}", func.name); + println!(" {}", func.usage); + println!(" {}", func.description); + } else { + println!("No supported function '{name}'"); + } +} + +fn print_sql_query(query: &str) { + println!( + "{}", + sqlformat::format( + &query, + &sqlformat::QueryParams::None, + sqlformat::FormatOptions { uppercase: true, ..Default::default() } + ) + ); + println!(); +} + +fn print_query_metadata(table: &Table, metadata: &QueryMetadata) { + println!("Metadata"); + println!(" Query ID: {}", metadata.id); + println!(" Result rows: {}", table.rows.len()); + println!(" Time: {:?}", metadata.elapsed); + println!(" Read: {}\n", metadata.summary.read); +} + +pub async fn sql_shell( + address: IpAddr, + port: u16, + log: Logger, + opts: ShellOptions, +) -> anyhow::Result<()> { + let client = make_client(address, port, &log).await?; + + // A workaround to ensure the client has all available timeseries when the + // shell starts. + let dummy = "foo:bar".parse().unwrap(); + let _ = client.schema_for_timeseries(&dummy).await; + + // Possibly just transform the query, but do not execute it. + if let Some(query) = &opts.transform { + let transformed = client.transform_query(query).await?; + println!( + "{}", + sqlformat::format( + &transformed, + &sqlformat::QueryParams::None, + sqlformat::FormatOptions { + uppercase: true, + ..Default::default() + } + ) + ); + return Ok(()); + } + + let mut ed = Reedline::create(); + let prompt = DefaultPrompt::new( + DefaultPromptSegment::Basic("0x".to_string()), + DefaultPromptSegment::Empty, + ); + println!("Oximeter SQL shell"); + println!(); + print_basic_commands(); + loop { + let sig = ed.read_line(&prompt); + match sig { + Ok(Signal::Success(buf)) => { + let cmd = buf.as_str().trim(); + match cmd { + "" => continue, + "\\?" 
| "\\h" | "help" => print_basic_commands(), + "\\q" | "quit" | "exit" => return Ok(()), + "\\l" | "\\d" => list_virtual_tables(&client).await?, + _ => { + if let Some(table_name) = cmd.strip_prefix("\\d") { + if table_name.is_empty() { + list_virtual_tables(&client).await?; + } else { + describe_virtual_table( + &client, + table_name.trim().trim_end_matches(';'), + ) + .await?; + } + } else if let Some(func_name) = cmd.strip_prefix("\\f") + { + if func_name.is_empty() { + list_supported_functions(); + } else { + show_supported_function( + func_name.trim().trim_end_matches(';'), + ); + } + } else { + match client.query(&buf).await { + Err(e) => println!("Query failed: {e:#?}"), + Ok(QueryResult { + original_query, + rewritten_query, + metadata, + table, + }) => { + println!(); + let mut builder = + tabled::builder::Builder::default(); + builder.push_record(&table.column_names); // first record is the header + for row in table.rows.iter() { + builder.push_record( + row.iter().map(ToString::to_string), + ); + } + if opts.print_original_query { + print_sql_query(&original_query); + } + if opts.print_rewritten_query { + print_sql_query(&rewritten_query); + } + println!( + "{}\n", + builder.build().with( + tabled::settings::Style::psql() + ) + ); + if opts.print_metadata { + print_query_metadata(&table, &metadata); + } + } + } + } + } + } + } + Ok(Signal::CtrlD) => return Ok(()), + Ok(Signal::CtrlC) => continue, + err => println!("err: {err:?}"), + } + } +} diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index ca996dc8947..9a1251d5559 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -4,11 +4,10 @@ //! Rust client to ClickHouse database -// Copyright 2023 Oxide Computer Company +// Copyright 2024 Oxide Computer Company use crate::model; use crate::query; -use crate::sql::RestrictedQuery; use crate::Error; use crate::Metric; use crate::Target; @@ -23,11 +22,9 @@ use dropshot::EmptyScanParams; use dropshot::PaginationOrder; use dropshot::ResultsPage; use dropshot::WhichPage; -use indexmap::IndexMap; use oximeter::types::Sample; use regex::Regex; use regex::RegexBuilder; -use reqwest::header::HeaderMap; use slog::debug; use slog::error; use slog::info; @@ -44,8 +41,6 @@ use std::ops::Bound; use std::path::Path; use std::path::PathBuf; use std::sync::OnceLock; -use std::time::Duration; -use std::time::Instant; use tokio::fs; use tokio::sync::Mutex; use uuid::Uuid; @@ -56,136 +51,153 @@ mod probes { fn query__done(_: &usdt::UniqueId) {} } -/// A count of bytes / rows accessed during a query. -#[derive(Clone, Copy, Debug)] -pub struct IoCount { - pub bytes: u64, - pub rows: u64, -} +#[cfg(any(feature = "sql", test))] +mod sql_types { + pub use crate::sql::RestrictedQuery; + use crate::Error; + pub use indexmap::IndexMap; + use reqwest::header::HeaderMap; + use std::time::Duration; + pub use std::time::Instant; + use uuid::Uuid; -impl std::fmt::Display for IoCount { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{} rows ({} bytes)", self.rows, self.bytes) + /// A count of bytes / rows accessed during a query. + #[derive(Clone, Copy, Debug)] + pub struct IoCount { + pub bytes: u64, + pub rows: u64, } -} -/// Summary of the I/O and duration of a query. -#[derive(Clone, Copy, Debug, serde::Deserialize)] -#[serde(try_from = "serde_json::Value")] -pub struct QuerySummary { - /// The bytes and rows read by the query. - pub read: IoCount, - /// The bytes and rows written by the query. 
- pub written: IoCount, -} + impl std::fmt::Display for IoCount { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{} rows ({} bytes)", self.rows, self.bytes) + } + } -impl TryFrom for QuerySummary { - type Error = Error; + /// Summary of the I/O and duration of a query. + #[derive(Clone, Copy, Debug, serde::Deserialize)] + #[serde(try_from = "serde_json::Value")] + pub struct QuerySummary { + /// The bytes and rows read by the query. + pub read: IoCount, + /// The bytes and rows written by the query. + pub written: IoCount, + } - fn try_from(j: serde_json::Value) -> Result { - use serde_json::Map; - use serde_json::Value; - use std::str::FromStr; + impl TryFrom for QuerySummary { + type Error = Error; - let Value::Object(map) = j else { - return Err(Error::Database(String::from( - "Expected a JSON object for a metadata summary", - ))); - }; + fn try_from(j: serde_json::Value) -> Result { + use serde_json::Map; + use serde_json::Value; + use std::str::FromStr; - fn unpack_summary_value( - map: &Map, - key: &str, - ) -> Result - where - T: FromStr, - ::Err: std::error::Error, - { - let value = map.get(key).ok_or_else(|| { - Error::MissingHeaderKey { key: key.to_string() } - })?; - let Value::String(v) = value else { - return Err(Error::BadMetadata { - key: key.to_string(), - msg: String::from("Expected a string value"), - }); + let Value::Object(map) = j else { + return Err(Error::Database(String::from( + "Expected a JSON object for a metadata summary", + ))); }; - v.parse::().map_err(|e| Error::BadMetadata { - key: key.to_string(), - msg: e.to_string(), + + fn unpack_summary_value( + map: &Map, + key: &str, + ) -> Result + where + T: FromStr, + ::Err: std::error::Error, + { + let value = map.get(key).ok_or_else(|| { + Error::MissingHeaderKey { key: key.to_string() } + })?; + let Value::String(v) = value else { + return Err(Error::BadMetadata { + key: key.to_string(), + msg: String::from("Expected a string value"), + }); + }; + v.parse::().map_err(|e| Error::BadMetadata { + key: key.to_string(), + msg: e.to_string(), + }) + } + let rows_read: u64 = unpack_summary_value(&map, "read_rows")?; + let bytes_read: u64 = unpack_summary_value(&map, "read_bytes")?; + let rows_written: u64 = unpack_summary_value(&map, "written_rows")?; + let bytes_written: u64 = + unpack_summary_value(&map, "written_bytes")?; + Ok(Self { + read: IoCount { bytes: bytes_read, rows: rows_read }, + written: IoCount { bytes: bytes_written, rows: rows_written }, }) } - let rows_read: u64 = unpack_summary_value(&map, "read_rows")?; - let bytes_read: u64 = unpack_summary_value(&map, "read_bytes")?; - let rows_written: u64 = unpack_summary_value(&map, "written_rows")?; - let bytes_written: u64 = unpack_summary_value(&map, "written_bytes")?; - Ok(Self { - read: IoCount { bytes: bytes_read, rows: rows_read }, - written: IoCount { bytes: bytes_written, rows: rows_written }, - }) } -} - -/// Basic metadata about the resource usage of a single SQL query. -#[derive(Clone, Copy, Debug)] -pub struct QueryMetadata { - /// The database-assigned query ID. - pub id: Uuid, - /// The total duration of the query (network plus execution). - pub elapsed: Duration, - /// Summary of the data read and written. 
- pub summary: QuerySummary, -} -impl QueryMetadata { - fn from_headers( - elapsed: Duration, - headers: &HeaderMap, - ) -> Result { - fn get_header<'a>( - map: &'a HeaderMap, - key: &'a str, - ) -> Result<&'a str, Error> { - let hdr = map.get(key).ok_or_else(|| Error::MissingHeaderKey { - key: key.to_string(), - })?; - std::str::from_utf8(hdr.as_bytes()) - .map_err(|err| Error::Database(err.to_string())) + /// Basic metadata about the resource usage of a single SQL query. + #[derive(Clone, Copy, Debug)] + pub struct QueryMetadata { + /// The database-assigned query ID. + pub id: Uuid, + /// The total duration of the query (network plus execution). + pub elapsed: Duration, + /// Summary of the data read and written. + pub summary: QuerySummary, + } + + impl QueryMetadata { + pub(crate) fn from_headers( + elapsed: Duration, + headers: &HeaderMap, + ) -> Result { + fn get_header<'a>( + map: &'a HeaderMap, + key: &'a str, + ) -> Result<&'a str, Error> { + let hdr = map.get(key).ok_or_else(|| { + Error::MissingHeaderKey { key: key.to_string() } + })?; + std::str::from_utf8(hdr.as_bytes()) + .map_err(|err| Error::Database(err.to_string())) + } + let summary = serde_json::from_str(get_header( + headers, + "X-ClickHouse-Summary", + )?) + .map_err(|err| Error::Database(err.to_string()))?; + let id = get_header(headers, "X-ClickHouse-Query-Id")? + .parse() + .map_err(|err: uuid::Error| Error::Database(err.to_string()))?; + Ok(Self { id, elapsed, summary }) } - let summary = - serde_json::from_str(get_header(headers, "X-ClickHouse-Summary")?) - .map_err(|err| Error::Database(err.to_string()))?; - let id = get_header(headers, "X-ClickHouse-Query-Id")? - .parse() - .map_err(|err: uuid::Error| Error::Database(err.to_string()))?; - Ok(Self { id, elapsed, summary }) } -} -/// A tabular result from a SQL query against a timeseries. -#[derive(Clone, Debug, Default, serde::Serialize)] -pub struct Table { - /// The name of each column in the result set. - pub column_names: Vec, - /// The rows of the result set, one per column. - pub rows: Vec>, + /// A tabular result from a SQL query against a timeseries. + #[derive(Clone, Debug, Default, serde::Serialize)] + pub struct Table { + /// The name of each column in the result set. + pub column_names: Vec, + /// The rows of the result set, one per column. + pub rows: Vec>, + } + + /// The full result of running a SQL query against a timeseries. + #[derive(Clone, Debug)] + pub struct QueryResult { + /// The query as written by the client. + pub original_query: String, + /// The rewritten query, run against the JOINed representation of the + /// timeseries. + /// + /// This is the query that is actually run in the database itself. + pub rewritten_query: String, + /// Metadata about the resource usage of the query. + pub metadata: QueryMetadata, + /// The result of the query, with column names and rows. + pub table: Table, + } } -/// The full result of running a SQL query against a timeseries. -#[derive(Clone, Debug)] -pub struct QueryResult { - /// The query as written by the client. - pub original_query: String, - /// The rewritten query, run against the JOINed representation of the - /// timeseries. - /// - /// This is the query that is actually run in the database itself. - pub rewritten_query: String, - /// Metadata about the resource usage of the query. - pub metadata: QueryMetadata, - /// The result of the query, with column names and rows. 
-    pub table: Table,
-}
+#[cfg(any(feature = "sql", test))]
+pub use sql_types::*;

 /// A `Client` to the ClickHouse metrics database.
 #[derive(Debug)]
@@ -226,6 +238,7 @@ impl Client {
     }

     /// Transform a SQL query against a timeseries, but do not execute it.
+    #[cfg(any(feature = "sql", test))]
     pub async fn transform_query(
         &self,
         query: impl AsRef<str>,
@@ -235,6 +248,7 @@ impl Client {
     }

     /// Run a SQL query against a timeseries.
+    #[cfg(any(feature = "sql", test))]
     pub async fn query(
         &self,
         query: impl AsRef<str>,
@@ -4481,6 +4495,7 @@ mod tests {
         logctx.cleanup_successful();
     }

+    #[cfg(any(feature = "sql", test))]
     #[tokio::test]
     async fn test_sql_query_output() {
         let logctx = test_setup_log("test_sql_query_output");
diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs
index 24f7d8c2d0a..3959a4741a4 100644
--- a/oximeter/db/src/lib.rs
+++ b/oximeter/db/src/lib.rs
@@ -4,7 +4,7 @@
 //! Tools for interacting with the control plane telemetry database.

-// Copyright 2023 Oxide Computer Company
+// Copyright 2024 Oxide Computer Company

 use crate::query::StringFieldSelector;
 use chrono::DateTime;
@@ -32,13 +32,19 @@ use thiserror::Error;

 mod client;
 pub mod model;
+#[cfg(feature = "oxql")]
+pub mod oxql;
 pub mod query;
+#[cfg(any(feature = "sql", test))]
 pub mod sql;

 pub use client::Client;
 pub use client::DbWrite;
+#[cfg(feature = "sql")]
 pub use client::QueryMetadata;
+#[cfg(feature = "sql")]
 pub use client::QueryResult;
+#[cfg(feature = "sql")]
 pub use client::Table;
 pub use model::OXIMETER_VERSION;

@@ -134,6 +140,7 @@ pub enum Error {
     #[error("Schema update versions must be sequential without gaps")]
     NonSequentialSchemaVersions,

+    #[cfg(any(feature = "sql", test))]
     #[error("SQL error")]
     Sql(#[from] sql::Error),
 }
diff --git a/oximeter/db/src/oxql/ast.rs b/oximeter/db/src/oxql/ast.rs
new file mode 100644
index 00000000000..0be299758d9
--- /dev/null
+++ b/oximeter/db/src/oxql/ast.rs
@@ -0,0 +1,254 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! The OxQL AST.
+
+// Copyright 2024 Oxide Computer Company
+
+use oximeter::TimeseriesName;
+use std::fmt;
+use std::time::Duration;
+use uuid::Uuid;
+
+/// An AST node like: `get foo:bar`
+#[derive(Clone, Debug, PartialEq)]
+pub struct Get {
+    pub timeseries_name: TimeseriesName,
+}
+
+#[derive(Clone, Debug, PartialEq)]
+pub enum TimeseriesTransformation {
+    Filter(Filter),
+    GroupBy(GroupBy),
+}
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct GroupBy {
+    pub identifiers: Vec<Ident>,
+    pub reducer: Reducer,
+}
+
+#[derive(Clone, Debug, Default, PartialEq)]
+pub enum Reducer {
+    #[default]
+    Mean,
+}
+
+/// A top-level Query AST node.
+#[derive(Clone, Debug, PartialEq)]
+pub struct Query {
+    pub get: Get,
+    pub transformations: Vec<TimeseriesTransformation>,
+}
+
+/// A literal value.
+#[derive(Clone, Debug, PartialEq)]
+pub enum Literal {
+    Integer(i64),
+    Double(f64),
+    String(String),
+    Boolean(bool),
+    Uuid(Uuid),
+    Duration(Duration),
+}
+
+impl fmt::Display for Literal {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            Literal::Integer(inner) => write!(f, "{inner}"),
+            Literal::Double(inner) => write!(f, "{inner}"),
+            Literal::String(inner) => write!(f, "{inner:?}"),
+            Literal::Boolean(inner) => write!(f, "{inner}"),
+            Literal::Uuid(inner) => write!(f, "\"{inner}\""),
+            Literal::Duration(inner) => write!(f, "{inner:?}"),
+        }
+    }
+}
+
+/// An identifier, such as a column or function name.
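+///
+/// E.g., the `hostname` in the filter expression `hostname == "foo"`
+/// (an illustrative example; any field name of the timeseries may appear
+/// here, per the filter documentation below).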
+#[derive(Clone, Debug, PartialEq)]
+pub struct Ident(pub(crate) String);
+
+impl fmt::Display for Ident {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+/// Logical operators.
+#[derive(Clone, Copy, Debug, PartialEq)]
+pub enum LogicalOp {
+    And,
+    Or,
+}
+
+impl fmt::Display for LogicalOp {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(
+            f,
+            "{}",
+            match self {
+                LogicalOp::And => "&&",
+                LogicalOp::Or => "||",
+            }
+        )
+    }
+}
+
+/// Comparison operators.
+#[derive(Clone, Copy, Debug, PartialEq)]
+pub enum Comparison {
+    Eq,
+    Ne,
+    Gt,
+    Ge,
+    Lt,
+    Le,
+    Like,
+}
+
+impl fmt::Display for Comparison {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(
+            f,
+            "{}",
+            match self {
+                Comparison::Eq => "==",
+                Comparison::Ne => "!=",
+                Comparison::Gt => ">",
+                Comparison::Ge => ">=",
+                Comparison::Lt => "<",
+                Comparison::Le => "<=",
+                Comparison::Like => "~=",
+            }
+        )
+    }
+}
+
+/// A more complicated expression as part of a filtering operation.
+///
+/// E.g., the `hostname == "bar" && id == "baz"` in the example below.
+// NOTE: This should really be extended to a generic binary op expression.
+#[derive(Clone, Debug, PartialEq)]
+pub struct FilterExpr {
+    pub left: Box<FilterItem>,
+    pub op: LogicalOp,
+    pub right: Box<FilterItem>,
+}
+
+/// An atom of a filtering expression.
+///
+/// E.g., the `hostname == "foo"` in the example below.
+#[derive(Clone, Debug, PartialEq)]
+pub struct FilterAtom {
+    pub negated: bool,
+    pub ident: Ident,
+    pub cmp: Comparison,
+    pub expr: Literal,
+}
+
+impl fmt::Display for FilterAtom {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        let bang = if self.negated { "!" } else { "" };
+        write!(f, "{}({} {} {})", bang, self.ident, self.cmp, self.expr,)
+    }
+}
+
+/// A single item in a list of filtering expressions.
+///
+/// E.g., the `hostname == "foo"` in the example below.
+#[derive(Clone, Debug, PartialEq)]
+pub enum FilterItem {
+    Atom(FilterAtom),
+    Expr(FilterExpr),
+}
+
+/// A `filter` table operation.
+///
+/// E.g., `filter hostname == "foo" || (hostname == "bar" && id == "baz")`.
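+///
+/// Atoms may also be negated, e.g., `filter !(hostname == "foo") && id == 0`
+/// (an illustrative sketch of the grammar's negated-atom form, not an
+/// exhaustive description of the accepted syntax).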
+#[derive(Clone, Debug, PartialEq)] +pub struct Filter { + pub item: FilterItem, +} + +#[cfg(test)] +mod tests { + /* + use crate::oxql::ast::oxql::GetParser; + use crate::oxql::ast::oxql::FilterOpParser; + use crate::oxql::ast::FilterOp; + use crate::oxql::ast::oxql::NumericParser; + use crate::oxql::ast::Numeric; + use crate::oxql::ast::oxql::ColumnNameParser; + use crate::oxql::ast::ColumnName; + + #[test] + fn test_get_parser() { + let get = GetParser::new().parse("get some:timeseries").unwrap(); + assert_eq!(get.timeseries_name, "some:timeseries"); + } + + #[test] + fn test_filter_op_parser() { + let items = [ + ("==", FilterOp::Eq), + ("!=", FilterOp::Ne), + (">=", FilterOp::Ge), + (">", FilterOp::Gt), + ("<=", FilterOp::Le), + ("<", FilterOp::Lt), + ("~=", FilterOp::Like), + ]; + let parser = FilterOpParser::new(); + for (item, op) in items { + assert_eq!(parser.parse(item).unwrap(), op); + } + } + + #[test] + fn test_numeric_parser() { + let parser = NumericParser::new(); + for x in -10..10 { + assert_eq!(parser.parse(&x.to_string()).unwrap(), Numeric::Int(x)); + let f = x as f64 + 0.5; + assert_eq!(parser.parse(&f.to_string()).unwrap(), Numeric::Float(f)); + } + } + + #[test] + fn test_column_name_parser() { + let parser = ColumnNameParser::new(); + let ok = [ + "name", + "another_name", + "weird_0_but_ok", + "i_guess0", + ]; + for each in ok { + let p = parser.parse(each).expect(&format!("{each} should parse as a ColumnName")); + assert_eq!(p, ColumnName(each.to_string())); + } + + let not_ok = [ + "kebab-case", + "NO_CAPS_FOR_YOU", + "(something)", + "000", + ]; + for each in not_ok { + assert!(parser.parse(each).is_err(), "{each} should not parse as a ColumnName"); + } + } + + #[test] + fn test_filter_expr_parser() { + todo!(); + } + + #[test] + fn test_query_parser() { + todo!(); + } + */ +} diff --git a/oximeter/db/src/oxql/grammar.rs b/oximeter/db/src/oxql/grammar.rs new file mode 100644 index 00000000000..6afd08f3801 --- /dev/null +++ b/oximeter/db/src/oxql/grammar.rs @@ -0,0 +1,820 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Grammar for the Oximeter Query Language (OxQL). + +// Copyright 2024 Oxide Computer + +peg::parser! { + pub grammar query_parser() for str { + use crate::oxql::ast::Comparison; + use crate::oxql::ast::Filter; + use crate::oxql::ast::FilterAtom; + use crate::oxql::ast::FilterExpr; + use crate::oxql::ast::FilterItem; + use crate::oxql::ast::Get; + use crate::oxql::ast::GroupBy; + use crate::oxql::ast::Ident; + use crate::oxql::ast::Literal; + use crate::oxql::ast::LogicalOp; + use crate::oxql::ast::Query; + use crate::oxql::ast::Reducer; + use crate::oxql::ast::TimeseriesTransformation; + use oximeter::TimeseriesName; + use std::time::Duration; + use uuid::Uuid; + + rule _ = quiet!{[' ' | '\n' | '\t']+} + + // Parse boolean literals. + rule true_literal() -> bool = "true" { true } + rule false_literal() -> bool = "false" { false } + pub(in super) rule boolean_literal_impl() -> bool + = true_literal() / false_literal() + + pub rule boolean_literal() -> Literal + = b:boolean_literal_impl() { Literal::Boolean(b) } + + // Parse duration literals. 
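+        //
+        // A duration literal is an integer count followed by a unit suffix,
+        // e.g. `7s` or `5m` (illustrative examples; the supported suffixes
+        // are documented on `duration_literal` below).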
+ rule year() -> Duration + = "y" { Duration::from_secs(60 * 60 * 24 * 365) } + rule month() -> Duration + = "M" { Duration::from_secs(60 * 60 * 24 * 30) } + rule week() -> Duration + = "w" { Duration::from_secs(60 * 60 * 24 * 7) } + rule day() -> Duration + = "d" { Duration::from_secs(60 * 60 * 24) } + rule hour() -> Duration + = "h" { Duration::from_secs(60 * 60) } + rule minute() -> Duration + = "m" { Duration::from_secs(60) } + rule second() -> Duration + = "s" { Duration::from_secs(1) } + rule millisecond() -> Duration + = "ms" { Duration::from_millis(1) } + rule microsecond() -> Duration + = "us" { Duration::from_micros(1) } + rule nanosecond() -> Duration + = "ns" { Duration::from_nanos(1) } + pub(in super) rule duration_literal_impl() -> Duration + = count:integer_literal_impl() base:( + year() / + month() / + week() / day() / + hour() / + millisecond() / + minute() / + second() / + microsecond() / + nanosecond() + ) {? + let Ok(count) = u32::try_from(count) else { + return Err("invalid count for duration literal"); + }; + base.checked_mul(count).ok_or("overflowed duration literal") + } + + /// Parse a literal duration from a string. + /// + /// Durations are written as a positive integer multiple of a base time + /// unit. For example, `7s` is interpreted as 7 seconds. Supported units + /// are: + /// + /// - 'y': an approximate year, 365 days + /// - 'M': an approximate month, 30 days + /// - 'w': a week, 7 days + /// - 'h': an hour, 3600 seconds + /// - 'm': a minute, 60 seconds + /// - 's': seconds + /// - 'ms': milliseconds + /// - 'us': microseconds + /// - 'ns': nanoseconds + pub rule duration_literal() -> Literal + = d:duration_literal_impl() { Literal::Duration(d) } + + rule dashed_uuid_literal() -> Uuid + = s:$( + "\"" + ['a'..='f' | '0'..='9']*<8> "-" + ['a'..='f' | '0'..='9']*<4> "-" + ['a'..='f' | '0'..='9']*<4> "-" + ['a'..='f' | '0'..='9']*<4> "-" + ['a'..='f' | '0'..='9']*<12> + "\"" + ) {? + let Some(middle) = s.get(1..37) else { + return Err("invalid UUID literal"); + }; + middle.parse().or(Err("invalid UUID literal")) + } + rule undashed_uuid_literal() -> Uuid + = s:$("\"" ['a'..='f' | '0'..='9']*<32> "\"") {? + let Some(middle) = s.get(1..33) else { + return Err("invalid UUID literal"); + }; + middle.parse().or(Err("invalid UUID literal")) + } + pub(in super) rule uuid_literal_impl() -> Uuid + = dashed_uuid_literal() / undashed_uuid_literal() + + /// Parse UUID literals. + /// + /// UUIDs should be quoted with `"` and can include or omit dashes + /// between the segments. Both of the following are equivalent. + /// + /// "fc59ab26-f1d8-44ca-abbc-dd8f61321433" + /// "fc59ab26f1d844caabbcdd8f61321433" + pub rule uuid_literal() -> Literal + = id:uuid_literal_impl() { Literal::Uuid(id) } + + // Parse string literals. + rule any_but_single_quote() -> String + = s:$([^'\'']*) + {? + recognize_escape_sequences(s).ok_or("invalid single quoted string") + } + + rule any_but_double_quote() -> String + = s:$([^'"']*) + {? + recognize_escape_sequences(s).ok_or("invalid double quoted string") + } + + rule single_quoted_string() -> String + = "'" s:any_but_single_quote() "'" { s } + + rule double_quoted_string() -> String + = "\"" s:any_but_double_quote() "\"" { s } + + pub(in super) rule string_literal_impl() -> String + = single_quoted_string() / double_quoted_string() + + /// Parse a string literal, either single- or double-quoted. + /// + /// Parsing string literals is pretty tricky, but we add several + /// constraints to simplify things. 
First strings must be quoted, either + /// with single- or double-quotes. E.g., the strings `"this"` and + /// `'this'` parse the same way. + /// + /// We require that the string not _contain_ its quote-style, so there + /// can't be any embedded single-quotes in a single-quoted string, or + /// double-quotes in a double-quoted string. Each quote-style may contain + /// the quote from the other style. + /// + /// We support the following common escape sequences: + /// + /// ```ignore + /// \n + /// \r + /// \t + /// \\ + /// \0 + /// ``` + /// + /// Beyond this, any valid Unicode code point, written in the usual Rust + /// style, is supported. For example, `\u{1234}` is accepted and mapped + /// to `ሴ` upon parsing. This also allows users to write both quote + /// styles if required, by writing them as their Unicode escape + /// sequences. For example, this string: + /// + /// ```ignore + /// "this string has \u{22} in it" + /// ``` + /// + /// Will be parsed as `this string has " in it`. + pub rule string_literal() -> Literal + = s:string_literal_impl() { Literal::String(s) } + + pub(in super) rule integer_literal_impl() -> i64 + = n:$("-"? ['0'..='9']+ !['e' | 'E' | '.']) {? n.parse().or(Err("integer literal")) } + + /// Parse integer literals. + pub rule integer_literal() -> Literal + = n:integer_literal_impl() { Literal::Integer(n) } + + // We're being a bit lazy here, since the rule expression isn't exactly + // right. But we rely on calling `f64`'s `FromStr` implementation to + // actually verify the values can be parsed. + pub(in super) rule double_literal_impl() -> f64 + = n:$("-"? ['0'..='9']* "."? ['0'..='9']* (['e' | 'E'] "-"? ['0'..='9']+)*) {? + n.parse().or(Err("double literal")) + } + + // Parse double literals. + pub rule double_literal() -> Literal + = d:double_literal_impl() { Literal::Double(d) } + + /// Parse a literal. + /// + /// Literals are typed, with support for bools, durations, integers and + /// doubles, UUIDs, and general strings. See the rules for each type of + /// literal for details on supported formats. + pub rule literal() -> Literal + = lit:( + boolean_literal() / + duration_literal() / + integer_literal() / + double_literal() / + uuid_literal() / + string_literal() + ) + { + lit + } + + pub(in super) rule logical_op_impl() -> LogicalOp + = "||" { LogicalOp::Or} / "&&" { LogicalOp::And } + + // Parse a filter item, which is one logical expression used in a filter + // operation. + #[cache_left_rec] + pub(in super) rule filter_item() -> FilterItem = precedence! { + // Note: We need to separate the logical operations into different + // levels of precedence. + left:(@) _? "||" _? right:@ { + FilterItem::Expr(FilterExpr { + left: Box::new(left), + op: LogicalOp::Or, + right: Box::new(right), + }) + } + -- + left:(@) _? "&&" _? right:@ { + FilterItem::Expr(FilterExpr { + left: Box::new(left), + op: LogicalOp::And, + right: Box::new(right), + }) + } + -- + a:filter_atom() { FilterItem::Atom(a) } + "(" e:filter_item() ")" { e } + } + + rule filter_atom() -> FilterAtom + = atom:(negated_filter_atom() / unnegated_filter_atom()) + { + atom + } + + rule unnegated_filter_atom() -> FilterAtom + = ident:ident() _? cmp:comparison() _? expr:literal() + { + FilterAtom { negated: false, ident, cmp, expr } + } + + rule negated_filter_atom() -> FilterAtom + = "!(" _? ident:ident() _? cmp:comparison() _? expr:literal() _? ")" + { + FilterAtom { negated: true, ident, cmp, expr } + } + + /// Parse a "filter" table operation. 
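+        ///
+        /// A filter is the keyword `filter` followed by a filter expression,
+        /// which combines comparison atoms with `&&`, `||`, negation via
+        /// `!(...)`, and parentheses for grouping. For example:
+        ///
+        /// filter field == 0 && !(baz == 'quux')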
+        pub rule filter() -> Filter
+            = "filter" _ item:filter_item()
+            {
+                Filter { item }
+            }
+
+        pub(in super) rule ident_impl() -> &'input str
+            = inner:$(['a'..='z']+ ['a'..='z' | '0'..='9']* ("_" ['a'..='z' | '0'..='9']+)*) { inner }
+
+        /// Parse an identifier, usually a column name.
+        pub rule ident() -> Ident
+            = inner:ident_impl() { Ident(inner.to_string()) }
+
+        pub(in super) rule comparison() -> Comparison
+            = "==" { Comparison::Eq }
+            / "!=" { Comparison::Ne }
+            / ">=" { Comparison::Ge }
+            / ">" { Comparison::Gt }
+            / "<=" { Comparison::Le }
+            / "<" { Comparison::Lt }
+            / "~=" { Comparison::Like }
+
+        pub rule timeseries_name() -> TimeseriesName
+            = target_name:ident_impl() ":" metric_name:ident_impl()
+            {?
+                format!("{target_name}:{metric_name}")
+                    .try_into()
+                    .map_err(|_| "invalid timeseries name")
+            }
+
+        /// Parse a "get" table operation.
+        pub rule get() -> Get
+            = "get" _ timeseries_name:timeseries_name()
+            {
+                Get { timeseries_name }
+            }
+
+        /// Parse a reducing operation by name.
+        pub rule reducer() -> Reducer
+            = "mean" { Reducer::Mean }
+            / expected!("a reducer name")
+
+        rule ws_with_comma() = _? "," _?
+        pub rule group_by() -> GroupBy
+            = "group_by"
+            _
+            "[" _? identifiers:(ident() ** ws_with_comma()) ","? _? "]"
+            reducer:("," _? red:reducer() { red })?
+            {
+                GroupBy {
+                    identifiers,
+                    reducer: reducer.unwrap_or_default(),
+                }
+            }
+
+        pub(in super) rule timeseries_transformation() -> TimeseriesTransformation
+            = f:filter() { TimeseriesTransformation::Filter(f) }
+            / g:group_by() { TimeseriesTransformation::GroupBy(g) }
+
+        /// Parse a top-level OxQL query.
+        ///
+        /// Queries always start with a "get" operation, and may be followed by
+        /// any number of other timeseries transformations.
+        pub rule query() -> Query
+            = get:get() transformations:(_ "|" _? tr:timeseries_transformation() { tr })*
+            {
+                Query { get, transformations }
+            }
+    }
+}
+
+// Recognize escape sequences and convert them into the intended Unicode point
+// they represent.
+//
+// For example, the string containing ASCII "abcd" is returned unchanged.
+//
+// The string containing "\u{1234}" is returned as the string "ሴ". Note that the
+// Unicode value must be written as hex digits enclosed in {}, and can have
+// length 1-6.
+//
+// If the string contains an invalid escape sequence, such as "\uFFFF" or
+// "\u07" (both missing the {} delimiters), `None` is returned.
+fn recognize_escape_sequences(s: &str) -> Option<String> {
+    let mut out = String::with_capacity(s.len());
+
+    let mut chars = s.chars().peekable();
+    while let Some(ch) = chars.next() {
+        match ch {
+            '\\' => {
+                let Some(next_ch) = chars.next() else {
+                    // Escape at the end of the string
+                    return None;
+                };
+                match next_ch {
+                    'n' => out.push('\n'),
+                    'r' => out.push('\r'),
+                    't' => out.push('\t'),
+                    '\\' => out.push('\\'),
+                    '0' => out.push('\0'),
+                    'u' => {
+                        // We need this to be delimited by {}, and between 1 and
+                        // 6 characters long.
+                        if !matches!(chars.next(), Some('{')) {
+                            return None;
+                        }
+
+                        let mut digits = String::with_capacity(6);
+                        let mut found_closing_brace = false;
+                        while !found_closing_brace && digits.len() < 7 {
+                            // Take the next value, if it's a hex digit or the
+                            // closing brace.
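+                            //
+                            // `next_if` only consumes the character when the
+                            // predicate matches; on anything else it leaves
+                            // the iterator untouched and we fall out of the
+                            // loop below, rejecting the sequence.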
+ let Some(next) = chars.next_if(|ch| { + ch.is_ascii_hexdigit() || *ch == '}' + }) else { + break; + }; + if next.is_ascii_hexdigit() { + digits.push(next); + continue; + } + found_closing_brace = true; + } + if !found_closing_brace { + return None; + } + let val = u32::from_str_radix(&digits, 16).ok()?; + let decoded = char::from_u32(val)?; + out.push(decoded) + } + _ => return None, + } + } + _ => out.push(ch), + } + } + Some(out) +} + +#[cfg(test)] +mod tests { + use super::query_parser; + use crate::oxql::ast::Comparison; + use crate::oxql::ast::FilterAtom; + use crate::oxql::ast::FilterExpr; + use crate::oxql::ast::FilterItem; + use crate::oxql::ast::Ident; + use crate::oxql::ast::Literal; + use crate::oxql::ast::LogicalOp; + use crate::oxql::ast::Reducer; + use crate::oxql::grammar::recognize_escape_sequences; + use std::time::Duration; + use uuid::Uuid; + + #[test] + fn test_boolean_literal() { + assert_eq!(query_parser::boolean_literal_impl("true").unwrap(), true); + assert_eq!(query_parser::boolean_literal_impl("false").unwrap(), false); + } + + #[test] + fn test_duration_literal() { + for (as_str, dur) in [ + ("7y", Duration::from_secs(60 * 60 * 24 * 365 * 7)), + ("7M", Duration::from_secs(60 * 60 * 24 * 30 * 7)), + ("7w", Duration::from_secs(60 * 60 * 24 * 7 * 7)), + ("7d", Duration::from_secs(60 * 60 * 24 * 7)), + ("7h", Duration::from_secs(60 * 60 * 7)), + ("7m", Duration::from_secs(60 * 7)), + ("7s", Duration::from_secs(7)), + ("7ms", Duration::from_millis(7)), + ("7us", Duration::from_micros(7)), + ("7ns", Duration::from_nanos(7)), + ] { + assert_eq!( + query_parser::duration_literal_impl(as_str).unwrap(), + dur + ); + } + + assert!(query_parser::duration_literal_impl("-1m").is_err()); + let too_big: i64 = u32::MAX as i64 + 1; + assert!(query_parser::duration_literal_impl(&format!("{too_big}s")) + .is_err()); + } + + #[test] + fn test_uuid_literal() { + const ID: Uuid = uuid::uuid!("9f8900bd-886d-4988-b623-95b7fda36d23"); + let as_string = format!("\"{}\"", ID); + assert_eq!(query_parser::uuid_literal_impl(&as_string).unwrap(), ID); + let without_dashes = as_string.replace('-', ""); + assert_eq!( + query_parser::uuid_literal_impl(&without_dashes).unwrap(), + ID + ); + + assert!(query_parser::uuid_literal_impl( + &as_string[1..as_string.len() - 2] + ) + .is_err()); + assert!(query_parser::uuid_literal_impl( + &without_dashes[1..without_dashes.len() - 2] + ) + .is_err()); + } + + #[test] + fn test_integer_literal() { + assert_eq!(query_parser::integer_literal_impl("1").unwrap(), 1); + assert_eq!(query_parser::integer_literal_impl("-1").unwrap(), -1); + assert_eq!(query_parser::integer_literal_impl("-1").unwrap(), -1); + + assert!(query_parser::integer_literal_impl("-1.0").is_err()); + assert!(query_parser::integer_literal_impl("-1.").is_err()); + assert!(query_parser::integer_literal_impl("1e3").is_err()); + } + + #[test] + fn test_double_literal() { + assert_eq!(query_parser::double_literal_impl("1.0").unwrap(), 1.0); + assert_eq!(query_parser::double_literal_impl("-1.0").unwrap(), -1.0); + assert_eq!(query_parser::double_literal_impl("1.").unwrap(), 1.0); + assert_eq!(query_parser::double_literal_impl("-1.").unwrap(), -1.0); + assert_eq!(query_parser::double_literal_impl(".5").unwrap(), 0.5); + assert_eq!(query_parser::double_literal_impl("-.5").unwrap(), -0.5); + assert_eq!(query_parser::double_literal_impl("1e3").unwrap(), 1e3); + assert_eq!(query_parser::double_literal_impl("-1e3").unwrap(), -1e3); + assert_eq!(query_parser::double_literal_impl("-1e-3").unwrap(), -1e-3); 
+        assert_eq!(
+            query_parser::double_literal_impl("0.5e-3").unwrap(),
+            0.5e-3
+        );
+
+        assert!(query_parser::double_literal_impl("-.e4").is_err());
+        assert!(query_parser::double_literal_impl("-.e-4").is_err());
+        assert!(query_parser::double_literal_impl("1e").is_err());
+    }
+
+    #[test]
+    fn test_recognize_escape_sequences_with_none() {
+        for each in ["", "abc", "$%("] {
+            assert_eq!(recognize_escape_sequences(each).unwrap(), each);
+        }
+    }
+
+    #[test]
+    fn test_recognize_escape_sequence_with_valid_unicode_sequence() {
+        // Welp, let's just test every possible code point.
+        for x in 0..=0x10FFFF {
+            let expected = char::from_u32(x);
+            let as_hex = format!("{x:0x}");
+            let sequence = format!("\\u{{{as_hex}}}");
+            let recognized = recognize_escape_sequences(&sequence)
+                .map(|s| s.chars().next().unwrap());
+            assert_eq!(
+                expected, recognized,
+                "did not correctly recognize Unicode escape sequence"
+            );
+        }
+    }
+
+    #[test]
+    fn test_recognize_escape_sequences_with_invalid_unicode_sequence() {
+        for each in [
+            r#"\uFFFF"#,       // Valid code point, but missing the {} delimiters.
+            r#"\u{}"#,         // Not enough characters.
+            r#"\u{12345678}"#, // Too many characters.
+            r#"\u{ZZZZ}"#,     // Not hex digits.
+            r#"\u{d800}"#,     // A surrogate code point, not valid.
+            r#"\u{1234"#,      // Valid digits, but missing the closing brace.
+        ] {
+            println!("{each}");
+            assert!(recognize_escape_sequences(each).is_none());
+        }
+    }
+
+    #[test]
+    fn test_recognize_escape_sequences_with_valid_escape_sequence() {
+        for (as_str, expected) in [
+            (r#"\n"#, '\n'),
+            (r#"\r"#, '\r'),
+            (r#"\t"#, '\t'),
+            (r#"\0"#, '\0'),
+            (r#"\\"#, '\\'),
+        ] {
+            let recognized = recognize_escape_sequences(as_str).unwrap();
+            assert_eq!(recognized.chars().next().unwrap(), expected);
+        }
+    }
+
+    #[test]
+    fn test_single_quoted_string_literal() {
+        for (input, expected) in [
+            ("''", String::new()),
+            ("'simple'", String::from("simple")),
+            ("'袈►♖'", String::from("袈►♖")),
+            (r#"'escapes \n handled'"#, String::from("escapes \n handled")),
+            (r#"'may contain " in it'"#, String::from("may contain \" in it")),
+            (
+                r#"'may contain "\u{1234}" in it'"#,
+                String::from("may contain \"ሴ\" in it"),
+            ),
+        ] {
+            assert_eq!(
+                query_parser::string_literal_impl(input).unwrap(),
+                expected
+            );
+        }
+        assert!(query_parser::string_literal_impl(r#"' cannot have ' in it'"#)
+            .is_err());
+    }
+
+    #[test]
+    fn test_double_quoted_string_literal() {
+        for (input, expected) in [
+            ("\"\"", String::new()),
+            ("\"simple\"", String::from("simple")),
+            ("\"袈►♖\"", String::from("袈►♖")),
+            (r#""escapes \n handled""#, String::from("escapes \n handled")),
+            (r#""may contain ' in it""#, String::from("may contain ' in it")),
+            (
+                r#""may contain '\u{1234}' in it""#,
+                String::from("may contain 'ሴ' in it"),
+            ),
+        ] {
+            assert_eq!(
+                query_parser::string_literal_impl(input).unwrap(),
+                expected
+            );
+        }
+
+        assert!(query_parser::string_literal_impl(r#"" cannot have " in it""#)
+            .is_err());
+    }
+
+    #[test]
+    fn test_comparison() {
+        for (as_str, cmp) in [
+            ("==", Comparison::Eq),
+            ("!=", Comparison::Ne),
+            (">=", Comparison::Ge),
+            (">", Comparison::Gt),
+            ("<=", Comparison::Le),
+            ("<", Comparison::Lt),
+            ("~=", Comparison::Like),
+        ] {
+            assert_eq!(query_parser::comparison(as_str).unwrap(), cmp);
+        }
+    }
+
+    #[test]
+    fn test_filter_item_single_atom() {
+        let atom = FilterItem::Atom(FilterAtom {
+            negated: false,
+            ident: Ident("a".to_string()),
+            cmp: Comparison::Eq,
+            expr: Literal::Boolean(true),
+        });
+        assert_eq!(query_parser::filter_item("a == true").unwrap(), atom);
+
assert_eq!(query_parser::filter_item("(a == true)").unwrap(), atom); + + assert!(query_parser::filter_item("(a == true").is_err()); + } + + #[test] + fn test_filter_item_single_negated_atom() { + assert_eq!( + query_parser::filter_item("!(a > 1.)").unwrap(), + FilterItem::Atom(FilterAtom { + negated: true, + ident: Ident("a".to_string()), + cmp: Comparison::Gt, + expr: Literal::Double(1.0) + }) + ); + + assert!(query_parser::filter_item("!(a > 1.0").is_err()); + } + + #[test] + fn test_filter_item_two_atoms() { + let left = FilterItem::Atom(FilterAtom { + negated: false, + ident: Ident("a".to_string()), + cmp: Comparison::Eq, + expr: Literal::Boolean(true), + }); + let right = FilterItem::Atom(FilterAtom { + negated: false, + ident: Ident("a".to_string()), + cmp: Comparison::Eq, + expr: Literal::Boolean(true), + }); + + for op in [LogicalOp::And, LogicalOp::Or] { + let expected = FilterItem::Expr(FilterExpr { + left: Box::new(left.clone()), + op, + right: Box::new(right.clone()), + }); + // Match with either parenthesized. + let as_str = format!("a == true {op} (a == true)"); + assert_eq!(query_parser::filter_item(&as_str).unwrap(), expected); + let as_str = format!("(a == true) {op} a == true"); + assert_eq!(query_parser::filter_item(&as_str).unwrap(), expected); + } + } + + #[test] + fn test_filter_atom_precedence() { + let atom = FilterItem::Atom(FilterAtom { + negated: false, + ident: Ident("a".to_string()), + cmp: Comparison::Eq, + expr: Literal::Boolean(true), + }); + let as_str = format!("a == true || a == true && a == true"); + let parsed = query_parser::filter_item(&as_str).unwrap(); + + // && should bind more tightly + let FilterItem::Expr(FilterExpr { left, op, right }) = parsed else { + unreachable!(); + }; + assert_eq!(op, LogicalOp::Or); + assert_eq!(atom, *left); + + // Destructure the RHS, and check it. + let FilterItem::Expr(FilterExpr { left, op, right }) = *right else { + unreachable!(); + }; + assert_eq!(op, LogicalOp::And); + assert_eq!(atom, *left); + assert_eq!(atom, *right); + } + + #[test] + fn test_filter_atom_overridden_precedence() { + let atom = FilterItem::Atom(FilterAtom { + negated: false, + ident: Ident("a".to_string()), + cmp: Comparison::Eq, + expr: Literal::Boolean(true), + }); + let as_str = format!("(a == true || a == true) && a == true"); + let parsed = query_parser::filter_item(&as_str).unwrap(); + + // Now, || should bind more tightly, so we should have (a && b) at the + // top-level, where b is the test atom. + let FilterItem::Expr(FilterExpr { left, op, right }) = parsed else { + unreachable!(); + }; + assert_eq!(op, LogicalOp::And); + assert_eq!(atom, *right); + + // Destructure the LHS and check it. 
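+        // The parenthesized `a == true || a == true` should appear here as the
+        // left-hand sub-expression, joined to the final atom by the `&&`
+        // checked above.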
+        let FilterItem::Expr(FilterExpr { left, op, right }) = *left else {
+            unreachable!();
+        };
+        assert_eq!(op, LogicalOp::Or);
+        assert_eq!(atom, *left);
+        assert_eq!(atom, *right);
+    }
+
+    #[test]
+    fn test_filter_table_op() {
+        for expr in [
+            "filter field == 0",
+            "filter baz == 'quux'",
+            "filter other_field != 'yes'",
+            "filter id != \"45c937fb-5e99-4a86-a95b-22bf30bf1507\"",
+            "filter (foo == 'bar') || ((yes != \"no\") && !(maybe > 'so'))",
+        ] {
+            let parsed = query_parser::filter(expr)
+                .expect(&format!("failed to parse filter: '{}'", expr));
+            println!("{parsed:#?}");
+        }
+    }
+
+    #[test]
+    fn test_get_table_op() {
+        for expr in [
+            "get foo:bar",
+            "get target_name:metric_name",
+            "get target_name_0:metric_name000",
+        ] {
+            let parsed = query_parser::get(expr)
+                .expect(&format!("failed to parse get expr: '{}'", expr));
+            println!("{parsed:#?}");
+        }
+
+        assert!(query_parser::get("get foo").is_err());
+        assert!(query_parser::get("get foo:").is_err());
+        assert!(query_parser::get("get :bar").is_err());
+        assert!(query_parser::get("get 0:0").is_err());
+    }
+
+    #[test]
+    fn test_ident() {
+        for id in ["foo", "foo0", "foo_0_1_2"] {
+            query_parser::ident(id)
+                .expect(&format!("failed to parse identifier: '{id}'"));
+        }
+
+        for id in ["0foo", "0", "A", "", "%"] {
+            query_parser::ident(id).expect_err(&format!(
+                "should not have parsed as identifier: '{}'",
+                id
+            ));
+        }
+    }
+
+    #[test]
+    fn test_group_by() {
+        for q in [
+            "group_by []",
+            "group_by [baz]",
+            "group_by [baz,]",
+            "group_by [baz,another_field]",
+            "group_by [baz,another_field,]",
+        ] {
+            let parsed = query_parser::group_by(q)
+                .expect(&format!("failed to parse group_by: '{q}'"));
+            println!("{parsed:#?}");
+        }
+    }
+
+    #[test]
+    fn test_query() {
+        for q in [
+            "get foo:bar",
+            "get foo:bar | group_by []",
+            "get foo:bar | group_by [baz]",
+            "get foo:bar | filter baz == 'quuz'",
+            "get foo:bar | filter (some == 0) && (id == false || a == -1.0)",
+            "get foo:bar | group_by [baz] | filter baz == 'yo'",
+        ] {
+            let parsed = query_parser::query(q)
+                .expect(&format!("failed to parse query: '{q}'"));
+            println!("{parsed:#?}");
+        }
+    }
+
+    #[test]
+    fn test_reducer() {
+        assert_eq!(query_parser::reducer("mean").unwrap(), Reducer::Mean);
+        assert!(query_parser::reducer("foo").is_err());
+    }
+}
diff --git a/oximeter/db/src/oxql/mod.rs b/oximeter/db/src/oxql/mod.rs
new file mode 100644
index 00000000000..c631dca766a
--- /dev/null
+++ b/oximeter/db/src/oxql/mod.rs
@@ -0,0 +1,30 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! The Oximeter Query Language, OxQL.
+
+// Copyright 2024 Oxide Computer Company
+
+#[derive(Clone, Debug, thiserror::Error)]
+pub enum Error {
+    #[error("operation requires a valid timeseries name")]
+    InvalidTimeseriesName,
+
+    #[error("invalid time window literal")]
+    InvalidTimeWindowLiteral,
+
+    #[error("invalid integer literal")]
+    InvalidIntegerLiteral,
+
+    #[error("invalid double literal")]
+    InvalidDoubleLiteral,
+}
+
+mod ast;
+mod grammar;
+mod table;
+mod transformation;
+
+pub use table::Table;
+pub use grammar::query_parser;
diff --git a/oximeter/db/src/oxql/transformation.rs b/oximeter/db/src/oxql/transformation.rs
new file mode 100644
index 00000000000..94f0328ba09
--- /dev/null
+++ b/oximeter/db/src/oxql/transformation.rs
@@ -0,0 +1,23 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! Timeseries transformations.
+
+// Copyright 2024 Oxide Computer
+
+use std::collections::BTreeMap;
+use oximeter::Datum;
+use oximeter::FieldValue;
+use ndarray::Array2;
+
+/// A timeseries is the result of an OxQL query.
+#[derive(Clone, Debug)]
+pub struct Timeseries {
+    fields: BTreeMap<String, FieldValue>,
+    data: Array2<Datum>,
+}
+
+pub(crate) trait Transformation {
+    fn apply(&self, client: &Client, timeseries: &[Timeseries]) -> Result<Vec<Timeseries>, Error>;
+}
diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml
index 7038f9c0380..5f319309115 100644
--- a/workspace-hack/Cargo.toml
+++ b/workspace-hack/Cargo.toml
@@ -75,6 +75,7 @@ num-integer = { version = "0.1.45", features = ["i128"] }
 num-iter = { version = "0.1.43", default-features = false, features = ["i128"] }
 num-traits = { version = "0.2.16", features = ["i128", "libm"] }
 openapiv3 = { version = "2.0.0", default-features = false, features = ["skip_serializing_defaults"] }
+peg-runtime = { version = "0.8.2", default-features = false, features = ["std"] }
 pem-rfc7468 = { version = "0.7.0", default-features = false, features = ["std"] }
 petgraph = { version = "0.6.4", features = ["serde-1"] }
 postgres-types = { version = "0.2.6", default-features = false, features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
@@ -182,6 +183,7 @@ num-integer = { version = "0.1.45", features = ["i128"] }
 num-iter = { version = "0.1.43", default-features = false, features = ["i128"] }
 num-traits = { version = "0.2.16", features = ["i128", "libm"] }
 openapiv3 = { version = "2.0.0", default-features = false, features = ["skip_serializing_defaults"] }
+peg-runtime = { version = "0.8.2", default-features = false, features = ["std"] }
 pem-rfc7468 = { version = "0.7.0", default-features = false, features = ["std"] }
 petgraph = { version = "0.6.4", features = ["serde-1"] }
 postgres-types = { version = "0.2.6", default-features = false, features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
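
A minimal sketch of driving the new parser end to end, assuming the `oxql` module and its `query_parser` re-export are publicly visible from the `oximeter-db` crate (that visibility is not confirmed by this patch); the query string reuses identifiers from the grammar tests above:

    use oximeter_db::oxql::query_parser;

    fn main() {
        // Parse a full pipeline: a `get`, then `|`-separated transformations.
        let query = query_parser::query(
            "get foo:bar | filter baz == 'quux' | group_by [baz]",
        )
        .expect("failed to parse OxQL query");
        println!("{query:#?}");
    }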