From 886548b81545d72096fdde66cfe23228e160310b Mon Sep 17 00:00:00 2001
From: Sean Klein
Date: Thu, 27 Jun 2024 17:29:42 -0700
Subject: [PATCH] [nexus][saga] Fix saga node pagination, use Paginator API (#5953)

- Uses the `Paginator` API for listing sagas and saga node events
- For saga node events, paginates by `(node_id, event_type)` rather than by
  `saga_id`, which is the same for every event in a saga and therefore never
  advanced past the first page
- Adds regression tests that cross the `SQL_BATCH_SIZE` boundary and check
  pagination order for both queries

Fixes https://github.com/oxidecomputer/omicron/issues/5948
---
 nexus/db-model/src/saga_types.rs          |   8 +-
 nexus/db-queries/src/db/datastore/saga.rs |  54 ----
 nexus/db-queries/src/db/saga_recovery.rs  | 293 +++++++++++++++++++---
 3 files changed, 264 insertions(+), 91 deletions(-)

diff --git a/nexus/db-model/src/saga_types.rs b/nexus/db-model/src/saga_types.rs
index 3ad3e2603c..010c717356 100644
--- a/nexus/db-model/src/saga_types.rs
+++ b/nexus/db-model/src/saga_types.rs
@@ -76,7 +76,7 @@ impl From<&SecId> for Uuid {
 /// This exists because Omicron cannot implement foreign traits
 /// for foreign types.
 #[derive(
-    AsExpression, Copy, Clone, Debug, FromSqlRow, PartialEq, PartialOrd,
+    AsExpression, Copy, Clone, Debug, FromSqlRow, PartialEq, PartialOrd, Ord, Eq,
 )]
 #[diesel(sql_type = sql_types::Uuid)]
 pub struct SagaId(pub steno::SagaId);
@@ -110,7 +110,7 @@ where
 /// This exists because Omicron cannot implement foreign traits
 /// for foreign types.
 #[derive(
-    AsExpression, Copy, Clone, Debug, FromSqlRow, PartialEq, PartialOrd,
+    AsExpression, Copy, Clone, Debug, FromSqlRow, PartialEq, PartialOrd, Ord, Eq,
 )]
 #[diesel(sql_type = sql_types::BigInt)]
 pub struct SagaNodeId(pub steno::SagaNodeId);
@@ -181,7 +181,7 @@ impl<DB> FromSql<sql_types::Text, DB> for SagaCachedState
 }
 
 /// Represents a row in the "Saga" table
-#[derive(Queryable, Insertable, Clone, Debug, Selectable)]
+#[derive(Queryable, Insertable, Clone, Debug, Selectable, PartialEq)]
 #[diesel(table_name = saga)]
 pub struct Saga {
     pub id: SagaId,
@@ -222,7 +222,7 @@ impl Saga {
 }
 
 /// Represents a row in the "SagaNodeEvent" table
-#[derive(Queryable, Insertable, Clone, Debug, Selectable)]
+#[derive(Queryable, Insertable, Clone, Debug, Selectable, PartialEq)]
 #[diesel(table_name = saga_node_event)]
 pub struct SagaNodeEvent {
     pub saga_id: SagaId,
diff --git a/nexus/db-queries/src/db/datastore/saga.rs b/nexus/db-queries/src/db/datastore/saga.rs
index 1cd41a9806..c42d14d0d7 100644
--- a/nexus/db-queries/src/db/datastore/saga.rs
+++ b/nexus/db-queries/src/db/datastore/saga.rs
@@ -9,17 +9,13 @@ use crate::db;
 use crate::db::error::public_error_from_diesel;
 use crate::db::error::ErrorHandler;
 use crate::db::model::Generation;
-use crate::db::pagination::paginated;
 use crate::db::update_and_check::UpdateAndCheck;
 use crate::db::update_and_check::UpdateStatus;
 use async_bb8_diesel::AsyncRunQueryDsl;
 use diesel::prelude::*;
-use omicron_common::api::external::DataPageParams;
 use omicron_common::api::external::Error;
-use omicron_common::api::external::ListResultVec;
 use omicron_common::api::external::LookupType;
 use omicron_common::api::external::ResourceType;
-use uuid::Uuid;
 
 impl DataStore {
     pub async fn saga_create(
@@ -103,54 +99,4 @@ impl DataStore {
             )),
         }
     }
-
-    pub async fn saga_list_unfinished_by_id(
-        &self,
-        sec_id: &db::SecId,
-        pagparams: &DataPageParams<'_, Uuid>,
-    ) -> ListResultVec<db::saga_types::Saga> {
-        use db::schema::saga::dsl;
-        paginated(dsl::saga, dsl::id, &pagparams)
-            .filter(dsl::saga_state.ne(db::saga_types::SagaCachedState(
-                steno::SagaCachedState::Done,
-            )))
-            .filter(dsl::current_sec.eq(*sec_id))
-            .load_async(&*self.pool_connection_unauthorized().await?)
-            .await
-            .map_err(|e| {
-                public_error_from_diesel(
-                    e,
-                    ErrorHandler::NotFoundByLookup(
-                        ResourceType::SagaDbg,
-                        LookupType::ById(sec_id.0),
-                    ),
-                )
-            })
-    }
-
-    pub async fn saga_node_event_list_by_id(
-        &self,
-        id: db::saga_types::SagaId,
-        pagparams: &DataPageParams<'_, Uuid>,
-    ) -> ListResultVec<steno::SagaNodeEvent> {
-        use db::schema::saga_node_event::dsl;
-        paginated(dsl::saga_node_event, dsl::saga_id, &pagparams)
-            .filter(dsl::saga_id.eq(id))
-            .load_async::<db::saga_types::SagaNodeEvent>(
-                &*self.pool_connection_unauthorized().await?,
-            )
-            .await
-            .map_err(|e| {
-                public_error_from_diesel(
-                    e,
-                    ErrorHandler::NotFoundByLookup(
-                        ResourceType::SagaDbg,
-                        LookupType::ById(id.0 .0),
-                    ),
-                )
-            })?
-            .into_iter()
-            .map(|db_event| steno::SagaNodeEvent::try_from(db_event))
-            .collect::<ListResultVec<steno::SagaNodeEvent>>()
-    }
 }
diff --git a/nexus/db-queries/src/db/saga_recovery.rs b/nexus/db-queries/src/db/saga_recovery.rs
index 25f8ff788d..e85011f60f 100644
--- a/nexus/db-queries/src/db/saga_recovery.rs
+++ b/nexus/db-queries/src/db/saga_recovery.rs
@@ -6,9 +6,18 @@
 
 use crate::context::OpContext;
 use crate::db;
+use crate::db::datastore::SQL_BATCH_SIZE;
+use crate::db::error::public_error_from_diesel;
+use crate::db::error::ErrorHandler;
+use crate::db::pagination::{paginated, paginated_multicolumn, Paginator};
+use async_bb8_diesel::AsyncRunQueryDsl;
+use diesel::prelude::*;
+use diesel::ExpressionMethods;
+use diesel::SelectableHelper;
 use futures::{future::BoxFuture, TryFutureExt};
-use omicron_common::api::external::DataPageParams;
 use omicron_common::api::external::Error;
+use omicron_common::api::external::LookupType;
+use omicron_common::api::external::ResourceType;
 use omicron_common::backoff::retry_notify;
 use omicron_common::backoff::retry_policy_internal_service;
 use omicron_common::backoff::BackoffError;
@@ -162,17 +171,6 @@ where
     }))
 }
 
-// Creates new page params for querying sagas.
-fn new_page_params(
-    marker: Option<&uuid::Uuid>,
-) -> DataPageParams<'_, uuid::Uuid> {
-    DataPageParams {
-        marker,
-        direction: dropshot::PaginationOrder::Ascending,
-        limit: std::num::NonZeroU32::new(100).unwrap(),
-    }
-}
-
 /// Queries the database to return a list of uncompleted sagas assigned to SEC
 /// `sec_id`
 // For now, we do the simplest thing: we fetch all the sagas that the
@@ -195,17 +193,32 @@ async fn list_unfinished_sagas(
     // Although we could read them all into memory simultaneously, this
     // risks blocking the DB for an unreasonable amount of time. Instead,
     // we paginate to avoid cutting off availability to the DB.
-    let mut last_id = None;
     let mut sagas = vec![];
-    loop {
-        let pagparams = new_page_params(last_id.as_ref());
-        let mut some_sagas =
-            datastore.saga_list_unfinished_by_id(sec_id, &pagparams).await?;
-        if some_sagas.is_empty() {
-            break;
-        }
-        sagas.append(&mut some_sagas);
-        last_id = Some(sagas.last().as_ref().unwrap().id.0 .0);
+    let mut paginator = Paginator::new(SQL_BATCH_SIZE);
+    let conn = datastore.pool_connection_authorized(opctx).await?;
+    while let Some(p) = paginator.next() {
+        use db::schema::saga::dsl;
+
+        let mut batch = paginated(dsl::saga, dsl::id, &p.current_pagparams())
+            .filter(dsl::saga_state.ne(db::saga_types::SagaCachedState(
+                steno::SagaCachedState::Done,
+            )))
+            .filter(dsl::current_sec.eq(*sec_id))
+            .select(db::saga_types::Saga::as_select())
+            .load_async(&*conn)
+            .await
+            .map_err(|e| {
+                public_error_from_diesel(
+                    e,
+                    ErrorHandler::NotFoundByLookup(
+                        ResourceType::SagaDbg,
+                        LookupType::ById(sec_id.0),
+                    ),
+                )
+            })?;
+
+        paginator = p.found_batch(&batch, &|row| row.id);
+        sagas.append(&mut batch);
     }
     Ok(sagas)
 }
@@ -239,7 +252,7 @@ where
         "saga_name" => saga_name.clone(),
     );
 
-    let log_events = load_saga_log(datastore, &saga).await?;
+    let log_events = load_saga_log(&opctx, datastore, &saga).await?;
     trace!(
         opctx.log,
         "recovering saga: loaded log";
@@ -277,6 +290,7 @@ where
 
 /// Queries the database to load the full log for the specified saga
 async fn load_saga_log(
+    opctx: &OpContext,
     datastore: &db::DataStore,
     saga: &db::saga_types::Saga,
 ) -> Result<Vec<steno::SagaNodeEvent>, Error> {
@@ -285,17 +299,30 @@ async fn load_saga_log(
     // Although we could read them all into memory simultaneously, this
     // risks blocking the DB for an unreasonable amount of time. Instead,
     // we paginate to avoid cutting off availability.
-    let mut last_id = None;
     let mut events = vec![];
-    loop {
-        let pagparams = new_page_params(last_id.as_ref());
-        let mut some_events =
-            datastore.saga_node_event_list_by_id(saga.id, &pagparams).await?;
-        if some_events.is_empty() {
-            break;
-        }
-        events.append(&mut some_events);
-        last_id = Some(events.last().as_ref().unwrap().saga_id.0);
+    let mut paginator = Paginator::new(SQL_BATCH_SIZE);
+    let conn = datastore.pool_connection_authorized(opctx).await?;
+    while let Some(p) = paginator.next() {
+        use db::schema::saga_node_event::dsl;
+        let batch = paginated_multicolumn(
+            dsl::saga_node_event,
+            (dsl::node_id, dsl::event_type),
+            &p.current_pagparams(),
+        )
+        .filter(dsl::saga_id.eq(saga.id))
+        .select(db::saga_types::SagaNodeEvent::as_select())
+        .load_async(&*conn)
+        .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
+        .await?;
+        paginator =
+            p.found_batch(&batch, &|row| (row.node_id, row.event_type.clone()));
+
+        let mut batch = batch
+            .into_iter()
+            .map(|event| steno::SagaNodeEvent::try_from(event))
+            .collect::<Result<Vec<_>, Error>>()?;
+
+        events.append(&mut batch);
     }
     Ok(events)
 }
@@ -308,6 +335,8 @@ mod test {
     use nexus_test_utils::db::test_setup_database;
     use omicron_test_utils::dev;
     use once_cell::sync::Lazy;
+    use pretty_assertions::assert_eq;
+    use rand::seq::SliceRandom;
     use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
     use steno::{
         new_action_noop_undo, Action, ActionContext, ActionError,
@@ -575,4 +604,202 @@ mod test {
         db.cleanup().await.unwrap();
         logctx.cleanup_successful();
     }
+
+    #[tokio::test]
+    async fn test_list_unfinished_sagas() {
+        // Test setup
+        let logctx = dev::test_setup_log("test_list_unfinished_sagas");
+        let log = logctx.log.new(o!());
+        let (mut db, db_datastore) = new_db(&log).await;
+        let sec_id = db::SecId(uuid::Uuid::new_v4());
+        let opctx = OpContext::for_tests(
+            log,
+            Arc::clone(&db_datastore) as Arc<dyn std::any::Any + Send + Sync>,
+        );
+
+        // Create a couple batches of sagas.
+        let new_running_db_saga = || {
+            let params = steno::SagaCreateParams {
+                id: steno::SagaId(Uuid::new_v4()),
+                name: steno::SagaName::new("test saga"),
+                dag: serde_json::value::Value::Null,
+                state: steno::SagaCachedState::Running,
+            };
+
+            db::model::saga_types::Saga::new(sec_id, params)
+        };
+        let mut inserted_sagas = (0..SQL_BATCH_SIZE.get() * 2)
+            .map(|_| new_running_db_saga())
+            .collect::<Vec<_>>();
+
+        // Shuffle these sagas into a random order to check that the pagination
+        // order is working as intended on the read path, which we'll do later
+        // in this test.
+        inserted_sagas.shuffle(&mut rand::thread_rng());
+
+        // Insert the batches of unfinished sagas into the database
+        let conn = db_datastore
+            .pool_connection_unauthorized()
+            .await
+            .expect("Failed to access db connection");
+        diesel::insert_into(db::schema::saga::dsl::saga)
+            .values(inserted_sagas.clone())
+            .execute_async(&*conn)
+            .await
+            .expect("Failed to insert test setup data");
+
+        // List them, expect to see them all in order by ID.
+        let mut observed_sagas =
+            list_unfinished_sagas(&opctx, &db_datastore, &sec_id)
+                .await
+                .expect("Failed to list unfinished sagas");
+        inserted_sagas.sort_by_key(|a| a.id);
+
+        // Timestamps can change slightly when we insert them.
+        //
+        // Sanitize them to make input/output equality checks easier.
+        let sanitize_timestamps =
+            |sagas: &mut Vec<db::model::saga_types::Saga>| {
+                for saga in sagas {
+                    saga.time_created = chrono::DateTime::UNIX_EPOCH;
+                    saga.adopt_time = chrono::DateTime::UNIX_EPOCH;
+                }
+            };
+        sanitize_timestamps(&mut observed_sagas);
+        sanitize_timestamps(&mut inserted_sagas);
+
+        assert_eq!(
+            inserted_sagas, observed_sagas,
+            "Observed sagas did not match inserted sagas"
+        );
+
+        // Test cleanup
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
+
+    #[tokio::test]
+    async fn test_list_unfinished_nodes() {
+        // Test setup
+        let logctx = dev::test_setup_log("test_list_unfinished_nodes");
+        let log = logctx.log.new(o!());
+        let (mut db, db_datastore) = new_db(&log).await;
+        let sec_id = db::SecId(uuid::Uuid::new_v4());
+        let opctx = OpContext::for_tests(
+            log,
+            Arc::clone(&db_datastore) as Arc<dyn std::any::Any + Send + Sync>,
+        );
+        let saga_id = steno::SagaId(Uuid::new_v4());
+
+        // Create a couple batches of saga events
+        let new_db_saga_nodes =
+            |node_id: u32, event_type: steno::SagaNodeEventType| {
+                let event = steno::SagaNodeEvent {
+                    saga_id,
+                    node_id: steno::SagaNodeId::from(node_id),
+                    event_type,
+                };
+
+                db::model::saga_types::SagaNodeEvent::new(event, sec_id)
+            };
+        let mut inserted_nodes = (0..SQL_BATCH_SIZE.get() * 2)
+            .flat_map(|i| {
+                // This isn't an exhaustive list of event types, but gives us a
+                // few options to pick from. Since this is a pagination key,
+                // it's important to include a variety here.
+                use steno::SagaNodeEventType::*;
+                [
+                    new_db_saga_nodes(i, Started),
+                    new_db_saga_nodes(i, UndoStarted),
+                    new_db_saga_nodes(i, UndoFinished),
+                ]
+            })
+            .collect::<Vec<_>>();
+
+        // Shuffle these nodes into a random order to check that the pagination
+        // order is working as intended on the read path, which we'll do later
+        // in this test.
+        inserted_nodes.shuffle(&mut rand::thread_rng());
+
+        // Insert them into the database
+        let conn = db_datastore
+            .pool_connection_unauthorized()
+            .await
+            .expect("Failed to access db connection");
+        diesel::insert_into(db::schema::saga_node_event::dsl::saga_node_event)
+            .values(inserted_nodes.clone())
+            .execute_async(&*conn)
+            .await
+            .expect("Failed to insert test setup data");
+
+        // List them, expect to see them all in order by (node ID, event type).
+        //
+        // Note that we need to make up a saga to do this, but the only part
+        // of it that actually matters is the ID.
+        let params = steno::SagaCreateParams {
+            id: saga_id,
+            name: steno::SagaName::new("test saga"),
+            dag: serde_json::value::Value::Null,
+            state: steno::SagaCachedState::Running,
+        };
+        let saga = db::model::saga_types::Saga::new(sec_id, params);
+        let observed_nodes = load_saga_log(&opctx, &db_datastore, &saga)
+            .await
+            .expect("Failed to list unfinished nodes");
+        inserted_nodes.sort_by_key(|a| (a.node_id, a.event_type.clone()));
+
+        let inserted_nodes = inserted_nodes
+            .into_iter()
+            .map(|node| steno::SagaNodeEvent::try_from(node))
+            .collect::<Result<Vec<_>, _>>()
+            .expect("Couldn't convert DB nodes to steno nodes");
+
+        // The steno::SagaNodeEvent type doesn't implement PartialEq, so we
+        // need to do this a little manually.
+        assert_eq!(inserted_nodes.len(), observed_nodes.len());
+        for i in 0..inserted_nodes.len() {
+            assert_eq!(inserted_nodes[i].saga_id, observed_nodes[i].saga_id);
+            assert_eq!(inserted_nodes[i].node_id, observed_nodes[i].node_id);
+            assert_eq!(
+                inserted_nodes[i].event_type.label(),
+                observed_nodes[i].event_type.label()
+            );
+        }
+
+        // Test cleanup
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
+
+    #[tokio::test]
+    async fn test_list_no_unfinished_nodes() {
+        // Test setup
+        let logctx = dev::test_setup_log("test_list_no_unfinished_nodes");
+        let log = logctx.log.new(o!());
+        let (mut db, db_datastore) = new_db(&log).await;
+        let sec_id = db::SecId(uuid::Uuid::new_v4());
+        let opctx = OpContext::for_tests(
+            log,
+            Arc::clone(&db_datastore) as Arc<dyn std::any::Any + Send + Sync>,
+        );
+        let saga_id = steno::SagaId(Uuid::new_v4());
+
+        let params = steno::SagaCreateParams {
+            id: saga_id,
+            name: steno::SagaName::new("test saga"),
+            dag: serde_json::value::Value::Null,
+            state: steno::SagaCachedState::Running,
+        };
+        let saga = db::model::saga_types::Saga::new(sec_id, params);
+
+        // Test that this returns "no nodes" rather than throwing some "not
+        // found" error.
+        let observed_nodes = load_saga_log(&opctx, &db_datastore, &saga)
+            .await
+            .expect("Failed to list unfinished nodes");
+        assert_eq!(observed_nodes.len(), 0);
+
+        // Test cleanup
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
 }
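
Appendix (not part of the patch): a minimal, self-contained sketch of why the old `saga_id` marker lost data and why `(node_id, event_type)` works as a pagination key. This is illustrative Rust only, not omicron's `Paginator` or the Diesel queries above; `Event` and `fetch_batch` are hypothetical stand-ins for the `saga_node_event` table and one paginated `SELECT`.

```rust
#[derive(Clone, Debug)]
struct Event {
    saga_id: u32, // identical for every event in one saga
    node_id: u32,
    event_type: &'static str,
}

/// One "page" of keyset pagination: up to `limit` rows whose key is strictly
/// greater than `marker`, ordered by the key. The marker is the key of the
/// last row of the previous page.
fn fetch_batch<K: Ord>(
    rows: &[Event],
    key: impl Fn(&Event) -> K,
    marker: Option<&K>,
    limit: usize,
) -> Vec<Event> {
    let mut page: Vec<Event> = rows
        .iter()
        .cloned()
        .filter(|r| marker.map_or(true, |m| key(r) > *m))
        .collect();
    page.sort_by(|a, b| key(a).cmp(&key(b)));
    page.truncate(limit);
    page
}

fn main() {
    // Five events from one saga: more than a single page's worth.
    let rows = vec![
        Event { saga_id: 7, node_id: 0, event_type: "started" },
        Event { saga_id: 7, node_id: 0, event_type: "succeeded" },
        Event { saga_id: 7, node_id: 1, event_type: "started" },
        Event { saga_id: 7, node_id: 1, event_type: "succeeded" },
        Event { saga_id: 7, node_id: 2, event_type: "started" },
    ];
    let limit = 2;

    // Broken (the old behavior): paginate by `saga_id`. Every row has the
    // same key, so the marker from page one already excludes every remaining
    // row, and the log is silently truncated after one page.
    let page1 = fetch_batch(&rows, |r| r.saga_id, None, limit);
    let marker = page1.last().map(|r| r.saga_id);
    let page2 = fetch_batch(&rows, |r| r.saga_id, marker.as_ref(), limit);
    assert_eq!(page1.len(), 2);
    assert!(page2.is_empty()); // three events were never returned

    // Fixed: paginate by `(node_id, event_type)`, which is unique (and
    // therefore strictly increasing) across the events of a single saga.
    let key = |r: &Event| (r.node_id, r.event_type);
    let mut events = Vec::new();
    let mut marker = None;
    loop {
        let batch = fetch_batch(&rows, &key, marker.as_ref(), limit);
        if batch.is_empty() {
            break;
        }
        marker = batch.last().map(&key);
        events.extend(batch);
    }
    assert_eq!(events.len(), rows.len()); // every event seen exactly once
    println!("recovered all {} events", events.len());
}
```

The patch expresses the same loop shape with `while let Some(p) = paginator.next()` and `p.found_batch(...)` advancing the marker, and relies on `paginated_multicolumn` to apply the composite `(node_id, event_type)` key in SQL.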