diff --git a/Cargo.lock b/Cargo.lock index c379dcfbff8..e15d02defd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -211,6 +211,12 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" +[[package]] +name = "arrayvec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" + [[package]] name = "arrayvec" version = "0.7.4" @@ -555,7 +561,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c2f0dc9a68c6317d884f97cc36cf5a3d20ba14ce404227df55e1af708ab04bc" dependencies = [ "arrayref", - "arrayvec", + "arrayvec 0.7.4", "constant_time_eq 0.2.6", ] @@ -991,6 +997,17 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd7cc57abe963c6d3b9d8be5b06ba7c8957a930305ca90304f24ef040aa6f961" +[[package]] +name = "clipboard-win" +version = "4.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7191c27c2357d9b7ef96baac1773290d4ca63b24205b82a3fd8a0637afcf0362" +dependencies = [ + "error-code", + "str-buf", + "winapi", +] + [[package]] name = "cobs" version = "0.2.3" @@ -1245,6 +1262,23 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossterm" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a84cda67535339806297f1b331d6dd6320470d2a0fe65381e79ee9e156dd3d13" +dependencies = [ + "bitflags 1.3.2", + "crossterm_winapi", + "libc", + "mio", + "parking_lot 0.12.1", + "serde", + "signal-hook", + "signal-hook-mio", + "winapi", +] + [[package]] name = "crossterm" version = "0.27.0" @@ -1257,7 +1291,6 @@ dependencies = [ "libc", "mio", "parking_lot 0.12.1", - "serde", "signal-hook", "signal-hook-mio", "winapi", @@ -2149,6 +2182,16 @@ dependencies = [ "libc", ] +[[package]] +name = "error-code" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64f18991e7bf11e7ffee451b5318b5c1a73c52d0d0ada6e5a3017c8c1ced6a21" +dependencies = [ + "libc", + "str-buf", +] + [[package]] name = "expectorate" version = "1.1.0" @@ -2535,7 +2578,7 @@ dependencies = [ "hubpack 0.1.2", "hubtools", "lru-cache", - "nix", + "nix 0.26.2", "once_cell", "paste", "serde", @@ -4251,6 +4294,17 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags 1.3.2", + "cfg-if", + "libc", +] + [[package]] name = "nodrop" version = "0.1.14" @@ -4983,7 +5037,6 @@ dependencies = [ "const-oid", "crossbeam-epoch", "crossbeam-utils", - "crossterm", "crypto-common", "der", "diesel", @@ -5020,6 +5073,7 @@ dependencies = [ "managed", "memchr", "mio", + "nom", "num-bigint", "num-integer", "num-iter", @@ -5389,14 +5443,18 @@ dependencies = [ "clap 4.4.3", "dropshot", "expectorate", + "futures", "highway", + "indexmap 2.1.0", "itertools 0.12.0", "omicron-common", "omicron-test-utils", "omicron-workspace-hack", "oximeter", + "reedline", "regex", "reqwest", + "rustyline", "schemars", "serde", "serde_json", @@ -5404,7 +5462,10 @@ dependencies = [ "slog-async", "slog-dtrace", "slog-term", + "sqlformat", + "sqlparser", "strum", + "tabled", "tempfile", "thiserror", "tokio", @@ -6407,7 +6468,7 @@ checksum = "2e2e4cd95294a85c3b4446e63ef054eea43e0205b1fd60120c16b74ff7ff96ad" dependencies = [ "bitflags 2.4.0", "cassowary", - "crossterm", + "crossterm 0.27.0", "indoc 2.0.3", "itertools 0.11.0", "paste", @@ -6497,12 +6558,12 @@ dependencies = [ [[package]] name = "reedline" -version = "0.26.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0a093a20a6c473247c2e9971aaf4cedf9041bcd3f444dc7fad667d3b6b7a5fd" +checksum = "c2fde955d11817fdcb79d703932fb6b473192cb36b6a92ba21f7f4ac0513374e" dependencies = [ "chrono", - "crossterm", + "crossterm 0.26.1", "fd-lock", "itertools 0.10.5", "nu-ansi-term", @@ -7037,6 +7098,29 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "rustyline" +version = "12.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "994eca4bca05c87e86e15d90fc7a91d1be64b4482b38cb2d27474568fe7c9db9" +dependencies = [ + "bitflags 2.4.0", + "cfg-if", + "clipboard-win", + "fd-lock", + "home", + "libc", + "log", + "memchr", + "nix 0.26.4", + "radix_trie", + "scopeguard", + "unicode-segmentation", + "unicode-width", + "utf8parse", + "winapi", +] + [[package]] name = "ryu" version = "1.0.15" @@ -7931,6 +8015,38 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "sqlformat" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce81b7bd7c4493975347ef60d8c7e8b742d4694f4c49f93e0a12ea263938176c" +dependencies = [ + "itertools 0.12.0", + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlparser" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eaa1e88e78d2c2460d78b7dc3f0c08dbb606ab4222f9aff36f420d36e307d87" +dependencies = [ + "log", + "sqlparser_derive", +] + +[[package]] +name = "sqlparser_derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -7965,6 +8081,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "str-buf" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e08d8363704e6c71fc928674353e6b7c23dcea9d82d7012c8faf2a3a025f8d0" + [[package]] name = "string_cache" version = "0.8.7" @@ -7991,9 +8113,9 @@ dependencies = [ [[package]] name = "strip-ansi-escapes" -version = "0.2.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55ff8ef943b384c414f54aefa961dd2bd853add74ec75e7ac74cf91dba62bcfa" +checksum = "011cbb39cf7c1f62871aea3cc46e5817b0937b49e9447370c93cacbe93a766d8" dependencies = [ "vte", ] @@ -9095,6 +9217,12 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "universal-hash" version = "0.5.1" @@ -9291,10 +9419,11 @@ dependencies = [ [[package]] name = "vte" -version = "0.11.1" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5022b5fbf9407086c180e9557be968742d839e68346af7792b8592489732197" +checksum = "6cbce692ab4ca2f1f3047fcf732430249c0e971bfdd2b234cf2c47ad93af5983" dependencies = [ + "arrayvec 0.5.2", "utf8parse", "vte_generate_state_changes", ] @@ -9484,7 +9613,7 @@ dependencies = [ "camino", "ciborium", "clap 4.4.3", - "crossterm", + "crossterm 0.27.0", "futures", "humantime", "indexmap 2.1.0", @@ -9545,7 +9674,7 @@ dependencies = [ "camino", "ciborium", "clap 4.4.3", - "crossterm", + "crossterm 0.27.0", "omicron-workspace-hack", "reedline", "serde", diff --git a/Cargo.toml b/Cargo.toml index 5591dcebc93..aa27f61a71e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -301,6 +301,7 @@ rand = "0.8.5" ratatui = "0.23.0" rayon = "1.8" rcgen = "0.11.3" +reedline = "0.22.0" ref-cast = "1.0" regex = "1.10.2" regress = "0.7.1" @@ -310,6 +311,7 @@ rpassword = "7.3.1" rstest = "0.18.2" rustfmt-wrapper = "0.2" rustls = "0.21.9" +rustyline = "12.0.0" samael = { git = "https://github.com/njaremko/samael", features = ["xmlsec"], branch = "master" } schemars = "0.8.12" secrecy = "0.8.0" @@ -343,6 +345,7 @@ sp-sim = { path = "sp-sim" } sprockets-common = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" } sprockets-host = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" } sprockets-rot = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" } +sqlparser = { version = "0.36.1", features = [ "visitor" ] } static_assertions = "1.1.0" # Please do not change the Steno version to a Git dependency. It makes it # harder than expected to make breaking changes (even if you specify a specific diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index 4d53869d0dc..99985a3b809 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -9,27 +9,46 @@ license = "MPL-2.0" anyhow.workspace = true async-trait.workspace = true bcs.workspace = true -bytes = { workspace = true, features = [ "serde" ] } camino.workspace = true chrono.workspace = true clap.workspace = true dropshot.workspace = true +futures.workspace = true highway.workspace = true +indexmap.workspace = true omicron-common.workspace = true +omicron-workspace-hack.workspace = true oximeter.workspace = true +reedline.workspace = true regex.workspace = true -reqwest = { workspace = true, features = [ "json" ] } -schemars = { workspace = true, features = [ "uuid1", "bytes", "chrono" ] } +rustyline.workspace = true serde.workspace = true serde_json.workspace = true slog.workspace = true slog-async.workspace = true slog-term.workspace = true +sqlparser.workspace = true +sqlformat = "0.2.2" +tabled.workspace = true thiserror.workspace = true -tokio = { workspace = true, features = [ "rt-multi-thread", "macros" ] } usdt.workspace = true uuid.workspace = true -omicron-workspace-hack.workspace = true + +[dependencies.bytes] +workspace = true +features = [ "serde" ] + +[dependencies.reqwest] +workspace = true +features = [ "json" ] + +[dependencies.schemars] +workspace = true +features = [ "uuid1", "bytes", "chrono" ] + +[dependencies.tokio] +workspace = true +features = [ "rt-multi-thread", "macros" ] [dev-dependencies] expectorate.workspace = true diff --git a/oximeter/db/README-oxdb-sql.md b/oximeter/db/README-oxdb-sql.md new file mode 100644 index 00000000000..8ebecdd409b --- /dev/null +++ b/oximeter/db/README-oxdb-sql.md @@ -0,0 +1,219 @@ +# `oxdb sql` + +This is a short how-to for using SQL to query timeseries. If you're eager to get +started, find a ClickHouse server with `oximeter` data, and run: + +```console +oxdb --address $CLICKHOUSE_ADDR sql +``` + +You can use `help` to get a help menu on the CLI, or run `\l` to list available +timeseries to start querying. + +## `oximeter` overview + +In general, `oximeter`'s architecture and data model are laid out in RFDs 161 +and 162. These provide a good detailed look at the system. + +### Terminology + +`oximeter` is the subsystem for describing, collecting, and storing telemetry +data from the Oxide rack. Software components make data available to an +`oximeter` collector in the form of _samples_, which are timestamped datapoints +from a single timeseries. + +Timeseries are named for their _target_, the component being measured or +monitored, and the _metric_, the measured feature or aspect of the target. The +timeseries name is derived as `target_name:metric_name`. The target and metric +can both have name-value pairs called _fields_, and the metric additionally has +a _measurement_, the actual measured value. Both are strongly typed. + +### Data normalization + +As samples are collected, `oximeter` normalizes them before storing in +ClickHouse. The database consists of a set of tables for fields and +measurements, with each _type_ stored in a different table. For fields, the name +of the field is also stored; for measurements, the timestamp and actual datum +are stored. Additionally, one table stores all the received _timeseries schema_, +which describes the name, fields, and measurement types for each timeseries. + +Normalizing the tables has many benefits. Less duplicated data is stored; +simpler, more static table arrangements; better compression; and more. It does +have drawbacks. Querying becomes especially tricky, because one needs to join +many tables together the reconstitute the original samples. This is exacerbated +by ClickHouse's lack of unique primary keys, which means we need to generate a +tag used to associated records from a single timeseries. These are called +_timeseries keys_, and are just hashes computed when a sample is received. + +## Oximeter SQL + +While writing the full set of join expressions needed to denormalize samples is +not very human-friendly, it _is_ relatively easy to generate these in code. +Using the stored timeseries schema and timeseries keys, one can write a (huge) +join expression that results in the full timeseries _as if_ it were a real table +in the database. `oxdb sql` generates this expression, and then runs whatever +query the user supplied on _the resulting in-memory table_. + +### Basic commands + +After starting the SQL shell with `oxdb sql`, one can run the following basic +operations: + +- `\h` or `help` will print a _help_ menu +- `\l` will _list_ all available timeseries by name +- `\d ` will _describe_ the schema of a single named timeseries +- `\f` will list supported ClickHouse functions and `\f ` will print + more details about the function and its usage + +### SQL + +In general, normal ANSI SQL is supported. Instead of _table_, however, one +queries against a _timeseries_. For example: + +```sql +SELECT count() FROM physical_data_link:bytes_received; +``` + +This will return the total number of samples in the timeseries representing the +number of bytes received on an Ethernet data link on a Gimlet. Here are the +available fields: + +```console +0x〉\d physical_data_link:bytes_received + hostname | link_name | rack_id | serial | sled_id | timestamp | start_time | datum +----------+-----------+---------+--------+---------+------------+------------+--------------- + String | String | Uuid | String | Uuid | DateTime64 | DateTime64 | CumulativeU64 +``` + +Any of these fields can be queried, including aggregations, groupings, etc. + +```console +0x〉select min(timestamp), max(timestamp) from physical_data_link:bytes_received; + + min(timestamp) | max(timestamp) +---------------------------------+--------------------------------- + "2023-11-09 04:24:53.284336528" | "2023-11-09 22:12:58.986751414" + +Metadata + Query ID: 66e68db5-8792-4e48-af2d-e5a2a117ab0d + Result rows: 1 + Time: 72.371047ms + Read: 19736 rows (1292186 bytes) + +``` + +or + +```console +0x〉select distinct route from http_service:request_latency_histogram where name = 'nexus-internal'; + + route +------------------------------------------------------------------------------------------------- + "/sled-agents/a8c6432e-338f-4839-bfb5-297112b39803/zpools/d462a7f7-b628-40fe-80ff-4e4189e2d62b" + "/metrics/producers" + "/sled-agents/a8c6432e-338f-4839-bfb5-297112b39803/zpools/616b26df-e62a-4c68-b506-f4a923d8aaf7" + "/metrics/collect/1e9a8843-2327-4d59-94b2-14f909b6f207" + "/sled-agents/a8c6432e-338f-4839-bfb5-297112b39803/zpools/f4b4dc87-ab46-49fb-a4b4-d361ae214c03" + "/sled-agents/a8c6432e-338f-4839-bfb5-297112b39803/zpools/a462a7f7-b628-40fe-80ff-4e4189e2d62b" + "/metrics/collect/4b795850-8320-4b7d-9048-aa277653ab8e" + "/sled-agents/a8c6432e-338f-4839-bfb5-297112b39803/zpools/14b4dc87-ab46-49fb-a4b4-d361ae214c03" + "/sled-agents/a8c6432e-338f-4839-bfb5-297112b39803/zpools/e4b4dc87-ab46-49fb-a4b4-d361ae214c03" + "/sled-agents/a8c6432e-338f-4839-bfb5-297112b39803/zpools/cd70d7f6-2354-4bf2-8012-55bf9eaf7930" + "/sled-agents/a8c6432e-338f-4839-bfb5-297112b39803/zpools/31bd71cd-4736-4a12-a387-9b74b050396f" + "/sled-agents/a8c6432e-338f-4839-bfb5-297112b39803/zpools/b462a7f7-b628-40fe-80ff-4e4189e2d62b" + "/sled-agents/a8c6432e-338f-4839-bfb5-297112b39803/zpools/24b4dc87-ab46-49fb-a4b4-d361ae214c03" + "/physical-disk" + "/sled-agents/a8c6432e-338f-4839-bfb5-297112b39803" + "/sled-agents/a8c6432e-338f-4839-bfb5-297112b39803/zpools/ceb4461c-cf56-4719-ad3c-14430bfdfb60" + "/metrics/collect/5c4f4629-1325-4123-bdcd-01bc9a18d740" + "/metrics/collectors" + +Metadata + Query ID: 3107c7ca-6906-4ce9-9d57-19cf0c1d6c71 + Result rows: 18 + Time: 119.387667ms + Read: 206840 rows (14749196 bytes) + +``` + +or + +```console +0x〉select link_name, formatReadableSize(max(datum)) from physical_data_link:bytes_sent group by link_name; + + link_name | formatReadableSize(max(datum)) +-----------+-------------------------------- + "net0" | "5.96 MiB" + "net1" | "0.00 B" + +Metadata + Query ID: cd101b14-a91e-419b-b2d0-633047db219e + Result rows: 2 + Time: 56.036663ms + Read: 27025 rows (1558430 bytes) + +``` + +> Note the metadata at the bottom. The query ID is assigned by the server, which + also returns the number of rows / bytes read. The _time_ includes the server + processing time and the network time, and is usually dominated by the latter. + +### JOINs + +SQL joins are also supported, as long as they are either _inner joins_ or the +ClickHoouse-specific _asof join_. Inner joins are pretty standard, but `ASOF +JOIN` is unique and very useful. It provides a way to match up rows that do not +have an _exact_ equal in each table. As an example, we can use this to match up +metrics from different timeseries + +```console +0x〉select timestamp, datum as bytes_received, s.timestamp, s.datum as bytes_sent from physical_data_link:bytes_received asof join physical_data_link:bytes_sent as s using (link_name, timestamp) where link_name = 'net0' limit 10; + + timestamp | bytes_received | s.timestamp | bytes_sent +---------------------------------+----------------+---------------------------------+------------ + "2023-11-09 04:24:53.284336528" | 0 | "2023-11-09 04:24:53.284336528" | 10661 + "2023-11-09 04:25:06.960255374" | 1064 | "2023-11-09 04:25:06.960255374" | 11937 + "2023-11-09 04:25:16.962286001" | 3748 | "2023-11-09 04:25:16.962286001" | 15910 + "2023-11-09 04:25:26.964768912" | 5278 | "2023-11-09 04:25:26.964768912" | 18465 + "2023-11-09 04:25:36.966422345" | 7146 | "2023-11-09 04:25:36.966422345" | 24423 + "2023-11-09 04:25:46.969640057" | 8032 | "2023-11-09 04:25:46.969640057" | 25370 + "2023-11-09 04:25:57.589902294" | 8868 | "2023-11-09 04:25:57.589902294" | 26277 + "2023-11-09 04:26:07.590262491" | 13120 | "2023-11-09 04:26:07.590262491" | 30225 + "2023-11-09 04:26:17.592895364" | 14584 | "2023-11-09 04:26:17.592895364" | 31501 + "2023-11-09 04:26:27.594820340" | 15344 | "2023-11-09 04:26:27.594820340" | 32211 + +Metadata + Query ID: a13f3abf-9c57-4caa-bfc6-c1b26732d2ad + Result rows: 10 + Time: 124.670777ms + Read: 45858 rows (3331596 bytes) + +``` + +Note that these happen to have exactly the same timestamp, based on how they +were generated, but that need not be the case. + +## Warnings and caveats + +First this is a **prototype**. It is also designed for testing and +experimentation, and little or none of the product should expect this to work on +customer sites any time soon. + +Second, the abstraction here is pretty leaky. SQL expressions that might +normally work against a real table can easily fail here. Please file a bug if +you think something _should_ work. + +Last, and maybe most important, be aware of the resource limitations here. This +all works by constructing an _enormous_ joined table in memory. ClickHouse is +extremely fast, but it is relatively simplistic when it comes to query planning +and optimization. That means it will do exactly what the query says, including +trying to create tables much larger than the available memory. + +For the most part, the blast radius of those problems should be limited to the +ClickHouse zone itself. We also limit the total memory consumption of the +server, currently to 90% of the zone's memory. But since we don't limit the +_zone's_ memory, that's 90% of the physical memory, which is very large indeed. +If you're curious how a query will perform, it's probably a good idea to try it +out on a small subset of data, by adding a `LIMIT` clause or similar. You can +also run `oxdb sql --transform $QUERY_STRING` to print the full query that will +actually be executed on the server. diff --git a/oximeter/db/src/bin/oxdb.rs b/oximeter/db/src/bin/oxdb.rs index e14fdeb6a8e..43352d551ed 100644 --- a/oximeter/db/src/bin/oxdb.rs +++ b/oximeter/db/src/bin/oxdb.rs @@ -3,16 +3,27 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. //! Tool for developing against the Oximeter timeseries database, populating data and querying. -// Copyright 2021 Oxide Computer Company + +// Copyright 2023 Oxide Computer Company use anyhow::{bail, Context}; use chrono::{DateTime, Utc}; use clap::{Args, Parser}; +use dropshot::EmptyScanParams; +use dropshot::WhichPage; use oximeter::{ types::{Cumulative, Sample}, Metric, Target, }; +use oximeter_db::sql::function_allow_list; +use oximeter_db::QueryMetadata; +use oximeter_db::QueryResult; +use oximeter_db::Table; use oximeter_db::{query, Client, DbWrite}; +use reedline::DefaultPrompt; +use reedline::DefaultPromptSegment; +use reedline::Reedline; +use reedline::Signal; use slog::{debug, info, o, Drain, Level, Logger}; use std::net::IpAddr; use std::net::SocketAddr; @@ -138,6 +149,12 @@ enum Subcommand { #[clap(long, conflicts_with("end"), action)] end_exclusive: Option>, }, + + /// Enter a SQL shell for interactive querying. + Sql { + #[clap(flatten)] + opts: ShellOptions, + }, } async fn make_client( @@ -295,8 +312,285 @@ async fn query( Ok(()) } +fn print_basic_commands() { + println!("Basic commands:"); + println!(" \\?, \\h, help - Print this help"); + println!(" \\q, quit, exit, ^D - Exit the shell"); + println!(" \\l - List tables"); + println!(" \\d - Describe a table"); + println!( + " \\f - List or describe ClickHouse SQL functions" + ); + println!(); + println!("Or try entering a SQL `SELECT` statement"); +} + +async fn list_virtual_tables(client: &Client) -> anyhow::Result<()> { + let mut page = WhichPage::First(EmptyScanParams {}); + let limit = 100.try_into().unwrap(); + loop { + let results = client.timeseries_schema_list(&page, limit).await?; + for schema in results.items.iter() { + println!("{}", schema.timeseries_name); + } + if results.next_page.is_some() { + if let Some(last) = results.items.last() { + page = WhichPage::Next(last.timeseries_name.clone()); + } else { + return Ok(()); + } + } else { + return Ok(()); + } + } +} + +async fn describe_virtual_table( + client: &Client, + table: &str, +) -> anyhow::Result<()> { + match table.parse() { + Err(_) => println!("Invalid timeseries name: {table}"), + Ok(name) => { + if let Some(schema) = client.schema_for_timeseries(&name).await? { + let mut cols = + Vec::with_capacity(schema.field_schema.len() + 2); + let mut types = cols.clone(); + for field in schema.field_schema.iter() { + cols.push(field.name.clone()); + types.push(field.ty.to_string()); + } + cols.push("timestamp".into()); + types.push("DateTime64".into()); + + if schema.datum_type.is_histogram() { + cols.push("start_time".into()); + types.push("DateTime64".into()); + + cols.push("bins".into()); + types.push(format!( + "Array[{}]", + schema + .datum_type + .to_string() + .strip_prefix("Histogram") + .unwrap() + .to_lowercase(), + )); + + cols.push("counts".into()); + types.push("Array[u64]".into()); + } else if schema.datum_type.is_cumulative() { + cols.push("start_time".into()); + types.push("DateTime64".into()); + cols.push("datum".into()); + types.push(schema.datum_type.to_string()); + } else { + cols.push("datum".into()); + types.push(schema.datum_type.to_string()); + } + + let mut builder = tabled::builder::Builder::default(); + builder.set_header(cols); + builder.push_record(types); + println!( + "{}", + builder.build().with(tabled::settings::Style::psql()) + ); + } else { + println!("No such timeseries: {table}"); + } + } + } + Ok(()) +} + +#[derive(Clone, Debug, Args)] +struct ShellOptions { + /// Print query metadata. + #[clap(long = "metadata")] + print_metadata: bool, + /// Print the original SQL query. + #[clap(long = "original")] + print_original_query: bool, + /// Print the rewritten SQL query that is actually run on the DB. + #[clap(long = "rewritten")] + print_rewritten_query: bool, + /// Print the transformed query, but do not run it. + #[clap(long)] + transform: Option, +} + +impl Default for ShellOptions { + fn default() -> Self { + Self { + print_metadata: true, + print_original_query: false, + print_rewritten_query: false, + transform: None, + } + } +} + +fn list_supported_functions() { + println!("Subset of ClickHouse SQL functions currently supported"); + println!( + "See https://clickhouse.com/docs/en/sql-reference/functions for more" + ); + println!(); + for func in function_allow_list().iter() { + println!(" {func}"); + } +} + +fn show_supported_function(name: &str) { + if let Some(func) = function_allow_list().iter().find(|f| f.name == name) { + println!("{}", func.name); + println!(" {}", func.usage); + println!(" {}", func.description); + } else { + println!("No supported function '{name}'"); + } +} + +fn print_sql_query(query: &str) { + println!( + "{}", + sqlformat::format( + &query, + &sqlformat::QueryParams::None, + sqlformat::FormatOptions { uppercase: true, ..Default::default() } + ) + ); + println!(); +} + +fn print_query_metadata(table: &Table, metadata: &QueryMetadata) { + println!("Metadata"); + println!(" Query ID: {}", metadata.id); + println!(" Result rows: {}", table.rows.len()); + println!(" Time: {:?}", metadata.elapsed); + println!(" Read: {}\n", metadata.summary.read); +} + +async fn sql_shell( + address: IpAddr, + port: u16, + log: Logger, + opts: ShellOptions, +) -> anyhow::Result<()> { + let client = make_client(address, port, &log).await?; + + // A workaround to ensure the client has all available timeseries when the + // shell starts. + let dummy = "foo:bar".parse().unwrap(); + let _ = client.schema_for_timeseries(&dummy).await; + + // Possibly just transform the query, but do not execute it. + if let Some(query) = &opts.transform { + let transformed = client.transform_query(query).await?; + println!( + "{}", + sqlformat::format( + &transformed, + &sqlformat::QueryParams::None, + sqlformat::FormatOptions { + uppercase: true, + ..Default::default() + } + ) + ); + return Ok(()); + } + + let mut ed = Reedline::create(); + let prompt = DefaultPrompt::new( + DefaultPromptSegment::Basic("0x".to_string()), + DefaultPromptSegment::Empty, + ); + println!("Oximeter SQL shell"); + println!(); + print_basic_commands(); + loop { + let sig = ed.read_line(&prompt); + match sig { + Ok(Signal::Success(buf)) => { + let cmd = buf.as_str().trim(); + match cmd { + "" => continue, + "\\?" | "\\h" | "help" => print_basic_commands(), + "\\q" | "quit" | "exit" => return Ok(()), + "\\l" | "\\d" => list_virtual_tables(&client).await?, + _ => { + if let Some(table_name) = cmd.strip_prefix("\\d") { + if table_name.is_empty() { + list_virtual_tables(&client).await?; + } else { + describe_virtual_table( + &client, + table_name.trim().trim_end_matches(';'), + ) + .await?; + } + } else if let Some(func_name) = cmd.strip_prefix("\\f") + { + if func_name.is_empty() { + list_supported_functions(); + } else { + show_supported_function( + func_name.trim().trim_end_matches(';'), + ); + } + } else { + match client.query(&buf).await { + Err(e) => println!("Query failed: {e:#?}"), + Ok(QueryResult { + original_query, + rewritten_query, + metadata, + table, + }) => { + println!(); + let mut builder = + tabled::builder::Builder::default(); + builder.set_header(&table.column_names); + for row in table.rows.iter() { + builder.push_record( + row.iter().map(ToString::to_string), + ); + } + if opts.print_original_query { + print_sql_query(&original_query); + } + if opts.print_rewritten_query { + print_sql_query(&rewritten_query); + } + println!( + "{}\n", + builder.build().with( + tabled::settings::Style::psql() + ) + ); + if opts.print_metadata { + print_query_metadata(&table, &metadata); + } + } + } + } + } + } + } + Ok(Signal::CtrlD) => return Ok(()), + Ok(Signal::CtrlC) => continue, + err => println!("err: {err:?}"), + } + } +} + #[tokio::main] -async fn main() { +async fn main() -> anyhow::Result<()> { + usdt::register_probes().context("Failed to register USDT probes")?; + let args = OxDb::parse(); let decorator = slog_term::TermDecorator::new().build(); let drain = slog_term::FullFormat::new(decorator) @@ -308,12 +602,10 @@ async fn main() { match args.cmd { Subcommand::Describe => describe_data(), Subcommand::Populate { populate_args } => { - populate(args.address, args.port, log, populate_args) - .await - .unwrap(); + populate(args.address, args.port, log, populate_args).await? } Subcommand::Wipe => { - wipe_single_node_db(args.address, args.port, log).await.unwrap() + wipe_single_node_db(args.address, args.port, log).await? } Subcommand::Query { timeseries_name, @@ -342,8 +634,11 @@ async fn main() { start, end, ) - .await - .unwrap(); + .await?; + } + Subcommand::Sql { opts } => { + sql_shell(args.address, args.port, log, opts).await? } } + Ok(()) } diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index d295d0dcdfd..e7f7b6e1929 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -8,6 +8,7 @@ use crate::model; use crate::query; +use crate::sql::RestrictedQuery; use crate::Error; use crate::Metric; use crate::Target; @@ -22,9 +23,11 @@ use dropshot::EmptyScanParams; use dropshot::PaginationOrder; use dropshot::ResultsPage; use dropshot::WhichPage; +use indexmap::IndexMap; use oximeter::types::Sample; use regex::Regex; use regex::RegexBuilder; +use reqwest::header::HeaderMap; use slog::debug; use slog::error; use slog::info; @@ -41,6 +44,8 @@ use std::ops::Bound; use std::path::Path; use std::path::PathBuf; use std::sync::OnceLock; +use std::time::Duration; +use std::time::Instant; use tokio::fs; use tokio::sync::Mutex; use uuid::Uuid; @@ -51,6 +56,137 @@ mod probes { fn query__done(_: &usdt::UniqueId) {} } +/// A count of bytes / rows accessed during a query. +#[derive(Clone, Copy, Debug)] +pub struct IoCount { + pub bytes: u64, + pub rows: u64, +} + +impl std::fmt::Display for IoCount { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{} rows ({} bytes)", self.rows, self.bytes) + } +} + +/// Summary of the I/O and duration of a query. +#[derive(Clone, Copy, Debug, serde::Deserialize)] +#[serde(try_from = "serde_json::Value")] +pub struct QuerySummary { + /// The bytes and rows read by the query. + pub read: IoCount, + /// The bytes and rows written by the query. + pub written: IoCount, +} + +impl TryFrom for QuerySummary { + type Error = Error; + + fn try_from(j: serde_json::Value) -> Result { + use serde_json::Map; + use serde_json::Value; + use std::str::FromStr; + + let Value::Object(map) = j else { + return Err(Error::Database(String::from( + "Expected a JSON object for a metadata summary", + ))); + }; + + fn unpack_summary_value( + map: &Map, + key: &str, + ) -> Result + where + T: FromStr, + ::Err: std::error::Error, + { + let value = map.get(key).ok_or_else(|| { + Error::MissingHeaderKey { key: key.to_string() } + })?; + let Value::String(v) = value else { + return Err(Error::BadMetadata { + key: key.to_string(), + msg: String::from("Expected a string value"), + }); + }; + v.parse::().map_err(|e| Error::BadMetadata { + key: key.to_string(), + msg: e.to_string(), + }) + } + let rows_read: u64 = unpack_summary_value(&map, "read_rows")?; + let bytes_read: u64 = unpack_summary_value(&map, "read_bytes")?; + let rows_written: u64 = unpack_summary_value(&map, "written_rows")?; + let bytes_written: u64 = unpack_summary_value(&map, "written_bytes")?; + Ok(Self { + read: IoCount { bytes: bytes_read, rows: rows_read }, + written: IoCount { bytes: bytes_written, rows: rows_written }, + }) + } +} + +/// Basic metadata about the resource usage of a single SQL query. +#[derive(Clone, Copy, Debug)] +pub struct QueryMetadata { + /// The database-assigned query ID. + pub id: Uuid, + /// The total duration of the query (network plus execution). + pub elapsed: Duration, + /// Summary of the data read and written. + pub summary: QuerySummary, +} + +impl QueryMetadata { + fn from_headers( + elapsed: Duration, + headers: &HeaderMap, + ) -> Result { + fn get_header<'a>( + map: &'a HeaderMap, + key: &'a str, + ) -> Result<&'a str, Error> { + let hdr = map.get(key).ok_or_else(|| Error::MissingHeaderKey { + key: key.to_string(), + })?; + std::str::from_utf8(hdr.as_bytes()) + .map_err(|err| Error::Database(err.to_string())) + } + let summary = + serde_json::from_str(get_header(headers, "X-ClickHouse-Summary")?) + .map_err(|err| Error::Database(err.to_string()))?; + let id = get_header(headers, "X-ClickHouse-Query-Id")? + .parse() + .map_err(|err: uuid::Error| Error::Database(err.to_string()))?; + Ok(Self { id, elapsed, summary }) + } +} + +/// A tabular result from a SQL query against a timeseries. +#[derive(Clone, Debug, Default, serde::Serialize)] +pub struct Table { + /// The name of each column in the result set. + pub column_names: Vec, + /// The rows of the result set, one per column. + pub rows: Vec>, +} + +/// The full result of running a SQL query against a timeseries. +#[derive(Clone, Debug)] +pub struct QueryResult { + /// The query as written by the client. + pub original_query: String, + /// The rewritten query, run against the JOINed representation of the + /// timeseries. + /// + /// This is the query that is actually run in the database itself. + pub rewritten_query: String, + /// Metadata about the resource usage of the query. + pub metadata: QueryMetadata, + /// The result of the query, with column names and rows. + pub table: Table, +} + /// A `Client` to the ClickHouse metrics database. #[derive(Debug)] pub struct Client { @@ -89,6 +225,76 @@ impl Client { Ok(()) } + /// Transform a SQL query against a timeseries, but do not execute it. + pub async fn transform_query( + &self, + query: impl AsRef, + ) -> Result { + let restricted = RestrictedQuery::new(query.as_ref())?; + restricted.to_oximeter_sql(&*self.schema.lock().await) + } + + /// Run a SQL query against a timeseries. + pub async fn query( + &self, + query: impl AsRef, + ) -> Result { + let original_query = query.as_ref().trim_end_matches(';'); + let ox_sql = self.transform_query(original_query).await?; + let rewritten = format!("{ox_sql} FORMAT JSONEachRow"); + debug!( + self.log, + "rewrote restricted query"; + "original_sql" => &original_query, + "rewritten_sql" => &rewritten, + ); + let request = self + .client + .post(&self.url) + .query(&[ + ("output_format_json_quote_64bit_integers", "0"), + ("database", crate::DATABASE_NAME), + ]) + .body(rewritten.clone()); + let query_start = Instant::now(); + let response = handle_db_response( + request + .send() + .await + .map_err(|err| Error::DatabaseUnavailable(err.to_string()))?, + ) + .await?; + let metadata = QueryMetadata::from_headers( + query_start.elapsed(), + response.headers(), + )?; + let text = response.text().await.unwrap(); + let mut table = Table::default(); + for line in text.lines() { + let row = + serde_json::from_str::>( + line.trim(), + ) + .unwrap(); + if table.column_names.is_empty() { + table.column_names.extend(row.keys().cloned()) + } else { + assert!(table + .column_names + .iter() + .zip(row.keys()) + .all(|(k1, k2)| k1 == k2)); + } + table.rows.push(row.into_values().collect()); + } + Ok(QueryResult { + original_query: original_query.to_string(), + rewritten_query: rewritten, + metadata, + table, + }) + } + /// Select timeseries from criteria on the fields and start/end timestamps. pub async fn select_timeseries_with( &self, @@ -271,7 +477,7 @@ impl Client { ResultsPage::new(schema, &dropshot::EmptyScanParams {}, |schema, _| { schema.timeseries_name.clone() }) - .map_err(|e| Error::Database(e.to_string())) + .map_err(|err| Error::Database(err.to_string())) } /// Read the available schema versions in the provided directory. @@ -1181,13 +1387,14 @@ async fn handle_db_response( // NOTE: ClickHouse returns 404 for all errors (so far encountered). We pull the text from // the body if possible, which contains the actual error from the database. let body = response.text().await.unwrap_or_else(|e| e.to_string()); - Err(Error::Database(body)) + Err(Error::Database(format!("Query failed: {body}"))) } } #[cfg(test)] mod tests { use super::*; + use crate::model::OXIMETER_VERSION; use crate::query; use crate::query::field_table_name; use bytes::Bytes; @@ -4261,4 +4468,87 @@ mod tests { db.cleanup().await.expect("Failed to cleanup ClickHouse server"); logctx.cleanup_successful(); } + + #[tokio::test] + async fn test_sql_query_output() { + let logctx = test_setup_log("test_sql_query_output"); + let log = &logctx.log; + let mut db = ClickHouseInstance::new_single_node(0) + .await + .expect("Failed to start ClickHouse"); + let address = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db.port()); + let client = Client::new(address, &log); + client + .initialize_db_with_version(false, OXIMETER_VERSION) + .await + .expect("Failed to initialize timeseries database"); + let (_target, metrics, samples) = setup_select_test(); + client.insert_samples(&samples).await.unwrap(); + + // Sanity check that we get exactly the number of samples we expected. + let res = client + .query("SELECT count() AS total FROM service:request_latency") + .await + .unwrap(); + assert_eq!(res.table.rows.len(), 1); + let serde_json::Value::Number(n) = &res.table.rows[0][0] else { + panic!("Expected exactly 1 row with 1 item"); + }; + assert_eq!(n.as_u64().unwrap(), samples.len() as u64); + + // Assert grouping by the keys results in exactly the number of samples + // expected for each timeseries. + let res = client + .query( + "SELECT count() AS total \ + FROM service:request_latency \ + GROUP BY timeseries_key; \ + ", + ) + .await + .unwrap(); + assert_eq!(res.table.rows.len(), metrics.len()); + for row in res.table.rows.iter() { + assert_eq!(row.len(), 1); + let serde_json::Value::Number(n) = &row[0] else { + panic!("Expected a number in each row"); + }; + assert_eq!( + n.as_u64().unwrap(), + (samples.len() / metrics.len()) as u64 + ); + } + + // Read test SQL and make sure we're getting expected results. + let sql_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("test-output") + .join("sql"); + let mut rd = tokio::fs::read_dir(&sql_dir) + .await + .expect("failed to read SQL test directory"); + while let Some(next_entry) = + rd.next_entry().await.expect("failed to read directory entry") + { + let sql_file = next_entry.path().join("query.sql"); + let result_file = next_entry.path().join("result.txt"); + let query = tokio::fs::read_to_string(&sql_file) + .await + .unwrap_or_else(|_| { + panic!( + "failed to read test SQL query in '{}", + sql_file.display() + ) + }); + let res = client + .query(&query) + .await + .expect("failed to execute test query"); + expectorate::assert_contents( + result_file, + &serde_json::to_string_pretty(&res.table).unwrap(), + ); + } + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } } diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index 9029319048c..f632a531aff 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -33,9 +33,13 @@ use thiserror::Error; mod client; pub mod model; pub mod query; +pub mod sql; + pub use client::Client; pub use client::DbWrite; - +pub use client::QueryMetadata; +pub use client::QueryResult; +pub use client::Table; pub use model::OXIMETER_VERSION; #[derive(Debug, Error)] @@ -47,8 +51,14 @@ pub enum Error { #[error("Telemetry database unavailable: {0}")] DatabaseUnavailable(String), + #[error("Missing expected metadata header key '{key}'")] + MissingHeaderKey { key: String }, + + #[error("Invalid or malformed query metadata for key '{key}': {msg}")] + BadMetadata { key: String, msg: String }, + /// An error interacting with the telemetry database - #[error("Error interacting with telemetry database: {0}")] + #[error("Error interacting with telemetry database")] Database(String), /// A schema provided when collecting samples did not match the expected schema @@ -123,6 +133,9 @@ pub enum Error { #[error("Schema update versions must be sequential without gaps")] NonSequentialSchemaVersions, + + #[error("SQL error")] + Sql(#[from] sql::Error), } impl From for TimeseriesSchema { @@ -290,6 +303,7 @@ mod tests { assert!(TimeseriesName::try_from(":b").is_err()); assert!(TimeseriesName::try_from("a:").is_err()); assert!(TimeseriesName::try_from("123").is_err()); + assert!(TimeseriesName::try_from("no:no:no").is_err()); } // Validates that the timeseries_key stability for a sample is stable. diff --git a/oximeter/db/src/query.rs b/oximeter/db/src/query.rs index 2caefb24c37..92127695739 100644 --- a/oximeter/db/src/query.rs +++ b/oximeter/db/src/query.rs @@ -296,6 +296,7 @@ impl SelectQueryBuilder { } } +/// Return the name of the measurements table for a datum type. pub(crate) fn measurement_table_name(ty: DatumType) -> String { format!("measurements_{}", ty.to_string().to_lowercase()) } @@ -335,6 +336,7 @@ pub struct FieldSelector { comparison: Option, } +/// Return the name of the field table for the provided field type. pub(crate) fn field_table_name(ty: FieldType) -> String { format!("fields_{}", ty.to_string().to_lowercase()) } diff --git a/oximeter/db/src/sql/mod.rs b/oximeter/db/src/sql/mod.rs new file mode 100644 index 00000000000..b7a2232cf4a --- /dev/null +++ b/oximeter/db/src/sql/mod.rs @@ -0,0 +1,1350 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Run SQL queries against the timeseries database. +//! +//! # Overview +//! +//! `oximeter` collects and stores samples from timeseries. The schema for those +//! samples is defined by applications, using the [`Target`](oximeter::Target) +//! and [`Metric`](oximeter::Metric) traits. Samples from these timeseries are +//! not stored in explicit tables, however. They are "unrolled" into the fields +//! and measurements, which are stored in a table based on their _data type_. +//! For example, `String` fields are stored in the `oximeter.fields_string` +//! table. (See RFD 161 for more details.) +//! +//! This arrangement is flexible and simple, since we can statically define the +//! tables we need, rather than say create a new table for each timeseries +//! schema. However, the drawback of this is that the timeseries data is not +//! easily queried directly. The data is split across many tables, and +//! interleaved with other timeseries, which may not even share a schema. +//! +//! The tools in this module are for making "normal" SQL queries transparently +//! act on the "virtual tables" that are implied by each timeseries. It's +//! effectively a SQL-to-SQL transpiler, converting queries against the +//! timeseries into one or more queries against the actual tables in ClickHouse. + +// Copyright 2023 Oxide Computer Company + +use crate::query::field_table_name; +use crate::query::measurement_table_name; +use crate::DatumType; +use crate::Error as OxdbError; +use crate::FieldType; +use crate::TimeseriesName; +use crate::TimeseriesSchema; +use indexmap::IndexSet; +use sqlparser::ast::BinaryOperator; +use sqlparser::ast::Cte; +use sqlparser::ast::Distinct; +use sqlparser::ast::Expr; +use sqlparser::ast::Ident; +use sqlparser::ast::Join; +use sqlparser::ast::JoinConstraint; +use sqlparser::ast::JoinOperator; +use sqlparser::ast::ObjectName; +use sqlparser::ast::OrderByExpr; +use sqlparser::ast::Query; +use sqlparser::ast::Select; +use sqlparser::ast::SelectItem; +use sqlparser::ast::SetExpr; +use sqlparser::ast::Statement; +use sqlparser::ast::TableAlias; +use sqlparser::ast::TableFactor; +use sqlparser::ast::TableWithJoins; +use sqlparser::ast::Value; +use sqlparser::ast::With; +use sqlparser::dialect::AnsiDialect; +use sqlparser::dialect::Dialect; +use sqlparser::parser::Parser; +use sqlparser::parser::ParserError; +use std::collections::BTreeMap; +use std::collections::BTreeSet; +use std::ops::ControlFlow; +use std::sync::OnceLock; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("SQL parsing error")] + Parser(#[from] ParserError), + + #[error("Unsupported SQL: {0}")] + UnsupportedSql(&'static str), + + #[error("Unsupported function: '{func}'")] + UnsupportedFunction { func: String }, + + #[error("Invalid column '{name}' for timeseries '{timeseries_name}'")] + InvalidColumn { name: String, timeseries_name: String }, + + #[error( + "Table name '{table_name}' in select query does not match \ + timeseries name '{timeseries_name}'" + )] + TableInSelectIsNotTimeseries { table_name: String, timeseries_name: String }, + + #[error("Invalid timeseries name: '{name}'")] + InvalidTimeseriesName { name: String }, +} + +/// The oximeter timeseries SQL dialect. +#[derive(Clone, Copy, Debug, PartialEq)] +pub struct OxdbDialect; + +impl Dialect for OxdbDialect { + fn is_identifier_start(&self, ch: char) -> bool { + AnsiDialect {}.is_identifier_start(ch) + } + + fn is_identifier_part(&self, ch: char) -> bool { + AnsiDialect {}.is_identifier_part(ch) || ch == ':' + } +} + +/// A SQL statement that is probably supported. +/// +/// There's a big range of statements that are not supported. This is guaranteed +/// to be a single select statement, where all the items being selected FROM +/// are: +/// +/// - concrete tables that could be timeseries (valid names) +/// - a subquery against a restricted query +#[derive(Clone, Debug)] +pub struct RestrictedQuery { + safe_sql: SafeSql, + query: Query, + timeseries: IndexSet, +} + +impl std::fmt::Display for RestrictedQuery { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.query) + } +} + +macro_rules! unsupported { + ($msg:literal) => { + Err(OxdbError::from(Error::UnsupportedSql($msg))) + }; +} + +/// A helper type to preprocess any ClickHouse-specific SQL, and present a +/// known-safe version of it to the main `sqlparser` code. +/// +/// This is currently used to handle ASOF JOINs, which are a ClickHouse-specific +/// JOIN that joins rows based on a "closest match" condition. However, a +/// standard SQL parser will take an expression like: +/// +/// ```sql +/// SELECT foo ASOF JOIN bar +/// ``` +/// +/// And interpret the `ASOF` as an alias for `FOO`, as if one had written +/// +/// ```sql +/// SELECT foo AS asof JOIN bar +/// ``` +/// +/// This basically detects and removes a bare `ASOF` in that case, so the parser +/// can run normally. +#[derive(Clone, Debug)] +struct SafeSql { + original: String, + safe: String, +} + +impl SafeSql { + fn new(sql: impl AsRef) -> Self { + // The regex crate doesn't support look-arounds, so we'll have to + // manually find sequences like `ASOF JOIN`, that are not preceded by + // `AS`. + let sql = sql.as_ref().trim().trim_end_matches(';'); + let mut original = Vec::new(); + let mut safe = Vec::new(); + let mut tokens = sql.split_ascii_whitespace().peekable(); + while let Some(token) = tokens.next() { + // Always push the current token. + if token.parse::().is_ok() { + let tok = format!("\"{token}\""); + safe.push(tok.clone()); + original.push(tok); + } else { + safe.push(token.to_string()); + original.push(token.to_string()); + } + + // If the next token is ASOF, and the current is _not_ AS, then this + // is something like `select foo asof join bar`, and we want to chop + // out the `asof`. Consume the next token, and break the SQL string, + // by pushing a new chunk at the end. + if let Some(next_token) = tokens.peek() { + if !token.eq_ignore_ascii_case("as") + && next_token.eq_ignore_ascii_case("asof") + { + original.push(tokens.next().unwrap().to_string()); + } + } + } + Self { original: original.join(" "), safe: safe.join(" ") } + } + + fn safe_sql(&self) -> &str { + &self.safe + } +} + +impl RestrictedQuery { + /// Construct a new restricted query. + pub fn new(sql: impl AsRef) -> Result { + let safe_sql = SafeSql::new(sql); + let statements = Parser::parse_sql(&OxdbDialect, &safe_sql.safe_sql()) + .map_err(Error::from)?; + if statements.len() != 1 { + return unsupported!("Only a single SQL statement is supported"); + } + + let statement = statements.into_iter().next().unwrap(); + let Statement::Query(mut query) = statement else { + return unsupported!("Statement must be a SELECT query"); + }; + + // Walk the AST before doing any real processing or transformation, and + // validate any function calls are on the allow-list. + let maybe_denied_function = + sqlparser::ast::visit_expressions(&query, |expr| { + if let Expr::Function(func) = expr { + if let Some(name) = func.name.0.first() { + if !function_allow_list() + .iter() + .any(|f| f.name == name.value.as_str()) + { + return ControlFlow::Break(name.value.clone()); + } + } + } + ControlFlow::Continue(()) + }); + if let ControlFlow::Break(func) = maybe_denied_function { + return Err(OxdbError::from(Error::UnsupportedFunction { func })); + }; + + let timeseries = Self::process_query(&mut query)?; + Ok(Self { safe_sql, query: *query, timeseries }) + } + + /// Convert the original SQL into a query specifically for the `oximeter` + /// timeseries table organization. + pub fn to_oximeter_sql( + &self, + timeseries_schema: &BTreeMap, + ) -> Result { + self.generate_timeseries_ctes(×eries_schema).map(|cte_tables| { + if cte_tables.is_empty() { + // The query didn't reference any timeseries at all, let's just + // return it + self.safe_sql.original.clone() + } else { + // There are some timeseries referenced. Let's return a query + // constructed by building the CTEs, and then the _original_ + // SQL, which may have `ASOF JOIN`s in it. + format!( + "{} {}", + With { recursive: false, cte_tables }, + self.safe_sql.original, + ) + } + }) + } + + // For each timeseries named in `self`, generate a CTE that creates the + // virtual table for that timeseries by joining all its component parts. + fn generate_timeseries_ctes( + &self, + timeseries_schema: &BTreeMap, + ) -> Result, OxdbError> { + let mut ctes = Vec::with_capacity(self.timeseries.len()); + for timeseries in self.timeseries.iter() { + let schema = + timeseries_schema.get(timeseries).ok_or_else(|| { + OxdbError::TimeseriesNotFound( + timeseries.as_str().to_owned(), + ) + })?; + ctes.push(Self::build_timeseries_cte(schema)); + } + Ok(ctes) + } + + // Given a timeseries schema, return a CTE which generates the equivalent + // virtual table. + // + // As timeseries samples are ingested, we "unroll" them in various ways, and + // store them in a set of normalized tables. These contain the _fields_ (on + // table per field data type) and the measurements (one table per + // measurement data type). This method reverses that process, creating a + // single, virtual table that represents all samples from the timeseries + // (plural) of the same schema. + // + // It generates a CTE like so: + // + // ```sql + // WITH {timeseries_name} AS ( + // SELECT + // timeseries_key, + // filter_on_{field_name0}.field_value as {field_name0}, + // filter_on_{field_name1}.field_value as {field_name1}, + // ... + // measurements.timestamp AS timestamp, + // measurements.datum as datum, + // FROM + // ( + // SELECT DINSTINCT timeseries_key, + // field_value + // FROM + // fields_{field_type} + // WHERE + // timeseries_name = '{timeseries_name}' + // AND field_name = '{field_name0} + // ) AS filter_on_{field_name0} + // JOIN ( + // ... select next field table + // ) AS filter_on_{field_name1} ON filter_on_{field_name0}.timeseries_key = filter_on_{field_name1} + // ... + // JOIN ( + // SELECT + // timeseries_key, + // timestamp, + // datum, + // FROM + // measurements_{datum_type} + // WHERE + // timeseries_name = '{timeseries_name}' + // ) AS measurements ON filter_on_fieldN.timeseries_key = measurements.timeseries_key + // ORDER BY + // timeseries_key, + // timestamp + // ) + // ``` + // + // In other words, it should generate a CTE that one can query as if the + // timeseries itself where an actual table in the database, like: + // + // ``` + // timeseries_key | field_name0 | field_name1 | ... | timestamp | datum + // ---------------+-------------+-------------+ ... +-----------+------ + // key0 | field0_0 | field0_1 | ... | t0 | d0 + // key0 | field0_0 | field0_1 | ... | t1 | d1 + // key0 | field0_0 | field0_1 | ... | t2 | d2 + // key0 | field0_0 | field0_1 | ... | t3 | d3 + // ... + // key1 | field1_0 | field1_1 | ... | t0 | d0 + // key1 | field1_0 | field1_1 | ... | t1 | d1 + // key1 | field1_0 | field1_1 | ... | t2 | d2 + // key1 | field1_0 | field1_1 | ... | t3 | d3 + // ... + // ``` + // + // In this case, all rows with `key0` are from the "first" timeseries with + // this schema. `fieldX_Y` indicates the Yth field from timeseries with + // `key0` as its key. + fn build_timeseries_cte(schema: &TimeseriesSchema) -> Cte { + // First build each query against the relevant field tables. + // + // These are the `SELECT DISTINCT ... FROM fields_{field_type}` + // subqueries above. + let mut field_queries = Vec::with_capacity(schema.field_schema.len()); + for field_schema in schema.field_schema.iter() { + let field_query = Self::build_field_query( + &schema.timeseries_name, + &field_schema.name, + &field_schema.ty, + ); + field_queries.push((field_schema.name.as_str(), field_query)); + } + + // Generate the last measurement query, the last subquery in the main + // CTE. + let measurement_query = Self::build_measurement_query( + &schema.timeseries_name, + &schema.datum_type, + ); + + // The "top-level" columns are the columns outputted by the CTE itself. + // + // These are the aliased columns of the full, reconstructed table + // representing the timeseries. This makes the timeseries_key available, + // as well as each field aliased to the actual field name, and the + // measurements. + let mut top_level_projections = + Vec::with_capacity(field_queries.len() + 2); + + // Create the projection of the top-level timeseries_key. + // + // This is taken from the first field, which always exists, since + // timeseries have at least one field. This creates the expression: + // `filter_{field_name}.timeseries_key AS timeseries_key` + let timeseries_key_projection = SelectItem::ExprWithAlias { + expr: Expr::CompoundIdentifier(vec![ + Self::field_subquery_alias(field_queries[0].0), + Self::str_to_ident("timeseries_key"), + ]), + alias: Self::str_to_ident("timeseries_key"), + }; + top_level_projections.push(timeseries_key_projection); + + // We'll build a big `TableWithJoins` to express the entire JOIN + // operation between all fields and the measurements. This is the "meat" + // of the CTE for this timeseries, joining the constituent records into + // the virtual table for this schema. + // + // We select first from the subquery specifying the first field query. + let mut cte_from = TableWithJoins { + relation: TableFactor::Derived { + lateral: false, + subquery: Self::select_to_query(field_queries[0].1.clone()), + alias: Some(TableAlias { + name: Self::field_subquery_alias(field_queries[0].0), + columns: vec![], + }), + }, + joins: Vec::with_capacity(field_queries.len()), + }; + + // For all field queries, create a projection for the field_value, + // aliased as the field name. + let field_queries: Vec<_> = field_queries.into_iter().collect(); + for (i, (field_name, query)) in field_queries.iter().enumerate() { + // Select the field_value from this field query, renaming it to the + // actual field name. + let projection = SelectItem::ExprWithAlias { + expr: Expr::CompoundIdentifier(vec![ + Self::field_subquery_alias(field_name), + Self::str_to_ident("field_value"), + ]), + alias: Self::str_to_ident(field_name), + }; + top_level_projections.push(projection); + + // We've inserted the first subquery as the `from.relation` field in + // the main CTE we're building. We need to skip that one, even + // though we added its aliased `field_value` column to the top level + // projections. + // + // Any additional field subqueries are inserted in the JOIN portion + // of the CTE. + if i == 0 { + continue; + } + let relation = TableFactor::Derived { + lateral: false, + subquery: Self::select_to_query(query.clone()), + alias: Some(TableAlias { + name: Self::field_subquery_alias(field_name), + columns: vec![], + }), + }; + + // The join is always INNER, and is on the timeseries_key only. + // ClickHouse does not support `USING ` when using multiple + // JOINs simultaneously, so we always write this as an `ON` + // constraint, between the previous field subquery and this one. + // + // I.e., `ON filter_foo.timeseries_key = filter_bar.timeseries_key` + let last_field_name = &field_queries[i - 1].0; + let constraints = Expr::BinaryOp { + left: Box::new(Expr::CompoundIdentifier(vec![ + Self::field_subquery_alias(last_field_name), + Self::str_to_ident("timeseries_key"), + ])), + op: BinaryOperator::Eq, + right: Box::new(Expr::CompoundIdentifier(vec![ + Self::field_subquery_alias(field_name), + Self::str_to_ident("timeseries_key"), + ])), + }; + let join_operator = + JoinOperator::Inner(JoinConstraint::On(constraints)); + cte_from.joins.push(Join { relation, join_operator }); + } + + // Finally, we need to project and join in the measurements table. + let datum_columns = Self::datum_type_to_columns(&schema.datum_type); + for col in datum_columns.iter() { + let projection = SelectItem::ExprWithAlias { + expr: Expr::CompoundIdentifier(vec![ + Self::str_to_ident("measurements"), + Self::str_to_ident(col), + ]), + alias: Self::str_to_ident(col), + }; + top_level_projections.push(projection); + } + let relation = TableFactor::Derived { + lateral: false, + subquery: Self::select_to_query(measurement_query), + alias: Some(TableAlias { + name: Self::str_to_ident("measurements"), + columns: vec![], + }), + }; + let constraints = Expr::BinaryOp { + left: Box::new(Expr::CompoundIdentifier(vec![ + Self::field_subquery_alias( + &schema.field_schema.last().unwrap().name, + ), + Self::str_to_ident("timeseries_key"), + ])), + op: BinaryOperator::Eq, + right: Box::new(Expr::CompoundIdentifier(vec![ + Self::str_to_ident("measurements"), + Self::str_to_ident("timeseries_key"), + ])), + }; + let join_operator = + JoinOperator::Inner(JoinConstraint::On(constraints)); + cte_from.joins.push(Join { relation, join_operator }); + + // To build the real virtual table for all the timeseries, we really + // need to sort the samples as if they were inserted into the table + // itself. ClickHouse partitions the tables dynamically since we're + // using a MergeTree engine, which groups and repacks rows in the + // background. + // + // We'll impose a consistent sorting order here. If one does not include + // this, results are inconsistent, since the different data parts of the + // measurements tables are not read in order every time. + let order_by = top_level_projections + .iter() + .filter_map(|proj| { + let SelectItem::ExprWithAlias { alias, .. } = &proj else { + unreachable!(); + }; + if alias.value == "timeseries_key" + || alias.value == "start_time" + || alias.value == "timestamp" + { + Some(OrderByExpr { + expr: Expr::Identifier(alias.clone()), + asc: None, + nulls_first: None, + }) + } else { + None + } + }) + .collect(); + + // We now have all the subqueries joined together, plus the columns + // we're projecting from that join result. We need to build the final + // CTE that represents the full virtual timeseries table. + let alias = TableAlias { + name: Ident { + value: schema.timeseries_name.to_string(), + quote_style: Some('"'), + }, + columns: vec![], + }; + let top_level_select = Select { + distinct: None, + top: None, + projection: top_level_projections, + into: None, + from: vec![cte_from], + lateral_views: vec![], + selection: None, + group_by: vec![], + cluster_by: vec![], + distribute_by: vec![], + sort_by: vec![], + having: None, + named_window: vec![], + qualify: None, + }; + let mut query = Self::select_to_query(top_level_select); + query.order_by = order_by; + Cte { alias, query, from: None } + } + + // Create a SQL parser `Ident` with a the given name. + fn str_to_ident(s: &str) -> Ident { + Ident { value: s.to_string(), quote_style: None } + } + + // Return an `Ident` alias for a subquery of a specific field table. + // + // E.g., the `filter_on_foo` in `(SELECT DISTINCT ... ) AS filter_on_foo`. + fn field_subquery_alias(field_name: &str) -> Ident { + Self::str_to_ident(format!("filter_on_{field_name}").as_str()) + } + + // Return the required measurement columns for a specific datum type. + // + // Scalar measurements have only a timestamp and datum. Cumulative counters + // have those plus a start_time. And histograms have those plus the bins. + fn datum_type_to_columns( + datum_type: &DatumType, + ) -> &'static [&'static str] { + if datum_type.is_histogram() { + &["start_time", "timestamp", "bins", "counts"] + } else if datum_type.is_cumulative() { + &["start_time", "timestamp", "datum"] + } else { + &["timestamp", "datum"] + } + } + + fn select_to_query(select: Select) -> Box { + Box::new(Query { + with: None, + body: Box::new(SetExpr::Select(Box::new(select))), + order_by: vec![], + limit: None, + offset: None, + fetch: None, + locks: vec![], + }) + } + + // Build a single subquery which selects the unique fields with the provided + // name. E.g., this creates: + // + // ```sql + // SELECT DISTINCT timeseries_key, + // field_value + // FROM + // fields_{field_type} + // WHERE + // timeseries_name = '{timeseries_name}' + // AND field_name = '{field_name}' + // ``` + fn build_field_query( + timeseries_name: &TimeseriesName, + field_name: &str, + field_type: &FieldType, + ) -> Select { + // FROM fields_{field_type} + let from = TableWithJoins { + relation: TableFactor::Table { + name: ObjectName(vec![Self::str_to_ident(&field_table_name( + *field_type, + ))]), + alias: None, + args: None, + with_hints: vec![], + }, + joins: vec![], + }; + + // SELECT timeseries_key, field_value + let projection = vec![ + SelectItem::UnnamedExpr(Expr::Identifier(Self::str_to_ident( + "timeseries_key", + ))), + SelectItem::UnnamedExpr(Expr::Identifier(Self::str_to_ident( + "field_value", + ))), + ]; + + // WHERE timeseries_name = '{timeseries_name}' AND field_name = '{field_name}' + let selection = Some(Expr::BinaryOp { + left: Box::new(Expr::BinaryOp { + left: Box::new(Expr::Identifier(Self::str_to_ident( + "timeseries_name", + ))), + op: BinaryOperator::Eq, + right: Box::new(Expr::Value(Value::SingleQuotedString( + timeseries_name.to_string(), + ))), + }), + op: BinaryOperator::And, + right: Box::new(Expr::BinaryOp { + left: Box::new(Expr::Identifier(Self::str_to_ident( + "field_name", + ))), + op: BinaryOperator::Eq, + right: Box::new(Expr::Value(Value::SingleQuotedString( + field_name.to_string(), + ))), + }), + }); + + Select { + distinct: Some(Distinct::Distinct), + top: None, + projection, + into: None, + from: vec![from], + lateral_views: vec![], + selection, + group_by: vec![], + cluster_by: vec![], + distribute_by: vec![], + sort_by: vec![], + having: None, + named_window: vec![], + qualify: None, + } + } + + // Build a single subquery which selects the measurements with the provided + // name. E.g., this creates: + // + // ```sql + // SELECT + // timeseries_key, + // timestamp, + // datum + // FROM + // measurements_{datum_type} + // WHERE + // timeseries_name = '{timeseries_name}' + // ``` + fn build_measurement_query( + timeseries_name: &TimeseriesName, + datum_type: &DatumType, + ) -> Select { + // FROM measurements_{datum_type} + let from = TableWithJoins { + relation: TableFactor::Table { + name: ObjectName(vec![Self::str_to_ident( + &measurement_table_name(*datum_type), + )]), + alias: None, + args: None, + with_hints: vec![], + }, + joins: vec![], + }; + + // SELECT timeseries_key, timestamp, [datum type columns] + let mut projection = vec![SelectItem::UnnamedExpr(Expr::Identifier( + Self::str_to_ident("timeseries_key"), + ))]; + let datum_projection = Self::datum_type_to_columns(datum_type); + projection.extend(datum_projection.iter().map(|name| { + SelectItem::UnnamedExpr(Expr::Identifier(Self::str_to_ident(name))) + })); + + // WHERE timeseries_name = '{timeseries_name}' + let selection = Some(Expr::BinaryOp { + left: Box::new(Expr::Identifier(Self::str_to_ident( + "timeseries_name", + ))), + op: BinaryOperator::Eq, + right: Box::new(Expr::Value(Value::SingleQuotedString( + timeseries_name.to_string(), + ))), + }); + + Select { + distinct: None, + top: None, + projection, + into: None, + from: vec![from], + lateral_views: vec![], + selection, + group_by: vec![], + cluster_by: vec![], + distribute_by: vec![], + sort_by: vec![], + having: None, + named_window: vec![], + qualify: None, + } + } + + // Process a single "table factor", the in `FROM ` to + // extract the names of the timeseries it refers to. + // + // Note this is recursive since we do support basic inner joins. + fn process_table_factor( + relation: &mut TableFactor, + ) -> Result, OxdbError> { + match relation { + TableFactor::Table { ref mut name, args, with_hints, .. } => { + if args.is_some() || !with_hints.is_empty() { + return unsupported!( + "Table functions and hints are not supported" + ); + } + if name.0.len() != 1 { + return unsupported!( + "Query must select from single named \ + table, with no database" + ); + } + let timeseries_name = name.0[0] + .value + .parse() + .map(|n| indexmap::indexset! { n }) + .map_err(|_| OxdbError::InvalidTimeseriesName)?; + // Rewrite the quote style to be backticks, so that the + // resulting actual query translates into a valid identifier for + // ClickHouse, naming the CTE's well generate later. + name.0[0].quote_style = Some('"'); + Ok(timeseries_name) + } + TableFactor::Derived { lateral: false, subquery, .. } => { + RestrictedQuery::process_query(subquery) + } + _ => { + return unsupported!( + "Query must select from concrete tables or subqueries on them" + ) + } + } + } + + // Process a parsed query, returning the named timeseries that it refers to. + // + // This is the entry-point for our query processing implementation. We take + // a parsed query from `sqlparser`, and extract the virtual tables + // (timeseries names) that we'll need to construct in order to actually run + // it against our database. + // + // Note that we return an _ordered set_ of the timeseries names. This is to + // produce the CTEs that correspond to each timeseries, but without + // duplicating the actual CTE. + fn process_query( + query: &mut Query, + ) -> Result, OxdbError> { + // Some basic checks limiting the scope of the query. + if query.with.is_some() + || query.fetch.is_some() + || !query.locks.is_empty() + { + return unsupported!( + "CTEs, FETCH and LOCKS are not currently supported" + ); + } + let SetExpr::Select(select) = &mut *query.body else { + return unsupported!("Only SELECT queries are currently supported"); + }; + + // For each object we're selecting from (a table factor), process that + // directly, and process any JOINs it also contains. + let mut timeseries = IndexSet::with_capacity(select.from.len()); + if select.from.len() > 1 { + return unsupported!( + "Query must select from a single named table, with no database" + ); + } + if let Some(from) = select.from.iter_mut().next() { + timeseries.extend(Self::process_table_factor(&mut from.relation)?); + for join in from.joins.iter_mut() { + let JoinOperator::Inner(op) = &join.join_operator else { + return unsupported!( + "Only INNER JOINs are supported, using \ + explicit constraints" + ); + }; + if matches!(op, JoinConstraint::Natural) { + return unsupported!( + "Only INNER JOINs are supported, using \ + explicit constraints" + ); + } + timeseries + .extend(Self::process_table_factor(&mut join.relation)?); + } + } + Ok(timeseries) + } +} + +static CLICKHOUSE_FUNCTION_ALLOW_LIST: OnceLock> = + OnceLock::new(); + +#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd)] +pub struct ClickHouseFunction { + pub name: &'static str, + pub usage: &'static str, + pub description: &'static str, +} + +impl std::fmt::Display for ClickHouseFunction { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.name) + } +} + +impl ClickHouseFunction { + fn new(usage: &'static str, description: &'static str) -> Self { + let name = usage.split_once('(').expect("need parentheses").0; + Self { name, usage, description } + } +} + +/// Return the set of supported ClickHouse SQL functions, with a short help +/// string. +pub fn function_allow_list() -> &'static BTreeSet { + CLICKHOUSE_FUNCTION_ALLOW_LIST.get_or_init(|| { + let mut out = BTreeSet::new(); + + // Core functions + out.insert(ClickHouseFunction::new("avg(expr)", "Arithmetic mean")); + out.insert(ClickHouseFunction::new("min(expr)", "Minimum value")); + out.insert(ClickHouseFunction::new("max(expr)", "Maximum value")); + out.insert(ClickHouseFunction::new("sum(expr)", "Sum values")); + out.insert(ClickHouseFunction::new( + "count(expr)", + "Count number of rows", + )); + out.insert(ClickHouseFunction::new( + "now()", + "Return current timestamp", + )); + out.insert(ClickHouseFunction::new( + "first_value(expr)", + "First value in a partition", + )); + out.insert(ClickHouseFunction::new( + "last_value(expr)", + "Last value in a partition", + )); + out.insert(ClickHouseFunction::new( + "any(expr)", + "First non-NULL value", + )); + out.insert(ClickHouseFunction::new( + "topK(k)(expr)", + "Estimate K most frequent values", + )); + out.insert(ClickHouseFunction::new( + "groupArray(expr)", + "Create an array from rows", + )); + out.insert(ClickHouseFunction::new( + "argMin(arg, val)", + "Argument of minimum value", + )); + out.insert(ClickHouseFunction::new( + "argMax(arg, val)", + "Argument of maximum value", + )); + out.insert(ClickHouseFunction::new( + "quantileExact(quantile)(expr)", + "Exact quantile of inputs", + )); + + // To support histograrms, we allow the `-ForEach` combinator functions. + // + // See + // https://clickhouse.com/docs/en/sql-reference/aggregate-functions/combinators#-foreach, + // but briefly, this allows computing the aggregate function across + // corresponding array elements. + out.insert(ClickHouseFunction::new( + "maxForEach(array expr)", + "Max of corresponding array elements", + )); + out.insert(ClickHouseFunction::new( + "minForEach(array expr)", + "Min of corresponding array elements", + )); + out.insert(ClickHouseFunction::new( + "sumForEach(array expr)", + "Sum of corresponding array elements", + )); + out.insert(ClickHouseFunction::new( + "avgForEach(array expr)", + "Mean of corresponding array elements", + )); + + // Type conversions + // + // Note that `cast` itself will be difficult to use, because ClickHouse is + // particular about the capitalization of type names, e.g., it must be + // `cast(x as String)` not `cast(x as STRING)`. + out.insert(ClickHouseFunction::new( + "toString(x)", + "Convert to a string", + )); + out.insert(ClickHouseFunction::new("toInt8(x)", "Convert to an i8")); + out.insert(ClickHouseFunction::new("toUInt8(x)", "Convert to a u8")); + out.insert(ClickHouseFunction::new("toInt16(x)", "Convert to an i16")); + out.insert(ClickHouseFunction::new("toUInt16(x)", "Convert to a u16")); + out.insert(ClickHouseFunction::new("toInt32(x)", "Convert to an i32")); + out.insert(ClickHouseFunction::new("toUInt32(x)", "Convert to a u32")); + out.insert(ClickHouseFunction::new("toInt64(x)", "Convert to an i64")); + out.insert(ClickHouseFunction::new("toUInt64(x)", "Convert to a u64")); + out.insert(ClickHouseFunction::new( + "toFloat32(x)", + "Convert to an f32", + )); + out.insert(ClickHouseFunction::new( + "toFloat64(x)", + "Convert to an f64", + )); + out.insert(ClickHouseFunction::new( + "toDate(x)", + "Convert to a 32-bit date", + )); + out.insert(ClickHouseFunction::new( + "toDateTime(x)", + "Convert to a 32-bit date and time", + )); + out.insert(ClickHouseFunction::new( + "toDateTime64(x)", + "Convert to a 64-bit date and time", + )); + out.insert(ClickHouseFunction::new( + "toIntervalYear(x)", + "Convert to an interval in years", + )); + out.insert(ClickHouseFunction::new( + "toIntervalQuarter(x)", + "Convert to an interval in quarters", + )); + out.insert(ClickHouseFunction::new( + "toIntervalMonth(x)", + "Convert to an interval in months", + )); + out.insert(ClickHouseFunction::new( + "toIntervalWeek(x)", + "Convert to an interval in weeks", + )); + out.insert(ClickHouseFunction::new( + "toIntervalDay(x)", + "Convert to an interval in days", + )); + out.insert(ClickHouseFunction::new( + "toIntervalHour(x)", + "Convert to an interval in hours", + )); + out.insert(ClickHouseFunction::new( + "toIntervalMinute(x)", + "Convert to an interval in minutes", + )); + out.insert(ClickHouseFunction::new( + "toIntervalSecond(x)", + "Convert to an interval in seconds", + )); + + // Array functions + out.insert(ClickHouseFunction::new( + "arrayMax([func,] arr)", + "Maximum in source array", + )); + out.insert(ClickHouseFunction::new( + "arrayMin([func,] arr)", + "Minimum in source array", + )); + out.insert(ClickHouseFunction::new( + "arraySum([func,] arr)", + "Sum of elements in source array", + )); + out.insert(ClickHouseFunction::new( + "arrayAvg([func,] arr)", + "Mean of elements in source array", + )); + out.insert(ClickHouseFunction::new( + "arrayMap(func, arr, ...)", + "Apply function to elements in source array", + )); + out.insert(ClickHouseFunction::new( + "arrayReduce(func, arr, ...)", + "Aggregate elements in source array with a function", + )); + out.insert(ClickHouseFunction::new( + "arrayFilter(func, arr, ...)", + "Apply a lambda to source array", + )); + out.insert(ClickHouseFunction::new( + "arrayDifference(arr)", + "Difference between adjacent elements in source array", + )); + out.insert(ClickHouseFunction::new( + "indexOf(arr, x)", + "Index of `x` in source array, or 0", + )); + out.insert(ClickHouseFunction::new( + "length(arr)", + "Length of source array", + )); + + // Strings + out.insert(ClickHouseFunction::new( + "empty(x)", + "True if array or string is empty", + )); + out.insert(ClickHouseFunction::new( + "lower(x)", + "Convert a string to lowercase", + )); + out.insert(ClickHouseFunction::new( + "upper(x)", + "Convert a string to uppercase", + )); + out.insert(ClickHouseFunction::new( + "reverse(x)", + "Reverse the bytes (not chars) in a string", + )); + out.insert(ClickHouseFunction::new( + "reverseUTF8(x)", + "Reverse the characters in a string", + )); + out.insert(ClickHouseFunction::new( + "concat(s1, s2, ...)", + "Concatenate two or more strings", + )); + out.insert(ClickHouseFunction::new( + "concatWithSeparator(sep, s1, s2, ..)", + "Concatenate two or more strings with a separator", + )); + out.insert(ClickHouseFunction::new( + "substring(s, offset, len)", + "Return a substring", + )); + out.insert(ClickHouseFunction::new( + "endsWith(s, suffix)", + "True if `s` ends with `suffix`", + )); + out.insert(ClickHouseFunction::new( + "startsWith(s, prefix)", + "True if `s` starts with `prefix`", + )); + out.insert(ClickHouseFunction::new( + "splitByChar(sep, s[, limit])", + "Split on a separator, up to `limit` times", + )); + out.insert(ClickHouseFunction::new( + "splitByString(sep, s[, limit])", + "Split by a separating string, up to `limit` times", + )); + + // Time. + out.insert(ClickHouseFunction::new( + "tumble(datetime, interval[, tz])", + "Nonoverlapping time windows of a specified interval", + )); + out.insert(ClickHouseFunction::new( + "toYear(date)", + "Extract year from date", + )); + out.insert(ClickHouseFunction::new( + "toQuarter(date)", + "Extract quarter from date", + )); + out.insert(ClickHouseFunction::new( + "toMonth(date)", + "Extract month from date", + )); + out.insert(ClickHouseFunction::new( + "toDayOfYear(date)", + "Index of day in its year", + )); + out.insert(ClickHouseFunction::new( + "toDayOfMonth(date)", + "Index of day in its month", + )); + out.insert(ClickHouseFunction::new( + "toDayOfWeek(date)", + "Index of day in its week", + )); + out.insert(ClickHouseFunction::new( + "toHour(date)", + "Extract hour from date", + )); + out.insert(ClickHouseFunction::new( + "toMinute(date)", + "Extract minute from date", + )); + out.insert(ClickHouseFunction::new( + "toSecond(date)", + "Extract second from date", + )); + out.insert(ClickHouseFunction::new( + "toUnixTimestamp(date)", + "Convert to UNIX timestamp", + )); + out.insert(ClickHouseFunction::new( + "toStartOfInterval(date, INTERVAL x UNIT[, tz])", + "Convert date to the start of the specified interval", + )); + out.insert(ClickHouseFunction::new( + "date_diff('unit', start, end[, tz])", + "Difference between two dates in the provided unit", + )); + out.insert(ClickHouseFunction::new( + "date_trunc('unit', date[, tz])", + "Truncate a datetime to the provided unit", + )); + out.insert(ClickHouseFunction::new( + "date_add('unit', count, date)", + "Add `count` units to `date`", + )); + out.insert(ClickHouseFunction::new( + "date_sub('unit', count, date)", + "Subtract `count` units from `date`", + )); + + // Other + out.insert(ClickHouseFunction::new( + "generateUUIDv4()", + "Generate a random UUID v4", + )); + out.insert(ClickHouseFunction::new("rand()", "Uniform random u32")); + out.insert(ClickHouseFunction::new("rand64()", "Uniform random u64")); + out.insert(ClickHouseFunction::new( + "runningDifference(arr)", + "Difference between adjacent values", + )); + out.insert(ClickHouseFunction::new( + "formatReadableSize(x)", + "Format a byte count for humans", + )); + out.insert(ClickHouseFunction::new( + "formatReadableTimeDelta(x)", + "Format an interval for humans", + )); + out.insert(ClickHouseFunction::new( + "formatReadableQuantity(x)", + "Format a quantity for humans", + )); + out + }) +} + +#[cfg(test)] +mod tests { + use super::Error; + use super::OxdbError; + use super::RestrictedQuery; + use super::SafeSql; + + #[test] + fn test_function_allow_list() { + assert!(RestrictedQuery::new("SELECT bogus()").is_err()); + assert!(matches!( + RestrictedQuery::new("SELECT bogus()").unwrap_err(), + OxdbError::Sql(Error::UnsupportedFunction { .. }) + )); + assert!(RestrictedQuery::new("SELECT now()").is_ok()); + } + + #[test] + fn test_ctes_are_not_supported() { + assert!(matches!( + RestrictedQuery::new("WITH nono AS (SELECT 1) SELECT * FROM NONO") + .unwrap_err(), + OxdbError::Sql(Error::UnsupportedSql(_)) + )); + } + + #[test] + fn test_multiple_statements_are_not_supported() { + assert!(matches!( + RestrictedQuery::new("SELECT 1; SELECT 2;").unwrap_err(), + OxdbError::Sql(Error::UnsupportedSql(_)) + )); + } + + #[test] + fn test_query_must_be_select_statement() { + for query in [ + "SHOW TABLES", + "DROP TABLE foo", + "CREATE TABLE foo (x Int4)", + "DESCRIBE foo", + "EXPLAIN SELECT 1", + "INSERT INTO foo VALUES (1)", + ] { + let err = RestrictedQuery::new(query).unwrap_err(); + println!("{err:?}"); + assert!(matches!(err, OxdbError::Sql(Error::UnsupportedSql(_)))); + } + } + + #[test] + fn test_cannot_name_database() { + let err = RestrictedQuery::new("SELECT * FROM dbname.a:a").unwrap_err(); + assert!(matches!(err, OxdbError::Sql(Error::UnsupportedSql(_)))); + } + + #[test] + fn test_with_comma_join_fails() { + let err = RestrictedQuery::new("SELECT * FROM a:a, b:b").unwrap_err(); + println!("{err:?}"); + assert!(matches!(err, OxdbError::Sql(Error::UnsupportedSql(_)))); + } + + #[test] + fn test_join_must_be_inner() { + let allowed = ["inner", ""]; + let denied = + ["natural", "cross", "left outer", "right outer", "full outer"]; + for join in allowed.iter() { + RestrictedQuery::new(format!("SELECT * FROM a:a {join} JOIN b:b")) + .unwrap_or_else(|_| { + panic!("Should be able to use join type '{join}'") + }); + } + for join in denied.iter() { + let sql = format!("SELECT * FROM a:a {join} JOIN b:b"); + println!("{sql}"); + let err = RestrictedQuery::new(&sql).expect_err( + format!("Should not be able to use join type '{join}'") + .as_str(), + ); + println!("{err:?}"); + assert!(matches!(err, OxdbError::Sql(Error::UnsupportedSql(_)))); + } + } + + #[test] + fn test_allow_limit_offset() { + let sql = "SELECT * FROM a:b LIMIT 10 OFFSET 10;"; + println!("{sql}"); + RestrictedQuery::new(&sql) + .expect("Should be able to use LIMIT / OFFSET queries"); + } + + #[test] + fn test_require_table_is_timeseries_name() { + assert!(RestrictedQuery::new("SELECT * FROM a:b").is_ok()); + let bad = ["table", "db.table", "no:no:no"]; + for each in bad.iter() { + let sql = format!("SELECT * FROM {each}"); + RestrictedQuery::new(&sql) + .expect_err("Should have validated timeseries name"); + } + } + + #[test] + fn test_allow_subqueries() { + assert!(RestrictedQuery::new("SELECT * FROM (SELECT 1);").is_ok()); + } + + #[test] + fn test_query_with_multiple_timeseries_generates_one_cte() { + let query = "SELECT * FROM a:b JOIN a:b USING (timeseries_key);"; + let res = RestrictedQuery::new(&query).unwrap(); + assert_eq!(res.timeseries.len(), 1); + } + + #[test] + fn test_safe_sql_does_not_modify_original_alias() { + let query = "SELECT * FROM a:b AS ASOF JOIN a:b"; + let query_with_quotes = "SELECT * FROM \"a:b\" AS ASOF JOIN \"a:b\""; + let safe = SafeSql::new(query); + let rewritten = safe.safe_sql(); + println!("{query}"); + println!("{query_with_quotes}"); + println!("{rewritten}"); + + // Check that we've written out the same query words, ignoring + // whitespace. + let words = query_with_quotes + .split_ascii_whitespace() + .rev() + .collect::>(); + let rewritten_words = rewritten + .split_ascii_whitespace() + .rev() + .take(words.len()) + .collect::>(); + assert_eq!(words, rewritten_words); + } +} diff --git a/oximeter/db/test-output/sql/00/query.sql b/oximeter/db/test-output/sql/00/query.sql new file mode 100644 index 00000000000..e0ac49d1ecf --- /dev/null +++ b/oximeter/db/test-output/sql/00/query.sql @@ -0,0 +1 @@ +SELECT 1; diff --git a/oximeter/db/test-output/sql/00/result.txt b/oximeter/db/test-output/sql/00/result.txt new file mode 100644 index 00000000000..925e298e866 --- /dev/null +++ b/oximeter/db/test-output/sql/00/result.txt @@ -0,0 +1,10 @@ +{ + "column_names": [ + "1" + ], + "rows": [ + [ + 1 + ] + ] +} \ No newline at end of file diff --git a/oximeter/db/test-output/sql/01/query.sql b/oximeter/db/test-output/sql/01/query.sql new file mode 100644 index 00000000000..f3e5549e7f8 --- /dev/null +++ b/oximeter/db/test-output/sql/01/query.sql @@ -0,0 +1 @@ +SELECT 1 + 1 AS total; diff --git a/oximeter/db/test-output/sql/01/result.txt b/oximeter/db/test-output/sql/01/result.txt new file mode 100644 index 00000000000..ee17f9993e6 --- /dev/null +++ b/oximeter/db/test-output/sql/01/result.txt @@ -0,0 +1,10 @@ +{ + "column_names": [ + "total" + ], + "rows": [ + [ + 2 + ] + ] +} \ No newline at end of file diff --git a/oximeter/db/test-output/sql/02/query.sql b/oximeter/db/test-output/sql/02/query.sql new file mode 100644 index 00000000000..cd16a883aa3 --- /dev/null +++ b/oximeter/db/test-output/sql/02/query.sql @@ -0,0 +1 @@ +SELECT count() FROM service:request_latency WHERE route = '/a'; diff --git a/oximeter/db/test-output/sql/02/result.txt b/oximeter/db/test-output/sql/02/result.txt new file mode 100644 index 00000000000..7bae246ae86 --- /dev/null +++ b/oximeter/db/test-output/sql/02/result.txt @@ -0,0 +1,10 @@ +{ + "column_names": [ + "count()" + ], + "rows": [ + [ + 12 + ] + ] +} \ No newline at end of file diff --git a/oximeter/db/test-output/sql/03/query.sql b/oximeter/db/test-output/sql/03/query.sql new file mode 100644 index 00000000000..9d043eda5b4 --- /dev/null +++ b/oximeter/db/test-output/sql/03/query.sql @@ -0,0 +1,4 @@ +SELECT + count() AS total +FROM service:request_latency +GROUP BY name, id, route, method, status_code; diff --git a/oximeter/db/test-output/sql/03/result.txt b/oximeter/db/test-output/sql/03/result.txt new file mode 100644 index 00000000000..246b8a224ee --- /dev/null +++ b/oximeter/db/test-output/sql/03/result.txt @@ -0,0 +1,43 @@ +{ + "column_names": [ + "total" + ], + "rows": [ + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ], + [ + 2 + ] + ] +} \ No newline at end of file diff --git a/oximeter/db/test-output/sql/04/query.sql b/oximeter/db/test-output/sql/04/query.sql new file mode 100644 index 00000000000..a276475a0dc --- /dev/null +++ b/oximeter/db/test-output/sql/04/query.sql @@ -0,0 +1,5 @@ +SELECT + timeseries_key, + count() AS total +FROM service:request_latency +GROUP BY timeseries_key; diff --git a/oximeter/db/test-output/sql/04/result.txt b/oximeter/db/test-output/sql/04/result.txt new file mode 100644 index 00000000000..4eca1fd93df --- /dev/null +++ b/oximeter/db/test-output/sql/04/result.txt @@ -0,0 +1,56 @@ +{ + "column_names": [ + "timeseries_key", + "total" + ], + "rows": [ + [ + 1249464505628069370, + 2 + ], + [ + 1201872630192423018, + 2 + ], + [ + 1490072383288995413, + 2 + ], + [ + 4845785484328932020, + 2 + ], + [ + 16162802647654680800, + 2 + ], + [ + 9308844330114997943, + 2 + ], + [ + 5233273748839477731, + 2 + ], + [ + 12759963114254845848, + 2 + ], + [ + 8677807063017961056, + 2 + ], + [ + 17069599562714970297, + 2 + ], + [ + 1477351355909737762, + 2 + ], + [ + 16473879070749258520, + 2 + ] + ] +} \ No newline at end of file diff --git a/oximeter/oximeter/src/types.rs b/oximeter/oximeter/src/types.rs index 3d74bec72c0..eff5c399e3e 100644 --- a/oximeter/oximeter/src/types.rs +++ b/oximeter/oximeter/src/types.rs @@ -330,6 +330,11 @@ impl DatumType { | DatumType::HistogramF64 ) } + + /// Return `true` if this datum type is a histogram, and `false` otherwise. + pub const fn is_histogram(&self) -> bool { + matches!(self, DatumType::HistogramF64 | DatumType::HistogramI64) + } } impl std::fmt::Display for DatumType { diff --git a/wicket-dbg/Cargo.toml b/wicket-dbg/Cargo.toml index f9047297af1..f42ed335c8a 100644 --- a/wicket-dbg/Cargo.toml +++ b/wicket-dbg/Cargo.toml @@ -20,7 +20,7 @@ tokio = { workspace = true, features = ["full"] } wicket.workspace = true # used only by wicket-dbg binary -reedline = "0.26.0" +reedline.workspace = true omicron-workspace-hack.workspace = true [[bin]] diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 1d14b26a69c..00be70aef50 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -32,7 +32,6 @@ console = { version = "0.15.7" } const-oid = { version = "0.9.5", default-features = false, features = ["db", "std"] } crossbeam-epoch = { version = "0.9.15" } crossbeam-utils = { version = "0.8.16" } -crossterm = { version = "0.27.0", features = ["event-stream", "serde"] } crypto-common = { version = "0.1.6", default-features = false, features = ["getrandom", "std"] } der = { version = "0.7.8", default-features = false, features = ["derive", "flagset", "oid", "pem", "std"] } diesel = { version = "2.1.4", features = ["chrono", "i-implement-a-third-party-backend-and-opt-into-breaking-changes", "network-address", "postgres", "r2d2", "serde_json", "uuid"] } @@ -66,6 +65,7 @@ libc = { version = "0.2.150", features = ["extra_traits"] } log = { version = "0.4.20", default-features = false, features = ["std"] } managed = { version = "0.8.0", default-features = false, features = ["alloc", "map"] } memchr = { version = "2.6.3" } +nom = { version = "7.1.3" } num-bigint = { version = "0.4.4", features = ["rand"] } num-integer = { version = "0.1.45", features = ["i128"] } num-iter = { version = "0.1.43", default-features = false, features = ["i128"] } @@ -133,7 +133,6 @@ console = { version = "0.15.7" } const-oid = { version = "0.9.5", default-features = false, features = ["db", "std"] } crossbeam-epoch = { version = "0.9.15" } crossbeam-utils = { version = "0.8.16" } -crossterm = { version = "0.27.0", features = ["event-stream", "serde"] } crypto-common = { version = "0.1.6", default-features = false, features = ["getrandom", "std"] } der = { version = "0.7.8", default-features = false, features = ["derive", "flagset", "oid", "pem", "std"] } diesel = { version = "2.1.4", features = ["chrono", "i-implement-a-third-party-backend-and-opt-into-breaking-changes", "network-address", "postgres", "r2d2", "serde_json", "uuid"] } @@ -167,6 +166,7 @@ libc = { version = "0.2.150", features = ["extra_traits"] } log = { version = "0.4.20", default-features = false, features = ["std"] } managed = { version = "0.8.0", default-features = false, features = ["alloc", "map"] } memchr = { version = "2.6.3" } +nom = { version = "7.1.3" } num-bigint = { version = "0.4.4", features = ["rand"] } num-integer = { version = "0.1.45", features = ["i128"] } num-iter = { version = "0.1.43", default-features = false, features = ["i128"] }