From aeac16d1dea518068c82d4a04bafe3bbb459c65d Mon Sep 17 00:00:00 2001
From: Steffen Albrecht
Date: Sat, 12 Oct 2019 12:40:55 +0200
Subject: [PATCH 1/3] Added possibility to specify a maximum number of jobs on
 batch

---
 Manager.py | 37 +++++++++++++++++++++++++++++++++++++
 io_func.py |  7 +++----
 2 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/Manager.py b/Manager.py
index ba8c1c5..14ef970 100644
--- a/Manager.py
+++ b/Manager.py
@@ -97,6 +97,7 @@ def __init__(self,options,header,workdir):
         self.stayAlive = 0 # loop counter to see if program is running
         self.numOfResubmit =0
         self.watch = None
+        self.batchJobs = 0
         self.printString = []
         self.keepGoing = options.keepGoing
         self.exitOnQuestion = options.exitOnQuestion
@@ -105,6 +106,7 @@ def __init__(self,options,header,workdir):
     def process_jobs(self,InputData,Job):
         jsonhelper = HelpJSON(self.workdir+'/SubmissinInfoSave.p')
         number_of_processes = len(InputData)
+        self.update_BatchInfo()
         gc.disable()
         for process in xrange(number_of_processes):
             found = None
@@ -128,6 +130,9 @@ def process_jobs(self,InputData,Job):
     #the used function should soon return the pid of the job for killing and knowing if something failed
     def submit_jobs(self,OutputDirectory,nameOfCycle):
         for process in self.subInfo:
+            proceed = self.check_BatchInfo(process.numberOfFiles)
+            if not proceed:
+                return -1
             process.startingTime = time.time()
             process.arrayPid = submit_qsub(process.numberOfFiles,self.outputstream+str(process.name),str(process.name),self.workdir)
             print 'Submitted jobs',process.name, 'pid', process.arrayPid
@@ -154,6 +159,9 @@ def resubmit_jobs(self):
                     exit(-1)
                 ask = False
             if batchstatus != 1:
+                proceed = self.check_BatchInfo(1)
+                if not proceed:
+                    return -1
                 process.pids[it-1] = resubmit(self.outputstream+process.name,process.name+'_'+str(it),self.workdir,self.header)
                 #print 'Resubmitted job',process.name,it, 'pid', process.pids[it-1]
                 self.printString.append('Resubmitted job '+process.name+' '+str(it)+' pid '+str(process.pids[it-1]))
@@ -303,6 +311,35 @@ def get_subInfoFinish(self):
                 return False
         return True

+    # update current number of jobs in condor_q
+    def update_BatchInfo(self):
+        try:
+            proc_queryHTCStatus = subprocess.Popen(['condor_status','--submitters'],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
+            status = proc_queryHTCStatus.communicate()[0]
+        except Exception as e:
+            print 'Not able to fetch number of users jobs on batch with condor_status'
+        jobs = {'R':0,'I':0,'H':0}
+        for l in status.split('\n'):
+            if os.environ['USER'] in l and 'sched' in l:
+                jobs['R'] += int(l.split()[2])
+                jobs['I'] += int(l.split()[3])
+                jobs['H'] += int(l.split()[4])
+
+        self.batchJobs = jobs['R']+jobs['I']+jobs['H']
+
+    # check if user is allowed to submit attempted number of jobs
+    def check_BatchInfo(self, numberOfJobs):
+        if(self.header.BatchJobLimit>0):
+            self.update_BatchInfo()
+            freeSlots = self.header.BatchJobLimit - self.batchJobs
+            if (freeSlots - numberOfJobs) <= 0:
+                print 'You currently have %i jobs on HTCondor (of max. %i), and you are trying to submit more than %i additional jobs.'%(self.batchJobs,self.header.BatchJobLimit,freeSlots)
+                print 'Adjust the jobsplitting or try again, once you have fewer jobs on HTCondor.'
+                print 'Nothing will be (re-)submitted at this moment.'
+                return False
+        else:
+            return True
+
 #class to take care of merging (maybe rethink design)
 class MergeManager(object):
     def __init__(self,add,force,wait,onlyhist=False):
diff --git a/io_func.py b/io_func.py
index a6440a8..bfd4157 100644
--- a/io_func.py
+++ b/io_func.py
@@ -132,6 +132,7 @@ def __init__(self,xmlfile):
         self.AutoResubmit =0
         self.MaxJobsPerProcess = -1
         self.RemoveEmptyFileSplit = False
+        self.BatchJobLimit = 5000
         while '

Date: Mon, 14 Oct 2019 13:19:59 +0200
Subject: [PATCH 2/3] fixed return of check_BatchInfo

---
 Manager.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/Manager.py b/Manager.py
index 14ef970..6c4be73 100644
--- a/Manager.py
+++ b/Manager.py
@@ -337,8 +337,7 @@ def check_BatchInfo(self, numberOfJobs):
             print 'Adjust the jobsplitting or try again, once you have fewer jobs on HTCondor.'
             print 'Nothing will be (re-)submitted at this moment.'
             return False
-        else:
-            return True
+        return True

 #class to take care of merging (maybe rethink design)
 class MergeManager(object):

From 559ae9913447317cfa239e73afe2b766bb5ad2d9 Mon Sep 17 00:00:00 2001
From: Steffen
Date: Mon, 14 Oct 2019 14:59:28 +0200
Subject: [PATCH 3/3] fixed bug that occurred when you had jobs running and ran
 sframe_batch without submitting anything

---
 Manager.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/Manager.py b/Manager.py
index 6c4be73..1dfc0d3 100644
--- a/Manager.py
+++ b/Manager.py
@@ -39,8 +39,10 @@ def __init__(self,subInfo):
             print 'Going to wait for 5 minutes, lets see if condor_q will start to work again.'
             time.sleep(300)
             return
+        ListOfPids = [subInfo[k].arrayPid for k in range(len(subInfo))]
+        if(any(i==-1 for i in ListOfPids)):
+            self.parserWorked=False
         if self.parserWorked:
-            ListOfPids = [subInfo[k].arrayPid for k in range(len(subInfo))]
             # adding Pids of resubmitted jobs
             for process in subInfo:
                 for pid in process.pids:
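
What the series adds, in short: before any (re-)submission, the Manager counts the user's
running, idle and held jobs via 'condor_status --submitters' and refuses to submit once a
configurable limit (self.header.BatchJobLimit, default 5000 in io_func.py) would be exceeded.
Below is a rough, standalone Python 3 sketch of that check, not taken from the patches
themselves; it assumes the same condor_status output layout the patch parses (running/idle/held
counts in columns 2-4 of the user's submitter line) and reflects the return-value fix from
[PATCH 2/3]. The names count_batch_jobs and may_submit are illustrative only.

import os
import subprocess

def count_batch_jobs():
    # Count this user's running + idle + held jobs from 'condor_status --submitters',
    # using the same columns and line filter the patch uses (2: running, 3: idle, 4: held).
    try:
        out = subprocess.run(['condor_status', '--submitters'],
                             capture_output=True, text=True, check=True).stdout
    except (OSError, subprocess.CalledProcessError):
        # Unlike update_BatchInfo above (where a failed call leaves 'status' undefined),
        # fall back to a count of 0 here instead of raising.
        print('Not able to fetch the number of jobs on the batch with condor_status')
        return 0
    user = os.environ.get('USER', '')
    total = 0
    for line in out.splitlines():
        if user and user in line and 'sched' in line:
            cols = line.split()
            total += int(cols[2]) + int(cols[3]) + int(cols[4])
    return total

def may_submit(number_of_jobs, batch_job_limit=5000):
    # Mirror check_BatchInfo (after [PATCH 2/3]): allow submission only if the requested
    # jobs still fit under the limit; a limit <= 0 disables the check entirely.
    if batch_job_limit <= 0:
        return True
    free_slots = batch_job_limit - count_batch_jobs()
    if free_slots - number_of_jobs <= 0:
        print('Trying to submit %i jobs, but only %i of %i slots are free.'
              % (number_of_jobs, free_slots, batch_job_limit))
        return False
    return True

if __name__ == '__main__':
    # Example: check whether 100 more jobs could be submitted right now.
    print(may_submit(100))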