WIP: lower parts
davepacheco committed Aug 2, 2024
1 parent b309809 commit 2692ba8
Showing 7 changed files with 97 additions and 150 deletions.
94 changes: 13 additions & 81 deletions nexus/db-queries/src/db/datastore/saga.rs
@@ -13,7 +13,6 @@ use crate::db::model::Generation;
use crate::db::pagination::paginated;
use crate::db::pagination::paginated_multicolumn;
use crate::db::pagination::Paginator;
-use crate::db::pool::DbConnection;
use crate::db::update_and_check::UpdateAndCheck;
use crate::db::update_and_check::UpdateStatus;
use async_bb8_diesel::AsyncRunQueryDsl;
@@ -25,23 +24,6 @@ use omicron_common::api::external::LookupType;
use omicron_common::api::external::ResourceType;
use std::ops::Add;

-/// Reports the result of `sagas_reassign_sec_batched()`.
-///
-/// Callers need to know two things:
-///
-/// 1. Whether any sagas may have been re-assigned
-///    (because they'll want to kick off saga recovery for them)
-/// 2. Whether any errors were encountered
-#[derive(Debug)]
-pub(crate) enum Reassigned {
-    /// We successfully re-assigned all sagas that needed it
-    All { count: u32 },
-    /// We encountered an error and cannot tell how many sagas were re-assigned.
-    /// It was at least this many. (The count can be zero, but it's still
-    /// possible that some were re-assigned.)
-    AtLeast { count: u32, error: Error },
-}
-
impl DataStore {
pub async fn saga_create(
&self,
@@ -228,63 +210,23 @@ impl DataStore {
Ok(events)
}

-    pub async fn sagas_reassign_sec_all_batched(
-        &self,
-        opctx: &OpContext,
-        nexus_zone_ids: &[db::saga_types::SecId],
-        new_sec_id: db::saga_types::SecId,
-    ) -> Reassigned {
-        let now = chrono::Utc::now();
-        let conn = match self.pool_connection_authorized(opctx).await {
-            Ok(c) => c,
-            Err(error) => return Reassigned::AtLeast { count: 0, error },
-        };
-
-        let mut count = 0;
-        let limit: u32 = SQL_BATCH_SIZE.get();
-        loop {
-            debug!(&opctx.log, "sagas_reassign_sec_batched";
-                "count_so_far" => count);
-            match self
-                .sagas_reassign_sec(
-                    &conn,
-                    opctx,
-                    nexus_zone_ids,
-                    new_sec_id,
-                    limit,
-                    now,
-                )
-                .await
-            {
-                Ok(c) => {
-                    count += c;
-                    if c < limit {
-                        break;
-                    }
-                }
-                Err(error) => {
-                    return Reassigned::AtLeast { count, error };
-                }
-            }
-        }
-
-        Reassigned::All { count }
-    }
-
    // XXX-dap TODO-doc
-    async fn sagas_reassign_sec(
+    pub async fn sagas_reassign_sec(
        &self,
-        conn: &async_bb8_diesel::Connection<DbConnection>,
        opctx: &OpContext,
        nexus_zone_ids: &[db::saga_types::SecId],
        new_sec_id: db::saga_types::SecId,
-        limit: u32,
-        time: chrono::DateTime<chrono::Utc>,
-    ) -> Result<u32, Error> {
-        use db::schema::saga::dsl;
-
+    ) -> Result<usize, Error> {
        opctx.authorize(authz::Action::Modify, &authz::FLEET).await?;

+        let now = chrono::Utc::now();
+        let conn = self.pool_connection_authorized(opctx).await?;
+
+        // It would be more robust to do this in batches. However, Diesel does
+        // not appear to support the UPDATE ... LIMIT syntax using the normal
+        // builder. In practice, it's extremely unlikely we'd have so many
+        // in-progress sagas that this would be a problem.
+        use db::schema::saga::dsl;
        diesel::update(
            dsl::saga
                .filter(dsl::current_sec.is_not_null())
@@ -293,26 +235,16 @@ impl DataStore {
                ))
                .filter(dsl::saga_state.ne(db::saga_types::SagaCachedState(
                    steno::SagaCachedState::Done,
-                )))
-                .limit(i64::from(limit)),
+                ))),
        )
        .set((
            dsl::current_sec.eq(Some(new_sec_id)),
            dsl::adopt_generation.eq(dsl::adopt_generation.add(1)),
-            dsl::adopt_time.eq(time),
+            dsl::adopt_time.eq(now),
        ))
-        .execute_async(conn)
+        .execute_async(&*conn)
        .await
        .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
-        .and_then(|c_usize| {
-            u32::try_from(c_usize).map_err(|_| {
-                Error::internal_error(&format!(
-                    "database reported unexpected count of \
-                    records updated (did not fit into u32): {}",
-                    c_usize,
-                ))
-            })
-        })
    }
}

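Taken together, the saga.rs change collapses the batched path: the `Reassigned` type and `sagas_reassign_sec_all_batched` are deleted, and `sagas_reassign_sec` becomes the public entry point, running one UPDATE (per the comment above, Diesel's builder has no `UPDATE ... LIMIT`) and returning the affected-row count. A minimal caller sketch; the bindings `opctx`, `datastore`, `expunged_sec_ids`, and `my_sec_id` are assumed for illustration, not taken from this diff:

    // One UPDATE adopts every in-progress saga owned by an expunged SEC.
    // The effective SQL is roughly:
    //   UPDATE saga
    //      SET current_sec = $new, adopt_generation = adopt_generation + 1,
    //          adopt_time = now()
    //    WHERE current_sec IS NOT NULL
    //      AND current_sec IN ($expunged_sec_ids) AND saga_state != 'done'
    let count: usize = datastore
        .sagas_reassign_sec(&opctx, &expunged_sec_ids, my_sec_id)
        .await?;
    // A non-zero count means at least one saga changed hands, so this
    // Nexus must run saga recovery to actually resume the adopted sagas.
    let needs_recovery = count != 0;
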
3 changes: 2 additions & 1 deletion nexus/reconfigurator/execution/src/cockroachdb.rs
@@ -39,6 +39,7 @@ mod test {
use nexus_test_utils_macros::nexus_test;
use nexus_types::deployment::CockroachDbClusterVersion;
use std::sync::Arc;
+use uuid::Uuid;

type ControlPlaneTestContext =
nexus_test_utils::ControlPlaneTestContext<omicron_nexus::Server>;
Expand Down Expand Up @@ -101,7 +102,7 @@ mod test {
datastore,
resolver,
&blueprint,
"test-suite",
Uuid::new_v4(),
&overrides,
)
.await
10 changes: 5 additions & 5 deletions nexus/reconfigurator/execution/src/dns.rs
@@ -1194,7 +1194,7 @@ mod test {
datastore,
resolver,
&blueprint,
"test-suite",
Uuid::new_v4(),
&overrides,
)
.await
@@ -1332,7 +1332,7 @@ mod test {
datastore,
resolver,
&blueprint2,
"test-suite",
Uuid::new_v4(),
&overrides,
)
.await
@@ -1406,7 +1406,7 @@ mod test {
datastore,
resolver,
&blueprint2,
"test-suite",
Uuid::new_v4(),
&overrides,
)
.await
@@ -1442,7 +1442,7 @@ mod test {
datastore,
resolver,
&blueprint2,
"test-suite",
Uuid::new_v4(),
&overrides,
)
.await
@@ -1536,7 +1536,7 @@ mod test {
datastore,
resolver,
&blueprint,
"test-suite",
Uuid::new_v4(),
&overrides,
)
.await
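
The cockroachdb.rs and dns.rs test changes are the same mechanical substitution: `realize_blueprint_with_overrides` now identifies the calling Nexus by UUID rather than by the old `"test-suite"` string label, so each call site fabricates a throwaway id. The pattern, sketched against the assumed surrounding test context:

    // Tests don't run inside a real Nexus zone, so a random v4 UUID
    // stands in for the id a production caller would pass.
    let nexus_id = Uuid::new_v4();
    realize_blueprint_with_overrides(
        &opctx, datastore, resolver, &blueprint, nexus_id, &overrides,
    )
    .await
    .expect("blueprint execution failed");
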
39 changes: 17 additions & 22 deletions nexus/reconfigurator/execution/src/lib.rs
@@ -76,41 +76,32 @@ impl From<nexus_db_model::Sled> for Sled {
///
/// The assumption is that callers are running this periodically or in a loop to
/// deal with transient errors or changes in the underlying system state.
-pub async fn realize_blueprint<S>(
+pub async fn realize_blueprint(
    opctx: &OpContext,
    datastore: &DataStore,
    resolver: &Resolver,
    blueprint: &Blueprint,
-    nexus_label: S,
-    nexus_id: Uuid, // XXX-dap type, and replace nexus_label too?
-) -> Result<(), Vec<anyhow::Error>>
-where
-    String: From<S>,
-{
+    nexus_id: Uuid,
+) -> Result<bool, Vec<anyhow::Error>> {
    realize_blueprint_with_overrides(
        opctx,
        datastore,
        resolver,
        blueprint,
-        nexus_label,
+        nexus_id,
        &Default::default(),
    )
    .await
}

-pub async fn realize_blueprint_with_overrides<S>(
+pub async fn realize_blueprint_with_overrides(
    opctx: &OpContext,
    datastore: &DataStore,
    resolver: &Resolver,
    blueprint: &Blueprint,
-    nexus_label: S,
-    nexus_id: Uuid, // XXX-dap type, and replace nexus_label too?
+    nexus_id: Uuid,
    overrides: &Overridables,
-) -> Result<(), Vec<anyhow::Error>>
-where
-    String: From<S>,
-{
+) -> Result<bool, Vec<anyhow::Error>> {
let opctx = opctx.child(BTreeMap::from([(
"comment".to_string(),
blueprint.comment.clone(),
@@ -205,7 +196,7 @@ where
    dns::deploy_dns(
        &opctx,
        datastore,
-        String::from(nexus_label),
+        nexus_id.to_string(),
        blueprint,
        &sleds_by_id,
        overrides,
@@ -248,14 +239,18 @@ where
// For any expunged Nexus zones, re-assign in-progress sagas to some other
// Nexus. If this fails for some reason, it doesn't affect anything else.
let sec_id = nexus_db_model::SecId(nexus_id);
-    if let Err(error) = sagas::reassign_sagas_from_expunged(
+    let reassigned = sagas::reassign_sagas_from_expunged(
        &opctx, datastore, blueprint, sec_id,
    )
    .await
-    .context("failed to re-assign sagas")
-    {
-        errors.push(error);
-    }
+    .context("failed to re-assign sagas");
+    let needs_saga_recovery = match reassigned {
+        Ok(needs_recovery) => needs_recovery,
+        Err(error) => {
+            errors.push(error);
+            false
+        }
+    };

// This is likely to error if any cluster upgrades are in progress (which
// can take some time), so it should remain at the end so that other parts
@@ -267,7 +262,7 @@ where
}

    if errors.is_empty() {
-        Ok(())
+        Ok(needs_saga_recovery)
    } else {
        Err(errors)
    }
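
With lib.rs done, `realize_blueprint` goes from `Result<(), Vec<anyhow::Error>>` to `Result<bool, Vec<anyhow::Error>>`: the boolean is `needs_saga_recovery`, surfaced so callers can wake saga recovery immediately instead of waiting for that task's next periodic activation. A hedged sketch of a consumer; the activator handle and the logging are assumptions, not part of this diff:

    match realize_blueprint(&opctx, &datastore, &resolver, &blueprint, nexus_id)
        .await
    {
        // At least one saga was re-assigned to this Nexus: wake recovery now.
        Ok(true) => saga_recovery_activator.activate(),
        // Executed cleanly and no sagas changed hands; nothing more to do.
        Ok(false) => (),
        // Execution is retried periodically, so errors are logged, not fatal.
        Err(errors) => {
            for error in errors {
                warn!(opctx.log, "blueprint execution failed: {:#}", error);
            }
        }
    }
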
62 changes: 32 additions & 30 deletions nexus/reconfigurator/execution/src/sagas.rs
@@ -10,50 +10,52 @@ use nexus_db_queries::db::DataStore;
use nexus_types::deployment::Blueprint;
use nexus_types::deployment::BlueprintZoneFilter;
use omicron_common::api::external::Error;
-use slog::info;
-use std::num::NonZeroU32;
-
-/// Reports what happened when we tried to re-assign sagas
-///
-/// Callers need to know two things:
-///
-/// 1. Whether any sagas may have been re-assigned
-///    (because they'll want to kick off saga recovery for them)
-/// 2. Whether any errors were encountered
-#[derive(Debug)]
-pub(crate) enum Reassigned {
-    /// We successfully re-assigned all sagas that needed it
-    All { count: u32 },
-    /// We encountered an error and cannot tell how many sagas were re-assigned.
-    /// It was at least this many. (The count can be zero, but it's still
-    /// possible that some were re-assigned.)
-    AtLeast { count: u32, error: Error },
-}
+use omicron_uuid_kinds::GenericUuid;
+use slog::{debug, info, warn};

/// For each expunged Nexus zone, re-assign sagas owned by that Nexus to the
/// specified nexus (`nexus_id`).
+// TODO-dap activate recovery background task
pub(crate) async fn reassign_sagas_from_expunged(
    opctx: &OpContext,
    datastore: &DataStore,
    blueprint: &Blueprint,
    nexus_id: SecId,
-) -> Reassigned {
+) -> Result<bool, Error> {
    let log = &opctx.log;

    // Identify any Nexus zones that have been expunged and need to have sagas
    // re-assigned.
-    let nexus_zones_ids: Vec<_> = blueprint
+    let nexus_zone_ids: Vec<_> = blueprint
        .all_omicron_zones(BlueprintZoneFilter::Expunged)
-        .filter_map(|(_, z)| z.zone_type.is_nexus().then(|| z.id))
+        .filter_map(|(_, z)| {
+            z.zone_type
+                .is_nexus()
+                .then(|| nexus_db_model::SecId(z.id.into_untyped_uuid()))
+        })
        .collect();

debug!(log, "re-assign sagas: found Nexus instances";
"nexus_zones_ids" => ?nexus_zones_ids);


let result = datastore.sagas_reassign_sec_batched(opctx, &nexus_zone_ids, nexus_id);
info!(log, "re-assign sagas";
"nexus_zones_ids" => ?nexus_zones_ids);

"nexus_zone_ids" => ?nexus_zone_ids);

let result =
datastore.sagas_reassign_sec(opctx, &nexus_zone_ids, nexus_id).await;

match result {
Ok(count) => {
info!(log, "re-assigned sagas";
"nexus_zone_ids" => ?nexus_zone_ids,
"count" => count,
);

Ok(count != 0)
}
Err(error) => {
warn!(log, "failed to re-assign sagas";
"nexus_zone_ids" => ?nexus_zone_ids,
&error,
);

Err(error)
}
}
}
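
Two details in sagas.rs are easy to miss: blueprint zone ids are typed UUIDs while `SecId` wraps a plain `Uuid`, hence the new `GenericUuid` import and the `into_untyped_uuid()` hop inside `filter_map`; and `Ok(count != 0)` is where the row count becomes the `needs_saga_recovery` flag that lib.rs propagates. The conversion in isolation, with an invented zone id for illustration:

    use omicron_uuid_kinds::{GenericUuid, OmicronZoneUuid};

    // Hypothetical zone id; the real code takes it from the blueprint.
    let zone_id = OmicronZoneUuid::new_v4();
    // SecId wraps an untyped Uuid, so erase the type-level kind first.
    let sec_id = nexus_db_model::SecId(zone_id.into_untyped_uuid());
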
3 changes: 2 additions & 1 deletion nexus/src/app/background/init.rs
@@ -435,7 +435,8 @@ impl BackgroundTasksInitializer {
datastore.clone(),
resolver.clone(),
rx_blueprint.clone(),
-nexus_id.to_string(),
+nexus_id,
+task_saga_recovery.clone(),
);
let rx_blueprint_exec = blueprint_executor.watcher();
driver.register(TaskDefinition {
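
Finally, init.rs constructs the blueprint executor with the Nexus UUID itself (no more stringifying) plus a clone of the saga-recovery task's handle, which is what lets a `true` from `realize_blueprint` turn into an immediate recovery pass. A sketch of that handle pattern; the names are assumed, with `Activator` as the background-task wakeup primitive:

    // An Activator can be cloned and handed to another task. Calling
    // activate() is cheap and idempotent; it wakes the target if idle.
    let task_saga_recovery = Activator::new();
    let for_blueprint_executor = task_saga_recovery.clone();
    // ...given to BlueprintExecutor at construction; later, on Ok(true):
    for_blueprint_executor.activate();
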
