Skip to content

Commit

Permalink
Migrate finalized keys for response configs
Browse files Browse the repository at this point in the history
  • Loading branch information
yngve-sk committed Dec 12, 2024
1 parent 2062854 commit 2d374ed
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 8 deletions.
5 changes: 3 additions & 2 deletions src/ert/storage/local_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

logger = logging.getLogger(__name__)

_LOCAL_STORAGE_VERSION = 8
_LOCAL_STORAGE_VERSION = 9


class _Migrations(BaseModel):
Expand Down Expand Up @@ -472,6 +472,7 @@ def _migrate(self, version: int) -> None:
to6,
to7,
to8,
to9,
)

try:
Expand Down Expand Up @@ -516,7 +517,7 @@ def _migrate(self, version: int) -> None:

elif version < _LOCAL_STORAGE_VERSION:
migrations = list(
enumerate([to2, to3, to4, to5, to6, to7, to8], start=1)
enumerate([to2, to3, to4, to5, to6, to7, to8, to9], start=1)
)
for from_version, migration in migrations[version - 1 :]:
print(f"* Updating storage to version: {from_version+1}")
Expand Down
86 changes: 86 additions & 0 deletions src/ert/storage/migration/to9.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import json
import os
from pathlib import Path
from tempfile import NamedTemporaryFile

import polars

info = "Migrate finalized response keys into configs"


def _write_transaction(filename: str | os.PathLike[str], data: bytes) -> None:
"""
Writes the data to the filename as a transaction.
Guarantees to not leave half-written or empty files on disk if the write
fails or the process is killed.
"""

Path("./swp").mkdir(parents=True, exist_ok=True)
with NamedTemporaryFile(dir="./swp", delete=False) as f:
f.write(data)
os.rename(f.name, filename)


def migrate(path: Path) -> None:
for experiment in path.glob("experiments/*"):
ensembles = [*path.glob("ensembles/*")]

with (
open(experiment / "index.json", encoding="utf-8") as f_experiment,
open(
experiment / "responses.json", mode="r", encoding="utf-8"
) as f_responses,
):
exp_index = json.load(f_experiment)
experiment_id = exp_index["id"]

responses_config = json.load(f_responses)
for response_type, config in responses_config.items():
if not config.get("has_finalized_keys"):
# Read a sample response and write the keys
for ens in ensembles:
with open(ens / "index.json", encoding="utf-8") as f_ensemble:
ens_file = json.load(f_ensemble)
if ens_file["experiment_id"] != experiment_id:
continue

real_dirs = [*ens.glob("realization-*")]

for real_dir in real_dirs:
if (real_dir / f"{response_type}.parquet").exists():
df = polars.scan_parquet(
real_dir / f"{response_type}.parquet"
)
response_keys = (
df.select("response_key")
.unique()
.collect()
.to_series()
.to_list()
)
config["has_finalized_keys"] = True
config["keys"] = sorted(response_keys)
break

if config.get("has_finalized_keys"):
break

# If this is hit, it means no responses were found
# for that response type, so we still cannot have "finalized"
# keys. We then default it to that of the configs.
# At time of writing, it is always True for GenDataConfig
# and False for SummaryConfig
if "has_finalized_keys" not in config:
config["has_finalized_keys"] = (
config["_ert_kind"] != "SummaryConfig"
)

_write_transaction(
experiment / "responses.json",
json.dumps(
responses_config,
default=str,
indent=2,
).encode("utf-8"),
)
17 changes: 13 additions & 4 deletions tests/ert/unit_tests/storage/migration/test_version_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,19 @@ def test_migrate_responses(setup_case, set_ert_config):
response_info = json.loads(
(experiment._path / "responses.json").read_text(encoding="utf-8")
)
assert (
experiment.response_configuration
== ert_config.ensemble_config.response_configs
)

response_config_exp = experiment.response_configuration
response_config_ens = ert_config.ensemble_config.response_configs

# From storage v9 and onwards the response config is mutated
# when migrating an existing experiment, because we check that the
# keys in response.json are aligned with the dataset.
response_config_ens["summary"].has_finalized_keys = response_config_exp[
"summary"
].has_finalized_keys
response_config_ens["summary"].keys = response_config_exp["summary"].keys

assert response_config_exp == response_config_ens

