Skip to content

Commit

Permalink
Kill tasks during job prep
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Jan 13, 2025
1 parent c0ecbe5 commit 50798ed
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 6 deletions.
1 change: 1 addition & 0 deletions changes.d/6535.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure tasks can be killed while in the preparing state.
13 changes: 8 additions & 5 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1078,18 +1078,21 @@ def kill_tasks(
to_kill: List[TaskProxy] = []
unkillable: List[TaskProxy] = []
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
if itask.state_reset(is_held=True):
self.data_store_mgr.delta_task_state(itask)
if not itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
unkillable.append(itask)
continue
if itask.state_reset(is_held=True):
self.data_store_mgr.delta_task_state(itask)
if itask.state(TASK_STATUS_PREPARING):
self.task_job_mgr.kill_prep_task(itask)
else:
to_kill.append(itask)
if jobless:
# Directly set failed in sim mode:
self.task_events_mgr.process_message(
itask, 'CRITICAL', TASK_STATUS_FAILED,
flag=self.task_events_mgr.FLAG_RECEIVED
)
else:
unkillable.append(itask)
if warn and unkillable:
LOG.warning(
"Tasks not killable: "
Expand Down
10 changes: 9 additions & 1 deletion cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class TaskJobManager:

def __init__(self, workflow, proc_pool, workflow_db_mgr,
task_events_mgr, data_store_mgr, bad_hosts):
self.workflow = workflow
self.workflow: str = workflow
self.proc_pool = proc_pool
self.workflow_db_mgr: WorkflowDatabaseManager = workflow_db_mgr
self.task_events_mgr = task_events_mgr
Expand Down Expand Up @@ -196,6 +196,14 @@ def kill_task_jobs(
self._kill_task_jobs_callback_255
)

def kill_prep_task(self, itask: 'TaskProxy') -> None:
"""Kill a preparing task."""
itask.waiting_on_job_prep = False
itask.local_job_file_path = None # reset for retry
self._prep_submit_task_job_error(
self.workflow, itask, '(killed in job prep)', ''
)

def poll_task_jobs(self, workflow, itasks, msg=None):
"""Poll jobs of specified tasks.
Expand Down
48 changes: 48 additions & 0 deletions tests/integration/test_kill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
import pytest

from cylc.flow.commands import kill_tasks, run_cmd
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_state import TASK_STATUS_SUBMIT_FAILED, TASK_STATUS_WAITING


async def test_kill_preparing(
one_conf,
flow,
scheduler,
start,
monkeypatch: pytest.MonkeyPatch,
log_filter,
):
"""Test killing a preparing task."""
schd: Scheduler = scheduler(
flow(one_conf), run_mode='live', paused_start=False
)
async with start(schd):
# Make the task indefinitely preparing:
monkeypatch.setattr(
schd.task_job_mgr, '_prep_submit_task_job', lambda *a, **k: None
)
itask = schd.pool.get_tasks()[0]
assert itask.state(TASK_STATUS_WAITING, is_held=False)
schd.start_job_submission([itask])

await run_cmd(kill_tasks(schd, [itask.tokens.relative_id]))
assert itask.state(TASK_STATUS_SUBMIT_FAILED, is_held=True)
assert log_filter(logging.ERROR, 'killed in job prep')

0 comments on commit 50798ed

Please sign in to comment.