diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs index 6cba2df75a..be4e1e8696 100644 --- a/dev-tools/omdb/src/bin/omdb/db.rs +++ b/dev-tools/omdb/src/bin/omdb/db.rs @@ -76,7 +76,6 @@ use nexus_db_model::SnapshotState; use nexus_db_model::SwCaboose; use nexus_db_model::SwRotPage; use nexus_db_model::UpstairsRepairNotification; -use nexus_db_model::UpstairsRepairNotificationType; use nexus_db_model::UpstairsRepairProgress; use nexus_db_model::Vmm; use nexus_db_model::Volume; @@ -603,7 +602,6 @@ impl DbArgs { command: RegionReplacementCommands::List(args), }) => { cmd_db_region_replacement_list( - &opctx, &datastore, &self.fetch_opts, args, @@ -623,24 +621,14 @@ impl DbArgs { DbCommands::RegionReplacement(RegionReplacementArgs { command: RegionReplacementCommands::Info(args), }) => { - cmd_db_region_replacement_info( - &opctx, - &datastore, - &self.fetch_opts, - args, - ) - .await + cmd_db_region_replacement_info(&opctx, &datastore, args).await } DbCommands::RegionReplacement(RegionReplacementArgs { command: RegionReplacementCommands::Request(args), }) => { let token = omdb.check_allow_destructive()?; cmd_db_region_replacement_request( - &opctx, - &datastore, - &self.fetch_opts, - args, - token, + &opctx, &datastore, args, token, ) .await } @@ -1530,7 +1518,6 @@ async fn cmd_db_snapshot_info( /// List all region replacement requests async fn cmd_db_region_replacement_list( - _opctx: &OpContext, datastore: &DataStore, fetch_opts: &DbFetchOptions, args: &RegionReplacementListArgs, @@ -1538,51 +1525,49 @@ async fn cmd_db_region_replacement_list( let ctx = || "listing region replacement requests".to_string(); let limit = fetch_opts.fetch_limit; - let requests: Vec = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::region_replacement::dsl; - - match (args.state, args.after) { - (Some(state), Some(after)) => { - dsl::region_replacement - .filter(dsl::replacement_state.eq(state)) - .filter(dsl::request_time.gt(after)) - .limit(i64::from(u32::from(limit))) - .select(RegionReplacement::as_select()) - .get_results_async(&conn) - .await - } + let requests: Vec = { + let conn = datastore.pool_connection_for_tests().await?; - (Some(state), None) => { - dsl::region_replacement - .filter(dsl::replacement_state.eq(state)) - .limit(i64::from(u32::from(limit))) - .select(RegionReplacement::as_select()) - .get_results_async(&conn) - .await - } + use db::schema::region_replacement::dsl; - (None, Some(after)) => { - dsl::region_replacement - .filter(dsl::request_time.gt(after)) - .limit(i64::from(u32::from(limit))) - .select(RegionReplacement::as_select()) - .get_results_async(&conn) - .await - } + match (args.state, args.after) { + (Some(state), Some(after)) => { + dsl::region_replacement + .filter(dsl::replacement_state.eq(state)) + .filter(dsl::request_time.gt(after)) + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&*conn) + .await? + } - (None, None) => { - dsl::region_replacement - .limit(i64::from(u32::from(limit))) - .select(RegionReplacement::as_select()) - .get_results_async(&conn) - .await - } + (Some(state), None) => { + dsl::region_replacement + .filter(dsl::replacement_state.eq(state)) + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&*conn) + .await? + } + + (None, Some(after)) => { + dsl::region_replacement + .filter(dsl::request_time.gt(after)) + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&*conn) + .await? } - }) - .await?; + + (None, None) => { + dsl::region_replacement + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&*conn) + .await? + } + } + }; check_limit(&requests, limit, ctx); @@ -1617,29 +1602,25 @@ async fn cmd_db_region_replacement_list( /// Display all non-complete region replacements async fn cmd_db_region_replacement_status( - _opctx: &OpContext, + opctx: &OpContext, datastore: &DataStore, fetch_opts: &DbFetchOptions, ) -> Result<(), anyhow::Error> { let ctx = || "listing region replacement requests".to_string(); let limit = fetch_opts.fetch_limit; - let requests: Vec = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::region_replacement::dsl; + let requests: Vec = { + let conn = datastore.pool_connection_for_tests().await?; - dsl::region_replacement - .filter( - dsl::replacement_state.ne(RegionReplacementState::Complete), - ) - .limit(i64::from(u32::from(limit))) - .select(RegionReplacement::as_select()) - .get_results_async(&conn) - .await - }) - .await?; + use db::schema::region_replacement::dsl; + + dsl::region_replacement + .filter(dsl::replacement_state.ne(RegionReplacementState::Complete)) + .limit(i64::from(u32::from(limit))) + .select(RegionReplacement::as_select()) + .get_results_async(&*conn) + .await? + }; check_limit(&requests, limit, ctx); @@ -1658,41 +1639,16 @@ async fn cmd_db_region_replacement_status( // downstairs being repaired is a "new" region id. This will give us // the most recent repair id. let maybe_repair: Option = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::upstairs_repair_notification::dsl; - - dsl::upstairs_repair_notification - .filter(dsl::region_id.eq(new_region_id)) - .filter( - dsl::notification_type - .eq(UpstairsRepairNotificationType::Started), - ) - .order_by(dsl::time.desc()) - .limit(1) - .first_async(&conn) - .await - .optional() - }) + .most_recent_started_repair_notification(opctx, new_region_id) .await?; if let Some(repair) = maybe_repair { let maybe_repair_progress: Option = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::upstairs_repair_progress::dsl; - - dsl::upstairs_repair_progress - .filter(dsl::repair_id.eq(repair.repair_id)) - .order_by(dsl::time.desc()) - .limit(1) - .first_async(&conn) - .await - .optional() - }) + .most_recent_repair_progress( + opctx, + repair.repair_id.into(), + ) .await?; if let Some(repair_progress) = maybe_repair_progress { @@ -1720,34 +1676,14 @@ async fn cmd_db_region_replacement_status( /// Show details for a single region replacement async fn cmd_db_region_replacement_info( - _opctx: &OpContext, + opctx: &OpContext, datastore: &DataStore, - _fetch_opts: &DbFetchOptions, args: &RegionReplacementInfoArgs, ) -> Result<(), anyhow::Error> { - let request: Option = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::region_replacement::dsl; - - dsl::region_replacement - .filter(dsl::id.eq(args.replacement_id)) - .limit(i64::from(1)) - .select(RegionReplacement::as_select()) - .first_async(&conn) - .await - .optional() - }) + let request = datastore + .get_region_replacement_request_by_id(opctx, args.replacement_id) .await?; - let Some(request) = request else { - bail!( - "no region replacement request with id {} found", - args.replacement_id, - ); - }; - // Show details println!(" started: {}", request.request_time); println!(" state: {:?}", request.replacement_state); @@ -1758,18 +1694,7 @@ async fn cmd_db_region_replacement_info( if let Some(new_region_id) = request.new_region_id { // Find all related notifications let notifications: Vec = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::upstairs_repair_notification::dsl; - - dsl::upstairs_repair_notification - .filter(dsl::region_id.eq(new_region_id)) - .order_by(dsl::time.asc()) - .select(UpstairsRepairNotification::as_select()) - .get_results_async(&conn) - .await - }) + .repair_notifications_for_region(opctx, new_region_id) .await?; #[derive(Tabled)] @@ -1816,41 +1741,13 @@ async fn cmd_db_region_replacement_info( // and use that to search for progress. let maybe_repair: Option = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::upstairs_repair_notification::dsl; - - dsl::upstairs_repair_notification - .filter(dsl::region_id.eq(new_region_id)) - .filter( - dsl::notification_type - .eq(UpstairsRepairNotificationType::Started), - ) - .order_by(dsl::time.desc()) - .limit(1) - .first_async(&conn) - .await - .optional() - }) + .most_recent_started_repair_notification(opctx, new_region_id) .await?; if let Some(repair) = maybe_repair { let maybe_repair_progress: Option = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::upstairs_repair_progress::dsl; - - dsl::upstairs_repair_progress - .filter(dsl::repair_id.eq(repair.repair_id)) - .order_by(dsl::time.desc()) - .limit(1) - .first_async(&conn) - .await - .optional() - }) + .most_recent_repair_progress(opctx, repair.repair_id.into()) .await?; if let Some(repair_progress) = maybe_repair_progress { @@ -1872,17 +1769,7 @@ async fn cmd_db_region_replacement_info( // Find the steps that the driver saga has committed to the DB. let steps: Vec = datastore - .pool_connection_for_tests() - .await? - .transaction_async(|conn| async move { - use db::schema::region_replacement_step::dsl; - - dsl::region_replacement_step - .filter(dsl::replacement_id.eq(args.replacement_id)) - .order_by(dsl::step_time.asc()) - .load_async(&conn) - .await - }) + .region_replacement_request_steps(opctx, args.replacement_id) .await?; #[derive(Tabled)] @@ -1938,7 +1825,6 @@ async fn cmd_db_region_replacement_info( async fn cmd_db_region_replacement_request( opctx: &OpContext, datastore: &DataStore, - _fetch_opts: &DbFetchOptions, args: &RegionReplacementRequestArgs, _destruction_token: DestructiveOperationToken, ) -> Result<(), anyhow::Error> { diff --git a/nexus/db-queries/src/db/datastore/region_replacement.rs b/nexus/db-queries/src/db/datastore/region_replacement.rs index d12d123e7e..56e73d2b2c 100644 --- a/nexus/db-queries/src/db/datastore/region_replacement.rs +++ b/nexus/db-queries/src/db/datastore/region_replacement.rs @@ -522,6 +522,24 @@ impl DataStore { .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } + /// Return all steps for a region replacement request + pub async fn region_replacement_request_steps( + &self, + opctx: &OpContext, + id: Uuid, + ) -> Result, Error> { + use db::schema::region_replacement_step::dsl; + + dsl::region_replacement_step + .filter(dsl::replacement_id.eq(id)) + .order_by(dsl::step_time.desc()) + .get_results_async::( + &*self.pool_connection_authorized(opctx).await?, + ) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + /// Record a step taken to drive a region replacement forward pub async fn add_region_replacement_request_step( &self, diff --git a/nexus/db-queries/src/db/datastore/volume.rs b/nexus/db-queries/src/db/datastore/volume.rs index a7b9273aa8..1230c7cd68 100644 --- a/nexus/db-queries/src/db/datastore/volume.rs +++ b/nexus/db-queries/src/db/datastore/volume.rs @@ -1428,6 +1428,73 @@ impl DataStore { Ok(()) } + + /// For a downstairs being repaired, find the most recent repair + /// notification + pub async fn most_recent_started_repair_notification( + &self, + opctx: &OpContext, + region_id: Uuid, + ) -> Result, Error> { + let conn = self.pool_connection_authorized(opctx).await?; + + use db::schema::upstairs_repair_notification::dsl; + + dsl::upstairs_repair_notification + .filter(dsl::region_id.eq(region_id)) + .filter( + dsl::notification_type + .eq(UpstairsRepairNotificationType::Started), + ) + .order_by(dsl::time.desc()) + .limit(1) + .first_async(&*conn) + .await + .optional() + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// For a downstairs being repaired, return all related repair notifications + /// in order of notification time. + pub async fn repair_notifications_for_region( + &self, + opctx: &OpContext, + region_id: Uuid, + ) -> Result, Error> { + let conn = self.pool_connection_authorized(opctx).await?; + + use db::schema::upstairs_repair_notification::dsl; + + dsl::upstairs_repair_notification + .filter(dsl::region_id.eq(region_id)) + .order_by(dsl::time.asc()) + .select(UpstairsRepairNotification::as_select()) + .get_results_async(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// For a repair ID, find the most recent progress notification + pub async fn most_recent_repair_progress( + &self, + opctx: &OpContext, + repair_id: TypedUuid, + ) -> Result, Error> { + let conn = self.pool_connection_authorized(opctx).await?; + + use db::schema::upstairs_repair_progress::dsl; + + dsl::upstairs_repair_progress + .filter( + dsl::repair_id.eq(nexus_db_model::to_db_typed_uuid(repair_id)), + ) + .order_by(dsl::time.desc()) + .limit(1) + .first_async(&*conn) + .await + .optional() + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } } #[derive(Default, Clone, Debug, Serialize, Deserialize)]