This repository has been archived by the owner on Jan 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
S3SiteMover.py
174 lines (147 loc) · 7.28 KB
/
S3SiteMover.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import os, re, sys
import commands
from time import time
import urlparse
from TimerCommand import TimerCommand
import SiteMover
from futil import *
from PilotErrors import PilotErrors
from pUtil import tolog, readpar, verifySetupCommand, getSiteInformation, extractFilePaths
from FileStateClient import updateFileState
from SiteInformation import SiteInformation
from configSiteMover import config_sm
from S3ObjectstoreSiteMover import S3ObjectstoreSiteMover
# Module-level checksum command, read from the site mover configuration
# (config_sm.COMMAND_MD5). The S3SiteMover class below does not reference it.
CMD_CHECKSUM = config_sm.COMMAND_MD5
class S3SiteMover(S3ObjectstoreSiteMover):
    """SiteMover that uses boto S3 client for both get and put.

    The actual transfers are delegated to ``self.stageIn`` / ``self.stageOut``,
    which are not defined in this class (presumably inherited from
    S3ObjectstoreSiteMover -- TODO confirm). No registration is done.
    """

    copyCommand = "S3"
    checksum_command = "adler32"
    timeout = 600

    def _s3_state_for(self, status):
        """Return the tracing state for a failed transfer status code.

        Uses the PilotErrors name of ``status`` and falls back to
        "PSTAGE_FAIL" when PilotErrors has no name for it.
        """
        state = PilotErrors().getErrorName(status)
        if state is None:
            state = "PSTAGE_FAIL"
        return state

    def _s3_put_abort(self, errorLog, surl, report):
        """Log ``errorLog``, report "PSTAGE_FAIL" and return the put failure result.

        NOTE(review): stage-out failures are reported with ERR_STAGEINFAILED,
        exactly as the original code did -- confirm whether a stage-out
        specific error code was intended.
        """
        tolog("!!WARNING!!1777!! %s" % (errorLog))
        self.prepareReport("PSTAGE_FAIL", report)
        return self.put_data_retfail(PilotErrors.ERR_STAGEINFAILED, errorLog, surl)

    def get_data(self, gpfn, lfn, path, fsize=0, fchecksum=0, guid=0, **pdict):
        """Copy input file from SE to local dir.

        Parameters:
            gpfn: source path; converted with the site's copyprefix and must
                resolve to a path starting with "s3:".
            lfn: file name; joined to ``path`` to form the local target.
            path: local directory ('' is treated as './').
            fsize, fchecksum: forwarded unchanged to ``self.stageIn``.
            guid: forwarded to the tracing report.
            pdict: reads 'jobId', 'workDir', 'experiment', 'os_bucket_id' and
                the mandatory 'report' key (a missing 'report' raises KeyError).

        Returns:
            (status, output): status is 0 on success, in which case the file
            state is updated to "transferred"; otherwise the failing status
            code together with the error text.
        """
        # Get input parameters from pdict
        jobId = pdict.get('jobId', '')
        workDir = pdict.get('workDir', '')
        experiment = pdict.get('experiment', '')
        os_bucket_id = pdict.get('os_bucket_id', -1)

        # get the Rucio tracing report
        report = self.getStubTracingReport(pdict['report'], 'gfal-copy', lfn, guid)

        if path == '':
            path = './'
        fullname = os.path.join(path, lfn)

        # get the site information object
        si = getSiteInformation(experiment)
        ret_path = si.getCopyPrefixPathNew(gpfn, stageIn=True)
        if not ret_path.startswith("s3:"):
            output = "Failed to use copyprefix to convert the current path to S3 path."
            tolog("!!WARNING!!1777!! %s" % (output))
            status = PilotErrors.ERR_STAGEINFAILED
        else:
            status, output = self.stageIn(ret_path, fullname, fsize, fchecksum, experiment, os_bucket_id=os_bucket_id)

        if status == 0:
            updateFileState(lfn, workDir, jobId, mode="file_state", state="transferred", ftype="input")
            state = "DONE"
        else:
            # The reported state always follows the status code, also for the
            # copyprefix failure above.
            state = self._s3_state_for(status)

        self.prepareReport(state, report)
        return status, output

    def put_data(self, source, destination, fsize=0, fchecksum=0, **pdict):
        """Copy output file from disk to local SE.

        The function is based on the dCacheSiteMover put function.

        Parameters:
            source: local file to upload.
            destination, fsize, fchecksum: accepted for interface
                compatibility; not used by this implementation.
            pdict: reads 'alt', 'lfn', 'guid', 'token', 'scope', 'dsname',
                'analJob', 'experiment', 'os_bucket_id', 'prodSourceLabel' and
                the mandatory 'report' key (a missing 'report' raises KeyError).

        Returns:
            On success the tuple
            (0, pilotErrorDiag, surl, size, checksum, self.arch_type), where
            checksum is the local adler32 when the transferred size matches
            the local size, otherwise the checksum returned by
            ``self.stageOut``. On failure, the result of
            ``self.put_data_retfail``.
        """
        error = PilotErrors()

        # Get input parameters from pdict
        alt = pdict.get('alt', False)
        lfn = pdict.get('lfn', '')
        guid = pdict.get('guid', '')
        token = pdict.get('token', '')
        scope = pdict.get('scope', '')
        dsname = pdict.get('dsname', '')
        analysisJob = pdict.get('analJob', False)
        experiment = pdict.get('experiment', '')
        os_bucket_id = pdict.get('os_bucket_id', -1)
        prodSourceLabel = pdict.get('prodSourceLabel', '')

        # get the site information object
        si = getSiteInformation(experiment)

        tolog("put_data received prodSourceLabel=%s" % (prodSourceLabel))
        if prodSourceLabel == 'ddm' and analysisJob:
            tolog("Treating PanDA Mover job as a production job during stage-out")
            analysisJob = False

        # get the Rucio tracing report
        report = self.getStubTracingReport(pdict['report'], 'gfal-copy', lfn, guid)

        filename = os.path.basename(source)

        # get all the proper paths (only the SURL is needed here)
        ec, pilotErrorDiag, tracer_error, _dst_gpfn, _lfcdir, surl = si.getProperPaths(error, analysisJob, token, prodSourceLabel, dsname, filename, scope=scope, alt=alt, sitemover=self) # quick workaround
        if ec != 0:
            self.prepareReport(tracer_error, report)
            return self.put_data_retfail(ec, pilotErrorDiag)

        # get local adler32 checksum before the transfer
        status, output, adler_size, adler_checksum = self.getLocalFileInfo(source, checksumType="adler32")
        if status != 0:
            return self._s3_put_abort('Failed to get local file %s adler32 checksum: %s' % (source, output), surl, report)

        ret_path = si.getCopyPrefixPathNew(surl, stageIn=False)
        tolog("Convert destination: %s to new path: %s" % (surl, ret_path))
        if not ret_path.startswith("s3:"):
            output = "Failed to use copyprefix to convert the current path to S3 path."
            tolog("!!WARNING!!1777!! %s" % (output))
            # NOTE(review): stage-in error code used on the stage-out path,
            # kept as in the original -- confirm the intended code.
            status = PilotErrors.ERR_STAGEINFAILED
        else:
            status, output, size, checksum = self.stageOut(source, ret_path, token, experiment, os_bucket_id=os_bucket_id)

        if status != 0:
            self.prepareReport(self._s3_state_for(status), report)
            return self.put_data_retfail(status, output, surl)

        if size == adler_size:
            # Same size as before the transfer: re-read the local checksum to
            # make sure the file did not change while it was being uploaded.
            tolog("The file size is not changed. Will check whether adler32 changed.")
            status, output, _new_adler_size, new_adler_checksum = self.getLocalFileInfo(source, checksumType="adler32")
            if status != 0:
                return self._s3_put_abort('Failed to get local file %s adler32 checksum: %s' % (source, output), surl, report)
            if adler_checksum != new_adler_checksum:
                return self._s3_put_abort("The file checksum changed from %s(before transfer) to %s(after transfer)" % (adler_checksum, new_adler_checksum), surl, report)
            tolog("The file checksum is not changed. Will use adler32 %s to replace the md5 checksum %s" % (adler_checksum, checksum))
            checksum = adler_checksum

        self.prepareReport("DONE", report)
        return 0, pilotErrorDiag, surl, size, checksum, self.arch_type