Skip to content

Commit

Permalink
Handle uncaught exceptions in corotine manager and simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCreosote committed Dec 20, 2024
1 parent 1fb9a4e commit 6f7d372
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 29 deletions.
4 changes: 3 additions & 1 deletion cdmtaskservice/app_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async def build_app(
# This method is getting pretty long but it's stupid simple so...
# May want to parallelize some of this for faster startups. would need to rework prints
logr = logging.getLogger(__name__)
coman = await CoroutineWrangler.create()
coman = CoroutineWrangler()

Check warning on line 61 in cdmtaskservice/app_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/app_state.py#L61

Added line #L61 was not covered by tests
logr.info("Connecting to KBase auth service... ")
auth = await KBaseAuth.create(
cfg.auth_url,
Expand Down Expand Up @@ -112,6 +112,7 @@ async def build_app(
images = Images(mongodao, imginfo)
job_submit = JobSubmit(mongodao, s3, coman, runners)
app.state._mongo = mongocli
app.state._coroman = coman

Check warning on line 115 in cdmtaskservice/app_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/app_state.py#L115

Added line #L115 was not covered by tests
app.state._cdmstate = AppState(
auth, sfapi_client, s3, job_submit, job_state, images, runners
)
Expand All @@ -133,6 +134,7 @@ async def destroy_app_state(app: FastAPI):
"""
appstate = _get_app_state_from_app(app) # first to check state was set up
app.state._mongo.close()
app.state._coroman.destroy()

Check warning on line 137 in cdmtaskservice/app_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/app_state.py#L137

Added line #L137 was not covered by tests
await appstate.sfapi_client.destroy()
# https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
await asyncio.sleep(0.250)
Expand Down
48 changes: 20 additions & 28 deletions cdmtaskservice/coroutine_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,41 @@

import asyncio
import logging
from typing import Awaitable, Self
from typing import Awaitable

from cdmtaskservice.arg_checkers import not_falsy as _not_falsy


_CLOSE_DELAY_SEC = 60


class CoroutineWrangler:
""" The coroutine manager. """

@classmethod
async def create(cls) -> Self:
""" Create the coroutine manager. """
cw = CoroutineWrangler()
cw._reaper_task = asyncio.create_task(cw._reaper())
return cw

def __init__(self):
self._closedelay = _CLOSE_DELAY_SEC # may want to make configurable?
self._coros = []
""" Create the coroutine manager. """
self._coros = set()

Check warning on line 21 in cdmtaskservice/coroutine_manager.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/coroutine_manager.py#L21

Added line #L21 was not covered by tests
self._destroy = False

# TODO CODE There's a simpler say to deal with this. Write tests and then implement
# https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
async def _reaper(self):
logr = logging.getLogger(__name__)
while not self._destroy:
await asyncio.sleep(self._closedelay)
logr.info(f"Reaper processing {len(self._coros)} coroutines")
self._coros = [coro for coro in self._coros if not coro.done()]
logr.info(f"Reaper: {len(self._coros)} coroutines remaining")

async def run_coroutine(self, coro: Awaitable):
""" Run a coroutine to completion. """
if self._destroy:
raise ValueError("Manager is destroyed")
task = asyncio.create_task(_not_falsy(coro, "coro"))
self._coros.append(task)

async def destroy(self):
task = asyncio.create_task(self._exception_wrapper(_not_falsy(coro, "coro")))
self._coros.add(task)
task.add_done_callback(self._coros.discard)
logging.getLogger(__name__).info(

Check warning on line 31 in cdmtaskservice/coroutine_manager.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/coroutine_manager.py#L28-L31

Added lines #L28 - L31 were not covered by tests
f"Running coroutine {coro}. {len(self._coros)} coroutines running."
)

async def _exception_wrapper(self, coro):
try:
await coro
except Exception as e:
logging.getLogger(__name__).exception(

Check warning on line 39 in cdmtaskservice/coroutine_manager.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/coroutine_manager.py#L36-L39

Added lines #L36 - L39 were not covered by tests
f"Coroutine {coro} threw an unhandled exception: {e}"
)
# Nothing else can be done

def destroy(self):
""" Cancel all coroutines and destroy the manager. """
self._destroy = True
self._reaper_task.cancel()
for coro in self._coros:
coro.cancel()

0 comments on commit 6f7d372

Please sign in to comment.