diff --git a/src/ert/callbacks.py b/src/ert/callbacks.py index b3904dc9f5c..214222433b7 100644 --- a/src/ert/callbacks.py +++ b/src/ert/callbacks.py @@ -5,7 +5,7 @@ from pathlib import Path from typing import Iterable -from ert.config import EnsembleConfig, ParameterConfig, SummaryConfig +from ert.config import ParameterConfig, ResponseConfig, SummaryConfig from ert.run_arg import RunArg from .load_status import LoadResult, LoadStatus @@ -38,10 +38,10 @@ def _read_parameters( def _write_responses_to_storage( - ens_config: EnsembleConfig, run_arg: RunArg + run_arg: RunArg, response_configs: Iterable[ResponseConfig] ) -> LoadResult: errors = [] - for config in ens_config.response_configs.values(): + for config in response_configs: if isinstance(config, SummaryConfig) and not config.keys: # Nothing to load, should not be handled here, should never be # added in the first place @@ -64,7 +64,6 @@ def _write_responses_to_storage( def forward_model_ok( run_arg: RunArg, - ens_conf: EnsembleConfig, ) -> LoadResult: parameters_result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "") response_result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "") @@ -78,7 +77,10 @@ def forward_model_ok( ) if parameters_result.status == LoadStatus.LOAD_SUCCESSFUL: - response_result = _write_responses_to_storage(ens_conf, run_arg) + response_result = _write_responses_to_storage( + run_arg, + run_arg.ensemble_storage.experiment.response_configuration.values(), + ) except Exception as err: logging.exception(f"Failed to load results for realization {run_arg.iens}") diff --git a/src/ert/cli/main.py b/src/ert/cli/main.py index 0548c1effe2..61bac00459e 100644 --- a/src/ert/cli/main.py +++ b/src/ert/cli/main.py @@ -83,7 +83,8 @@ def run_cli(args: Namespace, _: Any = None) -> None: evaluator_server_config = EvaluatorServerConfig(custom_port_range=args.port_range) experiment = storage.create_experiment( - parameters=ert.ensembleConfig().parameter_configuration + parameters=ert.ensembleConfig().parameter_configuration, + responses=ert.ensembleConfig().response_configuration, ) # Note that asyncio.run should be called once in ert/shared/main.py diff --git a/src/ert/config/__init__.py b/src/ert/config/__init__.py index 69dfe123f3d..d9577324450 100644 --- a/src/ert/config/__init__.py +++ b/src/ert/config/__init__.py @@ -19,6 +19,7 @@ from .parsing import ConfigValidationError, ConfigWarning from .queue_config import QueueConfig from .queue_system import QueueSystem +from .response_config import ResponseConfig from .summary_config import SummaryConfig from .summary_observation import SummaryObservation from .surface_config import SurfaceConfig @@ -53,6 +54,7 @@ "PriorDict", "QueueConfig", "QueueSystem", + "ResponseConfig", "SummaryConfig", "SummaryObservation", "SurfaceConfig", diff --git a/src/ert/config/ensemble_config.py b/src/ert/config/ensemble_config.py index ce3d2af2e56..24a6caa6ae5 100644 --- a/src/ert/config/ensemble_config.py +++ b/src/ert/config/ensemble_config.py @@ -304,3 +304,7 @@ def get_summary_keys(self) -> List[str]: @property def parameter_configuration(self) -> List[ParameterConfig]: return list(self.parameter_configs.values()) + + @property + def response_configuration(self) -> List[ResponseConfig]: + return list(self.response_configs.values()) diff --git a/src/ert/config/gen_data_config.py b/src/ert/config/gen_data_config.py index 10c384c6556..997c51b852c 100644 --- a/src/ert/config/gen_data_config.py +++ b/src/ert/config/gen_data_config.py @@ -5,7 +5,6 @@ import numpy as np import xarray as xr -from sortedcontainers 
import SortedList from typing_extensions import Self from ert.validation import rangestring_to_list @@ -18,11 +17,11 @@ @dataclass class GenDataConfig(ResponseConfig): input_file: str = "" - report_steps: Optional[SortedList] = None + report_steps: Optional[List[int]] = None def __post_init__(self) -> None: if isinstance(self.report_steps, list): - self.report_steps = SortedList(set(self.report_steps)) + self.report_steps = list(set(self.report_steps)) @classmethod def from_config_list(cls, gen_data: List[str]) -> Self: @@ -35,8 +34,10 @@ def from_config_list(cls, gen_data: List[str]) -> Self: f"Missing or unsupported RESULT_FILE for GEN_DATA key {name!r}", name ) - report_steps = rangestring_to_list(options.get("REPORT_STEPS", "")) - report_steps = SortedList(report_steps) if report_steps else None + report_steps: Optional[List[int]] = rangestring_to_list( + options.get("REPORT_STEPS", "") + ) + report_steps = sorted(list(report_steps)) if report_steps else None if os.path.isabs(res_file): result_file_context = next( x for x in gen_data if x.startswith("RESULT_FILE:") diff --git a/src/ert/config/observations.py b/src/ert/config/observations.py index 0c5633b3fd0..f569b7c36de 100644 --- a/src/ert/config/observations.py +++ b/src/ert/config/observations.py @@ -123,7 +123,8 @@ def _handle_history_observation( if history_type is None: raise ValueError("Need a history type in order to use history observations") - response_config.keys.append(summary_key) + if summary_key not in response_config.keys: + response_config.keys.append(summary_key) error = history_observation["ERROR"] error_min = history_observation["ERROR_MIN"] error_mode = history_observation["ERROR_MODE"] @@ -323,7 +324,10 @@ def _handle_summary_observation( time_map: List[datetime], ) -> Dict[str, ObsVector]: summary_key = summary_dict["KEY"] - ensemble_config["summary"].keys.append(summary_key) # type: ignore + summary_config = ensemble_config["summary"] + assert isinstance(summary_config, SummaryConfig) + if summary_key not in summary_config.keys: + summary_config.keys.append(summary_key) value, std_dev = cls._make_value_and_std_dev(summary_dict) try: diff --git a/src/ert/config/parameter_config.py b/src/ert/config/parameter_config.py index 3e6c59953e5..4da60bdece9 100644 --- a/src/ert/config/parameter_config.py +++ b/src/ert/config/parameter_config.py @@ -22,6 +22,8 @@ def __init__(self, data: List[Tuple[Any, Any]]) -> None: for i, (key, value) in enumerate(data): if isinstance(value, Path): data[i] = (key, str(value)) + if isinstance(value, set): + data[i] = (key, list(value)) super().__init__(data) diff --git a/src/ert/config/response_config.py b/src/ert/config/response_config.py index 98c0660f2fe..6ebf3660191 100644 --- a/src/ert/config/response_config.py +++ b/src/ert/config/response_config.py @@ -1,8 +1,11 @@ import dataclasses from abc import ABC, abstractmethod +from typing import Any, Dict import xarray as xr +from ert.config.parameter_config import CustomDict + @dataclasses.dataclass class ResponseConfig(ABC): @@ -11,3 +14,8 @@ class ResponseConfig(ABC): @abstractmethod def read_from_file(self, run_path: str, iens: int) -> xr.Dataset: ... 
+
+    def to_dict(self) -> Dict[str, Any]:
+        data = dataclasses.asdict(self, dict_factory=CustomDict)
+        data["_ert_kind"] = self.__class__.__name__
+        return data
diff --git a/src/ert/config/summary_config.py b/src/ert/config/summary_config.py
index 9605bcfa216..68c35089c72 100644
--- a/src/ert/config/summary_config.py
+++ b/src/ert/config/summary_config.py
@@ -3,7 +3,7 @@
 import logging
 from dataclasses import dataclass
 from datetime import datetime
-from typing import TYPE_CHECKING, Set
+from typing import TYPE_CHECKING, Set, Union
 
 import xarray as xr
 from ecl.summary import EclSum
@@ -23,7 +23,11 @@
 class SummaryConfig(ResponseConfig):
     input_file: str
     keys: List[str]
-    refcase: Optional[Set[datetime]] = None
+    refcase: Union[Set[datetime], List[str], None] = None
+
+    def __post_init__(self) -> None:
+        if isinstance(self.refcase, list):
+            self.refcase = {datetime.fromisoformat(val) for val in self.refcase}
 
     def read_from_file(self, run_path: str, iens: int) -> xr.Dataset:
         filename = self.input_file.replace("<IENS>", str(iens))
@@ -42,6 +46,7 @@ def read_from_file(self, run_path: str, iens: int) -> xr.Dataset:
         c_time = summary.alloc_time_vector(True)
         time_map = [t.datetime() for t in c_time]
         if self.refcase:
+            assert isinstance(self.refcase, set)
             missing = self.refcase.difference(time_map)
             if missing:
                 first, last = min(missing), max(missing)
diff --git a/src/ert/enkf_main.py b/src/ert/enkf_main.py
index 0f1caa98f60..0adcc13e18d 100644
--- a/src/ert/enkf_main.py
+++ b/src/ert/enkf_main.py
@@ -222,7 +222,6 @@ def loadFromForwardModel(
         run_context = self.ensemble_context(fs, realization, iteration)
         nr_loaded = fs.load_from_run_path(
             self.getEnsembleSize(),
-            self.ensembleConfig(),
             run_context.run_args,
             run_context.mask,
         )
diff --git a/src/ert/ensemble_evaluator/_builder/_step.py b/src/ert/ensemble_evaluator/_builder/_step.py
index c6eaedbcdb1..737b7a6cb38 100644
--- a/src/ert/ensemble_evaluator/_builder/_step.py
+++ b/src/ert/ensemble_evaluator/_builder/_step.py
@@ -8,7 +8,6 @@
 SOURCE_TEMPLATE_STEP = "/step/{step_id}"
 
 if TYPE_CHECKING:
-    from ert.config.ensemble_config import EnsembleConfig
     from ert.run_arg import RunArg
 
 
@@ -27,7 +26,6 @@ class LegacyStep:
     name: str
     max_runtime: Optional[int]
     run_arg: "RunArg"
-    ensemble_config: "EnsembleConfig"
     num_cpu: int
     run_path: Path
     job_script: str
diff --git a/src/ert/gui/ertwidgets/caselist.py b/src/ert/gui/ertwidgets/caselist.py
index 85900667212..a903b396688 100644
--- a/src/ert/gui/ertwidgets/caselist.py
+++ b/src/ert/gui/ertwidgets/caselist.py
@@ -97,7 +97,8 @@ def addItem(self):
         new_case_name = dialog.showAndTell()
         if new_case_name != "":
             ensemble = self.storage.create_experiment(
-                parameters=self.facade.ensemble_config.parameter_configuration
+                parameters=self.facade.ensemble_config.parameter_configuration,
+                responses=self.facade.ensemble_config.response_configuration,
             ).create_ensemble(
                 name=new_case_name,
                 ensemble_size=self.facade.get_ensemble_size(),
diff --git a/src/ert/gui/simulation/simulation_panel.py b/src/ert/gui/simulation/simulation_panel.py
index 93d6927e67e..8ffb4e08605 100644
--- a/src/ert/gui/simulation/simulation_panel.py
+++ b/src/ert/gui/simulation/simulation_panel.py
@@ -146,7 +146,8 @@ def runSimulation(self):
         try:
             args = self.getSimulationArguments()
             experiment = self._notifier.storage.create_experiment(
-                parameters=self.ert.ensembleConfig().parameter_configuration
+                parameters=self.ert.ensembleConfig().parameter_configuration,
+                responses=self.ert.ensembleConfig().response_configuration,
             )
             model = create_model(
                 self.ert,
diff
--git a/src/ert/job_queue/job_queue_node.py b/src/ert/job_queue/job_queue_node.py index 67628ed177b..6c1a4782289 100644 --- a/src/ert/job_queue/job_queue_node.py +++ b/src/ert/job_queue/job_queue_node.py @@ -20,7 +20,6 @@ from .thread_status import ThreadStatus if TYPE_CHECKING: - from ..config import EnsembleConfig from ..run_arg import RunArg from .driver import Driver @@ -97,13 +96,11 @@ def __init__( status_file: str, exit_file: str, run_arg: "RunArg", - ensemble_config: "EnsembleConfig", max_runtime: Optional[int] = None, callback_timeout: Optional[Callable[[int], None]] = None, ): self.callback_timeout = callback_timeout self.run_arg = run_arg - self.ensemble_config = ensemble_config argc = 1 argv = StringList() argv.append(run_path) @@ -177,9 +174,7 @@ def submit(self, driver: "Driver") -> SubmitStatus: return self._submit(driver) def run_done_callback(self) -> Optional[LoadStatus]: - callback_status, status_msg = forward_model_ok( - self.run_arg, self.ensemble_config - ) + callback_status, status_msg = forward_model_ok(self.run_arg) if callback_status == LoadStatus.LOAD_SUCCESSFUL: self._set_queue_status(JobStatus.SUCCESS) elif callback_status == LoadStatus.TIME_MAP_FAILURE: diff --git a/src/ert/job_queue/queue.py b/src/ert/job_queue/queue.py index 9d2c45322fb..9b05609216a 100644 --- a/src/ert/job_queue/queue.py +++ b/src/ert/job_queue/queue.py @@ -41,7 +41,6 @@ from . import ResPrototype if TYPE_CHECKING: - from ert.config import ErtConfig from ert.ensemble_evaluator import LegacyStep from ert.run_arg import RunArg @@ -478,13 +477,12 @@ async def execute_queue_comms_via_bus( # pylint: disable=too-many-arguments def add_job_from_run_arg( self, run_arg: "RunArg", - ert_config: "ErtConfig", + job_script: str, max_runtime: Optional[int], num_cpu: int, ) -> None: job_name = run_arg.job_name run_path = run_arg.runpath - job_script = ert_config.queue_config.job_script job = JobQueueNode( job_script=job_script, @@ -494,7 +492,6 @@ def add_job_from_run_arg( status_file=self.status_file, exit_file=self.exit_file, run_arg=run_arg, - ensemble_config=ert_config.ensemble_config, max_runtime=max_runtime, ) @@ -515,7 +512,6 @@ def add_ee_stage( status_file=self.status_file, exit_file=self.exit_file, run_arg=stage.run_arg, - ensemble_config=stage.ensemble_config, max_runtime=stage.max_runtime, callback_timeout=callback_timeout, ) diff --git a/src/ert/run_models/base_run_model.py b/src/ert/run_models/base_run_model.py index cc5a006b120..f88d50c2df5 100644 --- a/src/ert/run_models/base_run_model.py +++ b/src/ert/run_models/base_run_model.py @@ -395,7 +395,6 @@ def _build_ensemble( name="legacy step", max_runtime=self.ert().analysisConfig().max_runtime, run_arg=run_arg, - ensemble_config=self.ert().resConfig().ensemble_config, num_cpu=self.ert().get_num_cpu(), run_path=Path(run_arg.runpath), job_script=self.ert().resConfig().queue_config.job_script, diff --git a/src/ert/simulator/batch_simulator.py b/src/ert/simulator/batch_simulator.py index a019af3cd41..2d9162015a5 100644 --- a/src/ert/simulator/batch_simulator.py +++ b/src/ert/simulator/batch_simulator.py @@ -3,7 +3,6 @@ from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union import numpy as np -from sortedcontainers import SortedList from ert.config import ErtConfig, ExtParamConfig, GenDataConfig from ert.enkf_main import EnKFMain @@ -112,9 +111,7 @@ def callback(*args, **kwargs): for key in results: ens_config.addNode( - GenDataConfig( - name=key, input_file=f"{key}_%d", report_steps=SortedList([0]) - ) + 
GenDataConfig(name=key, input_file=f"{key}_%d", report_steps=[0]) ) def _setup_sim( @@ -237,7 +234,8 @@ def start( batch complete before you start a new batch. """ experiment = storage.create_experiment( - self.ert_config.ensemble_config.parameter_configuration + parameters=self.ert_config.ensemble_config.parameter_configuration, + responses=self.ert_config.ensemble_config.response_configuration, ) ensemble = storage.create_ensemble( experiment.id, name=case_name, ensemble_size=self.ert.getEnsembleSize() diff --git a/src/ert/simulator/simulation_context.py b/src/ert/simulator/simulation_context.py index 91b6dfe700b..a4be80dc8d0 100644 --- a/src/ert/simulator/simulation_context.py +++ b/src/ert/simulator/simulation_context.py @@ -50,7 +50,7 @@ def _run_forward_model( continue job_queue.add_job_from_run_arg( run_arg, - ert.resConfig(), + ert.resConfig().queue_config.job_script, max_runtime, ert.get_num_cpu(), ) diff --git a/src/ert/storage/local_ensemble.py b/src/ert/storage/local_ensemble.py index 7b78c9300bf..03d8725c9f8 100644 --- a/src/ert/storage/local_ensemble.py +++ b/src/ert/storage/local_ensemble.py @@ -20,7 +20,6 @@ if TYPE_CHECKING: import numpy.typing as npt - from ert.config import EnsembleConfig from ert.run_arg import RunArg from ert.storage.local_experiment import ( LocalExperimentAccessor, @@ -34,7 +33,6 @@ def _load_realization( sim_fs: LocalEnsembleAccessor, realisation: int, - ensemble_config: EnsembleConfig, run_args: List[RunArg], ) -> Tuple[LoadResult, int]: sim_fs.update_realization_state( @@ -42,7 +40,7 @@ def _load_realization( [RealizationState.UNDEFINED], RealizationState.INITIALIZED, ) - result = forward_model_ok(run_args[realisation], ensemble_config) + result = forward_model_ok(run_args[realisation]) sim_fs.state_map[realisation] = ( RealizationState.HAS_DATA if result.status == LoadStatus.LOAD_SUCCESSFUL @@ -280,7 +278,6 @@ def sync(self) -> None: def load_from_run_path( self, ensemble_size: int, - ensemble_config: EnsembleConfig, run_args: List[RunArg], active_realizations: List[bool], ) -> int: @@ -290,7 +287,7 @@ def load_from_run_path( async_result = [ pool.apply_async( _load_realization, - (self, iens, ensemble_config, run_args), + (self, iens, run_args), ) for iens in range(ensemble_size) if active_realizations[iens] diff --git a/src/ert/storage/local_experiment.py b/src/ert/storage/local_experiment.py index e91065b8691..0d6d27e4c08 100644 --- a/src/ert/storage/local_experiment.py +++ b/src/ert/storage/local_experiment.py @@ -8,7 +8,15 @@ import numpy as np import xtgeo -from ert.config import ExtParamConfig, Field, GenKwConfig, SurfaceConfig +from ert.config import ( + ExtParamConfig, + Field, + GenDataConfig, + GenKwConfig, + SummaryConfig, + SurfaceConfig, +) +from ert.config.response_config import ResponseConfig if TYPE_CHECKING: from ert.config.parameter_config import ParameterConfig @@ -24,8 +32,15 @@ } +_KNOWN_RESPONSE_TYPES = { + SummaryConfig.__name__: SummaryConfig, + GenDataConfig.__name__: GenDataConfig, +} + + class LocalExperimentReader: _parameter_file = Path("parameter.json") + _responses_file = Path("responses.json") _simulation_arguments_file = Path("simulation_arguments.json") def __init__(self, storage: LocalStorageReader, uuid: UUID, path: Path) -> None: @@ -57,6 +72,16 @@ def parameter_info(self) -> Dict[str, Any]: info = json.load(f) return info + @property + def response_info(self) -> Dict[str, Any]: + info: Dict[str, Any] + path = self.mount_point / self._responses_file + if not path.exists(): + raise 
ValueError(f"{str(self._responses_file)} does not exist") + with open(path, encoding="utf-8", mode="r") as f: + info = json.load(f) + return info + def get_surface(self, name: str) -> xtgeo.RegularSurface: return xtgeo.surface_from_file( str(self.mount_point / f"{name}.irap"), @@ -72,14 +97,23 @@ def parameter_configuration(self) -> Dict[str, ParameterConfig]: params[data["name"]] = _KNOWN_PARAMETER_TYPES[param_type](**data) return params + @property + def response_configuration(self) -> Dict[str, ResponseConfig]: + params = {} + for data in self.response_info.values(): + param_type = data.pop("_ert_kind") + params[data["name"]] = _KNOWN_RESPONSE_TYPES[param_type](**data) + return params + class LocalExperimentAccessor(LocalExperimentReader): - def __init__( + def __init__( # pylint: disable=too-many-arguments self, storage: LocalStorageAccessor, uuid: UUID, path: Path, parameters: Optional[List[ParameterConfig]] = None, + responses: Optional[List[ResponseConfig]] = None, ) -> None: self._storage: LocalStorageAccessor = storage self._id = uuid @@ -100,6 +134,19 @@ def __init__( with open(self.mount_point / self._parameter_file, "w", encoding="utf-8") as f: json.dump(parameter_data, f) + responses = [] if responses is None else responses + response_file = self.mount_point / self._responses_file + response_data = ( + json.loads(response_file.read_text(encoding="utf-8")) + if response_file.exists() + else {} + ) + + for response in responses: + response_data.update({response.name: response.to_dict()}) + with open(response_file, "w", encoding="utf-8") as f: + json.dump(response_data, f, default=str) + @property def ensembles(self) -> Generator[LocalEnsembleAccessor, None, None]: yield from super().ensembles # type: ignore diff --git a/src/ert/storage/local_storage.py b/src/ert/storage/local_storage.py index ed0d64bfbf9..213d428e313 100644 --- a/src/ert/storage/local_storage.py +++ b/src/ert/storage/local_storage.py @@ -38,12 +38,12 @@ if TYPE_CHECKING: - from ert.config.parameter_config import ParameterConfig + from ert.config import ParameterConfig, ResponseConfig logger = logging.getLogger(__name__) -_LOCAL_STORAGE_VERSION = 2 +_LOCAL_STORAGE_VERSION = 3 class _Migrations(BaseModel): @@ -180,10 +180,21 @@ def __init__( block_fs.migrate(self.path) self._add_migration_information(0, "block_fs") elif version == 1: - from ert.storage.migration import gen_kw # pylint: disable=C0415 + from ert.storage.migration import ( # pylint: disable=C0415 + gen_kw, + response_info, + ) gen_kw.migrate(self.path) + response_info.migrate(self.path) self._add_migration_information(1, "gen_kw") + elif version == 2: + from ert.storage.migration import ( # pylint: disable=C0415 + response_info, + ) + + response_info.migrate(self.path) + self._add_migration_information(2, "response") except Exception as err: # pylint: disable=broad-exception-caught logger.error(f"Migrating storage at {self.path} failed with {err}") @@ -220,12 +231,16 @@ def to_accessor(self) -> LocalStorageAccessor: return self def create_experiment( - self, parameters: Optional[List[ParameterConfig]] = None + self, + parameters: Optional[List[ParameterConfig]] = None, + responses: Optional[List[ResponseConfig]] = None, ) -> LocalExperimentAccessor: exp_id = uuid4() path = self._experiment_path(exp_id) path.mkdir(parents=True, exist_ok=False) - exp = LocalExperimentAccessor(self, exp_id, path, parameters=parameters) + exp = LocalExperimentAccessor( + self, exp_id, path, parameters=parameters, responses=responses + ) self._experiments[exp.id] = exp 
return exp diff --git a/src/ert/storage/migration/block_fs.py b/src/ert/storage/migration/block_fs.py index 7b13452de5f..d48dae36f6b 100644 --- a/src/ert/storage/migration/block_fs.py +++ b/src/ert/storage/migration/block_fs.py @@ -17,8 +17,10 @@ from ert.config import ( EnsembleConfig, Field, + GenDataConfig, GenKwConfig, ParameterConfig, + ResponseConfig, SurfaceConfig, field_transform, ) @@ -95,12 +97,20 @@ def migrate_case(storage: StorageAccessor, path: Path) -> None: parameter_configs.extend(_migrate_gen_kw_info(data_file, ens_config)) parameter_configs.extend(_migrate_surface_info(data_file, ens_config)) + # Copy experiment response data + response_configs: List[ResponseConfig] = [] + for data_file in response_files: + response_configs.extend(_migrate_summary_info(data_file, ens_config)) + response_configs.extend(_migrate_gen_data_info(data_file, ens_config)) + # Guess iteration number iteration = 0 if (match := re.search(r"_(\d+)$", path.name)) is not None: iteration = int(match[1]) - experiment = storage.create_experiment(parameters=parameter_configs) + experiment = storage.create_experiment( + parameters=parameter_configs, responses=response_configs + ) ensemble = experiment.create_ensemble( name=path.name, ensemble_size=ensemble_size, @@ -256,6 +266,18 @@ def _migrate_field( ensemble.save_parameters(block.name, block.realization_index, ds) +def _migrate_summary_info( + data_file: DataFile, + ens_config: EnsembleConfig, +) -> List[ResponseConfig]: + seen = set() + for block in data_file.blocks(Kind.SUMMARY): + if block.name in seen: + continue + seen.add(block.name) + return [ens_config["summary"]] if seen else [] # type: ignore + + def _migrate_summary( ensemble: EnsembleAccessor, data_file: DataFile, @@ -291,6 +313,23 @@ def _migrate_summary( ensemble.save_response("summary", ds, realization_index) +def _migrate_gen_data_info( + data_file: DataFile, + ens_config: EnsembleConfig, +) -> List[ResponseConfig]: + seen = set() + configs: List[ResponseConfig] = [] + for block in data_file.blocks(Kind.GEN_DATA): + if block.name in seen: + continue + seen.add(block.name) + config = ens_config[block.name] + assert isinstance(config, GenDataConfig) + + configs.append(config) + return configs + + def _migrate_gen_data( ensemble: EnsembleAccessor, data_file: DataFile, diff --git a/src/ert/storage/migration/response_info.py b/src/ert/storage/migration/response_info.py new file mode 100644 index 00000000000..dd27292288d --- /dev/null +++ b/src/ert/storage/migration/response_info.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +import json +from typing import TYPE_CHECKING + +from ert.storage.local_storage import local_storage_get_ert_config + +if TYPE_CHECKING: + from pathlib import Path + + +def migrate(path: Path) -> None: + ert_config = local_storage_get_ert_config() + ens_config = ert_config.ensemble_config + for experiment in path.glob("experiments/*"): + response_info = {} + for response in ens_config.response_configuration: + response_info[response.name] = response.to_dict() + with open(experiment / "responses.json", "w", encoding="utf-8") as fout: + fout.write(json.dumps(response_info, default=str)) + with open(path / "index.json", encoding="utf-8") as f: + index_json = json.load(f) + index_json["version"] = 3 + with open(path / "index.json", "w", encoding="utf-8") as f: + f.write(json.dumps(index_json)) diff --git a/test-data/block_storage b/test-data/block_storage index aac08ccf968..84305c7fb68 160000 --- a/test-data/block_storage +++ b/test-data/block_storage @@ -1 +1 @@ 
-Subproject commit aac08ccf9687e507d38e3de6c386d3014b5e9865 +Subproject commit 84305c7fb683c8aa49d2e46723307721447658fa diff --git a/tests/unit_tests/ensemble_evaluator/conftest.py b/tests/unit_tests/ensemble_evaluator/conftest.py index 7ff87201b32..288a4ba383f 100644 --- a/tests/unit_tests/ensemble_evaluator/conftest.py +++ b/tests/unit_tests/ensemble_evaluator/conftest.py @@ -1,7 +1,6 @@ import json import os import stat -from dataclasses import dataclass from pathlib import Path from unittest.mock import Mock @@ -71,7 +70,7 @@ def _make_ensemble_builder(monkeypatch, tmpdir, num_reals, num_jobs, job_sleep=0 monkeypatch.setattr( ert.job_queue.job_queue_node, "forward_model_ok", - lambda _, _b: (LoadStatus.LOAD_SUCCESSFUL, ""), + lambda _: (LoadStatus.LOAD_SUCCESSFUL, ""), ) monkeypatch.setattr( JobQueueNode, "run_exit_callback", lambda _: (LoadStatus.LOAD_FAILURE, "") @@ -106,10 +105,6 @@ def _make_ensemble_builder(monkeypatch, tmpdir, num_reals, num_jobs, job_sleep=0 ) ) - @dataclass - class RunArg: - iens: int - for iens in range(0, num_reals): run_path = Path(tmpdir / f"real_{iens}") os.mkdir(run_path) @@ -133,7 +128,6 @@ class RunArg: run_arg=Mock(iens=iens), # the first callback_argument is expected to be a run_arg # from the run_arg, the queue wants to access the iens prop - ensemble_config=None, run_path=run_path, num_cpu=1, name="dummy step", diff --git a/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py b/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py index fb7f7cfd52d..69451551892 100644 --- a/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py +++ b/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py @@ -71,7 +71,6 @@ def __init__(self, _iter, reals, steps, jobs, id_): ], name=f"step-{step_no}", max_runtime=0, - ensemble_config=None, num_cpu=0, run_path=None, run_arg=None, diff --git a/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py b/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py index 2f9ef599a46..8562dd94936 100644 --- a/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py +++ b/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py @@ -29,7 +29,6 @@ def test_build_ensemble(active_real): job_script="job_script", job_name="job_name", num_cpu=1, - ensemble_config=MagicMock(), jobs=[ LegacyJob( ext_job=MagicMock(), diff --git a/tests/unit_tests/job_queue/conftest.py b/tests/unit_tests/job_queue/conftest.py new file mode 100644 index 00000000000..21d14da146a --- /dev/null +++ b/tests/unit_tests/job_queue/conftest.py @@ -0,0 +1,43 @@ +import stat +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +import ert +from ert.load_status import LoadStatus + + +@pytest.fixture +def mock_fm_ok(monkeypatch): + fm_ok = MagicMock(return_value=(LoadStatus.LOAD_SUCCESSFUL, "")) + monkeypatch.setattr(ert.job_queue.job_queue_node, "forward_model_ok", fm_ok) + yield fm_ok + + +@pytest.fixture +def simple_script(tmp_path): + SIMPLE_SCRIPT = """#!/bin/sh +echo "finished successfully" > STATUS +""" + fout = Path(tmp_path / "job_script") + fout.write_text(SIMPLE_SCRIPT, encoding="utf-8") + fout.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG) + yield str(fout) + + +@pytest.fixture +def failing_script(tmp_path): + """ + This script is susceptible to race conditions. 
Python works + better than sh.""" + FAILING_SCRIPT = """#!/usr/bin/env python +import sys +with open("one_byte_pr_invocation", "a") as f: + f.write(".") +sys.exit(1) + """ + fout = Path(tmp_path / "failing_script") + fout.write_text(FAILING_SCRIPT, encoding="utf-8") + fout.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG) + yield str(fout) diff --git a/tests/unit_tests/job_queue/test_job_queue.py b/tests/unit_tests/job_queue/test_job_queue.py index d59ff634d5e..4822b1dd80f 100644 --- a/tests/unit_tests/job_queue/test_job_queue.py +++ b/tests/unit_tests/job_queue/test_job_queue.py @@ -1,16 +1,15 @@ import json import stat import time -from dataclasses import dataclass from pathlib import Path from threading import BoundedSemaphore from typing import Any, Callable, Dict, Optional -from unittest.mock import patch +from unittest.mock import MagicMock, patch + +import pytest -import ert.callbacks from ert.config import QueueSystem from ert.job_queue import Driver, JobQueue, JobQueueNode, JobStatus -from ert.load_status import LoadStatus def wait_for( @@ -28,72 +27,46 @@ def wait_for( ) -def dummy_exit_callback(*args): - print(args) - - DUMMY_CONFIG: Dict[str, Any] = { "job_script": "job_script.py", "num_cpu": 1, "job_name": "dummy_job_{}", "run_path": "dummy_path_{}", - "ok_callback": lambda _, _b: (LoadStatus.LOAD_SUCCESSFUL, ""), - "exit_callback": dummy_exit_callback, } -SIMPLE_SCRIPT = """#!/usr/bin/env python -print('hello') -""" -NEVER_ENDING_SCRIPT = """#!/usr/bin/env python +@pytest.fixture +def never_ending_script(tmp_path): + NEVER_ENDING_SCRIPT = """#!/usr/bin/env python import time while True: time.sleep(0.5) -""" - -FAILING_SCRIPT = """#!/usr/bin/env python -import sys -sys.exit(1) -""" - - -@dataclass -class RunArg: - iens: int + """ + fout = Path(tmp_path / "never_ending_job_script") + fout.write_text(NEVER_ENDING_SCRIPT, encoding="utf-8") + fout.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG) + yield str(fout) def create_local_queue( - monkeypatch, executable_script: str, max_submit: int = 1, max_runtime: Optional[int] = None, callback_timeout: Optional["Callable[[int], None]"] = None, ): - monkeypatch.setattr( - ert.job_queue.job_queue_node, "forward_model_ok", DUMMY_CONFIG["ok_callback"] - ) - monkeypatch.setattr( - JobQueueNode, "run_exit_callback", DUMMY_CONFIG["exit_callback"] - ) - driver = Driver(driver_type=QueueSystem.LOCAL) job_queue = JobQueue(driver, max_submit=max_submit) - scriptpath = Path(DUMMY_CONFIG["job_script"]) - scriptpath.write_text(executable_script, encoding="utf-8") - scriptpath.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG) - for iens in range(10): Path(DUMMY_CONFIG["run_path"].format(iens)).mkdir(exist_ok=False) job = JobQueueNode( - job_script=DUMMY_CONFIG["job_script"], + job_script=executable_script, job_name=DUMMY_CONFIG["job_name"].format(iens), run_path=DUMMY_CONFIG["run_path"].format(iens), num_cpu=DUMMY_CONFIG["num_cpu"], status_file=job_queue.status_file, exit_file=job_queue.exit_file, - run_arg=RunArg(iens), - ensemble_config=None, + run_arg=MagicMock(), max_runtime=max_runtime, callback_timeout=callback_timeout, ) @@ -110,9 +83,9 @@ def start_all(job_queue, sema_pool): job = job_queue.fetch_next_waiting() -def test_kill_jobs(tmpdir, monkeypatch): +def test_kill_jobs(tmpdir, monkeypatch, never_ending_script): monkeypatch.chdir(tmpdir) - job_queue = create_local_queue(monkeypatch, NEVER_ENDING_SCRIPT) + job_queue = create_local_queue(never_ending_script) assert job_queue.queue_size == 10 assert job_queue.is_active() @@ -141,9 +114,9 @@ 
def test_kill_jobs(tmpdir, monkeypatch): job.wait_for() -def test_add_jobs(tmpdir, monkeypatch): +def test_add_jobs(tmpdir, monkeypatch, simple_script): monkeypatch.chdir(tmpdir) - job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT) + job_queue = create_local_queue(simple_script) assert job_queue.queue_size == 10 assert job_queue.is_active() @@ -161,9 +134,9 @@ def test_add_jobs(tmpdir, monkeypatch): job.wait_for() -def test_failing_jobs(tmpdir, monkeypatch): +def test_failing_jobs(tmpdir, monkeypatch, failing_script): monkeypatch.chdir(tmpdir) - job_queue = create_local_queue(monkeypatch, FAILING_SCRIPT, max_submit=1) + job_queue = create_local_queue(failing_script, max_submit=1) assert job_queue.queue_size == 10 assert job_queue.is_active() @@ -187,20 +160,17 @@ def test_failing_jobs(tmpdir, monkeypatch): assert job_queue.snapshot()[iens] == str(JobStatus.FAILED) -def test_timeout_jobs(tmpdir, monkeypatch): +def test_timeout_jobs(tmpdir, monkeypatch, never_ending_script): monkeypatch.chdir(tmpdir) job_numbers = set() - def callback(iens): - nonlocal job_numbers - job_numbers.add(iens) + mock_callback = MagicMock() job_queue = create_local_queue( - monkeypatch, - NEVER_ENDING_SCRIPT, + never_ending_script, max_submit=1, max_runtime=5, - callback_timeout=callback, + callback_timeout=mock_callback, ) assert job_queue.queue_size == 10 @@ -223,15 +193,15 @@ def callback(iens): iens = job_queue._differ.qindex_to_iens(q_index) assert job_queue.snapshot()[iens] == str(JobStatus.IS_KILLED) - assert job_numbers == set(range(10)) + assert len(mock_callback.mock_calls) == 20 for job in job_queue.job_list: job.wait_for() -def test_add_dispatch_info(tmpdir, monkeypatch): +def test_add_dispatch_info(tmpdir, monkeypatch, simple_script): monkeypatch.chdir(tmpdir) - job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT) + job_queue = create_local_queue(simple_script) ens_id = "some_id" cert = "My very nice cert" token = "my_super_secret_token" @@ -260,9 +230,9 @@ def test_add_dispatch_info(tmpdir, monkeypatch): assert (runpath / cert_file).read_text(encoding="utf-8") == cert -def test_add_dispatch_info_cert_none(tmpdir, monkeypatch): +def test_add_dispatch_info_cert_none(tmpdir, monkeypatch, simple_script): monkeypatch.chdir(tmpdir) - job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT) + job_queue = create_local_queue(simple_script) ens_id = "some_id" dispatch_url = "wss://example.org" cert = None diff --git a/tests/unit_tests/job_queue/test_job_queue_manager.py b/tests/unit_tests/job_queue/test_job_queue_manager.py index fbf7c963f9b..64979b024da 100644 --- a/tests/unit_tests/job_queue/test_job_queue_manager.py +++ b/tests/unit_tests/job_queue/test_job_queue_manager.py @@ -1,62 +1,28 @@ import os import stat -from dataclasses import dataclass from pathlib import Path from threading import BoundedSemaphore -from typing import Callable, List, TypedDict +from typing import List, TypedDict +from unittest.mock import MagicMock import pytest -import ert.callbacks from ert.config import QueueSystem from ert.job_queue import Driver, JobQueue, JobQueueManager, JobQueueNode, JobStatus -from ert.load_status import LoadStatus - - -@dataclass -class RunArg: - iens: int class Config(TypedDict): - job_script: str num_cpu: int job_name: str run_path: str - ok_callback: Callable - exit_callback: Callable - - -def dummy_ok_callback(runarg, path): - (Path(path) / "OK").write_text("success", encoding="utf-8") - return (LoadStatus.LOAD_SUCCESSFUL, "") - - -def dummy_exit_callback(self): - 
Path("ERROR").write_text("failure", encoding="utf-8") DUMMY_CONFIG: Config = { - "job_script": "job_script.py", "num_cpu": 1, "job_name": "dummy_job_{}", "run_path": "dummy_path_{}", - "ok_callback": dummy_ok_callback, - "exit_callback": dummy_exit_callback, } -SIMPLE_SCRIPT = """#!/bin/sh -echo "finished successfully" > STATUS -""" - -# This script is susceptible to race conditions. Python works -# better than sh. -FAILING_SCRIPT = """#!/usr/bin/env python -import sys -with open("one_byte_pr_invocation", "a") as f: - f.write(".") -sys.exit(1) -""" MOCK_BSUB = """#!/bin/sh echo "$@" > test.out @@ -67,55 +33,33 @@ def dummy_exit_callback(self): def create_local_queue( - monkeypatch, executable_script: str, max_submit: int = 2, num_realizations: int = 10 + executable_script: str, max_submit: int = 2, num_realizations: int = 10 ): - monkeypatch.setattr( - ert.job_queue.job_queue_node, "forward_model_ok", DUMMY_CONFIG["ok_callback"] - ) - monkeypatch.setattr( - JobQueueNode, "run_exit_callback", DUMMY_CONFIG["exit_callback"] - ) - driver = Driver(driver_type=QueueSystem.LOCAL) job_queue = JobQueue(driver, max_submit=max_submit) - scriptpath = Path(DUMMY_CONFIG["job_script"]) - scriptpath.write_text(executable_script, encoding="utf-8") - scriptpath.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG) - for iens in range(num_realizations): Path(DUMMY_CONFIG["run_path"].format(iens)).mkdir() job = JobQueueNode( - job_script=DUMMY_CONFIG["job_script"], + job_script=executable_script, job_name=DUMMY_CONFIG["job_name"].format(iens), run_path=os.path.realpath(DUMMY_CONFIG["run_path"].format(iens)), num_cpu=DUMMY_CONFIG["num_cpu"], status_file=job_queue.status_file, exit_file=job_queue.exit_file, - run_arg=RunArg(iens), - ensemble_config=Path(DUMMY_CONFIG["run_path"].format(iens)).resolve(), + run_arg=MagicMock(), ) job_queue.add_job(job, iens) return job_queue -def test_num_cpu_submitted_correctly_lsf(tmpdir, monkeypatch): +@pytest.mark.usefixtures("use_tmpdir", "mock_fm_ok") +def test_num_cpu_submitted_correctly_lsf(tmpdir, simple_script): """Assert that num_cpu from the ERT configuration is passed on to the bsub command used to submit jobs to LSF""" - monkeypatch.setattr( - ert.job_queue.job_queue_node, "forward_model_ok", DUMMY_CONFIG["ok_callback"] - ) - monkeypatch.setattr( - JobQueueNode, "run_exit_callback", DUMMY_CONFIG["exit_callback"] - ) - monkeypatch.chdir(tmpdir) os.putenv("PATH", os.getcwd() + ":" + os.getenv("PATH")) driver = Driver(driver_type=QueueSystem.LSF) - script = Path(DUMMY_CONFIG["job_script"]) - script.write_text(SIMPLE_SCRIPT, encoding="utf-8") - script.chmod(stat.S_IRWXU) - bsub = Path("bsub") bsub.write_text(MOCK_BSUB, encoding="utf-8") bsub.chmod(stat.S_IRWXU) @@ -125,14 +69,13 @@ def test_num_cpu_submitted_correctly_lsf(tmpdir, monkeypatch): os.mkdir(DUMMY_CONFIG["run_path"].format(job_id)) job = JobQueueNode( - job_script=DUMMY_CONFIG["job_script"], + job_script=simple_script, job_name=DUMMY_CONFIG["job_name"].format(job_id), run_path=os.path.realpath(DUMMY_CONFIG["run_path"].format(job_id)), num_cpu=4, status_file="STATUS", exit_file="ERROR", - run_arg=RunArg(iens=job_id), - ensemble_config=Path(DUMMY_CONFIG["run_path"].format(job_id)).resolve(), + run_arg=MagicMock(), ) pool_sema = BoundedSemaphore(value=2) @@ -153,25 +96,23 @@ def test_num_cpu_submitted_correctly_lsf(tmpdir, monkeypatch): assert found_cpu_arg is True -def test_execute_queue(tmpdir, monkeypatch): +def test_execute_queue(tmpdir, monkeypatch, mock_fm_ok, simple_script): monkeypatch.chdir(tmpdir) - 
job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT) + job_queue = create_local_queue(simple_script) manager = JobQueueManager(job_queue) manager.execute_queue() - for job in job_queue.job_list: - assert (Path(job.run_path) / "OK").read_text(encoding="utf-8") == "success" + assert len(mock_fm_ok.mock_calls) == len(job_queue.job_list) @pytest.mark.parametrize("max_submit_num", [1, 2, 3]) -def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch): +def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch, failing_script): """Check that the JobQueueManager will submit exactly the maximum number of resubmissions in the case of scripts that fail.""" monkeypatch.chdir(tmpdir) num_realizations = 2 job_queue = create_local_queue( - monkeypatch, - FAILING_SCRIPT, + failing_script, max_submit=max_submit_num, num_realizations=num_realizations, ) @@ -193,11 +134,9 @@ def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch): @pytest.mark.parametrize("max_submit_num", [1, 2, 3]) -def test_kill_queue(tmpdir, max_submit_num, monkeypatch): +def test_kill_queue(tmpdir, max_submit_num, monkeypatch, simple_script): monkeypatch.chdir(tmpdir) - job_queue = create_local_queue( - monkeypatch, SIMPLE_SCRIPT, max_submit=max_submit_num - ) + job_queue = create_local_queue(simple_script, max_submit=max_submit_num) manager = JobQueueManager(job_queue) job_queue.kill_all_jobs() manager.execute_queue() diff --git a/tests/unit_tests/job_queue/test_job_queue_manager_torque.py b/tests/unit_tests/job_queue/test_job_queue_manager_torque.py index a0b2fd255c2..76a89f78897 100644 --- a/tests/unit_tests/job_queue/test_job_queue_manager_torque.py +++ b/tests/unit_tests/job_queue/test_job_queue_manager_torque.py @@ -1,16 +1,14 @@ import os import stat -from dataclasses import dataclass from pathlib import Path from threading import BoundedSemaphore -from typing import Callable, TypedDict +from typing import TypedDict +from unittest.mock import MagicMock import pytest -import ert.job_queue.job_queue_node from ert.config import QueueSystem from ert.job_queue import Driver, JobQueueNode, JobStatus -from ert.load_status import LoadStatus @pytest.fixture(name="temp_working_directory") @@ -22,48 +20,17 @@ def fixture_temp_working_directory(tmpdir, monkeypatch): @pytest.fixture(name="dummy_config") def fixture_dummy_config(): return JobConfig( - { - "job_script": "job_script.py", - "num_cpu": 1, - "job_name": "dummy_job_{}", - "run_path": "dummy_path_{}", - "ok_callback": dummy_ok_callback, - "exit_callback": dummy_exit_callback, - } + num_cpu=1, + job_name="dummy_job_{}", + run_path="dummy_path_{}", ) -@dataclass -class RunArg: - iens: int - - class JobConfig(TypedDict): - job_script: str num_cpu: int job_name: str run_path: str - ok_callback: Callable - exit_callback: Callable - - -def dummy_ok_callback(runargs, path): - (Path(path) / "OK").write_text("success", encoding="utf-8") - return (LoadStatus.LOAD_SUCCESSFUL, "") - - -def dummy_exit_callback(*_args): - Path("ERROR").write_text("failure", encoding="utf-8") - -SIMPLE_SCRIPT = """#!/bin/sh -echo "finished successfully" > STATUS -""" - -FAILING_FORWARD_MODEL = """#!/usr/bin/env python -import sys -sys.exit(1) -""" MOCK_QSUB = """#!/bin/sh echo "torque job submitted" > job_output @@ -143,30 +110,23 @@ def _deploy_script(scriptname: Path, scripttext: str): script.chmod(stat.S_IRWXU) -def _build_jobqueuenode(monkeypatch, dummy_config: JobConfig, job_id=0): - monkeypatch.setattr( - ert.job_queue.job_queue_node, "forward_model_ok", dummy_config["ok_callback"] 
- ) - monkeypatch.setattr( - JobQueueNode, "run_exit_callback", dummy_config["exit_callback"] - ) - +def _build_jobqueuenode(job_script, dummy_config: JobConfig, job_id=0): runpath = Path(dummy_config["run_path"].format(job_id)) runpath.mkdir() job = JobQueueNode( - job_script=dummy_config["job_script"], + job_script=job_script, job_name=dummy_config["job_name"].format(job_id), run_path=os.path.realpath(dummy_config["run_path"].format(job_id)), num_cpu=1, status_file="STATUS", exit_file="ERROR", - run_arg=RunArg(iens=job_id), - ensemble_config=Path(dummy_config["run_path"].format(job_id)).resolve(), + run_arg=MagicMock(), ) - return (job, runpath) + return job, runpath +@pytest.mark.usefixtures("use_tmpdir") @pytest.mark.parametrize( "qsub_script, qstat_script", [ @@ -183,7 +143,12 @@ def _build_jobqueuenode(monkeypatch, dummy_config: JobConfig, job_id=0): ], ) def test_run_torque_job( - monkeypatch, temp_working_directory, dummy_config, qsub_script, qstat_script + temp_working_directory, + dummy_config, + qsub_script, + qstat_script, + mock_fm_ok, + simple_script, ): """Verify that the torque driver will succeed in submitting and monitoring torque jobs even when the Torque commands qsub and qstat @@ -192,7 +157,6 @@ def test_run_torque_job( A flaky torque command is a shell script that sometimes but not always returns with a non-zero exit code.""" - _deploy_script(dummy_config["job_script"], SIMPLE_SCRIPT) _deploy_script("qsub", qsub_script) _deploy_script("qstat", qstat_script) @@ -201,7 +165,7 @@ def test_run_torque_job( options=[("QSTAT_CMD", temp_working_directory / "qstat")], ) - (job, runpath) = _build_jobqueuenode(monkeypatch, dummy_config) + job, runpath = _build_jobqueuenode(simple_script, dummy_config) job.run(driver, BoundedSemaphore()) job.wait_for() @@ -210,24 +174,24 @@ def test_run_torque_job( assert Path("job_output").exists() # The "done" callback: - assert (runpath / "OK").read_text(encoding="utf-8") == "success" + mock_fm_ok.assert_called() +@pytest.mark.usefixtures("use_tmpdir") @pytest.mark.parametrize( "user_qstat_option, expected_options", [("", "-f 10001"), ("-x", "-f -x 10001"), ("-f", "-f -f 10001")], ) def test_that_torque_driver_passes_options_to_qstat( - monkeypatch, temp_working_directory, dummy_config, user_qstat_option, expected_options, + simple_script, ): """The driver supports setting options to qstat, but the hard-coded -f option is always there.""" - _deploy_script(dummy_config["job_script"], SIMPLE_SCRIPT) _deploy_script("qsub", MOCK_QSUB) _deploy_script( "qstat", @@ -245,13 +209,14 @@ def test_that_torque_driver_passes_options_to_qstat( ], ) - job, _runpath = _build_jobqueuenode(monkeypatch, dummy_config) + job, _runpath = _build_jobqueuenode(simple_script, dummy_config) job.run(driver, BoundedSemaphore()) job.wait_for() assert Path("qstat_options").read_text(encoding="utf-8").strip() == expected_options +@pytest.mark.usefixtures("mock_fm_ok", "use_tmpdir") @pytest.mark.parametrize( "job_state, exit_status, expected_status", [ @@ -264,14 +229,13 @@ def test_that_torque_driver_passes_options_to_qstat( ], ) def test_torque_job_status_from_qstat_output( - monkeypatch, temp_working_directory, dummy_config, job_state, exit_status, expected_status, + simple_script, ): - _deploy_script(dummy_config["job_script"], SIMPLE_SCRIPT) _deploy_script("qsub", MOCK_QSUB) _deploy_script( "qstat", @@ -284,7 +248,7 @@ def test_torque_job_status_from_qstat_output( options=[("QSTAT_CMD", temp_working_directory / "qstat")], ) - job, _runpath = 
_build_jobqueuenode(monkeypatch, dummy_config) + job, _runpath = _build_jobqueuenode(simple_script, dummy_config) pool_sema = BoundedSemaphore(value=2) job.run(driver, pool_sema) diff --git a/tests/unit_tests/scenarios/test_summary_response.py b/tests/unit_tests/scenarios/test_summary_response.py index 2e296b948dd..eb4ed92a318 100644 --- a/tests/unit_tests/scenarios/test_summary_response.py +++ b/tests/unit_tests/scenarios/test_summary_response.py @@ -18,7 +18,8 @@ def prior_ensemble(storage, setup_configuration): ert_config = setup_configuration.ert_config return storage.create_experiment( - parameters=ert_config.ensemble_config.parameter_configuration + parameters=ert_config.ensemble_config.parameter_configuration, + responses=ert_config.ensemble_config.response_configuration, ).create_ensemble(ensemble_size=100, name="prior") diff --git a/tests/unit_tests/status/test_tracking_integration.py b/tests/unit_tests/status/test_tracking_integration.py index e1297aef97c..f559638ca46 100644 --- a/tests/unit_tests/status/test_tracking_integration.py +++ b/tests/unit_tests/status/test_tracking_integration.py @@ -185,7 +185,8 @@ def test_tracking( os.chdir(ert_config.config_path) ert = EnKFMain(ert_config) experiment_id = storage.create_experiment( - ert.ensembleConfig().parameter_configuration + parameters=ert.ensembleConfig().parameter_configuration, + responses=ert.ensembleConfig().response_configuration, ) model = create_model( @@ -406,7 +407,8 @@ def test_tracking_missing_ecl( storage, parsed, storage.create_experiment( - ert_config.ensemble_config.parameter_configuration + parameters=ert_config.ensemble_config.parameter_configuration, + responses=ert_config.ensemble_config.response_configuration, ), ) diff --git a/tests/unit_tests/storage/migration/test_version_1.py b/tests/unit_tests/storage/migration/test_version_1.py new file mode 100644 index 00000000000..dcf47525c76 --- /dev/null +++ b/tests/unit_tests/storage/migration/test_version_1.py @@ -0,0 +1,27 @@ +import json + +import pytest + +from ert.config import ErtConfig +from ert.storage import open_storage +from ert.storage.local_storage import local_storage_set_ert_config + + +@pytest.fixture(scope="module", autouse=True) +def set_ert_config(block_storage_path): + ert_config = ErtConfig.from_file( + str(block_storage_path / "version-1/poly_example/poly.ert") + ) + yield local_storage_set_ert_config(ert_config) + local_storage_set_ert_config(None) + + +def test_migrate_gen_kw(setup_case, set_ert_config): + setup_case("block_storage/version-1/poly_example", "poly.ert") + with open_storage("storage", "w") as storage: + assert len(list(storage.experiments)) == 1 + experiment = list(storage.experiments)[0] + param_info = json.loads( + (experiment._path / "parameter.json").read_text(encoding="utf-8") + ) + assert "COEFFS" in param_info diff --git a/tests/unit_tests/storage/migration/test_version_2.py b/tests/unit_tests/storage/migration/test_version_2.py new file mode 100644 index 00000000000..d3ce13f248c --- /dev/null +++ b/tests/unit_tests/storage/migration/test_version_2.py @@ -0,0 +1,36 @@ +import json + +import pytest + +from ert.config import ErtConfig +from ert.storage import open_storage +from ert.storage.local_storage import local_storage_set_ert_config + + +@pytest.fixture(scope="module", autouse=True) +def set_ert_config(block_storage_path): + ert_config = ErtConfig.from_file( + str(block_storage_path / "version-2/snake_oil/snake_oil.ert") + ) + yield local_storage_set_ert_config(ert_config) + local_storage_set_ert_config(None) + + 
+def test_migrate_responses(setup_case, set_ert_config):
+    ert_config = setup_case("block_storage/version-2/snake_oil", "snake_oil.ert")
+    with open_storage(ert_config.ens_path, "w") as storage:
+        assert len(list(storage.experiments)) == 1
+        experiment = list(storage.experiments)[0]
+        response_info = json.loads(
+            (experiment._path / "responses.json").read_text(encoding="utf-8")
+        )
+        assert (
+            list(experiment.response_configuration.values())
+            == ert_config.ensemble_config.response_configuration
+        )
+        assert list(response_info) == [
+            "SNAKE_OIL_OPR_DIFF",
+            "SNAKE_OIL_WPR_DIFF",
+            "SNAKE_OIL_GPR_DIFF",
+            "summary",
+        ]
diff --git a/tests/unit_tests/storage/test_local_storage.py b/tests/unit_tests/storage/test_local_storage.py
index 0f85e692ff2..bb1845cefa1 100644
--- a/tests/unit_tests/storage/test_local_storage.py
+++ b/tests/unit_tests/storage/test_local_storage.py
@@ -1,5 +1,8 @@
+import hypothesis.strategies as st
 import pytest
+from hypothesis import given
 
+from ert.config import GenDataConfig, SummaryConfig
 from ert.storage import StorageReader, open_storage
 from ert.storage import local_storage as local
 
@@ -98,3 +101,66 @@ def test_to_accessor(tmp_path):
     with open_storage(tmp_path, mode="w") as storage_accessor:
         storage_reader: StorageReader = storage_accessor
         storage_reader.to_accessor()
+
+
+@pytest.fixture(scope="module")
+def shared_storage(tmp_path_factory):
+    yield tmp_path_factory.mktemp("storage") / "serialize"
+
+
+@given(name=st.text(), input_file=st.text())
+def test_serialize_deserialize_gen_data_responses(shared_storage, name, input_file):
+    responses = [GenDataConfig(name=name, input_file=input_file)]
+    with open_storage(shared_storage, "w") as storage:
+        experiment = storage.create_experiment(
+            responses=responses,
+        )
+        storage.create_ensemble(experiment, ensemble_size=5)
+    with open_storage(shared_storage) as storage:
+        assert (
+            list(storage.get_experiment(experiment.id).response_configuration.values())
+            == responses
+        )
+
+
+@st.composite
+def refcase(draw):
+    datetimes = draw(st.lists(st.datetimes()))
+    container_type = draw(st.sampled_from([set(), list(), None]))
+    if isinstance(container_type, set):
+        return set(datetimes)
+    elif isinstance(container_type, list):
+        return [str(date) for date in datetimes]
+    return None
+
+
+@given(refcase())
+def test_refcase_conversion_to_set(refcase):
+    config = SummaryConfig(name="name", input_file="input_file", keys=["keys"], refcase=refcase)
+    assert isinstance(config.refcase, set) or config.refcase is None
+
+
+@given(
+    name=st.text(),
+    input_file=st.text(),
+    keys=st.lists(st.text()),
+    refcase=st.sets(st.datetimes()),
+)
+def test_serialize_deserialize_summary_responses(
+    shared_storage, name, input_file, keys, refcase
+):
+    if isinstance(refcase, list):
+        refcase = [str(date) for date in refcase]
+    responses = [
+        SummaryConfig(name=name, input_file=input_file, keys=keys, refcase=refcase)
+    ]
+    with open_storage(shared_storage, "w") as storage:
+        experiment = storage.create_experiment(
+            responses=responses,
+        )
+        storage.create_ensemble(experiment, ensemble_size=5)
+    with open_storage(shared_storage) as storage:
+        assert (
+            list(storage.get_experiment(experiment.id).response_configuration.values())
+            == responses
+        )
diff --git a/tests/unit_tests/test_load_forward_model.py b/tests/unit_tests/test_load_forward_model.py
index f5e9316fd32..006b4778fc4 100644
--- a/tests/unit_tests/test_load_forward_model.py
+++ b/tests/unit_tests/test_load_forward_model.py
@@ -16,6 +16,28 @@ from ert.storage import
open_storage +@pytest.fixture() +@pytest.mark.usefixtures("use_tmpdir") +def setup_case(storage): + def func(config_text): + Path("config.ert").write_text(config_text, encoding="utf-8") + + ert_config = ErtConfig.from_file("config.ert") + ert = EnKFMain(ert_config) + prior_ensemble = storage.create_ensemble( + storage.create_experiment( + responses=ert_config.ensemble_config.response_configuration + ), + name="prior", + ensemble_size=ert.getEnsembleSize(), + ) + run_context = ert.ensemble_context(prior_ensemble, [True], iteration=0) + ert.createRunPath(run_context) + return ert, prior_ensemble + + yield func + + def run_simulator(time_step_count, start_date) -> EclSum: ecl_sum = EclSum.writer("SNAKE_OIL_FIELD", start_date, 10, 10, 10) @@ -127,9 +149,7 @@ def test_load_forward_model(snake_oil_default_storage): ), ], ) -def test_load_forward_model_summary( - summary_configuration, prior_ensemble, expected, caplog -): +def test_load_forward_model_summary(summary_configuration, storage, expected, caplog): config_text = dedent( """ NUM_REALIZATIONS 1 @@ -146,6 +166,12 @@ def test_load_forward_model_summary( ert_config = ErtConfig.from_file("config.ert") ert = EnKFMain(ert_config) + experiment_id = storage.create_experiment( + responses=ert_config.ensemble_config.response_configuration + ) + prior_ensemble = storage.create_ensemble( + experiment_id, name="prior", ensemble_size=100 + ) run_context = ert.ensemble_context(prior_ensemble, [True], iteration=0) ert.createRunPath(run_context) @@ -159,20 +185,15 @@ def test_load_forward_model_summary( @pytest.mark.usefixtures("use_tmpdir") -def test_load_forward_model_gen_data(prior_ensemble): +def test_load_forward_model_gen_data(setup_case): config_text = dedent( """ NUM_REALIZATIONS 1 GEN_DATA RESPONSE RESULT_FILE:response_%d.out REPORT_STEPS:0,1 INPUT_FORMAT:ASCII """ ) - Path("config.ert").write_text(config_text, encoding="utf-8") - - ert_config = ErtConfig.from_file("config.ert") - ert = EnKFMain(ert_config) - run_context = ert.ensemble_context(prior_ensemble, [True], iteration=0) - ert.createRunPath(run_context) + ert, prior_ensemble = setup_case(config_text) run_path = Path("simulations/realization-0/iter-0/") with open(run_path / "response_0.out", "w", encoding="utf-8") as fout: fout.write("\n".join(["1", "2", "3"])) @@ -189,20 +210,15 @@ def test_load_forward_model_gen_data(prior_ensemble): @pytest.mark.usefixtures("use_tmpdir") -def test_single_valued_gen_data_with_active_info_is_loaded(prior_ensemble): +def test_single_valued_gen_data_with_active_info_is_loaded(setup_case): config_text = dedent( """ NUM_REALIZATIONS 1 GEN_DATA RESPONSE RESULT_FILE:response_%d.out REPORT_STEPS:0 INPUT_FORMAT:ASCII """ ) - Path("config.ert").write_text(config_text, encoding="utf-8") + ert, prior_ensemble = setup_case(config_text) - ert_config = ErtConfig.from_file("config.ert") - ert = EnKFMain(ert_config) - - run_context = ert.ensemble_context(prior_ensemble, [True], iteration=0) - ert.createRunPath(run_context) run_path = Path("simulations/realization-0/iter-0/") with open(run_path / "response_0.out", "w", encoding="utf-8") as fout: fout.write("\n".join(["1"])) @@ -217,20 +233,15 @@ def test_single_valued_gen_data_with_active_info_is_loaded(prior_ensemble): @pytest.mark.usefixtures("use_tmpdir") -def test_that_all_decativated_values_are_loaded(prior_ensemble): +def test_that_all_decativated_values_are_loaded(setup_case): config_text = dedent( """ NUM_REALIZATIONS 1 GEN_DATA RESPONSE RESULT_FILE:response_%d.out REPORT_STEPS:0 INPUT_FORMAT:ASCII """ ) - 
Path("config.ert").write_text(config_text, encoding="utf-8") + ert, prior_ensemble = setup_case(config_text) - ert_config = ErtConfig.from_file("config.ert") - ert = EnKFMain(ert_config) - - run_context = ert.ensemble_context(prior_ensemble, [True], iteration=0) - ert.createRunPath(run_context) run_path = Path("simulations/realization-0/iter-0/") with open(run_path / "response_0.out", "w", encoding="utf-8") as fout: fout.write("\n".join(["-1"])) @@ -248,7 +259,7 @@ def test_that_all_decativated_values_are_loaded(prior_ensemble): @pytest.mark.usefixtures("use_tmpdir") -def test_loading_gen_data_without_restart(prior_ensemble): +def test_loading_gen_data_without_restart(storage): config_text = dedent( """ NUM_REALIZATIONS 1 @@ -259,6 +270,13 @@ def test_loading_gen_data_without_restart(prior_ensemble): ert_config = ErtConfig.from_file("config.ert") ert = EnKFMain(ert_config) + prior_ensemble = storage.create_ensemble( + storage.create_experiment( + responses=ert_config.ensemble_config.response_configuration + ), + name="prior", + ensemble_size=ert.getEnsembleSize(), + ) run_context = ert.ensemble_context(prior_ensemble, [True], iteration=0) ert.createRunPath(run_context) diff --git a/tests/unit_tests/test_summary_response.py b/tests/unit_tests/test_summary_response.py index 451bf2c346b..df28acc2721 100644 --- a/tests/unit_tests/test_summary_response.py +++ b/tests/unit_tests/test_summary_response.py @@ -31,7 +31,9 @@ def test_load_summary_response_restart_not_zero(tmpdir, snapshot, request, stora ert_config = ErtConfig.from_file("config.ert") ert = EnKFMain(ert_config) - experiment_id = storage.create_experiment() + experiment_id = storage.create_experiment( + responses=ert_config.ensemble_config.response_configuration + ) ensemble = storage.create_ensemble( experiment_id, name="prior", ensemble_size=ert.getEnsembleSize() )
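
Usage sketch (not part of the patch): a minimal illustration of the response-configuration round-trip introduced above, using only APIs that appear in this diff (open_storage, create_experiment(parameters=..., responses=...), get_experiment, the response_configuration property, and ResponseConfig.to_dict). The storage path and the concrete config values are illustrative assumptions, not taken from the patch.

# Hedged sketch: persist two response configs with an experiment, then read them back.
from ert.config import GenDataConfig, SummaryConfig
from ert.storage import open_storage

responses = [
    GenDataConfig(name="RESPONSE", input_file="response_%d.out", report_steps=[0, 1]),
    SummaryConfig(name="summary", input_file="SOME_ECLBASE", keys=["FOPR"]),
]

with open_storage("storage", mode="w") as storage:
    # create_experiment serializes each config into the experiment's responses.json
    # via to_dict(), which records "_ert_kind" so the concrete class can be rebuilt.
    experiment = storage.create_experiment(responses=responses)
    experiment_id = experiment.id

with open_storage("storage") as storage:
    # response_configuration re-instantiates SummaryConfig/GenDataConfig objects
    # from responses.json using the stored "_ert_kind" tag.
    restored = storage.get_experiment(experiment_id).response_configuration
    assert set(restored) == {"RESPONSE", "summary"}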