WIP: lower parts
davepacheco committed Aug 2, 2024
1 parent b309809 commit 5bc8cba
Showing 2 changed files with 48 additions and 107 deletions.
94 changes: 13 additions & 81 deletions nexus/db-queries/src/db/datastore/saga.rs
@@ -13,7 +13,6 @@ use crate::db::model::Generation;
use crate::db::pagination::paginated;
use crate::db::pagination::paginated_multicolumn;
use crate::db::pagination::Paginator;
use crate::db::pool::DbConnection;
use crate::db::update_and_check::UpdateAndCheck;
use crate::db::update_and_check::UpdateStatus;
use async_bb8_diesel::AsyncRunQueryDsl;
@@ -25,23 +24,6 @@ use omicron_common::api::external::LookupType;
use omicron_common::api::external::ResourceType;
use std::ops::Add;

/// Reports the result of `sagas_reassign_sec_batched()`.
///
/// Callers need to know two things:
///
/// 1. Whether any sagas may have been re-assigned
/// (because they'll want to kick off saga recovery for them)
/// 2. Whether any errors were encountered
#[derive(Debug)]
pub(crate) enum Reassigned {
/// We successfully re-assigned all sagas that needed it
All { count: u32 },
/// We encountered an error and cannot tell how many sagas were re-assigned.
/// It was at least this many. (The count can be zero, but it's still
/// possible that some were re-assigned.)
AtLeast { count: u32, error: Error },
}

impl DataStore {
pub async fn saga_create(
&self,
@@ -228,63 +210,23 @@ impl DataStore {
Ok(events)
}

pub async fn sagas_reassign_sec_all_batched(
&self,
opctx: &OpContext,
nexus_zone_ids: &[db::saga_types::SecId],
new_sec_id: db::saga_types::SecId,
) -> Reassigned {
let now = chrono::Utc::now();
let conn = match self.pool_connection_authorized(opctx).await {
Ok(c) => c,
Err(error) => return Reassigned::AtLeast { count: 0, error },
};

let mut count = 0;
let limit: u32 = SQL_BATCH_SIZE.get();
loop {
debug!(&opctx.log, "sagas_reassign_sec_batched";
"count_so_far" => count);
match self
.sagas_reassign_sec(
&conn,
opctx,
nexus_zone_ids,
new_sec_id,
limit,
now,
)
.await
{
Ok(c) => {
count += c;
if c < limit {
break;
}
}
Err(error) => {
return Reassigned::AtLeast { count, error };
}
}
}

Reassigned::All { count }
}

// XXX-dap TODO-doc
async fn sagas_reassign_sec(
pub async fn sagas_reassign_sec(
&self,
conn: &async_bb8_diesel::Connection<DbConnection>,
opctx: &OpContext,
nexus_zone_ids: &[db::saga_types::SecId],
new_sec_id: db::saga_types::SecId,
limit: u32,
time: chrono::DateTime<chrono::Utc>,
) -> Result<u32, Error> {
use db::schema::saga::dsl;

) -> Result<usize, Error> {
opctx.authorize(authz::Action::Modify, &authz::FLEET).await?;

let now = chrono::Utc::now();
let conn = self.pool_connection_authorized(opctx).await?;

// It would be more robust to do this in batches. However, Diesel does
// not appear to support the UPDATE ... LIMIT syntax using the normal
// builder. In practice, it's extremely unlikely we'd have so many
// in-progress sagas that this would be a problem.
use db::schema::saga::dsl;
diesel::update(
dsl::saga
.filter(dsl::current_sec.is_not_null())
@@ -293,26 +235,16 @@ impl DataStore {
))
.filter(dsl::saga_state.ne(db::saga_types::SagaCachedState(
steno::SagaCachedState::Done,
)))
.limit(i64::from(limit)),
))),
)
.set((
dsl::current_sec.eq(Some(new_sec_id)),
dsl::adopt_generation.eq(dsl::adopt_generation.add(1)),
dsl::adopt_time.eq(time),
dsl::adopt_time.eq(now),
))
.execute_async(conn)
.execute_async(&*conn)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
.and_then(|c_usize| {
u32::try_from(c_usize).map_err(|_| {
Error::internal_error(&format!(
"database reported unexpected count of \
records updated (did not fit into u32): {}",
c_usize,
))
})
})
}
}
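
Aside (not part of this commit): the comment above notes that Diesel's query builder does not support UPDATE ... LIMIT, so the re-assignment is now issued as a single unbatched UPDATE. As a reading aid only, here is a self-contained, std-only Rust model of what that UPDATE does; the struct, field, and SEC names are illustrative stand-ins, not the real nexus-db-model types.

#[derive(Debug, Clone, PartialEq)]
struct SagaRow {
    current_sec: Option<&'static str>,
    saga_state: &'static str, // "running" or "done"
    adopt_generation: u64,
}

// Model of the UPDATE: every unfinished saga currently owned by one of the
// expunged SECs is handed to `new_sec` and its adopt generation is bumped.
// Returns how many rows matched, like `execute_async` above.
fn reassign(rows: &mut [SagaRow], expunged: &[&str], new_sec: &'static str) -> usize {
    let mut count = 0;
    for row in rows.iter_mut() {
        let owned_by_expunged =
            row.current_sec.map_or(false, |sec| expunged.contains(&sec));
        if owned_by_expunged && row.saga_state != "done" {
            row.current_sec = Some(new_sec);
            row.adopt_generation += 1;
            count += 1;
        }
    }
    count
}

fn main() {
    let mut rows = vec![
        SagaRow {
            current_sec: Some("expunged-nexus"),
            saga_state: "running",
            adopt_generation: 1,
        },
        SagaRow {
            current_sec: Some("expunged-nexus"),
            saga_state: "done",
            adopt_generation: 1,
        },
        SagaRow {
            current_sec: Some("live-nexus"),
            saga_state: "running",
            adopt_generation: 1,
        },
    ];
    // Only the first row matches: it is unfinished and owned by an expunged SEC.
    assert_eq!(reassign(&mut rows, &["expunged-nexus"], "new-nexus"), 1);
    assert_eq!(rows[0].current_sec, Some("new-nexus"));
    assert_eq!(rows[0].adopt_generation, 2);
}

The real query also records a fresh `adopt_time`, and the database applies the change atomically across all matching rows.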

61 changes: 35 additions & 26 deletions nexus/reconfigurator/execution/src/sagas.rs
@@ -10,25 +10,8 @@ use nexus_db_queries::db::DataStore;
use nexus_types::deployment::Blueprint;
use nexus_types::deployment::BlueprintZoneFilter;
use omicron_common::api::external::Error;
use slog::info;
use std::num::NonZeroU32;

/// Reports what happened when we tried to re-assign sagas
///
/// Callers need to know two things:
///
/// 1. Whether any sagas may have been re-assigned
/// (because they'll want to kick off saga recovery for them)
/// 2. Whether any errors were encountered
#[derive(Debug)]
pub(crate) enum Reassigned {
/// We successfully re-assigned all sagas that needed it
All { count: u32 },
/// We encountered an error and cannot tell how many sagas were re-assigned.
/// It was at least this many. (The count can be zero, but it's still
/// possible that some were re-assigned.)
AtLeast { count: u32, error: Error },
}
use omicron_uuid_kinds::GenericUuid;
use slog::{debug, info, warn};

/// For each expunged Nexus zone, re-assign sagas owned by that Nexus to the
/// specified nexus (`nexus_id`).
@@ -38,22 +21,48 @@ pub(crate) async fn reassign_sagas_from_expunged(
datastore: &DataStore,
blueprint: &Blueprint,
nexus_id: SecId,
) -> Reassigned {
) -> Result<(), Error> {
let log = &opctx.log;

// Identify any Nexus zones that have been expunged and need to have sagas
// re-assigned.
let nexus_zones_ids: Vec<_> = blueprint
let nexus_zone_ids: Vec<_> = blueprint
.all_omicron_zones(BlueprintZoneFilter::Expunged)
.filter_map(|(_, z)| z.zone_type.is_nexus().then(|| z.id))
.filter_map(|(_, z)| {
z.zone_type
.is_nexus()
.then(|| nexus_db_model::SecId(z.id.into_untyped_uuid()))
})
.collect();

debug!(log, "re-assign sagas: found Nexus instances";
"nexus_zones_ids" => ?nexus_zones_ids);
"nexus_zone_ids" => ?nexus_zone_ids);

let do_recovery = match datastore
.sagas_reassign_sec(opctx, &nexus_zone_ids, nexus_id)
.await
{
Ok(count) => {
info!(log, "re-assigned sagas";
"nexus_zone_ids" => ?nexus_zone_ids,
"count" => count,
);

// Activate saga recovery if we re-assigned any sagas.
count != 0
}
Err(error) => {
warn!(log, "failed to re-assign sagas";
"nexus_zone_ids" => ?nexus_zone_ids,
error,
);

let result = datastore.sagas_reassign_sec_batched(opctx, &nexus_zone_ids, nexus_id);
info!(log, "re-assign sagas";
"nexus_zones_ids" => ?nexus_zones_ids);
// Depending on the kind of error we got, it's possible we did
// actually re-assign some sagas. Activate saga recovery in
// case we did.
true
}
};

todo!();
}
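
Aside (not part of this commit): the match above decides whether saga recovery needs to be activated. It should run whenever any sagas may have been re-assigned, meaning a nonzero count on success, or always on error because the true count is then unknown. A minimal std-only restatement of that decision, for illustration only (the error type is a stand-in for omicron_common::api::external::Error):

fn should_activate_recovery(reassignment: &Result<usize, String>) -> bool {
    match reassignment {
        // Sagas were definitely re-assigned only if the count is nonzero.
        Ok(count) => *count != 0,
        // On error, some sagas may still have been re-assigned before the
        // failure, so recovery is activated just in case.
        Err(_) => true,
    }
}

fn main() {
    assert!(!should_activate_recovery(&Ok(0)));
    assert!(should_activate_recovery(&Ok(3)));
    assert!(should_activate_recovery(&Err("database unavailable".to_string())));
}

The activation itself is still a todo!() in this function, consistent with the WIP commit title.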