assert set(response_info) == {
"gen_data",
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{'gen_data': GenDataConfig(name='gen_data', input_files=['gen%d.txt'], keys=['GEN'], has_finalized_keys=True, report_steps_list=[[1]]), 'summary': SummaryConfig(name='summary', input_files=['CASE'], keys=['FOPR', 'RWPR'], has_finalized_keys=False, refcase={})}
{'gen_data': GenDataConfig(name='gen_data', input_files=['gen%d.txt'], keys=['GEN'], has_finalized_keys=True, report_steps_list=[[1]]), 'summary': SummaryConfig(name='summary', input_files=['CASE'], keys=['FOPR'], has_finalized_keys=True, refcase={})}
108 changes: 107 additions & 1 deletion tests/ert/unit_tests/storage/test_storage_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def copy_shared(tmp_path, block_storage_path):
@pytest.mark.parametrize(
"ert_version",
[
"11.1.8",
"11.0.8",
"10.3.1",
"10.2.8",
"10.1.3",
Expand Down Expand Up @@ -127,6 +129,11 @@ def test_that_storage_matches(
response_config = experiment.response_configuration
response_config["summary"].refcase = {}

assert all(
"has_finalized_keys" in config
for config in experiment.response_info.values()
)

with open(
experiment._path / experiment._responses_file, "w", encoding="utf-8"
) as f:
Expand Down Expand Up @@ -197,7 +204,8 @@ def test_that_storage_matches(
"gen_data",
)

assert not ensemble.experiment._has_finalized_response_keys("summary")
assert ensemble.experiment._has_finalized_response_keys("summary")
assert ensemble.experiment._has_finalized_response_keys("gen_data")
ensemble.save_response("summary", ensemble.load_responses("summary", (0,)), 0)
assert ensemble.experiment._has_finalized_response_keys("summary")
assert ensemble.experiment.response_type_to_response_keys["summary"] == ["FOPR"]
Expand All @@ -208,6 +216,8 @@ def test_that_storage_matches(
@pytest.mark.parametrize(
"ert_version",
[
"11.1.8",
"11.0.8",
"10.3.1",
"10.2.8",
"10.1.3",
Expand Down Expand Up @@ -457,3 +467,99 @@ def test_that_manual_update_from_migrated_storage_works(
list(experiment.observation_keys),
list(ert_config.ensemble_config.parameters),
)


@pytest.mark.integration_test
@pytest.mark.usefixtures("copy_shared")
@pytest.mark.parametrize(
"ert_version",
[
"11.1.8",
"10.3.1",
"10.0.3",
"9.0.17",
"8.4.9",
"8.4.8",
"8.4.7",
"8.4.6",
"8.4.5",
"8.4.4",
"8.4.3",
"8.4.2",
"8.4.1",
"8.4.0",
"8.3.1",
"8.3.0",
"8.2.1",
"8.2.0",
"8.1.1",
"8.1.0",
"8.0.13",
"8.0.12",
"8.0.11",
"8.0.10",
"8.0.9",
"8.0.8",
"8.0.7",
"8.0.6",
"8.0.4",
"8.0.3",
"8.0.2",
"8.0.1",
"8.0.0",
"7.0.4",
"7.0.3",
"7.0.2",
"7.0.1",
"7.0.0",
"6.0.8",
"6.0.7",
"6.0.6",
"6.0.4",
"6.0.3",
"6.0.1",
"6.0.0",
"5.0.11",
"5.0.9",
"5.0.8",
"5.0.7",
"5.0.6",
"5.0.5",
"5.0.4",
"5.0.2",
"5.0.1",
"5.0.0",
],
)
def test_migrate_storage_with_no_responses(
tmp_path,
block_storage_path,
monkeypatch,
ert_version,
):
storage_path = tmp_path / "all_data_types" / f"storage-{ert_version}"
shutil.copytree(
block_storage_path / f"all_data_types/storage-{ert_version}",
storage_path,
)
[ensemble_id] = os.listdir(storage_path / "ensembles")

# Remove all realization-*/TOP.nc, and only some realization-*/BPC.nc
for real_dir in (storage_path / "ensembles" / ensemble_id).glob("realization-*"):
gen_data_file = next(
file for file in os.listdir(real_dir) if "gen" in file.lower()
)

os.remove(real_dir / gen_data_file)

summary_file = next(
file for file in os.listdir(real_dir) if "summary" in file.lower()
)

os.remove(real_dir / summary_file)

monkeypatch.chdir(tmp_path / "all_data_types")
ert_config = ErtConfig.with_plugins().from_file("config.ert")
local_storage_set_ert_config(ert_config)

open_storage(f"storage-{ert_version}", "w")

0 comments on commit 2d374ed

Please sign in to comment.