Skip to content

Commit

Permalink
Wait for "running snapshots" to be running! (#6216)
Browse files Browse the repository at this point in the history
The snapshot create saga doesn't actually wait for running snapshots to
be created, it simply requests them! This has been mostly fine up until
now, but Nexus should actually wait for the state of the running
snapshot to transition from Requested to Created, meaning that the
Crucible agent has taken all the steps to actually start the appropriate
read-only downstairs.
  • Loading branch information
jmpesp authored Aug 6, 2024
1 parent 5beedcf commit 490fba7
Show file tree
Hide file tree
Showing 2 changed files with 305 additions and 55 deletions.
294 changes: 294 additions & 0 deletions nexus/src/app/crucible.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crucible_agent_client::types::CreateRegion;
use crucible_agent_client::types::GetSnapshotResponse;
use crucible_agent_client::types::Region;
use crucible_agent_client::types::RegionId;
use crucible_agent_client::types::RunningSnapshot;
use crucible_agent_client::types::Snapshot;
use crucible_agent_client::types::State as RegionState;
use crucible_agent_client::Client as CrucibleAgentClient;
use futures::StreamExt;
Expand Down Expand Up @@ -262,6 +264,193 @@ impl super::Nexus {
Ok(returned_region)
}

/// Ensure that a running snapshot for a region snapshot exists and is
/// running.
async fn ensure_crucible_running_snapshot_impl(
&self,
log: &Logger,
dataset: &db::model::Dataset,
region_id: Uuid,
snapshot_id: Uuid,
) -> Result<(Region, Snapshot, RunningSnapshot), Error> {
// Validate with the Crucible agent that both the underlying region and
// snapshot exist

info!(
log,
"contacting crucible agent to confirm region exists";
"dataset" => ?dataset.id(),
"region" => ?region_id,
);

let region = match self
.maybe_get_crucible_region(log, dataset, region_id)
.await?
{
Some(region) => {
info!(
log,
"confirmed the region exists";
"dataset" => ?dataset.id(),
"region" => ?region,
);

region
}

None => {
error!(
log,
"region does not exist!";
"dataset" => ?dataset.id(),
"region" => ?region_id,
);

return Err(Error::invalid_request(format!(
"dataset {:?} region {:?} does not exist!",
dataset.id(),
region_id,
)));
}
};

info!(
log,
"contacting crucible agent to confirm snapshot exists";
"dataset" => ?dataset.id(),
"region" => ?region_id,
"snapshot" => ?snapshot_id,
);

let snapshot = match self
.maybe_get_crucible_snapshot(log, dataset, region_id, snapshot_id)
.await?
{
Some(snapshot) => {
info!(
log,
"confirmed the snapshot exists";
"dataset" => ?dataset.id(),
"region" => ?region.id,
"snapshot" => ?snapshot,
);

snapshot
}

None => {
// snapshot does not exist!
error!(
log,
"snapshot does not exist!";
"dataset" => ?dataset.id(),
"region" => ?region_id,
"snapshot" => ?snapshot_id,
);

return Err(Error::invalid_request(format!(
"dataset {:?} region {:?} snapshot {:?} does not exist!",
dataset.id(),
region_id,
snapshot_id,
)));
}
};

let client = self.crucible_agent_client_for_dataset(dataset)?;
let dataset_id = dataset.id();

// Request the running snapshot start, polling until the state
// transitions from Requested to Created

let create_running_snapshot = || async {
let running_snapshot = match ProgenitorOperationRetry::new(
|| async {
client
.region_run_snapshot(
&RegionId(region_id.to_string()),
&snapshot_id.to_string(),
)
.await
},
|| async { self.crucible_agent_gone_check(dataset_id).await },
)
.run(log)
.await
{
Ok(v) => Ok(v),

Err(e) => {
error!(
log,
"region_run_snapshot saw {:?}",
e;
"dataset" => %dataset_id,
"region" => %region_id,
"snapshot" => %snapshot_id,
);

// Return an error if Nexus is unable to create the
// requested running snapshot
Err(BackoffError::Permanent(WaitError::Permanent(
into_external_error(e),
)))
}
}?;

match running_snapshot.state {
RegionState::Requested => {
Err(BackoffError::transient(WaitError::Transient(anyhow!(
"Running snapshot creation in progress"
))))
}

RegionState::Created => Ok(running_snapshot),

_ => Err(BackoffError::Permanent(WaitError::Permanent(
Error::internal_error(&format!(
"Failed to create running snapshot, unexpected \
state: {:?}",
region.state
)),
))),
}
};

let log_create_failure = |_, delay| {
warn!(
log,
"Running snapshot requested, not yet created. Retrying in {:?}",
delay;
"dataset" => %dataset.id(),
"region" => %region_id,
"snapshot" => %snapshot_id,
);
};

let running_snapshot = backoff::retry_notify(
backoff::retry_policy_internal_service(),
create_running_snapshot,
log_create_failure,
)
.await
.map_err(|e| match e {
WaitError::Transient(e) => {
// The backoff crate can be configured with a maximum elapsed
// time before giving up, which means that Transient could be
// returned here. Our current policies do **not** set this
// though.
Error::internal_error(&e.to_string())
}

WaitError::Permanent(e) => e,
})?;

let running_snapshot = running_snapshot.into_inner();

Ok((region, snapshot, running_snapshot))
}

/// Returns a Ok(Some(Region)) if a region with id {region_id} exists,
/// Ok(None) if it does not (a 404 was seen), and Err otherwise.
async fn maybe_get_crucible_region(
Expand Down Expand Up @@ -306,6 +495,57 @@ impl super::Nexus {
}
}

/// Returns a Ok(Some(Snapshot)) if a snapshot exists, Ok(None) if it does
/// not (a 404 was seen), and Err otherwise.
async fn maybe_get_crucible_snapshot(
&self,
log: &Logger,
dataset: &db::model::Dataset,
region_id: Uuid,
snapshot_id: Uuid,
) -> Result<Option<Snapshot>, Error> {
let client = self.crucible_agent_client_for_dataset(dataset)?;
let dataset_id = dataset.id();

let result = ProgenitorOperationRetry::new(
|| async {
client
.region_get_snapshot(
&RegionId(region_id.to_string()),
&snapshot_id.to_string(),
)
.await
},
|| async { self.crucible_agent_gone_check(dataset_id).await },
)
.run(log)
.await;

match result {
Ok(v) => Ok(Some(v.into_inner())),

Err(e) => {
if e.is_not_found() {
// A 404 Not Found is ok for this function, just return None
Ok(None)
} else {
error!(
log,
"region_get_snapshot saw {:?}",
e;
"dataset_id" => %dataset_id,
"region_id" => %region_id,
"snapshot_id" => %snapshot_id,
);

// Return an error if Nexus is unable to query the dataset's
// agent for the requested snapshot
Err(into_external_error(e))
}
}
}
}

async fn get_crucible_region_snapshots(
&self,
log: &Logger,
Expand Down Expand Up @@ -1022,4 +1262,58 @@ impl super::Nexus {

Ok(())
}

/// Ensure that a Crucible "running snapshot" is created.
pub async fn ensure_crucible_running_snapshot(
&self,
log: &Logger,
dataset: &db::model::Dataset,
region_id: Uuid,
snapshot_id: Uuid,
) -> Result<(Region, Snapshot, RunningSnapshot), Error> {
self.ensure_crucible_running_snapshot_impl(
log,
dataset,
region_id,
snapshot_id,
)
.await
}

/// Given a list of datasets and region snapshots, send POST calls to the
/// datasets corresponding Crucible Agent for each running read-only
/// downstairs corresponding to the snapshot.
pub async fn ensure_crucible_running_snapshots(
&self,
log: &Logger,
datasets_and_snapshots: Vec<(
db::model::Dataset,
db::model::RegionSnapshot,
)>,
) -> Result<Vec<(Region, Snapshot, RunningSnapshot)>, Error> {
let request_count = datasets_and_snapshots.len();
if request_count == 0 {
return Ok(vec![]);
}

futures::stream::iter(datasets_and_snapshots)
.map(|(dataset, region_snapshot)| async move {
self.ensure_crucible_running_snapshot_impl(
&log,
&dataset,
region_snapshot.region_id,
region_snapshot.snapshot_id,
)
.await
})
// Execute the requests concurrently.
.buffer_unordered(std::cmp::min(
request_count,
MAX_CONCURRENT_REGION_REQUESTS,
))
.collect::<Vec<Result<_, Error>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
}
}
Loading

0 comments on commit 490fba7

Please sign in to comment.