Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

be more aggressive about cleaning out jobs that have finished #397

Merged
merged 9 commits into from
Oct 25, 2024
14 changes: 11 additions & 3 deletions iceprod/server/plugins/condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def from_condor_status(num):
'sigterm',
'killed',
'bus error (core dumped)',
'segmentation fault (core dumped)',
'operation timed out',
'connection timed out',
]
Expand Down Expand Up @@ -898,25 +899,32 @@ async def check_submit_dir(self):
Return directory paths that should be cleaned up.
"""
# get time limits
queue_tasks = {j.task_id for j in self.jobs.values()}
queued_time = self.cfg['queue'].get('max_task_queued_time', 86400*2)
processing_time = self.cfg['queue'].get('max_task_processing_time', 86400*2)
suspend_time = self.cfg['queue'].get('suspend_submit_dir_time', 86400)
now = time.time()
job_clean_logs_time = now - suspend_time
job_old_time = now - (queued_time + processing_time)
dir_old_time = now - (queued_time + processing_time + suspend_time)
logger.debug('now: %r, job_old_time: %r, dir_old_time: %r', now, job_old_time, dir_old_time)
logger.debug('now: %r, job_clean_logs_time: %r, job_old_time: %r, dir_old_time: %r', now, job_clean_logs_time, job_old_time, dir_old_time)

for daydir in self.submit_dir.glob('[0-9][0-9][0-9][0-9]*'):
logger.debug('looking at daydir %s', daydir)
if daydir.is_dir():
empty = True
for path in daydir.iterdir():
logger.debug('looking at path %s', path)
job_active = path.name.split('_')[0] in queue_tasks
logger.debug('looking at path %s, active: %r', path, job_active)
st = path.lstat()
logger.debug('stat: %r', st)
if stat.S_ISDIR(st.st_mode):
empty = False
if st.st_mtime < job_old_time:
if not job_active:
if st.st_mtime < job_clean_logs_time:
logger.info('cleaning up submit dir %s', path)
shutil.rmtree(path)
elif st.st_mtime < job_old_time:
yield path
if st.st_mtime < dir_old_time:
logger.info('cleaning up submit dir %s', path)
Expand Down
4 changes: 2 additions & 2 deletions requirements-docs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ attrs==24.2.0
# referencing
babel==2.16.0
# via sphinx
boto3==1.35.45
boto3==1.35.49
# via iceprod (setup.py)
botocore==1.35.45
botocore==1.35.49
# via
# boto3
# s3transfer
Expand Down
6 changes: 3 additions & 3 deletions requirements-tests.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ attrs==24.2.0
# referencing
beautifulsoup4==4.12.3
# via iceprod (setup.py)
boto3==1.35.45
boto3==1.35.49
# via
# iceprod (setup.py)
# moto
botocore==1.35.45
botocore==1.35.49
# via
# boto3
# moto
Expand Down Expand Up @@ -216,7 +216,7 @@ urllib3==2.2.3
# responses
# types-requests
# wipac-rest-tools
werkzeug==3.0.4
werkzeug==3.0.6
# via moto
wipac-dev-tools==1.13.0
# via
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ attrs==24.2.0
# via
# jsonschema
# referencing
boto3==1.35.45
boto3==1.35.49
# via iceprod (setup.py)
botocore==1.35.45
botocore==1.35.49
# via
# boto3
# s3transfer
Expand Down
34 changes: 31 additions & 3 deletions tests/server/plugins/condor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ async def test_Grid_check_delete_day(schedd, i3prod_path, set_time):
assert g.jels == {}


async def test_Grid_check_old(schedd, i3prod_path, set_time):
async def test_Grid_check_old_delete(schedd, i3prod_path, set_time):
override = ['queue.type=htcondor', 'queue.max_task_queued_time=10', 'queue.max_task_processing_time=10', 'queue.suspend_submit_dir_time=10']
cfg = iceprod.server.config.IceProdConfig(save=False, override=override)

Expand All @@ -661,7 +661,7 @@ async def test_Grid_check_old(schedd, i3prod_path, set_time):
assert dirs == {daydir.name: []}


async def test_Grid_check_oldjob(schedd, i3prod_path, set_time):
async def test_Grid_check_oldjob_remove(schedd, i3prod_path, set_time):
override = ['queue.type=htcondor', 'queue.max_task_queued_time=10', 'queue.max_task_processing_time=10', 'queue.suspend_submit_dir_time=10']
cfg = iceprod.server.config.IceProdConfig(save=False, override=override)

Expand All @@ -682,7 +682,7 @@ async def test_Grid_check_oldjob(schedd, i3prod_path, set_time):
os.utime(p, (t, t))
logging.info('set time to %d', t)

jobs[CondorJobId(cluster_id=1, proc_id=0)] = CondorJob(status=JobStatus.IDLE, submit_dir=p)
jobs[CondorJobId(cluster_id=1, proc_id=0)] = CondorJob(status=JobStatus.IDLE, submit_dir=p, task_id=p.name)

await g.check()

Expand All @@ -691,6 +691,34 @@ async def test_Grid_check_oldjob(schedd, i3prod_path, set_time):
assert g.submitter.remove.call_count == 1


async def test_Grid_check_oldjob_delete(schedd, i3prod_path, set_time):
    """A submit dir with no matching queued job and older than the suspend
    time should be deleted outright during check(), without any condor
    remove call being issued."""
    # Tight (10s) queue/processing/suspend limits so the aged dir qualifies.
    override = ['queue.type=htcondor', 'queue.max_task_queued_time=10', 'queue.max_task_processing_time=10', 'queue.suspend_submit_dir_time=10']
    cfg = iceprod.server.config.IceProdConfig(save=False, override=override)

    rest_mock = MagicMock()
    grid = iceprod.server.plugins.condor.Grid(cfg=cfg, rest_client=rest_mock, cred_client=None)

    # No jobs on the queue, no history, nothing on the IceProd side either.
    queued_jobs = {}
    grid.submitter.get_jobs = MagicMock(return_value=queued_jobs)
    grid.submitter.get_history = MagicMock(return_value={})
    grid.submitter.remove = MagicMock()
    grid.get_tasks_on_queue = AsyncMock(return_value=[])

    # Create a stale per-task dir under today's day-directory and backdate
    # its mtime past the suspend window.
    event_log = grid.get_current_JEL()
    day_dir = event_log.parent
    stale_dir = day_dir / 'olddir'
    stale_dir.mkdir()
    mtime = time.mktime(set_time.utctimetuple()) - 15  # must be older than suspend time
    os.utime(stale_dir, (mtime, mtime))
    logging.info('set time to %d', mtime)

    await grid.check()

    # The day dir survives but the stale subdir is gone, and nothing was
    # removed from condor itself.
    remaining = {
        day.name: [sub for sub in day.iterdir() if sub.is_dir()]
        for day in grid.submit_dir.glob('[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]')
    }
    assert remaining == {day_dir.name: []}
    assert grid.submitter.remove.call_count == 0


@pytest.mark.parametrize('jel_jobs,queue_jobs,hist_jobs,remove_calls,finish_calls', [
({(1,0): JobStatus.IDLE},
{(1,0): JobStatus.IDLE},
Expand Down
Loading