From eff160d5f6ebbcd5eb879ff2cb769b46ea031a65 Mon Sep 17 00:00:00 2001
From: David Schultz
Date: Tue, 12 Nov 2024 15:24:02 -0600
Subject: [PATCH] several submission fixes (#402)

* fix time stat in completed jobs
* catch dataset validation errors at submit time and log it
* don't kill submit because of bad actors
* add two more reset errors
* allow 0 tasks for external datasets
---
 iceprod/core/data/dataset.schema.json |  3 +-
 iceprod/server/grid.py                | 66 ++++++++++++++++-----------
 iceprod/server/plugins/condor.py      | 50 +++++++++++++++-----
 requirements-docs.txt                 | 12 ++---
 requirements-tests.txt                | 16 +++---
 requirements.txt                      |  8 ++--
 tests/server/plugins/condor_test.py   |  2 +-
 7 files changed, 97 insertions(+), 60 deletions(-)

diff --git a/iceprod/core/data/dataset.schema.json b/iceprod/core/data/dataset.schema.json
index 29525ed5..c149eacc 100644
--- a/iceprod/core/data/dataset.schema.json
+++ b/iceprod/core/data/dataset.schema.json
@@ -145,8 +145,7 @@
                     }
                 },
                 "required": ["name", "trays"]
-            },
-            "minItems": 1
+            }
         }
     },
     "required": [ "version", "tasks", "description" ],
diff --git a/iceprod/server/grid.py b/iceprod/server/grid.py
index bfcebf86..ef8dec42 100644
--- a/iceprod/server/grid.py
+++ b/iceprod/server/grid.py
@@ -140,39 +140,51 @@ async def get_tasks_to_queue(self, num: int) -> list[Task]:
         tasks = []
         for f in asyncio.as_completed(futures):
-            task = await f
-            # add default resource requirements
-            task.requirements = self._get_resources(task)
+            try:
+                task = await f
+            except Exception:  # already logged in function
+                continue
+            try:
+                # add default resource requirements
+                task.requirements = self._get_resources(task)
+            except Exception:
+                logger.warning('cannot get task resources for %s.%s', task.dataset.dataset_id, task.task_id, exc_info=True)
+                continue
             tasks.append(task)
         return tasks
 
     async def _convert_to_task(self, task):
         """Convert from basic task dict to a Task object"""
-        d = deepcopy(await self.dataset_lookup(task['dataset_id']))
-        # don't bother looking up the job status - trust that if we got a task, we're in processing
-        j = Job(dataset=d, job_id=task['job_id'], job_index=task['job_index'], status=JOB_STATUS_START)
-        t = Task(
-            dataset=d,
-            job=j,
-            task_id=task['task_id'],
-            task_index=task['task_index'],
-            instance_id=task['instance_id'],
-            name=task['name'],
-            depends=task['depends'],
-            requirements=task['requirements'],
-            status=task['status'],
-            site=self.site,
-            stats={},
-            task_files=[],
-        )
-        await t.load_task_files_from_api(self.rest_client)
-
-        # load some config defaults
-        config = t.dataset.config
-        if (not config['options'].get('site_temp','')) and self.cfg['queue'].get('site_temp', ''):
-            config['options']['site_temp'] = self.cfg['queue']['site_temp']
-        add_default_options(config['options'])
+        try:
+            d = deepcopy(await self.dataset_lookup(task['dataset_id']))
+            # don't bother looking up the job status - trust that if we got a task, we're in processing
+            j = Job(dataset=d, job_id=task['job_id'], job_index=task['job_index'], status=JOB_STATUS_START)
+            t = Task(
+                dataset=d,
+                job=j,
+                task_id=task['task_id'],
+                task_index=task['task_index'],
+                instance_id=task['instance_id'],
+                name=task['name'],
+                depends=task['depends'],
+                requirements=task['requirements'],
+                status=task['status'],
+                site=self.site,
+                stats={},
+                task_files=[],
+            )
+            await t.load_task_files_from_api(self.rest_client)
+
+            # load some config defaults
+            config = t.dataset.config
+            if (not config['options'].get('site_temp','')) and self.cfg['queue'].get('site_temp', ''):
+                config['options']['site_temp'] = self.cfg['queue']['site_temp']
+            add_default_options(config['options'])
+
+        except Exception:
+            logger.warning('Error converting task dict to task: %s.%s', task['dataset_id'], task['task_id'], exc_info=True)
+            raise
 
         return t
