Skip to content

Commit

Permalink
Merge pull request #48 from cmsdaq/15X-candidate
Browse files Browse the repository at this point in the history
15 x candidate
  • Loading branch information
smorovic committed Sep 29, 2014
2 parents 75602df + 3ad3e7c commit 8746450
Show file tree
Hide file tree
Showing 27 changed files with 1,551 additions and 428 deletions.
17 changes: 17 additions & 0 deletions cgi/report_suspend_cgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/usr/bin/env python2.6
import cgi
import os
form = cgi.FieldStorage()

boxesdir='appliance/boxes/'

print "Content-Type: text/html" # HTML is following
print
print "<TITLE>CGI script suspend</TITLE>"
if "host" not in form:
print "<H1>Error</H1>"
print "Please provide host name "
else:
os.unlink(boxesdir+str(form["host"].value))
print "<H1>file "+os.getcwd()+boxesdir+str(form["host"].value)+" deleted</H1>"

14 changes: 14 additions & 0 deletions cgi/suspend_cgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env python2.6
import cgi
import os
form = cgi.FieldStorage()
print "Content-Type: text/html" # HTML is following
print
print "<TITLE>CGI script suspend</TITLE>"
try:
os.unlink('suspend')
except:
pass
fp = open('suspend','w+')
fp.close()

6 changes: 4 additions & 2 deletions etc/hltd.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[General]
enabled = False
exec_directory = /opt/hltd
user = daqlocal
watch_directory = /fff/data
Expand All @@ -10,11 +11,12 @@ mount_options_ramdisk = rw,noatime,vers=4,rsize=65536,wsize=65536,namlen=255,har
mount_options_output = rw,vers=4,rsize=65536,wsize=65536,namlen=255,hard,proto=tcp,timeo=600,retrans=2,sec=sys
micromerge_output = /fff/BU0/output
delete_run_dir = True
output_adler32 = True

[Monitoring]
use_elasticsearch = True
close_es_index = False
es_cmssw_log_level = WARNING
es_cmssw_log_level = DISABLED
es_hltd_log_level = ERROR

[Web]
Expand All @@ -23,7 +25,7 @@ soap2file_port = 8010

[Resources]
resource_base = /etc/appliance/resources
resource_use_fraction = 1.0
resource_use_fraction = 0.5

[DQM]
dqm_machine = False
Expand Down
Binary file removed lib/procname.so
Binary file not shown.
82 changes: 65 additions & 17 deletions python/aUtils.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import sys,traceback
import os
import os,stat
import time,datetime
import shutil
import json
import logging

import zlib

from inotifywrapper import InotifyWrapper
import _inotify as inotify


