Skip to content

Commit

Permalink
workflows: fixed accesses to authors api
Browse files Browse the repository at this point in the history
  • Loading branch information
DonHaul committed Nov 25, 2024
1 parent a28f6bb commit a323f49
Show file tree
Hide file tree
Showing 10 changed files with 494 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
files: ^(backend|backoffice|workflow)/
files: ^(backend|backoffice|workflows)/
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
Expand Down
12 changes: 3 additions & 9 deletions workflows/dags/author/author_create/author_create_approved.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,14 @@ def author_create_approved_dag():
"""
inspire_http_hook = InspireHttpHook()
inspire_http_record_management_hook = InspireHTTPRecordManagementHook()
workflow_management_hook = WorkflowManagementHook()
workflow_management_hook = WorkflowManagementHook(AUTHORS)
workflow_ticket_management_hook = AuthorWorkflowTicketManagementHook()

@task()
def set_workflow_status_to_running(**context):
status_name = "running"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
status_name=status_name, workflow_id=context["params"]["workflow_id"]
)

@task.branch()
Expand Down Expand Up @@ -107,7 +105,6 @@ def create_author_on_inspire(**context: dict) -> str:
workflow_management_hook.partial_update_workflow(
workflow_id=context["params"]["workflow_id"],
workflow_partial_update_data={"data": workflow_data["data"]},
collection=AUTHORS,
)
return status

Expand All @@ -124,9 +121,7 @@ def author_create_success_branch(**context: dict) -> str:
def set_author_create_workflow_status_to_completed(**context: dict) -> None:
status_name = "completed"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
status_name=status_name, workflow_id=context["params"]["workflow_id"]
)

@task
Expand All @@ -142,7 +137,6 @@ def set_author_create_workflow_status_to_error(**context: dict) -> None:
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
)

# task definitions
Expand Down
11 changes: 3 additions & 8 deletions workflows/dags/author/author_create/author_create_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,14 @@ def author_create_initialization_dag():
"""
inspire_http_hook = InspireHttpHook()
workflow_management_hook = WorkflowManagementHook()
workflow_management_hook = WorkflowManagementHook(AUTHORS)
workflow_ticket_management_hook = AuthorWorkflowTicketManagementHook()

@task()
def set_workflow_status_to_running(**context):
status_name = "running"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
status_name=status_name, workflow_id=context["params"]["workflow_id"]
)

@task()
Expand All @@ -57,7 +55,6 @@ def set_schema(**context):
workflow_partial_update_data={
"data": {**context["params"]["data"], "$schema": schema}
},
collection=AUTHORS,
)

@task()
Expand Down Expand Up @@ -86,9 +83,7 @@ def create_author_create_user_ticket(**context: dict) -> None:
def set_author_create_workflow_status_to_approval(**context: dict) -> None:
status_name = "approval"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
status_name=status_name, workflow_id=context["params"]["workflow_id"]
)

# task dependencies
Expand Down
10 changes: 3 additions & 7 deletions workflows/dags/author/author_create/author_create_rejected.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,20 @@ def author_create_rejected_dag() -> None:
2. set_author_create_workflow_status_to_completed: Sets the status of
the author creation workflow to 'completed'.
"""
workflow_management_hook = WorkflowManagementHook()
workflow_management_hook = WorkflowManagementHook(AUTHORS)

@task()
def set_author_create_workflow_status_to_completed(**context: dict) -> None:
status_name = "completed"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
status_name=status_name, workflow_id=context["params"]["workflow_id"]
)

@task()
def set_workflow_status_to_running(**context):
status_name = "running"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
status_name=status_name, workflow_id=context["params"]["workflow_id"]
)

# task definitions
Expand Down
14 changes: 4 additions & 10 deletions workflows/dags/author/author_update/author_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,14 @@ def author_update_dag():
"""
inspire_http_hook = InspireHttpHook()
inspire_http_record_management_hook = InspireHTTPRecordManagementHook()
workflow_management_hook = WorkflowManagementHook()
workflow_management_hook = WorkflowManagementHook(AUTHORS)
workflow_ticket_management_hook = AuthorWorkflowTicketManagementHook()

@task()
def set_author_update_workflow_status_to_running(**context):
status_name = "running"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
status_name=status_name, workflow_id=context["params"]["workflow_id"]
)

@task()
Expand Down Expand Up @@ -94,9 +92,7 @@ def update_author_on_inspire(**context):
def set_author_update_workflow_status_to_completed(**context):
status_name = "completed"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
status_name=status_name, workflow_id=context["params"]["workflow_id"]
)

@task.branch()
Expand All @@ -114,9 +110,7 @@ def set_author_update_workflow_status_to_error(**context):
ti = context["ti"]
status_name = ti.xcom_pull(task_ids="update_author_on_inspire")
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
status_name=status_name, workflow_id=context["params"]["workflow_id"]
)

# task definitions
Expand Down
20 changes: 10 additions & 10 deletions workflows/plugins/hooks/backoffice/workflow_management_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ class WorkflowManagementHook(BackofficeHook):
:type http_conn_id: str
"""

def set_workflow_status(
self, status_name: str, workflow_id: str, collection: str
) -> Response:
def __init__(self, collection):
super().__init__()
self.endpoint = f"api/workflows/{collection}"

def set_workflow_status(self, status_name: str, workflow_id: str) -> Response:
"""
Updates the status of a workflow in the backoffice system.
Expand All @@ -32,21 +34,19 @@ def set_workflow_status(
"status": status_name,
}
return self.partial_update_workflow(
workflow_partial_update_data=request_data,
workflow_id=workflow_id,
collection=collection,
workflow_partial_update_data=request_data, workflow_id=workflow_id
)

def get_workflow(self, workflow_id: str) -> dict:
endpoint = f"api/workflows/{workflow_id}"
endpoint = f"{self.endpoint}/{workflow_id}"
response = self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs, method="GET", endpoint=endpoint
)
response = self.run(endpoint=endpoint, headers=self.headers)
return response.json()

def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response:
endpoint = f"api/workflows/{workflow_id}/"
endpoint = f"{self.endpoint}/{workflow_id}/"
return self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
method="PUT",
Expand All @@ -55,9 +55,9 @@ def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response:
)

def partial_update_workflow(
self, workflow_id: str, workflow_partial_update_data: dict, collection: str
self, workflow_id: str, workflow_partial_update_data: dict
) -> Response:
endpoint = f"api/workflows/{collection}/{workflow_id}/"
endpoint = f"{self.endpoint}/{workflow_id}/"
return self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
method="PATCH",
Expand Down
Loading

0 comments on commit a323f49

Please sign in to comment.