Skip to content

Commit

Permalink
Use one experiment per optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
Yngve S. Kristiansen authored and yngve-sk committed Sep 30, 2024
1 parent 30dba51 commit d6862e5
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 19 deletions.
12 changes: 3 additions & 9 deletions src/ert/simulator/batch_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from .batch_simulator_context import BatchContext

if TYPE_CHECKING:
from ert.storage import Ensemble, Storage
from ert.storage import Ensemble, Experiment


class BatchSimulator:
Expand Down Expand Up @@ -181,7 +181,7 @@ def start(
self,
case_name: str,
case_data: List[Tuple[int, Dict[str, Dict[str, Any]]]],
storage: Storage,
experiment: Experiment,
) -> BatchContext:
"""Start batch simulation, return a simulation context
Expand Down Expand Up @@ -240,13 +240,7 @@ def start(
time, so when you have called the 'start' method you need to let that
batch complete before you start a new batch.
"""
experiment = storage.create_experiment(
parameters=self.ert_config.ensemble_config.parameter_configuration,
responses=self.ert_config.ensemble_config.response_configuration,
name=f"experiment_{case_name}",
)
ensemble = storage.create_ensemble(
experiment.id,
ensemble = experiment.create_ensemble(
name=case_name,
ensemble_size=self.ert_config.model_config.num_realizations,
)
Expand Down
8 changes: 7 additions & 1 deletion src/everest/detached/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,14 @@ def start_server(config: EverestConfig, ert_config: ErtConfig, storage):
"Failed to save optimization config: {}".format(e)
)

experiment = storage.create_experiment(
name=f"DetachedEverest@{datetime.now().strftime('%Y-%m-%d@%H:%M:%S')}",
parameters=[],
responses=[],
)

_server = BatchSimulator(ert_config, {}, [])
_context = _server.start("dispatch_server", [(0, {})], storage)
_context = _server.start("dispatch_server", [(0, {})], experiment)

return _context

Expand Down
12 changes: 10 additions & 2 deletions src/everest/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ def get_internalized_keys(config: EverestConfig, batch_ids: Optional[Set[int]] =
with open_storage(config.storage_dir, "r") as storage:
for batch_id in batch_ids:
case_name = f"batch_{batch_id}"
experiment = storage.get_experiment_by_name(f"experiment_{case_name}")
experiments = [*storage.experiments]
assert len(experiments) == 1
experiment = experiments[0]

ensemble = experiment.get_ensemble_by_name(case_name)
if not internal_keys:
internal_keys = set(ensemble.get_summary_keyset())
Expand Down Expand Up @@ -313,7 +316,12 @@ def _load_simulation_data(
# pylint: disable=unnecessary-lambda-assignment
def load_batch_by_id():
case_name = f"batch_{batch}"
experiment = storage.get_experiment_by_name(f"experiment_{case_name}")
experiments = [*storage.experiments]

# Always assume 1 experiment per simulation/enspath, never multiple
assert len(experiments) == 1
experiment = experiments[0]

ensemble = experiment.get_ensemble_by_name(case_name)
return ensemble.load_all_summary_data()

Expand Down
16 changes: 15 additions & 1 deletion src/everest/simulator/simulator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time
from collections import defaultdict
from datetime import datetime
from itertools import count
from typing import Any, DefaultDict, Dict, List, Mapping, Optional, Tuple, Union

Expand Down Expand Up @@ -35,6 +36,7 @@ def __init__(self, ever_config: EverestConfig, callback=None):
self._ert_config, controls_def, results_def, callback=callback
)

self._experiment_id = None
self._batch = 0
self._cache: Optional[_SimulatorCache] = None
if ever_config.simulator is not None and ever_config.simulator.enable_cache:
Expand Down Expand Up @@ -132,7 +134,19 @@ def __call__(
case_data.append((real_id, controls))

with open_storage(self._ert_config.ens_path, "w") as storage:
sim_context = self.start(f"batch_{self._batch}", case_data, storage)
if self._experiment_id is None:
experiment = storage.create_experiment(
name=f"EnOpt@{datetime.now().strftime('%Y-%m-%d@%H:%M:%S')}",
parameters=self.ert_config.ensemble_config.parameter_configuration,
responses=self.ert_config.ensemble_config.response_configuration,
)

self._experiment_id = experiment.id
else:
experiment = storage.get_experiment(self._experiment_id)

sim_context = self.start(f"batch_{self._batch}", case_data, experiment)

while sim_context.running():
time.sleep(0.2)
results = sim_context.results()
Expand Down
49 changes: 43 additions & 6 deletions tests/ert/unit_tests/simulator/test_batch_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,14 @@ def batch_simulator(batch_sim_example):
def test_that_starting_with_invalid_key_raises_key_error(
batch_simulator, _input, match, storage
):
experiment = storage.create_experiment(
name="EnOptCase",
parameters=batch_simulator.ert_config.ensemble_config.parameter_configuration,
responses=batch_simulator.ert_config.ensemble_config.response_configuration,
)

with pytest.raises(KeyError, match=match):
batch_simulator.start("case", _input, storage)
batch_simulator.start("case", _input, experiment)


@pytest.mark.integration_test
Expand All @@ -166,7 +172,13 @@ def test_batch_simulation(batch_simulator, storage):
),
]

