Skip to content

Commit

Permalink
finish ensemble manager with eval type
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaolong0728 committed Dec 2, 2024
1 parent 61b0d8a commit 6832878
Showing 1 changed file with 118 additions and 103 deletions.
221 changes: 118 additions & 103 deletions views_pipeline_core/managers/ensemble_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,69 @@

class EnsembleManager(ModelManager):

def __init__(self, ensemble_path: EnsemblePath):
def __init__(self, ensemble_path: EnsemblePath) -> None:
super().__init__(ensemble_path)

@staticmethod
def _get_shell_command(model_path: "ModelPath",
                       run_type: str,
                       train: bool,
                       evaluate: bool,
                       forecast: bool,
                       use_saved: bool = False,
                       eval_type: str = "standard"
                       ) -> list:
    """
    Build the argument list that invokes a single model's ``run.sh`` script.

    The result is a list of strings (not a shell string), suitable for
    ``subprocess.run(shell_command, check=True)``.

    Args:
        model_path (ModelPath): model path object for the model; only its
            ``model_dir`` attribute is read here
        run_type (str): the type of run (calibration, testing, forecasting)
        train (bool): if the model should be trained (adds ``--train``)
        evaluate (bool): if the model should be evaluated (adds ``--evaluate``)
        forecast (bool): if the model should be used for forecasting
            (adds ``--forecast``)
        use_saved (bool): if the model should use locally stored data
            (adds ``--saved``)
        eval_type (str): value forwarded verbatim after ``--eval_type``

    Returns:
        list: ``[<model_dir>/run.sh, "--run_type", run_type, <enabled flags>,
        "--eval_type", eval_type]``
    """
    shell_command = [f"{model_path.model_dir}/run.sh", "--run_type", run_type]

    # Flags are emitted in this fixed order, each only when its switch is truthy.
    optional_flags = (
        (train, "--train"),
        (evaluate, "--evaluate"),
        (forecast, "--forecast"),
        (use_saved, "--saved"),
    )
    shell_command.extend(flag for enabled, flag in optional_flags if enabled)

    # NOTE: --eval_type is always passed, even when ``evaluate`` is False.
    shell_command.extend(["--eval_type", eval_type])

    return shell_command

@staticmethod
def _get_aggregated_df(df_to_aggregate, aggregation):
    """
    Aggregates the DataFrames of model outputs based on the specified aggregation method.

    The frames are concatenated and grouped on their first two index levels,
    so rows sharing the same (level 0, level 1) key across models are
    reduced to a single row.

    Args:
    - df_to_aggregate (list of pd.DataFrame): A list of DataFrames of model outputs.
    - aggregation (str): The aggregation method to use (either "mean" or "median").

    Returns:
    - df (pd.DataFrame): The aggregated DataFrame of model outputs.

    Raises:
    - ValueError: If ``aggregation`` is neither "mean" nor "median".
    """
    if aggregation not in ("mean", "median"):
        # Previously this only logged and fell through, returning None to
        # callers that go on to save the result; fail loudly instead.
        logger.error(f"Invalid aggregation: {aggregation}")
        raise ValueError(f"Invalid aggregation: {aggregation}")

    grouped = pd.concat(df_to_aggregate).groupby(level=[0, 1])
    return grouped.mean() if aggregation == "mean" else grouped.median()

def execute_single_run(self, args) -> None:
"""
Expand All @@ -33,6 +93,7 @@ def execute_single_run(self, args) -> None:
"""
self.config = self._update_single_config(args)
self._project = f"{self.config['name']}_{args.run_type}"
self._eval_type = args.eval_type

