Skip to content

Commit

Permalink
Retrieve dataset event created through RESTful API when creating dag …
Browse files Browse the repository at this point in the history
…run (apache#38332)

When creating DAG runs, also fetch dataset events that were generated 
via the RESTful API. Currently, such events are overlooked because 
they have no associated 'source_dag_run', so the query's join on 
'source_dag_run' excludes them. Furthermore, use the timestamps of 
dataset events created through the RESTful API when calculating 
data intervals.
  • Loading branch information
Lee-W authored Mar 21, 2024
1 parent a0c1985 commit a1671f1
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
1 change: 0 additions & 1 deletion airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,6 @@ def _create_dag_runs_dataset_triggered(
DagScheduleDatasetReference,
DatasetEvent.dataset_id == DagScheduleDatasetReference.dataset_id,
)
.join(DatasetEvent.source_dag_run)
.where(*dataset_event_filters)
).all()

Expand Down
18 changes: 11 additions & 7 deletions airflow/timetables/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# under the License.
from __future__ import annotations

import operator
from typing import TYPE_CHECKING, Any, Collection, Sequence

from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
Expand Down Expand Up @@ -183,12 +182,17 @@ def data_interval_for_events(
if not events:
return DataInterval(logical_date, logical_date)

start = min(
events, key=operator.attrgetter("source_dag_run.data_interval_start")
).source_dag_run.data_interval_start
end = max(
events, key=operator.attrgetter("source_dag_run.data_interval_end")
).source_dag_run.data_interval_end
start_dates, end_dates = [], []
for event in events:
if event.source_dag_run is not None:
start_dates.append(event.source_dag_run.data_interval_start)
end_dates.append(event.source_dag_run.data_interval_end)
else:
start_dates.append(event.timestamp)
end_dates.append(event.timestamp)

start = min(start_dates)
end = max(end_dates)
return DataInterval(start, end)

def next_dagrun_info(
Expand Down

0 comments on commit a1671f1

Please sign in to comment.