Skip to content

Commit

Permalink
remove need for saga channel for background tasks (#5964)
Browse files Browse the repository at this point in the history
  • Loading branch information
davepacheco authored Jul 3, 2024
1 parent 6179905 commit 991c195
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 368 deletions.
82 changes: 0 additions & 82 deletions nexus/src/app/background/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ mod test {
use super::Driver;
use crate::app::background::driver::TaskDefinition;
use crate::app::background::Activator;
use crate::app::sagas::SagaRequest;
use assert_matches::assert_matches;
use chrono::Utc;
use futures::future::BoxFuture;
Expand All @@ -448,7 +447,6 @@ mod test {
use std::time::Instant;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::Sender;
use tokio::sync::watch;

type ControlPlaneTestContext =
Expand Down Expand Up @@ -808,84 +806,4 @@ mod test {
// such a task that would allow us to reliably distinguish between these
// two without also spending a lot of wall-clock time on this test.
}

/// Simple BackgroundTask impl that sends a test-only SagaRequest
struct SagaRequestTask {
saga_request: Sender<SagaRequest>,
}

impl SagaRequestTask {
fn new(saga_request: Sender<SagaRequest>) -> SagaRequestTask {
SagaRequestTask { saga_request }
}
}

impl BackgroundTask for SagaRequestTask {
fn activate<'a>(
&'a mut self,
_: &'a OpContext,
) -> BoxFuture<'a, serde_json::Value> {
async {
let _ = self.saga_request.send(SagaRequest::TestOnly).await;
serde_json::Value::Null
}
.boxed()
}
}

#[nexus_test(server = crate::Server)]
async fn test_saga_request_flow(cptestctx: &ControlPlaneTestContext) {
let nexus = &cptestctx.server.server_context().nexus;
let datastore = nexus.datastore();
let opctx = OpContext::for_tests(
cptestctx.logctx.log.clone(),
datastore.clone(),
);

let (saga_request, mut saga_request_recv) = SagaRequest::channel();
let t1 = SagaRequestTask::new(saga_request);

let mut driver = Driver::new();
let (_dep_tx1, dep_rx1) = watch::channel(0);
let act1 = Activator::new();

let h1 = driver.register(TaskDefinition {
name: "t1",
description: "test saga request flow task",
period: Duration::from_secs(300), // should not fire in this test
task_impl: Box::new(t1),
opctx: opctx.child(std::collections::BTreeMap::new()),
watchers: vec![Box::new(dep_rx1.clone())],
activator: &act1,
});

assert!(matches!(
saga_request_recv.try_recv(),
Err(mpsc::error::TryRecvError::Empty),
));

driver.activate(&h1);

// wait 1 second for the saga request to arrive
tokio::select! {
_ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {
assert!(false);
}

saga_request = saga_request_recv.recv() => {
match saga_request {
None => {
assert!(false);
}

Some(saga_request) => {
assert!(matches!(
saga_request,
SagaRequest::TestOnly,
));
}
}
}
}
}
}
41 changes: 36 additions & 5 deletions nexus/src/app/background/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ use super::tasks::vpc_routes;
use super::Activator;
use super::Driver;
use crate::app::oximeter::PRODUCER_LEASE_DURATION;
use crate::app::sagas::SagaRequest;
use crate::app::saga::StartSaga;
use nexus_config::BackgroundTaskConfig;
use nexus_config::DnsTasksConfig;
use nexus_db_model::DnsGroup;
Expand All @@ -122,7 +122,6 @@ use nexus_db_queries::db::DataStore;
use oximeter::types::ProducerRegistry;
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::watch;
use uuid::Uuid;

