From 74aff2c4fbf6231e77ee7a10ac1713bd8def7b2a Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Fri, 13 Sep 2024 12:44:25 +0200 Subject: [PATCH] Fixup store responses with parquet --- src/ert/analysis/_es_update.py | 99 +++++++++++++++---------------- src/ert/config/response_config.py | 6 ++ src/ert/config/summary_config.py | 4 ++ 3 files changed, 59 insertions(+), 50 deletions(-) diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py index f4c798e5bdf..1c1fb289b44 100644 --- a/src/ert/analysis/_es_update.py +++ b/src/ert/analysis/_es_update.py @@ -155,64 +155,63 @@ def _get_observations_and_responses( observation_errors = [] indexes = [] observations_by_type = ensemble.experiment.observations - for response_type in ensemble.experiment.response_info: + for ( + response_type, + response_cls, + ) in ensemble.experiment.response_configuration.items(): observations_for_type = observations_by_type[response_type].filter( polars.col("observation_key").is_in(selected_observations) ) responses_for_type = ensemble.load_responses( response_type, realizations=tuple(iens_active_index) ) - joined = observations_for_type.join(responses_for_type, how="left") - - # - # observation = None - # # group = observation.attrs["response"] - # all_responses = ensemble.load_responses(group, tuple(iens_active_index)) - # if "time" in observation.coords: - # all_responses = all_responses.reindex( - # time=observation.time, - # method="nearest", - # tolerance="1s", - # ) - # try: - # observations_and_responses = observation.merge(all_responses, join="left") - # except KeyError as e: - # raise ErtAnalysisError( - # f"Mismatched index for: " - # f"Observation: {obs} attached to response: {group}" - # ) from e - # - # observation_keys.append([obs] * observations_and_responses["observations"].size) - # - # if group == "summary": - # indexes.append( - # [ - # np.datetime_as_string(e, unit="s") - # for e in observations_and_responses["time"].data - # ] - # ) - # else: 
- # indexes.append( - # [ - # f"{e[0]}, {e[1]}" - # for e in zip( - # list(observations_and_responses["report_step"].data) - # * len(observations_and_responses["index"].data), - # observations_and_responses["index"].data, - # ) - # ] - # ) - observations_and_responses = None - observation_values.append( - observations_and_responses["observations"].data.ravel() + pivoted = responses_for_type.pivot( + on="realization", index=["response_key", *response_cls.primary_key] ) - observation_errors.append(observations_and_responses["std"].data.ravel()) - filtered_responses.append( - observations_and_responses["values"] - .transpose(..., "realization") - .values.reshape((-1, len(observations_and_responses.realization))) - ) + # Note2reviewer: + # We need to either assume that if there is a time column + # we will approx-join that, or we could specify in response configs + # that there is a column that requires an approx "asof" join. + # Suggest we simplify and assume that there is always only + # one "time" column, which we will reindex towards the response dataset + # with a given resolution + if "time" in pivoted: + joined = observations_for_type.join_asof( + pivoted, + by=["response_key", *response_cls.primary_key], + on="time", + tolerance="1s", + ) + else: + joined = observations_for_type.join( + pivoted, + how="left", + on=["response_key", *response_cls.primary_key], + ) + + index_1d = joined.with_columns( + polars.concat_str(response_cls.primary_key, separator=", ").alias("index") + )["index"].to_numpy() + + obs_keys_1d = joined["observation_key"].to_numpy() + obs_values_1d = joined["observations"].to_numpy() + obs_errors_1d = joined["std"].to_numpy() + + # 4 columns are always there: + # [ response_key, observation_key, observations, std ] + # + one column per "primary key" column + num_non_response_value_columns = 4 + len(response_cls.primary_key) + responses = joined.select( + joined.columns[num_non_response_value_columns:] + ).to_numpy() + + 
filtered_responses.append(responses) + observation_keys.append(obs_keys_1d) + observation_values.append(obs_values_1d) + observation_errors.append(obs_errors_1d) + indexes.append(index_1d) + ensemble.load_responses.cache_clear() return ( np.concatenate(filtered_responses), diff --git a/src/ert/config/response_config.py b/src/ert/config/response_config.py index 55d176a6bd0..9d807d84435 100644 --- a/src/ert/config/response_config.py +++ b/src/ert/config/response_config.py @@ -35,6 +35,12 @@ def response_type(self) -> str: Must not overlap with that of other response configs.""" ... + @property + @abstractmethod + def primary_key(self) -> List[str]: + """Primary key of this response data. + For example ['time'] for summary and ['index', 'report_step'] for gen data""" + @classmethod @abstractmethod def from_config_dict(cls, config_dict: ConfigDict) -> Optional[Self]: diff --git a/src/ert/config/summary_config.py b/src/ert/config/summary_config.py index 3b0c0bfaa3e..3477e72c016 100644 --- a/src/ert/config/summary_config.py +++ b/src/ert/config/summary_config.py @@ -64,6 +64,10 @@ def read_from_file(self, run_path: str, iens: int) -> polars.DataFrame: def response_type(self) -> str: return "summary" + @property + def primary_key(self) -> List[str]: + return ["time"] + @classmethod def from_config_dict(self, config_dict: ConfigDict) -> Optional[SummaryConfig]: refcase = Refcase.from_config_dict(config_dict)