diff --git a/src/everest/everest_storage.py b/src/everest/everest_storage.py
index d6214869c6a..4f32163899a 100644
--- a/src/everest/everest_storage.py
+++ b/src/everest/everest_storage.py
@@ -55,7 +55,7 @@ def try_read_df(path: Path) -> polars.DataFrame | None:
 @dataclass
 class BatchDataFrames:
     batch_id: int
-    batch_controls: polars.DataFrame
+    realization_controls: polars.DataFrame
     batch_objectives: Optional[polars.DataFrame]
     realization_objectives: Optional[polars.DataFrame]
     batch_constraints: Optional[polars.DataFrame]
@@ -76,6 +76,9 @@ def existing_dataframes(self) -> Dict[str, polars.DataFrame]:
         if self.realization_objectives is not None:
             dataframes["realization_objectives"] = self.realization_objectives
 
+        if self.realization_controls is not None:
+            dataframes["realization_controls"] = self.realization_controls
+
         if self.batch_constraints is not None:
             dataframes["batch_constraints"] = self.batch_constraints
 
@@ -253,8 +256,8 @@ def read_from_experiment(self, experiment: _OptimizerOnlyExperiment) -> None:
                 ens.optimizer_mount_point / "perturbation_constraints.parquet"
             )
 
-            batch_controls = try_read_df(
-                ens.optimizer_mount_point / "batch_controls.parquet"
+            realization_controls = try_read_df(
+                ens.optimizer_mount_point / "realization_controls.parquet"
             )
 
             with open(ens.optimizer_mount_point / "batch.json", encoding="utf-8") as f:
@@ -265,7 +268,7 @@ def read_from_experiment(self, experiment: _OptimizerOnlyExperiment) -> None:
             self.batches.append(
                 BatchDataFrames(
                     batch_id,
-                    batch_controls,
+                    realization_controls,
                     batch_objectives,
                     realization_objectives,
                     batch_constraints,
@@ -324,7 +327,7 @@ def get_ensemble_by_name(self, name: str) -> _OptimizerOnlyEnsemble:
 
 @dataclass
 class _EvaluationResults:
-    batch_controls: polars.DataFrame
+    realization_controls: polars.DataFrame
     batch_objectives: polars.DataFrame
     realization_objectives: polars.DataFrame
     batch_constraints: Optional[polars.DataFrame]
@@ -414,13 +417,13 @@ def _initialize(self, event):
                     self._convert_names(config.variables.names), dtype=polars.String
                 ),
                 "initial_value": polars.Series(
-                    config.variables.initial_values, dtype=polars.Float32
+                    config.variables.initial_values, dtype=polars.Float64
                 ),
                 "lower_bounds": polars.Series(
-                    config.variables.lower_bounds, dtype=polars.Float32
+                    config.variables.lower_bounds, dtype=polars.Float64
                 ),
                 "upper_bounds": polars.Series(
-                    config.variables.upper_bounds, dtype=polars.Float32
+                    config.variables.upper_bounds, dtype=polars.Float64
                 ),
             }
         )
@@ -429,11 +432,11 @@ def _initialize(self, event):
             {
                 "objective_name": config.objectives.names,
                 "weight": polars.Series(
-                    config.objectives.weights, dtype=polars.Float32
+                    config.objectives.weights, dtype=polars.Float64
                 ),
                 "normalization": polars.Series(
                     [1.0 / s for s in config.objectives.scales],
-                    dtype=polars.Float32,
+                    dtype=polars.Float64,
                 ),
             }
         )
@@ -442,7 +445,9 @@ def _initialize(self, event):
             self.data.nonlinear_constraints = polars.DataFrame(
                 {
                     "constraint_name": config.nonlinear_constraints.names,
-                    "normalization": config.nonlinear_constraints.scales,
+                    "normalization": [
+                        1.0 / s for s in config.nonlinear_constraints.scales
+                    ],  # Q: Is this correct?
"constraint_rhs_value": config.nonlinear_constraints.rhs_values, "constraint_type": config.nonlinear_constraints.types, } @@ -454,7 +459,7 @@ def _initialize(self, event): config.realizations.names, dtype=polars.UInt16 ), "weight": polars.Series( - config.realizations.weights, dtype=polars.Float32 + config.realizations.weights, dtype=polars.Float64 ), } ) @@ -465,7 +470,7 @@ def _store_function_results(self, results: FunctionResults) -> _EvaluationResult realization_objectives = polars.from_pandas( results.to_dataframe( "evaluations", - select=["variables", "objectives", "constraints", "evaluation_ids"], + select=["objectives", "constraints", "evaluation_ids"], ).reset_index(), ).drop("plan_id") batch_objectives = polars.from_pandas( @@ -475,27 +480,21 @@ def _store_function_results(self, results: FunctionResults) -> _EvaluationResult ).reset_index() ).drop("plan_id") - batch_controls = polars.from_pandas( - results.to_dataframe("evaluations", select=["variables"]).reset_index() + realization_controls = polars.from_pandas( + results.to_dataframe( + "evaluations", select=["variables", "evaluation_ids"] + ).reset_index() ).drop("plan_id") - batch_controls = self._rename_columns(batch_controls) - control_names = batch_controls["control_name"].unique().to_list() + realization_controls = self._rename_columns(realization_controls) + realization_controls = self._enforce_dtypes(realization_controls) - has_scaled_controls = "scaled_control_value" in batch_controls - batch_controls = batch_controls.pivot( + realization_controls = realization_controls.pivot( on="control_name", values=["control_value"], # , "scaled_control_value"] separator=":", ) - if has_scaled_controls: - batch_controls = batch_controls.rename( - { - **{f"control_value:{name}": name for name in control_names}, - } - ) - try: batch_constraints = polars.from_pandas( results.to_dataframe("nonlinear_constraints").reset_index() @@ -506,7 +505,10 @@ def _store_function_results(self, results: FunctionResults) -> _EvaluationResult realization_constraints = None batch_objectives = self._rename_columns(batch_objectives) + batch_objectives = self._enforce_dtypes(batch_objectives) + realization_objectives = self._rename_columns(realization_objectives) + realization_objectives = self._enforce_dtypes(realization_objectives) batch_objectives = batch_objectives.pivot( on="objective_name", @@ -547,35 +549,33 @@ def _store_function_results(self, results: FunctionResults) -> _EvaluationResult "result_id", "batch_id", "realization", + "simulation_id", "constraint_name", "constraint_value", - ].unique(["result_id", "batch_id", "realization", "constraint_name"]) + ] realization_constraints = realization_constraints.pivot( values=["constraint_value"], on="constraint_name" ) realization_objectives = realization_objectives.drop( [c for c in realization_objectives.columns if "constraint" in c.lower()] - ).unique(subset=["result_id", "batch_id", "realization", "control_name"]) + ) batch_objectives = batch_objectives.drop( [c for c in batch_objectives.columns if "constraint" in c.lower()] - ).unique(subset=["result_id", "batch_id"]) - - realization_objectives = ( - realization_objectives.drop(["control_name", "control_value"]) - .unique(subset=["result_id", "batch_id", "realization", "objective_name"]) - .pivot( - values="objective_value", - index=[ - "result_id", - "batch_id", - "realization", - ], - columns="objective_name", ) + + realization_objectives = realization_objectives.pivot( + values="objective_value", + index=[ + "result_id", + "batch_id", + 
"realization", + "simulation_id", + ], + columns="objective_name", ) return _EvaluationResults( - batch_controls, + realization_controls, batch_objectives, realization_objectives, batch_constraints, @@ -604,9 +604,52 @@ def _rename_columns(df: polars.DataFrame): "scaled_perturbed_objectives": "scaled_perturbed_objective_value", "scaled_perturbed_constraints": "scaled_perturbed_constraint_value", "scaled_variables": "scaled_control_value", + "evaluation_ids": "simulation_id", } return df.rename({k: v for k, v in _renames.items() if k in df.columns}) + @staticmethod + def _enforce_dtypes(df: polars.DataFrame): + _dtypes = { + "batch_id": polars.UInt16, + "result_id": polars.UInt16, + "perturbation": polars.UInt16, + "realization": polars.UInt16, + "simulation_id": polars.UInt16, + "objective_name": polars.String, + "control_name": polars.String, + "constraint_name": polars.String, + "total_objective_value": polars.Float64, + "control_value": polars.Float64, + "objective_value": polars.Float64, + "constraint_value": polars.Float64, + "scaled_constraint_value": polars.Float64, + "scaled_objective_value": polars.Float64, + "perturbed_control_value": polars.Float64, + "perturbed_objective_value": polars.Float64, + "perturbed_constraint_value": polars.Float64, + "scaled_perturbed_objective_value": polars.Float64, + "scaled_perturbed_constraint_value": polars.Float64, + "scaled_control_value": polars.Float64, + } + + existing_cols = set(df.columns) + unaccounted_cols = existing_cols - set(_dtypes) + if len(unaccounted_cols) > 0: + raise KeyError( + f"Expected all keys to have a specified dtype, found {unaccounted_cols}" + ) + + df = df.cast( + { + colname: dtype + for colname, dtype in _dtypes.items() + if colname in df.columns + } + ) + + return df + def _store_gradient_results(self, results: FunctionResults) -> _GradientResults: perturbation_objectives = polars.from_pandas( results.to_dataframe("evaluations").reset_index() @@ -631,8 +674,10 @@ def _store_gradient_results(self, results: FunctionResults) -> _GradientResults: if c.lower().startswith("scaled") ) batch_objective_gradient = self._rename_columns(batch_objective_gradient) + batch_objective_gradient = self._enforce_dtypes(batch_objective_gradient) perturbation_objectives = self._rename_columns(perturbation_objectives) + perturbation_objectives = self._rename_columns(perturbation_objectives) if "constraint_name" in perturbation_objectives: perturbation_constraints = ( @@ -772,7 +817,9 @@ def _handle_finished_batch_event(self, event: Event): if isinstance(item, FunctionResults): eval_results = self._store_function_results(item) - _batches[item.batch_id]["batch_controls"] = eval_results.batch_controls + _batches[item.batch_id]["realization_controls"] = ( + eval_results.realization_controls + ) _batches[item.batch_id]["batch_objectives"] = ( eval_results.batch_objectives ) @@ -812,7 +859,7 @@ def _handle_finished_batch_event(self, event: Event): self.data.batches.append( BatchDataFrames( batch_id=batch_id, - batch_controls=info.get("batch_controls"), + realization_controls=info.get("realization_controls"), batch_objectives=info.get("batch_objectives"), realization_objectives=info.get("realization_objectives"), batch_constraints=info.get("batch_constraints"), @@ -874,13 +921,15 @@ def find_best_batch(filter_by, sort_by): matching_batches.sort(key=sort_by) _batch = matching_batches[0] - _controls_dict = _batch.batch_controls.drop( + _controls_dict = _batch.realization_controls.drop( [ "result_id", "batch_id", + "simulation_id", + 
"realization", *[ c - for c in _batch.batch_controls.columns + for c in _batch.realization_controls.columns if c.endswith(".scaled") # don't need scaled control values ], ] diff --git a/src/everest/export.py b/src/everest/export.py index 57973ba3758..bda4345b2a6 100644 --- a/src/everest/export.py +++ b/src/everest/export.py @@ -177,21 +177,141 @@ def export_metadata(config: Optional[ExportConfig], optimization_output_dir: str # WIP! metadata2 = [] - for i, batch_info in enumerate(storage.data.batches): + for i, batch_info in enumerate(b for b in storage.data.batches if b.is_improvement): if discard_rejected and not batch_info.is_improvement: continue - corresponding = metadata[i] - print("Yo") - md_row2: Dict[str, Any] = { - MetaDataColumnNames.BATCH: batch_info.batch_id, - MetaDataColumnNames.SIM_AVERAGED_OBJECTIVE: batch_info.batch_objectives.select( - polars.mean("total_objective_value") - ).item(), - MetaDataColumnNames.REALIZATION: None, - } - metadata2.append(md_row2) - assert corresponding is not None + all_control_names = storage.data.initial_values["control_name"].to_list() + all_objective_names = storage.data.objective_functions[ + "objective_name" + ].to_list() + # all_constraint_names = storage.data.nonlinear_constraints[ + # "constraint_name" + # ].to_list() + + realization_info = batch_info.realization_objectives.join( + batch_info.realization_constraints, + on=["result_id", "batch_id", "realization", "simulation_id"], + ).join( + batch_info.realization_controls, + on=["result_id", "batch_id", "realization", "simulation_id"], + ) + for real_tuple, data in realization_info.group_by("realization"): + realization = real_tuple[0] + corresponding = next( + m + for m in metadata + if m[MetaDataColumnNames.BATCH] == batch_info.batch_id - 1 + and m[MetaDataColumnNames.REALIZATION] == realization + ) + objectives_dict = {} + objectives_gradient_dict = {} + for objective in storage.data.objective_functions.to_dicts(): + weight = objective["weight"] + normalization = objective["normalization"] + objective_name = objective["objective_name"] + objective_value = data[objective_name].item() + + objectives_dict[objective_name] = objective_value + objectives_dict[f"{objective_name}_norm"] = ( + objective_value * normalization + ) + objectives_dict[f"{objective_name}_weighted_norm"] = ( + objective_value * weight * normalization + ) + + if batch_info.batch_objective_gradient is not None: + for objective_name in all_objective_names: + for d in batch_info.batch_objective_gradient.select( + "control_name", objective_name + ).to_dicts(): + objectives_gradient_dict[ + f"gradient-{objective_name}-{d['control_name']}" + ] = d[objective_name] + + # Q: Seems to not be exported, why? 
+            # constraints_gradient_dict = {}
+            # if batch_info.batch_constraint_gradient is not None:
+            #     for constraint_name in all_constraint_names:
+            #         for d in batch_info.batch_constraint_gradient.select(
+            #             "control_name", constraint_name
+            #         ).to_dicts():
+            #             constraints_gradient_dict[
+            #                 f"gradient-{constraint_name}-{d['control_name']}"
+            #             ] = d[constraint_name]
+
+            constraints_dict = {}
+            for constraint in storage.data.nonlinear_constraints.to_dicts():
+                # SEBA always just sets it to 1 for functions as a "convenience"
+                weight = 1
+
+                normalization = constraint["normalization"]
+                constraint_name = constraint["constraint_name"]
+                constraint_value = data[constraint_name].item()
+
+                constraints_dict[constraint_name] = constraint_value
+                constraints_dict[f"{constraint_name}_norm"] = (
+                    constraint_value * normalization
+                )
+                constraints_dict[f"{constraint_name}_weighted_norm"] = (
+                    constraint_value * weight * normalization
+                )
+
+            controls_dict = {
+                control_name: data[control_name].item()
+                for control_name in all_control_names
+            }
+
+            my_stuff = {
+                MetaDataColumnNames.BATCH: batch_info.batch_id - 1,
+                MetaDataColumnNames.SIM_AVERAGED_OBJECTIVE: batch_info.batch_objectives[
+                    "total_objective_value"
+                ].item(),  # avg weighted (summarized for batch)
+                MetaDataColumnNames.REAL_AVERAGED_OBJECTIVE: batch_info.batch_objectives[
+                    "total_objective_value"
+                ].item(),
+                MetaDataColumnNames.SIMULATION: data["simulation_id"].item(),
+                MetaDataColumnNames.IS_GRADIENT: 0,  # TODO get from everest config?
+                MetaDataColumnNames.REALIZATION: realization,
+                MetaDataColumnNames.SUCCESS: 1,  # TODO is it always 1?
+                MetaDataColumnNames.REALIZATION_WEIGHT: storage.data.realization_weights.filter(
+                    polars.col("realization") == realization  # OK
+                )["weight"].first(),
+                **objectives_dict,  # OK(95% sure)
+                **constraints_dict,  # OK, old is incorrect, shows rhs
+                **controls_dict,  # OK
+                **objectives_gradient_dict,  # OK
+                MetaDataColumnNames.INCREASED_MERIT: batch_info.is_improvement,  # OK
+                # **constraints_gradient_dict,
+            }
+
+            for colname in MetaDataColumnNames:
+                if colname in my_stuff:
+                    if my_stuff[colname] != corresponding[colname]:
+                        print(
+                            f"MISMATCHING {colname} for [batch={batch_info.batch_id}, "
+                            f"real={my_stuff[MetaDataColumnNames.REALIZATION]},"
+                            f"sim={my_stuff[MetaDataColumnNames.SIMULATION]}, expected {corresponding[colname]}, got {my_stuff[colname]}"
+                        )
+                    else:
+                        pass  # print(f"MATCHING {colname} okok")
+
+            # {
+            #     obj: objective
+            #     obj_norm: objective * normalization
+            #     obj_weighted_norm: objective * weight * normalization
+            # }
+            # {
+            #     cstr: constraint
+            #     cstr_norm: constraint * normalization
+            #     cstr_weighted_norm: constraint * weight * normalization
+            # }
+            # {control name: value}
+            # {gradient-obj-control: value}
+            # REAL_AVERAGED_OBJECTIVE?
+            metadata2.append(my_stuff)
 
     return metadata
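
Note (reviewer sketch, not part of the patch): the exported *_norm and *_weighted_norm columns rely on the convention set up in _initialize, where "normalization" is stored as 1.0 / scale, so normalizing is a plain multiplication. The values below are hypothetical and only illustrate that convention; they are not taken from the code:

    # Hypothetical numbers, illustrating the stored normalization convention.
    scale = 2.0
    weight = 0.25
    normalization = 1.0 / scale  # as stored in objective_functions / nonlinear_constraints

    objective_value = 10.0
    objective_norm = objective_value * normalization            # 5.0
    objective_weighted_norm = objective_value * weight * normalization  # 1.25

The "# Q: Is this correct?" change for nonlinear constraints mirrors what the objectives already do with [1.0 / s for s in config.objectives.scales], so the export code can treat both the same way.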