Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store everest results in ERT storage #9161

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions src/ert/run_models/everest_run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
)

import numpy as np
import seba_sqlite.sqlite_storage
from numpy import float64
from numpy._typing import NDArray
from ropt.enums import EventType, OptimizerExitCode
from ropt.evaluator import EvaluatorContext, EvaluatorResult
from ropt.plan import BasicOptimizer
from ropt.plan import Event as OptimizerEvent
from seba_sqlite import SqliteStorage
from seba_sqlite import SqliteStorage, sqlite_storage
from typing_extensions import TypedDict

from _ert.events import EESnapshot, EESnapshotUpdate, Event
Expand All @@ -37,6 +36,7 @@
from ert.runpaths import Runpaths
from ert.storage import open_storage
from everest.config import EverestConfig
from everest.everest_storage import EverestStorage, OptimalResult
from everest.optimizer.everest2ropt import everest2ropt
from everest.simulator import SimulatorCache
from everest.simulator.everest_to_ert import everest_to_ert_config
Expand Down Expand Up @@ -130,14 +130,14 @@


@dataclass
class OptimalResult:
class OptimalResult: # noqa

Check failure on line 133 in src/ert/run_models/everest_run_model.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Name "OptimalResult" already defined (possibly by an import)
batch: int
controls: list[Any]
total_objective: float

@staticmethod
def from_seba_optimal_result(
o: seba_sqlite.sqlite_storage.OptimalResult | None = None,
o: sqlite_storage.OptimalResult | None = None,
) -> OptimalResult | None:
if o is None:
return None
Expand Down Expand Up @@ -266,6 +266,16 @@
# Initialize the ropt optimizer:
optimizer = self._create_optimizer()

self.ever_storage = EverestStorage(
output_dir=Path(self.everest_config.optimization_output_dir),
)
self.ever_storage.observe_optimizer(
optimizer,
Path(self.everest_config.optimization_output_dir)
/ "dakota"
/ "OPT_DEFAULT.out",
)

# The SqliteStorage object is used to store optimization results from
# Seba in an sqlite database. It reacts directly to events emitted by
# Seba and is not called by Everest directly. The stored results are
Expand All @@ -283,6 +293,11 @@
self._result = OptimalResult.from_seba_optimal_result(
seba_storage.get_optimal_result() # type: ignore
)
optimal_result_from_everstorage = self.ever_storage.get_optimal_result()

# Seems ROPT batches are 1-indexed now,
# whereas seba has its own 0-indexed counter.
assert self._result.__dict__ == optimal_result_from_everstorage.__dict__

