diff --git a/.github/tests.json b/.github/tests.json index 384dec13..0e7e96d0 100644 --- a/.github/tests.json +++ b/.github/tests.json @@ -57,5 +57,24 @@ "value3" ] } + }, + { + "hardwareConfig": { + "endpointConfig": { + "gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80", + "name": "runpod-python E2E Test - Serverless Core" + }, + "templateConfig": { + "env": [ + { + "key": "RUNPOD_USE_CORE", + "value": "true" + } + ] + } + }, + "input": { + "mock_return": "this worked!" + } } ] diff --git a/runpod/serverless/core.py b/runpod/serverless/core.py index 6c9f9a6c..7074d7ff 100644 --- a/runpod/serverless/core.py +++ b/runpod/serverless/core.py @@ -170,7 +170,7 @@ def finish_stream(self, job_id: str) -> bool: # -------------------------------- Process Job ------------------------------- # -def _process_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]: +async def _process_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]: """ Process a single job. """ hook = Hook() @@ -207,11 +207,11 @@ async def run(config: Dict[str, Any]) -> None: while True: jobs = hook.get_jobs(max_concurrency, max_jobs) - if len(jobs) == 0: + if len(jobs) == 0 or jobs is None: continue for job in jobs: - asyncio.create_task(_process_job(handler, job)) + asyncio.create_task(_process_job(handler, job), name=job['id']) await asyncio.sleep(0) await asyncio.sleep(0) @@ -219,6 +219,9 @@ async def run(config: Dict[str, Any]) -> None: def main(config: Dict[str, Any]) -> None: """Run the worker in an asyncio event loop.""" + if config.get('handler') is None: + raise ValueError("config must contain a handler function") + try: work_loop = asyncio.new_event_loop() asyncio.ensure_future(run(config), loop=work_loop)