Skip to content

Commit

Permalink
Revert to in-memory
Browse files — browse the repository at this point in the history
  • Loading branch information
jpbruinsslot committed Nov 18, 2024
1 parent da3f337 commit 05f36d4
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 313 deletions.
18 changes: 7 additions & 11 deletions mula/scheduler/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from opentelemetry import trace

from scheduler import context, schedulers, server
from scheduler.schedulers import create_schedulers_for_organisation, new_scheduler
from scheduler.schedulers import create_schedulers, new_scheduler
from scheduler.utils import thread

tracer = trace.get_tracer(__name__)
Expand Down Expand Up @@ -87,18 +87,14 @@ def run(self) -> None:
os._exit(1)

def start_schedulers(self) -> None:
schedulers_db, _ = self.ctx.datastores.scheduler_store.get_schedulers()
if not schedulers_db:
self.logger.warning("No schedulers to start")
return
boefje = schedulers.BoefjeScheduler(ctx=self.ctx)
boefje.run()

for scheduler_db in schedulers_db:
scheduler = new_scheduler(self.ctx, scheduler_db)
if not scheduler:
self.logger.error("Failed to create scheduler", scheduler_id=scheduler_db.scheduler_id)
continue
normalizer = schedulers.NormalizerScheduler(ctx=self.ctx)
normalizer.run()

scheduler.run()
report = schedulers.ReportScheduler(ctx=self.ctx)
report.run()

def start_collectors(self) -> None:
thread.ThreadRunner(
Expand Down
24 changes: 0 additions & 24 deletions mula/scheduler/models/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,7 @@ class Scheduler(BaseModel):
model_config = ConfigDict(from_attributes=True, use_enum_values=True)

id: str
enabled: bool = True
maxsize: int
organisation: str
type: SchedulerType
allow_replace: bool = True
allow_updates: bool = True
allow_priority_updates: bool = True

last_activity: datetime | None = None
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
modified_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


class SchedulerDB(Base):
__tablename__ = "schedulers"

id = Column(String, primary_key=True)
enabled = Column(Boolean, default=True, nullable=False)
maxsize = Column(Integer, nullable=False)
organisation = Column(String, nullable=False)
type = Column(SQLAlchemyEnum(SchedulerType), nullable=False)
allow_replace = Column(Boolean, default=True, nullable=False)
allow_updates = Column(Boolean, default=True, nullable=False)
allow_priority_updates = Column(Boolean, default=True)

last_activity = Column(DateTime, default=datetime.now)
created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now())
modified_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now())
2 changes: 1 addition & 1 deletion mula/scheduler/schedulers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .scheduler import Scheduler
from .schedulers import BoefjeScheduler, NormalizerScheduler, ReportScheduler
from .utils import create_schedulers_for_organisation, new_scheduler
from .utils import create_schedulers, new_scheduler
13 changes: 2 additions & 11 deletions mula/scheduler/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,20 +434,11 @@ def last_activity(self, value: datetime) -> None:
with self.lock:
self._last_activity = value

# TODO: update
def dict(self) -> dict[str, Any]:
"""Get a dict representation of the scheduler."""
return {
"id": self.scheduler_id,
"enabled": self.enabled,
"priority_queue": {
"id": self.queue.pq_id,
"item_type": self.queue.item_type.type,
"maxsize": self.queue.maxsize,
"qsize": self.queue.qsize(),
"allow_replace": self.queue.allow_replace,
"allow_updates": self.queue.allow_updates,
"allow_priority_updates": self.queue.allow_priority_updates,
},
"item_type": self.queue.item_type.type,
"qsize": self.queue.qsize(),
"last_activity": self.last_activity,
}
10 changes: 5 additions & 5 deletions mula/scheduler/schedulers/schedulers/boefje.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class BoefjeScheduler(Scheduler):

ITEM_TYPE: Any = models.BoefjeTask

