Skip to content

Commit

Permalink
Store observations as parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
Yngve S. Kristiansen committed Sep 12, 2024
1 parent 0724104 commit 34091ed
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 23 deletions.
40 changes: 22 additions & 18 deletions src/ert/config/observation_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, Iterable, List, Union

import xarray as xr

from .enkf_observation_implementation_type import EnkfObservationImplementationType
from .general_observation import GenObservation
from .summary_observation import SummaryObservation

if TYPE_CHECKING:
from datetime import datetime

import polars

Check failure on line 13 in src/ert/config/observation_vector.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Cannot find implementation or library stub for module named "polars"


@dataclass
class ObsVector:
Expand All @@ -27,25 +27,28 @@ def __iter__(self) -> Iterable[Union[SummaryObservation, GenObservation]]:
def __len__(self) -> int:
    """Return the number of observation nodes stored in this vector."""
    return len(self.observations)

def to_dataset(self, active_list: List[int]) -> xr.Dataset:
def to_dataset(self, active_list: List[int]) -> polars.DataFrame:
if self.observation_type == EnkfObservationImplementationType.GEN_OBS:
datasets = []
dataframes = []
for time_step, node in self.observations.items():
if active_list and time_step not in active_list:
continue

assert isinstance(node, GenObservation)
datasets.append(
xr.Dataset(
dataframes.append(
polars.DataFrame(
{
"observations": (["report_step", "index"], [node.values]),
"std": (["report_step", "index"], [node.stds]),
},
coords={"index": node.indices, "report_step": [time_step]},
"name": self.data_key,
"index": node.indices,
"report_step": time_step,
"observations": polars.Series(
node.values, dtype=polars.Float32
),
"std": polars.Series(node.stds, dtype=polars.Float32),
}
)
)
combined = xr.combine_by_coords(datasets)
combined.attrs["response"] = self.data_key
combined = polars.concat(dataframes)
return combined # type: ignore

Check failure on line 52 in src/ert/config/observation_vector.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Unused "type: ignore" comment
elif self.observation_type == EnkfObservationImplementationType.SUMMARY_OBS:
observations = []
Expand All @@ -59,13 +62,14 @@ def to_dataset(self, active_list: List[int]) -> xr.Dataset:
assert isinstance(n, SummaryObservation)
observations.append(n.value)
errors.append(n.std)
return xr.Dataset(

return polars.DataFrame(
{
"observations": (["name", "time"], [observations]),
"std": (["name", "time"], [errors]),
},
coords={"time": dates, "name": [self.observation_key]},
attrs={"response": "summary"},
"name": self.observation_key,
"time": dates,
"observations": polars.Series(observations, dtype=polars.Float32),
"std": polars.Series(errors, dtype=polars.Float32),
}
)
else:
raise ValueError(f"Unknown observation type {self.observation_type}")
20 changes: 17 additions & 3 deletions src/ert/config/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import TYPE_CHECKING, Dict, Iterator, List, Optional, Tuple, Union

import numpy as np
import xarray as xr
import polars

Check failure on line 7 in src/ert/config/observations.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Cannot find implementation or library stub for module named "polars"

from ert.validation import rangestring_to_list

Expand Down Expand Up @@ -42,8 +42,22 @@ class EnkfObs:
obs_time: List[datetime]

def __post_init__(self) -> None:
    """Group observation vectors by response type into ``self.datasets``.

    Builds one concatenated ``polars.DataFrame`` per response type, keyed
    by ``"summary"`` (summary observations) and ``"gen_data"`` (general
    observations). Vectors of any other observation type are ignored.
    """
    # Bucket the per-vector DataFrames by response type before concatenating,
    # so each type ends up as a single tall DataFrame.
    # Annotation fixes the mypy error "Need type annotation for 'grouped'".
    grouped: Dict[str, List[polars.DataFrame]] = {}
    for vec in self.obs_vectors.values():
        if vec.observation_type == EnkfObservationImplementationType.SUMMARY_OBS:
            grouped.setdefault("summary", []).append(vec.to_dataset([]))
        elif vec.observation_type == EnkfObservationImplementationType.GEN_OBS:
            grouped.setdefault("gen_data", []).append(vec.to_dataset([]))

    self.datasets: Dict[str, polars.DataFrame] = {
        name: polars.concat(dfs) for name, dfs in grouped.items()
    }

def __len__(self) -> int:
Expand Down
5 changes: 3 additions & 2 deletions src/ert/storage/local_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from uuid import UUID

import numpy as np
import polars

Check failure on line 11 in src/ert/storage/local_experiment.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Cannot find implementation or library stub for module named "polars"
import xarray as xr
import xtgeo
from pydantic import BaseModel
Expand Down Expand Up @@ -142,7 +143,7 @@ def create(
output_path = path / "observations"
output_path.mkdir()
for obs_name, dataset in observations.items():
dataset.to_netcdf(output_path / f"{obs_name}", engine="scipy")
dataset.write_parquet(output_path / f"{obs_name}")

with open(path / cls._metadata_file, "w", encoding="utf-8") as f:
simulation_data = simulation_arguments if simulation_arguments else {}
Expand Down Expand Up @@ -306,7 +307,7 @@ def update_parameters(self) -> List[str]:
def observations(self) -> Dict[str, polars.DataFrame]:
    """Load all stored observations for this experiment.

    Observations are persisted as parquet files in the experiment's
    ``observations`` directory (see ``create``), so each value is a
    ``polars.DataFrame`` keyed by the observation file name.
    (Return annotation fixed: the body returns polars DataFrames, not
    ``xr.Dataset``.)
    """
    # sorted() makes the resulting dict's insertion order deterministic.
    observation_paths = sorted(self.mount_point.glob("observations/*"))
    return {
        path.name: polars.read_parquet(path) for path in observation_paths
    }

Expand Down

0 comments on commit 34091ed

Please sign in to comment.