Skip to content

Commit

Permalink
Enable load results manually from any available iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
DanSava committed Oct 8, 2024
1 parent 56d011f commit caecbbf
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 116 deletions.
78 changes: 62 additions & 16 deletions src/ert/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
import logging
import time
from pathlib import Path
from typing import Iterable

from ert.config import ParameterConfig, ResponseConfig
from ert.run_arg import RunArg
from ert.storage import Ensemble
from ert.storage.realization_storage_state import RealizationStorageState

from .load_status import LoadResult, LoadStatus
Expand All @@ -16,24 +15,27 @@


async def _read_parameters(
run_arg: RunArg, parameter_configuration: Iterable[ParameterConfig]
run_path: str,
realization: int,
ensemble: Ensemble,
) -> LoadResult:
result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "")
error_msg = ""
parameter_configuration = ensemble.experiment.parameter_configuration.values()
for config in parameter_configuration:
if not config.forward_init:
continue
try:
start_time = time.perf_counter()
logger.debug(f"Starting to load parameter: {config.name}")
ds = config.read_from_runpath(Path(run_arg.runpath), run_arg.iens)
ds = config.read_from_runpath(Path(run_path), realization)
await asyncio.sleep(0)
logger.debug(
f"Loaded {config.name}",
extra={"Time": f"{(time.perf_counter() - start_time):.4f}s"},
)
start_time = time.perf_counter()
run_arg.ensemble_storage.save_parameters(config.name, run_arg.iens, ds)
ensemble.save_parameters(config.name, realization, ds)
await asyncio.sleep(0)
logger.debug(
f"Saved {config.name} to storage",
Expand All @@ -42,36 +44,37 @@ async def _read_parameters(
except Exception as err:
error_msg += str(err)
result = LoadResult(LoadStatus.LOAD_FAILURE, error_msg)
logger.warning(f"Failed to load: {run_arg.iens}", exc_info=err)
logger.warning(f"Failed to load: {realization}", exc_info=err)
return result


async def _write_responses_to_storage(
run_arg: RunArg, response_configs: Iterable[ResponseConfig]
run_path: str,
realization: int,
ensemble: Ensemble,
) -> LoadResult:
errors = []
response_configs = ensemble.experiment.response_configuration.values()
for config in response_configs:
try:
start_time = time.perf_counter()
logger.debug(f"Starting to load response: {config.response_type}")
ds = config.read_from_file(run_arg.runpath, run_arg.iens)
ds = config.read_from_file(run_path, realization)
await asyncio.sleep(0)
logger.debug(
f"Loaded {config.response_type}",
extra={"Time": f"{(time.perf_counter() - start_time):.4f}s"},
)
start_time = time.perf_counter()
run_arg.ensemble_storage.save_response(
config.response_type, ds, run_arg.iens
)
ensemble.save_response(config.response_type, ds, realization)
await asyncio.sleep(0)
logger.debug(
f"Saved {config.response_type} to storage",
extra={"Time": f"{(time.perf_counter() - start_time):.4f}s"},
)
except ValueError as err:
errors.append(str(err))
logger.warning(f"Failed to write: {run_arg.iens}", exc_info=err)
logger.warning(f"Failed to write: {realization}", exc_info=err)
if errors:
return LoadResult(LoadStatus.LOAD_FAILURE, "\n".join(errors))
return LoadResult(LoadStatus.LOAD_SUCCESSFUL, "")
Expand All @@ -87,14 +90,16 @@ async def forward_model_ok(
# handles parameters
if run_arg.itr == 0:
parameters_result = await _read_parameters(
run_arg,
run_arg.ensemble_storage.experiment.parameter_configuration.values(),
run_arg.runpath,
run_arg.iens,
run_arg.ensemble_storage,
)

if parameters_result.status == LoadStatus.LOAD_SUCCESSFUL:
response_result = await _write_responses_to_storage(
run_arg,
run_arg.ensemble_storage.experiment.response_configuration.values(),
run_arg.runpath,
run_arg.iens,
run_arg.ensemble_storage,
)

except Exception as err:
Expand All @@ -115,3 +120,44 @@ async def forward_model_ok(
run_arg.ensemble_storage.unset_failure(run_arg.iens)

return final_result


# TODO re-name this function
async def load_from_run_path(
run_path: str,
realization: int,
ensemble: Ensemble,
) -> LoadResult:
response_result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "")
try:
parameters_result = await _read_parameters(
run_path,
realization,
ensemble,
)

if parameters_result.status == LoadStatus.LOAD_SUCCESSFUL:
response_result = await _write_responses_to_storage(
run_path,
realization,
ensemble,
)

except Exception as err:
logger.exception(f"Failed to load results for realization {realization}")
parameters_result = LoadResult(
LoadStatus.LOAD_FAILURE,
"Failed to load results for realization "
f"{realization}, failed with: {err}",
)

final_result = parameters_result
if response_result.status != LoadStatus.LOAD_SUCCESSFUL:
final_result = response_result
ensemble.set_failure(
realization, RealizationStorageState.LOAD_FAILURE, final_result.message
)
elif ensemble.has_failure(realization):
ensemble.unset_failure(realization)

return final_result
10 changes: 5 additions & 5 deletions src/ert/config/gen_data_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ def from_config_dict(cls, config_dict: ConfigDict) -> Optional[Self]:
)

def read_from_file(self, run_path: str, _: int) -> polars.DataFrame:
def _read_file(filename: Path, report_step: int) -> polars.DataFrame:
if not filename.exists():
raise ValueError(f"Missing output file: {filename}")
data = np.loadtxt(_run_path / filename, ndmin=1)
active_information_file = _run_path / (str(filename) + "_active")
def _read_file(file_path: Path, report_step: int) -> polars.DataFrame:
if not file_path.exists():
raise ValueError(f"Missing output file: {file_path}")
data = np.loadtxt(file_path, ndmin=1)
active_information_file = _run_path / (file_path.name + "_active")
if active_information_file.exists():
active_list = np.loadtxt(active_information_file)
data[active_list == 0] = np.nan
Expand Down
57 changes: 20 additions & 37 deletions src/ert/gui/tools/load_results/load_results_panel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
ErtMessageBox,
QApplication,
StringBox,
ValueModel,
)
from ert.libres_facade import LibresFacade
from ert.run_models.base_run_model import captured_logs
from ert.validation import IntegerArgument, RangeStringArgument
from ert.validation import RangeStringArgument


class LoadResultsPanel(QWidget):
Expand All @@ -34,12 +33,12 @@ def __init__(self, facade: LibresFacade, notifier: ErtNotifier):

layout = QFormLayout()

run_path_text = QTextEdit()
run_path_text.setText(self.readCurrentRunPath())
run_path_text.setDisabled(True)
run_path_text.setFixedHeight(80)
self._run_path_text = QTextEdit()
self._run_path_text.setText(self.readCurrentRunPath())
self._run_path_text.setFixedHeight(80)
self._run_path_text.setObjectName("run_path_edit_lrm")

layout.addRow("Load data from current run path: ", run_path_text)
layout.addRow("Load data from run path: ", self._run_path_text)

ensemble_selector = EnsembleSelector(self._notifier)
layout.addRow("Load into ensemble:", ensemble_selector)
Expand All @@ -58,21 +57,13 @@ def __init__(self, facade: LibresFacade, notifier: ErtNotifier):
self._active_realizations_field.setObjectName("active_realizations_lrm")
layout.addRow("Realizations to load:", self._active_realizations_field)

self._iterations_model = ValueModel(0) # type: ignore
self._iterations_field = StringBox(
self._iterations_model, # type: ignore
"load_results_manually/iterations",
)
self._iterations_field.setValidator(IntegerArgument(from_value=0))
self._iterations_field.setObjectName("iterations_field_lrm")
layout.addRow("Iteration to load:", self._iterations_field)

self._active_realizations_field.getValidationSupport().validationChanged.connect(
self.panelConfigurationChanged
)
self._iterations_field.getValidationSupport().validationChanged.connect(
self.panelConfigurationChanged
)
# TODO replace with runpath change trigger
# self._iterations_field.getValidationSupport().validationChanged.connect(
# self.panelConfigurationChanged
# )

self.setLayout(layout)

Expand All @@ -84,37 +75,29 @@ def readCurrentRunPath(self) -> str:
return run_path

def isConfigurationValid(self) -> bool:
return (
self._active_realizations_field.isValid()
and self._iterations_field.isValid()
)
return self._active_realizations_field.isValid()

def load(self) -> int:
selected_ensemble = self._notifier.current_ensemble
realizations = self._active_realizations_model.getActiveRealizationsMask()
iteration = self._iterations_model.getValue()
try:
if iteration is None:
iteration = ""
iteration_int = int(iteration)
except ValueError:
run_path = self._run_path_text.toPlainText()
active_realizations = [
iens for iens, active in enumerate(realizations) if active
]
if "<IENS>" not in run_path:
QMessageBox.warning(
self,
"Warning",
(
"Expected an integer number in iteration field, "
f'got "{iteration}"'
),
"<IENS> placeholder missing from run_path",
)
return False

QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor)
messages: list[str] = []
with captured_logs(messages):
loaded = self._facade.load_from_forward_model(
loaded = self._facade.load_from_run_path(
run_path,
selected_ensemble, # type: ignore
realizations, # type: ignore
iteration_int,
active_realizations,
)
QApplication.restoreOverrideCursor()

Expand Down
61 changes: 28 additions & 33 deletions src/ert/libres_facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import warnings
from multiprocessing.pool import ThreadPool
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -20,7 +21,7 @@
from pandas import DataFrame

from ert.analysis import AnalysisEvent, SmootherSnapshot, smoother_update
from ert.callbacks import forward_model_ok
from ert.callbacks import forward_model_ok, load_from_run_path
from ert.config import (
EnkfObservationImplementationType,
ErtConfig,
Expand All @@ -29,10 +30,8 @@
from ert.data import MeasuredData
from ert.data._measured_data import ObservationError, ResponseError
from ert.load_status import LoadResult, LoadStatus
from ert.run_arg import create_run_arguments

from .plugins import ErtPluginContext
from .runpaths import Runpaths

_logger = logging.getLogger(__name__)

Expand All @@ -55,6 +54,15 @@ def _load_realization(
return result, realisation


def _load_realization_from_run_path(
run_path: str,
realization: int,
ensemble: Ensemble,
) -> Tuple[LoadResult, int]:
result = asyncio.run(load_from_run_path(run_path, realization, ensemble))
return result, realization


class LibresFacade:
"""The intention of this class is to expose properties or data of ert
commonly used in other project. It is part of the public interface of ert,
Expand Down Expand Up @@ -126,52 +134,39 @@ def load_from_forward_model(
self,
ensemble: Ensemble,
realisations: npt.NDArray[np.bool_],
iteration: Optional[int] = None,
) -> int:
if iteration is not None:
warnings.warn(
"The iteration argument has no effect, iteration is read from ensemble",
DeprecationWarning,
stacklevel=1,
)
t = time.perf_counter()
run_args = create_run_arguments(
Runpaths(
jobname_format=self.config.model_config.jobname_format_string,
runpath_format=self.config.model_config.runpath_format_string,
filename=str(self.config.runpath_file),
substitution_list=self.config.substitution_list,
eclbase=self.config.ensemble_config.eclbase,
),
realisations,
ensemble=ensemble,
)
nr_loaded = self._load_from_run_path(
self.config.model_config.num_realizations,
run_args,
realisations,
nr_loaded = self.load_from_run_path(
str(Path(self.run_path).resolve()),
ensemble,
[r for r, active in enumerate(realisations) if active],
)
_logger.debug(
f"load_from_forward_model() time_used {(time.perf_counter() - t):.4f}s"
)
return nr_loaded

@staticmethod
def _load_from_run_path(
ensemble_size: int,
run_args: List[RunArg],
active_realizations: npt.NDArray[np.bool_],
def load_from_run_path(
run_path_format: str,
ensemble: Ensemble,
active_realizations: List[int],
) -> int:
"""Returns the number of loaded realizations"""
pool = ThreadPool(processes=8)

async_result = [
pool.apply_async(
_load_realization,
(iens, run_args),
_load_realization_from_run_path,
(
run_path_format.replace("<IENS>", str(realization)).replace(
"<ITER>", "0"
),
realization,
ensemble,
),
)
for iens in range(ensemble_size)
if active_realizations[iens]
for realization in active_realizations
]

loaded = 0
Expand Down
2 changes: 1 addition & 1 deletion src/everest/bin/everload_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def _internalize_batch(ert_config, batch_id, batch_data):
realizations = [True] * batch_size + [False] * (
facade.get_ensemble_size() - batch_size
)
facade.load_from_forward_model(ensemble, realizations, 0)
facade.load_from_forward_model(ensemble, realizations)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit caecbbf

Please sign in to comment.