Skip to content

Commit

Permalink
manually fmt long strings, then cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
jmpesp committed Jun 6, 2024
1 parent 62f0aa8 commit 5f5d732
Showing 1 changed file with 86 additions and 36 deletions.
122 changes: 86 additions & 36 deletions nexus/src/app/background/region_replacement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ use crate::app::sagas;
use crate::app::RegionAllocationStrategy;
use futures::future::BoxFuture;
use futures::FutureExt;
use nexus_db_model::RegionReplacement;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db::DataStore;
use omicron_uuid_kinds::GenericUuid;
use omicron_uuid_kinds::TypedUuid;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::Sender;

pub struct RegionReplacementDetector {
Expand All @@ -36,6 +38,25 @@ impl RegionReplacementDetector {
) -> Self {
RegionReplacementDetector { datastore, saga_request }
}

async fn send_start_request(
&self,
serialized_authn: authn::saga::Serialized,
request: RegionReplacement,
) -> Result<(), SendError<sagas::SagaRequest>> {
let saga_request = sagas::SagaRequest::RegionReplacementStart {
params: sagas::region_replacement_start::Params {
serialized_authn,
request,
allocation_strategy:
RegionAllocationStrategy::RandomWithDistinctSleds {
seed: None,
},
},
};

self.saga_request.send(saga_request).await
}
}

impl BackgroundTask for RegionReplacementDetector {
Expand All @@ -51,45 +72,65 @@ impl BackgroundTask for RegionReplacementDetector {
let mut err = 0;

// Find regions on expunged physical disks
let regions_to_be_replaced =
match self.datastore.find_regions_on_expunged_physical_disks(
opctx
).await {
Ok(regions) => regions,
let regions_to_be_replaced = match self
.datastore
.find_regions_on_expunged_physical_disks(opctx)
.await
{
Ok(regions) => regions,

Err(e) => {
error!(&log, "find_regions_on_expunged_physical_disks failed: {e}");
err += 1;
Err(e) => {
error!(
&log,
"find_regions_on_expunged_physical_disks failed: \
{e}"
);
err += 1;

return json!({
"region_replacement_started_ok": ok,
"region_replacement_started_err": err,
});
}
};
return json!({
"region_replacement_started_ok": ok,
"region_replacement_started_err": err,
});
}
};

// Then create replacement requests for those if one doesn't exist
// yet.
for region in regions_to_be_replaced {
let maybe_request =
match self.datastore.lookup_region_replacement_request_by_old_region_id(
let maybe_request = match self
.datastore
.lookup_region_replacement_request_by_old_region_id(
opctx,
TypedUuid::from_untyped_uuid(region.id()),
).await {
Ok(v) => v,
)
.await
{
Ok(v) => v,

Err(e) => {
error!(&log, "error looking for existing region replacement requests for {}: {e}", region.id());
continue;
}
};
Err(e) => {
error!(
&log,
"error looking for existing region \
replacement requests for {}: {e}",
region.id(),
);
continue;
}
};

if maybe_request.is_none() {
match self.datastore.create_region_replacement_request_for_region(opctx, &region).await {
match self
.datastore
.create_region_replacement_request_for_region(
opctx, &region,
)
.await
{
Ok(request_id) => {
info!(
&log,
"added region replacement request {request_id} for {} volume {}",
"added region replacement request \
{request_id} for {} volume {}",
region.id(),
region.volume_id(),
);
Expand All @@ -98,7 +139,8 @@ impl BackgroundTask for RegionReplacementDetector {
Err(e) => {
error!(
&log,
"error adding region replacement request for region {} volume id {}: {e}",
"error adding region replacement request for \
region {} volume id {}: {e}",
region.id(),
region.volume_id(),
);
Expand All @@ -108,33 +150,41 @@ impl BackgroundTask for RegionReplacementDetector {
}
}

// Next, for each region replacement request in state "Requested", run the start saga.
match self.datastore.get_requested_region_replacements(opctx).await {
// Next, for each region replacement request in state "Requested",
// run the start saga.
match self.datastore.get_requested_region_replacements(opctx).await
{
Ok(requests) => {
for request in requests {
let result = self.saga_request.send(sagas::SagaRequest::RegionReplacementStart {
params: sagas::region_replacement_start::Params {
serialized_authn: authn::saga::Serialized::for_opctx(opctx),
let result = self
.send_start_request(
authn::saga::Serialized::for_opctx(opctx),
request,
allocation_strategy: RegionAllocationStrategy::RandomWithDistinctSleds { seed: None },
},
}).await;
)
.await;

match result {
Ok(()) => {
ok += 1;
}

Err(e) => {
error!(&log, "sending region replacement request failed: {e}");
error!(
&log,
"sending region replacement start request \
failed: {e}",
);
err += 1;
}
};
}
}

Err(e) => {
error!(&log, "query for region replacement requests failed: {e}");
error!(
&log,
"query for region replacement requests failed: {e}",
);
}
}

Expand Down

0 comments on commit 5f5d732

Please sign in to comment.