Skip to content

Commit

Permalink
Add general error handling to the nersc jaws flow
Browse files Browse the repository at this point in the history
When in a non-awaited coroutine, catch any errors and update the job,
since otherwise errors just get logged by the coroutine manager and
forgotten about

Also fix some spelling errors
  • Loading branch information
MrCreosote committed Jan 10, 2025
1 parent 486749a commit 2390381
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 44 deletions.
2 changes: 1 addition & 1 deletion cdmtaskservice/job_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> s
image = await self._mongo.get_image(parsedimage.name, digest=parsedimage.digest, tag=tag)
await self._s3.has_bucket(job_input.output_dir.split("/", 1)[0])
paths = [f.file if isinstance(f, models.S3File) else f for f in job_input.input_files]
# TODO PERF may wan to make concurrency configurable here
# TODO PERF may want to make concurrency configurable here
# TODO PERF this checks the file path syntax again, consider some way to avoid
meta = await self._s3.get_object_meta(S3Paths(paths))
new_input = []
Expand Down
53 changes: 28 additions & 25 deletions cdmtaskservice/jobflows/nersc_jaws.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
from pathlib import Path
import traceback
from typing import Any

from cdmtaskservice import models
Expand Down Expand Up @@ -66,6 +67,23 @@ def __init__(
self._s3insecure = s3_insecure_ssl
self._coman = _not_falsy(coro_manager, "coro_manager")
self._callback_root = _require_string(service_root_url, "service_root_url")

async def _handle_exception(self, e: Exception, job_id: str, errtype: str):
# TODO LOGGING figure out how logging is going to work etc.
logging.getLogger(__name__).exception(f"Error {errtype} job {job_id}")

Check warning on line 73 in cdmtaskservice/jobflows/nersc_jaws.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jobflows/nersc_jaws.py#L73

Added line #L73 was not covered by tests
# if this fails, well, then we're screwed
await self._mongo.set_job_error(

Check warning on line 75 in cdmtaskservice/jobflows/nersc_jaws.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jobflows/nersc_jaws.py#L75

Added line #L75 was not covered by tests
job_id,
# We'll need to see what kinds of errors happen and change the user message appropriately.
# Just provide a generic message for now, as most errors aren't going to be fixable
# by users
"An unexpected error occurred",
str(e),
models.JobState.ERROR,
# TODO TEST will need a way to mock out timestamps
timestamp.utcdatetime(),
traceback=traceback.format_exc(),
)

async def start_job(self, job: models.Job, objmeta: list[S3ObjectMeta]):
"""
Expand All @@ -77,7 +95,6 @@ async def start_job(self, job: models.Job, objmeta: list[S3ObjectMeta]):
"""
if _not_falsy(job, "job").state != models.JobState.CREATED:
raise InvalidJobStateError("Job must be in the created state")
logr = logging.getLogger(__name__)
# Could check that the s3 and job paths / etags match... YAGNI
# TODO PERF this validates the file paths yet again. Maybe the way to go is just have
# a validate method on S3Paths which can be called or not as needed, with
Expand Down Expand Up @@ -112,10 +129,7 @@ async def start_job(self, job: models.Job, objmeta: list[S3ObjectMeta]):
timestamp.utcdatetime(),
)
except Exception as e:
# TODO LOGGING figure out how logging it going to work etc.
logr.exception(f"Error starting download for job {job.id}")
# TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise
raise e
await self._handle_exception(e, job.id, "starting file download for")

Check warning on line 132 in cdmtaskservice/jobflows/nersc_jaws.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jobflows/nersc_jaws.py#L132

Added line #L132 was not covered by tests

