From 4ee02c0a6b262e5890439b6bf8297ef8874fd8cf Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Thu, 21 Nov 2024 17:00:34 -0800 Subject: [PATCH] Remove HTTP ports from the `oximeter_db::Client` (#7118) - Remove the HTTP address argument from the client constructor - Remove the HTTP resolver from the client constructor - Remove the HTTP pool from the client - Rename `native_{address,port}` to just `{address,port}` in configuration and CLI flags - Closes #7094 - NOTE: We're still populating the DNS records for the HTTP address, and relying on the implicit addition of the native TCP address via `DnsBuilder::service_zone_clickhouse()`. We're also specifying the HTTP address in the SMF properties of the ClickHouse zone. We don't strictly need these anymore, but it's a bit more complicated to remove them, since some deployed systems rely on those records. That can be done in a follow-up, in which we'll switch from specifying the HTTP to the native addresses. --- clickhouse-admin/src/http_entrypoints.rs | 15 +- dev-tools/ch-dev/src/main.rs | 15 +- dev-tools/clickhouse-cluster-dev/src/main.rs | 2 - dev-tools/omdb/src/bin/omdb/oxql.rs | 30 +- dev-tools/omdb/tests/usage_errors.out | 7 +- nexus-config/src/nexus_config.rs | 26 +- nexus/src/app/mod.rs | 36 +- nexus/test-utils/src/lib.rs | 11 +- nexus/tests/integration_tests/oximeter.rs | 8 +- oximeter/collector/config.toml | 2 +- oximeter/collector/src/agent.rs | 21 +- .../src/bin/clickhouse-schema-updater.rs | 20 +- oximeter/collector/src/lib.rs | 36 +- oximeter/db/src/bin/oxdb/main.rs | 57 +-- oximeter/db/src/client/mod.rs | 413 +++--------------- oximeter/db/src/client/oxql.rs | 6 +- oximeter/db/src/client/sql.rs | 68 +-- oximeter/db/src/lib.rs | 7 +- oximeter/db/src/native/block.rs | 2 +- oximeter/db/src/shells/oxql.rs | 5 +- oximeter/db/src/shells/sql.rs | 5 +- oximeter/db/src/sql/mod.rs | 14 +- oximeter/db/tests/integration_test.rs | 20 +- oximeter/test-utils/src/lib.rs | 6 +- 24 files changed, 160 insertions(+), 672 deletions(-) diff --git a/clickhouse-admin/src/http_entrypoints.rs b/clickhouse-admin/src/http_entrypoints.rs index 4380318476..9eb987da72 100644 --- a/clickhouse-admin/src/http_entrypoints.rs +++ b/clickhouse-admin/src/http_entrypoints.rs @@ -130,18 +130,13 @@ impl ClickhouseAdminSingleApi for ClickhouseAdminSingleImpl { ) -> Result { let log = &rqctx.log; let ctx = rqctx.context(); - let http_address = ctx.clickhouse_cli().listen_address; - let native_address = - SocketAddrV6::new(*http_address.ip(), CLICKHOUSE_TCP_PORT, 0, 0); - let client = OximeterClient::new( - http_address.into(), - native_address.into(), - log, - ); + let ip = ctx.clickhouse_cli().listen_address.ip(); + let address = SocketAddrV6::new(*ip, CLICKHOUSE_TCP_PORT, 0, 0); + let client = OximeterClient::new(address.into(), log); debug!( log, "initializing single-node ClickHouse \ - at {http_address} to version {OXIMETER_VERSION}" + at {address} to version {OXIMETER_VERSION}" ); // Database initialization is idempotent, but not concurrency-safe. @@ -154,7 +149,7 @@ impl ClickhouseAdminSingleApi for ClickhouseAdminSingleImpl { .map_err(|e| { HttpError::for_internal_error(format!( "can't initialize single-node ClickHouse \ - at {http_address} to version {OXIMETER_VERSION}: {e}", + at {address} to version {OXIMETER_VERSION}: {e}", )) })?; diff --git a/dev-tools/ch-dev/src/main.rs b/dev-tools/ch-dev/src/main.rs index c356f098bc..ea4719f46c 100644 --- a/dev-tools/ch-dev/src/main.rs +++ b/dev-tools/ch-dev/src/main.rs @@ -9,7 +9,7 @@ use clap::{Args, Parser, Subcommand}; use dropshot::test_util::LogContext; use futures::StreamExt; use libc::SIGINT; -use omicron_common::address::{CLICKHOUSE_HTTP_PORT, CLICKHOUSE_TCP_PORT}; +use omicron_common::address::CLICKHOUSE_TCP_PORT; use omicron_test_utils::dev::{self, clickhouse::ClickHousePorts}; use signal_hook_tokio::Signals; @@ -43,14 +43,11 @@ enum ChDevCmd { #[derive(Clone, Debug, Args)] struct ChRunArgs { - /// The HTTP port on which the server will listen - #[clap(short = 'H', long, default_value_t = CLICKHOUSE_HTTP_PORT, action)] - http_port: u16, /// The port on which the native protocol server will listen #[clap(short, long, default_value_t = CLICKHOUSE_TCP_PORT, action)] - native_port: u16, + port: u16, /// Starts a ClickHouse replicated cluster of 2 replicas and 3 keeper nodes - #[clap(long, conflicts_with_all = ["http_port", "native_port"], action)] + #[clap(long, conflicts_with_all = ["port"], action)] replicated: bool, } @@ -65,8 +62,7 @@ impl ChRunArgs { if self.replicated { start_replicated_cluster(&logctx).await?; } else { - start_single_node(&logctx, self.http_port, self.native_port) - .await?; + start_single_node(&logctx, self.port).await?; } Ok(()) } @@ -74,7 +70,6 @@ impl ChRunArgs { async fn start_single_node( logctx: &LogContext, - http_port: u16, native_port: u16, ) -> Result<(), anyhow::Error> { // Start a stream listening for SIGINT @@ -82,7 +77,7 @@ async fn start_single_node( let mut signal_stream = signals.fuse(); // Start the database server process, possibly on a specific port - let ports = ClickHousePorts::new(http_port, native_port)?; + let ports = ClickHousePorts::new(0, native_port)?; let mut deployment = dev::clickhouse::ClickHouseDeployment::new_single_node_with_ports( logctx, ports, diff --git a/dev-tools/clickhouse-cluster-dev/src/main.rs b/dev-tools/clickhouse-cluster-dev/src/main.rs index 54714ac368..2f85c53ab6 100644 --- a/dev-tools/clickhouse-cluster-dev/src/main.rs +++ b/dev-tools/clickhouse-cluster-dev/src/main.rs @@ -37,13 +37,11 @@ async fn main() -> Result<()> { deployment.deploy().context("failed to deploy")?; let client1 = Client::new_with_request_timeout( - deployment.http_addr(1.into()), deployment.native_addr(1.into()), &logctx.log, request_timeout, ); let client2 = Client::new_with_request_timeout( - deployment.http_addr(2.into()), deployment.native_addr(2.into()), &logctx.log, request_timeout, diff --git a/dev-tools/omdb/src/bin/omdb/oxql.rs b/dev-tools/omdb/src/bin/omdb/oxql.rs index 172344b56c..39736d8fda 100644 --- a/dev-tools/omdb/src/bin/omdb/oxql.rs +++ b/dev-tools/omdb/src/bin/omdb/oxql.rs @@ -31,15 +31,6 @@ pub struct OxqlArgs { )] clickhouse_url: Option, - /// URL of the ClickHouse server to connect to for the native protcol. - #[arg( - long, - env = "OMDB_CLICKHOUSE_NATIVE_URL", - global = true, - help_heading = CONNECTION_OPTIONS_HEADING, - )] - clickhouse_native_url: Option, - /// Print summaries of each SQL query run against the database. #[clap(long = "summaries")] print_summaries: bool, @@ -56,7 +47,6 @@ impl OxqlArgs { omdb: &Omdb, log: &Logger, ) -> anyhow::Result<()> { - let http_addr = self.resolve_http_addr(omdb, log).await?; let native_addr = self.resolve_native_addr(omdb, log).await?; let opts = ShellOptions { @@ -65,8 +55,7 @@ impl OxqlArgs { }; oxql::shell( - http_addr.ip(), - http_addr.port(), + native_addr.ip(), native_addr.port(), log.new(slog::o!("component" => "clickhouse-client")), opts, @@ -79,27 +68,12 @@ impl OxqlArgs { &self, omdb: &Omdb, log: &Logger, - ) -> anyhow::Result { - self.resolve_addr( - omdb, - log, - self.clickhouse_native_url.as_deref(), - ServiceName::ClickhouseNative, - ) - .await - } - - /// Resolve the ClickHouse HTTP URL to a socket address. - async fn resolve_http_addr( - &self, - omdb: &Omdb, - log: &Logger, ) -> anyhow::Result { self.resolve_addr( omdb, log, self.clickhouse_url.as_deref(), - ServiceName::Clickhouse, + ServiceName::ClickhouseNative, ) .await } diff --git a/dev-tools/omdb/tests/usage_errors.out b/dev-tools/omdb/tests/usage_errors.out index 61241d9d26..5e66467403 100644 --- a/dev-tools/omdb/tests/usage_errors.out +++ b/dev-tools/omdb/tests/usage_errors.out @@ -790,16 +790,13 @@ Usage: omdb oxql [OPTIONS] Options: --log-level log level filter [env: LOG_LEVEL=] [default: warn] --summaries Print summaries of each SQL query run against the database - --color Color output [default: auto] [possible values: auto, always, never] --elapsed Print the total elapsed query duration + --color Color output [default: auto] [possible values: auto, always, never] -h, --help Print help Connection Options: --clickhouse-url URL of the ClickHouse server to connect to [env: OMDB_CLICKHOUSE_URL=] - --clickhouse-native-url - URL of the ClickHouse server to connect to for the native protcol [env: - OMDB_CLICKHOUSE_NATIVE_URL=] --dns-server [env: OMDB_DNS_SERVER=] @@ -818,7 +815,7 @@ error: unexpected argument '--summarizes' found tip: a similar argument exists: '--summaries' -Usage: omdb oxql <--clickhouse-url |--clickhouse-native-url |--summaries|--elapsed> +Usage: omdb oxql <--clickhouse-url |--summaries|--elapsed> For more information, try '--help'. ============================================= diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index 82362f2f0d..c4cb755042 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -250,12 +250,9 @@ pub struct SchemaConfig { /// Optional configuration for the timeseries database. #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)] pub struct TimeseriesDbConfig { - /// The HTTP address of the ClickHouse server. - #[serde(default, skip_serializing_if = "Option::is_none")] - pub address: Option, /// The native TCP address of the ClickHouse server. #[serde(default, skip_serializing_if = "Option::is_none")] - pub native_address: Option, + pub address: Option, } /// Configuration for the `Dendrite` dataplane daemon. @@ -779,7 +776,7 @@ mod test { use super::*; use omicron_common::address::{ - Ipv6Subnet, CLICKHOUSE_HTTP_PORT, CLICKHOUSE_TCP_PORT, RACK_PREFIX, + Ipv6Subnet, CLICKHOUSE_TCP_PORT, RACK_PREFIX, }; use omicron_common::api::internal::shared::SwitchLocation; @@ -894,8 +891,7 @@ mod test { path = "/nonexistent/path" if_exists = "fail" [timeseries_db] - address = "[::1]:8123" - native_address = "[::1]:9000" + address = "[::1]:9000" [updates] trusted_root = "/path/to/root.json" [tunables] @@ -1016,18 +1012,10 @@ mod test { timeseries_db: TimeseriesDbConfig { address: Some(SocketAddr::V6(SocketAddrV6::new( Ipv6Addr::LOCALHOST, - CLICKHOUSE_HTTP_PORT, + CLICKHOUSE_TCP_PORT, 0, 0, ))), - native_address: Some(SocketAddr::V6( - SocketAddrV6::new( - Ipv6Addr::LOCALHOST, - CLICKHOUSE_TCP_PORT, - 0, - 0, - ) - )), }, updates: Some(UpdatesConfig { trusted_root: Utf8PathBuf::from("/path/to/root.json"), @@ -1180,7 +1168,7 @@ mod test { path = "/nonexistent/path" if_exists = "fail" [timeseries_db] - address = "[::1]:8123" + address = "[::1]:9000" [deployment] id = "28b90dc4-c22a-65ba-f49a-f051fe01208f" rack_id = "38b90dc4-c22a-65ba-f49a-f051fe01208f" @@ -1267,7 +1255,7 @@ mod test { path = "/nonexistent/path" if_exists = "fail" [timeseries_db] - address = "[::1]:8123" + address = "[::1]:9000" [deployment] id = "28b90dc4-c22a-65ba-f49a-f051fe01208f" rack_id = "38b90dc4-c22a-65ba-f49a-f051fe01208f" @@ -1319,7 +1307,7 @@ mod test { path = "/nonexistent/path" if_exists = "fail" [timeseries_db] - address = "[::1]:8123" + address = "[::1]:9000" [updates] trusted_root = "/path/to/root.json" default_base_url = "http://example.invalid/" diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index e451119bfc..435ca2a56d 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -24,8 +24,6 @@ use nexus_db_queries::authn; use nexus_db_queries::authz; use nexus_db_queries::context::OpContext; use nexus_db_queries::db; -use omicron_common::address::CLICKHOUSE_HTTP_PORT; -use omicron_common::address::CLICKHOUSE_TCP_PORT; use omicron_common::address::DENDRITE_PORT; use omicron_common::address::MGD_PORT; use omicron_common::address::MGS_PORT; @@ -37,7 +35,6 @@ use sagas::common_storage::make_pantry_connection_pool; use sagas::common_storage::PooledPantryClient; use slog::Logger; use std::collections::HashMap; -use std::net::SocketAddr; use std::net::SocketAddrV6; use std::net::{IpAddr, Ipv6Addr}; use std::sync::Arc; @@ -412,38 +409,13 @@ impl Nexus { .map_err(|e| e.to_string())?; // Client to the ClickHouse database. - // TODO-cleanup: Simplify this when we remove the HTTP client. - let timeseries_client = match ( - &config.pkg.timeseries_db.address, - &config.pkg.timeseries_db.native_address, - ) { - (None, None) => { - let http_resolver = - qorb_resolver.for_service(ServiceName::Clickhouse); + let timeseries_client = match &config.pkg.timeseries_db.address { + None => { let native_resolver = qorb_resolver.for_service(ServiceName::ClickhouseNative); - oximeter_db::Client::new_with_pool( - http_resolver, - native_resolver, - &log, - ) - } - (maybe_http, maybe_native) => { - let (http_address, native_address) = - match (maybe_http, maybe_native) { - (None, None) => unreachable!("handled above"), - (None, Some(native)) => ( - SocketAddr::new(native.ip(), CLICKHOUSE_HTTP_PORT), - *native, - ), - (Some(http), None) => ( - *http, - SocketAddr::new(http.ip(), CLICKHOUSE_TCP_PORT), - ), - (Some(http), Some(native)) => (*http, *native), - }; - oximeter_db::Client::new(http_address, native_address, &log) + oximeter_db::Client::new_with_pool(native_resolver, &log) } + Some(address) => oximeter_db::Client::new(*address, &log), }; // TODO-cleanup We may want to make the populator a first-class diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index bf966c382c..cae42f4305 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -504,8 +504,7 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> { .as_mut() .expect("Tests expect to set a port of Clickhouse") .set_port(http_port); - self.config.pkg.timeseries_db.native_address = - Some(native_address.into()); + self.config.pkg.timeseries_db.address = Some(native_address.into()); let pool_name = illumos_utils::zpool::ZpoolName::new_external(zpool_id) .to_string() @@ -623,7 +622,6 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> { let oximeter = start_oximeter( log.new(o!("component" => "oximeter")), nexus_internal_addr, - clickhouse.http_address().port(), clickhouse.native_address().port(), collector_id, ) @@ -1457,16 +1455,11 @@ pub async fn start_sled_agent( pub async fn start_oximeter( log: Logger, nexus_address: SocketAddr, - http_port: u16, native_port: u16, id: Uuid, ) -> Result { let db = oximeter_collector::DbConfig { - address: Some(SocketAddr::new(Ipv6Addr::LOCALHOST.into(), http_port)), - native_address: Some(SocketAddr::new( - Ipv6Addr::LOCALHOST.into(), - native_port, - )), + address: Some(SocketAddr::new(Ipv6Addr::LOCALHOST.into(), native_port)), batch_size: 10, batch_interval: 1, replicated: false, diff --git a/nexus/tests/integration_tests/oximeter.rs b/nexus/tests/integration_tests/oximeter.rs index d842a2cc4a..2674e952e8 100644 --- a/nexus/tests/integration_tests/oximeter.rs +++ b/nexus/tests/integration_tests/oximeter.rs @@ -117,13 +117,8 @@ async fn test_oximeter_reregistration() { row.get::<&str, chrono::DateTime>("time_modified"); // ClickHouse client for verifying collection. - let ch_address = context.clickhouse.http_address().into(); let native_address = context.clickhouse.native_address().into(); - let client = oximeter_db::Client::new( - ch_address, - native_address, - &context.logctx.log, - ); + let client = oximeter_db::Client::new(native_address, &context.logctx.log); client .init_single_node_db() .await @@ -306,7 +301,6 @@ async fn test_oximeter_reregistration() { context.oximeter = nexus_test_utils::start_oximeter( context.logctx.log.new(o!("component" => "oximeter")), context.server.get_http_server_internal_address().await, - context.clickhouse.http_address().port(), context.clickhouse.native_address().port(), oximeter_id, ) diff --git a/oximeter/collector/config.toml b/oximeter/collector/config.toml index c22043e072..52b0391971 100644 --- a/oximeter/collector/config.toml +++ b/oximeter/collector/config.toml @@ -3,7 +3,7 @@ nexus_address = "[::1]:12221" [db] -address = "[::1]:8123" +address = "[::1]:9000" batch_size = 1000 batch_interval = 5 # In seconds diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index 0b33276de4..8e14428d33 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -16,7 +16,6 @@ use chrono::Utc; use futures::TryStreamExt; use nexus_client::types::IdSortMode; use nexus_client::Client as NexusClient; -use omicron_common::address::CLICKHOUSE_TCP_PORT; use omicron_common::backoff; use omicron_common::backoff::BackoffError; use oximeter::types::ProducerResults; @@ -35,7 +34,6 @@ use slog::warn; use slog::Logger; use std::collections::btree_map::Entry; use std::collections::BTreeMap; -use std::net::SocketAddr; use std::net::SocketAddrV6; use std::ops::Bound; use std::sync::Arc; @@ -394,14 +392,11 @@ pub struct OximeterAgent { impl OximeterAgent { /// Construct a new agent with the given ID and logger. - // TODO(cleanup): Remove this lint when we have only a native resolver. - #[allow(clippy::too_many_arguments)] pub async fn with_id( id: Uuid, address: SocketAddrV6, refresh_interval: Duration, db_config: DbConfig, - http_resolver: BoxedResolver, native_resolver: BoxedResolver, log: &Logger, replicated: bool, @@ -428,8 +423,7 @@ impl OximeterAgent { // - The DB doesn't exist at all. This reports a version number of 0. We // need to create the DB here, at the latest version. This is used in // fresh installations and tests. - let client = - Client::new_with_pool(http_resolver, native_resolver, &log); + let client = Client::new_with_pool(native_resolver, &log); match client.check_db_is_at_expected_version().await { Ok(_) => {} Err(oximeter_db::Error::DatabaseVersionMismatch { @@ -521,18 +515,15 @@ impl OximeterAgent { // prints the results as they're received. let insertion_log = log.new(o!("component" => "results-sink")); if let Some(db_config) = db_config { - let Some(http_address) = db_config.address else { + // Take the explicit native TCP address if provided, or the HTTP + // IP address and use the default TCP port. One of these has to be + // provided. + let Some(address) = db_config.address else { return Err(Error::Standalone(anyhow!( "Must provide explicit IP address in standalone mode" ))); }; - - // Grab the native TCP address, or construct one from the defaults. - let native_address = - db_config.native_address.unwrap_or_else(|| { - SocketAddr::new(http_address.ip(), CLICKHOUSE_TCP_PORT) - }); - let client = Client::new(http_address, native_address, &log); + let client = Client::new(address, &log); let replicated = client.is_oximeter_cluster().await?; if !replicated { client.init_single_node_db().await?; diff --git a/oximeter/collector/src/bin/clickhouse-schema-updater.rs b/oximeter/collector/src/bin/clickhouse-schema-updater.rs index 12615abca9..847baa04eb 100644 --- a/oximeter/collector/src/bin/clickhouse-schema-updater.rs +++ b/oximeter/collector/src/bin/clickhouse-schema-updater.rs @@ -11,7 +11,6 @@ use anyhow::Context; use camino::Utf8PathBuf; use clap::Parser; use clap::Subcommand; -use omicron_common::address::CLICKHOUSE_HTTP_PORT; use omicron_common::address::CLICKHOUSE_TCP_PORT; use oximeter_db::Client; use oximeter_db::OXIMETER_VERSION; @@ -23,14 +22,7 @@ use std::net::Ipv6Addr; use std::net::SocketAddr; use std::net::SocketAddrV6; -const DEFAULT_HTTP_HOST: SocketAddr = SocketAddr::V6(SocketAddrV6::new( - Ipv6Addr::LOCALHOST, - CLICKHOUSE_HTTP_PORT, - 0, - 0, -)); - -const DEFAULT_NATIVE_HOST: SocketAddr = SocketAddr::V6(SocketAddrV6::new( +const DEFAULT_HOST: SocketAddr = SocketAddr::V6(SocketAddrV6::new( Ipv6Addr::LOCALHOST, CLICKHOUSE_TCP_PORT, 0, @@ -44,14 +36,10 @@ fn parse_log_level(s: &str) -> anyhow::Result { /// Tool to apply offline updates to ClickHouse schema. #[derive(Clone, Debug, Parser)] struct Args { - /// IP address and port at which to access ClickHouse. - #[arg(long, default_value_t = DEFAULT_HTTP_HOST, env = "CLICKHOUSE_HOST")] - host: SocketAddr, - /// IP address and port at which to access ClickHouse via the native TCP /// protocol. - #[arg(long, default_value_t = DEFAULT_NATIVE_HOST, env = "CLICKHOUSE_NATIVE_HOST")] - native_host: SocketAddr, + #[arg(long, default_value_t = DEFAULT_HOST, env = "CLICKHOUSE_HOST")] + host: SocketAddr, /// Directory from which to read schema files for each version. #[arg( @@ -100,7 +88,7 @@ fn build_logger(level: Level) -> Logger { async fn main() -> anyhow::Result<()> { let args = Args::parse(); let log = build_logger(args.log_level); - let client = Client::new(args.host, args.native_host, &log); + let client = Client::new(args.host, &log); let is_replicated = client.is_oximeter_cluster().await?; match args.cmd { Cmd::List => { diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index ce190e02ed..623ba3f294 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -75,17 +75,11 @@ impl From for HttpError { /// Configuration for interacting with the metric database. #[derive(Debug, Clone, Copy, Deserialize, Serialize)] pub struct DbConfig { - /// Optional address of the ClickHouse server's HTTP interface. - /// - /// If "None", will be inferred from DNS. - #[serde(default, skip_serializing_if = "Option::is_none")] - pub address: Option, - /// Optional address of the ClickHouse server's native TCP interface. /// /// If None, will be inferred from DNS. #[serde(default, skip_serializing_if = "Option::is_none")] - pub native_address: Option, + pub address: Option, /// Batch size of samples at which to insert. pub batch_size: usize, @@ -117,7 +111,6 @@ impl DbConfig { fn with_address(address: SocketAddr) -> Self { Self { address: Some(address), - native_address: None, batch_size: Self::DEFAULT_BATCH_SIZE, batch_interval: Self::DEFAULT_BATCH_INTERVAL, replicated: Self::DEFAULT_REPLICATED, @@ -256,38 +249,17 @@ impl Oximeter { } }; - // Closure to create _two_ resolvers, one to resolve the ClickHouse HTTP - // SRV record, and one for the native TCP record. - // - // TODO(cleanup): This should be removed if / when we entirely switch to - // the native protocol. - let make_clickhouse_resolvers = || -> (BoxedResolver, BoxedResolver) { - let http_resolver = make_resolver( - config.db.address, - if config.db.replicated { - ServiceName::ClickhouseServer - } else { - ServiceName::Clickhouse - }, - ); - let native_resolver = make_resolver( - config.db.native_address, - ServiceName::ClickhouseNative, - ); - (http_resolver, native_resolver) - }; - let make_agent = || async { debug!(log, "creating ClickHouse client"); - let (http_resolver, native_resolver) = make_clickhouse_resolvers(); + let resolver = + make_resolver(config.db.address, ServiceName::ClickhouseNative); Ok(Arc::new( OximeterAgent::with_id( args.id, args.address, config.refresh_interval, config.db, - http_resolver, - native_resolver, + resolver, &log, config.db.replicated, ) diff --git a/oximeter/db/src/bin/oxdb/main.rs b/oximeter/db/src/bin/oxdb/main.rs index e7d49f6707..f5701c0ef1 100644 --- a/oximeter/db/src/bin/oxdb/main.rs +++ b/oximeter/db/src/bin/oxdb/main.rs @@ -60,13 +60,9 @@ struct OxDb { #[clap(short, long, default_value = "::1")] address: IpAddr, - /// Port on which to connect to the database using the HTTP interface. - #[clap(short, long, default_value = "8123", action)] - port: u16, - /// Port on which to connect to the database using the native TCP interface. #[clap(short, long, default_value_t = CLICKHOUSE_TCP_PORT)] - native_port: u16, + port: u16, /// Logging level #[clap(short, long, default_value = "info", value_parser = level_from_str)] @@ -219,13 +215,12 @@ async fn insert_samples( async fn populate( address: IpAddr, - http_port: u16, - native_port: u16, + port: u16, log: Logger, args: PopulateArgs, ) -> Result<(), anyhow::Error> { info!(log, "populating Oximeter database"); - let client = make_client(address, http_port, native_port, &log).await?; + let client = make_client(address, port, &log).await?; let n_timeseries = args.n_projects * args.n_instances * args.n_cpus; debug!( log, @@ -274,26 +269,24 @@ async fn populate( async fn wipe_single_node_db( address: IpAddr, - http_port: u16, - native_port: u16, + port: u16, log: Logger, ) -> Result<(), anyhow::Error> { - let client = make_client(address, http_port, native_port, &log).await?; + let client = make_client(address, port, &log).await?; client.wipe_single_node_db().await.context("Failed to wipe database") } #[allow(clippy::too_many_arguments)] async fn query( address: IpAddr, - http_port: u16, - native_port: u16, + port: u16, log: Logger, timeseries_name: String, filters: Vec, start: Option, end: Option, ) -> Result<(), anyhow::Error> { - let client = make_client(address, http_port, native_port, &log).await?; + let client = make_client(address, port, &log).await?; let filters = filters.iter().map(|s| s.as_str()).collect::>(); let timeseries = client .select_timeseries_with( @@ -323,18 +316,10 @@ async fn main() -> anyhow::Result<()> { match args.cmd { Subcommand::Describe => describe_data(), Subcommand::Populate { populate_args } => { - populate( - args.address, - args.port, - args.native_port, - log, - populate_args, - ) - .await? + populate(args.address, args.port, log, populate_args).await? } Subcommand::Wipe => { - wipe_single_node_db(args.address, args.port, args.native_port, log) - .await? + wipe_single_node_db(args.address, args.port, log).await? } Subcommand::Query { timeseries_name, @@ -357,7 +342,6 @@ async fn main() -> anyhow::Result<()> { query( args.address, args.port, - args.native_port, log, timeseries_name, filters, @@ -368,30 +352,17 @@ async fn main() -> anyhow::Result<()> { } #[cfg(feature = "sql")] Subcommand::Sql { opts } => { - oximeter_db::shells::sql::shell( - args.address, - args.port, - args.native_port, - log, - opts, - ) - .await? + oximeter_db::shells::sql::shell(args.address, args.port, log, opts) + .await? } #[cfg(feature = "oxql")] Subcommand::Oxql { opts } => { - oximeter_db::shells::oxql::shell( - args.address, - args.port, - args.native_port, - log, - opts, - ) - .await? + oximeter_db::shells::oxql::shell(args.address, args.port, log, opts) + .await? } #[cfg(feature = "native-sql-shell")] Subcommand::NativeSql => { - oximeter_db::shells::native::shell(args.address, args.native_port) - .await? + oximeter_db::shells::native::shell(args.address, args.port).await? } } Ok(()) diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index be86b45ed4..1d09ec3723 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -31,7 +31,6 @@ use crate::Timeseries; use crate::TimeseriesPageSelector; use crate::TimeseriesScanParams; use crate::TimeseriesSchema; -use anyhow::anyhow; use debug_ignore::DebugIgnore; use dropshot::EmptyScanParams; use dropshot::PaginationOrder; @@ -42,8 +41,6 @@ use oximeter::schema::TimeseriesKey; use oximeter::types::Sample; use oximeter::Measurement; use oximeter::TimeseriesName; -use qorb::backend; -use qorb::backend::Error as QorbError; use qorb::pool::Pool; use qorb::resolver::BoxedResolver; use qorb::resolvers::single_host::SingleHostResolver; @@ -83,120 +80,19 @@ mod probes { fn sql__query__done(_: &usdt::UniqueId) {} } -// A "qorb connector" which creates a ReqwestClient for the backend. -// -// This also keeps track of the underlying address, so we can use it -// for making HTTP requests directly to the backend. -struct ReqwestConnector {} - -#[async_trait::async_trait] -impl backend::Connector for ReqwestConnector { - type Connection = ReqwestClient; - - async fn connect( - &self, - backend: &backend::Backend, - ) -> Result { - Ok(ReqwestClient { - client: reqwest::Client::builder() - .pool_max_idle_per_host(1) - .build() - .map_err(|e| QorbError::Other(anyhow!(e)))?, - url: format!("http://{}", backend.address), - }) - } - - async fn is_valid( - &self, - conn: &mut Self::Connection, - ) -> Result<(), backend::Error> { - handle_db_response( - conn.client - .get(format!("{}/ping", conn.url)) - .send() - .await - .map_err(|err| QorbError::Other(anyhow!(err.to_string())))?, - ) - .await - .map_err(|e| QorbError::Other(anyhow!(e)))?; - Ok(()) - } -} - -// TODO-remove: https://github.com/oxidecomputer/omicron/issues/7094 -#[allow(dead_code)] -#[derive(Clone, Debug)] -pub(crate) struct ReqwestClient { - url: String, - client: reqwest::Client, -} - -// TODO-remove: https://github.com/oxidecomputer/omicron/issues/7094 -#[allow(dead_code)] -#[derive(Debug)] -pub(crate) enum ClientSource { - Static(ReqwestClient), - Pool { pool: DebugIgnore> }, -} - -// TODO-remove: https://github.com/oxidecomputer/omicron/issues/7094 -#[allow(dead_code)] -pub(crate) enum ClientVariant { - Static(ReqwestClient), - Handle(qorb::claim::Handle), -} - -// TODO-remove: https://github.com/oxidecomputer/omicron/issues/7094 -#[allow(dead_code)] -impl ClientVariant { - pub(crate) async fn new(source: &ClientSource) -> Result { - let client = match source { - ClientSource::Static(client) => { - ClientVariant::Static(client.clone()) - } - ClientSource::Pool { pool } => { - let handle = pool.claim().await?; - ClientVariant::Handle(handle) - } - }; - Ok(client) - } - - pub(crate) fn url(&self) -> &str { - match self { - ClientVariant::Static(client) => &client.url, - ClientVariant::Handle(handle) => &handle.url, - } - } - - pub(crate) fn reqwest(&self) -> &reqwest::Client { - match self { - ClientVariant::Static(client) => &client.client, - ClientVariant::Handle(handle) => &handle.client, - } - } -} - /// A `Client` to the ClickHouse metrics database. #[derive(Debug)] pub struct Client { _id: Uuid, log: Logger, - // Source for creating HTTP connections. - source: ClientSource, - // qorb pool for native TCP connections. - native_pool: DebugIgnore, + pool: DebugIgnore, schema: Mutex>, request_timeout: Duration, } impl Client { /// Construct a Clickhouse client of the database with a connection pool. - pub fn new_with_pool( - http_resolver: BoxedResolver, - native_resolver: BoxedResolver, - log: &Logger, - ) -> Self { + pub fn new_with_pool(native_resolver: BoxedResolver, log: &Logger) -> Self { let id = Uuid::new_v4(); let log = log.new(slog::o!( "component" => "clickhouse-client", @@ -204,20 +100,6 @@ impl Client { )); let schema = Mutex::new(BTreeMap::new()); let request_timeout = DEFAULT_REQUEST_TIMEOUT; - let pool = match Pool::new( - http_resolver, - Arc::new(ReqwestConnector {}), - qorb::policy::Policy::default(), - ) { - Ok(pool) => { - debug!(log, "registered USDT probes"); - pool - } - Err(err) => { - error!(log, "failed to register USDT probes"); - err.into_inner() - } - }; let native_pool = match Pool::new( native_resolver, Arc::new(native::connection::Connector), @@ -235,32 +117,25 @@ impl Client { Self { _id: id, log, - source: ClientSource::Pool { pool: DebugIgnore(pool) }, - native_pool: DebugIgnore(native_pool), + pool: DebugIgnore(native_pool), schema, request_timeout, } } /// Construct a new ClickHouse client of the database at `address`. - pub fn new( - http_address: SocketAddr, - native_address: SocketAddr, - log: &Logger, - ) -> Self { - Self::new_with_request_timeout( - http_address, - native_address, - log, - DEFAULT_REQUEST_TIMEOUT, - ) + /// + /// NOTE: The address here is that of the native TCP client. + pub fn new(address: SocketAddr, log: &Logger) -> Self { + Self::new_with_request_timeout(address, log, DEFAULT_REQUEST_TIMEOUT) } /// Construct a new ClickHouse client of the database at `address`, and a /// custom request timeout. + /// + /// NOTE: The address here is that of the native TCP client. pub fn new_with_request_timeout( - http_address: SocketAddr, - native_address: SocketAddr, + address: SocketAddr, log: &Logger, request_timeout: Duration, ) -> Self { @@ -269,11 +144,9 @@ impl Client { "component" => "clickhouse-client", "id" => id.to_string(), )); - let client = reqwest::Client::new(); - let url = format!("http://{}", http_address); let schema = Mutex::new(BTreeMap::new()); let native_pool = match Pool::new( - Box::new(SingleHostResolver::new(native_address)), + Box::new(SingleHostResolver::new(address)), Arc::new(native::connection::Connector), Default::default(), ) { @@ -289,8 +162,7 @@ impl Client { Self { _id: id, log, - source: ClientSource::Static(ReqwestClient { url, client }), - native_pool: DebugIgnore(native_pool), + pool: DebugIgnore(native_pool), schema, request_timeout, } @@ -300,16 +172,16 @@ impl Client { /// /// For pool-based clients, this returns "dynamic", as the URL may change /// between accesses. + #[deprecated( + note = "This cannot be relied on since we use Qorb for pooling" + )] pub fn url(&self) -> &str { - match &self.source { - ClientSource::Static(client) => &client.url, - ClientSource::Pool { .. } => "dynamic", - } + "dynamic" } /// Ping the ClickHouse server to verify connectivity. pub async fn ping(&self) -> Result<(), Error> { - let mut handle = self.native_pool.claim().await?; + let mut handle = self.pool.claim().await?; trace!(self.log, "acquired native pool claim"); handle.ping().await.map_err(Error::Native)?; trace!(self.log, "successful ping of ClickHouse server"); @@ -1169,7 +1041,7 @@ impl Client { "n_rows" => block.n_rows(), "n_columns" => block.n_columns(), ); - let mut handle = self.native_pool.claim().await?; + let mut handle = self.pool.claim().await?; let id = usdt::UniqueId::new(); probes::sql__query__start!(|| (&id, sql)); let now = tokio::time::Instant::now(); @@ -1207,7 +1079,7 @@ impl Client { "sql" => sql, ); - let mut handle = self.native_pool.claim().await?; + let mut handle = self.pool.claim().await?; let id = usdt::UniqueId::new(); probes::sql__query__start!(|| (&id, sql)); let now = tokio::time::Instant::now(); @@ -1490,22 +1362,6 @@ fn schema_validation_regex() -> &'static Regex { .expect("Invalid regex") }) } -// Return Ok if the response indicates success, otherwise return either the reqwest::Error, if this -// is a client-side error, or the body of the actual error retrieved from ClickHouse if the error -// was generated there. -async fn handle_db_response( - response: reqwest::Response, -) -> Result { - let status = response.status(); - if status.is_success() { - Ok(response) - } else { - // NOTE: ClickHouse returns 404 for all errors (so far encountered). We pull the text from - // the body if possible, which contains the actual error from the database. - let body = response.text().await.unwrap_or_else(|e| e.to_string()); - Err(Error::Database(format!("Query failed: {body}"))) - } -} #[cfg(test)] mod tests { @@ -1594,7 +1450,7 @@ mod tests { // let logctx = test_setup_log("test_do_the_thing"); // let mut db = // ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - // let client = Client::new(db.http_address().into(), &logctx.log); + // let client = Client::new(db.native_address().into(), &logctx.log); // init_db(&db, &client).await; // test_do_the_thing_impl(&db, client).await; // db.cleanup().await.unwrap(); @@ -1655,9 +1511,8 @@ mod tests { async fn test_replicated() { let logctx = test_setup_log("test_replicated"); let mut cluster = create_cluster(&logctx).await; - let address = cluster.http_address().into(); - let native_address = cluster.native_address().into(); - let client = Client::new(address, native_address, &logctx.log); + let address = cluster.native_address().into(); + let client = Client::new(address, &logctx.log); let futures: Vec<(&'static str, AsyncTest)> = vec![ ( "test_is_oximeter_cluster_replicated", @@ -1813,8 +1668,7 @@ mod tests { for (test_name, mut test) in futures { let testctx = test_setup_log(test_name); init_db(&cluster, &client).await; - test(&cluster, Client::new(address, native_address, &logctx.log)) - .await; + test(&cluster, Client::new(address, &logctx.log)).await; wipe_db(&cluster, &client).await; testctx.cleanup_successful(); } @@ -1826,9 +1680,8 @@ mod tests { async fn cannot_ping_nonexistent_server() { let logctx = test_setup_log("cannot_ping_nonexistent_server"); let log = &logctx.log; - let dont_care = "127.0.0.1:443".parse().unwrap(); let bad_addr = "[::1]:80".parse().unwrap(); - let client = Client::new(dont_care, bad_addr, &log); + let client = Client::new(bad_addr, &log); let e = client .ping() .await @@ -1844,11 +1697,7 @@ mod tests { let logctx = test_setup_log("can_ping_clickhouse"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); client.ping().await.expect("Should be able to ping existing server"); db.cleanup().await.unwrap(); logctx.cleanup_successful(); @@ -1859,11 +1708,7 @@ mod tests { let logctx = test_setup_log("test_is_oximeter_cluster"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_is_oximeter_cluster_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -1886,11 +1731,7 @@ mod tests { let logctx = test_setup_log("test_insert_samples"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_insert_samples_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -1939,11 +1780,7 @@ mod tests { let logctx = test_setup_log("test_schema_mismatch"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_schema_mismatch_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -1976,11 +1813,7 @@ mod tests { let logctx = test_setup_log("test_schema_update"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_schema_updated_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2059,11 +1892,7 @@ mod tests { let logctx = test_setup_log("test_client_select_timeseries_one"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_client_select_timeseries_one_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2151,11 +1980,7 @@ mod tests { let logctx = test_setup_log("test_field_record_cont"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_field_record_count_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2220,11 +2045,7 @@ mod tests { let logctx = test_setup_log("test_differentiate_by_timeseries_name"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_differentiate_by_timeseries_name_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2294,11 +2115,7 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_select_one"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_select_timeseries_with_select_one_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2363,11 +2180,7 @@ mod tests { ); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_select_timeseries_with_select_one_field_with_multiple_values_impl( &db, client, @@ -2440,11 +2253,7 @@ mod tests { test_setup_log("test_select_timeseries_with_select_multiple_fields_with_multiple_values"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_select_timeseries_with_select_multiple_fields_with_multiple_values_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2520,11 +2329,7 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_all"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_select_timeseries_with_all_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2585,11 +2390,7 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_start_time"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_select_timeseries_with_start_time_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2640,11 +2441,7 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_limit"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_select_timeseries_with_limit_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2763,11 +2560,7 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_order"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_select_timeseries_with_order_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2869,11 +2662,7 @@ mod tests { let logctx = test_setup_log("test_get_schema_no_new_values"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_get_schema_no_new_values_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2901,11 +2690,7 @@ mod tests { let logctx = test_setup_log("test_timeseries_schema_list"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_timeseries_schema_list_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2944,11 +2729,7 @@ mod tests { let logctx = test_setup_log("test_list_timeseries"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_list_timeseries_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3533,11 +3314,7 @@ mod tests { let logctx = test_setup_log("test_recall_of_all_fields"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_recall_of_all_fields_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3593,11 +3370,7 @@ mod tests { test_setup_log("test_database_version_update_is_idempotent"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); // NOTE: We don't init the DB, because the test explicitly tests that. test_database_version_update_is_idempotent_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3638,11 +3411,7 @@ mod tests { let logctx = test_setup_log("test_database_version_will_not_downgrade"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); // NOTE: We don't init the DB, because the test explicitly tests that. test_database_version_will_not_downgrade_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3681,11 +3450,7 @@ mod tests { let logctx = test_setup_log("test_database_version_wipes_old_version"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); // NOTE: We don't init the DB, because the test explicitly tests that. test_database_version_wipes_old_version_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3725,11 +3490,7 @@ mod tests { let logctx = test_setup_log("test_update_schema_cache_on_new_sample"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_update_schema_cache_on_new_sample_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3770,11 +3531,7 @@ mod tests { let logctx = test_setup_log("test_select_all_datum_types"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_select_all_datum_types_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3805,11 +3562,7 @@ mod tests { test_setup_log("test_new_schema_removed_when_not_inserted"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; test_new_schema_removed_when_not_inserted_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -4026,15 +3779,14 @@ mod tests { async fn test_apply_one_schema_upgrade_impl( log: &Logger, - http_address: SocketAddr, - native_address: SocketAddr, + address: SocketAddr, replicated: bool, ) { let test_name = format!( "test_apply_one_schema_upgrade_{}", if replicated { "replicated" } else { "single_node" } ); - let client = Client::new(http_address, native_address, &log); + let client = Client::new(address, &log); // We'll test moving from version 1, which just creates a database and // table, to version 2, which adds two columns to that table in @@ -4125,10 +3877,8 @@ mod tests { let logctx = test_setup_log(TEST_NAME); let log = &logctx.log; let mut cluster = create_cluster(&logctx).await; - let address = cluster.http_address().into(); test_apply_one_schema_upgrade_impl( log, - address, cluster.native_address().into(), true, ) @@ -4145,10 +3895,8 @@ mod tests { let mut db = ClickHouseDeployment::new_single_node(&logctx) .await .expect("Failed to start ClickHouse"); - let address = db.http_address().into(); test_apply_one_schema_upgrade_impl( log, - address, db.native_address().into(), false, ) @@ -4165,8 +3913,7 @@ mod tests { let mut db = ClickHouseDeployment::new_single_node(&logctx) .await .expect("Failed to start ClickHouse"); - let address = db.http_address().into(); - let client = Client::new(address, db.native_address().into(), &log); + let client = Client::new(db.native_address().into(), &log); const REPLICATED: bool = false; client .initialize_db_with_version( @@ -4208,8 +3955,7 @@ mod tests { let mut db = ClickHouseDeployment::new_single_node(&logctx) .await .expect("Failed to start ClickHouse"); - let address = db.http_address().into(); - let client = Client::new(address, db.native_address().into(), &log); + let client = Client::new(db.native_address().into(), &log); const REPLICATED: bool = false; client .initialize_db_with_version( @@ -4243,15 +3989,14 @@ mod tests { async fn test_ensure_schema_walks_through_multiple_steps_impl( log: &Logger, - http_address: SocketAddr, - native_address: SocketAddr, + address: SocketAddr, replicated: bool, ) { let test_name = format!( "test_ensure_schema_walks_through_multiple_steps_{}", if replicated { "replicated" } else { "single_node" } ); - let client = Client::new(http_address, native_address, &log); + let client = Client::new(address, &log); // We need to actually have the oximeter DB here, and the version table, // since `ensure_schema()` writes out versions to the DB as they're @@ -4357,10 +4102,8 @@ mod tests { let mut db = ClickHouseDeployment::new_single_node(&logctx) .await .expect("Failed to start ClickHouse"); - let address = db.http_address().into(); test_ensure_schema_walks_through_multiple_steps_impl( log, - address, db.native_address().into(), false, ) @@ -4376,10 +4119,8 @@ mod tests { let logctx = test_setup_log(TEST_NAME); let log = &logctx.log; let mut cluster = create_cluster(&logctx).await; - let address = cluster.http_address().into(); test_ensure_schema_walks_through_multiple_steps_impl( log, - address, cluster.native_address().into(), true, ) @@ -4459,8 +4200,7 @@ mod tests { let mut db = ClickHouseDeployment::new_single_node(&logctx) .await .expect("Failed to start ClickHouse"); - let address = db.http_address().into(); - let client = Client::new(address, db.native_address().into(), &log); + let client = Client::new(db.native_address().into(), &log); client .init_single_node_db() .await @@ -4485,8 +4225,7 @@ mod tests { let mut db = ClickHouseDeployment::new_single_node(&logctx) .await .expect("Failed to start ClickHouse"); - let address = db.http_address().into(); - let client = Client::new(address, db.native_address().into(), &log); + let client = Client::new(db.native_address().into(), &log); client .initialize_db_with_version(false, OXIMETER_VERSION) .await @@ -4635,11 +4374,7 @@ mod tests { .await .expect("Failed to start ClickHouse") }; - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &log, - ); + let client = Client::new(db.native_address().into(), &log); // Let's start with version 2, which is the first tracked and contains // the full SQL files we need to populate the DB. @@ -4828,7 +4563,6 @@ mod tests { .expect("Failed to start ClickHouse"); test_expunge_timeseries_by_name_impl( log, - db.http_address().into(), db.native_address().into(), false, ) @@ -4842,10 +4576,8 @@ mod tests { const TEST_NAME: &str = "test_expunge_timeseries_by_name_replicated"; let logctx = test_setup_log(TEST_NAME); let mut cluster = create_cluster(&logctx).await; - let address = cluster.http_address().into(); test_expunge_timeseries_by_name_impl( &logctx.log, - address, cluster.native_address().into(), true, ) @@ -4858,11 +4590,10 @@ mod tests { // upgrade. async fn test_expunge_timeseries_by_name_impl( log: &Logger, - http_address: SocketAddr, - native_address: SocketAddr, + address: SocketAddr, replicated: bool, ) { - let client = Client::new(http_address, native_address, &log); + let client = Client::new(address, &log); const STARTING_VERSION: u64 = 1; const NEXT_VERSION: u64 = 2; @@ -5056,11 +4787,7 @@ mod tests { test_setup_log("read_latest_version_with_no_database_reports_zero"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); assert_eq!( client.read_latest_version().await.unwrap(), 0, @@ -5078,11 +4805,7 @@ mod tests { ); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; client.execute_native("DROP TABLE oximeter.version").await.unwrap(); assert_eq!( @@ -5102,11 +4825,7 @@ mod tests { ); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; assert_eq!( client.read_latest_version().await.unwrap(), @@ -5123,11 +4842,7 @@ mod tests { let logctx = test_setup_log("read_latest_version_reports_max"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); init_db(&db, &client).await; client.insert_version(1).await.unwrap(); client.insert_version(10).await.unwrap(); diff --git a/oximeter/db/src/client/oxql.rs b/oximeter/db/src/client/oxql.rs index a1fa34dd1e..52be9ae81c 100644 --- a/oximeter/db/src/client/oxql.rs +++ b/oximeter/db/src/client/oxql.rs @@ -1271,11 +1271,7 @@ mod tests { let db = ClickHouseDeployment::new_single_node(&logctx) .await .expect("Failed to start ClickHouse"); - let client = Client::new( - db.http_address().into(), - db.native_address().into(), - &logctx.log, - ); + let client = Client::new(db.native_address().into(), &logctx.log); client .init_single_node_db() .await diff --git a/oximeter/db/src/client/sql.rs b/oximeter/db/src/client/sql.rs index 10c234c9cc..6b7a78981c 100644 --- a/oximeter/db/src/client/sql.rs +++ b/oximeter/db/src/client/sql.rs @@ -18,16 +18,12 @@ // Copyright 2024 Oxide Computer Company -use super::query_summary::QuerySummary; +use crate::client::Client; +use crate::sql::QueryResult; pub use crate::sql::RestrictedQuery; +use crate::sql::Table; use crate::Error; -use crate::{ - client::Client, - sql::{QueryResult, Table}, -}; -pub use indexmap::IndexMap; use slog::debug; -pub use std::time::Instant; impl Client { /// Transform a SQL query against a timeseries, but do not execute it. @@ -44,57 +40,27 @@ impl Client { &self, query: impl AsRef, ) -> Result { - use crate::client::handle_db_response; - let original_query = query.as_ref().trim_end_matches(';'); - let ox_sql = self.transform_query(original_query).await?; - let rewritten = format!("{ox_sql} FORMAT JSONEachRow"); + let rewritten = self.transform_query(original_query).await?; debug!( self.log, "rewrote restricted query"; "original_sql" => &original_query, "rewritten_sql" => &rewritten, ); - let client = crate::client::ClientVariant::new(&self.source).await?; - - let request = client - .reqwest() - .post(client.url()) - .query(&[ - ("output_format_json_quote_64bit_integers", "0"), - ("database", crate::DATABASE_NAME), - ]) - .body(rewritten.clone()); - let query_start = Instant::now(); - let response = handle_db_response( - request - .send() - .await - .map_err(|err| Error::DatabaseUnavailable(err.to_string()))?, - ) - .await?; - let summary = QuerySummary::from_headers( - query_start.elapsed(), - response.headers(), - )?; - let text = response.text().await.unwrap(); - let mut table = Table::default(); - for line in text.lines() { - let row = - serde_json::from_str::>( - line.trim(), - ) - .unwrap(); - if table.column_names.is_empty() { - table.column_names.extend(row.keys().cloned()) - } else { - assert!(table - .column_names - .iter() - .zip(row.keys()) - .all(|(k1, k2)| k1 == k2)); - } - table.rows.push(row.into_values().collect()); + let result = self.execute_with_block(&rewritten).await?; + let summary = result.query_summary(); + let Some(block) = result.data.as_ref() else { + return Err(Error::Database(String::from( + "Got an empty data block", + ))); + }; + let mut table = Table { + column_names: block.columns.keys().cloned().collect(), + rows: vec![], + }; + for row in block.json_rows().into_iter() { + table.rows.push(row.into_iter().map(|(_k, v)| v).collect()); } Ok(QueryResult { original_query: original_query.to_string(), diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index 7879432295..6479cc6b55 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -222,13 +222,10 @@ pub struct TimeseriesPageSelector { /// Create a client to the timeseries database, and ensure the database exists. pub async fn make_client( address: IpAddr, - http_port: u16, - native_port: u16, + port: u16, log: &Logger, ) -> Result { - let http_address = SocketAddr::new(address, http_port); - let native_address = SocketAddr::new(address, native_port); - let client = Client::new(http_address, native_address, &log); + let client = Client::new(SocketAddr::new(address, port), &log); client .init_single_node_db() .await diff --git a/oximeter/db/src/native/block.rs b/oximeter/db/src/native/block.rs index 34d7588182..bd681003c2 100644 --- a/oximeter/db/src/native/block.rs +++ b/oximeter/db/src/native/block.rs @@ -210,7 +210,7 @@ impl Block { /// /// Types which have native JSON represenation, such as strings and numbers, /// are converted directly. Those without, like dates, are stringified. - #[cfg(test)] + #[cfg(any(test, feature = "sql"))] pub fn json_rows(&self) -> Vec> { use serde_json::Value; let mut out = Vec::with_capacity(self.n_rows()); diff --git a/oximeter/db/src/shells/oxql.rs b/oximeter/db/src/shells/oxql.rs index 909b4916ac..f46d08c0cf 100644 --- a/oximeter/db/src/shells/oxql.rs +++ b/oximeter/db/src/shells/oxql.rs @@ -32,13 +32,12 @@ pub struct ShellOptions { /// Run/execute the OxQL shell. pub async fn shell( address: IpAddr, - http_port: u16, - native_port: u16, + port: u16, log: Logger, opts: ShellOptions, ) -> anyhow::Result<()> { // Create the client. - let client = make_client(address, http_port, native_port, &log).await?; + let client = make_client(address, port, &log).await?; // A workaround to ensure the client has all available timeseries when the // shell starts. diff --git a/oximeter/db/src/shells/sql.rs b/oximeter/db/src/shells/sql.rs index 4d8c332aaf..f75713da3b 100644 --- a/oximeter/db/src/shells/sql.rs +++ b/oximeter/db/src/shells/sql.rs @@ -50,12 +50,11 @@ impl Default for ShellOptions { /// Run/execute the SQL shell. pub async fn shell( address: IpAddr, - http_port: u16, - native_port: u16, + port: u16, log: Logger, opts: ShellOptions, ) -> anyhow::Result<()> { - let client = make_client(address, http_port, native_port, &log).await?; + let client = make_client(address, port, &log).await?; // A workaround to ensure the client has all available timeseries when the // shell starts. diff --git a/oximeter/db/src/sql/mod.rs b/oximeter/db/src/sql/mod.rs index e434608b1c..134c58ad09 100644 --- a/oximeter/db/src/sql/mod.rs +++ b/oximeter/db/src/sql/mod.rs @@ -673,11 +673,12 @@ impl RestrictedQuery { field_name: &str, field_type: &FieldType, ) -> Select { - // FROM fields_{field_type} + // FROM oximeter.fields_{field_type} let from = TableWithJoins { relation: TableFactor::Table { - name: ObjectName(vec![Self::str_to_ident(&field_table_name( - *field_type, + name: ObjectName(vec![Self::str_to_ident(&format!( + "oximeter.{}", + field_table_name(*field_type) ))]), alias: None, args: None, @@ -760,9 +761,10 @@ impl RestrictedQuery { // FROM measurements_{datum_type} let from = TableWithJoins { relation: TableFactor::Table { - name: ObjectName(vec![Self::str_to_ident( - &measurement_table_name(*datum_type), - )]), + name: ObjectName(vec![Self::str_to_ident(&format!( + "oximeter.{}", + measurement_table_name(*datum_type) + ))]), alias: None, args: None, with_hints: vec![], diff --git a/oximeter/db/tests/integration_test.rs b/oximeter/db/tests/integration_test.rs index 14246b6345..eca0adf7d5 100644 --- a/oximeter/db/tests/integration_test.rs +++ b/oximeter/db/tests/integration_test.rs @@ -63,7 +63,6 @@ async fn test_schemas_disjoint() -> anyhow::Result<()> { deployment.deploy().context("failed to deploy")?; let client1 = Client::new_with_request_timeout( - deployment.http_addr(1.into()), deployment.native_addr(1.into()), log, request_timeout, @@ -159,13 +158,11 @@ async fn test_cluster() -> anyhow::Result<()> { deployment.deploy().context("failed to deploy")?; let client1 = Client::new_with_request_timeout( - deployment.http_addr(1.into()), deployment.native_addr(1.into()), log, request_timeout, ); let client2 = Client::new_with_request_timeout( - deployment.http_addr(2.into()), deployment.native_addr(2.into()), log, request_timeout, @@ -231,7 +228,6 @@ async fn test_cluster() -> anyhow::Result<()> { // Add a 3rd clickhouse server and wait for it to come up deployment.add_server().expect("failed to launch a 3rd clickhouse server"); let client3 = Client::new_with_request_timeout( - deployment.http_addr(3.into()), deployment.native_addr(3.into()), log, request_timeout, @@ -333,7 +329,6 @@ async fn test_cluster() -> anyhow::Result<()> { // few hundred milliseconds. To shorten the length of our test, we create a // new client with a shorter timeout. let client1_short_timeout = Client::new_with_request_timeout( - deployment.http_addr(1.into()), deployment.native_addr(1.into()), log, Duration::from_secs(2), @@ -469,10 +464,8 @@ async fn wait_for_ping(log: &Logger, client: &Client) -> anyhow::Result<()> { &Duration::from_secs(30), ) .await - .with_context(|| { - format!("failed to ping clickhouse server: {}", client.url()) - })?; - info!(log, "Clickhouse server ready: {}", client.url()); + .context("failed to ping ClickHouse server")?; + info!(log, "ClickHouse server ready"); Ok(()) } @@ -493,12 +486,7 @@ async fn wait_for_insert( &Duration::from_secs(60), ) .await - .with_context(|| { - format!( - "failed to insert samples at clickhouse server: {}", - client.url() - ) - })?; - info!(log, "inserted samples at clickhouse server: {}", client.url()); + .context("failed to insert samples into ClickHouse server")?; + info!(log, "inserted samples into clickhouse server"); Ok(()) } diff --git a/oximeter/test-utils/src/lib.rs b/oximeter/test-utils/src/lib.rs index 01d32576b0..0ceb2129f4 100644 --- a/oximeter/test-utils/src/lib.rs +++ b/oximeter/test-utils/src/lib.rs @@ -202,10 +202,8 @@ pub async fn wait_for_ping( &Duration::from_secs(30), ) .await - .with_context(|| { - format!("failed to ping clickhouse server: {}", client.url()) - })?; - info!(log, "Clickhouse server ready: {}", client.url()); + .context("failed to ping ClickHouse server")?; + info!(log, "ClickHouse server ready"); Ok(()) }