Skip to content

Commit

Permalink
Combine responses across realizations
Browse files Browse the repository at this point in the history
  • Loading branch information
yngve-sk committed Nov 26, 2024
1 parent 5bdfea3 commit 88e6ab2
Show file tree
Hide file tree
Showing 5 changed files with 728 additions and 51 deletions.
1 change: 1 addition & 0 deletions src/ert/simulator/batch_simulator_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ def results(self) -> List[Optional[Dict[str, "npt.NDArray[np.float64]"]]]:
"Simulations are still running - need to wait before getting results"
)

self.get_ensemble().refresh()
res: List[Optional[Dict[str, "npt.NDArray[np.float64]"]]] = []
for sim_id in range(len(self)):
if self.get_job_state(iens=sim_id) != JobState.COMPLETED:
Expand Down
179 changes: 128 additions & 51 deletions src/ert/storage/local_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,95 @@ def get_failure(self, realization: int) -> Optional[_Failure]:
def refresh(self) -> None:
    """Combine newly written responses and rebuild the cached ensemble state.

    Both ``get_ensemble_state`` and ``load_responses`` are cached, so their
    caches are invalidated before and after the combine step:

    * before, so that combining never acts on results cached prior to the
      latest responses being written;
    * after, because combining moves data from per-realization parquet
      files into the combined parquet files and deletes the former, which
      makes anything computed earlier out of date.

    The ensemble state is recomputed last, so the cached state reflects
    the storage layout after combining.
    """
    self.get_ensemble_state.cache_clear()
    self.load_responses.cache_clear()

    # NOTE(review): the combine step calls
    # get_realization_list_with_responses(), which presumably reads the
    # ensemble state; with the caches cleared it is recomputed from disk.
    self._combine_responses_across_reals()

    self.get_ensemble_state.cache_clear()
    self.load_responses.cache_clear()
    self.get_ensemble_state()

def _has_combined_data(self) -> bool:
    """Tell whether a combined parquet file exists for any response type.

    For every response type in the experiment's response configuration,
    looks for ``<response_type>.parquet`` directly under
    ``self.mount_point``.

    Returns
    -------
    bool
        True as soon as one such file is found, False if none exists.
    """
    for response_type in self.experiment.response_configuration:
        combined_file = self.mount_point / f"{response_type}.parquet"
        if combined_file.exists():
            return True

    return False

def _has_uncombined_data(self) -> bool:
    """Tell whether any realization still has its own response parquet file.

    Walks realization indices ``0 .. ensemble_size - 1`` and, for each,
    every response type in the experiment's response configuration,
    checking for ``<response_type>.parquet`` under the path returned by
    ``self._realization_dir``.

    Returns
    -------
    bool
        True as soon as one such file is found, False if none exists.
    """
    return any(
        (self._realization_dir(real) / f"{response_type}.parquet").exists()
        for real in range(self.ensemble_size)
        for response_type in self.experiment.response_configuration
    )

def _combine_responses_across_reals(self) -> None:
    """Fold per-realization response files into one file per response type.

    Per-realization files live at
    ``_realization_dir(real) / "<response_type>.parquet"``; combined files
    live at ``mount_point / "<response_type>.parquet"``. Nothing happens
    when no per-realization file exists.

    Two situations are handled:

    * No combined file exists for any response type: for each response
      type, the files of the realizations returned by
      ``get_realization_list_with_responses()`` are concatenated into a new
      combined file.
    * Some combined file already exists: for each response type, all
      per-realization files found on disk are merged into the combined
      file, with new values taking precedence over old ones for rows that
      share ``realization``, ``response_key`` and the response config's
      primary key. A response type without a combined file yet gets one
      created from the per-realization data alone.

    Per-realization files are deleted once their content has been written
    to the combined file. Missing files and empty selections are skipped
    rather than raising.
    """
    if not self._has_uncombined_data():
        return

    if not self._has_combined_data():
        # Consider only realizations with all responses
        realizations = self.get_realization_list_with_responses()

        for response_type in self.experiment.response_type_to_response_keys:
            candidates = [
                self._realization_dir(real) / f"{response_type}.parquet"
                for real in realizations
            ]
            # A selected realization may lack a file for this response
            # type, and the selection itself may be empty; concatenating
            # an empty list raises, so skip.
            df_files = [f for f in candidates if f.exists()]
            if not df_files:
                continue

            combined_df = polars.concat([polars.read_parquet(f) for f in df_files])
            self._storage._to_parquet_transaction(
                self.mount_point / f"{response_type}.parquet", combined_df
            )

            for f in df_files:
                os.remove(f)
        return

    for (
        response_type,
        response_config,
    ) in self.experiment.response_configuration.items():
        candidates = [
            self._realization_dir(real) / f"{response_type}.parquet"
            for real in range(self.ensemble_size)
        ]
        df_files = [f for f in candidates if f.exists()]
        if not df_files:
            continue

        combined_path = self.mount_point / f"{response_type}.parquet"

        # Note: Check if this is memory-intensive, should
        # use chunks if it is
        new_df = polars.concat([polars.read_parquet(f) for f in df_files])

        if combined_path.exists():
            old_df = polars.read_parquet(combined_path)
            join_keys = [
                "realization",
                "response_key",
                *response_config.primary_key,
            ]
            # Non-key columns present on both sides come back from the
            # join as "<name>" (old) and "<name>_right" (new).
            shared = [
                c
                for c in old_df.columns
                if c in new_df.columns and c not in join_keys
            ]

            updated_df = (
                # coalesce=True merges the key columns of both sides, so
                # rows that exist only in new_df keep their realization,
                # response key and primary-key values.
                old_df.join(new_df, on=join_keys, how="full", coalesce=True)
                .with_columns(
                    [
                        # Let new values overwrite old ones
                        polars.coalesce(
                            [polars.col(f"{c}_right"), polars.col(c)]
                        ).alias(c)
                        for c in shared
                    ]
                )
                .drop([f"{c}_right" for c in shared])
            )

            # NOTE(review): removing the old file before the rewrite leaves
            # a window where the combined data exists only in memory.
            # Confirm whether _to_parquet_transaction can replace an
            # existing file, in which case this removal can go.
            os.remove(combined_path)
        else:
            # Combined data exists for another response type only; there
            # is nothing to merge with for this one.
            updated_df = new_df

        self._storage._to_parquet_transaction(combined_path, updated_df)

        for f in df_files:
            os.remove(f)

