Update trigger status on flow fail
rmn-boiko authored and zaychenko-sergei committed Jan 8, 2025
1 parent 8f2aa8e commit 7059364
Showing 4 changed files with 158 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@ Recommendation: for ease of reading, use the following order:
## [Unreleased]
### Fixed
- GQL API flow queries now fetch the dataset polling source only once per dataset (and only if the Ingest flow type is present)
- Flow trigger status now becomes disabled when a flow fails

## [0.216.0] - 2024-12-30
### Changed
10 changes: 10 additions & 0 deletions src/domain/flow-system/services/src/flow/flow_agent_impl.rs
@@ -590,6 +590,16 @@ impl MessageConsumerT<TaskProgressMessage> for FlowAgentImpl {
.await?;
}

// In case of failure:
// - pause the trigger so the failing flow is not re-scheduled automatically
if message.outcome.is_failed() {
let flow_trigger_service =
target_catalog.get_one::<dyn FlowTriggerService>().unwrap();
flow_trigger_service
.pause_flow_trigger(finish_time, flow.flow_key.clone())
.await?;
}

let outbox = target_catalog.get_one::<dyn Outbox>().unwrap();
outbox
.post_message(
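For context on what the new branch above asks of FlowTriggerService: the only requirement visible in this commit is that a failed task outcome moves the flow's trigger into FlowTriggerStatus::PausedTemporarily (see the test assertion below). The following is a minimal, self-contained sketch of that state transition, using simplified stand-ins for the real flow-system types; the Active variant, the struct shape, and the pause_on_failure helper are assumptions for illustration, not the actual pause_flow_trigger implementation.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FlowTriggerStatus {
    Active,            // assumed "running" variant; the real enum may differ
    PausedTemporarily, // confirmed by the test assertion in this commit
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct FlowTriggerState {
    status: FlowTriggerStatus,
}

// Sketch of the transition requested by the new failure branch: a failed task
// outcome pauses the trigger instead of letting it keep re-scheduling the flow.
fn pause_on_failure(task_failed: bool, mut trigger: FlowTriggerState) -> FlowTriggerState {
    if task_failed {
        trigger.status = FlowTriggerStatus::PausedTemporarily;
    }
    trigger
}

fn main() {
    let trigger = FlowTriggerState { status: FlowTriggerStatus::Active };
    let after = pause_on_failure(true, trigger);
    assert_eq!(after.status, FlowTriggerStatus::PausedTemporarily);
}

The real pause_flow_trigger additionally receives the task finish time and the flow key, as the call in the diff above shows; how the updated state is stored is outside the scope of this sketch.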
143 changes: 143 additions & 0 deletions src/domain/flow-system/services/tests/tests/test_flow_agent_impl.rs
@@ -6258,5 +6258,148 @@ async fn test_respect_last_success_time_when_activate_configuration() {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[test_log::test(tokio::test)]
async fn test_disable_trigger_on_flow_fail() {
let harness = FlowHarness::new().await;

// Create a "foo" root dataset and configure an ingest schedule of every 60ms
let foo_create_result = harness
.create_root_dataset(DatasetAlias {
dataset_name: DatasetName::new_unchecked("foo"),
account_name: None,
})
.await;
let foo_id = foo_create_result.dataset_handle.id;

harness
.set_dataset_flow_ingest(
foo_id.clone(),
DatasetFlowType::Ingest,
IngestRule {
fetch_uncacheable: false,
},
)
.await;

let trigger_rule = FlowTriggerRule::Schedule(Duration::milliseconds(60).into());

harness
.set_dataset_flow_trigger(
harness.now_datetime(),
foo_id.clone(),
DatasetFlowType::Ingest,
trigger_rule.clone(),
)
.await;
harness.eager_initialization().await;

// Run scheduler concurrently with the simulation script and task drivers
tokio::select! {
// Run flow agent
res = harness.flow_agent.run() => res.int_err(),

// Run simulation script and task drivers
_ = async {
// Task 0: start running at 10ms, finish at 20ms
let foo_task0_driver = harness.task_driver(TaskDriverArgs {
task_id: TaskID::new(0),
task_metadata: TaskMetadata::from(vec![(METADATA_TASK_FLOW_ID, "0")]),
dataset_id: Some(foo_id.clone()),
run_since_start: Duration::milliseconds(10),
finish_in_with: Some((Duration::milliseconds(10), TaskOutcome::Success(TaskResult::Empty))),
expected_logical_plan: LogicalPlan::UpdateDataset(LogicalPlanUpdateDataset {
dataset_id: foo_id.clone(),
fetch_uncacheable: false
}),
});
let foo_task0_handle = foo_task0_driver.run();

// Task 1: start running at 90ms, finish at 100ms
let foo_task1_driver = harness.task_driver(TaskDriverArgs {
task_id: TaskID::new(1),
task_metadata: TaskMetadata::from(vec![(METADATA_TASK_FLOW_ID, "1")]),
dataset_id: Some(foo_id.clone()),
run_since_start: Duration::milliseconds(90),
finish_in_with: Some((Duration::milliseconds(10), TaskOutcome::Failed(TaskError::Empty))),
expected_logical_plan: LogicalPlan::UpdateDataset(LogicalPlanUpdateDataset {
dataset_id: foo_id.clone(),
fetch_uncacheable: false
}),
});
let foo_task1_handle = foo_task1_driver.run();

// Main simulation boundary - 120ms total
// - "foo" should immediately schedule "task 0", since "foo" has never run yet
// - "task 0" will take action and complete, this will schedule the next flow
// run for "foo" after full period
// - when that period is over, "task 1" should be scheduled
// - "task 1" will take action and complete
let sim_handle = harness.advance_time(Duration::milliseconds(120));
tokio::join!(foo_task0_handle, foo_task1_handle, sim_handle)
} => Ok(())
}
.unwrap();

let test_flow_listener = harness.catalog.get_one::<FlowSystemTestListener>().unwrap();
test_flow_listener.define_dataset_display_name(foo_id.clone(), "foo".to_string());

pretty_assertions::assert_eq!(
format!("{}", test_flow_listener.as_ref()),
indoc::indoc!(
r#"
#0: +0ms:
"foo" Ingest:
Flow ID = 0 Waiting AutoPolling
#1: +0ms:
"foo" Ingest:
Flow ID = 0 Waiting AutoPolling Executor(task=0, since=0ms)
#2: +10ms:
"foo" Ingest:
Flow ID = 0 Running(task=0)
#3: +20ms:
"foo" Ingest:
Flow ID = 1 Waiting AutoPolling Schedule(wakeup=80ms)
Flow ID = 0 Finished Success
#4: +80ms:
"foo" Ingest:
Flow ID = 1 Waiting AutoPolling Executor(task=1, since=80ms)
Flow ID = 0 Finished Success
#5: +90ms:
"foo" Ingest:
Flow ID = 1 Running(task=1)
Flow ID = 0 Finished Success
#6: +100ms:
"foo" Ingest:
Flow ID = 1 Finished Failed
Flow ID = 0 Finished Success
"#
)
);

let flow_key = FlowKey::Dataset(FlowKeyDataset::new(foo_id.clone(), DatasetFlowType::Ingest));
let current_trigger = harness
.flow_trigger_service
.find_trigger(flow_key.clone())
.await
.unwrap();
assert_eq!(
current_trigger,
Some(FlowTriggerState {
flow_key: flow_key.clone(),
status: FlowTriggerStatus::PausedTemporarily,
rule: trigger_rule,
})
);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// TODO next:
// - derived more than 1 level
4 changes: 4 additions & 0 deletions src/domain/task-system/domain/src/entities/task_status.rs
@@ -42,6 +42,10 @@ impl TaskOutcome {
pub fn is_success(&self) -> bool {
matches!(self, Self::Success(_))
}

pub fn is_failed(&self) -> bool {
matches!(self, Self::Failed(_))
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
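A short usage sketch of the new is_failed helper, with simplified stand-ins for the payload types: TaskResult and TaskError are reduced to the Empty variants seen in the test above, and Cancelled is an assumed extra variant; the real enums may carry more data.

#[derive(Debug)]
enum TaskResult {
    Empty,
}

#[derive(Debug)]
enum TaskError {
    Empty,
}

#[derive(Debug)]
enum TaskOutcome {
    Success(TaskResult),
    Failed(TaskError),
    Cancelled, // assumed extra variant, not confirmed by this diff
}

impl TaskOutcome {
    fn is_success(&self) -> bool {
        matches!(self, Self::Success(_))
    }

    fn is_failed(&self) -> bool {
        matches!(self, Self::Failed(_))
    }
}

fn main() {
    // This is the check the flow agent now uses to decide whether to pause
    // the trigger after a task finishes.
    let outcome = TaskOutcome::Failed(TaskError::Empty);
    assert!(outcome.is_failed());
    assert!(!outcome.is_success());

    let ok = TaskOutcome::Success(TaskResult::Empty);
    assert!(ok.is_success());
}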
