From 8d899ca015c2f750fd768135c64b8d1cd4e87e5d Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Thu, 17 Oct 2024 11:07:27 -0400 Subject: [PATCH 1/7] Add clickhouse policy to the db (#6862) This enables us to enable and disable both clickhouse clusters and single-node clickhouse zones. We anticipate few changes to this policy and so we maintain a complete history indexed by version. We only allow updates via a single version increment, and we guarantee consistency by ensuring the previous version is the latest version in the database while updating. --- nexus/db-model/src/clickhouse_policy.rs | 111 +++++++ nexus/db-model/src/lib.rs | 2 + nexus/db-model/src/schema.rs | 10 + nexus/db-model/src/schema_versions.rs | 3 +- .../src/db/datastore/clickhouse_policy.rs | 284 ++++++++++++++++++ nexus/db-queries/src/db/datastore/mod.rs | 1 + nexus/reconfigurator/execution/src/dns.rs | 1 + .../planning/src/blueprint_builder/builder.rs | 33 +- nexus/reconfigurator/planning/src/planner.rs | 155 +++++++--- nexus/reconfigurator/preparation/src/lib.rs | 10 +- nexus/types/src/deployment.rs | 1 + nexus/types/src/deployment/planning_input.rs | 91 ++++-- schema/crdb/clickhouse-policy/up1.sql | 5 + schema/crdb/clickhouse-policy/up2.sql | 7 + schema/crdb/dbinit.sql | 42 ++- 15 files changed, 677 insertions(+), 79 deletions(-) create mode 100644 nexus/db-model/src/clickhouse_policy.rs create mode 100644 nexus/db-queries/src/db/datastore/clickhouse_policy.rs create mode 100644 schema/crdb/clickhouse-policy/up1.sql create mode 100644 schema/crdb/clickhouse-policy/up2.sql diff --git a/nexus/db-model/src/clickhouse_policy.rs b/nexus/db-model/src/clickhouse_policy.rs new file mode 100644 index 0000000000..55b50d903a --- /dev/null +++ b/nexus/db-model/src/clickhouse_policy.rs @@ -0,0 +1,111 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Database representation of a clickhouse deployment policy + +use super::impl_enum_type; +use crate::SqlU32; +use crate::{schema::clickhouse_policy, SqlU8}; +use chrono::{DateTime, Utc}; +use nexus_types::deployment; +use serde::{Deserialize, Serialize}; + +impl_enum_type!( + #[derive(Clone, SqlType, Debug, QueryId)] + #[diesel(postgres_type(name = "clickhouse_mode", schema = "public"))] + pub struct ClickhouseModeEnum; + + #[derive(Clone, Copy, Debug, AsExpression, FromSqlRow, Serialize, Deserialize, PartialEq)] + #[diesel(sql_type = ClickhouseModeEnum)] + pub enum DbClickhouseMode; + + // Enum values + SingleNodeOnly => b"single_node_only" + ClusterOnly => b"cluster_only" + Both => b"both" +); + +#[derive(Queryable, Clone, Debug, Selectable, Insertable)] +#[diesel(table_name = clickhouse_policy)] +pub struct ClickhousePolicy { + pub version: SqlU32, + pub clickhouse_mode: DbClickhouseMode, + pub clickhouse_cluster_target_servers: SqlU8, + pub clickhouse_cluster_target_keepers: SqlU8, + pub time_created: DateTime, +} + +impl From<&deployment::ClickhouseMode> for DbClickhouseMode { + fn from(value: &deployment::ClickhouseMode) -> Self { + match value { + deployment::ClickhouseMode::SingleNodeOnly => { + DbClickhouseMode::SingleNodeOnly + } + deployment::ClickhouseMode::ClusterOnly { .. } => { + DbClickhouseMode::ClusterOnly + } + deployment::ClickhouseMode::Both { .. 
} => DbClickhouseMode::Both,
+        }
+    }
+}
+
+impl From<ClickhousePolicy> for deployment::ClickhousePolicy {
+    fn from(value: ClickhousePolicy) -> Self {
+        let mode = match value.clickhouse_mode {
+            DbClickhouseMode::SingleNodeOnly => {
+                deployment::ClickhouseMode::SingleNodeOnly
+            }
+            DbClickhouseMode::ClusterOnly => {
+                deployment::ClickhouseMode::ClusterOnly {
+                    target_servers: value.clickhouse_cluster_target_servers.0,
+                    target_keepers: value.clickhouse_cluster_target_keepers.0,
+                }
+            }
+            DbClickhouseMode::Both => deployment::ClickhouseMode::Both {
+                target_servers: value.clickhouse_cluster_target_servers.0,
+                target_keepers: value.clickhouse_cluster_target_keepers.0,
+            },
+        };
+
+        deployment::ClickhousePolicy {
+            version: value.version.0,
+            mode,
+            time_created: value.time_created,
+        }
+    }
+}
+
+impl From<deployment::ClickhousePolicy> for ClickhousePolicy {
+    fn from(value: deployment::ClickhousePolicy) -> Self {
+        match value.mode {
+            deployment::ClickhouseMode::SingleNodeOnly => ClickhousePolicy {
+                version: value.version.into(),
+                clickhouse_mode: DbClickhouseMode::SingleNodeOnly,
+                clickhouse_cluster_target_servers: 0.into(),
+                clickhouse_cluster_target_keepers: 0.into(),
+                time_created: value.time_created,
+            },
+            deployment::ClickhouseMode::ClusterOnly {
+                target_servers,
+                target_keepers,
+            } => ClickhousePolicy {
+                version: value.version.into(),
+                clickhouse_mode: DbClickhouseMode::ClusterOnly,
+                clickhouse_cluster_target_servers: target_servers.into(),
+                clickhouse_cluster_target_keepers: target_keepers.into(),
+                time_created: value.time_created,
+            },
+            deployment::ClickhouseMode::Both {
+                target_servers,
+                target_keepers,
+            } => ClickhousePolicy {
+                version: value.version.into(),
+                clickhouse_mode: DbClickhouseMode::Both,
+                clickhouse_cluster_target_servers: target_servers.into(),
+                clickhouse_cluster_target_keepers: target_keepers.into(),
+                time_created: value.time_created,
+            },
+        }
+    }
+}
diff --git a/nexus/db-model/src/lib.rs b/nexus/db-model/src/lib.rs
index 001c97b6f6..9137b0b3c3 100644
--- a/nexus/db-model/src/lib.rs
+++ b/nexus/db-model/src/lib.rs
@@ -17,6 +17,7 @@ mod block_size;
 mod bootstore;
 mod bytecount;
 mod certificate;
+mod clickhouse_policy;
 mod cockroachdb_node_id;
 mod collection;
 mod console_session;
@@ -133,6 +134,7 @@ pub use block_size::*;
 pub use bootstore::*;
 pub use bytecount::*;
 pub use certificate::*;
+pub use clickhouse_policy::*;
 pub use cockroachdb_node_id::*;
 pub use collection::*;
 pub use console_session::*;
diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs
index a51fd04c8e..e0942d6b3b 100644
--- a/nexus/db-model/src/schema.rs
+++ b/nexus/db-model/src/schema.rs
@@ -841,6 +841,16 @@ table! {
     }
 }
 
