From e6e052dc496e02501c3d97e5413f4cece9c454dc Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Mon, 1 Jul 2024 13:29:38 +0000 Subject: [PATCH] Move `oxql` from `oxdb` to mainline `omdb` Why?: Concerning [network observability work](https://github.com/orgs/oxidecomputer/projects/55/views/1?filterQuery=&pane=issue&itemId=68336554), this makes the [`oxql`](https://rfd.shared.oxide.computer/rfd/0463) interactive query repl accessible via omdb, as we start to give users and ourselves the ability to query timeseries and metrics more easily. Additionally, in the "now", this aids in debugging through our metrics set and makes it available, via omdb, throughout our ecosystem/a4x2. Includes: * Moves `oxql_shell` into the oximeter_db lib for use by both omdb and oxdb. * If no URL is given to `omdb oxql`, it will leverage internal DNS. * Update the oximeter omdb call (for listing producers) to leverage internal. DNS if no URL is given. * Update command/output tests/generations and collector specific tests for list producers. Notes: * The oxql client still expects an socket address as liked it typed specifically v.s. a String. Instead, upon running the `omdb oxql` command, we take in a URL String and parse it into the socket address directly. --- .gitignore | 1 + Cargo.lock | 2 + dev-tools/omdb/Cargo.toml | 2 + dev-tools/omdb/src/bin/omdb/main.rs | 6 +- dev-tools/omdb/src/bin/omdb/oximeter.rs | 47 ++- dev-tools/omdb/src/bin/omdb/oxql.rs | 92 +++++ dev-tools/omdb/tests/env.out | 25 ++ dev-tools/omdb/tests/successes.out | 24 +- dev-tools/omdb/tests/test_all_output.rs | 137 +++++++- dev-tools/omdb/tests/usage_errors.out | 65 ++++ nexus-config/src/nexus_config.rs | 10 +- oximeter/collector/src/lib.rs | 5 + oximeter/db/Cargo.toml | 36 +- oximeter/db/src/bin/oxdb/main.rs | 11 +- oximeter/db/src/bin/oxdb/oxql.rs | 351 +------------------ oximeter/db/src/bin/oxdb/sql.rs | 10 +- oximeter/db/src/client/mod.rs | 1 - oximeter/db/src/lib.rs | 4 - oximeter/db/src/oxql/ast/table_ops/filter.rs | 2 +- oximeter/db/src/oxql/mod.rs | 272 ++++++++++++++ oximeter/db/src/oxql/query/mod.rs | 57 --- oximeter/impl/src/schema/mod.rs | 120 +++++++ test-utils/src/dev/test_cmds.rs | 11 +- 23 files changed, 793 insertions(+), 498 deletions(-) create mode 100644 dev-tools/omdb/src/bin/omdb/oxql.rs diff --git a/.gitignore b/.gitignore index 39a8f361446..6e4e0eb42a1 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ tags .falcon/* .img/* connectivity-report.json +*.local diff --git a/Cargo.lock b/Cargo.lock index d79933cd87f..11bff75a02f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5638,6 +5638,7 @@ dependencies = [ "omicron-uuid-kinds", "omicron-workspace-hack", "oximeter-client", + "oximeter-db", "pq-sys", "ratatui", "reedline", @@ -5652,6 +5653,7 @@ dependencies = [ "textwrap", "tokio", "unicode-width", + "url", "uuid", ] diff --git a/dev-tools/omdb/Cargo.toml b/dev-tools/omdb/Cargo.toml index 9cdf03093c8..ddfdab6caa5 100644 --- a/dev-tools/omdb/Cargo.toml +++ b/dev-tools/omdb/Cargo.toml @@ -37,6 +37,7 @@ nexus-types.workspace = true omicron-common.workspace = true omicron-uuid-kinds.workspace = true oximeter-client.workspace = true +oximeter-db.workspace = true # See omicron-rpaths for more about the "pq-sys" dependency. pq-sys = "*" ratatui.workspace = true @@ -51,6 +52,7 @@ tabled.workspace = true textwrap.workspace = true tokio = { workspace = true, features = [ "full" ] } unicode-width.workspace = true +url.workspace = true uuid.workspace = true ipnetwork.workspace = true omicron-workspace-hack.workspace = true diff --git a/dev-tools/omdb/src/bin/omdb/main.rs b/dev-tools/omdb/src/bin/omdb/main.rs index 7469e2ba54f..8fc48f50283 100644 --- a/dev-tools/omdb/src/bin/omdb/main.rs +++ b/dev-tools/omdb/src/bin/omdb/main.rs @@ -50,6 +50,7 @@ mod helpers; mod mgs; mod nexus; mod oximeter; +mod oxql; mod sled_agent; #[tokio::main] @@ -66,7 +67,8 @@ async fn main() -> Result<(), anyhow::Error> { OmdbCommands::Db(db) => db.run_cmd(&args, &log).await, OmdbCommands::Mgs(mgs) => mgs.run_cmd(&args, &log).await, OmdbCommands::Nexus(nexus) => nexus.run_cmd(&args, &log).await, - OmdbCommands::Oximeter(oximeter) => oximeter.run_cmd(&log).await, + OmdbCommands::Oximeter(oximeter) => oximeter.run_cmd(&args, &log).await, + OmdbCommands::Oxql(oxql) => oxql.run_cmd(&args, &log).await, OmdbCommands::SledAgent(sled) => sled.run_cmd(&args, &log).await, OmdbCommands::CrucibleAgent(crucible) => crucible.run_cmd(&args).await, } @@ -269,6 +271,8 @@ enum OmdbCommands { Nexus(nexus::NexusArgs), /// Query oximeter collector state Oximeter(oximeter::OximeterArgs), + /// Enter the Oximeter Query Language shell for interactive querying. + Oxql(oxql::OxqlArgs), /// Debug a specific Sled SledAgent(sled_agent::SledAgentArgs), } diff --git a/dev-tools/omdb/src/bin/omdb/oximeter.rs b/dev-tools/omdb/src/bin/omdb/oximeter.rs index a6dc2ce0115..02bc36d2d73 100644 --- a/dev-tools/omdb/src/bin/omdb/oximeter.rs +++ b/dev-tools/omdb/src/bin/omdb/oximeter.rs @@ -5,6 +5,7 @@ //! omdb commands that query oximeter use crate::helpers::CONNECTION_OPTIONS_HEADING; +use crate::Omdb; use anyhow::Context; use clap::Args; use clap::Subcommand; @@ -24,12 +25,10 @@ pub struct OximeterArgs { #[arg( long, env = "OMDB_OXIMETER_URL", - // This can't be global = true (i.e. passed in later in the - // command-line) because global options can't be required. If this - // changes to being optional, we should set global = true. + global = true, help_heading = CONNECTION_OPTIONS_HEADING, )] - oximeter_url: String, + oximeter_url: Option, #[command(subcommand)] command: OximeterCommands, @@ -38,20 +37,46 @@ pub struct OximeterArgs { /// Subcommands that query oximeter collector state #[derive(Debug, Subcommand)] enum OximeterCommands { - /// List the producers the collector is assigned to poll + /// List the producers the collector is assigned to poll. ListProducers, } impl OximeterArgs { - fn client(&self, log: &Logger) -> Client { - Client::new( - &self.oximeter_url, + async fn client( + &self, + omdb: &Omdb, + log: &Logger, + ) -> Result { + let oximeter_url = match &self.oximeter_url { + Some(cli_or_env_url) => cli_or_env_url.clone(), + None => { + eprintln!( + "note: Oximeter URL not specified. Will pick one from DNS." + ); + let addr = omdb + .dns_lookup_one( + log.clone(), + internal_dns::ServiceName::Oximeter, + ) + .await?; + format!("http://{}", addr) + } + }; + eprintln!("note: using Oximeter URL {}", &oximeter_url); + + let client = Client::new( + &oximeter_url, log.new(slog::o!("component" => "oximeter-client")), - ) + ); + Ok(client) } - pub async fn run_cmd(&self, log: &Logger) -> anyhow::Result<()> { - let client = self.client(log); + pub async fn run_cmd( + &self, + omdb: &Omdb, + log: &Logger, + ) -> anyhow::Result<()> { + let client = self.client(omdb, log).await?; match self.command { OximeterCommands::ListProducers => { self.list_producers(client).await diff --git a/dev-tools/omdb/src/bin/omdb/oxql.rs b/dev-tools/omdb/src/bin/omdb/oxql.rs new file mode 100644 index 00000000000..66382c0bffe --- /dev/null +++ b/dev-tools/omdb/src/bin/omdb/oxql.rs @@ -0,0 +1,92 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! omdb OxQL shell for interactive queries on metrics/timeseries. + +// Copyright 2024 Oxide Computer + +use crate::helpers::CONNECTION_OPTIONS_HEADING; +use crate::Omdb; +use anyhow::Context; +use clap::Args; +use oximeter_db::{self, Client, DbWrite}; +use slog::Logger; +use std::net::SocketAddr; +use url::Url; + +/// Command-line arguments for the OxQL shell. +#[derive(Debug, Args)] +pub struct OxqlArgs { + /// URL of the metrics database. + #[arg( + long, + env = "OMDB_METRICS_DB_URL", + global = true, + help_heading = CONNECTION_OPTIONS_HEADING, + )] + metrics_db_url: Option, + + /// Print summaries of each SQL query run against the database. + #[clap(long = "summaries")] + print_summaries: bool, + + /// Print the total elapsed query duration. + #[clap(long = "elapsed")] + print_elapsed: bool, +} + +impl OxqlArgs { + async fn client( + &self, + omdb: &Omdb, + log: &Logger, + ) -> Result { + let socket_addr = match &self.metrics_db_url { + Some(cli_or_env_url) => Url::parse(&cli_or_env_url) + .context("Failed to parse metrics DB URL")? + .socket_addrs(|| None) + .context("Failed to resolve metrics DB URL")? + .drain(..) + .next() + .context("Failed to resolve metrics DB URL")?, + _ => { + eprintln!( + "note: Metrics DB address/port not specified. Will pick one from DNS." + ); + SocketAddr::V6( + omdb.dns_lookup_one( + log.clone(), + internal_dns::ServiceName::Clickhouse, + ) + .await?, + ) + } + }; + eprintln!("note: using Metrics DB socket address: {}", &socket_addr); + + let client = Client::new(socket_addr, log); + + client + .init_single_node_db() + .await + .context("Failed to initialize timeseries database")?; + Ok(client) + } + + /// Run the OxQL shell via the `omdb oxql` subcommand. + pub async fn run_cmd( + &self, + omdb: &Omdb, + log: &Logger, + ) -> anyhow::Result<()> { + let client = self.client(omdb, log).await?; + oximeter_db::oxql::oxql_shell( + client, + self.print_summaries, + self.print_elapsed, + ) + .await?; + Ok(()) + } +} diff --git a/dev-tools/omdb/tests/env.out b/dev-tools/omdb/tests/env.out index 252313e6c86..a5687c9e90a 100644 --- a/dev-tools/omdb/tests/env.out +++ b/dev-tools/omdb/tests/env.out @@ -421,3 +421,28 @@ note: using database URL postgresql://root@[::1]:REDACTED_PORT/omicron?sslmode=d note: database schema version matches expected () note: listing all commissioned sleds (use -F to filter, e.g. -F in-service) ============================================= +EXECUTING COMMAND: omdb ["oximeter", "--oximeter-url", "junk", "list-producers"] +termination: Exited(1) +--------------------------------------------- +stdout: +--------------------------------------------- +stderr: +note: using Oximeter URL junk +Error: failed to fetch collector info + +Caused by: + 0: Communication Error: builder error: relative URL without a base + 1: builder error: relative URL without a base + 2: relative URL without a base +============================================= +EXECUTING COMMAND: omdb ["oxql", "--metrics-db-url", "junk"] +termination: Exited(1) +--------------------------------------------- +stdout: +--------------------------------------------- +stderr: +Error: Failed to parse metrics DB URL + +Caused by: + relative URL without a base +============================================= diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index 47d091443db..1dcee314d3d 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -401,14 +401,14 @@ task: "dns_propagation_external" task: "nat_v4_garbage_collector" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms last completion reported error: failed to resolve addresses for Dendrite services: no record found for Query { name: Name("_dendrite._tcp.control-plane.oxide.internal."), query_type: SRV, query_class: IN } task: "blueprint_loader" - configured period: every 1m 40s + configured period: every 1m s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -432,7 +432,7 @@ task: "abandoned_vmm_reaper" sled resource reservations deleted: 0 task: "bfd_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -463,7 +463,7 @@ task: "external_endpoints" TLS certificates: 0 task: "instance_watcher" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -491,7 +491,7 @@ task: "metrics_producer_gc" warning: unknown background task: "metrics_producer_gc" (don't know how to interpret details: Object {"expiration": String(""), "pruned": Array []}) task: "phantom_disks" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -499,14 +499,14 @@ task: "phantom_disks" number of phantom disk delete errors: 0 task: "physical_disk_adoption" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a dependent task completing started at (s ago) and ran for ms last completion reported error: task disabled task: "region_replacement" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -514,7 +514,7 @@ task: "region_replacement" number of region replacement start errors: 0 task: "region_replacement_driver" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -529,28 +529,28 @@ task: "service_firewall_rule_propagation" started at (s ago) and ran for ms task: "service_zone_nat_tracker" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms last completion reported error: inventory collection is None task: "switch_port_config_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms warning: unknown background task: "switch_port_config_manager" (don't know how to interpret details: Object {}) task: "v2p_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms warning: unknown background task: "v2p_manager" (don't know how to interpret details: Object {}) task: "vpc_route_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms diff --git a/dev-tools/omdb/tests/test_all_output.rs b/dev-tools/omdb/tests/test_all_output.rs index 19be33631db..72af9e9bc25 100644 --- a/dev-tools/omdb/tests/test_all_output.rs +++ b/dev-tools/omdb/tests/test_all_output.rs @@ -8,6 +8,7 @@ //! sure you're only breaking what you intend. use expectorate::assert_contents; +use nexus_test_utils::{OXIMETER_UUID, PRODUCER_UUID}; use nexus_test_utils_macros::nexus_test; use nexus_types::deployment::SledFilter; use nexus_types::deployment::UnstableReconfiguratorState; @@ -26,6 +27,14 @@ const CMD_OMDB: &str = env!("CARGO_BIN_EXE_omdb"); type ControlPlaneTestContext = nexus_test_utils::ControlPlaneTestContext; +fn assert_oximeter_list_producers_output(output: &str, ox_url: &str) { + assert!( + output.contains(format!("Collector ID: {}", OXIMETER_UUID).as_str()) + ); + assert!(output.contains(PRODUCER_UUID)); + assert!(output.contains(ox_url)); +} + #[tokio::test] async fn test_omdb_usage_errors() { let cmd_path = path_to_executable(CMD_OMDB); @@ -57,6 +66,10 @@ async fn test_omdb_usage_errors() { &["sled-agent"], &["sled-agent", "zones"], &["sled-agent", "zpools"], + &["oximeter", "--help"], + &["oxql", "--help"], + // Mispelled argument + &["oxql", "--summarizes"], ]; for args in invocations { @@ -74,10 +87,14 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { ) .await; let cmd_path = path_to_executable(CMD_OMDB); + let postgres_url = cptestctx.database.listen_url(); let nexus_internal_url = format!("http://{}/", cptestctx.internal_client.bind_address); let mgs_url = format!("http://{}/", gwtestctx.client.bind_address); + let ox_url = format!("http://{}/", cptestctx.oximeter.server_address()); + let ch_url = format!("http://{}/", cptestctx.clickhouse.address); + let tmpdir = camino_tempfile::tempdir() .expect("failed to create temporary directory"); let tmppath = tmpdir.path().join("reconfigurator-save.out"); @@ -124,18 +141,24 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { let p = postgres_url.to_string(); let u = nexus_internal_url.clone(); let g = mgs_url.clone(); + let ox = ox_url.clone(); + let ch = ch_url.clone(); do_run_extra( &mut output, move |exec| { exec.env("OMDB_DB_URL", &p) .env("OMDB_NEXUS_URL", &u) .env("OMDB_MGS_URL", &g) + .env("OMDB_OXIMETER_URL", &ox) + .env("OMDB_METRICS_DB_URL", &ch) }, &cmd_path, args, - ExtraRedactions::new() - .variable_length("tmp_path", tmppath.as_str()) - .fixed_length("blueprint_id", &initial_blueprint_id), + Some( + ExtraRedactions::new() + .variable_length("tmp_path", tmppath.as_str()) + .fixed_length("blueprint_id", &initial_blueprint_id), + ), ) .await; } @@ -170,6 +193,23 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { .is_some()); assert!(!parsed.collections.is_empty()); + // The `oximeter` list-producers command output is not easy to compare as a + // string directly because the timing of registrations with both our test + // producer and the one nexus registers. But, let's find our test producer + // in the list. + let ox_invocation = &["oximeter", "list-producers"]; + let mut ox_output = String::new(); + let ox = ox_url.clone(); + + do_run_no_redactions( + &mut ox_output, + move |exec| exec.env("OMDB_OXIMETER_URL", &ox), + &cmd_path, + ox_invocation, + ) + .await; + assert_oximeter_list_producers_output(&ox_output, &ox_url); + gwtestctx.teardown().await; } @@ -188,6 +228,8 @@ async fn test_omdb_env_settings(cptestctx: &ControlPlaneTestContext) { let postgres_url = cptestctx.database.listen_url().to_string(); let nexus_internal_url = format!("http://{}", cptestctx.internal_client.bind_address); + let ox_url = format!("http://{}/", cptestctx.oximeter.server_address()); + let ch_url = format!("http://{}/", cptestctx.clickhouse.address); let dns_sockaddr = cptestctx.internal_dns.dns_server.local_address(); let mut output = String::new(); @@ -263,7 +305,47 @@ async fn test_omdb_env_settings(cptestctx: &ControlPlaneTestContext) { let args = &["--dns-server", &dns_sockaddr.to_string(), "db", "sleds"]; do_run(&mut output, move |exec| exec, &cmd_path, args).await; + // Case: specified in multiple places (command-line argument wins) + let args = &["oximeter", "--oximeter-url", "junk", "list-producers"]; + let ox = ox_url.clone(); + do_run( + &mut output, + move |exec| exec.env("OMDB_OXIMETER_URL", &ox), + &cmd_path, + args, + ) + .await; + + // Case: specified in multiple places (command-line argument wins) + let args = &["oxql", "--metrics-db-url", "junk"]; + do_run( + &mut output, + move |exec| exec.env("OMDB_METRICS_DB_URL", &ch_url), + &cmd_path, + args, + ) + .await; + assert_contents("tests/env.out", &output); + + // The `oximeter` list-producers command output is not easy to compare as a + // string directly because the timing of registrations with both our test + // producer and the one nexus registers. But, let's find our test producer + // in the list. + + // Oximeter URL + // Case 1: specified on the command line. + // Case 2: is covered by the success tests above. + let ox_args1 = &["oximeter", "--oximeter-url", &ox_url, "list-producers"]; + let mut ox_output1 = String::new(); + do_run_no_redactions( + &mut ox_output1, + move |exec| exec, + &cmd_path, + ox_args1, + ) + .await; + assert_oximeter_list_producers_output(&ox_output1, &ox_url); } async fn do_run( @@ -274,8 +356,25 @@ async fn do_run( ) where F: FnOnce(Exec) -> Exec + Send + 'static, { - do_run_extra(output, modexec, cmd_path, args, &ExtraRedactions::new()) - .await; + do_run_extra( + output, + modexec, + cmd_path, + args, + Some(&ExtraRedactions::new()), + ) + .await; +} + +async fn do_run_no_redactions( + output: &mut String, + modexec: F, + cmd_path: &Path, + args: &[&str], +) where + F: FnOnce(Exec) -> Exec + Send + 'static, +{ + do_run_extra(output, modexec, cmd_path, args, None).await; } async fn do_run_extra( @@ -283,18 +382,22 @@ async fn do_run_extra( modexec: F, cmd_path: &Path, args: &[&str], - extra_redactions: &ExtraRedactions<'_>, + extra_redactions: Option<&ExtraRedactions<'_>>, ) where F: FnOnce(Exec) -> Exec + Send + 'static, { - println!("running command with args: {:?}", args); write!( output, "EXECUTING COMMAND: {} {:?}\n", cmd_path.file_name().expect("missing command").to_string_lossy(), args.iter() - .map(|r| redact_extra(r, extra_redactions)) - .collect::>(), + .map(|r| { + extra_redactions.map_or_else( + || r.to_string(), + |redactions| redact_extra(r, redactions), + ) + }) + .collect::>() ) .unwrap(); @@ -326,9 +429,21 @@ async fn do_run_extra( write!(output, "termination: {:?}\n", exit_status).unwrap(); write!(output, "---------------------------------------------\n").unwrap(); write!(output, "stdout:\n").unwrap(); - output.push_str(&redact_extra(&stdout_text, extra_redactions)); + + if let Some(extra_redactions) = extra_redactions { + output.push_str(&redact_extra(&stdout_text, extra_redactions)); + } else { + output.push_str(&stdout_text); + } + write!(output, "---------------------------------------------\n").unwrap(); write!(output, "stderr:\n").unwrap(); - output.push_str(&redact_extra(&stderr_text, extra_redactions)); + + if let Some(extra_redactions) = extra_redactions { + output.push_str(&redact_extra(&stderr_text, extra_redactions)); + } else { + output.push_str(&stderr_text); + } + write!(output, "=============================================\n").unwrap(); } diff --git a/dev-tools/omdb/tests/usage_errors.out b/dev-tools/omdb/tests/usage_errors.out index 9524d217c95..12566aaa3f5 100644 --- a/dev-tools/omdb/tests/usage_errors.out +++ b/dev-tools/omdb/tests/usage_errors.out @@ -14,6 +14,7 @@ Commands: mgs Debug a specific Management Gateway Service instance nexus Debug a specific Nexus instance oximeter Query oximeter collector state + oxql Enter the Oximeter Query Language shell for interactive querying sled-agent Debug a specific Sled help Print this message or the help of the given subcommand(s) @@ -44,6 +45,7 @@ Commands: mgs Debug a specific Management Gateway Service instance nexus Debug a specific Nexus instance oximeter Query oximeter collector state + oxql Enter the Oximeter Query Language shell for interactive querying sled-agent Debug a specific Sled help Print this message or the help of the given subcommand(s) @@ -613,3 +615,66 @@ Connection Options: Safety Options: -w, --destructive Allow potentially-destructive subcommands ============================================= +EXECUTING COMMAND: omdb ["oximeter", "--help"] +termination: Exited(0) +--------------------------------------------- +stdout: +Query oximeter collector state + +Usage: omdb oximeter [OPTIONS] + +Commands: + list-producers List the producers the collector is assigned to poll + help Print this message or the help of the given subcommand(s) + +Options: + --log-level log level filter [env: LOG_LEVEL=] [default: warn] + -h, --help Print help + +Connection Options: + --oximeter-url URL of the oximeter collector to query [env: + OMDB_OXIMETER_URL=] + --dns-server [env: OMDB_DNS_SERVER=] + +Safety Options: + -w, --destructive Allow potentially-destructive subcommands +--------------------------------------------- +stderr: +============================================= +EXECUTING COMMAND: omdb ["oxql", "--help"] +termination: Exited(0) +--------------------------------------------- +stdout: +Enter the Oximeter Query Language shell for interactive querying + +Usage: omdb oxql [OPTIONS] + +Options: + --log-level log level filter [env: LOG_LEVEL=] [default: warn] + --summaries Print summaries of each SQL query run against the database + --elapsed Print the total elapsed query duration + -h, --help Print help + +Connection Options: + --metrics-db-url URL of the metrics database [env: OMDB_METRICS_DB_URL=] + --dns-server [env: OMDB_DNS_SERVER=] + +Safety Options: + -w, --destructive Allow potentially-destructive subcommands +--------------------------------------------- +stderr: +============================================= +EXECUTING COMMAND: omdb ["oxql", "--summarizes"] +termination: Exited(2) +--------------------------------------------- +stdout: +--------------------------------------------- +stderr: +error: unexpected argument '--summarizes' found + + tip: a similar argument exists: '--summaries' + +Usage: omdb oxql <--metrics-db-url |--summaries|--elapsed> + +For more information, try '--help'. +============================================= diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index a8c863298e3..5d59c9d46bf 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -6,16 +6,14 @@ //! at deployment time. use crate::PostgresConfigWithUrl; - -use omicron_common::address::Ipv6Subnet; -use omicron_common::address::NEXUS_TECHPORT_EXTERNAL_PORT; -use omicron_common::address::RACK_PREFIX; -use omicron_common::api::internal::shared::SwitchLocation; - use anyhow::anyhow; use camino::{Utf8Path, Utf8PathBuf}; use dropshot::ConfigDropshot; use dropshot::ConfigLogging; +use omicron_common::address::Ipv6Subnet; +use omicron_common::address::NEXUS_TECHPORT_EXTERNAL_PORT; +use omicron_common::address::RACK_PREFIX; +use omicron_common::api::internal::shared::SwitchLocation; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_with::serde_as; diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 367a2066a1c..02bf9152f42 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -409,4 +409,9 @@ impl Oximeter { pub fn collector_id(&self) -> &Uuid { &self.agent.id } + + /// Return the address of the server. + pub fn server_address(&self) -> SocketAddr { + self.server.local_addr() + } } diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index c446bc78221..0e85a37bfe0 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -16,12 +16,16 @@ bcs.workspace = true camino.workspace = true chrono.workspace = true clap.workspace = true +crossterm.workspace = true dropshot.workspace = true futures.workspace = true highway.workspace = true +num.workspace = true omicron-common.workspace = true omicron-workspace-hack.workspace = true oximeter.workspace = true +peg.workspace = true +reedline.workspace = true regex.workspace = true serde.workspace = true serde_json.workspace = true @@ -29,6 +33,7 @@ slog.workspace = true slog-async.workspace = true slog-dtrace.workspace = true slog-term.workspace = true +tabled.workspace = true thiserror.workspace = true usdt.workspace = true uuid.workspace = true @@ -37,26 +42,10 @@ uuid.workspace = true workspace = true features = [ "serde" ] -[dependencies.crossterm] -workspace = true -optional = true - [dependencies.indexmap] workspace = true optional = true -[dependencies.num] -workspace = true -optional = true - -[dependencies.peg] -workspace = true -optional = true - -[dependencies.reedline] -workspace = true -optional = true - [dependencies.reqwest] workspace = true features = [ "json" ] @@ -81,10 +70,6 @@ optional = true workspace = true features = [ "rt-multi-thread", "macros" ] -[dependencies.tabled] -workspace = true -optional = true - [dev-dependencies] expectorate.workspace = true indexmap.workspace = true @@ -96,21 +81,12 @@ strum.workspace = true tempfile.workspace = true [features] -default = [ "oxql", "sql" ] +default = [ "sql" ] sql = [ "dep:indexmap", - "dep:reedline", "dep:rustyline", "dep:sqlformat", "dep:sqlparser", - "dep:tabled" -] -oxql = [ - "dep:crossterm", - "dep:num", - "dep:peg", - "dep:reedline", - "dep:tabled", ] [[bin]] diff --git a/oximeter/db/src/bin/oxdb/main.rs b/oximeter/db/src/bin/oxdb/main.rs index ca11dd18a34..1fe64d768e1 100644 --- a/oximeter/db/src/bin/oxdb/main.rs +++ b/oximeter/db/src/bin/oxdb/main.rs @@ -13,18 +13,16 @@ use oximeter::{ types::{Cumulative, Sample}, Metric, Target, }; -use oximeter_db::{query, Client, DbWrite}; +use oximeter_db::{oxql::oxql_shell, query, Client, DbWrite}; use slog::{debug, info, o, Drain, Level, Logger}; use std::net::IpAddr; use std::net::SocketAddr; use uuid::Uuid; +mod oxql; #[cfg(feature = "sql")] mod sql; -#[cfg(feature = "oxql")] -mod oxql; - // Samples are inserted in chunks of this size, to avoid large allocations when inserting huge // numbers of timeseries. const INSERT_CHUNK_SIZE: usize = 100_000; @@ -154,7 +152,6 @@ enum Subcommand { }, /// Enter the Oximeter Query Language shell for interactive querying. - #[cfg(feature = "oxql")] Oxql { #[clap(flatten)] opts: crate::oxql::ShellOptions, @@ -370,9 +367,9 @@ async fn main() -> anyhow::Result<()> { Subcommand::Sql { opts } => { crate::sql::sql_shell(args.address, args.port, log, opts).await? } - #[cfg(feature = "oxql")] Subcommand::Oxql { opts } => { - crate::oxql::oxql_shell(args.address, args.port, log, opts).await? + let client = make_client(args.address, args.port, &log).await?; + oxql_shell(client, opts.print_summaries, opts.print_elapsed).await? } } Ok(()) diff --git a/oximeter/db/src/bin/oxdb/oxql.rs b/oximeter/db/src/bin/oxdb/oxql.rs index ebe55dc7a77..8893652ba5e 100644 --- a/oximeter/db/src/bin/oxdb/oxql.rs +++ b/oximeter/db/src/bin/oxdb/oxql.rs @@ -2,363 +2,18 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! OxQL shell. +//! OxQL binary options. // Copyright 2024 Oxide Computer -use crate::make_client; use clap::Args; -use crossterm::style::Stylize; -use dropshot::EmptyScanParams; -use dropshot::WhichPage; -use oximeter::TimeseriesSchema; -use oximeter_db::oxql::query::special_idents; -use oximeter_db::oxql::Table; -use oximeter_db::Client; -use oximeter_db::OxqlResult; -use reedline::DefaultPrompt; -use reedline::DefaultPromptSegment; -use reedline::Reedline; -use reedline::Signal; -use slog::Logger; -use std::net::IpAddr; #[derive(Clone, Debug, Args)] pub struct ShellOptions { /// Print summaries of each SQL query run against the database. #[clap(long = "summaries")] - print_summaries: bool, + pub(crate) print_summaries: bool, /// Print the total elapsed query duration. #[clap(long = "elapsed")] - print_elapsed: bool, -} - -// Print help for the basic OxQL commands. -fn print_basic_commands() { - println!("Basic commands:"); - println!(" \\?, \\h, help - Print this help"); - println!(" \\q, quit, exit, ^D - Exit the shell"); - println!(" \\l - List timeseries"); - println!(" \\d - Describe a timeseries"); - println!(" \\ql [] - Get OxQL help about an operation"); - println!(); - println!("Or try entering an OxQL `get` query"); -} - -// Print high-level information about OxQL. -fn print_general_oxql_help() { - const HELP: &str = r#"Oximeter Query Language - -The Oximeter Query Language (OxQL) implements queries as -as sequence of operations. Each of these takes zero or more -timeseries as inputs, and produces zero or more timeseries -as outputs. Operations are chained together with the pipe -operator, "|". - -All queries start with a `get` operation, which selects a -timeseries from the database, by name. For example: - -`get physical_data_link:bytes_received` - -The supported timeseries operations are: - -- get: Select a timeseries by name -- filter: Filter timeseries by field or sample values -- group_by: Group timeseries by fields, applying a reducer. -- join: Join two or more timeseries together - -Run `\ql ` to get specific help about that operation. - "#; - println!("{HELP}"); -} - -// Print help for a specific OxQL operation. -fn print_oxql_operation_help(op: &str) { - match op { - "get" => { - const HELP: &str = r#"get "); - -Get instances of a timeseries by name"#; - println!("{HELP}"); - } - "filter" => { - const HELP: &str = r#"filter "); - -Filter timeseries based on their attributes. - can be a logical combination of filtering -\"atoms\", such as `field_foo > 0`. Expressions -may use any of the usual comparison operators, and -can be nested and combined with && or ||. - -Expressions must refer to the name of a field -for a timeseries at this time, and must compare -against literals. For example, `some_field > 0` -is supported, but `some_field > other_field` is not."#; - println!("{HELP}"); - } - "group_by" => { - const HELP: &str = r#"group_by [, ... ] -group_by [, ... ], - -Group timeseries by the named fields, optionally -specifying a reducer to use when aggregating the -timeseries within each group. If no reducer is -specified, `mean` is used, averaging the values -within each group. - -Current supported reducers: - - mean - - sum"#; - println!("{HELP}"); - } - "join" => { - const HELP: &str = r#"join - -Combine 2 or more tables by peforming a natural -inner join, matching up those with fields of the -same value. Currently, joining does not take into -account the timestamps, and does not align the outputs -directly."#; - println!("{HELP}"); - } - _ => eprintln!("unrecognized OxQL operation: '{op}'"), - } -} - -// List the known timeseries. -async fn list_timeseries(client: &Client) -> anyhow::Result<()> { - let mut page = WhichPage::First(EmptyScanParams {}); - let limit = 100.try_into().unwrap(); - loop { - let results = client.timeseries_schema_list(&page, limit).await?; - for schema in results.items.iter() { - println!("{}", schema.timeseries_name); - } - if results.next_page.is_some() { - if let Some(last) = results.items.last() { - page = WhichPage::Next(last.timeseries_name.clone()); - } else { - return Ok(()); - } - } else { - return Ok(()); - } - } -} - -/// Prepare the columns for a timeseries or virtual table. -pub(crate) fn prepare_columns( - schema: &TimeseriesSchema, -) -> (Vec, Vec) { - let mut cols = Vec::with_capacity(schema.field_schema.len() + 2); - let mut types = cols.clone(); - - for field in schema.field_schema.iter() { - cols.push(field.name.clone()); - types.push(field.field_type.to_string()); - } - - cols.push(special_idents::TIMESTAMP.into()); - types.push(special_idents::DATETIME64.into()); - - if schema.datum_type.is_histogram() { - cols.push(special_idents::START_TIME.into()); - types.push(special_idents::DATETIME64.into()); - - cols.push(special_idents::BINS.into()); - types.push( - special_idents::array_type_name_from_histogram_type( - schema.datum_type, - ) - .unwrap(), - ); - - cols.push(special_idents::COUNTS.into()); - types.push(special_idents::ARRAYU64.into()); - - cols.push(special_idents::MIN.into()); - types.push(special_idents::FLOAT64.into()); - - cols.push(special_idents::MAX.into()); - types.push(special_idents::FLOAT64.into()); - - cols.push(special_idents::SUM_OF_SAMPLES.into()); - types.push(special_idents::UINT64.into()); - - cols.push(special_idents::SQUARED_MEAN.into()); - types.push(special_idents::UINT64.into()); - - for quantile in ["P50", "P90", "P99"].iter() { - cols.push(format!("{}_MARKER_HEIGHTS", quantile)); - types.push(special_idents::ARRAYFLOAT64.into()); - cols.push(format!("{}_MARKER_POSITIONS", quantile)); - types.push(special_idents::ARRAYINT64.into()); - cols.push(format!("{}_DESIRED_MARKER_POSITIONS", quantile)); - types.push(special_idents::ARRAYFLOAT64.into()); - } - } else if schema.datum_type.is_cumulative() { - cols.push(special_idents::START_TIME.into()); - types.push(special_idents::DATETIME64.into()); - cols.push(special_idents::DATUM.into()); - types.push(schema.datum_type.to_string()); - } else { - cols.push(special_idents::DATUM.into()); - types.push(schema.datum_type.to_string()); - } - - (cols, types) -} - -/// Describe a single timeseries. -async fn describe_timeseries( - client: &Client, - timeseries: &str, -) -> anyhow::Result<()> { - match timeseries.parse() { - Err(_) => eprintln!( - "Invalid timeseries name '{timeseries}, \ - use \\l to list available timeseries by name - " - ), - Ok(name) => { - if let Some(schema) = client.schema_for_timeseries(&name).await? { - let (cols, types) = prepare_columns(&schema); - let mut builder = tabled::builder::Builder::default(); - builder.push_record(cols); // first record is the header - builder.push_record(types); - println!( - "{}", - builder.build().with(tabled::settings::Style::psql()) - ); - } else { - eprintln!("No such timeseries: {timeseries}"); - } - } - } - Ok(()) -} - -/// Run the OxQL shell. -pub async fn oxql_shell( - address: IpAddr, - port: u16, - log: Logger, - opts: ShellOptions, -) -> anyhow::Result<()> { - let client = make_client(address, port, &log).await?; - - // A workaround to ensure the client has all available timeseries when the - // shell starts. - let dummy = "foo:bar".parse().unwrap(); - let _ = client.schema_for_timeseries(&dummy).await; - - // Create the line-editor. - let mut ed = Reedline::create(); - let prompt = DefaultPrompt::new( - DefaultPromptSegment::Basic("0x".to_string()), - DefaultPromptSegment::Empty, - ); - println!("Oximeter Query Language shell"); - println!(); - print_basic_commands(); - loop { - let sig = ed.read_line(&prompt); - match sig { - Ok(Signal::Success(buf)) => { - let cmd = buf.as_str().trim(); - match cmd { - "" => continue, - "\\?" | "\\h" | "help" => print_basic_commands(), - "\\q" | "quit" | "exit" => return Ok(()), - "\\l" | "\\d" => list_timeseries(&client).await?, - _ => { - if let Some(timeseries_name) = cmd.strip_prefix("\\d") { - if timeseries_name.is_empty() { - list_timeseries(&client).await?; - } else { - describe_timeseries( - &client, - timeseries_name - .trim() - .trim_end_matches(';'), - ) - .await?; - } - } else if let Some(stmt) = cmd.strip_prefix("\\ql") { - let stmt = stmt.trim(); - if stmt.is_empty() { - print_general_oxql_help(); - } else { - print_oxql_operation_help(stmt); - } - } else { - match client - .oxql_query(cmd.trim().trim_end_matches(';')) - .await - { - Ok(result) => { - print_query_summary( - &result, - opts.print_elapsed, - opts.print_summaries, - ); - print_tables(&result.tables); - } - Err(e) => { - eprintln!("{}", "Error".underlined().red()); - eprintln!("{e}"); - } - } - } - } - } - } - Ok(Signal::CtrlD) => return Ok(()), - Ok(Signal::CtrlC) => continue, - err => eprintln!("err: {err:?}"), - } - } -} - -fn print_query_summary( - result: &OxqlResult, - print_elapsed: bool, - print_summaries: bool, -) { - if !print_elapsed && !print_summaries { - return; - } - println!("{}", "Query summary".underlined().bold()); - println!(" {}: {}", "ID".bold(), result.query_id); - if print_elapsed { - println!(" {}: {:?}\n", "Total duration".bold(), result.total_duration); - } - if print_summaries { - println!(" {}:", "SQL queries".bold()); - for summary in result.query_summaries.iter() { - println!(" {}: {}", "ID".bold(), summary.id); - println!(" {}: {:?}", "Duration".bold(), summary.elapsed); - println!(" {}: {}", "Read".bold(), summary.io_summary.read); - println!(); - } - } -} - -fn print_tables(tables: &[Table]) { - for table in tables.iter() { - println!(); - println!("{}", table.name().underlined().bold()); - for timeseries in table.iter() { - if timeseries.points.is_empty() { - continue; - } - println!(); - for (name, value) in timeseries.fields.iter() { - println!(" {}: {}", name.as_str().bold(), value); - } - for point in timeseries.points.iter_points() { - println!(" {point}"); - } - } - } + pub(crate) print_elapsed: bool, } diff --git a/oximeter/db/src/bin/oxdb/sql.rs b/oximeter/db/src/bin/oxdb/sql.rs index 44780592fcb..cd01b14d889 100644 --- a/oximeter/db/src/bin/oxdb/sql.rs +++ b/oximeter/db/src/bin/oxdb/sql.rs @@ -6,16 +6,12 @@ // Copyright 2024 Oxide Computer Company -use super::oxql; use crate::make_client; use clap::Args; use dropshot::EmptyScanParams; use dropshot::WhichPage; -use oximeter_db::sql::function_allow_list; -use oximeter_db::sql::QueryResult; -use oximeter_db::sql::Table; -use oximeter_db::Client; -use oximeter_db::QuerySummary; +use oximeter_db::sql::{function_allow_list, QueryResult, Table}; +use oximeter_db::{Client, QuerySummary, TimeseriesSchema}; use reedline::DefaultPrompt; use reedline::DefaultPromptSegment; use reedline::Reedline; @@ -64,7 +60,7 @@ async fn describe_virtual_table( Err(_) => println!("Invalid timeseries name: {table}"), Ok(name) => { if let Some(schema) = client.schema_for_timeseries(&name).await? { - let (cols, types) = oxql::prepare_columns(&schema); + let (cols, types) = TimeseriesSchema::prepare_columns(&schema); let mut builder = tabled::builder::Builder::default(); builder.push_record(cols); // first record is the header builder.push_record(types); diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index 2d6212971e4..0c372cedae4 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -7,7 +7,6 @@ // Copyright 2024 Oxide Computer Company pub(crate) mod dbwrite; -#[cfg(any(feature = "oxql", test))] pub(crate) mod oxql; pub(crate) mod query_summary; #[cfg(any(feature = "sql", test))] diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index c471a837ea4..efdd8401555 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -32,13 +32,11 @@ use thiserror::Error; mod client; pub mod model; -#[cfg(feature = "oxql")] pub mod oxql; pub mod query; #[cfg(any(feature = "sql", test))] pub mod sql; -#[cfg(feature = "oxql")] pub use client::oxql::OxqlResult; pub use client::query_summary::QuerySummary; pub use client::Client; @@ -141,12 +139,10 @@ pub enum Error { #[error("SQL error")] Sql(#[from] sql::Error), - #[cfg(any(feature = "oxql", test))] #[error(transparent)] Oxql(oxql::Error), } -#[cfg(any(feature = "oxql", test))] impl From for Error { fn from(e: crate::oxql::Error) -> Self { Error::Oxql(e) diff --git a/oximeter/db/src/oxql/ast/table_ops/filter.rs b/oximeter/db/src/oxql/ast/table_ops/filter.rs index 9e796bc730a..3814f376d79 100644 --- a/oximeter/db/src/oxql/ast/table_ops/filter.rs +++ b/oximeter/db/src/oxql/ast/table_ops/filter.rs @@ -16,12 +16,12 @@ use crate::oxql::point::DataType; use crate::oxql::point::MetricType; use crate::oxql::point::Points; use crate::oxql::point::ValueArray; -use crate::oxql::query::special_idents; use crate::oxql::Error; use crate::oxql::Table; use crate::oxql::Timeseries; use chrono::DateTime; use chrono::Utc; +use oximeter::schema::special_idents; use oximeter::FieldType; use oximeter::FieldValue; use regex::Regex; diff --git a/oximeter/db/src/oxql/mod.rs b/oximeter/db/src/oxql/mod.rs index b93d75b859a..7ab11ac7ead 100644 --- a/oximeter/db/src/oxql/mod.rs +++ b/oximeter/db/src/oxql/mod.rs @@ -6,8 +6,16 @@ // Copyright 2024 Oxide Computer Company +use super::{Client, OxqlResult}; +use crossterm::style::Stylize; +use dropshot::EmptyScanParams; +use dropshot::WhichPage; use peg::error::ParseError as PegError; use peg::str::LineCol; +use reedline::DefaultPrompt; +use reedline::DefaultPromptSegment; +use reedline::Reedline; +use reedline::Signal; pub mod ast; pub mod point; @@ -19,6 +27,114 @@ pub use self::table::Table; pub use self::table::Timeseries; pub use anyhow::Error; +/// Run the OxQL shell. +pub async fn oxql_shell( + client: Client, + print_summaries: bool, + print_elapsed: bool, +) -> anyhow::Result<()> { + // A workaround to ensure the client has all available timeseries when the + // shell starts. + let dummy = "foo:bar".parse().unwrap(); + let _ = client.schema_for_timeseries(&dummy).await; + + // Create the line-editor. + let mut ed = Reedline::create(); + let prompt = DefaultPrompt::new( + DefaultPromptSegment::Basic("0x".to_string()), + DefaultPromptSegment::Empty, + ); + println!("Oximeter Query Language shell"); + println!(); + print_basic_commands(); + loop { + let sig = ed.read_line(&prompt); + match sig { + Ok(Signal::Success(buf)) => { + let cmd = buf.as_str().trim(); + match cmd { + "" => continue, + "\\?" | "\\h" | "help" => print_basic_commands(), + "\\q" | "quit" | "exit" => return Ok(()), + "\\l" | "\\d" => list_timeseries(&client).await?, + _ => { + if let Some(timeseries_name) = cmd.strip_prefix("\\d") { + if timeseries_name.is_empty() { + list_timeseries(&client).await?; + } else { + describe_timeseries( + &client, + timeseries_name + .trim() + .trim_end_matches(';'), + ) + .await?; + } + } else if let Some(stmt) = cmd.strip_prefix("\\ql") { + let stmt = stmt.trim(); + if stmt.is_empty() { + print_general_oxql_help(); + } else { + print_oxql_operation_help(stmt); + } + } else { + match client + .oxql_query(cmd.trim().trim_end_matches(';')) + .await + { + Ok(result) => { + print_query_summary( + &result, + print_elapsed, + print_summaries, + ); + print_tables(&result.tables); + } + Err(e) => { + eprintln!("{}", "Error".underlined().red()); + eprintln!("{e}"); + } + } + } + } + } + } + Ok(Signal::CtrlD) => return Ok(()), + Ok(Signal::CtrlC) => continue, + err => eprintln!("err: {err:?}"), + } + } +} + +/// Describe a single timeseries. +async fn describe_timeseries( + client: &Client, + timeseries: &str, +) -> anyhow::Result<()> { + match timeseries.parse() { + Err(_) => eprintln!( + "Invalid timeseries name '{timeseries}, \ + use \\l to list available timeseries by name + " + ), + Ok(name) => { + if let Some(schema) = client.schema_for_timeseries(&name).await? { + let (cols, types) = schema.prepare_columns(); + let mut builder = tabled::builder::Builder::default(); + builder.push_record(cols); // first record is the header + builder.push_record(types); + println!( + "{}", + builder.build().with(tabled::settings::Style::psql()) + ); + } else { + eprintln!("No such timeseries: {timeseries}"); + } + } + } + Ok(()) +} + // Format a PEG parsing error into a nice anyhow error. fn fmt_parse_error(source: &str, err: PegError) -> Error { use std::fmt::Write; @@ -37,3 +153,159 @@ fn fmt_parse_error(source: &str, err: PegError) -> Error { writeln!(out, "Expected: {}", err).unwrap(); anyhow::anyhow!(out) } + +/// Print help for a specific OxQL operation. +fn print_oxql_operation_help(op: &str) { + match op { + "get" => { + const HELP: &str = r#"get "); + +Get instances of a timeseries by name"#; + println!("{HELP}"); + } + "filter" => { + const HELP: &str = r#"filter "); + +Filter timeseries based on their attributes. + can be a logical combination of filtering +\"atoms\", such as `field_foo > 0`. Expressions +may use any of the usual comparison operators, and +can be nested and combined with && or ||. + +Expressions must refer to the name of a field +for a timeseries at this time, and must compare +against literals. For example, `some_field > 0` +is supported, but `some_field > other_field` is not."#; + println!("{HELP}"); + } + "group_by" => { + const HELP: &str = r#"group_by [, ... ] +group_by [, ... ], + +Group timeseries by the named fields, optionally +specifying a reducer to use when aggregating the +timeseries within each group. If no reducer is +specified, `mean` is used, averaging the values +within each group. + +Current supported reducers: + - mean + - sum"#; + println!("{HELP}"); + } + "join" => { + const HELP: &str = r#"join + +Combine 2 or more tables by peforming a natural +inner join, matching up those with fields of the +same value. Currently, joining does not take into +account the timestamps, and does not align the outputs +directly."#; + println!("{HELP}"); + } + _ => eprintln!("unrecognized OxQL operation: '{op}'"), + } +} + +/// Print help for the basic OxQL commands. +fn print_basic_commands() { + println!("Basic commands:"); + println!(" \\?, \\h, help - Print this help"); + println!(" \\q, quit, exit, ^D - Exit the shell"); + println!(" \\l - List timeseries"); + println!(" \\d - Describe a timeseries"); + println!(" \\ql [] - Get OxQL help about an operation"); + println!(); + println!("Or try entering an OxQL `get` query"); +} + +/// Print high-level information about OxQL. +fn print_general_oxql_help() { + const HELP: &str = r#"Oximeter Query Language + +The Oximeter Query Language (OxQL) implements queries as +as sequence of operations. Each of these takes zero or more +timeseries as inputs, and produces zero or more timeseries +as outputs. Operations are chained together with the pipe +operator, "|". + +All queries start with a `get` operation, which selects a +timeseries from the database, by name. For example: + +`get physical_data_link:bytes_received` + +The supported timeseries operations are: + +- get: Select a timeseries by name +- filter: Filter timeseries by field or sample values +- group_by: Group timeseries by fields, applying a reducer. +- join: Join two or more timeseries together + +Run `\ql ` to get specific help about that operation. + "#; + println!("{HELP}"); +} + +fn print_query_summary( + result: &OxqlResult, + print_elapsed: bool, + print_summaries: bool, +) { + if !print_elapsed && !print_summaries { + return; + } + println!("{}", "Query summary".underlined().bold()); + println!(" {}: {}", "ID".bold(), result.query_id); + if print_elapsed { + println!(" {}: {:?}\n", "Total duration".bold(), result.total_duration); + } + if print_summaries { + println!(" {}:", "SQL queries".bold()); + for summary in result.query_summaries.iter() { + println!(" {}: {}", "ID".bold(), summary.id); + println!(" {}: {:?}", "Duration".bold(), summary.elapsed); + println!(" {}: {}", "Read".bold(), summary.io_summary.read); + println!(); + } + } +} + +fn print_tables(tables: &[Table]) { + for table in tables.iter() { + println!(); + println!("{}", table.name().underlined().bold()); + for timeseries in table.iter() { + if timeseries.points.is_empty() { + continue; + } + println!(); + for (name, value) in timeseries.fields.iter() { + println!(" {}: {}", name.as_str().bold(), value); + } + for point in timeseries.points.iter_points() { + println!(" {point}"); + } + } + } +} + +// List the known timeseries. +async fn list_timeseries(client: &Client) -> anyhow::Result<()> { + let mut page = WhichPage::First(EmptyScanParams {}); + let limit = 100.try_into().unwrap(); + loop { + let results = client.timeseries_schema_list(&page, limit).await?; + for schema in results.items.iter() { + println!("{}", schema.timeseries_name); + } + if results.next_page.is_some() { + if let Some(last) = results.items.last() { + page = WhichPage::Next(last.timeseries_name.clone()); + } else { + return Ok(()); + } + } else { + return Ok(()); + } + } +} diff --git a/oximeter/db/src/oxql/query/mod.rs b/oximeter/db/src/oxql/query/mod.rs index 40a6c82f93f..e1fada9f2aa 100644 --- a/oximeter/db/src/oxql/query/mod.rs +++ b/oximeter/db/src/oxql/query/mod.rs @@ -25,63 +25,6 @@ use chrono::DateTime; use chrono::Utc; use std::time::Duration; -/// Special identifiers for column names or other widely-used values. -pub mod special_idents { - use oximeter::DatumType; - - macro_rules! gen_marker { - ($p:expr, $field:expr) => { - concat!("p", $p, "_", $field) - }; - } - - pub const TIMESTAMP: &str = "timestamp"; - pub const START_TIME: &str = "start_time"; - pub const DATUM: &str = "datum"; - pub const BINS: &str = "bins"; - pub const COUNTS: &str = "counts"; - pub const MIN: &str = "min"; - pub const MAX: &str = "max"; - pub const SUM_OF_SAMPLES: &str = "sum_of_samples"; - pub const SQUARED_MEAN: &str = "squared_mean"; - pub const DATETIME64: &str = "DateTime64"; - pub const ARRAYU64: &str = "Array[u64]"; - pub const ARRAYFLOAT64: &str = "Array[f64]"; - pub const ARRAYINT64: &str = "Array[i64]"; - pub const FLOAT64: &str = "f64"; - pub const UINT64: &str = "u64"; - - pub const DISTRIBUTION_IDENTS: [&str; 15] = [ - "bins", - "counts", - "min", - "max", - "sum_of_samples", - "squared_mean", - gen_marker!("50", "marker_heights"), - gen_marker!("50", "marker_positions"), - gen_marker!("50", "desired_marker_positions"), - gen_marker!("90", "marker_heights"), - gen_marker!("90", "marker_positions"), - gen_marker!("90", "desired_marker_positions"), - gen_marker!("99", "marker_heights"), - gen_marker!("99", "marker_positions"), - gen_marker!("99", "desired_marker_positions"), - ]; - - pub fn array_type_name_from_histogram_type( - type_: DatumType, - ) -> Option { - if !type_.is_histogram() { - return None; - } - Some(format!( - "Array[{}]", - type_.to_string().strip_prefix("Histogram").unwrap().to_lowercase(), - )) - } -} - /// A parsed OxQL query. #[derive(Clone, Debug, PartialEq)] pub struct Query { diff --git a/oximeter/impl/src/schema/mod.rs b/oximeter/impl/src/schema/mod.rs index 28dbf38ab85..f4b901ccf9c 100644 --- a/oximeter/impl/src/schema/mod.rs +++ b/oximeter/impl/src/schema/mod.rs @@ -35,6 +35,63 @@ use std::path::Path; pub const SCHEMA_DIRECTORY: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../oximeter/schema"); +/// Special identifiers for column names or other widely-used values. +pub mod special_idents { + use oximeter::DatumType; + + macro_rules! gen_marker { + ($p:expr, $field:expr) => { + concat!("p", $p, "_", $field) + }; + } + + pub const TIMESTAMP: &str = "timestamp"; + pub const START_TIME: &str = "start_time"; + pub const DATUM: &str = "datum"; + pub const BINS: &str = "bins"; + pub const COUNTS: &str = "counts"; + pub const MIN: &str = "min"; + pub const MAX: &str = "max"; + pub const SUM_OF_SAMPLES: &str = "sum_of_samples"; + pub const SQUARED_MEAN: &str = "squared_mean"; + pub const DATETIME64: &str = "DateTime64"; + pub const ARRAYU64: &str = "Array[u64]"; + pub const ARRAYFLOAT64: &str = "Array[f64]"; + pub const ARRAYINT64: &str = "Array[i64]"; + pub const FLOAT64: &str = "f64"; + pub const UINT64: &str = "u64"; + + pub const DISTRIBUTION_IDENTS: [&str; 15] = [ + "bins", + "counts", + "min", + "max", + "sum_of_samples", + "squared_mean", + gen_marker!("50", "marker_heights"), + gen_marker!("50", "marker_positions"), + gen_marker!("50", "desired_marker_positions"), + gen_marker!("90", "marker_heights"), + gen_marker!("90", "marker_positions"), + gen_marker!("90", "desired_marker_positions"), + gen_marker!("99", "marker_heights"), + gen_marker!("99", "marker_positions"), + gen_marker!("99", "desired_marker_positions"), + ]; + + pub fn array_type_name_from_histogram_type( + type_: DatumType, + ) -> Option { + if !type_.is_histogram() { + return None; + } + Some(format!( + "Array[{}]", + type_.to_string().strip_prefix("Histogram").unwrap().to_lowercase(), + )) + } +} + /// The name and type information for a field of a timeseries schema. #[derive( Clone, @@ -200,6 +257,69 @@ pub struct TimeseriesSchema { pub created: DateTime, } +impl TimeseriesSchema { + /// Prepare the columns for a timeseries or virtual table. + pub fn prepare_columns(&self) -> (Vec, Vec) { + let mut cols = Vec::with_capacity(self.field_schema.len() + 2); + let mut types = cols.clone(); + + for field in self.field_schema.iter() { + cols.push(field.name.clone()); + types.push(field.field_type.to_string()); + } + + cols.push(special_idents::TIMESTAMP.into()); + types.push(special_idents::DATETIME64.into()); + + if self.datum_type.is_histogram() { + cols.push(special_idents::START_TIME.into()); + types.push(special_idents::DATETIME64.into()); + + cols.push(special_idents::BINS.into()); + types.push( + special_idents::array_type_name_from_histogram_type( + self.datum_type, + ) + .unwrap(), + ); + + cols.push(special_idents::COUNTS.into()); + types.push(special_idents::ARRAYU64.into()); + + cols.push(special_idents::MIN.into()); + types.push(special_idents::FLOAT64.into()); + + cols.push(special_idents::MAX.into()); + types.push(special_idents::FLOAT64.into()); + + cols.push(special_idents::SUM_OF_SAMPLES.into()); + types.push(special_idents::UINT64.into()); + + cols.push(special_idents::SQUARED_MEAN.into()); + types.push(special_idents::UINT64.into()); + + for quantile in ["P50", "P90", "P99"].iter() { + cols.push(format!("{}_MARKER_HEIGHTS", quantile)); + types.push(special_idents::ARRAYFLOAT64.into()); + cols.push(format!("{}_MARKER_POSITIONS", quantile)); + types.push(special_idents::ARRAYINT64.into()); + cols.push(format!("{}_DESIRED_MARKER_POSITIONS", quantile)); + types.push(special_idents::ARRAYFLOAT64.into()); + } + } else if self.datum_type.is_cumulative() { + cols.push(special_idents::START_TIME.into()); + types.push(special_idents::DATETIME64.into()); + cols.push(special_idents::DATUM.into()); + types.push(self.datum_type.to_string()); + } else { + cols.push(special_idents::DATUM.into()); + types.push(self.datum_type.to_string()); + } + + (cols, types) + } +} + /// Default version for timeseries schema, 1. pub const fn default_schema_version() -> NonZeroU8 { unsafe { NonZeroU8::new_unchecked(1) } diff --git a/test-utils/src/dev/test_cmds.rs b/test-utils/src/dev/test_cmds.rs index 3c675ddfd99..5d6b9a152e1 100644 --- a/test-utils/src/dev/test_cmds.rs +++ b/test-utils/src/dev/test_cmds.rs @@ -126,8 +126,8 @@ pub fn error_for_enoent() -> String { /// /// This allows use to use expectorate to verify the shape of the CLI output. pub fn redact_variable(input: &str) -> String { - // Replace TCP port numbers. We include the localhost characters to avoid - // catching any random sequence of numbers. + // Replace TCP port numbers. We include the localhost + // characters to avoid catching any random sequence of numbers. let s = regex::Regex::new(r"\[::1\]:\d{4,5}") .unwrap() .replace_all(&input, "[::1]:REDACTED_PORT") @@ -189,6 +189,13 @@ pub fn redact_variable(input: &str) -> String { .replace_all(&s, "s ago") .to_string(); + // Replace interval (s). + let s = regex::Regex::new(r"\d+s") + .unwrap() + .replace_all(&s, "s") + .to_string(); + + // Replace interval (ms). let s = regex::Regex::new(r"\d+ms") .unwrap() .replace_all(&s, "ms")