forked from PanDAWMS/pilot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFileState.py
266 lines (218 loc) · 10.5 KB
/
FileState.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
import os
import commands
from pUtil import tolog
class FileState:
    """
    Keep track of (and persist) the state of a job's input or output files.

    While the job is running, the file state dictionary is serialized to a
    pickle file (by default fileState-<ftype>-<JobID>.pickle in the work
    directory) every time a state is updated.

    File state dictionary format:
        { file_name1 : state1, ..., file_nameN : stateN }
    where file_name does not contain the file path, since it will change for
    holding jobs (from work dir to data dir), and the state variables have the
    list form ["file_state", "reg_state"] (output files) or
    ["file_state", "transfer_mode"] (input files).

    "file_state" can assume the following values for "output" files:
        "not_created"     : initial value for all files at the beginning of the job
        "created"         : file was created and is waiting to be transferred
        "not_transferred" : file has not been transferred
        "transferred"     : file has already been transferred (no further action)
        "missing"         : file was never created, the job failed (e.g. output
                            file of a failed job; a log should never be missing)
        "alt_transferred" : file was transferred to an alternative SE (T-1)

    "file_state" can assume the following values for "input" files:
        "not_transferred" : file has not been transferred (can remain in this
                            state for FileStager and directIO modes)
        "transferred"     : file has already been transferred (no further action)

    "reg_state" can assume the following values (relevant for output files):
        "not_registered"  : file was not registered in the LFC
        "registered"      : file was already registered in the LFC (no further action)

    "transfer_mode" can assume the following values (relevant for input files):
        "copy_to_scratch" : default file transfer mode
        "remote_io"       : direct access / remote IO transfer mode
        "file_stager"     : file stager transfer mode
        "no_transfer"     : input file has been skipped

    E.g. a file with state = "created", "not_registered" should first be
    transferred and then registered in the LFC.

    The file state dictionary should be created with "not_created" states as
    soon as the output files are known (pilot). The "created" states should be
    set after the payload has run and if the files in question were actually
    created. "transferred" should be set by the mover once the file in question
    has been transferred. "registered" should be added to the file state once
    the file has been registered.

    "copy_to_scratch" is to be set for all input files by default. In case
    remote IO / FileStager instructions are found in copysetup[in] the state
    will be changed to "remote_io" / "file_stager". Brokerage can also decide
    that remote IO is to be used. In that case, "remote_io" will be set for the
    relevant input files (e.g. DBRelease and lib files are excluded, i.e. they
    will have "copy_to_scratch" transfer mode).
    """

    def __init__(self, workDir, jobId="0", mode="", ftype="output", fileName=""):
        """
        Set up the file state object and load any previously stored dictionary.

        workDir  : directory in which the file state file lives
        jobId    : job id used in the default file name
        mode     : optional mode tag (e.g. "test") inserted before ".pickle"
        ftype    : file type ("output" or "input") used in the default file name
        fileName : explicit file name (relative to workDir); overrides the default
        """

        self.fileStateDictionary = {}  # file dictionary holding all objects
        self.mode = mode               # test mode

        # use default filename unless specified by initiator
        if fileName == "" and ftype != "":
            self.filename = os.path.join(workDir, "fileState-%s-%s.pickle" % (ftype, jobId))
        else:
            self.filename = os.path.join(workDir, fileName)

        # add mode variable if needed (e.g. mode="test")
        if self.mode != "":
            self.filename = self.filename.replace(".pickle", "-%s.pickle" % (self.mode))

        # load the dictionary from file if it exists
        if os.path.exists(self.filename):
            tolog("Using file state dictionary: %s" % (self.filename))
            # a failed read is logged by get(); the dictionary then stays empty
            self.get()
        else:
            tolog("File does not exist: %s (will be created)" % (self.filename))

    def get(self):
        """
        Read the file state dictionary from file.

        Returns True if the file could be opened and de-serialized, else False
        (failures are logged, not raised).
        """

        from pickle import load

        # NOTE: unpickling executes arbitrary code from the file, so the file
        # state file must only ever come from a trusted location
        try:
            # pickle data is a byte stream; always open the file in binary mode
            fp = open(self.filename, "rb")
        except Exception:
            tolog("FILESTATE FAILURE: get function could not open file: %s" % self.filename)
            return False

        status = False
        try:
            # load the dictionary from file
            self.fileStateDictionary = load(fp)
        except Exception:
            tolog("FILESTATE FAILURE: could not deserialize file: %s" % self.filename)
        else:
            status = True
        finally:
            fp.close()

        return status

    def put(self):
        """
        Create/Update the file state file.

        Returns True if the dictionary was written to file, else False
        (failures are logged, not raised).
        """

        from pickle import dump

        try:
            # pickle data is a byte stream; always open the file in binary mode
            fp = open(self.filename, "wb")
        except Exception as e:
            tolog("FILESTATE FAILURE: Could not open file state file: %s, %s" % (self.filename, str(e)))
            # log who we are and what the file looks like to help debugging
            _cmd = "whoami; ls -lF %s" % (self.filename)
            tolog("Executing command: %s" % (_cmd))
            ec, rs = commands.getstatusoutput(_cmd)
            tolog("%d, %s" % (ec, rs))
            return False

        status = False
        try:
            # write the dictionary to file
            dump(self.fileStateDictionary, fp)
        except Exception as e:
            tolog("FILESTATE FAILURE: Could not pickle data to file state file: %s, %s" % (self.filename, str(e)))
        else:
            status = True
        finally:
            fp.close()

        return status

    def getNumberOfFiles(self):
        """ Get the number of files from the file state dictionary """

        return len(self.fileStateDictionary)

    def getStateList(self, filename):
        """
        Get the current state list for a file.

        Returns the stored list itself for a known file, or a fresh
        ["", ""] list for an unknown file.
        """

        return self.fileStateDictionary.get(filename, ["", ""])

    def updateStateList(self, filename, state_list):
        """ Update the state list for a file (in memory only); returns True on success """

        try:
            self.fileStateDictionary[filename] = state_list
        except Exception as e:
            tolog("FILESTATE FAILURE: could not update state list for file: %s, %s" % (filename, str(e)))
            return False

        return True

    def getFileState(self, filename):
        """ Return the current state list of a given file """

        return self.getStateList(filename)

    def updateState(self, filename, mode="file_state", state="not_transferred"):
        """
        Update the file or registration/transfer mode state for a file.

        filename : file name (key in the file state dictionary)
        mode     : "file_state" (first list element), or "reg_state" /
                   "transfer_mode" (second list element)
        state    : new state value

        Returns True if the state was updated and the file state file was
        written, else False. An unknown mode is rejected without touching the
        dictionary or the file.
        """

        tolog("updateState: filename=%s" % (filename))
        tolog("updateState: mode=%s" % (mode))
        tolog("updateState: state=%s" % (state))

        # select which element of the state list is to be updated
        if mode == "file_state":
            index = 0
        elif mode in ("reg_state", "transfer_mode"):
            index = 1
        else:
            # do not report success (or rewrite the file) for an unknown mode
            tolog("FILESTATE FAILURE: unknown state: %s" % (mode))
            return False

        # get current state list
        state_list = self.getStateList(filename)

        # update file state
        try:
            state_list[index] = state
        except Exception as e:
            tolog("FILESTATE FAILURE: %s" % str(e))
            return False

        # update state list
        status = self.updateStateList(filename, state_list)

        # update the file state file for every update
        # (necessary since a failed put operation can abort everything)
        if status:
            status = self.put()

        return status

    def resetStates(self, file_list, ftype="output"):
        """
        Reset the file state dictionary to the initial states for file_list.

        Output files are set to ['not_created', 'not_registered'], all other
        file types to ['not_transferred', 'copy_to_scratch']. Any previous
        content of the dictionary is dropped. Returns the status of the
        subsequent write to file.
        """

        # note: file state will be reset completely
        tolog("Resetting file list: %s" % str(file_list))

        if ftype == "output":
            initial = ('not_created', 'not_registered')
        else:  # input
            initial = ('not_transferred', 'copy_to_scratch')

        # initialize file state dictionary (one independent list per file)
        self.fileStateDictionary = {}
        for filename in file_list:
            self.fileStateDictionary[filename] = list(initial)

        # write to file
        return self.put()

    def hasOnlyCopyToScratch(self):
        """ Check if there are only copy_to_scratch transfer modes in the file dictionary """

        # loop over all input files and see if there is any non-copy_to_scratch transfer mode
        for filename, states in self.fileStateDictionary.items():
            tolog("filename=%s states=%s"%(filename, str(states)))
            # 'no_transfer' is set for DBRelease files
            if states[1] not in ('copy_to_scratch', 'no_transfer'):
                tolog("Job does not have only copy-to-scratch transfers")
                return False

        return True

    def getFilesOfState(self, state="transferred"):
        """ Return a comma-separated list of files whose file state equals the given state """

        file_names = []

        # loop over all files
        for filename, states in self.fileStateDictionary.items():
            tolog("filename=%s states=%s"%(filename,str(states)))
            if states[0] == state:
                file_names.append(filename)

        # create the comma-separated list
        return ",".join(file_names)

    def dumpFileStates(self, ftype="output"):
        """ Print all the files and their states (sorted by file name) """

        if ftype == "output":
            tolog("File name / File state / Registration state")
        else:
            tolog("File name / File state / Transfer mode")
        tolog("-"*100)

        if self.getNumberOfFiles() == 0:
            tolog("(No files)")
            return

        for i, filename in enumerate(sorted(self.fileStateDictionary), 1):
            states = self.fileStateDictionary[filename]
            if len(states) == 2:
                tolog("%d. %s\t%s\t%s" % (i, filename, states[0], states[1]))
            else:
                # malformed state list; print placeholders only
                tolog("%s\t-\t-" % (filename))