Skip to content

Commit

Permalink
Merge branch 'main' into feature/add-logging-to-reports
Browse files Browse the repository at this point in the history
  • Loading branch information
underdarknl authored Nov 28, 2024
2 parents 7ccf2f2 + c86325a commit c7230fd
Showing 1 changed file with 70 additions and 1 deletion.
71 changes: 70 additions & 1 deletion mula/scheduler/schedulers/schedulers/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from opentelemetry import trace

from scheduler import context, storage
from scheduler.models import Organisation, ReportTask, Task
from scheduler.models import Organisation, ReportTask, Task, TaskStatus
from scheduler.schedulers import Scheduler
from scheduler.schedulers.queue import PriorityQueue, QueueFullError
from scheduler.storage import filters
Expand Down Expand Up @@ -87,6 +87,40 @@ def push_tasks_for_rescheduling(self):
) as executor:
for schedule in schedules:
report_task = ReportTask.model_validate(schedule.data)

# When the schedule has no cron expression, but a task has already been
# executed for this schedule, we should not run the task again
if schedule.schedule is None:
try:
_, count = self.ctx.datastores.task_store.get_tasks(
scheduler_id=self.scheduler_id,
task_type=report_task.type,
filters=filters.FilterRequest(
filters=[
filters.Filter(column="hash", operator="eq", value=report_task.hash),
filters.Filter(column="schedule_id", operator="eq", value=str(schedule.id)),
]
),
)
if count > 0:
self.logger.debug(
"Schedule has no schedule, but task already executed",
schedule_id=schedule.id,
scheduler_id=self.scheduler_id,
organisation_id=self.organisation.id,
)
continue
except storage.errors.StorageError as exc_db:
self.logger.error(
"Could not get latest task by hash %s",
report_task.hash,
scheduler_id=self.scheduler_id,
organisation_id=self.organisation.id,
exc_info=exc_db,
)
continue

executor.submit(self.push_report_task, report_task, self.push_tasks_for_rescheduling.__name__)

def push_report_task(self, report_task: ReportTask, caller: str = "") -> None:
Expand All @@ -98,6 +132,16 @@ def push_report_task(self, report_task: ReportTask, caller: str = "") -> None:
caller=caller,
)

if self.has_report_task_started_running(report_task):
self.logger.debug(
"Report task already running",
task_hash=report_task.hash,
organisation_id=self.organisation.id,
scheduler_id=self.scheduler_id,
caller=caller,
)
return

if self.is_item_on_queue_by_hash(report_task.hash):
self.logger.debug(
"Report task already on queue",
Expand Down Expand Up @@ -139,3 +183,28 @@ def push_report_task(self, report_task: ReportTask, caller: str = "") -> None:
scheduler_id=self.scheduler_id,
caller=caller,
)

def has_report_task_started_running(self, task: ReportTask) -> bool:
    """Tell whether the latest stored task with this hash is still unfinished.

    The most recent task with ``task.hash`` is fetched from the task store.
    It counts as "started running" when it exists and its status is anything
    other than ``TaskStatus.FAILED`` or ``TaskStatus.COMPLETED``.

    Args:
        task: The report task whose ``hash`` is used for the lookup.

    Returns:
        ``True`` when a stored task was found and its status is neither
        ``FAILED`` nor ``COMPLETED``; ``False`` when no task was found or the
        latest one has one of those two statuses.

    Raises:
        storage.errors.StorageError: Re-raised, after being logged, when the
            task store lookup fails.
    """
    try:
        task_db = self.ctx.datastores.task_store.get_latest_task_by_hash(task.hash)
    except storage.errors.StorageError as exc_db:
        self.logger.error(
            "Could not get latest task by hash %s",
            task.hash,
            organisation_id=self.organisation.id,
            scheduler_id=self.scheduler_id,
            exc_info=exc_db,
        )
        # Bare ``raise`` re-raises the active exception without adding this
        # frame to its traceback a second time.
        raise

    # No stored task for this hash: nothing can be running.
    if task_db is None:
        return False

    # FAILED and COMPLETED are the only statuses this check treats as finished.
    if task_db.status in (TaskStatus.FAILED, TaskStatus.COMPLETED):
        return False

    self.logger.debug(
        "Task is still running, according to the datastore",
        task_id=task_db.id,
        organisation_id=self.organisation.id,
        scheduler_id=self.scheduler_id,
    )
    return True

0 comments on commit c7230fd

Please sign in to comment.