Skip to content

Commit

Permalink
Fixup store responses with parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
Yngve S. Kristiansen committed Sep 13, 2024
1 parent 83caa84 commit 8bbedce
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 50 deletions.
99 changes: 49 additions & 50 deletions src/ert/analysis/_es_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,64 +155,63 @@ def _get_observations_and_responses(
observation_errors = []
indexes = []
observations_by_type = ensemble.experiment.observations
for response_type in ensemble.experiment.response_info:
for (
response_type,
response_cls,
) in ensemble.experiment.response_configuration.items():
observations_for_type = observations_by_type[response_type].filter(
polars.col("observation_key").is_in(selected_observations)
)
responses_for_type = ensemble.load_responses(
response_type, realizations=tuple(iens_active_index)
)
joined = observations_for_type.join(responses_for_type, how="left")

#
# observation = None
# # group = observation.attrs["response"]
# all_responses = ensemble.load_responses(group, tuple(iens_active_index))
# if "time" in observation.coords:
# all_responses = all_responses.reindex(
# time=observation.time,
# method="nearest",
# tolerance="1s",
# )
# try:
# observations_and_responses = observation.merge(all_responses, join="left")
# except KeyError as e:
# raise ErtAnalysisError(
# f"Mismatched index for: "
# f"Observation: {obs} attached to response: {group}"
# ) from e
#
# observation_keys.append([obs] * observations_and_responses["observations"].size)
#
# if group == "summary":
# indexes.append(
# [
# np.datetime_as_string(e, unit="s")
# for e in observations_and_responses["time"].data
# ]
# )
# else:
# indexes.append(
# [
# f"{e[0]}, {e[1]}"
# for e in zip(
# list(observations_and_responses["report_step"].data)
# * len(observations_and_responses["index"].data),
# observations_and_responses["index"].data,
# )
# ]
# )
observations_and_responses = None
observation_values.append(
observations_and_responses["observations"].data.ravel()
pivoted = responses_for_type.pivot(
on="realization", index=["response_key", *response_cls.primary_key]
)
observation_errors.append(observations_and_responses["std"].data.ravel())

filtered_responses.append(
observations_and_responses["values"]
.transpose(..., "realization")
.values.reshape((-1, len(observations_and_responses.realization)))
)
# Note to reviewer:
# We must either assume that a "time" column, when present, is always
# joined approximately, or let each response config declare which of its
# columns requires an approximate ("asof") join.
# Suggestion: keep it simple and assume there is at most one "time"
# column, which we reindex towards the response dataset
# with a given resolution.
if "time" in pivoted:
joined = observations_for_type.join_asof(
pivoted,
by=["response_key", *response_cls.primary_key],
on="time",
tolerance="1s",
)
else:
joined = observations_for_type.join(
pivoted,
how="left",
on=["response_key", *response_cls.primary_key],
)

index_1d = joined.with_columns(
polars.concat_str(response_cls.primary_key, separator=", ").alias("index")
)["index"].to_numpy()

obs_keys_1d = joined["observation_key"].to_numpy()
obs_values_1d = joined["observations"].to_numpy()
obs_errors_1d = joined["std"].to_numpy()

# 4 columns are always there:
# [ response_key, observation_key, observations, std ]
# + one column per "primary key" column
num_non_response_value_columns = 4 + len(response_cls.primary_key)
responses = joined.select(
joined.columns[num_non_response_value_columns:]
).to_numpy()

filtered_responses.append(responses)
observation_keys.append(obs_keys_1d)
observation_values.append(obs_values_1d)
observation_errors.append(obs_errors_1d)
indexes.append(index_1d)

ensemble.load_responses.cache_clear()
return (
np.concatenate(filtered_responses),
Expand Down
6 changes: 6 additions & 0 deletions src/ert/config/response_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ def response_type(self) -> str:
Must not overlap with that of other response configs."""
...

@property
@abstractmethod
def primary_key(self) -> List[str]:
    """Primary key of this response data.

    Returns the names of the columns that make up the primary key, as a
    list of strings.

    For example ['time'] for summary and ['index', 'report_step'] for
    gen data.
    """

@classmethod
@abstractmethod
def from_config_dict(cls, config_dict: ConfigDict) -> Optional[Self]:
Expand Down
4 changes: 4 additions & 0 deletions src/ert/config/summary_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ def read_from_file(self, run_path: str, iens: int) -> polars.DataFrame:
def response_type(self) -> str:
return "summary"

@property
def primary_key(self) -> List[str]:
    """Primary key of summary response data: the single column 'time'."""
    return ["time"]

@classmethod
def from_config_dict(self, config_dict: ConfigDict) -> Optional[SummaryConfig]:
refcase = Refcase.from_config_dict(config_dict)
Expand Down

0 comments on commit 8bbedce

Please sign in to comment.