diff --git a/CHANGELOG.md b/CHANGELOG.md index b2c845c94..9bf50b6d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Recommendation: for ease of reading, use the following order: ## [Unreleased] ### Fixed - GQL api flows queries now fetch dataset polling source only once per dataset(and only if Ingest flow type is here) +- Flow trigger status now become disable on flow fail ## [0.216.0] - 2024-12-30 ### Changed diff --git a/src/domain/flow-system/services/src/flow/flow_agent_impl.rs b/src/domain/flow-system/services/src/flow/flow_agent_impl.rs index 99319587d..7976ef838 100644 --- a/src/domain/flow-system/services/src/flow/flow_agent_impl.rs +++ b/src/domain/flow-system/services/src/flow/flow_agent_impl.rs @@ -590,6 +590,16 @@ impl MessageConsumerT for FlowAgentImpl { .await?; } + // In case of failure: + // - disable trigger + if message.outcome.is_failed() { + let flow_trigger_service = + target_catalog.get_one::().unwrap(); + flow_trigger_service + .pause_flow_trigger(finish_time, flow.flow_key.clone()) + .await?; + } + let outbox = target_catalog.get_one::().unwrap(); outbox .post_message( diff --git a/src/domain/flow-system/services/tests/tests/test_flow_agent_impl.rs b/src/domain/flow-system/services/tests/tests/test_flow_agent_impl.rs index 2dd62fa6d..7dec4dee3 100644 --- a/src/domain/flow-system/services/tests/tests/test_flow_agent_impl.rs +++ b/src/domain/flow-system/services/tests/tests/test_flow_agent_impl.rs @@ -6258,5 +6258,148 @@ async fn test_respect_last_success_time_when_activate_configuration() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +#[test_log::test(tokio::test)] +async fn test_disable_trigger_on_flow_fail() { + let harness = FlowHarness::new().await; + + // Create a "foo" root dataset, and configure ingestion schedule every 60ms + let foo_create_result = harness + .create_root_dataset(DatasetAlias { + dataset_name: DatasetName::new_unchecked("foo"), + account_name: None, + }) + .await; + let foo_id = foo_create_result.dataset_handle.id; + + harness + .set_dataset_flow_ingest( + foo_id.clone(), + DatasetFlowType::Ingest, + IngestRule { + fetch_uncacheable: false, + }, + ) + .await; + + let trigger_rule = FlowTriggerRule::Schedule(Duration::milliseconds(60).into()); + + harness + .set_dataset_flow_trigger( + harness.now_datetime(), + foo_id.clone(), + DatasetFlowType::Ingest, + trigger_rule.clone(), + ) + .await; + harness.eager_initialization().await; + + // Run scheduler concurrently with manual triggers script + tokio::select! { + // Run API service + res = harness.flow_agent.run() => res.int_err(), + + // Run simulation script and task drivers + _ = async { + // Task 0: start running at 10ms, finish at 20ms + let foo_task0_driver = harness.task_driver(TaskDriverArgs { + task_id: TaskID::new(0), + task_metadata: TaskMetadata::from(vec![(METADATA_TASK_FLOW_ID, "0")]), + dataset_id: Some(foo_id.clone()), + run_since_start: Duration::milliseconds(10), + finish_in_with: Some((Duration::milliseconds(10), TaskOutcome::Success(TaskResult::Empty))), + expected_logical_plan: LogicalPlan::UpdateDataset(LogicalPlanUpdateDataset { + dataset_id: foo_id.clone(), + fetch_uncacheable: false + }), + }); + let foo_task0_handle = foo_task0_driver.run(); + + // Task 1: start running at 90ms, finish at 100ms + let foo_task1_driver = harness.task_driver(TaskDriverArgs { + task_id: TaskID::new(1), + task_metadata: TaskMetadata::from(vec![(METADATA_TASK_FLOW_ID, "1")]), + dataset_id: Some(foo_id.clone()), + run_since_start: Duration::milliseconds(90), + finish_in_with: Some((Duration::milliseconds(10), TaskOutcome::Failed(TaskError::Empty))), + expected_logical_plan: LogicalPlan::UpdateDataset(LogicalPlanUpdateDataset { + dataset_id: foo_id.clone(), + fetch_uncacheable: false + }), + }); + let foo_task1_handle = foo_task1_driver.run(); + + // Main simulation boundary - 120ms total + // - "foo" should immediately schedule "task 0", since "foo" has never run yet + // - "task 0" will take action and complete, this will schedule the next flow + // run for "foo" after full period + // - when that period is over, "task 1" should be scheduled + // - "task 1" will take action and complete + let sim_handle = harness.advance_time(Duration::milliseconds(120)); + tokio::join!(foo_task0_handle, foo_task1_handle, sim_handle) + } => Ok(()) + } + .unwrap(); + + let test_flow_listener = harness.catalog.get_one::().unwrap(); + test_flow_listener.define_dataset_display_name(foo_id.clone(), "foo".to_string()); + + pretty_assertions::assert_eq!( + format!("{}", test_flow_listener.as_ref()), + indoc::indoc!( + r#" + #0: +0ms: + "foo" Ingest: + Flow ID = 0 Waiting AutoPolling + + #1: +0ms: + "foo" Ingest: + Flow ID = 0 Waiting AutoPolling Executor(task=0, since=0ms) + + #2: +10ms: + "foo" Ingest: + Flow ID = 0 Running(task=0) + + #3: +20ms: + "foo" Ingest: + Flow ID = 1 Waiting AutoPolling Schedule(wakeup=80ms) + Flow ID = 0 Finished Success + + #4: +80ms: + "foo" Ingest: + Flow ID = 1 Waiting AutoPolling Executor(task=1, since=80ms) + Flow ID = 0 Finished Success + + #5: +90ms: + "foo" Ingest: + Flow ID = 1 Running(task=1) + Flow ID = 0 Finished Success + + #6: +100ms: + "foo" Ingest: + Flow ID = 1 Finished Failed + Flow ID = 0 Finished Success + + "# + ) + ); + + let flow_key = FlowKey::Dataset(FlowKeyDataset::new(foo_id.clone(), DatasetFlowType::Ingest)); + let current_trigger = harness + .flow_trigger_service + .find_trigger(flow_key.clone()) + .await + .unwrap(); + assert_eq!( + current_trigger, + Some(FlowTriggerState { + flow_key: flow_key.clone(), + status: FlowTriggerStatus::PausedTemporarily, + rule: trigger_rule, + }) + ); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // TODO next: // - derived more than 1 level diff --git a/src/domain/task-system/domain/src/entities/task_status.rs b/src/domain/task-system/domain/src/entities/task_status.rs index 5a68b0d19..5c737c678 100644 --- a/src/domain/task-system/domain/src/entities/task_status.rs +++ b/src/domain/task-system/domain/src/entities/task_status.rs @@ -42,6 +42,10 @@ impl TaskOutcome { pub fn is_success(&self) -> bool { matches!(self, Self::Success(_)) } + + pub fn is_failed(&self) -> bool { + matches!(self, Self::Failed(_)) + } } ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////