Skip to content

Commit

Permalink
Merge pull request #271 from runpod/core-fix
Browse files Browse the repository at this point in the history
Core fix
  • Loading branch information
justinmerrell authored Jan 16, 2024
2 parents 8287a38 + c0222a7 commit ff83f8f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
19 changes: 19 additions & 0 deletions .github/tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,24 @@
"value3"
]
}
},
{
"hardwareConfig": {
"endpointConfig": {
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
"name": "runpod-python E2E Test - Serverless Core"
},
"templateConfig": {
"env": [
{
"key": "RUNPOD_USE_CORE",
"value": "true"
}
]
}
},
"input": {
"mock_return": "this worked!"
}
}
]
9 changes: 6 additions & 3 deletions runpod/serverless/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def finish_stream(self, job_id: str) -> bool:


# -------------------------------- Process Job ------------------------------- #
def _process_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
async def _process_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
""" Process a single job. """
hook = Hook()

Expand Down Expand Up @@ -207,18 +207,21 @@ async def run(config: Dict[str, Any]) -> None:
while True:
jobs = hook.get_jobs(max_concurrency, max_jobs)

if len(jobs) == 0:
if len(jobs) == 0 or jobs is None:
continue

for job in jobs:
asyncio.create_task(_process_job(handler, job))
asyncio.create_task(_process_job(handler, job), name=job['id'])
await asyncio.sleep(0)

await asyncio.sleep(0)


def main(config: Dict[str, Any]) -> None:
"""Run the worker in an asyncio event loop."""
if config.get('handler') is None:
raise ValueError("config must contain a handler function")

try:
work_loop = asyncio.new_event_loop()
asyncio.ensure_future(run(config), loop=work_loop)
Expand Down

0 comments on commit ff83f8f

Please sign in to comment.