diff --git a/dbos/scheduler/scheduler.py b/dbos/scheduler/scheduler.py
index 03c75ae9..64e4e256 100644
--- a/dbos/scheduler/scheduler.py
+++ b/dbos/scheduler/scheduler.py
@@ -2,6 +2,7 @@
 from datetime import datetime, timezone
 from typing import TYPE_CHECKING, Callable
 
+from dbos.logger import dbos_logger
 from dbos.queue import Queue
 
 if TYPE_CHECKING:
@@ -18,7 +19,13 @@
 def scheduler_loop(
     func: ScheduledWorkflow, cron: str, stop_event: threading.Event
 ) -> None:
-    iter = croniter(cron, datetime.now(timezone.utc), second_at_beginning=True)
+    try:
+        iter = croniter(cron, datetime.now(timezone.utc), second_at_beginning=True)
+    except Exception as e:
+        dbos_logger.error(
+            f'Cannot run scheduled function {func.__name__}. Invalid crontab "{cron}"'
+        )
+        return  # Bail out: `iter` was never assigned, so the loop below cannot run.
     while not stop_event.is_set():
         nextExecTime = iter.get_next(datetime)
         sleepTime = nextExecTime - datetime.now(timezone.utc)
diff --git a/dbos/system_database.py b/dbos/system_database.py
index 3528cd6e..5c076658 100644
--- a/dbos/system_database.py
+++ b/dbos/system_database.py
@@ -1100,6 +1100,7 @@ def start_queued_workflows(self, queue: "Queue") -> List[str]:
         query = (
             sa.select(sa.func.count())
             .select_from(SystemSchema.workflow_queue)
+            .where(SystemSchema.workflow_queue.c.queue_name == queue.name)
             .where(
                 SystemSchema.workflow_queue.c.started_at_epoch_ms.isnot(None)
             )
@@ -1167,6 +1168,7 @@ def start_queued_workflows(self, queue: "Queue") -> List[str]:
             c.execute(
                 sa.delete(SystemSchema.workflow_queue)
                 .where(SystemSchema.workflow_queue.c.completed_at_epoch_ms != None)
+                .where(SystemSchema.workflow_queue.c.queue_name == queue.name)
                 .where(
                     SystemSchema.workflow_queue.c.started_at_epoch_ms
                     < start_time_ms - limiter_period_ms
diff --git a/tests/test_queue.py b/tests/test_queue.py
index d40cbdcd..e3c5bf2c 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -241,3 +241,85 @@ def test_workflow(var1: str, var2: str) -> float:
 
     # Verify all queue entries eventually get cleaned up.
     assert queue_entries_are_cleaned_up(dbos)
+
+
+def test_multiple_queues(dbos: DBOS) -> None:
+
+    wf_counter = 0
+    flag = False
+    workflow_event = threading.Event()
+    main_thread_event = threading.Event()
+
+    @DBOS.workflow()
+    def workflow_one() -> None:
+        nonlocal wf_counter
+        wf_counter += 1
+        main_thread_event.set()
+        workflow_event.wait()
+
+    @DBOS.workflow()
+    def workflow_two() -> None:
+        nonlocal flag
+        flag = True
+
+    concurrency_queue = Queue("test_concurrency_queue", 1)
+    handle1 = concurrency_queue.enqueue(workflow_one)
+    assert handle1.get_status().queue_name == "test_concurrency_queue"
+    handle2 = concurrency_queue.enqueue(workflow_two)
+    main_thread_event.wait()  # Ensure workflow_one has started before proceeding.
+
+    @DBOS.workflow()
+    def limited_workflow(var1: str, var2: str) -> float:
+        assert var1 == "abc" and var2 == "123"
+        return time.time()
+
+    limit = 5
+    period = 2
+    limiter_queue = Queue(
+        "test_limit_queue", limiter={"limit": limit, "period": period}
+    )
+
+    handles: list[WorkflowHandle[float]] = []
+    times: list[float] = []
+
+    # Launch a number of tasks equal to three times the limit.
+    # This should lead to three "waves" of the limit tasks being
+    # executed simultaneously, followed by a wait of the period,
+    # followed by the next wave.
+    num_waves = 3
+    for _ in range(limit * num_waves):
+        h = limiter_queue.enqueue(limited_workflow, "abc", "123")
+        handles.append(h)
+    for h in handles:
+        times.append(h.get_result())
+
+    # Verify that each "wave" of tasks started at the ~same time.
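+    # Within each wave, compare consecutive start times: tasks admitted in the
+    # same period should begin within 0.2s of one another.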
+    for wave in range(num_waves):
+        for i in range(wave * limit, (wave + 1) * limit - 1):
+            assert times[i + 1] - times[i] < 0.2
+
+    # Verify that the gap between "waves" is ~equal to the period.
+    for wave in range(num_waves - 1):
+        assert times[limit * (wave + 1)] - times[limit * wave] > period - 0.2
+        assert times[limit * (wave + 1)] - times[limit * wave] < period + 0.2
+
+    # Verify all workflows get the SUCCESS status eventually
+    dbos._sys_db.wait_for_buffer_flush()
+    for h in handles:
+        assert h.get_status().status == WorkflowStatusString.SUCCESS.value
+
+    # Verify that during all this time, the second task
+    # was not launched on the concurrency-limited queue.
+    # Then, finish the first task and verify the second
+    # task runs on schedule.
+    assert not flag
+    workflow_event.set()
+    assert handle1.get_result() is None
+    assert handle2.get_result() is None
+    assert flag
+    assert wf_counter == 1
+
+    # Verify all queue entries eventually get cleaned up.
+    assert queue_entries_are_cleaned_up(dbos)