diff --git a/nexus/src/app/crucible.rs b/nexus/src/app/crucible.rs index fdd67da617..b8fca26c14 100644 --- a/nexus/src/app/crucible.rs +++ b/nexus/src/app/crucible.rs @@ -11,6 +11,8 @@ use crucible_agent_client::types::CreateRegion; use crucible_agent_client::types::GetSnapshotResponse; use crucible_agent_client::types::Region; use crucible_agent_client::types::RegionId; +use crucible_agent_client::types::RunningSnapshot; +use crucible_agent_client::types::Snapshot; use crucible_agent_client::types::State as RegionState; use crucible_agent_client::Client as CrucibleAgentClient; use futures::StreamExt; @@ -262,6 +264,193 @@ impl super::Nexus { Ok(returned_region) } + /// Ensure that a running snapshot for a region snapshot exists and is + /// running. + async fn ensure_crucible_running_snapshot_impl( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + snapshot_id: Uuid, + ) -> Result<(Region, Snapshot, RunningSnapshot), Error> { + // Validate with the Crucible agent that both the underlying region and + // snapshot exist + + info!( + log, + "contacting crucible agent to confirm region exists"; + "dataset" => ?dataset.id(), + "region" => ?region_id, + ); + + let region = match self + .maybe_get_crucible_region(log, dataset, region_id) + .await? + { + Some(region) => { + info!( + log, + "confirmed the region exists"; + "dataset" => ?dataset.id(), + "region" => ?region, + ); + + region + } + + None => { + error!( + log, + "region does not exist!"; + "dataset" => ?dataset.id(), + "region" => ?region_id, + ); + + return Err(Error::invalid_request(format!( + "dataset {:?} region {:?} does not exist!", + dataset.id(), + region_id, + ))); + } + }; + + info!( + log, + "contacting crucible agent to confirm snapshot exists"; + "dataset" => ?dataset.id(), + "region" => ?region_id, + "snapshot" => ?snapshot_id, + ); + + let snapshot = match self + .maybe_get_crucible_snapshot(log, dataset, region_id, snapshot_id) + .await? + { + Some(snapshot) => { + info!( + log, + "confirmed the snapshot exists"; + "dataset" => ?dataset.id(), + "region" => ?region.id, + "snapshot" => ?snapshot, + ); + + snapshot + } + + None => { + // snapshot does not exist! + error!( + log, + "snapshot does not exist!"; + "dataset" => ?dataset.id(), + "region" => ?region_id, + "snapshot" => ?snapshot_id, + ); + + return Err(Error::invalid_request(format!( + "dataset {:?} region {:?} snapshot {:?} does not exist!", + dataset.id(), + region_id, + snapshot_id, + ))); + } + }; + + let client = self.crucible_agent_client_for_dataset(dataset)?; + let dataset_id = dataset.id(); + + // Request the running snapshot start, polling until the state + // transitions from Requested to Created + + let create_running_snapshot = || async { + let running_snapshot = match ProgenitorOperationRetry::new( + || async { + client + .region_run_snapshot( + &RegionId(region_id.to_string()), + &snapshot_id.to_string(), + ) + .await + }, + || async { self.crucible_agent_gone_check(dataset_id).await }, + ) + .run(log) + .await + { + Ok(v) => Ok(v), + + Err(e) => { + error!( + log, + "region_run_snapshot saw {:?}", + e; + "dataset" => %dataset_id, + "region" => %region_id, + "snapshot" => %snapshot_id, + ); + + // Return an error if Nexus is unable to create the + // requested running snapshot + Err(BackoffError::Permanent(WaitError::Permanent( + into_external_error(e), + ))) + } + }?; + + match running_snapshot.state { + RegionState::Requested => { + Err(BackoffError::transient(WaitError::Transient(anyhow!( + "Running snapshot creation in progress" + )))) + } + + RegionState::Created => Ok(running_snapshot), + + _ => Err(BackoffError::Permanent(WaitError::Permanent( + Error::internal_error(&format!( + "Failed to create running snapshot, unexpected \ + state: {:?}", + region.state + )), + ))), + } + }; + + let log_create_failure = |_, delay| { + warn!( + log, + "Running snapshot requested, not yet created. Retrying in {:?}", + delay; + "dataset" => %dataset.id(), + "region" => %region_id, + "snapshot" => %snapshot_id, + ); + }; + + let running_snapshot = backoff::retry_notify( + backoff::retry_policy_internal_service(), + create_running_snapshot, + log_create_failure, + ) + .await + .map_err(|e| match e { + WaitError::Transient(e) => { + // The backoff crate can be configured with a maximum elapsed + // time before giving up, which means that Transient could be + // returned here. Our current policies do **not** set this + // though. + Error::internal_error(&e.to_string()) + } + + WaitError::Permanent(e) => e, + })?; + + let running_snapshot = running_snapshot.into_inner(); + + Ok((region, snapshot, running_snapshot)) + } + /// Returns a Ok(Some(Region)) if a region with id {region_id} exists, /// Ok(None) if it does not (a 404 was seen), and Err otherwise. async fn maybe_get_crucible_region( @@ -306,6 +495,57 @@ impl super::Nexus { } } + /// Returns a Ok(Some(Snapshot)) if a snapshot exists, Ok(None) if it does + /// not (a 404 was seen), and Err otherwise. + async fn maybe_get_crucible_snapshot( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + snapshot_id: Uuid, + ) -> Result, Error> { + let client = self.crucible_agent_client_for_dataset(dataset)?; + let dataset_id = dataset.id(); + + let result = ProgenitorOperationRetry::new( + || async { + client + .region_get_snapshot( + &RegionId(region_id.to_string()), + &snapshot_id.to_string(), + ) + .await + }, + || async { self.crucible_agent_gone_check(dataset_id).await }, + ) + .run(log) + .await; + + match result { + Ok(v) => Ok(Some(v.into_inner())), + + Err(e) => { + if e.is_not_found() { + // A 404 Not Found is ok for this function, just return None + Ok(None) + } else { + error!( + log, + "region_get_snapshot saw {:?}", + e; + "dataset_id" => %dataset_id, + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + + // Return an error if Nexus is unable to query the dataset's + // agent for the requested snapshot + Err(into_external_error(e)) + } + } + } + } + async fn get_crucible_region_snapshots( &self, log: &Logger, @@ -1022,4 +1262,58 @@ impl super::Nexus { Ok(()) } + + /// Ensure that a Crucible "running snapshot" is created. + pub async fn ensure_crucible_running_snapshot( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + snapshot_id: Uuid, + ) -> Result<(Region, Snapshot, RunningSnapshot), Error> { + self.ensure_crucible_running_snapshot_impl( + log, + dataset, + region_id, + snapshot_id, + ) + .await + } + + /// Given a list of datasets and region snapshots, send POST calls to the + /// datasets corresponding Crucible Agent for each running read-only + /// downstairs corresponding to the snapshot. + pub async fn ensure_crucible_running_snapshots( + &self, + log: &Logger, + datasets_and_snapshots: Vec<( + db::model::Dataset, + db::model::RegionSnapshot, + )>, + ) -> Result, Error> { + let request_count = datasets_and_snapshots.len(); + if request_count == 0 { + return Ok(vec![]); + } + + futures::stream::iter(datasets_and_snapshots) + .map(|(dataset, region_snapshot)| async move { + self.ensure_crucible_running_snapshot_impl( + &log, + &dataset, + region_snapshot.region_id, + region_snapshot.snapshot_id, + ) + .await + }) + // Execute the requests concurrently. + .buffer_unordered(std::cmp::min( + request_count, + MAX_CONCURRENT_REGION_REQUESTS, + )) + .collect::>>() + .await + .into_iter() + .collect::, _>>() + } } diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs index 1d6903fa61..76a82e7491 100644 --- a/nexus/src/app/sagas/snapshot_create.rs +++ b/nexus/src/app/sagas/snapshot_create.rs @@ -100,7 +100,6 @@ use crate::app::sagas::declare_saga_actions; use crate::app::{authn, authz, db}; use crate::external_api::params; use anyhow::anyhow; -use crucible_agent_client::{types::RegionId, Client as CrucibleAgentClient}; use nexus_db_model::Generation; use nexus_db_queries::db::identity::{Asset, Resource}; use nexus_db_queries::db::lookup::LookupPath; @@ -1363,60 +1362,15 @@ async fn ssc_start_running_snapshot( ))); }; - // Create a Crucible agent client - let url = format!("http://{}", dataset_addr); - let client = CrucibleAgentClient::new(&url); - - info!( - log, - "contacting crucible agent to confirm region exists"; - "dataset" => ?dataset, - "region" => ?region, - "url" => url, - ); - - // Validate with the Crucible agent that the snapshot exists - let crucible_region = retry_until_known_result(log, || async { - client.region_get(&RegionId(region.id().to_string())).await - }) - .await - .map_err(|e| e.to_string()) - .map_err(ActionError::action_failed)?; - - info!( - log, - "confirmed the region exists with crucible agent"; - "crucible region" => ?crucible_region - ); - - let crucible_snapshot = retry_until_known_result(log, || async { - client - .region_get_snapshot( - &RegionId(region.id().to_string()), - &snapshot_id.to_string(), - ) - .await - }) - .await - .map_err(|e| e.to_string()) - .map_err(ActionError::action_failed)?; - - info!( - log, - "successfully accessed crucible snapshot"; - "crucible snapshot" => ?crucible_snapshot - ); - // Start the snapshot running - let crucible_running_snapshot = - retry_until_known_result(log, || async { - client - .region_run_snapshot( - &RegionId(region.id().to_string()), - &snapshot_id.to_string(), - ) - .await - }) + let (crucible_region, _, crucible_running_snapshot) = osagactx + .nexus() + .ensure_crucible_running_snapshot( + &log, + &dataset, + region.id(), + snapshot_id, + ) .await .map_err(|e| e.to_string()) .map_err(ActionError::action_failed)?; @@ -1424,7 +1378,7 @@ async fn ssc_start_running_snapshot( info!( log, "successfully started running region snapshot"; - "crucible running snapshot" => ?crucible_running_snapshot + "running snapshot" => ?crucible_running_snapshot ); // Map from the region to the snapshot @@ -1437,6 +1391,7 @@ async fn ssc_start_running_snapshot( 0 ) ); + let snapshot_addr = format!( "{}", SocketAddrV6::new( @@ -1446,6 +1401,7 @@ async fn ssc_start_running_snapshot( 0 ) ); + info!(log, "map {} to {}", region_addr, snapshot_addr); map.insert(region_addr, snapshot_addr.clone());