try:
if not args.train:
Expand Down Expand Up @@ -78,7 +139,7 @@ def _execute_model_tasks(

if eval:
logger.info(f"Evaluating model {self.config['name']}...")
self._evaluate_ensemble()
self._evaluate_ensemble(self._eval_type)

if forecast:
logger.info(f"Forecasting model {self.config['name']}...")
Expand Down Expand Up @@ -111,8 +172,8 @@ def _train_model_artifact(self, model_name:str, run_type: str, use_saved: bool)
subprocess.run(shell_command, check=True)
except Exception as e:
logger.error(f"Error during shell command execution for model {model_name}: {e}")

def _evaluate_model_artifact(self, model_name:str, run_type: str, steps: List[int]) -> None:
def _evaluate_model_artifact(self, model_name:str, run_type: str, eval_type: str) -> None:
logger.info(f"Evaluating single model {model_name}...")

model_path = ModelPath(model_name)
Expand All @@ -122,39 +183,46 @@ def _evaluate_model_artifact(self, model_name:str, run_type: str, steps: List[in
path_artifact = self._get_latest_model_artifact(path_artifacts, run_type)

ts = path_artifact.stem[-15:]

pkl_path = f"{path_generated}/predictions_{steps[-1]}_{run_type}_{ts}.pkl"
if Path(pkl_path).exists():
logger.info(f"Loading existing {run_type} predictions from {pkl_path}")
with open(pkl_path, "rb") as file:
df = pickle.load(file)
else:
logger.info(f"No existing {run_type} predictions found. Generating new {run_type} predictions...")
model_config = ModelManager(model_path).configs
model_config["run_type"] = run_type
shell_command = EnsembleManager._get_shell_command(model_path,
run_type,
train=False,
evaluate=True,
forecast=False,
use_saved=True)

try:
subprocess.run(shell_command, check=True)
except Exception as e:
logger.error(f"Error during shell command execution for model {model_name}: {e}")
preds = []

with open(pkl_path, "rb") as file:
df = pickle.load(file)

data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
date_fetch_timestamp = read_log_file(path_raw / f"{run_type}_data_fetch_log.txt").get("Data Fetch Timestamp", None)

create_log_file(path_generated, model_config, ts, data_generation_timestamp, date_fetch_timestamp)

return df
for sequence_number in range(ModelManager._resolve_evaluation_sequence_number(eval_type)):

pkl_path = f"{path_generated}/predictions_{run_type}_{ts}_{str(sequence_number).zfill(2)}.pkl"
if Path(pkl_path).exists():
logger.info(f"Loading existing {run_type} predictions from {pkl_path}")
with open(pkl_path, "rb") as file:
pred = pickle.load(file)
else:
logger.info(f"No existing {run_type} predictions found. Generating new {run_type} predictions...")
model_config = ModelManager(model_path).configs
model_config["run_type"] = run_type
shell_command = EnsembleManager._get_shell_command(model_path,
run_type,
train=False,
evaluate=True,
forecast=False,
use_saved=True,
eval_type=eval_type)

try:
subprocess.run(shell_command, check=True)
except Exception as e:
logger.error(f"Error during shell command execution for model {model_name}: {e}")

with open(pkl_path, "rb") as file:
pred = pickle.load(file)

data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
date_fetch_timestamp = read_log_file(path_raw / f"{run_type}_data_fetch_log.txt").get("Data Fetch Timestamp", None)

create_log_file(path_generated, model_config, ts, data_generation_timestamp, date_fetch_timestamp)

preds.append(pred)

return preds

def _forecast_model_artifact(self, model_name:str, run_type: str, steps: List[int]) -> None:
def _forecast_model_artifact(self, model_name:str, run_type: str) -> None:
logger.info(f"Forecasting single model {model_name}...")

model_path = ModelPath(model_name)
Expand All @@ -165,7 +233,7 @@ def _forecast_model_artifact(self, model_name:str, run_type: str, steps: List[in

ts = path_artifact.stem[-15:]

pkl_path = f"{path_generated}/predictions_{steps[-1]}_{run_type}_{ts}.pkl"
pkl_path = f"{path_generated}/predictions_{run_type}_{ts}.pkl"
if Path(pkl_path).exists():
logger.info(f"Loading existing {run_type} predictions from {pkl_path}")
with open(pkl_path, "rb") as file:
Expand Down Expand Up @@ -202,28 +270,31 @@ def _train_ensemble(self, use_saved: bool) -> None:
for model_name in self.config["models"]:
self._train_model_artifact(model_name, run_type, use_saved)

def _evaluate_ensemble(self) -> None:
def _evaluate_ensemble(self, eval_type: str) -> None:
path_generated_e = self._model_path.data_generated
run_type = self.config["run_type"]
steps = self.config["steps"]
dfs = []
dfs_agg = []

for model_name in self.config["models"]:
dfs.append(self._evaluate_model_artifact(model_name, run_type, eval_type))

dfs.append(self._evaluate_model_artifact(model_name, run_type, steps))
for i in range(len(dfs[0])):
df_to_aggregate = [df[i] for df in dfs]
df_agg = EnsembleManager._get_aggregated_df(df_to_aggregate, self.config["aggregation"])
dfs_agg.append(df_agg)

df_agg = EnsembleManager._get_aggregated_df(dfs, self.config["aggregation"])
data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")


_, df_output = generate_output_dict(df_agg, self.config)
evaluation, df_evaluation = generate_metric_dict(df_agg, self.config)
log_wandb_log_dict(self.config, evaluation)
# _, df_output = generate_output_dict(df_agg, self.config)
# evaluation, df_evaluation = generate_metric_dict(df_agg, self.config)
# log_wandb_log_dict(self.config, evaluation)

# Timestamp of single models is more important than ensemble model timestamp
self.config["timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S")
self._save_model_outputs(df_evaluation, df_output, path_generated_e)
self._save_predictions(df_agg, path_generated_e)
# self._save_model_outputs(df_evaluation, df_output, path_generated_e)
for i, df_agg in enumerate(dfs_agg):
self._save_predictions(df_agg, path_generated_e, i)

# How to define an ensemble model timestamp? Currently set as data_generation_timestamp.
create_log_file(path_generated_e, self.config, data_generation_timestamp, data_generation_timestamp, data_fetch_timestamp=None,
Expand All @@ -232,12 +303,11 @@ def _evaluate_ensemble(self) -> None:
def _forecast_ensemble(self) -> None:
path_generated_e = self._model_path.data_generated
run_type = self.config["run_type"]
steps = self.config["steps"]
dfs = []

for model_name in self.config["models"]:

dfs.append(self._forecast_model_artifact(model_name, run_type, steps))
dfs.append(self._forecast_model_artifact(model_name, run_type))

df_prediction = EnsembleManager._get_aggregated_df(dfs, self.config["aggregation"])
data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
Expand All @@ -249,59 +319,4 @@ def _forecast_ensemble(self) -> None:
create_log_file(path_generated_e, self.config, data_generation_timestamp, data_generation_timestamp, data_fetch_timestamp=None,
model_type="ensemble", models=self.config["models"])

@staticmethod
def _get_shell_command(model_path: "ModelPath",
                       run_type: str,
                       train: bool,
                       evaluate: bool,
                       forecast: bool,
                       use_saved: bool = False
                       ) -> list:
    """
    Build the argument list that invokes a single model's ``run.sh`` script.

    The result is a list of strings (not a shell string), suitable for
    ``subprocess.run(shell_command, check=True)``.

    Args:
        model_path (ModelPath): model path object for the model; only its
            ``model_dir`` attribute is read here
        run_type (str): the type of run (calibration, testing, forecasting)
        train (bool): if the model should be trained (adds ``--train``)
        evaluate (bool): if the model should be evaluated (adds ``--evaluate``)
        forecast (bool): if the model should be used for forecasting
            (adds ``--forecast``)
        use_saved (bool): if the model should use locally stored data
            (adds ``--saved``)

    Returns:
        list: ``[<model_dir>/run.sh, "--run_type", run_type, <enabled flags>]``
    """
    shell_command = [f"{model_path.model_dir}/run.sh", "--run_type", run_type]

    # Flags are emitted in this fixed order, each only when its switch is truthy.
    optional_flags = (
        (train, "--train"),
        (evaluate, "--evaluate"),
        (forecast, "--forecast"),
        (use_saved, "--saved"),
    )
    shell_command.extend(flag for enabled, flag in optional_flags if enabled)

    return shell_command

@staticmethod
def _get_aggregated_df(dfs, aggregation):
    """
    Aggregates the DataFrames of model outputs based on the specified aggregation method.

    The frames are concatenated and grouped on their first two index levels,
    so rows sharing the same (level 0, level 1) key across models are
    reduced to a single row.

    Args:
    - dfs (list of pd.DataFrame): A list of DataFrames of model outputs.
    - aggregation (str): The aggregation method to use (either "mean" or "median").

    Returns:
    - df (pd.DataFrame): The aggregated DataFrame of model outputs.

    Raises:
    - ValueError: If ``aggregation`` is neither "mean" nor "median".
    """
    if aggregation not in ("mean", "median"):
        # Previously this only logged and fell through, returning None to
        # callers that go on to save the result; fail loudly instead.
        logger.error(f"Invalid aggregation: {aggregation}")
        raise ValueError(f"Invalid aggregation: {aggregation}")

    grouped = pd.concat(dfs).groupby(level=[0, 1])
    return grouped.mean() if aggregation == "mean" else grouped.median()


0 comments on commit 6832878

Please sign in to comment.