From e97a58a8a3c6ecc7755fa228c0fadbe33bd757f4 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Wed, 26 Jun 2024 11:20:03 -0700 Subject: [PATCH] Paginator, regression tests, multi-column --- nexus/db-model/src/saga_types.rs | 8 +- nexus/db-queries/src/db/datastore/saga.rs | 54 ----- nexus/db-queries/src/db/saga_recovery.rs | 262 +++++++++++++++++++--- 3 files changed, 237 insertions(+), 87 deletions(-) diff --git a/nexus/db-model/src/saga_types.rs b/nexus/db-model/src/saga_types.rs index 3ad3e2603c..010c717356 100644 --- a/nexus/db-model/src/saga_types.rs +++ b/nexus/db-model/src/saga_types.rs @@ -76,7 +76,7 @@ impl From<&SecId> for Uuid { /// This exists because Omicron cannot implement foreign traits /// for foreign types. #[derive( - AsExpression, Copy, Clone, Debug, FromSqlRow, PartialEq, PartialOrd, + AsExpression, Copy, Clone, Debug, FromSqlRow, PartialEq, PartialOrd, Ord, Eq, )] #[diesel(sql_type = sql_types::Uuid)] pub struct SagaId(pub steno::SagaId); @@ -110,7 +110,7 @@ where /// This exists because Omicron cannot implement foreign traits /// for foreign types. #[derive( - AsExpression, Copy, Clone, Debug, FromSqlRow, PartialEq, PartialOrd, + AsExpression, Copy, Clone, Debug, FromSqlRow, PartialEq, PartialOrd, Ord, Eq, )] #[diesel(sql_type = sql_types::BigInt)] pub struct SagaNodeId(pub steno::SagaNodeId); @@ -181,7 +181,7 @@ impl FromSql for SagaCachedState { } /// Represents a row in the "Saga" table -#[derive(Queryable, Insertable, Clone, Debug, Selectable)] +#[derive(Queryable, Insertable, Clone, Debug, Selectable, PartialEq)] #[diesel(table_name = saga)] pub struct Saga { pub id: SagaId, @@ -222,7 +222,7 @@ impl Saga { } /// Represents a row in the "SagaNodeEvent" table -#[derive(Queryable, Insertable, Clone, Debug, Selectable)] +#[derive(Queryable, Insertable, Clone, Debug, Selectable, PartialEq)] #[diesel(table_name = saga_node_event)] pub struct SagaNodeEvent { pub saga_id: SagaId, diff --git a/nexus/db-queries/src/db/datastore/saga.rs b/nexus/db-queries/src/db/datastore/saga.rs index 2f748de5b3..c42d14d0d7 100644 --- a/nexus/db-queries/src/db/datastore/saga.rs +++ b/nexus/db-queries/src/db/datastore/saga.rs @@ -9,17 +9,13 @@ use crate::db; use crate::db::error::public_error_from_diesel; use crate::db::error::ErrorHandler; use crate::db::model::Generation; -use crate::db::pagination::paginated; use crate::db::update_and_check::UpdateAndCheck; use crate::db::update_and_check::UpdateStatus; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; -use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; -use omicron_common::api::external::ListResultVec; use omicron_common::api::external::LookupType; use omicron_common::api::external::ResourceType; -use uuid::Uuid; impl DataStore { pub async fn saga_create( @@ -103,54 +99,4 @@ impl DataStore { )), } } - - pub async fn saga_list_unfinished_by_id( - &self, - sec_id: &db::SecId, - pagparams: &DataPageParams<'_, Uuid>, - ) -> ListResultVec { - use db::schema::saga::dsl; - paginated(dsl::saga, dsl::id, &pagparams) - .filter(dsl::saga_state.ne(db::saga_types::SagaCachedState( - steno::SagaCachedState::Done, - ))) - .filter(dsl::current_sec.eq(*sec_id)) - .load_async(&*self.pool_connection_unauthorized().await?) - .await - .map_err(|e| { - public_error_from_diesel( - e, - ErrorHandler::NotFoundByLookup( - ResourceType::SagaDbg, - LookupType::ById(sec_id.0), - ), - ) - }) - } - - pub async fn saga_node_event_list_by_id( - &self, - id: db::saga_types::SagaId, - pagparams: &DataPageParams<'_, db::saga_types::SagaNodeId>, - ) -> ListResultVec { - use db::schema::saga_node_event::dsl; - paginated(dsl::saga_node_event, dsl::node_id, &pagparams) - .filter(dsl::saga_id.eq(id)) - .load_async::( - &*self.pool_connection_unauthorized().await?, - ) - .await - .map_err(|e| { - public_error_from_diesel( - e, - ErrorHandler::NotFoundByLookup( - ResourceType::SagaDbg, - LookupType::ById(id.0 .0), - ), - ) - })? - .into_iter() - .map(|db_event| steno::SagaNodeEvent::try_from(db_event)) - .collect::>() - } } diff --git a/nexus/db-queries/src/db/saga_recovery.rs b/nexus/db-queries/src/db/saga_recovery.rs index 171e594071..eb2003508c 100644 --- a/nexus/db-queries/src/db/saga_recovery.rs +++ b/nexus/db-queries/src/db/saga_recovery.rs @@ -6,13 +6,22 @@ use crate::context::OpContext; use crate::db; +use crate::db::error::public_error_from_diesel; +use crate::db::error::ErrorHandler; +use crate::db::pagination::{paginated, paginated_multicolumn, Paginator}; +use async_bb8_diesel::AsyncRunQueryDsl; +use diesel::prelude::*; +use diesel::ExpressionMethods; +use diesel::SelectableHelper; use futures::{future::BoxFuture, TryFutureExt}; -use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; +use omicron_common::api::external::LookupType; +use omicron_common::api::external::ResourceType; use omicron_common::backoff::retry_notify; use omicron_common::backoff::retry_policy_internal_service; use omicron_common::backoff::BackoffError; use std::future::Future; +use std::num::NonZeroU32; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -162,14 +171,10 @@ where })) } -// Creates new page params for querying sagas. -fn new_page_params(marker: Option<&T>) -> DataPageParams<'_, T> { - DataPageParams { - marker, - direction: dropshot::PaginationOrder::Ascending, - limit: std::num::NonZeroU32::new(100).unwrap(), - } -} +const BATCH_SIZE: NonZeroU32 = unsafe { + // Safety: 100 is more than zero + NonZeroU32::new_unchecked(100) +}; /// Queries the database to return a list of uncompleted sagas assigned to SEC /// `sec_id` @@ -193,17 +198,32 @@ async fn list_unfinished_sagas( // Although we could read them all into memory simultaneously, this // risks blocking the DB for an unreasonable amount of time. Instead, // we paginate to avoid cutting off availability to the DB. - let mut last_id = None; let mut sagas = vec![]; - loop { - let pagparams = new_page_params(last_id.as_ref()); - let mut some_sagas = - datastore.saga_list_unfinished_by_id(sec_id, &pagparams).await?; - if some_sagas.is_empty() { - break; - } - sagas.append(&mut some_sagas); - last_id = Some(sagas.last().as_ref().unwrap().id.0 .0); + let mut paginator = Paginator::new(BATCH_SIZE); + let conn = datastore.pool_connection_unauthorized().await?; + while let Some(p) = paginator.next() { + use db::schema::saga::dsl; + + let mut batch = paginated(dsl::saga, dsl::id, &p.current_pagparams()) + .filter(dsl::saga_state.ne(db::saga_types::SagaCachedState( + steno::SagaCachedState::Done, + ))) + .filter(dsl::current_sec.eq(*sec_id)) + .select(db::saga_types::Saga::as_select()) + .load_async(&*conn) + .await + .map_err(|e| { + public_error_from_diesel( + e, + ErrorHandler::NotFoundByLookup( + ResourceType::SagaDbg, + LookupType::ById(sec_id.0), + ), + ) + })?; + + paginator = p.found_batch(&batch, &|row| row.id); + sagas.append(&mut batch); } Ok(sagas) } @@ -283,17 +303,38 @@ async fn load_saga_log( // Although we could read them all into memory simultaneously, this // risks blocking the DB for an unreasonable amount of time. Instead, // we paginate to avoid cutting off availability. - let mut last_id = None; let mut events = vec![]; - loop { - let pagparams = new_page_params(last_id.as_ref()); - let mut some_events = - datastore.saga_node_event_list_by_id(saga.id, &pagparams).await?; - if some_events.is_empty() { - break; - } - events.append(&mut some_events); - last_id = Some(events.last().as_ref().unwrap().node_id.into()); + let mut paginator = Paginator::new(BATCH_SIZE); + let conn = datastore.pool_connection_unauthorized().await?; + while let Some(p) = paginator.next() { + use db::schema::saga_node_event::dsl; + let batch = paginated_multicolumn( + dsl::saga_node_event, + (dsl::node_id, dsl::event_type), + &p.current_pagparams(), + ) + .filter(dsl::saga_id.eq(saga.id)) + .select(db::saga_types::SagaNodeEvent::as_select()) + .load_async(&*conn) + .map_err(|e| { + public_error_from_diesel( + e, + ErrorHandler::NotFoundByLookup( + ResourceType::SagaDbg, + LookupType::ById(saga.id.0 .0), + ), + ) + }) + .await?; + paginator = + p.found_batch(&batch, &|row| (row.node_id, row.event_type.clone())); + + let mut batch = batch + .into_iter() + .map(|event| steno::SagaNodeEvent::try_from(event)) + .collect::, Error>>()?; + + events.append(&mut batch); } Ok(events) } @@ -306,6 +347,8 @@ mod test { use nexus_test_utils::db::test_setup_database; use omicron_test_utils::dev; use once_cell::sync::Lazy; + use pretty_assertions::assert_eq; + use rand::seq::SliceRandom; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use steno::{ new_action_noop_undo, Action, ActionContext, ActionError, @@ -573,4 +616,165 @@ mod test { db.cleanup().await.unwrap(); logctx.cleanup_successful(); } + + #[tokio::test] + async fn test_list_unfinished_sagas() { + // Test setup + let logctx = dev::test_setup_log("test_list_unfinished_sagas"); + let log = logctx.log.new(o!()); + let (mut db, db_datastore) = new_db(&log).await; + let sec_id = db::SecId(uuid::Uuid::new_v4()); + let opctx = OpContext::for_tests( + log, + Arc::clone(&db_datastore) as Arc, + ); + + // Create a couple batches of sagas. + let new_running_db_saga = || { + let params = steno::SagaCreateParams { + id: steno::SagaId(Uuid::new_v4()), + name: steno::SagaName::new("test saga"), + dag: serde_json::value::Value::Null, + state: steno::SagaCachedState::Running, + }; + + db::model::saga_types::Saga::new(sec_id, params) + }; + let mut inserted_sagas = (0..BATCH_SIZE.get() * 2) + .map(|_| new_running_db_saga()) + .collect::>(); + + // Shuffle these sagas into a random order to check that the pagination + // order is working as intended on the read path, which we'll do later + // in this test. + inserted_sagas.shuffle(&mut rand::thread_rng()); + + // Insert the batches of unfinished sagas into the database + let conn = db_datastore + .pool_connection_unauthorized() + .await + .expect("Failed to access db connection"); + diesel::insert_into(db::schema::saga::dsl::saga) + .values(inserted_sagas.clone()) + .execute_async(&*conn) + .await + .expect("Failed to insert test setup data"); + + // List them, expect to see them all in order by ID. + let mut observed_sagas = + list_unfinished_sagas(&opctx, &db_datastore, &sec_id) + .await + .expect("Failed to list unfinished sagas"); + inserted_sagas.sort_by_key(|a| a.id); + + // Timestamps can change slightly when we insert them. + // + // Sanitize them to make input/output equality checks easier. + let sanitize_timestamps = |sagas: &mut Vec| { + for saga in sagas { + saga.time_created = chrono::DateTime::UNIX_EPOCH; + saga.adopt_time = chrono::DateTime::UNIX_EPOCH; + } + }; + sanitize_timestamps(&mut observed_sagas); + sanitize_timestamps(&mut inserted_sagas); + + assert_eq!( + inserted_sagas, observed_sagas, + "Observed sagas did not match inserted sagas" + ); + + // Test cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_list_unfinished_nodes() { + // Test setup + let logctx = dev::test_setup_log("test_list_unfinished_nodes"); + let log = logctx.log.new(o!()); + let (mut db, db_datastore) = new_db(&log).await; + let sec_id = db::SecId(uuid::Uuid::new_v4()); + let saga_id = steno::SagaId(Uuid::new_v4()); + + // Create a couple batches of saga events + let new_db_saga_nodes = + |node_id: u32, event_type: steno::SagaNodeEventType| { + let event = steno::SagaNodeEvent { + saga_id, + node_id: steno::SagaNodeId::from(node_id), + event_type, + }; + + db::model::saga_types::SagaNodeEvent::new(event, sec_id) + }; + let mut inserted_nodes = (0..BATCH_SIZE.get() * 2) + .flat_map(|i| { + // This isn't an exhaustive list of event types, but gives us a few + // options to pick from. Since this is a pagination key, it's + // important to include a variety here. + use steno::SagaNodeEventType::*; + [ + new_db_saga_nodes(i, Started), + new_db_saga_nodes(i, UndoStarted), + new_db_saga_nodes(i, UndoFinished), + ] + }) + .collect::>(); + + // Shuffle these nodes into a random order to check that the pagination + // order is working as intended on the read path, which we'll do later + // in this test. + inserted_nodes.shuffle(&mut rand::thread_rng()); + + // Insert them into the database + let conn = db_datastore + .pool_connection_unauthorized() + .await + .expect("Failed to access db connection"); + diesel::insert_into(db::schema::saga_node_event::dsl::saga_node_event) + .values(inserted_nodes.clone()) + .execute_async(&*conn) + .await + .expect("Failed to insert test setup data"); + + // List them, expect to see them all in order by ID. + // + // Note that we need to make up a saga to see this, but the + // part of it that actually matters is the ID. + let params = steno::SagaCreateParams { + id: saga_id, + name: steno::SagaName::new("test saga"), + dag: serde_json::value::Value::Null, + state: steno::SagaCachedState::Running, + }; + let saga = db::model::saga_types::Saga::new(sec_id, params); + let observed_nodes = load_saga_log(&db_datastore, &saga) + .await + .expect("Failed to list unfinished nodes"); + inserted_nodes.sort_by_key(|a| (a.node_id, a.event_type.clone())); + + let inserted_nodes = inserted_nodes + .into_iter() + .map(|node| steno::SagaNodeEvent::try_from(node)) + .collect::, _>>() + .expect("Couldn't convert DB nodes to steno nodes"); + + // The steno::SagaNodeEvent type doesn't implement PartialEq, so we need to do this + // a little manually. + assert_eq!(inserted_nodes.len(), observed_nodes.len()); + for i in 0..inserted_nodes.len() { + assert_eq!(inserted_nodes[i].saga_id, observed_nodes[i].saga_id); + assert_eq!(inserted_nodes[i].node_id, observed_nodes[i].node_id); + assert_eq!( + inserted_nodes[i].event_type.label(), + observed_nodes[i].event_type.label() + ); + } + + // Test cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } }