diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index c7fef6e5b8c..1c26ba624a2 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -69,6 +69,18 @@ jobs:
       python-version: ${{ matrix.python-version }}
     secrets: inherit
 
+  test-ert-with-flow:
+    strategy:
+      fail-fast: false
+      matrix:
+        os: [ ubuntu-latest ]
+        python-version: [ '3.11', '3.12' ]
+    uses: ./.github/workflows/test_ert_with_flow.yml
+    with:
+      os: ${{ matrix.os }}
+      python-version: ${{ matrix.python-version }}
+    secrets: inherit
+
   test-mac-ert:
     if: github.ref_type != 'tag' # when not tag
     strategy:
diff --git a/.github/workflows/test_ert_with_flow.yml b/.github/workflows/test_ert_with_flow.yml
new file mode 100644
index 00000000000..892424ae1cb
--- /dev/null
+++ b/.github/workflows/test_ert_with_flow.yml
@@ -0,0 +1,52 @@
+on:
+  workflow_call:
+    inputs:
+      os:
+        type: string
+      python-version:
+        type: string
+
+env:
+  UV_SYSTEM_PYTHON: 1
+
+jobs:
+  test-ert-with-flow:
+    name: Run ert tests
+    timeout-minutes: 20
+    runs-on: ${{ inputs.os }}
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - uses: actions/setup-python@v5
+        id: setup_python
+        with:
+          python-version: ${{ inputs.python-version }}
+
+      - name: Install uv
+        uses: astral-sh/setup-uv@v4
+
+      - name: Install ert
+        run:
+          uv pip install ".[dev]"
+
+      - name: Install flow
+        run: |
+          set -e
+          sudo apt install software-properties-common
+          sudo apt-add-repository ppa:opm/ppa
+          sudo apt update
+          sudo apt install mpi-default-bin
+          sudo apt install libopm-simulators-bin python3-opm-common
+
+          which flow
+          flow --version
+
+      - name: Run integration tests towards OPM flow without flowrun
+        run: |
+          set -e
+          pytest tests/ert/unit_tests/resources/test_run_flow_simulator.py
+
+          cd test-data/ert/flow_example
+          perl -p -i -e 's/NUM_REALIZATIONS\s*12/NUM_REALIZATIONS 2/g' flow.ert
+          ert ensemble_experiment flow.ert --disable-monitor
diff --git a/src/ert/config/ert_config.py b/src/ert/config/ert_config.py
index deef1a1d90a..7afe5495919 100644
--- a/src/ert/config/ert_config.py
+++ b/src/ert/config/ert_config.py
@@ -732,10 +732,15 @@ def _create_list_of_forward_model_steps_to_run(
         cls,
         installed_steps: dict[str, ForwardModelStep],
         substitutions: Substitutions,
-        config_dict,
+        config_dict: dict,
     ) -> list[ForwardModelStep]:
         errors = []
         fm_steps = []
+
+        env_vars = {}
+        for key, val in config_dict.get("SETENV", []):
+            env_vars[key] = val
+
         for fm_step_description in config_dict.get(ConfigKeys.FORWARD_MODEL, []):
             if len(fm_step_description) > 1:
                 unsubstituted_step_name, args = fm_step_description
@@ -800,9 +805,15 @@ def _create_list_of_forward_model_steps_to_run(
                     context=substitutions,
                     forward_model_steps=[fm_step],
                     skip_pre_experiment_validation=True,
+                    env_vars=env_vars,
                 )
-                job_json = substituted_json["jobList"][0]
-                fm_step.validate_pre_experiment(job_json)
+                fm_json = substituted_json["jobList"][0]
+                fm_json["environment"] = {
+                    **cls.ENV_PR_FM_STEP.get(fm_step.name, {}),  # plugins
+                    **fm_json["environment"],  # setenv
+                    **substituted_json["global_environment"],
+                }
+                fm_step.validate_pre_experiment(fm_json)
             except ForwardModelStepValidationError as err:
                 errors.append(
                     ConfigValidationError.with_context(
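[Editor's note] The ert_config.py hunk above gives environment variables a well-defined precedence when the same key is set in several places: later ** expansions win, so plugin-provided defaults lose to SETENV, which loses to the global environment. A minimal sketch of that merge; the key names and paths are invented for illustration and do not come from this diff:

    plugin_env = {"ECLRUN_PATH": "/opt/plugin/bin", "OMP_NUM_THREADS": "1"}  # ENV_PR_FM_STEP
    setenv_env = {"ECLRUN_PATH": "/opt/site/bin"}                            # SETENV
    global_env = {"OMP_NUM_THREADS": "4"}                                    # global_environment
    merged = {**plugin_env, **setenv_env, **global_env}
    assert merged == {"ECLRUN_PATH": "/opt/site/bin", "OMP_NUM_THREADS": "4"}
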
diff --git a/src/ert/plugins/hook_implementations/forward_model_steps.py b/src/ert/plugins/hook_implementations/forward_model_steps.py
index e1a4442cdd7..02f2421d1c0 100644
--- a/src/ert/plugins/hook_implementations/forward_model_steps.py
+++ b/src/ert/plugins/hook_implementations/forward_model_steps.py
@@ -1,12 +1,9 @@
-import os
 import shutil
 import subprocess
 from pathlib import Path
 from textwrap import dedent
 from typing import Literal
 
-import yaml
-
 from ert import (
     ForwardModelStepDocumentation,
     ForwardModelStepJSON,
@@ -14,7 +11,6 @@
     ForwardModelStepValidationError,
     plugin,
 )
-from ert.plugins import ErtPluginManager
 
 
 class CarefulCopyFile(ForwardModelStepPlugin):
@@ -207,12 +203,12 @@ def __init__(self) -> None:
                 str(
                     (
                         Path(__file__)
-                        / "../../../resources/forward_models/res/script/ecl100.py"
+                        / "../../../resources/forward_models/run_reservoirsimulator.py"
                     ).resolve()
                 ),
-                "<ECLBASE>",
-                "-v",
+                "eclipse",
                 "<VERSION>",
+                "<ECLBASE>",
                 "-n",
                 "<NUM_CPU>",
                 "<OPTS>",
@@ -220,18 +216,20 @@
             default_mapping={"<NUM_CPU>": 1, "<OPTS>": ""},
         )
 
-    def validate_pre_experiment(self, _: ForwardModelStepJSON) -> None:
+    def validate_pre_experiment(self, fm_json: ForwardModelStepJSON) -> None:
         if "<VERSION>" not in self.private_args:
             raise ForwardModelStepValidationError(
                 "Forward model step ECLIPSE100 must be given a VERSION argument"
             )
         version = self.private_args["<VERSION>"]
-        available_versions = _available_eclrun_versions(simulator="eclipse")
+        available_versions = _available_eclrun_versions(
+            simulator="eclipse", env_vars=fm_json["environment"]
+        )
         if available_versions and version not in available_versions:
             raise ForwardModelStepValidationError(
-                f"Unavailable ECLIPSE100 version {version} current supported "
-                f"versions {available_versions}"
+                f"Unavailable ECLIPSE100 version {version}. "
+                f"Available versions: {available_versions}"
             )
 
     @staticmethod
@@ -265,12 +263,12 @@ def __init__(self) -> None:
                 str(
                     (
                         Path(__file__)
-                        / "../../../resources/forward_models/res/script/ecl300.py"
+                        / "../../../resources/forward_models/run_reservoirsimulator.py"
                     ).resolve()
                 ),
-                "<ECLBASE>",
-                "-v",
+                "e300",
                 "<VERSION>",
+                "<ECLBASE>",
                 "-n",
                 "<NUM_CPU>",
                 "<OPTS>",
@@ -278,17 +276,22 @@
             default_mapping={"<NUM_CPU>": 1, "<OPTS>": "", "<VERSION>": "version"},
         )
 
-    def validate_pre_experiment(self, _: ForwardModelStepJSON) -> None:
+    def validate_pre_experiment(
+        self,
+        fm_step_json: ForwardModelStepJSON,
+    ) -> None:
         if "<VERSION>" not in self.private_args:
             raise ForwardModelStepValidationError(
                 "Forward model step ECLIPSE300 must be given a VERSION argument"
             )
         version = self.private_args["<VERSION>"]
-        available_versions = _available_eclrun_versions(simulator="e300")
+        available_versions = _available_eclrun_versions(
+            simulator="e300", env_vars=fm_step_json["environment"]
+        )
         if available_versions and version not in available_versions:
             raise ForwardModelStepValidationError(
-                f"Unavailable ECLIPSE300 version {version} current supported "
-                f"versions {available_versions}"
+                f"Unavailable ECLIPSE300 version {version}. "
" + f"Available versions: {available_versions}" ) @staticmethod @@ -317,12 +320,12 @@ def __init__(self) -> None: str( ( Path(__file__) - / "../../../resources/forward_models/res/script/flow.py" + / "../../../resources/forward_models/run_reservoirsimulator.py" ).resolve() ), - "", - "-v", + "flow", "", + "", "-n", "", "", @@ -628,33 +631,22 @@ def installable_forward_model_steps() -> list[type[ForwardModelStepPlugin]]: return [*_UpperCaseFMSteps, *_LowerCaseFMSteps] -def _available_eclrun_versions(simulator: Literal["eclipse", "e300"]) -> list[str]: - if shutil.which("eclrun") is None: - return [] - pm = ErtPluginManager() - ecl_config_path = ( - pm.get_ecl100_config_path() - if simulator == "eclipse" - else pm.get_ecl300_config_path() - ) - - if not ecl_config_path: - return [] - eclrun_env = {"PATH": os.getenv("PATH", "")} - - with open(ecl_config_path, encoding="utf-8") as f: - try: - config = yaml.safe_load(f) - except yaml.YAMLError as e: - raise ValueError(f"Failed parse: {ecl_config_path} as yaml") from e - ecl_install_path = config.get("eclrun_env", {}).get("PATH", "") - eclrun_env["PATH"] = eclrun_env["PATH"] + os.pathsep + ecl_install_path - +def _available_eclrun_versions( + simulator: Literal["eclipse", "e300"], env_vars: dict[str, str] +) -> list[str]: + eclrun_path = env_vars.get("ECLRUN_PATH", "") try: + eclrun_abspath = shutil.which(Path(eclrun_path) / "eclrun") + if eclrun_abspath is None: + return [] return ( subprocess.check_output( - ["eclrun", "--report-versions", simulator], - env=eclrun_env, + [ + eclrun_abspath, + simulator, + "--report-versions", + ], + env=env_vars, ) .decode("utf-8") .strip() diff --git a/src/ert/resources/forward_models/res/script/ecl100.py b/src/ert/resources/forward_models/res/script/ecl100.py deleted file mode 100755 index d7ceeb71b29..00000000000 --- a/src/ert/resources/forward_models/res/script/ecl100.py +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env python -import sys - -from ecl_config import Ecl100Config -from ecl_run import run - -if __name__ == "__main__": - config = Ecl100Config() - run(config, [arg for arg in sys.argv[1:] if len(arg) > 0]) diff --git a/src/ert/resources/forward_models/res/script/ecl100_config.yml b/src/ert/resources/forward_models/res/script/ecl100_config.yml deleted file mode 100644 index e84aab694d1..00000000000 --- a/src/ert/resources/forward_models/res/script/ecl100_config.yml +++ /dev/null @@ -1,14 +0,0 @@ -# NB: There are inter related dependencies between this file, the EclConfig -# class which essentially internalizes this file and the EclRun class -# which uses the EclConfig class. -# -# You can have a shared 'env' attribute which is an environment variable map -# which will be set before the simulator starts. If there are env settings in -# the simulator as well they will be merged with these common settings, the -# simulator specific take presedence. 
diff --git a/src/ert/resources/forward_models/res/script/ecl100.py b/src/ert/resources/forward_models/res/script/ecl100.py
deleted file mode 100755
index d7ceeb71b29..00000000000
--- a/src/ert/resources/forward_models/res/script/ecl100.py
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/usr/bin/env python
-import sys
-
-from ecl_config import Ecl100Config
-from ecl_run import run
-
-if __name__ == "__main__":
-    config = Ecl100Config()
-    run(config, [arg for arg in sys.argv[1:] if len(arg) > 0])
diff --git a/src/ert/resources/forward_models/res/script/ecl100_config.yml b/src/ert/resources/forward_models/res/script/ecl100_config.yml
deleted file mode 100644
index e84aab694d1..00000000000
--- a/src/ert/resources/forward_models/res/script/ecl100_config.yml
+++ /dev/null
@@ -1,14 +0,0 @@
-# NB: There are inter related dependencies between this file, the EclConfig
-# class which essentially internalizes this file and the EclRun class
-# which uses the EclConfig class.
-#
-# You can have a shared 'env' attribute which is an environment variable map
-# which will be set before the simulator starts. If there are env settings in
-# the simulator as well they will be merged with these common settings, the
-# simulator specific take presedence.
-
-eclrun_env:
-  SLBSLS_LICENSE_FILE: something@yourcompany.com
-  ECLPATH: /path/to/ecl
-  PATH: /path/to/ecl/macros
-  LSB_JOBID: null
diff --git a/src/ert/resources/forward_models/res/script/ecl300.py b/src/ert/resources/forward_models/res/script/ecl300.py
deleted file mode 100755
index 480a30a27e5..00000000000
--- a/src/ert/resources/forward_models/res/script/ecl300.py
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/usr/bin/env python
-import sys
-
-from ecl_config import Ecl300Config
-from ecl_run import run
-
-if __name__ == "__main__":
-    config = Ecl300Config()
-    run(config, [arg for arg in sys.argv[1:] if len(arg) > 0])
diff --git a/src/ert/resources/forward_models/res/script/ecl300_config.yml b/src/ert/resources/forward_models/res/script/ecl300_config.yml
deleted file mode 100644
index 7c0b1c492c3..00000000000
--- a/src/ert/resources/forward_models/res/script/ecl300_config.yml
+++ /dev/null
@@ -1,8 +0,0 @@
-# This is an example configuration file for the local ecl300 installation, see
-# the ecl100_config.yml file for more extensive documentation.
-
-eclrun_env:
-  SLBSLS_LICENSE_FILE: something@yourcompany.com
-  ECLPATH: /path/to/ecl
-  PATH: /path/to/ecl/macros
-  LSB_JOBID: null
diff --git a/src/ert/resources/forward_models/res/script/ecl_config.py b/src/ert/resources/forward_models/res/script/ecl_config.py
deleted file mode 100644
index b0bbeb9196c..00000000000
--- a/src/ert/resources/forward_models/res/script/ecl_config.py
+++ /dev/null
@@ -1,278 +0,0 @@
-import os
-import re
-import shutil
-import subprocess
-import sys
-from pathlib import Path
-from typing import Any
-
-import yaml
-
-
-def re_getenv(match_obj):
-    match_str = match_obj.group(1)
-    variable = match_str[1:]
-    return os.getenv(variable, default=match_str)
-
-
-def _replace_env(env: dict) -> dict:
-    """Small utility function will take a dict as input, and create a new
-    dictionary where all $VARIABLE in the values has been replaced with
-    getenv("VARIABLE"). Variables which are not recognized are left
-    unchanged."""
-    new_env = {}
-    for key, value in env.items():
-        new_env[key] = re.sub(r"(\$[A-Z0-9_]+)", re_getenv, value)
-
-    return new_env
-
-
-class Keys:
-    default_version: str = "default_version"
-    default: str = "default"
-    versions: str = "versions"
-    env: str = "env"
-    mpi: str = "mpi"
-    mpirun: str = "mpirun"
-    executable: str = "executable"
-    scalar: str = "scalar"
-
-
-class Simulator:
-    """Small 'struct' with the config information for one simulator."""
-
-    def __init__(
-        self,
-        version: str,
-        executable: str,
-        env: dict[str, str],
-        mpirun: str | None = None,
-    ):
-        self.version: str = version
-        if not os.access(executable, os.X_OK):
-            raise OSError(f"The executable: '{executable}' can not be executed by user")
-
-        self.executable: str = executable
-        self.env: dict[str, str] = env
-        self.mpirun: str | None = mpirun
-        self.name: str = "simulator"
-
-        if mpirun is not None and not os.access(mpirun, os.X_OK):
-            raise OSError(f"The mpirun binary: '{mpirun}' is not executable by user")
-
-    def __repr__(self) -> str:
-        mpistring: str = ""
-        if self.mpirun:
-            mpistring = " MPI"
-        return (
-            f"{self.name}(version={self.version}, "
-            f"executable={self.executable}{mpistring})"
-        )
-
-
-class EclConfig:
-    """Represent the eclipse configuration at a site.
-
-    The EclConfig class internalizes information of where the various eclipse
-    programs are installed on site, and which environment variables need to be
-    set before the simulator starts. The class is based on parsing a yaml
The class is based on parsing a yaml - formatted configuration file, the source distribution contains commented - example file. - - """ - - def __init__(self, config_file: str, simulator_name: str = "not_set"): - with open(config_file, encoding="utf-8") as f: - try: - config = yaml.safe_load(f) - except yaml.YAMLError as e: - raise ValueError(f"Failed parse: {config_file} as yaml") from e - - self._config: dict = config - self._config_file: str = os.path.abspath(config_file) - self.simulator_name: str = simulator_name - - def __contains__(self, version: str) -> bool: - if version in self._config[Keys.versions]: - return True - - return self.default_version is not None and version in [None, Keys.default] - - def get_eclrun_env(self) -> dict[str, str] | None: - if "eclrun_env" in self._config: - return self._config["eclrun_env"].copy() - return None - - @property - def default_version(self) -> str | None: - return self._config.get(Keys.default_version) - - def _get_version(self, version_arg: str | None) -> str: - if version_arg in [None, Keys.default]: - version = self.default_version - else: - version = version_arg - - if version is None: - raise ValueError( - "The default version has not been " - f"set in the config file:{self._config_file}" - ) - - return version - - def _get_env(self, version: str, exe_type: str) -> dict[str, str]: - env: dict[str, str] = {} - env.update(self._config.get(Keys.env, {})) - - mpi_sim: dict[str, Any] = self._config[Keys.versions][ - self._get_version(version) - ][exe_type] - env.update(mpi_sim.get(Keys.env, {})) - - return _replace_env(env) - - def _get_sim(self, version: str | None, exe_type: str) -> Simulator: - version = self._get_version(version) - binaries: dict[str, str] = self._config[Keys.versions][version][exe_type] - mpirun = binaries[Keys.mpirun] if exe_type == Keys.mpi else None - return Simulator( - version, - binaries[Keys.executable], - self._get_env(version, exe_type), - mpirun=mpirun, - ) - - def sim(self, version: str | None = None) -> Simulator: - """Will return an object describing the simulator. - - Available attributes are 'executable' and 'env'. Observe that the - executable path is validated when you instantiate the Simulator object; - so if the executable key in the config file points to non-existing file - you will not get the error before this point. 
- """ - return self._get_sim(version, Keys.scalar) - - def mpi_sim(self, version: str | None = None) -> Simulator: - """MPI version of method sim()""" - return self._get_sim(version, Keys.mpi) - - def simulators(self, strict: bool = True) -> list[Simulator]: - simulators = [] - for version in self._config[Keys.versions]: - for exe_type in self._config[Keys.versions][version]: - if strict: - sim = self._get_sim(version, exe_type) - else: - try: - sim = self._get_sim(version, exe_type) - except OSError: - sys.stderr.write( - "Failed to create simulator object for: " - f"version:{version} {exe_type}\n" - ) - sim = None - - if sim: - simulators.append(sim) - return simulators - - -class Ecl100Config(EclConfig): - DEFAULT_CONFIG_FILE: str = os.path.join( - os.path.dirname(__file__), "ecl100_config.yml" - ) - - def __init__(self): - config_file = os.getenv("ECL100_SITE_CONFIG", default=self.DEFAULT_CONFIG_FILE) - super().__init__(config_file, simulator_name="eclipse") - - -class Ecl300Config(EclConfig): - DEFAULT_CONFIG_FILE: str = os.path.join( - os.path.dirname(__file__), "ecl300_config.yml" - ) - - def __init__(self): - config_file = os.getenv("ECL300_SITE_CONFIG", default=self.DEFAULT_CONFIG_FILE) - super().__init__(config_file, simulator_name="e300") - - -class FlowConfig(EclConfig): - DEFAULT_CONFIG_FILE: str = os.path.join( - os.path.dirname(__file__), "flow_config.yml" - ) - - def __init__(self): - config_file = os.getenv("FLOW_SITE_CONFIG", default=self.DEFAULT_CONFIG_FILE) - if not Path(config_file).exists(): - config_file = self.init_flow_config() - super().__init__(config_file, simulator_name="flow") - - @staticmethod - def init_flow_config() -> str: - binary_path = shutil.which("flow") - if binary_path is None: - raise FileNotFoundError( - "Could not find flow executable!\n" - " Requires flow to be installed in $PATH" - ) - - conf = { - "default_version": "default", - "versions": {"default": {"scalar": {"executable": binary_path}}}, - } - flow_config_yml = Path("flow_config.yml") - flow_config_yml.write_text(yaml.dump(conf), encoding="utf-8") - return str(flow_config_yml.absolute()) - - -class EclrunConfig: - """This class contains configurations for using the new eclrun binary - for running eclipse. It uses the old configurations classes above to - get the configuration in the ECLX00_SITE_CONFIG files. 
-    """
-
-    def __init__(self, config: EclConfig, version: str):
-        self.simulator_name: str = config.simulator_name
-        self.run_env: dict[str, str] | None = self._get_run_env(config.get_eclrun_env())
-        self.version: str = version
-
-    @staticmethod
-    def _get_run_env(eclrun_env: dict[str, str] | None) -> dict[str, str] | None:
-        if eclrun_env is None:
-            return None
-
-        env: dict = os.environ.copy()
-        if "PATH" in eclrun_env:
-            env["PATH"] = eclrun_env["PATH"] + os.pathsep + env["PATH"]
-            eclrun_env.pop("PATH")
-
-        for key, value in eclrun_env.copy().items():
-            if value is None:
-                if key in env:
-                    env.pop(key)
-                eclrun_env.pop(key)
-
-        env.update(eclrun_env)
-        return env
-
-    def _get_available_eclrun_versions(self) -> list[str]:
-        try:
-            return (
-                subprocess.check_output(
-                    ["eclrun", "--report-versions", self.simulator_name],
-                    env=self.run_env,
-                )
-                .decode("utf-8")
-                .strip()
-                .split(" ")
-            )
-        except subprocess.CalledProcessError:
-            return []
-
-    def can_use_eclrun(self) -> bool:
-        if self.run_env is None:
-            return False
-
-        return self.version in self._get_available_eclrun_versions()
diff --git a/src/ert/resources/forward_models/res/script/ecl_run.py b/src/ert/resources/forward_models/res/script/ecl_run.py
deleted file mode 100644
index 561b920a165..00000000000
--- a/src/ert/resources/forward_models/res/script/ecl_run.py
+++ /dev/null
@@ -1,588 +0,0 @@
-import datetime
-import glob
-import os
-import os.path
-import re
-import socket
-import subprocess
-import sys
-import time
-from argparse import ArgumentParser
-from collections import namedtuple
-from contextlib import contextmanager
-from pathlib import Path
-from random import random
-
-import resfo
-from ecl_config import EclConfig, EclrunConfig, Simulator
-
-
-def ecl_output_has_license_error(ecl_output: str):
-    return (
-        "LICENSE ERROR" in ecl_output
-        or "LICENSE FAILURE" in ecl_output
-        or "not allowed in license" in ecl_output
-    )
-
-
-class EclError(RuntimeError):
-    def failed_due_to_license_problems(self) -> bool:
-        # self.args[0] contains the multiline ERROR messages and SLAVE startup messages
-        if ecl_output_has_license_error(self.args[0]):
-            return True
-        if re.search(a_slave_failed_pattern, self.args[0]):
-            for match in re.finditer(slave_run_paths, self.args[0], re.MULTILINE):
-                (ecl_case_starts_with, ecl_case_dir) = match.groups()
-                for prt_file in glob.glob(
-                    f"{ecl_case_dir}/{ecl_case_starts_with}*.PRT"
-                ):
-                    if ecl_output_has_license_error(
-                        Path(prt_file).read_text(encoding="utf-8")
-                    ):
-                        return True
-        return False
-
-
-EclipseResult = namedtuple("EclipseResult", "errors bugs")
-body_sub_pattern = r"(\s^\s@.+$)*"
-date_sub_pattern = r"\s+AT TIME\s+(?P<Days>\d+\.\d+)\s+DAYS\s+\((?P<Date>(.+)):\s*$"
-error_pattern_e100 = (
-    rf"^\s@--  ERROR\s(FROM PROCESSOR \d+)?{date_sub_pattern}${body_sub_pattern}"
-)
-error_pattern_e300 = rf"^\s@--Error${body_sub_pattern}"
-slave_started_pattern = (
-    rf"^\s@--MESSAGE{date_sub_pattern}\s^\s@\s+STARTING SLAVE.+${body_sub_pattern}"
-)
-a_slave_failed_pattern = r"\s@\s+SLAVE RUN.*HAS STOPPED WITH AN ERROR CONDITION.\s*"
-slave_run_paths = r"^\s@\s+STARTING SLAVE\s+[^ ]+RUNNING \([^ ]\)\s*$"
-slave_run_paths = r"\s@\s+STARTING SLAVE .* RUNNING (\w+)\s*^\s@\s+ON HOST.*IN DIRECTORY\s*^\s@\s+(.*)"
-
-
-def make_LSB_MCPU_machine_list(LSB_MCPU_HOSTS):
-    host_numcpu_list = LSB_MCPU_HOSTS.split()
-    host_list = []
-    for index in range(len(host_numcpu_list) // 2):
-        machine = host_numcpu_list[2 * index]
-        host_numcpu = int(host_numcpu_list[2 * index + 1])
-        for _ in range(host_numcpu):
-            host_list.append(machine)
-    return host_list
-
-
-def _expand_SLURM_range(rs):
-    if "-" in rs:
-        tmp = rs.split("-")
-        return range(int(tmp[0]), int(tmp[1]) + 1)
-    else:
-        return [int(rs)]
-
-
-def _expand_SLURM_node(node_string):
-    match_object = re.match(r"(?P<base>[^[]+)\[(?P<range>[-0-9,]+)\]", node_string)
-    if match_object:
-        node_list = []
-        base = match_object.groupdict()["base"]
-        range_string = match_object.groupdict()["range"]
-        for rs in range_string.split(","):
-            for num in _expand_SLURM_range(rs):
-                node_list.append(f"{base}{num}")
-        return node_list
-    else:
-        return [node_string]
-
-
-def _expand_SLURM_task_count(task_count_string):
-    match_object = re.match(r"(?P<count>\d+)(\(x(?P<mult>\d+)\))?", task_count_string)
-    if match_object:
-        match_dict = match_object.groupdict()
-        print(match_dict)
-        count = int(match_dict["count"])
-        mult_string = match_dict["mult"]
-        mult = 1 if mult_string is None else int(mult_string)
-
-        return [count] * mult
-    else:
-        raise ValueError(f"Failed to parse SLURM_TASKS_PER_NODE: {task_count_string}")
-
-
-# The list of available machines/nodes and how many tasks each node should get
-# is available in the slurm environment variables SLURM_JOB_NODELIST and
-# SLURM_TASKS_PER_NODE. These string variables are in an incredibly compact
-# notation, and there are some hoops to expand them. The short description is:
-#
-#  1. They represent flat lists of hostnames and the number of cpu's on that
-#     host respectively.
-#
-#  2. The outer structure is a ',' separated lis.
-#
-#  3. The items in SLURM_JOB_NODELIST have a compact notation
-#     base-[n1-n2,n3-n4] which is expanded to the nodelist: [base-n1,
-#     base-n1+1, base-n1+2, ... , base-n4-1, base-n4]
-#
-#  4. The SLURM_TASK_PER_NODE items has the compact notation 3(x4) which
-#     implies that four consecutive nodes (from the expanded
-#     SLURM_JOB_NODELIST) should have three CPUs each.
-#
-# For further details see the sbatch manual page.
-
-
-def make_SLURM_machine_list(SLURM_JOB_NODELIST, SLURM_TASKS_PER_NODE):
-    # We split on ',' - but not on ',' which is inside a [...]
-    split_re = ",(?![^[]*\\])"
-    nodelist = []
-    for node_string in re.split(split_re, SLURM_JOB_NODELIST):
-        nodelist += _expand_SLURM_node(node_string)
-
-    task_count_list = []
-    for task_count_string in SLURM_TASKS_PER_NODE.split(","):
-        task_count_list += _expand_SLURM_task_count(task_count_string)
-
-    host_list = []
-    for node, count in zip(nodelist, task_count_list, strict=False):
-        host_list += [node] * count
-
-    return host_list
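# [Editor's note - illustration, not part of the deleted file] A worked example
# of the compact SLURM notation expanded by the helpers above; the hostnames
# are invented:
#
#   SLURM_JOB_NODELIST="node-[1-2,4],login0"
#   SLURM_TASKS_PER_NODE="2(x2),1,1"
#
#   _expand_SLURM_node("node-[1-2,4]")  -> ["node-1", "node-2", "node-4"]
#   _expand_SLURM_task_count("2(x2)")   -> [2, 2]
#
#   make_SLURM_machine_list(...) then pairs the two flat lists:
#   -> ["node-1", "node-1", "node-2", "node-2", "node-4", "login0"]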
-
-
-def make_LSB_machine_list(LSB_HOSTS):
-    return LSB_HOSTS.split()
-
-
-@contextmanager
-def pushd(run_path):
-    starting_directory = os.getcwd()
-    os.chdir(run_path)
-    yield
-    os.chdir(starting_directory)
-
-
-def find_unsmry(basepath: Path) -> Path | None:
-    def _is_unsmry(base: str, path: str) -> bool:
-        if "." not in path:
-            return False
-        splitted = path.split(".")
-        return splitted[-2].endswith(base) and splitted[-1].lower() in [
-            "unsmry",
-            "funsmry",
-        ]
-
-    base = basepath.name
-    candidates: list[str] = list(
-        filter(lambda x: _is_unsmry(base, x), glob.glob(str(basepath) + "*"))
-    )
-    if not candidates:
-        return None
-    if len(candidates) > 1:
-        raise ValueError(
-            f"Ambiguous reference to unsmry in {basepath}, could be any of {candidates}"
-        )
-    return Path(candidates[0])
-
-
-def await_completed_unsmry_file(
-    smry_path: Path, max_wait: float = 15, poll_interval: float = 1.0
-) -> float:
-    """This function will wait until the provided smry file does not grow in size
-    during one poll interval.
-
-    Such a wait is sometimes needed when different MPI hosts write data to a shared
-    disk system.
-
-    If the file does not exist or is completely unreadable to resfo, this function
-    will timeout to max_wait. If NOSIM is included, this will happen.
-
-    Size is defined in terms of readable data elementes through resfo.
-
-    This function will always wait for at least one poll interval, the polling
-    interval is specified in seconds.
-
-    The return value is the waited time (in seconds)"""
-    start_time = datetime.datetime.now()
-    prev_len = 0
-    while (datetime.datetime.now() - start_time).total_seconds() < max_wait:
-        try:
-            resfo_sum = [r.read_keyword() for r in resfo.lazy_read(smry_path)]
-        except Exception:
-            time.sleep(poll_interval)
-            continue
-
-        current_len = len(resfo_sum)
-        if prev_len == current_len:
-            # smry file is regarded complete
-            break
-        else:
-            prev_len = max(prev_len, current_len)
-
-        time.sleep(poll_interval)
-
-    return (datetime.datetime.now() - start_time).total_seconds()
-
-
-class EclRun:
-    """Wrapper class to run Eclipse simulations.
-
-    The EclRun class is a small wrapper class which is used to run Eclipse
-    simulations. It will load a configuration, i.e. where the binary is
-    installed and so on, from an instance of the EclConfig class.
-
-    The main method is the runEclipse() method which will:
-
-      1. Set up redirection of the stdxxx file descriptors.
-      2. Set the necessary environment variables.
-      3. [MPI]: Create machine_file listing the nodes which should be used.
-      4. fork+exec to actually run the Eclipse binary.
-      5. Parse the output .PRT / .ECLEND file to check for errors.
-
-    If the simulation fails the runEclipse() method will raise an exception.
-
-    The class is called EclRun, and the main focus has been on running Eclipse
-    simulations, but it should also handle "eclipse-like" simulators, e.g. the
-    simulator OPM/flow.
-
-    To actually create an executable script based on this class could in it's
-    simplest form be:
-
-       #!/usr/bin/env python
-       import sys
-       from .ecl_run import EclRun
-
-       run = EclRun()
-       run.runEclipse( )
-
-
-    """
-
-    def __init__(
-        self,
-        ecl_case: str,
-        sim: Simulator,
-        num_cpu: int = 1,
-        check_status: bool = True,
-        summary_conversion: bool = False,
-    ):
-        self.sim = sim
-        self.check_status = check_status
-        self.num_cpu = int(num_cpu)
-        self.summary_conversion = summary_conversion
-
-        # Dechipher the ecl_case argument.
-        input_arg = ecl_case
-        (_, ext) = os.path.splitext(input_arg)
-        if ext and ext in [".data", ".DATA"]:
-            data_file = input_arg
-        elif input_arg.islower():
-            data_file = input_arg + ".data"
-        else:
-            data_file = input_arg + ".DATA"
-
-        if not os.path.isfile(data_file):
-            raise OSError(f"No such file: {data_file}")
-
-        (self.run_path, self.data_file) = os.path.split(data_file)
-        (self.base_name, ext) = os.path.splitext(self.data_file)
-
-        if self.run_path is None:
-            self.run_path = os.getcwd()
-        else:
-            self.run_path = os.path.abspath(self.run_path)
-
-    def runPath(self):
-        return self.run_path
-
-    def baseName(self):
-        return self.base_name
-
-    @property
-    def prt_path(self):
-        return Path(self.run_path) / (self.baseName() + ".PRT")
-
-    def numCpu(self):
-        return self.num_cpu
-
-    def _get_legacy_run_env(self):
-        my_env = os.environ.copy()
-        my_env.update(self.sim.env.items())
-        return my_env
-
-    def initMPI(self):
-        # If the environment variable LSB_MCPU_HOSTS is set we assume the job is
-        # running on LSF - otherwise we assume it is running on the current host.
-        #
-        # If the LSB_MCPU_HOSTS variable is indeed set it will be a string like this:
-        #
-        #       host1 num_cpu1 host2 num_cpu2 ...
-        #
-        # i.e. an alternating list of hostname & number of
-        # cpu. Alternatively/in addition the environment variable
-        # LSB_HOSTS can be used. This variable is simply:
-        #
-        #       host1 host1 host2 host3
-
-        LSB_MCPU_HOSTS = os.getenv("LSB_MCPU_HOSTS")
-        LSB_HOSTS = os.getenv("LSB_HOSTS")
-
-        if LSB_MCPU_HOSTS or LSB_HOSTS:
-            LSB_MCPU_machine_list = make_LSB_MCPU_machine_list(LSB_MCPU_HOSTS)
-            LSB_machine_list = make_LSB_machine_list(LSB_HOSTS)
-
-            if len(LSB_MCPU_machine_list) == self.num_cpu:
-                machine_list = LSB_MCPU_machine_list
-            elif len(LSB_machine_list) == self.num_cpu:
-                machine_list = LSB_machine_list
-            else:
-                raise EclError(
-                    "LSF / MPI problems. "
-                    f"Asked for:{self.num_cpu} cpu. "
-                    f'LSB_MCPU_HOSTS: "{LSB_MCPU_HOSTS}" LSB_HOSTS: "{LSB_HOSTS}"'
-                )
-        elif os.getenv("SLURM_JOB_NODELIST"):
-            machine_list = make_SLURM_machine_list(
-                os.getenv("SLURM_JOB_NODELIST"), os.getenv("SLURM_TASKS_PER_NODE")
-            )
-            if len(machine_list) != self.num_cpu:
-                raise EclError(
-                    f"SLURM / MPI problems - asked for {self.num_cpu} - "
-                    f"got {len(machine_list)} nodes"
-                )
-        else:
-            localhost = socket.gethostname()
-            machine_list = [localhost] * self.num_cpu
-
-        self.machine_file = f"{self.base_name}.mpi"
-        with open(self.machine_file, "w", encoding="utf-8") as filehandle:
-            for host in machine_list:
-                filehandle.write(f"{host}\n")
-
-    def _get_run_command(self, eclrun_config: EclrunConfig):
-        summary_conversion = "yes" if self.summary_conversion else "no"
-        return [
-            "eclrun",
-            "-v",
-            eclrun_config.version,
-            eclrun_config.simulator_name,
-            f"{self.base_name}.DATA",
-            "--summary-conversion",
-            summary_conversion,
-        ]
-
-    def _get_legacy_run_command(self):
-        if self.num_cpu == 1:
-            return [self.sim.executable, self.base_name]
-        else:
-            self.initMPI()
-            return [
-                self.sim.mpirun,
-                "-machinefile",
-                self.machine_file,
-                "-np",
-                str(self.num_cpu),
-                self.sim.executable,
-                self.base_name,
-            ]
-
-    def execEclipse(self, eclrun_config=None) -> int:
-        use_eclrun = eclrun_config is not None
-
-        with pushd(self.run_path):
-            if not os.path.exists(self.data_file):
-                raise OSError(f"Can not find data_file:{self.data_file}")
-            if not os.access(self.data_file, os.R_OK):
-                raise OSError(f"Can not read data file:{self.data_file}")
-
-            command = (
-                self._get_run_command(eclrun_config)
-                if use_eclrun
-                else self._get_legacy_run_command()
-            )
-            env = eclrun_config.run_env if use_eclrun else self._get_legacy_run_env()
-
-            return subprocess.run(
-                command,
-                env=env,
-                check=False,
-            ).returncode
-
-    LICENSE_FAILURE_RETRY_INITIAL_SLEEP = 90
-    LICENSE_RETRY_STAGGER_FACTOR = 60
-    LICENSE_RETRY_BACKOFF_EXPONENT = 3
-
-    def runEclipse(self, eclrun_config=None, retries_left=3, backoff_sleep=None):
-        backoff_sleep = (
-            self.LICENSE_FAILURE_RETRY_INITIAL_SLEEP
-            if backoff_sleep is None
-            else backoff_sleep
-        )
-        return_code = self.execEclipse(eclrun_config=eclrun_config)
-
-        OK_file = os.path.join(self.run_path, f"{self.base_name}.OK")
-        if not self.check_status:
-            with open(OK_file, "w", encoding="utf-8") as f:
-                f.write("ECLIPSE simulation complete - NOT checked for errors.")
-        else:
-            if return_code != 0:
-                command = (
-                    self._get_run_command(eclrun_config)
-                    if self.sim is None
-                    else self._get_legacy_run_command()
-                )
-                raise subprocess.CalledProcessError(return_code, command)
-
-            try:
-                self.assertECLEND()
-            except EclError as err:
-                if err.failed_due_to_license_problems() and retries_left > 0:
-                    time_to_wait = backoff_sleep + int(
-                        random() * self.LICENSE_RETRY_STAGGER_FACTOR
-                    )
-                    sys.stderr.write(
-                        "ECLIPSE failed due to license failure "
-                        f"retrying in {time_to_wait} seconds\n"
-                    )
-                    time.sleep(time_to_wait)
-                    self.runEclipse(
-                        eclrun_config,
-                        retries_left=retries_left - 1,
-                        backoff_sleep=int(
-                            backoff_sleep * self.LICENSE_RETRY_BACKOFF_EXPONENT
-                        ),
-                    )
-                    return
-                else:
-                    raise err from None
-            if self.num_cpu > 1:
-                await_completed_unsmry_file(
-                    find_unsmry(Path(self.run_path) / self.base_name)
-                )
-
-            with open(OK_file, "w", encoding="utf-8") as f:
-                f.write("ECLIPSE simulation OK")
-
-    def assertECLEND(self):
-        tail_length = 5000
-        result = self.readECLEND()
-        if result.errors > 0:
-            error_list = self.parseErrors()
-            sep = "\n\n...\n\n"
-            error_and_slave_msg = sep.join(error_list)
-            extra_message = ""
-            error_messages = [
-                error for error in error_list if not "STARTING SLAVE" in str(error)
-            ]
-            if result.errors != len(error_messages):
-                extra_message = (
-                    f"\n\nWarning, mismatch between stated Error count ({result.errors}) "
-                    f"and number of ERROR messages found in PRT ({len(error_messages)})."
-                    f"\n\nTail ({tail_length} bytes) of PRT-file {self.prt_path}:\n\n"
-                ) + tail_textfile(self.prt_path, 5000)
-
-            raise EclError(
-                "Eclipse simulation failed with:"
-                f"{result.errors:d} errors:\n\n{error_and_slave_msg}{extra_message}"
-            )
-
-        if result.bugs > 0:
-            raise EclError(f"Eclipse simulation failed with:{result.bugs:d} bugs")
-
-    def readECLEND(self):
-        error_regexp = re.compile(r"^\s*Errors\s+(\d+)\s*$")
-        bug_regexp = re.compile(r"^\s*Bugs\s+(\d+)\s*$")
-
-        report_file = os.path.join(self.run_path, f"{self.base_name}.ECLEND")
-        if not os.path.isfile(report_file):
-            report_file = self.prt_path
-
-        errors = None
-        bugs = None
-        with open(report_file, encoding="utf-8") as filehandle:
-            for line in filehandle.readlines():
-                error_match = re.match(error_regexp, line)
-                if error_match:
-                    errors = int(error_match.group(1))
-
-                bug_match = re.match(bug_regexp, line)
-                if bug_match:
-                    bugs = int(bug_match.group(1))
-        if errors is None:
-            raise ValueError(f"Could not read errors from {report_file}")
-        if bugs is None:
-            raise ValueError(f"Could not read bugs from {report_file}")
-
-        return EclipseResult(errors=errors, bugs=bugs)
-
-    def parseErrors(self) -> list[str]:
-        """Extract multiline ERROR messages from the PRT file"""
-        error_list = []
-        error_e100_regexp = re.compile(error_pattern_e100, re.MULTILINE)
-        error_e300_regexp = re.compile(error_pattern_e300, re.MULTILINE)
-        slave_started_regexp = re.compile(slave_started_pattern, re.MULTILINE)
-        with open(self.prt_path, encoding="utf-8") as filehandle:
-            content = filehandle.read()
-
-        for regexp in [error_e100_regexp, error_e300_regexp, slave_started_regexp]:
-            offset = 0
-            while True:
-                match = regexp.search(content[offset:])
-                if match:
-                    error_list.append(
-                        content[offset + match.start() : offset + match.end()]
-                    )
-                    offset += match.end()
-                else:
-                    break
-
-        return error_list
-
-
-def run(config: EclConfig, argv):
-    parser = ArgumentParser()
-    parser.add_argument("ecl_case")
-    parser.add_argument("-v", "--version", dest="version", type=str)
-    parser.add_argument("-n", "--num-cpu", dest="num_cpu", type=int, default=1)
-    parser.add_argument(
-        "-i", "--ignore-errors", dest="ignore_errors", action="store_true"
-    )
-    parser.add_argument(
-        "--summary-conversion", dest="summary_conversion", action="store_true"
-    )
-
-    options = parser.parse_args(argv)
-
-    try:
-        eclrun_config = EclrunConfig(config, options.version)
-        if eclrun_config.can_use_eclrun():
-            run = EclRun(
-                options.ecl_case,
-                None,
-                num_cpu=options.num_cpu,
-                check_status=not options.ignore_errors,
-                summary_conversion=options.summary_conversion,
-            )
-            run.runEclipse(eclrun_config=eclrun_config)
-        else:
-            if options.num_cpu > 1:
-                sim = config.mpi_sim(version=options.version)
-            else:
-                sim = config.sim(version=options.version)
-
-            run = EclRun(
-                options.ecl_case,
-                sim,
-                num_cpu=options.num_cpu,
-                check_status=not options.ignore_errors,
-                summary_conversion=options.summary_conversion,
-            )
-            run.runEclipse()
-    except EclError as msg:
-        print(msg, file=sys.stderr)
-        sys.exit(-1)
-
-
-def tail_textfile(file_path: Path, num_chars: int) -> str:
-    if not file_path.exists():
-        return f"No output file {file_path}"
-    with open(file_path, encoding="utf-8") as file:
-        file.seek(0, 2)
-        file_end_position = file.tell()
-        seek_position = max(0, file_end_position - num_chars)
-        file.seek(seek_position)
-        return file.read()[-num_chars:]
diff --git a/src/ert/resources/forward_models/res/script/flow.py b/src/ert/resources/forward_models/res/script/flow.py
deleted file mode 100755
index 26f29245b50..00000000000
--- a/src/ert/resources/forward_models/res/script/flow.py
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/usr/bin/env python
-import sys
-
-from ecl_config import FlowConfig
-from ecl_run import run
-
-if __name__ == "__main__":
-    config = FlowConfig()
-    run(config, [arg for arg in sys.argv[1:] if len(arg) > 0])
diff --git a/src/ert/resources/forward_models/run_reservoirsimulator.py b/src/ert/resources/forward_models/run_reservoirsimulator.py
new file mode 100755
index 00000000000..f6781da5d07
--- /dev/null
+++ b/src/ert/resources/forward_models/run_reservoirsimulator.py
@@ -0,0 +1,430 @@
+#!/usr/bin/env python
+import datetime
+import glob
+import os
+import os.path
+import re
+import shutil
+import subprocess
+import sys
+import time
+from argparse import ArgumentParser
+from collections import namedtuple
+from pathlib import Path
+from random import random
+from typing import Literal, get_args
+
+import resfo
+
+
+def ecl_output_has_license_error(ecl_output: str) -> bool:
+    return (
+        "LICENSE ERROR" in ecl_output
+        or "LICENSE FAILURE" in ecl_output
+        or "not allowed in license" in ecl_output
+    )
+
+
+class EclError(RuntimeError):
+    def failed_due_to_license_problems(self) -> bool:
+        # self.args[0] contains the multiline ERROR messages and SLAVE startup messages
+        if ecl_output_has_license_error(self.args[0]):
+            return True
+        if re.search(a_slave_failed_pattern, self.args[0]):
+            for match in re.finditer(slave_run_paths, self.args[0], re.MULTILINE):
+                (ecl_case_starts_with, ecl_case_dir) = match.groups()
+                for prt_file in glob.glob(
+                    f"{ecl_case_dir}/{ecl_case_starts_with}*.PRT"
+                ):
+                    if ecl_output_has_license_error(
+                        Path(prt_file).read_text(encoding="utf-8")
+                    ):
+                        return True
+        return False
+
+
+Simulators = Literal["flow", "eclipse", "e300"]
+EclipseResult = namedtuple("EclipseResult", "errors bugs")
+body_sub_pattern = r"(\s^\s@.+$)*"
+date_sub_pattern = r"\s+AT TIME\s+(?P<Days>\d+\.\d+)\s+DAYS\s+\((?P<Date>(.+)):\s*$"
+error_pattern_e100 = (
+    rf"^\s@--  ERROR\s(FROM PROCESSOR \d+)?{date_sub_pattern}${body_sub_pattern}"
+)
+error_pattern_e300 = rf"^\s@--Error${body_sub_pattern}"
+slave_started_pattern = (
+    rf"^\s@--MESSAGE{date_sub_pattern}\s^\s@\s+STARTING SLAVE.+${body_sub_pattern}"
+)
+a_slave_failed_pattern = r"\s@\s+SLAVE RUN.*HAS STOPPED WITH AN ERROR CONDITION.\s*"
+slave_run_paths = r"^\s@\s+STARTING SLAVE\s+[^ ]+RUNNING \([^ ]\)\s*$"
+slave_run_paths = r"\s@\s+STARTING SLAVE .* RUNNING (\w+)\s*^\s@\s+ON HOST.*IN DIRECTORY\s*^\s@\s+(.*)"
+
+
+def find_unsmry(basepath: Path) -> Path | None:
+    def _is_unsmry(base: str, path: str) -> bool:
+        if "." not in path:
+            return False
+        splitted = path.split(".")
+        return splitted[-2].endswith(base) and splitted[-1].lower() in [
+            "unsmry",
+            "funsmry",
+        ]
+
+    base = basepath.name
+    candidates: list[str] = list(
+        filter(lambda x: _is_unsmry(base, x), glob.glob(str(basepath) + "*"))
+    )
+    if not candidates:
+        return None
+    if len(candidates) > 1:
+        raise ValueError(
+            f"Ambiguous reference to unsmry in {basepath}, could be any of {candidates}"
+        )
+    return Path(candidates[0])
+
+
+def await_completed_unsmry_file(
+    smry_path: Path, max_wait: float = 15, poll_interval: float = 1.0
+) -> float:
+    """This function will wait until the provided smry file does not grow in size
+    during one poll interval.
+
+    Such a wait is sometimes needed when different MPI hosts write data to a shared
+    disk system.
+
+    If the file does not exist or is completely unreadable to resfo, this function
+    will timeout to max_wait. If NOSIM is included, this will happen.
+
+    Size is defined in terms of readable data elements through resfo.
+
+    This function will always wait for at least one poll interval, the polling
+    interval is specified in seconds.
+
+    The return value is the waited time (in seconds)"""
+    start_time = datetime.datetime.now()
+    prev_len = 0
+    while (datetime.datetime.now() - start_time).total_seconds() < max_wait:
+        try:
+            resfo_sum = [r.read_keyword() for r in resfo.lazy_read(smry_path)]
+        except Exception:
+            time.sleep(poll_interval)
+            continue
+
+        current_len = len(resfo_sum)
+        if prev_len == current_len:
+            # smry file is regarded complete
+            break
+        else:
+            prev_len = max(prev_len, current_len)
+
+        time.sleep(poll_interval)
+
+    return (datetime.datetime.now() - start_time).total_seconds()
+
+
+class RunReservoirSimulator:
+    """Wrapper class to run system installed `eclrun` or `flowrun`.
+
+    Will initiate a limited number of reruns if license errors are detected, also
+    in coupled simulations (not relevant for Flow).
+
+    By default, the class will check PRT/ECLEND files for errors after the run, and
+    raise exceptions accordingly.
+
+    For eclrun, the request to do summary_conversion can be forwarded.
+ """ + + def __init__( + self, + simulator: Simulators, + version: str | None, + ecl_case: Path | str, + num_cpu: int = 1, + check_status: bool = True, + summary_conversion: bool = False, + ): + if simulator not in get_args(Simulators): + raise ValueError( + f"Unknown simulator '{simulator}', pick from {get_args(Simulators)}" + ) + self.simulator = simulator + self.version: str | None = version + + self.num_cpu: int = int(num_cpu) + self.check_status: bool = check_status + self.summary_conversion: bool = summary_conversion + + self.bypass_flowrun: bool = False + + runner_abspath: str | Path | None = None + if simulator in ["eclipse", "e300"]: + eclrun_path: str = os.environ.get("ECLRUN_PATH", "") + runner_abspath = shutil.which(Path(eclrun_path) / "eclrun") + if runner_abspath is None: + raise RuntimeError("eclrun not installed") + else: + flowrun_path: str = os.environ.get("FLOWRUN_PATH", "") + runner_abspath = shutil.which(Path(flowrun_path) / "flowrun") + if runner_abspath is None: + runner_abspath = shutil.which("flow") + if runner_abspath is None: + raise RuntimeError("flowrun or flow not installed") + else: + if self.num_cpu > 1: + raise RuntimeError( + "MPI runs not supported without a flowrun wrapper" + ) + self.bypass_flowrun = True + self.runner_abspath: str = str(runner_abspath) + + data_file = ecl_case_to_data_file(Path(ecl_case)) + if not Path(data_file).exists(): + raise OSError(f"No such file: {data_file}") + + self.run_path: Path = Path(data_file).parent.absolute() + self.data_file: str = Path(data_file).name + self.base_name: str = Path(data_file).stem + + @property + def prt_path(self) -> Path: + return self.run_path / (self.base_name + ".PRT") + + @property + def eclrun_command(self) -> list[str]: + return [ + self.runner_abspath, + self.simulator, + "--version", + str(self.version), + self.data_file, + "--summary-conversion", + "yes" if self.summary_conversion else "no", + ] + + @property + def flowrun_command(self) -> list[str]: + if self.bypass_flowrun: + return [ + self.runner_abspath, + self.data_file, + ] + return [ + self.runner_abspath, + "--version", + str(self.version), + self.data_file, + "--np", + str(self.num_cpu), + ] + + LICENSE_FAILURE_RETRY_INITIAL_SLEEP = 90 + LICENSE_RETRY_STAGGER_FACTOR = 60 + LICENSE_RETRY_BACKOFF_EXPONENT = 3 + + def run_eclipseX00( + self, retries_left: int = 3, backoff_sleep: float | None = None + ) -> None: + # This function calls itself recursively in case of license failures + backoff_sleep = ( + self.LICENSE_FAILURE_RETRY_INITIAL_SLEEP + if backoff_sleep is None + else backoff_sleep + ) + return_code = subprocess.run(self.eclrun_command, check=False).returncode + + OK_file = self.run_path / f"{self.base_name}.OK" + if not self.check_status: + OK_file.write_text( + "ECLIPSE simulation complete - NOT checked for errors.", + encoding="utf-8", + ) + else: + if return_code != 0: + raise subprocess.CalledProcessError(return_code, self.eclrun_command) + try: + self.assert_eclend() + except EclError as err: + if err.failed_due_to_license_problems() and retries_left > 0: + time_to_wait = backoff_sleep + int( + random() * self.LICENSE_RETRY_STAGGER_FACTOR + ) + sys.stderr.write( + "ECLIPSE failed due to license failure " + f"retrying in {time_to_wait} seconds\n" + ) + time.sleep(time_to_wait) + self.run_eclipseX00( + retries_left=retries_left - 1, + backoff_sleep=int( + backoff_sleep * self.LICENSE_RETRY_BACKOFF_EXPONENT + ), + ) + return + else: + raise err from None + if self.num_cpu > 1: + smry_file = find_unsmry(self.run_path / 
+                if smry_file is not None:
+                    await_completed_unsmry_file(smry_file)
+
+            OK_file.write_text("ECLIPSE simulation OK", encoding="utf-8")
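# [Editor's note - illustration, not part of the new file] With the constants
# above, a run that keeps failing on license errors waits approximately:
#
#   before retry 1:  90 + random() * 60 seconds
#   before retry 2: 270 + random() * 60 seconds   (90 * 3)
#   before retry 3: 810 + random() * 60 seconds   (270 * 3)
#
# before the EclError finally propagates; in expectation about 21 minutes:
expected_total = sum(90 * 3**i for i in range(3)) + 3 * 30  # == 1260 seconds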
+
+    def run_flow(self) -> None:
+        return_code = subprocess.run(self.flowrun_command, check=False).returncode
+        OK_file = self.run_path / f"{self.base_name}.OK"
+        if not self.check_status:
+            OK_file.write_text(
+                "FLOW simulation complete - NOT checked for errors.",
+                encoding="utf-8",
+            )
+        else:
+            if return_code != 0:
+                raise subprocess.CalledProcessError(return_code, self.flowrun_command)
+            self.assert_eclend()
+            if self.num_cpu > 1:
+                smry_file = find_unsmry(self.run_path / self.base_name)
+                if smry_file is not None:
+                    await_completed_unsmry_file(smry_file)
+
+            OK_file.write_text("FLOW simulation OK", encoding="utf-8")
+
+    def assert_eclend(self) -> None:
+        tail_length = 5000
+        result = self.read_eclend()
+        if result.errors > 0:
+            error_list = self.parse_errors()
+            sep = "\n\n...\n\n"
+            error_and_slave_msg = sep.join(error_list)
+            extra_message = ""
+            error_messages = [
+                error for error in error_list if "STARTING SLAVE" not in str(error)
+            ]
+            if result.errors != len(error_messages):
+                extra_message = (
+                    f"\n\nWarning, mismatch between stated Error count ({result.errors}) "
+                    f"and number of ERROR messages found in PRT ({len(error_messages)})."
+                    f"\n\nTail ({tail_length} bytes) of PRT-file {self.prt_path}:\n\n"
+                ) + tail_textfile(self.prt_path, 5000)
+
+            raise EclError(
+                "Eclipse simulation failed with:"
+                f"{result.errors:d} errors:\n\n{error_and_slave_msg}{extra_message}"
+            )
+
+        if result.bugs > 0:
+            raise EclError(f"Eclipse simulation failed with:{result.bugs:d} bugs")
+
+    def read_eclend(self) -> EclipseResult:
+        error_regexp = re.compile(r"^\s*Errors\s+(\d+)\s*$")
+        bug_regexp = re.compile(r"^\s*Bugs\s+(\d+)\s*$")
+
+        report_file = self.run_path / f"{self.base_name}.ECLEND"
+        if not report_file.is_file():
+            report_file = self.prt_path
+
+        errors = None
+        bugs = None
+        with open(report_file, encoding="utf-8") as filehandle:
+            for line in filehandle.readlines():
+                error_match = re.match(error_regexp, line)
+                if error_match:
+                    errors = int(error_match.group(1))
+
+                bug_match = re.match(bug_regexp, line)
+                if bug_match:
+                    bugs = int(bug_match.group(1))
+        if errors is None:
+            raise ValueError(f"Could not read errors from {report_file}")
+        if bugs is None:
+            raise ValueError(f"Could not read bugs from {report_file}")
+
+        return EclipseResult(errors=errors, bugs=bugs)
+
+    def parse_errors(self) -> list[str]:
+        """Extract multiline ERROR messages from the PRT file"""
+        error_list = []
+        error_e100_regexp = re.compile(error_pattern_e100, re.MULTILINE)
+        error_e300_regexp = re.compile(error_pattern_e300, re.MULTILINE)
+        slave_started_regexp = re.compile(slave_started_pattern, re.MULTILINE)
+
+        content = self.prt_path.read_text(encoding="utf-8")
+
+        for regexp in [error_e100_regexp, error_e300_regexp, slave_started_regexp]:
+            offset = 0
+            while True:
+                match = regexp.search(content[offset:])
+                if match:
+                    error_list.append(
+                        content[offset + match.start() : offset + match.end()]
+                    )
+                    offset += match.end()
+                else:
+                    break
+
+        return error_list
+
+
+def tail_textfile(file_path: Path, num_chars: int) -> str:
+    if not file_path.exists():
+        return f"No output file {file_path}"
+    with open(file_path, encoding="utf-8") as file:
+        file.seek(0, 2)
+        file_end_position = file.tell()
+        seek_position = max(0, file_end_position - num_chars)
+        file.seek(seek_position)
+        return file.read()[-num_chars:]
+
+
+def run_reservoirsimulator(args: list[str]) -> None:
+    parser = ArgumentParser()
+    parser.add_argument("simulator", type=str, choices=["flow", "eclipse", "e300"])
+    parser.add_argument("version", type=str)
+    parser.add_argument("ecl_case", type=str)
+    parser.add_argument("-n", "--num-cpu", dest="num_cpu", type=int, default=1)
+    parser.add_argument(
+        "-i", "--ignore-errors", dest="ignore_errors", action="store_true"
+    )
+    parser.add_argument(
+        "--summary-conversion", dest="summary_conversion", action="store_true"
+    )
+
+    options = parser.parse_args(args)
+
+    if options.summary_conversion and options.simulator == "flow":
+        raise RuntimeError("--summary-conversion is not available with simulator flow")
+
+    try:
+        if options.simulator in ["eclipse", "e300"]:
+            RunReservoirSimulator(
+                options.simulator,
+                options.version,
+                options.ecl_case,
+                num_cpu=options.num_cpu,
+                check_status=not options.ignore_errors,
+                summary_conversion=options.summary_conversion,
+            ).run_eclipseX00()
+        else:
+            RunReservoirSimulator(
+                "flow",
+                options.version,
+                options.ecl_case,
+                num_cpu=options.num_cpu,
+                check_status=not options.ignore_errors,
+            ).run_flow()
+
+    except EclError as msg:
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+
+def ecl_case_to_data_file(ecl_case: Path) -> Path:
+    if ecl_case.suffix in [".data", ".DATA"]:
+        return ecl_case
+    elif str(ecl_case).islower():
+        return Path(str(ecl_case) + ".data")
+    else:
+        return Path(str(ecl_case) + ".DATA")
+
+
+if __name__ == "__main__":
+    non_empty_args = list(filter(None, sys.argv))
+    run_reservoirsimulator(non_empty_args[1:])
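[Editor's note] The new script replaces the three thin wrappers (ecl100.py, ecl300.py, flow.py) with a single entry point where the simulator and version are positional arguments, matching the reordered FORWARD_MODEL_STEP commands earlier in this diff. A sketch of the calling convention; the case name and version strings are placeholders:

    # Old style:  ecl100.py <ECLBASE> -v <VERSION> -n <NUM_CPU>
    # New style:  run_reservoirsimulator.py eclipse 2024.1 SPE1.DATA -n 4
    #             run_reservoirsimulator.py flow dummy_version SPE1.DATA
    from run_reservoirsimulator import RunReservoirSimulator

    runner = RunReservoirSimulator("flow", None, "SPE1.DATA")
    runner.run_flow()  # writes SPE1.OK on success, raises EclError on failure
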
diff --git a/tests/ert/unit_tests/config/test_forward_model.py b/tests/ert/unit_tests/config/test_forward_model.py
index 849a86c8362..fdf252ae28a 100644
--- a/tests/ert/unit_tests/config/test_forward_model.py
+++ b/tests/ert/unit_tests/config/test_forward_model.py
@@ -5,7 +5,6 @@
 import stat
 from pathlib import Path
 from textwrap import dedent
-from unittest.mock import patch
 
 import pytest
 from hypothesis import given, settings
@@ -26,23 +25,6 @@
 from .config_dict_generator import config_generators
 
 
-@pytest.fixture()
-def mock_eclrun():
-    with open("eclrun", "w", encoding="utf-8") as f:
-        f.write("""#!/usr/bin/env python\n\nprint("4 2 8")""")
-    os.chmod("eclrun", os.stat("eclrun").st_mode | stat.S_IEXEC)
-    old_path = os.environ["PATH"]
-    eclrun_path = shutil.which("eclrun")
-    if eclrun_path is None:
-        os.environ["PATH"] = old_path + os.pathsep + os.getcwd()
-    else:
-        os.environ["PATH"] = old_path.replace(
-            str(Path(eclrun_path).parent), os.getcwd()
-        )
-    yield
-    os.environ["PATH"] = old_path
-
-
 @pytest.mark.usefixtures("use_tmpdir")
 def test_load_forward_model_raises_on_missing():
     with pytest.raises(ConfigValidationError, match="No such file or directory"):
@@ -575,85 +557,71 @@ def test_that_forward_model_with_different_token_kinds_are_added():
     ] == [("job", 0), ("job", 1)]
 
 
-@pytest.mark.parametrize("eclipse_v", ["100", "300"])
-def test_that_eclipse_jobs_require_version_field(eclipse_v):
+@pytest.mark.parametrize("eclipse_v", ["ECLIPSE100", "ECLIPSE300"])
+def test_that_eclipse_fm_step_require_explicit_version(eclipse_v):
     with pytest.raises(
         ConfigValidationError,
-        match=f".*Forward model step ECLIPSE{eclipse_v} must be given a VERSION argument.*",
+        match=f".*Forward model step {eclipse_v} must be given a VERSION argument.*",
     ):
         _ = ErtConfig.with_plugins().from_file_contents(
             f"""
            NUM_REALIZATIONS 1
-            FORWARD_MODEL ECLIPSE{eclipse_v}
+            FORWARD_MODEL {eclipse_v}
            """
        )
 
 
-@pytest.mark.parametrize("eclipse_v", ["100", "300"])
+@pytest.mark.skipif(shutil.which("eclrun") is None, reason="eclrun is not in $PATH")
+@pytest.mark.parametrize("eclipse_v", ["ECLIPSE100", "ECLIPSE300"])
 @pytest.mark.usefixtures("use_tmpdir")
-def test_that_eclipse_jobs_check_version(eclipse_v, mock_eclrun):
-    ecl100_config_file_name = "ecl100_config.yml"
-    ecl300_config_file_name = "ecl300_config.yml"
-
-    ecl100_config_content = f"eclrun_env:\n  PATH: {os.getcwd()}\n"
-    ecl300_config_content = f"eclrun_env:\n  PATH: {os.getcwd()}\n"
-    ert_config_contents = (
-        f"NUM_REALIZATIONS 1\nFORWARD_MODEL ECLIPSE{eclipse_v} (<VERSION>=1)\n"
+def test_that_eclipse_fm_step_check_version_availability(eclipse_v):
+    config_file_name = "test.ert"
+    Path(config_file_name).write_text(
+        f"NUM_REALIZATIONS 1\nFORWARD_MODEL {eclipse_v}(<VERSION>=dummy)\n",
+        encoding="utf-8",
     )
+    with pytest.raises(
+        ConfigValidationError,
+        match=rf".*Unavailable {eclipse_v} version dummy. Available versions: \[\'20.*",
+    ):
+        ErtConfig.with_plugins().from_file(config_file_name)
 
-    # Write config file
+
+@pytest.mark.parametrize("eclipse_v", ["ECLIPSE100", "ECLIPSE300"])
+@pytest.mark.usefixtures("use_tmpdir")
+def test_that_we_can_point_to_a_custom_eclrun_when_checking_versions(eclipse_v):
+    eclrun_bin = Path("bin/eclrun")
+    eclrun_bin.parent.mkdir()
+    eclrun_bin.write_text("#!/bin/sh\necho 2036.1 2036.2 2037.1", encoding="utf-8")
+    eclrun_bin.chmod(eclrun_bin.stat().st_mode | stat.S_IEXEC)
     config_file_name = "test.ert"
-    Path(config_file_name).write_text(ert_config_contents, encoding="utf-8")
-    # Write ecl100_config file
-    Path(ecl100_config_file_name).write_text(ecl100_config_content, encoding="utf-8")
-    # Write ecl300_config file
-    Path(ecl300_config_file_name).write_text(ecl300_config_content, encoding="utf-8")
-    with patch(
-        "ert.plugins.hook_implementations.forward_model_steps.ErtPluginManager"
-    ) as mock:
-        instance = mock.return_value
-        instance.get_ecl100_config_path.return_value = ecl100_config_file_name
-        instance.get_ecl300_config_path.return_value = ecl300_config_file_name
-        with pytest.raises(
-            ConfigValidationError,
-            match=rf".*Unavailable ECLIPSE{eclipse_v} version 1 current supported versions \['4', '2', '8'\].*",
-        ):
-            _ = ErtConfig.with_plugins().from_file(config_file_name)
+    Path(config_file_name).write_text(
+        dedent(
+            f"""
+            NUM_REALIZATIONS 1
+            SETENV ECLRUN_PATH {eclrun_bin.absolute().parent}
+            FORWARD_MODEL {eclipse_v}(<VERSION>=2034.1)"""
+        ),
+        encoding="utf-8",
+    )
+    with pytest.raises(
+        ConfigValidationError,
+        match=rf".*Unavailable {eclipse_v} version 2034.1. Available versions: \[\'2036.1.*",
+    ):
+        ErtConfig.with_plugins().from_file(config_file_name)
 
 
-@pytest.mark.skipif(shutil.which("eclrun") is not None, reason="eclrun is available")
-@pytest.mark.parametrize("eclipse_v", ["100", "300"])
+@pytest.mark.skipif(shutil.which("eclrun") is not None, reason="eclrun is present")
+@pytest.mark.parametrize("eclipse_v", ["ECLIPSE100", "ECLIPSE300"])
 @pytest.mark.usefixtures("use_tmpdir")
 def test_that_no_error_thrown_when_checking_eclipse_version_and_eclrun_is_not_present(
     eclipse_v,
 ):
     _ = ErtConfig.with_plugins().from_file_contents(
-        f"NUM_REALIZATIONS 1\nFORWARD_MODEL ECLIPSE{eclipse_v} (<VERSION>=1)\n"
+        f"NUM_REALIZATIONS 1\nFORWARD_MODEL {eclipse_v}(<VERSION>=1)\n"
     )
 
 
-@pytest.mark.skipif(shutil.which("eclrun") is not None, reason="eclrun is available")
-@pytest.mark.parametrize("eclipse_v", ["100", "300"])
-@pytest.mark.usefixtures("use_tmpdir")
-def test_that_no_error_thrown_when_checking_eclipse_version_and_no_ecl_config_defined(
-    eclipse_v, mock_eclrun
-):
-    ert_config_contents = (
-        f"NUM_REALIZATIONS 1\nFORWARD_MODEL ECLIPSE{eclipse_v} (<VERSION>=1)\n"
-    )
-
-    # Write config file
-    config_file_name = "test.ert"
-    Path(config_file_name).write_text(ert_config_contents, encoding="utf-8")
-    with patch(
-        "ert.plugins.hook_implementations.forward_model_steps.ErtPluginManager"
-    ) as mock:
-        instance = mock.return_value
-        instance.get_ecl100_config_path.return_value = None
-        instance.get_ecl300_config_path.return_value = None
-        _ = ErtConfig.with_plugins().from_file(config_file_name)
-
-
 def test_that_plugin_forward_models_are_installed(tmp_path):
     (tmp_path / "test.ert").write_text(
         dedent(
@@ -900,7 +868,7 @@ def test_that_plugin_forward_model_raises_pre_experiment_validation_error_early(
 ):
     (tmp_path / "test.ert").write_text(
         """
-        NUM_REALIZATIONS 1 
+        NUM_REALIZATIONS 1
         FORWARD_MODEL FM1(=never,=world,=derpyderp)
         FORWARD_MODEL FM2
         """
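[Editor's note] For end users, the test above boils down to a small config idiom: point ECLRUN_PATH at the directory containing the eclrun wrapper and pin a version explicitly. The path and version below are placeholders, not values from this PR:

    NUM_REALIZATIONS 1
    SETENV ECLRUN_PATH /path/to/dir/containing/eclrun
    FORWARD_MODEL ECLIPSE100(<VERSION>=2024.1)
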
diff --git a/tests/ert/unit_tests/resources/test_ecl_versioning_config.py b/tests/ert/unit_tests/resources/test_ecl_versioning_config.py
deleted file mode 100644
index 9bed502156a..00000000000
--- a/tests/ert/unit_tests/resources/test_ecl_versioning_config.py
+++ /dev/null
@@ -1,203 +0,0 @@
-import inspect
-import os
-import stat
-
-import pytest
-import yaml
-
-from tests.ert.utils import SOURCE_DIR
-
-from ._import_from_location import import_from_location
-
-# import ecl_config and ecl_run.py from ert/forward_models/res/script
-# package-data path which. These are kept out of the ert package to avoid the
-# overhead of importing ert. This is necessary as these may be invoked as a
-# subprocess on each realization.
-
-
-ecl_config = import_from_location(
-    "ecl_config",
-    os.path.join(
-        SOURCE_DIR, "src/ert/resources/forward_models/res/script/ecl_config.py"
-    ),
-)
-
-ecl_run = import_from_location(
-    "ecl_run",
-    os.path.join(SOURCE_DIR, "src/ert/resources/forward_models/res/script/ecl_run.py"),
-)
-
-
-@pytest.mark.usefixtures("use_tmpdir")
-def test_loading_of_eclipse_configurations(monkeypatch):
-    source_file = inspect.getsourcefile(ecl_config.Ecl100Config)
-    assert source_file is not None
-    ecl_config_path = os.path.dirname(source_file)
-    monkeypatch.setenv("ECL100_SITE_CONFIG", "file/does/not/exist")
-    with pytest.raises(OSError):
-        conf = ecl_config.Ecl100Config()
-
-    monkeypatch.setenv(
-        "ECL100_SITE_CONFIG",
-        os.path.join(ecl_config_path, "ecl100_config.yml"),
-    )
-    conf = ecl_config.Ecl100Config()
-    with open("file.yml", "w", encoding="utf-8") as f:
-        f.write("this:\n -should\n-be\ninvalid:yaml?")
-
-    monkeypatch.setenv("ECL100_SITE_CONFIG", "file.yml")
-    with pytest.raises(ValueError, match="Failed parse: file.yml as yaml"):
-        conf = ecl_config.Ecl100Config()
-
-    scalar_exe = "bin/scalar_exe"
-    mpi_exe = "bin/mpi_exe"
-    mpi_run = "bin/mpi_run"
-
-    os.mkdir("bin")
-    for f in ["scalar_exe", "mpi_exe", "mpi_run"]:
-        fname = os.path.join("bin", f)
-        with open(fname, "w", encoding="utf-8") as fh:
-            fh.write("This is an exectable ...")
-
-        os.chmod(fname, stat.S_IEXEC)
-
-    intel_path = "intel"
-    monkeypatch.setenv("ENV1", "A")
-    monkeypatch.setenv("ENV2", "C")
-    mocked_simulator_config = {
-        ecl_config.Keys.env: {"LICENSE_SERVER": "license@company.com"},
-        ecl_config.Keys.versions: {
-            "2015": {
-                ecl_config.Keys.scalar: {ecl_config.Keys.executable: scalar_exe},
-                ecl_config.Keys.mpi: {
-                    ecl_config.Keys.executable: mpi_exe,
-                    ecl_config.Keys.mpirun: mpi_run,
-                    ecl_config.Keys.env: {
-                        "I_MPI_ROOT": "$ENV1:B:$ENV2",
-                        "TEST_VAR": "$ENV1.B.$ENV2 $UNKNOWN_VAR",
-                        "P4_RSHCOMMAND": "",
-                        "LD_LIBRARY_PATH": f"{intel_path}:$LD_LIBRARY_PATH",
-                        "PATH": f"{intel_path}/bin64:$PATH",
-                    },
-                },
-            },
-            "2016": {
-                ecl_config.Keys.scalar: {ecl_config.Keys.executable: "/does/not/exist"},
-                ecl_config.Keys.mpi: {
-                    ecl_config.Keys.executable: "/does/not/exist",
-                    ecl_config.Keys.mpirun: mpi_run,
-                },
-            },
-            "2017": {
-                ecl_config.Keys.mpi: {
-                    ecl_config.Keys.executable: mpi_exe,
-                    ecl_config.Keys.mpirun: "/does/not/exist",
-                }
-            },
-        },
-    }
-
-    with open("file.yml", "w", encoding="utf-8") as filehandle:
-        filehandle.write(yaml.dump(mocked_simulator_config))
-
-    conf = ecl_config.Ecl100Config()
-    # Fails because there is no version 2020
-    with pytest.raises(KeyError):
-        sim = conf.sim("2020")
-
-    # Fails because the 2016 version points to a not existing executable
-    with pytest.raises(OSError):
-        sim = conf.sim("2016")
-
-    # Fails because the 2016 mpi version points to a non existing mpi
-    # executable
-    with pytest.raises(OSError):
-        sim = conf.mpi_sim("2016")
-
-    # Fails because the 2017 mpi version mpirun points to a non existing
-    # mpi executable
-    with pytest.raises(OSError):
-        sim = conf.mpi_sim("2017")
-
-    # Fails because the 2017 scalar version is not registered
-    with pytest.raises(KeyError):
-        sim = conf.sim("2017")
-
-    sim = conf.sim("2015")
-    mpi_sim = conf.mpi_sim("2015")
-
-    # Check that global environment has been propagated down.
-    assert "LICENSE_SERVER" in mpi_sim.env
-
-    # Check replacement of $ENV_VAR in values.
- assert mpi_sim.env["I_MPI_ROOT"] == "A:B:C" - assert mpi_sim.env["TEST_VAR"] == "A.B.C $UNKNOWN_VAR" - assert len(mpi_sim.env) == 1 + 5 - - sim = conf.sim("2015") - assert sim.executable == scalar_exe - assert sim.mpirun is None - - with pytest.raises( - OSError, match="The executable: '/does/not/exist' can not be executed by user" - ): - simulators = conf.simulators() - - simulators = conf.simulators(strict=False) - assert len(simulators) == 2 - - -@pytest.mark.usefixtures("use_tmpdir") -def test_default_version_definitions(monkeypatch): - os.mkdir("bin") - scalar_exe = "bin/scalar_exe" - with open(scalar_exe, "w", encoding="utf-8") as fh: - fh.write("This is an executable ...") - os.chmod(scalar_exe, stat.S_IEXEC) - - mock_dict_0 = { - ecl_config.Keys.versions: { - "2015": {ecl_config.Keys.scalar: {ecl_config.Keys.executable: scalar_exe}}, - "2016": {ecl_config.Keys.scalar: {ecl_config.Keys.executable: scalar_exe}}, - } - } - - mock_dict_1 = { - ecl_config.Keys.default_version: "2015", - ecl_config.Keys.versions: { - "2015": {ecl_config.Keys.scalar: {ecl_config.Keys.executable: scalar_exe}}, - "2016": {ecl_config.Keys.scalar: {ecl_config.Keys.executable: scalar_exe}}, - }, - } - - monkeypatch.setenv("ECL100_SITE_CONFIG", os.path.join("file.yml")) - with open("file.yml", "w", encoding="utf-8") as f: - f.write(yaml.dump(mock_dict_1)) - - conf = ecl_config.Ecl100Config() - sim = conf.sim() - assert sim.version == "2015" - assert "2015" in conf - assert "xxxx" not in conf - assert ecl_config.Keys.default in conf - assert None in conf - - sim = conf.sim("default") - assert sim.version == "2015" - - with open("file.yml", "w", encoding="utf-8") as filehandle: - filehandle.write(yaml.dump(mock_dict_0)) - - conf = ecl_config.Ecl100Config() - assert ecl_config.Keys.default not in conf - assert conf.default_version is None - - with pytest.raises( - ValueError, match="The default version has not been set in the config file" - ): - sim = conf.sim() - - with pytest.raises( - ValueError, match="The default version has not been set in the config file" - ): - sim = conf.sim(ecl_config.Keys.default) diff --git a/tests/ert/unit_tests/resources/test_opm_flow.py b/tests/ert/unit_tests/resources/test_opm_flow.py deleted file mode 100644 index 9d0b125a58b..00000000000 --- a/tests/ert/unit_tests/resources/test_opm_flow.py +++ /dev/null @@ -1,333 +0,0 @@ -import os -import platform -import re -import shutil -from pathlib import Path -from subprocess import CalledProcessError - -import pytest -import yaml - -from tests.ert.utils import SOURCE_DIR - -from ._import_from_location import import_from_location - -# import ecl_config.py and ecl_run from ert/forward-models/res/script -# package-data path which. These are kept out of the ert package to avoid the -# overhead of importing ert. This is necessary as these may be invoked as a -# subprocess on each realization. - -ecl_config = import_from_location( - "ecl_config", - SOURCE_DIR / "src/ert/resources/forward_models/res/script/ecl_config.py", -) - -ecl_run = import_from_location( - "ecl_run", - SOURCE_DIR / "src/ert/resources/forward_models/res/script/ecl_run.py", -) - - -def locate_flow_binary() -> str: - """Locate the path for a flow executable. 
- - Returns the empty string if there is nothing to be found in $PATH.""" - flow_rhel_version = "7" - if "el8" in platform.release(): - flow_rhel_version = "8" - candidates = ["flow", f"/project/res/x86_64_RH_{flow_rhel_version}/bin/flowdaily"] - for candidate in candidates: - foundpath = shutil.which(candidate) - if foundpath is not None: - return foundpath - return "" - - -flow_installed = pytest.mark.skipif( - not locate_flow_binary(), reason="Requires flow to be installed in $PATH" -) - - -def find_version(output): - return re.search(r"flow\s*([\d.]+)", output).group(1) - - -@pytest.fixture(name="init_flow_config") -def fixture_init_flow_config(monkeypatch, tmpdir): - conf = { - "default_version": "default", - "versions": {"default": {"scalar": {"executable": locate_flow_binary()}}}, - } - with tmpdir.as_cwd(): - Path("flow_config.yml").write_text(yaml.dump(conf), encoding="utf-8") - monkeypatch.setenv("FLOW_SITE_CONFIG", "flow_config.yml") - yield - - -def test_ecl_run_make_LSB_MCPU_machine_list(): - assert ecl_run.make_LSB_MCPU_machine_list("host1 4 host2 4") == [ - "host1", - "host1", - "host1", - "host1", - "host2", - "host2", - "host2", - "host2", - ] - - -@pytest.mark.integration_test -@flow_installed -def test_flow(init_flow_config, source_root): - shutil.copy(source_root / "test-data/ert/eclipse/SPE1.DATA", "SPE1.DATA") - shutil.copy( - source_root / "test-data/ert/eclipse/SPE1_ERROR.DATA", "SPE1_ERROR.DATA" - ) - flow_config = ecl_config.FlowConfig() - sim = flow_config.sim() - flow_run = ecl_run.EclRun("SPE1.DATA", sim) - flow_run.runEclipse() - - ecl_run.run(flow_config, ["SPE1.DATA"]) - - flow_run = ecl_run.EclRun("SPE1_ERROR.DATA", sim) - with pytest.raises(CalledProcessError, match="returned non-zero exit status 1"): - flow_run.runEclipse() - - ecl_run.run(flow_config, ["SPE1_ERROR.DATA", "--ignore-errors"]) - - # Invalid version - with pytest.raises(KeyError): - ecl_run.run(flow_config, ["SPE1.DATA", "--version=no/such/version"]) - - -@pytest.mark.integration_test -@flow_installed -def test_flow_with_mpi(init_flow_config, source_root): - """This only tests that ERT will be able to start flow on a data deck with - the PARALLEL keyword present. 
It does not assert anything regarding whether - MPI-parallelization will get into play.""" - shutil.copy( - source_root / "test-data/ert/eclipse/SPE1_PARALLEL.DATA", "SPE1_PARALLEL.DATA" - ) - flow_config = ecl_config.FlowConfig() - sim = flow_config.sim() - flow_run = ecl_run.EclRun("SPE1_PARALLEL.DATA", sim) - flow_run.runEclipse() - - -@pytest.mark.usefixtures("use_tmpdir") -def test_running_flow_given_env_config_can_still_read_parent_env(monkeypatch): - version = "1111.11" - - # create a script that prints env vars ENV1 and ENV2 to a file - with open("mocked_flow", "w", encoding="utf-8") as f: - f.write("#!/bin/sh\n") - f.write("echo $ENV1 > out.txt\n") - f.write("echo $ENV2 >> out.txt\n") - executable = os.path.join(os.getcwd(), "mocked_flow") - os.chmod(executable, 0o777) - - # create a flow_config.yml with environment extension ENV2 - conf = { - "default_version": version, - "versions": { - version: { - "scalar": {"executable": executable, "env": {"ENV2": "VAL2"}}, - } - }, - } - - with open("flow_config.yml", "w", encoding="utf-8") as filehandle: - filehandle.write(yaml.dump(conf)) - - # set the environment variable ENV1 - monkeypatch.setenv("ENV1", "VAL1") - monkeypatch.setenv("FLOW_SITE_CONFIG", "flow_config.yml") - - with open("DUMMY.DATA", "w", encoding="utf-8") as filehandle: - filehandle.write("dummy") - - with open("DUMMY.PRT", "w", encoding="utf-8") as filehandle: - filehandle.write("Errors 0\n") - filehandle.write("Bugs 0\n") - - # run the script - flow_config = ecl_config.FlowConfig() - sim = flow_config.sim() - flow_run = ecl_run.EclRun("DUMMY.DATA", sim) - flow_run.runEclipse() - - # assert that the script was able to read both the variables correctly - with open("out.txt", encoding="utf-8") as filehandle: - lines = filehandle.readlines() - - assert lines == ["VAL1\n", "VAL2\n"] - - -@pytest.mark.usefixtures("use_tmpdir") -def test_running_flow_given_no_env_config_can_still_read_parent_env(monkeypatch): - version = "1111.11" - - # create a script that prints env vars ENV1 and ENV2 to a file - with open("flow", "w", encoding="utf-8") as f: - f.write("#!/bin/sh\n") - f.write("echo $ENV1 > out.txt\n") - f.write("echo $ENV2 >> out.txt\n") - executable = os.path.join(os.getcwd(), "flow") - os.chmod(executable, 0o777) - - # create a flow_config.yml with environment extension ENV2 - conf = { - "default_version": version, - "versions": { - version: { - "scalar": {"executable": executable}, - } - }, - } - - with open("flow_config.yml", "w", encoding="utf-8") as filehandle: - filehandle.write(yaml.dump(conf)) - - # set the environment variable ENV1 - monkeypatch.setenv("ENV1", "VAL1") - monkeypatch.setenv("ENV2", "VAL2") - monkeypatch.setenv("FLOW_SITE_CONFIG", "flow_config.yml") - - with open("DUMMY.DATA", "w", encoding="utf-8") as filehandle: - filehandle.write("dummy") - - with open("DUMMY.PRT", "w", encoding="utf-8") as filehandle: - filehandle.write("Errors 0\n") - filehandle.write("Bugs 0\n") - - # run the script - flow_config = ecl_config.FlowConfig() - sim = flow_config.sim() - flow_run = ecl_run.EclRun("DUMMY.DATA", sim) - flow_run.runEclipse() - - # assert that the script was able to read both the variables correctly - with open("out.txt", encoding="utf-8") as filehandle: - lines = filehandle.readlines() - - assert lines == ["VAL1\n", "VAL2\n"] - - -@pytest.mark.usefixtures("use_tmpdir") -def test_running_flow_given_env_variables_with_same_name_as_parent_env_variables_will_overwrite( - monkeypatch, -): - version = "1111.11" - - # create a script that prints env vars 
ENV1 and ENV2 to a file - with open("flow", "w", encoding="utf-8") as filehandle: - filehandle.write("#!/bin/sh\n") - filehandle.write("echo $ENV1 > out.txt\n") - filehandle.write("echo $ENV2 >> out.txt\n") - executable = os.path.join(os.getcwd(), "flow") - os.chmod(executable, 0o777) - - # create a flow_config.yml with environment extension ENV2 - conf = { - "default_version": version, - "versions": { - version: { - "scalar": { - "executable": executable, - "env": {"ENV1": "OVERWRITTEN1", "ENV2": "OVERWRITTEN2"}, - }, - } - }, - } - - with open("flow_config.yml", "w", encoding="utf-8") as filehandle: - filehandle.write(yaml.dump(conf)) - - # set the environment variable ENV1 - monkeypatch.setenv("ENV1", "VAL1") - monkeypatch.setenv("ENV2", "VAL2") - monkeypatch.setenv("FLOW_SITE_CONFIG", "flow_config.yml") - - with open("DUMMY.DATA", "w", encoding="utf-8") as filehandle: - filehandle.write("dummy") - - with open("DUMMY.PRT", "w", encoding="utf-8") as filehandle: - filehandle.write("Errors 0\n") - filehandle.write("Bugs 0\n") - - # run the script - flow_config = ecl_config.FlowConfig() - sim = flow_config.sim() - flow_run = ecl_run.EclRun("DUMMY.DATA", sim) - flow_run.runEclipse() - - # assert that the script was able to read both the variables correctly - with open("out.txt", encoding="utf-8") as filehandle: - lines = filehandle.readlines() - - assert lines == ["OVERWRITTEN1\n", "OVERWRITTEN2\n"] - - -def test_slurm_env_parsing(): - host_list = ecl_run.make_SLURM_machine_list("ws", "2") - assert host_list == ["ws", "ws"] - - host_list = ecl_run.make_SLURM_machine_list("ws1,ws2", "2,3") - assert host_list == ["ws1", "ws1", "ws2", "ws2", "ws2"] - - host_list = ecl_run.make_SLURM_machine_list("ws[1-3]", "1,2,3") - assert host_list == ["ws1", "ws2", "ws2", "ws3", "ws3", "ws3"] - - host_list = ecl_run.make_SLURM_machine_list("ws[1,3]", "1,3") - assert host_list == ["ws1", "ws3", "ws3", "ws3"] - - host_list = ecl_run.make_SLURM_machine_list("ws[1-3,6-8]", "1,2,3,1,2,3") - assert host_list == [ - "ws1", - "ws2", - "ws2", - "ws3", - "ws3", - "ws3", - "ws6", - "ws7", - "ws7", - "ws8", - "ws8", - "ws8", - ] - - host_list = ecl_run.make_SLURM_machine_list("ws[1-3,6-8]", "2(x2),3,1,2(x2)") - assert host_list == [ - "ws1", - "ws1", - "ws2", - "ws2", - "ws3", - "ws3", - "ws3", - "ws6", - "ws7", - "ws7", - "ws8", - "ws8", - ] - - host_list = ecl_run.make_SLURM_machine_list("ws[1-3,6],ws[7-8]", "2(x2),3,1,2(x2)") - assert host_list == [ - "ws1", - "ws1", - "ws2", - "ws2", - "ws3", - "ws3", - "ws3", - "ws6", - "ws7", - "ws7", - "ws8", - "ws8", - ] diff --git a/tests/ert/unit_tests/resources/test_ecl_run_new_config.py b/tests/ert/unit_tests/resources/test_run_eclipse_simulator.py similarity index 71% rename from tests/ert/unit_tests/resources/test_ecl_run_new_config.py rename to tests/ert/unit_tests/resources/test_run_eclipse_simulator.py index 7f2acde2b1e..a093ca1bb1d 100644 --- a/tests/ert/unit_tests/resources/test_ecl_run_new_config.py +++ b/tests/ert/unit_tests/resources/test_run_eclipse_simulator.py @@ -1,12 +1,8 @@ -import inspect -import json import os import re import shutil -import stat import subprocess import sys -import textwrap import threading import time from pathlib import Path @@ -15,148 +11,103 @@ import numpy as np import pytest import resfo -import yaml +from ert.plugins import ErtPluginManager from tests.ert.utils import SOURCE_DIR from ._import_from_location import import_from_location -# import ecl_config.py and ecl_run from ert/resources/forward_models/res/script -# package-data 
path which. These are kept out of the ert package to avoid the +# import run_reservoirsimulator from ert/resources/forward_models +# package-data. This is kept out of the ert package to avoid the # overhead of importing ert. This is necessary as these may be invoked as a # subprocess on each realization. -ecl_config = import_from_location( - "ecl_config", - os.path.join( - SOURCE_DIR, "src/ert/resources/forward_models/res/script/ecl_config.py" - ), -) - -ecl_run = import_from_location( - "ecl_run", - os.path.join(SOURCE_DIR, "src/ert/resources/forward_models/res/script/ecl_run.py"), +run_reservoirsimulator = import_from_location( + "run_reservoirsimulator", + SOURCE_DIR / "src/ert/resources/forward_models/run_reservoirsimulator.py", ) -def find_version(output): - return re.search(r"flow\s*([\d.]+)", output).group(1) - - -@pytest.fixture -def eclrun_conf(): - return { - "eclrun_env": { - "SLBSLS_LICENSE_FILE": "7321@eclipse-lic-no.statoil.no", - "ECLPATH": "/prog/res/ecl/grid", - "PATH": "/prog/res/ecl/grid/macros", - "F_UFMTENDIAN": "big", - "LSB_JOBID": None, - } - } - - -@pytest.fixture -def init_eclrun_config(tmp_path, monkeypatch, eclrun_conf): - with open(tmp_path / "ecl100_config.yml", "w", encoding="utf-8") as f: - f.write(yaml.dump(eclrun_conf)) - monkeypatch.setenv("ECL100_SITE_CONFIG", "ecl100_config.yml") - - -def test_get_version_raise(): - econfig = ecl_config.Ecl100Config() - class_file = inspect.getfile(ecl_config.Ecl100Config) - class_dir = os.path.dirname(os.path.abspath(class_file)) - msg = os.path.join(class_dir, "ecl100_config.yml") - with pytest.raises(ValueError, match=msg): - econfig._get_version(None) - - -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") -def test_eclrun_will_prepend_path_and_get_env_vars_from_ecl100config( - eclrun_conf, -): - # GIVEN a mocked eclrun that only dumps it env variables - Path("eclrun").write_text( - textwrap.dedent( - """\ - #!/usr/bin/env python - import os - import json - with open("env.json", "w") as f: - json.dump(dict(os.environ), f) - """ - ), - encoding="utf-8", - ) - os.chmod("eclrun", os.stat("eclrun").st_mode | stat.S_IEXEC) - Path("DUMMY.DATA").write_text("", encoding="utf-8") - econfig = ecl_config.Ecl100Config() - eclrun_config = ecl_config.EclrunConfig(econfig, "dummyversion") - erun = ecl_run.EclRun("DUMMY", None, check_status=False) - with mock.patch.object( - erun, "_get_run_command", mock.MagicMock(return_value="./eclrun") +@pytest.fixture(name="e100_env") +def e100_env(monkeypatch): + for var, value in ( + ErtPluginManager() + .get_forward_model_configuration() + .get("ECLIPSE100", {}) + .items() ): - # WHEN eclrun is run - erun.runEclipse(eclrun_config=eclrun_config) - - # THEN the env provided to eclrun is the same - # as the env from ecl_config, but PATH has been - # prepended with the value from ecl_config - with open("env.json", encoding="utf-8") as f: - run_env = json.load(f) - - expected_eclrun_env = eclrun_conf["eclrun_env"] - for key, value in expected_eclrun_env.items(): - if value is None: - assert key not in run_env - continue # Typically LSB_JOBID - - if key == "PATH": - assert run_env[key].startswith(value) - else: - assert value == run_env[key] + monkeypatch.setenv(var, value) + yield @pytest.mark.integration_test -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") @pytest.mark.requires_eclipse def test_ecl100_binary_can_produce_output(source_root): shutil.copy( source_root / "test-data/ert/eclipse/SPE1.DATA", "SPE1.DATA", ) - 
econfig = ecl_config.Ecl100Config() - erun = ecl_run.EclRun("SPE1.DATA", None) - erun.runEclipse(eclrun_config=ecl_config.EclrunConfig(econfig, "2019.3")) + erun = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "2019.3", "SPE1.DATA" + ) + erun.run_eclipseX00() - ok_path = Path(erun.runPath()) / f"{erun.baseName()}.OK" - prt_path = Path(erun.runPath()) / f"{erun.baseName()}.PRT" + ok_path = Path("SPE1.OK") + prt_path = Path("SPE1.PRT") assert ok_path.exists() assert prt_path.stat().st_size > 0 - assert len(erun.parseErrors()) == 0 + assert len(erun.parse_errors()) == 0 + + assert not Path("SPE1.h5").exists(), "HDF conversion should not be run by default" @pytest.mark.integration_test +@pytest.mark.usefixtures("use_tmpdir", "e100_env") @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") -def test_forward_model_cmd_line_api_works(source_root): - # ecl_run.run() is the forward model wrapper around ecl_run.runEclipse() +def test_ecl100_binary_can_handle_extra_dots_in_casename(source_root): + """There is code dealing with file extensions in the Eclipse runner + so it better be tested to work as expected.""" + shutil.copy( + source_root / "test-data/ert/eclipse/SPE1.DATA", + "SPE1.DOT.DATA", + ) + + erun = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "2019.3", "SPE1.DOT.DATA" + ) + erun.run_eclipseX00() + + ok_path = Path("SPE1.DOT.OK") + prt_path = Path("SPE1.DOT.PRT") + + assert ok_path.exists() + assert prt_path.stat().st_size > 0 + assert len(erun.parse_errors()) == 0 + + +@pytest.mark.integration_test +@pytest.mark.requires_eclipse +@pytest.mark.usefixtures("use_tmpdir", "e100_env") +def test_runeclrun_argparse_api(source_root): + # Todo: avoid actually running Eclipse here, use a mock + # Also test the other command line options. 
shutil.copy( source_root / "test-data/ert/eclipse/SPE1.DATA", "SPE1.DATA", ) - ecl_run.run(ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3"]) + run_reservoirsimulator.run_reservoirsimulator(["eclipse", "2019.3", "SPE1.DATA"]) + assert Path("SPE1.OK").exists() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_eclrun_when_unsmry_is_ambiguous(source_root): shutil.copy( source_root / "test-data/ert/eclipse/SPE1.DATA", @@ -165,13 +116,13 @@ def test_eclrun_when_unsmry_is_ambiguous(source_root): # Mock files from another existing run Path("PREVIOUS_SPE1.SMSPEC").touch() Path("PREVIOUS_SPE1.UNSMRY").touch() - ecl_run.run(ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3"]) + run_reservoirsimulator.run_reservoirsimulator(["eclipse", "2019.3", "SPE1.DATA"]) assert Path("SPE1.OK").exists() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_eclrun_when_unsmry_is_ambiguous_with_mpi(source_root): deck = (source_root / "test-data/ert/eclipse/SPE1.DATA").read_text(encoding="utf-8") deck = deck.replace("TITLE", "PARALLEL\n 2 /\n\nTITLE") @@ -179,75 +130,72 @@ def test_eclrun_when_unsmry_is_ambiguous_with_mpi(source_root): # Mock files from another existing run Path("PREVIOUS_SPE1.SMSPEC").touch() Path("PREVIOUS_SPE1.UNSMRY").touch() - ecl_run.run( - ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3", "--num-cpu=2"] + run_reservoirsimulator.run_reservoirsimulator( + ["eclipse", "2019.3", "SPE1.DATA", "--num-cpu=2"] ) assert Path("SPE1.OK").exists() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_ecl_run_on_parallel_deck(source_root): deck = (source_root / "test-data/ert/eclipse/SPE1.DATA").read_text(encoding="utf-8") deck = deck.replace("TITLE", "PARALLEL\n 2 /\n\nTITLE") Path("SPE1.DATA").write_text(deck, encoding="utf-8") - ecl_run.run( - ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3", "--num-cpu=2"] + run_reservoirsimulator.run_reservoirsimulator( + ["eclipse", "2019.3", "SPE1.DATA", "--num-cpu=2"] ) assert Path("SPE1.OK").exists() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_eclrun_on_nosim(source_root): deck = (source_root / "test-data/ert/eclipse/SPE1.DATA").read_text(encoding="utf-8") deck = deck.replace("TITLE", "NOSIM\n\nTITLE") Path("SPE1.DATA").write_text(deck, encoding="utf-8") - ecl_run.run(ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3"]) + run_reservoirsimulator.run_reservoirsimulator(["eclipse", "2019.3", "SPE1.DATA"]) assert Path("SPE1.OK").exists() assert not Path("SPE1.UNSMRY").exists() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_eclrun_on_nosim_with_existing_unsmry_file(source_root): """This emulates users rerunning Eclipse in an existing runpath""" deck = (source_root / "test-data/ert/eclipse/SPE1.DATA").read_text(encoding="utf-8") deck = deck.replace("TITLE", "NOSIM\n\nTITLE") Path("SPE1.UNSMRY").write_text("", encoding="utf-8") Path("SPE1.DATA").write_text(deck, 
encoding="utf-8") - ecl_run.run(ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3"]) + run_reservoirsimulator.run_reservoirsimulator(["eclipse", "2019.3", "SPE1.DATA"]) assert Path("SPE1.OK").exists() -@pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") -def test_await_completed_summary_file_times_out_on_nosim_with_mpi(source_root): - minimum_duration = 15 # This is max_wait in the await function tested +@pytest.mark.usefixtures("use_tmpdir", "e100_env") +def test_await_completed_summary_file_does_not_time_out_on_nosim_with_mpi(source_root): deck = (source_root / "test-data/ert/eclipse/SPE1.DATA").read_text(encoding="utf-8") deck = deck.replace("TITLE", "NOSIM\n\nPARALLEL\n 2 /\n\nTITLE") Path("SPE1.DATA").write_text(deck, encoding="utf-8") - start_time = time.time() - ecl_run.run( - ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3", "--num-cpu=2"] + run_reservoirsimulator.run_reservoirsimulator( + ["eclipse", "2019.3", "SPE1.DATA", "--num-cpu=2"] ) - end_time = time.time() assert Path("SPE1.OK").exists() assert not Path( "SPE1.UNSMRY" ).exists(), "A nosim run should not produce an unsmry file" - assert ( - end_time - start_time > minimum_duration - ), "timeout in await_completed not triggered" + # The timeout will not happen since find_unsmry() returns None. + + # There is no assert on runtime because we cannot predict how long the Eclipse license + # checkout takes. @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_eclrun_on_nosim_with_mpi_and_existing_unsmry_file(source_root): """This emulates users rerunning Eclipse in an existing runpath, with MPI. @@ -258,8 +206,8 @@ def test_eclrun_on_nosim_with_mpi_and_existing_unsmry_file(source_root): deck = deck.replace("TITLE", "NOSIM\n\nPARALLEL\n 2 /\n\nTITLE") Path("SPE1.UNSMRY").write_text("", encoding="utf-8") Path("SPE1.DATA").write_text(deck, encoding="utf-8") - ecl_run.run( - ecl_config.Ecl100Config(), ["SPE1.DATA", "--version=2019.3", "--num-cpu=2"] + run_reservoirsimulator.run_reservoirsimulator( + ["eclipse", "2019.3", "SPE1.DATA", "--num-cpu=2"] ) # There is no assert on runtime because we cannot predict how long the Eclipse license # checkout takes, otherwise we should assert that there is no await for unsmry completion. 
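For orientation between these hunks: several of the rewritten tests below exercise await_completed_unsmry_file, which the runner uses to wait for a summary file that a (possibly MPI-spawned) simulator process is still writing. A minimal sketch of that polling idea, assuming resfo can read a partially written UNSMRY file and that a stable record count across two consecutive polls is a good-enough completion heuristic; the real logic in run_reservoirsimulator.py may differ in detail:

    import time

    import resfo


    def await_completed_unsmry_file(
        smry_path: str, max_wait: float = 15.0, poll_interval: float = 1.0
    ) -> float:
        """Poll smry_path until it looks complete or max_wait elapses.

        Returns the number of seconds spent waiting."""
        start = time.time()
        prev_count = -1
        while time.time() - start < max_wait:
            try:
                count = len(resfo.read(smry_path))
            except Exception:
                count = -1  # Missing file, or a write is in progress
            if count > 0 and count == prev_count:
                break  # Two identical polls in a row: assume writing is done
            prev_count = count
            time.sleep(poll_interval)
        return time.time() - start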
@@ -268,77 +216,71 @@ def test_eclrun_on_nosim_with_mpi_and_existing_unsmry_file(source_root): @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_eclrun_will_raise_on_deck_errors(source_root): shutil.copy( source_root / "test-data/ert/eclipse/SPE1_ERROR.DATA", "SPE1_ERROR.DATA", ) - econfig = ecl_config.Ecl100Config() - eclrun_config = ecl_config.EclrunConfig(econfig, "2019.3") - erun = ecl_run.EclRun("SPE1_ERROR", None) + erun = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "2019.3", "SPE1_ERROR" + ) with pytest.raises(Exception, match="ERROR"): - erun.runEclipse(eclrun_config=eclrun_config) + erun.run_eclipseX00() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") -def test_failed_run_nonzero_returncode(monkeypatch): - Path("FOO.DATA").write_text("", encoding="utf-8") - econfig = ecl_config.Ecl100Config() - eclrun_config = ecl_config.EclrunConfig(econfig, "2021.3") - erun = ecl_run.EclRun("FOO.DATA", None) - monkeypatch.setattr("ecl_run.EclRun.execEclipse", mock.MagicMock(return_value=1)) +@pytest.mark.usefixtures("use_tmpdir", "e100_env") +def test_failed_run_gives_nonzero_returncode_and_exception(monkeypatch): + deck = Path("MOCKED_DECK.DATA") + deck.touch() + erun = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummy_version", deck.name + ) + return_value_with_code = mock.MagicMock() + return_value_with_code.returncode = 1 + monkeypatch.setattr( + "subprocess.run", mock.MagicMock(return_value=return_value_with_code) + ) with pytest.raises( # The return code 1 is sometimes translated to 255. subprocess.CalledProcessError, match=r"Command .*eclrun.* non-zero exit status (1|255)\.$", ): - erun.runEclipse(eclrun_config=eclrun_config) + erun.run_eclipseX00() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_deck_errors_can_be_ignored(source_root): shutil.copy( source_root / "test-data/ert/eclipse/SPE1_ERROR.DATA", "SPE1_ERROR.DATA", ) - econfig = ecl_config.Ecl100Config() - ecl_run.run(econfig, ["SPE1_ERROR", "--version=2019.3", "--ignore-errors"]) - - -@pytest.mark.integration_test -@pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") -def test_no_hdf5_output_by_default_with_ecl100(source_root): - shutil.copy( - source_root / "test-data/ert/eclipse/SPE1.DATA", - "SPE1.DATA", + run_reservoirsimulator.run_reservoirsimulator( + ["eclipse", "2019.3", "SPE1_ERROR.DATA", "--ignore-errors"] ) - econfig = ecl_config.Ecl100Config() - ecl_run.run(econfig, ["SPE1.DATA", "--version=2019.3"]) - assert not Path("SPE1.h5").exists() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_flag_needed_to_produce_hdf5_output_with_ecl100(source_root): shutil.copy( source_root / "test-data/ert/eclipse/SPE1.DATA", "SPE1.DATA", ) - econfig = ecl_config.Ecl100Config() - ecl_run.run(econfig, ["SPE1.DATA", "--version=2019.3", "--summary-conversion"]) + run_reservoirsimulator.run_reservoirsimulator( + ["eclipse", "2019.3", "SPE1.DATA", "--summary-conversion"] + ) assert Path("SPE1.h5").exists() @pytest.mark.integration_test @pytest.mark.requires_eclipse -@pytest.mark.usefixtures("use_tmpdir", 
"init_eclrun_config") +@pytest.mark.usefixtures("use_tmpdir", "e100_env") def test_mpi_run_is_managed_by_system_tool(source_root): shutil.copy( source_root / "test-data/ert/eclipse/SPE1_PARALLEL.DATA", @@ -347,8 +289,9 @@ def test_mpi_run_is_managed_by_system_tool(source_root): assert re.findall( r"PARALLEL\s+2", Path("SPE1_PARALLEL.DATA").read_text(encoding="utf-8") ), "Test requires a deck needing 2 CPUs" - econfig = ecl_config.Ecl100Config() - ecl_run.run(econfig, ["SPE1_PARALLEL.DATA", "--version=2019.3"]) + run_reservoirsimulator.run_reservoirsimulator( + ["eclipse", "2019.3", "SPE1_PARALLEL.DATA"] + ) assert Path("SPE1_PARALLEL.PRT").stat().st_size > 0, "Eclipse did not run at all" assert Path("SPE1_PARALLEL.MSG").exists(), "No output from MPI process 1" @@ -387,17 +330,18 @@ def test_find_unsmry(paths_to_touch, basepath, expectation): Path(path).touch() if expectation == "ValueError": with pytest.raises(ValueError): - ecl_run.find_unsmry(Path(basepath)) + run_reservoirsimulator.find_unsmry(Path(basepath)) elif expectation is None: - assert ecl_run.find_unsmry(Path(basepath)) is None + assert run_reservoirsimulator.find_unsmry(Path(basepath)) is None else: - assert str(ecl_run.find_unsmry(Path(basepath))) == expectation + assert str(run_reservoirsimulator.find_unsmry(Path(basepath))) == expectation +@pytest.mark.usefixtures("use_tmpdir") def test_await_completed_summary_file_will_timeout_on_missing_smry(): assert ( # Expected wait time is 0.3 - ecl_run.await_completed_unsmry_file( + run_reservoirsimulator.await_completed_unsmry_file( "SPE1.UNSMRY", max_wait=0.3, poll_interval=0.1 ) > 0.3 @@ -410,7 +354,7 @@ def test_await_completed_summary_file_will_return_asap(): assert ( 0.01 # Expected wait time is the poll_interval - < ecl_run.await_completed_unsmry_file( + < run_reservoirsimulator.await_completed_unsmry_file( "FOO.UNSMRY", max_wait=0.5, poll_interval=0.1 ) < 0.4 @@ -441,7 +385,7 @@ def slow_smry_writer(): assert ( 0.5 # Minimal wait time is around 0.55 - < ecl_run.await_completed_unsmry_file( + < run_reservoirsimulator.await_completed_unsmry_file( "FOO.UNSMRY", max_wait=4, poll_interval=0.21 ) < 2 @@ -450,6 +394,7 @@ def slow_smry_writer(): @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_ecl100_license_error_is_caught(): prt_error = """\ @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): @@ -472,13 +417,16 @@ def test_ecl100_license_error_is_caught(): Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") Path("FOO.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "FOO.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert exception_info.value.failed_due_to_license_problems() @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_ecl100_crash_is_not_mistaken_as_license_trouble(): prt_error = """\ @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): @@ -499,13 +447,16 @@ def test_ecl100_crash_is_not_mistaken_as_license_trouble(): Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") Path("FOO.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "FOO.DATA" + ) + with 
pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert not exception_info.value.failed_due_to_license_problems() @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_ecl300_license_error_is_caught(): prt_error = """\ @--Message:The message service has been activated @@ -534,13 +485,16 @@ def test_ecl300_license_error_is_caught(): Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") Path("FOO.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "e300", "dummyversion", "FOO.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert exception_info.value.failed_due_to_license_problems() @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_ecl300_crash_is_not_mistaken_as_license_trouble(): prt_error = """\ @--Message:The message service has been activated @@ -566,13 +520,16 @@ def test_ecl300_crash_is_not_mistaken_as_license_trouble(): Path("FOO.ECLEND").write_text(eclend, encoding="utf-8") Path("FOO.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("FOO.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "e300", "dummyversion", "FOO.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert not exception_info.value.failed_due_to_license_problems() @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_license_error_in_slave_is_caught(): """If a coupled Eclipse model fails in one of the slave runs due to license issues, there is no trace of licence in the master PRT file. 
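The docstring above states the crux: for coupled (master/slave) models, a license failure in a slave leaves no trace in the master PRT file, so error scanning must descend into the slave run directories as well. A rough sketch of that scanning idea (the function name and the "LICENSE" marker string are illustrative assumptions, not the runner's actual API):

    from pathlib import Path


    def any_license_complaint(master_data_file: Path) -> bool:
        """Scan the master PRT plus all PRT files one directory level down,
        where coupled slave runs (slave1/, slave2/, ...) write their output."""
        run_dir = master_data_file.parent.resolve()
        prt_files = [master_data_file.with_suffix(".PRT")]
        prt_files += sorted(run_dir.glob("*/*.PRT"))  # slave directories
        return any(
            "LICENSE" in prt.read_text(encoding="utf-8", errors="ignore").upper()
            for prt in prt_files
            if prt.exists()
        )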
@@ -626,13 +583,16 @@ def test_license_error_in_slave_is_caught(): Path("slave2/EIGHTCELLS_SLAVE.PRT").write_text(slave_prt_error, encoding="utf-8") Path("EIGHTCELLS_MASTER.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("EIGHTCELLS_MASTER.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "EIGHTCELLS_MASTER.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert exception_info.value.failed_due_to_license_problems() @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_crash_in_slave_is_not_mistaken_as_license(): Path("slave1").mkdir() Path("slave2").mkdir() @@ -675,13 +635,16 @@ def test_crash_in_slave_is_not_mistaken_as_license(): Path("slave2/EIGHTCELLS_SLAVE.PRT").write_text(slave_prt_error, encoding="utf-8") Path("EIGHTCELLS_MASTER.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("EIGHTCELLS_MASTER.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "EIGHTCELLS_MASTER.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert not exception_info.value.failed_due_to_license_problems() @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_too_few_parsed_error_messages_gives_warning(): prt_error = """\ @--MESSAGE AT TIME 0.0 DAYS ( 1-JAN-2000): @@ -699,13 +662,16 @@ def test_too_few_parsed_error_messages_gives_warning(): Path("ECLCASE.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "ECLCASE.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert "Warning, mismatch between stated Error count" in str(exception_info.value) @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_tail_of_prt_file_is_included_when_error_count_inconsistency(): prt_error = ( "this_should_not_be_included " @@ -729,14 +695,17 @@ def test_tail_of_prt_file_is_included_when_error_count_inconsistency(): Path("ECLCASE.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "ECLCASE.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert "this_should_be_included" in str(exception_info.value) assert "this_should_not_be_included" not in str(exception_info.value) @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_correct_number_of_parsed_error_messages_gives_no_warning(): prt_error = """\ @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): @@ -754,15 +723,18 @@ def test_correct_number_of_parsed_error_messages_gives_no_warning(): Path("ECLCASE.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", 
"dummyversion", "ECLCASE.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert "Warning, mismatch between stated Error count" not in str( exception_info.value ) @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse def test_slave_started_message_are_not_counted_as_errors(): prt_error = f"""\ @-- ERROR AT TIME 0.0 DAYS ( 1-JAN-2000): @@ -785,9 +757,11 @@ def test_slave_started_message_are_not_counted_as_errors(): Path("ECLCASE.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") - with pytest.raises(ecl_run.EclError) as exception_info: - run.assertECLEND() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "ECLCASE.DATA" + ) + with pytest.raises(run_reservoirsimulator.EclError) as exception_info: + run.assert_eclend() assert "Warning, mismatch between stated Error count" not in str( exception_info.value ) @@ -815,6 +789,7 @@ def test_slave_started_message_are_not_counted_as_errors(): @pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.requires_eclipse @pytest.mark.parametrize( "prt_error, expected_error_list", [ @@ -855,6 +830,8 @@ def test_can_parse_errors(prt_error, expected_error_list): Path("ECLCASE.DATA").write_text("", encoding="utf-8") - run = ecl_run.EclRun("ECLCASE.DATA", "dummysimulatorobject") - error_list = run.parseErrors() + run = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "dummyversion", "ECLCASE.DATA" + ) + error_list = run.parse_errors() assert error_list == expected_error_list diff --git a/tests/ert/unit_tests/resources/test_run_flow_simulator.py b/tests/ert/unit_tests/resources/test_run_flow_simulator.py new file mode 100644 index 00000000000..181fc1f8013 --- /dev/null +++ b/tests/ert/unit_tests/resources/test_run_flow_simulator.py @@ -0,0 +1,126 @@ +import os +import shutil +import stat +from pathlib import Path +from subprocess import CalledProcessError + +import pytest + +from tests.ert.utils import SOURCE_DIR + +from ._import_from_location import import_from_location + +# import code from ert/forward_models package-data path. +# These are kept out of the ert package to avoid the overhead of +# importing ert. This is necessary as these may be invoked as a subprocess on +# each realization. 
+
+run_reservoirsimulator = import_from_location(
+    "run_reservoirsimulator",
+    SOURCE_DIR / "src/ert/resources/forward_models/run_reservoirsimulator.py",
+)
+
+FLOW_VERSION = "daily"
+
+
+@pytest.mark.integration_test
+@pytest.mark.skipif(not shutil.which("flowrun"), reason="flowrun not available")
+def test_flow_can_produce_output(source_root):
+    shutil.copy(source_root / "test-data/ert/eclipse/SPE1.DATA", "SPE1.DATA")
+    run_reservoirsimulator.RunReservoirSimulator(
+        "flow", FLOW_VERSION, "SPE1.DATA"
+    ).run_flow()
+    assert Path("SPE1.UNSMRY").exists()
+
+
+def test_flowrun_can_be_bypassed_when_flow_is_available(tmp_path, monkeypatch):
+    # Set FLOWRUN_PATH to a path guaranteed not to contain flowrun
+    monkeypatch.setenv("FLOWRUN_PATH", str(tmp_path))
+    # Add a mocked flow to PATH
+    monkeypatch.setenv("PATH", f"{tmp_path}:{os.environ['PATH']}")
+    mocked_flow = Path(tmp_path / "flow")
+    mocked_flow.write_text("", encoding="utf-8")
+    mocked_flow.chmod(mocked_flow.stat().st_mode | stat.S_IEXEC)
+    (tmp_path / "DUMMY.DATA").write_text("", encoding="utf-8")
+    runner = run_reservoirsimulator.RunReservoirSimulator(
+        "flow", None, str(tmp_path / "DUMMY.DATA")
+    )
+    assert runner.bypass_flowrun is True
+
+
+def test_flowrun_cannot_be_bypassed_for_parallel_runs(tmp_path, monkeypatch):
+    # Set FLOWRUN_PATH to a path guaranteed not to contain flowrun
+    monkeypatch.setenv("FLOWRUN_PATH", str(tmp_path))
+    # Add a mocked flow to PATH
+    monkeypatch.setenv("PATH", f"{tmp_path}:{os.environ['PATH']}")
+    mocked_flow = Path(tmp_path / "flow")
+    mocked_flow.write_text("", encoding="utf-8")
+    mocked_flow.chmod(mocked_flow.stat().st_mode | stat.S_IEXEC)
+
+    with pytest.raises(
+        RuntimeError, match="MPI runs not supported without a flowrun wrapper"
+    ):
+        run_reservoirsimulator.RunReservoirSimulator(
+            "flow", None, "DUMMY.DATA", num_cpu=2
+        )
+
+
+@pytest.mark.integration_test
+@pytest.mark.usefixtures("use_tmpdir")
+@pytest.mark.skipif(not shutil.which("flow"), reason="flow not available")
+def test_run_flow_with_no_flowrun(tmp_path, monkeypatch, source_root):
+    # Set FLOWRUN_PATH to a path guaranteed not to contain flowrun
+    monkeypatch.setenv("FLOWRUN_PATH", str(tmp_path))
+    shutil.copy(source_root / "test-data/ert/eclipse/SPE1.DATA", "SPE1.DATA")
+    run_reservoirsimulator.RunReservoirSimulator("flow", None, "SPE1.DATA").run_flow()
+    assert Path("SPE1.UNSMRY").exists()
+
+
+@pytest.mark.integration_test
+@pytest.mark.skipif(not shutil.which("flowrun"), reason="flowrun not available")
+def test_flowrunner_will_raise_when_flow_fails(source_root):
+    shutil.copy(
+        source_root / "test-data/ert/eclipse/SPE1_ERROR.DATA", "SPE1_ERROR.DATA"
+    )
+    with pytest.raises(CalledProcessError, match="returned non-zero exit status 1"):
+        run_reservoirsimulator.RunReservoirSimulator(
+            "flow", FLOW_VERSION, "SPE1_ERROR.DATA"
+        ).run_flow()
+
+
+@pytest.mark.integration_test
+@pytest.mark.usefixtures("use_tmpdir")
+@pytest.mark.skipif(not shutil.which("flowrun"), reason="flowrun not available")
+def test_flowrunner_can_ignore_flow_errors(source_root):
+    shutil.copy(
+        source_root / "test-data/ert/eclipse/SPE1_ERROR.DATA", "SPE1_ERROR.DATA"
+    )
+    run_reservoirsimulator.RunReservoirSimulator(
+        "flow", FLOW_VERSION, "SPE1_ERROR.DATA", check_status=False
+    ).run_flow()
+
+
+@pytest.mark.integration_test
+@pytest.mark.usefixtures("use_tmpdir")
+@pytest.mark.skipif(not shutil.which("flowrun"), reason="flowrun not available")
+def test_flowrunner_will_raise_on_unknown_version():
+    Path("DUMMY.DATA").touch()
+    with
pytest.raises(CalledProcessError): + run_reservoirsimulator.RunReservoirSimulator( + "flow", "garbled_version", "DUMMY.DATA" + ).run_flow() + + +@pytest.mark.integration_test +@pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.skipif(not shutil.which("flowrun"), reason="flowrun not available") +def test_flow_with_parallel_keyword(source_root): + """This only tests that ERT will be able to start flow on a data deck with + the PARALLEL keyword present. It does not assert anything regarding whether + MPI-parallelization will get into play.""" + shutil.copy( + source_root / "test-data/ert/eclipse/SPE1_PARALLEL.DATA", "SPE1_PARALLEL.DATA" + ) + run_reservoirsimulator.RunReservoirSimulator( + "flow", FLOW_VERSION, "SPE1_PARALLEL.DATA" + ).run_flow() diff --git a/tests/ert/unit_tests/resources/test_run_reservoirsimulator.py b/tests/ert/unit_tests/resources/test_run_reservoirsimulator.py new file mode 100644 index 00000000000..717e894a52c --- /dev/null +++ b/tests/ert/unit_tests/resources/test_run_reservoirsimulator.py @@ -0,0 +1,158 @@ +import os +import stat +import sys +import threading +import time +from pathlib import Path + +import numpy as np +import pytest +import resfo + +from tests.ert.utils import SOURCE_DIR + +from ._import_from_location import import_from_location + +# import run_reservoirsimulator from ert/resources/forward_models +# package-data. This is kept out of the ert package to avoid the +# overhead of importing ert. This is necessary as these may be invoked as a +# subprocess on each realization. + + +run_reservoirsimulator = import_from_location( + "run_reservoirsimulator", + SOURCE_DIR / "src/ert/resources/forward_models/run_reservoirsimulator.py", +) +ecl_case_to_data_file = import_from_location( + "ecl_case_to_data_file", + SOURCE_DIR / "src/ert/resources/forward_models/run_reservoirsimulator.py", +) + + +@pytest.fixture(name="mocked_eclrun") +def fixture_mocked_eclrun(use_tmpdir, monkeypatch): + """This puts a eclrun binary in path that cannot do anything.""" + eclrun_bin = Path("bin/eclrun") + eclrun_bin.parent.mkdir() + eclrun_bin.write_text("", encoding="utf-8") + eclrun_bin.chmod(eclrun_bin.stat().st_mode | stat.S_IEXEC) + monkeypatch.setenv("PATH", f"bin:{os.environ['PATH']}") + + +def test_unknown_simulator(): + with pytest.raises(ValueError, match="Unknown simulator"): + run_reservoirsimulator.RunReservoirSimulator( + "bogus_flow", "mocked_version", "bogus_deck.DATA" + ) + + +@pytest.mark.usefixtures("mocked_eclrun") +def test_runner_fails_on_missing_data_file(): + with pytest.raises(OSError, match="No such file: NOTEXISTING.DATA"): + run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "mocked_version", "NOTEXISTING.DATA" + ) + + +@pytest.mark.usefixtures("mocked_eclrun") +def test_runner_can_find_deck_without_extension(): + Path("DECK.DATA").write_text("FOO", encoding="utf-8") + runner = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "mocked_version", "DECK" + ) + assert runner.data_file == "DECK.DATA" + + +@pytest.mark.usefixtures("mocked_eclrun") +def test_runner_can_find_lowercase_deck_without_extension(): + Path("deck.data").write_text("FOO", encoding="utf-8") + runner = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "mocked_version", "deck" + ) + assert runner.data_file == "deck.data" + + +@pytest.mark.skipif( + sys.platform.startswith("darwin"), reason="Case insensitive filesystem on MacOS" +) +@pytest.mark.usefixtures("mocked_eclrun") +def test_runner_cannot_find_mixed_case_decks(): + Path("deck.DATA").write_text("FOO", 
encoding="utf-8") + with pytest.raises(OSError, match="No such file: deck.data"): + run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "mocked_version", "deck" + ) + + +@pytest.mark.usefixtures("mocked_eclrun") +@pytest.mark.parametrize( + "data_path, expected", + [ + ("DECK.DATA", "DECK"), + ("foo/DECK.DATA", "DECK"), + ("foo/deck.data", "deck"), + ], +) +def test_runner_can_extract_base_name(data_path: str, expected: str): + Path(data_path).parent.mkdir(exist_ok=True) + Path(data_path).write_text("FOO", encoding="utf-8") + runner = run_reservoirsimulator.RunReservoirSimulator( + "eclipse", "mocked_version", data_path + ) + assert runner.base_name == expected + + +@pytest.mark.usefixtures("use_tmpdir") +def test_await_completed_summary_file_will_timeout_on_missing_smry(): + assert ( + # Expected wait time is 0.3 + run_reservoirsimulator.await_completed_unsmry_file( + "SPE1.UNSMRY", max_wait=0.3, poll_interval=0.1 + ) + > 0.3 + ) + + +@pytest.mark.usefixtures("use_tmpdir") +def test_await_completed_summary_file_will_return_asap(): + resfo.write("FOO.UNSMRY", [("INTEHEAD", np.array([1], dtype=np.int32))]) + assert ( + 0.01 + # Expected wait time is the poll_interval + < run_reservoirsimulator.await_completed_unsmry_file( + "FOO.UNSMRY", max_wait=0.5, poll_interval=0.1 + ) + < 0.4 + ) + + +@pytest.mark.flaky(reruns=5) +@pytest.mark.integration_test +@pytest.mark.usefixtures("use_tmpdir") +def test_await_completed_summary_file_will_wait_for_slow_smry(): + # This is a timing test, and has inherent flakiness: + # * Reading and writing to the same smry file at the same time + # can make the reading throw an exception every time, and can + # result in max_wait triggering. + # * If the writer thread is starved, two consecutive polls may + # yield the same summary length, resulting in premature exit. + # * Heavily loaded hardware can make everything go too slow. + def slow_smry_writer(): + for size in range(10): + resfo.write( + "FOO.UNSMRY", (size + 1) * [("INTEHEAD", np.array([1], dtype=np.int32))] + ) + time.sleep(0.05) + + thread = threading.Thread(target=slow_smry_writer) + thread.start() + time.sleep(0.1) # Let the thread start writing + assert ( + 0.5 + # Minimal wait time is around 0.55 + < run_reservoirsimulator.await_completed_unsmry_file( + "FOO.UNSMRY", max_wait=4, poll_interval=0.21 + ) + < 2 + ) + thread.join()
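
Taken together, the two new test modules pin down the runner's public surface: the RunReservoirSimulator class for programmatic use, and the run_reservoirsimulator() argparse entry point used by the forward model. A condensed usage recap, restricted to invocations actually exercised by the tests above:

    # Positional arguments: simulator name, version, deck; the tests use
    # "eclipse" and "flow" ("e300" is passed the same way by the plugin steps).
    run_reservoirsimulator.run_reservoirsimulator(["eclipse", "2019.3", "SPE1.DATA"])

    # MPI run on a PARALLEL deck:
    run_reservoirsimulator.run_reservoirsimulator(
        ["eclipse", "2019.3", "SPE1.DATA", "--num-cpu=2"]
    )

    # Tolerate deck errors:
    run_reservoirsimulator.run_reservoirsimulator(
        ["eclipse", "2019.3", "SPE1_ERROR.DATA", "--ignore-errors"]
    )

    # Opt in to HDF5 summary conversion (off by default):
    run_reservoirsimulator.run_reservoirsimulator(
        ["eclipse", "2019.3", "SPE1.DATA", "--summary-conversion"]
    )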