From 603d12da82830929a9e7cd5f6d87deb53b6e1e11 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Wed, 9 Oct 2024 14:10:47 +0200 Subject: [PATCH] Return dataframe @ _get_observations_and_responses --- src/ert/analysis/_es_update.py | 79 +++++++++---------- .../ert/unit_tests/analysis/test_es_update.py | 42 +++++----- 2 files changed, 57 insertions(+), 64 deletions(-) diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py index 1b157218b38..45c2bd69624 100644 --- a/src/ert/analysis/_es_update.py +++ b/src/ert/analysis/_es_update.py @@ -140,20 +140,11 @@ def _get_observations_and_responses( ensemble: Ensemble, selected_observations: Iterable[str], iens_active_index: npt.NDArray[np.int_], -) -> Tuple[ - npt.NDArray[np.float64], - npt.NDArray[np.float64], - npt.NDArray[np.float64], - npt.NDArray[np.str_], - npt.NDArray[np.str_], -]: +) -> polars.DataFrame: """Fetches and aligns selected observations with their corresponding simulated responses from an ensemble.""" - filtered_responses = [] - observation_keys = [] - observation_values = [] - observation_errors = [] - indexes = [] observations_by_type = ensemble.experiment.observations + + df = polars.DataFrame() for ( response_type, response_cls, @@ -197,38 +188,31 @@ def _get_observations_and_responses( on=["response_key", *response_cls.primary_key], ) - joined = joined.sort(by="observation_key") - - index_1d = joined.with_columns( - polars.concat_str(response_cls.primary_key, separator=", ").alias("index") - )["index"].to_numpy() - - obs_keys_1d = joined["observation_key"].to_numpy() - obs_values_1d = joined["observations"].to_numpy() - obs_errors_1d = joined["std"].to_numpy() + joined = ( + joined.with_columns( + polars.concat_str(response_cls.primary_key, separator=", ").alias( + "__tmp_index_key__" # Avoid potential collisions w/ primary key + ) + ) + .drop(response_cls.primary_key) + .rename({"__tmp_index_key__": "index"}) + ) - # 4 columns are always there: - # [ response_key, observation_key, observations, std ] - # + one column per "primary key" column - num_non_response_value_columns = 4 + len(response_cls.primary_key) - responses = joined.select( - joined.columns[num_non_response_value_columns:] - ).to_numpy() + first_columns = [ + "response_key", + "index", + "observation_key", + "observations", + "std", + ] + joined = joined.select( + first_columns + [c for c in joined.columns if c not in first_columns] + ) - filtered_responses.append(responses) - observation_keys.append(obs_keys_1d) - observation_values.append(obs_values_1d) - observation_errors.append(obs_errors_1d) - indexes.append(index_1d) + df.vstack(joined, in_place=True) ensemble.load_responses.cache_clear() - return ( - np.concatenate(filtered_responses), - np.concatenate(observation_values), - np.concatenate(observation_errors), - np.concatenate(observation_keys), - np.concatenate(indexes), - ) + return df def _expand_wildcards( @@ -270,12 +254,25 @@ def _load_observations_and_responses( List[ObservationAndResponseSnapshot], ], ]: - S, observations, errors, obs_keys, indexes = _get_observations_and_responses( + # cols: response_key, index, observation_key, observations, std, *[1, ...nreals] + observations_and_responses = _get_observations_and_responses( ensemble, selected_observations, iens_active_index, ) + S = observations_and_responses.select( + observations_and_responses.columns[5:] + ).to_numpy() + observations = ( + observations_and_responses.select("observations").to_numpy().reshape((-1,)) + ) + errors = observations_and_responses.select("std").to_numpy().reshape((-1,)) + obs_keys = ( + observations_and_responses.select("observation_key").to_numpy().reshape((-1,)) + ) + indexes = observations_and_responses.select("index").to_numpy().reshape((-1,)) + # Inflating measurement errors by a factor sqrt(global_std_scaling) as shown # in for example evensen2018 - Analysis of iterative ensemble smoothers for # solving inverse problems. diff --git a/tests/ert/unit_tests/analysis/test_es_update.py b/tests/ert/unit_tests/analysis/test_es_update.py index 93fc9184c0b..14cc02d125e 100644 --- a/tests/ert/unit_tests/analysis/test_es_update.py +++ b/tests/ert/unit_tests/analysis/test_es_update.py @@ -586,11 +586,7 @@ def test_temporary_parameter_storage_with_inactive_fields( def _mock_load_observations_and_responses( - S, - observations, - errors, - obs_keys, - indexes, + observations_and_responses, alpha, std_cutoff, global_std_scaling, @@ -605,15 +601,15 @@ def _mock_load_observations_and_responses( with patch( "ert.analysis._es_update._get_observations_and_responses" ) as mock_obs_n_responses: - mock_obs_n_responses.return_value = (S, observations, errors, obs_keys, indexes) + mock_obs_n_responses.return_value = observations_and_responses return _load_observations_and_responses( ensemble=ensemble, alpha=alpha, std_cutoff=std_cutoff, global_std_scaling=global_std_scaling, - iens_active_index=np.array([True] * len(observations)), - selected_observations=obs_keys, + iens_active_index=np.array([True] * len(observations_and_responses)), + selected_observations=observations_and_responses.select("observation_key"), auto_scale_observations=auto_scale_observations, progress_callback=progress_callback, ) @@ -627,11 +623,19 @@ def test_that_autoscaling_applies_to_scaled_errors(storage): np.array([1, 1]), ) - S = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]) - observations = np.array([2, 4, 3, 3]) - errors = np.array([1, 2, 1, 1]) - obs_keys = np.array(["obs1_1", "obs1_2", "obs2", "obs2"]) - indexes = np.array(["rs00", "rs0", "rs0", "rs1"]) + observations_and_responses = polars.DataFrame( + { + "response_key": ["RESPONSE", "RESPONSE", "RESPONSE", "RESPONSE"], + "index": ["rs00", "rs0", "rs0", "rs1"], + "observation_key": ["obs1_1", "obs1_2", "obs2", "obs2"], + "observations": polars.Series([2, 4, 3, 3], dtype=polars.Float32), + "std": polars.Series([1, 2, 1, 1], dtype=polars.Float32), + "1": polars.Series([1, 4, 7, 8], dtype=polars.Float32), + "2": polars.Series([2, 5, 8, 11], dtype=polars.Float32), + "3": polars.Series([3, 6, 9, 12], dtype=polars.Float32), + } + ) + alpha = 1 std_cutoff = 0.05 global_std_scaling = 1 @@ -640,11 +644,7 @@ def test_that_autoscaling_applies_to_scaled_errors(storage): experiment = storage.create_experiment(name="dummyexp") ensemble = experiment.create_ensemble(name="dummy", ensemble_size=10) _, (_, scaled_errors_with_autoscale, _) = _mock_load_observations_and_responses( - S=S, - observations=observations, - errors=errors, - obs_keys=obs_keys, - indexes=indexes, + observations_and_responses, alpha=alpha, std_cutoff=std_cutoff, global_std_scaling=global_std_scaling, @@ -655,11 +655,7 @@ def test_that_autoscaling_applies_to_scaled_errors(storage): _, (_, scaled_errors_without_autoscale, _) = ( _mock_load_observations_and_responses( - S=S, - observations=observations, - errors=errors, - obs_keys=obs_keys, - indexes=indexes, + observations_and_responses, alpha=alpha, std_cutoff=std_cutoff, global_std_scaling=global_std_scaling,