Skip to content

Commit

Permalink
Merge branch 'hltd-161-candidate'
Browse files Browse the repository at this point in the history
  • Loading branch information
smorovic committed Feb 19, 2015
2 parents 28e3a55 + 1c73fbb commit 7e68482
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 16 deletions.
2 changes: 1 addition & 1 deletion python/aUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ def mergeAndMoveJsnDataMaybe(self,outDir, removeInput=True):
self.setFieldByName("FileAdler32","-1")
self.writeout()
jsndatFile = fileHandler(outfile)
jsndatFile.moveFile(os.path.join(outDir, os.path.basename(outfile)),adler32=False)
jsndatFile.moveFile(os.path.join(outDir, os.path.basename(outfile)),adler32=False,createDestinationDir=False)
except Exception as ex:
self.logger.error("Unable to copy jsonStream data file "+str(outfile)+" to output.")
self.logger.exception(ex)
Expand Down
53 changes: 46 additions & 7 deletions python/anelastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self,mr,tempdir,outdir,run_number):
self.maxQueuedLumi=0
self.maxReceivedEoLS=0
self.maxClosedLumi=0
self.iniReceived=False

def join(self, stop=False, timeout=None):
if stop: self.stop()
Expand Down Expand Up @@ -86,6 +87,21 @@ def run(self):
if endTimeout==0: break
endTimeout-=1

if self.iniReceived==False:
try:
os.stat(rawinputdir)
except OSError as ex:
try:
time.sleep(.5)
os.stat(rawinputdir)
except:
#no such file or directory
if ex.errno==2:
self.logger.error('Starting run with input directory missing. anelastic script will die.')
os._exit(1)
except:
pass

if self.checkClosure()==False:
self.logger.error('not all lumisections were closed on exit!')
try:
Expand Down Expand Up @@ -118,7 +134,9 @@ def process(self):
self.jsdfile=self.infile.filepath
elif filetype == COMPLETE:
self.processCompleteFile()
elif filetype == INI: self.processINIfile()
elif filetype == INI:
self.processINIfile()
self.iniReceived=True
elif not self.firstStream.isSet():
self.buffer.append(self.infile)
if filetype == EOLS:
Expand Down Expand Up @@ -219,7 +237,7 @@ def processINIfile(self):
self.activeStreams.append(stream)
self.streamCounters[stream]=0
self.infile.moveFile(newpath = localfilepath)
self.infile.moveFile(newpath = remotefilepath,copy = True)
self.infile.moveFile(newpath = remotefilepath,copy = True,createDestinationDir=False,missingDirAlert=False)
else:
self.logger.debug("compare %s , %s " %(localfilepath,filepath))
if not filecmp.cmp(localfilepath,filepath,False):
Expand All @@ -242,13 +260,14 @@ def processDefinitionFile(self):
return
except:
pass
self.infile.moveFile(newpath,copy = True,adler32=False,silent=True)
self.infile.moveFile(newpath,copy = True,adler32=False,silent=True,createDestinationDir=False,missingDirAlert=False)
#delete as we will use the one without pid
try:os.unlink(oldpath)
except:pass
else:
#name with pid: copy to output
self.infile.moveFile(os.path.join(outputDir,run,self.infile.basename),copy = True,adler32=False,silent=True)
self.infile.moveFile(os.path.join(outputDir,run,self.infile.basename),copy = True,adler32=False,
silent=True,createDestinationDir=False,missingDirAlert=False)

def createEOLSFile(self,ls):
eolname = os.path.join(self.tempdir,'run'+self.run_number.zfill(conf.run_number_padding)+"_"+ls+"_EoLS.jsn")
Expand Down Expand Up @@ -614,7 +633,7 @@ def checkClosure(self):
except:pass
doChecksum=False
else:
(status,checksum)=datfile.moveFile(newfilepath,adler32=doChecksum)
(status,checksum)=datfile.moveFile(newfilepath,adler32=doChecksum,createDestinationDir=False,missingDirAlert=True)
checksum_success=True
if doChecksum and status:
if checksum_cmssw!=checksum&0xffffffff:
Expand Down Expand Up @@ -652,7 +671,7 @@ def checkClosure(self):
if outfile.mergeAndMoveJsnDataMaybe(os.path.join(self.outdir,outfile.run))==False:return

outfile.esCopy()
result,checksum=outfile.moveFile(newfilepath)
result,checksum=outfile.moveFile(newfilepath,createDestinationDir=False)
if result:
self.outfileList.remove(outfile)