self._exit_code = (
"max_batch_num_reached"
Expand Down
220 changes: 126 additions & 94 deletions src/everest/api/everest_data_api.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,67 @@
from collections import OrderedDict
from pathlib import Path

import polars
import polars as pl
from seba_sqlite.snapshot import SebaSnapshot
from ropt.enums import ConstraintType
from seba_sqlite import SebaSnapshot

from ert.storage import open_storage
from everest.config import EverestConfig, ServerConfig
from everest.detached import ServerStatus, everserver_status
from everest.everest_storage import EverestStorage


class EverestDataAPI:
def __init__(self, config: EverestConfig, filter_out_gradient=True):
    """Open the optimization results found under *config*'s output dir.

    Both the legacy seba-sqlite snapshot and the new ``EverestStorage``
    are loaded, so callers relying on either representation keep working
    during the storage migration.

    :param config: everest configuration pointing at the output folder.
    :param filter_out_gradient: forwarded to the seba snapshot reader;
        when True, gradient (perturbation) simulations are excluded.
    """
    self._config = config
    output_folder = config.optimization_output_dir
    # Legacy seba-sqlite snapshot (kept while results are migrated).
    self._snapshot = SebaSnapshot(output_folder).get_snapshot(filter_out_gradient)
    # New everest/ERT-side storage of the same optimization results.
    self._ever_storage = EverestStorage(Path(output_folder))
    self._ever_storage.read_from_output_dir()

@property
def batches(self):
    """Sorted ids of all batches that have objective values stored."""
    return sorted(
        b.batch_id
        for b in self._ever_storage.data.batches
        if b.batch_objectives is not None
    )

@property
def accepted_batches(self):
    """Sorted ids of batches the optimizer flagged as improvements."""
    return sorted(
        b.batch_id for b in self._ever_storage.data.batches if b.is_improvement
    )

@property
def objective_function_names(self):
    """Sorted unique names of the configured objective functions."""
    return sorted(
        self._ever_storage.data.objective_functions["objective_name"]
        .unique()
        .to_list()
    )

@property
def output_constraint_names(self):
    """Sorted unique names of output (nonlinear) constraints, or [] if none."""
    constraints = self._ever_storage.data.nonlinear_constraints
    if constraints is None:
        return []
    return sorted(constraints["constraint_name"].unique().to_list())

def input_constraint(self, control):
    """Return the bounds of the named control.

    :param control: control name to look up in the initial values table.
    :return: ``{"min": lower_bound, "max": upper_bound}``.
    :raises IndexError: if *control* is not a known control name.
    """
    initial_values = self._ever_storage.data.initial_values
    control_spec = initial_values.filter(
        pl.col("control_name") == control
    ).to_dicts()[0]
    return {
        "min": control_spec.get("lower_bounds"),
        "max": control_spec.get("upper_bounds"),
    }

def output_constraint(self, constraint):
    """
    :return: a dictionary with two keys
        "type" is the type of constraint
        "right_hand_side" is a constant real number that indicates
        the constraint bound/target.
    """
    # NOTE(review): only the first stored constraint is returned; the
    # `constraint` argument is not used for filtering — confirm intended.
    constraint_dict = self._ever_storage.data.nonlinear_constraints.to_dicts()[0]
    return {
        "type": ConstraintType(constraint_dict["constraint_type"]).name.lower(),
        "right_hand_side": constraint_dict["constraint_rhs_value"],
    }

@property
def realizations(self):
    """Sorted unique realization ids, read from the first batch."""
    # NOTE(review): assumes every batch covers the same realizations,
    # so batch 0 is representative — confirm against storage writer.
    return sorted(
        self._ever_storage.data.batches[0]
        .realization_objectives["realization"]
        .unique()
        .to_list()
    )

@property
def simulations(self):
    """Sorted unique simulation ids, read from the first batch."""
    return sorted(
        self._ever_storage.data.batches[0]
        .realization_objectives["simulation_id"]
        .unique()
        .to_list()
    )

@property
def control_names(self):
    """Sorted unique names of all controls."""
    return sorted(
        self._ever_storage.data.initial_values["control_name"].unique().to_list()
    )

@property
def control_values(self):
    """Flatten per-batch control rows into result dicts.

    :return: one ``{"control", "batch", "value"}`` dict per control name
        per realization row, for every batch that has realization controls.
    """
    all_control_names = self._ever_storage.data.initial_values[
        "control_name"
    ].to_list()
    values = []
    for batch in self._ever_storage.data.batches:
        # Gradient-only batches carry no realization controls.
        if batch.realization_controls is None:
            continue
        for row in batch.realization_controls.to_dicts():
            for name in all_control_names:
                values.append(
                    {"control": name, "batch": batch.batch_id, "value": row[name]}
                )
    return values

@property
def objective_values(self):
    """All batches that have objective values stored."""
    return [
        b
        for b in self._ever_storage.data.batches
        if b.batch_objectives is not None
    ]

@property
def single_objective_values(self):
    """Weighted, normalized objective value per batch.

    :return: list of ``{"batch", "objective", "accepted"}`` dicts, where
        ``accepted`` is 1 for batches flagged as improvements, else 0.
    """
    # Stack objective rows of all batches that evaluated controls,
    # tagging each with its acceptance flag.
    batch_datas = polars.concat(
        [
            b.batch_objectives.select(
                c for c in b.batch_objectives.columns if c != "merit_value"
            ).with_columns(
                polars.lit(1 if b.is_improvement else 0).alias("accepted")
            )
            for b in self._ever_storage.data.batches
            if b.realization_controls is not None
        ]
    )
    objectives = self._ever_storage.data.objective_functions

    # Scale each objective column by its configured weight and norm.
    for o in objectives.to_dicts():
        batch_datas = batch_datas.with_columns(
            polars.col(o["objective_name"]) * o["weight"] * o["normalization"]
        )

    return (
        batch_datas.rename(
            {"total_objective_value": "objective", "batch_id": "batch"}
        )
        .select("batch", "objective", "accepted")
        .to_dicts()
    )

@property
def gradient_values(self):
    """Objective gradients per batch/control in long (unpivoted) form.

    :return: list of ``{"batch", "function", "control", "value"}`` dicts,
        empty if no batch stored an objective gradient.
    """
    all_batch_data = [
        b.batch_objective_gradient
        for b in self._ever_storage.data.batches
        if b.batch_objective_gradient is not None
    ]
    if not all_batch_data:
        return []

    all_info = polars.concat(all_batch_data).drop("result_id")
    # Keep per-objective columns; ".total" columns are aggregates.
    objective_columns = [
        c
        for c in all_info.drop(["batch_id", "control_name"]).columns
        if not c.endswith(".total")
    ]
    return (
        all_info.select("batch_id", "control_name", *objective_columns)
        .unpivot(
            on=objective_columns,
            index=["batch_id", "control_name"],
            variable_name="function",
            value_name="value",
        )
        .rename({"control_name": "control", "batch_id": "batch"})
        .sort(by=["batch", "control"])
        .select(["batch", "function", "control", "value"])
        .to_dicts()
    )

def summary_values(self, batches=None, keys=None):
if batches is None:
Expand Down Expand Up @@ -180,13 +217,8 @@ def summary_values(self, batches=None, keys=None):
summary = summary.with_columns(
pl.Series("batch", [batch_id] * summary.shape[0])
)
# The realization ID as defined by Everest must be
# retrieved via the seba snapshot.
realization_map = {
sim.simulation: sim.realization
for sim in self._snapshot.simulation_data
if sim.batch == batch_id
}

realization_map = self._ever_storage.data.simulation_to_realization_map
realizations = pl.Series(
"realization",
[realization_map.get(str(sim)) for sim in summary["simulation"]],
Expand Down
Loading
Loading