From 52730870c5d86834e5b1781a3bcc68edc74ca28c Mon Sep 17 00:00:00 2001 From: Peter Verveer Date: Wed, 18 Dec 2024 08:24:53 +0000 Subject: [PATCH] Refactor the everest run model --- src/ert/run_models/everest_run_model.py | 819 ++++++++++++----------- src/everest/config/environment_config.py | 11 +- src/everest/detached/jobs/everserver.py | 43 +- src/everest/simulator/__init__.py | 3 - src/everest/simulator/simulator_cache.py | 58 -- tests/everest/test_environment.py | 4 +- tests/everest/test_everest_output.py | 8 +- tests/everest/test_everserver.py | 8 +- tests/everest/test_simulator_cache.py | 2 +- tests/everest/test_yaml_parser.py | 8 +- 10 files changed, 478 insertions(+), 486 deletions(-) delete mode 100644 src/everest/simulator/simulator_cache.py diff --git a/src/ert/run_models/everest_run_model.py b/src/ert/run_models/everest_run_model.py index aa28a26ef55..803e231af32 100644 --- a/src/ert/run_models/everest_run_model.py +++ b/src/ert/run_models/everest_run_model.py @@ -6,19 +6,14 @@ import logging import os import queue -import random import shutil from collections import defaultdict -from collections.abc import Callable, Mapping +from collections.abc import Callable from dataclasses import dataclass +from enum import IntEnum from pathlib import Path from types import TracebackType -from typing import ( - TYPE_CHECKING, - Any, - Literal, - Protocol, -) +from typing import TYPE_CHECKING, Any, Protocol import numpy as np import seba_sqlite.sqlite_storage @@ -38,70 +33,16 @@ from ert.storage import open_storage from everest.config import EverestConfig from everest.optimizer.everest2ropt import everest2ropt -from everest.simulator import SimulatorCache from everest.simulator.everest_to_ert import everest_to_ert_config from everest.strings import EVEREST from ..run_arg import RunArg, create_run_arguments -from .base_run_model import BaseRunModel, StatusEvents +from .base_run_model import BaseRunModel if TYPE_CHECKING: - import numpy.typing as npt - from ert.storage import Ensemble, Experiment -# A number of settings for the table reporters: -RESULT_COLUMNS = { - "result_id": "ID", - "batch_id": "Batch", - "functions.weighted_objective": "Total-Objective", - "linear_constraints.violations": "IC-violation", - "nonlinear_constraints.violations": "OC-violation", - "functions.objectives": "Objective", - "functions.constraints": "Constraint", - "evaluations.variables": "Control", - "linear_constraints.values": "IC-diff", - "nonlinear_constraints.values": "OC-diff", - "functions.scaled_objectives": "Scaled-Objective", - "functions.scaled_constraints": "Scaled-Constraint", - "evaluations.scaled_variables": "Scaled-Control", - "nonlinear_constraints.scaled_values": "Scaled-OC-diff", - "nonlinear_constraints.scaled_violations": "Scaled-OC-violation", -} -GRADIENT_COLUMNS = { - "result_id": "ID", - "batch_id": "Batch", - "gradients.weighted_objective": "Total-Gradient", - "gradients.objectives": "Grad-objective", - "gradients.constraints": "Grad-constraint", -} -SIMULATION_COLUMNS = { - "result_id": "ID", - "batch_id": "Batch", - "realization": "Realization", - "evaluations.evaluation_ids": "Simulation", - "evaluations.variables": "Control", - "evaluations.objectives": "Objective", - "evaluations.constraints": "Constraint", - "evaluations.scaled_variables": "Scaled-Control", - "evaluations.scaled_objectives": "Scaled-Objective", - "evaluations.scaled_constraints": "Scaled-Constraint", -} -PERTURBATIONS_COLUMNS = { - "result_id": "ID", - "batch_id": "Batch", - "realization": "Realization", - "evaluations.perturbed_evaluation_ids": "Simulation", - "evaluations.perturbed_variables": "Control", - "evaluations.perturbed_objectives": "Objective", - "evaluations.perturbed_constraints": "Constraint", - "evaluations.scaled_perturbed_variables": "Scaled-Control", - "evaluations.scaled_perturbed_objectives": "Scaled-Objective", - "evaluations.scaled_perturbed_constraints": "Scaled-Constraint", -} -MIN_HEADER_LEN = 3 - logger = logging.getLogger(__name__) @@ -129,6 +70,14 @@ class OptimizerCallback(Protocol): def __call__(self) -> str | None: ... +class EverestExitCode(IntEnum): + COMPLETED = 1 + TOO_FEW_REALIZATIONS = 2 + MAX_FUNCTIONS_REACHED = 3 + MAX_BATCH_NUM_REACHED = 4 + USER_ABORT = 5 + + @dataclass class OptimalResult: batch: int @@ -152,80 +101,50 @@ def __init__( self, config: ErtConfig, everest_config: EverestConfig, - simulation_callback: SimulationCallback, - optimization_callback: OptimizerCallback, - display_all_jobs: bool = True, + simulation_callback: SimulationCallback | None, + optimization_callback: OptimizerCallback | None, ): - everest_config = self._add_defaults(everest_config) - Path(everest_config.log_dir).mkdir(parents=True, exist_ok=True) Path(everest_config.optimization_output_dir).mkdir(parents=True, exist_ok=True) - self.ropt_config = everest2ropt(everest_config) - self.everest_config = everest_config - self.support_restart = False + assert everest_config.environment is not None + logging.getLogger(EVEREST).info( + "Using random seed: %d", everest_config.environment.random_seed + ) + logging.getLogger(EVEREST).info( + "To deterministically reproduce this experiment, " + "add the above random seed to your configuration file." + ) + + self._ropt_config = everest2ropt(everest_config) + self._everest_config = everest_config self._sim_callback = simulation_callback self._opt_callback = optimization_callback self._fm_errors: dict[int, dict[str, Any]] = {} - self._simulation_delete_run_path = ( - False - if everest_config.simulator is None - else (everest_config.simulator.delete_run_path or False) - ) - self._display_all_jobs = display_all_jobs self._result: OptimalResult | None = None - self._exit_code: Literal["max_batch_num_reached"] | OptimizerExitCode | None = ( - None + self._exit_code: EverestExitCode | None = None + self._evaluator_cache: _EvaluatorCache | None = ( + _EvaluatorCache() + if ( + everest_config.simulator is not None + and everest_config.simulator.enable_cache + ) + else None ) - self._max_batch_num_reached = False - self._simulator_cache: SimulatorCache | None = None - if ( - everest_config.simulator is not None - and everest_config.simulator.enable_cache - ): - self._simulator_cache = SimulatorCache() self._experiment: Experiment | None = None - self.eval_server_cfg: EvaluatorServerConfig | None = None - storage = open_storage(config.ens_path, mode="w") - status_queue: queue.SimpleQueue[StatusEvents] = queue.SimpleQueue() - self.batch_id: int = 0 - self.status: SimulationStatus | None = None + self._eval_server_cfg: EvaluatorServerConfig | None = None + self._batch_id: int = 0 + self._status: SimulationStatus | None = None super().__init__( config, - storage, + open_storage(config.ens_path, mode="w"), config.queue_config, - status_queue, + queue.SimpleQueue(), active_realizations=[], # Set dynamically in run_forward_model() ) - - self.num_retries_per_iter = 0 # OK? - - @staticmethod - def _add_defaults(config: EverestConfig) -> EverestConfig: - """This function exists as a temporary mechanism to default configurations that - needs to be global in the sense that they should carry over both to ropt and ERT. - When the proper mechanism for this is implemented this code - should die. - - """ - defaulted_config = config.copy() - assert defaulted_config.environment is not None - - random_seed = defaulted_config.environment.random_seed - if random_seed is None: - random_seed = random.randint(1, 2**30) - - defaulted_config.environment.random_seed = random_seed - - logging.getLogger(EVEREST).info("Using random seed: %d", random_seed) - logging.getLogger(EVEREST).info( - "To deterministically reproduce this experiment, " - "add the above random seed to your configuration file." - ) - - return defaulted_config + self.support_restart = False @classmethod def create( @@ -234,29 +153,39 @@ def create( simulation_callback: SimulationCallback | None = None, optimization_callback: OptimizerCallback | None = None, ) -> EverestRunModel: - def default_simulation_callback( - simulation_status: SimulationStatus | None, - ) -> str | None: - return None - - def default_optimization_callback() -> str | None: - return None - - ert_config = everest_to_ert_config(cls._add_defaults(ever_config)) return cls( - config=ert_config, + config=everest_to_ert_config(ever_config), everest_config=ever_config, - simulation_callback=simulation_callback or default_simulation_callback, - optimization_callback=optimization_callback - or default_optimization_callback, + simulation_callback=simulation_callback, + optimization_callback=optimization_callback, ) + @classmethod + def name(cls) -> str: + return "Optimization run" + + @classmethod + def description(cls) -> str: + return "Run batches " + + @property + def exit_code(self) -> EverestExitCode | None: + return self._exit_code + + @property + def result(self) -> OptimalResult | None: + return self._result + + def __repr__(self) -> str: + config_json = json.dumps(self._everest_config, sort_keys=True, indent=2) + return f"EverestRunModel(config={config_json})" + def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False ) -> None: self.log_at_startup() self.restart = restart - self.eval_server_cfg = evaluator_server_config + self._eval_server_cfg = evaluator_server_config self._experiment = self._storage.create_experiment( name=f"EnOpt@{datetime.datetime.now().strftime('%Y-%m-%d@%H:%M:%S')}", parameters=self.ert_config.ensemble_config.parameter_configuration, @@ -273,7 +202,7 @@ def run_experiment( # This mechanism is outdated and not supported by the ropt package. It # is retained for now via the seba_sqlite package. seba_storage = SqliteStorage( # type: ignore - optimizer, self.everest_config.optimization_output_dir + optimizer, self._everest_config.optimization_output_dir ) # Run the optimization: @@ -284,102 +213,69 @@ def run_experiment( seba_storage.get_optimal_result() # type: ignore ) - self._exit_code = ( - "max_batch_num_reached" - if self._max_batch_num_reached - else optimizer_exit_code - ) - - def check_if_runpath_exists(self) -> bool: - return ( - self.everest_config.simulation_dir is not None - and os.path.exists(self.everest_config.simulation_dir) - and any(os.listdir(self.everest_config.simulation_dir)) - ) - - def _handle_errors( - self, - batch: int, - simulation: Any, - realization: str, - fm_name: str, - error_path: str, - ) -> None: - fm_id = f"b_{batch}_r_{realization}_s_{simulation}_{fm_name}" - fm_logger = logging.getLogger("forward_models") - with open(error_path, encoding="utf-8") as errors: - error_str = errors.read() - - error_hash = hash(error_str) - err_msg = "Batch: {} Realization: {} Simulation: {} Job: {} Failed {}".format( - batch, realization, simulation, fm_name, "Error: {}\n {}" - ) - - if error_hash not in self._fm_errors: - error_id = len(self._fm_errors) - fm_logger.error(err_msg.format(error_id, error_str)) - self._fm_errors.update({error_hash: {"error_id": error_id, "ids": [fm_id]}}) - elif fm_id not in self._fm_errors[error_hash]["ids"]: - self._fm_errors[error_hash]["ids"].append(fm_id) - error_id = self._fm_errors[error_hash]["error_id"] - fm_logger.error(err_msg.format(error_id, "")) - - def _delete_runpath(self, run_args: list[RunArg]) -> None: - logging.getLogger(EVEREST).debug("Simulation callback called") - if self._simulation_delete_run_path: - for i, real in self.get_current_snapshot().reals.items(): - path_to_delete = run_args[int(i)].runpath - if real["status"] == "Finished" and os.path.isdir(path_to_delete): - - def onerror( - _: Callable[..., Any], - path: str, - sys_info: tuple[ - type[BaseException], BaseException, TracebackType - ], - ) -> None: - logging.getLogger(EVEREST).debug( - f"Failed to remove {path}, {sys_info}" - ) - - shutil.rmtree(path_to_delete, onerror=onerror) # pylint: disable=deprecated-argument - - def _on_before_forward_model_evaluation( - self, _: OptimizerEvent, optimizer: BasicOptimizer - ) -> None: - logging.getLogger(EVEREST).debug("Optimization callback called") - - if ( - self.everest_config.optimization is not None - and self.everest_config.optimization.max_batch_num is not None - and (self.batch_id >= self.everest_config.optimization.max_batch_num) - ): - self._max_batch_num_reached = True - logging.getLogger(EVEREST).info("Maximum number of batches reached") - optimizer.abort_optimization() - if ( - self._opt_callback is not None - and self._opt_callback() == "stop_optimization" - ): - logging.getLogger(EVEREST).info("User abort requested.") - optimizer.abort_optimization() + if self._exit_code is None: + self._exit_code = self._get_everest_exit_code(optimizer_exit_code) def _create_optimizer(self) -> BasicOptimizer: - assert ( - self.everest_config.environment is not None - and self.everest_config.environment is not None - ) - - ropt_output_folder = Path(self.everest_config.optimization_output_dir) + RESULT_COLUMNS = { + "result_id": "ID", + "batch_id": "Batch", + "functions.weighted_objective": "Total-Objective", + "linear_constraints.violations": "IC-violation", + "nonlinear_constraints.violations": "OC-violation", + "functions.objectives": "Objective", + "functions.constraints": "Constraint", + "evaluations.variables": "Control", + "linear_constraints.values": "IC-diff", + "nonlinear_constraints.values": "OC-diff", + "functions.scaled_objectives": "Scaled-Objective", + "functions.scaled_constraints": "Scaled-Constraint", + "evaluations.scaled_variables": "Scaled-Control", + "nonlinear_constraints.scaled_values": "Scaled-OC-diff", + "nonlinear_constraints.scaled_violations": "Scaled-OC-violation", + } + GRADIENT_COLUMNS = { + "result_id": "ID", + "batch_id": "Batch", + "gradients.weighted_objective": "Total-Gradient", + "gradients.objectives": "Grad-objective", + "gradients.constraints": "Grad-constraint", + } + SIMULATION_COLUMNS = { + "result_id": "ID", + "batch_id": "Batch", + "realization": "Realization", + "evaluations.evaluation_ids": "Simulation", + "evaluations.variables": "Control", + "evaluations.objectives": "Objective", + "evaluations.constraints": "Constraint", + "evaluations.scaled_variables": "Scaled-Control", + "evaluations.scaled_objectives": "Scaled-Objective", + "evaluations.scaled_constraints": "Scaled-Constraint", + } + PERTURBATIONS_COLUMNS = { + "result_id": "ID", + "batch_id": "Batch", + "realization": "Realization", + "evaluations.perturbed_evaluation_ids": "Simulation", + "evaluations.perturbed_variables": "Control", + "evaluations.perturbed_objectives": "Objective", + "evaluations.perturbed_constraints": "Constraint", + "evaluations.scaled_perturbed_variables": "Scaled-Control", + "evaluations.scaled_perturbed_objectives": "Scaled-Objective", + "evaluations.scaled_perturbed_constraints": "Scaled-Constraint", + } + MIN_HEADER_LEN = 3 # Initialize the optimizer with output tables. `min_header_len` is set # to ensure that all tables have the same number of header lines, # simplifying code that reads them as fixed width tables. `maximize` is # set because ropt reports minimization results, while everest wants # maximization results, necessitating a conversion step. + ropt_output_folder = Path(self._everest_config.optimization_output_dir) optimizer = ( BasicOptimizer( - enopt_config=self.ropt_config, evaluator=self._forward_model_evaluator + enopt_config=self._ropt_config, evaluator=self._forward_model_evaluator ) .add_table( columns=RESULT_COLUMNS, @@ -420,101 +316,170 @@ def _create_optimizer(self) -> BasicOptimizer: return optimizer - @classmethod - def name(cls) -> str: - return "Optimization run" + def _get_everest_exit_code( + self, optimizer_exit_code: OptimizerExitCode + ) -> EverestExitCode: + match optimizer_exit_code: + case OptimizerExitCode.MAX_FUNCTIONS_REACHED: + return EverestExitCode.MAX_FUNCTIONS_REACHED + case OptimizerExitCode.USER_ABORT: + return EverestExitCode.USER_ABORT + case OptimizerExitCode.TOO_FEW_REALIZATIONS: + return EverestExitCode.TOO_FEW_REALIZATIONS + case _: + return EverestExitCode.COMPLETED - @classmethod - def description(cls) -> str: - return "Run batches " + def _on_before_forward_model_evaluation( + self, _: OptimizerEvent, optimizer: BasicOptimizer + ) -> None: + logging.getLogger(EVEREST).debug("Optimization callback called") - @property - def exit_code( - self, - ) -> Literal["max_batch_num_reached"] | OptimizerExitCode | None: - return self._exit_code + if ( + self._everest_config.optimization is not None + and self._everest_config.optimization.max_batch_num is not None + and (self._batch_id >= self._everest_config.optimization.max_batch_num) + ): + self._exit_code = EverestExitCode.MAX_BATCH_NUM_REACHED + logging.getLogger(EVEREST).info("Maximum number of batches reached") + optimizer.abort_optimization() + if ( + self._opt_callback is not None + and self._opt_callback() == "stop_optimization" + ): + logging.getLogger(EVEREST).info("User abort requested.") + optimizer.abort_optimization() - @property - def result(self) -> OptimalResult | None: - return self._result + def _forward_model_evaluator( + self, control_values: NDArray[np.float64], evaluator_context: EvaluatorContext + ) -> EvaluatorResult: + # Reset the current run status: + self._status = None - def __repr__(self) -> str: - config_json = json.dumps(self.everest_config, sort_keys=True, indent=2) - return f"EverestRunModel(config={config_json})" + # Get any cached_results results that may be useful: + cached_results = self._get_cached_results(control_values, evaluator_context) + + # Create the case to run: + batch_data = self._init_batch_data( + control_values, evaluator_context, cached_results + ) + + # Initialize a new experiment in storage: + assert self._experiment + ensemble = self._experiment.create_ensemble( + name=f"batch_{self._batch_id}", + ensemble_size=len(batch_data), + ) + for sim_id, controls in enumerate(batch_data.values()): + self._setup_sim(sim_id, controls, ensemble) + + # Evaluate the case: + run_args = self._get_run_args(ensemble, evaluator_context, batch_data) + self._context_env.update( + { + "_ERT_EXPERIMENT_ID": str(ensemble.experiment_id), + "_ERT_ENSEMBLE_ID": str(ensemble.id), + "_ERT_SIMULATION_MODE": "batch_simulation", + } + ) + assert self._eval_server_cfg + self._evaluate_and_postprocess(run_args, ensemble, self._eval_server_cfg) + + # If necessary, delete the run path: + self._delete_runpath(run_args) + + # Gather the results and create the result for ropt: + results = self._gather_results(ensemble) + evaluator_result = self._get_evaluator_result( + control_values, evaluator_context, batch_data, results, cached_results + ) + + # Increase the batch ID for the next evaluation: + self._batch_id += 1 + + # Add the results from the evaluations to the cache: + self._add_results_to_cache( + control_values, + evaluator_context, + batch_data, + evaluator_result.objectives, + evaluator_result.constraints, + ) + + return evaluator_result + + def _get_cached_results( + self, control_values: NDArray[np.float64], evaluator_context: EvaluatorContext + ) -> dict[int, Any]: + cached_results: dict[int, Any] = {} + if self._evaluator_cache is not None: + assert evaluator_context.config.realizations.names is not None + for sim_idx, realization in enumerate(evaluator_context.realizations): + cached_data = self._evaluator_cache.get( + evaluator_context.config.realizations.names[realization], + control_values[sim_idx, :], + ) + if cached_data is not None: + cached_results[sim_idx] = cached_data + return cached_results @staticmethod - def _add_control( - controls: Mapping[str, Any], - control_name: tuple[Any, ...], - control_value: float, - ) -> None: - group_name = control_name[0] - variable_name = control_name[1] - group = controls[group_name] - if len(control_name) > 2: - index_name = str(control_name[2]) - if variable_name in group: - group[variable_name][index_name] = control_value + def _init_batch_data( + control_values: NDArray[np.float64], + evaluator_context: EvaluatorContext, + cached_results: dict[int, Any], + ) -> dict[int, dict[str, Any]]: + def add_control( + controls: dict[str, Any], + control_name: tuple[Any, ...], + control_value: float, + ) -> None: + group_name = control_name[0] + variable_name = control_name[1] + group = controls.get(group_name, {}) + if len(control_name) > 2: + index_name = str(control_name[2]) + if variable_name in group: + group[variable_name][index_name] = control_value + else: + group[variable_name] = {index_name: control_value} else: - group[variable_name] = {index_name: control_value} - else: - group[variable_name] = control_value + group[variable_name] = control_value + controls[group_name] = group + + batch_data = {} + for control_idx in range(control_values.shape[0]): + add_to_case = ( + evaluator_context.active is None + or evaluator_context.active[evaluator_context.realizations[control_idx]] + ) + if add_to_case and control_idx not in cached_results: + controls: dict[str, Any] = {} + assert evaluator_context.config.variables.names is not None + for control_name, control_value in zip( + evaluator_context.config.variables.names, + control_values[control_idx, :], + strict=False, + ): + add_control(controls, control_name, control_value) + batch_data[control_idx] = controls + return batch_data @staticmethod - def _get_active_results( + def _get_calculated_results( results: list[dict[str, NDArray[np.float64]]], names: tuple[str], controls: NDArray[np.float64], - active: NDArray[np.bool_], + batch_data: dict[int, Any], ) -> NDArray[np.float64]: + control_indices = list(batch_data.keys()) values = np.zeros((controls.shape[0], len(names)), dtype=float64) for func_idx, name in enumerate(names): - values[active, func_idx] = np.fromiter( + values[control_indices, func_idx] = np.fromiter( (np.nan if not result else result[name][0] for result in results), dtype=np.float64, ) return values - def init_case_data( - self, - control_values: NDArray[np.float64], - metadata: EvaluatorContext, - realization_ids: list[int], - ) -> tuple[ - list[tuple[int, defaultdict[str, Any]]], NDArray[np.bool_], dict[int, int] - ]: - active = ( - np.ones(control_values.shape[0], dtype=np.bool_) - if metadata.active is None - else np.fromiter( - (metadata.active[realization] for realization in metadata.realizations), - dtype=np.bool_, - ) - ) - case_data = [] - cached = {} - - for sim_idx, real_id in enumerate(realization_ids): - if self._simulator_cache is not None: - cache_id = self._simulator_cache.find_key( - real_id, control_values[sim_idx, :] - ) - if cache_id is not None: - cached[sim_idx] = cache_id - active[sim_idx] = False - - if active[sim_idx]: - controls: defaultdict[str, Any] = defaultdict(dict) - assert metadata.config.variables.names is not None - for control_name, control_value in zip( - metadata.config.variables.names, - control_values[sim_idx, :], - strict=False, - ): - self._add_control(controls, control_name, control_value) - case_data.append((real_id, controls)) - return case_data, active, cached - def _setup_sim( self, sim_id: int, @@ -549,7 +514,7 @@ def _check_suffix( f"Key {key} has suffixes, a suffix must be specified" ) - if set(controls.keys()) != set(self.everest_config.control_names): + if set(controls.keys()) != set(self._everest_config.control_names): err_msg = "Mismatch between initialized and provided control names." raise KeyError(err_msg) @@ -564,45 +529,25 @@ def _check_suffix( ) for var_name, var_setting in control.items(): _check_suffix(ext_config, var_name, var_setting) - ensemble.save_parameters( control_name, sim_id, ExtParamConfig.to_dataset(control) ) - def _forward_model_evaluator( - self, control_values: NDArray[np.float64], metadata: EvaluatorContext - ) -> EvaluatorResult: - def _slug(entity: str) -> str: - entity = " ".join(str(entity).split()) - return "".join([x if x.isalnum() else "_" for x in entity.strip()]) - - self.status = None # Reset the current run status - assert metadata.config.realizations.names - realization_ids = [ - metadata.config.realizations.names[realization] - for realization in metadata.realizations - ] - case_data, active, cached = self.init_case_data( - control_values=control_values, - metadata=metadata, - realization_ids=realization_ids, - ) - assert self._experiment - ensemble = self._experiment.create_ensemble( - name=f"batch_{self.batch_id}", - ensemble_size=len(case_data), - ) - for sim_id, (geo_id, controls) in enumerate(case_data): - assert isinstance(geo_id, int) - self._setup_sim(sim_id, controls, ensemble) - + def _get_run_args( + self, + ensemble: Ensemble, + evaluator_context: EvaluatorContext, + batch_data: dict[int, Any], + ) -> list[RunArg]: substitutions = self.ert_config.substitutions - # fill in the missing geo_id data - substitutions[""] = _slug(ensemble.name) - self.active_realizations = [True] * len(case_data) - for sim_id, (geo_id, _) in enumerate(case_data): - if self.active_realizations[sim_id]: - substitutions[f""] = str(geo_id) + substitutions[""] = ensemble.name + self.active_realizations = [True] * len(batch_data) + assert evaluator_context.config.realizations.names is not None + for sim_id, control_idx in enumerate(batch_data.keys()): + realization = evaluator_context.realizations[control_idx] + substitutions[f""] = str( + evaluator_context.config.realizations.names[realization] + ) run_paths = Runpaths( jobname_format=self.ert_config.model_config.jobname_format_string, @@ -611,99 +556,131 @@ def _slug(entity: str) -> str: substitutions=substitutions, eclbase=self.ert_config.model_config.eclbase_format_string, ) - run_args = create_run_arguments( + return create_run_arguments( run_paths, self.active_realizations, ensemble=ensemble, ) - self._context_env.update( - { - "_ERT_EXPERIMENT_ID": str(ensemble.experiment_id), - "_ERT_ENSEMBLE_ID": str(ensemble.id), - "_ERT_SIMULATION_MODE": "batch_simulation", - } - ) - assert self.eval_server_cfg - self._evaluate_and_postprocess(run_args, ensemble, self.eval_server_cfg) + def _delete_runpath(self, run_args: list[RunArg]) -> None: + if ( + self._everest_config.simulator is not None + and self._everest_config.simulator.delete_run_path + ): + for i, real in self.get_current_snapshot().reals.items(): + path_to_delete = run_args[int(i)].runpath + if real["status"] == "Finished" and os.path.isdir(path_to_delete): - self._delete_runpath(run_args) - # gather results - results: list[dict[str, npt.NDArray[np.float64]]] = [] + def onerror( + _: Callable[..., Any], + path: str, + sys_info: tuple[ + type[BaseException], BaseException, TracebackType + ], + ) -> None: + logging.getLogger(EVEREST).debug( + f"Failed to remove {path}, {sys_info}" + ) + + shutil.rmtree(path_to_delete, onerror=onerror) # pylint: disable=deprecated-argument + + def _gather_results( + self, ensemble: Ensemble + ) -> list[dict[str, NDArray[np.float64]]]: + results: list[dict[str, NDArray[np.float64]]] = [] for sim_id, successful in enumerate(self.active_realizations): if not successful: logger.error(f"Simulation {sim_id} failed.") results.append({}) continue d = {} - for key in self.everest_config.result_names: + for key in self._everest_config.result_names: data = ensemble.load_responses(key, (sim_id,)) d[key] = data["values"].to_numpy() results.append(d) - - for fnc_name, alias in self.everest_config.function_aliases.items(): + for fnc_name, alias in self._everest_config.function_aliases.items(): for result in results: result[fnc_name] = result[alias] + return results - objectives = self._get_active_results( + def _get_evaluator_result( + self, + control_values: NDArray[np.float64], + evaluator_context: EvaluatorContext, + batch_data: dict[int, Any], + results: list[dict[str, NDArray[np.float64]]], + cached_results: dict[int, Any], + ) -> EvaluatorResult: + # We minimize the negative of the objectives: + objectives = -self._get_calculated_results( results, - metadata.config.objectives.names, # type: ignore + evaluator_context.config.objectives.names, # type: ignore control_values, - active, + batch_data, ) constraints = None - if metadata.config.nonlinear_constraints is not None: - constraints = self._get_active_results( + if evaluator_context.config.nonlinear_constraints is not None: + constraints = self._get_calculated_results( results, - metadata.config.nonlinear_constraints.names, # type: ignore + evaluator_context.config.nonlinear_constraints.names, # type: ignore control_values, - active, + batch_data, ) - if self._simulator_cache is not None: - for sim_idx, cache_id in cached.items(): - objectives[sim_idx, ...] = self._simulator_cache.get_objectives( - cache_id - ) + if self._evaluator_cache is not None: + for control_idx, ( + cached_objectives, + cached_constraints, + ) in cached_results.items(): + objectives[control_idx, ...] = cached_objectives if constraints is not None: - constraints[sim_idx, ...] = self._simulator_cache.get_constraints( - cache_id - ) - - sim_ids = np.empty(control_values.shape[0], dtype=np.intc) - sim_ids.fill(-1) - sim_ids[active] = np.arange(len(results), dtype=np.intc) - - # Add the results from active simulations to the cache: - if self._simulator_cache is not None: - for sim_idx, real_id in enumerate(realization_ids): - if active[sim_idx]: - self._simulator_cache.add_simulation_results( - sim_idx, real_id, control_values, objectives, constraints - ) + assert cached_constraints is not None + constraints[control_idx, ...] = cached_constraints - # Note the negative sign for the objective results. Everest aims to do a - # maximization, while the standard practice of minimizing is followed by - # ropt. Therefore we will minimize the negative of the objectives: - evaluator_result = EvaluatorResult( - objectives=-objectives, + sim_ids = np.full(control_values.shape[0], -1, dtype=np.intc) + sim_ids[list(batch_data.keys())] = np.arange(len(batch_data), dtype=np.intc) + return EvaluatorResult( + objectives=objectives, constraints=constraints, - batch_id=self.batch_id, + batch_id=self._batch_id, evaluation_ids=sim_ids, ) - self.batch_id += 1 + def _add_results_to_cache( + self, + control_values: NDArray[np.float64], + evaluator_context: EvaluatorContext, + batch_data: dict[int, Any], + objectives: NDArray[np.float64], + constraints: NDArray[np.float64] | None, + ) -> None: + if self._evaluator_cache is not None: + assert evaluator_context.config.realizations.names is not None + for control_idx in batch_data: + realization = evaluator_context.realizations[control_idx] + self._evaluator_cache.add( + evaluator_context.config.realizations.names[realization], + control_values[control_idx, ...], + objectives[control_idx, ...], + None if constraints is None else constraints[control_idx, ...], + ) - return evaluator_result + def check_if_runpath_exists(self) -> bool: + return ( + self._everest_config.simulation_dir is not None + and os.path.exists(self._everest_config.simulation_dir) + and any(os.listdir(self._everest_config.simulation_dir)) + ) def send_snapshot_event(self, event: Event, iteration: int) -> None: super().send_snapshot_event(event, iteration) if type(event) in (EESnapshot, EESnapshotUpdate): newstatus = self._simulation_status(self.get_current_snapshot()) - if self.status != newstatus: # No change in status - self._sim_callback(newstatus) - self.status = newstatus + if self._status != newstatus: # No change in status + if self._sim_callback is not None: + self._sim_callback(newstatus) + self._status = newstatus def _simulation_status(self, snapshot: EnsembleSnapshot) -> SimulationStatus: jobs_progress: list[list[JobProgress]] = [] @@ -728,10 +705,10 @@ def _simulation_status(self, snapshot: EnsembleSnapshot) -> SimulationStatus: ) if fm_step.get("error", ""): self._handle_errors( - batch=self.batch_id, + batch=self._batch_id, simulation=simulation, realization=realization, - fm_name=fm_step.get("name", "Unknwon"), # type: ignore + fm_name=fm_step.get("name", "Unknown"), # type: ignore error_path=fm_step.get("stderr", ""), # type: ignore ) jobs_progress.append(jobs) @@ -739,5 +716,71 @@ def _simulation_status(self, snapshot: EnsembleSnapshot) -> SimulationStatus: return { "status": self.get_current_status(), "progress": jobs_progress, - "batch_number": self.batch_id, + "batch_number": self._batch_id, } + + def _handle_errors( + self, + batch: int, + simulation: Any, + realization: str, + fm_name: str, + error_path: str, + ) -> None: + fm_id = f"b_{batch}_r_{realization}_s_{simulation}_{fm_name}" + fm_logger = logging.getLogger("forward_models") + with open(error_path, encoding="utf-8") as errors: + error_str = errors.read() + + error_hash = hash(error_str) + err_msg = "Batch: {} Realization: {} Simulation: {} Job: {} Failed {}".format( + batch, realization, simulation, fm_name, "Error: {}\n {}" + ) + + if error_hash not in self._fm_errors: + error_id = len(self._fm_errors) + fm_logger.error(err_msg.format(error_id, error_str)) + self._fm_errors.update({error_hash: {"error_id": error_id, "ids": [fm_id]}}) + elif fm_id not in self._fm_errors[error_hash]["ids"]: + self._fm_errors[error_hash]["ids"].append(fm_id) + error_id = self._fm_errors[error_hash]["error_id"] + fm_logger.error(err_msg.format(error_id, "")) + + +class _EvaluatorCache: + EPS = float(np.finfo(np.float32).eps) + + def __init__(self) -> None: + self._data: defaultdict[ + int, + list[ + tuple[ + NDArray[np.float64], NDArray[np.float64], NDArray[np.float64] | None + ] + ], + ] = defaultdict(list) + + def add( + self, + realization_id: int, + control_values: NDArray[np.float64], + objectives: NDArray[np.float64], + constraints: NDArray[np.float64] | None, + ) -> None: + self._data[realization_id].append( + ( + control_values.copy(), + objectives.copy(), + None if constraints is None else constraints.copy(), + ), + ) + + def get( + self, realization_id: int, controls: NDArray[np.float64] + ) -> tuple[NDArray[np.float64], NDArray[np.float64] | None] | None: + for control_values, objectives, constraints in self._data.get( + realization_id, [] + ): + if np.allclose(controls, control_values, rtol=0.0, atol=self.EPS): + return objectives, constraints + return None diff --git a/src/everest/config/environment_config.py b/src/everest/config/environment_config.py index 080c7bb09b1..449d1d787b0 100644 --- a/src/everest/config/environment_config.py +++ b/src/everest/config/environment_config.py @@ -1,6 +1,7 @@ -from typing import Literal +from typing import Literal, Self -from pydantic import BaseModel, Field, field_validator +from numpy.random import SeedSequence +from pydantic import BaseModel, Field, field_validator, model_validator from everest.config.validation_utils import check_path_valid @@ -43,3 +44,9 @@ class EnvironmentConfig(BaseModel, extra="forbid"): # type: ignore def validate_output_folder(cls, output_folder): # pylint:disable=E0213 check_path_valid(output_folder) return output_folder + + @model_validator(mode="after") + def validate_random_seed(self) -> Self: + if self.random_seed is None: + self.random_seed = SeedSequence().entropy + return self diff --git a/src/everest/detached/jobs/everserver.py b/src/everest/detached/jobs/everserver.py index 936ccedf236..02ae28a1172 100755 --- a/src/everest/detached/jobs/everserver.py +++ b/src/everest/detached/jobs/everserver.py @@ -30,11 +30,10 @@ HTTPBasic, HTTPBasicCredentials, ) -from ropt.enums import OptimizerExitCode from ert.config import QueueSystem from ert.ensemble_evaluator import EvaluatorServerConfig -from ert.run_models.everest_run_model import EverestRunModel +from ert.run_models.everest_run_model import EverestExitCode, EverestRunModel from everest import export_to_csv, export_with_progress from everest.config import EverestConfig, ServerConfig from everest.detached import ServerStatus, get_opt_status, update_everserver_status @@ -373,25 +372,31 @@ def main(): def _get_optimization_status(exit_code, shared_data): - if exit_code == "max_batch_num_reached": - return ServerStatus.completed, "Maximum number of batches reached." - - if exit_code == OptimizerExitCode.MAX_FUNCTIONS_REACHED: - return ServerStatus.completed, "Maximum number of function evaluations reached." + match exit_code: + case EverestExitCode.MAX_BATCH_NUM_REACHED: + return ServerStatus.completed, "Maximum number of batches reached." + + case EverestExitCode.MAX_FUNCTIONS_REACHED: + return ( + ServerStatus.completed, + "Maximum number of function evaluations reached.", + ) - if exit_code == OptimizerExitCode.USER_ABORT: - return ServerStatus.stopped, "Optimization aborted." + case EverestExitCode.USER_ABORT: + return ServerStatus.stopped, "Optimization aborted." - if exit_code == OptimizerExitCode.TOO_FEW_REALIZATIONS: - status = ( - ServerStatus.stopped if shared_data[STOP_ENDPOINT] else ServerStatus.failed - ) - messages = _failed_realizations_messages(shared_data) - for msg in messages: - logging.getLogger(EVEREST).error(msg) - return status, "\n".join(messages) - - return ServerStatus.completed, "Optimization completed." + case EverestExitCode.TOO_FEW_REALIZATIONS: + status = ( + ServerStatus.stopped + if shared_data[STOP_ENDPOINT] + else ServerStatus.failed + ) + messages = _failed_realizations_messages(shared_data) + for msg in messages: + logging.getLogger(EVEREST).error(msg) + return status, "\n".join(messages) + case _: + return ServerStatus.completed, "Optimization completed." def _failed_realizations_messages(shared_data): diff --git a/src/everest/simulator/__init__.py b/src/everest/simulator/__init__.py index 02e9a03f80e..b1611081e43 100644 --- a/src/everest/simulator/__init__.py +++ b/src/everest/simulator/__init__.py @@ -1,5 +1,3 @@ -from everest.simulator.simulator_cache import SimulatorCache - JOB_SUCCESS = "Finished" JOB_WAITING = "Waiting" JOB_RUNNING = "Running" @@ -109,5 +107,4 @@ "JOB_RUNNING", "JOB_SUCCESS", "JOB_WAITING", - "SimulatorCache", ] diff --git a/src/everest/simulator/simulator_cache.py b/src/everest/simulator/simulator_cache.py deleted file mode 100644 index db4e3a7ae3c..00000000000 --- a/src/everest/simulator/simulator_cache.py +++ /dev/null @@ -1,58 +0,0 @@ -from collections import defaultdict -from itertools import count - -import numpy as np -from numpy._typing import NDArray - - -# This cache can be used to prevent re-evaluation of forward models. Due to its -# simplicity it has some limitations: -# - There is no limit on the number of cached entries. -# - Searching in the cache is by brute-force, iterating over the entries. -# Both of these should not be an issue for the intended use with cases where the -# forward models are very expensive to compute: The number of cached entries is -# not expected to become prohibitively large. -class SimulatorCache: - def __init__(self) -> None: - # Stores the realization/controls key, together with an ID. - self._keys: defaultdict[int, list[tuple[NDArray[np.float64], int]]] = ( - defaultdict(list) - ) - # Store objectives and constraints by ID: - self._objectives: dict[int, NDArray[np.float64]] = {} - self._constraints: dict[int, NDArray[np.float64]] = {} - - # Generate unique ID's: - self._counter = count() - - def add_simulation_results( - self, - sim_idx: int, - real_id: int, - control_values: NDArray[np.float64], - objectives: NDArray[np.float64], - constraints: NDArray[np.float64] | None, - ): - cache_id = next(self._counter) - self._keys[real_id].append((control_values[sim_idx, :].copy(), cache_id)) - self._objectives[cache_id] = objectives[sim_idx, ...].copy() - if constraints is not None: - self._constraints[cache_id] = constraints[sim_idx, ...].copy() - - def find_key(self, real_id: int, control_vector: NDArray[np.float64]) -> int | None: - # Brute-force search, premature optimization is the root of all evil: - for cached_vector, cache_id in self._keys.get(real_id, []): - if np.allclose( - control_vector, - cached_vector, - rtol=0.0, - atol=float(np.finfo(np.float32).eps), - ): - return cache_id - return None - - def get_objectives(self, cache_id: int) -> NDArray[np.float64]: - return self._objectives[cache_id] - - def get_constraints(self, cache_id: int) -> NDArray[np.float64]: - return self._constraints[cache_id] diff --git a/tests/everest/test_environment.py b/tests/everest/test_environment.py index 998b64b0384..85fca3261b4 100644 --- a/tests/everest/test_environment.py +++ b/tests/everest/test_environment.py @@ -14,7 +14,7 @@ def test_seed(copy_math_func_test_data_to_tmp): config.environment.random_seed = random_seed run_model = EverestRunModel.create(config) - assert random_seed == run_model.everest_config.environment.random_seed + assert random_seed == run_model._everest_config.environment.random_seed # Res ert_config = _everest_to_ert_config_dict(config) @@ -26,5 +26,5 @@ def test_loglevel(copy_math_func_test_data_to_tmp): config = EverestConfig.load_file(CONFIG_FILE) config.environment.log_level = "info" run_model = EverestRunModel.create(config) - config = run_model.everest_config + config = run_model._everest_config assert len(EverestConfig.lint_config_dict(config.to_dict())) == 0 diff --git a/tests/everest/test_everest_output.py b/tests/everest/test_everest_output.py index 5287251aca4..78ee7e439b8 100644 --- a/tests/everest/test_everest_output.py +++ b/tests/everest/test_everest_output.py @@ -52,13 +52,7 @@ async def test_everest_output(copy_mocked_test_data_to_tmp): initial_folders = set(folders) initial_files = set(files) - # Tests in this class used to fail when a callback was passed in - # Use a callback just to see that everything works fine, even though - # the callback does nothing - def useless_cb(*args, **kwargs): - pass - - EverestRunModel.create(config, optimization_callback=useless_cb) + EverestRunModel.create(config) # Check the output folder is created when stating the optimization # in everest workflow diff --git a/tests/everest/test_everserver.py b/tests/everest/test_everserver.py index 3c501440eec..84224de944e 100644 --- a/tests/everest/test_everserver.py +++ b/tests/everest/test_everserver.py @@ -5,9 +5,9 @@ from pathlib import Path from unittest.mock import patch -from ropt.enums import OptimizerExitCode from seba_sqlite.snapshot import SebaSnapshot +from ert.run_models.everest_run_model import EverestExitCode from everest.config import EverestConfig, ServerConfig from everest.detached import ServerStatus, everserver_status from everest.detached.jobs import everserver @@ -33,8 +33,8 @@ def fail_optimization(self, from_ropt=False): # shared_data (see set_shared_status() below). self._sim_callback(None) if from_ropt: - self._exit_code = OptimizerExitCode.TOO_FEW_REALIZATIONS - return OptimizerExitCode.TOO_FEW_REALIZATIONS + self._exit_code = EverestExitCode.TOO_FEW_REALIZATIONS + return EverestExitCode.TOO_FEW_REALIZATIONS raise Exception("Failed optimization") @@ -121,7 +121,7 @@ def test_everserver_status_failure(_1, copy_math_func_test_data_to_tmp): "ert.run_models.everest_run_model.EverestRunModel.run_experiment", autospec=True, side_effect=lambda self, evaluator_server_config, restart=False: check_status( - ServerConfig.get_hostfile_path(self.everest_config.output_dir), + ServerConfig.get_hostfile_path(self._everest_config.output_dir), status=ServerStatus.running, ), ) diff --git a/tests/everest/test_simulator_cache.py b/tests/everest/test_simulator_cache.py index e347fc76476..02f9ae4d4ff 100644 --- a/tests/everest/test_simulator_cache.py +++ b/tests/everest/test_simulator_cache.py @@ -47,7 +47,7 @@ def new_call(*args): Path("everest_output/optimization_output/seba.db").unlink() # The batch_id was used as a stopping criterion, so it must be reset: - run_model.batch_id = 0 + run_model._batch_id = 0 run_model.run_experiment(evaluator_server_config) assert n_evals == 0 diff --git a/tests/everest/test_yaml_parser.py b/tests/everest/test_yaml_parser.py index 436a594c0b3..0169508a6ba 100644 --- a/tests/everest/test_yaml_parser.py +++ b/tests/everest/test_yaml_parser.py @@ -17,9 +17,13 @@ def test_random_seed(random_seed): if random_seed: config["environment"] = {"random_seed": random_seed} ever_config = EverestConfig.with_defaults(**config) - assert ever_config.environment.random_seed == random_seed ert_config = everest_to_ert_config(ever_config) - assert ert_config.random_seed == random_seed + if random_seed is None: + assert ever_config.environment.random_seed > 0 + assert ert_config.random_seed > 0 + else: + assert ever_config.environment.random_seed == random_seed + assert ert_config.random_seed == random_seed def test_read_file(tmp_path, monkeypatch):