diff --git a/pandaserver/dataservice/adder_gen.py b/pandaserver/dataservice/adder_gen.py
index b4ef8fe9..d1daa5bd 100644
--- a/pandaserver/dataservice/adder_gen.py
+++ b/pandaserver/dataservice/adder_gen.py
@@ -336,16 +336,16 @@ def handle_failed_job(self) -> None:
         if source and error_code:
             try:
-                self.logger.debug("AdderGen.run will call apply_retrial_rules")
-                retryModule.apply_retrial_rules(
+                self.logger.debug("AdderGen.run will call processing_job_failure")
+                retryModule.processing_job_failure(
                     self.taskBuffer,
                     self.job.PandaID,
                     errors,
                     self.job.attemptNr,
                 )
-                self.logger.debug("apply_retrial_rules is back")
+                self.logger.debug("processing_job_failure is back")
             except Exception as e:
-                self.logger.error(f"apply_retrial_rules excepted and needs to be investigated ({e}): {traceback.format_exc()}")
+                self.logger.error(f"processing_job_failure excepted and needs to be investigated ({e}): {traceback.format_exc()}")
 
         self.job.jobStatus = "failed"
         for file in self.job.Files:
@@ -488,18 +488,18 @@ def check_job_status(self) -> None:
                             "error_diag": error_diag,
                         }
                     ]
-                    self.logger.debug("AdderGen.run 2 will call apply_retrial_rules")
-                    retryModule.apply_retrial_rules(
+                    self.logger.debug("AdderGen.run 2 will call processing_job_failure")
+                    retryModule.processing_job_failure(
                         self.taskBuffer,
                         job_tmp.PandaID,
                         errors,
                         job_tmp.attemptNr,
                     )
-                    self.logger.debug("apply_retrial_rules 2 is back")
+                    self.logger.debug("processing_job_failure 2 is back")
                 except IndexError:
                     pass
                 except Exception as e:
-                    self.logger.error(f"apply_retrial_rules 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")
+                    self.logger.error(f"processing_job_failure 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")
 
         self.setup_closer()
diff --git a/pandaserver/jobdispatcher/Watcher.py b/pandaserver/jobdispatcher/Watcher.py
index 99163f0b..006bdb43 100755
--- a/pandaserver/jobdispatcher/Watcher.py
+++ b/pandaserver/jobdispatcher/Watcher.py
@@ -136,11 +136,11 @@ def run(self):
             ]
             try:
-                self.logger.debug("Watcher will call apply_retrial_rules")
-                retryModule.apply_retrial_rules(self.taskBuffer, job.PandaID, errors, job.attemptNr)
-                self.logger.debug("apply_retrial_rules is back")
+                self.logger.debug("Watcher will call processing_job_failure")
+                retryModule.processing_job_failure(self.taskBuffer, job.PandaID, errors, job.attemptNr)
+                self.logger.debug("processing_job_failure is back")
             except Exception as e:
-                self.logger.debug(f"apply_retrial_rules excepted and needs to be investigated ({e}): {traceback.format_exc()}")
+                self.logger.debug(f"processing_job_failure excepted and needs to be investigated ({e}): {traceback.format_exc()}")
 
             # updateJobs was successful and it failed a job with taskBufferErrorCode
             try:
@@ -155,8 +155,8 @@ def run(self):
                 source = "taskBufferErrorCode"
                 error_code = job_tmp.taskBufferErrorCode
                 error_diag = job_tmp.taskBufferErrorDiag
-                self.logger.debug("Watcher.run 2 will call apply_retrial_rules")
-                retryModule.apply_retrial_rules(
-                    self.taskBuffer,
-                    job_tmp.PandaID,
-                    source,
-                    error_code,
-                    error_diag,
-                    job_tmp.attemptNr,
-                )
-                self.logger.debug("apply_retrial_rules 2 is back")
+                # pack the error into the list-of-dicts format expected by
+                # processing_job_failure, matching the call in adder_gen.py
+                errors = [
+                    {
+                        "source": source,
+                        "error_code": error_code,
+                        "error_diag": error_diag,
+                    }
+                ]
+                self.logger.debug("Watcher.run 2 will call processing_job_failure")
+                retryModule.processing_job_failure(
+                    self.taskBuffer,
+                    job_tmp.PandaID,
+                    errors,
+                    job_tmp.attemptNr,
+                )
+                self.logger.debug("processing_job_failure 2 is back")
             except IndexError:
                 pass
             except Exception as e:
-                self.logger.error(f"apply_retrial_rules 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")
+                self.logger.error(f"processing_job_failure 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")
 
         cThr = Closer(self.taskBuffer, destDBList, job)
         cThr.run()
diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py
index fcc7d3e5..e3de7377 100644
--- a/pandaserver/taskbuffer/OraDBProxy.py
+++ b/pandaserver/taskbuffer/OraDBProxy.py
@@ -14415,6 +14415,71 @@ def setMaxAttempt(self, jobID, taskID, files, maxAttempt):
         tmpLog.debug("done")
         return True
 
+    def increase_max_attempt(self, job_id, task_id, files):
+        """Increase the max attempt number by one for specific files."""
+        comment = " /* DBProxy.increase_max_attempt */"
+        method_name = comment.split(" ")[-2].split(".")[-1]
+        tmp_log = LogWrapper(_logger, method_name)
+        tmp_log.debug("start")
+
+        # Update the file entries to increase the max attempt number by one
+        input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
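+        # Only real input-type files count towards the retry accounting;
+        # DBRelease files are auxiliary inputs and are excluded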
+        input_files = [
+            pandafile for pandafile in files
+            if pandafile.type in input_types and re.search("DBRelease", pandafile.lfn) is None
+        ]
+        input_file_ids = [input_file.fileID for input_file in input_files]
+        input_dataset_ids = [input_file.datasetID for input_file in input_files]
+
+        if input_file_ids:
+            try:
+                # Start transaction
+                self.conn.begin()
+
+                var_map = {
+                    ":taskID": task_id,
+                    ":pandaID": job_id,
+                }
+
+                # Bind the files
+                file_bindings = []
+                for index, file_id in enumerate(input_file_ids):
+                    var_map[f":file{index}"] = file_id
+                    file_bindings.append(f":file{index}")
+                file_bindings_str = ",".join(file_bindings)
+
+                # Bind the datasets
+                dataset_bindings = []
+                for index, dataset_id in enumerate(input_dataset_ids):
+                    var_map[f":dataset{index}"] = dataset_id
+                    dataset_bindings.append(f":dataset{index}")
+                dataset_bindings_str = ",".join(dataset_bindings)
+
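+                # The IN clauses are expanded into one bind variable per value,
+                # since a whole list cannot be bound to a single Oracle placeholder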
+                sql_update = f"""
+                UPDATE ATLAS_PANDA.JEDI_Dataset_Contents
+                SET maxAttempt = maxAttempt + 1
+                WHERE JEDITaskID = :taskID
+                AND datasetID IN ({dataset_bindings_str})
+                AND fileID IN ({file_bindings_str})
+                AND pandaID = :pandaID
+                """
+
+                self.cur.execute(sql_update + comment, var_map)
+
+                # Commit updates
+                if not self._commit():
+                    raise RuntimeError("Commit error")
+
+            except Exception:
+                # Roll back
+                self._rollback()
+                # Log error
+                self.dumpErrorMessage(_logger, method_name)
+                return False
+
+        tmp_log.debug("done")
+        return True
+
     def setNoRetry(self, jobID, taskID, files):
         # Logging
         comment = " /* DBProxy.setNoRetry */"
diff --git a/pandaserver/taskbuffer/TaskBuffer.py b/pandaserver/taskbuffer/TaskBuffer.py
index c4737fce..b810114b 100755
--- a/pandaserver/taskbuffer/TaskBuffer.py
+++ b/pandaserver/taskbuffer/TaskBuffer.py
@@ -2138,6 +2138,17 @@ def setMaxAttempt(self, jobID, jediTaskID, files, attemptNr):
 
         return ret
 
+    # error classification action: increase by one the max number of retries
+    def increase_max_attempt(self, job_id, task_id, files):
+        # get proxy
+        proxy = self.proxyPool.getProxy()
+        # exec
+        ret = proxy.increase_max_attempt(job_id, task_id, files)
+        # release proxy
+        self.proxyPool.putProxy(proxy)
+
+        return ret
+
     # retry module action: set maxAttempt to the current attemptNr to avoid further retries
     def setNoRetry(self, jobID, jediTaskID, files):
         # get proxy
diff --git a/pandaserver/taskbuffer/retryModule.py b/pandaserver/taskbuffer/retryModule.py
index a766667e..223e865c 100644
--- a/pandaserver/taskbuffer/retryModule.py
+++ b/pandaserver/taskbuffer/retryModule.py
@@ -16,6 +16,8 @@
 INCREASE_MEM_XTIMES = "increase_memory_xtimes"
 REDUCE_INPUT_PER_JOB = "reduce_input_per_job"
 
+SYSTEM_ERROR_CLASS = "system"
+
 
 def timeit(method):
     """
@@ -239,13 +241,15 @@ def preprocess_rules(rules, error_diag_job, release_job, architecture_job, wqid_
 
 
 @timeit
-def apply_retrial_rules(task_buffer, jobID, errors, attemptNr):
+def apply_retrial_rules(task_buffer, job, errors, attemptNr):
     """
     Get rules from DB and applies them to a failed job. Actions can be:
     - flag the job so it is not retried again (error code is a final state and retrying will not help)
     - limit the number of retries
    - increase the memory of a job if it failed because of insufficient memory
     """
+    jobID = job.PandaID
+
     _logger.debug(f"Entered apply_retrial_rules for PandaID={jobID}, errors={errors}, attemptNr={attemptNr}")
 
     retrial_rules = task_buffer.getRetrialRules()
@@ -254,7 +258,6 @@ def apply_retrial_rules(task_buffer, jobID, errors, attemptNr):
         return
 
     try:
-        job = task_buffer.peekJobs([jobID], fromDefined=False, fromArchived=True, fromWaiting=False)[0]
         acted_on_job = False
         for error in errors:
             # in case of multiple errors for a job (e.g. pilot error + exe error) we will only apply one action
@@ -435,19 +438,18 @@
         _logger.debug(f"No retrial rules to apply for jobID {jobID}, attemptNr {attemptNr}, failed with {errors}. (Exception {e})")
 
 
-def find_error_source(task_buffer, jobID):
+def find_error_source(job_spec):
     # List of errors
-    error_code_source = ["pilotError", "exeError", "supError", "ddmError", "brokerageError", "jobDispatcherError",
-                         "taskBufferError"]
+    error_code_source = ["pilotError", "exeError", "supError", "ddmError", "brokerageError", "jobDispatcherError", "taskBufferError"]
 
-    job_spec = task_buffer.peekJobs([jobID])[0]
-    job_errors = []
-    # Check that jobID is available
+    # Check that the job spec is available before accessing any of its attributes
     if not job_spec:
-        _logger.debug(f"Job with ID {jobID} not found")
+        _logger.debug("find_error_source received an empty job spec")
         return
-    else:
-        _logger.debug(f"Got job with ID {job_spec.PandaID} and status {job_spec.jobStatus}")
+
+    job_id = job_spec.PandaID
+    job_errors = []
+    _logger.debug(f"Got job with ID {job_id} and status {job_spec.jobStatus}")
     for source in error_code_source:
         error_code = getattr(job_spec, source + "Code", None)  # 1099
         error_diag = getattr(job_spec, source + "Diag", None)  # "Test error message"
@@ -466,8 +468,8 @@
     return job_errors
 
 
-def classify_error(job_errors):
+def classify_error(task_buffer, job_errors):
     # Get the error classification rules
     sql = "SELECT error_source, error_code, error_diag, error_class FROM ATLAS_PANDA.ERROR_CLASSIFICATION"
     var_map = []
@@ -490,18 +492,24 @@
         _logger.debug(f"The job with {job_errors} did not match any rule and could not be classified")
         return "Unknown"  # Default if no match found
 
-def apply_error_classification_logic(jobID):
-    # Find the error source and getting the code, diag, and source
-    job_errors = find_error_source(jobID)
+
+def apply_error_classification_logic(task_buffer, job):
+    # Find the error source and get the error code, diag and source
+    job_errors = find_error_source(job)
 
     # Classify the error
-    class_error = classify_error(job_errors)
+    error_class = classify_error(task_buffer, job_errors)
+    return error_class
+
+
+def processing_job_failure(task_buffer, job_id, errors, attempt_number):
+    # get the job spec from the ID and check that the job exists
+    job = task_buffer.peekJobs([job_id], fromDefined=False, fromArchived=True, fromWaiting=False)[0]
+    if not job:
+        _logger.debug(f"processing_job_failure found no job with ID {job_id}")
+        return
 
-def processing_job_failure(task_buffer, jobID, errors, attemptNr):
     # Run the retry module on the job
-    apply_retrial_rules(task_buffer, jobID, errors, attemptNr)
+    apply_retrial_rules(task_buffer, job, errors, attempt_number)
 
     # Applying error classification logic
-    apply_error_classification_logic(jobID)
-
+    error_class = apply_error_classification_logic(task_buffer, job)
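+    # a "system" classification means the failure came from the infrastructure
+    # rather than the user, so grant the input files one extra attempt instead
+    # of consuming the job's retry budget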
+    if error_class == SYSTEM_ERROR_CLASS:
+        task_buffer.increase_max_attempt(job_id, job.jediTaskID, job.Files)