From 8b88b675c2f5c44f8e0efd6f4513df5c8d513514 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Thu, 17 Oct 2024 07:11:49 +0200 Subject: [PATCH] Replace erts interface towards reservoir simulators This replaces the yaml configuration file for Eclipse100/300 with a set of environment variables set through the plugin system. Ert cannot any longer start the raw Eclipse binary itself, it depends on the vendor supplied wrapper binary called "eclrun". Similarly, for OPM flow, Ert will now support a wrapper script "flowrun" if it is present, assuming it has a similar command line API as eclrun. If flowrun is not present, it will look for a binary "flow" in $PATH which can be used, but then only with single-cpu possibilities. Users can point to a custom location of eclrun by adding SETENV to the configuration file. --- .github/workflows/build_and_test.yml | 12 + .github/workflows/test_ert_with_flow.yml | 52 ++ src/ert/config/ert_config.py | 17 +- .../forward_model_steps.py | 82 ++- .../forward_models/res/script/ecl100.py | 9 - .../res/script/ecl100_config.yml | 14 - .../forward_models/res/script/ecl300.py | 9 - .../res/script/ecl300_config.yml | 8 - .../forward_models/res/script/ecl_config.py | 278 --------- .../forward_models/res/script/ecl_run.py | 588 ------------------ .../forward_models/res/script/flow.py | 9 - .../forward_models/run_reservoirsimulator.py | 430 +++++++++++++ .../unit_tests/config/test_forward_model.py | 116 ++-- .../resources/test_ecl_versioning_config.py | 203 ------ .../ert/unit_tests/resources/test_opm_flow.py | 333 ---------- ...onfig.py => test_run_eclipse_simulator.py} | 381 ++++++------ .../resources/test_run_flow_simulator.py | 126 ++++ .../resources/test_run_reservoirsimulator.py | 158 +++++ 18 files changed, 1050 insertions(+), 1775 deletions(-) create mode 100644 .github/workflows/test_ert_with_flow.yml delete mode 100755 src/ert/resources/forward_models/res/script/ecl100.py delete mode 100644 src/ert/resources/forward_models/res/script/ecl100_config.yml delete mode 100755 src/ert/resources/forward_models/res/script/ecl300.py delete mode 100644 src/ert/resources/forward_models/res/script/ecl300_config.yml delete mode 100644 src/ert/resources/forward_models/res/script/ecl_config.py delete mode 100644 src/ert/resources/forward_models/res/script/ecl_run.py delete mode 100755 src/ert/resources/forward_models/res/script/flow.py create mode 100755 src/ert/resources/forward_models/run_reservoirsimulator.py delete mode 100644 tests/ert/unit_tests/resources/test_ecl_versioning_config.py delete mode 100644 tests/ert/unit_tests/resources/test_opm_flow.py rename tests/ert/unit_tests/resources/{test_ecl_run_new_config.py => test_run_eclipse_simulator.py} (71%) create mode 100644 tests/ert/unit_tests/resources/test_run_flow_simulator.py create mode 100644 tests/ert/unit_tests/resources/test_run_reservoirsimulator.py diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index c7fef6e5b8c..1c26ba624a2 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -69,6 +69,18 @@ jobs: python-version: ${{ matrix.python-version }} secrets: inherit + test-ert-with-flow: + strategy: + fail-fast: false + matrix: + os: [ ubuntu-latest ] + python-version: [ '3.11', '3.12' ] + uses: ./.github/workflows/test_ert_with_flow.yml + with: + os: ${{ matrix.os }} + python-version: ${{ matrix.python-version }} + secrets: inherit + test-mac-ert: if: github.ref_type != 'tag' # when not tag strategy: diff --git a/.github/workflows/test_ert_with_flow.yml b/.github/workflows/test_ert_with_flow.yml new file mode 100644 index 00000000000..892424ae1cb --- /dev/null +++ b/.github/workflows/test_ert_with_flow.yml @@ -0,0 +1,52 @@ +on: + workflow_call: + inputs: + os: + type: string + python-version: + type: string + +env: + UV_SYSTEM_PYTHON: 1 + +jobs: + test-ert-with-flow: + name: Run ert tests + timeout-minutes: 20 + runs-on: ${{ inputs.os }} + + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + id: setup_python + with: + python-version: ${{ inputs.python-version }} + + - name: Install uv + uses: astral-sh/setup-uv@v4 + + - name: Install ert + run: + uv pip install ".[dev]" + + - name: Install flow + run: | + set -e + sudo apt install software-properties-common + sudo apt-add-repository ppa:opm/ppa + sudo apt update + sudo apt install mpi-default-bin + sudo apt install libopm-simulators-bin python3-opm-common + + which flow + flow --version + + - name: Run integration tests towards OPM flow without flowrun + run: | + set -e + pytest tests/ert/unit_tests/resources/test_run_flow_simulator.py + + cd test-data/ert/flow_example + perl -p -i -e 's/NUM_REALIZATIONS\s*12/NUM_REALIZATIONS 2/g' flow.ert + ert ensemble_experiment flow.ert --disable-monitor diff --git a/src/ert/config/ert_config.py b/src/ert/config/ert_config.py index deef1a1d90a..7afe5495919 100644 --- a/src/ert/config/ert_config.py +++ b/src/ert/config/ert_config.py @@ -732,10 +732,15 @@ def _create_list_of_forward_model_steps_to_run( cls, installed_steps: dict[str, ForwardModelStep], substitutions: Substitutions, - config_dict, + config_dict: dict, ) -> list[ForwardModelStep]: errors = [] fm_steps = [] + + env_vars = {} + for key, val in config_dict.get("SETENV", []): + env_vars[key] = val + for fm_step_description in config_dict.get(ConfigKeys.FORWARD_MODEL, []): if len(fm_step_description) > 1: unsubstituted_step_name, args = fm_step_description @@ -800,9 +805,15 @@ def _create_list_of_forward_model_steps_to_run( context=substitutions, forward_model_steps=[fm_step], skip_pre_experiment_validation=True, + env_vars=env_vars, ) - job_json = substituted_json["jobList"][0] - fm_step.validate_pre_experiment(job_json) + fm_json = substituted_json["jobList"][0] + fm_json["environment"] = { + **cls.ENV_PR_FM_STEP.get(fm_step.name, {}), # plugins + **fm_json["environment"], # setenv + **substituted_json["global_environment"], + } + fm_step.validate_pre_experiment(fm_json) except ForwardModelStepValidationError as err: errors.append( ConfigValidationError.with_context( diff --git a/src/ert/plugins/hook_implementations/forward_model_steps.py b/src/ert/plugins/hook_implementations/forward_model_steps.py index e1a4442cdd7..02f2421d1c0 100644 --- a/src/ert/plugins/hook_implementations/forward_model_steps.py +++ b/src/ert/plugins/hook_implementations/forward_model_steps.py @@ -1,12 +1,9 @@ -import os import shutil import subprocess from pathlib import Path from textwrap import dedent from typing import Literal -import yaml - from ert import ( ForwardModelStepDocumentation, ForwardModelStepJSON, @@ -14,7 +11,6 @@ ForwardModelStepValidationError, plugin, ) -from ert.plugins import ErtPluginManager class CarefulCopyFile(ForwardModelStepPlugin): @@ -207,12 +203,12 @@ def __init__(self) -> None: str( ( Path(__file__) - / "../../../resources/forward_models/res/script/ecl100.py" + / "../../../resources/forward_models/run_reservoirsimulator.py" ).resolve() ), - "", - "-v", + "eclipse", "", + "", "-n", "", "", @@ -220,18 +216,20 @@ def __init__(self) -> None: default_mapping={"": 1, "": ""}, ) - def validate_pre_experiment(self, _: ForwardModelStepJSON) -> None: + def validate_pre_experiment(self, fm_json: ForwardModelStepJSON) -> None: if "" not in self.private_args: raise ForwardModelStepValidationError( "Forward model step ECLIPSE100 must be given a VERSION argument" ) version = self.private_args[""] - available_versions = _available_eclrun_versions(simulator="eclipse") + available_versions = _available_eclrun_versions( + simulator="eclipse", env_vars=fm_json["environment"] + ) if available_versions and version not in available_versions: raise ForwardModelStepValidationError( - f"Unavailable ECLIPSE100 version {version} current supported " - f"versions {available_versions}" + f"Unavailable ECLIPSE100 version {version}. " + f"Available versions: {available_versions}" ) @staticmethod @@ -265,12 +263,12 @@ def __init__(self) -> None: str( ( Path(__file__) - / "../../../resources/forward_models/res/script/ecl300.py" + / "../../../resources/forward_models/run_reservoirsimulator.py" ).resolve() ), - "", - "-v", + "e300", "", + "", "-n", "", "", @@ -278,17 +276,22 @@ def __init__(self) -> None: default_mapping={"": 1, "": "", "": "version"}, ) - def validate_pre_experiment(self, _: ForwardModelStepJSON) -> None: + def validate_pre_experiment( + self, + fm_step_json: ForwardModelStepJSON, + ) -> None: if "" not in self.private_args: raise ForwardModelStepValidationError( "Forward model step ECLIPSE300 must be given a VERSION argument" ) version = self.private_args[""] - available_versions = _available_eclrun_versions(simulator="e300") + available_versions = _available_eclrun_versions( + simulator="e300", env_vars=fm_step_json["environment"] + ) if available_versions and version not in available_versions: raise ForwardModelStepValidationError( - f"Unavailable ECLIPSE300 version {version} current supported " - f"versions {available_versions}" + f"Unavailable ECLIPSE300 version {version}. " + f"Available versions: {available_versions}" ) @staticmethod @@ -317,12 +320,12 @@ def __init__(self) -> None: str( ( Path(__file__) - / "../../../resources/forward_models/res/script/flow.py" + / "../../../resources/forward_models/run_reservoirsimulator.py" ).resolve() ), - "", - "-v", + "flow", "", + "", "-n", "", "", @@ -628,33 +631,22 @@ def installable_forward_model_steps() -> list[type[ForwardModelStepPlugin]]: return [*_UpperCaseFMSteps, *_LowerCaseFMSteps] -def _available_eclrun_versions(simulator: Literal["eclipse", "e300"]) -> list[str]: - if shutil.which("eclrun") is None: - return [] - pm = ErtPluginManager() - ecl_config_path = ( - pm.get_ecl100_config_path() - if simulator == "eclipse" - else pm.get_ecl300_config_path() - ) - - if not ecl_config_path: - return [] - eclrun_env = {"PATH": os.getenv("PATH", "")} - - with open(ecl_config_path, encoding="utf-8") as f: - try: - config = yaml.safe_load(f) - except yaml.YAMLError as e: - raise ValueError(f"Failed parse: {ecl_config_path} as yaml") from e - ecl_install_path = config.get("eclrun_env", {}).get("PATH", "") - eclrun_env["PATH"] = eclrun_env["PATH"] + os.pathsep + ecl_install_path - +def _available_eclrun_versions( + simulator: Literal["eclipse", "e300"], env_vars: dict[str, str] +) -> list[str]: + eclrun_path = env_vars.get("ECLRUN_PATH", "") try: + eclrun_abspath = shutil.which(Path(eclrun_path) / "eclrun") + if eclrun_abspath is None: + return [] return ( subprocess.check_output( - ["eclrun", "--report-versions", simulator], - env=eclrun_env, + [ + eclrun_abspath, + simulator, + "--report-versions", + ], + env=env_vars, ) .decode("utf-8") .strip() diff --git a/src/ert/resources/forward_models/res/script/ecl100.py b/src/ert/resources/forward_models/res/script/ecl100.py deleted file mode 100755 index d7ceeb71b29..00000000000 --- a/src/ert/resources/forward_models/res/script/ecl100.py +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env python -import sys - -from ecl_config import Ecl100Config -from ecl_run import run - -if __name__ == "__main__": - config = Ecl100Config() - run(config, [arg for arg in sys.argv[1:] if len(arg) > 0]) diff --git a/src/ert/resources/forward_models/res/script/ecl100_config.yml b/src/ert/resources/forward_models/res/script/ecl100_config.yml deleted file mode 100644 index e84aab694d1..00000000000 --- a/src/ert/resources/forward_models/res/script/ecl100_config.yml +++ /dev/null @@ -1,14 +0,0 @@ -# NB: There are inter related dependencies between this file, the EclConfig -# class which essentially internalizes this file and the EclRun class -# which uses the EclConfig class. -# -# You can have a shared 'env' attribute which is an environment variable map -# which will be set before the simulator starts. If there are env settings in -# the simulator as well they will be merged with these common settings, the -# simulator specific take presedence. - -eclrun_env: - SLBSLS_LICENSE_FILE: something@yourcompany.com - ECLPATH: /path/to/ecl - PATH: /path/to/ecl/macros - LSB_JOBID: null diff --git a/src/ert/resources/forward_models/res/script/ecl300.py b/src/ert/resources/forward_models/res/script/ecl300.py deleted file mode 100755 index 480a30a27e5..00000000000 --- a/src/ert/resources/forward_models/res/script/ecl300.py +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env python -import sys - -from ecl_config import Ecl300Config -from ecl_run import run - -if __name__ == "__main__": - config = Ecl300Config() - run(config, [arg for arg in sys.argv[1:] if len(arg) > 0]) diff --git a/src/ert/resources/forward_models/res/script/ecl300_config.yml b/src/ert/resources/forward_models/res/script/ecl300_config.yml deleted file mode 100644 index 7c0b1c492c3..00000000000 --- a/src/ert/resources/forward_models/res/script/ecl300_config.yml +++ /dev/null @@ -1,8 +0,0 @@ -# This is an example configuration file for the local ecl300 installation, see -# the ecl100_config.yml file for more extensive documentation. - -eclrun_env: - SLBSLS_LICENSE_FILE: something@yourcompany.com - ECLPATH: /path/to/ecl - PATH: /path/to/ecl/macros - LSB_JOBID: null diff --git a/src/ert/resources/forward_models/res/script/ecl_config.py b/src/ert/resources/forward_models/res/script/ecl_config.py deleted file mode 100644 index b0bbeb9196c..00000000000 --- a/src/ert/resources/forward_models/res/script/ecl_config.py +++ /dev/null @@ -1,278 +0,0 @@ -import os -import re -import shutil -import subprocess -import sys -from pathlib import Path -from typing import Any - -import yaml - - -def re_getenv(match_obj): - match_str = match_obj.group(1) - variable = match_str[1:] - return os.getenv(variable, default=match_str) - - -def _replace_env(env: dict) -> dict: - """Small utility function will take a dict as input, and create a new - dictionary where all $VARIABLE in the values has been replaced with - getenv("VARIABLE"). Variables which are not recognized are left - unchanged.""" - new_env = {} - for key, value in env.items(): - new_env[key] = re.sub(r"(\$[A-Z0-9_]+)", re_getenv, value) - - return new_env - - -class Keys: - default_version: str = "default_version" - default: str = "default" - versions: str = "versions" - env: str = "env" - mpi: str = "mpi" - mpirun: str = "mpirun" - executable: str = "executable" - scalar: str = "scalar" - - -class Simulator: - """Small 'struct' with the config information for one simulator.""" - - def __init__( - self, - version: str, - executable: str, - env: dict[str, str], - mpirun: str | None = None, - ): - self.version: str = version - if not os.access(executable, os.X_OK): - raise OSError(f"The executable: '{executable}' can not be executed by user") - - self.executable: str = executable - self.env: dict[str, str] = env - self.mpirun: str | None = mpirun - self.name: str = "simulator" - - if mpirun is not None and not os.access(mpirun, os.X_OK): - raise OSError(f"The mpirun binary: '{mpirun}' is not executable by user") - - def __repr__(self) -> str: - mpistring: str = "" - if self.mpirun: - mpistring = " MPI" - return ( - f"{self.name}(version={self.version}, " - f"executable={self.executable}{mpistring})" - ) - - -class EclConfig: - """Represent the eclipse configuration at a site. - - The EclConfig class internalizes information of where the various eclipse - programs are installed on site, and which environment variables need to be - set before the simulator starts. The class is based on parsing a yaml - formatted configuration file, the source distribution contains commented - example file. - - """ - - def __init__(self, config_file: str, simulator_name: str = "not_set"): - with open(config_file, encoding="utf-8") as f: - try: - config = yaml.safe_load(f) - except yaml.YAMLError as e: - raise ValueError(f"Failed parse: {config_file} as yaml") from e - - self._config: dict = config - self._config_file: str = os.path.abspath(config_file) - self.simulator_name: str = simulator_name - - def __contains__(self, version: str) -> bool: - if version in self._config[Keys.versions]: - return True - - return self.default_version is not None and version in [None, Keys.default] - - def get_eclrun_env(self) -> dict[str, str] | None: - if "eclrun_env" in self._config: - return self._config["eclrun_env"].copy() - return None - - @property - def default_version(self) -> str | None: - return self._config.get(Keys.default_version) - - def _get_version(self, version_arg: str | None) -> str: - if version_arg in [None, Keys.default]: - version = self.default_version - else: - version = version_arg - - if version is None: - raise ValueError( - "The default version has not been " - f"set in the config file:{self._config_file}" - ) - - return version - - def _get_env(self, version: str, exe_type: str) -> dict[str, str]: - env: dict[str, str] = {} - env.update(self._config.get(Keys.env, {})) - - mpi_sim: dict[str, Any] = self._config[Keys.versions][ - self._get_version(version) - ][exe_type] - env.update(mpi_sim.get(Keys.env, {})) - - return _replace_env(env) - - def _get_sim(self, version: str | None, exe_type: str) -> Simulator: - version = self._get_version(version) - binaries: dict[str, str] = self._config[Keys.versions][version][exe_type] - mpirun = binaries[Keys.mpirun] if exe_type == Keys.mpi else None - return Simulator( - version, - binaries[Keys.executable], - self._get_env(version, exe_type), - mpirun=mpirun, - ) - - def sim(self, version: str | None = None) -> Simulator: - """Will return an object describing the simulator. - - Available attributes are 'executable' and 'env'. Observe that the - executable path is validated when you instantiate the Simulator object; - so if the executable key in the config file points to non-existing file - you will not get the error before this point. - """ - return self._get_sim(version, Keys.scalar) - - def mpi_sim(self, version: str | None = None) -> Simulator: - """MPI version of method sim()""" - return self._get_sim(version, Keys.mpi) - - def simulators(self, strict: bool = True) -> list[Simulator]: - simulators = [] - for version in self._config[Keys.versions]: - for exe_type in self._config[Keys.versions][version]: - if strict: - sim = self._get_sim(version, exe_type) - else: - try: - sim = self._get_sim(version, exe_type) - except OSError: - sys.stderr.write( - "Failed to create simulator object for: " - f"version:{version} {exe_type}\n" - ) - sim = None - - if sim: - simulators.append(sim) - return simulators - - -class Ecl100Config(EclConfig): - DEFAULT_CONFIG_FILE: str = os.path.join( - os.path.dirname(__file__), "ecl100_config.yml" - ) - - def __init__(self): - config_file = os.getenv("ECL100_SITE_CONFIG", default=self.DEFAULT_CONFIG_FILE) - super().__init__(config_file, simulator_name="eclipse") - - -class Ecl300Config(EclConfig): - DEFAULT_CONFIG_FILE: str = os.path.join( - os.path.dirname(__file__), "ecl300_config.yml" - ) - - def __init__(self): - config_file = os.getenv("ECL300_SITE_CONFIG", default=self.DEFAULT_CONFIG_FILE) - super().__init__(config_file, simulator_name="e300") - - -class FlowConfig(EclConfig): - DEFAULT_CONFIG_FILE: str = os.path.join( - os.path.dirname(__file__), "flow_config.yml" - ) - - def __init__(self): - config_file = os.getenv("FLOW_SITE_CONFIG", default=self.DEFAULT_CONFIG_FILE) - if not Path(config_file).exists(): - config_file = self.init_flow_config() - super().__init__(config_file, simulator_name="flow") - - @staticmethod - def init_flow_config() -> str: - binary_path = shutil.which("flow") - if binary_path is None: - raise FileNotFoundError( - "Could not find flow executable!\n" - " Requires flow to be installed in $PATH" - ) - - conf = { - "default_version": "default", - "versions": {"default": {"scalar": {"executable": binary_path}}}, - } - flow_config_yml = Path("flow_config.yml") - flow_config_yml.write_text(yaml.dump(conf), encoding="utf-8") - return str(flow_config_yml.absolute()) - - -class EclrunConfig: - """This class contains configurations for using the new eclrun binary - for running eclipse. It uses the old configurations classes above to - get the configuration in the ECLX00_SITE_CONFIG files. - """ - - def __init__(self, config: EclConfig, version: str): - self.simulator_name: str = config.simulator_name - self.run_env: dict[str, str] | None = self._get_run_env(config.get_eclrun_env()) - self.version: str = version - - @staticmethod - def _get_run_env(eclrun_env: dict[str, str] | None) -> dict[str, str] | None: - if eclrun_env is None: - return None - - env: dict = os.environ.copy() - if "PATH" in eclrun_env: - env["PATH"] = eclrun_env["PATH"] + os.pathsep + env["PATH"] - eclrun_env.pop("PATH") - - for key, value in eclrun_env.copy().items(): - if value is None: - if key in env: - env.pop(key) - eclrun_env.pop(key) - - env.update(eclrun_env) - return env - - def _get_available_eclrun_versions(self) -> list[str]: - try: - return ( - subprocess.check_output( - ["eclrun", "--report-versions", self.simulator_name], - env=self.run_env, - ) - .decode("utf-8") - .strip() - .split(" ") - ) - except subprocess.CalledProcessError: - return [] - - def can_use_eclrun(self) -> bool: - if self.run_env is None: - return False - - return self.version in self._get_available_eclrun_versions() diff --git a/src/ert/resources/forward_models/res/script/ecl_run.py b/src/ert/resources/forward_models/res/script/ecl_run.py deleted file mode 100644 index 561b920a165..00000000000 --- a/src/ert/resources/forward_models/res/script/ecl_run.py +++ /dev/null @@ -1,588 +0,0 @@ -import datetime -import glob -import os -import os.path -import re -import socket -import subprocess -import sys -import time -from argparse import ArgumentParser -from collections import namedtuple -from contextlib import contextmanager -from pathlib import Path -from random import random - -import resfo -from ecl_config import EclConfig, EclrunConfig, Simulator - - -def ecl_output_has_license_error(ecl_output: str): - return ( - "LICENSE ERROR" in ecl_output - or "LICENSE FAILURE" in ecl_output - or "not allowed in license" in ecl_output - ) - - -class EclError(RuntimeError): - def failed_due_to_license_problems(self) -> bool: - # self.args[0] contains the multiline ERROR messages and SLAVE startup messages - if ecl_output_has_license_error(self.args[0]): - return True - if re.search(a_slave_failed_pattern, self.args[0]): - for match in re.finditer(slave_run_paths, self.args[0], re.MULTILINE): - (ecl_case_starts_with, ecl_case_dir) = match.groups() - for prt_file in glob.glob( - f"{ecl_case_dir}/{ecl_case_starts_with}*.PRT" - ): - if ecl_output_has_license_error( - Path(prt_file).read_text(encoding="utf-8") - ): - return True - return False - - -EclipseResult = namedtuple("EclipseResult", "errors bugs") -body_sub_pattern = r"(\s^\s@.+$)*" -date_sub_pattern = r"\s+AT TIME\s+(?P\d+\.\d+)\s+DAYS\s+\((?P(.+)):\s*$" -error_pattern_e100 = ( - rf"^\s@-- ERROR\s(FROM PROCESSOR \d+)?{date_sub_pattern}${body_sub_pattern}" -) -error_pattern_e300 = rf"^\s@--Error${body_sub_pattern}" -slave_started_pattern = ( - rf"^\s@--MESSAGE{date_sub_pattern}\s^\s@\s+STARTING SLAVE.+${body_sub_pattern}" -) -a_slave_failed_pattern = r"\s@\s+SLAVE RUN.*HAS STOPPED WITH AN ERROR CONDITION.\s*" -slave_run_paths = r"^\s@\s+STARTING SLAVE\s+[^ ]+RUNNING \([^ ]\)\s*$" -slave_run_paths = r"\s@\s+STARTING SLAVE .* RUNNING (\w+)\s*^\s@\s+ON HOST.*IN DIRECTORY\s*^\s@\s+(.*)" - - -def make_LSB_MCPU_machine_list(LSB_MCPU_HOSTS): - host_numcpu_list = LSB_MCPU_HOSTS.split() - host_list = [] - for index in range(len(host_numcpu_list) // 2): - machine = host_numcpu_list[2 * index] - host_numcpu = int(host_numcpu_list[2 * index + 1]) - for _ in range(host_numcpu): - host_list.append(machine) - return host_list - - -def _expand_SLURM_range(rs): - if "-" in rs: - tmp = rs.split("-") - return range(int(tmp[0]), int(tmp[1]) + 1) - else: - return [int(rs)] - - -def _expand_SLURM_node(node_string): - match_object = re.match(r"(?P[^[]+)\[(?P[-0-9,]+)\]", node_string) - if match_object: - node_list = [] - base = match_object.groupdict()["base"] - range_string = match_object.groupdict()["range"] - for rs in range_string.split(","): - for num in _expand_SLURM_range(rs): - node_list.append(f"{base}{num}") - return node_list - else: - return [node_string] - - -def _expand_SLURM_task_count(task_count_string): - match_object = re.match(r"(?P\d+)(\(x(?P\d+)\))?", task_count_string) - if match_object: - match_dict = match_object.groupdict() - print(match_dict) - count = int(match_dict["count"]) - mult_string = match_dict["mult"] - mult = 1 if mult_string is None else int(mult_string) - - return [count] * mult - else: - raise ValueError(f"Failed to parse SLURM_TASKS_PER_NODE: {task_count_string}") - - -# The list of available machines/nodes and how many tasks each node should get -# is available in the slurm environment variables SLURM_JOB_NODELIST and -# SLURM_TASKS_PER_NODE. These string variables are in an incredibly compact -# notation, and there are some hoops to expand them. The short description is: -# -# 1. They represent flat lists of hostnames and the number of cpu's on that -# host respectively. -# -# 2. The outer structure is a ',' separated lis. -# -# 3. The items in SLURM_JOB_NODELIST have a compact notation -# base-[n1-n2,n3-n4] which is expanded to the nodelist: [base-n1, -# base-n1+1, base-n1+2, ... , base-n4-1, base-n4] -# -# 4. The SLURM_TASK_PER_NODE items has the compact notation 3(x4) which -# implies that four consecutive nodes (from the expanded -# SLURM_JOB_NODELIST) should have three CPUs each. -# -# For further details see the sbatch manual page. - - -def make_SLURM_machine_list(SLURM_JOB_NODELIST, SLURM_TASKS_PER_NODE): - # We split on ',' - but not on ',' which is inside a [...] - split_re = ",(?![^[]*\\])" - nodelist = [] - for node_string in re.split(split_re, SLURM_JOB_NODELIST): - nodelist += _expand_SLURM_node(node_string) - - task_count_list = [] - for task_count_string in SLURM_TASKS_PER_NODE.split(","): - task_count_list += _expand_SLURM_task_count(task_count_string) - - host_list = [] - for node, count in zip(nodelist, task_count_list, strict=False): - host_list += [node] * count - - return host_list - - -def make_LSB_machine_list(LSB_HOSTS): - return LSB_HOSTS.split() - - -@contextmanager -def pushd(run_path): - starting_directory = os.getcwd() - os.chdir(run_path) - yield - os.chdir(starting_directory) - - -def find_unsmry(basepath: Path) -> Path | None: - def _is_unsmry(base: str, path: str) -> bool: - if "." not in path: - return False - splitted = path.split(".") - return splitted[-2].endswith(base) and splitted[-1].lower() in [ - "unsmry", - "funsmry", - ] - - base = basepath.name - candidates: list[str] = list( - filter(lambda x: _is_unsmry(base, x), glob.glob(str(basepath) + "*")) - ) - if not candidates: - return None - if len(candidates) > 1: - raise ValueError( - f"Ambiguous reference to unsmry in {basepath}, could be any of {candidates}" - ) - return Path(candidates[0]) - - -def await_completed_unsmry_file( - smry_path: Path, max_wait: float = 15, poll_interval: float = 1.0 -) -> float: - """This function will wait until the provided smry file does not grow in size - during one poll interval. - - Such a wait is sometimes needed when different MPI hosts write data to a shared - disk system. - - If the file does not exist or is completely unreadable to resfo, this function - will timeout to max_wait. If NOSIM is included, this will happen. - - Size is defined in terms of readable data elementes through resfo. - - This function will always wait for at least one poll interval, the polling - interval is specified in seconds. - - The return value is the waited time (in seconds)""" - start_time = datetime.datetime.now() - prev_len = 0 - while (datetime.datetime.now() - start_time).total_seconds() < max_wait: - try: - resfo_sum = [r.read_keyword() for r in resfo.lazy_read(smry_path)] - except Exception: - time.sleep(poll_interval) - continue - - current_len = len(resfo_sum) - if prev_len == current_len: - # smry file is regarded complete - break - else: - prev_len = max(prev_len, current_len) - - time.sleep(poll_interval) - - return (datetime.datetime.now() - start_time).total_seconds() - - -class EclRun: - """Wrapper class to run Eclipse simulations. - - The EclRun class is a small wrapper class which is used to run Eclipse - simulations. It will load a configuration, i.e. where the binary is - installed and so on, from an instance of the EclConfig class. - - The main method is the runEclipse() method which will: - - 1. Set up redirection of the stdxxx file descriptors. - 2. Set the necessary environment variables. - 3. [MPI]: Create machine_file listing the nodes which should be used. - 4. fork+exec to actually run the Eclipse binary. - 5. Parse the output .PRT / .ECLEND file to check for errors. - - If the simulation fails the runEclipse() method will raise an exception. - - The class is called EclRun, and the main focus has been on running Eclipse - simulations, but it should also handle "eclipse-like" simulators, e.g. the - simulator OPM/flow. - - To actually create an executable script based on this class could in it's - simplest form be: - - #!/usr/bin/env python - import sys - from .ecl_run import EclRun - - run = EclRun() - run.runEclipse( ) - - - """ - - def __init__( - self, - ecl_case: str, - sim: Simulator, - num_cpu: int = 1, - check_status: bool = True, - summary_conversion: bool = False, - ): - self.sim = sim - self.check_status = check_status - self.num_cpu = int(num_cpu) - self.summary_conversion = summary_conversion - - # Dechipher the ecl_case argument. - input_arg = ecl_case - (_, ext) = os.path.splitext(input_arg) - if ext and ext in [".data", ".DATA"]: - data_file = input_arg - elif input_arg.islower(): - data_file = input_arg + ".data" - else: - data_file = input_arg + ".DATA" - - if not os.path.isfile(data_file): - raise OSError(f"No such file: {data_file}") - - (self.run_path, self.data_file) = os.path.split(data_file) - (self.base_name, ext) = os.path.splitext(self.data_file) - - if self.run_path is None: - self.run_path = os.getcwd() - else: - self.run_path = os.path.abspath(self.run_path) - - def runPath(self): - return self.run_path - - def baseName(self): - return self.base_name - - @property - def prt_path(self): - return Path(self.run_path) / (self.baseName() + ".PRT") - - def numCpu(self): - return self.num_cpu - - def _get_legacy_run_env(self): - my_env = os.environ.copy() - my_env.update(self.sim.env.items()) - return my_env - - def initMPI(self): - # If the environment variable LSB_MCPU_HOSTS is set we assume the job is - # running on LSF - otherwise we assume it is running on the current host. - # - # If the LSB_MCPU_HOSTS variable is indeed set it will be a string like this: - # - # host1 num_cpu1 host2 num_cpu2 ... - # - # i.e. an alternating list of hostname & number of - # cpu. Alternatively/in addition the environment variable - # LSB_HOSTS can be used. This variable is simply: - # - # host1 host1 host2 host3 - - LSB_MCPU_HOSTS = os.getenv("LSB_MCPU_HOSTS") - LSB_HOSTS = os.getenv("LSB_HOSTS") - - if LSB_MCPU_HOSTS or LSB_HOSTS: - LSB_MCPU_machine_list = make_LSB_MCPU_machine_list(LSB_MCPU_HOSTS) - LSB_machine_list = make_LSB_machine_list(LSB_HOSTS) - - if len(LSB_MCPU_machine_list) == self.num_cpu: - machine_list = LSB_MCPU_machine_list - elif len(LSB_machine_list) == self.num_cpu: - machine_list = LSB_machine_list - else: - raise EclError( - "LSF / MPI problems. " - f"Asked for:{self.num_cpu} cpu. " - f'LSB_MCPU_HOSTS: "{LSB_MCPU_HOSTS}" LSB_HOSTS: "{LSB_HOSTS}"' - ) - elif os.getenv("SLURM_JOB_NODELIST"): - machine_list = make_SLURM_machine_list( - os.getenv("SLURM_JOB_NODELIST"), os.getenv("SLURM_TASKS_PER_NODE") - ) - if len(machine_list) != self.num_cpu: - raise EclError( - f"SLURM / MPI problems - asked for {self.num_cpu} - " - f"got {len(machine_list)} nodes" - ) - else: - localhost = socket.gethostname() - machine_list = [localhost] * self.num_cpu - - self.machine_file = f"{self.base_name}.mpi" - with open(self.machine_file, "w", encoding="utf-8") as filehandle: - for host in machine_list: - filehandle.write(f"{host}\n") - - def _get_run_command(self, eclrun_config: EclrunConfig): - summary_conversion = "yes" if self.summary_conversion else "no" - return [ - "eclrun", - "-v", - eclrun_config.version, - eclrun_config.simulator_name, - f"{self.base_name}.DATA", - "--summary-conversion", - summary_conversion, - ] - - def _get_legacy_run_command(self): - if self.num_cpu == 1: - return [self.sim.executable, self.base_name] - else: - self.initMPI() - return [ - self.sim.mpirun, - "-machinefile", - self.machine_file, - "-np", - str(self.num_cpu), - self.sim.executable, - self.base_name, - ] - - def execEclipse(self, eclrun_config=None) -> int: - use_eclrun = eclrun_config is not None - - with pushd(self.run_path): - if not os.path.exists(self.data_file): - raise OSError(f"Can not find data_file:{self.data_file}") - if not os.access(self.data_file, os.R_OK): - raise OSError(f"Can not read data file:{self.data_file}") - - command = ( - self._get_run_command(eclrun_config) - if use_eclrun - else self._get_legacy_run_command() - ) - env = eclrun_config.run_env if use_eclrun else self._get_legacy_run_env() - - return subprocess.run( - command, - env=env, - check=False, - ).returncode - - LICENSE_FAILURE_RETRY_INITIAL_SLEEP = 90 - LICENSE_RETRY_STAGGER_FACTOR = 60 - LICENSE_RETRY_BACKOFF_EXPONENT = 3 - - def runEclipse(self, eclrun_config=None, retries_left=3, backoff_sleep=None): - backoff_sleep = ( - self.LICENSE_FAILURE_RETRY_INITIAL_SLEEP - if backoff_sleep is None - else backoff_sleep - ) - return_code = self.execEclipse(eclrun_config=eclrun_config) - - OK_file = os.path.join(self.run_path, f"{self.base_name}.OK") - if not self.check_status: - with open(OK_file, "w", encoding="utf-8") as f: - f.write("ECLIPSE simulation complete - NOT checked for errors.") - else: - if return_code != 0: - command = ( - self._get_run_command(eclrun_config) - if self.sim is None - else self._get_legacy_run_command() - ) - raise subprocess.CalledProcessError(return_code, command) - - try: - self.assertECLEND() - except EclError as err: - if err.failed_due_to_license_problems() and retries_left > 0: - time_to_wait = backoff_sleep + int( - random() * self.LICENSE_RETRY_STAGGER_FACTOR - ) - sys.stderr.write( - "ECLIPSE failed due to license failure " - f"retrying in {time_to_wait} seconds\n" - ) - time.sleep(time_to_wait) - self.runEclipse( - eclrun_config, - retries_left=retries_left - 1, - backoff_sleep=int( - backoff_sleep * self.LICENSE_RETRY_BACKOFF_EXPONENT - ), - ) - return - else: - raise err from None - if self.num_cpu > 1: - await_completed_unsmry_file( - find_unsmry(Path(self.run_path) / self.base_name) - ) - - with open(OK_file, "w", encoding="utf-8") as f: - f.write("ECLIPSE simulation OK") - - def assertECLEND(self): - tail_length = 5000 - result = self.readECLEND() - if result.errors > 0: - error_list = self.parseErrors() - sep = "\n\n...\n\n" - error_and_slave_msg = sep.join(error_list) - extra_message = "" - error_messages = [ - error for error in error_list if not "STARTING SLAVE" in str(error) - ] - if result.errors != len(error_messages): - extra_message = ( - f"\n\nWarning, mismatch between stated Error count ({result.errors}) " - f"and number of ERROR messages found in PRT ({len(error_messages)})." - f"\n\nTail ({tail_length} bytes) of PRT-file {self.prt_path}:\n\n" - ) + tail_textfile(self.prt_path, 5000) - - raise EclError( - "Eclipse simulation failed with:" - f"{result.errors:d} errors:\n\n{error_and_slave_msg}{extra_message}" - ) - - if result.bugs > 0: - raise EclError(f"Eclipse simulation failed with:{result.bugs:d} bugs") - - def readECLEND(self): - error_regexp = re.compile(r"^\s*Errors\s+(\d+)\s*$") - bug_regexp = re.compile(r"^\s*Bugs\s+(\d+)\s*$") - - report_file = os.path.join(self.run_path, f"{self.base_name}.ECLEND") - if not os.path.isfile(report_file): - report_file = self.prt_path - - errors = None - bugs = None - with open(report_file, encoding="utf-8") as filehandle: - for line in filehandle.readlines(): - error_match = re.match(error_regexp, line) - if error_match: - errors = int(error_match.group(1)) - - bug_match = re.match(bug_regexp, line) - if bug_match: - bugs = int(bug_match.group(1)) - if errors is None: - raise ValueError(f"Could not read errors from {report_file}") - if bugs is None: - raise ValueError(f"Could not read bugs from {report_file}") - - return EclipseResult(errors=errors, bugs=bugs) - - def parseErrors(self) -> list[str]: - """Extract multiline ERROR messages from the PRT file""" - error_list = [] - error_e100_regexp = re.compile(error_pattern_e100, re.MULTILINE) - error_e300_regexp = re.compile(error_pattern_e300, re.MULTILINE) - slave_started_regexp = re.compile(slave_started_pattern, re.MULTILINE) - with open(self.prt_path, encoding="utf-8") as filehandle: - content = filehandle.read() - - for regexp in [error_e100_regexp, error_e300_regexp, slave_started_regexp]: - offset = 0 - while True: - match = regexp.search(content[offset:]) - if match: - error_list.append( - content[offset + match.start() : offset + match.end()] - ) - offset += match.end() - else: - break - - return error_list - - -def run(config: EclConfig, argv): - parser = ArgumentParser() - parser.add_argument("ecl_case") - parser.add_argument("-v", "--version", dest="version", type=str) - parser.add_argument("-n", "--num-cpu", dest="num_cpu", type=int, default=1) - parser.add_argument( - "-i", "--ignore-errors", dest="ignore_errors", action="store_true" - ) - parser.add_argument( - "--summary-conversion", dest="summary_conversion", action="store_true" - ) - - options = parser.parse_args(argv) - - try: - eclrun_config = EclrunConfig(config, options.version) - if eclrun_config.can_use_eclrun(): - run = EclRun( - options.ecl_case, - None, - num_cpu=options.num_cpu, - check_status=not options.ignore_errors, - summary_conversion=options.summary_conversion, - ) - run.runEclipse(eclrun_config=eclrun_config) - else: - if options.num_cpu > 1: - sim = config.mpi_sim(version=options.version) - else: - sim = config.sim(version=options.version) - - run = EclRun( - options.ecl_case, - sim, - num_cpu=options.num_cpu, - check_status=not options.ignore_errors, - summary_conversion=options.summary_conversion, - ) - run.runEclipse() - except EclError as msg: - print(msg, file=sys.stderr) - sys.exit(-1) - - -def tail_textfile(file_path: Path, num_chars: int) -> str: - if not file_path.exists(): - return f"No output file {file_path}" - with open(file_path, encoding="utf-8") as file: - file.seek(0, 2) - file_end_position = file.tell() - seek_position = max(0, file_end_position - num_chars) - file.seek(seek_position) - return file.read()[-num_chars:] diff --git a/src/ert/resources/forward_models/res/script/flow.py b/src/ert/resources/forward_models/res/script/flow.py deleted file mode 100755 index 26f29245b50..00000000000 --- a/src/ert/resources/forward_models/res/script/flow.py +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env python -import sys - -from ecl_config import FlowConfig -from ecl_run import run - -if __name__ == "__main__": - config = FlowConfig() - run(config, [arg for arg in sys.argv[1:] if len(arg) > 0]) diff --git a/src/ert/resources/forward_models/run_reservoirsimulator.py b/src/ert/resources/forward_models/run_reservoirsimulator.py new file mode 100755 index 00000000000..f6781da5d07 --- /dev/null +++ b/src/ert/resources/forward_models/run_reservoirsimulator.py @@ -0,0 +1,430 @@ +#!/usr/bin/env python +import datetime +import glob +import os +import os.path +import re +import shutil +import subprocess +import sys +import time +from argparse import ArgumentParser +from collections import namedtuple +from pathlib import Path +from random import random +from typing import Literal, get_args + +import resfo + + +def ecl_output_has_license_error(ecl_output: str) -> bool: + return ( + "LICENSE ERROR" in ecl_output + or "LICENSE FAILURE" in ecl_output + or "not allowed in license" in ecl_output + ) + + +class EclError(RuntimeError): + def failed_due_to_license_problems(self) -> bool: + # self.args[0] contains the multiline ERROR messages and SLAVE startup messages + if ecl_output_has_license_error(self.args[0]): + return True + if re.search(a_slave_failed_pattern, self.args[0]): + for match in re.finditer(slave_run_paths, self.args[0], re.MULTILINE): + (ecl_case_starts_with, ecl_case_dir) = match.groups() + for prt_file in glob.glob( + f"{ecl_case_dir}/{ecl_case_starts_with}*.PRT" + ): + if ecl_output_has_license_error( + Path(prt_file).read_text(encoding="utf-8") + ): + return True + return False + + +Simulators = Literal["flow", "eclipse", "e300"] +EclipseResult = namedtuple("EclipseResult", "errors bugs") +body_sub_pattern = r"(\s^\s@.+$)*" +date_sub_pattern = r"\s+AT TIME\s+(?P\d+\.\d+)\s+DAYS\s+\((?P(.+)):\s*$" +error_pattern_e100 = ( + rf"^\s@-- ERROR\s(FROM PROCESSOR \d+)?{date_sub_pattern}${body_sub_pattern}" +) +error_pattern_e300 = rf"^\s@--Error${body_sub_pattern}" +slave_started_pattern = ( + rf"^\s@--MESSAGE{date_sub_pattern}\s^\s@\s+STARTING SLAVE.+${body_sub_pattern}" +) +a_slave_failed_pattern = r"\s@\s+SLAVE RUN.*HAS STOPPED WITH AN ERROR CONDITION.\s*" +slave_run_paths = r"^\s@\s+STARTING SLAVE\s+[^ ]+RUNNING \([^ ]\)\s*$" +slave_run_paths = r"\s@\s+STARTING SLAVE .* RUNNING (\w+)\s*^\s@\s+ON HOST.*IN DIRECTORY\s*^\s@\s+(.*)" + + +def find_unsmry(basepath: Path) -> Path | None: + def _is_unsmry(base: str, path: str) -> bool: + if "." not in path: + return False + splitted = path.split(".") + return splitted[-2].endswith(base) and splitted[-1].lower() in [ + "unsmry", + "funsmry", + ] + + base = basepath.name + candidates: list[str] = list( + filter(lambda x: _is_unsmry(base, x), glob.glob(str(basepath) + "*")) + ) + if not candidates: + return None + if len(candidates) > 1: + raise ValueError( + f"Ambiguous reference to unsmry in {basepath}, could be any of {candidates}" + ) + return Path(candidates[0]) + + +def await_completed_unsmry_file( + smry_path: Path, max_wait: float = 15, poll_interval: float = 1.0 +) -> float: + """This function will wait until the provided smry file does not grow in size + during one poll interval. + + Such a wait is sometimes needed when different MPI hosts write data to a shared + disk system. + + If the file does not exist or is completely unreadable to resfo, this function + will timeout to max_wait. If NOSIM is included, this will happen. + + Size is defined in terms of readable data elementes through resfo. + + This function will always wait for at least one poll interval, the polling + interval is specified in seconds. + + The return value is the waited time (in seconds)""" + start_time = datetime.datetime.now() + prev_len = 0 + while (datetime.datetime.now() - start_time).total_seconds() < max_wait: + try: + resfo_sum = [r.read_keyword() for r in resfo.lazy_read(smry_path)] + except Exception: + time.sleep(poll_interval) + continue + + current_len = len(resfo_sum) + if prev_len == current_len: + # smry file is regarded complete + break + else: + prev_len = max(prev_len, current_len) + + time.sleep(poll_interval) + + return (datetime.datetime.now() - start_time).total_seconds() + + +class RunReservoirSimulator: + """Wrapper class to run system installed `eclrun` or `flowrun`. + + Will initiate a limited number of reruns if license errors are detected, also + in coupled simulations (not relevant for Flow) + + By default, the class will check PRT/ECLEND files for errors after the run, and + raise exceptions accordingly. + + For eclrun, the request to do summary_conversion can be forwarded. + """ + + def __init__( + self, + simulator: Simulators, + version: str | None, + ecl_case: Path | str, + num_cpu: int = 1, + check_status: bool = True, + summary_conversion: bool = False, + ): + if simulator not in get_args(Simulators): + raise ValueError( + f"Unknown simulator '{simulator}', pick from {get_args(Simulators)}" + ) + self.simulator = simulator + self.version: str | None = version + + self.num_cpu: int = int(num_cpu) + self.check_status: bool = check_status + self.summary_conversion: bool = summary_conversion + + self.bypass_flowrun: bool = False + + runner_abspath: str | Path | None = None + if simulator in ["eclipse", "e300"]: + eclrun_path: str = os.environ.get("ECLRUN_PATH", "") + runner_abspath = shutil.which(Path(eclrun_path) / "eclrun") + if runner_abspath is None: + raise RuntimeError("eclrun not installed") + else: + flowrun_path: str = os.environ.get("FLOWRUN_PATH", "") + runner_abspath = shutil.which(Path(flowrun_path) / "flowrun") + if runner_abspath is None: + runner_abspath = shutil.which("flow") + if runner_abspath is None: + raise RuntimeError("flowrun or flow not installed") + else: + if self.num_cpu > 1: + raise RuntimeError( + "MPI runs not supported without a flowrun wrapper" + ) + self.bypass_flowrun = True + self.runner_abspath: str = str(runner_abspath) + + data_file = ecl_case_to_data_file(Path(ecl_case)) + if not Path(data_file).exists(): + raise OSError(f"No such file: {data_file}") + + self.run_path: Path = Path(data_file).parent.absolute() + self.data_file: str = Path(data_file).name + self.base_name: str = Path(data_file).stem + + @property + def prt_path(self) -> Path: + return self.run_path / (self.base_name + ".PRT") + + @property + def eclrun_command(self) -> list[str]: + return [ + self.runner_abspath, + self.simulator, + "--version", + str(self.version), + self.data_file, + "--summary-conversion", + "yes" if self.summary_conversion else "no", + ] + + @property + def flowrun_command(self) -> list[str]: + if self.bypass_flowrun: + return [ + self.runner_abspath, + self.data_file, + ] + return [ + self.runner_abspath, + "--version", + str(self.version), + self.data_file, + "--np", + str(self.num_cpu), + ] + + LICENSE_FAILURE_RETRY_INITIAL_SLEEP = 90 + LICENSE_RETRY_STAGGER_FACTOR = 60 + LICENSE_RETRY_BACKOFF_EXPONENT = 3 + + def run_eclipseX00( + self, retries_left: int = 3, backoff_sleep: float | None = None + ) -> None: + # This function calls itself recursively in case of license failures + backoff_sleep = ( + self.LICENSE_FAILURE_RETRY_INITIAL_SLEEP + if backoff_sleep is None + else backoff_sleep + ) + return_code = subprocess.run(self.eclrun_command, check=False).returncode + + OK_file = self.run_path / f"{self.base_name}.OK" + if not self.check_status: + OK_file.write_text( + "ECLIPSE simulation complete - NOT checked for errors.", + encoding="utf-8", + ) + else: + if return_code != 0: + raise subprocess.CalledProcessError(return_code, self.eclrun_command) + try: + self.assert_eclend() + except EclError as err: + if err.failed_due_to_license_problems() and retries_left > 0: + time_to_wait = backoff_sleep + int( + random() * self.LICENSE_RETRY_STAGGER_FACTOR + ) + sys.stderr.write( + "ECLIPSE failed due to license failure " + f"retrying in {time_to_wait} seconds\n" + ) + time.sleep(time_to_wait) + self.run_eclipseX00( + retries_left=retries_left - 1, + backoff_sleep=int( + backoff_sleep * self.LICENSE_RETRY_BACKOFF_EXPONENT + ), + ) + return + else: + raise err from None + if self.num_cpu > 1: + smry_file = find_unsmry(self.run_path / self.base_name) + if smry_file is not None: + await_completed_unsmry_file(smry_file) + + OK_file.write_text("ECLIPSE simulation OK", encoding="utf-8") + + def run_flow(self) -> None: + return_code = subprocess.run(self.flowrun_command, check=False).returncode + OK_file = self.run_path / f"{self.base_name}.OK" + if not self.check_status: + OK_file.write_text( + "FLOW simulation complete - NOT checked for errors.", + encoding="utf-8", + ) + else: + if return_code != 0: + raise subprocess.CalledProcessError(return_code, self.flowrun_command) + self.assert_eclend() + if self.num_cpu > 1: + smry_file = find_unsmry(self.run_path / self.base_name) + if smry_file is not None: + await_completed_unsmry_file(smry_file) + + OK_file.write_text("FLOW simulation OK", encoding="utf-8") + + def assert_eclend(self) -> None: + tail_length = 5000 + result = self.read_eclend() + if result.errors > 0: + error_list = self.parse_errors() + sep = "\n\n...\n\n" + error_and_slave_msg = sep.join(error_list) + extra_message = "" + error_messages = [ + error for error in error_list if not "STARTING SLAVE" in str(error) + ] + if result.errors != len(error_messages): + extra_message = ( + f"\n\nWarning, mismatch between stated Error count ({result.errors}) " + f"and number of ERROR messages found in PRT ({len(error_messages)})." + f"\n\nTail ({tail_length} bytes) of PRT-file {self.prt_path}:\n\n" + ) + tail_textfile(self.prt_path, 5000) + + raise EclError( + "Eclipse simulation failed with:" + f"{result.errors:d} errors:\n\n{error_and_slave_msg}{extra_message}" + ) + + if result.bugs > 0: + raise EclError(f"Eclipse simulation failed with:{result.bugs:d} bugs") + + def read_eclend(self) -> EclipseResult: + error_regexp = re.compile(r"^\s*Errors\s+(\d+)\s*$") + bug_regexp = re.compile(r"^\s*Bugs\s+(\d+)\s*$") + + report_file = self.run_path / f"{self.base_name}.ECLEND" + if not report_file.is_file(): + report_file = self.prt_path + + errors = None + bugs = None + with open(report_file, encoding="utf-8") as filehandle: + for line in filehandle.readlines(): + error_match = re.match(error_regexp, line) + if error_match: + errors = int(error_match.group(1)) + + bug_match = re.match(bug_regexp, line) + if bug_match: + bugs = int(bug_match.group(1)) + if errors is None: + raise ValueError(f"Could not read errors from {report_file}") + if bugs is None: + raise ValueError(f"Could not read bugs from {report_file}") + + return EclipseResult(errors=errors, bugs=bugs) + + def parse_errors(self) -> list[str]: + """Extract multiline ERROR messages from the PRT file""" + error_list = [] + error_e100_regexp = re.compile(error_pattern_e100, re.MULTILINE) + error_e300_regexp = re.compile(error_pattern_e300, re.MULTILINE) + slave_started_regexp = re.compile(slave_started_pattern, re.MULTILINE) + + content = self.prt_path.read_text(encoding="utf-8") + + for regexp in [error_e100_regexp, error_e300_regexp, slave_started_regexp]: + offset = 0 + while True: + match = regexp.search(content[offset:]) + if match: + error_list.append( + content[offset + match.start() : offset + match.end()] + ) + offset += match.end() + else: + break + + return error_list + + +def tail_textfile(file_path: Path, num_chars: int) -> str: + if not file_path.exists(): + return f"No output file {file_path}" + with open(file_path, encoding="utf-8") as file: + file.seek(0, 2) + file_end_position = file.tell() + seek_position = max(0, file_end_position - num_chars) + file.seek(seek_position) + return file.read()[-num_chars:] + + +def run_reservoirsimulator(args: list[str]) -> None: + parser = ArgumentParser() + parser.add_argument("simulator", type=str, choices=["flow", "eclipse", "e300"]) + parser.add_argument("version", type=str) + parser.add_argument("ecl_case", type=str) + parser.add_argument("-n", "--num-cpu", dest="num_cpu", type=int, default=1) + parser.add_argument( + "-i", "--ignore-errors", dest="ignore_errors", action="store_true" + ) + parser.add_argument( + "--summary-conversion", dest="summary_conversion", action="store_true" + ) + + options = parser.parse_args(args) + + if options.summary_conversion and options.simulator == "flow": + raise RuntimeError("--summary-conversion is not available with simulator flow") + + try: + if options.simulator in ["eclipse", "e300"]: + RunReservoirSimulator( + options.simulator, + options.version, + options.ecl_case, + num_cpu=options.num_cpu, + check_status=not options.ignore_errors, + summary_conversion=options.summary_conversion, + ).run_eclipseX00() + else: + RunReservoirSimulator( + "flow", + options.version, + options.ecl_case, + num_cpu=options.num_cpu, + check_status=not options.ignore_errors, + ).run_flow() + + except EclError as msg: + print(msg, file=sys.stderr) + sys.exit(-1) + + +def ecl_case_to_data_file(ecl_case: Path) -> Path: + if ecl_case.suffix in [".data", ".DATA"]: + return ecl_case + elif str(ecl_case).islower(): + return Path(str(ecl_case) + ".data") + else: + return Path(str(ecl_case) + ".DATA") + + +if __name__ == "__main__": + non_empty_args = list(filter(None, sys.argv)) + run_reservoirsimulator(non_empty_args[1:]) diff --git a/tests/ert/unit_tests/config/test_forward_model.py b/tests/ert/unit_tests/config/test_forward_model.py index 849a86c8362..fdf252ae28a 100644 --- a/tests/ert/unit_tests/config/test_forward_model.py +++ b/tests/ert/unit_tests/config/test_forward_model.py @@ -5,7 +5,6 @@ import stat from pathlib import Path from textwrap import dedent -from unittest.mock import patch import pytest from hypothesis import given, settings @@ -26,23 +25,6 @@ from .config_dict_generator import config_generators -@pytest.fixture() -def mock_eclrun(): - with open("eclrun", "w", encoding="utf-8") as f: - f.write("""#!/usr/bin/env python\n\nprint("4 2 8")""") - os.chmod("eclrun", os.stat("eclrun").st_mode | stat.S_IEXEC) - old_path = os.environ["PATH"] - eclrun_path = shutil.which("eclrun") - if eclrun_path is None: - os.environ["PATH"] = old_path + os.pathsep + os.getcwd() - else: - os.environ["PATH"] = old_path.replace( - str(Path(eclrun_path).parent), os.getcwd() - ) - yield - os.environ["PATH"] = old_path - - @pytest.mark.usefixtures("use_tmpdir") def test_load_forward_model_raises_on_missing(): with pytest.raises(ConfigValidationError, match="No such file or directory"): @@ -575,85 +557,71 @@ def test_that_forward_model_with_different_token_kinds_are_added(): ] == [("job", 0), ("job", 1)] -@pytest.mark.parametrize("eclipse_v", ["100", "300"]) -def test_that_eclipse_jobs_require_version_field(eclipse_v): +@pytest.mark.parametrize("eclipse_v", ["ECLIPSE100", "ECLIPSE300"]) +def test_that_eclipse_fm_step_require_explicit_version(eclipse_v): with pytest.raises( ConfigValidationError, - match=f".*Forward model step ECLIPSE{eclipse_v} must be given a VERSION argument.*", + match=f".*Forward model step {eclipse_v} must be given a VERSION argument.*", ): _ = ErtConfig.with_plugins().from_file_contents( f""" NUM_REALIZATIONS 1 - FORWARD_MODEL ECLIPSE{eclipse_v} + FORWARD_MODEL {eclipse_v} """ ) -@pytest.mark.parametrize("eclipse_v", ["100", "300"]) +@pytest.mark.skipif(shutil.which("eclrun") is None, reason="eclrun is not in $PATH") +@pytest.mark.parametrize("eclipse_v", ["ECLIPSE100", "ECLIPSE300"]) @pytest.mark.usefixtures("use_tmpdir") -def test_that_eclipse_jobs_check_version(eclipse_v, mock_eclrun): - ecl100_config_file_name = "ecl100_config.yml" - ecl300_config_file_name = "ecl300_config.yml" - - ecl100_config_content = f"eclrun_env:\n PATH: {os.getcwd()}\n" - ecl300_config_content = f"eclrun_env:\n PATH: {os.getcwd()}\n" - ert_config_contents = ( - f"NUM_REALIZATIONS 1\nFORWARD_MODEL ECLIPSE{eclipse_v} (=1)\n" +def test_that_eclipse_fm_step_check_version_availability(eclipse_v): + config_file_name = "test.ert" + Path(config_file_name).write_text( + f"NUM_REALIZATIONS 1\nFORWARD_MODEL {eclipse_v}(=dummy)\n", + encoding="utf-8", ) + with pytest.raises( + ConfigValidationError, + match=rf".*Unavailable {eclipse_v} version dummy. Available versions: \[\'20.*", + ): + ErtConfig.with_plugins().from_file(config_file_name) - # Write config file + +@pytest.mark.parametrize("eclipse_v", ["ECLIPSE100", "ECLIPSE300"]) +@pytest.mark.usefixtures("use_tmpdir") +def test_that_we_can_point_to_a_custom_eclrun_when_checking_versions(eclipse_v): + eclrun_bin = Path("bin/eclrun") + eclrun_bin.parent.mkdir() + eclrun_bin.write_text("#!/bin/sh\necho 2036.1 2036.2 2037.1", encoding="utf-8") + eclrun_bin.chmod(eclrun_bin.stat().st_mode | stat.S_IEXEC) config_file_name = "test.ert" - Path(config_file_name).write_text(ert_config_contents, encoding="utf-8") - # Write ecl100_config file - Path(ecl100_config_file_name).write_text(ecl100_config_content, encoding="utf-8") - # Write ecl300_config file - Path(ecl300_config_file_name).write_text(ecl300_config_content, encoding="utf-8") - with patch( - "ert.plugins.hook_implementations.forward_model_steps.ErtPluginManager" - ) as mock: - instance = mock.return_value - instance.get_ecl100_config_path.return_value = ecl100_config_file_name - instance.get_ecl300_config_path.return_value = ecl300_config_file_name - with pytest.raises( - ConfigValidationError, - match=rf".*Unavailable ECLIPSE{eclipse_v} version 1 current supported versions \['4', '2', '8'\].*", - ): - _ = ErtConfig.with_plugins().from_file(config_file_name) + Path(config_file_name).write_text( + dedent( + f""" + NUM_REALIZATIONS 1 + SETENV ECLRUN_PATH {eclrun_bin.absolute().parent} + FORWARD_MODEL {eclipse_v}(=2034.1)""" + ), + encoding="utf-8", + ) + with pytest.raises( + ConfigValidationError, + match=rf".*Unavailable {eclipse_v} version 2034.1. Available versions: \[\'2036.1.*", + ): + ErtConfig.with_plugins().from_file(config_file_name) -@pytest.mark.skipif(shutil.which("eclrun") is not None, reason="eclrun is available") -@pytest.mark.parametrize("eclipse_v", ["100", "300"]) +@pytest.mark.skipif(shutil.which("eclrun") is not None, reason="eclrun is present") +@pytest.mark.parametrize("eclipse_v", ["ECLIPSE100", "ECLIPSE300"]) @pytest.mark.usefixtures("use_tmpdir") def test_that_no_error_thrown_when_checking_eclipse_version_and_eclrun_is_not_present( eclipse_v, ): _ = ErtConfig.with_plugins().from_file_contents( - f"NUM_REALIZATIONS 1\nFORWARD_MODEL ECLIPSE{eclipse_v} (=1)\n" + f"NUM_REALIZATIONS 1\nFORWARD_MODEL {eclipse_v}(=1)\n" ) -@pytest.mark.skipif(shutil.which("eclrun") is not None, reason="eclrun is available") -@pytest.mark.parametrize("eclipse_v", ["100", "300"]) -@pytest.mark.usefixtures("use_tmpdir") -def test_that_no_error_thrown_when_checking_eclipse_version_and_no_ecl_config_defined( - eclipse_v, mock_eclrun -): - ert_config_contents = ( - f"NUM_REALIZATIONS 1\nFORWARD_MODEL ECLIPSE{eclipse_v} (=1)\n" - ) - - # Write config file - config_file_name = "test.ert" - Path(config_file_name).write_text(ert_config_contents, encoding="utf-8") - with patch( - "ert.plugins.hook_implementations.forward_model_steps.ErtPluginManager" - ) as mock: - instance = mock.return_value - instance.get_ecl100_config_path.return_value = None - instance.get_ecl300_config_path.return_value = None - _ = ErtConfig.with_plugins().from_file(config_file_name) - - def test_that_plugin_forward_models_are_installed(tmp_path): (tmp_path / "test.ert").write_text( dedent( @@ -900,7 +868,7 @@ def test_that_plugin_forward_model_raises_pre_experiment_validation_error_early( ): (tmp_path / "test.ert").write_text( """ - NUM_REALIZATIONS 1 + NUM_REALIZATIONS 1 FORWARD_MODEL FM1(=never,=world,=derpyderp) FORWARD_MODEL FM2 """ diff --git a/tests/ert/unit_tests/resources/test_ecl_versioning_config.py b/tests/ert/unit_tests/resources/test_ecl_versioning_config.py deleted file mode 100644 index 9bed502156a..00000000000 --- a/tests/ert/unit_tests/resources/test_ecl_versioning_config.py +++ /dev/null @@ -1,203 +0,0 @@ -import inspect -import os -import stat - -import pytest -import yaml - -from tests.ert.utils import SOURCE_DIR - -from ._import_from_location import import_from_location - -# import ecl_config and ecl_run.py from ert/forward_models/res/script -# package-data path which. These are kept out of the ert package to avoid the -# overhead of importing ert. This is necessary as these may be invoked as a -# subprocess on each realization. - - -ecl_config = import_from_location( - "ecl_config", - os.path.join( - SOURCE_DIR, "src/ert/resources/forward_models/res/script/ecl_config.py" - ), -) - -ecl_run = import_from_location( - "ecl_run", - os.path.join(SOURCE_DIR, "src/ert/resources/forward_models/res/script/ecl_run.py"), -) - - -@pytest.mark.usefixtures("use_tmpdir") -def test_loading_of_eclipse_configurations(monkeypatch): - source_file = inspect.getsourcefile(ecl_config.Ecl100Config) - assert source_file is not None - ecl_config_path = os.path.dirname(source_file) - monkeypatch.setenv("ECL100_SITE_CONFIG", "file/does/not/exist") - with pytest.raises(OSError): - conf = ecl_config.Ecl100Config() - - monkeypatch.setenv( - "ECL100_SITE_CONFIG", - os.path.join(ecl_config_path, "ecl100_config.yml"), - ) - conf = ecl_config.Ecl100Config() - with open("file.yml", "w", encoding="utf-8") as f: - f.write("this:\n -should\n-be\ninvalid:yaml?") - - monkeypatch.setenv("ECL100_SITE_CONFIG", "file.yml") - with pytest.raises(ValueError, match="Failed parse: file.yml as yaml"): - conf = ecl_config.Ecl100Config() - - scalar_exe = "bin/scalar_exe" - mpi_exe = "bin/mpi_exe" - mpi_run = "bin/mpi_run" - - os.mkdir("bin") - for f in ["scalar_exe", "mpi_exe", "mpi_run"]: - fname = os.path.join("bin", f) - with open(fname, "w", encoding="utf-8") as fh: - fh.write("This is an exectable ...") - - os.chmod(fname, stat.S_IEXEC) - - intel_path = "intel" - monkeypatch.setenv("ENV1", "A") - monkeypatch.setenv("ENV2", "C") - mocked_simulator_config = { - ecl_config.Keys.env: {"LICENSE_SERVER": "license@company.com"}, - ecl_config.Keys.versions: { - "2015": { - ecl_config.Keys.scalar: {ecl_config.Keys.executable: scalar_exe}, - ecl_config.Keys.mpi: { - ecl_config.Keys.executable: mpi_exe, - ecl_config.Keys.mpirun: mpi_run, - ecl_config.Keys.env: { - "I_MPI_ROOT": "$ENV1:B:$ENV2", - "TEST_VAR": "$ENV1.B.$ENV2 $UNKNOWN_VAR", - "P4_RSHCOMMAND": "", - "LD_LIBRARY_PATH": f"{intel_path}:$LD_LIBRARY_PATH", - "PATH": f"{intel_path}/bin64:$PATH", - }, - }, - }, - "2016": { - ecl_config.Keys.scalar: {ecl_config.Keys.executable: "/does/not/exist"}, - ecl_config.Keys.mpi: { - ecl_config.Keys.executable: "/does/not/exist", - ecl_config.Keys.mpirun: mpi_run, - }, - }, - "2017": { - ecl_config.Keys.mpi: { - ecl_config.Keys.executable: mpi_exe, - ecl_config.Keys.mpirun: "/does/not/exist", - } - }, - }, - } - - with open("file.yml", "w", encoding="utf-8") as filehandle: - filehandle.write(yaml.dump(mocked_simulator_config)) - - conf = ecl_config.Ecl100Config() - # Fails because there is no version 2020 - with pytest.raises(KeyError): - sim = conf.sim("2020") - - # Fails because the 2016 version points to a not existing executable - with pytest.raises(OSError): - sim = conf.sim("2016") - - # Fails because the 2016 mpi version points to a non existing mpi - # executable - with pytest.raises(OSError): - sim = conf.mpi_sim("2016") - - # Fails because the 2017 mpi version mpirun points to a non existing - # mpi executable - with pytest.raises(OSError): - sim = conf.mpi_sim("2017") - - # Fails because the 2017 scalar version is not registered - with pytest.raises(KeyError): - sim = conf.sim("2017") - - sim = conf.sim("2015") - mpi_sim = conf.mpi_sim("2015") - - # Check that global environment has been propagated down. - assert "LICENSE_SERVER" in mpi_sim.env - - # Check replacement of $ENV_VAR in values. - assert mpi_sim.env["I_MPI_ROOT"] == "A:B:C" - assert mpi_sim.env["TEST_VAR"] == "A.B.C $UNKNOWN_VAR" - assert len(mpi_sim.env) == 1 + 5 - - sim = conf.sim("2015") - assert sim.executable == scalar_exe - assert sim.mpirun is None - - with pytest.raises( - OSError, match="The executable: '/does/not/exist' can not be executed by user" - ): - simulators = conf.simulators() - - simulators = conf.simulators(strict=False) - assert len(simulators) == 2 - - -@pytest.mark.usefixtures("use_tmpdir") -def test_default_version_definitions(monkeypatch): - os.mkdir("bin") - scalar_exe = "bin/scalar_exe" - with open(scalar_exe, "w", encoding="utf-8") as fh: - fh.write("This is an executable ...") - os.chmod(scalar_exe, stat.S_IEXEC) - - mock_dict_0 = { - ecl_config.Keys.versions: { - "2015": {ecl_config.Keys.scalar: {ecl_config.Keys.executable: scalar_exe}}, - "2016": {ecl_config.Keys.scalar: {ecl_config.Keys.executable: scalar_exe}}, - } - } - - mock_dict_1 = { - ecl_config.Keys.default_version: "2015", - ecl_config.Keys.versions: { - "2015": {ecl_config.Keys.scalar: {ecl_config.Keys.executable: scalar_exe}}, - "2016": {ecl_config.Keys.scalar: {ecl_config.Keys.executable: scalar_exe}}, - }, - } - - monkeypatch.setenv("ECL100_SITE_CONFIG", os.path.join("file.yml")) - with open("file.yml", "w", encoding="utf-8") as f: - f.write(yaml.dump(mock_dict_1)) - - conf = ecl_config.Ecl100Config() - sim = conf.sim() - assert sim.version == "2015" - assert "2015" in conf - assert "xxxx" not in conf - assert ecl_config.Keys.default in conf - assert None in conf - - sim = conf.sim("default") - assert sim.version == "2015" - - with open("file.yml", "w", encoding="utf-8") as filehandle: - filehandle.write(yaml.dump(mock_dict_0)) - - conf = ecl_config.Ecl100Config() - assert ecl_config.Keys.default not in conf - assert conf.default_version is None - - with pytest.raises( - ValueError, match="The default version has not been set in the config file" - ): - sim = conf.sim() - - with pytest.raises( - ValueError, match="The default version has not been set in the config file" - ): - sim = conf.sim(ecl_config.Keys.default) diff --git a/tests/ert/unit_tests/resources/test_opm_flow.py b/tests/ert/unit_tests/resources/test_opm_flow.py deleted file mode 100644 index 9d0b125a58b..00000000000 --- a/tests/ert/unit_tests/resources/test_opm_flow.py +++ /dev/null @@ -1,333 +0,0 @@ -import os -import platform -import re -import shutil -from pathlib import Path -from subprocess import CalledProcessError - -import pytest -import yaml - -from tests.ert.utils import SOURCE_DIR - -from ._import_from_location import import_from_location - -# import ecl_config.py and ecl_run from ert/forward-models/res/script -# package-data path which. These are kept out of the ert package to avoid the -# overhead of importing ert. This is necessary as these may be invoked as a -# subprocess on each realization. - -ecl_config = import_from_location( - "ecl_config", - SOURCE_DIR / "src/ert/resources/forward_models/res/script/ecl_config.py", -) - -ecl_run = import_from_location( - "ecl_run", - SOURCE_DIR / "src/ert/resources/forward_models/res/script/ecl_run.py", -) - - -def locate_flow_binary() -> str: - """Locate the path for a flow executable. - - Returns the empty string if there is nothing to be found in $PATH.""" - flow_rhel_version = "7" - if "el8" in platform.release(): - flow_rhel_version = "8" - candidates = ["flow", f"/project/res/x86_64_RH_{flow_rhel_version}/bin/flowdaily"] - for candidate in candidates: - foundpath = shutil.which(candidate) - if foundpath is not None: - return foundpath - return "" - - -flow_installed = pytest.mark.skipif( - not locate_flow_binary(), reason="Requires flow to be installed in $PATH" -) - - -def find_version(output): - return re.search(r"flow\s*([\d.]+)", output).group(1) - - -@pytest.fixture(name="init_flow_config") -def fixture_init_flow_config(monkeypatch, tmpdir): - conf = { - "default_version": "default", - "versions": {"default": {"scalar": {"executable": locate_flow_binary()}}}, - } - with tmpdir.as_cwd(): - Path("flow_config.yml").write_text(yaml.dump(conf), encoding="utf-8") - monkeypatch.setenv("FLOW_SITE_CONFIG", "flow_config.yml") - yield - - -def test_ecl_run_make_LSB_MCPU_machine_list(): - assert ecl_run.make_LSB_MCPU_machine_list("host1 4 host2 4") == [ - "host1", - "host1", - "host1", - "host1", - "host2", - "host2", - "host2", - "host2", - ] - - -@pytest.mark.integration_test -@flow_installed -def test_flow(init_flow_config, source_root): - shutil.copy(source_root / "test-data/ert/eclipse/SPE1.DATA", "SPE1.DATA") - shutil.copy( - source_root / "test-data/ert/eclipse/SPE1_ERROR.DATA", "SPE1_ERROR.DATA" - ) - flow_config = ecl_config.FlowConfig() - sim = flow_config.sim() - flow_run = ecl_run.EclRun("SPE1.DATA", sim) - flow_run.runEclipse() - - ecl_run.run(flow_config, ["SPE1.DATA"]) - - flow_run = ecl_run.EclRun("SPE1_ERROR.DATA", sim) - with pytest.raises(CalledProcessError, match="returned non-zero exit status 1"): - flow_run.runEclipse() - - ecl_run.run(flow_config, ["SPE1_ERROR.DATA", "--ignore-errors"]) - - # Invalid version - with pytest.raises(KeyError): - ecl_run.run(flow_config, ["SPE1.DATA", "--version=no/such/version"]) - - -@pytest.mark.integration_test -@flow_installed -def test_flow_with_mpi(init_flow_config, source_root): - """This only tests that ERT will be able to start flow on a data deck with - the PARALLEL keyword present. It does not assert anything regarding whether - MPI-parallelization will get into play.""" - shutil.copy( - source_root / "test-data/ert/eclipse/SPE1_PARALLEL.DATA", "SPE1_PARALLEL.DATA" - ) - flow_config = ecl_config.FlowConfig() - sim = flow_config.sim() - flow_run = ecl_run.EclRun("SPE1_PARALLEL.DATA", sim) - flow_run.runEclipse() - - -@pytest.mark.usefixtures("use_tmpdir") -def test_running_flow_given_env_config_can_still_read_parent_env(monkeypatch): - version = "1111.11" - - # create a script that prints env vars ENV1 and ENV2 to a file - with open("mocked_flow", "w", encoding="utf-8") as f: - f.write("#!/bin/sh\n") - f.write("echo $ENV1 > out.txt\n") - f.write("echo $ENV2 >> out.txt\n") - executable = os.path.join(os.getcwd(), "mocked_flow") - os.chmod(executable, 0o777) - - # create a flow_config.yml with environment extension ENV2 - conf = { - "default_version": version, - "versions": { - version: { - "scalar": {"executable": executable, "env": {"ENV2": "VAL2"}}, - } - }, - } - - with open("flow_config.yml", "w", encoding="utf-8") as filehandle: - filehandle.write(yaml.dump(conf)) - - # set the environment variable ENV1 - monkeypatch.setenv("ENV1", "VAL1") - monkeypatch.setenv("FLOW_SITE_CONFIG", "flow_config.yml") - - with open("DUMMY.DATA", "w", encoding="utf-8") as filehandle: - filehandle.write("dummy") - - with open("DUMMY.PRT", "w", encoding="utf-8") as filehandle: - filehandle.write("Errors 0\n") - filehandle.write("Bugs 0\n") - - # run the script - flow_config = ecl_config.FlowConfig() - sim = flow_config.sim() - flow_run = ecl_run.EclRun("DUMMY.DATA", sim) - flow_run.runEclipse() - - # assert that the script was able to read both the variables correctly - with open("out.txt", encoding="utf-8") as filehandle: - lines = filehandle.readlines() - - assert lines == ["VAL1\n", "VAL2\n"] - - -@pytest.mark.usefixtures("use_tmpdir") -def test_running_flow_given_no_env_config_can_still_read_parent_env(monkeypatch): - version = "1111.11" - - # create a script that prints env vars ENV1 and ENV2 to a file - with open("flow", "w", encoding="utf-8") as f: - f.write("#!/bin/sh\n") - f.write("echo $ENV1 > out.txt\n") - f.write("echo $ENV2 >> out.txt\n") - executable = os.path.join(os.getcwd(), "flow") - os.chmod(executable, 0o777) - - # create a flow_config.yml with environment extension ENV2 - conf = { - "default_version": version, - "versions": { - version: { - "scalar": {"executable": executable}, - } - }, - } - - with open("flow_config.yml", "w", encoding="utf-8") as filehandle: - filehandle.write(yaml.dump(conf)) - - # set the environment variable ENV1 - monkeypatch.setenv("ENV1", "VAL1") - monkeypatch.setenv("ENV2", "VAL2") - monkeypatch.setenv("FLOW_SITE_CONFIG", "flow_config.yml") - - with open("DUMMY.DATA", "w", encoding="utf-8") as filehandle: - filehandle.write("dummy") - - with open("DUMMY.PRT", "w", encoding="utf-8") as filehandle: - filehandle.write("Errors 0\n") - filehandle.write("Bugs 0\n") - - # run the script - flow_config = ecl_config.FlowConfig() - sim = flow_config.sim() - flow_run = ecl_run.EclRun("DUMMY.DATA", sim) - flow_run.runEclipse() - - # assert that the script was able to read both the variables correctly - with open("out.txt", encoding="utf-8") as filehandle: - lines = filehandle.readlines() - - assert lines == ["VAL1\n", "VAL2\n"] - - -@pytest.mark.usefixtures("use_tmpdir") -def test_running_flow_given_env_variables_with_same_name_as_parent_env_variables_will_overwrite( - monkeypatch, -): - version = "1111.11" - - # create a script that prints env vars ENV1 and ENV2 to a file - with open("flow", "w", encoding="utf-8") as filehandle: - filehandle.write("#!/bin/sh\n") - filehandle.write("echo $ENV1 > out.txt\n") - filehandle.write("echo $ENV2 >> out.txt\n") - executable = os.path.join(os.getcwd(), "flow") - os.chmod(executable, 0o777) - - # create a flow_config.yml with environment extension ENV2 - conf = { - "default_version": version, - "versions": { - version: { - "scalar": { - "executable": executable, - "env": {"ENV1": "OVERWRITTEN1", "ENV2": "OVERWRITTEN2"}, - }, - } - }, - } - - with open("flow_config.yml", "w", encoding="utf-8") as filehandle: - filehandle.write(yaml.dump(conf)) - - # set the environment variable ENV1 - monkeypatch.setenv("ENV1", "VAL1") - monkeypatch.setenv("ENV2", "VAL2") - monkeypatch.setenv("FLOW_SITE_CONFIG", "flow_config.yml") - - with open("DUMMY.DATA", "w", encoding="utf-8") as filehandle: - filehandle.write("dummy") - - with open("DUMMY.PRT", "w", encoding="utf-8") as filehandle: - filehandle.write("Errors 0\n") - filehandle.write("Bugs 0\n") - - # run the script - flow_config = ecl_config.FlowConfig() - sim = flow_config.sim() - flow_run = ecl_run.EclRun("DUMMY.DATA", sim) - flow_run.runEclipse() - - # assert that the script was able to read both the variables correctly - with open("out.txt", encoding="utf-8") as filehandle: - lines = filehandle.readlines() - - assert lines == ["OVERWRITTEN1\n", "OVERWRITTEN2\n"] - - -def test_slurm_env_parsing(): - host_list = ecl_run.make_SLURM_machine_list("ws", "2") - assert host_list == ["ws", "ws"] - - host_list = ecl_run.make_SLURM_machine_list("ws1,ws2", "2,3") - assert host_list == ["ws1", "ws1", "ws2", "ws2", "ws2"] - - host_list = ecl_run.make_SLURM_machine_list("ws[1-3]", "1,2,3") - assert host_list == ["ws1", "ws2", "ws2", "ws3", "ws3", "ws3"] - - host_list = ecl_run.make_SLURM_machine_list("ws[1,3]", "1,3") - assert host_list == ["ws1", "ws3", "ws3", "ws3"] - - host_list = ecl_run.make_SLURM_machine_list("ws[1-3,6-8]", "1,2,3,1,2,3") - assert host_list == [ - "ws1", - "ws2", - "ws2", - "ws3", - "ws3", - "ws3", - "ws6", - "ws7", - "ws7", - "ws8", - "ws8", - "ws8", - ] - - host_list = ecl_run.make_SLURM_machine_list("ws[1-3,6-8]", "2(x2),3,1,2(x2)") - assert host_list == [ - "ws1", - "ws1", - "ws2", - "ws2", - "ws3", - "ws3", - "ws3", - "ws6", - "ws7", - "ws7", - "ws8", - "ws8", - ] - - host_list = ecl_run.make_SLURM_machine_list("ws[1-3,6],ws[7-8]", "2(x2),3,1,2(x2)") - assert host_list == [ - "ws1", - "ws1", - "ws2", - "ws2", - "ws3", - "ws3", - "ws3", - "ws6", - "ws7", - "ws7", - "ws8", - "ws8", - ] diff --git a/tests/ert/unit_tests/resources/test_ecl_run_new_config.py b/tests/ert/unit_tests/resources/test_run_eclipse_simulator.py similarity index 71% rename from tests/ert/unit_tests/resources/test_ecl_run_new_config.py rename to tests/ert/unit_tests/resources/test_run_eclipse_simulator.py index 7f2acde2b1e..a093ca1bb1d 100644 --- a/tests/ert/unit_tests/resources/test_ecl_run_new_config.py +++ b/tests/ert/unit_tests/resources/test_run_eclipse_simulator.py @@ -1,12 +1,8 @@ -import inspect -import json import os import re import shutil -import stat import subprocess import sys -import textwrap import threading import time from pathlib import Path @@ -15,148 +11,103 @@ import numpy as np import pytest import resfo -import yaml +from ert.plugins import ErtPluginManager from tests.ert.utils import SOURCE_DIR from ._import_from_location import import_from_location -# import ecl_config.py and ecl_run from ert/resources/forward_models/res/script -# package-data path which. These are kept out of the ert package to avoid the +# import run_reservoirsimulator from ert/resources/forward_models +# package-data. This is kept out of the ert package to avoid the # overhead of importing ert. This is necessary as these may be invoked as a # subprocess on each realization. -ecl_config = import_from_location( - "ecl_config", - os.path.join( - SOURCE_DIR, "src/ert/resources/forward_models/res/script/ecl_config.py" - ), -) - -ecl_run = import_from_location( - "ecl_run", - os.path.join(SOURCE_DIR, "src/ert/resources/forward_models/res/script/ecl_run.py"), +run_reservoirsimulator = import_from_location( + "run_reservoirsimulator", + SOURCE_DIR / "src/ert/resources/forward_models/run_reservoirsimulator.py", ) -def find_version(output): - return re.search(r"flow\s*([\d.]+)", output).group(1) - - -@pytest.fixture -def eclrun_conf(): - return { - "eclrun_env": { - "SLBSLS_LICENSE_FILE": "7321@eclipse-lic-no.statoil.no", - "ECLPATH": "/prog/res/ecl/grid", - "PATH": "/prog/res/ecl/grid/macros", - "F_UFMTENDIAN": "big", - "LSB_JOBID": None, - } - } - - -@pytest.fixture -def init_eclrun_config(tmp_path, monkeypatch, eclrun_conf): - with open(tmp_path / "ecl100_config.yml", "w", encoding="utf-8") as f: - f.write(yaml.dump(eclrun_conf)) - monkeypatch.setenv("ECL100_SITE_CONFIG", "ecl100_config.yml") - - -def test_get_version_raise(): - econfig = ecl_config.Ecl100Config() - class_file = inspect.getfile(ecl_config.Ecl100Config) - class_dir = os.path.dirname(os.path.abspath(class_file)) - msg = os.path.join(class_dir, "ecl100_config.yml") - with pytest.raises(ValueError, match=msg): - econfig._get_version(None) - - -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") -def test_eclrun_will_prepend_path_and_get_env_vars_from_ecl100config( - eclrun_conf, -): - # GIVEN a mocked eclrun that only dumps it env variables - Path("eclrun").write_text( - textwrap.dedent( - """\ - #!/usr/bin/env python - import os - import json - with open("env.json", "w") as f: - json.dump(dict(os.environ), f) - """ - ), - encoding="utf-8", - ) - os.chmod("eclrun", os.stat("eclrun").st_mode | stat.S_IEXEC) - Path("DUMMY.DATA").write_text("", encoding="utf-8") - econfig = ecl_config.Ecl100Config() - eclrun_config = ecl_config.EclrunConfig(econfig, "dummyversion") - erun = ecl_run.EclRun("DUMMY", None, check_status=False) - with mock.patch.object( - erun, "_get_run_command", mock.MagicMock(return_value="./eclrun") +@pytest.fixture(name="e100_env") +def e100_env(monkeypatch): + for var, value in ( + ErtPluginManager() + .get_forward_model_configuration() + .get("ECLIPSE100", {}) + .items() ): - # WHEN eclrun is run - erun.runEclipse(eclrun_config=eclrun_config) - - # THEN the env provided to eclrun is the same - # as the env from ecl_config, but PATH has been - # prepended with the value from ecl_config - with open("env.json", encoding="utf-8") as f: - run_env = json.load(f) - - expected_eclrun_env = eclrun_conf["eclrun_env"] - for key, value in expected_eclrun_env.items(): - if value is None: - assert key not in run_env - continue # Typically LSB_JOBID - - if key == "PATH": - assert run_env[key].startswith(value) - else: - assert value == run_env[key] + monkeypatch.setenv(var, value) + yield @pytest.mark.integration_test -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") @pytest.mark.requires_eclipse def test_ecl100_binary_can_produce_output(source_root): shutil.copy( source_root / "test-data/ert/eclipse/SPE1.DATA", "SPE1.DATA", ) - econfig = ecl_config.Ecl100Config() - erun = ecl_run.EclRun("SPE1.DATA", None) - erun.runEclipse(eclrun_config=ecl_config.EclrunConfig(econfig, "2019.3")) + erun = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "2019.3", "SPE1.DATA" + ) + erun.run_eclipseX00() - ok_path = Path(erun.runPath()) / f"{erun.baseName()}.OK" - prt_path = Path(erun.runPath()) / f"{erun.baseName()}.PRT" + ok_path = Path("SPE1.OK") + prt_path = Path("SPE1.PRT") assert ok_path.exists() assert prt_path.stat().st_size > 0 - assert len(erun.parseErrors()) == 0 + assert len(erun.parse_errors()) == 0 + + assert not Path("SPE1.h5").exists(), "HDF conversion should not be run by default" @pytest.mark.integration_test +@pytest.mark.usefixtures("use_tmpdir", "e100_env") @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") -def test_forward_model_cmd_line_api_works(source_root): - # ecl_run.run() is the forward model wrapper around ecl_run.runEclipse() +def test_ecl100_binary_can_handle_extra_dots_in_casename(source_root): + """There is code dealing with file extensions in the Eclipse runner + so it better be tested to work as expected.""" + shutil.copy( + source_root / "test-data/ert/eclipse/SPE1.DATA", + "SPE1.DOT.DATA", + ) + + erun = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "2019.3", "SPE1.DOT.DATA" + ) + erun.run_eclipseX00() + + ok_path = Path("SPE1.DOT.OK") + prt_path = Path("SPE1.DOT.PRT") + + assert ok_path.exists() + assert prt_path.stat().st_size > 0 + assert len(erun.parse_errors()) == 0 + + +@pytest.mark.integration_test +@pytest.mark.requires_eclipse +@pytest.mark.usefixtures("use_tmpdir", "e100_env") +def test_runeclrun_argparse_api(source_root): + # Todo: avoid actually running Eclipse here, use a mock + # Also test the other command line options. shutil.copy( source_root / "test-data/ert/eclipse/SPE1.DATA", "SPE1.DATA", ) - ecl_run.run(ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3"]) + run_reservoirsimulator.run_reservoirsimulator(["eclipse", "2019.3", "SPE1.DATA"]) + assert Path("SPE1.OK").exists() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_eclrun_when_unsmry_is_ambiguous(source_root): shutil.copy( source_root / "test-data/ert/eclipse/SPE1.DATA", @@ -165,13 +116,13 @@ def test_eclrun_when_unsmry_is_ambiguous(source_root): # Mock files from another existing run Path("PREVIOUS_SPE1.SMSPEC").touch() Path("PREVIOUS_SPE1.UNSMRY").touch() - ecl_run.run(ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3"]) + run_reservoirsimulator.run_reservoirsimulator(["eclipse", "2019.3", "SPE1.DATA"]) assert Path("SPE1.OK").exists() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_eclrun_when_unsmry_is_ambiguous_with_mpi(source_root): deck = (source_root / "test-data/ert/eclipse/SPE1.DATA").read_text(encoding="utf-8") deck = deck.replace("TITLE", "PARALLEL\n 2 /\n\nTITLE") @@ -179,75 +130,72 @@ def test_eclrun_when_unsmry_is_ambiguous_with_mpi(source_root): # Mock files from another existing run Path("PREVIOUS_SPE1.SMSPEC").touch() Path("PREVIOUS_SPE1.UNSMRY").touch() - ecl_run.run( - ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3", "--num-cpu=2"] + run_reservoirsimulator.run_reservoirsimulator( + ["eclipse", "2019.3", "SPE1.DATA", "--num-cpu=2"] ) assert Path("SPE1.OK").exists() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_ecl_run_on_parallel_deck(source_root): deck = (source_root / "test-data/ert/eclipse/SPE1.DATA").read_text(encoding="utf-8") deck = deck.replace("TITLE", "PARALLEL\n 2 /\n\nTITLE") Path("SPE1.DATA").write_text(deck, encoding="utf-8") - ecl_run.run( - ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3", "--num-cpu=2"] + run_reservoirsimulator.run_reservoirsimulator( + ["eclipse", "2019.3", "SPE1.DATA", "--num-cpu=2"] ) assert Path("SPE1.OK").exists() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_eclrun_on_nosim(source_root): deck = (source_root / "test-data/ert/eclipse/SPE1.DATA").read_text(encoding="utf-8") deck = deck.replace("TITLE", "NOSIM\n\nTITLE") Path("SPE1.DATA").write_text(deck, encoding="utf-8") - ecl_run.run(ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3"]) + run_reservoirsimulator.run_reservoirsimulator(["eclipse", "2019.3", "SPE1.DATA"]) assert Path("SPE1.OK").exists() assert not Path("SPE1.UNSMRY").exists() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_eclrun_on_nosim_with_existing_unsmry_file(source_root): """This emulates users rerunning Eclipse in an existing runpath""" deck = (source_root / "test-data/ert/eclipse/SPE1.DATA").read_text(encoding="utf-8") deck = deck.replace("TITLE", "NOSIM\n\nTITLE") Path("SPE1.UNSMRY").write_text("", encoding="utf-8") Path("SPE1.DATA").write_text(deck, encoding="utf-8") - ecl_run.run(ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3"]) + run_reservoirsimulator.run_reservoirsimulator(["eclipse", "2019.3", "SPE1.DATA"]) assert Path("SPE1.OK").exists() -@pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") -def test_await_completed_summary_file_times_out_on_nosim_with_mpi(source_root): - minimum_duration = 15 # This is max_wait in the await function tested +@pytest.mark.usefixtures("use_tmpdir", "e100_env") +def test_await_completed_summary_file_does_not_time_out_on_nosim_with_mpi(source_root): deck = (source_root / "test-data/ert/eclipse/SPE1.DATA").read_text(encoding="utf-8") deck = deck.replace("TITLE", "NOSIM\n\nPARALLEL\n 2 /\n\nTITLE") Path("SPE1.DATA").write_text(deck, encoding="utf-8") - start_time = time.time() - ecl_run.run( - ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3", "--num-cpu=2"] + run_reservoirsimulator.run_reservoirsimulator( + ["eclipse", "2019.3", "SPE1.DATA", "--num-cpu=2"] ) - end_time = time.time() assert Path("SPE1.OK").exists() assert not Path( "SPE1.UNSMRY" ).exists(), "A nosim run should not produce an unsmry file" - assert ( - end_time - start_time > minimum_duration - ), "timeout in await_completed not triggered" + # The timeout will not happen since find_unsmry() returns None. + + # There is no assert on runtime because we cannot predict how long the Eclipse license + # checkout takes. @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_eclrun_on_nosim_with_mpi_and_existing_unsmry_file(source_root): """This emulates users rerunning Eclipse in an existing runpath, with MPI. @@ -258,8 +206,8 @@ def test_eclrun_on_nosim_with_mpi_and_existing_unsmry_file(source_root): deck = deck.replace("TITLE", "NOSIM\n\nPARALLEL\n 2 /\n\nTITLE") Path("SPE1.UNSMRY").write_text("", encoding="utf-8") Path("SPE1.DATA").write_text(deck, encoding="utf-8") - ecl_run.run( - ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3", "--num-cpu=2"] + run_reservoirsimulator.run_reservoirsimulator( + ["eclipse", "2019.3", "SPE1.DATA", "--num-cpu=2"] ) # There is no assert on runtime because we cannot predict how long the Eclipse license # checkout takes, otherwise we should assert that there is no await for unsmry completion. @@ -268,77 +216,71 @@ def test_eclrun_on_nosim_with_mpi_and_existing_unsmry_file(source_root): @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_eclrun_will_raise_on_deck_errors(source_root): shutil.copy( source_root / "test-data/ert/eclipse/SPE1_ERROR.DATA", "SPE1_ERROR.DATA", ) - econfig = ecl_config.Ecl100Config() - eclrun_config = ecl_config.EclrunConfig(econfig, "2019.3") - erun = ecl_run.EclRun("SPE1_ERROR", None) + erun = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "2019.3", "SPE1_ERROR" + ) with pytest.raises(Exception, match="ERROR"): - erun.runEclipse(eclrun_config=eclrun_config) + erun.run_eclipseX00() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") -def test_failed_run_nonzero_returncode(monkeypatch): - Path("FOO.DATA").write_text("", encoding="utf-8") - econfig = ecl_config.Ecl100Config() - eclrun_config = ecl_config.EclrunConfig(econfig, "2021.3") - erun = ecl_run.EclRun("FOO.DATA", None) - monkeypatch.setattr("ecl_run.EclRun.execEclipse", mock.MagicMock(return_value=1)) +@pytest.mark.usefixtures("use_tmpdir", "e100_env") +def test_failed_run_gives_nonzero_returncode_and_exception(monkeypatch): + deck = Path("MOCKED_DECK.DATA") + deck.touch() + erun = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummy_version", deck.name + ) + return_value_with_code = mock.MagicMock() + return_value_with_code.returncode = 1 + monkeypatch.setattr( + "subprocess.run", mock.MagicMock(return_value=return_value_with_code) + ) with pytest.raises( # The return code 1 is sometimes translated to 255. subprocess.CalledProcessError, match=r"Command .*eclrun.* non-zero exit status (1|255)\.$", ): - erun.runEclipse(eclrun_config=eclrun_config) + erun.run_eclipseX00() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_deck_errors_can_be_ignored(source_root): shutil.copy( source_root / "test-data/ert/eclipse/SPE1_ERROR.DATA", "SPE1_ERROR.DATA", ) - econfig = ecl_config.Ecl100Config() - ecl_run.run(econfig, ["SPE1_ERROR", "--version=2019.3", "--ignore-errors"]) - - -@pytest.mark.integration_test -@pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") -def test_no_hdf5_output_by_default_with_ecl100(source_root): - shutil.copy( - source_root / "test-data/ert/eclipse/SPE1.DATA", - "SPE1.DATA", + run_reservoirsimulator.run_reservoirsimulator( + ["eclipse", "2019.3", "SPE1_ERROR.DATA", "--ignore-errors"] ) - econfig = ecl_config.Ecl100Config() - ecl_run.run(econfig, ["SPE1.DATA", "--version=2019.3"]) - assert not Path("SPE1.h5").exists() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_flag_needed_to_produce_hdf5_output_with_ecl100(source_root): shutil.copy( source_root / "test-data/ert/eclipse/SPE1.DATA", "SPE1.DATA", ) - econfig = ecl_config.Ecl100Config() - ecl_run.run(econfig, ["SPE1.DATA", "--version=2019.3", "--summary-conversion"]) + run_reservoirsimulator.run_reservoirsimulator( + ["eclipse", "2019.3", "SPE1.DATA", "--summary-conversion"] + ) assert Path("SPE1.h5").exists() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_mpi_run_is_managed_by_system_tool(source_root): shutil.copy( source_root / "test-data/ert/eclipse/SPE1_PARALLEL.DATA", @@ -347,8 +289,9 @@ def test_mpi_run_is_managed_by_system_tool(source_root): assert re.findall( r"PARALLEL\s+2", Path("SPE1_PARALLEL.DATA").read_text(encoding="utf-8") ), "Test requires a deck needing 2 CPUs" - econfig = ecl_config.Ecl100Config() - ecl_run.run(econfig, ["SPE1_PARALLEL.DATA", "--version=2019.3"]) + run_reservoirsimulator.run_reservoirsimulator( + ["eclipse", "2019.3", "SPE1_PARALLEL.DATA"] + ) assert Path("SPE1_PARALLEL.PRT").stat().st_size > 0, "Eclipse did not run at all" assert Path("SPE1_PARALLEL.MSG").exists(), "No output from MPI process 1" @@ -387,17 +330,18 @@ def test_find_unsmry(paths_to_touch, basepath, expectation): Path(path).touch() if expectation == "ValueError": with pytest.raises(ValueError): - ecl_run.find_unsmry(Path(basepath)) + run_reservoirsimulator.find_unsmry(Path(basepath)) elif expectation is None: - assert ecl_run.find_unsmry(Path(basepath)) is None + assert run_reservoirsimulator.find_unsmry(Path(basepath)) is None else: - assert str(ecl_run.find_unsmry(Path(basepath))) == expectation + assert str(run_reservoirsimulator.find_unsmry(Path(basepath))) == expectation +@pytest.mark.usefixtures("use_tmpdir") def test_await_completed_summary_file_will_timeout_on_missing_smry(): assert ( # Expected wait time is 0.3 - ecl_run.await_completed_unsmry_file( + run_reservoirsimulator.await_completed_unsmry_file( "SPE1.UNSMRY", max_wait=0.3, poll_interval=0.1 ) > 0.3 @@ -410,7 +354,7 @@ def test_await_completed_summary_file_will_return_asap(): assert ( 0.01 # Expected wait time is the poll_interval - < ecl_run.await_completed_unsmry_file( + < run_reservoirsimulator.await_completed_unsmry_file( "FOO.UNSMRY", max_wait=0.5, poll_interval=0.1 ) < 0.4 @@ -441,7 +385,7 @@ def slow_smry_writer(): assert ( 0.5 # Minimal wait time is around 0.55 - < ecl_run.await_completed_unsmry_file( + < run_reservoirsimulator.await_completed_unsmry_file( "FOO.UNSMRY", max_wait=4, poll_interval=0.21 ) < 2 @@ -450,6 +394,7 @@ def slow_smry_writer(): @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_ecl100_license_error_is_caught(): prt_error = """\ @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): @@ -472,13 +417,16 @@ def test_ecl100_license_error_is_caught(): Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") Path("FOO.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "FOO.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert exception_info.value.failed_due_to_license_problems() @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_ecl100_crash_is_not_mistaken_as_license_trouble(): prt_error = """\ @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): @@ -499,13 +447,16 @@ def test_ecl100_crash_is_not_mistaken_as_license_trouble(): Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") Path("FOO.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "FOO.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert not exception_info.value.failed_due_to_license_problems() @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_ecl300_license_error_is_caught(): prt_error = """\ @--Message:The message service has been activated @@ -534,13 +485,16 @@ def test_ecl300_license_error_is_caught(): Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") Path("FOO.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "e300", "dummyversion", "FOO.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert exception_info.value.failed_due_to_license_problems() @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_ecl300_crash_is_not_mistaken_as_license_trouble(): prt_error = """\ @--Message:The message service has been activated @@ -566,13 +520,16 @@ def test_ecl300_crash_is_not_mistaken_as_license_trouble(): Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") Path("FOO.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "e300", "dummyversion", "FOO.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert not exception_info.value.failed_due_to_license_problems() @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_license_error_in_slave_is_caught(): """If a coupled Eclipse model fails in one of the slave runs due to license issues, there is no trace of licence in the master PRT file. @@ -626,13 +583,16 @@ def test_license_error_in_slave_is_caught(): Path("slave2/EIGHTCELLS_SLAVE.PRT").write_text(slave_prt_error, encoding="utf-8") Path("EIGHTCELLS_MASTER.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("EIGHTCELLS_MASTER.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "EIGHTCELLS_MASTER.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert exception_info.value.failed_due_to_license_problems() @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_crash_in_slave_is_not_mistaken_as_license(): Path("slave1").mkdir() Path("slave2").mkdir() @@ -675,13 +635,16 @@ def test_crash_in_slave_is_not_mistaken_as_license(): Path("slave2/EIGHTCELLS_SLAVE.PRT").write_text(slave_prt_error, encoding="utf-8") Path("EIGHTCELLS_MASTER.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("EIGHTCELLS_MASTER.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "EIGHTCELLS_MASTER.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert not exception_info.value.failed_due_to_license_problems() @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_too_few_parsed_error_messages_gives_warning(): prt_error = """\ @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): @@ -699,13 +662,16 @@ def test_too_few_parsed_error_messages_gives_warning(): Path("ECLCASE.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "ECLCASE.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert "Warning, mismatch between stated Error count" in str(exception_info.value) @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_tail_of_prt_file_is_included_when_error_count_inconsistency(): prt_error = ( "this_should_not_be_included " @@ -729,14 +695,17 @@ def test_tail_of_prt_file_is_included_when_error_count_inconsistency(): Path("ECLCASE.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "ECLCASE.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert "this_should_be_included" in str(exception_info.value) assert "this_should_not_be_included" not in str(exception_info.value) @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_correct_number_of_parsed_error_messages_gives_no_warning(): prt_error = """\ @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): @@ -754,15 +723,18 @@ def test_correct_number_of_parsed_error_messages_gives_no_warning(): Path("ECLCASE.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "ECLCASE.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert "Warning, mismatch between stated Error count" not in str( exception_info.value ) @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_slave_started_message_are_not_counted_as_errors(): prt_error = f"""\ @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): @@ -785,9 +757,11 @@ def test_slave_started_message_are_not_counted_as_errors(): Path("ECLCASE.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "ECLCASE.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert "Warning, mismatch between stated Error count" not in str( exception_info.value ) @@ -815,6 +789,7 @@ def test_slave_started_message_are_not_counted_as_errors(): @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse @pytest.mark.parametrize( "prt_error, expected_error_list", [ @@ -855,6 +830,8 @@ def test_can_parse_errors(prt_error, expected_error_list): Path("ECLCASE.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") - error_list = run.parseErrors() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "ECLCASE.DATA" + ) + error_list = run.parse_errors() assert error_list == expected_error_list diff --git a/tests/ert/unit_tests/resources/test_run_flow_simulator.py b/tests/ert/unit_tests/resources/test_run_flow_simulator.py new file mode 100644 index 00000000000..181fc1f8013 --- /dev/null +++ b/tests/ert/unit_tests/resources/test_run_flow_simulator.py @@ -0,0 +1,126 @@ +import os +import shutil +import stat +from pathlib import Path +from subprocess import CalledProcessError + +import pytest + +from tests.ert.utils import SOURCE_DIR + +from ._import_from_location import import_from_location + +# import code from ert/forward_models package-data path. +# These are kept out of the ert package to avoid the overhead of +# importing ert. This is necessary as these may be invoked as a subprocess on +# each realization. + +run_reservoirsimulator = import_from_location( + "run_reservoirsimulator", + SOURCE_DIR / "src/ert/resources/forward_models/run_reservoirsimulator.py", +) + +FLOW_VERSION = "daily" + + +@pytest.mark.integration_test +@pytest.mark.skipif(not shutil.which("flowrun"), reason="flowrun not available") +def test_flow_can_produce_output(source_root): + shutil.copy(source_root / "test-data/ert/eclipse/SPE1.DATA", "SPE1.DATA") + run_reservoirsimulator.RunReservoirSimulator( + "flow", FLOW_VERSION, "SPE1.DATA" + ).run_flow() + assert Path("SPE1.UNSMRY").exists() + + +def test_flowrun_can_be_bypassed_when_flow_is_available(tmp_path, monkeypatch): + # Set FLOWRUN_PATH to a path guaranteed not to contain flowrun + monkeypatch.setenv("FLOWRUN_PATH", str(tmp_path)) + # Add a mocked flow to PATH + monkeypatch.setenv("PATH", f"{tmp_path}:{os.environ['PATH']}") + mocked_flow = Path(tmp_path / "flow") + mocked_flow.write_text("", encoding="utf-8") + mocked_flow.chmod(mocked_flow.stat().st_mode | stat.S_IEXEC) + (tmp_path / "DUMMY.DATA").write_text("", encoding="utf-8") + runner = run_reservoirsimulator.RunReservoirSimulator( + "flow", None, str(tmp_path / "DUMMY.DATA") + ) + assert runner.bypass_flowrun is True + + +def test_flowrun_cannot_be_bypassed_for_parallel_runs(tmp_path, monkeypatch): + # Set FLOWRUN_PATH to a path guaranteed not to contain flowrun + monkeypatch.setenv("FLOWRUN_PATH", str(tmp_path)) + # Add a mocked flow to PATH + monkeypatch.setenv("PATH", f"{tmp_path}:{os.environ['PATH']}") + mocked_flow = Path(tmp_path / "flow") + mocked_flow.write_text("", encoding="utf-8") + mocked_flow.chmod(mocked_flow.stat().st_mode | stat.S_IEXEC) + + with pytest.raises( + RuntimeError, match="MPI runs not supported without a flowrun wrapper" + ): + run_reservoirsimulator.RunReservoirSimulator( + "flow", None, "DUMMY.DATA", num_cpu=2 + ) + + +@pytest.mark.integration_test +@pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.skipif(not shutil.which("flow"), reason="flow not available") +def test_run_flow_with_no_flowrun(tmp_path, monkeypatch, source_root): + # Set FLOWRUN_PATH to a path guaranteed not to contain flowrun + monkeypatch.setenv("FLOWRUN_PATH", str(tmp_path)) + shutil.copy(source_root / "test-data/ert/eclipse/SPE1.DATA", "SPE1.DATA") + run_reservoirsimulator.RunReservoirSimulator("flow", None, "SPE1.DATA").run_flow() + assert Path("SPE1.UNSMRY").exists() + + +@pytest.mark.integration_test +@pytest.mark.skipif(not shutil.which("flowrun"), reason="flowrun not available") +def test_flowrunner_will_raise_when_flow_fails(source_root): + shutil.copy( + source_root / "test-data/ert/eclipse/SPE1_ERROR.DATA", "SPE1_ERROR.DATA" + ) + with pytest.raises(CalledProcessError, match="returned non-zero exit status 1"): + run_reservoirsimulator.RunReservoirSimulator( + "flow", FLOW_VERSION, "SPE1_ERROR.DATA" + ).run_flow() + + +@pytest.mark.integration_test +@pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.skipif(not shutil.which("flowrun"), reason="flowrun not available") +def test_flowrunner_will_can_ignore_flow_errors(source_root): + shutil.copy( + source_root / "test-data/ert/eclipse/SPE1_ERROR.DATA", "SPE1_ERROR.DATA" + ) + run_reservoirsimulator.RunReservoirSimulator( + "flow", FLOW_VERSION, "SPE1_ERROR.DATA", check_status=False + ).run_flow() + + +@pytest.mark.integration_test +@pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.skipif(not shutil.which("flowrun"), reason="flowrun not available") +def test_flowrunner_will_raise_on_unknown_version(): + Path("DUMMY.DATA").touch() + with pytest.raises(CalledProcessError): + run_reservoirsimulator.RunReservoirSimulator( + "flow", "garbled_version", "DUMMY.DATA" + ).run_flow() + + +@pytest.mark.integration_test +@pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.skipif(not shutil.which("flowrun"), reason="flowrun not available") +def test_flow_with_parallel_keyword(source_root): + """This only tests that ERT will be able to start flow on a data deck with + the PARALLEL keyword present. It does not assert anything regarding whether + MPI-parallelization will get into play.""" + shutil.copy( + source_root / "test-data/ert/eclipse/SPE1_PARALLEL.DATA", "SPE1_PARALLEL.DATA" + ) + run_reservoirsimulator.RunReservoirSimulator( + "flow", FLOW_VERSION, "SPE1_PARALLEL.DATA" + ).run_flow() diff --git a/tests/ert/unit_tests/resources/test_run_reservoirsimulator.py b/tests/ert/unit_tests/resources/test_run_reservoirsimulator.py new file mode 100644 index 00000000000..717e894a52c --- /dev/null +++ b/tests/ert/unit_tests/resources/test_run_reservoirsimulator.py @@ -0,0 +1,158 @@ +import os +import stat +import sys +import threading +import time +from pathlib import Path + +import numpy as np +import pytest +import resfo + +from tests.ert.utils import SOURCE_DIR + +from ._import_from_location import import_from_location + +# import run_reservoirsimulator from ert/resources/forward_models +# package-data. This is kept out of the ert package to avoid the +# overhead of importing ert. This is necessary as these may be invoked as a +# subprocess on each realization. + + +run_reservoirsimulator = import_from_location( + "run_reservoirsimulator", + SOURCE_DIR / "src/ert/resources/forward_models/run_reservoirsimulator.py", +) +ecl_case_to_data_file = import_from_location( + "ecl_case_to_data_file", + SOURCE_DIR / "src/ert/resources/forward_models/run_reservoirsimulator.py", +) + + +@pytest.fixture(name="mocked_eclrun") +def fixture_mocked_eclrun(use_tmpdir, monkeypatch): + """This puts a eclrun binary in path that cannot do anything.""" + eclrun_bin = Path("bin/eclrun") + eclrun_bin.parent.mkdir() + eclrun_bin.write_text("", encoding="utf-8") + eclrun_bin.chmod(eclrun_bin.stat().st_mode | stat.S_IEXEC) + monkeypatch.setenv("PATH", f"bin:{os.environ['PATH']}") + + +def test_unknown_simulator(): + with pytest.raises(ValueError, match="Unknown simulator"): + run_reservoirsimulator.RunReservoirSimulator( + "bogus_flow", "mocked_version", "bogus_deck.DATA" + ) + + +@pytest.mark.usefixtures("mocked_eclrun") +def test_runner_fails_on_missing_data_file(): + with pytest.raises(OSError, match="No such file: NOTEXISTING.DATA"): + run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "mocked_version", "NOTEXISTING.DATA" + ) + + +@pytest.mark.usefixtures("mocked_eclrun") +def test_runner_can_find_deck_without_extension(): + Path("DECK.DATA").write_text("FOO", encoding="utf-8") + runner = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "mocked_version", "DECK" + ) + assert runner.data_file == "DECK.DATA" + + +@pytest.mark.usefixtures("mocked_eclrun") +def test_runner_can_find_lowercase_deck_without_extension(): + Path("deck.data").write_text("FOO", encoding="utf-8") + runner = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "mocked_version", "deck" + ) + assert runner.data_file == "deck.data" + + +@pytest.mark.skipif( + sys.platform.startswith("darwin"), reason="Case insensitive filesystem on MacOS" +) +@pytest.mark.usefixtures("mocked_eclrun") +def test_runner_cannot_find_mixed_case_decks(): + Path("deck.DATA").write_text("FOO", encoding="utf-8") + with pytest.raises(OSError, match="No such file: deck.data"): + run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "mocked_version", "deck" + ) + + +@pytest.mark.usefixtures("mocked_eclrun") +@pytest.mark.parametrize( + "data_path, expected", + [ + ("DECK.DATA", "DECK"), + ("foo/DECK.DATA", "DECK"), + ("foo/deck.data", "deck"), + ], +) +def test_runner_can_extract_base_name(data_path: str, expected: str): + Path(data_path).parent.mkdir(exist_ok=True) + Path(data_path).write_text("FOO", encoding="utf-8") + runner = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "mocked_version", data_path + ) + assert runner.base_name == expected + + +@pytest.mark.usefixtures("use_tmpdir") +def test_await_completed_summary_file_will_timeout_on_missing_smry(): + assert ( + # Expected wait time is 0.3 + run_reservoirsimulator.await_completed_unsmry_file( + "SPE1.UNSMRY", max_wait=0.3, poll_interval=0.1 + ) + > 0.3 + ) + + +@pytest.mark.usefixtures("use_tmpdir") +def test_await_completed_summary_file_will_return_asap(): + resfo.write("FOO.UNSMRY", [("INTEHEAD", np.array([1], dtype=np.int32))]) + assert ( + 0.01 + # Expected wait time is the poll_interval + < run_reservoirsimulator.await_completed_unsmry_file( + "FOO.UNSMRY", max_wait=0.5, poll_interval=0.1 + ) + < 0.4 + ) + + +@pytest.mark.flaky(reruns=5) +@pytest.mark.integration_test +@pytest.mark.usefixtures("use_tmpdir") +def test_await_completed_summary_file_will_wait_for_slow_smry(): + # This is a timing test, and has inherent flakiness: + # * Reading and writing to the same smry file at the same time + # can make the reading throw an exception every time, and can + # result in max_wait triggering. + # * If the writer thread is starved, two consecutive polls may + # yield the same summary length, resulting in premature exit. + # * Heavily loaded hardware can make everything go too slow. + def slow_smry_writer(): + for size in range(10): + resfo.write( + "FOO.UNSMRY", (size + 1) * [("INTEHEAD", np.array([1], dtype=np.int32))] + ) + time.sleep(0.05) + + thread = threading.Thread(target=slow_smry_writer) + thread.start() + time.sleep(0.1) # Let the thread start writing + assert ( + 0.5 + # Minimal wait time is around 0.55 + < run_reservoirsimulator.await_completed_unsmry_file( + "FOO.UNSMRY", max_wait=4, poll_interval=0.21 + ) + < 2 + ) + thread.join()