Commit eff160d

several submission fixes (#402)
* fix time stat in completed jobs

* catch dataset validation errors at submit time and log it

* don't kill submit because of bad actors

* add two more reset errors

* allow 0 tasks for external datasets
dsschult authored Nov 12, 2024
1 parent 670e1eb commit eff160d
Showing 7 changed files with 97 additions and 60 deletions.
3 changes: 1 addition & 2 deletions iceprod/core/data/dataset.schema.json
@@ -145,8 +145,7 @@
}
},
"required": ["name", "trays"]
},
"minItems": 1
}
}
},
"required": [ "version", "tasks", "description" ],
66 changes: 39 additions & 27 deletions iceprod/server/grid.py
@@ -140,39 +140,51 @@ async def get_tasks_to_queue(self, num: int) -> list[Task]:

tasks = []
for f in asyncio.as_completed(futures):
task = await f
# add default resource requirements
task.requirements = self._get_resources(task)
try:
task = await f
except Exception: # already logged in function
continue
try:
# add default resource requirements
task.requirements = self._get_resources(task)
except Exception:
logger.warning('cannot get task resources for %s.%s', task.dataset.dataset_id, task.task_id, exc_info=True)
continue
tasks.append(task)

return tasks

async def _convert_to_task(self, task):
"""Convert from basic task dict to a Task object"""
d = deepcopy(await self.dataset_lookup(task['dataset_id']))
# don't bother looking up the job status - trust that if we got a task, we're in processing
j = Job(dataset=d, job_id=task['job_id'], job_index=task['job_index'], status=JOB_STATUS_START)
t = Task(
dataset=d,
job=j,
task_id=task['task_id'],
task_index=task['task_index'],
instance_id=task['instance_id'],
name=task['name'],
depends=task['depends'],
requirements=task['requirements'],
status=task['status'],
site=self.site,
stats={},
task_files=[],
)
await t.load_task_files_from_api(self.rest_client)

# load some config defaults
config = t.dataset.config
if (not config['options'].get('site_temp','')) and self.cfg['queue'].get('site_temp', ''):
config['options']['site_temp'] = self.cfg['queue']['site_temp']
add_default_options(config['options'])
try:
d = deepcopy(await self.dataset_lookup(task['dataset_id']))
# don't bother looking up the job status - trust that if we got a task, we're in processing
j = Job(dataset=d, job_id=task['job_id'], job_index=task['job_index'], status=JOB_STATUS_START)
t = Task(
dataset=d,
job=j,
task_id=task['task_id'],
task_index=task['task_index'],
instance_id=task['instance_id'],
name=task['name'],
depends=task['depends'],
requirements=task['requirements'],
status=task['status'],
site=self.site,
stats={},
task_files=[],
)
await t.load_task_files_from_api(self.rest_client)

# load some config defaults
config = t.dataset.config
if (not config['options'].get('site_temp','')) and self.cfg['queue'].get('site_temp', ''):
config['options']['site_temp'] = self.cfg['queue']['site_temp']
add_default_options(config['options'])

except Exception:
logger.warning('Error converting task dict to task: %s.%s', task['dataset_id'], task['task_id'], exc_info=True)
raise

return t
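
The get_tasks_to_queue() change above is the "don't kill submit because of bad actors" fix: each future and each resource lookup is wrapped in its own try/except, so one bad dataset or task is logged and skipped instead of aborting the whole queueing cycle. A minimal standalone sketch of the same pattern, not the actual IceProd code:

import asyncio
import logging

logger = logging.getLogger(__name__)

async def gather_good_results(futures):
    """Await futures as they complete, logging and skipping any that fail."""
    results = []
    for fut in asyncio.as_completed(futures):
        try:
            result = await fut
        except Exception:
            # one bad task no longer propagates and kills the whole batch
            logger.warning('skipping task that failed to convert', exc_info=True)
            continue
        results.append(result)
    return results
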

50 changes: 38 additions & 12 deletions iceprod/server/plugins/condor.py
@@ -107,9 +107,11 @@ def from_condor_status(num):
'connection timed out',
# GPU errors
'opencl error: could not set up context',
'opencl error: could build the opencl program',
# CVMFS errors
'python: command not found',
'cannot read file data: Stale file handle',
'setenv: command not found',
]
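
The two new entries above ('opencl error: could build the opencl program' and 'setenv: command not found') extend the list of known transient error strings that should trigger a task reset rather than a failure. A hedged sketch of how a hold or failure reason might be checked against such a list (the list name and helper below are hypothetical, not the plugin's actual code):

# Hypothetical helper: match a job's hold/failure reason against known transient errors.
RESET_REASONS = [
    'connection timed out',
    'opencl error: could not set up context',
    'opencl error: could build the opencl program',  # newly added
    'python: command not found',
    'cannot read file data: Stale file handle',
    'setenv: command not found',                     # newly added
]

def should_reset(reason: str) -> bool:
    """Return True if the reason looks like a transient infrastructure error."""
    reason = reason.lower()
    return any(err.lower() in reason for err in RESET_REASONS)
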


@@ -170,7 +172,10 @@ class CondorSubmit:
}

_GENERIC_ADS = ['Iwd', 'IceProdDatasetId', 'IceProdTaskId', 'IceProdTaskInstanceId', 'MATCH_EXP_JOBGLIDEIN_ResourceName']
AD_INFO = ['RemotePool', 'RemoteHost', 'LastRemoteHost', 'LastRemotePool', 'HoldReason', 'LastHoldReason', 'RemoveReason', 'MachineAttrGLIDEIN_Site0'] + _GENERIC_ADS
AD_INFO = [
'RemotePool', 'RemoteHost', 'RemoteWallClockTime', 'ResidentSetSize_RAW', 'DiskUsage_RAW',
'HoldReason', 'RemoveReason', 'Reason', 'MachineAttrGLIDEIN_Site0',
] + _GENERIC_ADS
AD_PROJECTION_QUEUE = ['JobStatus', 'RemotePool', 'RemoteHost'] + _GENERIC_ADS
AD_PROJECTION_HISTORY = [
'JobStatus', 'ExitCode', 'RemoveReason', 'LastHoldReason', 'CpusUsage', 'RemoteSysCpu', 'RemoteUserCpu',
@@ -302,7 +307,7 @@ def create_submit_dir(self, task: Task, jel_dir: Path) -> Path:
path.mkdir(parents=True)
return path

async def submit(self, tasks: list[Task], jel: Path):
async def submit(self, tasks: list[Task], jel: Path) -> dict[CondorJobId, CondorJob]:
"""
Submit multiple jobs to Condor as a single batch.
@@ -311,6 +316,9 @@ async def submit(self, tasks: list[Task], jel: Path):
Args:
tasks: IceProd Tasks to submit
jel: common job event log
Returns:
dict of new jobs
"""
jel_dir = jel.parent
transfer_plugin_str = ';'.join(f'{k}={v}' for k,v in self.transfer_plugins.items())
@@ -429,7 +437,18 @@ async def submit(self, tasks: list[Task], jel: Path):
logger.debug("submitfile:\n%s", submitfile)

s = htcondor.Submit(submitfile)
self.condor_schedd.submit(s, count=1, itemdata=s.itemdata())
submit_result = self.condor_schedd.submit(s, count=1, itemdata=s.itemdata())

cluster_id = int(submit_result.cluster())
ret = {}
for i,job in enumerate(jobset):
ret[CondorJobId(cluster_id=cluster_id, proc_id=i)] = CondorJob(
dataset_id=job['datasetid'].strip('"'),
task_id=job['taskid'].strip('"'),
instance_id=job['taskinstance'].strip('"'),
submit_dir=Path(job['initialdir']),
)
return ret

def get_jobs(self) -> {CondorJobId: CondorJob}:
"""
@@ -574,7 +593,8 @@ async def submit(self):
for key in tasks_by_dataset:
tasks = tasks_by_dataset[key]
try:
await self.submitter.submit(tasks, cur_jel)
ret = await self.submitter.submit(tasks, cur_jel)
self.jobs.update(ret)
except Exception as e:
logger.warning('submit failed for dataset %s', key, exc_info=True)
async with asyncio.TaskGroup() as tg:
@@ -663,12 +683,18 @@ async def wait(self, timeout):

# get stats
cpu = event.get('CpusUsage', None)
if not cpu:
cpu = parse_usage(event.get('RunRemoteUsage', ''))
gpu = event.get('GpusUsage', None)
memory = event.get('MemoryUsage', None) # MB
disk = event.get('DiskUsage', None) # KB
time_ = event.get('LastRemoteWallClockTime', None) # seconds
memory = event.get('ResidentSetSize_RAW', None) # KB
if memory is None:
memory = event.get('MemoryUsage', None)*1000 # MB
disk = event.get('DiskUsage_RAW', None) # KB
if disk is None:
disk = event.get('DiskUsage', None) # KB
time_ = event.get('RemoteWallClockTime', None) # seconds
if time_ is None:
time_ = parse_usage(event.get('RunRemoteUsage', '')) / event.get('RequestCpus', 1)
elif cpu is None and time_:
cpu = parse_usage(event.get('RunRemoteUsage', '')) / time_
# data_in = event['ReceivedBytes'] # KB
# data_out = event['SentBytes'] # KB

@@ -678,7 +704,7 @@
if gpu is not None:
resources['gpu'] = gpu
if memory is not None:
resources['memory'] = memory/1000.
resources['memory'] = memory/1000000.
if disk is not None:
resources['disk'] = disk/1000000.
if time_ is not None:
@@ -696,10 +722,10 @@
reason = None
if r := event.get('HoldReason'):
reason = r
elif r := event.get('LastHoldReason'):
reason = r
elif r := event.get('RemoveReason'):
reason = r
elif r := event.get('Reason'):
reason = r

# finish job
await self.finish(job_id, success=success, resources=resources, stats=stats, reason=reason)
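
The wait() changes above are the completed-job time/stat fix: memory and disk now come from the raw classads (ResidentSetSize_RAW and DiskUsage_RAW, both in KB, hence the /1000000. to GB), wall time comes from RemoteWallClockTime with RunRemoteUsage/RequestCpus as a fallback, and CPU usage can be derived as cpu-seconds over wall-seconds. A small standalone sketch of that unit handling (illustrative only; cpu_seconds stands in for the parsed RunRemoteUsage value):

def summarize_usage(event: dict) -> dict:
    """Convert condor usage values into resource stats, mirroring the diff above."""
    resources = {}
    memory_kb = event.get('ResidentSetSize_RAW')    # KB
    if memory_kb is not None:
        resources['memory'] = memory_kb / 1000000.  # -> GB
    disk_kb = event.get('DiskUsage_RAW')            # KB
    if disk_kb is not None:
        resources['disk'] = disk_kb / 1000000.      # -> GB
    wall = event.get('RemoteWallClockTime')         # seconds
    cpu_seconds = event.get('cpu_seconds')          # hypothetical: parsed RunRemoteUsage
    if wall:
        resources['time'] = wall                    # seconds (final unit not shown in this hunk)
        if cpu_seconds is not None:
            resources['cpu'] = cpu_seconds / wall   # average cores in use
    return resources

# e.g. 2500000 KB resident set -> 2.5 GB memory; 5400 cpu-s over 7200 wall-s -> 0.75 cores
print(summarize_usage({'ResidentSetSize_RAW': 2500000, 'DiskUsage_RAW': 1000000,
                       'RemoteWallClockTime': 7200, 'cpu_seconds': 5400}))
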
12 changes: 6 additions & 6 deletions requirements-docs.txt
@@ -16,9 +16,9 @@ attrs==24.2.0
# referencing
babel==2.16.0
# via sphinx
boto3==1.35.53
boto3==1.35.59
# via iceprod (setup.py)
botocore==1.35.53
botocore==1.35.59
# via
# boto3
# s3transfer
@@ -50,7 +50,7 @@ exceptiongroup==1.2.2
# via anyio
h11==0.14.0
# via httpcore
htcondor==23.10.1
htcondor==24.1.1
# via iceprod (setup.py)
httpcore==1.0.6
# via httpx
@@ -79,7 +79,7 @@ markupsafe==3.0.2
# via jinja2
motor==3.6.0
# via iceprod (setup.py)
packaging==24.1
packaging==24.2
# via sphinx
psutil==6.1.0
# via iceprod (setup.py)
@@ -123,7 +123,7 @@ requests-futures==1.0.1
# wipac-rest-tools
requests-toolbelt==1.0.0
# via iceprod (setup.py)
rpds-py==0.20.1
rpds-py==0.21.0
# via
# jsonschema
# referencing
@@ -155,7 +155,7 @@ sphinxcontrib-serializinghtml==2.0.0
# via sphinx
statsd==4.0.1
# via iceprod (setup.py)
tomli==2.0.2
tomli==2.1.0
# via sphinx
tornado==6.4.1
# via
16 changes: 8 additions & 8 deletions requirements-tests.txt
@@ -14,11 +14,11 @@ attrs==24.2.0
# referencing
beautifulsoup4==4.12.3
# via iceprod (setup.py)
boto3==1.35.53
boto3==1.35.59
# via
# iceprod (setup.py)
# moto
botocore==1.35.53
botocore==1.35.59
# via
# boto3
# moto
@@ -60,7 +60,7 @@ flexmock==0.12.1
# via iceprod (setup.py)
h11==0.14.0
# via httpcore
htcondor==23.10.1
htcondor==24.1.1
# via iceprod (setup.py)
httpcore==1.0.6
# via httpx
@@ -95,11 +95,11 @@ mccabe==0.7.0
# via flake8
mock==5.1.0
# via iceprod (setup.py)
moto[s3]==5.0.18
moto[s3]==5.0.20
# via iceprod (setup.py)
motor==3.6.0
# via iceprod (setup.py)
packaging==24.1
packaging==24.2
# via pytest
pluggy==1.5.0
# via pytest
@@ -173,7 +173,7 @@ responses==0.25.3
# via moto
respx==0.21.1
# via iceprod (setup.py)
rpds-py==0.20.1
rpds-py==0.21.0
# via
# jsonschema
# referencing
@@ -191,7 +191,7 @@ soupsieve==2.6
# via beautifulsoup4
statsd==4.0.1
# via iceprod (setup.py)
tomli==2.0.2
tomli==2.1.0
# via
# coverage
# pytest
@@ -216,7 +216,7 @@ urllib3==2.2.3
# responses
# types-requests
# wipac-rest-tools
werkzeug==3.1.0
werkzeug==3.1.3
# via moto
wipac-dev-tools==1.13.0
# via
8 changes: 4 additions & 4 deletions requirements.txt
@@ -12,9 +12,9 @@ attrs==24.2.0
# via
# jsonschema
# referencing
boto3==1.35.53
boto3==1.35.59
# via iceprod (setup.py)
botocore==1.35.53
botocore==1.35.59
# via
# boto3
# s3transfer
@@ -44,7 +44,7 @@ exceptiongroup==1.2.2
# via anyio
h11==0.14.0
# via httpcore
htcondor==23.10.1
htcondor==24.1.1
# via iceprod (setup.py)
httpcore==1.0.6
# via httpx
@@ -106,7 +106,7 @@ requests-futures==1.0.1
# wipac-rest-tools
requests-toolbelt==1.0.0
# via iceprod (setup.py)
rpds-py==0.20.1
rpds-py==0.21.0
# via
# jsonschema
# referencing
2 changes: 1 addition & 1 deletion tests/server/plugins/condor_test.py
@@ -359,7 +359,7 @@ async def test_Grid_submit(schedd, i3prod_path):
jel_path = i3prod_path/'today'/'jel.log'
jel_path.parent.mkdir()
g.get_current_JEL = MagicMock(return_value=jel_path)
g.submitter.submit = AsyncMock()
g.submitter.submit = AsyncMock(return_value={})
g.task_reset = AsyncMock()

await g.submit()
