Commit

Store & plot scaling factors
yngve-sk authored Sep 9, 2024
1 parent 7224861 commit 225d655
Showing 6 changed files with 357 additions and 3 deletions.
20 changes: 20 additions & 0 deletions src/ert/analysis/_es_update.py
@@ -18,6 +18,7 @@

import iterative_ensemble_smoother as ies
import numpy as np
import pandas as pd
import psutil
from iterative_ensemble_smoother.experimental import (
AdaptiveESMDA,
@@ -272,6 +273,8 @@ def _load_observations_and_responses(
obs_mask = np.logical_and(ens_mean_mask, ens_std_mask)

if auto_scale_observations:
scaling_factors_dfs = []

for input_group in auto_scale_observations:
group = _expand_wildcards(obs_keys, input_group)
logger.info(f"Scaling observation group: {group}")
@@ -283,6 +286,18 @@
S[obs_group_mask], scaled_errors[obs_group_mask]
)
scaling[obs_group_mask] *= scaling_factors

scaling_factors_dfs.append(
pd.DataFrame(
data={
"input_group": [", ".join(input_group)] * len(scaling_factors),
"index": indexes[obs_group_mask],
"obs_key": obs_keys[obs_group_mask],
"scaling_factor": scaling_factors,
}
)
)

progress_callback(
AnalysisDataEvent(
name="Auto scale: " + ", ".join(input_group),
@@ -307,6 +322,11 @@
)
)

scaling_factors_df = pd.concat(scaling_factors_dfs).set_index(
["input_group", "obs_key", "index"], verify_integrity=True
)
ensemble.save_observation_scaling_factors(scaling_factors_df.to_xarray())

# Recompute with updated scales
scaled_errors = errors * scaling

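For reference, a minimal standalone sketch of how the per-group scaling factors above end up as the xarray dataset that gets persisted; the observation keys and index values here are hypothetical, not taken from this diff:

import numpy as np
import pandas as pd

# Hypothetical stand-ins for the values seen inside the update loop.
scaling_factors = np.array([0.5, 0.5])
input_group = ["POLY_OBS1_*"]
obs_keys = np.array(["POLY_OBS1_1", "POLY_OBS1_2"])
indexes = np.array(["0, 0", "0, 0"])

scaling_factors_dfs = [
    pd.DataFrame(
        data={
            "input_group": [", ".join(input_group)] * len(scaling_factors),
            "index": indexes,
            "obs_key": obs_keys,
            "scaling_factor": scaling_factors,
        }
    )
]

# As in the hunk above: unique (input_group, obs_key, index) triplets become
# the MultiIndex, and to_xarray() yields the dataset handed to
# save_observation_scaling_factors().
scaling_factors_df = pd.concat(scaling_factors_dfs).set_index(
    ["input_group", "obs_key", "index"], verify_integrity=True
)
print(scaling_factors_df.to_xarray())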
49 changes: 49 additions & 0 deletions src/ert/gui/tools/manage_experiments/storage_info_widget.py
@@ -1,5 +1,6 @@
import json
from enum import IntEnum
from functools import reduce
from typing import Optional

import numpy as np
@@ -196,6 +197,53 @@ def _currentItemChanged(
tuple(self._ensemble.get_realization_list_with_responses()),
)

scaling_ds = self._ensemble.load_observation_scaling_factors()

def _try_render_scaled_obs() -> None:
if scaling_ds is None:
return None

_obs_df = observation_ds.to_dataframe().reset_index()
# Should store scaling by response type and use the
# response's primary key to create the index key.
index_cols = list(
set(observation_ds.to_dataframe().reset_index().columns)
- {
"observations",
"std",
}
)

# Ensure report_step comes first, as in _es_update
if "report_step" in index_cols:
index_cols = ["report_step", "index"]

# For summary observations the only index column is "time"
index_key = ", ".join([str(_obs_df[x].values[0]) for x in index_cols])
scaling_factors = (
scaling_ds.sel(obs_key=observation_name, drop=True)
.sel(index=index_key, drop=True)["scaling_factor"]
.dropna(dim="input_group")
.values
)

cumulative_scaling: float = (
reduce(lambda x, y: x * y, scaling_factors) or 1.0
)

original_std = observation_ds.get("std")
assert original_std
ax.errorbar(
x="Scaled observation",
y=observation_ds.get("observations"), # type: ignore
yerr=original_std * cumulative_scaling,
fmt=".",
linewidth=1,
capsize=4,
color="black",
)

# check if the response is empty
if bool(response_ds.dims):
if response_name == "summary":
@@ -219,6 +267,7 @@ def _currentItemChanged(
capsize=4,
color="black",
)
_try_render_scaled_obs()

response_ds = response_ds.rename_vars({"values": "Responses"})
sns.boxplot(response_ds.to_dataframe(), ax=ax)
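The cumulative factor applied to the error bar above is the product of the factors from every AUTO_SCALE group that matched the observation. A small sketch of that reduction in isolation, with hypothetical values:

from functools import reduce

# One factor per matching input group (hypothetical values).
scaling_factors = [0.5, 0.5]

# Same reduction as in _try_render_scaled_obs: multiply the factors
# together; `or 1.0` falls back to no scaling if the product is falsy.
cumulative_scaling = reduce(lambda x, y: x * y, scaling_factors) or 1.0
assert cumulative_scaling == 0.25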
17 changes: 17 additions & 0 deletions src/ert/storage/local_ensemble.py
@@ -575,6 +575,23 @@ def load_cross_correlations(self) -> xr.Dataset:
logger.info("Loading cross correlations")
return xr.open_dataset(input_path, engine="scipy")

@require_write
def save_observation_scaling_factors(self, dataset: xr.Dataset) -> None:
dataset.to_netcdf(
self.mount_point / "observation_scaling_factors.nc", engine="scipy"
)

def load_observation_scaling_factors(
self,
) -> Optional[xr.Dataset]:
ds_path = self.mount_point / "observation_scaling_factors.nc"
if ds_path.exists():
return xr.load_dataset(
self.mount_point / "observation_scaling_factors.nc", engine="scipy"
)

return None

@require_write
def save_cross_correlations(
self,
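A minimal round-trip sketch for the two new methods above, assuming an open, writable `storage` handle such as the `storage` fixture used in the unit tests below; all names and values are illustrative:

import pandas as pd

experiment = storage.create_experiment(name="dummyexp")
ensemble = experiment.create_ensemble(name="dummy", ensemble_size=10)

# Dataset shaped like the one produced in _es_update.py.
df = pd.DataFrame(
    {
        "input_group": ["*"],
        "obs_key": ["POLY_OBS"],
        "index": ["0, 0"],
        "scaling_factor": [0.5],
    }
).set_index(["input_group", "obs_key", "index"])

ensemble.save_observation_scaling_factors(df.to_xarray())

# Returns the stored dataset, or None when nothing has been saved.
loaded = ensemble.load_observation_scaling_factors()
assert loaded is not None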
128 changes: 128 additions & 0 deletions tests/integration_tests/analysis/test_adaptive_localization.py
@@ -1,3 +1,4 @@
import shutil
from textwrap import dedent

import numpy as np
@@ -97,6 +98,133 @@ def test_that_adaptive_localization_works_with_a_single_observation():
_, _ = run_cli_ES_with_case("poly_localization_0.ert")


@pytest.mark.usefixtures("copy_poly_case")
def test_that_adaptive_localization_works_with_multiple_observations(snapshot):
with open("observations", "w", encoding="utf-8") as file:
file.write(
"""GENERAL_OBSERVATION POLY_OBS {
DATA = POLY_RES;
INDEX_LIST = 0,1,2,3,4;
OBS_FILE = poly_obs_data.txt;
};
GENERAL_OBSERVATION POLY_OBS1_1 {
DATA = POLY_RES1;
INDEX_LIST = 0,1,2,3,4;
OBS_FILE = poly_obs_data1.txt;
};
GENERAL_OBSERVATION POLY_OBS1_2 {
DATA = POLY_RES2;
INDEX_LIST = 0,1,2,3,4;
OBS_FILE = poly_obs_data2.txt;
};
"""
)

with open("poly_eval.py", "w", encoding="utf-8") as file:
file.write(
"""#!/usr/bin/env python3
import json
def _load_coeffs(filename):
with open(filename, encoding="utf-8") as f:
return json.load(f)["COEFFS"]
def _evaluate(coeffs, x):
return coeffs["a"] * x**2 + coeffs["b"] * x + coeffs["c"]
if __name__ == "__main__":
coeffs = _load_coeffs("parameters.json")
output = [_evaluate(coeffs, x) for x in range(10)]
with open("poly.out", "w", encoding="utf-8") as f:
f.write("\\n".join(map(str, output)))
with open("poly.out1", "w", encoding="utf-8") as f:
f.write("\\n".join(map(str, [x*2 for x in output])))
with open("poly.out2", "w", encoding="utf-8") as f:
f.write("\\n".join(map(str, [x*3 for x in output])))
"""
)

shutil.copy("poly_obs_data.txt", "poly_obs_data1.txt")
shutil.copy("poly_obs_data.txt", "poly_obs_data2.txt")

with open("poly_localization_0.ert", "w", encoding="utf-8") as f:
f.write(
"""
QUEUE_SYSTEM LOCAL
QUEUE_OPTION LOCAL MAX_RUNNING 50
RUNPATH poly_out/realization-<IENS>/iter-<ITER>
OBS_CONFIG observations
REALIZATION_MEMORY 50mb
NUM_REALIZATIONS 100
MIN_REALIZATIONS 1
GEN_KW COEFFS coeff_priors
GEN_DATA POLY_RES RESULT_FILE:poly.out
GEN_DATA POLY_RES1 RESULT_FILE:poly.out1
GEN_DATA POLY_RES2 RESULT_FILE:poly.out2
INSTALL_JOB poly_eval POLY_EVAL
SIMULATION_JOB poly_eval
ANALYSIS_SET_VAR STD_ENKF LOCALIZATION True
ANALYSIS_SET_VAR STD_ENKF LOCALIZATION_CORRELATION_THRESHOLD 0.0
ANALYSIS_SET_VAR OBSERVATIONS AUTO_SCALE *
ANALYSIS_SET_VAR OBSERVATIONS AUTO_SCALE POLY_OBS1_*
"""
)

expected_records = {
("*", "POLY_OBS", "0, 0"),
("*", "POLY_OBS", "0, 1"),
("*", "POLY_OBS", "0, 2"),
("*", "POLY_OBS", "0, 3"),
("*", "POLY_OBS", "0, 4"),
("*", "POLY_OBS1_1", "0, 0"),
("*", "POLY_OBS1_1", "0, 1"),
("*", "POLY_OBS1_1", "0, 2"),
("*", "POLY_OBS1_1", "0, 3"),
("*", "POLY_OBS1_1", "0, 4"),
("*", "POLY_OBS1_2", "0, 0"),
("*", "POLY_OBS1_2", "0, 1"),
("*", "POLY_OBS1_2", "0, 2"),
("*", "POLY_OBS1_2", "0, 3"),
("*", "POLY_OBS1_2", "0, 4"),
("POLY_OBS1_*", "POLY_OBS1_1", "0, 0"),
("POLY_OBS1_*", "POLY_OBS1_1", "0, 1"),
("POLY_OBS1_*", "POLY_OBS1_1", "0, 2"),
("POLY_OBS1_*", "POLY_OBS1_1", "0, 3"),
("POLY_OBS1_*", "POLY_OBS1_1", "0, 4"),
("POLY_OBS1_*", "POLY_OBS1_2", "0, 0"),
("POLY_OBS1_*", "POLY_OBS1_2", "0, 1"),
("POLY_OBS1_*", "POLY_OBS1_2", "0, 2"),
("POLY_OBS1_*", "POLY_OBS1_2", "0, 3"),
("POLY_OBS1_*", "POLY_OBS1_2", "0, 4"),
}

prior_ens, _ = run_cli_ES_with_case("poly_localization_0.ert")
sf = prior_ens.load_observation_scaling_factors()
set_of_records_from_xr = {
x[:-1]
for x in sf.to_dataframe()
.reset_index()
.set_index(["input_group", "obs_key", "index"])
.dropna()
.to_records()
.tolist()
}

assert set_of_records_from_xr == expected_records


@pytest.mark.integration_test
@pytest.mark.usefixtures("copy_poly_case")
def test_that_adaptive_localization_with_cutoff_0_equals_ESupdate():
9 changes: 7 additions & 2 deletions tests/unit_tests/analysis/test_es_update.py
@@ -766,6 +766,7 @@ def _mock_load_observations_and_responses(
global_std_scaling,
auto_scale_observations,
progress_callback,
ensemble,
):
"""
Runs through _load_observations_and_responses with mocked values for
@@ -777,7 +778,7 @@ def _mock_load_observations_and_responses(
mock_obs_n_responses.return_value = (S, observations, errors, obs_keys, indexes)

return _load_observations_and_responses(
ensemble=None,
ensemble=ensemble,
alpha=alpha,
std_cutoff=std_cutoff,
global_std_scaling=global_std_scaling,
@@ -788,7 +789,7 @@
)


def test_that_autoscaling_applies_to_scaled_errors():
def test_that_autoscaling_applies_to_scaled_errors(storage):
with patch("ert.analysis.misfit_preprocessor.main") as misfit_main:
misfit_main.return_value = (
np.array([2, 3]),
@@ -806,6 +807,8 @@ def test_that_autoscaling_applies_to_scaled_errors():
global_std_scaling = 1
progress_callback = lambda _: None

experiment = storage.create_experiment(name="dummyexp")
ensemble = experiment.create_ensemble(name="dummy", ensemble_size=10)
_, (_, scaled_errors_with_autoscale, _) = _mock_load_observations_and_responses(
S=S,
observations=observations,
@@ -817,6 +820,7 @@ def test_that_autoscaling_applies_to_scaled_errors():
global_std_scaling=global_std_scaling,
auto_scale_observations=[["obs1*"]],
progress_callback=progress_callback,
ensemble=ensemble,
)

_, (_, scaled_errors_without_autoscale, _) = (
@@ -831,6 +835,7 @@ def test_that_autoscaling_applies_to_scaled_errors():
global_std_scaling=global_std_scaling,
auto_scale_observations=[],
progress_callback=progress_callback,
ensemble=ensemble,
)
)

(Diff for the sixth changed file not loaded in this view.)
