diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index 4545f7289..6ecc5a66c 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -634,7 +634,7 @@ def get_dataset_info(self, key: DatasetKey) -> DatasetInfo: """ return self._xgraph.nodes[key] - def add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpression) -> None: + def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpression) -> None: """Add a new quantum graph to the `QuantumProvenanceGraph`. Step through the quantum graph. Annotate a `networkx.DiGraph` @@ -831,7 +831,7 @@ def add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpre # Update `QuantumInfo.status` for this quantum. quantum_info["status"] = new_status - def resolve_duplicates( + def __resolve_duplicates( self, butler: Butler, collections: Sequence[str] | None = None, @@ -971,6 +971,37 @@ def resolve_duplicates( # self._finalized = True so that it cannot be run again. self._finalized = True + def assemble_quantum_provenance_graph( + self, + butler: Butler, + qgraphs: Sequence[QuantumGraph | ResourcePathExpression], + collections: Sequence[str] | None = None, + where: str = "", + curse_failed_logs: bool = False, + ) -> None: + output_runs = [] + if not isinstance(qgraphs, Sequence): + qgraphs = list(qgraphs) + for count, qgraph in enumerate(qgraphs): + # If the most recent graph's timestamp was earlier than any of the + # previous graphs, raise a RuntimeError. + if len(qgraphs) > 1: + breakpoint() + for graph in qgraphs[: count - 1]: + if qgraph.metadata["time"] < graph.metadata["time"]: + raise RuntimeError( + """add_new_graph may only be called on graphs + which are passed in the order they were + created. Please call again, passing your + graphs in order.""" + ) + self.__add_new_graph(butler, qgraph) + output_runs.append(qgraph.metadata["output_run"]) + # If the user has not passed a `collections` variable + if not collections: + collections = list(reversed(output_runs)) + self.__resolve_duplicates(butler, collections, where, curse_failed_logs) + def to_summary(self, butler: Butler, do_store_logs: bool = True) -> Summary: """Summarize the `QuantumProvenanceGraph`. diff --git a/tests/test_quantum_provenance_graph.py b/tests/test_quantum_provenance_graph.py index eccda2073..2d031c90f 100644 --- a/tests/test_quantum_provenance_graph.py +++ b/tests/test_quantum_provenance_graph.py @@ -54,8 +54,7 @@ def test_qpg_reports(self) -> None: # make a simple qgraph to make an execution report on butler, qgraph = simpleQGraph.makeSimpleQGraph(root=root) qpg = QuantumProvenanceGraph() - qpg.add_new_graph(butler, qgraph) - qpg.resolve_duplicates(butler) + qpg.assemble_quantum_provenance_graph(butler, qgraph) summary = qpg.to_summary(butler) for task_summary in summary.tasks.values():