diff --git a/iceprod/server/plugins/condor.py b/iceprod/server/plugins/condor.py
index a2bbc9b0..db4085ca 100644
--- a/iceprod/server/plugins/condor.py
+++ b/iceprod/server/plugins/condor.py
@@ -107,9 +107,11 @@ def from_condor_status(num):
     'connection timed out',
     # GPU errors
     'opencl error: could not set up context',
+    'opencl error: could build the opencl program',
     # CVMFS errors
     'python: command not found',
     'cannot read file data: Stale file handle',
+    'setenv: command not found',
 ]
@@ -170,7 +172,10 @@ class CondorSubmit:
     }
 
     _GENERIC_ADS = ['Iwd', 'IceProdDatasetId', 'IceProdTaskId', 'IceProdTaskInstanceId', 'MATCH_EXP_JOBGLIDEIN_ResourceName']
-    AD_INFO = ['RemotePool', 'RemoteHost', 'LastRemoteHost', 'LastRemotePool', 'HoldReason', 'LastHoldReason', 'RemoveReason', 'MachineAttrGLIDEIN_Site0'] + _GENERIC_ADS
+    AD_INFO = [
+        'RemotePool', 'RemoteHost', 'RemoteWallClockTime', 'ResidentSetSize_RAW', 'DiskUsage_RAW',
+        'HoldReason', 'RemoveReason', 'Reason', 'MachineAttrGLIDEIN_Site0',
+    ] + _GENERIC_ADS
     AD_PROJECTION_QUEUE = ['JobStatus', 'RemotePool', 'RemoteHost'] + _GENERIC_ADS
     AD_PROJECTION_HISTORY = [
         'JobStatus', 'ExitCode', 'RemoveReason', 'LastHoldReason', 'CpusUsage', 'RemoteSysCpu', 'RemoteUserCpu',
@@ -302,7 +307,7 @@ def create_submit_dir(self, task: Task, jel_dir: Path) -> Path:
         path.mkdir(parents=True)
         return path
 
-    async def submit(self, tasks: list[Task], jel: Path):
+    async def submit(self, tasks: list[Task], jel: Path) -> dict[CondorJobId, CondorJob]:
         """
         Submit multiple jobs to Condor as a single batch.
 
@@ -311,6 +316,9 @@ async def submit(self, tasks: list[Task], jel: Path):
         Args:
             tasks: IceProd Tasks to submit
             jel: common job event log
+
+        Returns:
+            dict of new jobs
         """
         jel_dir = jel.parent
         transfer_plugin_str = ';'.join(f'{k}={v}' for k,v in self.transfer_plugins.items())
@@ -429,7 +437,18 @@ async def submit(self, tasks: list[Task], jel: Path):
         logger.debug("submitfile:\n%s", submitfile)
         s = htcondor.Submit(submitfile)
-        self.condor_schedd.submit(s, count=1, itemdata=s.itemdata())
+        submit_result = self.condor_schedd.submit(s, count=1, itemdata=s.itemdata())
+
+        cluster_id = int(submit_result.cluster())
+        ret = {}
+        for i,job in enumerate(jobset):
+            ret[CondorJobId(cluster_id=cluster_id, proc_id=i)] = CondorJob(
+                dataset_id=job['datasetid'].strip('"'),
+                task_id=job['taskid'].strip('"'),
+                instance_id=job['taskinstance'].strip('"'),
+                submit_dir=Path(job['initialdir']),
+            )
+        return ret
 
     def get_jobs(self) -> {CondorJobId: CondorJob}:
         """
@@ -574,7 +593,8 @@ async def submit(self):
         for key in tasks_by_dataset:
             tasks = tasks_by_dataset[key]
             try:
-                await self.submitter.submit(tasks, cur_jel)
+                ret = await self.submitter.submit(tasks, cur_jel)
+                self.jobs.update(ret)
             except Exception as e:
                 logger.warning('submit failed for dataset %s', key, exc_info=True)
                 async with asyncio.TaskGroup() as tg:
@@ -663,12 +683,18 @@ async def wait(self, timeout):
                     # get stats
                     cpu = event.get('CpusUsage', None)
-                    if not cpu:
-                        cpu = parse_usage(event.get('RunRemoteUsage', ''))
                     gpu = event.get('GpusUsage', None)
-                    memory = event.get('MemoryUsage', None)  # MB
-                    disk = event.get('DiskUsage', None)  # KB
-                    time_ = event.get('LastRemoteWallClockTime', None)  # seconds
+                    memory = event.get('ResidentSetSize_RAW', None)  # KB
+                    if memory is None:
+                        memory = event.get('MemoryUsage', None)*1000  # MB
+                    disk = event.get('DiskUsage_RAW', None)  # KB
+                    if disk is None:
+                        disk = event.get('DiskUsage', None)  # KB
+                    time_ = event.get('RemoteWallClockTime', None)  # seconds
+                    if time_ is None:
+                        time_ = parse_usage(event.get('RunRemoteUsage', '')) / event.get('RequestCpus', 1)
+                    elif cpu is None and time_:
+                        cpu = parse_usage(event.get('RunRemoteUsage', '')) / time_
 
                     # data_in = event['ReceivedBytes']  # KB
                     # data_out = event['SentBytes']  # KB
@@ -678,7 +704,7 @@ async def wait(self, timeout):
                     if gpu is not None:
                         resources['gpu'] = gpu
                     if memory is not None:
-                        resources['memory'] = memory/1000.
+                        resources['memory'] = memory/1000000.
                     if disk is not None:
                         resources['disk'] = disk/1000000.
                     if time_ is not None:
@@ -696,10 +722,10 @@ async def wait(self, timeout):
                     reason = None
                     if r := event.get('HoldReason'):
                         reason = r
-                    elif r := event.get('LastHoldReason'):
-                        reason = r
                     elif r := event.get('RemoveReason'):
                         reason = r
+                    elif r := event.get('Reason'):
+                        reason = r
 
                     # finish job
                     await self.finish(job_id, success=success, resources=resources, stats=stats, reason=reason)
diff --git a/requirements-docs.txt b/requirements-docs.txt
index c57e35cb..659419bd 100644
--- a/requirements-docs.txt
+++ b/requirements-docs.txt
@@ -16,9 +16,9 @@ attrs==24.2.0
     #   referencing
 babel==2.16.0
     # via sphinx
-boto3==1.35.53
+boto3==1.35.59
     # via iceprod (setup.py)
-botocore==1.35.53
+botocore==1.35.59
     # via
     #   boto3
     #   s3transfer
@@ -50,7 +50,7 @@ exceptiongroup==1.2.2
     # via anyio
 h11==0.14.0
     # via httpcore
-htcondor==23.10.1
+htcondor==24.1.1
     # via iceprod (setup.py)
 httpcore==1.0.6
     # via httpx
@@ -79,7 +79,7 @@ markupsafe==3.0.2
     # via jinja2
 motor==3.6.0
     # via iceprod (setup.py)
-packaging==24.1
+packaging==24.2
     # via sphinx
 psutil==6.1.0
     # via iceprod (setup.py)
@@ -123,7 +123,7 @@ requests-futures==1.0.1
     #   wipac-rest-tools
 requests-toolbelt==1.0.0
     # via iceprod (setup.py)
-rpds-py==0.20.1
+rpds-py==0.21.0
     # via
     #   jsonschema
     #   referencing
@@ -155,7 +155,7 @@ sphinxcontrib-serializinghtml==2.0.0
     # via sphinx
 statsd==4.0.1
     # via iceprod (setup.py)
-tomli==2.0.2
+tomli==2.1.0
     # via sphinx
 tornado==6.4.1
     # via
diff --git a/requirements-tests.txt b/requirements-tests.txt
index 9d720ca0..cabc3e4c 100644
--- a/requirements-tests.txt
+++ b/requirements-tests.txt
@@ -14,11 +14,11 @@ attrs==24.2.0
     #   referencing
 beautifulsoup4==4.12.3
     # via iceprod (setup.py)
-boto3==1.35.53
+boto3==1.35.59
     # via
     #   iceprod (setup.py)
     #   moto
-botocore==1.35.53
+botocore==1.35.59
     # via
     #   boto3
     #   moto
@@ -60,7 +60,7 @@ flexmock==0.12.1
     # via iceprod (setup.py)
 h11==0.14.0
     # via httpcore
-htcondor==23.10.1
+htcondor==24.1.1
     # via iceprod (setup.py)
 httpcore==1.0.6
     # via httpx
@@ -95,11 +95,11 @@ mccabe==0.7.0
     # via flake8
 mock==5.1.0
     # via iceprod (setup.py)
-moto[s3]==5.0.18
+moto[s3]==5.0.20
     # via iceprod (setup.py)
 motor==3.6.0
     # via iceprod (setup.py)
-packaging==24.1
+packaging==24.2
     # via pytest
 pluggy==1.5.0
     # via pytest
@@ -173,7 +173,7 @@ responses==0.25.3
     # via moto
 respx==0.21.1
     # via iceprod (setup.py)
-rpds-py==0.20.1
+rpds-py==0.21.0
     # via
     #   jsonschema
     #   referencing
@@ -191,7 +191,7 @@ soupsieve==2.6
     # via beautifulsoup4
 statsd==4.0.1
     # via iceprod (setup.py)
-tomli==2.0.2
+tomli==2.1.0
     # via
     #   coverage
     #   pytest
@@ -216,7 +216,7 @@ urllib3==2.2.3
     #   responses
     #   types-requests
     #   wipac-rest-tools
-werkzeug==3.1.0
+werkzeug==3.1.3
     # via moto
 wipac-dev-tools==1.13.0
     # via
diff --git a/requirements.txt b/requirements.txt
index 99f01d08..79fd4422 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -12,9 +12,9 @@ attrs==24.2.0
     # via
     #   jsonschema
     #   referencing
-boto3==1.35.53
+boto3==1.35.59
     # via iceprod (setup.py)
-botocore==1.35.53
+botocore==1.35.59
     # via
     #   boto3
     #   s3transfer
@@ -44,7 +44,7 @@ exceptiongroup==1.2.2
     # via anyio
 h11==0.14.0
     # via httpcore
-htcondor==23.10.1
+htcondor==24.1.1
     # via iceprod (setup.py)
 httpcore==1.0.6
     # via httpx
@@ -106,7 +106,7 @@ requests-futures==1.0.1
     #   wipac-rest-tools
 requests-toolbelt==1.0.0
     # via iceprod (setup.py)
-rpds-py==0.20.1
+rpds-py==0.21.0
     # via
     #   jsonschema
     #   referencing
diff --git a/tests/server/plugins/condor_test.py b/tests/server/plugins/condor_test.py
index 3a58597c..caa3f0f7 100644
--- a/tests/server/plugins/condor_test.py
+++ b/tests/server/plugins/condor_test.py
@@ -359,7 +359,7 @@ async def test_Grid_submit(schedd, i3prod_path):
     jel_path = i3prod_path/'today'/'jel.log'
     jel_path.parent.mkdir()
     g.get_current_JEL = MagicMock(return_value=jel_path)
-    g.submitter.submit = AsyncMock()
+    g.submitter.submit = AsyncMock(return_value={})
     g.task_reset = AsyncMock()
 
     await g.submit()