"""
__removeDupJobAccountant.py__

This script fetches the last 1000 lines of the JobAccountant ComponentLog and
parses them looking for Report.*.pkl files.
Then it loads each of those pickle files and creates a list of all their
output files.
Next, it loads all LFNs from the wmbs_file_details table.
Finally, it intersects the pickle output files with the files in the wmbs
table, printing out the files that already exist in the database and the
names of the pickle files that contain the same LFNs.
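
Typical usage (assumed, not enforced by the script): run it directly on the
WMAgent node, with WMCore available in the python environment, e.g.
    python removeDupJobAccountant.py
WMAGENT_CONFIG and the 'manage' environment variables get default values in
main() if they are not already exported.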

Created on Apr 29, 2015.
@author: amaltaro
"""

from __future__ import print_function

import sys
import os
import subprocess
import threading
import logging
import json
from pprint import pformat

from WMCore.FwkJobReport.Report import Report
from WMCore.WMInit import connectToDB
from WMCore.Database.DBFormatter import DBFormatter


def main():
    """
    _main_
    """
    if 'WMAGENT_CONFIG' not in os.environ:
        os.environ['WMAGENT_CONFIG'] = '/data/srv/wmagent/current/config/wmagent/config.py'
    if 'manage' not in os.environ:
        os.environ['manage'] = '/data/srv/wmagent/current/config/wmagent/manage'

    ### Fetch the report pickle files from the component log
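    # NOTE: the parsing below assumes that every ComponentLog line mentioning
    # 'install/wmagentpy3/JobCreator/JobCache' carries the full path of the
    # Report.*.pkl file as its third whitespace-separated token (split()[2]).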
    command = ["tail", "-n1000", "/data/srv/wmagent/current/install/wmagentpy3/JobAccountant/ComponentLog"]
    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", text=True)
    out, err = p.communicate()
    logFiles = [line for line in out.splitlines() if 'install/wmagentpy3/JobCreator/JobCache' in line]
    logFiles = [i.split()[2] for i in logFiles]
    # now make these paths unique
    logFiles = list(set(logFiles))
    msg = "Found %d unique pickle files to parse " % len(logFiles)

    ### Now unpickle each of these files and get their output files
    # also check whether any of them are duplicates
    lfn2PklDict = {}
    dupOutputPkl = {}  # full job report dict plus the duplicate LFNs, keyed by the pickle file path
    jobReport = Report()
    for pklPath in logFiles:
        if not os.path.exists(pklPath):
            continue
        jobReport.load(pklPath)
        for e in jobReport.getAllFiles():
            lfn2PklDict.setdefault(e['lfn'], [])
            lfn2PklDict[e['lfn']].append(pklPath)
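
    # At this point lfn2PklDict maps every output LFN to the list of pickle files
    # that reported it, e.g. (illustrative values only):
    #   {'/store/unmerged/.../file.root': ['/data/.../JobCache/.../Report.0.pkl']}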

    # now check which files contain more than one pickle path (= created by different jobs)
    dupFiles = []
    for lfn, pkls in lfn2PklDict.items():
        if len(pkls) > 1:
            dupFiles.append(lfn)
            for pkl in pkls:
                if pkl not in dupOutputPkl:
                    jobReport.load(pkl)
                    dupOutputPkl[pkl] = jobReport.__to_json__(None)
                    dupOutputPkl[pkl]['dup_lfns'] = []
                dupOutputPkl[pkl]['dup_lfns'].append(lfn)

    msg += "with a total of %d output files and %d duplicated" % (len(lfn2PklDict), len(dupFiles))
    msg += " files to process among them."
    msg += "\nDuplicate files are:\n%s" % dupFiles
    print(msg)

    if dupFiles:
        print("See dupPickles.json for further details ...")
        with open('dupPickles.json', 'w') as fo:
            json.dump(dupOutputPkl, fo, indent=2)
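    # dupPickles.json thus holds, for every pickle file involved in a duplicate,
    # its full job report (as serialized by Report.__to_json__) plus the
    # 'dup_lfns' list of output LFNs it shares with other pickle files.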

    if dupFiles:
        print("Can we automatically delete those pickle files (Y/N)? ")
        answer = input()
        if answer.upper() == "Y":
            # then delete all job report files but the first one - NOT ideal
            for fname in dupFiles:
                for pklFile in lfn2PklDict[fname][1:]:
                    if os.path.isfile(pklFile):
                        print("Deleting %s ..." % pklFile)
                        os.remove(pklFile)
                    else:
                        print(" File has probably been already deleted %s ..." % pklFile)
            print(" Done!")
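
    # NOTE: after duplicate pickle files are removed, the JobAccountant component
    # typically needs to be restarted (see also the final message printed below).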

    ### Time to load all - this is BAD - LFNs from the WMBS database
    print("\nNow loading all LFNs from wmbs_file_details ...")
    connectToDB()
    myThread = threading.currentThread()
    formatter = DBFormatter(logging, myThread.dbi)
    output = myThread.transaction.processData("SELECT lfn FROM wmbs_file_details")
    lfnsDB = formatter.format(output)
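    # keep only the first column of each formatted row, i.e. the LFN itself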
    lfnsDB = [item[0] for item in lfnsDB]
    print("Retrieved %d lfns from wmbs_file_details" % len(lfnsDB))

    ### Compare and find the duplicates
    dupFiles = list(set(lfn2PklDict.keys()) & set(lfnsDB))
    print("\nFound %d duplicate files." % len(dupFiles))
    if len(dupFiles) == 0:
        sys.exit(0)

    ### Print some basic data about these reports
    print("Their overview is: ")
    dbDupPkl = []
    for fname in dupFiles:
        for pklPath in lfn2PklDict[fname]:
            jobInfo = {'lfn': fname}
            jobInfo['pklPath'] = pklPath
            try:
                jobReport.load(pklPath)
            except IOError:
                # pkl file has been deleted already
                continue
            jobInfo['exitCode'] = jobReport.getExitCode()
            jobInfo['taskSuccess'] = jobReport.taskSuccessful()
            jobInfo['EOSLogURL'] = jobReport.getLogURL()
            jobInfo['HostName'] = jobReport.getWorkerNodeInfo()['HostName']
            jobInfo['Site'] = jobReport.getSiteName()
            jobInfo['task'] = jobReport.getTaskName()
            dbDupPkl.append(jobInfo)

    print(pformat(dbDupPkl))
    print("")
    print("Remove these, restart the component and be happy:")
    for job in dbDupPkl:
        print(job['pklPath'])

    sys.exit(0)


if __name__ == '__main__':
    sys.exit(main())