From 225d6555cffdaa9d0a737f965a61b2e416153bb2 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Mon, 9 Sep 2024 13:48:14 +0200 Subject: [PATCH] Store & plot scaling factors Save & plot scaling factors --- src/ert/analysis/_es_update.py | 20 +++ .../manage_experiments/storage_info_widget.py | 49 +++++++ src/ert/storage/local_ensemble.py | 17 +++ .../analysis/test_adaptive_localization.py | 128 ++++++++++++++++ tests/unit_tests/analysis/test_es_update.py | 9 +- .../gui/tools/test_manage_experiments_tool.py | 137 +++++++++++++++++- 6 files changed, 357 insertions(+), 3 deletions(-) diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py index 9e3a1eb5baa..4fd29651dec 100644 --- a/src/ert/analysis/_es_update.py +++ b/src/ert/analysis/_es_update.py @@ -18,6 +18,7 @@ import iterative_ensemble_smoother as ies import numpy as np +import pandas as pd import psutil from iterative_ensemble_smoother.experimental import ( AdaptiveESMDA, @@ -272,6 +273,8 @@ def _load_observations_and_responses( obs_mask = np.logical_and(ens_mean_mask, ens_std_mask) if auto_scale_observations: + scaling_factors_dfs = [] + for input_group in auto_scale_observations: group = _expand_wildcards(obs_keys, input_group) logger.info(f"Scaling observation group: {group}") @@ -283,6 +286,18 @@ def _load_observations_and_responses( S[obs_group_mask], scaled_errors[obs_group_mask] ) scaling[obs_group_mask] *= scaling_factors + + scaling_factors_dfs.append( + pd.DataFrame( + data={ + "input_group": [", ".join(input_group)] * len(scaling_factors), + "index": indexes[obs_group_mask], + "obs_key": obs_keys[obs_group_mask], + "scaling_factor": scaling_factors, + } + ) + ) + progress_callback( AnalysisDataEvent( name="Auto scale: " + ", ".join(input_group), @@ -307,6 +322,11 @@ def _load_observations_and_responses( ) ) + scaling_factors_df = pd.concat(scaling_factors_dfs).set_index( + ["input_group", "obs_key", "index"], verify_integrity=True + ) + ensemble.save_observation_scaling_factors(scaling_factors_df.to_xarray()) + # Recompute with updated scales scaled_errors = errors * scaling diff --git a/src/ert/gui/tools/manage_experiments/storage_info_widget.py b/src/ert/gui/tools/manage_experiments/storage_info_widget.py index 6dcacd78a16..a589dbd4da5 100644 --- a/src/ert/gui/tools/manage_experiments/storage_info_widget.py +++ b/src/ert/gui/tools/manage_experiments/storage_info_widget.py @@ -1,5 +1,6 @@ import json from enum import IntEnum +from functools import reduce from typing import Optional import numpy as np @@ -196,6 +197,53 @@ def _currentItemChanged( tuple(self._ensemble.get_realization_list_with_responses()), ) + scaling_ds = self._ensemble.load_observation_scaling_factors() + + def _try_render_scaled_obs() -> None: + if scaling_ds is None: + return None + + _obs_df = observation_ds.to_dataframe().reset_index() + # Should store scaling by response type + # and use primary key for the response to + # create the index key. + index_cols = list( + set(observation_ds.to_dataframe().reset_index().columns) + - { + "observations", + "std", + } + ) + + # Just to ensure report step comes first as in _es_update + if "report_step" in index_cols: + index_cols = ["report_step", "index"] + + # for summary there is only "time" + index_key = ", ".join([str(_obs_df[x].values[0]) for x in index_cols]) + scaling_factors = ( + scaling_ds.sel(obs_key=observation_name, drop=True) + .sel(index=index_key, drop=True)["scaling_factor"] + .dropna(dim="input_group") + .values + ) + + cumulative_scaling: float = ( + reduce(lambda x, y: x * y, scaling_factors) or 1.0 + ) + + original_std = observation_ds.get("std") + assert original_std + ax.errorbar( + x="Scaled observation", + y=observation_ds.get("observations"), # type: ignore + yerr=original_std * cumulative_scaling, + fmt=".", + linewidth=1, + capsize=4, + color="black", + ) + # check if the response is empty if bool(response_ds.dims): if response_name == "summary": @@ -219,6 +267,7 @@ def _currentItemChanged( capsize=4, color="black", ) + _try_render_scaled_obs() response_ds = response_ds.rename_vars({"values": "Responses"}) sns.boxplot(response_ds.to_dataframe(), ax=ax) diff --git a/src/ert/storage/local_ensemble.py b/src/ert/storage/local_ensemble.py index a4416e6906f..772e27f8354 100644 --- a/src/ert/storage/local_ensemble.py +++ b/src/ert/storage/local_ensemble.py @@ -575,6 +575,23 @@ def load_cross_correlations(self) -> xr.Dataset: logger.info("Loading cross correlations") return xr.open_dataset(input_path, engine="scipy") + @require_write + def save_observation_scaling_factors(self, dataset: xr.Dataset) -> None: + dataset.to_netcdf( + self.mount_point / "observation_scaling_factors.nc", engine="scipy" + ) + + def load_observation_scaling_factors( + self, + ) -> Optional[xr.Dataset]: + ds_path = self.mount_point / "observation_scaling_factors.nc" + if ds_path.exists(): + return xr.load_dataset( + self.mount_point / "observation_scaling_factors.nc", engine="scipy" + ) + + return None + @require_write def save_cross_correlations( self, diff --git a/tests/integration_tests/analysis/test_adaptive_localization.py b/tests/integration_tests/analysis/test_adaptive_localization.py index dcf2cf392b3..45af7253e4c 100644 --- a/tests/integration_tests/analysis/test_adaptive_localization.py +++ b/tests/integration_tests/analysis/test_adaptive_localization.py @@ -1,3 +1,4 @@ +import shutil from textwrap import dedent import numpy as np @@ -97,6 +98,133 @@ def test_that_adaptive_localization_works_with_a_single_observation(): _, _ = run_cli_ES_with_case("poly_localization_0.ert") +@pytest.mark.usefixtures("copy_poly_case") +def test_that_adaptive_localization_works_with_multiple_observations(snapshot): + with open("observations", "w", encoding="utf-8") as file: + file.write( + """GENERAL_OBSERVATION POLY_OBS { + DATA = POLY_RES; + INDEX_LIST = 0,1,2,3,4; + OBS_FILE = poly_obs_data.txt; + }; + GENERAL_OBSERVATION POLY_OBS1_1 { + DATA = POLY_RES1; + INDEX_LIST = 0,1,2,3,4; + OBS_FILE = poly_obs_data1.txt; + }; + GENERAL_OBSERVATION POLY_OBS1_2 { + DATA = POLY_RES2; + INDEX_LIST = 0,1,2,3,4; + OBS_FILE = poly_obs_data2.txt; + }; + """ + ) + + with open("poly_eval.py", "w", encoding="utf-8") as file: + file.write( + """#!/usr/bin/env python3 +import json + + +def _load_coeffs(filename): + with open(filename, encoding="utf-8") as f: + return json.load(f)["COEFFS"] + + +def _evaluate(coeffs, x): + return coeffs["a"] * x**2 + coeffs["b"] * x + coeffs["c"] + + +if __name__ == "__main__": + coeffs = _load_coeffs("parameters.json") + output = [_evaluate(coeffs, x) for x in range(10)] + with open("poly.out", "w", encoding="utf-8") as f: + f.write("\\n".join(map(str, output))) + + with open("poly.out1", "w", encoding="utf-8") as f: + f.write("\\n".join(map(str, [x*2 for x in output]))) + + with open("poly.out2", "w", encoding="utf-8") as f: + f.write("\\n".join(map(str, [x*3 for x in output]))) +""" + ) + + shutil.copy("poly_obs_data.txt", "poly_obs_data1.txt") + shutil.copy("poly_obs_data.txt", "poly_obs_data2.txt") + + with open("poly_localization_0.ert", "w", encoding="utf-8") as f: + f.write( + """ + QUEUE_SYSTEM LOCAL +QUEUE_OPTION LOCAL MAX_RUNNING 50 + +RUNPATH poly_out/realization-/iter- + +OBS_CONFIG observations +REALIZATION_MEMORY 50mb + +NUM_REALIZATIONS 100 +MIN_REALIZATIONS 1 + +GEN_KW COEFFS coeff_priors +GEN_DATA POLY_RES RESULT_FILE:poly.out +GEN_DATA POLY_RES1 RESULT_FILE:poly.out1 +GEN_DATA POLY_RES2 RESULT_FILE:poly.out2 + +INSTALL_JOB poly_eval POLY_EVAL +SIMULATION_JOB poly_eval + +ANALYSIS_SET_VAR STD_ENKF LOCALIZATION True +ANALYSIS_SET_VAR STD_ENKF LOCALIZATION_CORRELATION_THRESHOLD 0.0 + +ANALYSIS_SET_VAR OBSERVATIONS AUTO_SCALE * +ANALYSIS_SET_VAR OBSERVATIONS AUTO_SCALE POLY_OBS1_* +""" + ) + + expected_records = { + ("*", "POLY_OBS", "0, 0"), + ("*", "POLY_OBS", "0, 1"), + ("*", "POLY_OBS", "0, 2"), + ("*", "POLY_OBS", "0, 3"), + ("*", "POLY_OBS", "0, 4"), + ("*", "POLY_OBS1_1", "0, 0"), + ("*", "POLY_OBS1_1", "0, 1"), + ("*", "POLY_OBS1_1", "0, 2"), + ("*", "POLY_OBS1_1", "0, 3"), + ("*", "POLY_OBS1_1", "0, 4"), + ("*", "POLY_OBS1_2", "0, 0"), + ("*", "POLY_OBS1_2", "0, 1"), + ("*", "POLY_OBS1_2", "0, 2"), + ("*", "POLY_OBS1_2", "0, 3"), + ("*", "POLY_OBS1_2", "0, 4"), + ("POLY_OBS1_*", "POLY_OBS1_1", "0, 0"), + ("POLY_OBS1_*", "POLY_OBS1_1", "0, 1"), + ("POLY_OBS1_*", "POLY_OBS1_1", "0, 2"), + ("POLY_OBS1_*", "POLY_OBS1_1", "0, 3"), + ("POLY_OBS1_*", "POLY_OBS1_1", "0, 4"), + ("POLY_OBS1_*", "POLY_OBS1_2", "0, 0"), + ("POLY_OBS1_*", "POLY_OBS1_2", "0, 1"), + ("POLY_OBS1_*", "POLY_OBS1_2", "0, 2"), + ("POLY_OBS1_*", "POLY_OBS1_2", "0, 3"), + ("POLY_OBS1_*", "POLY_OBS1_2", "0, 4"), + } + + prior_ens, _ = run_cli_ES_with_case("poly_localization_0.ert") + sf = prior_ens.load_observation_scaling_factors() + set_of_records_from_xr = { + x[:-1] + for x in sf.to_dataframe() + .reset_index() + .set_index(["input_group", "obs_key", "index"]) + .dropna() + .to_records() + .tolist() + } + + assert set_of_records_from_xr == expected_records + + @pytest.mark.integration_test @pytest.mark.usefixtures("copy_poly_case") def test_that_adaptive_localization_with_cutoff_0_equals_ESupdate(): diff --git a/tests/unit_tests/analysis/test_es_update.py b/tests/unit_tests/analysis/test_es_update.py index d126d467027..51a2cc33340 100644 --- a/tests/unit_tests/analysis/test_es_update.py +++ b/tests/unit_tests/analysis/test_es_update.py @@ -766,6 +766,7 @@ def _mock_load_observations_and_responses( global_std_scaling, auto_scale_observations, progress_callback, + ensemble, ): """ Runs through _load_observations_and_responses with mocked values for @@ -777,7 +778,7 @@ def _mock_load_observations_and_responses( mock_obs_n_responses.return_value = (S, observations, errors, obs_keys, indexes) return _load_observations_and_responses( - ensemble=None, + ensemble=ensemble, alpha=alpha, std_cutoff=std_cutoff, global_std_scaling=global_std_scaling, @@ -788,7 +789,7 @@ def _mock_load_observations_and_responses( ) -def test_that_autoscaling_applies_to_scaled_errors(): +def test_that_autoscaling_applies_to_scaled_errors(storage): with patch("ert.analysis.misfit_preprocessor.main") as misfit_main: misfit_main.return_value = ( np.array([2, 3]), @@ -806,6 +807,8 @@ def test_that_autoscaling_applies_to_scaled_errors(): global_std_scaling = 1 progress_callback = lambda _: None + experiment = storage.create_experiment(name="dummyexp") + ensemble = experiment.create_ensemble(name="dummy", ensemble_size=10) _, (_, scaled_errors_with_autoscale, _) = _mock_load_observations_and_responses( S=S, observations=observations, @@ -817,6 +820,7 @@ def test_that_autoscaling_applies_to_scaled_errors(): global_std_scaling=global_std_scaling, auto_scale_observations=[["obs1*"]], progress_callback=progress_callback, + ensemble=ensemble, ) _, (_, scaled_errors_without_autoscale, _) = ( @@ -831,6 +835,7 @@ def test_that_autoscaling_applies_to_scaled_errors(): global_std_scaling=global_std_scaling, auto_scale_observations=[], progress_callback=progress_callback, + ensemble=ensemble, ) ) diff --git a/tests/unit_tests/gui/tools/test_manage_experiments_tool.py b/tests/unit_tests/gui/tools/test_manage_experiments_tool.py index f18b1d78b92..0b7e42e7bb5 100644 --- a/tests/unit_tests/gui/tools/test_manage_experiments_tool.py +++ b/tests/unit_tests/gui/tools/test_manage_experiments_tool.py @@ -1,3 +1,5 @@ +import shutil + import pytest from qtpy.QtCore import Qt from qtpy.QtWidgets import QPushButton, QTextEdit @@ -12,8 +14,11 @@ _WidgetType, ) from ert.gui.tools.manage_experiments.storage_widget import StorageWidget -from ert.storage import Storage +from ert.storage import Storage, open_storage from ert.storage.realization_storage_state import RealizationStorageState +from tests.integration_tests.analysis.test_adaptive_localization import ( + run_cli_ES_with_case, +) @pytest.mark.usefixtures("copy_poly_case") @@ -209,6 +214,136 @@ def test_ensemble_view( assert ensemble_widget._figure.get_axes()[0].get_title() == "WOPR_OP1_108" +@pytest.mark.usefixtures("copy_poly_case") +def test_ensemble_observations_view(qtbot): + with open("observations", "w", encoding="utf-8") as file: + file.write( + """GENERAL_OBSERVATION POLY_OBS { + DATA = POLY_RES; + INDEX_LIST = 0,1,2,3,4; + OBS_FILE = poly_obs_data.txt; + }; + GENERAL_OBSERVATION POLY_OBS1_1 { + DATA = POLY_RES1; + INDEX_LIST = 0,1,2,3,4; + OBS_FILE = poly_obs_data1.txt; + }; + GENERAL_OBSERVATION POLY_OBS1_2 { + DATA = POLY_RES2; + INDEX_LIST = 0,1,2,3,4; + OBS_FILE = poly_obs_data2.txt; + }; + """ + ) + + with open("poly_eval.py", "w", encoding="utf-8") as file: + file.write( + """#!/usr/bin/env python3 +import json + + +def _load_coeffs(filename): + with open(filename, encoding="utf-8") as f: + return json.load(f)["COEFFS"] + + +def _evaluate(coeffs, x): + return coeffs["a"] * x**2 + coeffs["b"] * x + coeffs["c"] + + +if __name__ == "__main__": + coeffs = _load_coeffs("parameters.json") + output = [_evaluate(coeffs, x) for x in range(10)] + with open("poly.out", "w", encoding="utf-8") as f: + f.write("\\n".join(map(str, output))) + + with open("poly.out1", "w", encoding="utf-8") as f: + f.write("\\n".join(map(str, [x*2 for x in output]))) + + with open("poly.out2", "w", encoding="utf-8") as f: + f.write("\\n".join(map(str, [x*3 for x in output]))) +""" + ) + + shutil.copy("poly_obs_data.txt", "poly_obs_data1.txt") + shutil.copy("poly_obs_data.txt", "poly_obs_data2.txt") + + with open("poly_localization_0.ert", "w", encoding="utf-8") as f: + f.write( + """ + QUEUE_SYSTEM LOCAL +QUEUE_OPTION LOCAL MAX_RUNNING 50 + +RUNPATH poly_out/realization-/iter- + +OBS_CONFIG observations +REALIZATION_MEMORY 50mb + +NUM_REALIZATIONS 100 +MIN_REALIZATIONS 1 + +GEN_KW COEFFS coeff_priors +GEN_DATA POLY_RES RESULT_FILE:poly.out +GEN_DATA POLY_RES1 RESULT_FILE:poly.out1 +GEN_DATA POLY_RES2 RESULT_FILE:poly.out2 + +INSTALL_JOB poly_eval POLY_EVAL +SIMULATION_JOB poly_eval + +ANALYSIS_SET_VAR STD_ENKF LOCALIZATION True +ANALYSIS_SET_VAR STD_ENKF LOCALIZATION_CORRELATION_THRESHOLD 0.0 + +ANALYSIS_SET_VAR OBSERVATIONS AUTO_SCALE * +ANALYSIS_SET_VAR OBSERVATIONS AUTO_SCALE POLY_OBS1_* +""" + ) + + prior_ens, _ = run_cli_ES_with_case("poly_localization_0.ert") + config = ErtConfig.from_file("poly_localization_0.ert") + + notifier = ErtNotifier(config.config_path) + with open_storage(config.ens_path, mode="w") as storage: + notifier.set_storage(storage) + + tool = ManageExperimentsPanel( + config, notifier, config.model_config.num_realizations + ) + + assert prior_ens.name + + # select the ensemble + storage_widget = tool.findChild(StorageWidget) + storage_widget._tree_view.expandAll() + model_index = storage_widget._tree_view.model().index( + 0, 0, storage_widget._tree_view.model().index(0, 0) + ) + storage_widget._tree_view.setCurrentIndex(model_index) + assert ( + tool._storage_info_widget._content_layout.currentIndex() + == _WidgetType.ENSEMBLE_WIDGET + ) + + ensemble_widget = tool._storage_info_widget._content_layout.currentWidget() + assert isinstance(ensemble_widget, _EnsembleWidget) + assert ensemble_widget._name_label.text() + assert ensemble_widget._uuid_label.text() + assert not ensemble_widget._state_text_edit.toPlainText() + + ensemble_widget._tab_widget.setCurrentIndex(_EnsembleWidgetTabs.STATE_TAB) + assert ensemble_widget._state_text_edit.toPlainText() + + ensemble_widget._tab_widget.setCurrentIndex( + _EnsembleWidgetTabs.OBSERVATIONS_TAB + ) + + # Check that a scaled observation is plotted + assert any( + l + for l in ensemble_widget._figure.get_axes()[0].get_lines() + if "Scaled observation" in l.get_xdata() + ) + + def test_realization_view( qtbot, snake_oil_case_storage: ErtConfig, snake_oil_storage: Storage ):