
Merge pull request #274 from runpod/sls-core-tests
fix: add missing sls core tests
justinmerrell authored Jan 30, 2024
2 parents 1127ffb + 8cf43e9 commit 65215bd
Showing 4 changed files with 131 additions and 30 deletions.
88 changes: 78 additions & 10 deletions .github/tests.json
@@ -2,8 +2,8 @@
{
"hardwareConfig": {
"endpointConfig": {
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
"name": "runpod-python E2E Test - Basic"
"name": "runpod-python E2E Test - Basic",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
}
},
"input": {
@@ -13,8 +13,8 @@
{
"hardwareConfig": {
"endpointConfig": {
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
"name": "runpod-python E2E Test - Long Job"
"name": "runpod-python E2E Test - Long Job",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
}
},
"input": {
@@ -25,8 +25,8 @@
{
"hardwareConfig": {
"endpointConfig": {
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
"name": "runpod-python E2E Test - Generator Handler"
"name": "runpod-python E2E Test - Generator Handler",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
},
"templateConfig": {
"dockerArgs": "python3 -u /handler.py --generator --return_aggregate_stream"
@@ -43,8 +43,8 @@
{
"hardwareConfig": {
"endpointConfig": {
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
"name": "runpod-python E2E Test - Async Generator Handler"
"name": "runpod-python E2E Test - Async Generator Handler",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
},
"templateConfig": {
"dockerArgs": "python3 -u /handler.py --async_generator --return_aggregate_stream"
@@ -61,8 +61,8 @@
{
"hardwareConfig": {
"endpointConfig": {
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
"name": "runpod-python E2E Test - Serverless Core"
"name": "runpod-python E2E Test - Serverless Core - Basic",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
},
"templateConfig": {
"env": [
@@ -76,5 +76,73 @@
"input": {
"mock_return": "this worked!"
}
},
{
"hardwareConfig": {
"endpointConfig": {
"name": "runpod-python E2E Test - Serverless Core - Long Job",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
},
"templateConfig": {
"env": [
{
"key": "RUNPOD_USE_CORE",
"value": "true"
}
]
}
},
"input": {
"mock_return": "Delay test successful returned after waiting 5 minutes.",
"mock_delay": 300
}
},
{
"hardwareConfig": {
"endpointConfig": {
"name": "runpod-python E2E Test - Serverless Core - Generator Handler",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
},
"templateConfig": {
"dockerArgs": "python3 -u /handler.py --generator --return_aggregate_stream",
"env": [
{
"key": "RUNPOD_USE_CORE",
"value": "true"
}
]
}
},
"input": {
"mock_return": [
"value1",
"value2",
"value3"
]
}
},
{
"hardwareConfig": {
"endpointConfig": {
"name": "runpod-python E2E Test - Serverless Core - Async Generator Handler",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
},
"templateConfig": {
"dockerArgs": "python3 -u /handler.py --async_generator --return_aggregate_stream",
"env": [
{
"key": "RUNPOD_USE_CORE",
"value": "true"
}
]
}
},
"input": {
"mock_return": [
"value1",
"value2",
"value3"
]
}
}
]
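
The new "Serverless Core" cases mirror the existing Basic, Long Job, and Generator tests, differing only in the RUNPOD_USE_CORE env var and the --generator / --async_generator / --return_aggregate_stream flags passed via dockerArgs. The test image's /handler.py is not part of this diff; the sketch below is a hypothetical reconstruction of what it would need to do to satisfy the mock_return / mock_delay inputs above.

```python
# Hypothetical sketch of the test image's /handler.py (not part of this diff),
# inferred from the dockerArgs flags and the mock_return / mock_delay inputs above.
import argparse
import time

import runpod

parser = argparse.ArgumentParser()
parser.add_argument("--generator", action="store_true")
parser.add_argument("--async_generator", action="store_true")
parser.add_argument("--return_aggregate_stream", action="store_true")
args = parser.parse_args()


def handler(job):
    """Sleep for mock_delay seconds (if given), then echo mock_return."""
    time.sleep(job["input"].get("mock_delay", 0))
    return job["input"]["mock_return"]


def generator_handler(job):
    """Yield each element of mock_return so the streaming path is exercised."""
    for value in job["input"]["mock_return"]:
        yield value


async def async_generator_handler(job):
    """Async variant of the generator handler."""
    for value in job["input"]["mock_return"]:
        yield value


if args.async_generator:
    selected = async_generator_handler
elif args.generator:
    selected = generator_handler
else:
    selected = handler

runpod.serverless.start(
    {"handler": selected, "return_aggregate_stream": args.return_aggregate_stream}
)
```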
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,13 @@
# Change Log

## Release 1.6.0 (1/29/24)

### Fixed

- Rust Serverless Core Passing all tests.
- GitHub Action and Python package updates
- Changelog date typo

## Release 1.5.3 (1/25/24)

### Added
65 changes: 45 additions & 20 deletions runpod/serverless/core.py
@@ -9,8 +9,9 @@
from ctypes import CDLL, byref, c_char_p, c_int
from typing import Any, Callable, List, Dict, Optional

from runpod.version import __version__ as runpod_version
from runpod.serverless.modules.rp_logger import RunPodLogger

from runpod.serverless.modules import rp_job

log = RunPodLogger()

@@ -44,6 +45,7 @@ class Hook: # pylint: disable=too-many-instance-attributes

def __new__(cls):
if Hook._instance is None:
log.debug("SLS Core | Initializing Hook.")
Hook._instance = object.__new__(cls)
Hook._initialized = False
return Hook._instance
@@ -136,7 +138,7 @@ def progress_update(self, job_id: str, json_data: bytes) -> bool:
c_char_p(json_data), c_int(len(json_data))
))

def stream_output(self, job_id: str, job_output: bytes) -> bool:
async def stream_output(self, job_id: str, job_output: bytes) -> bool:
"""
send part of a streaming result to AI-API.
"""
@@ -170,48 +172,70 @@ def finish_stream(self, job_id: str) -> bool:


# -------------------------------- Process Job ------------------------------- #
async def _process_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
async def _process_job(config: Dict[str, Any], job: Dict[str, Any], hook) -> Dict[str, Any]:
""" Process a single job. """
hook = Hook()
handler = config['handler']

result = {}
try:
result = handler(job)
except Exception as err:
raise RuntimeError(
f"run {job['id']}: user code raised an {type(err).__name__}") from err
if inspect.isgeneratorfunction(handler) or inspect.isasyncgenfunction(handler):
log.debug("SLS Core | Running job as a generator.")
generator_output = rp_job.run_job_generator(handler, job)
aggregated_output = {'output': []}

async for part in generator_output:
log.debug(f"SLS Core | Streaming output: {part}", job['id'])

if 'error' in part:
aggregated_output = part
break
if config.get('return_aggregate_stream', False):
aggregated_output['output'].append(part['output'])

await hook.stream_output(job['id'], part)

if inspect.isgeneratorfunction(handler):
for part in result:
hook.stream_output(job['id'], part)
log.debug("SLS Core | Finished streaming output.", job['id'])
hook.finish_stream(job['id'])
result = aggregated_output

hook.finish_stream(job['id'])
else:
log.debug("SLS Core | Running job as a standard function.")
result = await rp_job.run_job(handler, job)
result = result.get('output', result)

except Exception as err: # pylint: disable=broad-except
log.error(f"SLS Core | Error running job: {err}", job['id'])
result = {'error': str(err)}

else:
finally:
log.debug(f"SLS Core | Posting output: {result}", job['id'])
hook.post_output(job['id'], result)
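
As a standalone illustration (not part of the commit), the generator branch above aggregates stream parts as sketched below, assuming parts shaped like {"output": value} as produced by rp_job.run_job_generator; a handler yielding value1..value3 therefore posts {'output': ['value1', 'value2', 'value3']}, matching the mock_return lists in .github/tests.json.

```python
# Standalone sketch of the aggregation logic in _process_job above (assumes
# generator parts shaped like {"output": value}, per rp_job.run_job_generator).
import asyncio


async def fake_generator_output():
    """Stand-in for rp_job.run_job_generator(handler, job)."""
    for value in ["value1", "value2", "value3"]:
        yield {"output": value}


async def aggregate(return_aggregate_stream: bool = True):
    aggregated_output = {"output": []}
    async for part in fake_generator_output():
        if "error" in part:
            aggregated_output = part
            break
        if return_aggregate_stream:
            aggregated_output["output"].append(part["output"])
    return aggregated_output


print(asyncio.run(aggregate()))  # -> {'output': ['value1', 'value2', 'value3']}
```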


# -------------------------------- Run Worker -------------------------------- #
# ---------------------------------------------------------------------------- #
# Run Worker #
# ---------------------------------------------------------------------------- #
async def run(config: Dict[str, Any]) -> None:
""" Run the worker.
Args:
config: A dictionary containing the following keys:
handler: A function that takes a job and returns a result.
"""
handler = config['handler']
max_concurrency = config.get('max_concurrency', 4)
max_jobs = config.get('max_jobs', 4)
max_concurrency = config.get('max_concurrency', 1)
max_jobs = config.get('max_jobs', 1)

hook = Hook()
serverless_hook = Hook()

while True:
jobs = hook.get_jobs(max_concurrency, max_jobs)
jobs = serverless_hook.get_jobs(max_concurrency, max_jobs)

if len(jobs) == 0 or jobs is None:
await asyncio.sleep(0)
continue

for job in jobs:
asyncio.create_task(_process_job(handler, job), name=job['id'])
asyncio.create_task(_process_job(config, job, serverless_hook), name=job['id'])
await asyncio.sleep(0)

await asyncio.sleep(0)
@@ -220,6 +244,7 @@ async def run(config: Dict[str, Any]) -> None:
def main(config: Dict[str, Any]) -> None:
"""Run the worker in an asyncio event loop."""
if config.get('handler') is None:
log.error("SLS Core | config must contain a handler function")
raise ValueError("config must contain a handler function")

try:
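
For reference, a minimal worker that exercises this code path might look like the sketch below. It assumes the usual runpod.serverless.start entry point and that setting RUNPOD_USE_CORE=true (as the new test configs do) routes execution through core.main and the Hook-based job loop above; the handler body is hypothetical.

```python
# Minimal sketch of a worker exercising the serverless core path (assumption:
# with RUNPOD_USE_CORE=true, runpod.serverless.start delegates to core.main).
import runpod


def handler(job):
    """Hypothetical handler: echo back whatever the test put in mock_return."""
    return job["input"]["mock_return"]


runpod.serverless.start({"handler": handler})
```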
Binary file modified runpod/serverless/sls_core.so
