Log more details when worker crashes (#2448)
* jobs are deleted, not canceled

* fix logging strings

* add details in worker crashes

* not true anymore: we don't check for blocklist when finishing a job

* don't change log format

* remove unused import
severo authored Feb 13, 2024
1 parent 8139901 commit 42d945d
Showing 6 changed files with 10 additions and 14 deletions.
libs/libcommon/src/libcommon/orchestrator.py (2 changes: 1 addition & 1 deletion)
@@ -848,7 +848,7 @@ def finish_job(
     # check if the job is still in started status
     job_info = job_result["job_info"]
     if not Queue().is_job_started(job_id=job_info["job_id"]):
-        logging.debug("the job was cancelled, don't update the cache")
+        logging.debug("the job was deleted, don't update the cache")
         return TasksStatistics()
     # if the job could not provide an output, finish it and return
     if not job_result["output"]:
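For context, this guard exists because a job can be deleted (for example when its dataset is removed) while a worker is still processing it; the worker's result must then be discarded rather than written to the cache. A minimal sketch of the pattern, with an in-memory set standing in for the real Queue/MongoDB lookup (all names here are illustrative, not the repository's API):

import logging

# Hypothetical stand-in for the queue's "started jobs" registry
# (the real code asks MongoDB via Queue().is_job_started).
started_job_ids = {"job-1", "job-2"}

def finish_job(job_result):
    # Write a job's output to the cache, unless the job was deleted mid-run.
    job_id = job_result["job_info"]["job_id"]
    if job_id not in started_job_ids:
        # The job disappeared while the worker was running it, so its
        # output is stale: skip the cache update entirely.
        logging.debug("the job was deleted, don't update the cache")
        return
    print(f"updating cache with the output of {job_id}")  # stand-in for the cache write

finish_job({"job_info": {"job_id": "job-3"}})  # deleted mid-run: nothing is cached
finish_job({"job_info": {"job_id": "job-1"}})  # still started: the cache is updated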
libs/libcommon/src/libcommon/queue.py (6 changes: 3 additions & 3 deletions)
@@ -698,8 +698,8 @@ def get_next_waiting_job(
         )
         raise EmptyQueueError("no job available")

-    def _start_newest_job_and_cancel_others(self, job: JobDocument) -> JobDocument:
-        """Start a job (the newest one for unicity_id) and cancel the other ones.
+    def _start_newest_job_and_delete_others(self, job: JobDocument) -> JobDocument:
+        """Start a job (the newest one for unicity_id) and delete the other ones.

         A lock is used to ensure that the job is not started by another worker.
@@ -803,7 +803,7 @@ def start_job(
             raise RuntimeError(
                 f"The job type {next_waiting_job.type} is not in the list of allowed job types {job_types_only}"
             )
-        started_job = self._start_newest_job_and_cancel_others(job=next_waiting_job)
+        started_job = self._start_newest_job_and_delete_others(job=next_waiting_job)
         return started_job.info()

     def get_job_with_id(self, job_id: str) -> JobDocument:
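The rename reflects what actually happens to duplicate jobs sharing a unicity_id: the newest waiting one is started and the others are deleted outright, not kept in a "cancelled" state. A rough sketch of that policy with plain Python objects (illustrative only; the real method works on mongoengine JobDocument records under a lock):

from dataclasses import dataclass
from datetime import datetime

@dataclass
class Job:
    job_id: str
    unicity_id: str
    created_at: datetime
    status: str = "waiting"

def start_newest_job_and_delete_others(jobs, unicity_id):
    # Assumes at least one waiting job exists for this unicity_id.
    # Keep only the newest waiting duplicate, start it, and delete the
    # rest (they are removed, not merely marked "cancelled").
    duplicates = [j for j in jobs if j.unicity_id == unicity_id and j.status == "waiting"]
    newest = max(duplicates, key=lambda j: j.created_at)
    for job in duplicates:
        if job is not newest:
            jobs.remove(job)
    newest.status = "started"
    return newest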
libs/libcommon/tests/test_queue.py (2 changes: 1 addition & 1 deletion)
@@ -597,7 +597,7 @@ def test_delete_dataset_waiting_jobs(queue_mongo_resource: QueueMongoResource) -
     -> deletes at several levels (dataset, config, split)
     -> deletes waiting jobs, but not started jobs
     -> remove locks
-    -> does not cancel, and does not remove locks, for other datasets
+    -> does not delete, and does not remove locks, for other datasets
     """
     dataset = "dataset"
     other_dataset = "other_dataset"
services/worker/src/worker/executor.py (4 changes: 2 additions & 2 deletions)
@@ -198,13 +198,13 @@ def is_worker_alive(self, worker_loop_executor: OutputExecutor) -> bool:
         try:
             worker_loop_executor.stop()  # raises an error if the worker returned unexpected exit code
         except ProcessExitedWithError as err:
-            explanation = f"exit code f{err.exit_code}"
+            explanation = f"exit code {err.exit_code}"
             if err.exit_code == -9:
                 explanation += " SIGKILL - surely an OOM"
             error_msg = f"Worker crashed ({explanation})"
             state = self.get_state()
             if state and state["current_job_info"]:
-                error_msg += f"when running job_id={state['current_job_info']['job_id']}"
+                error_msg += f" when running job_id={state['current_job_info']['job_id']}"
             logging.error(error_msg)
             raise
         return False
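Two one-character bugs are fixed in this hunk: a stray "f" left inside the f-string made logs read "exit code f-9" instead of "exit code -9", and a missing leading space glued the job id to the closing parenthesis. (An exit code of -9 means the child process was killed by signal 9, SIGKILL, which on Linux typically points to the OOM killer.) Both fixes are easy to verify in isolation:

exit_code = -9
job_id = "abc123"

# Bug 1: the stray "f" sits inside the string, so it is printed literally.
print(f"exit code f{exit_code}")   # exit code f-9
print(f"exit code {exit_code}")    # exit code -9  (fixed)

# Bug 2: a missing leading space glues the two message parts together.
error_msg = "Worker crashed (exit code -9 SIGKILL - surely an OOM)"
print(error_msg + f"when running job_id={job_id}")   # ...an OOM)when running...
print(error_msg + f" when running job_id={job_id}")  # ...an OOM) when running...  (fixed)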
services/worker/src/worker/job_manager.py (6 changes: 1 addition & 5 deletions)
@@ -9,7 +9,6 @@
 from libcommon.dtos import JobInfo, JobParams, JobResult, Priority
 from libcommon.exceptions import (
     CustomError,
-    DatasetInBlockListError,
     DatasetNotFoundError,
     DatasetScriptError,
     JobManagerCrashedError,
@@ -117,10 +116,7 @@ def run_job(self) -> JobResult:
         return job_result

     def finish(self, job_result: JobResult) -> None:
-        try:
-            finish_job(job_result=job_result)
-        except DatasetInBlockListError:
-            self.debug("The dataset is blocked and has been deleted from the Datasets Server.")
+        finish_job(job_result=job_result)

     def raise_if_parallel_response_exists(self, parallel_step_name: str) -> None:
         try:
services/worker/src/worker/loop.py (4 changes: 2 additions & 2 deletions)
@@ -101,8 +101,8 @@ def run(self) -> None:
                     continue
                 with StepProfiler("loop", "sleep"):
                     self.sleep()
-        except BaseException:
-            logging.exception("quit due to an uncaught error while processing the job")
+        except BaseException as err:
+            logging.exception(f"quit due to an uncaught error: {err}")
             raise

     def process_next_job(self) -> bool:
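logging.exception always appends the current traceback, but before this change the message line itself said nothing about what failed. Binding the exception with "as err" and interpolating it puts the error's own text on the first log line. A self-contained check of that behavior:

import logging

logging.basicConfig(level=logging.INFO)

try:
    raise ValueError("mongo connection refused")
except BaseException as err:
    # The message line now names the actual error; the traceback still
    # follows, because logging.exception appends the active traceback.
    logging.exception(f"quit due to an uncaught error: {err}")
# Output:
# ERROR:root:quit due to an uncaught error: mongo connection refused
# Traceback (most recent call last):
#   ...
# ValueError: mongo connection refused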
