Added possibility to specify a maximum number of jobs on batch #4

Open
wants to merge 4 commits into base: htcondor
86 changes: 61 additions & 25 deletions Manager.py
@@ -19,7 +19,7 @@
import xml.sax


# takes care of looking into qstat
class pidWatcher(object):
def __init__(self,subInfo):
self.pidStates = {}
@@ -61,7 +61,7 @@ def __init__(self,subInfo):
self.pidStates.update({jobid:item["JobStatus"]})

def check_pidstatus(self, pid, debug=False):
if '.' not in str(pid): pid = pid + '.0'
if pid not in self.pidStates:
return -1
state = str(self.pidStates[pid])
@@ -98,20 +98,22 @@ def __init__(self,options,header,workdir):
self.merge = MergeManager(options.add,options.forceMerge,options.waitMerge,options.addNoTree)
self.subInfo = [] #information about the submission status
self.deadJobs = 0 #check if no file has been written to disk and nothing is on running on the batch
self.totalFiles = 0
self.missingFiles = -1
self.move_cursor_up_cmd = None # pretty print status
self.stayAlive = 0 # loop counter to see if program is running
self.numOfResubmit =0
self.watch = None
self.batchJobs = 0
self.printString = []
self.keepGoing = options.keepGoing
self.exitOnQuestion = options.exitOnQuestion
self.outputstream = self.workdir+'/Stream_'
#read xml file and do the magic
def process_jobs(self,InputData,Job):
jsonhelper = HelpJSON(self.workdir+'/SubmissinInfoSave.p')
number_of_processes = len(InputData)
self.update_BatchInfo()
gc.disable()
for process in xrange(number_of_processes):
found = None
@@ -135,16 +137,19 @@ def process_jobs(self,InputData,Job):
#the used function should soon return the pid of the job for killing and knowing if something failed
def submit_jobs(self,OutputDirectory,nameOfCycle):
for process in self.subInfo:
proceed = self.check_BatchInfo(process.numberOfFiles)
if not proceed:
return -1
process.startingTime = time.time()
process.arrayPid = submit_qsub(process.numberOfFiles,self.outputstream+str(process.name),str(process.name),self.workdir)
print 'Submitted jobs',process.name, 'pid', process.arrayPid
process.reachedBatch = [False]*process.numberOfFiles
if process.status != 0:
process.status = 0
process.pids=[process.arrayPid+'.'+str(i) for i in range(process.numberOfFiles)]
# if any(process.pids):
# process.pids = ['']*process.numberOfFiles
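# Example (hypothetical ids): for an HTCondor cluster id '1234' and 3 files,
# process.pids becomes ['1234.0','1234.1','1234.2'], matching the
# '<cluster>.<proc>' format that pidWatcher.check_pidstatus expects.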
#resubmit the jobs see above
def resubmit_jobs(self):
qstat_out = self.watch.parserWorked
ask = True
@@ -161,17 +166,20 @@ def resubmit_jobs(self):
exit(-1)
ask = False
if batchstatus != 1:
proceed = self.check_BatchInfo(1)
if not proceed:
return -1
process.pids[it-1] = resubmit(self.outputstream+process.name,process.name+'_'+str(it),self.workdir,self.header)
#print 'Resubmitted job',process.name,it, 'pid', process.pids[it-1]
self.printString.append('Resubmitted job '+process.name+' '+str(it)+' pid '+str(process.pids[it-1]))
if process.status != 0: process.status =0
process.reachedBatch[it-1] = False

#see how many jobs finished, were copied to workdir
def check_jobstatus(self, OutputDirectory, nameOfCycle,remove = False, autoresubmit = True):
missing = open(self.workdir+'/missing_files.txt','w+')
waitingFlag_autoresub = False
missingRootFiles = 0
ListOfDict =[]
self.watch = pidWatcher(self.subInfo)
ask = True
@@ -181,14 +189,14 @@ def check_jobstatus(self, OutputDirectory, nameOfCycle,remove = False, autoresub
rootFiles =0
self.subInfo[i].missingFiles = []
for it in range(process.numberOfFiles):
if process.jobsDone[it]:
rootFiles+=1
continue
#have a look at the pids with qstat
batchstatus = self.watch.check_pidstatus(process.pids[it])
#kill batchjobs with error otherwise update batchinfo
batchstatus = process.process_batchStatus(batchstatus,it)
#check if files have arrived
filename = OutputDirectory+'/'+self.workdir+'/'+nameOfCycle+'.'+process.data_type+'.'+process.name+'_'+str(it)+'.root'
#if process.jobsRunning[it]:
#print filename, os.path.exists(filename), process.jobsRunning[it], process.jobsDone[it], process.arrayPid, process.pids[it]
@@ -200,12 +208,12 @@ def check_jobstatus(self, OutputDirectory, nameOfCycle,remove = False, autoresub
missingRootFiles +=1
else:
rootFiles+=1
#auto resubmit if a job dies; make sure there was some job before, and warn the user if more than 10% of jobs die
#print process.name,'batch status',batchstatus, 'process.reachedBatch',process.reachedBatch, 'process status',process.status,'resubmit counter',process.resubmit[it], 'resubmit active',autoresubmit
if (
process.notFoundCounter[it] > 5 and
not process.jobsRunning[it] and
not process.jobsDone[it] and
process.reachedBatch[it] and
(process.resubmit[it] ==-1 or process.resubmit[it]>0) and
(process.pids[it] or process.arrayPid) and
@@ -229,7 +237,7 @@ def check_jobstatus(self, OutputDirectory, nameOfCycle,remove = False, autoresub
self.printString.append('AutoResubmitted job '+process.name+' '+str(it)+' pid '+str(process.pids[it]))
#time.sleep(5)
process.reachedBatch[it] = False
if process.resubmit[it] > 0 :
process.resubmit[it] -= 1
self.numOfResubmit +=1
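# Note (inferred from the checks above): resubmit[it] == -1 apparently means
# 'retry without limit', while positive values count down one per automatic resubmission.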
# final status updates
@@ -255,7 +263,7 @@ def check_jobstatus(self, OutputDirectory, nameOfCycle,remove = False, autoresub
missing.close()
except IOError as e:
print "I/O error({0}): {1}".format(e.errno, e.strerror)

self.missingFiles = missingRootFiles
#Save/update pids and other information to json file, such that it can be loaded and used later
try:
@@ -265,9 +273,9 @@ def check_jobstatus(self, OutputDirectory, nameOfCycle,remove = False, autoresub
except IOError as e:
print "I/O error({0}): {1}".format(e.errno, e.strerror)
if(waitingFlag_autoresub): time.sleep(5)


#print status of jobs
def print_status(self):
if not self.move_cursor_up_cmd:
self.move_cursor_up_cmd = '\x1b[1A\x1b[2K'*(len(self.subInfo) + 3)
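# '\x1b[1A' moves the cursor up one line and '\x1b[2K' erases it; one
# repetition per status line (plus 3 header lines) lets the block redraw in place.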
@@ -276,14 +284,14 @@ def print_status(self):
else:
print self.move_cursor_up_cmd
#time.sleep(.1) # 'blink'

for item in self.printString:
print item
self.printString = []

stayAliveArray = ['|','/','-','\\']
if self.stayAlive < 3:
self.stayAlive +=1
else:
self.stayAlive = 0

@@ -297,7 +305,7 @@ def print_status(self):
readyFiles += process.rootFileCounter
print 'Number of files: ',readyFiles,'/',self.totalFiles,'(%.3i)' % (100*(1-float(readyFiles)/float(self.totalFiles))),stayAliveArray[self.stayAlive],stayAliveArray[self.stayAlive]
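# NB: the printed percentage is 100*(1 - readyFiles/totalFiles), i.e. the
# share of files still missing, not the share already finished.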
print '='*80

#take care of merging
def merge_files(self,OutputDirectory,nameOfCycle,InputData):
self.merge.merge(OutputDirectory,nameOfCycle,self.subInfo,self.workdir,InputData,self.outputstream)
@@ -311,6 +319,34 @@ def get_subInfoFinish(self):
return False
return True

# update the current number of jobs the user has in condor_q
def update_BatchInfo(self):
status = '' # make sure 'status' is defined even if condor_status fails
try:
proc_queryHTCStatus = subprocess.Popen(['condor_status','--submitters'],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
status = proc_queryHTCStatus.communicate()[0]
except Exception:
print 'Not able to fetch the number of user jobs on the batch with condor_status'
jobs = {'R':0,'I':0,'H':0}
for l in status.split('\n'):
if os.environ['USER'] in l and 'sched' in l:
jobs['R'] += int(l.split()[2])
jobs['I'] += int(l.split()[3])
jobs['H'] += int(l.split()[4])

self.batchJobs = jobs['R']+jobs['I']+jobs['H']
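# Example (hypothetical condor_status --submitters output line; the column
# layout Name/Machine/Running/Idle/Held is assumed here):
#   jdoe@schedd01.example.com schedd01.example.com 120 30 2
# would be counted as R=120, I=30, H=2, giving self.batchJobs = 152.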

# check whether the user is allowed to submit the attempted number of jobs
def check_BatchInfo(self, numberOfJobs):
if(self.header.BatchJobLimit>0):
self.update_BatchInfo()
freeSlots = self.header.BatchJobLimit - self.batchJobs
if (freeSlots - numberOfJobs) <= 0:
print 'You currently have %i jobs on HTCondor (of max. %i), but you are trying to submit %i additional jobs while only %i slots are free.'%(self.batchJobs,self.header.BatchJobLimit,numberOfJobs,freeSlots)
print 'Adjust the job splitting or try again once you have fewer jobs on HTCondor.'
print 'Nothing will be (re-)submitted at the moment.'
return False
return True
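# Example (assuming BatchJobLimit=5000): with 4990 jobs already queued,
# check_BatchInfo(20) computes freeSlots=10, prints the warning and returns
# False; check_BatchInfo(5) returns True and submission proceeds.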

#class to take care of merging (maybe rethink design)
class MergeManager(object):
def __init__(self,add,force,wait,onlyhist=False):
@@ -327,8 +363,8 @@ def get_mergerStatus(self):
return False

def merge(self,OutputDirectory,nameOfCycle,info,workdir,InputData,outputdir):
if not self.add and not self.force and not self.onlyhist: return
#print "Don't worry, you are using nice = 10"
OutputTreeName = ""
for inputObj in InputData:
for mylist in inputObj.io_list.other:
@@ -342,7 +378,7 @@ def merge(self,OutputDirectory,nameOfCycle,info,workdir,InputData,outputdir):
if (not os.path.exists(OutputDirectory+'/'+nameOfCycle+'.'+process.data_type+'.'+process.name+'.root') and all(process.jobsDone) and process.status !=2 ) or self.force:
self.active_process.append(add_histos(OutputDirectory,nameOfCycle+'.'+process.data_type+'.'+process.name,process.numberOfFiles,workdir,OutputTreeName,self.onlyhist,outputdir+process.name))
process.status = 2
#elif process.status !=2:
# process.status = 3

def wait_till_finished(self):
7 changes: 3 additions & 4 deletions io_func.py
@@ -132,6 +132,7 @@ def __init__(self,xmlfile):
self.AutoResubmit =0
self.MaxJobsPerProcess = -1
self.RemoveEmptyFileSplit = False
self.BatchJobLimit = 5000
while '<JobConfiguration' not in line:
self.header.append(line)
line = f.readline()
@@ -145,6 +146,8 @@ def __init__(self,xmlfile):
self.MaxJobsPerProcess = int(self.ConfigParse.attributes['MaxJobsPerProcess'].value)
if self.ConfigParse.hasAttribute('RemoveEmptyFileSplit'):
self.RemoveEmptyFileSplit = bool(self.ConfigParse.attributes['RemoveEmptyFileSplit'].value)
if self.ConfigParse.hasAttribute('BatchJobLimit'):
self.BatchJobLimit = int(self.ConfigParse.attributes['BatchJobLimit'].value)
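# A hypothetical steering-XML snippet (attribute names taken from the parsing
# above; the other attribute values are made up for illustration):
#   <ConfigParse MaxJobsPerProcess="200" BatchJobLimit="2000" />
# BatchJobLimit is optional and defaults to 5000; a value <= 0 disables the check.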

if 'ConfigSGE' in line:
self.ConfigSGE = parseString(line).getElementsByTagName('ConfigSGE')[0]
@@ -298,7 +301,3 @@ def result_info(Job, path, header, other = []):
outfile.close()

return 1