From e990b65a485f2d097bb2989fbb6b889347153713 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Tue, 22 Oct 2024 13:49:12 +0200 Subject: [PATCH 1/3] Sync response config keys w/ actual response keys --- src/ert/config/gen_data_config.py | 1 + src/ert/config/response_config.py | 1 + src/ert/config/summary_config.py | 1 + src/ert/dark_storage/common.py | 15 +- src/ert/dark_storage/endpoints/records.py | 15 +- src/ert/storage/local_ensemble.py | 46 ++-- src/ert/storage/local_experiment.py | 62 +++++- src/everest/export.py | 6 +- .../test_that_storage_matches/responses | 2 +- .../unit_tests/storage/test_local_storage.py | 196 ++++++++++++++++-- .../storage/test_storage_migration.py | 5 +- tests/ert/unit_tests/test_libres_facade.py | 14 +- 12 files changed, 303 insertions(+), 61 deletions(-) diff --git a/src/ert/config/gen_data_config.py b/src/ert/config/gen_data_config.py index 9e90c2b0b51..0c944556e99 100644 --- a/src/ert/config/gen_data_config.py +++ b/src/ert/config/gen_data_config.py @@ -22,6 +22,7 @@ class GenDataConfig(ResponseConfig): report_steps_list: List[Optional[List[int]]] = dataclasses.field( default_factory=list ) + has_finalized_keys: bool = True def __post_init__(self) -> None: if len(self.report_steps_list) == 0: diff --git a/src/ert/config/response_config.py b/src/ert/config/response_config.py index c3335ee7258..b2bbbd8abd0 100644 --- a/src/ert/config/response_config.py +++ b/src/ert/config/response_config.py @@ -21,6 +21,7 @@ class ResponseConfig(ABC): name: str input_files: List[str] = dataclasses.field(default_factory=list) keys: List[str] = dataclasses.field(default_factory=list) + has_finalized_keys: bool = False @abstractmethod def read_from_file(self, run_path: str, iens: int) -> polars.DataFrame: diff --git a/src/ert/config/summary_config.py b/src/ert/config/summary_config.py index dd2aa49c76f..0fc934618c8 100644 --- a/src/ert/config/summary_config.py +++ b/src/ert/config/summary_config.py @@ -23,6 +23,7 @@ class SummaryConfig(ResponseConfig): name: str = "summary" refcase: Union[Set[datetime], List[str], None] = None + has_finalized_keys = False def __post_init__(self) -> None: if isinstance(self.refcase, list): diff --git a/src/ert/dark_storage/common.py b/src/ert/dark_storage/common.py index c6871c3e452..2d7753956d5 100644 --- a/src/ert/dark_storage/common.py +++ b/src/ert/dark_storage/common.py @@ -73,7 +73,13 @@ def ensemble_parameters(storage: Storage, ensemble_id: UUID) -> List[Dict[str, A return param_list -def gen_data_keys(ensemble: Ensemble) -> Iterator[str]: +def get_response_names(ensemble: Ensemble) -> List[str]: + result = ensemble.experiment.response_type_to_response_keys["summary"] + result.extend(sorted(gen_data_display_keys(ensemble), key=lambda k: k.lower())) + return result + + +def gen_data_display_keys(ensemble: Ensemble) -> Iterator[str]: gen_data_config = ensemble.experiment.response_configuration.get("gen_data") if gen_data_config: @@ -245,7 +251,7 @@ def get_observation_keys_for_response( Get all observation keys for given response key """ - if displayed_response_key in gen_data_keys(ensemble): + if displayed_response_key in gen_data_display_keys(ensemble): response_key, report_step = displayed_key_to_response_key["gen_data"]( displayed_response_key ) @@ -262,7 +268,10 @@ def get_observation_keys_for_response( return filtered["observation_key"].unique().to_list() - elif displayed_response_key in ensemble.get_summary_keyset(): + elif ( + displayed_response_key + in ensemble.experiment.response_type_to_response_keys["summary"] + ): response_key = displayed_key_to_response_key["summary"](displayed_response_key)[ 0 ] diff --git a/src/ert/dark_storage/endpoints/records.py b/src/ert/dark_storage/endpoints/records.py index 70be40d4070..365a144c32e 100644 --- a/src/ert/dark_storage/endpoints/records.py +++ b/src/ert/dark_storage/endpoints/records.py @@ -12,13 +12,14 @@ from ert.dark_storage.common import ( data_for_key, ensemble_parameters, - gen_data_keys, + gen_data_display_keys, get_observation_keys_for_response, get_observations_for_obs_keys, response_key_to_displayed_key, ) from ert.dark_storage.enkf import get_storage from ert.storage import Storage +from ert.storage.realization_storage_state import RealizationStorageState router = APIRouter(tags=["record"]) @@ -133,7 +134,15 @@ def get_ensemble_responses( ) response_names_with_observations.update(set(obs_with_responses)) - for name in ensemble.get_summary_keyset(): + has_responses = any( + s == RealizationStorageState.HAS_DATA for s in ensemble.get_ensemble_state() + ) + + for name in ( + ensemble.experiment.response_type_to_response_keys.get("summary", []) + if has_responses + else [] + ): response_map[str(name)] = js.RecordOut( id=UUID(int=0), name=name, @@ -141,7 +150,7 @@ def get_ensemble_responses( has_observations=name in response_names_with_observations, ) - for name in gen_data_keys(ensemble): + for name in gen_data_display_keys(ensemble) if has_responses else []: response_map[str(name)] = js.RecordOut( id=UUID(int=0), name=name, diff --git a/src/ert/storage/local_ensemble.py b/src/ert/storage/local_ensemble.py index b32ccca7548..f54464a97ee 100644 --- a/src/ert/storage/local_ensemble.py +++ b/src/ert/storage/local_ensemble.py @@ -29,7 +29,6 @@ logger = logging.getLogger(__name__) import polars -from polars.exceptions import ColumnNotFoundError class _Index(BaseModel): @@ -311,11 +310,22 @@ def _has_response(_key: str) -> bool: if key: return _has_response(key) - return all( - _has_response(response) - for response in self.experiment.response_configuration + is_expecting_any_responses = any( + bool(config.keys) + for config in self.experiment.response_configuration.values() ) + if not is_expecting_any_responses: + return True + + non_empty_response_configs = [ + response + for response, config in self.experiment.response_configuration.items() + if bool(config.keys) + ] + + return all(_has_response(response) for response in non_empty_response_configs) + def is_initalized(self) -> List[int]: """ Return the realization numbers where all parameters are internalized. In @@ -502,27 +512,6 @@ def _find_state(realization: int) -> RealizationStorageState: return [_find_state(i) for i in range(self.ensemble_size)] - def get_summary_keyset(self) -> List[str]: - """ - Find the first folder with summary data then load the - summary keys from this. - - Returns - ------- - keys : list of str - List of summary keys. - """ - - try: - summary_data = self.load_responses( - "summary", - tuple(self.get_realization_list_with_responses("summary")), - ) - - return sorted(summary_data["response_key"].unique().to_list()) - except (ValueError, KeyError, ColumnNotFoundError): - return [] - def _load_single_dataset( self, group: str, @@ -696,8 +685,6 @@ def load_all_summary_data( raise IndexError(f"No such realization {realization_index}") realizations = [realization_index] - summary_keys = self.get_summary_keyset() - try: df_pl = self.load_responses("summary", tuple(realizations)) @@ -715,6 +702,7 @@ def load_all_summary_data( ) if keys: + summary_keys = self.experiment.response_type_to_response_keys["summary"] summary_keys = sorted( [key for key in keys if key in summary_keys] ) # ignore keys that doesn't exist @@ -877,6 +865,10 @@ def save_response( output_path / f"{response_type}.parquet", data ) + if not self.experiment._has_finalized_response_keys(response_type): + response_keys = data["response_key"].unique().to_list() + self.experiment._update_response_keys(response_type, response_keys) + def calculate_std_dev_for_parameter(self, parameter_group: str) -> xr.Dataset: if parameter_group not in self.experiment.parameter_configuration: raise ValueError(f"{parameter_group} is not registered to the experiment.") diff --git a/src/ert/storage/local_experiment.py b/src/ert/storage/local_experiment.py index f4c350b9b15..c35a25b5222 100644 --- a/src/ert/storage/local_experiment.py +++ b/src/ert/storage/local_experiment.py @@ -334,7 +334,67 @@ def observation_keys(self) -> List[str]: def response_key_to_response_type(self) -> Dict[str, str]: mapping = {} for config in self.response_configuration.values(): - for key in config.keys: + for key in config.keys if config.has_finalized_keys else []: mapping[key] = config.response_type return mapping + + @cached_property + def response_type_to_response_keys(self) -> Dict[str, List[str]]: + result: Dict[str, List[str]] = {} + + for response_key, response_type in self.response_key_to_response_type.items(): + if response_type not in result: + result[response_type] = [] + + result[response_type].append(response_key) + + for keys in result.values(): + keys.sort() + + return result + + def _has_finalized_response_keys(self, response_type: str) -> bool: + responses_configuration = self.response_configuration + if response_type not in responses_configuration: + raise KeyError( + f"Response type {response_type} does not exist in current responses.json" + ) + + return responses_configuration[response_type].has_finalized_keys + + def _update_response_keys( + self, response_type: str, response_keys: List[str] + ) -> None: + """ + When a response is saved to storage, it may contain keys + that are not explicitly declared in the config. Calling this ensures + that the response config saved in this storage has keys corresponding + to the actual received responses. + """ + responses_configuration = self.response_configuration + if response_type not in responses_configuration: + raise KeyError( + f"Response type {response_type} does not exist in current responses.json" + ) + + config = responses_configuration[response_type] + config.keys = sorted(response_keys) + config.has_finalized_keys = True + self._storage._write_transaction( + self._path / self._responses_file, + json.dumps( + { + c.response_type: c.to_dict() + for c in responses_configuration.values() + }, + default=str, + indent=2, + ).encode("utf-8"), + ) + + if self.response_key_to_response_type is not None: + del self.response_key_to_response_type + + if self.response_type_to_response_keys is not None: + del self.response_type_to_response_keys diff --git a/src/everest/export.py b/src/everest/export.py index 21417a88645..af729ace764 100644 --- a/src/everest/export.py +++ b/src/everest/export.py @@ -171,10 +171,12 @@ def get_internalized_keys( ensemble = experiment.get_ensemble_by_name(case_name) if not internal_keys: - internal_keys = set(ensemble.get_summary_keyset()) + internal_keys = set( + ensemble.experiment.response_type_to_response_keys["summary"] + ) else: internal_keys = internal_keys.intersection( - set(ensemble.get_summary_keyset()) + set(ensemble.experiment.response_type_to_response_keys["summary"]) ) return internal_keys diff --git a/tests/ert/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/responses b/tests/ert/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/responses index 34a2a7fb3f7..598abfea72f 100644 --- a/tests/ert/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/responses +++ b/tests/ert/unit_tests/storage/snapshots/test_storage_migration/test_that_storage_matches/responses @@ -1 +1 @@ -{'gen_data': GenDataConfig(name='gen_data', input_files=['gen%d.txt'], keys=['GEN'], report_steps_list=[[1]]), 'summary': SummaryConfig(name='summary', input_files=['CASE'], keys=['FOPR', 'RWPR'], refcase={})} +{'gen_data': GenDataConfig(name='gen_data', input_files=['gen%d.txt'], keys=['GEN'], has_finalized_keys=True, report_steps_list=[[1]]), 'summary': SummaryConfig(name='summary', input_files=['CASE'], keys=['FOPR', 'RWPR'], has_finalized_keys=False, refcase={})} diff --git a/tests/ert/unit_tests/storage/test_local_storage.py b/tests/ert/unit_tests/storage/test_local_storage.py index 1d6236fa4c9..b38d7c70ed1 100644 --- a/tests/ert/unit_tests/storage/test_local_storage.py +++ b/tests/ert/unit_tests/storage/test_local_storage.py @@ -3,7 +3,7 @@ import shutil import tempfile from dataclasses import dataclass, field -from datetime import datetime +from datetime import datetime, timedelta from pathlib import Path from typing import Any, Dict, List from unittest.mock import MagicMock, PropertyMock, patch @@ -34,7 +34,7 @@ from ert.config.gen_kw_config import TransformFunctionDefinition from ert.config.general_observation import GenObservation from ert.config.observation_vector import ObsVector -from ert.storage import ErtStorageException, open_storage +from ert.storage import ErtStorageException, LocalEnsemble, open_storage from ert.storage.local_storage import _LOCAL_STORAGE_VERSION from ert.storage.mode import ModeError from ert.storage.realization_storage_state import RealizationStorageState @@ -108,6 +108,61 @@ def test_that_saving_empty_responses_fails_nicely(tmp_path): ensemble.save_response("RESPONSE", empty_data, 0) +def test_that_saving_response_updates_configs(tmp_path): + with open_storage(tmp_path, mode="w") as storage: + experiment = storage.create_experiment( + responses=[SummaryConfig(keys=["*", "FOPR"], input_files=["not_relevant"])] + ) + ensemble = storage.create_ensemble( + experiment, ensemble_size=1, iteration=0, name="prior" + ) + + summary_df = polars.DataFrame( + { + "response_key": ["FOPR", "FOPT:OP1", "FOPR:OP3", "FLAP", "F*"], + "time": polars.Series( + [datetime(2000, 1, i) for i in range(1, 6)] + ).dt.cast_time_unit("ms"), + "values": polars.Series( + [0.0, 1.0, 2.0, 3.0, 4.0], dtype=polars.Float32 + ), + } + ) + + mapping_before = experiment.response_key_to_response_type + smry_config_before = experiment.response_configuration["summary"] + + assert not ensemble.experiment._has_finalized_response_keys("summary") + ensemble.save_response("summary", summary_df, 0) + + assert ensemble.experiment._has_finalized_response_keys("summary") + assert ensemble.experiment.response_key_to_response_type == { + "FOPR:OP3": "summary", + "F*": "summary", + "FLAP": "summary", + "FOPR": "summary", + "FOPT:OP1": "summary", + } + assert ensemble.experiment.response_type_to_response_keys == { + "summary": ["F*", "FLAP", "FOPR", "FOPR:OP3", "FOPT:OP1"] + } + + mapping_after = experiment.response_key_to_response_type + smry_config_after = experiment.response_configuration["summary"] + + assert set(mapping_before) == set() + assert set(smry_config_before.keys) == {"*", "FOPR"} + + assert set(mapping_after) == {"F*", "FOPR", "FOPT:OP1", "FOPR:OP3", "FLAP"} + assert set(smry_config_after.keys) == { + "FOPR", + "FOPT:OP1", + "FOPR:OP3", + "FLAP", + "F*", + } + + def test_that_saving_empty_parameters_fails_nicely(tmp_path): with open_storage(tmp_path, mode="w") as storage: experiment = storage.create_experiment() @@ -225,6 +280,45 @@ def test_refresh(tmp_path): assert _ensembles(accessor) == _ensembles(reader) +def test_that_reader_storage_reads_most_recent_response_configs(tmp_path): + reader = open_storage(tmp_path, mode="r") + writer = open_storage(tmp_path, mode="w") + + exp = writer.create_experiment( + responses=[SummaryConfig(keys=["*", "FOPR"], input_files=["not_relevant"])], + name="uniq", + ) + ens: LocalEnsemble = exp.create_ensemble(ensemble_size=10, name="uniq_ens") + + reader.refresh() + read_exp = reader.get_experiment_by_name("uniq") + assert read_exp.id == exp.id + + read_smry_config = read_exp.response_configuration["summary"] + assert read_smry_config.keys == ["*", "FOPR"] + assert not read_smry_config.has_finalized_keys + + smry_data = polars.DataFrame( + { + "response_key": ["FOPR", "FOPR", "WOPR", "WOPR", "FOPT", "FOPT"], + "time": polars.Series( + [datetime.now() + timedelta(days=i) for i in range(6)] + ).dt.cast_time_unit("ms"), + "values": polars.Series( + [0.2, 0.2, 1.0, 1.1, 3.3, 3.3], dtype=polars.Float32 + ), + } + ) + + ens.save_response("summary", smry_data, 0) + assert read_smry_config.keys == ["*", "FOPR"] + assert not read_smry_config.has_finalized_keys + + read_smry_config = read_exp.response_configuration["summary"] + assert read_smry_config.keys == ["FOPR", "FOPT", "WOPR"] + assert read_smry_config.has_finalized_keys + + def test_writing_to_read_only_storage_raises(tmp_path): with open_storage(tmp_path, mode="r") as storage, pytest.raises(ModeError): storage.create_experiment() @@ -548,6 +642,7 @@ class Experiment: observations: Dict[str, polars.DataFrame] = field(default_factory=dict) +# @reproduce_failure("6.112.2", b"AXicY2BgZCAHMIL0ARE/mM2ASjKiMUCK+SD6eBkYAAYxADg=") class StatefulStorageTest(RuleBasedStateMachine): """ This test runs several commands against storage and @@ -713,9 +808,27 @@ def get_parameters(self, model_ensemble: Ensemble): parameter_data["values"], ) - @rule( - model_ensemble=ensembles, - summary_data=summaries( + @rule(model_ensemble=ensembles, data=st.data()) + def save_summary(self, model_ensemble: Ensemble, data): + storage_ensemble = self.storage.get_ensemble(model_ensemble.uuid) + storage_experiment = storage_ensemble.experiment + + # Enforce the summary data to respect the + # scheme outlined in the response configs + smry_config = storage_experiment.response_configuration.get("summary") + + if not smry_config: + assume(False) + raise AssertionError() + + expected_summary_keys = ( + st.just(smry_config.keys) + if smry_config.has_finalized_keys + else st.lists(summary_variables(), min_size=1) + ) + + summaries_strategy = summaries( + summary_keys=expected_summary_keys, start_date=st.datetimes( min_value=datetime.strptime("1969-1-1", "%Y-%m-%d"), max_value=datetime.strptime("2010-1-1", "%Y-%m-%d"), @@ -730,11 +843,9 @@ def get_parameters(self, model_ensemble: Ensemble): min_size=2, max_size=10, ), - ), - ) - def save_summary(self, model_ensemble: Ensemble, summary_data): - storage_ensemble = self.storage.get_ensemble(model_ensemble.uuid) - storage_experiment = storage_ensemble.experiment + ) + summary_data = data.draw(summaries_strategy) + responses = storage_experiment.response_configuration.values() summary_configs = [p for p in responses if isinstance(p, SummaryConfig)] assume(summary_configs) @@ -754,6 +865,17 @@ def save_summary(self, model_ensemble: Ensemble, summary_data): model_ensemble.response_values[summary.name] = ds + model_experiment = self.model[storage_experiment.id] + response_keys = set(ds["response_key"].unique()) + + model_smry_config = next( + config for config in model_experiment.responses if config.name == "summary" + ) + + if not model_smry_config.has_finalized_keys: + model_smry_config.keys = sorted(response_keys) + model_smry_config.has_finalized_keys = True + @rule(model_ensemble=ensembles) def get_responses(self, model_ensemble: Ensemble): storage_ensemble = self.storage.get_ensemble(model_ensemble.uuid) @@ -789,11 +911,24 @@ def create_ensemble(self, model_experiment: Experiment, ensemble_size: int): model_ensemble = Ensemble(ensemble.id) model_experiment.ensembles[ensemble.id] = model_ensemble - assert ( - ensemble.get_ensemble_state() - == [RealizationStorageState.UNDEFINED] * ensemble_size + is_expecting_responses = any( + len(config.keys) for config in model_experiment.responses ) - assert np.all(np.logical_not(ensemble.get_realization_mask_with_responses())) + + if is_expecting_responses: + assert ( + ensemble.get_ensemble_state() + == [RealizationStorageState.UNDEFINED] * ensemble_size + ) + assert np.all( + np.logical_not(ensemble.get_realization_mask_with_responses()) + ) + else: + assert ( + ensemble.get_ensemble_state() + == [RealizationStorageState.HAS_DATA] * ensemble_size + ) + assert np.all(ensemble.get_realization_mask_with_responses()) return model_ensemble @@ -812,17 +947,36 @@ def create_ensemble_from_prior(self, prior: Ensemble): model_ensemble = Ensemble(ensemble.id) model_experiment = self.model[experiment_id] model_experiment.ensembles[ensemble.id] = model_ensemble - state = [RealizationStorageState.PARENT_FAILURE] * size - iens = 0 - if ( + + expected_posterior_state = RealizationStorageState.PARENT_FAILURE + prior_keys = list(prior.response_values.keys()) + + is_expecting_responses = ( + sum(len(config.keys) for config in model_experiment.responses) > 0 + ) + + if not is_expecting_responses: + # Expect a HAS_DATA no matter what + expected_posterior_state = RealizationStorageState.HAS_DATA + elif ( + bool(prior_keys) + and ( + prior_keys + == [ + r.name + for r in model_experiment.responses + if (r.has_finalized_keys and len(r.keys) > 0) + ] + ) + ) or ( list(prior.response_values.keys()) == [r.name for r in model_experiment.responses] - and iens not in prior.failure_messages - and prior_ensemble.get_ensemble_state()[iens] + and 0 not in prior.failure_messages + and prior_ensemble.get_ensemble_state()[0] != RealizationStorageState.PARENT_FAILURE ): - state[iens] = RealizationStorageState.UNDEFINED - assert ensemble.get_ensemble_state() == state + expected_posterior_state = RealizationStorageState.UNDEFINED + assert ensemble.get_ensemble_state()[0] == expected_posterior_state return model_ensemble diff --git a/tests/ert/unit_tests/storage/test_storage_migration.py b/tests/ert/unit_tests/storage/test_storage_migration.py index 6c7e8b4854c..6a627107f5a 100644 --- a/tests/ert/unit_tests/storage/test_storage_migration.py +++ b/tests/ert/unit_tests/storage/test_storage_migration.py @@ -195,7 +195,10 @@ def test_that_storage_matches( "gen_data", ) - assert ensemble.get_summary_keyset() == ["FOPR"] + assert not ensemble.experiment._has_finalized_response_keys("summary") + ensemble.save_response("summary", ensemble.load_responses("summary", (0,)), 0) + assert ensemble.experiment._has_finalized_response_keys("summary") + assert ensemble.experiment.response_type_to_response_keys["summary"] == ["FOPR"] @pytest.mark.integration_test diff --git a/tests/ert/unit_tests/test_libres_facade.py b/tests/ert/unit_tests/test_libres_facade.py index be24bc0d8ae..3cc2787777a 100644 --- a/tests/ert/unit_tests/test_libres_facade.py +++ b/tests/ert/unit_tests/test_libres_facade.py @@ -32,11 +32,21 @@ def empty_case(facade, storage): def test_keyword_type_checks(snake_oil_default_storage): - assert "BPR:1,3,8" in snake_oil_default_storage.get_summary_keyset() + assert ( + "BPR:1,3,8" + in snake_oil_default_storage.experiment.response_type_to_response_keys[ + "summary" + ] + ) def test_keyword_type_checks_missing_key(snake_oil_default_storage): - assert "nokey" not in snake_oil_default_storage.get_summary_keyset() + assert ( + "nokey" + not in snake_oil_default_storage.experiment.response_type_to_response_keys[ + "summary" + ] + ) @pytest.mark.filterwarnings("ignore:.*Use load_responses.*:DeprecationWarning") From 50be1d8e691ecc39fa395326d0ef575da9ed1427 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Tue, 22 Oct 2024 10:31:47 +0200 Subject: [PATCH 2/3] Avoid redundant loading of datasets --- src/ert/dark_storage/common.py | 114 +++++++++++++++++++-------------- 1 file changed, 65 insertions(+), 49 deletions(-) diff --git a/src/ert/dark_storage/common.py b/src/ert/dark_storage/common.py index 2d7753956d5..b5059703acf 100644 --- a/src/ert/dark_storage/common.py +++ b/src/ert/dark_storage/common.py @@ -101,35 +101,75 @@ def data_for_key( """Returns a pandas DataFrame with the datapoints for a given key for a given ensemble. The row index is the realization number, and the columns are an index over the indexes/dates""" + if key.startswith("LOG10_"): key = key[6:] - try: - summary_data = ensemble.load_responses( - "summary", tuple(ensemble.get_realization_list_with_responses("summary")) - ) - summary_keys = summary_data["response_key"].unique().to_list() - except (ValueError, KeyError, polars.exceptions.ColumnNotFoundError): - summary_data = polars.DataFrame() - summary_keys = [] - - if key in summary_keys: - df = ( - summary_data.filter(polars.col("response_key").eq(key)) - .rename({"time": "Date", "realization": "Realization"}) - .drop("response_key") - .to_pandas() + response_key_to_response_type = ensemble.experiment.response_key_to_response_type + + # Check for exact match first. For example if key is "FOPRH" + # it may stop at "FOPR", which would be incorrect + response_key = next((k for k in response_key_to_response_type if k == key), None) + if response_key is None: + response_key = next( + (k for k in response_key_to_response_type if k in key), None ) - df = df.set_index(["Date", "Realization"]) - # This performs the same aggragation by mean of duplicate values - # as in ert/analysis/_es_update.py - df = df.groupby(["Date", "Realization"]).mean() - data = df.unstack(level="Date") - data.columns = data.columns.droplevel(0) - try: - return data.astype(float) - except ValueError: - return data + + if response_key is not None: + response_type = response_key_to_response_type[response_key] + + if response_type == "summary": + summary_data = ensemble.load_responses( + response_key, + tuple(ensemble.get_realization_list_with_responses(response_key)), + ) + if summary_data.is_empty(): + return pd.DataFrame() + + df = ( + summary_data.rename({"time": "Date", "realization": "Realization"}) + .drop("response_key") + .to_pandas() + ) + df = df.set_index(["Date", "Realization"]) + # This performs the same aggragation by mean of duplicate values + # as in ert/analysis/_es_update.py + df = df.groupby(["Date", "Realization"]).mean() + data = df.unstack(level="Date") + data.columns = data.columns.droplevel(0) + try: + return data.astype(float) + except ValueError: + return data + + if response_type == "gen_data": + try: + # Call below will ValueError if key ends with H, + # requested via PlotAPI.history_data + response_key, report_step = displayed_key_to_response_key["gen_data"]( + key + ) + mask = ensemble.get_realization_mask_with_responses(response_key) + realizations = np.where(mask)[0] + data = ensemble.load_responses(response_key, tuple(realizations)) + except ValueError as err: + print(f"Could not load response {key}: {err}") + return pd.DataFrame() + + try: + vals = data.filter(polars.col("report_step").eq(report_step)) + pivoted = vals.drop("response_key", "report_step").pivot( + on="index", values="values" + ) + data = pivoted.to_pandas().set_index("realization") + data.columns = data.columns.astype(int) + data.columns.name = "axis" + try: + return data.astype(float) + except ValueError: + return data + except (ValueError, KeyError): + return pd.DataFrame() group = key.split(":")[0] parameters = ensemble.experiment.parameter_configuration @@ -168,30 +208,6 @@ def data_for_key( return data.astype(float) except ValueError: return data - if key in gen_data_keys(ensemble): - response_key, report_step = displayed_key_to_response_key["gen_data"](key) - try: - mask = ensemble.get_realization_mask_with_responses(response_key) - realizations = np.where(mask)[0] - data = ensemble.load_responses(response_key, tuple(realizations)) - except ValueError as err: - print(f"Could not load response {key}: {err}") - return pd.DataFrame() - - try: - vals = data.filter(polars.col("report_step").eq(report_step)) - pivoted = vals.drop("response_key", "report_step").pivot( - on="index", values="values" - ) - data = pivoted.to_pandas().set_index("realization") - data.columns = data.columns.astype(int) - data.columns.name = "axis" - try: - return data.astype(float) - except ValueError: - return data - except (ValueError, KeyError): - return pd.DataFrame() return pd.DataFrame() From 38146472dd0d1fbf75bdd1140f17905878915500 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Wed, 23 Oct 2024 11:12:05 +0200 Subject: [PATCH 3/3] Log when dark storage fails to load response --- src/ert/dark_storage/common.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/ert/dark_storage/common.py b/src/ert/dark_storage/common.py index b5059703acf..3129b16634a 100644 --- a/src/ert/dark_storage/common.py +++ b/src/ert/dark_storage/common.py @@ -1,4 +1,5 @@ import contextlib +import logging from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple from uuid import UUID @@ -11,6 +12,8 @@ from ert.config.field import Field from ert.storage import Ensemble, Experiment, Storage +logger = logging.getLogger(__name__) + response_key_to_displayed_key: Dict[str, Callable[[Tuple[Any, ...]], str]] = { "summary": lambda t: t[0], "gen_data": lambda t: f"{t[0]}@{t[1]}", @@ -153,7 +156,7 @@ def data_for_key( realizations = np.where(mask)[0] data = ensemble.load_responses(response_key, tuple(realizations)) except ValueError as err: - print(f"Could not load response {key}: {err}") + logger.info(f"Dark storage could not load response {key}: {err}") return pd.DataFrame() try: