Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
Merge pull request #91 from DonHaul/restart-workflow-data-loss
Browse files Browse the repository at this point in the history
restart actions: on restart use data from previous run
  • Loading branch information
DonHaul authored Sep 9, 2024
2 parents 6bef194 + 346f1c1 commit 90b8d7c
Show file tree
Hide file tree
Showing 8 changed files with 711 additions and 157 deletions.
22 changes: 17 additions & 5 deletions backoffice/backoffice/workflows/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,12 @@ def restart_workflow_dags(workflow_id, workflow_type, params=None):
:param params: parameters of new dag execution
:returns: request response
"""

data = fetch_data_workflow_dag(workflow_id, workflow_type)
delete_workflow_dag_runs(workflow_id, workflow_type)

return trigger_airflow_dag(
WORKFLOW_DAGS[workflow_type].initialize, str(workflow_id), params
WORKFLOW_DAGS[workflow_type].initialize, str(workflow_id), params or data
)


Expand All @@ -171,9 +173,19 @@ def delete_workflow_dag_runs(workflow_id, workflow_type):
"""
executed_dags_for_workflow = find_executed_dags(workflow_id, workflow_type)

for dag_id in executed_dags_for_workflow:
for dag_id, _ in executed_dags_for_workflow.items():
delete_workflow_dag(dag_id, str(workflow_id))

return JsonResponse(
data={"message": f"Dag runs for worfklow {workflow_id} have been deleted"}
)

def fetch_data_workflow_dag(workflow_id, workflow_type):
    """Fetch the data that the workflow's previous dag run was executed with.

    :param workflow_id: workflow_id for dag to get data of
    :param workflow_type: type of workflow
    :returns: the ``data`` entry of the first executed dag's ``conf``, or
        ``None`` when no executed dag (or no such entry) is found
    """
    executed_dags_for_workflow = find_executed_dags(workflow_id, workflow_type)

    # When nothing has been executed for this workflow the mapping is empty;
    # return None instead of letting next() raise StopIteration, so that a
    # restart can still fall back to the caller-supplied params.
    dag = next(iter(executed_dags_for_workflow.values()), None)
    if not dag:
        return None

    # Tolerate a dag run payload whose "conf" is missing or null.
    return (dag.get("conf") or {}).get("data")
1 change: 1 addition & 0 deletions backoffice/backoffice/workflows/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def restart(self, request, pk=None):
workflow.id, workflow.workflow_type
)

Decision.objects.filter(workflow=workflow).delete()
return airflow_utils.restart_workflow_dags(
workflow.id, workflow.workflow_type, request.data.get("params")
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
interactions:
- request:
body: '{"dag_run_id": "00000000-0000-0000-0000-000000000001", "conf": {"workflow_id":
"00000000-0000-0000-0000-000000000001", "data": {"test": "test"}}}'
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '145'
Content-Type:
- application/json
method: POST
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns
response:
body:
string: "{\n \"conf\": {\n \"data\": {\n \"test\": \"test\"\n },\n
\ \"workflow_id\": \"00000000-0000-0000-0000-000000000001\"\n },\n \"dag_id\":
\"author_create_initialization_dag\",\n \"dag_run_id\": \"00000000-0000-0000-0000-000000000001\",\n
\ \"data_interval_end\": \"2024-08-30T11:26:33.983555+00:00\",\n \"data_interval_start\":
\"2024-08-30T11:26:33.983555+00:00\",\n \"end_date\": null,\n \"execution_date\":
\"2024-08-30T11:26:33.983555+00:00\",\n \"external_trigger\": true,\n \"last_scheduling_decision\":
null,\n \"logical_date\": \"2024-08-30T11:26:33.983555+00:00\",\n \"note\":
null,\n \"run_type\": \"manual\",\n \"start_date\": null,\n \"state\":
\"queued\"\n}\n"
headers:
Cache-Control:
- no-store
Connection:
- close
Content-Length:
- '621'
Content-Type:
- application/json
Date:
- Fri, 30 Aug 2024 11:26:34 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 200
message: OK
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
method: GET
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns/00000000-0000-0000-0000-000000000001
response:
body:
string: "{\n \"conf\": {\n \"data\": {\n \"test\": \"test\"\n },\n
\ \"workflow_id\": \"00000000-0000-0000-0000-000000000001\"\n },\n \"dag_id\":
\"author_create_initialization_dag\",\n \"dag_run_id\": \"00000000-0000-0000-0000-000000000001\",\n
\ \"data_interval_end\": \"2024-08-30T11:26:33.983555+00:00\",\n \"data_interval_start\":
\"2024-08-30T11:26:33.983555+00:00\",\n \"end_date\": \"2024-08-30T11:26:51.968680+00:00\",\n
\ \"execution_date\": \"2024-08-30T11:26:33.983555+00:00\",\n \"external_trigger\":
true,\n \"last_scheduling_decision\": \"2024-08-30T11:26:51.966580+00:00\",\n
\ \"logical_date\": \"2024-08-30T11:26:33.983555+00:00\",\n \"note\": null,\n
\ \"run_type\": \"manual\",\n \"start_date\": \"2024-08-30T11:26:34.088225+00:00\",\n
\ \"state\": \"failed\"\n}\n"
headers:
Cache-Control:
- no-store
Connection:
- close
Content-Length:
- '711'
Content-Type:
- application/json
Date:
- Fri, 30 Aug 2024 11:27:03 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 200
message: OK
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
method: GET
uri: http://airflow-webserver:8080/api/v1/dags/author_create_approved_dag/dagRuns/00000000-0000-0000-0000-000000000001
response:
body:
string: "{\n \"detail\": \"DAGRun with DAG ID: 'author_create_approved_dag'
and DagRun ID: '00000000-0000-0000-0000-000000000001' not found\",\n \"status\":
404,\n \"title\": \"DAGRun not found\",\n \"type\": \"https://airflow.apache.org/docs/apache-airflow/2.9.3/stable-rest-api-ref.html#section/Errors/NotFound\"\n}\n"
headers:
Cache-Control:
- no-store
Connection:
- close
Content-Length:
- '294'
Content-Type:
- application/problem+json
Date:
- Fri, 30 Aug 2024 11:27:03 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 404
message: NOT FOUND
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
method: GET
uri: http://airflow-webserver:8080/api/v1/dags/author_create_rejected_dag/dagRuns/00000000-0000-0000-0000-000000000001
response:
body:
string: "{\n \"detail\": \"DAGRun with DAG ID: 'author_create_rejected_dag'
and DagRun ID: '00000000-0000-0000-0000-000000000001' not found\",\n \"status\":
404,\n \"title\": \"DAGRun not found\",\n \"type\": \"https://airflow.apache.org/docs/apache-airflow/2.9.3/stable-rest-api-ref.html#section/Errors/NotFound\"\n}\n"
headers:
Cache-Control:
- no-store
Connection:
- close
Content-Length:
- '294'
Content-Type:
- application/problem+json
Date:
- Fri, 30 Aug 2024 11:27:03 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 404
message: NOT FOUND
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '0'
method: DELETE
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns/00000000-0000-0000-0000-000000000001
response:
body:
string: ''
headers:
Cache-Control:
- no-store
Connection:
- close
Content-Type:
- application/json
Date:
- Fri, 30 Aug 2024 11:27:45 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 204
message: NO CONTENT
version: 1
Loading

0 comments on commit 90b8d7c

Please sign in to comment.