From d6862e57d1b7bc23c6584ae137de4fb702a525fc Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Tue, 10 Sep 2024 09:39:17 +0200 Subject: [PATCH] Use one experiment per optimization --- src/ert/simulator/batch_simulator.py | 12 ++--- src/everest/detached/__init__.py | 8 ++- src/everest/export.py | 12 ++++- src/everest/simulator/simulator.py | 16 +++++- .../unit_tests/simulator/test_batch_sim.py | 49 ++++++++++++++++--- tests/everest/test_everest_output.py | 22 +++++++++ 6 files changed, 100 insertions(+), 19 deletions(-) diff --git a/src/ert/simulator/batch_simulator.py b/src/ert/simulator/batch_simulator.py index 77fe397023c..20593b20aeb 100644 --- a/src/ert/simulator/batch_simulator.py +++ b/src/ert/simulator/batch_simulator.py @@ -9,7 +9,7 @@ from .batch_simulator_context import BatchContext if TYPE_CHECKING: - from ert.storage import Ensemble, Storage + from ert.storage import Ensemble, Experiment class BatchSimulator: @@ -181,7 +181,7 @@ def start( self, case_name: str, case_data: List[Tuple[int, Dict[str, Dict[str, Any]]]], - storage: Storage, + experiment: Experiment, ) -> BatchContext: """Start batch simulation, return a simulation context @@ -240,13 +240,7 @@ def start( time, so when you have called the 'start' method you need to let that batch complete before you start a new batch. """ - experiment = storage.create_experiment( - parameters=self.ert_config.ensemble_config.parameter_configuration, - responses=self.ert_config.ensemble_config.response_configuration, - name=f"experiment_{case_name}", - ) - ensemble = storage.create_ensemble( - experiment.id, + ensemble = experiment.create_ensemble( name=case_name, ensemble_size=self.ert_config.model_config.num_realizations, ) diff --git a/src/everest/detached/__init__.py b/src/everest/detached/__init__.py index 2f3cad20ffa..e18487aeeab 100644 --- a/src/everest/detached/__init__.py +++ b/src/everest/detached/__init__.py @@ -93,8 +93,14 @@ def start_server(config: EverestConfig, ert_config: ErtConfig, storage): "Failed to save optimization config: {}".format(e) ) + experiment = storage.create_experiment( + name=f"DetachedEverest@{datetime.now().strftime('%Y-%m-%d@%H:%M:%S')}", + parameters=[], + responses=[], + ) + _server = BatchSimulator(ert_config, {}, []) - _context = _server.start("dispatch_server", [(0, {})], storage) + _context = _server.start("dispatch_server", [(0, {})], experiment) return _context diff --git a/src/everest/export.py b/src/everest/export.py index 4740b1d3c17..0a0642652f1 100644 --- a/src/everest/export.py +++ b/src/everest/export.py @@ -162,7 +162,10 @@ def get_internalized_keys(config: EverestConfig, batch_ids: Optional[Set[int]] = with open_storage(config.storage_dir, "r") as storage: for batch_id in batch_ids: case_name = f"batch_{batch_id}" - experiment = storage.get_experiment_by_name(f"experiment_{case_name}") + experiments = [*storage.experiments] + assert len(experiments) == 1 + experiment = experiments[0] + ensemble = experiment.get_ensemble_by_name(case_name) if not internal_keys: internal_keys = set(ensemble.get_summary_keyset()) @@ -313,7 +316,12 @@ def _load_simulation_data( # pylint: disable=unnecessary-lambda-assignment def load_batch_by_id(): case_name = f"batch_{batch}" - experiment = storage.get_experiment_by_name(f"experiment_{case_name}") + experiments = [*storage.experiments] + + # Always assume 1 experiment per simulation/enspath, never multiple + assert len(experiments) == 1 + experiment = experiments[0] + ensemble = experiment.get_ensemble_by_name(case_name) return ensemble.load_all_summary_data() diff --git a/src/everest/simulator/simulator.py b/src/everest/simulator/simulator.py index eafe861f317..38ec1a94230 100644 --- a/src/everest/simulator/simulator.py +++ b/src/everest/simulator/simulator.py @@ -1,5 +1,6 @@ import time from collections import defaultdict +from datetime import datetime from itertools import count from typing import Any, DefaultDict, Dict, List, Mapping, Optional, Tuple, Union @@ -35,6 +36,7 @@ def __init__(self, ever_config: EverestConfig, callback=None): self._ert_config, controls_def, results_def, callback=callback ) + self._experiment_id = None self._batch = 0 self._cache: Optional[_SimulatorCache] = None if ever_config.simulator is not None and ever_config.simulator.enable_cache: @@ -132,7 +134,19 @@ def __call__( case_data.append((real_id, controls)) with open_storage(self._ert_config.ens_path, "w") as storage: - sim_context = self.start(f"batch_{self._batch}", case_data, storage) + if self._experiment_id is None: + experiment = storage.create_experiment( + name=f"EnOpt@{datetime.now().strftime('%Y-%m-%d@%H:%M:%S')}", + parameters=self.ert_config.ensemble_config.parameter_configuration, + responses=self.ert_config.ensemble_config.response_configuration, + ) + + self._experiment_id = experiment.id + else: + experiment = storage.get_experiment(self._experiment_id) + + sim_context = self.start(f"batch_{self._batch}", case_data, experiment) + while sim_context.running(): time.sleep(0.2) results = sim_context.results() diff --git a/tests/ert/unit_tests/simulator/test_batch_sim.py b/tests/ert/unit_tests/simulator/test_batch_sim.py index 591467d1969..c58d3e93725 100644 --- a/tests/ert/unit_tests/simulator/test_batch_sim.py +++ b/tests/ert/unit_tests/simulator/test_batch_sim.py @@ -142,8 +142,14 @@ def batch_simulator(batch_sim_example): def test_that_starting_with_invalid_key_raises_key_error( batch_simulator, _input, match, storage ): + experiment = storage.create_experiment( + name="EnOptCase", + parameters=batch_simulator.ert_config.ensemble_config.parameter_configuration, + responses=batch_simulator.ert_config.ensemble_config.response_configuration, + ) + with pytest.raises(KeyError, match=match): - batch_simulator.start("case", _input, storage) + batch_simulator.start("case", _input, experiment) @pytest.mark.integration_test @@ -166,7 +172,13 @@ def test_batch_simulation(batch_simulator, storage): ), ] - ctx = batch_simulator.start("case", case_data, storage=storage) + experiment = storage.create_experiment( + name="EnOptCase", + parameters=batch_simulator.ert_config.ensemble_config.parameter_configuration, + responses=batch_simulator.ert_config.ensemble_config.response_configuration, + ) + + ctx = batch_simulator.start("case", case_data, experiment) assert len(case_data) == len(ctx.mask) # Asking for results before it is complete. @@ -280,8 +292,15 @@ def test_that_batch_simulator_handles_invalid_suffixes_at_start( }, ["ORDER"], ) + + experiment = storage.create_experiment( + name="EnOptCase", + parameters=batch_sim_example.ensemble_config.parameter_configuration, + responses=batch_sim_example.ensemble_config.response_configuration, + ) + with pytest.raises(KeyError, match=match): - rsim.start("case", inp, storage) + rsim.start("case", inp, experiment) @pytest.mark.integration_test @@ -328,7 +347,13 @@ def test_batch_simulation_suffixes(batch_sim_example, storage): ), ] - ctx = rsim.start("case", case_data, storage=storage) + experiment = storage.create_experiment( + name="EnOptCase", + parameters=ert_config.ensemble_config.parameter_configuration, + responses=ert_config.ensemble_config.response_configuration, + ) + + ctx = rsim.start("case", case_data, experiment) assert len(case_data) == len(ctx) _wait_for_completion(ctx) @@ -395,8 +420,14 @@ def test_stop_sim(copy_case, storage): ), ] + experiment = storage.create_experiment( + name="EnOptCase", + parameters=ert_config.ensemble_config.parameter_configuration, + responses=ert_config.ensemble_config.response_configuration, + ) + # Starting a simulation which should actually run through. - ctx = rsim.start(case_name, case_data, storage=storage) + ctx = rsim.start(case_name, case_data, experiment) ctx.stop() status = ctx.status @@ -459,7 +490,13 @@ def test_batch_ctx_status_failing_jobs(setup_case, storage): for idx in range(10) ] - batch_ctx = rsim.start("case_name", ensembles, storage=storage) + experiment = storage.create_experiment( + name="EnOptCase", + parameters=ert_config.ensemble_config.parameter_configuration, + responses=ert_config.ensemble_config.response_configuration, + ) + + batch_ctx = rsim.start("case_name", ensembles, experiment) while batch_ctx.running(): assertContextStatusOddFailures(batch_ctx) time.sleep(1) diff --git a/tests/everest/test_everest_output.py b/tests/everest/test_everest_output.py index 3737f92cc21..9d0b76c7cc9 100644 --- a/tests/everest/test_everest_output.py +++ b/tests/everest/test_everest_output.py @@ -9,6 +9,7 @@ from ert.storage import open_storage from everest.config import EverestConfig from everest.detached import generate_everserver_ert_config, start_server +from everest.simulator.everest_to_ert import everest_to_ert_config from everest.strings import ( DEFAULT_OUTPUT_DIR, DETACHED_NODE_DIR, @@ -19,6 +20,27 @@ from tests.everest.utils import relpath, tmpdir +@tmpdir(relpath("..", "..", "test-data", "everest", "math_func")) +def test_that_one_experiment_creates_one_ensemble_per_batch(): + config = EverestConfig.load_file("config_minimal.yml") + workflow = _EverestWorkflow(config) + assert workflow is not None + + workflow.start_optimization() + + batches = os.listdir(config.simulation_dir) + ert_config = ErtConfig.with_plugins().from_dict(everest_to_ert_config(config)) + enspath = ert_config.ens_path + + with open_storage(enspath, mode="r") as storage: + experiments = [*storage.experiments] + assert len(experiments) == 1 + experiment = experiments[0] + + ensemble_names = {ens.name for ens in experiment.ensembles} + assert ensemble_names == set(batches) + + @pytest.mark.integration_test @patch("ert.simulator.BatchSimulator.start", return_value=None) @tmpdir(relpath("test_data", "mocked_test_case"))