diff --git a/iceprod/server/plugins/condor.py b/iceprod/server/plugins/condor.py index 1fb57b29..b8c32272 100644 --- a/iceprod/server/plugins/condor.py +++ b/iceprod/server/plugins/condor.py @@ -305,7 +305,7 @@ def create_submit_dir(self, task: Task, jel_dir: Path) -> Path: path.mkdir(parents=True) return path - async def submit(self, tasks: list[Task], jel: Path): + async def submit(self, tasks: list[Task], jel: Path) -> dict[CondorJobId, CondorJob]: """ Submit multiple jobs to Condor as a single batch. @@ -314,6 +314,9 @@ async def submit(self, tasks: list[Task], jel: Path): Args: tasks: IceProd Tasks to submit jel: common job event log + + Returns: + dict of new jobs """ jel_dir = jel.parent transfer_plugin_str = ';'.join(f'{k}={v}' for k,v in self.transfer_plugins.items()) @@ -432,7 +435,18 @@ async def submit(self, tasks: list[Task], jel: Path): logger.debug("submitfile:\n%s", submitfile) s = htcondor.Submit(submitfile) - self.condor_schedd.submit(s, count=1, itemdata=s.itemdata()) + submit_result = self.condor_schedd.submit(s, count=1, itemdata=s.itemdata()) + + cluster_id = int(submit_result.cluster()) + ret = {} + for i,job in enumerate(jobset): + ret[CondorJobId(cluster_id=cluster_id, proc_id=i)] = CondorJob( + dataset_id=job['datasetid'].strip('"'), + task_id=job['taskid'].strip('"'), + instance_id=job['taskinstance'].strip('"'), + submit_dir=Path(job['initialdir']), + ) + return ret def get_jobs(self) -> {CondorJobId: CondorJob}: """ @@ -541,7 +555,8 @@ async def run(self, forever=True): while True: start = time.monotonic() try: - await self.submit() + ret = await self.submit() + self.jobs.update(ret) except Exception: logger.warning('failed to submit', exc_info=True) wait_time = max(0, self.cfg['queue']['submit_interval'] - (time.monotonic() - start))