def __init__(self, ctx: context.AppContext, scheduler_id: str, queue: PriorityQueue | None = None):
def __init__(self, ctx: context.AppContext):
"""Initializes the BoefjeScheduler.
Args:
Expand All @@ -34,18 +34,18 @@ def __init__(self, ctx: context.AppContext, scheduler_id: str, queue: PriorityQu
callback: The callback function to call when a task is completed.
"""
self.logger: structlog.BoundLogger = structlog.getLogger(__name__)
self.scheduler_id = "boefje"

self.queue = queue or PriorityQueue(
pq_id=scheduler_id,
self.queue = PriorityQueue(
pq_id=self.scheduler_id,
maxsize=ctx.config.pq_maxsize,
item_type=self.ITEM_TYPE,
allow_priority_updates=True,
pq_store=ctx.datastores.pq_store,
)

super().__init__(ctx=ctx, queue=self.queue, scheduler_id=scheduler_id, create_schedule=True)
super().__init__(ctx=ctx, queue=self.queue, scheduler_id=self.scheduler_id, create_schedule=True)

# Priority ranker
self.priority_ranker = BoefjeRanker(self.ctx)

def run(self) -> None:
Expand Down
13 changes: 5 additions & 8 deletions mula/scheduler/schedulers/schedulers/normalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,19 @@ class NormalizerScheduler(Scheduler):

ITEM_TYPE: Any = models.NormalizerTask

def __init__(
self, ctx: context.AppContext, scheduler_id: str, organisation_id: str, queue: PriorityQueue | None = None
):
def __init__(self, ctx: context.AppContext):
self.logger: structlog.BoundLogger = structlog.getLogger(__name__)
self.organisation_id: str = organisation_id
self.create_schedule = False
self.scheduler_id = "normalizer"

self.queue = queue or PriorityQueue(
pq_id=scheduler_id,
self.queue = PriorityQueue(
pq_id=self.scheduler_id,
maxsize=ctx.config.pq_maxsize,
item_type=self.ITEM_TYPE,
allow_priority_updates=True,
pq_store=ctx.datastores.pq_store,
)

super().__init__(ctx=ctx, queue=self.queue, scheduler_id=scheduler_id)
super().__init__(ctx=ctx, queue=self.queue, scheduler_id=self.scheduler_id, create_schedule=False)

self.ranker = NormalizerRanker(ctx=self.ctx)

Expand Down
10 changes: 4 additions & 6 deletions mula/scheduler/schedulers/schedulers/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,19 @@
class ReportScheduler(Scheduler):
ITEM_TYPE: Any = models.ReportTask

def __init__(
self, ctx: context.AppContext, scheduler_id: str, organisation_id: str, queue: PriorityQueue | None = None
):
def __init__(self, ctx: context.AppContext):
self.logger: structlog.BoundLogger = structlog.get_logger(__name__)
self.organisation_id: str = organisation_id
self.scheduler_id: str = "report"

self.queue = queue or PriorityQueue(
pq_id=scheduler_id,
pq_id=self.scheduler_id,
maxsize=ctx.config.pq_maxsize,
item_type=self.ITEM_TYPE,
allow_priority_updates=True,
pq_store=ctx.datastores.pq_store,
)

super().__init__(ctx=ctx, queue=self.queue, scheduler_id=scheduler_id, create_schedule=True)
super().__init__(ctx=ctx, queue=self.queue, scheduler_id=self.scheduler_id, create_schedule=True)

def run(self) -> None:
# Rescheduling
Expand Down
66 changes: 0 additions & 66 deletions mula/scheduler/schedulers/utils.py

This file was deleted.

88 changes: 9 additions & 79 deletions mula/scheduler/server/handlers/schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,92 +37,22 @@ def __init__(self, api: fastapi.FastAPI, ctx: context.AppContext, s: dict[str, s

self.api.add_api_route(
path="/schedulers/{scheduler_id}",
endpoint=self.patch,
methods=["PATCH"],
response_model=models.Scheduler,
endpoint=self.push,
methods=["POST"],
response_model=models.Task,
status_code=status.HTTP_200_OK,
description="Update a scheduler",
description="Push a task to a scheduler",
)

def list(
self,
request: fastapi.Request,
scheduler_id: str | None = None,
scheduler_type: str | None = None,
organisation: str | None = None,
offset: int = 0,
limit: int = 10,
min_created_at: datetime.datetime | None = None,
max_created_at: datetime.datetime | None = None,
filters: storage.filters.FilterRequest | None = None,
) -> Any:
if (min_created_at is not None and max_created_at is not None) and min_created_at > max_created_at:
raise BadRequestError("min_created_at must be less than max_created_at")

results, count = self.ctx.datastores.scheduler_store.get_schedulers(
scheduler_id=scheduler_id,
scheduler_type=scheduler_type,
organisation=organisation,
offset=offset,
limit=limit,
min_created_at=min_created_at,
max_created_at=max_created_at,
filters=filters,
)

return utils.paginate(request, results, count, offset, limit)
def list(self, request: fastapi.Request) -> Any:
return [models.Scheduler(**s.dict()) for s in self.schedulers.values()]

def get(self, scheduler_id: str) -> Any:
scheduler = self.ctx.datastores.scheduler_store.get_scheduler(scheduler_id)
if scheduler is None:
raise NotFoundError(f"Scheduler {scheduler_id} not found")

return scheduler

def patch(self, scheduler_id: str, item: models.Scheduler) -> Any:
scheduler_db = self.ctx.datastores.scheduler_store.get_scheduler(scheduler_id)
if scheduler_db is None:
raise NotFoundError(f"Scheduler {scheduler_id} not found")

patch_data = scheduler_db.model_dump(exclude_unset=True)
if len(patch_data) == 0:
raise BadRequestError("no data to patch")

updated_scheduler = scheduler_db.model_copy(update=patch_data)

# Update the scheduler in database
self.ctx.datastores.scheduler_store.update_scheduler(updated_scheduler)

# Update the running scheduler in memory
scheduler_mem = self.schedulers.get(scheduler_id)
if scheduler_mem is None:
raise NotFoundError(f"Scheduler {scheduler_id} not found")

for attr, value in patch_data.items():
setattr(scheduler_mem, attr, value)

# Enable or disable the scheduler if needed.
if scheduler_mem.enabled:
scheduler_mem.enable()
elif not scheduler_mem.enabled:
scheduler_mem.disable()

return updated_scheduler

def delete(self, scheduler_id: str) -> Any:
scheduler_db = self.ctx.datastores.scheduler_store.get_scheduler(scheduler_id)
if scheduler_db is None:
s = self.schedulers.get(scheduler_id)
if s is None:
raise NotFoundError(f"Scheduler {scheduler_id} not found")

# Delete the scheduler in database
self.ctx.datastores.scheduler_store.delete_scheduler(scheduler_id)

# Delete the running scheduler in memory
scheduler_mem = self.schedulers.get(scheduler_id)
if scheduler_mem is not None:
scheduler_mem.stop()

return None
return models.Scheduler(**s.dict())

def pop(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def upgrade():
sa.Column("id", sa.String(), nullable=False),
sa.Column("enabled", sa.Boolean(), nullable=False),
sa.Column("maxsize", sa.Integer(), nullable=False),
sa.Column("organisation", sa.String(), nullable=False),
sa.Column("type", sa.Enum("BOEFJE", "NORMALIZER", "REPORT", name="schedulertype"), nullable=False),
sa.Column("allow_replace", sa.Boolean(), nullable=False),
sa.Column("allow_updates", sa.Boolean(), nullable=False),
Expand Down
Loading

0 comments on commit 05f36d4

Please sign in to comment.