abandoned attempt to do this with limits
davepacheco committed Aug 2, 2024
1 parent 17c01c4 commit b309809
Showing 4 changed files with 212 additions and 4 deletions.
117 changes: 117 additions & 0 deletions nexus/db-queries/src/db/datastore/saga.rs
@@ -13,14 +13,34 @@ use crate::db::model::Generation;
use crate::db::pagination::paginated;
use crate::db::pagination::paginated_multicolumn;
use crate::db::pagination::Paginator;
use crate::db::pool::DbConnection;
use crate::db::update_and_check::UpdateAndCheck;
use crate::db::update_and_check::UpdateStatus;
use async_bb8_diesel::AsyncRunQueryDsl;
use diesel::prelude::*;
use nexus_auth::authz;
use nexus_auth::context::OpContext;
use omicron_common::api::external::Error;
use omicron_common::api::external::LookupType;
use omicron_common::api::external::ResourceType;
use std::ops::Add;

/// Reports the result of `sagas_reassign_sec_all_batched()`.
///
/// Callers need to know two things:
///
/// 1. Whether any sagas may have been re-assigned
/// (because they'll want to kick off saga recovery for them)
/// 2. Whether any errors were encountered
#[derive(Debug)]
pub(crate) enum Reassigned {
/// We successfully re-assigned all sagas that needed it
All { count: u32 },
/// We encountered an error and cannot tell exactly how many sagas were
/// re-assigned, but it was at least this many.  (Even if the count is zero,
/// some sagas may still have been re-assigned.)
AtLeast { count: u32, error: Error },
}

impl DataStore {
pub async fn saga_create(
@@ -207,6 +227,93 @@ impl DataStore {

Ok(events)
}

pub async fn sagas_reassign_sec_all_batched(
&self,
opctx: &OpContext,
nexus_zone_ids: &[db::saga_types::SecId],
new_sec_id: db::saga_types::SecId,
) -> Reassigned {
let now = chrono::Utc::now();
let conn = match self.pool_connection_authorized(opctx).await {
Ok(c) => c,
Err(error) => return Reassigned::AtLeast { count: 0, error },
};

let mut count = 0;
let limit: u32 = SQL_BATCH_SIZE.get();
loop {
debug!(&opctx.log, "sagas_reassign_sec_all_batched";
"count_so_far" => count);
match self
.sagas_reassign_sec(
&conn,
opctx,
nexus_zone_ids,
new_sec_id,
limit,
now,
)
.await
{
Ok(c) => {
count += c;
if c < limit {
break;
}
}
Err(error) => {
return Reassigned::AtLeast { count, error };
}
}
}

Reassigned::All { count }
}

// XXX-dap TODO-doc
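// Re-assigns up to `limit` unfinished (not-yet-done) sagas currently assigned
// to any of the SECs in `nexus_zone_ids`, assigning them to `new_sec_id`
// instead.  Each re-assigned saga gets its adoption generation bumped and its
// adoption time set to `time`.  Returns the number of sagas re-assigned.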
async fn sagas_reassign_sec(
&self,
conn: &async_bb8_diesel::Connection<DbConnection>,
opctx: &OpContext,
nexus_zone_ids: &[db::saga_types::SecId],
new_sec_id: db::saga_types::SecId,
limit: u32,
time: chrono::DateTime<chrono::Utc>,
) -> Result<u32, Error> {
use db::schema::saga::dsl;

opctx.authorize(authz::Action::Modify, &authz::FLEET).await?;

diesel::update(
dsl::saga
.filter(dsl::current_sec.is_not_null())
.filter(dsl::current_sec.eq_any(
nexus_zone_ids.into_iter().cloned().collect::<Vec<_>>(),
))
.filter(dsl::saga_state.ne(db::saga_types::SagaCachedState(
steno::SagaCachedState::Done,
)))
.limit(i64::from(limit)),
)
.set((
dsl::current_sec.eq(Some(new_sec_id)),
dsl::adopt_generation.eq(dsl::adopt_generation.add(1)),
dsl::adopt_time.eq(time),
))
.execute_async(conn)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
.and_then(|c_usize| {
u32::try_from(c_usize).map_err(|_| {
Error::internal_error(&format!(
"database reported unexpected count of \
records updated (did not fit into u32): {}",
c_usize,
))
})
})
}
}

#[cfg(test)]
@@ -510,3 +617,13 @@ mod test {
}
}
}

// XXX-dap TODO-coverage want test that inserts:
// - some sagas assigned to this SEC that are done
// - more than BATCH_SIZE sagas assigned to this SEC that are unwinding
// - more than BATCH_SIZE sagas assigned to this SEC that are running
// - some sagas assigned to another SEC that are running
//
// then two tests:
// 1. run the one-batch thing and make sure we only got at most the batch size
// 2. run the whole thing and make sure that we got exactly the right ones
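
The `Reassigned` type above is shaped so that a caller always learns the two facts it needs: whether any sagas may have changed SECs (and therefore need recovery kicked off) and whether an error occurred. Below is a minimal, self-contained sketch of that handling pattern; the enum mirrors the one defined in this change, while `Error`, `activate_saga_recovery`, and the `main` driver are hypothetical stand-ins rather than actual Omicron APIs.

/// Stand-in for `omicron_common::api::external::Error` in this sketch.
#[derive(Debug)]
struct Error(String);

/// Mirrors the `Reassigned` type introduced in this change.
#[derive(Debug)]
enum Reassigned {
    /// We successfully re-assigned all sagas that needed it
    All { count: u32 },
    /// We hit an error; at least `count` sagas were re-assigned
    AtLeast { count: u32, error: Error },
}

/// Hypothetical stand-in for activating the saga recovery background task.
fn activate_saga_recovery() {
    println!("kicking off saga recovery");
}

fn handle_reassignment(result: Reassigned) -> Result<u32, Error> {
    // Whether or not we saw an error, if any sagas *may* have been
    // re-assigned, we want to kick off saga recovery for them.
    let (maybe_reassigned, outcome) = match result {
        Reassigned::All { count } => (count > 0, Ok(count)),
        // Even when the reported count is zero, some sagas may still have
        // been re-assigned before the error, so be conservative.
        Reassigned::AtLeast { count, error } => {
            println!("re-assigned at least {count} sagas before failing");
            (true, Err(error))
        }
    };
    if maybe_reassigned {
        activate_saga_recovery();
    }
    outcome
}

fn main() {
    assert!(matches!(handle_reassignment(Reassigned::All { count: 3 }), Ok(3)));
    match handle_reassignment(Reassigned::AtLeast {
        count: 0,
        error: Error("database unavailable".to_string()),
    }) {
        Ok(count) => println!("re-assigned {count} sagas"),
        Err(Error(message)) => println!("failed: {message}"),
    }
}
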
38 changes: 34 additions & 4 deletions nexus/reconfigurator/execution/src/lib.rs
@@ -24,6 +24,7 @@ use slog::info;
use slog_error_chain::InlineErrorChain;
use std::collections::BTreeMap;
use std::net::SocketAddrV6;
use uuid::Uuid;

mod cockroachdb;
mod datasets;
@@ -32,6 +33,7 @@ mod external_networking;
mod omicron_physical_disks;
mod omicron_zones;
mod overridables;
mod sagas;
mod sled_state;

pub use dns::blueprint_external_dns_config;
@@ -80,6 +82,7 @@ pub async fn realize_blueprint<S>(
resolver: &Resolver,
blueprint: &Blueprint,
nexus_label: S,
nexus_id: Uuid, // XXX-dap type, and replace nexus_label too?
) -> Result<(), Vec<anyhow::Error>>
where
String: From<S>,
@@ -90,6 +93,7 @@ where
resolver,
blueprint,
nexus_label,
nexus_id,
&Default::default(),
)
.await
@@ -101,6 +105,7 @@ pub async fn realize_blueprint_with_overrides<S>(
resolver: &Resolver,
blueprint: &Blueprint,
nexus_label: S,
nexus_id: Uuid, // XXX-dap type, and replace nexus_label too?
overrides: &Overridables,
) -> Result<(), Vec<anyhow::Error>>
where
@@ -233,14 +238,39 @@ where
omicron_physical_disks::decommission_expunged_disks(&opctx, datastore)
.await?;

// From this point on, we'll assume that any errors that we encounter do
// *not* require stopping execution. We'll just accumulate them and return
// them all at the end.
//
// TODO We should probably do this with more of the errors above, too.
let mut errors = Vec::new();

// For any expunged Nexus zones, re-assign in-progress sagas to some other
// Nexus. If this fails for some reason, it doesn't affect anything else.
let sec_id = nexus_db_model::SecId(nexus_id);
if let sagas::Reassigned::AtLeast { error, .. } =
    sagas::reassign_sagas_from_expunged(&opctx, datastore, blueprint, sec_id)
        .await
{
    errors.push(anyhow::anyhow!(error).context("failed to re-assign sagas"));
}

// This is likely to error if any cluster upgrades are in progress (which
// can take some time), so it should remain at the end so that other parts
// of the blueprint can progress normally.
cockroachdb::ensure_settings(&opctx, datastore, blueprint)
.await
.map_err(|err| vec![err])?;
if let Err(error) =
cockroachdb::ensure_settings(&opctx, datastore, blueprint).await
{
errors.push(error);
}

Ok(())
if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
}

#[cfg(test)]
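
The new control flow in `realize_blueprint_with_overrides` stops failing on the first error and instead accumulates errors from independent steps, returning them together at the end. Here is a minimal standalone sketch of that pattern with an analogous return type; `step_one` and `step_two` are made-up placeholders, not the real blueprint-execution steps.

// Each step is independent: a failure in one should not prevent the others
// from running, so we collect errors and report them all at the end.
fn step_one() -> Result<(), String> {
    Ok(())
}

fn step_two() -> Result<(), String> {
    Err("cluster upgrade in progress".to_string())
}

fn run_all_steps() -> Result<(), Vec<String>> {
    let mut errors = Vec::new();

    if let Err(error) = step_one() {
        errors.push(error);
    }
    if let Err(error) = step_two() {
        errors.push(error);
    }

    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}

fn main() {
    // One step failed, so we get back exactly one accumulated error.
    assert_eq!(
        run_all_steps(),
        Err(vec!["cluster upgrade in progress".to_string()])
    );
}
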
59 changes: 59 additions & 0 deletions nexus/reconfigurator/execution/src/sagas.rs
@@ -0,0 +1,59 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Re-assign sagas from expunged Nexus zones
use nexus_db_model::SecId;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db::DataStore;
use nexus_types::deployment::Blueprint;
use nexus_types::deployment::BlueprintZoneFilter;
use omicron_common::api::external::Error;
use slog::debug;
use slog::info;
use std::num::NonZeroU32;

/// Reports what happened when we tried to re-assign sagas
///
/// Callers need to know two things:
///
/// 1. Whether any sagas may have been re-assigned
/// (because they'll want to kick off saga recovery for them)
/// 2. Whether any errors were encountered
#[derive(Debug)]
pub(crate) enum Reassigned {
/// We successfully re-assigned all sagas that needed it
All { count: u32 },
/// We encountered an error and cannot tell exactly how many sagas were
/// re-assigned, but it was at least this many.  (Even if the count is zero,
/// some sagas may still have been re-assigned.)
AtLeast { count: u32, error: Error },
}

/// For each expunged Nexus zone, re-assign sagas owned by that Nexus to the
/// specified nexus (`nexus_id`).
// TODO-dap activate recovery background task
pub(crate) async fn reassign_sagas_from_expunged(
opctx: &OpContext,
datastore: &DataStore,
blueprint: &Blueprint,
nexus_id: SecId,
) -> Reassigned {
let log = &opctx.log;

// Identify any Nexus zones that have been expunged and need to have sagas
// re-assigned.
let nexus_zones_ids: Vec<_> = blueprint
.all_omicron_zones(BlueprintZoneFilter::Expunged)
.filter_map(|(_, z)| z.zone_type.is_nexus().then(|| z.id))
.collect();

debug!(log, "re-assign sagas: found Nexus instances";
    "nexus_zones_ids" => ?nexus_zones_ids);

let result = datastore
    .sagas_reassign_sec_all_batched(opctx, &nexus_zones_ids, nexus_id)
    .await;
info!(log, "re-assign sagas";
    "nexus_zones_ids" => ?nexus_zones_ids,
    "result" => ?result);

result
}
2 changes: 2 additions & 0 deletions nexus/src/app/background/tasks/blueprint_execution.rs
@@ -76,12 +76,14 @@ impl BlueprintExecutor {
});
}

let nexus_id = todo!(); // XXX-dap
let result = nexus_reconfigurator_execution::realize_blueprint(
opctx,
&self.datastore,
&self.resolver,
blueprint,
&self.nexus_label,
nexus_id,
)
.await;