async def download_complete(self, job: models.AdminJobDetails):
"""
Expand All @@ -128,7 +142,7 @@ async def download_complete(self, job: models.AdminJobDetails):
# no errors, continue, otherwise put the job into an errored state.
# TODO ERRHANDLING IMPORTANT upload the output file from the download task and check for
# errors. If any exist, put the job into an errored state.
# TDOO LOGGING Add any relevant logs from the task / download task output file in state
# TODO LOGGING Add any relevant logs from the task / download task output file in state
# call
await self._mongo.update_job_state(
job.id,
Expand All @@ -139,7 +153,6 @@ async def download_complete(self, job: models.AdminJobDetails):
await self._coman.run_coroutine(self._submit_jaws_job(job))

async def _submit_jaws_job(self, job: models.AdminJobDetails):
logr = logging.getLogger(__name__)
try:
# TODO PERF configure file download concurrency
jaws_job_id = await self._nman.run_JAWS(job)
Expand All @@ -155,10 +168,7 @@ async def _submit_jaws_job(self, job: models.AdminJobDetails):
jaws_info = await poll_jaws(self._jaws, job.id, jaws_job_id)
await self._job_complete(job, jaws_info)
except Exception as e:
# TODO LOGGING figure out how logging it going to work etc.
logr.exception(f"Error starting JAWS job for job {job.id}")
# TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise
raise e
await self._handle_exception(e, job.id, "starting JAWS job for")

Check warning on line 171 in cdmtaskservice/jobflows/nersc_jaws.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jobflows/nersc_jaws.py#L171

Added line #L171 was not covered by tests

async def job_complete(self, job: models.AdminJobDetails):
"""
Expand All @@ -175,9 +185,10 @@ async def job_complete(self, job: models.AdminJobDetails):
async def _job_complete(self, job: models.AdminJobDetails, jaws_info: dict[str, Any]):
if not jaws_client.is_done(jaws_info):
raise InvalidJobStateError("JAWS run is incomplete")
# TODO ERRHANDLING IMPORTANT if in an error state, pull the erros.json file from the
# JAWS job dir and add stderr / out to job record (what do to about huge
# logs?) and set job to error
# TODO ERRHANDLING IMPORTANT if in an error state, use https://github.com/ICRAR/ijson
# to pull data out of the erros.json file at NERSC (since it could
# be huge). Store the stderr/out files... where? Check their Etags
# and set job to error
await self._mongo.update_job_state(
job.id,
models.JobState.JOB_SUBMITTED,
Expand All @@ -187,7 +198,6 @@ async def _job_complete(self, job: models.AdminJobDetails, jaws_info: dict[str,
await self._coman.run_coroutine(self._upload_files(job, jaws_info))

async def _upload_files(self, job: models.AdminJobDetails, jaws_info: dict[str, Any]):
logr = logging.getLogger(__name__)

async def presign(output_files: list[Path]) -> list[S3PresignedPost]:
root = job.job_input.output_dir
Expand Down Expand Up @@ -218,10 +228,7 @@ async def presign(output_files: list[Path]) -> list[S3PresignedPost]:
timestamp.utcdatetime(),
)
except Exception as e:
# TODO LOGGING figure out how logging it going to work etc.
logr.exception(f"Error starting upload for job {job.id}")
# TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise
raise e
await self._handle_exception(e, job.id, "starting file upload for")

Check warning on line 231 in cdmtaskservice/jobflows/nersc_jaws.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jobflows/nersc_jaws.py#L231

Added line #L231 was not covered by tests

async def upload_complete(self, job: models.AdminJobDetails):
"""
Expand All @@ -234,13 +241,12 @@ async def upload_complete(self, job: models.AdminJobDetails):
# no errors, continue, otherwise put the job into an errored state.
# TODO ERRHANDLING IMPORTANT upload the output file from the upload task and check for
# errors. If any exist, put the job into an errored state.
# TDOO LOGGING Add any relevant logs from the task / download task output file in state
# TODO LOGGING Add any relevant logs from the task / download task output file in state
# call. Alternatively, just add code to fetch them from NERSC rather
# than storing them permanently. 99% of the time they'll be uninteresting
await self._coman.run_coroutine(self._upload_complete(job))

async def _upload_complete(self, job: models.AdminJobDetails):
logr = logging.getLogger(__name__)
try:
md5s = await self._nman.get_uploaded_JAWS_files(job)
filemd5s = {os.path.join(job.job_input.output_dir, f): md5 for f, md5 in md5s.items()}
Expand All @@ -265,7 +271,4 @@ async def _upload_complete(self, job: models.AdminJobDetails):
timestamp.utcdatetime()
)
except Exception as e:
# TODO LOGGING figure out how logging it going to work etc.
logr.exception(f"Error completing job {job.id}")
# TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise
raise e
await self._handle_exception(e, job.id, "completing")

Check warning on line 274 in cdmtaskservice/jobflows/nersc_jaws.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jobflows/nersc_jaws.py#L274

Added line #L274 was not covered by tests
18 changes: 15 additions & 3 deletions cdmtaskservice/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
FLD_JOB_JAWS_DETAILS = "jaws_details"
FLD_JAWS_DETAILS_RUN_ID = "run_id"
FLD_JOB_OUTPUTS = "outputs"
FLD_JOB_ERROR = "error"
FLD_JOB_ADMIN_ERROR = "admin_error"
FLD_JOB_TRACEBACK = "traceback"


# https://en.wikipedia.org/wiki/Filename#Comparison_of_filename_limitations
Expand Down Expand Up @@ -671,7 +674,7 @@ def name_with_digest(self):


class S3FileOutput(BaseModel):
""" Am output file in an S3 instance. """
""" An output file in an S3 instance. """

# no validators since this is an outgoing data structure only
file: Annotated[str, Field(
Expand Down Expand Up @@ -706,13 +709,17 @@ class Job(BaseModel):
description="A list of tuples of (job_state, time_job_state_entered)."
)]
outputs: list[S3FileOutput] | None = None
# TODO ERRORHANDLING add error field and class
error: Annotated[str | None, Field(
example="The front fell off",
description="A description of the error that occurred."
)] = None


class NERSCDetails(BaseModel):
"""
Details about a job run at NERSC.
"""
# TODO NERSC more details, logs, etc.
download_task_id: Annotated[list[str], Field(
description="IDs for a tasks run via the NERSC SFAPI to download files from an S3 "
+ "instance to NERSC. Note that task details only persist for ~10 minutes past "
Expand All @@ -739,5 +746,10 @@ class AdminJobDetails(Job):
Information about a job with added details of interest to service administrators.
"""
nersc_details: NERSCDetails | None = None
# TODO NERSC more details, logs, etc.
jaws_details: JAWSDetails | None = None
admin_error: Annotated[str | None, Field(
example="The back fell off",
description="A description of the error that occurred oriented towards service "
+ "admins, potentially including more details."
)] = None
traceback: Annotated[str | None, Field(description="The error's traceback.")] = None
46 changes: 36 additions & 10 deletions cdmtaskservice/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,17 @@ async def get_job(
async def _update_job_state(
self,
job_id: str,
current_state: models.JobState,
state: models.JobState,
time: datetime.datetime,
push: dict[str, Any] | None = None,
set_: dict[str, Any] | None = None,
current_state: models.JobState | None = None,
):
query = {models.FLD_JOB_ID: _require_string(job_id, "job_id")}
if current_state:
query[models.FLD_JOB_STATE] = current_state.value

Check warning on line 151 in cdmtaskservice/mongo.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/mongo.py#L149-L151

Added lines #L149 - L151 were not covered by tests
res = await self._col_jobs.update_one(
{
models.FLD_JOB_ID: _require_string(job_id, "job_id"),
models.FLD_JOB_STATE: _not_falsy(current_state, "current_state").value,
},
query,
{
"$push": (push if push else {}) | {
models.FLD_JOB_TRANS_TIMES:
Expand Down Expand Up @@ -181,7 +181,7 @@ async def update_job_state(
state - the new state for the job.
time - the time at which the job transitioned to the new state.
"""
await self._update_job_state(job_id, current_state, state, time)
await self._update_job_state(job_id, state, time, current_state=current_state)

Check warning on line 184 in cdmtaskservice/mongo.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/mongo.py#L184

Added line #L184 was not covered by tests

_FLD_NERSC_DL_TASK = f"{models.FLD_JOB_NERSC_DETAILS}.{models.FLD_NERSC_DETAILS_DL_TASK_ID}"

Expand All @@ -202,7 +202,7 @@ async def add_NERSC_download_task_id(
"""
# may need to make this more generic where the cluster is passed in and mapped to
# a job structure location or something if we support more than NERSC
await self._update_job_state(job_id, current_state, state, time, push={
await self._update_job_state(job_id, state, time, current_state=current_state, push={

Check warning on line 205 in cdmtaskservice/mongo.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/mongo.py#L205

Added line #L205 was not covered by tests
self._FLD_NERSC_DL_TASK: _require_string(task_id, "task_id")
})

Expand All @@ -225,7 +225,7 @@ async def add_JAWS_run_id(
"""
# may need to make this more generic where the cluster is passed in and mapped to
# a job structure location or something if we support more than NERSC
await self._update_job_state(job_id, current_state, state, time, push={
await self._update_job_state(job_id, state, time, current_state=current_state, push={

Check warning on line 228 in cdmtaskservice/mongo.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/mongo.py#L228

Added line #L228 was not covered by tests
self._FLD_JAWS_RUN_ID: _require_string(run_id, "run_id")
})

Expand All @@ -248,7 +248,7 @@ async def add_NERSC_upload_task_id(
"""
# may need to make this more generic where the cluster is passed in and mapped to
# a job structure location or something if we support more than NERSC
await self._update_job_state(job_id, current_state, state, time, push={
await self._update_job_state(job_id, state, time, current_state=current_state, push={

Check warning on line 251 in cdmtaskservice/mongo.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/mongo.py#L251

Added line #L251 was not covered by tests
self._FLD_NERSC_UL_TASK: _require_string(task_id, "task_id")
})

Expand All @@ -269,9 +269,35 @@ async def add_output_files_to_job(
"""
out = [o.model_dump() for o in _not_falsy(output, "output")]
await self._update_job_state(
job_id, current_state, state, time, set_={models.FLD_JOB_OUTPUTS: out}
job_id, state, time, current_state=current_state, set_={models.FLD_JOB_OUTPUTS: out}
)

async def set_job_error(
self,
job_id: str,
user_error: str,
admin_error: str,
state: models.JobState,
time: datetime.datetime,
traceback: str | None = None,
):
"""
Put the job into an error state.
job_id - the job ID.
user_error - an error message targeted towards a service user.
admin_error - an error message targeted towards a service admin.
state - the new state for the job.
time - the time at which the job transitioned to the new state.
traceback - the error traceback.
"""
# TODO RETRIES will need to clear the error fields when attempting a retry
await self._update_job_state(job_id, state, time, set_={

Check warning on line 295 in cdmtaskservice/mongo.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/mongo.py#L295

Added line #L295 was not covered by tests
models.FLD_JOB_ERROR: user_error,
models.FLD_JOB_ADMIN_ERROR: admin_error,
models.FLD_JOB_TRACEBACK: traceback,
})


class NoSuchImageError(Exception):
""" The image does not exist in the system. """
Expand Down
2 changes: 1 addition & 1 deletion cdmtaskservice/nersc/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
# TODO CLEANUP clean up old code versions @NERSC somehow. Not particularly important

_DT_TARGET = Machine.dtns
# TDOO NERSCUIPDATE delete the following line when DTN downloads work normally.
# TODO NERSCUPDATE delete the following line when DTN downloads work normally.
# See https://nersc.servicenowservices.com/sp?sys_id=ad33e85f1b5a5610ac81a820f54bcba0&view=sp&id=ticket&table=incident
_DT_WORKAROUND = "source /etc/bashrc"

Expand Down
4 changes: 2 additions & 2 deletions cdmtaskservice/s3/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def split_paths(self, include_full_path=False) -> Generator[list[str, ...], None
yield parts


# TDOO TEST add tests for public method
# TODO TEST add tests for public method
# TODO S3PATHS allow accepting model paths to the constructor here so they aren't validated 2x
def validate_path(path: str, index: int = None) -> str:
"""
Expand Down Expand Up @@ -94,7 +94,7 @@ def validate_path(path: str, index: int = None) -> str:
return f"{bucket}/{key}"


# TDOO TEST add tests for public method
# TODO TEST add tests for public method
def validate_bucket_name(bucket_name: str, index: int = None):
"""
Validate an S3 bucket name.
Expand Down
2 changes: 1 addition & 1 deletion cdmtaskservice/s3/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ async def upload_presigned_url(
) from e


# TDOO CODE could merge this with the above method and add a toggle... eh
# TODO CODE could merge this with the above method and add a toggle... eh
async def upload_presigned_url_with_crc32(
session: aiohttp.ClientSession,
url: str,
Expand Down
2 changes: 1 addition & 1 deletion test/nersc/nersc_remote_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# TDOO TEST add tests
# TODO TEST add tests

from cdmtaskservice.nersc import remote # @UnusedImport

Expand Down

0 comments on commit 2390381

Please sign in to comment.