ES_DIR_NAME = "TEMP_ES_DIRECTORY"
UNKNOWN,JSD,STREAM,INDEX,FAST,SLOW,OUTPUT,STREAMERR,STREAMDQMHISTOUTPUT,INI,EOLS,EOR,COMPLETE,DAT,PDAT,PIDPB,PB,CRASH,MODULELEGEND,PATHLEGEND,BOX,BOLS = range(22) #file types
UNKNOWN,JSD,STREAM,INDEX,FAST,SLOW,OUTPUT,STREAMERR,STREAMDQMHISTOUTPUT,INI,EOLS,EOR,COMPLETE,DAT,PDAT,PIDPB,PB,CRASH,MODULELEGEND,PATHLEGEND,BOX,BOLS,HLTRATES,HLTRATESJSD = range(24) #file types
TO_ELASTICIZE = [STREAM,INDEX,OUTPUT,STREAMERR,STREAMDQMHISTOUTPUT,EOLS,EOR,COMPLETE]
TEMPEXT = ".recv"
ZEROLS = 'ls0000'
Expand Down Expand Up @@ -117,9 +117,11 @@ def getFiletype(self,filepath = None):
elif "CRASH" in name and "_PID" in name: return CRASH
elif "EOLS" in name: return EOLS
elif "EOR" in name: return EOR
if ext==".jsd" and name.startswith("HLTRATES"): return HLTRATESJSD
if ext==".jsn":
if STREAMDQMHISTNAME.upper() in name and "_PID" not in name: return STREAMDQMHISTOUTPUT
if "STREAM" in name and "_PID" not in name: return OUTPUT
if name.startswith("HLTRATES"): return HLTRATES
if ext==".pb":
if "_PID" not in name: return PB
else: return PIDPB
Expand All @@ -137,11 +139,12 @@ def getFileHeaders(self):
name,ext = self.name,self.ext
splitname = name.split("_")
if filetype in [STREAM,INI,PDAT,PIDPB,CRASH]: self.run,self.ls,self.stream,self.pid = splitname
elif filetype == SLOW: self.run,self.ls,self.pid = splitname
elif filetype == SLOW: self.run,self.ls,self.pid = splitname #this is wrong
elif filetype == FAST: self.run,self.pid = splitname
elif filetype in [DAT,PB,OUTPUT,STREAMERR,STREAMDQMHISTOUTPUT]: self.run,self.ls,self.stream,self.host = splitname
elif filetype == INDEX: self.run,self.ls,self.index,self.pid = splitname
elif filetype == EOLS: self.run,self.ls,self.eols = splitname
elif filetype == HLTRATES:ftype,self.run,self.ls,self.pid = splitname
else:
self.logger.warning("Bad filetype: %s" %self.filepath)
self.run,self.ls,self.stream = [None]*3
Expand Down Expand Up @@ -182,7 +185,7 @@ def getJsonData(self,filepath = None):

def setJsdfile(self,jsdfile):
self.jsdfile = jsdfile
if self.filetype in [OUTPUT,STREAMDQMHISTOUTPUT,CRASH,STREAMERR]: self.initData()
if self.filetype in [OUTPUT,STREAMDQMHISTOUTPUT,CRASH,STREAMERR,HLTRATES]: self.initData()

def initData(self):
defs = self.definitions
Expand Down Expand Up @@ -225,19 +228,20 @@ def getFieldByName(self,field):
self.logger.warning("bad field request %r in %r" %(field,self.definitions))
return False

def setFieldByName(self,field,value):
def setFieldByName(self,field,value,warning=True):
index,ftype = self.getFieldIndex(field)
data = self.data["data"]
if index > -1:
data[index] = value
return True
else:
self.logger.warning("bad field request %r in %r" %(field,self.definitions))
if warning==True:
self.logger.warning("bad field request %r in %r" %(field,self.definitions))
return False

#get definitions from jsd file
def getDefinitions(self):
if self.filetype == STREAM:
if self.filetype in [STREAM,HLTRATES]:
self.jsdfile = self.data["definition"]
elif not self.jsdfile:
self.logger.warning("jsd file not set")
Expand All @@ -259,30 +263,36 @@ def deleteFile(self):
return False
return True

def moveFile(self,newpath,copy = False):
if not self.exists(): return True
def moveFile(self,newpath,copy = False,adler32=False):
checksum=1
if not self.exists(): return True,checksum
oldpath = self.filepath
newdir = os.path.dirname(newpath)

if not os.path.exists(oldpath): return False
if not os.path.exists(oldpath):
self.logger.error("Source path does not exist: " + oldpath)
return False,checksum

self.logger.info("%s -> %s" %(oldpath,newpath))
retries = 5
newpath_tmp = newpath+TEMPEXT
while True:
try:
if not os.path.isdir(newdir): os.makedirs(newdir)
if copy: shutil.copy(oldpath,newpath_tmp)
else:
shutil.move(oldpath,newpath_tmp)

if adler32:checksum=self.moveFileAdler32(oldpath,newpath_tmp,copy)
else:
if copy: shutil.copy(oldpath,newpath_tmp)
else:
shutil.move(oldpath,newpath_tmp)
break

