Merge pull request #4753 from AndresTanasijczuk/cancelpj
When cancelling ASO transfers because of a post-job timeout, retry the cancellation attempts, and if cancellation still fails, return a fatal error.
mmascher committed Apr 14, 2015
2 parents 7a97b27 + 3ec8efa commit 31f0690
Showing 1 changed file with 96 additions and 49 deletions.
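As context for the diff below, the retry-and-fail-fatally behaviour described in the commit message boils down to the following pattern. This is a minimal standalone sketch, not the actual PostJob code; cancel_with_retries, cancel_one and the example document ids are made up for illustration (the real code waits 3*60 seconds between retries, as the hunks below show).

import time

def cancel_with_retries(cancel_one, doc_ids, max_retries=2, retry_delay=3 * 60):
    """Try to cancel every transfer document; retry the ones that fail.

    cancel_one is any callable returning True when a single cancellation
    succeeds. Returns the lists of cancelled and not-cancelled ids.
    """
    pending = list(doc_ids)
    cancelled = []
    for attempt in range(max_retries + 1):
        if attempt > 0:
            time.sleep(retry_delay)  # pause before retrying the failed cancellations
        still_pending = []
        for doc_id in pending:
            if cancel_one(doc_id):
                cancelled.append(doc_id)
            else:
                still_pending.append(doc_id)
        pending = still_pending
        if not pending:
            break
    return cancelled, pending

cancelled, not_cancelled = cancel_with_retries(lambda doc_id: True, ["doc_A", "doc_B"])
if not_cancelled:
    # Anything left uncancelled is treated by the post-job as a fatal error.
    raise RuntimeError("Could not cancel transfers: %s" % ", ".join(not_cancelled))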
145 changes: 96 additions & 49 deletions src/python/TaskWorker/Actions/PostJob.py
@@ -275,7 +275,7 @@ def run(self):
## It is set by PostJob in the perform_transfers() function,
## when is_failure_permanent() is called to determine if a
## failure is permanent or not.
reasons, app, severity = doc['failure_reason'], 'aso', ''
reasons, app, severity = doc['failure_reason'], 'aso', None
msg += " Failure reasons follow:"
if app:
msg += "\n-----> %s log start -----" % str(app).upper()
@@ -284,36 +284,49 @@
msg += "\n<----- %s log finish ----" % str(app).upper()
self.logger.error(msg)
else:
reasons, app, severity = 'Failure reason unavailable.', '', ''
reasons, app, severity = "Failure reason unavailable.", None, None
self.logger.error(msg)
self.logger.warning("WARNING: no failure reason available.")
self.failures[doc_id] = {'reasons': reasons, 'app': app, 'severity': severity}
else:
exmsg = "Got an unknown status: %s" % (transfer_status)
exmsg = "Got an unknown transfer status: %s" % (transfer_status)
raise RuntimeError, exmsg
if all_transfers_finished:
msg = "All transfers finished. There were %s failed/killed transfers." % (len(failed_killed_transfers))
msg = "All transfers finished."
self.logger.info(msg)
if failed_killed_transfers:
msg += " (%s)" % ', '.join(failed_killed_transfers)
msg = "There were %d failed/killed transfers:" % (len(failed_killed_transfers))
msg += " %s" % (", ".join(failed_killed_transfers))
self.logger.info(msg)
self.logger.info("====== Finished to monitor ASO transfers.")
return 1
else:
self.logger.info(msg)
self.logger.info("====== Finished to monitor ASO transfers.")
return 0
self.logger.info("====== Finished to monitor ASO transfers.")
return 0
## If there is a timeout for transfers to complete, check if it was exceeded
## and if so kill the ongoing transfers. # timeout = -1 means no timeout.
if self.retry_timeout != -1 and time.time() - starttime > self.retry_timeout:
msg = "Killing ongoing ASO transfers after timeout of %d (seconds)." % (self.retry_timeout)
msg = "Post-job reached its timeout of %d seconds waiting for ASO transfers to complete." % (self.retry_timeout)
msg += " Will cancel ongoing ASO transfers."
self.logger.warning(msg)
reason = "Killed ASO transfer after timeout of %d (seconds)." % (self.retry_timeout)
self.logger.info("====== Starting to cancel ongoing ASO transfers.")
docs_to_cancel = {}
reason = "Cancelled ASO transfer after timeout of %d seconds." % (self.retry_timeout)
for doc_info in self.docs_in_transfer:
doc_id = doc_info['doc_id']
if doc_id not in done_transfers + failed_killed_transfers:
app, severity = '', ''
self.failures[doc_id] = {'reasons': reason, 'app': app, 'severity': severity}
self.cancel({doc_id: reason})
docs_to_cancel.update({doc_id: reason})
cancelled, not_cancelled = self.cancel(docs_to_cancel, max_retries = 2)
for doc_id in cancelled:
failed_killed_transfers.append(doc_id)
app, severity = None, None
self.failures[doc_id] = {'reasons': reason, 'app': app, 'severity': severity}
if not_cancelled:
msg = "Failed to cancel %d ASO transfers: %s" % (len(not_cancelled), ", ".join(not_cancelled))
self.logger.error(msg)
self.logger.info("====== Finished to cancel ongoing ASO transfers.")
self.logger.info("====== Finished to monitor ASO transfers.")
return 2
self.logger.info("====== Finished to cancel ongoing ASO transfers.")
self.logger.info("====== Finished to monitor ASO transfers.")
return 1
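For reference, the timeout guard that triggers this new cancellation path reduces to the check below. This is a simplified sketch only; starttime and retry_timeout keep the meaning they have in the hunk above, with -1 meaning no timeout.

import time

def transfers_timed_out(starttime, retry_timeout):
    # retry_timeout == -1 means "wait forever for the transfers";
    # otherwise compare the elapsed monitoring time, in seconds,
    # against the configured timeout.
    if retry_timeout == -1:
        return False
    return time.time() - starttime > retry_timeout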

@@ -554,7 +567,7 @@ def inject_to_aso(self):
self.logger.info(msg)
commit_result_msg = self.couch_database.commitOne(doc)[0]
if 'error' in commit_result_msg:
msg = "Got error injecting document to ASO database:\n%s" % (commit_result_msg)
msg = "Error injecting document to ASO database:\n%s" % (commit_result_msg)
self.logger.info(msg)
return False
## Record all files for which we want the post-job to monitor their transfer.
@@ -694,42 +707,65 @@ def get_transfers_statuses_fallback(self):

##= = = = = ASOServerJob = = = = = = = = = = = = = = = = = = = = = = = = = = = =

def cancel(self, doc_ids_reasons = None):
def cancel(self, doc_ids_reasons = None, max_retries = 0):
"""
Method used to "cancel/kill" ASO transfers. The only thing that this
function does is to put the 'state' field of the corresponding documents in
ASO database to 'killed' (and the 'end_time' field to the current time).
the ASO database to 'killed' (and the 'end_time' field to the current time).
Killing actual FTS transfers (if possible) is left to ASO.
"""
now = str(datetime.datetime.now())
if not doc_ids_reasons:
if doc_ids_reasons is None:
for doc_info in self.docs_in_transfer:
doc_id = doc_info['doc_id']
doc_ids_reasons[doc_id] = ''
for doc_id, reason in doc_ids_reasons.iteritems():
msg = "Cancelling ASO transfer %s" % (doc_id)
if reason:
msg += " with following reason: %s" % (reason)
doc_ids_reasons[doc_id] = None
if not doc_ids_reasons:
msg = "There are no ASO transfers to cancel."
self.logger.info(msg)
doc = self.load_couch_document(doc_id)
if not doc:
msg = "Could not cancel ASO transfer %s" % (doc_id)
return [], []
now = str(datetime.datetime.now())
cancelled, not_cancelled = [], []
max_retries = max(int(max_retries), 0)
if max_retries:
msg = "In case of cancellation failure, will retry up to %d times." % (max_retries)
self.logger.info(msg)
for retry in range(max_retries + 1):
if retry > 0:
time.sleep(3*60)
msg = "This is cancellation retry number %d." % (retry)
self.logger.info(msg)
continue
doc['state'] = 'killed'
doc['end_time'] = now
if reason:
if doc['failure_reason']:
if type(doc['failure_reason']) == list:
doc['failure_reason'].append(reason)
elif type(doc['failure_reason']) == str:
doc['failure_reason'] = [doc['failure_reason'], reason]
for doc_id, reason in doc_ids_reasons.iteritems():
if doc_id in cancelled:
continue
msg = "Cancelling ASO transfer %s" % (doc_id)
if reason:
msg += " with following reason: %s" % (reason)
self.logger.info(msg)
doc = self.load_couch_document(doc_id)
if not doc:
msg = "Could not cancel ASO transfer %s; failed to load document." % (doc_id)
self.logger.warning(msg)
if retry == max_retries:
not_cancelled.append((doc_id, msg))
continue
doc['state'] = 'killed'
doc['end_time'] = now
if reason:
if doc['failure_reason']:
if type(doc['failure_reason']) == list:
doc['failure_reason'].append(reason)
elif type(doc['failure_reason']) == str:
doc['failure_reason'] = [doc['failure_reason'], reason]
else:
doc['failure_reason'] = reason
res = self.couch_database.commitOne(doc)
if 'error' in res:
msg = "Error cancelling ASO transfer %s: %s" % (doc_id, res)
self.logger.warning(msg)
if retry == max_retries:
not_cancelled.append((doc_id, msg))
else:
doc['failure_reason'] = reason
res = self.couch_database.commitOne(doc)
if 'error' in res:
exmsg = "Got error killing ASO transfer %s: %s" % (doc_id, res)
raise RuntimeError, exmsg
cancelled.append(doc_id)
return cancelled, not_cancelled

##= = = = = ASOServerJob = = = = = = = = = = = = = = = = = = = = = = = = = = = =
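A usage sketch for the new cancel() signature, assuming the interface shown in the hunk above; FakeASOJob, the document ids and the failure message are stand-ins for illustration only.

class FakeASOJob(object):
    """Stand-in for ASOServerJob; illustrates only the new cancel() interface."""
    def cancel(self, doc_ids_reasons=None, max_retries=0):
        # Pretend every document but the last one was cancelled successfully.
        doc_ids = sorted(doc_ids_reasons or {})
        cancelled = doc_ids[:-1]
        not_cancelled = [(doc_id, "failed to load document") for doc_id in doc_ids[-1:]]
        return cancelled, not_cancelled

reason = "Cancelled ASO transfer after timeout of 3600 seconds."
docs_to_cancel = {"doc_A": reason, "doc_B": reason}
cancelled, not_cancelled = FakeASOJob().cancel(docs_to_cancel, max_retries=2)
for doc_id in cancelled:
    print("Cancelled %s" % doc_id)
for doc_id, message in not_cancelled:
    # Any leftover transfer is treated by the post-job as a fatal error.
    print("Could not cancel %s: %s" % (doc_id, message))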

@@ -1159,10 +1195,23 @@ def perform_transfers(self):
self.job_ad, self.crab_retry, \
self.retry_timeout, self.job_failed, \
self.transfer_logs, self.transfer_outputs)
aso_job_result = ASO_JOB.run()
aso_job_retval = ASO_JOB.run()

## If no transfers failed, return success immediately.
if not aso_job_result:
return aso_job_result
if aso_job_retval == 0:
ASO_JOB = None
return 0

## Return code 2 means post-job timed out waiting for transfer to complete and
## not all the transfers could be cancelled.
if aso_job_retval == 2:
ASO_JOB = None
msg = "Stageout failed with code %d." % (aso_job_retval)
msg += "\nPost-job timed out waiting for ASO transfers to complete."
msg += "\nAttempts were made to cancel the ongoing transfers,"
msg += " but cancellation failed for some transfers."
msg += "\nConsidering cancellation failures as a permament stageout error."
raise PermanentStageoutError(msg)

## Retrieve the stageout failures (a dictionary where the keys are the IDs of
## the ASO documents for which the stageout job failed and the values are the
@@ -1183,9 +1232,8 @@
else:
failures[doc_id]['severity'] = 'recoverable'
## Message for stageout error exception.
msg = "Stageout failed with code %d."
msg += "\nThere were %d failed/killed stageout jobs."
msg = msg % (aso_job_result, num_failures)
msg = "Stageout failed with code %d." % (aso_job_retval)
msg += "\nThere were %d failed/killed stageout jobs." % (num_failures)
if num_permanent_failures:
msg += " %d of those jobs had a permanent failure." % (num_permanent_failures)
msg += "\nFailure reasons (per document) follow:"
@@ -1198,7 +1246,6 @@
msg += "\n <----- %s log finish ----" % (str(failures[doc_id]['app']).upper())
if failures[doc_id]['severity']:
msg += "\n The last failure reason is %s." % (str(failures[doc_id]['severity']).lower())

## Raise stageout exception.
if num_permanent_failures:
raise PermanentStageoutError(msg)
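Condensed, the way perform_transfers() now interprets the ASOServerJob.run() return codes can be summarised as in the sketch below. This is illustrative only; the real code raises PermanentStageoutError or, presumably, a recoverable stageout error instead of returning strings.

def interpret_aso_retval(retval, num_permanent_failures):
    # 0: all transfers finished cleanly.
    if retval == 0:
        return "success"
    # 2: post-job timed out and not every transfer could be cancelled.
    if retval == 2:
        return "permanent failure"
    # 1: some transfers failed or were killed; per-document severity decides.
    if num_permanent_failures:
        return "permanent failure"
    return "recoverable failure"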
@@ -1758,7 +1805,7 @@ def is_failure_permanent(self, reason):
self.logger.debug(msg)
return True
permanent_failure_reasons = [
".*killed aso transfer after timeout.*",
".*cancelled aso transfer after timeout.*",
".*permission denied.*",
".*disk quota exceeded.*",
".*operation not permitted*",