From 71ee793636d3f3bd7f772c4e21e2588c9bec240c Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Tue, 25 Jun 2024 16:51:18 -0700 Subject: [PATCH 1/4] [nexus][saga] Paginate saga nodes by node ID, not saga ID --- nexus/db-queries/src/db/datastore/saga.rs | 4 ++-- nexus/db-queries/src/db/saga_recovery.rs | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/saga.rs b/nexus/db-queries/src/db/datastore/saga.rs index 1cd41a9806..2f748de5b3 100644 --- a/nexus/db-queries/src/db/datastore/saga.rs +++ b/nexus/db-queries/src/db/datastore/saga.rs @@ -131,10 +131,10 @@ impl DataStore { pub async fn saga_node_event_list_by_id( &self, id: db::saga_types::SagaId, - pagparams: &DataPageParams<'_, Uuid>, + pagparams: &DataPageParams<'_, db::saga_types::SagaNodeId>, ) -> ListResultVec { use db::schema::saga_node_event::dsl; - paginated(dsl::saga_node_event, dsl::saga_id, &pagparams) + paginated(dsl::saga_node_event, dsl::node_id, &pagparams) .filter(dsl::saga_id.eq(id)) .load_async::( &*self.pool_connection_unauthorized().await?, diff --git a/nexus/db-queries/src/db/saga_recovery.rs b/nexus/db-queries/src/db/saga_recovery.rs index 25f8ff788d..18721962e4 100644 --- a/nexus/db-queries/src/db/saga_recovery.rs +++ b/nexus/db-queries/src/db/saga_recovery.rs @@ -163,9 +163,9 @@ where } // Creates new page params for querying sagas. -fn new_page_params( - marker: Option<&uuid::Uuid>, -) -> DataPageParams<'_, uuid::Uuid> { +fn new_page_params( + marker: Option<&T> +) -> DataPageParams<'_, T> { DataPageParams { marker, direction: dropshot::PaginationOrder::Ascending, @@ -295,7 +295,7 @@ async fn load_saga_log( break; } events.append(&mut some_events); - last_id = Some(events.last().as_ref().unwrap().saga_id.0); + last_id = Some(events.last().as_ref().unwrap().node_id.into()); } Ok(events) } From 2c5a1a68facc27caf78ed0eb6d4b8ffcb2d6051c Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Tue, 25 Jun 2024 16:52:35 -0700 Subject: [PATCH 2/4] fmt --- nexus/db-queries/src/db/saga_recovery.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/nexus/db-queries/src/db/saga_recovery.rs b/nexus/db-queries/src/db/saga_recovery.rs index 18721962e4..171e594071 100644 --- a/nexus/db-queries/src/db/saga_recovery.rs +++ b/nexus/db-queries/src/db/saga_recovery.rs @@ -163,9 +163,7 @@ where } // Creates new page params for querying sagas. -fn new_page_params( - marker: Option<&T> -) -> DataPageParams<'_, T> { +fn new_page_params(marker: Option<&T>) -> DataPageParams<'_, T> { DataPageParams { marker, direction: dropshot::PaginationOrder::Ascending, From e97a58a8a3c6ecc7755fa228c0fadbe33bd757f4 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Wed, 26 Jun 2024 11:20:03 -0700 Subject: [PATCH 3/4] Paginator, regression tests, multi-column --- nexus/db-model/src/saga_types.rs | 8 +- nexus/db-queries/src/db/datastore/saga.rs | 54 ----- nexus/db-queries/src/db/saga_recovery.rs | 262 +++++++++++++++++++--- 3 files changed, 237 insertions(+), 87 deletions(-) diff --git a/nexus/db-model/src/saga_types.rs b/nexus/db-model/src/saga_types.rs index 3ad3e2603c..010c717356 100644 --- a/nexus/db-model/src/saga_types.rs +++ b/nexus/db-model/src/saga_types.rs @@ -76,7 +76,7 @@ impl From<&SecId> for Uuid { /// This exists because Omicron cannot implement foreign traits /// for foreign types. #[derive( - AsExpression, Copy, Clone, Debug, FromSqlRow, PartialEq, PartialOrd, + AsExpression, Copy, Clone, Debug, FromSqlRow, PartialEq, PartialOrd, Ord, Eq, )] #[diesel(sql_type = sql_types::Uuid)] pub struct SagaId(pub steno::SagaId); @@ -110,7 +110,7 @@ where /// This exists because Omicron cannot implement foreign traits /// for foreign types. #[derive( - AsExpression, Copy, Clone, Debug, FromSqlRow, PartialEq, PartialOrd, + AsExpression, Copy, Clone, Debug, FromSqlRow, PartialEq, PartialOrd, Ord, Eq, )] #[diesel(sql_type = sql_types::BigInt)] pub struct SagaNodeId(pub steno::SagaNodeId); @@ -181,7 +181,7 @@ impl FromSql for SagaCachedState { } /// Represents a row in the "Saga" table -#[derive(Queryable, Insertable, Clone, Debug, Selectable)] +#[derive(Queryable, Insertable, Clone, Debug, Selectable, PartialEq)] #[diesel(table_name = saga)] pub struct Saga { pub id: SagaId, @@ -222,7 +222,7 @@ impl Saga { } /// Represents a row in the "SagaNodeEvent" table -#[derive(Queryable, Insertable, Clone, Debug, Selectable)] +#[derive(Queryable, Insertable, Clone, Debug, Selectable, PartialEq)] #[diesel(table_name = saga_node_event)] pub struct SagaNodeEvent { pub saga_id: SagaId, diff --git a/nexus/db-queries/src/db/datastore/saga.rs b/nexus/db-queries/src/db/datastore/saga.rs index 2f748de5b3..c42d14d0d7 100644 --- a/nexus/db-queries/src/db/datastore/saga.rs +++ b/nexus/db-queries/src/db/datastore/saga.rs @@ -9,17 +9,13 @@ use crate::db; use crate::db::error::public_error_from_diesel; use crate::db::error::ErrorHandler; use crate::db::model::Generation; -use crate::db::pagination::paginated; use crate::db::update_and_check::UpdateAndCheck; use crate::db::update_and_check::UpdateStatus; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; -use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; -use omicron_common::api::external::ListResultVec; use omicron_common::api::external::LookupType; use omicron_common::api::external::ResourceType; -use uuid::Uuid; impl DataStore { pub async fn saga_create( @@ -103,54 +99,4 @@ impl DataStore { )), } } - - pub async fn saga_list_unfinished_by_id( - &self, - sec_id: &db::SecId, - pagparams: &DataPageParams<'_, Uuid>, - ) -> ListResultVec { - use db::schema::saga::dsl; - paginated(dsl::saga, dsl::id, &pagparams) - .filter(dsl::saga_state.ne(db::saga_types::SagaCachedState( - steno::SagaCachedState::Done, - ))) - .filter(dsl::current_sec.eq(*sec_id)) - .load_async(&*self.pool_connection_unauthorized().await?) - .await - .map_err(|e| { - public_error_from_diesel( - e, - ErrorHandler::NotFoundByLookup( - ResourceType::SagaDbg, - LookupType::ById(sec_id.0), - ), - ) - }) - } - - pub async fn saga_node_event_list_by_id( - &self, - id: db::saga_types::SagaId, - pagparams: &DataPageParams<'_, db::saga_types::SagaNodeId>, - ) -> ListResultVec { - use db::schema::saga_node_event::dsl; - paginated(dsl::saga_node_event, dsl::node_id, &pagparams) - .filter(dsl::saga_id.eq(id)) - .load_async::( - &*self.pool_connection_unauthorized().await?, - ) - .await - .map_err(|e| { - public_error_from_diesel( - e, - ErrorHandler::NotFoundByLookup( - ResourceType::SagaDbg, - LookupType::ById(id.0 .0), - ), - ) - })? - .into_iter() - .map(|db_event| steno::SagaNodeEvent::try_from(db_event)) - .collect::>() - } } diff --git a/nexus/db-queries/src/db/saga_recovery.rs b/nexus/db-queries/src/db/saga_recovery.rs index 171e594071..eb2003508c 100644 --- a/nexus/db-queries/src/db/saga_recovery.rs +++ b/nexus/db-queries/src/db/saga_recovery.rs @@ -6,13 +6,22 @@ use crate::context::OpContext; use crate::db; +use crate::db::error::public_error_from_diesel; +use crate::db::error::ErrorHandler; +use crate::db::pagination::{paginated, paginated_multicolumn, Paginator}; +use async_bb8_diesel::AsyncRunQueryDsl; +use diesel::prelude::*; +use diesel::ExpressionMethods; +use diesel::SelectableHelper; use futures::{future::BoxFuture, TryFutureExt}; -use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; +use omicron_common::api::external::LookupType; +use omicron_common::api::external::ResourceType; use omicron_common::backoff::retry_notify; use omicron_common::backoff::retry_policy_internal_service; use omicron_common::backoff::BackoffError; use std::future::Future; +use std::num::NonZeroU32; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -162,14 +171,10 @@ where })) } -// Creates new page params for querying sagas. -fn new_page_params(marker: Option<&T>) -> DataPageParams<'_, T> { - DataPageParams { - marker, - direction: dropshot::PaginationOrder::Ascending, - limit: std::num::NonZeroU32::new(100).unwrap(), - } -} +const BATCH_SIZE: NonZeroU32 = unsafe { + // Safety: 100 is more than zero + NonZeroU32::new_unchecked(100) +}; /// Queries the database to return a list of uncompleted sagas assigned to SEC /// `sec_id` @@ -193,17 +198,32 @@ async fn list_unfinished_sagas( // Although we could read them all into memory simultaneously, this // risks blocking the DB for an unreasonable amount of time. Instead, // we paginate to avoid cutting off availability to the DB. - let mut last_id = None; let mut sagas = vec![]; - loop { - let pagparams = new_page_params(last_id.as_ref()); - let mut some_sagas = - datastore.saga_list_unfinished_by_id(sec_id, &pagparams).await?; - if some_sagas.is_empty() { - break; - } - sagas.append(&mut some_sagas); - last_id = Some(sagas.last().as_ref().unwrap().id.0 .0); + let mut paginator = Paginator::new(BATCH_SIZE); + let conn = datastore.pool_connection_unauthorized().await?; + while let Some(p) = paginator.next() { + use db::schema::saga::dsl; + + let mut batch = paginated(dsl::saga, dsl::id, &p.current_pagparams()) + .filter(dsl::saga_state.ne(db::saga_types::SagaCachedState( + steno::SagaCachedState::Done, + ))) + .filter(dsl::current_sec.eq(*sec_id)) + .select(db::saga_types::Saga::as_select()) + .load_async(&*conn) + .await + .map_err(|e| { + public_error_from_diesel( + e, + ErrorHandler::NotFoundByLookup( + ResourceType::SagaDbg, + LookupType::ById(sec_id.0), + ), + ) + })?; + + paginator = p.found_batch(&batch, &|row| row.id); + sagas.append(&mut batch); } Ok(sagas) } @@ -283,17 +303,38 @@ async fn load_saga_log( // Although we could read them all into memory simultaneously, this // risks blocking the DB for an unreasonable amount of time. Instead, // we paginate to avoid cutting off availability. - let mut last_id = None; let mut events = vec![]; - loop { - let pagparams = new_page_params(last_id.as_ref()); - let mut some_events = - datastore.saga_node_event_list_by_id(saga.id, &pagparams).await?; - if some_events.is_empty() { - break; - } - events.append(&mut some_events); - last_id = Some(events.last().as_ref().unwrap().node_id.into()); + let mut paginator = Paginator::new(BATCH_SIZE); + let conn = datastore.pool_connection_unauthorized().await?; + while let Some(p) = paginator.next() { + use db::schema::saga_node_event::dsl; + let batch = paginated_multicolumn( + dsl::saga_node_event, + (dsl::node_id, dsl::event_type), + &p.current_pagparams(), + ) + .filter(dsl::saga_id.eq(saga.id)) + .select(db::saga_types::SagaNodeEvent::as_select()) + .load_async(&*conn) + .map_err(|e| { + public_error_from_diesel( + e, + ErrorHandler::NotFoundByLookup( + ResourceType::SagaDbg, + LookupType::ById(saga.id.0 .0), + ), + ) + }) + .await?; + paginator = + p.found_batch(&batch, &|row| (row.node_id, row.event_type.clone())); + + let mut batch = batch + .into_iter() + .map(|event| steno::SagaNodeEvent::try_from(event)) + .collect::, Error>>()?; + + events.append(&mut batch); } Ok(events) } @@ -306,6 +347,8 @@ mod test { use nexus_test_utils::db::test_setup_database; use omicron_test_utils::dev; use once_cell::sync::Lazy; + use pretty_assertions::assert_eq; + use rand::seq::SliceRandom; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use steno::{ new_action_noop_undo, Action, ActionContext, ActionError, @@ -573,4 +616,165 @@ mod test { db.cleanup().await.unwrap(); logctx.cleanup_successful(); } + + #[tokio::test] + async fn test_list_unfinished_sagas() { + // Test setup + let logctx = dev::test_setup_log("test_list_unfinished_sagas"); + let log = logctx.log.new(o!()); + let (mut db, db_datastore) = new_db(&log).await; + let sec_id = db::SecId(uuid::Uuid::new_v4()); + let opctx = OpContext::for_tests( + log, + Arc::clone(&db_datastore) as Arc, + ); + + // Create a couple batches of sagas. + let new_running_db_saga = || { + let params = steno::SagaCreateParams { + id: steno::SagaId(Uuid::new_v4()), + name: steno::SagaName::new("test saga"), + dag: serde_json::value::Value::Null, + state: steno::SagaCachedState::Running, + }; + + db::model::saga_types::Saga::new(sec_id, params) + }; + let mut inserted_sagas = (0..BATCH_SIZE.get() * 2) + .map(|_| new_running_db_saga()) + .collect::>(); + + // Shuffle these sagas into a random order to check that the pagination + // order is working as intended on the read path, which we'll do later + // in this test. + inserted_sagas.shuffle(&mut rand::thread_rng()); + + // Insert the batches of unfinished sagas into the database + let conn = db_datastore + .pool_connection_unauthorized() + .await + .expect("Failed to access db connection"); + diesel::insert_into(db::schema::saga::dsl::saga) + .values(inserted_sagas.clone()) + .execute_async(&*conn) + .await + .expect("Failed to insert test setup data"); + + // List them, expect to see them all in order by ID. + let mut observed_sagas = + list_unfinished_sagas(&opctx, &db_datastore, &sec_id) + .await + .expect("Failed to list unfinished sagas"); + inserted_sagas.sort_by_key(|a| a.id); + + // Timestamps can change slightly when we insert them. + // + // Sanitize them to make input/output equality checks easier. + let sanitize_timestamps = |sagas: &mut Vec| { + for saga in sagas { + saga.time_created = chrono::DateTime::UNIX_EPOCH; + saga.adopt_time = chrono::DateTime::UNIX_EPOCH; + } + }; + sanitize_timestamps(&mut observed_sagas); + sanitize_timestamps(&mut inserted_sagas); + + assert_eq!( + inserted_sagas, observed_sagas, + "Observed sagas did not match inserted sagas" + ); + + // Test cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_list_unfinished_nodes() { + // Test setup + let logctx = dev::test_setup_log("test_list_unfinished_nodes"); + let log = logctx.log.new(o!()); + let (mut db, db_datastore) = new_db(&log).await; + let sec_id = db::SecId(uuid::Uuid::new_v4()); + let saga_id = steno::SagaId(Uuid::new_v4()); + + // Create a couple batches of saga events + let new_db_saga_nodes = + |node_id: u32, event_type: steno::SagaNodeEventType| { + let event = steno::SagaNodeEvent { + saga_id, + node_id: steno::SagaNodeId::from(node_id), + event_type, + }; + + db::model::saga_types::SagaNodeEvent::new(event, sec_id) + }; + let mut inserted_nodes = (0..BATCH_SIZE.get() * 2) + .flat_map(|i| { + // This isn't an exhaustive list of event types, but gives us a few + // options to pick from. Since this is a pagination key, it's + // important to include a variety here. + use steno::SagaNodeEventType::*; + [ + new_db_saga_nodes(i, Started), + new_db_saga_nodes(i, UndoStarted), + new_db_saga_nodes(i, UndoFinished), + ] + }) + .collect::>(); + + // Shuffle these nodes into a random order to check that the pagination + // order is working as intended on the read path, which we'll do later + // in this test. + inserted_nodes.shuffle(&mut rand::thread_rng()); + + // Insert them into the database + let conn = db_datastore + .pool_connection_unauthorized() + .await + .expect("Failed to access db connection"); + diesel::insert_into(db::schema::saga_node_event::dsl::saga_node_event) + .values(inserted_nodes.clone()) + .execute_async(&*conn) + .await + .expect("Failed to insert test setup data"); + + // List them, expect to see them all in order by ID. + // + // Note that we need to make up a saga to see this, but the + // part of it that actually matters is the ID. + let params = steno::SagaCreateParams { + id: saga_id, + name: steno::SagaName::new("test saga"), + dag: serde_json::value::Value::Null, + state: steno::SagaCachedState::Running, + }; + let saga = db::model::saga_types::Saga::new(sec_id, params); + let observed_nodes = load_saga_log(&db_datastore, &saga) + .await + .expect("Failed to list unfinished nodes"); + inserted_nodes.sort_by_key(|a| (a.node_id, a.event_type.clone())); + + let inserted_nodes = inserted_nodes + .into_iter() + .map(|node| steno::SagaNodeEvent::try_from(node)) + .collect::, _>>() + .expect("Couldn't convert DB nodes to steno nodes"); + + // The steno::SagaNodeEvent type doesn't implement PartialEq, so we need to do this + // a little manually. + assert_eq!(inserted_nodes.len(), observed_nodes.len()); + for i in 0..inserted_nodes.len() { + assert_eq!(inserted_nodes[i].saga_id, observed_nodes[i].saga_id); + assert_eq!(inserted_nodes[i].node_id, observed_nodes[i].node_id); + assert_eq!( + inserted_nodes[i].event_type.label(), + observed_nodes[i].event_type.label() + ); + } + + // Test cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } } From d7ed0b00d10ffd1debc2dd0f9637749bdb882848 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Thu, 27 Jun 2024 15:48:37 -0700 Subject: [PATCH 4/4] review feedback --- nexus/db-queries/src/db/saga_recovery.rs | 71 ++++++++++++++++-------- 1 file changed, 48 insertions(+), 23 deletions(-) diff --git a/nexus/db-queries/src/db/saga_recovery.rs b/nexus/db-queries/src/db/saga_recovery.rs index eb2003508c..e85011f60f 100644 --- a/nexus/db-queries/src/db/saga_recovery.rs +++ b/nexus/db-queries/src/db/saga_recovery.rs @@ -6,6 +6,7 @@ use crate::context::OpContext; use crate::db; +use crate::db::datastore::SQL_BATCH_SIZE; use crate::db::error::public_error_from_diesel; use crate::db::error::ErrorHandler; use crate::db::pagination::{paginated, paginated_multicolumn, Paginator}; @@ -21,7 +22,6 @@ use omicron_common::backoff::retry_notify; use omicron_common::backoff::retry_policy_internal_service; use omicron_common::backoff::BackoffError; use std::future::Future; -use std::num::NonZeroU32; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -171,11 +171,6 @@ where })) } -const BATCH_SIZE: NonZeroU32 = unsafe { - // Safety: 100 is more than zero - NonZeroU32::new_unchecked(100) -}; - /// Queries the database to return a list of uncompleted sagas assigned to SEC /// `sec_id` // For now, we do the simplest thing: we fetch all the sagas that the @@ -199,8 +194,8 @@ async fn list_unfinished_sagas( // risks blocking the DB for an unreasonable amount of time. Instead, // we paginate to avoid cutting off availability to the DB. let mut sagas = vec![]; - let mut paginator = Paginator::new(BATCH_SIZE); - let conn = datastore.pool_connection_unauthorized().await?; + let mut paginator = Paginator::new(SQL_BATCH_SIZE); + let conn = datastore.pool_connection_authorized(opctx).await?; while let Some(p) = paginator.next() { use db::schema::saga::dsl; @@ -257,7 +252,7 @@ where "saga_name" => saga_name.clone(), ); - let log_events = load_saga_log(datastore, &saga).await?; + let log_events = load_saga_log(&opctx, datastore, &saga).await?; trace!( opctx.log, "recovering saga: loaded log"; @@ -295,6 +290,7 @@ where /// Queries the database to load the full log for the specified saga async fn load_saga_log( + opctx: &OpContext, datastore: &db::DataStore, saga: &db::saga_types::Saga, ) -> Result, Error> { @@ -304,8 +300,8 @@ async fn load_saga_log( // risks blocking the DB for an unreasonable amount of time. Instead, // we paginate to avoid cutting off availability. let mut events = vec![]; - let mut paginator = Paginator::new(BATCH_SIZE); - let conn = datastore.pool_connection_unauthorized().await?; + let mut paginator = Paginator::new(SQL_BATCH_SIZE); + let conn = datastore.pool_connection_authorized(opctx).await?; while let Some(p) = paginator.next() { use db::schema::saga_node_event::dsl; let batch = paginated_multicolumn( @@ -316,15 +312,7 @@ async fn load_saga_log( .filter(dsl::saga_id.eq(saga.id)) .select(db::saga_types::SagaNodeEvent::as_select()) .load_async(&*conn) - .map_err(|e| { - public_error_from_diesel( - e, - ErrorHandler::NotFoundByLookup( - ResourceType::SagaDbg, - LookupType::ById(saga.id.0 .0), - ), - ) - }) + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) .await?; paginator = p.found_batch(&batch, &|row| (row.node_id, row.event_type.clone())); @@ -640,7 +628,7 @@ mod test { db::model::saga_types::Saga::new(sec_id, params) }; - let mut inserted_sagas = (0..BATCH_SIZE.get() * 2) + let mut inserted_sagas = (0..SQL_BATCH_SIZE.get() * 2) .map(|_| new_running_db_saga()) .collect::>(); @@ -696,6 +684,10 @@ mod test { let log = logctx.log.new(o!()); let (mut db, db_datastore) = new_db(&log).await; let sec_id = db::SecId(uuid::Uuid::new_v4()); + let opctx = OpContext::for_tests( + log, + Arc::clone(&db_datastore) as Arc, + ); let saga_id = steno::SagaId(Uuid::new_v4()); // Create a couple batches of saga events @@ -709,7 +701,7 @@ mod test { db::model::saga_types::SagaNodeEvent::new(event, sec_id) }; - let mut inserted_nodes = (0..BATCH_SIZE.get() * 2) + let mut inserted_nodes = (0..SQL_BATCH_SIZE.get() * 2) .flat_map(|i| { // This isn't an exhaustive list of event types, but gives us a few // options to pick from. Since this is a pagination key, it's @@ -750,7 +742,7 @@ mod test { state: steno::SagaCachedState::Running, }; let saga = db::model::saga_types::Saga::new(sec_id, params); - let observed_nodes = load_saga_log(&db_datastore, &saga) + let observed_nodes = load_saga_log(&opctx, &db_datastore, &saga) .await .expect("Failed to list unfinished nodes"); inserted_nodes.sort_by_key(|a| (a.node_id, a.event_type.clone())); @@ -777,4 +769,37 @@ mod test { db.cleanup().await.unwrap(); logctx.cleanup_successful(); } + + #[tokio::test] + async fn test_list_no_unfinished_nodes() { + // Test setup + let logctx = dev::test_setup_log("test_list_no_unfinished_nodes"); + let log = logctx.log.new(o!()); + let (mut db, db_datastore) = new_db(&log).await; + let sec_id = db::SecId(uuid::Uuid::new_v4()); + let opctx = OpContext::for_tests( + log, + Arc::clone(&db_datastore) as Arc, + ); + let saga_id = steno::SagaId(Uuid::new_v4()); + + let params = steno::SagaCreateParams { + id: saga_id, + name: steno::SagaName::new("test saga"), + dag: serde_json::value::Value::Null, + state: steno::SagaCachedState::Running, + }; + let saga = db::model::saga_types::Saga::new(sec_id, params); + + // Test that this returns "no nodes" rather than throwing some "not + // found" error. + let observed_nodes = load_saga_log(&opctx, &db_datastore, &saga) + .await + .expect("Failed to list unfinished nodes"); + assert_eq!(observed_nodes.len(), 0); + + // Test cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } }