From fa268e06bb5caf452c0ef63bc420f738495d1c07 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Tue, 10 Sep 2024 09:39:17 +0200 Subject: [PATCH] Use one experiment per optimization (Everest) Use one experiment per optimization --- src/ert/simulator/batch_simulator.py | 14 ++---- src/everest/detached/__init__.py | 8 ++- src/everest/export.py | 12 ++++- src/everest/simulator/simulator.py | 15 +++++- .../unit_tests/simulator/test_batch_sim.py | 49 ++++++++++++++++--- tests/everest/test_everest_output.py | 22 +++++++++ 6 files changed, 101 insertions(+), 19 deletions(-) diff --git a/src/ert/simulator/batch_simulator.py b/src/ert/simulator/batch_simulator.py index 77fe397023c..70fe59613a5 100644 --- a/src/ert/simulator/batch_simulator.py +++ b/src/ert/simulator/batch_simulator.py @@ -1,6 +1,7 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union +from uuid import UUID import numpy as np @@ -9,7 +10,7 @@ from .batch_simulator_context import BatchContext if TYPE_CHECKING: - from ert.storage import Ensemble, Storage + from ert.storage import Ensemble, Experiment class BatchSimulator: @@ -91,6 +92,7 @@ def callback(*args, **kwargs): self.control_keys = set(controls.keys()) self.result_keys = set(results) self.callback = callback + self._experiment_id: Optional[UUID] = None ens_config = self.ert_config.ensemble_config for control_name, variables in controls.items(): @@ -181,7 +183,7 @@ def start( self, case_name: str, case_data: List[Tuple[int, Dict[str, Dict[str, Any]]]], - storage: Storage, + experiment: Experiment, ) -> BatchContext: """Start batch simulation, return a simulation context @@ -240,13 +242,7 @@ def start( time, so when you have called the 'start' method you need to let that batch complete before you start a new batch. 
""" - experiment = storage.create_experiment( - parameters=self.ert_config.ensemble_config.parameter_configuration, - responses=self.ert_config.ensemble_config.response_configuration, - name=f"experiment_{case_name}", - ) - ensemble = storage.create_ensemble( - experiment.id, + ensemble = experiment.create_ensemble( name=case_name, ensemble_size=self.ert_config.model_config.num_realizations, ) diff --git a/src/everest/detached/__init__.py b/src/everest/detached/__init__.py index 2f3cad20ffa..e18487aeeab 100644 --- a/src/everest/detached/__init__.py +++ b/src/everest/detached/__init__.py @@ -93,8 +93,14 @@ def start_server(config: EverestConfig, ert_config: ErtConfig, storage): "Failed to save optimization config: {}".format(e) ) + experiment = storage.create_experiment( + name=f"DetachedEverest@{datetime.now().strftime('%Y-%m-%d@%H:%M:%S')}", + parameters=[], + responses=[], + ) + _server = BatchSimulator(ert_config, {}, []) - _context = _server.start("dispatch_server", [(0, {})], storage) + _context = _server.start("dispatch_server", [(0, {})], experiment) return _context diff --git a/src/everest/export.py b/src/everest/export.py index 4740b1d3c17..b1dd673b7f2 100644 --- a/src/everest/export.py +++ b/src/everest/export.py @@ -162,7 +162,10 @@ def get_internalized_keys(config: EverestConfig, batch_ids: Optional[Set[int]] = with open_storage(config.storage_dir, "r") as storage: for batch_id in batch_ids: case_name = f"batch_{batch_id}" - experiment = storage.get_experiment_by_name(f"experiment_{case_name}") + experiments = [*storage.experiments] + assert len(experiments) == 1 + experiment = experiments[0] + ensemble = experiment.get_ensemble_by_name(case_name) if not internal_keys: internal_keys = set(ensemble.get_summary_keyset()) @@ -313,7 +316,12 @@ def _load_simulation_data( # pylint: disable=unnecessary-lambda-assignment def load_batch_by_id(): case_name = f"batch_{batch}" - experiment = storage.get_experiment_by_name(f"experiment_{case_name}") +
experiments = [*storage.experiments] + + # Always assume 1 experiment per simulation/enspath, never multiple + assert len(experiments) == 1 + experiment = experiments[0] + ensemble = experiment.get_ensemble_by_name(case_name) return ensemble.load_all_summary_data() diff --git a/src/everest/simulator/simulator.py b/src/everest/simulator/simulator.py index eafe861f317..ef4d2f89bc7 100644 --- a/src/everest/simulator/simulator.py +++ b/src/everest/simulator/simulator.py @@ -1,3 +1,4 @@ import time from collections import defaultdict +from datetime import datetime from itertools import count @@ -132,7 +133,19 @@ def __call__( case_data.append((real_id, controls)) with open_storage(self._ert_config.ens_path, "w") as storage: - sim_context = self.start(f"batch_{self._batch}", case_data, storage) + if self._experiment_id is None: + experiment = storage.create_experiment( + name=f"EnOpt@{datetime.now().strftime('%Y-%m-%d@%H:%M:%S')}", + parameters=self.ert_config.ensemble_config.parameter_configuration, + responses=self.ert_config.ensemble_config.response_configuration, + ) + + self._experiment_id = experiment.id + else: + experiment = storage.get_experiment(self._experiment_id) + + sim_context = self.start(f"batch_{self._batch}", case_data, experiment) + while sim_context.running(): time.sleep(0.2) results = sim_context.results() diff --git a/tests/ert/unit_tests/simulator/test_batch_sim.py b/tests/ert/unit_tests/simulator/test_batch_sim.py index 591467d1969..c58d3e93725 100644 --- a/tests/ert/unit_tests/simulator/test_batch_sim.py +++ b/tests/ert/unit_tests/simulator/test_batch_sim.py @@ -142,8 +142,14 @@ def batch_simulator(batch_sim_example): def test_that_starting_with_invalid_key_raises_key_error( batch_simulator, _input, match, storage ): + experiment = storage.create_experiment( + name="EnOptCase", + parameters=batch_simulator.ert_config.ensemble_config.parameter_configuration, + responses=batch_simulator.ert_config.ensemble_config.response_configuration, + ) + with
pytest.raises(KeyError, match=match): - batch_simulator.start("case", _input, storage) + batch_simulator.start("case", _input, experiment) @pytest.mark.integration_test @@ -166,7 +172,13 @@ def test_batch_simulation(batch_simulator, storage): ), ] - ctx = batch_simulator.start("case", case_data, storage=storage) + experiment = storage.create_experiment( + name="EnOptCase", + parameters=batch_simulator.ert_config.ensemble_config.parameter_configuration, + responses=batch_simulator.ert_config.ensemble_config.response_configuration, + ) + + ctx = batch_simulator.start("case", case_data, experiment) assert len(case_data) == len(ctx.mask) # Asking for results before it is complete. @@ -280,8 +292,15 @@ def test_that_batch_simulator_handles_invalid_suffixes_at_start( }, ["ORDER"], ) + + experiment = storage.create_experiment( + name="EnOptCase", + parameters=batch_sim_example.ensemble_config.parameter_configuration, + responses=batch_sim_example.ensemble_config.response_configuration, + ) + with pytest.raises(KeyError, match=match): - rsim.start("case", inp, storage) + rsim.start("case", inp, experiment) @pytest.mark.integration_test @@ -328,7 +347,13 @@ def test_batch_simulation_suffixes(batch_sim_example, storage): ), ] - ctx = rsim.start("case", case_data, storage=storage) + experiment = storage.create_experiment( + name="EnOptCase", + parameters=ert_config.ensemble_config.parameter_configuration, + responses=ert_config.ensemble_config.response_configuration, + ) + + ctx = rsim.start("case", case_data, experiment) assert len(case_data) == len(ctx) _wait_for_completion(ctx) @@ -395,8 +420,14 @@ def test_stop_sim(copy_case, storage): ), ] + experiment = storage.create_experiment( + name="EnOptCase", + parameters=ert_config.ensemble_config.parameter_configuration, + responses=ert_config.ensemble_config.response_configuration, + ) + # Starting a simulation which should actually run through. 
- ctx = rsim.start(case_name, case_data, storage=storage) + ctx = rsim.start(case_name, case_data, experiment) ctx.stop() status = ctx.status @@ -459,7 +490,13 @@ def test_batch_ctx_status_failing_jobs(setup_case, storage): for idx in range(10) ] - batch_ctx = rsim.start("case_name", ensembles, storage=storage) + experiment = storage.create_experiment( + name="EnOptCase", + parameters=ert_config.ensemble_config.parameter_configuration, + responses=ert_config.ensemble_config.response_configuration, + ) + + batch_ctx = rsim.start("case_name", ensembles, experiment) while batch_ctx.running(): assertContextStatusOddFailures(batch_ctx) time.sleep(1) diff --git a/tests/everest/test_everest_output.py b/tests/everest/test_everest_output.py index 3737f92cc21..2397b283745 100644 --- a/tests/everest/test_everest_output.py +++ b/tests/everest/test_everest_output.py @@ -9,6 +9,7 @@ from ert.storage import open_storage from everest.config import EverestConfig from everest.detached import generate_everserver_ert_config, start_server +from everest.simulator.everest_to_ert import everest_to_ert_config from everest.strings import ( DEFAULT_OUTPUT_DIR, DETACHED_NODE_DIR, @@ -19,6 +20,27 @@ from tests.everest.utils import relpath, tmpdir +@tmpdir(relpath("..", "examples", "math_func")) +def test_that_one_experiment_creates_one_ensemble_per_batch(): + config = EverestConfig.load_file("config_minimal.yml") + workflow = _EverestWorkflow(config) + assert workflow is not None + + workflow.start_optimization() + + batches = os.listdir(config.simulation_dir) + ert_config = ErtConfig.with_plugins().from_dict(everest_to_ert_config(config)) + enspath = ert_config.ens_path + + with open_storage(enspath, mode="r") as storage: + experiments = [*storage.experiments] + assert len(experiments) == 1 + experiment = experiments[0] + + ensemble_names = {ens.name for ens in experiment.ensembles} + assert ensemble_names == set(batches) + + @pytest.mark.integration_test 
@patch("ert.simulator.BatchSimulator.start", return_value=None) @tmpdir(relpath("test_data", "mocked_test_case"))