except (OSError,IOError),e:
self.logger.exception(e)
retries-=1
if retries == 0:
self.logger.error("Failure to move file "+str(oldpath)+" to "+str(newpath_tmp))
return False
return False,checksum
else:
time.sleep(0.5)
retries = 5
Expand All @@ -297,14 +307,50 @@ def moveFile(self,newpath,copy = False):
retries-=1
if retries == 0:
self.logger.error("Failure to rename the temporary file "+str(newpath_tmp)+" to "+str(newpath))
return False
return False,checksum
else:
time.sleep(0.5)

self.filepath = newpath
self.getFileInfo()
return True
return True,checksum

#move file (works only on src as file, not directory)
def moveFileAdler32(self,src,dst,copy):

if os.path.isdir(src):
raise Error("source `%s` is a directory")

if os.path.isdir(dst):
dst = os.path.join(dst, os.path.basename(src))

try:
if os.path.samefile(src, dst):
raise Error("`%s` and `%s` are the same file" % (src, dst))
except OSError:
pass

#initial adler32 value
adler32c=1
#calculate checksum on the fly
with open(src, 'rb') as fsrc:
with open(dst, 'wb') as fdst:

length=16*1024
while 1:
buf = fsrc.read(length)
if not buf:
break
adler32c=zlib.adler32(buf,adler32c)
fdst.write(buf)

#copy mode bits on the destionation file
st = os.stat(src)
mode = stat.S_IMODE(st.st_mode)
os.chmod(dst, mode)

if copy==False:os.unlink(src)
return adler32c

def exists(self):
return os.path.exists(self.filepath)
Expand Down Expand Up @@ -431,4 +477,6 @@ def action_cat(self,data1,data2):
elif data2: return str(data2)
else: return ""

def action_adler32(self,data1,data2):
return "-1"

27 changes: 24 additions & 3 deletions python/anelastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,13 +554,33 @@ def checkClosure(self):
for datfile in datfilelist:
if datfile.stream == stream:
newfilepath = os.path.join(self.outdir,datfile.run,datfile.basename)
datfile.moveFile(newfilepath)
(filestem,ext)=os.path.splitext(datfile.filepath)
checksum_file = filestem+'.checksum'
doChecksum=conf.output_adler32
checksum_cmssw=None
try:
with open(checksum_file,"r") as fi:
checksum_cmssw = int(fi.read())
except:
doChecksum=False
pass
(status,checksum)=datfile.moveFile(newfilepath,adler32=doChecksum)
if doChecksum and status:
if checksum_cmssw!=checksum&0xffffffff:
self.logger.fatal("checksum mismatch for "+ datfile.filepath + " expected:" + str(checksum_cmssw) + " got:" + str(checksum))
if checksum_cmssw!=None:
outfile.setFieldByName("FileAdler32",str(checksum_cmssw&0xffffffff))
outfile.writeout()
try:
os.unlink(filestem+'.checksum')
except:pass
self.datfileList.remove(datfile)

#move output file in rundir
#move output json file in rundir
newfilepath = os.path.join(self.outdir,outfile.run,outfile.basename)
outfile.esCopy()
if outfile.moveFile(newfilepath):
result,checksum=outfile.moveFile(newfilepath)
if result:
self.outfileList.remove(outfile)


Expand Down Expand Up @@ -591,6 +611,7 @@ def checkClosure(self):
numErr = errfile.getFieldByName("ErrorEvents") or 0
total = self.totalEvent
errfile.setFieldByName("Processed", str(total - numErr) )
errfile.setFieldByName("FileAdler32", "-1", warning=False)
errfile.writeout()
newfilepath = os.path.join(self.outdir,errfile.run,errfile.basename)
errfile.moveFile(newfilepath)
Expand Down
Loading

0 comments on commit 8746450

Please sign in to comment.