+table! {
+    clickhouse_policy (version) {
+        version -> Int8,
+        clickhouse_mode -> crate::clickhouse_policy::ClickhouseModeEnum,
+        clickhouse_cluster_target_servers -> Int2,
+        clickhouse_cluster_target_keepers -> Int2,
+        time_created -> Timestamptz,
+    }
+}
+
 table! {
     rack (id) {
         id -> Uuid,
diff --git a/nexus/db-model/src/schema_versions.rs b/nexus/db-model/src/schema_versions.rs
index d1d82b25fb..f75264d18a 100644
--- a/nexus/db-model/src/schema_versions.rs
+++ b/nexus/db-model/src/schema_versions.rs
@@ -17,7 +17,7 @@ use std::collections::BTreeMap;
 ///
 /// This must be updated when you change the database schema. Refer to
 /// schema/crdb/README.adoc in the root of this repository for details.
-pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(109, 0, 0);
+pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(110, 0, 0);
 
 /// List of all past database schema versions, in *reverse* order
 ///
@@ -29,6 +29,7 @@ static KNOWN_VERSIONS: Lazy<Vec<KnownVersion>> = Lazy::new(|| {
     // |  leaving the first copy as an example for the next person.
     // v
     // KnownVersion::new(next_int, "unique-dirname-with-the-sql-files"),
+    KnownVersion::new(110, "clickhouse-policy"),
     KnownVersion::new(109, "inv-clickhouse-keeper-membership"),
     KnownVersion::new(108, "internet-gateway"),
     KnownVersion::new(107, "add-instance-boot-disk"),
diff --git a/nexus/db-queries/src/db/datastore/clickhouse_policy.rs b/nexus/db-queries/src/db/datastore/clickhouse_policy.rs
new file mode 100644
index 0000000000..d433bb9b60
--- /dev/null
+++ b/nexus/db-queries/src/db/datastore/clickhouse_policy.rs
@@ -0,0 +1,284 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! Queries related to clickhouse policy
+
+use super::DataStore;
+use crate::authz;
+use crate::context::OpContext;
+use crate::db;
+use crate::db::error::public_error_from_diesel;
+use crate::db::error::ErrorHandler;
+use crate::db::pagination::paginated;
+use async_bb8_diesel::AsyncRunQueryDsl;
+use diesel::dsl::sql_query;
+use diesel::expression::SelectableHelper;
+use diesel::sql_types;
+use diesel::ExpressionMethods;
+use diesel::OptionalExtension;
+use diesel::QueryDsl;
+use nexus_db_model::ClickhouseModeEnum;
+use nexus_db_model::ClickhousePolicy as DbClickhousePolicy;
+use nexus_db_model::DbClickhouseMode;
+use nexus_db_model::SqlU32;
+use nexus_db_model::SqlU8;
+use nexus_types::deployment::ClickhousePolicy;
+use omicron_common::api::external::DataPageParams;
+use omicron_common::api::external::Error;
+use omicron_common::api::external::ListResultVec;
+
+impl DataStore {
+    pub async fn clickhouse_policy_list(
+        &self,
+        opctx: &OpContext,
+        pagparams: &DataPageParams<'_, SqlU32>,
+    ) -> ListResultVec<ClickhousePolicy> {
+        use db::schema::clickhouse_policy;
+
+        opctx
+            .authorize(authz::Action::ListChildren, &authz::BLUEPRINT_CONFIG)
+            .await?;
+
+        let policies = paginated(
+            clickhouse_policy::table,
+            clickhouse_policy::version,
+            pagparams,
+        )
+        .select(DbClickhousePolicy::as_select())
+        .get_results_async(&*self.pool_connection_authorized(opctx).await?)
+        .await
+        .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;
+
+        Ok(policies.into_iter().map(ClickhousePolicy::from).collect())
+    }
+
+    /// Return the clickhouse policy with the highest version
+    pub async fn clickhouse_policy_get_latest(
+        &self,
+        opctx: &OpContext,
+    ) -> Result<Option<ClickhousePolicy>, Error> {
+        opctx.authorize(authz::Action::Read, &authz::BLUEPRINT_CONFIG).await?;
+        let conn = self.pool_connection_authorized(opctx).await?;
+
+        use db::schema::clickhouse_policy::dsl;
+
+        let latest_policy = dsl::clickhouse_policy
+            .order_by(dsl::version.desc())
+            .first_async::<DbClickhousePolicy>(&*conn)
+            .await
+            .optional()
+            .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;
+
+        Ok(latest_policy.map(Into::into))
+    }
+
+    /// Insert the current version of the policy in the database
+    ///
+    /// Only succeeds if the prior version is the latest version currently
+    /// in the `clickhouse_policy` table. If there are no versions currently
+    /// in the table, then the current policy must be at version 1.
+ pub async fn clickhouse_policy_insert_latest_version( + &self, + opctx: &OpContext, + policy: &ClickhousePolicy, + ) -> Result<(), Error> { + if policy.version < 1 { + return Err(Error::invalid_request( + "policy version must be greater than 0", + )); + } + opctx + .authorize(authz::Action::Modify, &authz::BLUEPRINT_CONFIG) + .await?; + + let num_inserted = if policy.version == 1 { + self.clickhouse_policy_insert_first_policy(opctx, &policy).await? + } else { + self.clickhouse_policy_insert_next_policy(opctx, &policy).await? + }; + + match num_inserted { + 0 => Err(Error::invalid_request(format!( + "policy version {} is not the most recent", + policy.version + ))), + 1 => Ok(()), + // This is impossible because we are explicitly inserting only one + // row with a unique primary key. + _ => unreachable!("query inserted more than one row"), + } + } + + /// Insert the next version of the policy in the database + /// + /// Only succeeds if the prior version is the latest version currently + /// in the `clickhouse_policy` table. + /// + /// Panics if `policy.version <= 1`; + async fn clickhouse_policy_insert_next_policy( + &self, + opctx: &OpContext, + policy: &ClickhousePolicy, + ) -> Result { + assert!(policy.version > 1); + let prev_version = policy.version - 1; + + sql_query( + r"INSERT INTO clickhouse_policy + (version, clickhouse_mode, clickhouse_cluster_target_servers, + clickhouse_cluster_target_keepers, time_created) + SELECT $1, $2, $3, $4, $5 + FROM clickhouse_policy WHERE version = $6 AND version IN + (SELECT version FROM clickhouse_policy + ORDER BY version DESC LIMIT 1)", + ) + .bind::(policy.version.into()) + .bind::((&policy.mode).into()) + .bind::(policy.mode.target_servers().into()) + .bind::(policy.mode.target_keepers().into()) + .bind::(policy.time_created) + .bind::(prev_version.into()) + .execute_async(&*self.pool_connection_authorized(opctx).await?) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// Insert the first clickhouse policy in the database at version 1. + /// + /// Only insert this policy if no other policy exists yet. + /// + /// Return the number of inserted rows or an error. + async fn clickhouse_policy_insert_first_policy( + &self, + opctx: &OpContext, + policy: &ClickhousePolicy, + ) -> Result { + sql_query( + r"INSERT INTO clickhouse_policy + (version, clickhouse_mode, clickhouse_cluster_target_servers, + clickhouse_cluster_target_keepers, time_created) + SELECT $1, $2, $3, $4, $5 + WHERE NOT EXISTS (SELECT * FROM clickhouse_policy)", + ) + .bind::(policy.version.into()) + .bind::((&policy.mode).into()) + .bind::(policy.mode.target_servers().into()) + .bind::(policy.mode.target_keepers().into()) + .bind::(policy.time_created) + .execute_async(&*self.pool_connection_authorized(opctx).await?) 
+ .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::db::datastore::test_utils::datastore_test; + use nexus_inventory::now_db_precision; + use nexus_test_utils::db::test_setup_database; + use nexus_types::deployment::ClickhouseMode; + use omicron_test_utils::dev; + + #[tokio::test] + async fn test_clickhouse_policy_basic() { + // Setup + let logctx = dev::test_setup_log("test_clickhouse_policy_basic"); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = datastore_test(&logctx, &db).await; + + // Listing an empty table should return an empty vec + + assert!(datastore + .clickhouse_policy_list(&opctx, &DataPageParams::max_page()) + .await + .unwrap() + .is_empty()); + + // Fail to insert a policy with version 0 + let mut policy = ClickhousePolicy { + version: 0, + mode: ClickhouseMode::SingleNodeOnly, + time_created: now_db_precision(), + }; + + assert!(datastore + .clickhouse_policy_insert_latest_version(&opctx, &policy) + .await + .unwrap_err() + .to_string() + .contains("policy version must be greater than 0")); + + // Inserting version 2 before version 1 should not work + policy.version = 2; + assert!(datastore + .clickhouse_policy_insert_latest_version(&opctx, &policy) + .await + .unwrap_err() + .to_string() + .contains("policy version 2 is not the most recent")); + + // Inserting version 1 should work + policy.version = 1; + assert!(datastore + .clickhouse_policy_insert_latest_version(&opctx, &policy) + .await + .is_ok()); + + // Inserting version 2 should work + policy.version = 2; + assert!(datastore + .clickhouse_policy_insert_latest_version(&opctx, &policy) + .await + .is_ok()); + + // Inserting version 4 should not work, since the prior version is 2 + policy.version = 4; + assert!(datastore + .clickhouse_policy_insert_latest_version(&opctx, &policy) + .await + .unwrap_err() + .to_string() + .contains("policy version 4 is not the most recent")); + + // Inserting version 3 should work + policy.version = 3; + assert!(datastore + .clickhouse_policy_insert_latest_version(&opctx, &policy) + .await + .is_ok()); + + // Inserting version 4 should work + policy.version = 4; + policy.mode = + ClickhouseMode::Both { target_servers: 3, target_keepers: 5 }; + assert!(datastore + .clickhouse_policy_insert_latest_version(&opctx, &policy) + .await + .is_ok()); + + let history = datastore + .clickhouse_policy_list(&opctx, &DataPageParams::max_page()) + .await + .unwrap(); + + for i in 1..=4 { + let policy = &history[i - 1]; + assert_eq!(policy.version, i as u32); + if i != 4 { + assert!(matches!(policy.mode, ClickhouseMode::SingleNodeOnly)); + assert_eq!(policy.mode.target_servers(), 0); + assert_eq!(policy.mode.target_keepers(), 0); + } else { + assert!(matches!(policy.mode, ClickhouseMode::Both { .. })); + assert_eq!(policy.mode.target_servers(), 3); + assert_eq!(policy.mode.target_keepers(), 5); + } + } + + // Clean up. 
+ db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } +} diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 258e43f18c..e724d3d9af 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -55,6 +55,7 @@ mod bfd; mod bgp; mod bootstore; mod certificate; +mod clickhouse_policy; mod cockroachdb_node_id; mod cockroachdb_settings; mod console_session; diff --git a/nexus/reconfigurator/execution/src/dns.rs b/nexus/reconfigurator/execution/src/dns.rs index 6a40dc1da2..8798836f47 100644 --- a/nexus/reconfigurator/execution/src/dns.rs +++ b/nexus/reconfigurator/execution/src/dns.rs @@ -1366,6 +1366,7 @@ mod test { target_cockroachdb_cluster_version: CockroachDbClusterVersion::POLICY, target_crucible_pantry_zone_count: CRUCIBLE_PANTRY_REDUNDANCY, + clickhouse_policy: None, log, } .build() diff --git a/nexus/reconfigurator/planning/src/blueprint_builder/builder.rs b/nexus/reconfigurator/planning/src/blueprint_builder/builder.rs index 3302768a56..8a9f49823d 100644 --- a/nexus/reconfigurator/planning/src/blueprint_builder/builder.rs +++ b/nexus/reconfigurator/planning/src/blueprint_builder/builder.rs @@ -182,6 +182,9 @@ impl fmt::Display for Operation { ZoneExpungeReason::ClickhouseClusterDisabled => { "clickhouse cluster disabled via policy" } + ZoneExpungeReason::ClickhouseSingleNodeDisabled => { + "clickhouse single-node disabled via policy" + } }; write!( f, @@ -562,13 +565,6 @@ impl<'a> BlueprintBuilder<'a> { "sled_id" => sled_id.to_string(), )); - // If there are any `ClickhouseServer` or `ClickhouseKeeper` zones that - // are not expunged and we no longer have a `ClickhousePolicy` which - // indicates replicated clickhouse clusters should be running, we need - // to expunge all such zones. - let clickhouse_cluster_enabled = - self.input.clickhouse_cluster_enabled(); - // Do any zones need to be marked expunged? 
let mut zones_to_expunge = BTreeMap::new(); @@ -580,11 +576,9 @@ impl<'a> BlueprintBuilder<'a> { "zone_id" => zone_id.to_string() )); - let Some(reason) = zone_needs_expungement( - sled_details, - zone_config, - clickhouse_cluster_enabled, - ) else { + let Some(reason) = + zone_needs_expungement(sled_details, zone_config, &self.input) + else { continue; }; @@ -630,6 +624,13 @@ impl<'a> BlueprintBuilder<'a> { expunging related zone" ); } + ZoneExpungeReason::ClickhouseSingleNodeDisabled => { + info!( + &log, + "clickhouse single-node disabled via policy, \ + expunging related zone" + ); + } } zones_to_expunge.insert(zone_id, reason); @@ -661,6 +662,7 @@ impl<'a> BlueprintBuilder<'a> { let mut count_sled_decommissioned = 0; let mut count_sled_expunged = 0; let mut count_clickhouse_cluster_disabled = 0; + let mut count_clickhouse_single_node_disabled = 0; for reason in zones_to_expunge.values() { match reason { ZoneExpungeReason::DiskExpunged => count_disk_expunged += 1, @@ -671,6 +673,9 @@ impl<'a> BlueprintBuilder<'a> { ZoneExpungeReason::ClickhouseClusterDisabled => { count_clickhouse_cluster_disabled += 1 } + ZoneExpungeReason::ClickhouseSingleNodeDisabled => { + count_clickhouse_single_node_disabled += 1 + } }; } let count_and_reason = [ @@ -681,6 +686,10 @@ impl<'a> BlueprintBuilder<'a> { count_clickhouse_cluster_disabled, ZoneExpungeReason::ClickhouseClusterDisabled, ), + ( + count_clickhouse_single_node_disabled, + ZoneExpungeReason::ClickhouseSingleNodeDisabled, + ), ]; for (count, reason) in count_and_reason { if count > 0 { diff --git a/nexus/reconfigurator/planning/src/planner.rs b/nexus/reconfigurator/planning/src/planner.rs index 145be867c6..c5ab26419a 100644 --- a/nexus/reconfigurator/planning/src/planner.rs +++ b/nexus/reconfigurator/planning/src/planner.rs @@ -749,7 +749,7 @@ fn sled_needs_all_zones_expunged( pub(crate) fn zone_needs_expungement( sled_details: &SledDetails, zone_config: &BlueprintZoneConfig, - clickhouse_cluster_enabled: bool, + input: &PlanningInput, ) -> Option { // Should we expunge the zone because the sled is gone? if let Some(reason) = @@ -776,7 +776,7 @@ pub(crate) fn zone_needs_expungement( // Should we expunge the zone because clickhouse clusters are no longer // enabled via policy? - if !clickhouse_cluster_enabled { + if !input.clickhouse_cluster_enabled() { if zone_config.zone_type.is_clickhouse_keeper() || zone_config.zone_type.is_clickhouse_server() { @@ -784,6 +784,14 @@ pub(crate) fn zone_needs_expungement( } } + // Should we expunge the zone because single-node clickhouse is no longer + // enabled via policy? 
+ if !input.clickhouse_single_node_enabled() { + if zone_config.zone_type.is_clickhouse() { + return Some(ZoneExpungeReason::ClickhouseSingleNodeDisabled); + } + } + None } @@ -797,6 +805,7 @@ pub(crate) enum ZoneExpungeReason { SledDecommissioned, SledExpunged, ClickhouseClusterDisabled, + ClickhouseSingleNodeDisabled, } #[cfg(test)] @@ -821,6 +830,7 @@ mod test { use nexus_types::deployment::BlueprintZoneDisposition; use nexus_types::deployment::BlueprintZoneFilter; use nexus_types::deployment::BlueprintZoneType; + use nexus_types::deployment::ClickhouseMode; use nexus_types::deployment::ClickhousePolicy; use nexus_types::deployment::CockroachDbClusterVersion; use nexus_types::deployment::CockroachDbPreserveDowngrade; @@ -843,6 +853,12 @@ mod test { use std::net::IpAddr; use typed_rng::TypedUuidRng; + // Generate a ClickhousePolicy ignoring fields we don't care about for + /// planner tests + fn clickhouse_policy(mode: ClickhouseMode) -> ClickhousePolicy { + ClickhousePolicy { version: 0, mode, time_created: Utc::now() } + } + /// Runs through a basic sequence of blueprints for adding a sled #[test] fn test_basic_add_sled() { @@ -2622,11 +2638,11 @@ mod test { // Enable clickhouse clusters via policy let mut input_builder = input.into_builder(); - input_builder.policy_mut().clickhouse_policy = Some(ClickhousePolicy { - deploy_with_standalone: true, - target_servers, - target_keepers, - }); + input_builder.policy_mut().clickhouse_policy = + Some(clickhouse_policy(ClickhouseMode::Both { + target_servers, + target_keepers, + })); // Creating a new blueprint should deploy all the new clickhouse zones let input = input_builder.build(); @@ -2659,8 +2675,8 @@ mod test { .map(|z| z.id) .collect(); - assert_eq!(keeper_zone_ids.len(), target_keepers); - assert_eq!(server_zone_ids.len(), target_servers); + assert_eq!(keeper_zone_ids.len(), target_keepers as usize); + assert_eq!(server_zone_ids.len(), target_servers as usize); // We should be attempting to allocate all servers and keepers since // this the initial configuration @@ -2670,14 +2686,20 @@ mod test { assert_eq!(clickhouse_cluster_config.generation, 2.into()); assert_eq!( clickhouse_cluster_config.max_used_keeper_id, - (target_keepers as u64).into() + (u64::from(target_keepers)).into() ); assert_eq!( clickhouse_cluster_config.max_used_server_id, - (target_servers as u64).into() + (u64::from(target_servers)).into() + ); + assert_eq!( + clickhouse_cluster_config.keepers.len(), + target_keepers as usize + ); + assert_eq!( + clickhouse_cluster_config.servers.len(), + target_servers as usize ); - assert_eq!(clickhouse_cluster_config.keepers.len(), target_keepers); - assert_eq!(clickhouse_cluster_config.servers.len(), target_servers); // Ensure that the added keepers are in server zones for zone_id in clickhouse_cluster_config.keepers.keys() { @@ -2769,11 +2791,11 @@ mod test { // Enable clickhouse clusters via policy let target_keepers = 5; let mut input_builder = input.into_builder(); - input_builder.policy_mut().clickhouse_policy = Some(ClickhousePolicy { - deploy_with_standalone: true, - target_servers, - target_keepers, - }); + input_builder.policy_mut().clickhouse_policy = + Some(clickhouse_policy(ClickhouseMode::Both { + target_servers, + target_keepers, + })); let input = input_builder.build(); let blueprint5 = Planner::new_based_on( log.clone(), @@ -2799,7 +2821,7 @@ mod test { .collect(); // We should have allocated 2 new keeper zones - assert_eq!(new_keeper_zone_ids.len(), target_keepers); + assert_eq!(new_keeper_zone_ids.len(), 
target_keepers as usize); assert!(keeper_zone_ids.is_subset(&new_keeper_zone_ids)); // We should be trying to provision one new keeper for a keeper zone @@ -2869,7 +2891,7 @@ mod test { bp7_config.keepers.len(), bp7_config.max_used_keeper_id.0 as usize ); - assert_eq!(bp7_config.keepers.len(), target_keepers); + assert_eq!(bp7_config.keepers.len(), target_keepers as usize); assert_eq!( bp7_config.highest_seen_keeper_leader_committed_log_index, 2 @@ -2909,7 +2931,7 @@ mod test { bp7_config.max_used_keeper_id ); assert_eq!(bp8_config.keepers, bp7_config.keepers); - assert_eq!(bp7_config.keepers.len(), target_keepers); + assert_eq!(bp7_config.keepers.len(), target_keepers as usize); assert_eq!( bp8_config.highest_seen_keeper_leader_committed_log_index, 3 @@ -2935,11 +2957,11 @@ mod test { // Enable clickhouse clusters via policy let mut input_builder = input.into_builder(); - input_builder.policy_mut().clickhouse_policy = Some(ClickhousePolicy { - deploy_with_standalone: true, - target_servers, - target_keepers, - }); + input_builder.policy_mut().clickhouse_policy = + Some(clickhouse_policy(ClickhouseMode::Both { + target_servers, + target_keepers, + })); let input = input_builder.build(); // Create a new blueprint to deploy all our clickhouse zones @@ -2972,13 +2994,13 @@ mod test { .map(|z| z.id) .collect(); - assert_eq!(keeper_zone_ids.len(), target_keepers); - assert_eq!(server_zone_ids.len(), target_servers); + assert_eq!(keeper_zone_ids.len(), target_keepers as usize); + assert_eq!(server_zone_ids.len(), target_servers as usize); // Directly manipulate the blueprint and inventory so that the // clickhouse clusters are stable let config = blueprint2.clickhouse_cluster_config.as_mut().unwrap(); - config.max_used_keeper_id = (target_keepers as u64).into(); + config.max_used_keeper_id = (u64::from(target_keepers)).into(); config.keepers = keeper_zone_ids .iter() .enumerate() @@ -3118,7 +3140,7 @@ mod test { // We've only added one keeper from our desired state // This brings us back up to our target count assert_eq!(config.keepers.len(), old_config.keepers.len() + 1); - assert_eq!(config.keepers.len(), target_keepers); + assert_eq!(config.keepers.len(), target_keepers as usize); // We've allocated one new keeper assert_eq!( config.max_used_keeper_id, @@ -3129,8 +3151,9 @@ mod test { } #[test] - fn test_expunge_all_clickhouse_cluster_zones_after_policy_is_disabled() { - static TEST_NAME: &str = "planner_expunge_all_clickhouse_cluster_zones_after_policy_is_disabled"; + fn test_expunge_clickhouse_zones_after_policy_is_changed() { + static TEST_NAME: &str = + "planner_expunge_clickhouse__zones_after_policy_is_changed"; let logctx = test_setup_log(TEST_NAME); let log = logctx.log.clone(); @@ -3142,11 +3165,11 @@ mod test { // Enable clickhouse clusters via policy let mut input_builder = input.into_builder(); - input_builder.policy_mut().clickhouse_policy = Some(ClickhousePolicy { - deploy_with_standalone: true, - target_servers, - target_keepers, - }); + input_builder.policy_mut().clickhouse_policy = + Some(clickhouse_policy(ClickhouseMode::Both { + target_servers, + target_keepers, + })); let input = input_builder.build(); // Create a new blueprint to deploy all our clickhouse zones @@ -3179,15 +3202,24 @@ mod test { .map(|z| z.id) .collect(); - assert_eq!(keeper_zone_ids.len(), target_keepers); - assert_eq!(server_zone_ids.len(), target_servers); + assert_eq!(keeper_zone_ids.len(), target_keepers as usize); + assert_eq!(server_zone_ids.len(), target_servers as usize); - // Disable clickhouse 
clusters via policy + // Disable clickhouse single node via policy, and ensure the zone goes + // away. First ensure we have one. + assert_eq!( + 1, + active_zones.iter().filter(|z| z.zone_type.is_clickhouse()).count() + ); let mut input_builder = input.into_builder(); - input_builder.policy_mut().clickhouse_policy = None; + input_builder.policy_mut().clickhouse_policy = + Some(clickhouse_policy(ClickhouseMode::ClusterOnly { + target_servers, + target_keepers, + })); let input = input_builder.build(); - // Create a new blueprint with the disabled policy + // Create a new blueprint with `ClickhouseMode::ClusterOnly` let blueprint3 = Planner::new_based_on( log.clone(), &blueprint2, @@ -3200,9 +3232,37 @@ mod test { .plan() .expect("plan"); + // We should have expunged our single-node clickhouse zone + let expunged_zones: Vec<_> = blueprint3 + .all_omicron_zones(BlueprintZoneFilter::Expunged) + .map(|(_, z)| z.clone()) + .collect(); + + assert_eq!(1, expunged_zones.len()); + assert!(expunged_zones.first().unwrap().zone_type.is_clickhouse()); + + // Disable clickhouse clusters via policy and restart single node + let mut input_builder = input.into_builder(); + input_builder.policy_mut().clickhouse_policy = + Some(clickhouse_policy(ClickhouseMode::SingleNodeOnly)); + let input = input_builder.build(); + + // Create a new blueprint for `ClickhouseMode::SingleNodeOnly` + let blueprint4 = Planner::new_based_on( + log.clone(), + &blueprint3, + &input, + "test_blueprint4", + &collection, + ) + .expect("created planner") + .with_rng_seed((TEST_NAME, "bp4")) + .plan() + .expect("plan"); + // All our clickhouse keeper and server zones that we created when we // enabled our clickhouse policy should be expunged when we disable it. - let expunged_zones: Vec<_> = blueprint3 + let expunged_zones: Vec<_> = blueprint4 .all_omicron_zones(BlueprintZoneFilter::Expunged) .map(|(_, z)| z.clone()) .collect(); @@ -3221,6 +3281,15 @@ mod test { assert_eq!(keeper_zone_ids, expunged_keeper_zone_ids); assert_eq!(server_zone_ids, expunged_server_zone_ids); + // We should have a new single-node clickhouze zone + assert_eq!( + 1, + blueprint4 + .all_omicron_zones(BlueprintZoneFilter::ShouldBeRunning) + .filter(|(_, z)| z.zone_type.is_clickhouse()) + .count() + ); + logctx.cleanup_successful(); } } diff --git a/nexus/reconfigurator/preparation/src/lib.rs b/nexus/reconfigurator/preparation/src/lib.rs index 9e14289e8a..24f32e9187 100644 --- a/nexus/reconfigurator/preparation/src/lib.rs +++ b/nexus/reconfigurator/preparation/src/lib.rs @@ -16,6 +16,7 @@ use nexus_db_queries::db::pagination::Paginator; use nexus_db_queries::db::DataStore; use nexus_types::deployment::Blueprint; use nexus_types::deployment::BlueprintMetadata; +use nexus_types::deployment::ClickhousePolicy; use nexus_types::deployment::CockroachDbClusterVersion; use nexus_types::deployment::CockroachDbSettings; use nexus_types::deployment::OmicronZoneExternalIp; @@ -75,6 +76,7 @@ pub struct PlanningInputFromDb<'a> { pub internal_dns_version: nexus_db_model::Generation, pub external_dns_version: nexus_db_model::Generation, pub cockroachdb_settings: &'a CockroachDbSettings, + pub clickhouse_policy: Option, pub log: &'a Logger, } @@ -138,6 +140,11 @@ impl PlanningInputFromDb<'_> { .await .internal_context("fetching cockroachdb settings")?; + let clickhouse_policy = datastore + .clickhouse_policy_get_latest(opctx) + .await + .internal_context("fetching clickhouse policy")?; + let planning_input = PlanningInputFromDb { sled_rows: &sled_rows, zpool_rows: &zpool_rows, 
@@ -156,6 +163,7 @@ impl PlanningInputFromDb<'_> { internal_dns_version, external_dns_version, cockroachdb_settings: &cockroachdb_settings, + clickhouse_policy, } .build() .internal_context("assembling planning_input")?; @@ -177,7 +185,7 @@ impl PlanningInputFromDb<'_> { .target_cockroachdb_cluster_version, target_crucible_pantry_zone_count: self .target_crucible_pantry_zone_count, - clickhouse_policy: None, + clickhouse_policy: self.clickhouse_policy.clone(), }; let mut builder = PlanningInputBuilder::new( policy, diff --git a/nexus/types/src/deployment.rs b/nexus/types/src/deployment.rs index eabe4e5a3b..74ef1dd878 100644 --- a/nexus/types/src/deployment.rs +++ b/nexus/types/src/deployment.rs @@ -59,6 +59,7 @@ pub use network_resources::OmicronZoneExternalSnatIp; pub use network_resources::OmicronZoneNetworkResources; pub use network_resources::OmicronZoneNic; pub use network_resources::OmicronZoneNicEntry; +pub use planning_input::ClickhouseMode; pub use planning_input::ClickhousePolicy; pub use planning_input::CockroachDbClusterVersion; pub use planning_input::CockroachDbPreserveDowngrade; diff --git a/nexus/types/src/deployment/planning_input.rs b/nexus/types/src/deployment/planning_input.rs index 5541df60e6..741cf3b539 100644 --- a/nexus/types/src/deployment/planning_input.rs +++ b/nexus/types/src/deployment/planning_input.rs @@ -14,6 +14,8 @@ use crate::external_api::views::PhysicalDiskState; use crate::external_api::views::SledPolicy; use crate::external_api::views::SledProvisionPolicy; use crate::external_api::views::SledState; +use chrono::DateTime; +use chrono::Utc; use clap::ValueEnum; use ipnetwork::IpNetwork; use omicron_common::address::IpRange; @@ -123,14 +125,12 @@ impl PlanningInput { } pub fn target_clickhouse_zone_count(&self) -> usize { - if let Some(policy) = &self.policy.clickhouse_policy { - if policy.deploy_with_standalone { - SINGLE_NODE_CLICKHOUSE_REDUNDANCY - } else { - 0 - } - } else { - SINGLE_NODE_CLICKHOUSE_REDUNDANCY + match self.policy.clickhouse_policy.as_ref().map(|policy| &policy.mode) + { + Some(&ClickhouseMode::ClusterOnly { .. }) => 0, + Some(&ClickhouseMode::SingleNodeOnly) + | Some(&ClickhouseMode::Both { .. }) + | None => SINGLE_NODE_CLICKHOUSE_REDUNDANCY, } } @@ -138,7 +138,7 @@ impl PlanningInput { self.policy .clickhouse_policy .as_ref() - .map(|policy| policy.target_servers) + .map(|policy| usize::from(policy.mode.target_servers())) .unwrap_or(0) } @@ -146,7 +146,7 @@ impl PlanningInput { self.policy .clickhouse_policy .as_ref() - .map(|policy| policy.target_keepers) + .map(|policy| usize::from(policy.mode.target_keepers())) .unwrap_or(0) } @@ -155,7 +155,18 @@ impl PlanningInput { } pub fn clickhouse_cluster_enabled(&self) -> bool { - self.policy.clickhouse_policy.is_some() + let Some(clickhouse_policy) = &self.policy.clickhouse_policy else { + return false; + }; + clickhouse_policy.mode.cluster_enabled() + } + + pub fn clickhouse_single_node_enabled(&self) -> bool { + let Some(clickhouse_policy) = &self.policy.clickhouse_policy else { + // If there is no policy we assume single-node is enabled. + return true; + }; + clickhouse_policy.mode.single_node_enabled() } pub fn all_sleds( @@ -876,20 +887,58 @@ pub struct Policy { pub clickhouse_policy: Option, } -/// Policy for replicated clickhouse setups #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ClickhousePolicy { - /// Should we run the single-node cluster alongside the replicated cluster? 
- /// This is stage 1 of our deployment plan as laid out in RFD 468 - /// - /// If this is set to false, then we will only deploy replicated clusters. - pub deploy_with_standalone: bool, + pub version: u32, + pub mode: ClickhouseMode, + pub time_created: DateTime, +} - /// Desired number of clickhouse servers - pub target_servers: usize, +/// How to deploy clickhouse nodes +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ClickhouseMode { + SingleNodeOnly, + ClusterOnly { target_servers: u8, target_keepers: u8 }, + Both { target_servers: u8, target_keepers: u8 }, +} - /// Desired number of clickhouse keepers - pub target_keepers: usize, +impl ClickhouseMode { + pub fn cluster_enabled(&self) -> bool { + match self { + ClickhouseMode::SingleNodeOnly => false, + ClickhouseMode::ClusterOnly { .. } + | ClickhouseMode::Both { .. } => true, + } + } + + pub fn single_node_enabled(&self) -> bool { + match self { + ClickhouseMode::ClusterOnly { .. } => false, + ClickhouseMode::SingleNodeOnly | ClickhouseMode::Both { .. } => { + true + } + } + } + + pub fn target_servers(&self) -> u8 { + match self { + ClickhouseMode::SingleNodeOnly => 0, + ClickhouseMode::ClusterOnly { target_servers, .. } => { + *target_servers + } + ClickhouseMode::Both { target_servers, .. } => *target_servers, + } + } + + pub fn target_keepers(&self) -> u8 { + match self { + ClickhouseMode::SingleNodeOnly => 0, + ClickhouseMode::ClusterOnly { target_keepers, .. } => { + *target_keepers + } + ClickhouseMode::Both { target_keepers, .. } => *target_keepers, + } + } } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/schema/crdb/clickhouse-policy/up1.sql b/schema/crdb/clickhouse-policy/up1.sql new file mode 100644 index 0000000000..506e329b21 --- /dev/null +++ b/schema/crdb/clickhouse-policy/up1.sql @@ -0,0 +1,5 @@ +CREATE TYPE IF NOT EXISTS omicron.public.clickhouse_mode AS ENUM ( + 'single_node_only', + 'cluster_only', + 'both' +); diff --git a/schema/crdb/clickhouse-policy/up2.sql b/schema/crdb/clickhouse-policy/up2.sql new file mode 100644 index 0000000000..52c950a570 --- /dev/null +++ b/schema/crdb/clickhouse-policy/up2.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS omicron.public.clickhouse_policy ( + version INT8 PRIMARY KEY, + clickhouse_mode omicron.public.clickhouse_mode NOT NULL, + clickhouse_cluster_target_servers INT2 NOT NULL, + clickhouse_cluster_target_keepers INT2 NOT NULL, + time_created TIMESTAMPTZ NOT NULL +); diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index f78e034721..3a2f2a3a81 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -42,6 +42,46 @@ ALTER DEFAULT PRIVILEGES GRANT INSERT, SELECT, UPDATE, DELETE ON TABLES to omicr */ ALTER RANGE default CONFIGURE ZONE USING num_replicas = 5; + +/* + * The deployment strategy for clickhouse + */ +CREATE TYPE IF NOT EXISTS omicron.public.clickhouse_mode AS ENUM ( + -- Only deploy a single node clickhouse + 'single_node_only', + + -- Only deploy a clickhouse cluster without any single node deployments + 'cluster_only', + + -- Deploy both a single node and cluster deployment. + -- This is the strategy for stage 1 described in RFD 468 + 'both' +); + +/* + * A planning policy for clickhouse for a single multirack setup + * + * We currently implicitly tie this policy to a rack, as we don't yet support + * multirack. Multiple parts of this database schema are going to have to change + * to support multirack, so we add one more for now. 
+ */ +CREATE TABLE IF NOT EXISTS omicron.public.clickhouse_policy ( + -- Monotonically increasing version for all policies + -- + -- This is similar to `bp_target` which will also require being changed for + -- multirack to associate with some sort of rack group ID. + version INT8 PRIMARY KEY, + + clickhouse_mode omicron.public.clickhouse_mode NOT NULL, + + -- Only greater than 0 when clickhouse cluster is enabled + clickhouse_cluster_target_servers INT2 NOT NULL, + -- Only greater than 0 when clickhouse cluster is enabled + clickhouse_cluster_target_keepers INT2 NOT NULL, + + time_created TIMESTAMPTZ NOT NULL +); + /* * Racks */ @@ -4498,7 +4538,7 @@ INSERT INTO omicron.public.db_metadata ( version, target_version ) VALUES - (TRUE, NOW(), NOW(), '109.0.0', NULL) + (TRUE, NOW(), NOW(), '110.0.0', NULL) ON CONFLICT DO NOTHING; COMMIT; From e57776527f9af95c426ebacc3be8915dc7705f57 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 17 Oct 2024 11:37:02 -0700 Subject: [PATCH 2/7] [test-util] Embiggen SP-sim discovery timeout (#6879) --- gateway-test-utils/src/setup.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/gateway-test-utils/src/setup.rs b/gateway-test-utils/src/setup.rs index 056bb451f7..d8e5a89734 100644 --- a/gateway-test-utils/src/setup.rs +++ b/gateway-test-utils/src/setup.rs @@ -197,7 +197,11 @@ pub async fn test_setup_with_config( future::ready(result) }, &Duration::from_millis(100), - &Duration::from_secs(1), + // This seems like a pretty long time to wait for MGS to discover the + // simulated SPs, but we've seen tests fail due to timeouts here in the + // past, so we may as well be generous: + // https://github.com/oxidecomputer/omicron/issues/6877 + &Duration::from_secs(30), ) .await .unwrap(); From b21c9158617c36cd440d733988204cfcdd0622a5 Mon Sep 17 00:00:00 2001 From: "oxide-renovate[bot]" <146848827+oxide-renovate[bot]@users.noreply.github.com> Date: Thu, 17 Oct 2024 14:31:53 -0700 Subject: [PATCH 3/7] Update Rust crate uzers to 0.12 (#6860) --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ed5eb5828..acb9e9d57f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12240,9 +12240,9 @@ dependencies = [ [[package]] name = "uzers" -version = "0.11.3" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d283dc7e8c901e79e32d077866eaf599156cbf427fffa8289aecc52c5c3f63" +checksum = "4df81ff504e7d82ad53e95ed1ad5b72103c11253f39238bcc0235b90768a97dd" dependencies = [ "libc", "log", diff --git a/Cargo.toml b/Cargo.toml index 6168caee77..b585bc866b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -624,7 +624,7 @@ update-engine = { path = "update-engine" } url = "2.5.2" usdt = "0.5.0" uuid = { version = "1.10.0", features = ["serde", "v4"] } -uzers = "0.11" +uzers = "0.12" walkdir = "2.5" whoami = "1.5" wicket = { path = "wicket" } From 7b3dd553c58fc3c983d8dcb8bd0c62cf9499a83e Mon Sep 17 00:00:00 2001 From: "oxide-renovate[bot]" <146848827+oxide-renovate[bot]@users.noreply.github.com> Date: Fri, 18 Oct 2024 04:40:05 +0000 Subject: [PATCH 4/7] Update taiki-e/install-action digest to 3e1dd22 (#6897) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR contains the following updates: | Package | Type | Update | Change | |---|---|---|---| | [taiki-e/install-action](https://redirect.github.com/taiki-e/install-action) | action | digest | [`42f4ec8` -> 
`3e1dd22`](https://redirect.github.com/taiki-e/install-action/compare/42f4ec8...3e1dd22) | --- ### Configuration 📅 **Schedule**: Branch creation - "after 8pm,before 6am" in timezone America/Los_Angeles, Automerge - "after 8pm,before 6am" in timezone America/Los_Angeles. 🚦 **Automerge**: Enabled. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] If you want to rebase/retry this PR, check this box --- This PR has been generated by [Renovate Bot](https://redirect.github.com/renovatebot/renovate). Co-authored-by: oxide-renovate[bot] <146848827+oxide-renovate[bot]@users.noreply.github.com> --- .github/workflows/hakari.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/hakari.yml b/.github/workflows/hakari.yml index 25975873f3..3379ae9e51 100644 --- a/.github/workflows/hakari.yml +++ b/.github/workflows/hakari.yml @@ -23,7 +23,7 @@ jobs: with: toolchain: stable - name: Install cargo-hakari - uses: taiki-e/install-action@42f4ec8e42bf7fe4dadd39bfc534566095a8edff # v2 + uses: taiki-e/install-action@3e1dd227d968fb9fa43ff604bd9b0ccd1b714919 # v2 with: tool: cargo-hakari - name: Check workspace-hack Cargo.toml is up-to-date From 96db4fd0791b607bb0d3f5cbf60b1c3a6eb150ae Mon Sep 17 00:00:00 2001 From: iliana etaoin Date: Fri, 18 Oct 2024 11:20:08 -0700 Subject: [PATCH 5/7] xtask ls-apis: try SSH access for dendrite (#6840) --- .github/buildomat/build-and-test.sh | 28 ++++----- dev-tools/ls-apis/src/cargo.rs | 89 ++++++++++++++++++++++++++++- 2 files changed, 103 insertions(+), 14 deletions(-) diff --git a/.github/buildomat/build-and-test.sh b/.github/buildomat/build-and-test.sh index 70babec2b4..54ac713c3d 100755 --- a/.github/buildomat/build-and-test.sh +++ b/.github/buildomat/build-and-test.sh @@ -41,6 +41,21 @@ mkdir -p "$OUTPUT_DIR" banner prerequisites ptime -m bash ./tools/install_builder_prerequisites.sh -y +# Do some test runs of the `ls-apis` command. +# +# This may require cloning some dependent private repos. We do this before the +# main battery of tests because the GitHub tokens required for this only last +# for an hour so we want to get this done early. +# +# (TODO: This makes the build timings we record inaccurate.) +banner ls-apis +( + source ./tools/include/force-git-over-https.sh; + ptime -m cargo xtask ls-apis apis && + ptime -m cargo xtask ls-apis deployment-units && + ptime -m cargo xtask ls-apis servers +) + # # We build with: # @@ -82,19 +97,6 @@ export RUSTC_BOOTSTRAP=1 # We report build progress to stderr, and the "--timings=json" output goes to stdout. ptime -m cargo build -Z unstable-options --timings=json --workspace --tests --locked --verbose 1> "$OUTPUT_DIR/crate-build-timings.json" -# Do some test runs of the `ls-apis` command. -# -# This may require cloning some dependent private repos. We do this before the -# main battery of tests because the GitHub tokens required for this only last -# for an hour so we want to get this done early. -banner ls-apis -( - source ./tools/include/force-git-over-https.sh; - ptime -m cargo xtask ls-apis apis && - ptime -m cargo xtask ls-apis deployment-units && - ptime -m cargo xtask ls-apis servers -) - # # We apply our own timeout to ensure that we get a normal failure on timeout # rather than a buildomat timeout. See oxidecomputer/buildomat#8. 
diff --git a/dev-tools/ls-apis/src/cargo.rs b/dev-tools/ls-apis/src/cargo.rs index edff43ff12..1a7e87dd0c 100644 --- a/dev-tools/ls-apis/src/cargo.rs +++ b/dev-tools/ls-apis/src/cargo.rs @@ -65,7 +65,14 @@ impl Workspace { if let Some(manifest_path) = manifest_path { cmd.manifest_path(manifest_path); } - let metadata = cmd.exec().context("loading metadata")?; + let metadata = match cmd.exec() { + Err(original_err) if name == "maghemite" => { + dendrite_workaround(cmd, original_err)? + } + otherwise => otherwise.with_context(|| { + format!("failed to load metadata for {name}") + })?, + }; let workspace_root = metadata.workspace_root; // Build an index of all packages by id. Identify duplicates because we @@ -375,3 +382,83 @@ impl DepPath { self.0.iter().any(|p| pkgids.contains(p)) } } + +// Dendrite is not (yet) a public repository, but it's a dependency of +// Maghemite. There are two expected cases for running Omicron tests locally +// that we know of: +// - The developer has a Git credential helper of some kind set up to +// successfully clone private repositories over HTTPS. +// - The developer has an SSH agent or other local SSH key that they use to +// clone repositories over SSH. +// We call this function when we fail to fetch the Dendrite repository over +// HTTPS. Under the assumption that the user falls in the second group. +// we attempt to use SSH to fetch the repository by setting `GIT_CONFIG_*` +// environment variables to rewrite the repository URL to an SSH URL. If that +// fails, we'll verbosely inform the user as to how both methods failed and +// provide some context. +// +// This entire workaround can and very much should go away once Dendrite is +// public. +fn dendrite_workaround( + mut cmd: cargo_metadata::MetadataCommand, + original_err: cargo_metadata::Error, +) -> Result { + eprintln!( + "warning: failed to load metadata for maghemite; \ + trying dendrite workaround" + ); + + let count = std::env::var_os("GIT_CONFIG_COUNT") + .map(|s| -> Result { + s.into_string() + .map_err(|_| anyhow!("$GIT_CONFIG_COUNT is not an integer"))? + .parse() + .context("$GIT_CONFIG_COUNT is not an integer") + }) + .transpose()? + .unwrap_or_default(); + cmd.env("CARGO_NET_GIT_FETCH_WITH_CLI", "true"); + cmd.env( + format!("GIT_CONFIG_KEY_{count}"), + "url.git@github.com:oxidecomputer/dendrite.insteadOf", + ); + cmd.env( + format!("GIT_CONFIG_VALUE_{count}"), + "https://github.com/oxidecomputer/dendrite", + ); + cmd.env("GIT_CONFIG_COUNT", (count + 1).to_string()); + cmd.exec().map_err(|err| { + let cmd = cmd.cargo_command(); + let original_err = anyhow::Error::from(original_err); + let err = anyhow::Error::from(err); + anyhow::anyhow!("failed to load metadata for maghemite + +`cargo xtask ls-apis` expects to be able to run `cargo metadata` on the +Maghemite workspace that Omicron depends on. Maghemite has a dependency on a +private repository (Dendrite), so `cargo metadata` can fail if you are unable +to clone Dendrite via an HTTPS URL. As a fallback, we also tried to run `cargo +metadata` with environment variables that force `cargo metadata` to use an SSH +URL; unfortunately that also failed. + +To successfully run this command (or expectorate test), your environment needs +to be set up to clone a private Oxide repository from GitHub. 
This can be done +with either a Git credential helper or an SSH key: + +https://doc.rust-lang.org/cargo/appendix/git-authentication.html +https://docs.github.com/en/get-started/getting-started-with-git/caching-your-github-credentials-in-git + +(If you don't have access to private Oxide repos, you won't be able to +successfully run this command or test.) + +More context: https://github.com/oxidecomputer/omicron/issues/6839 + +===== The fallback command that failed: ===== +{cmd:?} + +===== The error that occurred while fetching using HTTPS: ===== +{original_err:?} + +===== The error that occurred while fetching using SSH (fallback): ===== +{err:?}") + }) +} From da0ea501e55b0731f6ce03381c8cda170421dc0a Mon Sep 17 00:00:00 2001 From: "oxide-renovate[bot]" <146848827+oxide-renovate[bot]@users.noreply.github.com> Date: Fri, 18 Oct 2024 11:30:56 -0700 Subject: [PATCH 6/7] Update Rust crate libc to 0.2.161 (#6899) --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- workspace-hack/Cargo.toml | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index acb9e9d57f..c5f924b2be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4713,9 +4713,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.159" +version = "0.2.161" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5" +checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" [[package]] name = "libdlpi-sys" diff --git a/Cargo.toml b/Cargo.toml index b585bc866b..64302887e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -418,7 +418,7 @@ ipnetwork = { version = "0.20", features = ["schemars"] } ispf = { git = "https://github.com/oxidecomputer/ispf" } key-manager = { path = "key-manager" } kstat-rs = "0.2.4" -libc = "0.2.159" +libc = "0.2.161" libipcc = { git = "https://github.com/oxidecomputer/libipcc", rev = "fdffa212373a8f92473ea5f411088912bf458d5f" } libfalcon = { git = "https://github.com/oxidecomputer/falcon", branch = "main" } libnvme = { git = "https://github.com/oxidecomputer/libnvme", rev = "dd5bb221d327a1bc9287961718c3c10d6bd37da0" } diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index b3ab1d5ed5..ddf57acb4c 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -71,7 +71,7 @@ itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12.1" } itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10.5" } lalrpop-util = { version = "0.19.12" } lazy_static = { version = "1.5.0", default-features = false, features = ["spin_no_std"] } -libc = { version = "0.2.159", features = ["extra_traits"] } +libc = { version = "0.2.161", features = ["extra_traits"] } log = { version = "0.4.22", default-features = false, features = ["kv_unstable", "std"] } managed = { version = "0.8.0", default-features = false, features = ["alloc", "map"] } memchr = { version = "2.7.4" } @@ -187,7 +187,7 @@ itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12.1" } itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10.5" } lalrpop-util = { version = "0.19.12" } lazy_static = { version = "1.5.0", default-features = false, features = ["spin_no_std"] } -libc = { version = "0.2.159", features = ["extra_traits"] } +libc = { version = "0.2.161", features = ["extra_traits"] } log = { version = "0.4.22", default-features = false, features = ["kv_unstable", "std"] } managed = { version = 
"0.8.0", default-features = false, features = ["alloc", "map"] } memchr = { version = "2.7.4" } From 778a7e931d52ab0e20e7fd75cf7a1b406c3d46c9 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Fri, 18 Oct 2024 14:48:03 -0400 Subject: [PATCH 7/7] Add ClickHouse native connection pool (#6889) - Implement a connector and connection pool for talking to ClickHouse over the native TCP protocol - Add the native TCP address or a `BoxedResolver` for resolving it to all code that connects to ClickHouse. This is a pretty noisy change, since we now need two addresses / objects everywhere. We'll take it back down to one when we completely make the switch to the native protocol and remove the HTTP interface. - Remove the feature flag gating the native code, keeping that just for the prototype SQL client shell. - NFC formatting change to bring the `oximeter_db::native` import style in line with the rest of the crate. --- .github/buildomat/jobs/deploy.sh | 6 +- Cargo.lock | 39 ++- Cargo.toml | 4 +- dev-tools/omdb/src/bin/omdb/oxql.rs | 56 +++- dev-tools/omdb/tests/usage_errors.out | 7 +- nexus/src/app/oximeter.rs | 27 +- nexus/test-utils/src/lib.rs | 10 +- nexus/tests/integration_tests/oximeter.rs | 8 +- oximeter/collector/src/agent.rs | 20 +- .../src/bin/clickhouse-schema-updater.rs | 19 +- oximeter/collector/src/lib.rs | 89 ++--- oximeter/db/Cargo.toml | 12 +- oximeter/db/src/bin/oxdb/main.rs | 63 +++- oximeter/db/src/client/mod.rs | 315 ++++++++++++++---- oximeter/db/src/client/oxql.rs | 6 +- oximeter/db/src/lib.rs | 19 +- oximeter/db/src/native/block.rs | 47 +-- oximeter/db/src/native/connection.rs | 119 +++++-- oximeter/db/src/native/io/block.rs | 10 +- oximeter/db/src/native/io/column.rs | 20 +- oximeter/db/src/native/io/exception.rs | 4 +- oximeter/db/src/native/io/packet/client.rs | 20 +- oximeter/db/src/native/io/packet/server.rs | 18 +- oximeter/db/src/native/io/progress.rs | 3 +- oximeter/db/src/native/io/string.rs | 3 +- oximeter/db/src/native/io/varuint.rs | 3 +- oximeter/db/src/native/mod.rs | 4 + oximeter/db/src/native/packets/client.rs | 10 +- oximeter/db/src/native/packets/server.rs | 3 +- oximeter/db/src/shells/mod.rs | 2 +- oximeter/db/src/shells/native.rs | 5 +- oximeter/db/src/shells/oxql.rs | 5 +- oximeter/db/src/shells/sql.rs | 5 +- oximeter/db/tests/integration_test.rs | 21 +- workspace-hack/Cargo.toml | 4 +- 35 files changed, 724 insertions(+), 282 deletions(-) diff --git a/.github/buildomat/jobs/deploy.sh b/.github/buildomat/jobs/deploy.sh index 018b532107..a9f1bf8a9c 100755 --- a/.github/buildomat/jobs/deploy.sh +++ b/.github/buildomat/jobs/deploy.sh @@ -418,7 +418,11 @@ done /usr/oxide/oxide --resolve "$OXIDE_RESOLVE" --cacert "$E2E_TLS_CERT" \ project create --name images --description "some images" -/usr/oxide/oxide --resolve "$OXIDE_RESOLVE" --cacert "$E2E_TLS_CERT" \ +# NOTE: Use a relatively large timeout on this call, to avoid #6771 +/usr/oxide/oxide \ + --resolve "$OXIDE_RESOLVE" \ + --cacert "$E2E_TLS_CERT" \ + --timeout 60 \ disk import \ --path debian-11-genericcloud-amd64.raw \ --disk debian11-boot \ diff --git a/Cargo.lock b/Cargo.lock index c5f924b2be..f1ad516a4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1362,12 +1362,13 @@ dependencies = [ [[package]] name = "clickward" version = "0.1.0" -source = "git+https://github.com/oxidecomputer/clickward?rev=ceec762e6a87d2a22bf56792a3025e145caa095e#ceec762e6a87d2a22bf56792a3025e145caa095e" +source = 
"git+https://github.com/oxidecomputer/clickward?rev=a1b342c2558e835d09e6e39a40d3de798a29c2f#a1b342c2558e835d09e6e39a40d3de798a29c2f5" dependencies = [ "anyhow", "camino", "clap", "derive_more", + "schemars", "serde", "serde_json", "thiserror", @@ -8718,9 +8719,9 @@ dependencies = [ [[package]] name = "qorb" -version = "0.0.2" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "104066c20d7277d0af40a7333c579a2a71cc6b68c14982d1da2e5747a381a3ed" +checksum = "675f442a5904b8b5dc9f5d298be36676b29e2e852eace78a3d3d00822469c88e" dependencies = [ "anyhow", "async-trait", @@ -8736,7 +8737,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tokio-tungstenite 0.23.1", + "tokio-tungstenite 0.24.0", "tracing", ] @@ -11556,6 +11557,18 @@ dependencies = [ "tungstenite 0.23.0", ] +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.24.0", +] + [[package]] name = "tokio-util" version = "0.7.12" @@ -11873,6 +11886,24 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "twox-hash" version = "1.6.3" diff --git a/Cargo.toml b/Cargo.toml index 64302887e5..81e09c3a67 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -319,7 +319,7 @@ clickhouse-admin-api = { path = "clickhouse-admin/api" } clickhouse-admin-keeper-client = { path = "clients/clickhouse-admin-keeper-client" } clickhouse-admin-server-client = { path = "clients/clickhouse-admin-server-client" } clickhouse-admin-types = { path = "clickhouse-admin/types" } -clickward = { git = "https://github.com/oxidecomputer/clickward", rev = "ceec762e6a87d2a22bf56792a3025e145caa095e" } +clickward = { git = "https://github.com/oxidecomputer/clickward", rev = "a1b342c2558e835d09e6e39a40d3de798a29c2f" } cockroach-admin-api = { path = "cockroach-admin/api" } cockroach-admin-client = { path = "clients/cockroach-admin-client" } cockroach-admin-types = { path = "cockroach-admin/types" } @@ -520,7 +520,7 @@ propolis_api_types = { git = "https://github.com/oxidecomputer/propolis", rev = propolis-client = { git = "https://github.com/oxidecomputer/propolis", rev = "11371b0f3743f8df5b047dc0edc2699f4bdf3927" } propolis-mock-server = { git = "https://github.com/oxidecomputer/propolis", rev = "11371b0f3743f8df5b047dc0edc2699f4bdf3927" } proptest = "1.5.0" -qorb = "0.0.2" +qorb = "0.1.1" quote = "1.0" rand = "0.8.5" rand_core = "0.6.4" diff --git a/dev-tools/omdb/src/bin/omdb/oxql.rs b/dev-tools/omdb/src/bin/omdb/oxql.rs index 28f405e067..172344b56c 100644 --- a/dev-tools/omdb/src/bin/omdb/oxql.rs +++ b/dev-tools/omdb/src/bin/omdb/oxql.rs @@ -31,6 +31,15 @@ pub struct OxqlArgs { )] clickhouse_url: Option, + /// URL of the ClickHouse server to connect to for the native protcol. + #[arg( + long, + env = "OMDB_CLICKHOUSE_NATIVE_URL", + global = true, + help_heading = CONNECTION_OPTIONS_HEADING, + )] + clickhouse_native_url: Option, + /// Print summaries of each SQL query run against the database. 
#[clap(long = "summaries")] print_summaries: bool, @@ -47,7 +56,8 @@ impl OxqlArgs { omdb: &Omdb, log: &Logger, ) -> anyhow::Result<()> { - let addr = self.addr(omdb, log).await?; + let http_addr = self.resolve_http_addr(omdb, log).await?; + let native_addr = self.resolve_native_addr(omdb, log).await?; let opts = ShellOptions { print_summaries: self.print_summaries, @@ -55,21 +65,53 @@ impl OxqlArgs { }; oxql::shell( - addr.ip(), - addr.port(), + http_addr.ip(), + http_addr.port(), + native_addr.port(), log.new(slog::o!("component" => "clickhouse-client")), opts, ) .await } - /// Resolve the ClickHouse URL to a socket address. - async fn addr( + /// Resolve the ClickHouse native TCP socket address. + async fn resolve_native_addr( + &self, + omdb: &Omdb, + log: &Logger, + ) -> anyhow::Result { + self.resolve_addr( + omdb, + log, + self.clickhouse_native_url.as_deref(), + ServiceName::ClickhouseNative, + ) + .await + } + + /// Resolve the ClickHouse HTTP URL to a socket address. + async fn resolve_http_addr( + &self, + omdb: &Omdb, + log: &Logger, + ) -> anyhow::Result { + self.resolve_addr( + omdb, + log, + self.clickhouse_url.as_deref(), + ServiceName::Clickhouse, + ) + .await + } + + async fn resolve_addr( &self, omdb: &Omdb, log: &Logger, + maybe_url: Option<&str>, + srv: ServiceName, ) -> anyhow::Result { - match &self.clickhouse_url { + match maybe_url { Some(cli_or_env_url) => Url::parse(&cli_or_env_url) .context( "failed parsing URL from command-line or environment variable", @@ -87,7 +129,7 @@ impl OxqlArgs { Ok(SocketAddr::V6( omdb.dns_lookup_one( log.clone(), - ServiceName::Clickhouse, + srv, ) .await .context("failed looking up ClickHouse internal DNS entry")?, diff --git a/dev-tools/omdb/tests/usage_errors.out b/dev-tools/omdb/tests/usage_errors.out index 56fa624771..de632819d6 100644 --- a/dev-tools/omdb/tests/usage_errors.out +++ b/dev-tools/omdb/tests/usage_errors.out @@ -783,13 +783,16 @@ Usage: omdb oxql [OPTIONS] Options: --log-level log level filter [env: LOG_LEVEL=] [default: warn] --summaries Print summaries of each SQL query run against the database - --elapsed Print the total elapsed query duration --color Color output [default: auto] [possible values: auto, always, never] + --elapsed Print the total elapsed query duration -h, --help Print help Connection Options: --clickhouse-url URL of the ClickHouse server to connect to [env: OMDB_CLICKHOUSE_URL=] + --clickhouse-native-url + URL of the ClickHouse server to connect to for the native protcol [env: + OMDB_CLICKHOUSE_NATIVE_URL=] --dns-server [env: OMDB_DNS_SERVER=] @@ -808,7 +811,7 @@ error: unexpected argument '--summarizes' found tip: a similar argument exists: '--summaries' -Usage: omdb oxql <--clickhouse-url |--summaries|--elapsed> +Usage: omdb oxql <--clickhouse-url |--clickhouse-native-url |--summaries|--elapsed> For more information, try '--help'. 
============================================= diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index 770b5ac61b..8203cf4d48 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -12,7 +12,7 @@ use internal_dns_types::names::ServiceName; use nexus_db_queries::context::OpContext; use nexus_db_queries::db; use nexus_db_queries::db::DataStore; -use omicron_common::address::CLICKHOUSE_HTTP_PORT; +use omicron_common::address::CLICKHOUSE_TCP_PORT; use omicron_common::api::external::{DataPageParams, Error, ListResultVec}; use omicron_common::api::internal::nexus::{self, ProducerEndpoint}; use oximeter_client::Client as OximeterClient; @@ -60,15 +60,26 @@ impl LazyTimeseriesClient { pub(crate) async fn get( &self, ) -> Result { - let address = match &self.source { - ClientSource::FromIp { address } => *address, - ClientSource::FromDns { resolver } => SocketAddr::new( - resolver.lookup_ip(ServiceName::Clickhouse).await?, - CLICKHOUSE_HTTP_PORT, - ), + let (http_address, native_address) = match &self.source { + ClientSource::FromIp { address } => { + let native_address = + SocketAddr::new(address.ip(), CLICKHOUSE_TCP_PORT); + (*address, native_address) + } + ClientSource::FromDns { resolver } => { + let http_address = SocketAddr::from( + resolver.lookup_socket_v6(ServiceName::Clickhouse).await?, + ); + let native_address = SocketAddr::from( + resolver + .lookup_socket_v6(ServiceName::ClickhouseNative) + .await?, + ); + (http_address, native_address) + } }; - Ok(oximeter_db::Client::new(address, &self.log)) + Ok(oximeter_db::Client::new(http_address, native_address, &self.log)) } } diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index 5bd63765c4..29c6d634a9 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -624,6 +624,7 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> { log.new(o!("component" => "oximeter")), nexus_internal_addr, clickhouse.http_address().port(), + clickhouse.native_address().port(), collector_id, ) .await @@ -1449,11 +1450,16 @@ pub async fn start_sled_agent( pub async fn start_oximeter( log: Logger, nexus_address: SocketAddr, - db_port: u16, + http_port: u16, + native_port: u16, id: Uuid, ) -> Result { let db = oximeter_collector::DbConfig { - address: Some(SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db_port)), + address: Some(SocketAddr::new(Ipv6Addr::LOCALHOST.into(), http_port)), + native_address: Some(SocketAddr::new( + Ipv6Addr::LOCALHOST.into(), + native_port, + )), batch_size: 10, batch_interval: 1, replicated: false, diff --git a/nexus/tests/integration_tests/oximeter.rs b/nexus/tests/integration_tests/oximeter.rs index 6a63799790..d842a2cc4a 100644 --- a/nexus/tests/integration_tests/oximeter.rs +++ b/nexus/tests/integration_tests/oximeter.rs @@ -118,7 +118,12 @@ async fn test_oximeter_reregistration() { // ClickHouse client for verifying collection. 
let ch_address = context.clickhouse.http_address().into(); - let client = oximeter_db::Client::new(ch_address, &context.logctx.log); + let native_address = context.clickhouse.native_address().into(); + let client = oximeter_db::Client::new( + ch_address, + native_address, + &context.logctx.log, + ); client .init_single_node_db() .await @@ -302,6 +307,7 @@ async fn test_oximeter_reregistration() { context.logctx.log.new(o!("component" => "oximeter")), context.server.get_http_server_internal_address().await, context.clickhouse.http_address().port(), + context.clickhouse.native_address().port(), oximeter_id, ) .await diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index ce0ab78e61..bcfa3b4f4d 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -16,6 +16,7 @@ use chrono::Utc; use futures::TryStreamExt; use nexus_client::types::IdSortMode; use nexus_client::Client as NexusClient; +use omicron_common::address::CLICKHOUSE_TCP_PORT; use omicron_common::backoff; use omicron_common::backoff::BackoffError; use oximeter::types::ProducerResults; @@ -34,6 +35,7 @@ use slog::warn; use slog::Logger; use std::collections::btree_map::Entry; use std::collections::BTreeMap; +use std::net::SocketAddr; use std::net::SocketAddrV6; use std::ops::Bound; use std::sync::Arc; @@ -383,12 +385,15 @@ pub struct OximeterAgent { impl OximeterAgent { /// Construct a new agent with the given ID and logger. + // TODO(cleanup): Remove this lint when we have only a native resolver. + #[allow(clippy::too_many_arguments)] pub async fn with_id( id: Uuid, address: SocketAddrV6, refresh_interval: Duration, db_config: DbConfig, - resolver: BoxedResolver, + http_resolver: BoxedResolver, + native_resolver: BoxedResolver, log: &Logger, replicated: bool, ) -> Result { @@ -414,7 +419,8 @@ impl OximeterAgent { // - The DB doesn't exist at all. This reports a version number of 0. We // need to create the DB here, at the latest version. This is used in // fresh installations and tests. - let client = Client::new_with_pool(resolver, &log); + let client = + Client::new_with_pool(http_resolver, native_resolver, &log); match client.check_db_is_at_expected_version().await { Ok(_) => {} Err(oximeter_db::Error::DatabaseVersionMismatch { @@ -506,12 +512,18 @@ impl OximeterAgent { // prints the results as they're received. let insertion_log = log.new(o!("component" => "results-sink")); if let Some(db_config) = db_config { - let Some(address) = db_config.address else { + let Some(http_address) = db_config.address else { return Err(Error::Standalone(anyhow!( "Must provide explicit IP address in standalone mode" ))); }; - let client = Client::new(address, &log); + + // Grab the native TCP address, or construct one from the defaults. 
+ let native_address = + db_config.native_address.unwrap_or_else(|| { + SocketAddr::new(http_address.ip(), CLICKHOUSE_TCP_PORT) + }); + let client = Client::new(http_address, native_address, &log); let replicated = client.is_oximeter_cluster().await?; if !replicated { client.init_single_node_db().await?; diff --git a/oximeter/collector/src/bin/clickhouse-schema-updater.rs b/oximeter/collector/src/bin/clickhouse-schema-updater.rs index 8e432e87c6..3b3ac78f8a 100644 --- a/oximeter/collector/src/bin/clickhouse-schema-updater.rs +++ b/oximeter/collector/src/bin/clickhouse-schema-updater.rs @@ -12,6 +12,7 @@ use camino::Utf8PathBuf; use clap::Parser; use clap::Subcommand; use omicron_common::address::CLICKHOUSE_HTTP_PORT; +use omicron_common::address::CLICKHOUSE_TCP_PORT; use oximeter_db::model::OXIMETER_VERSION; use oximeter_db::Client; use slog::Drain; @@ -22,13 +23,20 @@ use std::net::Ipv6Addr; use std::net::SocketAddr; use std::net::SocketAddrV6; -const DEFAULT_HOST: SocketAddr = SocketAddr::V6(SocketAddrV6::new( +const DEFAULT_HTTP_HOST: SocketAddr = SocketAddr::V6(SocketAddrV6::new( Ipv6Addr::LOCALHOST, CLICKHOUSE_HTTP_PORT, 0, 0, )); +const DEFAULT_NATIVE_HOST: SocketAddr = SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::LOCALHOST, + CLICKHOUSE_TCP_PORT, + 0, + 0, +)); + fn parse_log_level(s: &str) -> anyhow::Result { s.parse().map_err(|_| anyhow!("Invalid log level")) } @@ -37,9 +45,14 @@ fn parse_log_level(s: &str) -> anyhow::Result { #[derive(Clone, Debug, Parser)] struct Args { /// IP address and port at which to access ClickHouse. - #[arg(long, default_value_t = DEFAULT_HOST, env = "CLICKHOUSE_HOST")] + #[arg(long, default_value_t = DEFAULT_HTTP_HOST, env = "CLICKHOUSE_HOST")] host: SocketAddr, + /// IP address and port at which to access ClickHouse via the native TCP + /// protocol. + #[arg(long, default_value_t = DEFAULT_NATIVE_HOST, env = "CLICKHOUSE_NATIVE_HOST")] + native_host: SocketAddr, + /// Directory from which to read schema files for each version. #[arg( short = 's', @@ -87,7 +100,7 @@ fn build_logger(level: Level) -> Logger { async fn main() -> anyhow::Result<()> { let args = Args::parse(); let log = build_logger(args.log_level); - let client = Client::new(args.host, &log); + let client = Client::new(args.host, args.native_host, &log); let is_replicated = client.is_oximeter_cluster().await?; match args.cmd { Cmd::List => { diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 01770c9540..b2bd191feb 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -18,11 +18,10 @@ use omicron_common::api::internal::nexus::ProducerEndpoint; use omicron_common::backoff; use omicron_common::FileKv; use qorb::backend; -use qorb::resolver::AllBackends; use qorb::resolver::BoxedResolver; -use qorb::resolver::Resolver; use qorb::resolvers::dns::DnsResolver; use qorb::resolvers::dns::DnsResolverConfig; +use qorb::resolvers::single_host::SingleHostResolver; use qorb::service; use serde::Deserialize; use serde::Serialize; @@ -33,14 +32,12 @@ use slog::o; use slog::warn; use slog::Drain; use slog::Logger; -use std::collections::BTreeMap; use std::net::SocketAddr; use std::net::SocketAddrV6; use std::path::Path; use std::sync::Arc; use std::time::Duration; use thiserror::Error; -use tokio::sync::watch; use uuid::Uuid; mod agent; @@ -78,12 +75,18 @@ impl From for HttpError { /// Configuration for interacting with the metric database. 
#[derive(Debug, Clone, Copy, Deserialize, Serialize)] pub struct DbConfig { - /// Optional address of the ClickHouse server. + /// Optional address of the ClickHouse server's HTTP interface. /// /// If "None", will be inferred from DNS. #[serde(default, skip_serializing_if = "Option::is_none")] pub address: Option, + /// Optional address of the ClickHouse server's native TCP interface. + /// + /// If None, will be inferred from DNS. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub native_address: Option, + /// Batch size of samples at which to insert. pub batch_size: usize, @@ -114,6 +117,7 @@ impl DbConfig { fn with_address(address: SocketAddr) -> Self { Self { address: Some(address), + native_address: None, batch_size: Self::DEFAULT_BATCH_SIZE, batch_interval: Self::DEFAULT_BATCH_INTERVAL, replicated: Self::DEFAULT_REPLICATED, @@ -163,29 +167,6 @@ pub struct OximeterArguments { pub address: SocketAddrV6, } -// Provides an alternative to the DNS resolver for cases where we want to -// contact a backend without performing resolution. -struct SingleHostResolver { - tx: watch::Sender, -} - -impl SingleHostResolver { - fn new(address: SocketAddr) -> Self { - let backends = Arc::new(BTreeMap::from([( - backend::Name::new("singleton"), - backend::Backend { address }, - )])); - let (tx, _rx) = watch::channel(backends.clone()); - Self { tx } - } -} - -impl Resolver for SingleHostResolver { - fn monitor(&mut self) -> watch::Receiver { - self.tx.subscribe() - } -} - // A "qorb connector" which converts a SocketAddr into a nexus_client::Client. struct NexusConnector { log: Logger, @@ -258,35 +239,55 @@ impl Oximeter { .map(|ip| SocketAddr::new(ip, DNS_PORT)) .collect(); - let make_clickhouse_resolver = || -> BoxedResolver { - if let Some(address) = config.db.address { - Box::new(SingleHostResolver::new(address)) - } else { - let service = if config.db.replicated { + // Closure to create a single resolver. + let make_resolver = + |maybe_address, srv_name: ServiceName| -> BoxedResolver { + if let Some(address) = maybe_address { + Box::new(SingleHostResolver::new(address)) + } else { + Box::new(DnsResolver::new( + service::Name(srv_name.srv_name()), + bootstrap_dns.clone(), + DnsResolverConfig { + hardcoded_ttl: Some(tokio::time::Duration::MAX), + ..Default::default() + }, + )) + } + }; + + // Closure to create _two_ resolvers, one to resolve the ClickHouse HTTP + // SRV record, and one for the native TCP record. + // + // TODO(cleanup): This should be removed if / when we entirely switch to + // the native protocol. 
+ let make_clickhouse_resolvers = || -> (BoxedResolver, BoxedResolver) { + let http_resolver = make_resolver( + config.db.address, + if config.db.replicated { ServiceName::ClickhouseServer } else { ServiceName::Clickhouse - }; - Box::new(DnsResolver::new( - service::Name(service.srv_name()), - bootstrap_dns.clone(), - DnsResolverConfig { - hardcoded_ttl: Some(tokio::time::Duration::MAX), - ..Default::default() - }, - )) - } + }, + ); + let native_resolver = make_resolver( + config.db.native_address, + ServiceName::ClickhouseNative, + ); + (http_resolver, native_resolver) }; let make_agent = || async { debug!(log, "creating ClickHouse client"); + let (http_resolver, native_resolver) = make_clickhouse_resolvers(); Ok(Arc::new( OximeterAgent::with_id( args.id, args.address, config.refresh_interval, config.db, - make_clickhouse_resolver(), + http_resolver, + native_resolver, &log, config.db.replicated, ) diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index 38b22e83e9..b0afdfbb07 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -24,6 +24,7 @@ futures.workspace = true gethostname.workspace = true highway.workspace = true iana-time-zone.workspace = true +indexmap.workspace = true libc.workspace = true nom.workspace = true num.workspace = true @@ -56,10 +57,6 @@ optional = true workspace = true optional = true -[dependencies.indexmap] -workspace = true -optional = true - [dependencies.peg] workspace = true optional = true @@ -100,7 +97,6 @@ features = [ "rt-multi-thread", "macros" ] camino-tempfile.workspace = true criterion = { workspace = true, features = [ "async_tokio" ] } expectorate.workspace = true -indexmap.workspace = true itertools.workspace = true omicron-test-utils.workspace = true oximeter-test-utils.workspace = true @@ -111,9 +107,8 @@ strum.workspace = true tempfile.workspace = true [features] -default = [ "native-sql", "oxql", "sql" ] +default = [ "native-sql-shell", "oxql", "sql" ] sql = [ - "dep:indexmap", "dep:reedline", "dep:rustyline", "dep:sqlformat", @@ -126,10 +121,9 @@ oxql = [ "dep:reedline", "dep:tabled", ] -native-sql = [ +native-sql-shell = [ "dep:crossterm", "dep:display-error-chain", - "dep:indexmap", "dep:reedline", "dep:rustyline", "dep:sqlformat", diff --git a/oximeter/db/src/bin/oxdb/main.rs b/oximeter/db/src/bin/oxdb/main.rs index 2f80518145..ad5018eee5 100644 --- a/oximeter/db/src/bin/oxdb/main.rs +++ b/oximeter/db/src/bin/oxdb/main.rs @@ -10,6 +10,7 @@ use anyhow::{bail, Context}; use chrono::{DateTime, Utc}; use clap::{Args, Parser}; +use omicron_common::address::CLICKHOUSE_TCP_PORT; use oximeter::{ types::{Cumulative, Sample}, Metric, Target, @@ -59,10 +60,14 @@ struct OxDb { #[clap(short, long, default_value = "::1")] address: IpAddr, - /// Port on which to connect to the database + /// Port on which to connect to the database using the HTTP interface. #[clap(short, long, default_value = "8123", action)] port: u16, + /// Port on which to connect to the database using the native TCP interface. + #[clap(short, long, default_value_t = CLICKHOUSE_TCP_PORT)] + native_port: u16, + /// Logging level #[clap(short, long, default_value = "info", value_parser = level_from_str)] log_level: Level, @@ -155,7 +160,7 @@ enum Subcommand { }, /// Start a native SQL shell to a ClickHouse server. 
- #[cfg(feature = "native-sql")] + #[cfg(feature = "native-sql-shell")] NativeSql, } @@ -214,12 +219,13 @@ async fn insert_samples( async fn populate( address: IpAddr, - port: u16, + http_port: u16, + native_port: u16, log: Logger, args: PopulateArgs, ) -> Result<(), anyhow::Error> { info!(log, "populating Oximeter database"); - let client = make_client(address, port, &log).await?; + let client = make_client(address, http_port, native_port, &log).await?; let n_timeseries = args.n_projects * args.n_instances * args.n_cpus; debug!( log, @@ -268,23 +274,26 @@ async fn populate( async fn wipe_single_node_db( address: IpAddr, - port: u16, + http_port: u16, + native_port: u16, log: Logger, ) -> Result<(), anyhow::Error> { - let client = make_client(address, port, &log).await?; + let client = make_client(address, http_port, native_port, &log).await?; client.wipe_single_node_db().await.context("Failed to wipe database") } +#[allow(clippy::too_many_arguments)] async fn query( address: IpAddr, - port: u16, + http_port: u16, + native_port: u16, log: Logger, timeseries_name: String, filters: Vec, start: Option, end: Option, ) -> Result<(), anyhow::Error> { - let client = make_client(address, port, &log).await?; + let client = make_client(address, http_port, native_port, &log).await?; let filters = filters.iter().map(|s| s.as_str()).collect::>(); let timeseries = client .select_timeseries_with( @@ -316,10 +325,18 @@ async fn main() -> anyhow::Result<()> { match args.cmd { Subcommand::Describe => describe_data(), Subcommand::Populate { populate_args } => { - populate(args.address, args.port, log, populate_args).await? + populate( + args.address, + args.port, + args.native_port, + log, + populate_args, + ) + .await? } Subcommand::Wipe => { - wipe_single_node_db(args.address, args.port, log).await? + wipe_single_node_db(args.address, args.port, args.native_port, log) + .await? } Subcommand::Query { timeseries_name, @@ -342,6 +359,7 @@ async fn main() -> anyhow::Result<()> { query( args.address, args.port, + args.native_port, log, timeseries_name, filters, @@ -352,17 +370,30 @@ async fn main() -> anyhow::Result<()> { } #[cfg(feature = "sql")] Subcommand::Sql { opts } => { - oximeter_db::shells::sql::shell(args.address, args.port, log, opts) - .await? + oximeter_db::shells::sql::shell( + args.address, + args.port, + args.native_port, + log, + opts, + ) + .await? } #[cfg(feature = "oxql")] Subcommand::Oxql { opts } => { - oximeter_db::shells::oxql::shell(args.address, args.port, log, opts) - .await? + oximeter_db::shells::oxql::shell( + args.address, + args.port, + args.native_port, + log, + opts, + ) + .await? } - #[cfg(feature = "native-sql")] + #[cfg(feature = "native-sql-shell")] Subcommand::NativeSql => { - oximeter_db::shells::native::shell(args.address).await? + oximeter_db::shells::native::shell(args.address, args.native_port) + .await? 
} } Ok(()) diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index 22caaa7312..0a09da8abe 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -17,6 +17,7 @@ pub use self::dbwrite::DbWrite; pub use self::dbwrite::TestDbWrite; use crate::client::query_summary::QuerySummary; use crate::model; +use crate::native; use crate::query; use crate::Error; use crate::Metric; @@ -39,6 +40,7 @@ use qorb::backend; use qorb::backend::Error as QorbError; use qorb::pool::Pool; use qorb::resolver::BoxedResolver; +use qorb::resolvers::single_host::SingleHostResolver; use regex::Regex; use regex::RegexBuilder; use slog::debug; @@ -170,14 +172,21 @@ impl ClientVariant { pub struct Client { _id: Uuid, log: Logger, + // Source for creating HTTP connections. source: ClientSource, + // qorb pool for native TCP connections. + native_pool: DebugIgnore, schema: Mutex>, request_timeout: Duration, } impl Client { /// Construct a Clickhouse client of the database with a connection pool. - pub fn new_with_pool(resolver: BoxedResolver, log: &Logger) -> Self { + pub fn new_with_pool( + http_resolver: BoxedResolver, + native_resolver: BoxedResolver, + log: &Logger, + ) -> Self { let id = Uuid::new_v4(); let log = log.new(slog::o!( "component" => "clickhouse-client", @@ -190,25 +199,40 @@ impl Client { log, source: ClientSource::Pool { pool: DebugIgnore(Pool::new( - resolver, + http_resolver, Arc::new(ReqwestConnector {}), qorb::policy::Policy::default(), )), }, + native_pool: DebugIgnore(Pool::new( + native_resolver, + Arc::new(native::connection::Connector), + Default::default(), + )), schema, request_timeout, } } /// Construct a new ClickHouse client of the database at `address`. - pub fn new(address: SocketAddr, log: &Logger) -> Self { - Self::new_with_request_timeout(address, log, DEFAULT_REQUEST_TIMEOUT) + pub fn new( + http_address: SocketAddr, + native_address: SocketAddr, + log: &Logger, + ) -> Self { + Self::new_with_request_timeout( + http_address, + native_address, + log, + DEFAULT_REQUEST_TIMEOUT, + ) } /// Construct a new ClickHouse client of the database at `address`, and a /// custom request timeout. pub fn new_with_request_timeout( - address: SocketAddr, + http_address: SocketAddr, + native_address: SocketAddr, log: &Logger, request_timeout: Duration, ) -> Self { @@ -218,12 +242,17 @@ impl Client { "id" => id.to_string(), )); let client = reqwest::Client::new(); - let url = format!("http://{}", address); + let url = format!("http://{}", http_address); let schema = Mutex::new(BTreeMap::new()); Self { _id: id, log, source: ClientSource::Static(ReqwestClient { url, client }), + native_pool: DebugIgnore(Pool::new( + Box::new(SingleHostResolver::new(native_address)), + Arc::new(native::connection::Connector), + Default::default(), + )), schema, request_timeout, } @@ -240,20 +269,12 @@ impl Client { } } - /// Ping the ClickHouse server to verify connectivitiy. + /// Ping the ClickHouse server to verify connectivity. 
pub async fn ping(&self) -> Result<(), Error> { - let client = ClientVariant::new(&self.source).await?; - - handle_db_response( - client - .reqwest() - .get(format!("{}/ping", client.url())) - .send() - .await - .map_err(|err| Error::DatabaseUnavailable(err.to_string()))?, - ) - .await?; - debug!(self.log, "successful ping of ClickHouse server"); + let mut handle = self.native_pool.claim().await?; + trace!(self.log, "acquired native pool claim"); + handle.ping().await.map_err(Error::Native)?; + trace!(self.log, "successful ping of ClickHouse server"); Ok(()) } @@ -1162,7 +1183,7 @@ impl Client { self.expunge_timeseries_by_name_once(replicated, to_delete) .await .map_err(|err| match err { - Error::DatabaseUnavailable(_) => { + Error::DatabaseUnavailable(_) | Error::Connection(_) => { backoff::BackoffError::transient(err) } _ => backoff::BackoffError::permanent(err), @@ -1500,7 +1521,8 @@ mod tests { let logctx = test_setup_log("test_replicated"); let mut cluster = create_cluster(&logctx).await; let address = cluster.http_address().into(); - let client = Client::new(address, &logctx.log); + let native_address = cluster.native_address().into(); + let client = Client::new(address, native_address, &logctx.log); let futures: Vec<(&'static str, AsyncTest)> = vec![ ( "test_is_oximeter_cluster_replicated", @@ -1656,7 +1678,8 @@ mod tests { for (test_name, mut test) in futures { let testctx = test_setup_log(test_name); init_db(&cluster, &client).await; - test(&cluster, Client::new(address, &logctx.log)).await; + test(&cluster, Client::new(address, native_address, &logctx.log)) + .await; wipe_db(&cluster, &client).await; testctx.cleanup_successful(); } @@ -1665,14 +1688,34 @@ mod tests { } #[tokio::test] - async fn bad_db_connection_test() { - let logctx = test_setup_log("test_bad_db_connection"); + async fn cannot_ping_nonexistent_server() { + let logctx = test_setup_log("cannot_ping_nonexistent_server"); let log = &logctx.log; - let client = Client::new("127.0.0.1:443".parse().unwrap(), &log); - assert!(matches!( - client.ping().await, - Err(Error::DatabaseUnavailable(_)) - )); + let dont_care = "127.0.0.1:443".parse().unwrap(); + let bad_addr = "[::1]:80".parse().unwrap(); + let client = Client::new(dont_care, bad_addr, &log); + let e = client + .ping() + .await + .expect_err("Should fail to ping non-existent server"); + let Error::Connection(qorb::pool::Error::TimedOut) = &e else { + panic!("Expected connection error, found {e:?}"); + }; + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn can_ping_clickhouse() { + let logctx = test_setup_log("can_ping_clickhouse"); + let mut db = + ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); + client.ping().await.expect("Should be able to ping existing server"); + db.cleanup().await.unwrap(); logctx.cleanup_successful(); } @@ -1681,7 +1724,11 @@ mod tests { let logctx = test_setup_log("test_is_oximeter_cluster"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_is_oximeter_cluster_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -1704,7 +1751,11 @@ mod tests { let logctx = test_setup_log("test_insert_samples"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - 
let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_insert_samples_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -1753,7 +1804,11 @@ mod tests { let logctx = test_setup_log("test_schema_mismatch"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_schema_mismatch_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -1786,7 +1841,11 @@ mod tests { let logctx = test_setup_log("test_schema_update"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_schema_updated_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -1865,7 +1924,11 @@ mod tests { let logctx = test_setup_log("test_client_select_timeseries_one"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_client_select_timeseries_one_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -1953,7 +2016,11 @@ mod tests { let logctx = test_setup_log("test_field_record_cont"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_field_record_count_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2011,7 +2078,11 @@ mod tests { let logctx = test_setup_log("test_unquoted_64bit_integers"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_unquoted_64bit_integers_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2045,7 +2116,11 @@ mod tests { let logctx = test_setup_log("test_differentiate_by_timeseries_name"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_differentiate_by_timeseries_name_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2115,7 +2190,11 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_select_one"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_timeseries_with_select_one_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2180,7 +2259,11 @@ mod tests { ); let mut db = 
ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_timeseries_with_select_one_field_with_multiple_values_impl( &db, client, @@ -2253,7 +2336,11 @@ mod tests { test_setup_log("test_select_timeseries_with_select_multiple_fields_with_multiple_values"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_timeseries_with_select_multiple_fields_with_multiple_values_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2329,7 +2416,11 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_all"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_timeseries_with_all_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2390,7 +2481,11 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_start_time"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_timeseries_with_start_time_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2441,7 +2536,11 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_limit"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_timeseries_with_limit_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2560,7 +2659,11 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_order"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_timeseries_with_order_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2662,7 +2765,11 @@ mod tests { let logctx = test_setup_log("test_get_schema_no_new_values"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_get_schema_no_new_values_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2690,7 +2797,11 @@ mod tests { let logctx = test_setup_log("test_timeseries_schema_list"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + 
); init_db(&db, &client).await; test_timeseries_schema_list_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2729,7 +2840,11 @@ mod tests { let logctx = test_setup_log("test_list_timeseries"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_list_timeseries_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3304,7 +3419,11 @@ mod tests { let logctx = test_setup_log("test_recall_of_all_fields"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_recall_of_all_fields_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3360,7 +3479,11 @@ mod tests { test_setup_log("test_database_version_update_is_idempotent"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); // NOTE: We don't init the DB, because the test explicitly tests that. test_database_version_update_is_idempotent_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3401,7 +3524,11 @@ mod tests { let logctx = test_setup_log("test_database_version_will_not_downgrade"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); // NOTE: We don't init the DB, because the test explicitly tests that. test_database_version_will_not_downgrade_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3440,7 +3567,11 @@ mod tests { let logctx = test_setup_log("test_database_version_wipes_old_version"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); // NOTE: We don't init the DB, because the test explicitly tests that. 
test_database_version_wipes_old_version_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3480,7 +3611,11 @@ mod tests { let logctx = test_setup_log("test_update_schema_cache_on_new_sample"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_update_schema_cache_on_new_sample_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3530,7 +3665,11 @@ mod tests { let logctx = test_setup_log("test_select_all_datum_types"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_all_datum_types_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3569,7 +3708,11 @@ mod tests { test_setup_log("test_new_schema_removed_when_not_inserted"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_new_schema_removed_when_not_inserted_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3787,14 +3930,15 @@ mod tests { async fn test_apply_one_schema_upgrade_impl( log: &Logger, - address: SocketAddr, + http_address: SocketAddr, + native_address: SocketAddr, replicated: bool, ) { let test_name = format!( "test_apply_one_schema_upgrade_{}", if replicated { "replicated" } else { "single_node" } ); - let client = Client::new(address, &log); + let client = Client::new(http_address, native_address, &log); // We'll test moving from version 1, which just creates a database and // table, to version 2, which adds two columns to that table in @@ -3871,7 +4015,13 @@ mod tests { let log = &logctx.log; let mut cluster = create_cluster(&logctx).await; let address = cluster.http_address().into(); - test_apply_one_schema_upgrade_impl(log, address, true).await; + test_apply_one_schema_upgrade_impl( + log, + address, + cluster.native_address().into(), + true, + ) + .await; cluster.cleanup().await.expect("Failed to cleanup ClickHouse cluster"); logctx.cleanup_successful(); } @@ -3885,7 +4035,13 @@ mod tests { .await .expect("Failed to start ClickHouse"); let address = db.http_address().into(); - test_apply_one_schema_upgrade_impl(log, address, false).await; + test_apply_one_schema_upgrade_impl( + log, + address, + db.native_address().into(), + false, + ) + .await; db.cleanup().await.expect("Failed to cleanup ClickHouse server"); logctx.cleanup_successful(); } @@ -3899,7 +4055,7 @@ mod tests { .await .expect("Failed to start ClickHouse"); let address = db.http_address().into(); - let client = Client::new(address, &log); + let client = Client::new(address, db.native_address().into(), &log); const REPLICATED: bool = false; client .initialize_db_with_version( @@ -3942,7 +4098,7 @@ mod tests { .await .expect("Failed to start ClickHouse"); let address = db.http_address().into(); - let client = Client::new(address, &log); + let client = Client::new(address, db.native_address().into(), &log); const REPLICATED: bool = false; client .initialize_db_with_version( @@ -3976,14 +4132,15 @@ mod tests { async fn 
test_ensure_schema_walks_through_multiple_steps_impl( log: &Logger, - address: SocketAddr, + http_address: SocketAddr, + native_address: SocketAddr, replicated: bool, ) { let test_name = format!( "test_ensure_schema_walks_through_multiple_steps_{}", if replicated { "replicated" } else { "single_node" } ); - let client = Client::new(address, &log); + let client = Client::new(http_address, native_address, &log); // We need to actually have the oximeter DB here, and the version table, // since `ensure_schema()` writes out versions to the DB as they're @@ -4076,7 +4233,10 @@ mod tests { .expect("Failed to start ClickHouse"); let address = db.http_address().into(); test_ensure_schema_walks_through_multiple_steps_impl( - log, address, false, + log, + address, + db.native_address().into(), + false, ) .await; db.cleanup().await.expect("Failed to cleanup ClickHouse server"); @@ -4092,7 +4252,10 @@ mod tests { let mut cluster = create_cluster(&logctx).await; let address = cluster.http_address().into(); test_ensure_schema_walks_through_multiple_steps_impl( - log, address, true, + log, + address, + cluster.native_address().into(), + true, ) .await; cluster.cleanup().await.expect("Failed to clean up ClickHouse cluster"); @@ -4172,7 +4335,7 @@ mod tests { .await .expect("Failed to start ClickHouse"); let address = db.http_address().into(); - let client = Client::new(address, &log); + let client = Client::new(address, db.native_address().into(), &log); client .init_single_node_db() .await @@ -4204,7 +4367,7 @@ mod tests { .await .expect("Failed to start ClickHouse"); let address = db.http_address().into(); - let client = Client::new(address, &log); + let client = Client::new(address, db.native_address().into(), &log); client .initialize_db_with_version(false, OXIMETER_VERSION) .await @@ -4353,7 +4516,11 @@ mod tests { .await .expect("Failed to start ClickHouse") }; - let client = Client::new(db.http_address().into(), &log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &log, + ); // Let's start with version 2, which is the first tracked and contains // the full SQL files we need to populate the DB. @@ -4546,6 +4713,7 @@ mod tests { test_expunge_timeseries_by_name_impl( log, db.http_address().into(), + db.native_address().into(), false, ) .await; @@ -4559,7 +4727,13 @@ mod tests { let logctx = test_setup_log(TEST_NAME); let mut cluster = create_cluster(&logctx).await; let address = cluster.http_address().into(); - test_expunge_timeseries_by_name_impl(&logctx.log, address, true).await; + test_expunge_timeseries_by_name_impl( + &logctx.log, + address, + cluster.native_address().into(), + true, + ) + .await; cluster.cleanup().await.expect("Failed to cleanup ClickHouse cluster"); logctx.cleanup_successful(); } @@ -4568,11 +4742,12 @@ mod tests { // upgrade. 
async fn test_expunge_timeseries_by_name_impl( log: &Logger, - address: SocketAddr, + http_address: SocketAddr, + native_address: SocketAddr, replicated: bool, ) { usdt::register_probes().unwrap(); - let client = Client::new(address, &log); + let client = Client::new(http_address, native_address, &log); const STARTING_VERSION: u64 = 1; const NEXT_VERSION: u64 = 2; diff --git a/oximeter/db/src/client/oxql.rs b/oximeter/db/src/client/oxql.rs index 4fdfc71b76..d9f3295375 100644 --- a/oximeter/db/src/client/oxql.rs +++ b/oximeter/db/src/client/oxql.rs @@ -1259,7 +1259,11 @@ mod tests { let db = ClickHouseDeployment::new_single_node(&logctx) .await .expect("Failed to start ClickHouse"); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); client .init_single_node_db() .await diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index d7d3c3e730..9e13cd64e0 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -34,12 +34,16 @@ use thiserror::Error; mod client; pub mod model; -#[cfg(feature = "native-sql")] pub mod native; #[cfg(any(feature = "oxql", test))] pub mod oxql; pub mod query; -#[cfg(any(feature = "oxql", feature = "sql", feature = "native-sql", test))] +#[cfg(any( + feature = "oxql", + feature = "sql", + feature = "native-sql-shell", + test +))] pub mod shells; #[cfg(any(feature = "sql", test))] pub mod sql; @@ -163,6 +167,9 @@ pub enum Error { #[cfg(any(feature = "oxql", test))] #[error(transparent)] Oxql(oxql::Error), + + #[error("Native protocol error")] + Native(#[source] crate::native::Error), } #[cfg(any(feature = "oxql", test))] @@ -258,11 +265,13 @@ pub struct TimeseriesPageSelector { /// Create a client to the timeseries database, and ensure the database exists. pub async fn make_client( address: IpAddr, - port: u16, + http_port: u16, + native_port: u16, log: &Logger, ) -> Result { - let address = SocketAddr::new(address, port); - let client = Client::new(address, &log); + let http_address = SocketAddr::new(address, http_port); + let native_address = SocketAddr::new(address, native_port); + let client = Client::new(http_address, native_address, &log); client .init_single_node_db() .await diff --git a/oximeter/db/src/native/block.rs b/oximeter/db/src/native/block.rs index a51b27352f..34fbaa4c25 100644 --- a/oximeter/db/src/native/block.rs +++ b/oximeter/db/src/native/block.rs @@ -7,21 +7,25 @@ //! Types for working with actual blocks and columns of data. use super::Error; -use chrono::{DateTime, NaiveDate}; +use chrono::DateTime; +use chrono::NaiveDate; use chrono_tz::Tz; use indexmap::IndexMap; -use nom::{ - bytes::complete::{tag, take_while1}, - character::complete::u8 as nom_u8, - combinator::{eof, map, map_opt, opt}, - sequence::{delimited, preceded, tuple}, - IResult, -}; -use std::{ - fmt, - net::{Ipv4Addr, Ipv6Addr}, - sync::LazyLock, -}; +use nom::bytes::complete::tag; +use nom::bytes::complete::take_while1; +use nom::character::complete::u8 as nom_u8; +use nom::combinator::eof; +use nom::combinator::map; +use nom::combinator::map_opt; +use nom::combinator::opt; +use nom::sequence::delimited; +use nom::sequence::preceded; +use nom::sequence::tuple; +use nom::IResult; +use std::fmt; +use std::net::Ipv4Addr; +use std::net::Ipv6Addr; +use std::sync::LazyLock; use uuid::Uuid; /// A set of rows and columns. 
@@ -740,12 +744,17 @@ impl std::str::FromStr for DataType { #[cfg(test)] mod tests { - use super::{ - Block, BlockInfo, Column, DataType, Precision, ValueArray, - DEFAULT_TIMEZONE, - }; - use crate::native::block::{datetime, datetime64}; - use chrono::{SubsecRound as _, Utc}; + use super::Block; + use super::BlockInfo; + use super::Column; + use super::DataType; + use super::Precision; + use super::ValueArray; + use super::DEFAULT_TIMEZONE; + use crate::native::block::datetime; + use crate::native::block::datetime64; + use chrono::SubsecRound as _; + use chrono::Utc; use chrono_tz::Tz; use indexmap::IndexMap; diff --git a/oximeter/db/src/native/connection.rs b/oximeter/db/src/native/connection.rs index f6367ca126..911788a91f 100644 --- a/oximeter/db/src/native/connection.rs +++ b/oximeter/db/src/native/connection.rs @@ -6,29 +6,81 @@ //! A connection and pool for talking to the ClickHouse server. +use super::io::packet::client::Encoder; +use super::io::packet::server::Decoder; +use super::packets::client::Packet as ClientPacket; +use super::packets::client::Query; +use super::packets::client::QueryResult; +use super::packets::client::OXIMETER_HELLO; +use super::packets::client::VERSION_MAJOR; +use super::packets::client::VERSION_MINOR; +use super::packets::client::VERSION_PATCH; +use super::packets::server::Hello as ServerHello; use super::packets::server::Packet as ServerPacket; -use super::packets::{ - client::{Packet as ClientPacket, Query, QueryResult}, - server::Progress, -}; -use super::{ - io::packet::{client::Encoder, server::Decoder}, - packets::{ - client::{OXIMETER_HELLO, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}, - server::{Hello as ServerHello, REVISION}, - }, - Error, -}; +use super::packets::server::Progress; +use super::packets::server::REVISION; +use super::Error; use crate::native::probes; -use futures::{SinkExt as _, StreamExt as _}; +use futures::SinkExt as _; +use futures::StreamExt as _; +use qorb::backend; +use qorb::backend::Error as QorbError; use std::net::SocketAddr; -use tokio::net::{ - tcp::{OwnedReadHalf, OwnedWriteHalf}, - TcpStream, -}; -use tokio_util::codec::{FramedRead, FramedWrite}; +use tokio::net::tcp::OwnedReadHalf; +use tokio::net::tcp::OwnedWriteHalf; +use tokio::net::TcpStream; +use tokio_util::codec::FramedRead; +use tokio_util::codec::FramedWrite; use uuid::Uuid; +/// A pool of connections to a ClickHouse server over the native protocol. +pub type Pool = qorb::pool::Pool; + +/// A type for making connections to a ClickHouse server. +#[derive(Clone, Copy, Debug)] +pub struct Connector; + +impl From for QorbError { + fn from(e: Error) -> Self { + QorbError::Other(anyhow::anyhow!(e)) + } +} + +#[async_trait::async_trait] +impl backend::Connector for Connector { + type Connection = Connection; + + async fn connect( + &self, + backend: &backend::Backend, + ) -> Result { + Connection::new(backend.address).await.map_err(QorbError::from) + } + + async fn is_valid( + &self, + conn: &mut Self::Connection, + ) -> Result<(), QorbError> { + conn.ping().await.map_err(QorbError::from) + } + + async fn on_recycle( + &self, + conn: &mut Self::Connection, + ) -> Result<(), QorbError> { + // We try to cancel an outstanding query. But if there is _no_ + // outstanding query, we sill want to run the validation check of + // pinging the server. That notifies `qorb` if the server is alive in + // the case that there was no query to cancel + if conn.cancel().await.map_err(QorbError::from)? { + Ok(()) + } else { + // No query, so let's run the validation check. 
+ self.is_valid(conn).await + } + } +} + /// A connection to a ClickHouse server. /// /// This connection object can be used to talk to a ClickHouse server through @@ -122,18 +174,25 @@ impl Connection { Err(Error::UnexpectedPacket(packet.kind())) } Some(Err(e)) => Err(e), - None => Err(Error::Disconnected), + None => { + probes::disconnected!(|| ()); + Err(Error::Disconnected) + } } } // Cancel a running query, if one exists. - async fn cancel(&mut self) -> Result<(), Error> { + // + // This returns an error if there is a query and we could not cancel it for + // some reason. It returns `Ok(true)` if we successfully canceled the query, + // or `Ok(false)` if there was no query to cancel at all. + async fn cancel(&mut self) -> Result { if self.outstanding_query { self.writer.send(ClientPacket::Cancel).await?; // Await EOS, throwing everything else away except errors. let res = loop { match self.reader.next().await { - Some(Ok(ServerPacket::EndOfStream)) => break Ok(()), + Some(Ok(ServerPacket::EndOfStream)) => break Ok(true), Some(Ok(other_packet)) => { probes::unexpected__server__packet!( || other_packet.kind() @@ -146,7 +205,7 @@ impl Connection { self.outstanding_query = false; return res; } - Ok(()) + Ok(false) } /// Send a SQL query, possibly with data. @@ -227,16 +286,14 @@ impl Connection { #[cfg(test)] mod tests { + use crate::native::block::DataType; + use crate::native::block::ValueArray; + use crate::native::connection::Connection; + use omicron_test_utils::dev::clickhouse::ClickHouseDeployment; + use omicron_test_utils::dev::test_setup_log; use std::sync::Arc; - - use crate::native::{ - block::{DataType, ValueArray}, - connection::Connection, - }; - use omicron_test_utils::dev::{ - clickhouse::ClickHouseDeployment, test_setup_log, - }; - use tokio::sync::{oneshot, Mutex}; + use tokio::sync::oneshot; + use tokio::sync::Mutex; #[tokio::test] async fn test_exchange_hello() { diff --git a/oximeter/db/src/native/io/block.rs b/oximeter/db/src/native/io/block.rs index e6ab6c8ba1..2a3645a150 100644 --- a/oximeter/db/src/native/io/block.rs +++ b/oximeter/db/src/native/io/block.rs @@ -6,10 +6,13 @@ //! Encoding and decoding data blocks. -use crate::native::block::{Block, BlockInfo}; +use crate::native::block::Block; +use crate::native::block::BlockInfo; use crate::native::io; use crate::native::Error; -use bytes::{Buf as _, BufMut as _, BytesMut}; +use bytes::Buf as _; +use bytes::BufMut as _; +use bytes::BytesMut; use indexmap::IndexMap; /// Encode a data packet to the server. @@ -102,7 +105,8 @@ fn encode_block_info(info: BlockInfo, mut dst: &mut BytesMut) { #[cfg(test)] mod tests { use super::*; - use crate::native::block::{Column, ValueArray}; + use crate::native::block::Column; + use crate::native::block::ValueArray; // Expected data block. // diff --git a/oximeter/db/src/native/io/column.rs b/oximeter/db/src/native/io/column.rs index 649d6a044d..2c47029ba7 100644 --- a/oximeter/db/src/native/io/column.rs +++ b/oximeter/db/src/native/io/column.rs @@ -6,13 +6,19 @@ //! Encode / decode a column. 
-use crate::native::{ - block::{Column, DataType, ValueArray}, - io, Error, -}; -use bytes::{Buf as _, BufMut as _, BytesMut}; -use chrono::{NaiveDate, TimeDelta, TimeZone}; -use std::net::{Ipv4Addr, Ipv6Addr}; +use crate::native::block::Column; +use crate::native::block::DataType; +use crate::native::block::ValueArray; +use crate::native::io; +use crate::native::Error; +use bytes::Buf as _; +use bytes::BufMut as _; +use bytes::BytesMut; +use chrono::NaiveDate; +use chrono::TimeDelta; +use chrono::TimeZone; +use std::net::Ipv4Addr; +use std::net::Ipv6Addr; use uuid::Uuid; // ClickHouse `Date`s are represented as an unsigned 16-bit number of days from diff --git a/oximeter/db/src/native/io/exception.rs b/oximeter/db/src/native/io/exception.rs index 6d35e9d429..31c6ddd23c 100644 --- a/oximeter/db/src/native/io/exception.rs +++ b/oximeter/db/src/native/io/exception.rs @@ -6,7 +6,9 @@ //! Decode server exception packets. -use crate::native::{io, packets::server::Exception, Error}; +use crate::native::io; +use crate::native::packets::server::Exception; +use crate::native::Error; use bytes::Buf as _; /// Decode a list of Exception packets from the server, if possible. diff --git a/oximeter/db/src/native/io/packet/client.rs b/oximeter/db/src/native/io/packet/client.rs index 31880348ae..c8397d68a2 100644 --- a/oximeter/db/src/native/io/packet/client.rs +++ b/oximeter/db/src/native/io/packet/client.rs @@ -7,16 +7,18 @@ //! Encode client packets destined for the server. use crate::native::block::Block; -use crate::native::packets::client::{ - ClientInfo, Query, QueryKind, Settings, Stage, -}; +use crate::native::io; +use crate::native::packets::client::ClientInfo; +use crate::native::packets::client::Hello; +use crate::native::packets::client::Packet; +use crate::native::packets::client::Query; +use crate::native::packets::client::QueryKind; +use crate::native::packets::client::Settings; +use crate::native::packets::client::Stage; use crate::native::probes; -use crate::native::{ - io, - packets::client::{Hello, Packet}, - Error, -}; -use bytes::{BufMut as _, BytesMut}; +use crate::native::Error; +use bytes::BufMut as _; +use bytes::BytesMut; /// Encoder for client packets. #[derive(Clone, Copy, Debug)] diff --git a/oximeter/db/src/native/io/packet/server.rs b/oximeter/db/src/native/io/packet/server.rs index 00d0352857..0ef6d96d4b 100644 --- a/oximeter/db/src/native/io/packet/server.rs +++ b/oximeter/db/src/native/io/packet/server.rs @@ -6,12 +6,14 @@ //! Decode packets from the ClickHouse server. -use crate::native::{ - io, - packets::server::{Hello, Packet, PasswordComplexityRule}, - probes, Error, -}; -use bytes::{Buf as _, BytesMut}; +use crate::native::io; +use crate::native::packets::server::Hello; +use crate::native::packets::server::Packet; +use crate::native::packets::server::PasswordComplexityRule; +use crate::native::probes; +use crate::native::Error; +use bytes::Buf as _; +use bytes::BytesMut; /// A decoder for packets from the ClickHouse server. 
#[derive(Debug)] @@ -199,7 +201,9 @@ mod tests { use std::time::Duration; use super::*; - use crate::native::packets::server::{Exception, Progress, REVISION}; + use crate::native::packets::server::Exception; + use crate::native::packets::server::Progress; + use crate::native::packets::server::REVISION; use bytes::BufMut as _; use tokio_util::codec::Decoder as _; diff --git a/oximeter/db/src/native/io/progress.rs b/oximeter/db/src/native/io/progress.rs index c60b50cb75..e6e8586c68 100644 --- a/oximeter/db/src/native/io/progress.rs +++ b/oximeter/db/src/native/io/progress.rs @@ -6,7 +6,8 @@ //! Decode progress packets from the server. -use crate::native::{io, packets::server::Progress}; +use crate::native::io; +use crate::native::packets::server::Progress; use std::time::Duration; /// Decode a progress packet from the server, if possible. diff --git a/oximeter/db/src/native/io/string.rs b/oximeter/db/src/native/io/string.rs index e93ba67c2d..c8b838601a 100644 --- a/oximeter/db/src/native/io/string.rs +++ b/oximeter/db/src/native/io/string.rs @@ -12,7 +12,8 @@ use super::varuint; use crate::native::Error; -use bytes::{Buf, BufMut}; +use bytes::Buf; +use bytes::BufMut; /// Encode a string into the ClickHouse format. pub fn encode(s: impl AsRef, mut buf: impl BufMut) { diff --git a/oximeter/db/src/native/io/varuint.rs b/oximeter/db/src/native/io/varuint.rs index 0476c83aff..1b9e561c3f 100644 --- a/oximeter/db/src/native/io/varuint.rs +++ b/oximeter/db/src/native/io/varuint.rs @@ -15,7 +15,8 @@ //! iterating over the values (especially since ClickHouse does not append a //! fixed-size length header to its messages). -use bytes::{Buf, BufMut}; +use bytes::Buf; +use bytes::BufMut; /// Encode a u64 as a variable-length integer, returning the number of bytes /// written. diff --git a/oximeter/db/src/native/mod.rs b/oximeter/db/src/native/mod.rs index 15d013fa35..9bddd6ad5c 100644 --- a/oximeter/db/src/native/mod.rs +++ b/oximeter/db/src/native/mod.rs @@ -120,6 +120,7 @@ //! actually sent if we believe we have an outstanding query. pub use connection::Connection; +pub use connection::Pool; pub use packets::client::QueryResult; pub use packets::server::Exception; @@ -136,6 +137,9 @@ mod probes { /// Emitted when we receive a packet from the server, with its kind. fn packet__received(kind: &str) {} + /// Emitted when we learn we've been disconnected from the server. + fn disconnected() {} + /// Emitted when we receive a data packet, with details about the size and /// data types for each column. fn data__packet__received( diff --git a/oximeter/db/src/native/packets/client.rs b/oximeter/db/src/native/packets/client.rs index 759aaabe6e..7d32ba11d8 100644 --- a/oximeter/db/src/native/packets/client.rs +++ b/oximeter/db/src/native/packets/client.rs @@ -6,11 +6,13 @@ //! Packets sent from client to server. -use super::server::{ProfileInfo, Progress}; +use super::server::ProfileInfo; +use super::server::Progress; use crate::native::block::Block; -use std::{ - borrow::Cow, collections::BTreeMap, net::SocketAddr, sync::LazyLock, -}; +use std::borrow::Cow; +use std::collections::BTreeMap; +use std::net::SocketAddr; +use std::sync::LazyLock; use uuid::Uuid; /// A packet sent from client to server in the native protocol. diff --git a/oximeter/db/src/native/packets/server.rs b/oximeter/db/src/native/packets/server.rs index 098e120cfa..864d9397cf 100644 --- a/oximeter/db/src/native/packets/server.rs +++ b/oximeter/db/src/native/packets/server.rs @@ -6,7 +6,8 @@ //! Packets sent from the server. 
-use std::{fmt, time::Duration}; +use std::fmt; +use std::time::Duration; use crate::native::block::Block; diff --git a/oximeter/db/src/shells/mod.rs b/oximeter/db/src/shells/mod.rs index eb9a9bd39a..ccad0010aa 100644 --- a/oximeter/db/src/shells/mod.rs +++ b/oximeter/db/src/shells/mod.rs @@ -11,7 +11,7 @@ use dropshot::EmptyScanParams; use dropshot::WhichPage; use oximeter::TimeseriesSchema; -#[cfg(any(feature = "native-sql", test))] +#[cfg(any(feature = "native-sql-shell", test))] pub mod native; #[cfg(any(feature = "oxql", test))] pub mod oxql; diff --git a/oximeter/db/src/shells/native.rs b/oximeter/db/src/shells/native.rs index c0e86367da..f513435275 100644 --- a/oximeter/db/src/shells/native.rs +++ b/oximeter/db/src/shells/native.rs @@ -10,15 +10,14 @@ use crate::native::{self, block::ValueArray, QueryResult}; use anyhow::Context as _; use crossterm::style::Stylize; use display_error_chain::DisplayErrorChain; -use omicron_common::address::CLICKHOUSE_TCP_PORT; use reedline::{DefaultPrompt, DefaultPromptSegment, Reedline, Signal}; use std::net::{IpAddr, SocketAddr}; use tabled::{builder::Builder, settings::Style}; /// Run the native SQL shell. -pub async fn shell(addr: IpAddr) -> anyhow::Result<()> { +pub async fn shell(addr: IpAddr, port: u16) -> anyhow::Result<()> { usdt::register_probes()?; - let addr = SocketAddr::new(addr, CLICKHOUSE_TCP_PORT); + let addr = SocketAddr::new(addr, port); let mut conn = native::Connection::new(addr) .await .context("Trying to connect to ClickHouse server")?; diff --git a/oximeter/db/src/shells/oxql.rs b/oximeter/db/src/shells/oxql.rs index f46d08c0cf..909b4916ac 100644 --- a/oximeter/db/src/shells/oxql.rs +++ b/oximeter/db/src/shells/oxql.rs @@ -32,12 +32,13 @@ pub struct ShellOptions { /// Run/execute the OxQL shell. pub async fn shell( address: IpAddr, - port: u16, + http_port: u16, + native_port: u16, log: Logger, opts: ShellOptions, ) -> anyhow::Result<()> { // Create the client. - let client = make_client(address, port, &log).await?; + let client = make_client(address, http_port, native_port, &log).await?; // A workaround to ensure the client has all available timeseries when the // shell starts. diff --git a/oximeter/db/src/shells/sql.rs b/oximeter/db/src/shells/sql.rs index f75713da3b..4d8c332aaf 100644 --- a/oximeter/db/src/shells/sql.rs +++ b/oximeter/db/src/shells/sql.rs @@ -50,11 +50,12 @@ impl Default for ShellOptions { /// Run/execute the SQL shell. pub async fn shell( address: IpAddr, - port: u16, + http_port: u16, + native_port: u16, log: Logger, opts: ShellOptions, ) -> anyhow::Result<()> { - let client = make_client(address, port, &log).await?; + let client = make_client(address, http_port, native_port, &log).await?; // A workaround to ensure the client has all available timeseries when the // shell starts. 
diff --git a/oximeter/db/tests/integration_test.rs b/oximeter/db/tests/integration_test.rs index 35f96dfd50..3a1649959e 100644 --- a/oximeter/db/tests/integration_test.rs +++ b/oximeter/db/tests/integration_test.rs @@ -63,7 +63,8 @@ async fn test_schemas_disjoint() -> anyhow::Result<()> { deployment.deploy().context("failed to deploy")?; let client1 = Client::new_with_request_timeout( - deployment.http_addr(1.into())?, + deployment.http_addr(1.into()), + deployment.native_addr(1.into()), log, request_timeout, ); @@ -158,12 +159,14 @@ async fn test_cluster() -> anyhow::Result<()> { deployment.deploy().context("failed to deploy")?; let client1 = Client::new_with_request_timeout( - deployment.http_addr(1.into())?, + deployment.http_addr(1.into()), + deployment.native_addr(1.into()), log, request_timeout, ); let client2 = Client::new_with_request_timeout( - deployment.http_addr(2.into())?, + deployment.http_addr(2.into()), + deployment.native_addr(2.into()), log, request_timeout, ); @@ -228,7 +231,8 @@ async fn test_cluster() -> anyhow::Result<()> { // Add a 3rd clickhouse server and wait for it to come up deployment.add_server().expect("failed to launch a 3rd clickhouse server"); let client3 = Client::new_with_request_timeout( - deployment.http_addr(3.into())?, + deployment.http_addr(3.into()), + deployment.native_addr(3.into()), log, request_timeout, ); @@ -329,7 +333,8 @@ async fn test_cluster() -> anyhow::Result<()> { // few hundred milliseconds. To shorten the length of our test, we create a // new client with a shorter timeout. let client1_short_timeout = Client::new_with_request_timeout( - deployment.http_addr(1.into())?, + deployment.http_addr(1.into()), + deployment.native_addr(1.into()), log, Duration::from_secs(2), ); @@ -450,7 +455,7 @@ async fn wait_for_num_points( Ok(()) } -/// Try to ping the server until it is responds. +/// Try to ping the server until it responds. async fn wait_for_ping(log: &Logger, client: &Client) -> anyhow::Result<()> { poll::wait_for_condition( || async { @@ -459,8 +464,8 @@ async fn wait_for_ping(log: &Logger, client: &Client) -> anyhow::Result<()> { .await .map_err(|_| poll::CondCheckError::::NotYet) }, - &Duration::from_millis(1), - &Duration::from_secs(10), + &Duration::from_millis(100), + &Duration::from_secs(30), ) .await .with_context(|| { diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index ddf57acb4c..6a19d6591b 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -90,7 +90,7 @@ pkcs8 = { version = "0.10.2", default-features = false, features = ["encryption" postgres-types = { version = "0.2.8", default-features = false, features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] } predicates = { version = "3.1.2" } proc-macro2 = { version = "1.0.87" } -qorb = { version = "0.0.2", features = ["qtop"] } +qorb = { version = "0.1.1", features = ["qtop"] } quote = { version = "1.0.37" } rand = { version = "0.8.5", features = ["small_rng"] } regex = { version = "1.11.0" } @@ -206,7 +206,7 @@ pkcs8 = { version = "0.10.2", default-features = false, features = ["encryption" postgres-types = { version = "0.2.8", default-features = false, features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] } predicates = { version = "3.1.2" } proc-macro2 = { version = "1.0.87" } -qorb = { version = "0.0.2", features = ["qtop"] } +qorb = { version = "0.1.1", features = ["qtop"] } quote = { version = "1.0.37" } rand = { version = "0.8.5", features = ["small_rng"] } regex = { version = "1.11.0" }