Skip to content

Commit

Permalink
Fix Queue Limiter (#125)
Browse files Browse the repository at this point in the history
Also includes a quick fix for
#123
  • Loading branch information
kraftp authored Sep 30, 2024
1 parent 1081866 commit 346e5fc
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 1 deletion.
8 changes: 7 additions & 1 deletion dbos/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Callable

from dbos.logger import dbos_logger
from dbos.queue import Queue

if TYPE_CHECKING:
Expand All @@ -18,7 +19,12 @@
def scheduler_loop(
func: ScheduledWorkflow, cron: str, stop_event: threading.Event
) -> None:
iter = croniter(cron, datetime.now(timezone.utc), second_at_beginning=True)
try:
iter = croniter(cron, datetime.now(timezone.utc), second_at_beginning=True)
except Exception as e:
dbos_logger.error(
f'Cannot run scheduled function {func.__name__}. Invalid crontab "{cron}"'
)
while not stop_event.is_set():
nextExecTime = iter.get_next(datetime)
sleepTime = nextExecTime - datetime.now(timezone.utc)
Expand Down
2 changes: 2 additions & 0 deletions dbos/system_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,7 @@ def start_queued_workflows(self, queue: "Queue") -> List[str]:
query = (
sa.select(sa.func.count())
.select_from(SystemSchema.workflow_queue)
.where(SystemSchema.workflow_queue.c.queue_name == queue.name)
.where(
SystemSchema.workflow_queue.c.started_at_epoch_ms.isnot(None)
)
Expand Down Expand Up @@ -1167,6 +1168,7 @@ def start_queued_workflows(self, queue: "Queue") -> List[str]:
c.execute(
sa.delete(SystemSchema.workflow_queue)
.where(SystemSchema.workflow_queue.c.completed_at_epoch_ms != None)
.where(SystemSchema.workflow_queue.c.queue_name == queue.name)
.where(
SystemSchema.workflow_queue.c.started_at_epoch_ms
< start_time_ms - limiter_period_ms
Expand Down
79 changes: 79 additions & 0 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,82 @@ def test_workflow(var1: str, var2: str) -> float:

# Verify all queue entries eventually get cleaned up.
assert queue_entries_are_cleaned_up(dbos)


def test_multiple_queues(dbos: DBOS) -> None:
    """Verify that independent queues enforce their own limits concurrently.

    Two queues are exercised at once:
      * a concurrency-limited queue (max 1 running task), whose first task
        blocks until we release it, proving the second task waits; and
      * a rate-limited queue (``limit`` starts per ``period`` seconds), whose
        tasks must execute in evenly spaced "waves".
    """

    wf_counter = 0
    flag = False
    workflow_event = threading.Event()
    main_thread_event = threading.Event()

    @DBOS.workflow()
    def workflow_one() -> None:
        # Signal the main thread that we started, then block until released,
        # occupying the concurrency queue's single slot.
        nonlocal wf_counter
        wf_counter += 1
        main_thread_event.set()
        workflow_event.wait()

    @DBOS.workflow()
    def workflow_two() -> None:
        # Side effect observed by the main thread to detect premature launch.
        nonlocal flag
        flag = True

    concurrency_queue = Queue("test_concurrency_queue", 1)
    handle1 = concurrency_queue.enqueue(workflow_one)
    assert handle1.get_status().queue_name == "test_concurrency_queue"
    handle2 = concurrency_queue.enqueue(workflow_two)

    @DBOS.workflow()
    def limited_workflow(var1: str, var2: str) -> float:
        assert var1 == "abc" and var2 == "123"
        return time.time()

    limit = 5
    period = 2
    limiter_queue = Queue(
        "test_limit_queue", limiter={"limit": limit, "period": period}
    )

    handles: list[WorkflowHandle[float]] = []
    times: list[float] = []

    # Launch a number of tasks equal to three times the limit.
    # This should lead to three "waves" of the limit tasks being
    # executed simultaneously, followed by a wait of the period,
    # followed by the next wave.
    num_waves = 3
    for _ in range(limit * num_waves):
        h = limiter_queue.enqueue(limited_workflow, "abc", "123")
        handles.append(h)
    for h in handles:
        times.append(h.get_result())

    # Verify that each "wave" of tasks started at the ~same time.
    for wave in range(num_waves):
        for i in range(wave * limit, (wave + 1) * limit - 1):
            assert times[i + 1] - times[i] < 0.2

    # Verify that the gap between "waves" is ~equal to the period
    for wave in range(num_waves - 1):
        assert times[limit * (wave + 1)] - times[limit * wave] > period - 0.2
        assert times[limit * (wave + 1)] - times[limit * wave] < period + 0.2

    # Verify all workflows get the SUCCESS status eventually
    dbos._sys_db.wait_for_buffer_flush()
    for h in handles:
        assert h.get_status().status == WorkflowStatusString.SUCCESS.value

    # Verify that during all this time, the second task
    # was not launched on the concurrency-limited queue.
    # Then, finish the first task and verify the second
    # task runs on schedule.
    assert not flag
    workflow_event.set()
    # PEP 8 (E711): compare against the None singleton with `is`, not `==`.
    assert handle1.get_result() is None
    assert handle2.get_result() is None
    assert flag
    assert wf_counter == 1

    # Verify all queue entries eventually get cleaned up.
    assert queue_entries_are_cleaned_up(dbos)

0 comments on commit 346e5fc

Please sign in to comment.