Skip to content

Commit

Permalink
Return dataframe @ _get_observations_and_responses
Browse files Browse the repository at this point in the history
  • Loading branch information
yngve-sk committed Oct 10, 2024
1 parent 6458ddc commit 603d12d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 64 deletions.
79 changes: 38 additions & 41 deletions src/ert/analysis/_es_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,11 @@ def _get_observations_and_responses(
ensemble: Ensemble,
selected_observations: Iterable[str],
iens_active_index: npt.NDArray[np.int_],
) -> Tuple[
npt.NDArray[np.float64],
npt.NDArray[np.float64],
npt.NDArray[np.float64],
npt.NDArray[np.str_],
npt.NDArray[np.str_],
]:
) -> polars.DataFrame:
"""Fetches and aligns selected observations with their corresponding simulated responses from an ensemble."""
filtered_responses = []
observation_keys = []
observation_values = []
observation_errors = []
indexes = []
observations_by_type = ensemble.experiment.observations

df = polars.DataFrame()
for (
response_type,
response_cls,
Expand Down Expand Up @@ -197,38 +188,31 @@ def _get_observations_and_responses(
on=["response_key", *response_cls.primary_key],
)

joined = joined.sort(by="observation_key")

index_1d = joined.with_columns(
polars.concat_str(response_cls.primary_key, separator=", ").alias("index")
)["index"].to_numpy()

obs_keys_1d = joined["observation_key"].to_numpy()
obs_values_1d = joined["observations"].to_numpy()
obs_errors_1d = joined["std"].to_numpy()
joined = (
joined.with_columns(
polars.concat_str(response_cls.primary_key, separator=", ").alias(
"__tmp_index_key__" # Avoid potential collisions w/ primary key
)
)
.drop(response_cls.primary_key)
.rename({"__tmp_index_key__": "index"})
)

# 4 columns are always there:
# [ response_key, observation_key, observations, std ]
# + one column per "primary key" column
num_non_response_value_columns = 4 + len(response_cls.primary_key)
responses = joined.select(
joined.columns[num_non_response_value_columns:]
).to_numpy()
first_columns = [
"response_key",
"index",
"observation_key",
"observations",
"std",
]
joined = joined.select(
first_columns + [c for c in joined.columns if c not in first_columns]
)

filtered_responses.append(responses)
observation_keys.append(obs_keys_1d)
observation_values.append(obs_values_1d)
observation_errors.append(obs_errors_1d)
indexes.append(index_1d)
df.vstack(joined, in_place=True)

ensemble.load_responses.cache_clear()
return (
np.concatenate(filtered_responses),
np.concatenate(observation_values),
np.concatenate(observation_errors),
np.concatenate(observation_keys),
np.concatenate(indexes),
)
return df


def _expand_wildcards(
Expand Down Expand Up @@ -270,12 +254,25 @@ def _load_observations_and_responses(
List[ObservationAndResponseSnapshot],
],
]:
S, observations, errors, obs_keys, indexes = _get_observations_and_responses(
# cols: response_key, index, observation_key, observations, std, *[1, ...nreals]
observations_and_responses = _get_observations_and_responses(
ensemble,
selected_observations,
iens_active_index,
)

S = observations_and_responses.select(
observations_and_responses.columns[5:]
).to_numpy()
observations = (
observations_and_responses.select("observations").to_numpy().reshape((-1,))
)
errors = observations_and_responses.select("std").to_numpy().reshape((-1,))
obs_keys = (
observations_and_responses.select("observation_key").to_numpy().reshape((-1,))
)
indexes = observations_and_responses.select("index").to_numpy().reshape((-1,))

# Inflating measurement errors by a factor sqrt(global_std_scaling) as shown
# in for example evensen2018 - Analysis of iterative ensemble smoothers for
# solving inverse problems.
Expand Down
42 changes: 19 additions & 23 deletions tests/ert/unit_tests/analysis/test_es_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,11 +586,7 @@ def test_temporary_parameter_storage_with_inactive_fields(


def _mock_load_observations_and_responses(
S,
observations,
errors,
obs_keys,
indexes,
observations_and_responses,
alpha,
std_cutoff,
global_std_scaling,
Expand All @@ -605,15 +601,15 @@ def _mock_load_observations_and_responses(
with patch(
"ert.analysis._es_update._get_observations_and_responses"
) as mock_obs_n_responses:
mock_obs_n_responses.return_value = (S, observations, errors, obs_keys, indexes)
mock_obs_n_responses.return_value = observations_and_responses

return _load_observations_and_responses(
ensemble=ensemble,
alpha=alpha,
std_cutoff=std_cutoff,
global_std_scaling=global_std_scaling,
iens_active_index=np.array([True] * len(observations)),
selected_observations=obs_keys,
iens_active_index=np.array([True] * len(observations_and_responses)),
selected_observations=observations_and_responses.select("observation_key"),
auto_scale_observations=auto_scale_observations,
progress_callback=progress_callback,
)
Expand All @@ -627,11 +623,19 @@ def test_that_autoscaling_applies_to_scaled_errors(storage):
np.array([1, 1]),
)

S = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
observations = np.array([2, 4, 3, 3])
errors = np.array([1, 2, 1, 1])
obs_keys = np.array(["obs1_1", "obs1_2", "obs2", "obs2"])
indexes = np.array(["rs00", "rs0", "rs0", "rs1"])
observations_and_responses = polars.DataFrame(
{
"response_key": ["RESPONSE", "RESPONSE", "RESPONSE", "RESPONSE"],
"index": ["rs00", "rs0", "rs0", "rs1"],
"observation_key": ["obs1_1", "obs1_2", "obs2", "obs2"],
"observations": polars.Series([2, 4, 3, 3], dtype=polars.Float32),
"std": polars.Series([1, 2, 1, 1], dtype=polars.Float32),
"1": polars.Series([1, 4, 7, 8], dtype=polars.Float32),
"2": polars.Series([2, 5, 8, 11], dtype=polars.Float32),
"3": polars.Series([3, 6, 9, 12], dtype=polars.Float32),
}
)

alpha = 1
std_cutoff = 0.05
global_std_scaling = 1
Expand All @@ -640,11 +644,7 @@ def test_that_autoscaling_applies_to_scaled_errors(storage):
experiment = storage.create_experiment(name="dummyexp")
ensemble = experiment.create_ensemble(name="dummy", ensemble_size=10)
_, (_, scaled_errors_with_autoscale, _) = _mock_load_observations_and_responses(
S=S,
observations=observations,
errors=errors,
obs_keys=obs_keys,
indexes=indexes,
observations_and_responses,
alpha=alpha,
std_cutoff=std_cutoff,
global_std_scaling=global_std_scaling,
Expand All @@ -655,11 +655,7 @@ def test_that_autoscaling_applies_to_scaled_errors(storage):

_, (_, scaled_errors_without_autoscale, _) = (
_mock_load_observations_and_responses(
S=S,
observations=observations,
errors=errors,
obs_keys=obs_keys,
indexes=indexes,
observations_and_responses,
alpha=alpha,
std_cutoff=std_cutoff,
global_std_scaling=global_std_scaling,
Expand Down

0 comments on commit 603d12d

Please sign in to comment.