diff --git a/cdmtaskservice/app_state.py b/cdmtaskservice/app_state.py index ea14385..06729f3 100644 --- a/cdmtaskservice/app_state.py +++ b/cdmtaskservice/app_state.py @@ -58,7 +58,7 @@ async def build_app( # This method is getting pretty long but it's stupid simple so... # May want to parallelize some of this for faster startups. would need to rework prints logr = logging.getLogger(__name__) - coman = await CoroutineWrangler.create() + coman = CoroutineWrangler() logr.info("Connecting to KBase auth service... ") auth = await KBaseAuth.create( cfg.auth_url, @@ -112,6 +112,7 @@ async def build_app( images = Images(mongodao, imginfo) job_submit = JobSubmit(mongodao, s3, coman, runners) app.state._mongo = mongocli + app.state._coroman = coman app.state._cdmstate = AppState( auth, sfapi_client, s3, job_submit, job_state, images, runners ) @@ -133,6 +134,7 @@ async def destroy_app_state(app: FastAPI): """ appstate = _get_app_state_from_app(app) # first to check state was set up app.state._mongo.close() + app.state._coroman.destroy() await appstate.sfapi_client.destroy() # https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown await asyncio.sleep(0.250) diff --git a/cdmtaskservice/coroutine_manager.py b/cdmtaskservice/coroutine_manager.py index 592e187..b1af732 100644 --- a/cdmtaskservice/coroutine_manager.py +++ b/cdmtaskservice/coroutine_manager.py @@ -8,49 +8,41 @@ import asyncio import logging -from typing import Awaitable, Self +from typing import Awaitable from cdmtaskservice.arg_checkers import not_falsy as _not_falsy -_CLOSE_DELAY_SEC = 60 - - class CoroutineWrangler: """ The coroutine manager. """ - @classmethod - async def create(cls) -> Self: - """ Create the coroutine manager. """ - cw = CoroutineWrangler() - cw._reaper_task = asyncio.create_task(cw._reaper()) - return cw - def __init__(self): - self._closedelay = _CLOSE_DELAY_SEC # may want to make configurable? - self._coros = [] + """ Create the coroutine manager. """ + self._coros = set() self._destroy = False - # TODO CODE There's a simpler say to deal with this. Write tests and then implement - # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task - async def _reaper(self): - logr = logging.getLogger(__name__) - while not self._destroy: - await asyncio.sleep(self._closedelay) - logr.info(f"Reaper processing {len(self._coros)} coroutines") - self._coros = [coro for coro in self._coros if not coro.done()] - logr.info(f"Reaper: {len(self._coros)} coroutines remaining") - async def run_coroutine(self, coro: Awaitable): """ Run a coroutine to completion. """ if self._destroy: raise ValueError("Manager is destroyed") - task = asyncio.create_task(_not_falsy(coro, "coro")) - self._coros.append(task) - - async def destroy(self): + task = asyncio.create_task(self._exception_wrapper(_not_falsy(coro, "coro"))) + self._coros.add(task) + task.add_done_callback(self._coros.discard) + logging.getLogger(__name__).info( + f"Running coroutine {coro}. {len(self._coros)} coroutines running." + ) + + async def _exception_wrapper(self, coro): + try: + await coro + except Exception as e: + logging.getLogger(__name__).exception( + f"Coroutine {coro} threw an unhandled exception: {e}" + ) + # Nothing else can be done + + def destroy(self): """ Cancel all coroutines and destroy the manager. """ self._destroy = True - self._reaper_task.cancel() for coro in self._coros: coro.cancel()