Corrections #3

Open · wants to merge 2 commits into master
16 changes: 8 additions & 8 deletions pandaserver/dataservice/adder_gen.py
@@ -336,16 +336,16 @@ def handle_failed_job(self) -> None:

if source and error_code:
try:
self.logger.debug("AdderGen.run will call apply_retrial_rules")
retryModule.apply_retrial_rules(
self.logger.debug("AdderGen.run will call processing_job_failure")
retryModule.processing_job_failure(
self.taskBuffer,
self.job.PandaID,
errors,
self.job.attemptNr,
)
self.logger.debug("apply_retrial_rules is back")
self.logger.debug("processing_job_failure is back")
except Exception as e:
self.logger.error(f"apply_retrial_rules excepted and needs to be investigated ({e}): {traceback.format_exc()}")
self.logger.error(f"processing_job_failure excepted and needs to be investigated ({e}): {traceback.format_exc()}")

self.job.jobStatus = "failed"
for file in self.job.Files:
@@ -488,18 +488,18 @@ def check_job_status(self) -> None:
"error_diag": error_diag,
}
]
self.logger.debug("AdderGen.run 2 will call apply_retrial_rules")
retryModule.apply_retrial_rules(
self.logger.debug("AdderGen.run 2 will call processing_job_failure")
retryModule.processing_job_failure(
self.taskBuffer,
job_tmp.PandaID,
errors,
job_tmp.attemptNr,
)
self.logger.debug("apply_retrial_rules 2 is back")
self.logger.debug("processing_job_failure 2 is back")
except IndexError:
pass
except Exception as e:
self.logger.error(f"apply_retrial_rules 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")
self.logger.error(f"processing_job_failure 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")

self.setup_closer()

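For context, a minimal sketch of how a call site hands a failure to the renamed entry point. The helper name report_failure and the pilotError fields are illustrative; the dictionary keys (source, error_code, error_diag) follow the pattern visible in check_job_status above, and the import path assumes retryModule lives in pandaserver.taskbuffer as in this branch.

from pandaserver.taskbuffer import retryModule

def report_failure(task_buffer, job):
    # Build the error payload the same way check_job_status does:
    # one dict per error source that fired on the job.
    errors = [
        {
            "source": "pilotErrorCode",        # attribute that carried the error
            "error_code": job.pilotErrorCode,  # numeric code
            "error_diag": job.pilotErrorDiag,  # human-readable diagnostic
        }
    ]
    # Single entry point: retrial rules plus error classification.
    retryModule.processing_job_failure(task_buffer, job.PandaID, errors, job.attemptNr)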
16 changes: 8 additions & 8 deletions pandaserver/jobdispatcher/Watcher.py
@@ -136,11 +136,11 @@ def run(self):
]

try:
self.logger.debug("Watcher will call apply_retrial_rules")
retryModule.apply_retrial_rules(self.taskBuffer, job.PandaID, errors, job.attemptNr)
self.logger.debug("apply_retrial_rules is back")
self.logger.debug("Watcher will call processing_job_failure")
retryModule.processing_job_failure(self.taskBuffer, job.PandaID, errors, job.attemptNr)
self.logger.debug("processing_job_failure is back")
except Exception as e:
self.logger.debug(f"apply_retrial_rules excepted and needs to be investigated ({e}): {traceback.format_exc()}")
self.logger.debug(f"processing_job_failure excepted and needs to be investigated ({e}): {traceback.format_exc()}")

# updateJobs was successful, but it failed a job with a taskBufferErrorCode
try:
@@ -155,20 +155,20 @@ def run(self):
source = "taskBufferErrorCode"
error_code = job_tmp.taskBufferErrorCode
error_diag = job_tmp.taskBufferErrorDiag
self.logger.debug("Watcher.run 2 will call apply_retrial_rules")
retryModule.apply_retrial_rules(
self.logger.debug("Watcher.run 2 will call processing_job_failure")
retryModule.processing_job_failure(
self.taskBuffer,
job_tmp.PandaID,
source,
error_code,
error_diag,
job_tmp.attemptNr,
)
self.logger.debug("apply_retrial_rules 2 is back")
self.logger.debug("processing_job_failure 2 is back")
except IndexError:
pass
except Exception as e:
self.logger.error(f"apply_retrial_rules 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")
self.logger.error(f"processing_job_failure 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")

cThr = Closer(self.taskBuffer, destDBList, job)
cThr.run()
65 changes: 65 additions & 0 deletions pandaserver/taskbuffer/OraDBProxy.py
@@ -14415,6 +14415,71 @@ def setMaxAttempt(self, jobID, taskID, files, maxAttempt):
tmpLog.debug("done")
return True

def increase_max_attempt(self, job_id, task_id, files):
"""Increase the max attempt number by one for specific files."""
comment = " /* DBProxy.increase_max_attempt */"
method_name = comment.split(" ")[-2].split(".")[-1]
tmp_log = LogWrapper(_logger, method_name)
tmp_log.debug("start")

# Update the file entries to increase the max attempt number by one
input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
input_files = [
pandafile for pandafile in files
if pandafile.type in input_types and re.search("DBRelease", pandafile.lfn) is None
]
input_file_ids = [input_file.fileID for input_file in input_files]
input_dataset_ids = [input_file.datasetID for input_file in input_files]

if input_file_ids:
try:
# Start transaction
self.conn.begin()

var_map = {
":taskID": task_id,
":pandaID": job_id,
}

# Bind the files
file_bindings = []
for index, file_id in enumerate(input_file_ids):
var_map[f":file{index}"] = file_id
file_bindings.append(f":file{index}")
file_bindings_str = ",".join(file_bindings)

# Bind the datasets
dataset_bindings = []
for index, dataset_id in enumerate(input_dataset_ids):
var_map[f":dataset{index}"] = dataset_id
dataset_bindings.append(f":dataset{index}")
dataset_bindings_str = ",".join(dataset_bindings)

sql_update = f"""
UPDATE ATLAS_PANDA.JEDI_Dataset_Contents
SET maxAttempt = maxAttempt + 1
WHERE JEDITaskID = :taskID
AND datasetID IN ({dataset_bindings_str})
AND fileID IN ({file_bindings_str})
AND pandaID = :pandaID
"""

self.cur.execute(sql_update + comment, var_map)

# Commit updates
if not self._commit():
raise RuntimeError("Commit error")

except Exception:
# Roll back
self._rollback()
# Log error
self.dumpErrorMessage(_logger, method_name)
return False

tmp_log.debug("done")
return True

def setNoRetry(self, jobID, taskID, files):
# Logging
comment = " /* DBProxy.setNoRetry */"
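To make the dynamic bind-variable construction above concrete, this is roughly what increase_max_attempt ends up executing for a job with two qualifying input files. All identifiers and values are invented for illustration; only the statement shape and bind names come from the method above.

# Hypothetical job 123 of task 456 with two input files (fileID 7 and 8, both in datasetID 9)
var_map = {
    ":taskID": 456,
    ":pandaID": 123,
    ":file0": 7,
    ":file1": 8,
    ":dataset0": 9,
    ":dataset1": 9,  # duplicates in the IN list are harmless
}
sql_update = """
UPDATE ATLAS_PANDA.JEDI_Dataset_Contents
SET maxAttempt = maxAttempt + 1
WHERE JEDITaskID = :taskID
  AND datasetID IN (:dataset0,:dataset1)
  AND fileID IN (:file0,:file1)
  AND pandaID = :pandaID
"""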
11 changes: 11 additions & 0 deletions pandaserver/taskbuffer/TaskBuffer.py
@@ -2138,6 +2138,17 @@ def setMaxAttempt(self, jobID, jediTaskID, files, attemptNr):

return ret

# error classification action: increase the max number of retries by one
def increase_max_attempt(self, job_id, task_id, files):
# get proxy
proxy = self.proxyPool.getProxy()
# exec
ret = proxy.increase_max_attempt(job_id, task_id, files)
# release proxy
self.proxyPool.putProxy(proxy)

return ret

# retry module action: set maxAttempt to the current attemptNr to avoid further retries
def setNoRetry(self, jobID, jediTaskID, files):
# get proxy
42 changes: 25 additions & 17 deletions pandaserver/taskbuffer/retryModule.py
@@ -16,6 +16,8 @@
INCREASE_MEM_XTIMES = "increase_memory_xtimes"
REDUCE_INPUT_PER_JOB = "reduce_input_per_job"

SYSTEM_ERROR_CLASS = "system"


def timeit(method):
"""
@@ -239,13 +241,15 @@ def preprocess_rules(rules, error_diag_job, release_job, architecture_job, wqid_


@timeit
def apply_retrial_rules(task_buffer, jobID, errors, attemptNr):
def apply_retrial_rules(task_buffer, job, errors, attemptNr):
"""
Get rules from the DB and apply them to a failed job. Actions can be:
- flag the job so it is not retried again (error code is a final state and retrying will not help)
- limit the number of retries
- increase the memory of a job if it failed because of insufficient memory
"""
jobID = job.PandaID

_logger.debug(f"Entered apply_retrial_rules for PandaID={jobID}, errors={errors}, attemptNr={attemptNr}")

retrial_rules = task_buffer.getRetrialRules()
@@ -254,7 +258,6 @@ def apply_retrial_rules(task_buffer, jobID, errors, attemptNr):
return

try:
job = task_buffer.peekJobs([jobID], fromDefined=False, fromArchived=True, fromWaiting=False)[0]
acted_on_job = False
for error in errors:
# in case of multiple errors for a job (e.g. pilot error + exe error) we will only apply one action
@@ -435,19 +438,18 @@ def apply_retrial_rules(task_buffer, jobID, errors, attemptNr):
_logger.debug(f"No retrial rules to apply for jobID {jobID}, attemptNr {attemptNr}, failed with {errors}. (Exception {e})")


def find_error_source(task_buffer, jobID):
def find_error_source(job_spec):
# List of errors
error_code_source = ["pilotError", "exeError", "supError", "ddmError", "brokerageError", "jobDispatcherError",
"taskBufferError"]
error_code_source = ["pilotError", "exeError", "supError", "ddmError", "brokerageError", "jobDispatcherError", "taskBufferError"]

job_spec = task_buffer.peekJobs([jobID])[0]
job_id = job_spec.PandaID
job_errors = []
# Check that jobID is available
# Check that job_id is available
if not job_spec:
_logger.debug(f"Job with ID {jobID} not found")
_logger.debug(f"Job with ID {job_id} not found")
return
else:
_logger.debug(f"Got job with ID {job_spec.PandaID} and status {job_spec.jobStatus}")
_logger.debug(f"Got job with ID {job_id} and status {job_spec.jobStatus}")
for source in error_code_source:
error_code = getattr(job_spec, source + "Code", None) # 1099
error_diag = getattr(job_spec, source + "Diag", None) # "Test error message"
@@ -466,8 +468,8 @@ def find_error_source(task_buffer, jobID):

return job_errors

def classify_error(task_buffer, job_errors):

def classify_error(task_buffer, job_errors):
# Get the error classification rules
sql = "SELECT error_source, error_code, error_diag, error_class FROM ATLAS_PANDA.ERROR_CLASSIFICATION"
var_map = []
@@ -490,18 +492,24 @@ def classify_error(task_buffer, job_errors):
_logger.debug(f"The job with {job_errors} did not match any rule and could not be classified")
return "Unknown" # Default if no match found

def apply_error_classification_logic(jobID):

def apply_error_classification_logic(task_buffer, job):
# Find the error source and get the error code, diag and source
job_errors = find_error_source(jobID)
job_errors = find_error_source(job)

# Classify the error
class_error = classify_error(job_errors)
error_class = classify_error(task_buffer, job_errors)
return error_class


def processing_job_failure(task_buffer, job_id, errors, attempt_number):
# get the job spec from the ID
job = task_buffer.peekJobs([job_id], fromDefined=False, fromArchived=True, fromWaiting=False)[0]

def processing_job_failure(task_buffer, jobID, errors, attemptNr):
# Run the retry module on the job
apply_retrial_rules(task_buffer, jobID, errors, attemptNr)
apply_retrial_rules(task_buffer, job, errors, attempt_number)

# Apply the error classification logic
apply_error_classification_logic(jobID)

error_class = apply_error_classification_logic(task_buffer, job)
if error_class == SYSTEM_ERROR_CLASS:
task_buffer.increase_max_attempt(job_id, job.jediTaskID, job.Files)
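
Taken together, the branch routes every failure through processing_job_failure: apply the retrial rules, classify the error against ATLAS_PANDA.ERROR_CLASSIFICATION, and, when the class is "system", give the job's input files one extra attempt so a platform-side failure is not charged to the user. The matching loop inside classify_error is elided in the diff above, so the sketch below is an assumption about how such a rule table is typically applied (exact match on source and code, regex on the diagnostic); only the column order and the "Unknown" fallback come from the visible code, and the rule and error values are invented.

import re

SYSTEM_ERROR_CLASS = "system"

def classify_error_sketch(classification_rules, job_errors):
    """Illustrative matcher: classification_rules are (error_source, error_code,
    error_diag, error_class) rows; job_errors are the dicts built by find_error_source."""
    for error_source, error_code, error_diag, error_class in classification_rules:
        for job_error in job_errors:
            if (job_error["source"] == error_source
                    and str(job_error["error_code"]) == str(error_code)
                    and re.search(error_diag, job_error["error_diag"] or "")):
                return error_class
    return "Unknown"  # same fallback as classify_error above

# e.g. a hypothetical rule marking lost-heartbeat pilot errors as a system problem
rules = [("pilotErrorCode", 1234, ".*lost heartbeat.*", SYSTEM_ERROR_CLASS)]
errors = [{"source": "pilotErrorCode", "error_code": 1234, "error_diag": "Job killed: lost heartbeat"}]
assert classify_error_sketch(rules, errors) == SYSTEM_ERROR_CLASS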