Expand Down Expand Up @@ -254,7 +253,7 @@ impl BackgroundTasksInitializer {
rack_id: Uuid,
nexus_id: Uuid,
resolver: internal_dns::resolver::Resolver,
saga_request: Sender<SagaRequest>,
sagas: Arc<dyn StartSaga>,
producer_registry: ProducerRegistry,
) -> Driver {
let mut driver = self.driver;
Expand Down Expand Up @@ -548,7 +547,7 @@ impl BackgroundTasksInitializer {
{
let detector = region_replacement::RegionReplacementDetector::new(
datastore.clone(),
saga_request.clone(),
sagas.clone(),
);

driver.register(TaskDefinition {
Expand All @@ -569,7 +568,7 @@ impl BackgroundTasksInitializer {
let detector =
region_replacement_driver::RegionReplacementDriver::new(
datastore.clone(),
saga_request.clone(),
sagas.clone(),
);

driver.register(TaskDefinition {
Expand Down Expand Up @@ -731,7 +730,9 @@ fn init_dns(

#[cfg(test)]
pub mod test {
use crate::app::saga::StartSaga;
use dropshot::HandlerTaskMode;
use futures::FutureExt;
use nexus_db_model::DnsGroup;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db::datastore::DnsVersionUpdateBuilder;
Expand All @@ -740,9 +741,39 @@ pub mod test {
use nexus_types::internal_api::params as nexus_params;
use omicron_test_utils::dev::poll;
use std::net::SocketAddr;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tempfile::TempDir;

/// Used by various tests of tasks that kick off sagas
pub(crate) struct NoopStartSaga {
count: AtomicU64,
}

impl NoopStartSaga {
pub(crate) fn new() -> Self {
Self { count: AtomicU64::new(0) }
}

pub(crate) fn count_reset(&self) -> u64 {
self.count.swap(0, Ordering::SeqCst)
}
}

impl StartSaga for NoopStartSaga {
fn saga_start(
&self,
_: steno::SagaDag,
) -> futures::prelude::future::BoxFuture<
'_,
Result<(), omicron_common::api::external::Error>,
> {
let _ = self.count.fetch_add(1, Ordering::SeqCst);
async { Ok(()) }.boxed()
}
}

type ControlPlaneTestContext =
nexus_test_utils::ControlPlaneTestContext<crate::Server>;

Expand Down
41 changes: 18 additions & 23 deletions nexus/src/app/background/tasks/region_replacement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
use crate::app::authn;
use crate::app::background::BackgroundTask;
use crate::app::saga::StartSaga;
use crate::app::sagas;
use crate::app::sagas::region_replacement_start::SagaRegionReplacementStart;
use crate::app::sagas::NexusSaga;
use crate::app::RegionAllocationStrategy;
use futures::future::BoxFuture;
use futures::FutureExt;
Expand All @@ -23,39 +26,31 @@ use omicron_uuid_kinds::GenericUuid;
use omicron_uuid_kinds::TypedUuid;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::Sender;

pub struct RegionReplacementDetector {
datastore: Arc<DataStore>,
saga_request: Sender<sagas::SagaRequest>,
sagas: Arc<dyn StartSaga>,
}

impl RegionReplacementDetector {
pub fn new(
datastore: Arc<DataStore>,
saga_request: Sender<sagas::SagaRequest>,
) -> Self {
RegionReplacementDetector { datastore, saga_request }
pub fn new(datastore: Arc<DataStore>, sagas: Arc<dyn StartSaga>) -> Self {
RegionReplacementDetector { datastore, sagas }
}

async fn send_start_request(
&self,
serialized_authn: authn::saga::Serialized,
request: RegionReplacement,
) -> Result<(), SendError<sagas::SagaRequest>> {
let saga_request = sagas::SagaRequest::RegionReplacementStart {
params: sagas::region_replacement_start::Params {
serialized_authn,
request,
allocation_strategy:
RegionAllocationStrategy::RandomWithDistinctSleds {
seed: None,
},
},
) -> Result<(), omicron_common::api::external::Error> {
let params = sagas::region_replacement_start::Params {
serialized_authn,
request,
allocation_strategy:
RegionAllocationStrategy::RandomWithDistinctSleds { seed: None },
};

self.saga_request.send(saga_request).await
let saga_dag = SagaRegionReplacementStart::prepare(&params)?;
self.sagas.saga_start(saga_dag).await
}
}

Expand Down Expand Up @@ -201,9 +196,9 @@ impl BackgroundTask for RegionReplacementDetector {
#[cfg(test)]
mod test {
use super::*;
use crate::app::background::init::test::NoopStartSaga;
use nexus_db_model::RegionReplacement;
use nexus_test_utils_macros::nexus_test;
use tokio::sync::mpsc;
use uuid::Uuid;

type ControlPlaneTestContext =
Expand All @@ -220,9 +215,9 @@ mod test {
datastore.clone(),
);

let (saga_request_tx, mut saga_request_rx) = mpsc::channel(1);
let starter = Arc::new(NoopStartSaga::new());
let mut task =
RegionReplacementDetector::new(datastore.clone(), saga_request_tx);
RegionReplacementDetector::new(datastore.clone(), starter.clone());

// Noop test
let result = task.activate(&opctx).await;
Expand Down Expand Up @@ -253,6 +248,6 @@ mod test {
})
);

saga_request_rx.try_recv().unwrap();
assert_eq!(starter.count_reset(), 1);
}
}
Loading

0 comments on commit 991c195

Please sign in to comment.