Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Boefje runonce functionality in scheduler #3906

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mula/scheduler/models/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ class Plugin(BaseModel):
produces: list[str]
cron: str | None = None
interval: int | None = None
runon: list[str] | None = None
34 changes: 29 additions & 5 deletions mula/scheduler/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ def push_items_to_queue(self, items: list[models.Task]) -> None:

count += 1

def push_item_to_queue_with_timeout(self, item: models.Task, max_tries: int = 5, timeout: int = 1) -> None:
def push_item_to_queue_with_timeout(
self, item: models.Task, max_tries: int = 5, timeout: int = 1, create_schedule: bool = True
) -> None:
"""Push an item to the queue, with a timeout.

Args:
Expand All @@ -192,9 +194,9 @@ def push_item_to_queue_with_timeout(self, item: models.Task, max_tries: int = 5,
if tries >= max_tries and max_tries != -1:
raise QueueFullError()

self.push_item_to_queue(item)
self.push_item_to_queue(item=item, create_schedule=create_schedule)

def push_item_to_queue(self, item: models.Task) -> models.Task:
def push_item_to_queue(self, item: models.Task, create_schedule: bool = True) -> models.Task:
"""Push a Task to the queue.

Args:
Expand Down Expand Up @@ -263,11 +265,11 @@ def push_item_to_queue(self, item: models.Task) -> models.Task:
scheduler_id=self.scheduler_id,
)

item = self.post_push(item)
item = self.post_push(item, create_schedule)

return item

def post_push(self, item: models.Task) -> models.Task:
def post_push(self, item: models.Task, create_schedule: bool = True) -> models.Task:
"""After an item is pushed to the queue, we execute this function

Args:
Expand All @@ -285,6 +287,28 @@ def post_push(self, item: models.Task) -> models.Task:
)
return item

scheduler_create_schedule = self.create_schedule
if not scheduler_create_schedule:
self.logger.debug(
"Scheduler is not creating schedules, not creating schedule for item %s",
item.id,
item_id=item.id,
queue_id=self.queue.pq_id,
scheduler_id=self.scheduler_id,
)
return item

item_create_schedule = create_schedule
if not item_create_schedule:
self.logger.debug(
"Item is not creating schedules, not creating schedule for item %s",
item.id,
item_id=item.id,
queue_id=self.queue.pq_id,
scheduler_id=self.scheduler_id,
)
return item

schedule_db = None
if item.schedule_id is not None:
schedule_db = self.ctx.datastores.schedule_store.get_schedule(item.schedule_id)
Expand Down
38 changes: 33 additions & 5 deletions mula/scheduler/schedulers/schedulers/boefje.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def push_tasks_for_scan_profile_mutations(self, body: bytes) -> None:
thread_name_prefix=f"BoefjeScheduler-TPE-{self.scheduler_id}-mutations"
) as executor:
for boefje in boefjes:
# Is the boefje allowed to run on the ooi?
if not self.has_boefje_permission_to_run(boefje, ooi):
self.logger.debug(
"Boefje not allowed to run on ooi",
Expand All @@ -204,13 +205,40 @@ def push_tasks_for_scan_profile_mutations(self, body: bytes) -> None:
)
continue

create_schedule = True
run_task = True

# Does the boefje restrict which mutation operations it runs on (`runon`)?
if boefje.runon:
create_schedule = False
run_task = False
if mutation.operation == MutationOperationType.CREATE:
run_task = "create" in boefje.runon
elif mutation.operation == MutationOperationType.UPDATE:
run_task = "update" in boefje.runon

if not run_task:
self.logger.debug(
"Based on boefje run on type, skipping",
boefje_id=boefje.id,
ooi_primary_key=ooi.primary_key,
organisation_id=self.organisation.id,
scheduler_id=self.scheduler_id,
)
continue

boefje_task = BoefjeTask(
boefje=Boefje.model_validate(boefje.model_dump()),
input_ooi=ooi.primary_key if ooi else None,
organization=self.organisation.id,
)

executor.submit(self.push_boefje_task, boefje_task, self.push_tasks_for_scan_profile_mutations.__name__)
executor.submit(
self.push_boefje_task,
boefje_task,
create_schedule,
self.push_tasks_for_scan_profile_mutations.__name__,
)

@tracer.start_as_current_span("boefje_push_tasks_for_new_boefjes")
def push_tasks_for_new_boefjes(self) -> None:
Expand Down Expand Up @@ -461,7 +489,7 @@ def push_tasks_for_rescheduling(self):
executor.submit(self.push_boefje_task, new_boefje_task, self.push_tasks_for_rescheduling.__name__)

@tracer.start_as_current_span("boefje_push_task")
def push_boefje_task(self, boefje_task: BoefjeTask, caller: str = "") -> None:
def push_boefje_task(self, boefje_task: BoefjeTask, create_schedule: bool = True, caller: str = "") -> None:
"""Given a Boefje and OOI create a BoefjeTask and push it onto
the queue.

Expand Down Expand Up @@ -582,7 +610,7 @@ def push_boefje_task(self, boefje_task: BoefjeTask, caller: str = "") -> None:
)

try:
self.push_item_to_queue_with_timeout(task, self.max_tries)
self.push_item_to_queue_with_timeout(task, self.max_tries, create_schedule)
except QueueFullError:
self.logger.warning(
"Could not add task to queue, queue was full: %s",
Expand All @@ -607,7 +635,7 @@ def push_boefje_task(self, boefje_task: BoefjeTask, caller: str = "") -> None:
caller=caller,
)

def push_item_to_queue(self, item: Task) -> Task:
def push_item_to_queue(self, item: Task, create_schedule: bool = True) -> Task:
"""Some boefje scheduler specific logic before pushing the item to the
queue."""
boefje_task = BoefjeTask.model_validate(item.data)
Expand All @@ -620,7 +648,7 @@ def push_item_to_queue(self, item: Task) -> Task:
item.id = new_id
item.data = boefje_task.model_dump()

return super().push_item_to_queue(item)
return super().push_item_to_queue(item=item, create_schedule=create_schedule)

@tracer.start_as_current_span("boefje_has_boefje_permission_to_run")
def has_boefje_permission_to_run(self, boefje: Plugin, ooi: OOI) -> bool:
Expand Down
6 changes: 3 additions & 3 deletions mula/scheduler/schedulers/schedulers/normalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def push_normalizer_task(self, normalizer_task: models.NormalizerTask, caller: s
)

try:
self.push_item_to_queue_with_timeout(task, self.max_tries)
self.push_item_to_queue_with_timeout(item=task, max_tries=self.max_tries)
except QueueFullError:
self.logger.warning(
"Could not add task to queue, queue was full: %s",
Expand All @@ -279,7 +279,7 @@ def push_normalizer_task(self, normalizer_task: models.NormalizerTask, caller: s
caller=caller,
)

def push_item_to_queue(self, item: Task) -> Task:
def push_item_to_queue(self, item: Task, create_schedule: bool = True) -> Task:
"""Some normalizer scheduler specific logic before pushing the item to the
queue."""
normalizer_task = NormalizerTask.model_validate(item.data)
Expand All @@ -292,7 +292,7 @@ def push_item_to_queue(self, item: Task) -> Task:
item.id = new_id
item.data = normalizer_task.model_dump()

return super().push_item_to_queue(item)
return super().push_item_to_queue(item=item, create_schedule=create_schedule)

@tracer.start_as_current_span("normalizer_has_normalizer_permission_to_run")
def has_normalizer_permission_to_run(self, normalizer: Plugin) -> bool:
Expand Down