Skip to content

Commit

Permalink
Merge pull request #240 from runpod/job_get_fail
Browse files Browse the repository at this point in the history
Job get fail
  • Loading branch information
justinmerrell authored Nov 29, 2023
2 parents 5581e2b + c3936af commit 0293403
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 7 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Change Log

## Release 1.3.7 (11/29/23)

### Fixed

- Catch timeouts when checking for available jobs.

### Changed

- Updated and pinned aiohttp to 3.9.1

---

## Release 1.3.6 (11/23/23)

## Fixed
Expand Down
22 changes: 15 additions & 7 deletions runpod/serverless/modules/rp_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]
else:
next_job = received_request

except asyncio.TimeoutError:
log.debug("Timeout error, retrying.")
if retry is False:
break

except Exception as err: # pylint: disable=broad-except
err_type = type(err).__name__
err_message = str(err)
Expand All @@ -102,8 +107,6 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]

await asyncio.sleep(1)
else:
log.debug("Confirmed valid request.", next_job['id'])

job_list.add_job(next_job["id"])
log.debug("Request ID added.", next_job['id'])

Expand All @@ -115,10 +118,16 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]
async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
"""
Run the job using the handler.
Returns the job output or error.
Args:
handler (Callable): The handler function to use.
job (Dict[str, Any]): The job to run.
Returns:
Dict[str, Any]: The result of running the job.
"""
log.info('Started', job["id"])
run_result = {"error": "No output from handler."}
log.info('Started.', job["id"])
run_result = {}

try:
handler_return = handler(job)
Expand All @@ -129,8 +138,7 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
if isinstance(job_output, dict):
error_msg = job_output.pop("error", None)
refresh_worker = job_output.pop("refresh_worker", None)

run_result = {"output": job_output}
run_result['output'] = job_output

if error_msg:
run_result["error"] = error_msg
Expand Down
17 changes: 17 additions & 0 deletions tests/test_serverless/test_modules/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Test Serverless Job Module
'''

import asyncio
from unittest.mock import Mock, patch

from unittest import IsolatedAsyncioTestCase
Expand Down Expand Up @@ -139,6 +140,22 @@ async def test_get_job_no_input(self):
assert job is None
assert mock_log.error.call_count == 1

async def test_get_job_no_timeout(self):
""" Tests the get_job function with a timeout """
# Timeout Mock
response_timeout = Mock(ClientResponse)
response_timeout.status = 200

with patch("aiohttp.ClientSession") as mock_session_timeout, \
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

mock_session_timeout.get.return_value.__aenter__.side_effect = asyncio.TimeoutError
job = await rp_job.get_job(mock_session_timeout, retry=False)

assert job is None
assert mock_log.error.call_count == 0

async def test_get_job_exception(self):
'''
Tests the get_job function with an exception
Expand Down

0 comments on commit 0293403

Please sign in to comment.