From f93547ce6cf5996fa308c3a7b9c8ff5ef594631d Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Sat, 25 May 2024 00:07:02 +0000 Subject: [PATCH 1/3] [#3886 2/4] Region replacement omdb commands This commit adds some commands to omdb related to the new region replacement logic: $ ./target/debug/omdb db region-replacement Query for information about region replacements, optionally manually triggering one Usage: omdb db region-replacement [OPTIONS] Commands: list List region replacement requests status Show current region replacements and their status info Show detailed information for a region replacement request Manually request a region replacement help Print this message or the help of the given subcommand(s) `list` will list all region replacement requests, along with their request time and state. `status` will show a summary of all non-Complete region replacements, along with their state and progress. `info` will show a detailed view of a region replacement, starting with the details that the `status` summary shows, then showing all related notifications and steps taken to drive the replacement forward. Finally, `request` will request that a region be replaced, and return the ID of the replacement. --- Cargo.lock | 1 + dev-tools/omdb/Cargo.toml | 1 + dev-tools/omdb/src/bin/omdb/db.rs | 527 ++++++++++++++++++++++++++ dev-tools/omdb/tests/usage_errors.out | 4 + 4 files changed, 533 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 1dfaff0d77..1e73e29784 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5514,6 +5514,7 @@ dependencies = [ "gateway-messages", "gateway-test-utils", "humantime", + "indicatif", "internal-dns", "ipnetwork", "multimap", diff --git a/dev-tools/omdb/Cargo.toml b/dev-tools/omdb/Cargo.toml index 3c466b1683..9cdf03093c 100644 --- a/dev-tools/omdb/Cargo.toml +++ b/dev-tools/omdb/Cargo.toml @@ -55,6 +55,7 @@ uuid.workspace = true ipnetwork.workspace = true omicron-workspace-hack.workspace = true multimap.workspace = true +indicatif.workspace = true [dev-dependencies] expectorate.workspace = true diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs index 549f289ad0..6208c60dc0 100644 --- a/dev-tools/omdb/src/bin/omdb/db.rs +++ b/dev-tools/omdb/src/bin/omdb/db.rs @@ -15,6 +15,7 @@ // NOTE: emanates from Tabled macros #![allow(clippy::useless_vec)] +use crate::check_allow_destructive::DestructiveOperationToken; use crate::helpers::CONNECTION_OPTIONS_HEADING; use crate::helpers::DATABASE_OPTIONS_HEADING; use crate::Omdb; @@ -25,7 +26,9 @@ use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; use async_bb8_diesel::AsyncSimpleConnection; use camino::Utf8PathBuf; +use chrono::DateTime; use chrono::SecondsFormat; +use chrono::Utc; use clap::ArgAction; use clap::Args; use clap::Subcommand; @@ -39,6 +42,9 @@ use diesel::NullableExpressionMethods; use diesel::OptionalExtension; use diesel::TextExpressionMethods; use gateway_client::types::SpType; +use indicatif::ProgressBar; +use indicatif::ProgressDrawTarget; +use indicatif::ProgressStyle; use ipnetwork::IpNetwork; use nexus_config::PostgresConfigWithUrl; use nexus_db_model::Dataset; @@ -59,12 +65,19 @@ use nexus_db_model::NetworkInterfaceKind; use nexus_db_model::Probe; use nexus_db_model::Project; use nexus_db_model::Region; +use nexus_db_model::RegionReplacement; +use nexus_db_model::RegionReplacementState; +use nexus_db_model::RegionReplacementStep; +use nexus_db_model::RegionReplacementStepType; use nexus_db_model::RegionSnapshot; use nexus_db_model::Sled; use nexus_db_model::Snapshot; use nexus_db_model::SnapshotState; use nexus_db_model::SwCaboose; use nexus_db_model::SwRotPage; +use nexus_db_model::UpstairsRepairNotification; +use nexus_db_model::UpstairsRepairNotificationType; +use nexus_db_model::UpstairsRepairProgress; use nexus_db_model::Vmm; use nexus_db_model::Volume; use nexus_db_model::VpcSubnet; @@ -270,6 +283,9 @@ enum DbCommands { Inventory(InventoryArgs), /// Save the current Reconfigurator inputs to a file ReconfiguratorSave(ReconfiguratorSaveArgs), + /// Query for information about region replacements, optionally manually + /// triggering one. + RegionReplacement(RegionReplacementArgs), /// Print information about sleds Sleds(SledsArgs), /// Print information about customer instances @@ -434,6 +450,47 @@ struct SledsArgs { filter: Option, } +#[derive(Debug, Args)] +struct RegionReplacementArgs { + #[command(subcommand)] + command: RegionReplacementCommands, +} + +#[derive(Debug, Subcommand)] +enum RegionReplacementCommands { + /// List region replacement requests + List(RegionReplacementListArgs), + /// Show current region replacements and their status + Status, + /// Show detailed information for a region replacement + Info(RegionReplacementInfoArgs), + /// Manually request a region replacement + Request(RegionReplacementRequestArgs), +} + +#[derive(Debug, Args)] +struct RegionReplacementListArgs { + /// Only show region replacement requests in this state + #[clap(long)] + state: Option, + + /// Only show region replacement requests after a certain date + #[clap(long)] + after: Option>, +} + +#[derive(Debug, Args)] +struct RegionReplacementInfoArgs { + /// The UUID of the region replacement request + replacement_id: Uuid, +} + +#[derive(Debug, Args)] +struct RegionReplacementRequestArgs { + /// The UUID of the region to replace + region_id: Uuid, +} + #[derive(Debug, Args)] struct NetworkArgs { #[command(subcommand)] @@ -542,6 +599,51 @@ impl DbArgs { ) .await } + DbCommands::RegionReplacement(RegionReplacementArgs { + command: RegionReplacementCommands::List(args), + }) => { + cmd_db_region_replacement_list( + &opctx, + &datastore, + &self.fetch_opts, + args, + ) + .await + } + DbCommands::RegionReplacement(RegionReplacementArgs { + command: RegionReplacementCommands::Status, + }) => { + cmd_db_region_replacement_status( + &opctx, + &datastore, + &self.fetch_opts, + ) + .await + } + DbCommands::RegionReplacement(RegionReplacementArgs { + command: RegionReplacementCommands::Info(args), + }) => { + cmd_db_region_replacement_info( + &opctx, + &datastore, + &self.fetch_opts, + args, + ) + .await + } + DbCommands::RegionReplacement(RegionReplacementArgs { + command: RegionReplacementCommands::Request(args), + }) => { + let token = omdb.check_allow_destructive()?; + cmd_db_region_replacement_request( + &opctx, + &datastore, + &self.fetch_opts, + args, + token, + ) + .await + } DbCommands::Sleds(args) => { cmd_db_sleds(&opctx, &datastore, &self.fetch_opts, args).await } @@ -1426,6 +1528,431 @@ async fn cmd_db_snapshot_info( Ok(()) } +/// List all region replacement requests +async fn cmd_db_region_replacement_list( + _opctx: &OpContext, + datastore: &DataStore, + fetch_opts: &DbFetchOptions, + args: &RegionReplacementListArgs, +) -> Result<(), anyhow::Error> { + let ctx = || "listing region replacement requests".to_string(); + let limit = fetch_opts.fetch_limit; + + let requests: Vec = datastore + .pool_connection_for_tests() + .await? + .transaction_async(|conn| async move { + use db::schema::region_replacement::dsl; + + match (args.state, args.after) { + (Some(state), Some(after)) => { + dsl::region_replacement + .filter(dsl::replacement_state.eq(state)) + .filter(dsl::request_time.gt(after)) + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&conn) + .await + } + + (Some(state), None) => { + dsl::region_replacement + .filter(dsl::replacement_state.eq(state)) + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&conn) + .await + } + + (None, Some(after)) => { + dsl::region_replacement + .filter(dsl::request_time.gt(after)) + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&conn) + .await + } + + (None, None) => { + dsl::region_replacement + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&conn) + .await + } + } + }) + .await?; + + check_limit(&requests, limit, ctx); + + #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] + struct Row { + pub id: Uuid, + pub request_time: DateTime, + pub replacement_state: String, + } + + let mut rows = Vec::with_capacity(requests.len()); + + for request in requests { + rows.push(Row { + id: request.id, + request_time: request.request_time, + replacement_state: format!("{:?}", request.replacement_state), + }); + } + + let table = tabled::Table::new(rows) + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)) + .with(tabled::settings::Panel::header("Region replacement requests")) + .to_string(); + + println!("{}", table); + + Ok(()) +} + +/// Display all non-complete region replacements +async fn cmd_db_region_replacement_status( + _opctx: &OpContext, + datastore: &DataStore, + fetch_opts: &DbFetchOptions, +) -> Result<(), anyhow::Error> { + let ctx = || "listing region replacement requests".to_string(); + let limit = fetch_opts.fetch_limit; + + let requests: Vec = datastore + .pool_connection_for_tests() + .await? + .transaction_async(|conn| async move { + use db::schema::region_replacement::dsl; + + dsl::region_replacement + .filter( + dsl::replacement_state.ne(RegionReplacementState::Complete), + ) + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&conn) + .await + }) + .await?; + + check_limit(&requests, limit, ctx); + + for request in requests { + println!("{}:", request.id); + println!(); + + println!(" started: {}", request.request_time); + println!(" state: {:?}", request.replacement_state); + println!("old region id: {}", request.old_region_id); + println!("new region id: {:?}", request.new_region_id); + println!(); + + if let Some(new_region_id) = request.new_region_id { + println!(); + + // Find the most recent upstairs repair notification where the + // downstairs being repaired is a "new" region id. This will give us + // the most recent repair id. + let maybe_repair: Option = datastore + .pool_connection_for_tests() + .await? + .transaction_async(|conn| async move { + use db::schema::upstairs_repair_notification::dsl; + + dsl::upstairs_repair_notification + .filter(dsl::region_id.eq(new_region_id)) + .filter( + dsl::notification_type + .eq(UpstairsRepairNotificationType::Started), + ) + .order_by(dsl::time.desc()) + .limit(1) + .first_async(&conn) + .await + .optional() + }) + .await?; + + if let Some(repair) = maybe_repair { + let maybe_repair_progress: Option = + datastore + .pool_connection_for_tests() + .await? + .transaction_async(|conn| async move { + use db::schema::upstairs_repair_progress::dsl; + + dsl::upstairs_repair_progress + .filter(dsl::repair_id.eq(repair.repair_id)) + .order_by(dsl::time.desc()) + .limit(1) + .first_async(&conn) + .await + .optional() + }) + .await?; + + if let Some(repair_progress) = maybe_repair_progress { + let bar = ProgressBar::with_draw_target( + Some(repair_progress.total_items as u64), + ProgressDrawTarget::stdout(), + ) + .with_style(ProgressStyle::with_template( + "progress:\t{wide_bar:.green} [{pos:>7}/{len:>7}]", + )?) + .with_position(repair_progress.current_item as u64); + + bar.abandon(); + } + } + } + + println!(); + } + + Ok(()) +} + +/// Show details for a single region replacement +async fn cmd_db_region_replacement_info( + _opctx: &OpContext, + datastore: &DataStore, + _fetch_opts: &DbFetchOptions, + args: &RegionReplacementInfoArgs, +) -> Result<(), anyhow::Error> { + let request: Option = datastore + .pool_connection_for_tests() + .await? + .transaction_async(|conn| async move { + use db::schema::region_replacement::dsl; + + dsl::region_replacement + .filter(dsl::id.eq(args.replacement_id)) + .limit(i64::from(1)) + .select(RegionReplacement::as_select()) + .first_async(&conn) + .await + .optional() + }) + .await?; + + let Some(request) = request else { + bail!( + "no region replacement request with id {} found", + args.replacement_id, + ); + }; + + // Show details + println!(" started: {}", request.request_time); + println!(" state: {:?}", request.replacement_state); + println!("old region id: {}", request.old_region_id); + println!("new region id: {:?}", request.new_region_id); + println!(); + + if let Some(new_region_id) = request.new_region_id { + // Find all related notifications + let notifications: Vec = datastore + .pool_connection_for_tests() + .await? + .transaction_async(|conn| async move { + use db::schema::upstairs_repair_notification::dsl; + + dsl::upstairs_repair_notification + .filter(dsl::region_id.eq(new_region_id)) + .order_by(dsl::time.asc()) + .select(UpstairsRepairNotification::as_select()) + .get_results_async(&conn) + .await + }) + .await?; + + #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] + struct Row { + pub time: DateTime, + + pub repair_id: String, + pub repair_type: String, + + pub upstairs_id: String, + pub session_id: String, + + pub notification_type: String, + } + + let mut rows = Vec::with_capacity(notifications.len()); + + for notification in ¬ifications { + rows.push(Row { + time: notification.time, + repair_id: notification.repair_id.to_string(), + repair_type: format!("{:?}", notification.repair_type), + upstairs_id: notification.upstairs_id.to_string(), + session_id: notification.session_id.to_string(), + notification_type: format!( + "{:?}", + notification.notification_type + ), + }); + } + + let table = tabled::Table::new(rows) + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)) + .with(tabled::settings::Panel::header("Repair notifications")) + .to_string(); + + println!("{}", table); + + println!(); + + // Use the most recent notification to get the most recent repair ID, + // and use that to search for progress. + + let maybe_repair: Option = datastore + .pool_connection_for_tests() + .await? + .transaction_async(|conn| async move { + use db::schema::upstairs_repair_notification::dsl; + + dsl::upstairs_repair_notification + .filter(dsl::region_id.eq(new_region_id)) + .filter( + dsl::notification_type + .eq(UpstairsRepairNotificationType::Started), + ) + .order_by(dsl::time.desc()) + .limit(1) + .first_async(&conn) + .await + .optional() + }) + .await?; + + if let Some(repair) = maybe_repair { + let maybe_repair_progress: Option = + datastore + .pool_connection_for_tests() + .await? + .transaction_async(|conn| async move { + use db::schema::upstairs_repair_progress::dsl; + + dsl::upstairs_repair_progress + .filter(dsl::repair_id.eq(repair.repair_id)) + .order_by(dsl::time.desc()) + .limit(1) + .first_async(&conn) + .await + .optional() + }) + .await?; + + if let Some(repair_progress) = maybe_repair_progress { + let bar = ProgressBar::with_draw_target( + Some(repair_progress.total_items as u64), + ProgressDrawTarget::stdout(), + ) + .with_style(ProgressStyle::with_template( + "progress:\t{wide_bar:.green} [{pos:>7}/{len:>7}]", + )?) + .with_position(repair_progress.current_item as u64); + + bar.abandon(); + + println!(); + } + } + + // Find the steps that the driver saga has committed to the DB. + + let steps: Vec = datastore + .pool_connection_for_tests() + .await? + .transaction_async(|conn| async move { + use db::schema::region_replacement_step::dsl; + + dsl::region_replacement_step + .filter(dsl::replacement_id.eq(args.replacement_id)) + .order_by(dsl::step_time.asc()) + .load_async(&conn) + .await + }) + .await?; + + #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] + struct StepRow { + pub time: DateTime, + pub step_type: String, + pub details: String, + } + + let mut rows = Vec::with_capacity(steps.len()); + + for step in steps { + rows.push(StepRow { + time: step.step_time, + step_type: format!("{:?}", step.step_type), + details: match step.step_type { + RegionReplacementStepType::Propolis => { + format!( + "instance {:?} vmm {:?}", + step.step_associated_instance_id, + step.step_associated_vmm_id, + ) + } + + RegionReplacementStepType::Pantry => { + format!( + "address {:?}:{:?} job {:?}", + step.step_associated_pantry_ip, + step.step_associated_pantry_port, + step.step_associated_pantry_job_id, + ) + } + }, + }); + } + + println!(); + + let table = tabled::Table::new(rows) + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)) + .with(tabled::settings::Panel::header("Repair steps")) + .to_string(); + + println!("{}", table); + } + + Ok(()) +} + +/// Manually request a region replacement +async fn cmd_db_region_replacement_request( + opctx: &OpContext, + datastore: &DataStore, + _fetch_opts: &DbFetchOptions, + args: &RegionReplacementRequestArgs, + _destruction_token: DestructiveOperationToken, +) -> Result<(), anyhow::Error> { + let region = datastore.get_region(args.region_id).await?; + + let request_id = datastore + .create_region_replacement_request_for_region(opctx, ®ion) + .await?; + + println!("region replacement {request_id} created"); + + Ok(()) +} + // SLEDS #[derive(Tabled)] diff --git a/dev-tools/omdb/tests/usage_errors.out b/dev-tools/omdb/tests/usage_errors.out index 15fc9d322e..563d23d6f3 100644 --- a/dev-tools/omdb/tests/usage_errors.out +++ b/dev-tools/omdb/tests/usage_errors.out @@ -107,6 +107,8 @@ Commands: dns Print information about internal and external DNS inventory Print information about collected hardware/software inventory reconfigurator-save Save the current Reconfigurator inputs to a file + region-replacement Query for information about region replacements, optionally manually + triggering one sleds Print information about sleds instances Print information about customer instances network Print information about the network @@ -145,6 +147,8 @@ Commands: dns Print information about internal and external DNS inventory Print information about collected hardware/software inventory reconfigurator-save Save the current Reconfigurator inputs to a file + region-replacement Query for information about region replacements, optionally manually + triggering one sleds Print information about sleds instances Print information about customer instances network Print information about the network From 9970286c4b70f49f7f92e527e19061c964ea2de4 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Thu, 30 May 2024 01:57:12 +0000 Subject: [PATCH 2/3] move the println after the bar.abandon --- dev-tools/omdb/src/bin/omdb/db.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs index 6208c60dc0..6cba2df75a 100644 --- a/dev-tools/omdb/src/bin/omdb/db.rs +++ b/dev-tools/omdb/src/bin/omdb/db.rs @@ -1654,8 +1654,6 @@ async fn cmd_db_region_replacement_status( println!(); if let Some(new_region_id) = request.new_region_id { - println!(); - // Find the most recent upstairs repair notification where the // downstairs being repaired is a "new" region id. This will give us // the most recent repair id. @@ -1708,6 +1706,8 @@ async fn cmd_db_region_replacement_status( .with_position(repair_progress.current_item as u64); bar.abandon(); + + println!(); } } } From a155af9134db2bcbdb1add772f94f4da5dd009f8 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Thu, 30 May 2024 03:47:02 +0000 Subject: [PATCH 3/3] Move some queries into DataStore impls remove unused arguments don't use a transaction when only one query is performed --- dev-tools/omdb/src/bin/omdb/db.rs | 244 +++++------------- .../src/db/datastore/region_replacement.rs | 18 ++ nexus/db-queries/src/db/datastore/volume.rs | 67 +++++ 3 files changed, 150 insertions(+), 179 deletions(-) diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs index 6cba2df75a..be4e1e8696 100644 --- a/dev-tools/omdb/src/bin/omdb/db.rs +++ b/dev-tools/omdb/src/bin/omdb/db.rs @@ -76,7 +76,6 @@ use nexus_db_model::SnapshotState; use nexus_db_model::SwCaboose; use nexus_db_model::SwRotPage; use nexus_db_model::UpstairsRepairNotification; -use nexus_db_model::UpstairsRepairNotificationType; use nexus_db_model::UpstairsRepairProgress; use nexus_db_model::Vmm; use nexus_db_model::Volume; @@ -603,7 +602,6 @@ impl DbArgs { command: RegionReplacementCommands::List(args), }) => { cmd_db_region_replacement_list( - &opctx, &datastore, &self.fetch_opts, args, @@ -623,24 +621,14 @@ impl DbArgs { DbCommands::RegionReplacement(RegionReplacementArgs { command: RegionReplacementCommands::Info(args), }) => { - cmd_db_region_replacement_info( - &opctx, - &datastore, - &self.fetch_opts, - args, - ) - .await + cmd_db_region_replacement_info(&opctx, &datastore, args).await } DbCommands::RegionReplacement(RegionReplacementArgs { command: RegionReplacementCommands::Request(args), }) => { let token = omdb.check_allow_destructive()?; cmd_db_region_replacement_request( - &opctx, - &datastore, - &self.fetch_opts, - args, - token, + &opctx, &datastore, args, token, ) .await } @@ -1530,7 +1518,6 @@ async fn cmd_db_snapshot_info( /// List all region replacement requests async fn cmd_db_region_replacement_list( - _opctx: &OpContext, datastore: &DataStore, fetch_opts: &DbFetchOptions, args: &RegionReplacementListArgs, @@ -1538,51 +1525,49 @@ async fn cmd_db_region_replacement_list( let ctx = || "listing region replacement requests".to_string(); let limit = fetch_opts.fetch_limit; - let requests: Vec = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::region_replacement::dsl; - - match (args.state, args.after) { - (Some(state), Some(after)) => { - dsl::region_replacement - .filter(dsl::replacement_state.eq(state)) - .filter(dsl::request_time.gt(after)) - .limit(i64::from(u32::from(limit))) - .select(RegionReplacement::as_select()) - .get_results_async(&conn) - .await - } + let requests: Vec = { + let conn = datastore.pool_connection_for_tests().await?; - (Some(state), None) => { - dsl::region_replacement - .filter(dsl::replacement_state.eq(state)) - .limit(i64::from(u32::from(limit))) - .select(RegionReplacement::as_select()) - .get_results_async(&conn) - .await - } + use db::schema::region_replacement::dsl; - (None, Some(after)) => { - dsl::region_replacement - .filter(dsl::request_time.gt(after)) - .limit(i64::from(u32::from(limit))) - .select(RegionReplacement::as_select()) - .get_results_async(&conn) - .await - } + match (args.state, args.after) { + (Some(state), Some(after)) => { + dsl::region_replacement + .filter(dsl::replacement_state.eq(state)) + .filter(dsl::request_time.gt(after)) + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&*conn) + .await? + } - (None, None) => { - dsl::region_replacement - .limit(i64::from(u32::from(limit))) - .select(RegionReplacement::as_select()) - .get_results_async(&conn) - .await - } + (Some(state), None) => { + dsl::region_replacement + .filter(dsl::replacement_state.eq(state)) + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&*conn) + .await? + } + + (None, Some(after)) => { + dsl::region_replacement + .filter(dsl::request_time.gt(after)) + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&*conn) + .await? } - }) - .await?; + + (None, None) => { + dsl::region_replacement + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&*conn) + .await? + } + } + }; check_limit(&requests, limit, ctx); @@ -1617,29 +1602,25 @@ async fn cmd_db_region_replacement_list( /// Display all non-complete region replacements async fn cmd_db_region_replacement_status( - _opctx: &OpContext, + opctx: &OpContext, datastore: &DataStore, fetch_opts: &DbFetchOptions, ) -> Result<(), anyhow::Error> { let ctx = || "listing region replacement requests".to_string(); let limit = fetch_opts.fetch_limit; - let requests: Vec = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::region_replacement::dsl; + let requests: Vec = { + let conn = datastore.pool_connection_for_tests().await?; - dsl::region_replacement - .filter( - dsl::replacement_state.ne(RegionReplacementState::Complete), - ) - .limit(i64::from(u32::from(limit))) - .select(RegionReplacement::as_select()) - .get_results_async(&conn) - .await - }) - .await?; + use db::schema::region_replacement::dsl; + + dsl::region_replacement + .filter(dsl::replacement_state.ne(RegionReplacementState::Complete)) + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&*conn) + .await? + }; check_limit(&requests, limit, ctx); @@ -1658,41 +1639,16 @@ async fn cmd_db_region_replacement_status( // downstairs being repaired is a "new" region id. This will give us // the most recent repair id. let maybe_repair: Option = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::upstairs_repair_notification::dsl; - - dsl::upstairs_repair_notification - .filter(dsl::region_id.eq(new_region_id)) - .filter( - dsl::notification_type - .eq(UpstairsRepairNotificationType::Started), - ) - .order_by(dsl::time.desc()) - .limit(1) - .first_async(&conn) - .await - .optional() - }) + .most_recent_started_repair_notification(opctx, new_region_id) .await?; if let Some(repair) = maybe_repair { let maybe_repair_progress: Option = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::upstairs_repair_progress::dsl; - - dsl::upstairs_repair_progress - .filter(dsl::repair_id.eq(repair.repair_id)) - .order_by(dsl::time.desc()) - .limit(1) - .first_async(&conn) - .await - .optional() - }) + .most_recent_repair_progress( + opctx, + repair.repair_id.into(), + ) .await?; if let Some(repair_progress) = maybe_repair_progress { @@ -1720,34 +1676,14 @@ async fn cmd_db_region_replacement_status( /// Show details for a single region replacement async fn cmd_db_region_replacement_info( - _opctx: &OpContext, + opctx: &OpContext, datastore: &DataStore, - _fetch_opts: &DbFetchOptions, args: &RegionReplacementInfoArgs, ) -> Result<(), anyhow::Error> { - let request: Option = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::region_replacement::dsl; - - dsl::region_replacement - .filter(dsl::id.eq(args.replacement_id)) - .limit(i64::from(1)) - .select(RegionReplacement::as_select()) - .first_async(&conn) - .await - .optional() - }) + let request = datastore + .get_region_replacement_request_by_id(opctx, args.replacement_id) .await?; - let Some(request) = request else { - bail!( - "no region replacement request with id {} found", - args.replacement_id, - ); - }; - // Show details println!(" started: {}", request.request_time); println!(" state: {:?}", request.replacement_state); @@ -1758,18 +1694,7 @@ async fn cmd_db_region_replacement_info( if let Some(new_region_id) = request.new_region_id { // Find all related notifications let notifications: Vec = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::upstairs_repair_notification::dsl; - - dsl::upstairs_repair_notification - .filter(dsl::region_id.eq(new_region_id)) - .order_by(dsl::time.asc()) - .select(UpstairsRepairNotification::as_select()) - .get_results_async(&conn) - .await - }) + .repair_notifications_for_region(opctx, new_region_id) .await?; #[derive(Tabled)] @@ -1816,41 +1741,13 @@ async fn cmd_db_region_replacement_info( // and use that to search for progress. let maybe_repair: Option = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::upstairs_repair_notification::dsl; - - dsl::upstairs_repair_notification - .filter(dsl::region_id.eq(new_region_id)) - .filter( - dsl::notification_type - .eq(UpstairsRepairNotificationType::Started), - ) - .order_by(dsl::time.desc()) - .limit(1) - .first_async(&conn) - .await - .optional() - }) + .most_recent_started_repair_notification(opctx, new_region_id) .await?; if let Some(repair) = maybe_repair { let maybe_repair_progress: Option = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::upstairs_repair_progress::dsl; - - dsl::upstairs_repair_progress - .filter(dsl::repair_id.eq(repair.repair_id)) - .order_by(dsl::time.desc()) - .limit(1) - .first_async(&conn) - .await - .optional() - }) + .most_recent_repair_progress(opctx, repair.repair_id.into()) .await?; if let Some(repair_progress) = maybe_repair_progress { @@ -1872,17 +1769,7 @@ async fn cmd_db_region_replacement_info( // Find the steps that the driver saga has committed to the DB. let steps: Vec = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::region_replacement_step::dsl; - - dsl::region_replacement_step - .filter(dsl::replacement_id.eq(args.replacement_id)) - .order_by(dsl::step_time.asc()) - .load_async(&conn) - .await - }) + .region_replacement_request_steps(opctx, args.replacement_id) .await?; #[derive(Tabled)] @@ -1938,7 +1825,6 @@ async fn cmd_db_region_replacement_info( async fn cmd_db_region_replacement_request( opctx: &OpContext, datastore: &DataStore, - _fetch_opts: &DbFetchOptions, args: &RegionReplacementRequestArgs, _destruction_token: DestructiveOperationToken, ) -> Result<(), anyhow::Error> { diff --git a/nexus/db-queries/src/db/datastore/region_replacement.rs b/nexus/db-queries/src/db/datastore/region_replacement.rs index d12d123e7e..56e73d2b2c 100644 --- a/nexus/db-queries/src/db/datastore/region_replacement.rs +++ b/nexus/db-queries/src/db/datastore/region_replacement.rs @@ -522,6 +522,24 @@ impl DataStore { .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } + /// Return all steps for a region replacement request + pub async fn region_replacement_request_steps( + &self, + opctx: &OpContext, + id: Uuid, + ) -> Result, Error> { + use db::schema::region_replacement_step::dsl; + + dsl::region_replacement_step + .filter(dsl::replacement_id.eq(id)) + .order_by(dsl::step_time.desc()) + .get_results_async::( + &*self.pool_connection_authorized(opctx).await?, + ) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + /// Record a step taken to drive a region replacement forward pub async fn add_region_replacement_request_step( &self, diff --git a/nexus/db-queries/src/db/datastore/volume.rs b/nexus/db-queries/src/db/datastore/volume.rs index a7b9273aa8..1230c7cd68 100644 --- a/nexus/db-queries/src/db/datastore/volume.rs +++ b/nexus/db-queries/src/db/datastore/volume.rs @@ -1428,6 +1428,73 @@ impl DataStore { Ok(()) } + + /// For a downstairs being repaired, find the most recent repair + /// notification + pub async fn most_recent_started_repair_notification( + &self, + opctx: &OpContext, + region_id: Uuid, + ) -> Result, Error> { + let conn = self.pool_connection_authorized(opctx).await?; + + use db::schema::upstairs_repair_notification::dsl; + + dsl::upstairs_repair_notification + .filter(dsl::region_id.eq(region_id)) + .filter( + dsl::notification_type + .eq(UpstairsRepairNotificationType::Started), + ) + .order_by(dsl::time.desc()) + .limit(1) + .first_async(&*conn) + .await + .optional() + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// For a downstairs being repaired, return all related repair notifications + /// in order of notification time. + pub async fn repair_notifications_for_region( + &self, + opctx: &OpContext, + region_id: Uuid, + ) -> Result, Error> { + let conn = self.pool_connection_authorized(opctx).await?; + + use db::schema::upstairs_repair_notification::dsl; + + dsl::upstairs_repair_notification + .filter(dsl::region_id.eq(region_id)) + .order_by(dsl::time.asc()) + .select(UpstairsRepairNotification::as_select()) + .get_results_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// For a repair ID, find the most recent progress notification + pub async fn most_recent_repair_progress( + &self, + opctx: &OpContext, + repair_id: TypedUuid, + ) -> Result, Error> { + let conn = self.pool_connection_authorized(opctx).await?; + + use db::schema::upstairs_repair_progress::dsl; + + dsl::upstairs_repair_progress + .filter( + dsl::repair_id.eq(nexus_db_model::to_db_typed_uuid(repair_id)), + ) + .order_by(dsl::time.desc()) + .limit(1) + .first_async(&*conn) + .await + .optional() + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } } #[derive(Default, Clone, Debug, Serialize, Deserialize)]