Skip to content

Commit

Permalink
fixes..
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Dec 9, 2024
1 parent 1fa5ed6 commit 4b4f864
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 37 deletions.
53 changes: 29 additions & 24 deletions src/ert/resources/forward_models/run_reservoirsimulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import resfo


def ecl_output_has_license_error(ecl_output: str):
def ecl_output_has_license_error(ecl_output: str) -> bool:
return (
"LICENSE ERROR" in ecl_output
or "LICENSE FAILURE" in ecl_output
Expand Down Expand Up @@ -132,14 +132,14 @@ class RunReservoirSimulator:
def __init__(
self,
simulator: Literal["flow", "eclipse", "e300"],
version: str,
ecl_case: str, # consider Path
version: Optional[str],
ecl_case: Union[Path, str],
num_cpu: int = 1,
check_status: bool = True,
summary_conversion: bool = False,
):
self.simulator = simulator
self.version: str = version
self.version: Optional[str] = version

self.num_cpu: int = int(num_cpu)
self.check_status: bool = check_status
Expand Down Expand Up @@ -168,9 +168,9 @@ def __init__(
self.bypass_flowrun = True
self.runner_abspath: str = str(_runner_abspath)

data_file = ecl_case_to_data_file(ecl_case)
data_file = ecl_case_to_data_file(Path(ecl_case))

if not Path(data_file).exists:
if not Path(data_file).exists():
raise IOError(f"No such file: {data_file}")

self.run_path: Path = Path(data_file).parent.absolute()
Expand All @@ -187,7 +187,7 @@ def eclrun_command(self) -> List[str]:
self.runner_abspath,
self.simulator,
"--version",
self.version,
str(self.version),
self.data_file,
"--summary-conversion",
"yes" if self.summary_conversion else "no",
Expand All @@ -204,13 +204,13 @@ def flowrun_command(self) -> List[str]:
return [
self.runner_abspath,
"--version",
self.version,
str(self.version),
self.data_file,
"--np",
str(self.num_cpu),
]

def runFlow(self) -> None:
def run_flow(self) -> None:
return_code = subprocess.run(self.flowrun_command, check=False).returncode
OK_file = self.run_path / f"{self.base_name}.OK"
if not self.check_status:
Expand All @@ -223,15 +223,19 @@ def runFlow(self) -> None:
raise subprocess.CalledProcessError(return_code, self.flowrun_command)
self.assertECLEND()
if self.num_cpu > 1:
await_completed_unsmry_file(find_unsmry(self.run_path / self.base_name))
smry_file = find_unsmry(self.run_path / self.base_name)
if smry_file is not None:
await_completed_unsmry_file(smry_file)

OK_file.write_text("FLOW simulation OK", encoding="utf-8")

LICENSE_FAILURE_RETRY_INITIAL_SLEEP = 90
LICENSE_RETRY_STAGGER_FACTOR = 60
LICENSE_RETRY_BACKOFF_EXPONENT = 3

def runEclipseX00(self, retries_left=3, backoff_sleep=None) -> None:
def run_eclipseX00(
self, retries_left: int = 3, backoff_sleep: Optional[float] = None
) -> None:
# This function calls itself recursively in case of license failures
backoff_sleep = (
self.LICENSE_FAILURE_RETRY_INITIAL_SLEEP
Expand Down Expand Up @@ -261,7 +265,7 @@ def runEclipseX00(self, retries_left=3, backoff_sleep=None) -> None:
f"retrying in {time_to_wait} seconds\n"
)
time.sleep(time_to_wait)
self.runEclipseX00(
self.run_eclipseX00(
retries_left=retries_left - 1,
backoff_sleep=int(
backoff_sleep * self.LICENSE_RETRY_BACKOFF_EXPONENT
Expand All @@ -271,11 +275,13 @@ def runEclipseX00(self, retries_left=3, backoff_sleep=None) -> None:
else:
raise err from None
if self.num_cpu > 1:
await_completed_unsmry_file(find_unsmry(self.run_path / self.base_name))
smry_file = find_unsmry(self.run_path / self.base_name)
if smry_file is not None:
await_completed_unsmry_file(smry_file)

OK_file.write_text("ECLIPSE simulation OK", encoding="utf-8")

def assertECLEND(self):
def assertECLEND(self) -> None:
tail_length = 5000
result = self.readECLEND()
if result.errors > 0:
Expand All @@ -301,12 +307,12 @@ def assertECLEND(self):
if result.bugs > 0:
raise EclError(f"Eclipse simulation failed with:{result.bugs:d} bugs")

def readECLEND(self):
def readECLEND(self) -> EclipseResult:
error_regexp = re.compile(r"^\s*Errors\s+(\d+)\s*$")
bug_regexp = re.compile(r"^\s*Bugs\s+(\d+)\s*$")

report_file = os.path.join(self.run_path, f"{self.base_name}.ECLEND")
if not os.path.isfile(report_file):
report_file = self.run_path / f"{self.base_name}.ECLEND"
if not report_file.is_file():
report_file = self.prt_path

errors = None
Expand Down Expand Up @@ -362,7 +368,7 @@ def tail_textfile(file_path: Path, num_chars: int) -> str:
return file.read()[-num_chars:]


def run_reservoirsimulator(args: List[str]):
def run_reservoirsimulator(args: List[str]) -> None:
parser = ArgumentParser()
parser.add_argument("simulator", type=str, choices=["flow", "eclipse", "e300"])
parser.add_argument("version", type=str)
Expand Down Expand Up @@ -405,14 +411,13 @@ def run_reservoirsimulator(args: List[str]):
sys.exit(-1)


def ecl_case_to_data_file(ecl_case: str) -> str:
ext: str = Path(ecl_case).suffix
if ext in [".data", ".DATA"]:
def ecl_case_to_data_file(ecl_case: Path) -> Path:
if ecl_case.suffix in [".data", ".DATA"]:
return ecl_case
elif ecl_case.islower():
return ecl_case + ".data"
elif str(ecl_case).islower():
return Path(str(ecl_case) + ".data")
else:
return ecl_case + ".DATA"
return Path(str(ecl_case) + ".DATA")


if __name__ == "__main__":
Expand Down
6 changes: 3 additions & 3 deletions tests/ert/unit_tests/resources/test_run_eclipse_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def test_ecl100_binary_can_produce_output(source_root):
erun = run_reservoirsimulator.RunReservoirSimulator(
"eclipse", "2019.3", "SPE1.DATA"
)
erun.runEclipseX00()
erun.run_eclipseX00()

ok_path = Path("SPE1.OK")
prt_path = Path("SPE1.PRT")
Expand Down Expand Up @@ -205,7 +205,7 @@ def test_eclrun_will_raise_on_deck_errors(source_root):
"eclipse", "2019.3", "SPE1_ERROR"
)
with pytest.raises(Exception, match="ERROR"):
erun.runEclipseX00()
erun.run_eclipseX00()


@pytest.mark.integration_test
Expand All @@ -225,7 +225,7 @@ def test_failed_run_gives_nonzero_returncode_and_exception(monkeypatch):
subprocess.CalledProcessError,
match=r"Command .*eclrun.* non-zero exit status (1|255)\.$",
):
erun.runEclipseX00()
erun.run_eclipseX00()


@pytest.mark.integration_test
Expand Down
53 changes: 43 additions & 10 deletions tests/ert/unit_tests/resources/test_run_flow_simulator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import os
import shutil
import stat
from pathlib import Path
from subprocess import CalledProcessError
from unittest import mock

import pytest

Expand All @@ -29,15 +29,48 @@ def test_flow_can_produce_output(source_root):
shutil.copy(source_root / "test-data/ert/eclipse/SPE1.DATA", "SPE1.DATA")
run_reservoirsimulator.RunReservoirSimulator(
"flow", FLOW_VERSION, "SPE1.DATA"
).runFlow()
).run_flow()
assert Path("SPE1.UNSMRY").exists()


@mock.patch.dict(os.environ, {"FLOWRUN_PATH": ""}, clear=True)
def test_flowrun_can_be_bypassed(tmp_path, monkeypatch):
def test_flowrun_can_be_bypassed_when_flow_is_available(tmp_path, monkeypatch):
# Set FLOWRUN_PATH to a path guaranteed not to contain flowrun
monkeypatch.setenv("FLOWRUN_PATH", str(tmp_path))
print(shutil.which("flowrun"))
pass
# Add a mocked flow to PATH
monkeypatch.setenv("PATH", f"{tmp_path}:{os.environ['PATH']}")
mocked_flow = Path(tmp_path / "flow")
mocked_flow.write_text("", encoding="utf-8")
mocked_flow.chmod(mocked_flow.stat().st_mode | stat.S_IEXEC)
Path("DUMMY.DATA").write_text("", encoding="utf-8")
runner = run_reservoirsimulator.RunReservoirSimulator("flow", None, "DUMMY.DATA")
assert runner.bypass_flowrun is True


def test_flowrun_cannot_be_bypassed_for_parallel_runs(tmp_path, monkeypatch):
# Set FLOWRUN_PATH to a path guaranteed not to contain flowrun
monkeypatch.setenv("FLOWRUN_PATH", str(tmp_path))
# Add a mocked flow to PATH
monkeypatch.setenv("PATH", f"{tmp_path}:{os.environ['PATH']}")
mocked_flow = Path(tmp_path / "flow")
mocked_flow.write_text("", encoding="utf-8")
mocked_flow.chmod(mocked_flow.stat().st_mode | stat.S_IEXEC)

with pytest.raises(
RuntimeError, match="MPI runs not supported without a flowrun wrapper"
):
run_reservoirsimulator.RunReservoirSimulator(
"flow", None, "DUMMY.DATA", num_cpu=2
)


@pytest.mark.integration_test
@pytest.mark.skipif(not shutil.which("flow"), reason="flow not available")
def test_run_flow_with_no_flowrun(tmp_path, monkeypatch, source_root):
# Set FLOWRUN_PATH to a path guaranteed not to contain flowrun
monkeypatch.setenv("FLOWRUN_PATH", str(tmp_path))
shutil.copy(source_root / "test-data/ert/eclipse/SPE1.DATA", "SPE1.DATA")
run_reservoirsimulator.RunReservoirSimulator("flow", None, "SPE1.DATA").run_flow()
assert Path("SPE1.UNSMRY").exists()


@pytest.mark.integration_test
Expand All @@ -49,7 +82,7 @@ def test_flowrunner_will_raise_when_flow_fails(source_root):
with pytest.raises(CalledProcessError, match="returned non-zero exit status 1"):
run_reservoirsimulator.RunReservoirSimulator(
"flow", FLOW_VERSION, "SPE1_ERROR.DATA"
).runFlow()
).run_flow()


@pytest.mark.integration_test
Expand All @@ -60,7 +93,7 @@ def test_flowrunner_will_can_ignore_flow_errors(source_root):
)
run_reservoirsimulator.RunReservoirSimulator(
"flow", FLOW_VERSION, "SPE1_ERROR.DATA", check_status=False
).runFlow()
).run_flow()


@pytest.mark.integration_test
Expand All @@ -69,7 +102,7 @@ def test_flowrunner_will_raise_on_unknown_version():
with pytest.raises(CalledProcessError):
run_reservoirsimulator.RunReservoirSimulator(
"flow", "garbled_version", ""
).runFlow()
).run_flow()


@pytest.mark.integration_test
Expand All @@ -83,4 +116,4 @@ def test_flow_with_parallel_keyword(source_root):
)
run_reservoirsimulator.RunReservoirSimulator(
"flow", FLOW_VERSION, "SPE1_PARALLEL.DATA"
).runFlow()
).run_flow()

0 comments on commit 4b4f864

Please sign in to comment.