-
Notifications
You must be signed in to change notification settings - Fork 40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
re-assign sagas from expunged Nexus instances #6215
Changes from all commits
b309809
2692ba8
cc7cdd2
9a005e0
90d8409
00649e6
6a31f2a
befc6c6
c14ad8c
67326ff
87e5718
a5210ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,18 +9,19 @@ use super::SQL_BATCH_SIZE; | |
use crate::db; | ||
use crate::db::error::public_error_from_diesel; | ||
use crate::db::error::ErrorHandler; | ||
use crate::db::model::Generation; | ||
use crate::db::pagination::paginated; | ||
use crate::db::pagination::paginated_multicolumn; | ||
use crate::db::pagination::Paginator; | ||
use crate::db::update_and_check::UpdateAndCheck; | ||
use crate::db::update_and_check::UpdateStatus; | ||
use async_bb8_diesel::AsyncRunQueryDsl; | ||
use diesel::prelude::*; | ||
use nexus_auth::authz; | ||
use nexus_auth::context::OpContext; | ||
use omicron_common::api::external::Error; | ||
use omicron_common::api::external::LookupType; | ||
use omicron_common::api::external::ResourceType; | ||
use std::ops::Add; | ||
|
||
impl DataStore { | ||
pub async fn saga_create( | ||
|
@@ -80,29 +81,22 @@ impl DataStore { | |
/// now, we're implementing saga adoption only in cases where the original | ||
/// SEC/Nexus has been expunged.) | ||
/// | ||
/// However, in the future, it may be possible for multiple SECs to try and | ||
/// update the same saga, and overwrite each other's state. For example, | ||
/// one SEC might try and update the state to Running while the other one | ||
/// updates it to Done. That case would have to be carefully considered and | ||
/// tested here, probably using the (currently unused) | ||
/// `current_adopt_generation` field to enable optimistic concurrency. | ||
/// | ||
/// To reiterate, we are *not* considering the case where several SECs try | ||
/// to update the same saga. That will be a future enhancement. | ||
/// It's conceivable that multiple SECs do try to udpate the same saga | ||
/// concurrently. That would be a bug. This is noticed and prevented by | ||
/// making this query conditional on current_sec and failing with a conflict | ||
/// if the current SEC has changed. | ||
pub async fn saga_update_state( | ||
&self, | ||
saga_id: steno::SagaId, | ||
new_state: steno::SagaCachedState, | ||
current_sec: db::saga_types::SecId, | ||
current_adopt_generation: Generation, | ||
) -> Result<(), Error> { | ||
use db::schema::saga::dsl; | ||
|
||
let saga_id: db::saga_types::SagaId = saga_id.into(); | ||
let result = diesel::update(dsl::saga) | ||
.filter(dsl::id.eq(saga_id)) | ||
.filter(dsl::current_sec.eq(current_sec)) | ||
.filter(dsl::adopt_generation.eq(current_adopt_generation)) | ||
.set(dsl::saga_state.eq(db::saga_types::SagaCachedState(new_state))) | ||
.check_if_exists::<db::saga_types::Saga>(saga_id) | ||
.execute_and_check(&*self.pool_connection_unauthorized().await?) | ||
|
@@ -119,20 +113,19 @@ impl DataStore { | |
|
||
match result.status { | ||
UpdateStatus::Updated => Ok(()), | ||
UpdateStatus::NotUpdatedButExists => Err(Error::invalid_request( | ||
format!( | ||
"failed to update saga {:?} with state {:?}: preconditions not met: \ | ||
expected current_sec = {:?}, adopt_generation = {:?}, \ | ||
but found current_sec = {:?}, adopt_generation = {:?}, state = {:?}", | ||
UpdateStatus::NotUpdatedButExists => { | ||
Err(Error::invalid_request(format!( | ||
"failed to update saga {:?} with state {:?}:\ | ||
preconditions not met: \ | ||
expected current_sec = {:?}, \ | ||
but found current_sec = {:?}, state = {:?}", | ||
saga_id, | ||
new_state, | ||
current_sec, | ||
current_adopt_generation, | ||
result.found.current_sec, | ||
result.found.adopt_generation, | ||
result.found.saga_state, | ||
) | ||
)), | ||
))) | ||
} | ||
} | ||
} | ||
|
||
|
@@ -207,16 +200,75 @@ impl DataStore { | |
|
||
Ok(events) | ||
} | ||
|
||
/// Updates all sagas that are currently assigned to any of the SEC ids in | ||
/// `sec_ids`, assigning them to `new_sec_id` instead. | ||
/// | ||
/// Generally, an SEC id corresponds to a Nexus id. This change causes the | ||
/// Nexus instance `new_sec_id` to discover these sagas and resume executing | ||
/// them the next time it performs saga recovery (which is normally on | ||
/// startup and periodically). Generally, `new_sec_id` is the _current_ | ||
/// Nexus instance and the caller should activate the saga recovery | ||
/// background task after calling this function to immediately resume the | ||
/// newly-assigned sagas. | ||
/// | ||
/// **Warning:** This operation is only safe if the other SECs `sec_ids` are | ||
/// not currently running. If those SECs are still running, then two (or | ||
/// more) SECs may wind up running the same saga concurrently. This would | ||
/// likely violate implicit assumptions made by various saga actions, | ||
/// leading to hard-to-debug errors and state corruption. | ||
pub async fn sagas_reassign_sec( | ||
&self, | ||
opctx: &OpContext, | ||
sec_ids: &[db::saga_types::SecId], | ||
new_sec_id: db::saga_types::SecId, | ||
) -> Result<usize, Error> { | ||
opctx.authorize(authz::Action::Modify, &authz::FLEET).await?; | ||
|
||
let now = chrono::Utc::now(); | ||
let conn = self.pool_connection_authorized(opctx).await?; | ||
|
||
// It would be more robust to do this in batches. However, Diesel does | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking at diesel side-eyed again 🙄 |
||
// not appear to support the UPDATE ... LIMIT syntax using the normal | ||
// builder. In practice, it's extremely unlikely we'd have so many | ||
// in-progress sagas that this would be a problem. | ||
use db::schema::saga::dsl; | ||
diesel::update( | ||
dsl::saga | ||
.filter(dsl::current_sec.is_not_null()) | ||
.filter( | ||
dsl::current_sec.eq_any( | ||
sec_ids.into_iter().cloned().collect::<Vec<_>>(), | ||
), | ||
) | ||
.filter(dsl::saga_state.ne(db::saga_types::SagaCachedState( | ||
steno::SagaCachedState::Done, | ||
))), | ||
) | ||
.set(( | ||
dsl::current_sec.eq(Some(new_sec_id)), | ||
dsl::adopt_generation.eq(dsl::adopt_generation.add(1)), | ||
dsl::adopt_time.eq(now), | ||
)) | ||
.execute_async(&*conn) | ||
.await | ||
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod test { | ||
use super::*; | ||
use crate::db::datastore::test_utils::datastore_test; | ||
use async_bb8_diesel::AsyncConnection; | ||
use async_bb8_diesel::AsyncSimpleConnection; | ||
use db::queries::ALLOW_FULL_TABLE_SCAN_SQL; | ||
use nexus_db_model::{SagaNodeEvent, SecId}; | ||
use nexus_test_utils::db::test_setup_database; | ||
use omicron_common::api::external::Generation; | ||
use omicron_test_utils::dev; | ||
use rand::seq::SliceRandom; | ||
use std::collections::BTreeSet; | ||
use uuid::Uuid; | ||
|
||
// Tests pagination in listing sagas that are candidates for recovery | ||
|
@@ -440,7 +492,6 @@ mod test { | |
node_cx.saga_id, | ||
steno::SagaCachedState::Running, | ||
node_cx.sec_id, | ||
db::model::Generation::new(), | ||
) | ||
.await | ||
.expect("updating state to Running again"); | ||
|
@@ -451,7 +502,6 @@ mod test { | |
node_cx.saga_id, | ||
steno::SagaCachedState::Done, | ||
node_cx.sec_id, | ||
db::model::Generation::new(), | ||
) | ||
.await | ||
.expect("updating state to Done"); | ||
|
@@ -463,7 +513,6 @@ mod test { | |
node_cx.saga_id, | ||
steno::SagaCachedState::Done, | ||
node_cx.sec_id, | ||
db::model::Generation::new(), | ||
) | ||
.await | ||
.expect("updating state to Done again"); | ||
|
@@ -509,4 +558,156 @@ mod test { | |
SagaNodeEvent::new(event, self.sec_id) | ||
} | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_saga_reassignment() { | ||
// Test setup | ||
let logctx = dev::test_setup_log("test_saga_reassignment"); | ||
let mut db = test_setup_database(&logctx.log).await; | ||
let (_, datastore) = datastore_test(&logctx, &db).await; | ||
let opctx = OpContext::for_tests(logctx.log.clone(), datastore.clone()); | ||
|
||
// Populate the database with a few different sagas: | ||
// | ||
// - assigned to SEC A: done, running, and unwinding | ||
// - assigned to SEC B: done, running, and unwinding | ||
// - assigned to SEC C: done, running, and unwinding | ||
// - assigned to SEC D: done, running, and unwinding | ||
// | ||
// Then we'll reassign SECs B's and C's sagas to SEC A and check exactly | ||
// which sagas were changed by this. This exercises: | ||
// - that we don't touch A's sagas (the one we're assigning *to*) | ||
// - that we do touch both B's and C's sagas (the ones we're assigning | ||
// *from*) | ||
// - that we don't touch D's sagas (some other SEC) | ||
// - that we don't touch any "done" sagas | ||
// - that we do touch both running and unwinding sagas | ||
let mut sagas_to_insert = Vec::new(); | ||
let sec_a = SecId(Uuid::new_v4()); | ||
let sec_b = SecId(Uuid::new_v4()); | ||
let sec_c = SecId(Uuid::new_v4()); | ||
let sec_d = SecId(Uuid::new_v4()); | ||
|
||
for sec_id in [sec_a, sec_b, sec_c, sec_d] { | ||
for state in [ | ||
steno::SagaCachedState::Running, | ||
steno::SagaCachedState::Unwinding, | ||
steno::SagaCachedState::Done, | ||
] { | ||
let params = steno::SagaCreateParams { | ||
id: steno::SagaId(Uuid::new_v4()), | ||
name: steno::SagaName::new("tewst saga"), | ||
dag: serde_json::value::Value::Null, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I never thought about how handy a |
||
state, | ||
}; | ||
|
||
sagas_to_insert | ||
.push(db::model::saga_types::Saga::new(sec_id, params)); | ||
} | ||
} | ||
println!("sagas to insert: {:?}", sagas_to_insert); | ||
|
||
// These two sets are complements, but we write out the conditions to | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is great. It makes the test really clear beyond the description above! |
||
// double-check that we've got it right. | ||
let sagas_affected: BTreeSet<_> = sagas_to_insert | ||
.iter() | ||
.filter_map(|saga| { | ||
((saga.creator == sec_b || saga.creator == sec_c) | ||
&& (saga.saga_state.0 == steno::SagaCachedState::Running | ||
|| saga.saga_state.0 | ||
== steno::SagaCachedState::Unwinding)) | ||
.then(|| saga.id) | ||
}) | ||
.collect(); | ||
let sagas_unaffected: BTreeSet<_> = sagas_to_insert | ||
.iter() | ||
.filter_map(|saga| { | ||
(saga.creator == sec_a | ||
|| saga.creator == sec_d | ||
|| saga.saga_state.0 == steno::SagaCachedState::Done) | ||
.then(|| saga.id) | ||
}) | ||
.collect(); | ||
println!("sagas affected: {:?}", sagas_affected); | ||
println!("sagas UNaffected: {:?}", sagas_unaffected); | ||
assert_eq!(sagas_affected.intersection(&sagas_unaffected).count(), 0); | ||
assert_eq!( | ||
sagas_affected.len() + sagas_unaffected.len(), | ||
sagas_to_insert.len() | ||
); | ||
|
||
// Insert the sagas. | ||
let count = { | ||
use db::schema::saga::dsl; | ||
let conn = datastore.pool_connection_for_tests().await.unwrap(); | ||
diesel::insert_into(dsl::saga) | ||
.values(sagas_to_insert) | ||
.execute_async(&*conn) | ||
.await | ||
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) | ||
.expect("successful insertion") | ||
}; | ||
assert_eq!(count, sagas_affected.len() + sagas_unaffected.len()); | ||
|
||
// Reassign uncompleted sagas from SECs B and C to SEC A. | ||
let nreassigned = datastore | ||
.sagas_reassign_sec(&opctx, &[sec_b, sec_c], sec_a) | ||
.await | ||
.expect("failed to re-assign sagas"); | ||
|
||
// Fetch all the sagas and check their states. | ||
let all_sagas: Vec<_> = datastore | ||
.pool_connection_for_tests() | ||
.await | ||
.unwrap() | ||
.transaction_async(|conn| async move { | ||
use db::schema::saga::dsl; | ||
conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL).await?; | ||
dsl::saga | ||
.select(nexus_db_model::Saga::as_select()) | ||
.load_async(&conn) | ||
.await | ||
}) | ||
.await | ||
.unwrap(); | ||
|
||
for saga in all_sagas { | ||
println!("checking saga: {:?}", saga); | ||
let current_sec = saga.current_sec.unwrap(); | ||
if sagas_affected.contains(&saga.id) { | ||
assert!(saga.creator == sec_b || saga.creator == sec_c); | ||
assert_eq!(current_sec, sec_a); | ||
assert_eq!(*saga.adopt_generation, Generation::from(2)); | ||
assert!( | ||
saga.saga_state.0 == steno::SagaCachedState::Running | ||
|| saga.saga_state.0 | ||
== steno::SagaCachedState::Unwinding | ||
); | ||
} else if sagas_unaffected.contains(&saga.id) { | ||
assert_eq!(current_sec, saga.creator); | ||
assert_eq!(*saga.adopt_generation, Generation::from(1)); | ||
// Its SEC and state could be anything since we've deliberately | ||
// included sagas with various states and SECs that should not | ||
// be affected by the reassignment. | ||
} else { | ||
println!( | ||
"ignoring saga that was not created by this test: {:?}", | ||
saga | ||
); | ||
} | ||
} | ||
|
||
assert_eq!(nreassigned, sagas_affected.len()); | ||
|
||
// If we do it again, we should make no changes. | ||
let nreassigned = datastore | ||
.sagas_reassign_sec(&opctx, &[sec_b, sec_c], sec_a) | ||
.await | ||
.expect("failed to re-assign sagas"); | ||
assert_eq!(nreassigned, 0); | ||
|
||
// Test cleanup | ||
db.cleanup().await.unwrap(); | ||
logctx.cleanup_successful(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed some of the paranoia here because it's going to be too complicated to make it continue to work and it does seem unnecessary at this point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome! When I looked at this approximately 2 months ago I had the same exact thought! I was hoping it wouldn't be a bone of contention (pardon the pun).