diff --git a/Manager.py b/Manager.py
index e35bf94..5462651 100644
--- a/Manager.py
+++ b/Manager.py
@@ -19,7 +19,7 @@
 import xml.sax
 
 
-# takes care of looking into qstat 
+# takes care of looking into qstat
 class pidWatcher(object):
     def __init__(self,subInfo):
         self.pidStates = {}
@@ -61,7 +61,7 @@ def __init__(self,subInfo):
             self.pidStates.update({jobid:item["JobStatus"]})
 
     def check_pidstatus(self, pid, debug=False):
-        if '.' not in str(pid): pid = pid + '.0' 
+        if '.' not in str(pid): pid = pid + '.0'
         if pid not in self.pidStates:
             return -1
         state = str(self.pidStates[pid])
@@ -98,20 +98,22 @@ def __init__(self,options,header,workdir):
         self.merge = MergeManager(options.add,options.forceMerge,options.waitMerge,options.addNoTree)
         self.subInfo = [] #information about the submission status
         self.deadJobs = 0 #check if no file has been written to disk and nothing is on running on the batch
-        self.totalFiles = 0 
+        self.totalFiles = 0
         self.missingFiles = -1
         self.move_cursor_up_cmd = None # pretty print status
-        self.stayAlive = 0 # loop counter to see if program is running 
+        self.stayAlive = 0 # loop counter to see if program is running
         self.numOfResubmit =0
         self.watch = None
+        self.batchJobs = 0
         self.printString = []
         self.keepGoing = options.keepGoing
         self.exitOnQuestion = options.exitOnQuestion
         self.outputstream = self.workdir+'/Stream_'
-    #read xml file and do the magic 
+    #read xml file and do the magic
     def process_jobs(self,InputData,Job):
         jsonhelper = HelpJSON(self.workdir+'/SubmissinInfoSave.p')
         number_of_processes = len(InputData)
+        self.update_BatchInfo()
         gc.disable()
         for process in xrange(number_of_processes):
             found = None
@@ -135,6 +137,9 @@ def process_jobs(self,InputData,Job):
     #the used function should soon return the pid of the job for killing and knowing if something failed
     def submit_jobs(self,OutputDirectory,nameOfCycle):
         for process in self.subInfo:
+            proceed = self.check_BatchInfo(process.numberOfFiles)
+            if not proceed:
+                return -1
             process.startingTime = time.time()
             process.arrayPid = submit_qsub(process.numberOfFiles,self.outputstream+str(process.name),str(process.name),self.workdir)
             print 'Submitted jobs',process.name, 'pid', process.arrayPid
@@ -142,9 +147,9 @@ def submit_jobs(self,OutputDirectory,nameOfCycle):
             if process.status != 0:
                 process.status = 0
             process.pids=[process.arrayPid+'.'+str(i) for i in range(process.numberOfFiles)]
-            # if any(process.pids): 
+            # if any(process.pids):
             #     process.pids = ['']*process.numberOfFiles
-    #resubmit the jobs see above 
+    #resubmit the jobs see above
     def resubmit_jobs(self):
         qstat_out = self.watch.parserWorked
         ask = True
@@ -161,17 +166,20 @@ def resubmit_jobs(self):
                     exit(-1)
                 ask = False
             if batchstatus != 1:
+                proceed = self.check_BatchInfo(1)
+                if not proceed:
+                    return -1
                 process.pids[it-1] = resubmit(self.outputstream+process.name,process.name+'_'+str(it),self.workdir,self.header)
                 #print 'Resubmitted job',process.name,it, 'pid', process.pids[it-1]
                 self.printString.append('Resubmitted job '+process.name+' '+str(it)+' pid '+str(process.pids[it-1]))
                 if process.status != 0: process.status =0
                 process.reachedBatch[it-1] = False
-    
-    #see how many jobs finished, were copied to workdir 
+
+    #see how many jobs finished, were copied to workdir
     def check_jobstatus(self, OutputDirectory, nameOfCycle,remove = False, autoresubmit = True):
         missing = open(self.workdir+'/missing_files.txt','w+')
         waitingFlag_autoresub = False
-        missingRootFiles = 0 
+        missingRootFiles = 0
         ListOfDict =[]
         self.watch = pidWatcher(self.subInfo)
         ask = True
@@ -181,14 +189,14 @@ def check_jobstatus(self, OutputDirectory, nameOfCycle,remove = False, autoresub
             rootFiles =0
             self.subInfo[i].missingFiles = []
             for it in range(process.numberOfFiles):
-                if process.jobsDone[it]: 
+                if process.jobsDone[it]:
                     rootFiles+=1
                     continue
                 #have a look at the pids with qstat
                 batchstatus = self.watch.check_pidstatus(process.pids[it])
                 #kill batchjobs with error otherwise update batchinfo
                 batchstatus = process.process_batchStatus(batchstatus,it)
-                #check if files have arrived 
+                #check if files have arrived
                 filename = OutputDirectory+'/'+self.workdir+'/'+nameOfCycle+'.'+process.data_type+'.'+process.name+'_'+str(it)+'.root'
                 #if process.jobsRunning[it]:
                 #print filename, os.path.exists(filename), process.jobsRunning[it], process.jobsDone[it], process.arrayPid, process.pids[it]
@@ -200,12 +208,12 @@ def check_jobstatus(self, OutputDirectory, nameOfCycle,remove = False, autoresub
                         missingRootFiles +=1
                 else:
                     rootFiles+=1
-            #auto resubmit if job dies, take care that there was some job before and warn the user if more then 10% of jobs die 
+            #auto resubmit if job dies, take care that there was some job before and warn the user if more than 10% of jobs die
             #print process.name,'batch status',batchstatus, 'process.reachedBatch',process.reachedBatch, 'process status',process.status,'resubmit counter',process.resubmit[it], 'resubmit active',autoresubmit
             if ( process.notFoundCounter[it] > 5 and
                  not process.jobsRunning[it] and
-                 not process.jobsDone[it] and 
+                 not process.jobsDone[it] and
                  process.reachedBatch[it] and
                  (process.resubmit[it] ==-1 or process.resubmit[it]>0) and
                  (process.pids[it] or process.arrayPid) and
@@ -229,7 +237,7 @@ def check_jobstatus(self, OutputDirectory, nameOfCycle,remove = False, autoresub
                 self.printString.append('AutoResubmitted job '+process.name+' '+str(it)+' pid '+str(process.pids[it]))
                 #time.sleep(5)
                 process.reachedBatch[it] = False
-                if process.resubmit[it] > 0 : 
+                if process.resubmit[it] > 0 :
                     process.resubmit[it] -= 1
                 self.numOfResubmit +=1
         # final status updates
@@ -255,7 +263,7 @@ def check_jobstatus(self, OutputDirectory, nameOfCycle,remove = False, autoresub
             missing.close()
         except IOError as e:
             print "I/O error({0}): {1}".format(e.errno, e.strerror)
-        
+
         self.missingFiles = missingRootFiles
         #Save/update pids and other information to json file, such that it can be loaded and used later
         try:
@@ -265,9 +273,9 @@ def check_jobstatus(self, OutputDirectory, nameOfCycle,remove = False, autoresub
         except IOError as e:
             print "I/O error({0}): {1}".format(e.errno, e.strerror)
         if(waitingFlag_autoresub): time.sleep(5)
-        
-        
-    #print status of jobs 
+
+
+    #print status of jobs
     def print_status(self):
         if not self.move_cursor_up_cmd:
             self.move_cursor_up_cmd = '\x1b[1A\x1b[2K'*(len(self.subInfo) + 3)
@@ -276,14 +284,14 @@ def print_status(self):
         else:
             print self.move_cursor_up_cmd
             #time.sleep(.1) # 'blink'
-        
+
         for item in self.printString:
             print item
         self.printString = []
 
         stayAliveArray = ['|','/','-','\\']
         if self.stayAlive < 3:
-            self.stayAlive +=1 
+            self.stayAlive +=1
         else:
             self.stayAlive = 0
 
@@ -297,7 +305,7 @@ def print_status(self):
             readyFiles += process.rootFileCounter
         print 'Number of files: ',readyFiles,'/',self.totalFiles,'(%.3i)' % (100*(1-float(readyFiles)/float(self.totalFiles))),stayAliveArray[self.stayAlive],stayAliveArray[self.stayAlive]
         print '='*80
-    
+
     #take care of merging
     def merge_files(self,OutputDirectory,nameOfCycle,InputData):
         self.merge.merge(OutputDirectory,nameOfCycle,self.subInfo,self.workdir,InputData,self.outputstream)
@@ -311,6 +319,35 @@ def get_subInfoFinish(self):
                 return False
         return True
 
+    # update the current number of the user's jobs on the batch (queried via condor_status)
+    def update_BatchInfo(self):
+        try:
+            proc_queryHTCStatus = subprocess.Popen(['condor_status','--submitters'],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
+            status = proc_queryHTCStatus.communicate()[0]
+        except Exception as e:
+            print 'Not able to fetch the number of user jobs on the batch with condor_status:', e
+            return
+        jobs = {'R':0,'I':0,'H':0}
+        for l in status.split('\n'):
+            if os.environ['USER'] in l and 'sched' in l:
+                jobs['R'] += int(l.split()[2])
+                jobs['I'] += int(l.split()[3])
+                jobs['H'] += int(l.split()[4])
+
+        self.batchJobs = jobs['R']+jobs['I']+jobs['H']
+
+    # check if the user is allowed to submit the attempted number of jobs
+    def check_BatchInfo(self, numberOfJobs):
+        if self.header.BatchJobLimit > 0:
+            self.update_BatchInfo()
+            freeSlots = self.header.BatchJobLimit - self.batchJobs
+            if (freeSlots - numberOfJobs) <= 0:
+                print 'You currently have %i jobs on HTCondor (of max. %i), and you are trying to submit %i additional jobs with only %i free slots left.'%(self.batchJobs,self.header.BatchJobLimit,numberOfJobs,freeSlots)
+                print 'Adjust the job splitting or try again once you have fewer jobs on HTCondor.'
+                print 'Nothing will be (re-)submitted at this moment.'
+                return False
+        return True
+
 #class to take care of merging (maybe rethink design)
 class MergeManager(object):
     def __init__(self,add,force,wait,onlyhist=False):
@@ -327,8 +364,8 @@ def get_mergerStatus(self):
             return False
 
     def merge(self,OutputDirectory,nameOfCycle,info,workdir,InputData,outputdir):
-        if not self.add and not self.force and not self.onlyhist: return 
-        #print "Don't worry your are using nice = 10" 
+        if not self.add and not self.force and not self.onlyhist: return
+        #print "Don't worry your are using nice = 10"
         OutputTreeName = ""
         for inputObj in InputData:
             for mylist in inputObj.io_list.other:
@@ -342,7 +379,7 @@ def merge(self,OutputDirectory,nameOfCycle,info,workdir,InputData,outputdir):
             if (not os.path.exists(OutputDirectory+'/'+nameOfCycle+'.'+process.data_type+'.'+process.name+'.root') and all(process.jobsDone) and process.status !=2 ) or self.force:
                 self.active_process.append(add_histos(OutputDirectory,nameOfCycle+'.'+process.data_type+'.'+process.name,process.numberOfFiles,workdir,OutputTreeName,self.onlyhist,outputdir+process.name))
                 process.status = 2
-            #elif process.status !=2: 
+            #elif process.status !=2:
             #    process.status = 3
 
     def wait_till_finished(self):
diff --git a/io_func.py b/io_func.py
index a6440a8..bfd4157 100644
--- a/io_func.py
+++ b/io_func.py
@@ -132,6 +132,7 @@ def __init__(self,xmlfile):
         self.AutoResubmit =0
         self.MaxJobsPerProcess = -1
         self.RemoveEmptyFileSplit = False
+        self.BatchJobLimit = 5000
         while '
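
Review note on the new update_BatchInfo: it shells out to condor_status and parses the submitter table, so it depends on the usual column layout (name and machine first, then the running, idle and held counts in columns three to five). Below is a minimal standalone sketch of that parsing under this assumed layout; the sample table, the user name and the count_jobs helper are invented for illustration and are not part of the patch.

# Sketch of the parsing done in update_BatchInfo; the table below only mimics
# the assumed condor_status submitter layout and is not real pool output.
sample_status = (
    'Name                    Machine  RunningJobs IdleJobs HeldJobs\n'
    'someuser@sched1.example sched1           120       30        0\n'
    'someuser@sched2.example sched2            15        5        2\n'
)

def count_jobs(status, user):
    jobs = {'R': 0, 'I': 0, 'H': 0}
    for l in status.split('\n'):
        # same line selection as the patch: the user's name plus a scheduler host
        if user in l and 'sched' in l:
            jobs['R'] += int(l.split()[2])
            jobs['I'] += int(l.split()[3])
            jobs['H'] += int(l.split()[4])
    return jobs

jobs = count_jobs(sample_status, 'someuser')
print(jobs['R'] + jobs['I'] + jobs['H'])   # -> 172

One thing to keep in mind: the 'sched' substring filter assumes the scheduler host names contain it; a pool whose schedds are named differently would silently report zero jobs.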
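
check_BatchInfo then gates every submit and resubmit on this total, compared against the BatchJobLimit that io_func.py now defaults to 5000. A short worked example of the free-slot arithmetic; the job counts here are hypothetical, only the default limit comes from the patch.

# Worked example of the check_BatchInfo gate.
BatchJobLimit = 5000    # default set in io_func.py by this patch
batchJobs     = 4900    # jobs the user already has on HTCondor (hypothetical)
numberOfJobs  = 150     # size of the array job about to be submitted (hypothetical)

freeSlots = BatchJobLimit - batchJobs       # 100
blocked = (freeSlots - numberOfJobs) <= 0   # True: 100 - 150 = -50, submission refused
print(blocked)

Because the comparison is <= 0 rather than < 0, a submission that would exactly fill the limit is also refused, so at least one slot always stays free; setting BatchJobLimit to zero or a negative value disables the check entirely.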