From 03ab4294d3255699e8aecbd8265baa15e6f58bbe Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Mon, 1 Jul 2024 13:29:38 +0000 Subject: [PATCH] Move `oxql` from `oxdb` to mainline `omdb` Why?: Concerning [network observability work](https://github.com/orgs/oxidecomputer/projects/55/views/1?filterQuery=&pane=issue&itemId=68336554), this makes the [`oxql`](https://rfd.shared.oxide.computer/rfd/0463) interactive query repl accessible via omdb, as we start to give users and ourselves the ability to query timeseries and metrics more easily. Additionally, in the "now", this aids in debugging through our metrics set and makes it available, via omdb, throughout our ecosystem/a4x2. Includes: * Removes `oxql` from `oxdb`. * If no URL is given to `omdb oxql`, it will leverage internal DNS. * Update the oximeter omdb call (for listing producers) to leverage internal. DNS if no URL is given. * Update command/output tests/generations and collector specific tests for list producers. Notes: * The oxql client still expects an socket address as liked it typed specifically v.s. a String. Instead, upon running the `omdb oxql` command, we take in a URL String and parse it into the socket address directly. --- .gitignore | 1 + Cargo.lock | 2 + dev-tools/omdb/Cargo.toml | 2 + dev-tools/omdb/src/bin/omdb/main.rs | 6 +- dev-tools/omdb/src/bin/omdb/oximeter.rs | 49 ++- .../omdb/src/bin/omdb}/oxql.rs | 407 +++++++++--------- dev-tools/omdb/tests/env.out | 25 ++ dev-tools/omdb/tests/successes.out | 24 +- dev-tools/omdb/tests/test_all_output.rs | 145 ++++++- dev-tools/omdb/tests/usage_errors.out | 65 +++ nexus-config/src/nexus_config.rs | 10 +- nexus/db-model/src/tuf_repo.rs | 6 +- oximeter/collector/src/lib.rs | 5 + oximeter/db/Cargo.toml | 36 +- oximeter/db/src/bin/oxdb/main.rs | 14 - oximeter/db/src/bin/oxdb/sql.rs | 4 +- oximeter/db/src/client/mod.rs | 1 - oximeter/db/src/lib.rs | 68 ++- test-utils/src/dev/test_cmds.rs | 11 +- 19 files changed, 574 insertions(+), 307 deletions(-) rename {oximeter/db/src/bin/oxdb => dev-tools/omdb/src/bin/omdb}/oxql.rs (53%) diff --git a/.gitignore b/.gitignore index 39a8f361446..6e4e0eb42a1 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ tags .falcon/* .img/* connectivity-report.json +*.local diff --git a/Cargo.lock b/Cargo.lock index d79933cd87f..11bff75a02f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5638,6 +5638,7 @@ dependencies = [ "omicron-uuid-kinds", "omicron-workspace-hack", "oximeter-client", + "oximeter-db", "pq-sys", "ratatui", "reedline", @@ -5652,6 +5653,7 @@ dependencies = [ "textwrap", "tokio", "unicode-width", + "url", "uuid", ] diff --git a/dev-tools/omdb/Cargo.toml b/dev-tools/omdb/Cargo.toml index 9cdf03093c8..ddfdab6caa5 100644 --- a/dev-tools/omdb/Cargo.toml +++ b/dev-tools/omdb/Cargo.toml @@ -37,6 +37,7 @@ nexus-types.workspace = true omicron-common.workspace = true omicron-uuid-kinds.workspace = true oximeter-client.workspace = true +oximeter-db.workspace = true # See omicron-rpaths for more about the "pq-sys" dependency. pq-sys = "*" ratatui.workspace = true @@ -51,6 +52,7 @@ tabled.workspace = true textwrap.workspace = true tokio = { workspace = true, features = [ "full" ] } unicode-width.workspace = true +url.workspace = true uuid.workspace = true ipnetwork.workspace = true omicron-workspace-hack.workspace = true diff --git a/dev-tools/omdb/src/bin/omdb/main.rs b/dev-tools/omdb/src/bin/omdb/main.rs index 7469e2ba54f..8fc48f50283 100644 --- a/dev-tools/omdb/src/bin/omdb/main.rs +++ b/dev-tools/omdb/src/bin/omdb/main.rs @@ -50,6 +50,7 @@ mod helpers; mod mgs; mod nexus; mod oximeter; +mod oxql; mod sled_agent; #[tokio::main] @@ -66,7 +67,8 @@ async fn main() -> Result<(), anyhow::Error> { OmdbCommands::Db(db) => db.run_cmd(&args, &log).await, OmdbCommands::Mgs(mgs) => mgs.run_cmd(&args, &log).await, OmdbCommands::Nexus(nexus) => nexus.run_cmd(&args, &log).await, - OmdbCommands::Oximeter(oximeter) => oximeter.run_cmd(&log).await, + OmdbCommands::Oximeter(oximeter) => oximeter.run_cmd(&args, &log).await, + OmdbCommands::Oxql(oxql) => oxql.run_cmd(&args, &log).await, OmdbCommands::SledAgent(sled) => sled.run_cmd(&args, &log).await, OmdbCommands::CrucibleAgent(crucible) => crucible.run_cmd(&args).await, } @@ -269,6 +271,8 @@ enum OmdbCommands { Nexus(nexus::NexusArgs), /// Query oximeter collector state Oximeter(oximeter::OximeterArgs), + /// Enter the Oximeter Query Language shell for interactive querying. + Oxql(oxql::OxqlArgs), /// Debug a specific Sled SledAgent(sled_agent::SledAgentArgs), } diff --git a/dev-tools/omdb/src/bin/omdb/oximeter.rs b/dev-tools/omdb/src/bin/omdb/oximeter.rs index a6dc2ce0115..1f1e3a47234 100644 --- a/dev-tools/omdb/src/bin/omdb/oximeter.rs +++ b/dev-tools/omdb/src/bin/omdb/oximeter.rs @@ -5,6 +5,7 @@ //! omdb commands that query oximeter use crate::helpers::CONNECTION_OPTIONS_HEADING; +use crate::Omdb; use anyhow::Context; use clap::Args; use clap::Subcommand; @@ -24,34 +25,58 @@ pub struct OximeterArgs { #[arg( long, env = "OMDB_OXIMETER_URL", - // This can't be global = true (i.e. passed in later in the - // command-line) because global options can't be required. If this - // changes to being optional, we should set global = true. + global = true, help_heading = CONNECTION_OPTIONS_HEADING, )] - oximeter_url: String, + oximeter_url: Option, #[command(subcommand)] command: OximeterCommands, } -/// Subcommands that query oximeter collector state +/// Subcommands that query oximeter collector state. #[derive(Debug, Subcommand)] enum OximeterCommands { - /// List the producers the collector is assigned to poll + /// List the producers the collector is assigned to poll. ListProducers, } impl OximeterArgs { - fn client(&self, log: &Logger) -> Client { - Client::new( - &self.oximeter_url, + async fn client( + &self, + omdb: &Omdb, + log: &Logger, + ) -> Result { + let oximeter_url = match &self.oximeter_url { + Some(cli_or_env_url) => cli_or_env_url.clone(), + None => { + eprintln!( + "note: Oximeter URL not specified. Will pick one from DNS." + ); + let addr = omdb + .dns_lookup_one( + log.clone(), + internal_dns::ServiceName::Oximeter, + ) + .await?; + format!("http://{}", addr) + } + }; + eprintln!("note: using Oximeter URL {}", &oximeter_url); + + let client = Client::new( + &oximeter_url, log.new(slog::o!("component" => "oximeter-client")), - ) + ); + Ok(client) } - pub async fn run_cmd(&self, log: &Logger) -> anyhow::Result<()> { - let client = self.client(log); + pub async fn run_cmd( + &self, + omdb: &Omdb, + log: &Logger, + ) -> anyhow::Result<()> { + let client = self.client(omdb, log).await?; match self.command { OximeterCommands::ListProducers => { self.list_producers(client).await diff --git a/oximeter/db/src/bin/oxdb/oxql.rs b/dev-tools/omdb/src/bin/omdb/oxql.rs similarity index 53% rename from oximeter/db/src/bin/oxdb/oxql.rs rename to dev-tools/omdb/src/bin/omdb/oxql.rs index ebe55dc7a77..fb19b15c5a0 100644 --- a/oximeter/db/src/bin/oxdb/oxql.rs +++ b/dev-tools/omdb/src/bin/omdb/oxql.rs @@ -2,38 +2,172 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! OxQL shell. +//! omdb OxQL shell for interactive queries on metrics/timeseries. // Copyright 2024 Oxide Computer -use crate::make_client; +use crate::helpers::CONNECTION_OPTIONS_HEADING; +use crate::Omdb; +use anyhow::Context; use clap::Args; use crossterm::style::Stylize; -use dropshot::EmptyScanParams; -use dropshot::WhichPage; -use oximeter::TimeseriesSchema; -use oximeter_db::oxql::query::special_idents; -use oximeter_db::oxql::Table; -use oximeter_db::Client; -use oximeter_db::OxqlResult; -use reedline::DefaultPrompt; -use reedline::DefaultPromptSegment; -use reedline::Reedline; -use reedline::Signal; +use dropshot::{EmptyScanParams, WhichPage}; +use oximeter_db::{oxql::Table, prepare_columns, Client, DbWrite, OxqlResult}; +use reedline::{DefaultPrompt, DefaultPromptSegment, Reedline, Signal}; use slog::Logger; -use std::net::IpAddr; +use std::net::SocketAddr; +use url::Url; + +/// Command-line arguments for the OxQL shell. +#[derive(Debug, Args)] +pub struct OxqlArgs { + /// URL of the metrics database. + #[arg( + long, + env = "OMDB_METRICS_DB_URL", + global = true, + help_heading = CONNECTION_OPTIONS_HEADING, + )] + metrics_db_url: Option, -#[derive(Clone, Debug, Args)] -pub struct ShellOptions { /// Print summaries of each SQL query run against the database. #[clap(long = "summaries")] print_summaries: bool, + /// Print the total elapsed query duration. #[clap(long = "elapsed")] print_elapsed: bool, } -// Print help for the basic OxQL commands. +impl OxqlArgs { + async fn client( + &self, + omdb: &Omdb, + log: &Logger, + ) -> Result { + let socket_addr = match &self.metrics_db_url { + Some(cli_or_env_url) => Url::parse(&cli_or_env_url) + .context("Failed to parse metrics DB URL")? + .socket_addrs(|| None) + .context("Failed to resolve metrics DB URL")? + .drain(..) + .next() + .context("Failed to resolve metrics DB URL")?, + _ => { + eprintln!( + "note: Metrics DB address/port not specified. Will pick one from DNS." + ); + SocketAddr::V6( + omdb.dns_lookup_one( + log.clone(), + internal_dns::ServiceName::Clickhouse, + ) + .await?, + ) + } + }; + eprintln!("note: using Metrics DB socket address: {}", &socket_addr); + + let client = Client::new(socket_addr, log); + + client + .init_single_node_db() + .await + .context("Failed to initialize timeseries database")?; + Ok(client) + } + + /// Run the OxQL shell via the `omdb oxql` subcommand. + pub async fn run_cmd( + &self, + omdb: &Omdb, + log: &Logger, + ) -> anyhow::Result<()> { + let client = self.client(omdb, log).await?; + // A workaround to ensure the client has all available timeseries when + // the shell starts. + let dummy = "foo:bar".parse().unwrap(); + let _ = client.schema_for_timeseries(&dummy).await; + + // Create the line-editor. + let mut ed = Reedline::create(); + let prompt = DefaultPrompt::new( + DefaultPromptSegment::Basic("0x".to_string()), + DefaultPromptSegment::Empty, + ); + println!("Oximeter Query Language shell"); + println!(); + print_basic_commands(); + + loop { + let sig = ed.read_line(&prompt); + match sig { + Ok(Signal::Success(buf)) => { + let cmd = buf.as_str().trim(); + match cmd { + "" => continue, + "\\?" | "\\h" | "help" => print_basic_commands(), + "\\q" | "quit" | "exit" => return Ok(()), + "\\l" | "\\d" => list_timeseries(&client).await?, + _ => { + if let Some(timeseries_name) = + cmd.strip_prefix("\\d") + { + if timeseries_name.is_empty() { + list_timeseries(&client).await?; + } else { + describe_timeseries( + &client, + timeseries_name + .trim() + .trim_end_matches(';'), + ) + .await?; + } + } else if let Some(stmt) = cmd.strip_prefix("\\ql") + { + let stmt = stmt.trim(); + if stmt.is_empty() { + print_general_oxql_help(); + } else { + print_oxql_operation_help(stmt); + } + } else { + match client + .oxql_query( + cmd.trim().trim_end_matches(';'), + ) + .await + { + Ok(result) => { + print_query_summary( + &result, + self.print_elapsed, + self.print_summaries, + ); + print_tables(&result.tables); + } + Err(e) => { + eprintln!( + "{}", + "Error".underlined().red() + ); + eprintln!("{e}"); + } + } + } + } + } + } + Ok(Signal::CtrlD) => return Ok(()), + Ok(Signal::CtrlC) => continue, + err => eprintln!("err: {err:?}"), + } + } + } +} + +/// Print help for the basic OxQL commands. fn print_basic_commands() { println!("Basic commands:"); println!(" \\?, \\h, help - Print this help"); @@ -45,7 +179,7 @@ fn print_basic_commands() { println!("Or try entering an OxQL `get` query"); } -// Print high-level information about OxQL. +/// Print high-level information about OxQL. fn print_general_oxql_help() { const HELP: &str = r#"Oximeter Query Language @@ -72,7 +206,7 @@ Run `\ql ` to get specific help about that operation. println!("{HELP}"); } -// Print help for a specific OxQL operation. +/// Print help for a specific OxQL operation. fn print_oxql_operation_help(op: &str) { match op { "get" => { @@ -125,7 +259,52 @@ directly."#; } } -// List the known timeseries. +/// Print a summary of the query results. +fn print_query_summary( + result: &OxqlResult, + print_elapsed: bool, + print_summaries: bool, +) { + if !print_elapsed && !print_summaries { + return; + } + println!("{}", "Query summary".underlined().bold()); + println!(" {}: {}", "ID".bold(), result.query_id); + if print_elapsed { + println!(" {}: {:?}\n", "Total duration".bold(), result.total_duration); + } + if print_summaries { + println!(" {}:", "SQL queries".bold()); + for summary in result.query_summaries.iter() { + println!(" {}: {}", "ID".bold(), summary.id); + println!(" {}: {:?}", "Duration".bold(), summary.elapsed); + println!(" {}: {}", "Read".bold(), summary.io_summary.read); + println!(); + } + } +} + +/// Print the tables returned by a query. +fn print_tables(tables: &[Table]) { + for table in tables.iter() { + println!(); + println!("{}", table.name().underlined().bold()); + for timeseries in table.iter() { + if timeseries.points.is_empty() { + continue; + } + println!(); + for (name, value) in timeseries.fields.iter() { + println!(" {}: {}", name.as_str().bold(), value); + } + for point in timeseries.points.iter_points() { + println!(" {point}"); + } + } + } +} + +/// List the known timeseries. async fn list_timeseries(client: &Client) -> anyhow::Result<()> { let mut page = WhichPage::First(EmptyScanParams {}); let limit = 100.try_into().unwrap(); @@ -146,69 +325,6 @@ async fn list_timeseries(client: &Client) -> anyhow::Result<()> { } } -/// Prepare the columns for a timeseries or virtual table. -pub(crate) fn prepare_columns( - schema: &TimeseriesSchema, -) -> (Vec, Vec) { - let mut cols = Vec::with_capacity(schema.field_schema.len() + 2); - let mut types = cols.clone(); - - for field in schema.field_schema.iter() { - cols.push(field.name.clone()); - types.push(field.field_type.to_string()); - } - - cols.push(special_idents::TIMESTAMP.into()); - types.push(special_idents::DATETIME64.into()); - - if schema.datum_type.is_histogram() { - cols.push(special_idents::START_TIME.into()); - types.push(special_idents::DATETIME64.into()); - - cols.push(special_idents::BINS.into()); - types.push( - special_idents::array_type_name_from_histogram_type( - schema.datum_type, - ) - .unwrap(), - ); - - cols.push(special_idents::COUNTS.into()); - types.push(special_idents::ARRAYU64.into()); - - cols.push(special_idents::MIN.into()); - types.push(special_idents::FLOAT64.into()); - - cols.push(special_idents::MAX.into()); - types.push(special_idents::FLOAT64.into()); - - cols.push(special_idents::SUM_OF_SAMPLES.into()); - types.push(special_idents::UINT64.into()); - - cols.push(special_idents::SQUARED_MEAN.into()); - types.push(special_idents::UINT64.into()); - - for quantile in ["P50", "P90", "P99"].iter() { - cols.push(format!("{}_MARKER_HEIGHTS", quantile)); - types.push(special_idents::ARRAYFLOAT64.into()); - cols.push(format!("{}_MARKER_POSITIONS", quantile)); - types.push(special_idents::ARRAYINT64.into()); - cols.push(format!("{}_DESIRED_MARKER_POSITIONS", quantile)); - types.push(special_idents::ARRAYFLOAT64.into()); - } - } else if schema.datum_type.is_cumulative() { - cols.push(special_idents::START_TIME.into()); - types.push(special_idents::DATETIME64.into()); - cols.push(special_idents::DATUM.into()); - types.push(schema.datum_type.to_string()); - } else { - cols.push(special_idents::DATUM.into()); - types.push(schema.datum_type.to_string()); - } - - (cols, types) -} - /// Describe a single timeseries. async fn describe_timeseries( client: &Client, @@ -237,128 +353,3 @@ async fn describe_timeseries( } Ok(()) } - -/// Run the OxQL shell. -pub async fn oxql_shell( - address: IpAddr, - port: u16, - log: Logger, - opts: ShellOptions, -) -> anyhow::Result<()> { - let client = make_client(address, port, &log).await?; - - // A workaround to ensure the client has all available timeseries when the - // shell starts. - let dummy = "foo:bar".parse().unwrap(); - let _ = client.schema_for_timeseries(&dummy).await; - - // Create the line-editor. - let mut ed = Reedline::create(); - let prompt = DefaultPrompt::new( - DefaultPromptSegment::Basic("0x".to_string()), - DefaultPromptSegment::Empty, - ); - println!("Oximeter Query Language shell"); - println!(); - print_basic_commands(); - loop { - let sig = ed.read_line(&prompt); - match sig { - Ok(Signal::Success(buf)) => { - let cmd = buf.as_str().trim(); - match cmd { - "" => continue, - "\\?" | "\\h" | "help" => print_basic_commands(), - "\\q" | "quit" | "exit" => return Ok(()), - "\\l" | "\\d" => list_timeseries(&client).await?, - _ => { - if let Some(timeseries_name) = cmd.strip_prefix("\\d") { - if timeseries_name.is_empty() { - list_timeseries(&client).await?; - } else { - describe_timeseries( - &client, - timeseries_name - .trim() - .trim_end_matches(';'), - ) - .await?; - } - } else if let Some(stmt) = cmd.strip_prefix("\\ql") { - let stmt = stmt.trim(); - if stmt.is_empty() { - print_general_oxql_help(); - } else { - print_oxql_operation_help(stmt); - } - } else { - match client - .oxql_query(cmd.trim().trim_end_matches(';')) - .await - { - Ok(result) => { - print_query_summary( - &result, - opts.print_elapsed, - opts.print_summaries, - ); - print_tables(&result.tables); - } - Err(e) => { - eprintln!("{}", "Error".underlined().red()); - eprintln!("{e}"); - } - } - } - } - } - } - Ok(Signal::CtrlD) => return Ok(()), - Ok(Signal::CtrlC) => continue, - err => eprintln!("err: {err:?}"), - } - } -} - -fn print_query_summary( - result: &OxqlResult, - print_elapsed: bool, - print_summaries: bool, -) { - if !print_elapsed && !print_summaries { - return; - } - println!("{}", "Query summary".underlined().bold()); - println!(" {}: {}", "ID".bold(), result.query_id); - if print_elapsed { - println!(" {}: {:?}\n", "Total duration".bold(), result.total_duration); - } - if print_summaries { - println!(" {}:", "SQL queries".bold()); - for summary in result.query_summaries.iter() { - println!(" {}: {}", "ID".bold(), summary.id); - println!(" {}: {:?}", "Duration".bold(), summary.elapsed); - println!(" {}: {}", "Read".bold(), summary.io_summary.read); - println!(); - } - } -} - -fn print_tables(tables: &[Table]) { - for table in tables.iter() { - println!(); - println!("{}", table.name().underlined().bold()); - for timeseries in table.iter() { - if timeseries.points.is_empty() { - continue; - } - println!(); - for (name, value) in timeseries.fields.iter() { - println!(" {}: {}", name.as_str().bold(), value); - } - for point in timeseries.points.iter_points() { - println!(" {point}"); - } - } - } -} diff --git a/dev-tools/omdb/tests/env.out b/dev-tools/omdb/tests/env.out index 252313e6c86..a5687c9e90a 100644 --- a/dev-tools/omdb/tests/env.out +++ b/dev-tools/omdb/tests/env.out @@ -421,3 +421,28 @@ note: using database URL postgresql://root@[::1]:REDACTED_PORT/omicron?sslmode=d note: database schema version matches expected () note: listing all commissioned sleds (use -F to filter, e.g. -F in-service) ============================================= +EXECUTING COMMAND: omdb ["oximeter", "--oximeter-url", "junk", "list-producers"] +termination: Exited(1) +--------------------------------------------- +stdout: +--------------------------------------------- +stderr: +note: using Oximeter URL junk +Error: failed to fetch collector info + +Caused by: + 0: Communication Error: builder error: relative URL without a base + 1: builder error: relative URL without a base + 2: relative URL without a base +============================================= +EXECUTING COMMAND: omdb ["oxql", "--metrics-db-url", "junk"] +termination: Exited(1) +--------------------------------------------- +stdout: +--------------------------------------------- +stderr: +Error: Failed to parse metrics DB URL + +Caused by: + relative URL without a base +============================================= diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index 47d091443db..1dcee314d3d 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -401,14 +401,14 @@ task: "dns_propagation_external" task: "nat_v4_garbage_collector" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms last completion reported error: failed to resolve addresses for Dendrite services: no record found for Query { name: Name("_dendrite._tcp.control-plane.oxide.internal."), query_type: SRV, query_class: IN } task: "blueprint_loader" - configured period: every 1m 40s + configured period: every 1m s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -432,7 +432,7 @@ task: "abandoned_vmm_reaper" sled resource reservations deleted: 0 task: "bfd_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -463,7 +463,7 @@ task: "external_endpoints" TLS certificates: 0 task: "instance_watcher" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -491,7 +491,7 @@ task: "metrics_producer_gc" warning: unknown background task: "metrics_producer_gc" (don't know how to interpret details: Object {"expiration": String(""), "pruned": Array []}) task: "phantom_disks" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -499,14 +499,14 @@ task: "phantom_disks" number of phantom disk delete errors: 0 task: "physical_disk_adoption" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a dependent task completing started at (s ago) and ran for ms last completion reported error: task disabled task: "region_replacement" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -514,7 +514,7 @@ task: "region_replacement" number of region replacement start errors: 0 task: "region_replacement_driver" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -529,28 +529,28 @@ task: "service_firewall_rule_propagation" started at (s ago) and ran for ms task: "service_zone_nat_tracker" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms last completion reported error: inventory collection is None task: "switch_port_config_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms warning: unknown background task: "switch_port_config_manager" (don't know how to interpret details: Object {}) task: "v2p_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms warning: unknown background task: "v2p_manager" (don't know how to interpret details: Object {}) task: "vpc_route_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms diff --git a/dev-tools/omdb/tests/test_all_output.rs b/dev-tools/omdb/tests/test_all_output.rs index 19be33631db..72fbf36ac30 100644 --- a/dev-tools/omdb/tests/test_all_output.rs +++ b/dev-tools/omdb/tests/test_all_output.rs @@ -8,6 +8,7 @@ //! sure you're only breaking what you intend. use expectorate::assert_contents; +use nexus_test_utils::{OXIMETER_UUID, PRODUCER_UUID}; use nexus_test_utils_macros::nexus_test; use nexus_types::deployment::SledFilter; use nexus_types::deployment::UnstableReconfiguratorState; @@ -26,6 +27,14 @@ const CMD_OMDB: &str = env!("CARGO_BIN_EXE_omdb"); type ControlPlaneTestContext = nexus_test_utils::ControlPlaneTestContext; +fn assert_oximeter_list_producers_output(output: &str, ox_url: &str) { + assert!( + output.contains(format!("Collector ID: {}", OXIMETER_UUID).as_str()) + ); + assert!(output.contains(PRODUCER_UUID)); + assert!(output.contains(ox_url)); +} + #[tokio::test] async fn test_omdb_usage_errors() { let cmd_path = path_to_executable(CMD_OMDB); @@ -57,6 +66,10 @@ async fn test_omdb_usage_errors() { &["sled-agent"], &["sled-agent", "zones"], &["sled-agent", "zpools"], + &["oximeter", "--help"], + &["oxql", "--help"], + // Mispelled argument + &["oxql", "--summarizes"], ]; for args in invocations { @@ -74,10 +87,14 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { ) .await; let cmd_path = path_to_executable(CMD_OMDB); + let postgres_url = cptestctx.database.listen_url(); let nexus_internal_url = format!("http://{}/", cptestctx.internal_client.bind_address); let mgs_url = format!("http://{}/", gwtestctx.client.bind_address); + let ox_url = format!("http://{}/", cptestctx.oximeter.server_address()); + let ch_url = format!("http://{}/", cptestctx.clickhouse.address); + let tmpdir = camino_tempfile::tempdir() .expect("failed to create temporary directory"); let tmppath = tmpdir.path().join("reconfigurator-save.out"); @@ -124,18 +141,25 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { let p = postgres_url.to_string(); let u = nexus_internal_url.clone(); let g = mgs_url.clone(); + let ox = ox_url.clone(); + let ch = ch_url.clone(); do_run_extra( &mut output, move |exec| { exec.env("OMDB_DB_URL", &p) .env("OMDB_NEXUS_URL", &u) .env("OMDB_MGS_URL", &g) + .env("OMDB_OXIMETER_URL", &ox) + .env("OMDB_METRICS_DB_URL", &ch) }, &cmd_path, args, - ExtraRedactions::new() - .variable_length("tmp_path", tmppath.as_str()) - .fixed_length("blueprint_id", &initial_blueprint_id), + true, + Some( + ExtraRedactions::new() + .variable_length("tmp_path", tmppath.as_str()) + .fixed_length("blueprint_id", &initial_blueprint_id), + ), ) .await; } @@ -170,6 +194,23 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { .is_some()); assert!(!parsed.collections.is_empty()); + // The `oximeter` list-producers command output is not easy to compare as a + // string directly because the timing of registrations with both our test + // producer and the one nexus registers. But, let's find our test producer + // in the list. + let ox_invocation = &["oximeter", "list-producers"]; + let mut ox_output = String::new(); + let ox = ox_url.clone(); + + do_run_no_redactions( + &mut ox_output, + move |exec| exec.env("OMDB_OXIMETER_URL", &ox), + &cmd_path, + ox_invocation, + ) + .await; + assert_oximeter_list_producers_output(&ox_output, &ox_url); + gwtestctx.teardown().await; } @@ -188,6 +229,8 @@ async fn test_omdb_env_settings(cptestctx: &ControlPlaneTestContext) { let postgres_url = cptestctx.database.listen_url().to_string(); let nexus_internal_url = format!("http://{}", cptestctx.internal_client.bind_address); + let ox_url = format!("http://{}/", cptestctx.oximeter.server_address()); + let ch_url = format!("http://{}/", cptestctx.clickhouse.address); let dns_sockaddr = cptestctx.internal_dns.dns_server.local_address(); let mut output = String::new(); @@ -263,7 +306,47 @@ async fn test_omdb_env_settings(cptestctx: &ControlPlaneTestContext) { let args = &["--dns-server", &dns_sockaddr.to_string(), "db", "sleds"]; do_run(&mut output, move |exec| exec, &cmd_path, args).await; + // Case: specified in multiple places (command-line argument wins) + let args = &["oximeter", "--oximeter-url", "junk", "list-producers"]; + let ox = ox_url.clone(); + do_run( + &mut output, + move |exec| exec.env("OMDB_OXIMETER_URL", &ox), + &cmd_path, + args, + ) + .await; + + // Case: specified in multiple places (command-line argument wins) + let args = &["oxql", "--metrics-db-url", "junk"]; + do_run( + &mut output, + move |exec| exec.env("OMDB_METRICS_DB_URL", &ch_url), + &cmd_path, + args, + ) + .await; + assert_contents("tests/env.out", &output); + + // The `oximeter` list-producers command output is not easy to compare as a + // string directly because the timing of registrations with both our test + // producer and the one nexus registers. But, let's find our test producer + // in the list. + + // Oximeter URL + // Case 1: specified on the command line. + // Case 2: is covered by the success tests above. + let ox_args1 = &["oximeter", "--oximeter-url", &ox_url, "list-producers"]; + let mut ox_output1 = String::new(); + do_run_no_redactions( + &mut ox_output1, + move |exec| exec, + &cmd_path, + ox_args1, + ) + .await; + assert_oximeter_list_producers_output(&ox_output1, &ox_url); } async fn do_run( @@ -274,8 +357,26 @@ async fn do_run( ) where F: FnOnce(Exec) -> Exec + Send + 'static, { - do_run_extra(output, modexec, cmd_path, args, &ExtraRedactions::new()) - .await; + do_run_extra( + output, + modexec, + cmd_path, + args, + true, + Some(&ExtraRedactions::new()), + ) + .await; +} + +async fn do_run_no_redactions( + output: &mut String, + modexec: F, + cmd_path: &Path, + args: &[&str], +) where + F: FnOnce(Exec) -> Exec + Send + 'static, +{ + do_run_extra(output, modexec, cmd_path, args, false, None).await; } async fn do_run_extra( @@ -283,18 +384,27 @@ async fn do_run_extra( modexec: F, cmd_path: &Path, args: &[&str], - extra_redactions: &ExtraRedactions<'_>, + redactions: bool, + extra_redactions: Option<&ExtraRedactions<'_>>, ) where F: FnOnce(Exec) -> Exec + Send + 'static, { - println!("running command with args: {:?}", args); write!( output, "EXECUTING COMMAND: {} {:?}\n", cmd_path.file_name().expect("missing command").to_string_lossy(), - args.iter() - .map(|r| redact_extra(r, extra_redactions)) - .collect::>(), + if redactions { + args.iter() + .map(|r| { + redact_extra( + r, + extra_redactions.unwrap_or(&ExtraRedactions::new()), + ) + }) + .collect::>() + } else { + args.iter().map(|r| r.to_string()).collect::>() + } ) .unwrap(); @@ -326,9 +436,20 @@ async fn do_run_extra( write!(output, "termination: {:?}\n", exit_status).unwrap(); write!(output, "---------------------------------------------\n").unwrap(); write!(output, "stdout:\n").unwrap(); - output.push_str(&redact_extra(&stdout_text, extra_redactions)); + if redactions { + output.push_str(&redact_extra(&stdout_text, extra_redactions.unwrap())); + } else { + output.push_str(&stdout_text); + } write!(output, "---------------------------------------------\n").unwrap(); write!(output, "stderr:\n").unwrap(); - output.push_str(&redact_extra(&stderr_text, extra_redactions)); + if redactions { + output.push_str(&redact_extra( + &stderr_text, + extra_redactions.unwrap_or(&ExtraRedactions::new()), + )); + } else { + output.push_str(&stderr_text); + } write!(output, "=============================================\n").unwrap(); } diff --git a/dev-tools/omdb/tests/usage_errors.out b/dev-tools/omdb/tests/usage_errors.out index b7fe7bdf7fe..f66580b49b8 100644 --- a/dev-tools/omdb/tests/usage_errors.out +++ b/dev-tools/omdb/tests/usage_errors.out @@ -14,6 +14,7 @@ Commands: mgs Debug a specific Management Gateway Service instance nexus Debug a specific Nexus instance oximeter Query oximeter collector state + oxql Enter the Oximeter Query Language shell for interactive querying sled-agent Debug a specific Sled help Print this message or the help of the given subcommand(s) @@ -44,6 +45,7 @@ Commands: mgs Debug a specific Management Gateway Service instance nexus Debug a specific Nexus instance oximeter Query oximeter collector state + oxql Enter the Oximeter Query Language shell for interactive querying sled-agent Debug a specific Sled help Print this message or the help of the given subcommand(s) @@ -613,3 +615,66 @@ Connection Options: Safety Options: -w, --destructive Allow potentially-destructive subcommands ============================================= +EXECUTING COMMAND: omdb ["oximeter", "--help"] +termination: Exited(0) +--------------------------------------------- +stdout: +Query oximeter collector state + +Usage: omdb oximeter [OPTIONS] + +Commands: + list-producers List the producers the collector is assigned to poll + help Print this message or the help of the given subcommand(s) + +Options: + --log-level log level filter [env: LOG_LEVEL=] [default: warn] + -h, --help Print help + +Connection Options: + --oximeter-url URL of the oximeter collector to query [env: + OMDB_OXIMETER_URL=] + --dns-server [env: OMDB_DNS_SERVER=] + +Safety Options: + -w, --destructive Allow potentially-destructive subcommands +--------------------------------------------- +stderr: +============================================= +EXECUTING COMMAND: omdb ["oxql", "--help"] +termination: Exited(0) +--------------------------------------------- +stdout: +Enter the Oximeter Query Language shell for interactive querying + +Usage: omdb oxql [OPTIONS] + +Options: + --log-level log level filter [env: LOG_LEVEL=] [default: warn] + --summaries Print summaries of each SQL query run against the database + --elapsed Print the total elapsed query duration + -h, --help Print help + +Connection Options: + --metrics-db-url URL of the metrics database [env: OMDB_METRICS_DB_URL=] + --dns-server [env: OMDB_DNS_SERVER=] + +Safety Options: + -w, --destructive Allow potentially-destructive subcommands +--------------------------------------------- +stderr: +============================================= +EXECUTING COMMAND: omdb ["oxql", "--summarizes"] +termination: Exited(2) +--------------------------------------------- +stdout: +--------------------------------------------- +stderr: +error: unexpected argument '--summarizes' found + + tip: a similar argument exists: '--summaries' + +Usage: omdb oxql <--metrics-db-url |--summaries|--elapsed> + +For more information, try '--help'. +============================================= diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index a8c863298e3..5d59c9d46bf 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -6,16 +6,14 @@ //! at deployment time. use crate::PostgresConfigWithUrl; - -use omicron_common::address::Ipv6Subnet; -use omicron_common::address::NEXUS_TECHPORT_EXTERNAL_PORT; -use omicron_common::address::RACK_PREFIX; -use omicron_common::api::internal::shared::SwitchLocation; - use anyhow::anyhow; use camino::{Utf8Path, Utf8PathBuf}; use dropshot::ConfigDropshot; use dropshot::ConfigLogging; +use omicron_common::address::Ipv6Subnet; +use omicron_common::address::NEXUS_TECHPORT_EXTERNAL_PORT; +use omicron_common::address::RACK_PREFIX; +use omicron_common::api::internal::shared::SwitchLocation; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_with::serde_as; diff --git a/nexus/db-model/src/tuf_repo.rs b/nexus/db-model/src/tuf_repo.rs index 4a64566a628..2ef68829432 100644 --- a/nexus/db-model/src/tuf_repo.rs +++ b/nexus/db-model/src/tuf_repo.rs @@ -291,7 +291,7 @@ impl fmt::Display for ArtifactHash { } } -impl ToSql for ArtifactHash { +impl ToSql for ArtifactHash { fn to_sql<'a>( &'a self, out: &mut diesel::serialize::Output<'a, '_, diesel::pg::Pg>, @@ -303,11 +303,11 @@ impl ToSql for ArtifactHash { } } -impl FromSql for ArtifactHash { +impl FromSql for ArtifactHash { fn from_sql( bytes: diesel::pg::PgValue<'_>, ) -> diesel::deserialize::Result { - let s = String::from_sql(bytes)?; + let s = >::from_sql(bytes)?; ExternalArtifactHash::from_str(&s) .map(ArtifactHash) .map_err(|e| e.into()) diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 367a2066a1c..02bf9152f42 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -409,4 +409,9 @@ impl Oximeter { pub fn collector_id(&self) -> &Uuid { &self.agent.id } + + /// Return the address of the server. + pub fn server_address(&self) -> SocketAddr { + self.server.local_addr() + } } diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index c446bc78221..0e85a37bfe0 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -16,12 +16,16 @@ bcs.workspace = true camino.workspace = true chrono.workspace = true clap.workspace = true +crossterm.workspace = true dropshot.workspace = true futures.workspace = true highway.workspace = true +num.workspace = true omicron-common.workspace = true omicron-workspace-hack.workspace = true oximeter.workspace = true +peg.workspace = true +reedline.workspace = true regex.workspace = true serde.workspace = true serde_json.workspace = true @@ -29,6 +33,7 @@ slog.workspace = true slog-async.workspace = true slog-dtrace.workspace = true slog-term.workspace = true +tabled.workspace = true thiserror.workspace = true usdt.workspace = true uuid.workspace = true @@ -37,26 +42,10 @@ uuid.workspace = true workspace = true features = [ "serde" ] -[dependencies.crossterm] -workspace = true -optional = true - [dependencies.indexmap] workspace = true optional = true -[dependencies.num] -workspace = true -optional = true - -[dependencies.peg] -workspace = true -optional = true - -[dependencies.reedline] -workspace = true -optional = true - [dependencies.reqwest] workspace = true features = [ "json" ] @@ -81,10 +70,6 @@ optional = true workspace = true features = [ "rt-multi-thread", "macros" ] -[dependencies.tabled] -workspace = true -optional = true - [dev-dependencies] expectorate.workspace = true indexmap.workspace = true @@ -96,21 +81,12 @@ strum.workspace = true tempfile.workspace = true [features] -default = [ "oxql", "sql" ] +default = [ "sql" ] sql = [ "dep:indexmap", - "dep:reedline", "dep:rustyline", "dep:sqlformat", "dep:sqlparser", - "dep:tabled" -] -oxql = [ - "dep:crossterm", - "dep:num", - "dep:peg", - "dep:reedline", - "dep:tabled", ] [[bin]] diff --git a/oximeter/db/src/bin/oxdb/main.rs b/oximeter/db/src/bin/oxdb/main.rs index ca11dd18a34..c7e2b44e8da 100644 --- a/oximeter/db/src/bin/oxdb/main.rs +++ b/oximeter/db/src/bin/oxdb/main.rs @@ -22,9 +22,6 @@ use uuid::Uuid; #[cfg(feature = "sql")] mod sql; -#[cfg(feature = "oxql")] -mod oxql; - // Samples are inserted in chunks of this size, to avoid large allocations when inserting huge // numbers of timeseries. const INSERT_CHUNK_SIZE: usize = 100_000; @@ -152,13 +149,6 @@ enum Subcommand { #[clap(flatten)] opts: crate::sql::ShellOptions, }, - - /// Enter the Oximeter Query Language shell for interactive querying. - #[cfg(feature = "oxql")] - Oxql { - #[clap(flatten)] - opts: crate::oxql::ShellOptions, - }, } async fn make_client( @@ -370,10 +360,6 @@ async fn main() -> anyhow::Result<()> { Subcommand::Sql { opts } => { crate::sql::sql_shell(args.address, args.port, log, opts).await? } - #[cfg(feature = "oxql")] - Subcommand::Oxql { opts } => { - crate::oxql::oxql_shell(args.address, args.port, log, opts).await? - } } Ok(()) } diff --git a/oximeter/db/src/bin/oxdb/sql.rs b/oximeter/db/src/bin/oxdb/sql.rs index 44780592fcb..064b1494cbe 100644 --- a/oximeter/db/src/bin/oxdb/sql.rs +++ b/oximeter/db/src/bin/oxdb/sql.rs @@ -6,11 +6,11 @@ // Copyright 2024 Oxide Computer Company -use super::oxql; use crate::make_client; use clap::Args; use dropshot::EmptyScanParams; use dropshot::WhichPage; +use oximeter_db::prepare_columns; use oximeter_db::sql::function_allow_list; use oximeter_db::sql::QueryResult; use oximeter_db::sql::Table; @@ -64,7 +64,7 @@ async fn describe_virtual_table( Err(_) => println!("Invalid timeseries name: {table}"), Ok(name) => { if let Some(schema) = client.schema_for_timeseries(&name).await? { - let (cols, types) = oxql::prepare_columns(&schema); + let (cols, types) = prepare_columns(&schema); let mut builder = tabled::builder::Builder::default(); builder.push_record(cols); // first record is the header builder.push_record(types); diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index 2d6212971e4..0c372cedae4 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -7,7 +7,6 @@ // Copyright 2024 Oxide Computer Company pub(crate) mod dbwrite; -#[cfg(any(feature = "oxql", test))] pub(crate) mod oxql; pub(crate) mod query_summary; #[cfg(any(feature = "sql", test))] diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index c471a837ea4..677edb6a701 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -6,6 +6,7 @@ // Copyright 2024 Oxide Computer Company +use crate::oxql::query::special_idents; use crate::query::StringFieldSelector; use chrono::DateTime; use chrono::Utc; @@ -32,13 +33,11 @@ use thiserror::Error; mod client; pub mod model; -#[cfg(feature = "oxql")] pub mod oxql; pub mod query; #[cfg(any(feature = "sql", test))] pub mod sql; -#[cfg(feature = "oxql")] pub use client::oxql::OxqlResult; pub use client::query_summary::QuerySummary; pub use client::Client; @@ -141,12 +140,10 @@ pub enum Error { #[error("SQL error")] Sql(#[from] sql::Error), - #[cfg(any(feature = "oxql", test))] #[error(transparent)] Oxql(oxql::Error), } -#[cfg(any(feature = "oxql", test))] impl From for Error { fn from(e: crate::oxql::Error) -> Self { Error::Oxql(e) @@ -447,3 +444,66 @@ mod tests { assert_eq!(expected_schema, TimeseriesSchema::from(db_schema)); } } + +/// Prepare the columns for a timeseries or virtual table. +pub fn prepare_columns( + schema: &TimeseriesSchema, +) -> (Vec, Vec) { + let mut cols = Vec::with_capacity(schema.field_schema.len() + 2); + let mut types = cols.clone(); + + for field in schema.field_schema.iter() { + cols.push(field.name.clone()); + types.push(field.field_type.to_string()); + } + + cols.push(special_idents::TIMESTAMP.into()); + types.push(special_idents::DATETIME64.into()); + + if schema.datum_type.is_histogram() { + cols.push(special_idents::START_TIME.into()); + types.push(special_idents::DATETIME64.into()); + + cols.push(special_idents::BINS.into()); + types.push( + special_idents::array_type_name_from_histogram_type( + schema.datum_type, + ) + .unwrap(), + ); + + cols.push(special_idents::COUNTS.into()); + types.push(special_idents::ARRAYU64.into()); + + cols.push(special_idents::MIN.into()); + types.push(special_idents::FLOAT64.into()); + + cols.push(special_idents::MAX.into()); + types.push(special_idents::FLOAT64.into()); + + cols.push(special_idents::SUM_OF_SAMPLES.into()); + types.push(special_idents::UINT64.into()); + + cols.push(special_idents::SQUARED_MEAN.into()); + types.push(special_idents::UINT64.into()); + + for quantile in ["P50", "P90", "P99"].iter() { + cols.push(format!("{}_MARKER_HEIGHTS", quantile)); + types.push(special_idents::ARRAYFLOAT64.into()); + cols.push(format!("{}_MARKER_POSITIONS", quantile)); + types.push(special_idents::ARRAYINT64.into()); + cols.push(format!("{}_DESIRED_MARKER_POSITIONS", quantile)); + types.push(special_idents::ARRAYFLOAT64.into()); + } + } else if schema.datum_type.is_cumulative() { + cols.push(special_idents::START_TIME.into()); + types.push(special_idents::DATETIME64.into()); + cols.push(special_idents::DATUM.into()); + types.push(schema.datum_type.to_string()); + } else { + cols.push(special_idents::DATUM.into()); + types.push(schema.datum_type.to_string()); + } + + (cols, types) +} diff --git a/test-utils/src/dev/test_cmds.rs b/test-utils/src/dev/test_cmds.rs index 3c675ddfd99..ed6b3eb0afa 100644 --- a/test-utils/src/dev/test_cmds.rs +++ b/test-utils/src/dev/test_cmds.rs @@ -126,8 +126,8 @@ pub fn error_for_enoent() -> String { /// /// This allows use to use expectorate to verify the shape of the CLI output. pub fn redact_variable(input: &str) -> String { - // Replace TCP port numbers. We include the localhost characters to avoid - // catching any random sequence of numbers. + // Replace TCP port numbers and keep to v4 hosts. We include the localhost + // characters to avoid catching any random sequence of numbers. let s = regex::Regex::new(r"\[::1\]:\d{4,5}") .unwrap() .replace_all(&input, "[::1]:REDACTED_PORT") @@ -189,6 +189,13 @@ pub fn redact_variable(input: &str) -> String { .replace_all(&s, "s ago") .to_string(); + // Replace interval (s). + let s = regex::Regex::new(r"\d+s") + .unwrap() + .replace_all(&s, "s") + .to_string(); + + // Replace interval (ms). let s = regex::Regex::new(r"\d+ms") .unwrap() .replace_all(&s, "ms")