ctx = batch_simulator.start("case", case_data, storage=storage)
experiment = storage.create_experiment(
name="EnOptCase",
parameters=batch_simulator.ert_config.ensemble_config.parameter_configuration,
responses=batch_simulator.ert_config.ensemble_config.response_configuration,
)

ctx = batch_simulator.start("case", case_data, experiment)
assert len(case_data) == len(ctx.mask)

# Asking for results before it is complete.
Expand Down Expand Up @@ -280,8 +292,15 @@ def test_that_batch_simulator_handles_invalid_suffixes_at_start(
},
["ORDER"],
)

experiment = storage.create_experiment(
name="EnOptCase",
parameters=batch_sim_example.ensemble_config.parameter_configuration,
responses=batch_sim_example.ensemble_config.response_configuration,
)

with pytest.raises(KeyError, match=match):
rsim.start("case", inp, storage)
rsim.start("case", inp, experiment)


@pytest.mark.integration_test
Expand Down Expand Up @@ -328,7 +347,13 @@ def test_batch_simulation_suffixes(batch_sim_example, storage):
),
]

ctx = rsim.start("case", case_data, storage=storage)
experiment = storage.create_experiment(
name="EnOptCase",
parameters=ert_config.ensemble_config.parameter_configuration,
responses=ert_config.ensemble_config.response_configuration,
)

ctx = rsim.start("case", case_data, experiment)
assert len(case_data) == len(ctx)
_wait_for_completion(ctx)

Expand Down Expand Up @@ -395,8 +420,14 @@ def test_stop_sim(copy_case, storage):
),
]

experiment = storage.create_experiment(
name="EnOptCase",
parameters=ert_config.ensemble_config.parameter_configuration,
responses=ert_config.ensemble_config.response_configuration,
)

# Starting a simulation which should actually run through.
ctx = rsim.start(case_name, case_data, storage=storage)
ctx = rsim.start(case_name, case_data, experiment)

ctx.stop()
status = ctx.status
Expand Down Expand Up @@ -459,7 +490,13 @@ def test_batch_ctx_status_failing_jobs(setup_case, storage):
for idx in range(10)
]

batch_ctx = rsim.start("case_name", ensembles, storage=storage)
experiment = storage.create_experiment(
name="EnOptCase",
parameters=ert_config.ensemble_config.parameter_configuration,
responses=ert_config.ensemble_config.response_configuration,
)

batch_ctx = rsim.start("case_name", ensembles, experiment)
while batch_ctx.running():
assertContextStatusOddFailures(batch_ctx)
time.sleep(1)
Expand Down
22 changes: 22 additions & 0 deletions tests/everest/test_everest_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ert.storage import open_storage
from everest.config import EverestConfig
from everest.detached import generate_everserver_ert_config, start_server
from everest.simulator.everest_to_ert import everest_to_ert_config
from everest.strings import (
DEFAULT_OUTPUT_DIR,
DETACHED_NODE_DIR,
Expand All @@ -19,6 +20,27 @@
from tests.everest.utils import relpath, tmpdir


@tmpdir(relpath("..", "..", "test-data", "everest", "math_func"))
def test_that_one_experiment_creates_one_ensemble_per_batch():
config = EverestConfig.load_file("config_minimal.yml")
workflow = _EverestWorkflow(config)
assert workflow is not None

workflow.start_optimization()

batches = os.listdir(config.simulation_dir)
ert_config = ErtConfig.with_plugins().from_dict(everest_to_ert_config(config))
enspath = ert_config.ens_path

with open_storage(enspath, mode="r") as storage:
experiments = [*storage.experiments]
assert len(experiments) == 1
experiment = experiments[0]

ensemble_names = {ens.name for ens in experiment.ensembles}
assert ensemble_names == set(batches)


@pytest.mark.integration_test
@patch("ert.simulator.BatchSimulator.start", return_value=None)
@tmpdir(relpath("test_data", "mocked_test_case"))
Expand Down

0 comments on commit d6862e5

Please sign in to comment.