diff --git a/capsul/database/__init__.py b/capsul/database/__init__.py
index 1da57476..9ecaa25d 100644
--- a/capsul/database/__init__.py
+++ b/capsul/database/__init__.py
@@ -271,6 +271,9 @@ def job_parameters_from_values(self, job_dict, parameters_values):
             result[k] = parameters_values[i]
         return result
 
+    def successful_node_paths(self, engine_id, execution_id):
+        raise NotImplementedError
+
     def print_execution_report(self, report, file=sys.stdout):
         print(
             "====================\n" "| Execution report |\n" "====================\n",
@@ -410,6 +413,8 @@ def update_executable(self, engine_id, execution_id, executable):
         # pprint(parameters.content)
         while stack:
             node, parameters = stack.pop(0)
+            if parameters is None:
+                continue
             for field in node.user_fields():
                 value = parameters.get(field.name, undefined)
                 value = parameters.no_proxy(value)
@@ -421,7 +426,7 @@ def update_executable(self, engine_id, execution_id, executable):
                     setattr(node, field.name, value)
             if isinstance(node, Pipeline):
                 stack.extend(
-                    (n, parameters["nodes"][n.name])
+                    (n, parameters["nodes"].get(n.name))
                     for n in node.nodes.values()
                     if n is not node and isinstance(n, Process) and n.activated
                 )
diff --git a/capsul/database/redis.py b/capsul/database/redis.py
index 1a54b950..bf9c464f 100644
--- a/capsul/database/redis.py
+++ b/capsul/database/redis.py
@@ -310,31 +310,6 @@ def _enter(self):
             """
         )
 
-        self._update_workflow_parameters = self.redis.register_script(
-            """
-            local execution_key = KEYS[1]
-
-            local parameters_location = cjson.decode(ARGV[1])
-            local output_parameters = cjson.decode(ARGV[2])
-            local workflow_parameters = cjson.decode(redis.call('hget', execution_key, 'workflow_parameters'))
-
-            local parameters = workflow_parameters['content']
-            for index, value in ipairs(parameters_location) do
-                local i = tonumber(value)
-                if i then
-                    parameters = parameters[i+1]
-                else
-                    parameters = parameters[value]
-                end
-            end
-
-            for k, v in pairs(output_parameters) do
-                workflow_parameters['proxy_values'][parameters[k][2]+1] = v
-            end
-            redis.call('hset', execution_key, 'workflow_parameters', cjson.encode(workflow_parameters))
-            """
-        )
-
         self._dispose = self.redis.register_script(
             """
             local function table_find(array, value)
@@ -808,6 +783,19 @@ def execution_report_json(self, engine_id, execution_id):
 
         return result
 
+    def successful_node_paths(self, engine_id, execution_id):
+        execution_key = f"capsul:{engine_id}:{execution_id}"
+        done = json.loads(self.redis.hget(execution_key, "done"))
+        for job_uuid in done:
+            job = json.loads(
+                self.redis.hget(execution_key, f"job:{job_uuid}")
+            )
+            parameters_location = job.get("parameters_location")
+            if parameters_location:
+                result = tuple(i for i in parameters_location if i != "nodes")
+                if result != ("directories_creation",):
+                    yield result
+
     def dispose(self, engine_id, execution_id, bypass_persistence=False):
         keys = [f"capsul:{engine_id}", f"capsul:{engine_id}:{execution_id}"]
         args = [execution_id, int(bool(bypass_persistence))]
diff --git a/capsul/engine/__init__.py b/capsul/engine/__init__.py
index 030ddbb3..49d2ad00 100644
--- a/capsul/engine/__init__.py
+++ b/capsul/engine/__init__.py
@@ -410,6 +410,25 @@ def run(self, executable, timeout=None, print_report=False, debug=False, **kwarg
             self.dispose(execution_id)
         return status
 
+    def prepare_pipeline_for_retry(self, pipeline, execution_id):
+        """Modify a pipeline given a previous execution to select only the nodes
+        that were not successful. Running the pipeline after this step will retry
+        the execution of failed jobs. This method adds (or modifies, if it already
+        exists) an unselected pipeline step called "successfully_executed"
+        containing all nodes that were successfully executed.
+        """
+        successful_nodes = []
+        for path in self.database.successful_node_paths(self.engine_id, execution_id):
+            successful_nodes.append(pipeline.node_from_path(path).name)
+        step_field = None
+        if pipeline.field("pipeline_steps"):
+            step_field = pipeline.pipeline_steps.field("successfully_executed")
+        if step_field is None:
+            pipeline.add_pipeline_step("successfully_executed", successful_nodes, False)
+        else:
+            step_field.nodes = successful_nodes
+            setattr(pipeline.pipeline_steps, "successfully_executed", False)
+
 
 class Workers(Controller):
     def __init__(self, engine_label, engine_config, database):
diff --git a/capsul/pipeline/pipeline.py b/capsul/pipeline/pipeline.py
index a907ffcf..a5f18e65 100644
--- a/capsul/pipeline/pipeline.py
+++ b/capsul/pipeline/pipeline.py
@@ -2449,6 +2449,13 @@ def __setattr__(self, name, value):
             self.dispatch_value(self, name, value)
         return result
 
+    def __setitem__(self, path, value):
+        path = path.split(".")
+        node_path = path[:-1]
+        node = self.node_from_path(node_path)
+        setattr(node, path[-1], value)
+        self.dispatch_value(node, path[-1], value)
+
     def dispatch_value(self, node, name, value):
         """Propagate the value from a pipeline plug through links"""
         # print(f"!dispatch! {node.name}.{name} = {value}")
@@ -2650,6 +2657,15 @@ def import_json(self, json):
         ):
             self.dispatch_value(node, names[-1], json_value)
 
+    def node_from_path(self, path):
+        node = self
+        for path_item in path:
+            if isinstance(node, ProcessIteration):
+                node = node.process
+            else:
+                node = node.nodes[path_item]
+        return node
+
 
 class CustomPipeline(Pipeline):
     def __init__(self, definition="custom_pipeline", json_executable={}):
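Note on the path filtering in successful_node_paths(): the sketch below walks through what the tuple comprehension does, using a made-up parameters_location value; the exact layout stored in the Redis hash is an assumption here.

    # Hypothetical parameters_location recorded with a finished job.
    parameters_location = ["nodes", "preproc", "nodes", "smoothing"]
    # Dropping the "nodes" separators leaves a node path usable by
    # Pipeline.node_from_path().
    path = tuple(i for i in parameters_location if i != "nodes")
    assert path == ("preproc", "smoothing")
    # The synthetic directories-creation job is excluded because its
    # path reduces to ("directories_creation",).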
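A possible usage sketch for prepare_pipeline_for_retry(). Only prepare_pipeline_for_retry() itself comes from this diff; the start()/wait()/status()/dispose() calls, the "error" status value, and the pipeline name are assumptions about the surrounding Capsul v3 engine API.

    from capsul.api import Capsul

    capsul = Capsul()
    pipeline = capsul.executable("my_package.my_pipeline")  # hypothetical name

    with capsul.engine() as engine:
        execution_id = engine.start(pipeline)  # assumed engine API
        engine.wait(execution_id)
        if engine.status(execution_id) == "error":  # status value is an assumption
            # Move every node that already ran into the unselected
            # "successfully_executed" step, then run again: only failed
            # or unfinished nodes are re-executed.
            engine.prepare_pipeline_for_retry(pipeline, execution_id)
            engine.dispose(execution_id)
            engine.run(pipeline)

Note that prepare_pipeline_for_retry() must be called before dispose(), since it queries the database records of that execution.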
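The new Pipeline.__setitem__ gives dotted-path write access to parameters of nested nodes; the node and parameter names below are hypothetical, only the methods come from this diff.

    # "preproc.smoothing" selects a node via node_from_path(); "fwhm" is the
    # parameter set on it, and dispatch_value() propagates it through links.
    pipeline["preproc.smoothing.fwhm"] = 6.0

    # Equivalent explicit form:
    node = pipeline.node_from_path(["preproc", "smoothing"])
    setattr(node, "fwhm", 6.0)
    pipeline.dispatch_value(node, "fwhm", 6.0)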