From c86325af28e3c0790b03f10dc0f8ab552a3abfa5 Mon Sep 17 00:00:00 2001 From: JP Bruins Slot Date: Thu, 28 Nov 2024 15:10:46 +0100 Subject: [PATCH] Add additional check if task already run for report scheduler (#3900) Co-authored-by: Rieven --- .../scheduler/schedulers/schedulers/report.py | 71 ++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) diff --git a/mula/scheduler/schedulers/schedulers/report.py b/mula/scheduler/schedulers/schedulers/report.py index 099c21642d9..4fa4189c58c 100644 --- a/mula/scheduler/schedulers/schedulers/report.py +++ b/mula/scheduler/schedulers/schedulers/report.py @@ -7,7 +7,7 @@ from opentelemetry import trace from scheduler import context, storage -from scheduler.models import Organisation, ReportTask, Task +from scheduler.models import Organisation, ReportTask, Task, TaskStatus from scheduler.schedulers import Scheduler from scheduler.schedulers.queue import PriorityQueue, QueueFullError from scheduler.storage import filters @@ -87,6 +87,40 @@ def push_tasks_for_rescheduling(self): ) as executor: for schedule in schedules: report_task = ReportTask.model_validate(schedule.data) + + # When the schedule has no schedule (cron expression), but a + # task is already executed for this schedule we should not run + # the task again + if schedule.schedule is None: + try: + _, count = self.ctx.datastores.task_store.get_tasks( + scheduler_id=self.scheduler_id, + task_type=report_task.type, + filters=filters.FilterRequest( + filters=[ + filters.Filter(column="hash", operator="eq", value=report_task.hash), + filters.Filter(column="schedule_id", operator="eq", value=str(schedule.id)), + ] + ), + ) + if count > 0: + self.logger.debug( + "Schedule has no schedule, but task already executed", + schedule_id=schedule.id, + scheduler_id=self.scheduler_id, + organisation_id=self.organisation.id, + ) + continue + except storage.errors.StorageError as exc_db: + self.logger.error( + "Could not get latest task by hash %s", + report_task.hash, + scheduler_id=self.scheduler_id, + organisation_id=self.organisation.id, + exc_info=exc_db, + ) + continue + executor.submit(self.push_report_task, report_task, self.push_tasks_for_rescheduling.__name__) def push_report_task(self, report_task: ReportTask, caller: str = "") -> None: @@ -98,6 +132,16 @@ def push_report_task(self, report_task: ReportTask, caller: str = "") -> None: caller=caller, ) + if self.has_report_task_started_running(report_task): + self.logger.debug( + "Report task already running", + task_hash=report_task.hash, + organisation_id=self.organisation.id, + scheduler_id=self.scheduler_id, + caller=caller, + ) + return + if self.is_item_on_queue_by_hash(report_task.hash): self.logger.debug( "Report task already on queue", @@ -139,3 +183,28 @@ def push_report_task(self, report_task: ReportTask, caller: str = "") -> None: scheduler_id=self.scheduler_id, caller=caller, ) + + def has_report_task_started_running(self, task: ReportTask) -> bool: + task_db = None + try: + task_db = self.ctx.datastores.task_store.get_latest_task_by_hash(task.hash) + except storage.errors.StorageError as exc_db: + self.logger.error( + "Could not get latest task by hash %s", + task.hash, + organisation_id=self.organisation.id, + scheduler_id=self.scheduler_id, + exc_info=exc_db, + ) + raise exc_db + + if task_db is not None and task_db.status not in [TaskStatus.FAILED, TaskStatus.COMPLETED]: + self.logger.debug( + "Task is still running, according to the datastore", + task_id=task_db.id, + organisation_id=self.organisation.id, + scheduler_id=self.scheduler_id, + ) + return True + + return False