Expand Down Expand Up @@ -687,7 +706,7 @@ def checkClosure(self):
errfile.setFieldByName("FileAdler32", "-1", warning=False)
errfile.writeout()
newfilepath = os.path.join(self.outdir,errfile.run,errfile.basename)
errfile.moveFile(newfilepath)
errfile.moveFile(newfilepath,createDestinationDir=False)


#close lumisection if all streams are closed
Expand Down Expand Up @@ -875,6 +894,8 @@ def abortMerging(self):
watchDir = os.path.join(conf.watch_directory,dirname)
outputDir = sys.argv[4]

outputRunDir = os.path.join(outputDir,'run'+run_number.zfill(conf.run_number_padding))

dqmHandler = None

mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO # watched events
Expand All @@ -897,6 +918,24 @@ def abortMerging(self):
mr.register_inotify_path(watchDir,mask)
mr.start_inotify()

#ensuring that output run dir gets created (after inotify init)
attempts = 3
while attempts>0:
attempts-=1
try:
os.makedirs(outputRunDir)
logger.info("created "+outputRunDir)
break
except OSError as ex:
if ex.errno == 17: break
logging.exception(ex)
time.sleep(.5)
continue
except Exception as ex:
logging.exception(ex)
time.sleep(.5)
continue

#starting lsRanger thread
ls = LumiSectionRanger(mr,watchDir,outputDir,run_number)
ls.setSource(eventQueue)
Expand Down
17 changes: 12 additions & 5 deletions python/elasticbu.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,17 @@ def createDocMappingsMaybe(self,index_name,mapping):
inmapping = json.loads(res.content)
for indexname in inmapping:
properties = inmapping[indexname]['mappings'][key]['properties']

self.logger.info('checking mapping '+ indexname + '/' + key + ' which has '
+ str(len(mapping[key]['properties'])) + '(index:' + str(len(properties)) + ') entries..')

#should be size 1
for pdoc in mapping[key]['properties']:
if pdoc not in properties:
requests.post(self.ip_url+'/'+index_name+'/'+key+'/_mapping',json.dumps(doc))
break
else:
self.logger.warning('requests error code '+res.status_code+' in mapping request')

def read_line(self,fullpath):
with open(fullpath,'r') as fp:
Expand Down Expand Up @@ -299,11 +305,11 @@ def elasticize_eols(self,infile):
basename = infile.basename
self.logger.info(basename)
data = infile.data['data']
data.append(infile.mtime)
data.append(infile.ls[2:])
data.insert(0,infile.mtime)
data.insert(0,infile.ls[2:])

values = [int(f) if f.isdigit() else str(f) for f in data]
keys = ["NEvents","NFiles","TotalEvents","fm_date","ls"]
keys = ["ls","fm_date","NEvents","NFiles","TotalEvents","NLostEvents"]
document = dict(zip(keys, values))

document['id'] = infile.name+"_"+os.uname()[1]
Expand Down Expand Up @@ -617,8 +623,9 @@ def checkBoxes(self,dir):
break
runstring = l.split('=')
try:
runs = runstring[1].strip('\n ').split(',')
for run in runs:
runs = runstring[1].strip('\n').split(',')
for rrun in runs:
run = rrun.strip()
if run.isdigit()==False:continue
if int(run)==int(self.nr):
runFound=True
Expand Down
1 change: 1 addition & 0 deletions python/hltd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,7 @@ def __init__(self,nr,dirname,bu_dir,instance):
#polling for HLT menu directory
while os.path.exists(self.menu_directory)==False and conf.dqm_machine==False and conf.role=='fu':
readMenuAttempts+=1
time.sleep(.1)
#10 seconds allowed before defaulting to local configuration
if readMenuAttempts>50: break

Expand Down
3 changes: 2 additions & 1 deletion python/mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@
'ls' :{'type':'integer'},
'NEvents' :{'type':'integer'},
'NFiles' :{'type':'integer'},
'TotalEvents' :{'type':'integer'}
'TotalEvents' :{'type':'integer'},
'NLostEvents' :{'type':'integer'}
},
'_timestamp' : {
'enabled' : True,
Expand Down
Binary file removed rpm/hltd-1.6.1-2.x86_64.rpm
Binary file not shown.
Binary file added rpm/hltd-1.6.2-1.x86_64.rpm
Binary file not shown.
4 changes: 2 additions & 2 deletions scripts/hltdrpm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ cd $TOPDIR
# we are done here, write the specs and make the fu***** rpm
cat > hltd.spec <<EOF
Name: hltd
Version: 1.6.1
Release: 2
Version: 1.6.2
Release: 1
Summary: hlt daemon
License: gpl
Group: DAQ
Expand Down

0 comments on commit 7e68482

Please sign in to comment.