diff --git a/Cargo.lock b/Cargo.lock index 9627143d846..8051ac49894 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -242,6 +242,12 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" +[[package]] +name = "arrayvec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" + [[package]] name = "arrayvec" version = "0.7.4" @@ -633,7 +639,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c2f0dc9a68c6317d884f97cc36cf5a3d20ba14ce404227df55e1af708ab04bc" dependencies = [ "arrayref", - "arrayvec", + "arrayvec 0.7.4", "constant_time_eq 0.2.6", ] @@ -1078,6 +1084,17 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd7cc57abe963c6d3b9d8be5b06ba7c8957a930305ca90304f24ef040aa6f961" +[[package]] +name = "clipboard-win" +version = "4.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7191c27c2357d9b7ef96baac1773290d4ca63b24205b82a3fd8a0637afcf0362" +dependencies = [ + "error-code", + "str-buf", + "winapi", +] + [[package]] name = "colorchoice" version = "1.0.0" @@ -1358,6 +1375,23 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "crossterm" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a84cda67535339806297f1b331d6dd6320470d2a0fe65381e79ee9e156dd3d13" +dependencies = [ + "bitflags 1.3.2", + "crossterm_winapi", + "libc", + "mio", + "parking_lot 0.12.1", + "serde", + "signal-hook", + "signal-hook-mio", + "winapi", +] + [[package]] name = "crossterm" version = "0.27.0" @@ -1370,7 +1404,6 @@ dependencies = [ "libc", "mio", "parking_lot 0.12.1", - "serde", "signal-hook", "signal-hook-mio", "winapi", @@ -2372,6 +2405,16 @@ dependencies = [ "libc", ] +[[package]] +name = "error-code" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64f18991e7bf11e7ffee451b5318b5c1a73c52d0d0ada6e5a3017c8c1ced6a21" +dependencies = [ + "libc", + "str-buf", +] + [[package]] name = "expectorate" version = "1.1.0" @@ -5432,7 +5475,6 @@ dependencies = [ "const-oid", "crossbeam-epoch", "crossbeam-utils", - "crossterm", "crypto-common", "diesel", "digest", @@ -5464,6 +5506,8 @@ dependencies = [ "managed", "memchr", "mio", + "nix 0.26.2 (registry+https://github.com/rust-lang/crates.io-index)", + "nom", "num-bigint", "num-integer", "num-iter", @@ -5842,13 +5886,17 @@ dependencies = [ "clap 4.4.3", "dropshot", "expectorate", + "futures", "highway", + "indexmap 2.1.0", "itertools 0.11.0", "omicron-test-utils", "omicron-workspace-hack", "oximeter 0.1.0", + "reedline", "regex", "reqwest", + "rustyline", "schemars", "serde", "serde_json", @@ -5856,7 +5904,10 @@ dependencies = [ "slog-async", "slog-dtrace", "slog-term", + "sqlformat", + "sqlparser", "strum", + "tabled", "thiserror", "tokio", "usdt", @@ -6996,7 +7047,7 @@ checksum = "2e2e4cd95294a85c3b4446e63ef054eea43e0205b1fd60120c16b74ff7ff96ad" dependencies = [ "bitflags 2.4.0", "cassowary", - "crossterm", + "crossterm 0.27.0", "indoc 2.0.3", "itertools 0.11.0", "paste", @@ -7077,12 +7128,12 @@ dependencies = [ [[package]] name = "reedline" -version = "0.25.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7dc1d1d369c194cf79acc204397aca1fecc4248df3e1c1eabb15e5ef2d16991" +checksum = 
"c2fde955d11817fdcb79d703932fb6b473192cb36b6a92ba21f7f4ac0513374e" dependencies = [ "chrono", - "crossterm", + "crossterm 0.26.1", "fd-lock", "itertools 0.10.5", "nu-ansi-term", @@ -7650,6 +7701,29 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "rustyline" +version = "12.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "994eca4bca05c87e86e15d90fc7a91d1be64b4482b38cb2d27474568fe7c9db9" +dependencies = [ + "bitflags 2.4.0", + "cfg-if 1.0.0", + "clipboard-win", + "fd-lock", + "home", + "libc", + "log", + "memchr", + "nix 0.26.2 (registry+https://github.com/rust-lang/crates.io-index)", + "radix_trie", + "scopeguard", + "unicode-segmentation", + "unicode-width", + "utf8parse", + "winapi", +] + [[package]] name = "ryu" version = "1.0.15" @@ -8533,6 +8607,38 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "sqlformat" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b7b278788e7be4d0d29c0f39497a0eef3fba6bbc8e70d8bf7fde46edeaa9e85" +dependencies = [ + "itertools 0.11.0", + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlparser" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eaa1e88e78d2c2460d78b7dc3f0c08dbb606ab4222f9aff36f420d36e307d87" +dependencies = [ + "log", + "sqlparser_derive", +] + +[[package]] +name = "sqlparser_derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -8567,6 +8673,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "str-buf" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e08d8363704e6c71fc928674353e6b7c23dcea9d82d7012c8faf2a3a025f8d0" + [[package]] name = "string_cache" version = "0.8.7" @@ -8593,9 +8705,9 @@ dependencies = [ [[package]] name = "strip-ansi-escapes" -version = "0.2.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55ff8ef943b384c414f54aefa961dd2bd853add74ec75e7ac74cf91dba62bcfa" +checksum = "011cbb39cf7c1f62871aea3cc46e5817b0937b49e9447370c93cacbe93a766d8" dependencies = [ "vte", ] @@ -9724,6 +9836,12 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "universal-hash" version = "0.5.1" @@ -9950,10 +10068,11 @@ dependencies = [ [[package]] name = "vte" -version = "0.11.1" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5022b5fbf9407086c180e9557be968742d839e68346af7792b8592489732197" +checksum = "6cbce692ab4ca2f1f3047fcf732430249c0e971bfdd2b234cf2c47ad93af5983" dependencies = [ + "arrayvec 0.5.2", "utf8parse", "vte_generate_state_changes", ] @@ -10143,7 +10262,7 @@ dependencies = [ "camino", "ciborium", "clap 4.4.3", - "crossterm", + "crossterm 0.27.0", "debug-ignore", "futures", "hex", @@ -10208,7 +10327,7 @@ dependencies = [ "camino", "ciborium", "clap 4.4.3", - "crossterm", + "crossterm 0.27.0", "omicron-workspace-hack", "ratatui", "reedline", diff --git 
a/Cargo.toml b/Cargo.toml index 0e139465339..5ec231d5fb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -298,6 +298,7 @@ rand = "0.8.5" ratatui = "0.23.0" rayon = "1.8" rcgen = "0.11.3" +reedline = "0.22.0" ref-cast = "1.0" regex = "1.10.2" regress = "0.7.1" @@ -307,6 +308,7 @@ rpassword = "7.2.0" rstest = "0.18.2" rustfmt-wrapper = "0.2" rustls = "0.21.8" +rustyline = "12.0.0" samael = { git = "https://github.com/njaremko/samael", features = ["xmlsec"], branch = "master" } schemars = "0.8.12" secrecy = "0.8.0" @@ -340,6 +342,7 @@ sp-sim = { path = "sp-sim" } sprockets-common = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" } sprockets-host = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" } sprockets-rot = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" } +sqlparser = { version = "0.36.1", features = [ "visitor" ] } static_assertions = "1.1.0" # Please do not change the Steno version to a Git dependency. It makes it # harder than expected to make breaking changes (even if you specify a specific diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index d37c57ccced..cd772833100 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -9,25 +9,44 @@ license = "MPL-2.0" anyhow.workspace = true async-trait.workspace = true bcs.workspace = true -bytes = { workspace = true, features = [ "serde" ] } chrono.workspace = true clap.workspace = true dropshot.workspace = true +futures.workspace = true highway.workspace = true +indexmap.workspace = true +omicron-workspace-hack.workspace = true oximeter.workspace = true +reedline.workspace = true regex.workspace = true -reqwest = { workspace = true, features = [ "json" ] } -schemars = { workspace = true, features = [ "uuid1", "bytes", "chrono" ] } +rustyline.workspace = true serde.workspace = true serde_json.workspace = true slog.workspace = true slog-async.workspace = true slog-term.workspace = true +sqlparser.workspace = true +sqlformat = "0.2.2" +tabled.workspace = true thiserror.workspace = true -tokio = { workspace = true, features = [ "rt-multi-thread", "macros" ] } usdt.workspace = true uuid.workspace = true -omicron-workspace-hack.workspace = true + +[dependencies.bytes] +workspace = true +features = [ "serde" ] + +[dependencies.reqwest] +workspace = true +features = [ "json" ] + +[dependencies.schemars] +workspace = true +features = [ "uuid1", "bytes", "chrono" ] + +[dependencies.tokio] +workspace = true +features = [ "rt-multi-thread", "macros" ] [dev-dependencies] expectorate.workspace = true diff --git a/oximeter/db/notes.txt b/oximeter/db/notes.txt deleted file mode 100644 index 66c3871d460..00000000000 --- a/oximeter/db/notes.txt +++ /dev/null @@ -1,232 +0,0 @@ -Some notes on querying - -For pagination: - -- Timeseries name is enough for paginated list timeseries endpoint. -It's just normal keyset pagination. - -- For the timeseries data, we'll be using limit/offset pagination. We'll -run the query to get the consistent timeseries keys each time. This is -the `ScanParams` part of the `WhichPage`. The `PageSelector` is the offset. - - -Now, how to run more complex queries? A good example is something like, -aggregating the timeseries across all but one field. For example, let's -look at the Nexus HTTP latency data. 
The fields are: - -- name (String) -- id (Uuid) -- route (String) -- method (String) -- status_code (I64) - -Imagine we wanted to look at the average latency by route, so averaged -across all methods and status codes. (Let's ingore name/id) - -We need to group the timeseries keys by route, to find the set of keys -consistent with each different route. ClickHouse provides the `groupArray` -function, which is an aggregate function that collects multiple values -into an array. So we can do: - -``` -SELECT - field_value, - groupArray(timeseries_key) -FROM fields_string -WHERE field_name = 'route' -GROUP BY field_value; - - -┌─field_value───────────────────────────────────────────┬─groupArray(timeseries_key)────────────────┐ -│ /metrics/producers │ [1916712826069192294,6228796576473532827] │ -│ /metrics/collectors │ [1500085842574282480] │ -│ /metrics/collect/e6bff1ff-24fb-49dc-a54e-c6a350cd4d6c │ [15389669872422126367] │ -│ /sled_agents/fb0f7546-4d46-40ca-9d56-cbb810684ca7 │ [1166666993114742619] │ -└───────────────────────────────────────────────────────┴───────────────────────────────────────────┘ -``` - -This gives an array of timeseries keys where the route is each of the values -on the left. - -So at a very high level, we can average all the timeseries values where the keys -are in each of these different arrays. - - -This kinda works. It produces an array of arrays, the counts for each of the -histograms, grouped by the field value. - -``` -SELECT - field_value, - groupArray(counts) -FROM -( - SELECT - field_value, - timeseries_key - FROM fields_string - WHERE field_name = 'route' -) AS f0 -INNER JOIN -( - SELECT * - FROM measurements_histogramf64 -) AS meas USING (timeseries_key) -GROUP BY field_value -``` - -We can extend this `groupArray(bins), groupArray(counts)` to get both. - - -Ok, we're getting somewhere. The aggregation "combinators" modify the behavior of -aggregations, in pretty suprising and powerful ways. For example: - -``` -SELECT - field_value, - sumForEach(counts) -FROM -( - SELECT - field_value, - timeseries_key - FROM fields_string - WHERE field_name = 'route' -) AS f0 -INNER JOIN -( - SELECT * - FROM measurements_histogramf64 -) AS meas USING (timeseries_key) -GROUP BY field_value -``` - -This applies the `-ForEach` combinator to the sum aggregation. This applies the -aggregation to corresponding elements of a sequence (table?) of arrays. We can -do this with any of the aggregations, `avg`, `min`, etc. - - -The `-Resample` combinator also looks interesting. It uses its arguments to create -a set of intervals, and applies the aggregation within each of those intervals. -So sort of a group-by interval or window function. - -Another useful method is `toStartOfInterval`. This takes a timestamp and an interval, -say 5 seconds, or 10 minutes, and returns the interval into which that timestamp -falls. Could be very helpful for aligning/binning data to time intervals. But -it does "round", in that the bins don't start at the first timestamp, but at -the rounded-down interval from that timestamp. - -It's possible to build intervals that start exactly at the first timestamp with: - -``` -SELECT - timestamp, - toStartOfInterval(timestamp, toIntervalMinute(1)) + ( - SELECT toSecond(min(timestamp)) - FROM measurements_histogramf64 - ) -FROM measurements_histogramf64 -``` - -Or some other rounding shenanigans. 
- - -Putting lots of this together: - -``` -SELECT - f0.field_name, - f0.field_value, - f1.field_name, - f1.field_value, - minForEach(bins), - avgForEach(counts) -FROM -( - SELECT - field_name, - field_value, - timeseries_key - FROM fields_string - WHERE field_name = 'route' -) AS f0 -INNER JOIN -( - SELECT - field_name, - field_value, - timeseries_key - FROM fields_i64 - WHERE field_name = 'status_code' -) AS f1 ON f0.timeseries_key = f1.timeseries_key -INNER JOIN -( - SELECT * - FROM measurements_histogramf64 -) AS meas ON f1.timeseries_key = meas.timeseries_key -GROUP BY - f0.field_name, - f0.field_value, - f1.field_name, - f1.field_value -``` - -This selects the field name/value, and the bin and average count for each -histogram, grouping by route and status code. - -These inner select statements look similar to the ones we already -implement in `field.as_query`. But in that case we select *, and here we -probably don't want to do that to avoid errors about things not being -in aggregations or group by's. - -This works (or is syntactically valid) for scalars, if we replace the -combinators with their non-combinator version: e.g, `avgForEach` -> `avg`. - - -Other rando thoughts. - -It'd be nice to have the query builder be able to handle all these, but -I'm not sure how worth it that is. For example, I don't even think we need -the timeseries keys in this query. For the fields where we are specifying -a condition, we have subqueries like: - -``` -SELECT * -FROM fields_{TYPE} -WHERE field_name = NAME -AND field_value OP VALUE; -``` - -For ones where we _don't_ care, we just have the first three lines: - -``` -SELECT * -FROM fields_{TYPE} -WHERE field_name = NAME; -``` - -We can join successive entries on timeseries keys. - -For straight SELECT queries, that's pretty much it, like we have currently. -For AGGREGATION queries, we need to - -- Have a group-by for each (field_name, field_value) pair. This is true -even when we're unselective on the field, because we are still taking that -field, and we still need to group the keys accordingly. -- Select the consistent timeseries keys. This is so we can correlate the -results of the aggregation back to the field names/values which we still -get from the key-select query. -- Apply the aggregation to the measurements. For scalars, this just the -aggregation. For histograms, this is the `-Array` or `-ForEach` combinator -for that aggregation, depending on what we're applying. -- ??? to the timestamps? -- some alignment, grouping, subsampling? It seems -this has to come from the aggregation query, because there's not a useful -default. - -Speaking of defaults, how do these functions behave with missing data? -Or more subtly, what happens if two histograms (say) have the same number -of bins, but the actual bin edges are different? ClickHouse itself doesn't -deal with this AFAICT, which means we'd need to do that in the client. -Ah, but that is unlikely, since we're only aggregating data from the -same timeseries, with the same key. So far anyway. I'm not sure what'll -happen when we start correlating data between timeseries. diff --git a/oximeter/db/src/bin/oxdb.rs b/oximeter/db/src/bin/oxdb.rs index e14fdeb6a8e..314653a10f0 100644 --- a/oximeter/db/src/bin/oxdb.rs +++ b/oximeter/db/src/bin/oxdb.rs @@ -3,16 +3,24 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. //! Tool for developing against the Oximeter timeseries database, populating data and querying. 
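+//!
+//! A sketch of the interactive shell added below: use the `sql` subcommand to
+//! enter it, e.g. `oxdb sql --metadata --rewritten` (flags are those defined
+//! on `ShellOptions`).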
-// Copyright 2021 Oxide Computer Company + +// Copyright 2023 Oxide Computer Company use anyhow::{bail, Context}; use chrono::{DateTime, Utc}; use clap::{Args, Parser}; +use oximeter::DatumType; use oximeter::{ types::{Cumulative, Sample}, Metric, Target, }; +use oximeter_db::sql::function_allow_list; +use oximeter_db::QueryResult; use oximeter_db::{query, Client, DbWrite}; +use reedline::DefaultPrompt; +use reedline::DefaultPromptSegment; +use reedline::Reedline; +use reedline::Signal; use slog::{debug, info, o, Drain, Level, Logger}; use std::net::IpAddr; use std::net::SocketAddr; @@ -138,6 +146,12 @@ enum Subcommand { #[clap(long, conflicts_with("end"), action)] end_exclusive: Option>, }, + + /// Enter a SQL shell for interactive querying. + Sql { + #[clap(flatten)] + opts: ShellOptions, + }, } async fn make_client( @@ -295,8 +309,241 @@ async fn query( Ok(()) } +fn print_basic_commands() { + println!("Basic commands:"); + println!(" \\?, \\h, help - Print this help"); + println!(" \\q, quit, exit, ^D - Exit the shell"); + println!(" \\l - List tables"); + println!(" \\d - Describe a table"); + println!(" \\f - List supported ClickHouse SQL functions"); + println!(); + println!("Or try entering a SQL `SELECT` statement"); +} + +async fn list_virtual_tables(client: &Client) -> anyhow::Result<()> { + for name in client.schema().await.into_keys() { + println!("{name}"); + } + Ok(()) +} + +async fn describe_virtual_table( + client: &Client, + table: &str, +) -> anyhow::Result<()> { + match table.parse() { + Err(_) => println!("Invalid timeseries name: {table}"), + Ok(name) => { + if let Some(schema) = client.schema_for_timeseries(&name).await? { + let mut cols = + Vec::with_capacity(schema.field_schema.len() + 2); + let mut types = cols.clone(); + for field in schema.field_schema.iter() { + cols.push(field.name.clone()); + types.push(field.ty.to_string()); + } + cols.push("timestamp".into()); + types.push("DateTime64".into()); + + if schema.datum_type.is_histogram() { + cols.push("start_time".into()); + types.push("DateTime64".into()); + + cols.push("bins".into()); + if matches!(schema.datum_type, DatumType::HistogramF64) { + types.push("Array[f64]".into()); + } else { + types.push("Array[u64]".into()); + }; + + cols.push("counts".into()); + types.push("Array[u64]".into()); + } else if schema.datum_type.is_cumulative() { + cols.push("start_time".into()); + types.push("DateTime64".into()); + cols.push("datum".into()); + types.push(schema.datum_type.to_string()); + } else { + cols.push("datum".into()); + types.push(schema.datum_type.to_string()); + } + + let mut builder = tabled::builder::Builder::default(); + builder.set_header(cols); + builder.push_record(types); + println!( + "{}", + builder.build().with(tabled::settings::Style::psql()) + ); + } else { + println!("No such timeseries: {table}"); + } + } + } + Ok(()) +} + +#[derive(Clone, Copy, Debug, Args)] +struct ShellOptions { + /// Print query metadata. + #[clap(long = "metadata")] + print_metadata: bool, + /// Print the original SQL query. + #[clap(long = "original")] + print_original_query: bool, + /// Print the rewritten SQL query that is actually run on the DB. 
+ #[clap(long = "rewritten")] + print_rewritten_query: bool, +} + +impl Default for ShellOptions { + fn default() -> Self { + Self { + print_metadata: true, + print_original_query: false, + print_rewritten_query: false, + } + } +} + +fn list_supported_functions() { + println!("Subset of ClickHouse SQL functions currently supported"); + println!( + "See https://clickhouse.com/docs/en/sql-reference/functions for more" + ); + println!(); + for func in function_allow_list().iter() { + println!(" {func}"); + } +} + +async fn sql_shell( + address: IpAddr, + port: u16, + log: Logger, + opts: ShellOptions, +) -> anyhow::Result<()> { + let client = make_client(address, port, &log).await?; + let dummy = "foo:bar".parse().unwrap(); + let _ = client.schema_for_timeseries(&dummy).await; + let mut ed = Reedline::create(); + let prompt = DefaultPrompt::new( + DefaultPromptSegment::Basic("0x".to_string()), + DefaultPromptSegment::Empty, + ); + println!("Oximeter SQL shell"); + println!(); + print_basic_commands(); + loop { + let sig = ed.read_line(&prompt); + match sig { + Ok(Signal::Success(buf)) => { + let cmd = buf.as_str().trim(); + match cmd { + "" => continue, + "\\?" | "\\h" | "help" => print_basic_commands(), + "\\q" | "quit" | "exit" => return Ok(()), + "\\l" | "\\d" => list_virtual_tables(&client).await?, + "\\f" => list_supported_functions(), + _ => { + if let Some(table_name) = cmd.strip_prefix("\\d ") { + if table_name.is_empty() { + list_virtual_tables(&client).await?; + } else { + describe_virtual_table( + &client, + table_name.trim().trim_end_matches(';'), + ) + .await?; + } + } else { + match client.query(&buf).await { + Err(e) => println!("Query failed: {e:#?}"), + Ok(QueryResult { + original_query, + rewritten_query, + metadata, + table, + }) => { + println!(); + let mut builder = + tabled::builder::Builder::default(); + builder.set_header(&table.column_names); + for row in table.rows.iter() { + builder.push_record( + row.iter().map(ToString::to_string), + ); + } + if opts.print_original_query { + println!( + "{}", + sqlformat::format( + &original_query, + &sqlformat::QueryParams::None, + sqlformat::FormatOptions { + uppercase: true, + ..Default::default() + } + ) + ); + println!(); + } + if opts.print_rewritten_query { + println!( + "{}", + sqlformat::format( + &rewritten_query, + &sqlformat::QueryParams::None, + sqlformat::FormatOptions { + uppercase: true, + ..Default::default() + } + ) + ); + println!(); + } + println!( + "{}", + builder.build().with( + tabled::settings::Style::psql() + ) + ); + if opts.print_metadata { + println!(); + println!("Metadata"); + println!( + " Query ID: {}", + metadata.id + ); + println!( + " Result rows: {}", + table.rows.len() + ); + println!( + " Time: {:?}", + metadata.elapsed + ); + println!( + " Read: {}\n", + metadata.summary.read + ); + } + } + } + } + } + } + } + Ok(Signal::CtrlD) => return Ok(()), + Ok(Signal::CtrlC) => continue, + err => println!("err: {err:?}"), + } + } +} + #[tokio::main] -async fn main() { +async fn main() -> anyhow::Result<()> { + usdt::register_probes().unwrap(); let args = OxDb::parse(); let decorator = slog_term::TermDecorator::new().build(); let drain = slog_term::FullFormat::new(decorator) @@ -308,12 +555,10 @@ async fn main() { match args.cmd { Subcommand::Describe => describe_data(), Subcommand::Populate { populate_args } => { - populate(args.address, args.port, log, populate_args) - .await - .unwrap(); + populate(args.address, args.port, log, populate_args).await? 
        }
        Subcommand::Wipe => {
-            wipe_single_node_db(args.address, args.port, log).await.unwrap()
+            wipe_single_node_db(args.address, args.port, log).await?
        }
        Subcommand::Query {
            timeseries_name,
@@ -342,8 +587,11 @@ async fn main() {
                start,
                end,
            )
-            .await
-            .unwrap();
+            .await?;
+        }
+        Subcommand::Sql { opts } => {
+            sql_shell(args.address, args.port, log, opts).await?
        }
    }
+    Ok(())
}
diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs
index 88b95c37641..c108506a8d5 100644
--- a/oximeter/db/src/client.rs
+++ b/oximeter/db/src/client.rs
@@ -8,6 +8,7 @@
 use crate::model;
 use crate::query;
+use crate::sql::RestrictedQuery;
 use crate::Error;
 use crate::Metric;
 use crate::Target;
@@ -22,7 +23,9 @@ use dropshot::EmptyScanParams;
 use dropshot::PaginationOrder;
 use dropshot::ResultsPage;
 use dropshot::WhichPage;
+use indexmap::IndexMap;
 use oximeter::types::Sample;
+use reqwest::header::HeaderMap;
 use slog::debug;
 use slog::error;
 use slog::info;
@@ -35,6 +38,8 @@ use std::collections::BTreeSet;
 use std::convert::TryFrom;
 use std::net::SocketAddr;
 use std::num::NonZeroU32;
+use std::time::Duration;
+use std::time::Instant;
 use tokio::sync::Mutex;
 use uuid::Uuid;
@@ -44,6 +49,137 @@ mod probes {
     fn query__done(_: &usdt::UniqueId) {}
 }

+/// A count of bytes / rows accessed during a query.
+#[derive(Clone, Copy, Debug)]
+pub struct IoCount {
+    pub bytes: u64,
+    pub rows: u64,
+}
+
+impl std::fmt::Display for IoCount {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{} rows ({} bytes)", self.rows, self.bytes)
+    }
+}
+
+/// Summary of the I/O and duration of a query.
+#[derive(Clone, Copy, Debug, serde::Deserialize)]
+#[serde(try_from = "serde_json::Value")]
+pub struct QuerySummary {
+    /// The bytes and rows read by the query.
+    pub read: IoCount,
+    /// The bytes and rows written by the query.
+    pub written: IoCount,
+}
+
+impl TryFrom<serde_json::Value> for QuerySummary {
+    type Error = Error;
+
+    fn try_from(j: serde_json::Value) -> Result<Self, Self::Error> {
+        use serde_json::Map;
+        use serde_json::Value;
+        use std::str::FromStr;
+
+        let Value::Object(map) = j else {
+            return Err(Error::Database(String::from(
+                "Expected a JSON object for a metadata summary",
+            )));
+        };
+
+        fn unpack_summary_value<T>(
+            map: &Map<String, Value>,
+            key: &str,
+        ) -> Result<T, Error>
+        where
+            T: FromStr,
+            <T as FromStr>::Err: std::error::Error,
+        {
+            let value = map.get(key).ok_or_else(|| {
+                Error::MissingHeaderKey { key: key.to_string() }
+            })?;
+            let Value::String(v) = value else {
+                return Err(Error::BadMetadata {
+                    key: key.to_string(),
+                    msg: String::from("Expected a string value"),
+                });
+            };
+            v.parse::<T>().map_err(|e| Error::BadMetadata {
+                key: key.to_string(),
+                msg: e.to_string(),
+            })
+        }
+        let rows_read: u64 = unpack_summary_value(&map, "read_rows")?;
+        let bytes_read: u64 = unpack_summary_value(&map, "read_bytes")?;
+        let rows_written: u64 = unpack_summary_value(&map, "written_rows")?;
+        let bytes_written: u64 = unpack_summary_value(&map, "written_bytes")?;
+        Ok(Self {
+            read: IoCount { bytes: bytes_read, rows: rows_read },
+            written: IoCount { bytes: bytes_written, rows: rows_written },
+        })
+    }
+}
+
+/// Basic metadata about the resource usage of a single SQL query.
+#[derive(Clone, Copy, Debug)]
+pub struct QueryMetadata {
+    /// The database-assigned query ID.
+    pub id: Uuid,
+    /// The total duration of the query (network plus execution).
+    pub elapsed: Duration,
+    /// Summary of the data read and written.
+    pub summary: QuerySummary,
+}
+
+impl QueryMetadata {
+    fn from_headers(
+        elapsed: Duration,
+        headers: &HeaderMap,
+    ) -> Result<Self, Error> {
+        fn get_header<'a>(
+            map: &'a HeaderMap,
+            key: &'a str,
+        ) -> Result<&'a str, Error> {
+            let hdr = map.get(key).ok_or_else(|| Error::MissingHeaderKey {
+                key: key.to_string(),
+            })?;
+            std::str::from_utf8(hdr.as_bytes())
+                .map_err(|err| Error::Database(err.to_string()))
+        }
+        let summary =
+            serde_json::from_str(get_header(headers, "X-ClickHouse-Summary")?)
+                .map_err(|err| Error::Database(err.to_string()))?;
+        let id = get_header(headers, "X-ClickHouse-Query-Id")?
+            .parse()
+            .map_err(|err: uuid::Error| Error::Database(err.to_string()))?;
+        Ok(Self { id, elapsed, summary })
+    }
+}
+
+/// A tabular result from a SQL query against a timeseries.
+#[derive(Clone, Debug, Default, serde::Serialize)]
+pub struct Table {
+    /// The name of each column in the result set.
+    pub column_names: Vec<String>,
+    /// The rows of the result set, each a list of values, one per column.
+    pub rows: Vec<Vec<serde_json::Value>>,
+}
+
+/// The full result of running a SQL query against a timeseries.
+#[derive(Clone, Debug)]
+pub struct QueryResult {
+    /// The query as written by the client.
+    pub original_query: String,
+    /// The rewritten query, run against the JOINed representation of the
+    /// timeseries.
+    ///
+    /// This is the query that is actually run in the database itself.
+    pub rewritten_query: String,
+    /// Metadata about the resource usage of the query.
+    pub metadata: QueryMetadata,
+    /// The result of the query, with column names and rows.
+    pub table: Table,
+}
+
 /// A `Client` to the ClickHouse metrics database.
 #[derive(Debug)]
 pub struct Client {
@@ -82,6 +218,68 @@ impl Client {
         Ok(())
     }

+    /// Run a SQL query against a timeseries.
+    pub async fn query(
+        &self,
+        query: impl AsRef<str>,
+    ) -> Result<QueryResult, Error> {
+        let original_query = query.as_ref().trim_end_matches(';');
+        let restricted = RestrictedQuery::new(original_query)?;
+        let ox_sql = restricted.to_oximeter_sql(&*self.schema.lock().await)?;
+        let rewritten = format!("{ox_sql} FORMAT JSONEachRow");
+        debug!(
+            self.log,
+            "rewrote restricted query";
+            "original_sql" => &original_query,
+            "rewritten_sql" => &rewritten,
+        );
+        let request = self
+            .client
+            .post(&self.url)
+            .query(&[
+                ("output_format_json_quote_64bit_integers", "0"),
+                ("database", crate::DATABASE_NAME),
+            ])
+            .body(rewritten.clone());
+        let query_start = Instant::now();
+        let response = handle_db_response(
+            request
+                .send()
+                .await
+                .map_err(|err| Error::DatabaseUnavailable(err.to_string()))?,
+        )
+        .await?;
+        let metadata = QueryMetadata::from_headers(
+            query_start.elapsed(),
+            response.headers(),
+        )?;
+        let text = response.text().await.unwrap();
+        let mut table = Table::default();
+        for line in text.lines() {
+            let row =
+                serde_json::from_str::<IndexMap<String, serde_json::Value>>(
+                    line.trim(),
+                )
+                .unwrap();
+            if table.column_names.is_empty() {
+                table.column_names.extend(row.keys().cloned())
+            } else {
+                assert!(table
+                    .column_names
+                    .iter()
+                    .zip(row.keys())
+                    .all(|(k1, k2)| k1 == k2));
+            }
+            table.rows.push(row.values().cloned().collect());
+        }
+        Ok(QueryResult {
+            original_query: original_query.to_string(),
+            rewritten_query: rewritten,
+            metadata,
+            table,
+        })
+    }
+
     /// Select timeseries from criteria on the fields and start/end timestamps.
     pub async fn select_timeseries_with(
         &self,
@@ -216,6 +414,13 @@ impl Client {
         Ok(schema.get(name).map(Clone::clone))
     }

+    /// Return the _current_ timeseries schema known to the client.
+    ///
+    /// TODO(ben) remove
+    pub async fn schema(&self) -> BTreeMap<TimeseriesName, TimeseriesSchema> {
+        self.schema.lock().await.clone()
+    }
+
     /// List timeseries schema, paginated.
     pub async fn timeseries_schema_list(
         &self,
@@ -264,7 +469,7 @@ impl Client {
         ResultsPage::new(schema, &dropshot::EmptyScanParams {}, |schema, _| {
             schema.timeseries_name.clone()
         })
-        .map_err(|e| Error::Database(e.to_string()))
+        .map_err(|err| Error::Database(err.to_string()))
     }

     /// Validates that the schema used by the DB matches the version used by
@@ -817,13 +1022,14 @@ async fn handle_db_response(
         // NOTE: ClickHouse returns 404 for all errors (so far encountered). We pull the text from
         // the body if possible, which contains the actual error from the database.
         let body = response.text().await.unwrap_or_else(|e| e.to_string());
-        Err(Error::Database(body))
+        Err(Error::Database(format!("Query failed: {body}")))
     }
 }

 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::model::OXIMETER_VERSION;
     use crate::query;
     use crate::query::field_table_name;
     use crate::query::measurement_table_name;
@@ -839,6 +1045,7 @@ mod tests {
     use oximeter::Metric;
     use oximeter::Target;
     use std::net::Ipv6Addr;
+    use std::path::PathBuf;
     use std::time::Duration;
     use tokio::time::sleep;
     use uuid::Uuid;
@@ -3345,4 +3552,87 @@ mod tests {
         );
     }
 }
+
+    #[tokio::test]
+    async fn test_sql_query_output() {
+        let logctx = test_setup_log("test_sql_query_output");
+        let log = &logctx.log;
+        let mut db = ClickHouseInstance::new_single_node(0)
+            .await
+            .expect("Failed to start ClickHouse");
+        let address = SocketAddr::new("::1".parse().unwrap(), db.port());
+        let client = Client::new(address, &log);
+        client
+            .initialize_db_with_version(false, OXIMETER_VERSION)
+            .await
+            .expect("Failed to initialize timeseries database");
+        let (_target, metrics, samples) = setup_select_test();
+        client.insert_samples(&samples).await.unwrap();
+
+        // Sanity check that we get exactly the number of samples we expected.
+        let res = client
+            .query("SELECT count() AS total FROM service:request_latency")
+            .await
+            .unwrap();
+        assert_eq!(res.table.rows.len(), 1);
+        let serde_json::Value::Number(n) = &res.table.rows[0][0] else {
+            panic!("Expected exactly 1 row with 1 item");
+        };
+        assert_eq!(n.as_u64().unwrap(), samples.len() as u64);

+        // Assert grouping by the keys results in exactly the number of samples
+        // expected for each timeseries.
+        let res = client
+            .query(
+                "SELECT count() AS total \
+                FROM service:request_latency \
+                GROUP BY timeseries_key; \
+                ",
+            )
+            .await
+            .unwrap();
+        assert_eq!(res.table.rows.len(), metrics.len());
+        for row in res.table.rows.iter() {
+            assert_eq!(row.len(), 1);
+            let serde_json::Value::Number(n) = &row[0] else {
+                panic!("Expected a number in each row");
+            };
+            assert_eq!(
+                n.as_u64().unwrap(),
+                (samples.len() / metrics.len()) as u64
+            );
+        }

+        // Read test SQL and make sure we're getting expected results.
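+        // Each subdirectory of `test-output/sql` is expected to hold a
+        // `query.sql` to run and a `result.txt` with the expected table,
+        // compared via `expectorate` below.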
+ let sql_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("test-output") + .join("sql"); + let mut rd = tokio::fs::read_dir(&sql_dir) + .await + .expect("failed to read SQL test directory"); + while let Some(next_entry) = + rd.next_entry().await.expect("failed to read directory entry") + { + let sql_file = next_entry.path().join("query.sql"); + let result_file = next_entry.path().join("result.txt"); + let query = tokio::fs::read_to_string(&sql_file) + .await + .unwrap_or_else(|_| { + panic!( + "failed to read test SQL query in '{}", + sql_file.display() + ) + }); + let res = client + .query(&query) + .await + .expect("failed to execute test query"); + expectorate::assert_contents( + result_file, + &serde_json::to_string_pretty(&res.table).unwrap(), + ); + } + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } } diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index 11ecbeddc87..5fd7f4d0c36 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -21,9 +21,11 @@ use thiserror::Error; mod client; pub mod model; pub mod query; +pub use client::QueryResult; pub use client::{Client, DbWrite}; +pub mod sql; -#[derive(Clone, Debug, Error)] +#[derive(Debug, Error)] pub enum Error { #[error("Oximeter core error: {0}")] Oximeter(#[from] oximeter::MetricsError), @@ -32,8 +34,14 @@ pub enum Error { #[error("Telemetry database unavailable: {0}")] DatabaseUnavailable(String), + #[error("Missing expected metadata header key '{key}'")] + MissingHeaderKey { key: String }, + + #[error("Invalid or malformed query metadata for key '{key}': {msg}")] + BadMetadata { key: String, msg: String }, + /// An error interacting with the telemetry database - #[error("Error interacting with telemetry database: {0}")] + #[error("Error interacting with telemetry database")] Database(String), /// A schema provided when collecting samples did not match the expected schema @@ -79,6 +87,9 @@ pub enum Error { #[error("Query must resolve to a single timeseries if limit is specified")] InvalidLimitQuery, + + #[error("SQL error")] + Sql(#[from] sql::Error), } /// A timeseries name. @@ -135,6 +146,13 @@ impl std::fmt::Display for TimeseriesName { } } +impl std::str::FromStr for TimeseriesName { + type Err = Error; + fn from_str(s: &str) -> Result { + Self::try_from(s) + } +} + impl std::convert::TryFrom<&str> for TimeseriesName { type Error = Error; fn try_from(s: &str) -> Result { @@ -150,13 +168,6 @@ impl std::convert::TryFrom for TimeseriesName { } } -impl std::str::FromStr for TimeseriesName { - type Err = Error; - fn from_str(s: &str) -> Result { - s.try_into() - } -} - impl PartialEq for TimeseriesName where T: AsRef, @@ -166,7 +177,7 @@ where } } -fn validate_timeseries_name(s: &str) -> Result<&str, Error> { +pub(crate) fn validate_timeseries_name(s: &str) -> Result<&str, Error> { if regex::Regex::new(TIMESERIES_NAME_REGEX).unwrap().is_match(s) { Ok(s) } else { @@ -393,11 +404,11 @@ const DATABASE_SELECT_FORMAT: &str = "JSONEachRow"; // // - Start with lowercase a-z // - Any number of alphanumerics -// - Zero or more of the above, delimited by '-'. +// - Zero or more of the above, delimited by '_'. // // That describes the target/metric name, and the timeseries is two of those, joined with ':'. 
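+// For example, "foo:bar" parses as a timeseries name, while "a:", ":b", and
+// "no:no:no" are rejected (see the tests below).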
const TIMESERIES_NAME_REGEX: &str = - "(([a-z]+[a-z0-9]*)(_([a-z0-9]+))*):(([a-z]+[a-z0-9]*)(_([a-z0-9]+))*)"; + "^(([a-z]+[a-z0-9]*)(_([a-z0-9]+))*):(([a-z]+[a-z0-9]*)(_([a-z0-9]+))*)$"; #[cfg(test)] mod tests { @@ -426,6 +437,7 @@ mod tests { assert!(TimeseriesName::try_from(":b").is_err()); assert!(TimeseriesName::try_from("a:").is_err()); assert!(TimeseriesName::try_from("123").is_err()); + assert!(TimeseriesName::try_from("no:no:no").is_err()); } // Validates that the timeseries_key stability for a sample is stable. diff --git a/oximeter/db/src/query.rs b/oximeter/db/src/query.rs index 6a55d3f5181..c176a83f130 100644 --- a/oximeter/db/src/query.rs +++ b/oximeter/db/src/query.rs @@ -295,6 +295,7 @@ impl SelectQueryBuilder { } } +/// Return the name of the measurements table for a datum type. pub(crate) fn measurement_table_name(ty: DatumType) -> String { format!("measurements_{}", ty.to_string().to_lowercase()) } @@ -334,6 +335,7 @@ pub struct FieldSelector { comparison: Option, } +/// Return the name of the field table for the provided field type. pub(crate) fn field_table_name(ty: FieldType) -> String { format!("fields_{}", ty.to_string().to_lowercase()) } diff --git a/oximeter/db/src/sql/mod.rs b/oximeter/db/src/sql/mod.rs new file mode 100644 index 00000000000..c7e0035b714 --- /dev/null +++ b/oximeter/db/src/sql/mod.rs @@ -0,0 +1,1010 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Run SQL queries against the timeseries database. +//! +//! # Overview +//! +//! `oximeter` collects and stores samples from timeseries. The schema for those +//! samples is defined by applications, using the [`Target`](oximeter::Target) +//! and [`Metric`](oximeter::Metric) traits. Samples from these timeseries are +//! not stored in explicit tables, however. They are "unrolled" into the fields +//! and measurements, which are stored in a table based on their _data type_. +//! For example, `String` fields are stored in the `oximeter.fields_string` +//! table. (See RFD 161 for more details.) +//! +//! This arrangement is flexible and simple, since we can statically define the +//! tables we need, rather than say create a new table for each timeseries +//! schema. However, the drawback of this is that the timeseries data is not +//! easily queried directly. The data is split across many tables, and +//! interleaved with other timeseries, which may not even share a schema. +//! +//! The tools in this module are for making "normal" SQL queries transparently +//! act on the "virtual tables" that are implied by each timeseries. It's +//! effectively a SQL-to-SQL transpiler, converting queries against the +//! timeseries into one or more queries against the actual tables in ClickHouse. 
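+//!
+//! # Example
+//!
+//! As a sketch (not the exact SQL this module emits), a query like:
+//!
+//! ```sql
+//! SELECT count() FROM service:request_latency;
+//! ```
+//!
+//! is rewritten to select from a CTE that reassembles the virtual table by
+//! joining the relevant field and measurement tables, roughly:
+//!
+//! ```sql
+//! WITH `service:request_latency` AS (
+//!     SELECT
+//!         filter_on_name.timeseries_key AS timeseries_key,
+//!         filter_on_name.field_value AS name,
+//!         -- ... one JOINed subquery per field ...
+//!         measurements.timestamp AS timestamp,
+//!         measurements.datum AS datum
+//!     FROM (/* DISTINCT rows from the field table */) AS filter_on_name
+//!     JOIN (/* rows from the measurement table */) AS measurements
+//!         ON filter_on_name.timeseries_key = measurements.timeseries_key
+//!     ORDER BY timeseries_key, timestamp
+//! )
+//! SELECT count() FROM `service:request_latency`;
+//! ```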
+
+// Copyright 2023 Oxide Computer Company
+
+use crate::query::field_table_name;
+use crate::query::measurement_table_name;
+use crate::DatumType;
+use crate::Error as OxdbError;
+use crate::FieldType;
+use crate::TimeseriesName;
+use crate::TimeseriesSchema;
+use sqlparser::ast::BinaryOperator;
+use sqlparser::ast::Cte;
+use sqlparser::ast::Distinct;
+use sqlparser::ast::Expr;
+use sqlparser::ast::Ident;
+use sqlparser::ast::Join;
+use sqlparser::ast::JoinConstraint;
+use sqlparser::ast::JoinOperator;
+use sqlparser::ast::ObjectName;
+use sqlparser::ast::OrderByExpr;
+use sqlparser::ast::Query;
+use sqlparser::ast::Select;
+use sqlparser::ast::SelectItem;
+use sqlparser::ast::SetExpr;
+use sqlparser::ast::Statement;
+use sqlparser::ast::TableAlias;
+use sqlparser::ast::TableFactor;
+use sqlparser::ast::TableWithJoins;
+use sqlparser::ast::Value;
+use sqlparser::ast::With;
+use sqlparser::dialect::AnsiDialect;
+use sqlparser::dialect::Dialect;
+use sqlparser::parser::Parser;
+use sqlparser::parser::ParserError;
+use std::collections::BTreeMap;
+use std::collections::BTreeSet;
+use std::ops::ControlFlow;
+use std::sync::OnceLock;
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("SQL parsing error")]
+    Parser(#[from] ParserError),
+
+    #[error("Unsupported SQL: {0}")]
+    UnsupportedSql(&'static str),
+
+    #[error("Unsupported function: '{func}'")]
+    UnsupportedFunction { func: String },
+
+    #[error("Invalid column '{name}' for timeseries '{timeseries_name}'")]
+    InvalidColumn { name: String, timeseries_name: String },
+
+    #[error(
+        "Table name '{table_name}' in select query does not match \
+        timeseries name '{timeseries_name}'"
+    )]
+    TableInSelectIsNotTimeseries { table_name: String, timeseries_name: String },
+
+    #[error("Invalid timeseries name: '{name}'")]
+    InvalidTimeseriesName { name: String },
+}
+
+/// The oximeter timeseries SQL dialect.
+#[derive(Clone, Copy, Debug, PartialEq)]
+pub struct OxdbDialect;
+
+impl Dialect for OxdbDialect {
+    fn is_identifier_start(&self, ch: char) -> bool {
+        AnsiDialect {}.is_identifier_start(ch)
+    }
+
+    fn is_identifier_part(&self, ch: char) -> bool {
+        AnsiDialect {}.is_identifier_part(ch) || ch == ':'
+    }
+}
+
+/// A SQL statement that is probably supported.
+///
+/// There's a wide range of statements that are not supported. This is
+/// guaranteed to be a single SELECT statement, where all the items being
+/// selected FROM are:
+///
+/// - concrete tables that could be timeseries (valid names)
+/// - a subquery against a restricted query
+#[derive(Clone, Debug)]
+pub struct RestrictedQuery {
+    query: Query,
+    timeseries: Vec<TimeseriesName>,
+}
+
+impl std::fmt::Display for RestrictedQuery {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.query)
+    }
+}
+
+macro_rules! unsupported {
+    ($msg:literal) => {
+        Err(OxdbError::from(Error::UnsupportedSql($msg)))
+    };
+}
+
+impl RestrictedQuery {
+    /// Construct a new restricted query.
+    pub fn new(sql: impl AsRef<str>) -> Result<Self, OxdbError> {
+        let sql = sql.as_ref();
+        let statements =
+            Parser::parse_sql(&OxdbDialect, sql).map_err(Error::from)?;
+        if statements.len() != 1 {
+            return unsupported!("Only a single SQL statement is supported");
+        }
+
+        let statement = statements.into_iter().next().unwrap();
+        let Statement::Query(mut query) = statement else {
+            return unsupported!("Statement must be a SELECT query");
+        };
+
+        // Walk the AST before doing any real processing or transformation, and
+        // validate any function calls are on the allow-list.
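+        //
+        // For example, a query calling `bogus()` is rejected here with
+        // `Error::UnsupportedFunction`, as exercised by the tests at the
+        // bottom of this module.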
+        let maybe_denied_function =
+            sqlparser::ast::visit_expressions(&query, |expr| {
+                if let Expr::Function(func) = expr {
+                    if let Some(name) = func.name.0.first() {
+                        if !function_allow_list().contains(name.value.as_str())
+                        {
+                            return ControlFlow::Break(name.value.clone());
+                        }
+                    }
+                }
+                ControlFlow::Continue(())
+            });
+        if let ControlFlow::Break(func) = maybe_denied_function {
+            return Err(OxdbError::from(Error::UnsupportedFunction { func }));
+        };
+
+        let timeseries = Self::process_query(&mut query)?;
+        Ok(Self { query: *query, timeseries })
+    }
+
+    /// Return the timeseries that this query refers to.
+    pub fn timeseries(&self) -> &[TimeseriesName] {
+        &self.timeseries
+    }
+
+    /// Convert the original SQL into a query specifically for the `oximeter`
+    /// timeseries table organization.
+    pub fn to_oximeter_sql(
+        &self,
+        timeseries_schema: &BTreeMap<TimeseriesName, TimeseriesSchema>,
+    ) -> Result<Query, OxdbError> {
+        // Don't need to walk the AST, need a CTE at the beginning with each
+        // timeseries table.
+        self.generate_timeseries_ctes(&timeseries_schema).map(|cte_tables| {
+            let query = self.query.clone();
+            if cte_tables.is_empty() {
+                query
+            } else {
+                Query {
+                    with: Some(With { recursive: false, cte_tables }),
+                    ..query
+                }
+            }
+        })
+    }
+
+    // For each timeseries named in `self`, generate a CTE that creates the
+    // virtual table for that timeseries by joining all its component parts.
+    fn generate_timeseries_ctes(
+        &self,
+        timeseries_schema: &BTreeMap<TimeseriesName, TimeseriesSchema>,
+    ) -> Result<Vec<Cte>, OxdbError> {
+        let mut ctes = Vec::with_capacity(self.timeseries.len());
+        for timeseries in self.timeseries.iter() {
+            let schema =
+                timeseries_schema.get(timeseries).ok_or_else(|| {
+                    OxdbError::TimeseriesNotFound(
+                        timeseries.as_str().to_owned(),
+                    )
+                })?;
+            ctes.push(Self::build_timeseries_cte(schema));
+        }
+        Ok(ctes)
+    }
+
+    // Given a timeseries schema, return a CTE which generates the equivalent
+    // virtual table.
+    //
+    // As timeseries samples are ingested, we "unroll" them in various ways, and
+    // store them in a set of normalized tables. These contain the _fields_ (one
+    // table per field data type) and the measurements (one table per
+    // measurement data type). This method reverses that process, creating a
+    // single, virtual table that represents all samples from the timeseries
+    // (plural) of the same schema.
+    //
+    // It generates a CTE like so:
+    //
+    // ```sql
+    // WITH {timeseries_name} AS (
+    //     SELECT
+    //         timeseries_key,
+    //         filter_on_{field_name0}.field_value AS {field_name0},
+    //         filter_on_{field_name1}.field_value AS {field_name1},
+    //         ...
+    //         measurements.timestamp AS timestamp,
+    //         measurements.datum AS datum,
+    //     FROM
+    //         (
+    //             SELECT DISTINCT timeseries_key,
+    //                 field_value
+    //             FROM
+    //                 fields_{field_type}
+    //             WHERE
+    //                 timeseries_name = '{timeseries_name}'
+    //                 AND field_name = '{field_name0}'
+    //         ) AS filter_on_{field_name0}
+    //         JOIN (
+    //             ... select next field table
+    //         ) AS filter_on_{field_name1} ON filter_on_{field_name0}.timeseries_key = filter_on_{field_name1}.timeseries_key
+    //         ...
+    //         JOIN (
+    //             SELECT
+    //                 timeseries_key,
+    //                 timestamp,
+    //                 datum,
+    //             FROM
+    //                 measurements_{datum_type}
+    //             WHERE
+    //                 timeseries_name = '{timeseries_name}'
+    //         ) AS measurements ON filter_on_fieldN.timeseries_key = measurements.timeseries_key
+    //     ORDER BY
+    //         timeseries_key,
+    //         timestamp
+    // )
+    // ```
+    //
+    // In other words, it should generate a CTE that one can query as if the
+    // timeseries itself were an actual table in the database, like:
+    //
+    // ```
+    // timeseries_key | field_name0 | field_name1 | ... | timestamp | datum
+    // ---------------+-------------+-------------+ ... +-----------+------
+    // key0           | field0_0    | field0_1    | ... | t0        | d0
+    // key0           | field0_0    | field0_1    | ... | t1        | d1
+    // key0           | field0_0    | field0_1    | ... | t2        | d2
+    // key0           | field0_0    | field0_1    | ... | t3        | d3
+    // ...
+    // key1           | field1_0    | field1_1    | ... | t0        | d0
+    // key1           | field1_0    | field1_1    | ... | t1        | d1
+    // key1           | field1_0    | field1_1    | ... | t2        | d2
+    // key1           | field1_0    | field1_1    | ... | t3        | d3
+    // ...
+    // ```
+    //
+    // In this case, all rows with `key0` are from the "first" timeseries with
+    // this schema. `fieldX_Y` indicates the Yth field from the timeseries with
+    // `keyX` as its key.
+    fn build_timeseries_cte(schema: &TimeseriesSchema) -> Cte {
+        // First build each query against the relevant field tables.
+        //
+        // These are the `SELECT DISTINCT ... FROM fields_{field_type}`
+        // subqueries above.
+        let mut field_queries = Vec::with_capacity(schema.field_schema.len());
+        for field_schema in schema.field_schema.iter() {
+            let field_query = Self::build_field_query(
+                &schema.timeseries_name,
+                &field_schema.name,
+                &field_schema.ty,
+            );
+            field_queries.push((field_schema.name.as_str(), field_query));
+        }
+
+        // Generate the measurement query, the last subquery in the main
+        // CTE.
+        let measurement_query = Self::build_measurement_query(
+            &schema.timeseries_name,
+            &schema.datum_type,
+        );
+
+        // The "top-level" columns are the columns output by the CTE itself.
+        //
+        // These are the aliased columns of the full, reconstructed table
+        // representing the timeseries. This makes the timeseries_key available,
+        // as well as each field aliased to the actual field name, and the
+        // measurements.
+        let mut top_level_projections =
+            Vec::with_capacity(field_queries.len() + 2);
+
+        // Create the projection of the top-level timeseries_key.
+        //
+        // This is taken from the first field, which always exists, since
+        // timeseries have at least one field. This creates the expression:
+        // `filter_on_{field_name}.timeseries_key AS timeseries_key`
+        let timeseries_key_projection = SelectItem::ExprWithAlias {
+            expr: Expr::CompoundIdentifier(vec![
+                Self::field_subquery_alias(field_queries[0].0),
+                Self::str_to_ident("timeseries_key"),
+            ]),
+            alias: Self::str_to_ident("timeseries_key"),
+        };
+        top_level_projections.push(timeseries_key_projection);
+
+        // We'll build a big `TableWithJoins` to express the entire JOIN
+        // operation between all fields and the measurements. This is the "meat"
+        // of the CTE for this timeseries, joining the constituent records into
+        // the virtual table for this schema.
+        //
+        // We select first from the subquery specifying the first field query.
+        let mut cte_from = TableWithJoins {
+            relation: TableFactor::Derived {
+                lateral: false,
+                subquery: Self::select_to_query(field_queries[0].1.clone()),
+                alias: Some(TableAlias {
+                    name: Self::field_subquery_alias(field_queries[0].0),
+                    columns: vec![],
+                }),
+            },
+            joins: Vec::with_capacity(field_queries.len()),
+        };
+
+        // For all field queries, create a projection for the field_value,
+        // aliased as the field name.
+        let field_queries: Vec<_> = field_queries.into_iter().collect();
+        for (i, (field_name, query)) in field_queries.iter().enumerate() {
+            // Select the field_value from this field query, renaming it to the
+            // actual field name.
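+            //
+            // E.g., for a field named "route", this produces
+            // `filter_on_route.field_value AS route`.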
+ let projection = SelectItem::ExprWithAlias { + expr: Expr::CompoundIdentifier(vec![ + Self::field_subquery_alias(field_name), + Self::str_to_ident("field_value"), + ]), + alias: Self::str_to_ident(field_name), + }; + top_level_projections.push(projection); + + // We've inserted the first subquery as the `from.relation` field in + // the main CTE we're building. We need to skip that one, even + // though we added its aliased `field_value` column to the top level + // projections. + // + // Any additional field subqueries are inserted in the JOIN portion + // of the CTE. + if i == 0 { + continue; + } + let relation = TableFactor::Derived { + lateral: false, + subquery: Self::select_to_query(query.clone()), + alias: Some(TableAlias { + name: Self::field_subquery_alias(field_name), + columns: vec![], + }), + }; + + // The join is always INNER, and is on the timeseries_key only. + // ClickHouse does not support `USING ` when using multiple + // JOINs simultaneously, so we always write this as an `ON` + // constraint, between the previous field subquery and this one. + // + // I.e., `ON filter_foo.timeseries_key = filter_bar.timeseries_key` + let last_field_name = &field_queries[i - 1].0; + let constraints = Expr::BinaryOp { + left: Box::new(Expr::CompoundIdentifier(vec![ + Self::field_subquery_alias(last_field_name), + Self::str_to_ident("timeseries_key"), + ])), + op: BinaryOperator::Eq, + right: Box::new(Expr::CompoundIdentifier(vec![ + Self::field_subquery_alias(field_name), + Self::str_to_ident("timeseries_key"), + ])), + }; + let join_operator = + JoinOperator::Inner(JoinConstraint::On(constraints)); + cte_from.joins.push(Join { relation, join_operator }); + } + + // Finally, we need to project and join in the measurements table. + let datum_columns = Self::datum_type_to_columns(&schema.datum_type); + for col in datum_columns.iter() { + let projection = SelectItem::ExprWithAlias { + expr: Expr::CompoundIdentifier(vec![ + Self::str_to_ident("measurements"), + Self::str_to_ident(col), + ]), + alias: Self::str_to_ident(col), + }; + top_level_projections.push(projection); + } + let relation = TableFactor::Derived { + lateral: false, + subquery: Self::select_to_query(measurement_query), + alias: Some(TableAlias { + name: Self::str_to_ident("measurements"), + columns: vec![], + }), + }; + let constraints = Expr::BinaryOp { + left: Box::new(Expr::CompoundIdentifier(vec![ + Self::field_subquery_alias( + &schema.field_schema.last().unwrap().name, + ), + Self::str_to_ident("timeseries_key"), + ])), + op: BinaryOperator::Eq, + right: Box::new(Expr::CompoundIdentifier(vec![ + Self::str_to_ident("measurements"), + Self::str_to_ident("timeseries_key"), + ])), + }; + let join_operator = + JoinOperator::Inner(JoinConstraint::On(constraints)); + cte_from.joins.push(Join { relation, join_operator }); + + // To build the real virtual table for all the timeseries, we really + // need to sort the samples as if they were inserted into the table + // itself. ClickHouse partitions the tables dynamically since we're + // using a MergeTree engine, which groups and repacks rows in the + // background. + // + // We'll impose a consistent sorting order here. If one does not include + // this, results are inconsistent, since the different data parts of the + // measurements tables are not read in order every time. + let order_by = top_level_projections + .iter() + .filter_map(|proj| { + let SelectItem::ExprWithAlias { alias, .. 
} = &proj else {
+                    unreachable!();
+                };
+                if alias.value == "timeseries_key"
+                    || alias.value == "start_time"
+                    || alias.value == "timestamp"
+                {
+                    Some(OrderByExpr {
+                        expr: Expr::Identifier(alias.clone()),
+                        asc: None,
+                        nulls_first: None,
+                    })
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        // We now have all the subqueries joined together, plus the columns
+        // we're projecting from that join result. We need to build the final
+        // CTE that represents the full virtual timeseries table.
+        let alias = TableAlias {
+            name: Ident {
+                value: schema.timeseries_name.to_string(),
+                quote_style: Some('`'),
+            },
+            columns: vec![],
+        };
+        let top_level_select = Select {
+            distinct: None,
+            top: None,
+            projection: top_level_projections,
+            into: None,
+            from: vec![cte_from],
+            lateral_views: vec![],
+            selection: None,
+            group_by: vec![],
+            cluster_by: vec![],
+            distribute_by: vec![],
+            sort_by: vec![],
+            having: None,
+            named_window: vec![],
+            qualify: None,
+        };
+        let mut query = Self::select_to_query(top_level_select);
+        query.order_by = order_by;
+        Cte { alias, query, from: None }
+    }
+
+    // Create a SQL parser `Ident` with the given name.
+    fn str_to_ident(s: &str) -> Ident {
+        Ident { value: s.to_string(), quote_style: None }
+    }
+
+    // Return an `Ident` alias for a subquery of a specific field table.
+    //
+    // E.g., the `filter_on_foo` in `(SELECT DISTINCT ... ) AS filter_on_foo`.
+    fn field_subquery_alias(field_name: &str) -> Ident {
+        Self::str_to_ident(format!("filter_on_{field_name}").as_str())
+    }
+
+    // Return the required measurement columns for a specific datum type.
+    //
+    // Scalar measurements have only a timestamp and datum. Cumulative counters
+    // have those plus a start_time. And histograms have those plus the bins
+    // and counts.
+    fn datum_type_to_columns(
+        datum_type: &DatumType,
+    ) -> &'static [&'static str] {
+        if datum_type.is_histogram() {
+            &["start_time", "timestamp", "bins", "counts"]
+        } else if datum_type.is_cumulative() {
+            &["start_time", "timestamp", "datum"]
+        } else {
+            &["timestamp", "datum"]
+        }
+    }
+
+    fn select_to_query(select: Select) -> Box<Query> {
+        Box::new(Query {
+            with: None,
+            body: Box::new(SetExpr::Select(Box::new(select))),
+            order_by: vec![],
+            limit: None,
+            offset: None,
+            fetch: None,
+            locks: vec![],
+        })
+    }
+
+    // Build a single subquery which selects the unique fields with the provided name.
E.g., this creates: + // + // ```sql + // SELECT DISTINCT timeseries_key, + // field_value + // FROM + // fields_{field_type} + // WHERE + // timeseries_name = '{timeseries_name}' + // AND field_name = '{field_name}' + // ``` + fn build_field_query( + timeseries_name: &TimeseriesName, + field_name: &str, + field_type: &FieldType, + ) -> Select { + // FROM fields_{field_type} + let from = TableWithJoins { + relation: TableFactor::Table { + name: ObjectName(vec![Self::str_to_ident(&field_table_name( + *field_type, + ))]), + alias: None, + args: None, + with_hints: vec![], + }, + joins: vec![], + }; + + // SELECT timeseries_key, field_value + let projection = vec![ + SelectItem::UnnamedExpr(Expr::Identifier(Self::str_to_ident( + "timeseries_key", + ))), + SelectItem::UnnamedExpr(Expr::Identifier(Self::str_to_ident( + "field_value", + ))), + ]; + + // WHERE timeseries_name = '{timeseries_name}' AND field_name = '{field_name}' + let selection = Some(Expr::BinaryOp { + left: Box::new(Expr::BinaryOp { + left: Box::new(Expr::Identifier(Self::str_to_ident( + "timeseries_name", + ))), + op: BinaryOperator::Eq, + right: Box::new(Expr::Value(Value::SingleQuotedString( + timeseries_name.to_string(), + ))), + }), + op: BinaryOperator::And, + right: Box::new(Expr::BinaryOp { + left: Box::new(Expr::Identifier(Self::str_to_ident( + "field_name", + ))), + op: BinaryOperator::Eq, + right: Box::new(Expr::Value(Value::SingleQuotedString( + field_name.to_string(), + ))), + }), + }); + + Select { + distinct: Some(Distinct::Distinct), + top: None, + projection, + into: None, + from: vec![from], + lateral_views: vec![], + selection, + group_by: vec![], + cluster_by: vec![], + distribute_by: vec![], + sort_by: vec![], + having: None, + named_window: vec![], + qualify: None, + } + } + + // Build a single subquery which selects the measurements with the provided + // name. E.g., this creates: + // + // ```sql + // SELECT + // timeseries_key, + // timestamp, + // datum + // FROM + // measurements_{datum_type} + // WHERE + // timeseries_name = '{timeseries_name}' + // ``` + fn build_measurement_query( + timeseries_name: &TimeseriesName, + datum_type: &DatumType, + ) -> Select { + // FROM measurements_{datum_type} + let from = TableWithJoins { + relation: TableFactor::Table { + name: ObjectName(vec![Self::str_to_ident( + &measurement_table_name(*datum_type), + )]), + alias: None, + args: None, + with_hints: vec![], + }, + joins: vec![], + }; + + // SELECT timeseries_key, timestamp, [datum type columns] + let mut projection = vec![SelectItem::UnnamedExpr(Expr::Identifier( + Self::str_to_ident("timeseries_key"), + ))]; + let datum_projection = Self::datum_type_to_columns(datum_type); + projection.extend(datum_projection.iter().map(|name| { + SelectItem::UnnamedExpr(Expr::Identifier(Self::str_to_ident(name))) + })); + + // WHERE timeseries_name = '{timeseries_name}' + let selection = Some(Expr::BinaryOp { + left: Box::new(Expr::Identifier(Self::str_to_ident( + "timeseries_name", + ))), + op: BinaryOperator::Eq, + right: Box::new(Expr::Value(Value::SingleQuotedString( + timeseries_name.to_string(), + ))), + }); + + Select { + distinct: None, + top: None, + projection, + into: None, + from: vec![from], + lateral_views: vec![], + selection, + group_by: vec![], + cluster_by: vec![], + distribute_by: vec![], + sort_by: vec![], + having: None, + named_window: vec![], + qualify: None, + } + } + + // Process a single "table factor", the in `FROM ` to + // extract the names of the timeseries it refers to. 
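+    //
+    // E.g., `FROM service:request_latency` yields the single timeseries
+    // name `service:request_latency`.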
+ // + // Note this is recursive, since we support subqueries in the FROM clause. + fn process_table_factor( + relation: &mut TableFactor, + ) -> Result<Vec<TimeseriesName>, OxdbError> { + match relation { + TableFactor::Table { ref mut name, args, with_hints, .. } => { + if args.is_some() || !with_hints.is_empty() { + return unsupported!( + "Table functions and hints are not supported" + ); + } + if name.0.len() != 1 { + return unsupported!("Query must select from a single named table, with no database"); + } + let timeseries_name = name.0[0] + .value + .parse() + .map(|n| vec![n]) + .map_err(|_| OxdbError::InvalidTimeseriesName)?; + // Rewrite the quote style to be backticks, so that the + // resulting query translates into a valid identifier for + // ClickHouse, naming the CTEs we'll generate later. + name.0[0].quote_style = Some('`'); + Ok(timeseries_name) + } + TableFactor::Derived { lateral: false, subquery, .. } => { + RestrictedQuery::process_query(subquery) + } + _ => { + return unsupported!( + "Query must select from concrete tables or subqueries on them" + ) + } + } + } + + // Process a parsed query, returning the named timeseries that it refers to. + // + // This is the entry-point for our query processing implementation. We take + // a parsed query from `sqlparser`, and extract the virtual tables + // (timeseries names) that we'll need to construct in order to actually run + // it against our database. + fn process_query( + query: &mut Query, + ) -> Result<Vec<TimeseriesName>, OxdbError> { + // Some basic checks limiting the scope of the query. + if query.with.is_some() + || query.fetch.is_some() + || !query.locks.is_empty() + { + return unsupported!( + "CTEs, FETCH and LOCKS are not currently supported" + ); + } + let SetExpr::Select(select) = &mut *query.body else { + return unsupported!("Only SELECT queries are currently supported"); + }; + + // For each object we're selecting from (a table factor), process that + // directly, and process any JOINs it also contains. + let mut timeseries = Vec::with_capacity(select.from.len()); + if select.from.len() > 1 { + return unsupported!( + "Query must select from a single named table, with no database" + ); + } + if let Some(from) = select.from.iter_mut().next() { + timeseries.extend(Self::process_table_factor(&mut from.relation)?); + for join in from.joins.iter_mut() { + let JoinOperator::Inner(op) = &join.join_operator else { + return unsupported!( + "JOINS must be INNER or ASOF, and use \ + explicit constraints" + ); + }; + if matches!(op, JoinConstraint::Natural) { + return unsupported!( + "JOINS must be INNER or ASOF, and use \ + explicit constraints" + ); + } + timeseries + .extend(Self::process_table_factor(&mut join.relation)?); + } + } + Ok(timeseries) + } +} + +static CLICKHOUSE_FUNCTION_ALLOW_LIST: OnceLock<BTreeSet<&'static str>> = + OnceLock::new(); + +/// Return the set of supported ClickHouse SQL functions. +pub fn function_allow_list() -> &'static BTreeSet<&'static str> { + CLICKHOUSE_FUNCTION_ALLOW_LIST.get_or_init(|| { + let mut out = BTreeSet::new(); + + // Core functions + out.insert("avg"); + out.insert("min"); + out.insert("max"); + out.insert("sum"); + out.insert("count"); + out.insert("now"); + out.insert("first_value"); + out.insert("last_value"); + out.insert("any"); + out.insert("topK"); + out.insert("groupArray"); + out.insert("argMin"); + out.insert("argMax"); + + // To support histograms, we allow the `-ForEach` combinator functions.
+ // + // See + // https://clickhouse.com/docs/en/sql-reference/aggregate-functions/combinators#-foreach, + // but briefly, this allows computing the aggregate function across + // corresponding array elements. + out.insert("maxForEach"); + out.insert("minForEach"); + out.insert("sumForEach"); + out.insert("avgForEach"); + + // Type conversions + // + // Note that `cast` itself will be difficult to use, because ClickHouse is + // particular about the capitalization of type names, e.g., it must be + // `cast(x as String)` not `cast(x as STRING)`. + out.insert("toString"); + out.insert("toInt8"); + out.insert("toUInt8"); + out.insert("toInt16"); + out.insert("toUInt16"); + out.insert("toInt32"); + out.insert("toUInt32"); + out.insert("toInt64"); + out.insert("toUInt64"); + out.insert("toFloat32"); + out.insert("toFloat64"); + out.insert("toDate"); + out.insert("toDateTime"); + out.insert("toDateTime64"); + out.insert("toInterval"); + + // Array functions + out.insert("arrayMax"); + out.insert("arrayMin"); + out.insert("arraySum"); + out.insert("arrayAvg"); + out.insert("arrayMap"); + out.insert("arrayReduce"); + out.insert("arrayFilter"); + out.insert("arrayDifference"); + out.insert("indexOf"); + out.insert("length"); + + // Strings. + out.insert("empty"); + out.insert("lower"); + out.insert("upper"); + out.insert("reverse"); + out.insert("concat"); + out.insert("concatWithSeparator"); + out.insert("substring"); + out.insert("endsWith"); + out.insert("startsWith"); + out.insert("splitByChar"); + out.insert("splitByString"); + + // Time. + out.insert("tumble"); + out.insert("toYear"); + out.insert("toQuarter"); + out.insert("toMonth"); + out.insert("toDayOfYear"); + out.insert("toDayOfMonth"); + out.insert("toDayOfWeek"); + out.insert("toDay"); + out.insert("toHour"); + out.insert("toMinute"); + out.insert("toSecond"); + out.insert("toUnixTimestamp"); + out.insert("toStartOfInterval"); + out.insert("date_diff"); + out.insert("date_trunc"); + out.insert("date_add"); + out.insert("date_sub"); + + // Other + out.insert("generateUUIDv4"); + out.insert("rand"); + out.insert("rand64"); + out.insert("runningDifference"); + out + }) +} + +#[cfg(test)] +mod tests { + use super::Error; + use super::OxdbError; + use super::RestrictedQuery; + + #[test] + fn test_function_allow_list() { + assert!(RestrictedQuery::new("SELECT bogus()").is_err()); + assert!(matches!( + RestrictedQuery::new("SELECT bogus()").unwrap_err(), + OxdbError::Sql(Error::UnsupportedFunction { .. 
}) + )); + assert!(RestrictedQuery::new("SELECT now()").is_ok()); + } + + #[test] + fn test_ctes_are_not_supported() { + assert!(matches!( + RestrictedQuery::new("WITH nono AS (SELECT 1) SELECT * FROM NONO") + .unwrap_err(), + OxdbError::Sql(Error::UnsupportedSql(_)) + )); + } + + #[test] + fn test_multiple_statements_are_not_supported() { + assert!(matches!( + RestrictedQuery::new("SELECT 1; SELECT 2;").unwrap_err(), + OxdbError::Sql(Error::UnsupportedSql(_)) + )); + } + + #[test] + fn test_query_must_be_select_statement() { + for query in [ + "SHOW TABLES", + "DROP TABLE foo", + "CREATE TABLE foo (x Int4)", + "DESCRIBE foo", + "EXPLAIN SELECT 1", + "INSERT INTO foo VALUES (1)", + ] { + let err = RestrictedQuery::new(query).unwrap_err(); + println!("{err:?}"); + assert!(matches!(err, OxdbError::Sql(Error::UnsupportedSql(_)))); + } + } + + #[test] + fn test_cannot_name_database() { + let err = RestrictedQuery::new("SELECT * FROM dbname.a:a").unwrap_err(); + assert!(matches!(err, OxdbError::Sql(Error::UnsupportedSql(_)))); + } + + #[test] + fn test_with_comma_join_fails() { + let err = RestrictedQuery::new("SELECT * FROM a:a, b:b").unwrap_err(); + println!("{err:?}"); + assert!(matches!(err, OxdbError::Sql(Error::UnsupportedSql(_)))); + } + + #[test] + fn test_join_must_be_inner_or_asof() { + // ASOF JOIN is a ClickHouse-ism, and actually only works because we're + // abusing the notation for an alias. The parser believes: + // + // `SELECT foo ASOF JOIN bar` + // + // is really equivalent to: + // + // `SELECT foo AS asof JOIN bar`. + // + // That isn't correct, but ClickHouse will correctly parse the query in + // any case. + let allowed = ["inner", "asof", ""]; + let denied = + ["natural", "cross", "left outer", "right outer", "full outer"]; + for join in allowed.iter() { + RestrictedQuery::new(format!("SELECT * FROM a:a {join} JOIN b:b")) + .unwrap_or_else(|_| { + panic!("Should be able to use join type '{join}'") + }); + } + for join in denied.iter() { + let sql = format!("SELECT * FROM a:a {join} JOIN b:b"); + println!("{sql}"); + let err = RestrictedQuery::new(&sql).expect_err( + format!("Should not be able to use join type '{join}'") + .as_str(), + ); + println!("{err:?}"); + assert!(matches!(err, OxdbError::Sql(Error::UnsupportedSql(_)))); + } + } + + #[test] + fn test_allow_limit_offset() { + let sql = "SELECT * FROM a:b LIMIT 10 OFFSET 10;"; + println!("{sql}"); + RestrictedQuery::new(&sql) + .expect("Should be able to use LIMIT / OFFSET queries"); + } + + #[test] + fn test_require_table_is_timeseries_name() { + assert!(RestrictedQuery::new("SELECT * FROM a:b").is_ok()); + let bad = ["table", "db.table", "no:no:no"]; + for each in bad.iter() { + let sql = format!("SELECT * FROM {each}"); + RestrictedQuery::new(&sql) + .expect_err("Should have validated timeseries name"); + } + } + + #[test] + fn test_allow_subqueries() { + assert!(RestrictedQuery::new("SELECT * FROM (SELECT 1);").is_ok()); + } +} diff --git a/oximeter/db/src/sql/virtual_table.rs b/oximeter/db/src/sql/virtual_table.rs new file mode 100644 index 00000000000..535dfb64e8f --- /dev/null +++ b/oximeter/db/src/sql/virtual_table.rs @@ -0,0 +1,380 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. 
+ +// Copyright 2023 Oxide Computer Company + +use crate::sql::Error; +use crate::TimeseriesName; +use crate::TimeseriesSchema; +use sqlparser::ast::Expr; +use sqlparser::ast::Ident; +use sqlparser::ast::Query; +use sqlparser::ast::Select; +use sqlparser::ast::SelectItem; +use sqlparser::ast::SetExpr; +use sqlparser::ast::Statement; +use sqlparser::ast::TableFactor; +use sqlparser::ast::WildcardAdditionalOptions; +use sqlparser::dialect::AnsiDialect; +use sqlparser::dialect::Dialect; +use sqlparser::parser::Parser; + +// I think just iterate through any SELECT query +// +// - Find any concrete table +// - Verify that it is a timeseries name +// - Insert the full JOIN that builds the virtual table. I think this needs to +// be reinserted as a SetExpr::Query(Box(the_new_query)). +// - Keep both original SQL and the translated (as a new query?) + + +/// Validation that can be done independently of the timeseries schema. +#[derive(Clone, Debug)] +pub struct ParsedQuery { + sql: String, + query: Query, + timeseries_name: TimeseriesName, +} + +impl ParsedQuery { + pub fn new(sql: impl AsRef<str>) -> Result<Self, Error> { + let sql = sql.as_ref(); + let statements = Parser::parse_sql(&OxdbDialect, sql)?; + if statements.len() != 1 { + return unsupported!("Only a single SQL statement is supported"); + } + let statement = statements.into_iter().next().unwrap(); + let Statement::Query(query) = statement else { + return unsupported!("Statement must be a SELECT query"); + }; + if query.with.is_some() + || !query.order_by.is_empty() + || query.offset.is_some() + || query.limit.is_some() + || query.fetch.is_some() + || !query.locks.is_empty() + { + return unsupported!( + "CTEs, ORDER BY, OFFSET/LIMIT, FETCH and LOCKS \ + are not currently supported" + ); + } + let SetExpr::Select(select) = &*query.body else { + return unsupported!("Only SELECT queries are currently supported"); + }; + if select.from.len() != 1 { + return unsupported!("Query must select from a single timeseries"); + } + let from = &select.from[0]; + if !from.joins.is_empty() { + return unsupported!("Joins are not supported"); + } + let TableFactor::Table { name, alias, args, with_hints } = &from.relation else { + return unsupported!("Query must select from a single named table"); + }; + if alias.is_some() || args.is_some() || !with_hints.is_empty() { + return unsupported!( + "Table aliases, functions, or hints are not supported" + ); + } + if name.0.len() != 1 { + return unsupported!("Query must select from a single named table"); + } + let timeseries_name = + name.0[0].value.as_str().parse().map_err(|_| { + Error::InvalidTimeseriesName { name: name.0[0].value.clone() } + })?; + Ok(Self { sql: sql.to_string(), query: *query, timeseries_name }) + } + + fn select(&self) -> &Select { + let SetExpr::Select(select) = &*self.query.body else { + unreachable!(); + }; + select + } +} + +/// Validated to be an acceptable query against the virtual table implied by the +/// timeseries. +#[derive(Clone, Debug)] +pub struct VirtualTableQuery { + schema: TimeseriesSchema, + parsed_query: ParsedQuery, +} + +impl VirtualTableQuery { + pub fn new( + schema: &TimeseriesSchema, + sql: impl AsRef<str>, + ) -> Result<Self, Error> { + let parsed_query = ParsedQuery::new(sql)?; + + // The table must be just the timeseries name right now.
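+ // + // E.g. (a sketch, assuming a schema named `foo:bar`): `SELECT * FROM "foo:bar"` is accepted, while any other table name, such as in `SELECT * FROM "foo:baz"`, fails below with `TableInSelectIsNotTimeseries`.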
+ let timeseries_name = schema.timeseries_name.as_str(); + if parsed_query.timeseries_name != timeseries_name { + return Err(Error::TableInSelectIsNotTimeseries { + table_name: parsed_query.timeseries_name.to_string(), + timeseries_name: timeseries_name.to_string(), + }); + } + + // Selection (WHERE) clauses + // + // Basically just `WHERE field_name <op> field_value` for now. + // + // I think the result of this parsing needs to tell us which part of the + // query the filter applies to. In other words, can this be pushed down + // into the individual table query, or is it on the resulting joined + // table? + // + // E.g. + // + // where name = 'a' or name = 'b' + // + // Could be pushed into an individual field query like this: + // + // select ... from fields_string where field_name = 'name' and + // field_value = 'a' or field_value = 'b' + // + // Whereas this: + // + // where name = 'a' or (name = 'b' and count > 0) + // + // Could not be pushed. That would have to be done like this: + // + // select ... + // from fields_string + // where name = 'a' + // + // + // Wait...I don't think this can be expressed as an inner join. It's not + // a join, it's a union. + // + // a ^ b -> a inner join b on timeseries_key + // a v b -> a union b. + // + // OK, forget it. What if we build _the entire virtual table_ by joining + // all the relevant rows on the timeseries key, and do all the actual + // work of the query on that joined in-memory result? + // + // I.e., + // + // with `timeseries:name` as ( + // select key, fields_1.field_value as field_name, fields_2.field_value as field_name, ... + // from ( + // select timeseries_key, field_value as `field_name` from fields_1 inner join + // select timeseries_key, field_value as `field_name` from fields_2 on (fields_1.timeseries_key = fields_2.timeseries_key) inner join + // select timeseries_key, field_value as `field_name` from fields_3 on (fields_2.timeseries_key = fields_3.timeseries_key) inner join + // ... + // select timeseries_key, timestamp, value from measurements inner join fields_n on (fields_n.timeseries_key = meas.timeseries_key) + // ) + // ) + // . + // + // This basically means we can support any operation right now. It's + // also terribly inefficient, since it reads the entire table (restricted + // to the timeseries _schema_, at least). + // + // OTOH, ClickHouse doesn't really seem to avoid that: no matter _how_ + // you filter things, it always seems to read the entire table anyway. + // + // This also means that making things more efficient later shouldn't + // remove support for queries we accept now. Which would be hard, since + // you'd need to do the logic I mention above, of figuring out what can be + // pushed down and what can't. + // + // So, tradeoffs: + // + // - support basically any SQL now, so we'd want to avoid removing that + // support in the future. + // - easy to write + // - really inefficient, may run into memory limitations.
+ // + Ok(Self { schema: schema.clone(), parsed_query }) + } +} + +#[cfg(test)] +mod tests { + use super::Error; + use super::TimeseriesSchema; + use super::VirtualTableQuery; + use chrono::Utc; + use oximeter::DatumType; + + #[test] + fn test_virtual_table_query_no_fields() { + let schema = TimeseriesSchema { + timeseries_name: "foo:bar".parse().unwrap(), + field_schema: vec![], + datum_type: DatumType::I64, + created: Utc::now(), + }; + let sql = "select * from \"foo:bar\" where timestamp > now()"; + VirtualTableQuery::new(&schema, sql).unwrap(); + + let sql = "select timestamp from \"foo:bar\" where timestamp > now()"; + VirtualTableQuery::new(&schema, sql).unwrap(); + + let sql = "select timestamp, timeseries_key, value from \"foo:bar\" where timestamp > now()"; + VirtualTableQuery::new(&schema, sql).unwrap(); + + let sql = "select * from \"foo:baz\" where timestamp > now()"; + assert!(matches!( + VirtualTableQuery::new(&schema, sql).unwrap_err(), + Error::TableInSelectIsNotTimeseries { .. } + )); + } +} + +/* + * WITH `http_service:request_latency_histogram` AS + ( + SELECT + filter_name.timeseries_key AS timeseries_key, + filter_name.field_value AS name, + filter_method.field_value AS method, + filter_route.field_value AS route, + filter_status_code.field_value AS status_code, + meas.start_time AS start_time, + meas.timestamp AS timestamp + FROM + ( + SELECT + timeseries_key, + field_value + FROM fields_string + WHERE (timeseries_name = 'http_service:request_latency_histogram') AND (field_name = 'method') + ) AS filter_method + INNER JOIN + ( + SELECT + timeseries_key, + field_value + FROM fields_string + WHERE (timeseries_name = 'http_service:request_latency_histogram') AND (field_name = 'name') + ) AS filter_name ON filter_method.timeseries_key = filter_name.timeseries_key + INNER JOIN + ( + SELECT + timeseries_key, + field_value + FROM fields_string + WHERE (timeseries_name = 'http_service:request_latency_histogram') AND (field_name = 'route') + ) AS filter_route ON filter_name.timeseries_key = filter_route.timeseries_key + INNER JOIN + ( + SELECT + timeseries_key, + field_value + FROM fields_i64 + WHERE (timeseries_name = 'http_service:request_latency_histogram') AND (field_name = 'status_code') + ) AS filter_status_code ON filter_route.timeseries_key = filter_status_code.timeseries_key + INNER JOIN + ( + SELECT + timeseries_key, + start_time, + timestamp + FROM measurements_histogramf64 + WHERE timeseries_name = 'http_service:request_latency_histogram' + ) AS meas ON filter_status_code.timeseries_key = meas.timeseries_key + ) +SELECT * +FROM `http_service:request_latency_histogram` +WHERE (route = '/v1/disks') AND (status_code > 200) AND ((timestamp >= '2023-07-27 08:05:00') AND (timestamp <= '2023-07-27 08:10:00')) +*/ + +// So if we do this, we can generate it directly from the schema, without any +// reference to the SQL really. Or, we just generate the WITH `timeseries_name` +// AS (...... massive join ....). We don't even need to parse it..., just splat +// it in at the end. +// +// Well, we probably do need to parse it to make sure there are no CTEs of its +// own. But they _could_ issue joins across timeseries, I guess. +// +// select * from ts0 asof join ts1 on timestamp, kinda thing. +// +// subqueries? +// +// select * from (select timestamp from ts0) as ts0 join (select timestamp from +// ts1) on timestamp; +// +// For every _timeseries_ mentioned in the main query, generate a CTE for it +// with that name +// +// with +// ts0 as (...), +// ts1 as (...)
+// select ts0 asof join ts1 on timestamp; +// +// +// +// So, there are a couple of obvious efficiencies. +// +// - If the projection doesn't include anything from the measurements table, +// then we don't need to join. I'm not sure whether we want to "replicate" the +// actual projected rows to match what one _would_ get if one selected the full +// table though. I.e., are we always doing `select distinct field_name...`? +// +// - I think we should be able to rewrite the join to put the meas table first, +// which from my reading should be slightly more performant. +// +// I think there might be a few good initial optimizations. +// +// - If the query refers just to measurements or fields... +// +// select min(timestamp) from foo:bar; This seems really useless. +// select f0, f1 from foo:bar; Also seems useless? +// +// select distinct seems useful against just the fields, kind of listing the +// actual, concrete timeseries +// +// - We should probably split any real query into two, just like we currently +// do: a field query to get the keys, and then the big join that generates the +// virtual table, but limited to those keys (see the sketch at the end of +// these notes): +// +// select (timeseries_key from (all fields joined)) where (field_filters); +// +// <- get back keys +// +// select (big join mess, limited to keys on each table) inner join measurements +// (limited to keys) on timeseries_key +// +// - Maybe reduce the memory usage limit for every query, to avoid pathological +// cases? +// +// +// - What do we do with queries like: +// +// select * from foo:bar where (f0 = 0 and timestamp < t0) or (f1 = 0 and +// timestamp > t0) +// +// I think the _only_ way you can express this is the full join followed by +// filtering. Although we can still pull out those field queries first, and then +// rewrite the join. +// +// -- +// +// Does this make the likely things easy? +// +// - accumulate / group by: select f0, f1, ..., max(datum) from cumulative_ts +// group by f0, f1, ...; +// +// It's actually hard-ish, or at least annoying to write out. I don't know how +// that will perform, but probably badly. Could rewrite it differently maybe. +// +// with max_datum as (select timeseries_name, timeseries_key, max(timestamp), +// max(datum) from meas where timeseries_name = 'a:b' group by +// timeseries_name, timeseries_key). But how do we get start_time? Some OVER +// clause? +// +// +// The short answer is, there are tons of opportunities for improvement. +// +// Another one: if we do a query that doesn't actually depend on the details of +// the timeseries, e.g., select count() from timeseries, don't do the join; do +// something else.
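+// +// A rough sketch of the two-query split mentioned above (hypothetical field +// names `f0`/`f1`; not a committed design): +// +// -- Phase 1: query the field tables alone to find the matching keys. +// SELECT timeseries_key FROM fields_string +// WHERE field_name = 'f0' AND field_value = 'a'; +// +// -- Phase 2: run the big join that builds the virtual table, with each +// -- subquery restricted by `WHERE timeseries_key IN (...)`, using the keys +// -- returned by phase 1.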
diff --git a/oximeter/db/test-output/sql/00/query.sql b/oximeter/db/test-output/sql/00/query.sql new file mode 100644 index 00000000000..e0ac49d1ecf --- /dev/null +++ b/oximeter/db/test-output/sql/00/query.sql @@ -0,0 +1 @@ +SELECT 1; diff --git a/oximeter/db/test-output/sql/00/result.txt b/oximeter/db/test-output/sql/00/result.txt new file mode 100644 index 00000000000..925e298e866 --- /dev/null +++ b/oximeter/db/test-output/sql/00/result.txt @@ -0,0 +1,10 @@ +{ + "column_names": [ + "1" + ], + "rows": [ + [ + 1 + ] + ] +} \ No newline at end of file diff --git a/oximeter/db/test-output/sql/01/query.sql b/oximeter/db/test-output/sql/01/query.sql new file mode 100644 index 00000000000..f3e5549e7f8 --- /dev/null +++ b/oximeter/db/test-output/sql/01/query.sql @@ -0,0 +1 @@ +SELECT 1 + 1 AS total; diff --git a/oximeter/db/test-output/sql/01/result.txt b/oximeter/db/test-output/sql/01/result.txt new file mode 100644 index 00000000000..ee17f9993e6 --- /dev/null +++ b/oximeter/db/test-output/sql/01/result.txt @@ -0,0 +1,10 @@ +{ + "column_names": [ + "total" + ], + "rows": [ + [ + 2 + ] + ] +} \ No newline at end of file diff --git a/oximeter/db/test-output/sql/02/query.sql b/oximeter/db/test-output/sql/02/query.sql new file mode 100644 index 00000000000..cd16a883aa3 --- /dev/null +++ b/oximeter/db/test-output/sql/02/query.sql @@ -0,0 +1 @@ +SELECT count() FROM service:request_latency WHERE route = '/a'; diff --git a/oximeter/db/test-output/sql/02/result.txt b/oximeter/db/test-output/sql/02/result.txt new file mode 100644 index 00000000000..7bae246ae86 --- /dev/null +++ b/oximeter/db/test-output/sql/02/result.txt @@ -0,0 +1,10 @@ +{ + "column_names": [ + "count()" + ], + "rows": [ + [ + 12 + ] + ] +} \ No newline at end of file diff --git a/oximeter/db/test-output/sql/03/query.sql b/oximeter/db/test-output/sql/03/query.sql new file mode 100644 index 00000000000..9d043eda5b4 --- /dev/null +++ b/oximeter/db/test-output/sql/03/query.sql @@ -0,0 +1,4 @@ +SELECT + count() AS total +FROM service:request_latency +GROUP BY name, id, route, method, status_code; diff --git a/oximeter/db/test-output/sql/03/result.txt b/oximeter/db/test-output/sql/03/result.txt new file mode 100644 index 00000000000..246b8a224ee --- /dev/null +++ b/oximeter/db/test-output/sql/03/result.txt @@ -0,0 +1,43 @@ +{ + "column_names": [ + "total" + ], + "rows": [ + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ] + ] +} \ No newline at end of file diff --git a/oximeter/db/test-output/sql/04/query.sql b/oximeter/db/test-output/sql/04/query.sql new file mode 100644 index 00000000000..a276475a0dc --- /dev/null +++ b/oximeter/db/test-output/sql/04/query.sql @@ -0,0 +1,5 @@ +SELECT + timeseries_key, + count() AS total +FROM service:request_latency +GROUP BY timeseries_key; diff --git a/oximeter/db/test-output/sql/04/result.txt b/oximeter/db/test-output/sql/04/result.txt new file mode 100644 index 00000000000..4eca1fd93df --- /dev/null +++ b/oximeter/db/test-output/sql/04/result.txt @@ -0,0 +1,56 @@ +{ + "column_names": [ + "timeseries_key", + "total" + ], + "rows": [ + [ + 1249464505628069370, + 2 + ], + [ + 1201872630192423018, + 2 + ], + [ + 1490072383288995413, + 2 + ], + [ + 4845785484328932020, + 2 + ], + [ + 16162802647654680800, + 2 + ], + [ + 9308844330114997943, + 2 + ], + [ + 5233273748839477731, + 2 + ], + [ + 12759963114254845848, + 2 + ], + [ + 8677807063017961056, + 2 + ], + [ + 17069599562714970297, + 2 + ], + [ + 
1477351355909737762, + 2 + ], + [ + 16473879070749258520, + 2 + ] + ] +} \ No newline at end of file diff --git a/oximeter/oximeter/src/types.rs b/oximeter/oximeter/src/types.rs index 0cc3299ec40..cddd956c5a3 100644 --- a/oximeter/oximeter/src/types.rs +++ b/oximeter/oximeter/src/types.rs @@ -329,6 +329,11 @@ impl DatumType { | DatumType::HistogramF64 ) } + + /// Return `true` if this datum type is a histogram, and `false` otherwise. + pub const fn is_histogram(&self) -> bool { + matches!(self, DatumType::HistogramF64 | DatumType::HistogramI64) + } } impl std::fmt::Display for DatumType { diff --git a/wicket-dbg/Cargo.toml b/wicket-dbg/Cargo.toml index d546c41e44f..9b4ea49c9dd 100644 --- a/wicket-dbg/Cargo.toml +++ b/wicket-dbg/Cargo.toml @@ -21,7 +21,7 @@ tokio = { workspace = true, features = ["full"] } wicket.workspace = true # used only by wicket-dbg binary -reedline = "0.25.0" +reedline.workspace = true omicron-workspace-hack.workspace = true [[bin]] diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 621f1a8a998..48dc9c4d3e3 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -31,7 +31,6 @@ console = { version = "0.15.7" } const-oid = { version = "0.9.5", default-features = false, features = ["db", "std"] } crossbeam-epoch = { version = "0.9.15" } crossbeam-utils = { version = "0.8.16" } -crossterm = { version = "0.27.0", features = ["event-stream", "serde"] } crypto-common = { version = "0.1.6", default-features = false, features = ["getrandom", "std"] } diesel = { version = "2.1.3", features = ["chrono", "i-implement-a-third-party-backend-and-opt-into-breaking-changes", "network-address", "postgres", "r2d2", "serde_json", "uuid"] } digest = { version = "0.10.7", features = ["mac", "oid", "std"] } @@ -61,6 +60,7 @@ libc = { version = "0.2.150", features = ["extra_traits"] } log = { version = "0.4.20", default-features = false, features = ["std"] } managed = { version = "0.8.0", default-features = false, features = ["alloc", "map"] } memchr = { version = "2.6.3" } +nom = { version = "7.1.3" } num-bigint = { version = "0.4.4", features = ["rand"] } num-integer = { version = "0.1.45", features = ["i128"] } num-iter = { version = "0.1.43", default-features = false, features = ["i128"] } @@ -125,7 +125,6 @@ console = { version = "0.15.7" } const-oid = { version = "0.9.5", default-features = false, features = ["db", "std"] } crossbeam-epoch = { version = "0.9.15" } crossbeam-utils = { version = "0.8.16" } -crossterm = { version = "0.27.0", features = ["event-stream", "serde"] } crypto-common = { version = "0.1.6", default-features = false, features = ["getrandom", "std"] } diesel = { version = "2.1.3", features = ["chrono", "i-implement-a-third-party-backend-and-opt-into-breaking-changes", "network-address", "postgres", "r2d2", "serde_json", "uuid"] } digest = { version = "0.10.7", features = ["mac", "oid", "std"] } @@ -155,6 +154,7 @@ libc = { version = "0.2.150", features = ["extra_traits"] } log = { version = "0.4.20", default-features = false, features = ["std"] } managed = { version = "0.8.0", default-features = false, features = ["alloc", "map"] } memchr = { version = "2.6.3" } +nom = { version = "7.1.3" } num-bigint = { version = "0.4.4", features = ["rand"] } num-integer = { version = "0.1.45", features = ["i128"] } num-iter = { version = "0.1.43", default-features = false, features = ["i128"] } @@ -207,6 +207,7 @@ zip = { version = "0.6.6", default-features = false, features = ["bzip2", "defla bitflags-f595c2ba2a3f28df = { package = 
"bitflags", version = "2.4.0", default-features = false, features = ["std"] } hyper-rustls = { version = "0.24.2" } mio = { version = "0.8.8", features = ["net", "os-ext"] } +nix = { version = "0.26.2" } once_cell = { version = "1.18.0", features = ["unstable"] } rustix = { version = "0.38.9", features = ["fs", "termios"] } @@ -214,6 +215,7 @@ rustix = { version = "0.38.9", features = ["fs", "termios"] } bitflags-f595c2ba2a3f28df = { package = "bitflags", version = "2.4.0", default-features = false, features = ["std"] } hyper-rustls = { version = "0.24.2" } mio = { version = "0.8.8", features = ["net", "os-ext"] } +nix = { version = "0.26.2" } once_cell = { version = "1.18.0", features = ["unstable"] } rustix = { version = "0.38.9", features = ["fs", "termios"] } @@ -221,6 +223,7 @@ rustix = { version = "0.38.9", features = ["fs", "termios"] } bitflags-f595c2ba2a3f28df = { package = "bitflags", version = "2.4.0", default-features = false, features = ["std"] } hyper-rustls = { version = "0.24.2" } mio = { version = "0.8.8", features = ["net", "os-ext"] } +nix = { version = "0.26.2" } once_cell = { version = "1.18.0", features = ["unstable"] } rustix = { version = "0.38.9", features = ["fs", "termios"] } @@ -228,6 +231,7 @@ rustix = { version = "0.38.9", features = ["fs", "termios"] } bitflags-f595c2ba2a3f28df = { package = "bitflags", version = "2.4.0", default-features = false, features = ["std"] } hyper-rustls = { version = "0.24.2" } mio = { version = "0.8.8", features = ["net", "os-ext"] } +nix = { version = "0.26.2" } once_cell = { version = "1.18.0", features = ["unstable"] } rustix = { version = "0.38.9", features = ["fs", "termios"] } @@ -235,6 +239,7 @@ rustix = { version = "0.38.9", features = ["fs", "termios"] } bitflags-f595c2ba2a3f28df = { package = "bitflags", version = "2.4.0", default-features = false, features = ["std"] } hyper-rustls = { version = "0.24.2" } mio = { version = "0.8.8", features = ["net", "os-ext"] } +nix = { version = "0.26.2" } once_cell = { version = "1.18.0", features = ["unstable"] } rustix = { version = "0.38.9", features = ["fs", "termios"] } @@ -242,6 +247,7 @@ rustix = { version = "0.38.9", features = ["fs", "termios"] } bitflags-f595c2ba2a3f28df = { package = "bitflags", version = "2.4.0", default-features = false, features = ["std"] } hyper-rustls = { version = "0.24.2" } mio = { version = "0.8.8", features = ["net", "os-ext"] } +nix = { version = "0.26.2" } once_cell = { version = "1.18.0", features = ["unstable"] } rustix = { version = "0.38.9", features = ["fs", "termios"] } @@ -249,6 +255,7 @@ rustix = { version = "0.38.9", features = ["fs", "termios"] } bitflags-f595c2ba2a3f28df = { package = "bitflags", version = "2.4.0", default-features = false, features = ["std"] } hyper-rustls = { version = "0.24.2" } mio = { version = "0.8.8", features = ["net", "os-ext"] } +nix = { version = "0.26.2" } once_cell = { version = "1.18.0", features = ["unstable"] } rustix = { version = "0.38.9", features = ["fs", "termios"] } toml_datetime = { version = "0.6.5", default-features = false, features = ["serde"] } @@ -258,6 +265,7 @@ toml_edit-cdcf2f9584511fe6 = { package = "toml_edit", version = "0.19.15", featu bitflags-f595c2ba2a3f28df = { package = "bitflags", version = "2.4.0", default-features = false, features = ["std"] } hyper-rustls = { version = "0.24.2" } mio = { version = "0.8.8", features = ["net", "os-ext"] } +nix = { version = "0.26.2" } once_cell = { version = "1.18.0", features = ["unstable"] } rustix = { version = "0.38.9", features = ["fs", 
"termios"] } toml_datetime = { version = "0.6.5", default-features = false, features = ["serde"] }