@lru_cache # noqa: B019
def get_ensemble_state(self) -> List[Set[RealizationStorageState]]:
Expand Down Expand Up @@ -458,65 +547,48 @@ def _parameters_exist_for_realization(realization: int) -> bool:
for parameter in self.experiment.parameter_configuration
)

def _responses_exist_for_realization(
realization: int, key: Optional[str] = None
) -> bool:
"""
Returns true if there are responses in the experiment and they have
all been saved in the ensemble
Parameters
----------
realization : int
Realization index.
key : str, optional
Response key to filter realizations. If None, all responses are considered.
Returns
-------
exists : bool
True if responses exist for realization.
"""

if not self.experiment.response_configuration:
return True
path = self._realization_dir(realization)

def _has_response(_key: str) -> bool:
if _key in self.experiment.response_key_to_response_type:
_response_type = self.experiment.response_key_to_response_type[_key]
return (path / f"{_response_type}.parquet").exists()

return (path / f"{_key}.parquet").exists()

if key:
return _has_response(key)

is_expecting_any_responses = any(
bool(config.keys)
for config in self.experiment.response_configuration.values()
)

if not is_expecting_any_responses:
return True

non_empty_response_configs = [
response
for response, config in self.experiment.response_configuration.items()
if bool(config.keys)
]

return all(
_has_response(response) for response in non_empty_response_configs
# If has combined -> Compute for every response type, which reals have it
# If not combined -> Check realization dirs
# Can get rid of the entire fn below
reals_per_response_type = {}
for response_type, config in self.experiment.response_configuration.items():
if config.has_finalized_keys and config.keys == []:
reals_per_response_type[response_type] = set(range(self.ensemble_size))

# Check for combined
if (self.mount_point / f"{response_type}.parquet").exists():
df = self.load_responses(
response_type, tuple(range(self.ensemble_size))
)
reals_per_response_type[response_type] = set(
df["realization"].unique().to_list()
)
else:
# Otherwise we check the realization folders
reals_per_response_type[response_type] = {
real
for real in range(self.ensemble_size)
if (
self._realization_dir(real) / f"{response_type}.parquet"
).exists()
}

reals_with_all_responses = {
real
for real in range(self.ensemble_size)
if all(
real in reals_per_response_type[response_type]
for response_type in self.experiment.response_configuration
)
}

def _find_state(realization: int) -> Set[RealizationStorageState]:
_state = set()
if self.has_failure(realization):
failure = self.get_failure(realization)
assert failure
_state.add(failure.type)
if _responses_exist_for_realization(realization):
if realization in reals_with_all_responses:
_state.add(RealizationStorageState.RESPONSES_LOADED)
if _parameters_exist_for_realization(realization):
_state.add(RealizationStorageState.PARAMETERS_LOADED)
Expand Down Expand Up @@ -660,6 +732,11 @@ def load_responses(self, key: str, realizations: Tuple[int]) -> polars.DataFrame
response_type = self.experiment.response_key_to_response_type[key]
select_key = True

combined_df_path = self.mount_point / f"{response_type}.parquet"
if combined_df_path.exists():
combined_df = polars.read_parquet(combined_df_path)
return combined_df.filter(polars.col("realization").is_in(realizations))

loaded = []
for realization in realizations:
input_path = self._realization_dir(realization) / f"{response_type}.parquet"
Expand Down
2 changes: 2 additions & 0 deletions tests/ert/performance_tests/test_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ def g(X):
iens,
)

prior_ensemble.refresh()

posterior_ensemble = storage.create_ensemble(
prior_ensemble.experiment_id,
ensemble_size=prior_ensemble.ensemble_size,
Expand Down
2 changes: 2 additions & 0 deletions tests/ert/unit_tests/analysis/test_es_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,8 @@ def test_gen_data_missing(storage, uniform_parameter, obs):
),
iens,
)

prior.refresh()
posterior_ens = storage.create_ensemble(
prior.experiment_id,
ensemble_size=prior.ensemble_size,
Expand Down
Loading

0 comments on commit 88e6ab2

Please sign in to comment.