Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle uncaught exceptions in coroutine manager and simplify #139

Merged
merged 1 commit into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cdmtaskservice/app_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
# This method is getting pretty long but it's stupid simple so...
# May want to parallelize some of this for faster startups. Would need to rework prints
logr = logging.getLogger(__name__)
coman = await CoroutineWrangler.create()
coman = CoroutineWrangler()

Check warning on line 61 in cdmtaskservice/app_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/app_state.py#L61

Added line #L61 was not covered by tests
logr.info("Connecting to KBase auth service... ")
auth = await KBaseAuth.create(
cfg.auth_url,
Expand Down Expand Up @@ -112,6 +112,7 @@
images = Images(mongodao, imginfo)
job_submit = JobSubmit(mongodao, s3, coman, runners)
app.state._mongo = mongocli
app.state._coroman = coman

Check warning on line 115 in cdmtaskservice/app_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/app_state.py#L115

Added line #L115 was not covered by tests
app.state._cdmstate = AppState(
auth, sfapi_client, s3, job_submit, job_state, images, runners
)
Expand All @@ -133,6 +134,7 @@
"""
appstate = _get_app_state_from_app(app) # first to check state was set up
app.state._mongo.close()
app.state._coroman.destroy()

Check warning on line 137 in cdmtaskservice/app_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/app_state.py#L137

Added line #L137 was not covered by tests
await appstate.sfapi_client.destroy()
# https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
await asyncio.sleep(0.250)
Expand Down
48 changes: 20 additions & 28 deletions cdmtaskservice/coroutine_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,41 @@

import asyncio
import logging
from typing import Awaitable, Self
from typing import Awaitable

from cdmtaskservice.arg_checkers import not_falsy as _not_falsy


_CLOSE_DELAY_SEC = 60


class CoroutineWrangler:
""" The coroutine manager. """

@classmethod
async def create(cls) -> Self:
""" Create the coroutine manager. """
cw = CoroutineWrangler()
cw._reaper_task = asyncio.create_task(cw._reaper())
return cw

def __init__(self):
self._closedelay = _CLOSE_DELAY_SEC # may want to make configurable?
self._coros = []
""" Create the coroutine manager. """
self._coros = set()

Check warning on line 21 in cdmtaskservice/coroutine_manager.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/coroutine_manager.py#L21

Added line #L21 was not covered by tests
self._destroy = False

# TODO CODE There's a simpler way to deal with this. Write tests and then implement
# https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
async def _reaper(self):
logr = logging.getLogger(__name__)
while not self._destroy:
await asyncio.sleep(self._closedelay)
logr.info(f"Reaper processing {len(self._coros)} coroutines")
self._coros = [coro for coro in self._coros if not coro.done()]
logr.info(f"Reaper: {len(self._coros)} coroutines remaining")

async def run_coroutine(self, coro: Awaitable):
""" Run a coroutine to completion. """
if self._destroy:
raise ValueError("Manager is destroyed")
task = asyncio.create_task(_not_falsy(coro, "coro"))
self._coros.append(task)

async def destroy(self):
task = asyncio.create_task(self._exception_wrapper(_not_falsy(coro, "coro")))
self._coros.add(task)
task.add_done_callback(self._coros.discard)
logging.getLogger(__name__).info(

Check warning on line 31 in cdmtaskservice/coroutine_manager.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/coroutine_manager.py#L28-L31

Added lines #L28 - L31 were not covered by tests
f"Running coroutine {coro}. {len(self._coros)} coroutines running."
)

async def _exception_wrapper(self, coro):
try:
await coro
except Exception as e:
logging.getLogger(__name__).exception(

Check warning on line 39 in cdmtaskservice/coroutine_manager.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/coroutine_manager.py#L36-L39

Added lines #L36 - L39 were not covered by tests
f"Coroutine {coro} threw an unhandled exception: {e}"
)
# Nothing else can be done

def destroy(self):
""" Cancel all coroutines and destroy the manager. """
self._destroy = True
self._reaper_task.cancel()
for coro in self._coros:
coro.cancel()
Loading