Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
user actions: restart actions and respective tests added
Browse files Browse the repository at this point in the history
  • Loading branch information
DonHaul committed Jul 31, 2024
1 parent c37c719 commit 93fc158
Show file tree
Hide file tree
Showing 23 changed files with 2,801 additions and 45 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ local_settings.py
db.sqlite3
db.sqlite3-journal

# Airflow stuff:
workflows/logs/

# Flask stuff:
instance/
.webassets-cache
Expand Down
2 changes: 1 addition & 1 deletion backoffice/.envs/local/.django
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ OPENSEARCH_HOST=opensearch:9200
OPENSEARCH_INDEX_PREFIX=backoffice-backend-local

# Airflow
AIRFLOW_BASE_URL=http://host.docker.internal:8080
AIRFLOW_BASE_URL=http://airflow-webserver:8080
AIRFLOW_TOKEN=YWlyZmxvdzphaXJmbG93
131 changes: 128 additions & 3 deletions backoffice/backoffice/workflows/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from requests.exceptions import RequestException
from rest_framework import status

from backoffice.workflows.constants import WORKFLOW_DAGS

AIRFLOW_BASE_URL = environ.get("AIRFLOW_BASE_URL")

AIRFLOW_HEADERS = {
Expand All @@ -24,7 +26,7 @@ def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):
:returns: request response
"""

data = {"dag_run_id": workflow_id, "conf": {"workflow_id": workflow_id}}
data = {"dag_run_id": str(workflow_id), "conf": {"workflow_id": str(workflow_id)}}

if extra_data is not None:
data["conf"].update(extra_data)
Expand All @@ -33,10 +35,9 @@ def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):

try:
logger.info(
"Triggering DAG %s with data: %s and %s %s",
"Triggering DAG %s with data: %s and %s",
dag_id,
data,
AIRFLOW_HEADERS,
url,
)
response = requests.post(url, json=data, headers=AIRFLOW_HEADERS)
Expand All @@ -45,3 +46,127 @@ def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):
except RequestException:
data = {"error": response.json()}
return JsonResponse(data, status=status.HTTP_502_BAD_GATEWAY)


def restart_failed_tasks(workflow_id, workflow_type):
"""Restarts failed tasks of an airflow dag.
:param workflow_id: id of workflow to restart failed tasks
:param workflow_type: type of workflow to retrieve
:returns: request response
"""

dag_id = find_failed_dag(workflow_id, workflow_type)
# assumes current task is one of the failed tasks
data = {
"dry_run": False,
"dag_run_id": str(workflow_id),
"reset_dag_runs": False,
"only_failed": True,
}

url = f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/clearTaskInstances"

try:
logger.info(
"Clearing Failed Tasks of DAG %s with data: %s and %s",
dag_id,
data,
url,
)
response = requests.post(
url,
json=data,
headers=AIRFLOW_HEADERS,
)
response.raise_for_status()
return JsonResponse(response.json())
except RequestException:
data = {"error": response.json()}
return JsonResponse(data, status=status.HTTP_424_FAILED_DEPENDENCY)


def find_executed_dags(workflow_id, workflow_type):
"""For a given workflow find dags associated to it.
:param workflow_id: id of workflow to retrieve executed dags
:param workflow_type: type of workflow to retrieve
:returns: dictionary with executed dags and their status
"""

executed_dags_for_workflow = {}
# find dags that were executed
for dag_id in WORKFLOW_DAGS[workflow_type]:
response = requests.get(
f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns/{workflow_id}",
headers=AIRFLOW_HEADERS,
)
if response.status_code == status.HTTP_200_OK:
executed_dags_for_workflow[dag_id] = response.json()

return executed_dags_for_workflow


def find_failed_dag(workflow_id, workflow_type):
"""For a given workflow find failed dags.
:param workflow_id: id of workflow to retrieve the failed dags
:param workflow_type: type of workflow to retrieve
:returns: failed dag id or none
"""

executed_dags_for_workflow = find_executed_dags(str(workflow_id), workflow_type)

for dag, dag_data in executed_dags_for_workflow.items():
if dag_data["state"] == "failed":
return dag


def delete_workflow_dag(dag_id, workflow_id):
"""Delete dag run.
:param dag_id: dag to be removed
:param workflow_id: id of workflow whoose dag execution should be deleted
:returns: request response
"""

url = f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns/{str(workflow_id)}"
try:
logger.info(
"Deleting dag Failed Tasks of DAG %s with no data and %s",
dag_id,
url,
)
response = requests.delete(
url,
headers=AIRFLOW_HEADERS,
)
response.raise_for_status()
return JsonResponse({"message": "Successfully deleted DAG"})
except RequestException:
return JsonResponse(
{"error": "Failed to delete DAG"}, status=status.HTTP_424_FAILED_DEPENDENCY
)


def restart_workflow_dags(workflow_id, workflow_type, params=None):
"""Restarts dags of a given workflow.
:param workflow_id: workflow_id for dags that should be restarted
:param workflow_type: type of workflow the will be restarted
:param params: parameters of new dag execution
:returns: request response
"""
executed_dags_for_workflow = find_executed_dags(workflow_id, workflow_type)

for dag_id in executed_dags_for_workflow:
delete_workflow_dag(dag_id, str(workflow_id))

return trigger_airflow_dag(
WORKFLOW_DAGS[workflow_type].initialize, str(workflow_id), params
)

return JsonResponse(
{"error": "Failed to restart"}, status=status.HTTP_424_FAILED_DEPENDENCY
)
20 changes: 17 additions & 3 deletions backoffice/backoffice/workflows/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
WorkflowSerializer,
WorkflowTicketSerializer,
)
from backoffice.workflows.constants import WORKFLOW_DAG, ResolutionDags
from backoffice.workflows.constants import WORKFLOW_DAGS, ResolutionDags
from backoffice.workflows.documents import WorkflowDocument
from backoffice.workflows.models import Workflow, WorkflowTicket

Expand Down Expand Up @@ -105,11 +105,13 @@ def create(self, request):
)
logger.info(
"Trigger Airflow DAG: %s for %s",
WORKFLOW_DAG[workflow.workflow_type],
WORKFLOW_DAGS[workflow.workflow_type].initialize,
workflow.id,
)
return airflow_utils.trigger_airflow_dag(
WORKFLOW_DAG[workflow.workflow_type], str(workflow.id), workflow.data
WORKFLOW_DAGS[workflow.workflow_type].initialize,
str(workflow.id),
workflow.data,
)

@action(detail=True, methods=["post"])
Expand All @@ -123,10 +125,22 @@ def resolve(self, request, pk=None):
ResolutionDags[serializer.validated_data["value"]],
pk,
)

return airflow_utils.trigger_airflow_dag(
ResolutionDags[serializer.validated_data["value"]].label, pk, extra_data
)

@action(detail=True, methods=["post"])
def restart(self, request, pk=None):
workflow = Workflow.objects.get(id=pk)

if request.data.get("restart_current_task"):
return airflow_utils.restart_failed_tasks(workflow)

return airflow_utils.restart_workflow_dags(
workflow.id, workflow.workflow_type, request.data.get("params")
)


class WorkflowDocumentView(BaseDocumentViewSet):
def __init__(self, *args, **kwargs):
Expand Down
26 changes: 18 additions & 8 deletions backoffice/backoffice/workflows/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,25 @@ class WorkflowType(models.TextChoices):

DEFAULT_WORKFLOW_TYPE = WorkflowType.HEP_CREATE

# author dags for each workflow type
WORKFLOW_DAG = {
WorkflowType.HEP_CREATE: "",
WorkflowType.HEP_UPDATE: "",
WorkflowType.AUTHOR_CREATE: "author_create_initialization_dag",
WorkflowType.AUTHOR_UPDATE: "author_update_dag",
}


class ResolutionDags(models.TextChoices):
accept = "accept", "author_create_approved_dag"
reject = "reject", "author_create_rejected_dag"


class AuthorCreateDags(models.TextChoices):
initialize = "author_create_initialization_dag", "initialize"
approve = "author_create_approved_dag", "approve"
reject = "author_create_rejected_dag", "reject"


class AuthorUpdateDags(models.TextChoices):
initialize = "author_update_dag", "initialize"


WORKFLOW_DAGS = {
WorkflowType.HEP_CREATE: "",
WorkflowType.HEP_UPDATE: "",
WorkflowType.AUTHOR_CREATE: AuthorCreateDags,
WorkflowType.AUTHOR_UPDATE: AuthorUpdateDags,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
interactions:
- request:
body: '{"dag_run_id": "00000000-0000-0000-0000-000000000001", "conf": {"workflow_id":
"00000000-0000-0000-0000-000000000001"}}'
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '119'
Content-Type:
- application/json
method: POST
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns
response:
body:
string: "{\n \"conf\": {\n \"workflow_id\": \"00000000-0000-0000-0000-000000000001\"\n
\ },\n \"dag_id\": \"author_create_initialization_dag\",\n \"dag_run_id\":
\"00000000-0000-0000-0000-000000000001\",\n \"data_interval_end\": \"2024-07-30T12:13:41.736880+00:00\",\n
\ \"data_interval_start\": \"2024-07-30T12:13:41.736880+00:00\",\n \"end_date\":
null,\n \"execution_date\": \"2024-07-30T12:13:41.736880+00:00\",\n \"external_trigger\":
true,\n \"last_scheduling_decision\": null,\n \"logical_date\": \"2024-07-30T12:13:41.736880+00:00\",\n
\ \"note\": null,\n \"run_type\": \"manual\",\n \"start_date\": null,\n
\ \"state\": \"queued\"\n}\n"
headers:
Connection:
- close
Content-Length:
- '579'
Content-Type:
- application/json
Date:
- Tue, 30 Jul 2024 12:13:41 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 200
message: OK
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '0'
Content-Type:
- application/json
method: DELETE
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns/00000000-0000-0000-0000-000000000001
response:
body:
string: ''
headers:
Connection:
- close
Content-Type:
- application/json
Date:
- Tue, 30 Jul 2024 12:13:41 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 204
message: NO CONTENT
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '0'
Content-Type:
- application/json
method: DELETE
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns/00000000-0000-0000-0000-000000000001
response:
body:
string: "{\n \"detail\": \"DAGRun with DAG ID: 'author_create_initialization_dag'
and DagRun ID: '00000000-0000-0000-0000-000000000001' not found\",\n \"status\":
404,\n \"title\": \"Not Found\",\n \"type\": \"https://airflow.apache.org/docs/apache-airflow/2.8.3/stable-rest-api-ref.html#section/Errors/NotFound\"\n}\n"
headers:
Connection:
- close
Content-Length:
- '293'
Content-Type:
- application/problem+json
Date:
- Tue, 30 Jul 2024 12:13:41 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 404
message: NOT FOUND
version: 1
Loading

0 comments on commit 93fc158

Please sign in to comment.