Skip to content

Commit

Permalink
Refactor and rename summary_block()
Browse files Browse the repository at this point in the history
The logic for determining stable was not correct, if the summary
file was not stable after two reads, it would always timeout. Behaviour
is not defined by tests.
  • Loading branch information
berland committed Oct 11, 2024
1 parent 96dd8d8 commit 2453a2c
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 55 deletions.
91 changes: 51 additions & 40 deletions src/ert/resources/forward_models/res/script/ecl_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def pushd(run_path):
os.chdir(starting_directory)


def _find_unsmry(case: str) -> str:
def find_unsmry(basepath: Path) -> Path:
def _is_unsmry(base: str, path: str) -> bool:
if "." not in path:
return False
Expand All @@ -165,15 +165,57 @@ def _is_unsmry(base: str, path: str) -> bool:
"funsmry",
]

dir, base = os.path.split(case)
candidates = list(filter(lambda x: _is_unsmry(base, x), os.listdir(dir or ".")))
dir = basepath.parent
base = basepath.name
candidates: List[str] = list(
filter(lambda x: _is_unsmry(base, x), os.listdir(dir or "."))
)
if not candidates:
raise ValueError(f"Could not find any unsmry matching case path {case}")
raise ValueError(f"Could not find any unsmry matching case path {basepath}")
if len(candidates) > 1:
raise ValueError(
f"Ambiguous reference to unsmry in {case}, could be any of {candidates}"
f"Ambiguous reference to unsmry in {basepath}, could be any of {candidates}"
)
return os.path.join(dir, candidates[0])
return Path(dir) / candidates[0]


def await_completed_unsmry_file(
smry_path: Path, max_wait: float = 15, poll_interval: float = 1.0
) -> float:
"""This function will wait until the provided smry file does not grow in size
during one poll interval.
Such a wait is sometimes needed when different MPI hosts write data to a shared
disk system.
If the file does not exist or is completely unreadable to resfo, this function
will timeout to max_wait. If NOSIM is included, this will happen.
Size is defined in terms of readable data elementes through resfo.
This function will always wait for at least one poll interval, the polling
interval is specified in seconds.
The return value is the waited time (in seconds)"""
start_time = datetime.datetime.now()
prev_len = 0
while (datetime.datetime.now() - start_time).total_seconds() < max_wait:
try:
resfo_sum = [r.read_keyword() for r in resfo.lazy_read(smry_path)]
except Exception:
time.sleep(poll_interval)
continue

current_len = len(resfo_sum)
if prev_len == current_len:
# smry file is regarded complete
break
else:
prev_len = max(prev_len, current_len)

time.sleep(poll_interval)

return (datetime.datetime.now() - start_time).total_seconds()


class EclRun:
Expand Down Expand Up @@ -408,44 +450,13 @@ def runEclipse(self, eclrun_config=None, retries_left=3, backoff_sleep=None):
else:
raise err from None
if self.num_cpu > 1:
self.summary_block()
await_completed_unsmry_file(
find_unsmry(Path(self.run_path) / self.base_name)
)

with open(OK_file, "w", encoding="utf-8") as f:
f.write("ECLIPSE simulation OK")

def summary_block(self):
case = os.path.join(self.run_path, self.base_name)
start_time = datetime.datetime.now()
prev_len = 0
while True:
dt = datetime.datetime.now() - start_time
if dt.total_seconds() > 15:
# We have not got a stable summary file after 15 seconds of
# waiting, this either implies that something is completely
# broken or this is a NOSIM simulation. Due the possibility of
# NOSIM solution we just return here without signalling an
# error.
return None

time.sleep(1)

try:
ecl_sum = [
r.read_keyword() for r in resfo.lazy_read(_find_unsmry(case))
]
except Exception:
continue

this_len = len(ecl_sum)
if prev_len == 0:
prev_len = this_len
continue

if prev_len == this_len:
break

return ecl_sum

def assertECLEND(self):
tail_length = 5000
result = self.readECLEND()
Expand Down
68 changes: 53 additions & 15 deletions tests/ert/unit_tests/resources/test_ecl_run_new_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
import stat
import subprocess
import textwrap
import threading
import time
from pathlib import Path
from unittest import mock

import numpy as np
import pytest
import resfo
import yaml

from tests.ert.utils import SOURCE_DIR
Expand Down Expand Up @@ -241,25 +245,59 @@ def test_mpi_run_is_managed_by_system_tool(source_root):
).exists(), "There should not be 3 MPI processes"


@pytest.mark.integration_test
@pytest.mark.requires_eclipse
@pytest.mark.usefixtures("use_tmpdir", "init_eclrun_config")
def test_summary_block_will_return_something(source_root):
shutil.copy(
source_root / "test-data/ert/eclipse/SPE1.DATA",
"SPE1.DATA",
def test_await_completed_summary_file_will_timeout_on_missing_smry():
assert (
# Expected wait time is 0.3
ecl_run.await_completed_unsmry_file(
"SPE1.UNSMRY", max_wait=0.3, poll_interval=0.1
)
> 0.3
)
econfig = ecl_config.Ecl100Config()
erun = ecl_run.EclRun("SPE1.DATA", None)

assert not Path("SPE1.UNSMRY").exists()

@pytest.mark.usefixtures("use_tmpdir")
def test_await_completed_summary_file_will_return_asap():
resfo.write("FOO.UNSMRY", [("INTEHEAD", np.array([1], dtype=np.int32))])
assert (
erun.summary_block() is None
), "summary_block() should return None where there is no SMRY file"
0.01
# Expected wait time is the poll_interval
< ecl_run.await_completed_unsmry_file(
"FOO.UNSMRY", max_wait=0.5, poll_interval=0.1
)
< 0.4
)

erun.runEclipse(eclrun_config=ecl_config.EclrunConfig(econfig, "2019.3"))
assert Path("SPE1.UNSMRY").exists()
assert erun.summary_block() is not None

@pytest.mark.flaky(reruns=5)
@pytest.mark.integration_test
@pytest.mark.usefixtures("use_tmpdir")
def test_await_completed_summary_file_will_wait_for_slow_smry():
# This is a timing test, and has inherent flakiness:
# * Reading and writing to the same smry file at the same time
# can make the reading throw an exception every time, and can
# result in max_wait triggering.
# * If the writer thread is starved, two consecutive polls may
# yield the same summary length, resulting in premature exit.
# * Heavily loaded hardware can make everything go too slow.
def slow_smry_writer():
for size in range(10):
resfo.write(
"FOO.UNSMRY", (size + 1) * [("INTEHEAD", np.array([1], dtype=np.int32))]
)
time.sleep(0.05)

thread = threading.Thread(target=slow_smry_writer)
thread.start()
time.sleep(0.1) # Let the thread start writing
assert (
0.5
# Minimal wait time is around 0.55
< ecl_run.await_completed_unsmry_file(
"FOO.UNSMRY", max_wait=4, poll_interval=0.21
)
< 2
)
thread.join()


@pytest.mark.usefixtures("use_tmpdir")
Expand Down

0 comments on commit 2453a2c

Please sign in to comment.