diff --git a/tests/test_model_manager.py b/tests/test_model_manager.py
index 97e56b8..4a3ae86 100644
--- a/tests/test_model_manager.py
+++ b/tests/test_model_manager.py
@@ -230,7 +230,7 @@ def test_save_model_outputs(mock_model_path):
     df_output = pd.DataFrame({"col1": [5, 6], "col2": [7, 8]})
     path_generated = Path("/path/to/generated")
     with patch("builtins.open", new_callable=mock_open), patch("pathlib.Path.mkdir"):
-        manager._save_model_outputs(df_evaluation, df_output, path_generated)
+        manager._save_model_outputs(df_evaluation, df_output, path_generated, sequence_number=1)
     with patch("pathlib.Path.exists", return_value=True):
         assert Path(f"{path_generated}/output_1_test_run_20220101.pkl").exists()
         assert Path(f"{path_generated}/evaluation_1_test_run_20220101.pkl").exists()
diff --git a/views_pipeline_core/cli/utils.py b/views_pipeline_core/cli/utils.py
index 206e99b..72bc3e9 100644
--- a/views_pipeline_core/cli/utils.py
+++ b/views_pipeline_core/cli/utils.py
@@ -86,6 +86,11 @@ def parse_args():
         help="Enable drift-detection self_test at data-fetch"
     )
 
+    parser.add_argument(
+        "-et", "--eval_type", type=str, default="standard",
+        help="Type of evaluation to be performed"
+    )
+
     return parser.parse_args()
 
 
@@ -134,10 +139,17 @@ def validate_arguments(args):
         )
         sys.exit(1)
 
-    if not args.train and not args.saved:
-        # if not training, then we need to use saved data
+    if (not args.train and not args.sweep) and not args.saved:
+        # if not training or sweeping, then we need to use saved data
+        print(
+            "Error: if neither --train nor --sweep is set, you must use the --saved flag. Exiting."
+        )
+        print("To fix: Add the --train, --sweep, or --saved flag.")
+        sys.exit(1)
+
+    if args.eval_type not in ["standard", "long", "complete", "live"]:
         print(
-            "Error: if --train is not set, you should only use --saved flag. Exiting."
+            "Error: --eval_type should be one of 'standard', 'long', 'complete', or 'live'. Exiting."
         )
-        print("To fix: Add --train or --saved flag.")
+        print("To fix: Set --eval_type to one of the above options.")
         sys.exit(1)
diff --git a/views_pipeline_core/managers/ensemble_manager.py b/views_pipeline_core/managers/ensemble_manager.py
new file mode 100644
index 0000000..2904c7e
--- /dev/null
+++ b/views_pipeline_core/managers/ensemble_manager.py
@@ -0,0 +1,322 @@
+from views_pipeline_core.managers.path_manager import ModelPath, EnsemblePath
+from views_pipeline_core.managers.model_manager import ModelManager
+from views_pipeline_core.wandb.utils import add_wandb_monthly_metrics, log_wandb_log_dict
+from views_pipeline_core.models.check import ensemble_model_check
+from views_pipeline_core.files.utils import read_log_file, create_log_file
+from views_pipeline_core.models.outputs import generate_output_dict
+from views_pipeline_core.evaluation.metrics import generate_metric_dict
+from typing import Union, Optional, List, Dict
+import wandb
+import logging
+import time
+import pickle
+from pathlib import Path
+import subprocess
+from datetime import datetime
+import pandas as pd
+
+logger = logging.getLogger(__name__)
+
+
+class EnsembleManager(ModelManager):
+
+    def __init__(self, ensemble_path: EnsemblePath) -> None:
+        super().__init__(ensemble_path)
+
+    @staticmethod
+    def _get_shell_command(model_path: ModelPath,
+                           run_type: str,
+                           train: bool,
+                           evaluate: bool,
+                           forecast: bool,
+                           use_saved: bool = False,
+                           eval_type: str = "standard"
+                           ) -> list:
+        """
+        Constructs the shell command used to invoke a single model's run.sh script.
+
+        Args:
+            model_path (ModelPath): model path object for the model
+            run_type (str): the type of run (calibration, testing, forecasting)
+            train (bool): if the model should be trained
+            evaluate (bool): if the model should be evaluated
+            forecast (bool): if the model should be used for forecasting
+            use_saved (bool): if the model should use locally stored data
+            eval_type (str): the type of evaluation to perform (standard, long, complete, live)
+
+        Returns:
+            list: The shell command and its arguments.
+        """
+
+        shell_command = [f"{str(model_path.model_dir)}/run.sh"]
+        shell_command.append("--run_type")
+        shell_command.append(run_type)
+
+        if train:
+            shell_command.append("--train")
+        if evaluate:
+            shell_command.append("--evaluate")
+        if forecast:
+            shell_command.append("--forecast")
+        if use_saved:
+            shell_command.append("--saved")
+
+        shell_command.append("--eval_type")
+        shell_command.append(eval_type)
+
+        return shell_command
+
+    @staticmethod
+    def _get_aggregated_df(df_to_aggregate, aggregation):
+        """
+        Aggregates the DataFrames of model outputs based on the specified aggregation method.
+
+        Args:
+        - df_to_aggregate (list of pd.DataFrame): A list of DataFrames of model outputs.
+        - aggregation (str): The aggregation method to use (either "mean" or "median").
+
+        Returns:
+        - df (pd.DataFrame): The aggregated DataFrame of model outputs.
+        """
+
+        if aggregation == "mean":
+            return pd.concat(df_to_aggregate).groupby(level=[0, 1]).mean()
+        elif aggregation == "median":
+            return pd.concat(df_to_aggregate).groupby(level=[0, 1]).median()
+        else:
+            logger.error(f"Invalid aggregation: {aggregation}")
+
+    def execute_single_run(self, args) -> None:
+        """
+        Executes a single run of the ensemble, including training, evaluation, and forecasting of its constituent models.
+
+        Args:
+            args: Command line arguments.
+ """ + self.config = self._update_single_config(args) + self._project = f"{self.config['name']}_{args.run_type}" + self._eval_type = args.eval_type + + try: + if not args.train: + ensemble_model_check(self.config) + + self._execute_model_tasks( + config=self.config, + train=args.train, + eval=args.evaluate, + forecast=args.forecast, + use_saved=args.saved + ) + + except Exception as e: + logger.error(f"Error during single run execution: {e}") + + def _execute_model_tasks( + self, + config: Optional[Dict] = None, + train: Optional[bool] = None, + eval: Optional[bool] = None, + forecast: Optional[bool] = None, + use_saved: Optional[bool] = None + ) -> None: + """ + Executes various model-related tasks including training, evaluation, and forecasting. + + Args: + config (dict, optional): Configuration object containing parameters and settings. + train (bool, optional): Flag to indicate if the model should be trained. + eval (bool, optional): Flag to indicate if the model should be evaluated. + forecast (bool, optional): Flag to indicate if forecasting should be performed. + """ + start_t = time.time() + try: + with wandb.init(project=self._project, entity=self._entity, config=config): + add_wandb_monthly_metrics() + self.config = wandb.config + + if train: + logger.info(f"Training model {self.config['name']}...") + self._train_ensemble(use_saved) + + if eval: + logger.info(f"Evaluating model {self.config['name']}...") + self._evaluate_ensemble(self._eval_type) + + if forecast: + logger.info(f"Forecasting model {self.config['name']}...") + self._forecast_ensemble() + + wandb.finish() + except Exception as e: + logger.error(f"Error during model tasks execution: {e}") + + end_t = time.time() + minutes = (end_t - start_t) / 60 + logger.info(f"Done. Runtime: {minutes:.3f} minutes.\n") + + def _train_model_artifact(self, model_name:str, run_type: str, use_saved: bool) -> None: + logger.info(f"Training single model {model_name}...") + + model_path = ModelPath(model_name) + model_config = ModelManager(model_path).configs + model_config["run_type"] = run_type + + shell_command = EnsembleManager._get_shell_command(model_path, + run_type, + train=True, + evaluate=False, + forecast=False, + use_saved=use_saved) + + # print(shell_command) + try: + subprocess.run(shell_command, check=True) + except Exception as e: + logger.error(f"Error during shell command execution for model {model_name}: {e}") + + def _evaluate_model_artifact(self, model_name:str, run_type: str, eval_type: str) -> None: + logger.info(f"Evaluating single model {model_name}...") + + model_path = ModelPath(model_name) + path_raw = model_path.data_raw + path_generated = model_path.data_generated + path_artifacts = model_path.artifacts + path_artifact = self._get_latest_model_artifact(path_artifacts, run_type) + + ts = path_artifact.stem[-15:] + + preds = [] + + for sequence_number in range(ModelManager._resolve_evaluation_sequence_number(eval_type)): + + pkl_path = f"{path_generated}/predictions_{run_type}_{ts}_{str(sequence_number).zfill(2)}.pkl" + if Path(pkl_path).exists(): + logger.info(f"Loading existing {run_type} predictions from {pkl_path}") + with open(pkl_path, "rb") as file: + pred = pickle.load(file) + else: + logger.info(f"No existing {run_type} predictions found. 
Generating new {run_type} predictions...") + model_config = ModelManager(model_path).configs + model_config["run_type"] = run_type + shell_command = EnsembleManager._get_shell_command(model_path, + run_type, + train=False, + evaluate=True, + forecast=False, + use_saved=True, + eval_type=eval_type) + + try: + subprocess.run(shell_command, check=True) + except Exception as e: + logger.error(f"Error during shell command execution for model {model_name}: {e}") + + with open(pkl_path, "rb") as file: + pred = pickle.load(file) + + data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + date_fetch_timestamp = read_log_file(path_raw / f"{run_type}_data_fetch_log.txt").get("Data Fetch Timestamp", None) + + create_log_file(path_generated, model_config, ts, data_generation_timestamp, date_fetch_timestamp) + + preds.append(pred) + + return preds + + def _forecast_model_artifact(self, model_name:str, run_type: str) -> None: + logger.info(f"Forecasting single model {model_name}...") + + model_path = ModelPath(model_name) + path_raw = model_path.data_raw + path_generated = model_path.data_generated + path_artifacts = model_path.artifacts + path_artifact = self._get_latest_model_artifact(path_artifacts, run_type) + + ts = path_artifact.stem[-15:] + + pkl_path = f"{path_generated}/predictions_{run_type}_{ts}.pkl" + if Path(pkl_path).exists(): + logger.info(f"Loading existing {run_type} predictions from {pkl_path}") + with open(pkl_path, "rb") as file: + df = pickle.load(file) + else: + logger.info(f"No existing {run_type} predictions found. Generating new {run_type} predictions...") + model_config = ModelManager(model_path).configs + model_config["run_type"] = run_type + shell_command = EnsembleManager._get_shell_command(model_path, + run_type, + train=False, + evaluate=False, + forecast=True, + use_saved=True) + # print(shell_command) + try: + subprocess.run(shell_command, check=True) + except Exception as e: + logger.error(f"Error during shell command execution for model {model_name}: {e}") + + with open(pkl_path, "rb") as file: + df = pickle.load(file) + + data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + date_fetch_timestamp = read_log_file(path_raw / f"{run_type}_data_fetch_log.txt").get("Data Fetch Timestamp", None) + + create_log_file(path_generated, model_config, ts, data_generation_timestamp, date_fetch_timestamp) + + return df + + def _train_ensemble(self, use_saved: bool) -> None: + run_type = self.config["run_type"] + + for model_name in self.config["models"]: + self._train_model_artifact(model_name, run_type, use_saved) + + def _evaluate_ensemble(self, eval_type: str) -> None: + path_generated_e = self._model_path.data_generated + run_type = self.config["run_type"] + dfs = [] + dfs_agg = [] + + for model_name in self.config["models"]: + dfs.append(self._evaluate_model_artifact(model_name, run_type, eval_type)) + + for i in range(len(dfs[0])): + df_to_aggregate = [df[i] for df in dfs] + df_agg = EnsembleManager._get_aggregated_df(df_to_aggregate, self.config["aggregation"]) + dfs_agg.append(df_agg) + + data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + # _, df_output = generate_output_dict(df_agg, self.config) + # evaluation, df_evaluation = generate_metric_dict(df_agg, self.config) + # log_wandb_log_dict(self.config, evaluation) + + # Timestamp of single models is more important than ensemble model timestamp + self.config["timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S") + # self._save_model_outputs(df_evaluation, df_output, 
path_generated_e) + for i, df_agg in enumerate(dfs_agg): + self._save_predictions(df_agg, path_generated_e, i) + + # How to define an ensemble model timestamp? Currently set as data_generation_timestamp. + create_log_file(path_generated_e, self.config, data_generation_timestamp, data_generation_timestamp, data_fetch_timestamp=None, + model_type="ensemble", models=self.config["models"]) + + def _forecast_ensemble(self) -> None: + path_generated_e = self._model_path.data_generated + run_type = self.config["run_type"] + dfs = [] + + for model_name in self.config["models"]: + + dfs.append(self._forecast_model_artifact(model_name, run_type)) + + df_prediction = EnsembleManager._get_aggregated_df(dfs, self.config["aggregation"]) + data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + self.config["timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S") + self._save_predictions(df_prediction, path_generated_e) + + # How to define an ensemble model timestamp? Currently set as data_generation_timestamp. + create_log_file(path_generated_e, self.config, data_generation_timestamp, data_generation_timestamp, data_fetch_timestamp=None, + model_type="ensemble", models=self.config["models"]) + + \ No newline at end of file diff --git a/views_pipeline_core/managers/model_manager.py b/views_pipeline_core/managers/model_manager.py index 512ebdf..9e3392f 100644 --- a/views_pipeline_core/managers/model_manager.py +++ b/views_pipeline_core/managers/model_manager.py @@ -13,9 +13,6 @@ logger = logging.getLogger(__name__) - - - # ============================================================ Model Manager ============================================================ @@ -58,6 +55,66 @@ def __init__(self, model_path: ModelPath) -> None: ) self._data_loader = ViewsDataLoader(model_path=self._model_path) + @staticmethod + def _resolve_evaluation_sequence_number(eval_type: str) -> int: + """ + Resolve the evaluation length based on the evaluation type. + + Args: + eval_type (str): The type of evaluation to perform (e.g., standard, long, complete, live). + + Returns: + int: The evaluation length. + """ + if eval_type == "standard": + return 12 + elif eval_type == "long": + return 36 + elif eval_type == "complete": + return None # currently set as None because sophisticated calculation is needed + elif eval_type == "live": + return 12 + else: + raise ValueError(f"Invalid evaluation type: {eval_type}") + + @staticmethod + def _generate_model_file_name(run_type: str, timestamp: str) -> str: + """ + Generates a model file name based on the run type, and timestamp. + + Args: + run_type (str): The type of run (e.g., calibration, testing). + timestamp (str): The timestamp of the model file. + + Returns: + str: The generated model file name. + """ + + return f"{run_type}_model_{timestamp}.pkl" + + @staticmethod + def _generate_output_file_name( + generated_file_type: str, + run_type: str, + timestamp: str, + sequence_number: int) -> str: + """ + Generates a prediction file name based on the run type, generated file type, steps, and timestamp. + + Args: + generated_file_type (str): The type of generated file (e.g., predictions, output, evaluation). + sequence_number (int): The sequence number. + run_type (str): The type of run (e.g., calibration, testing). + + Returns: + str: The generated prediction file name. 
+ """ + # logger.info(f"sequence_number: {sequence_number}") + if sequence_number is not None: + return f"{generated_file_type}_{run_type}_{timestamp}_{str(sequence_number).zfill(2)}.pkl" + else: + return f"{generated_file_type}_{run_type}_{timestamp}.pkl" + def __load_config(self, script_name: str, config_method: str) -> Union[Dict, None]: """ Loads and executes a configuration method from a specified script. @@ -84,6 +141,7 @@ def __load_config(self, script_name: str, config_method: str) -> Union[Dict, Non return getattr(config_module, config_method)() except (AttributeError, ImportError) as e: logger.error(f"Error loading config from {script_name}: {e}") + return None def _update_single_config(self, args) -> Dict: @@ -103,6 +161,7 @@ def _update_single_config(self, args) -> Dict: } config["run_type"] = args.run_type config["sweep"] = False + return config def _update_sweep_config(self, args) -> Dict: @@ -121,7 +180,133 @@ def _update_sweep_config(self, args) -> Dict: config["parameters"]["name"] = {"value": self._config_meta["name"]} config["parameters"]["depvar"] = {"value": self._config_meta["depvar"]} config["parameters"]["algorithm"] = {"value": self._config_meta["algorithm"]} + return config + + def _get_artifact_files(self, path_artifact: Path, run_type: str) -> List[Path]: + """ + Retrieve artifact files from a directory that match the given run type and common extensions. + + Args: + path_artifact (Path): The directory path where model files are stored. + run_type (str): The type of run (e.g., calibration, testing). + + Returns: + List[Path]: List of matching model file paths. + """ + common_extensions = [ + ".pt", + ".pth", + ".h5", + ".hdf5", + ".pkl", + ".json", + ".bst", + ".txt", + ".bin", + ".cbm", + ".onnx", + ] + artifact_files = [ + f + for f in path_artifact.iterdir() + if f.is_file() + and f.stem.startswith(f"{run_type}_model_") + and f.suffix in common_extensions + ] + return artifact_files + + def _get_latest_model_artifact(self, path_artifact: Path, run_type: str) -> Path: + """ + Retrieve the path (pathlib path object) latest model artifact for a given run type based on the modification time. + + Args: + path_artifact (Path): The model specifc directory path where artifacts are stored. + run_type (str): The type of run (e.g., calibration, testing, forecasting). + + Returns: + The path (pathlib path objsect) to the latest model artifact given the run type. + + Raises: + FileNotFoundError: If no model artifacts are found for the given run type. + """ + + # List all model files for the given specific run_type with the expected filename pattern + model_files = self._get_artifact_files(path_artifact, run_type) + + if not model_files: + raise FileNotFoundError( + f"No model artifacts found for run type '{run_type}' in path '{path_artifact}'" + ) + + # Sort the files based on the timestamp embedded in the filename. With format %Y%m%d_%H%M%S For example, '20210831_123456.pt' + model_files.sort(reverse=True) + + # print statements for debugging + logger.info(f"artifact used: {model_files[0]}") + + return path_artifact / model_files[0] + + def _save_model_outputs( + self, + df_evaluation: pd.DataFrame, + df_output: pd.DataFrame, + path_generated: Union[str, Path], + sequence_number: int, + ) -> None: + """ + Save the model outputs and evaluation metrics to the specified path. + + Args: + df_evaluation (pd.DataFrame): DataFrame containing evaluation metrics. + df_output (pd.DataFrame): DataFrame containing model outputs. 
+ path_generated (str or Path): The path where the outputs should be saved. + sequence_number (int): The sequence number. + """ + try: + path_generated = Path(path_generated) + path_generated.mkdir(parents=True, exist_ok=True) + + outputs_path = ModelManager._generate_output_file_name("output", + self.config["run_type"], + self.config["timestamp"], + sequence_number) + evaluation_path = ModelManager._generate_output_file_name("evaluation", + self.config["run_type"], + self.config["timestamp"], + sequence_number) + + df_output.to_pickle(path_generated/outputs_path) + df_evaluation.to_pickle(path_generated/evaluation_path) + except Exception as e: + logger.error(f"Error saving model outputs: {e}") + + def _save_predictions( + self, + df_predictions: pd.DataFrame, + path_generated: Union[str, Path], + sequence_number: int = None + ) -> None: + """ + Save the model predictions to the specified path. + + Args: + df_predictions (pd.DataFrame): DataFrame containing model predictions. + path_generated (str or Path): The path where the predictions should be saved. + sequence_number (int): The sequence number. + """ + try: + path_generated = Path(path_generated) + path_generated.mkdir(parents=True, exist_ok=True) + + predictions_name = ModelManager._generate_output_file_name("predictions", + self.config["run_type"], + self.config["timestamp"], + sequence_number) + # logger.info(f"{sequence_number}, Saving predictions to {path_generated/predictions_name}") + df_predictions.to_pickle(path_generated/predictions_name) + except Exception as e: + logger.error(f"Error saving predictions: {e}") def execute_single_run(self, args) -> None: """ @@ -132,6 +317,7 @@ def execute_single_run(self, args) -> None: """ self.config = self._update_single_config(args) self._project = f"{self.config['name']}_{args.run_type}" + self._eval_type = args.eval_type try: with wandb.init(project=f"{self._project}_fetch", entity=self._entity): @@ -148,7 +334,7 @@ def execute_single_run(self, args) -> None: train=args.train, eval=args.evaluate, forecast=args.forecast, - artifact_name=args.artifact_name, + artifact_name=args.artifact_name ) except Exception as e: logger.error(f"Error during single run execution: {e}") @@ -162,6 +348,7 @@ def execute_sweep_run(self, args) -> None: """ self.config = self._update_sweep_config(args) self._project = f"{self.config['name']}_sweep" + self._eval_type = args.eval_type try: with wandb.init(project=f"{self._project}_fetch", entity=self._entity): @@ -208,7 +395,7 @@ def _execute_model_tasks( logger.info(f"Sweeping model {self.config['name']}...") model = self._train_model_artifact() logger.info(f"Evaluating model {self.config['name']}...") - self._evaluate_sweep(model) + self._evaluate_sweep(model, self._eval_type) if train: logger.info(f"Training model {self.config['name']}...") @@ -216,7 +403,7 @@ def _execute_model_tasks( if eval: logger.info(f"Evaluating model {self.config['name']}...") - self._evaluate_model_artifact(artifact_name) + self._evaluate_model_artifact(self._eval_type, artifact_name) if forecast: logger.info(f"Forecasting model {self.config['name']}...") @@ -237,11 +424,12 @@ def _train_model_artifact(self): pass @abstractmethod - def _evaluate_model_artifact(self, artifact_name: str): + def _evaluate_model_artifact(self, eval_type: str, artifact_name: str): """ Abstract method to evaluate the model artifact. Must be implemented by subclasses. Args: + eval_type (str): The type of evaluation to perform (e.g., standard, long, complete, live). 
artifact_name (str): The name of the model artifact to evaluate. """ pass @@ -257,169 +445,31 @@ def _forecast_model_artifact(self, artifact_name: str): pass @abstractmethod - def _evaluate_sweep(self, model): + def _evaluate_sweep(self, model, eval_type: str): """ Abstract method to evaluate the model during a sweep. Must be implemented by subclasses. Args: model: The model to evaluate. + eval_type (str): The type of evaluation to perform (e.g., standard, long, complete, live). """ pass - def _get_artifact_files(self, path_artifact: Path, run_type: str) -> List[Path]: - """ - Retrieve artifact files from a directory that match the given run type and common extensions. - - Args: - path_artifact (Path): The directory path where model files are stored. - run_type (str): The type of run (e.g., calibration, testing). - - Returns: - List[Path]: List of matching model file paths. - """ - common_extensions = [ - ".pt", - ".pth", - ".h5", - ".hdf5", - ".pkl", - ".json", - ".bst", - ".txt", - ".bin", - ".cbm", - ".onnx", - ] - artifact_files = [ - f - for f in path_artifact.iterdir() - if f.is_file() - and f.stem.startswith(f"{run_type}_model_") - and f.suffix in common_extensions - ] - return artifact_files - - def _get_latest_model_artifact(self, path_artifact, run_type): - """ - Retrieve the path (pathlib path object) latest model artifact for a given run type based on the modification time. - - Args: - path_artifact (Path): The model specifc directory path where artifacts are stored. - run_type (str): The type of run (e.g., calibration, testing, forecasting). - - Returns: - The path (pathlib path objsect) to the latest model artifact given the run type. - - Raises: - FileNotFoundError: If no model artifacts are found for the given run type. - """ - - # List all model files for the given specific run_type with the expected filename pattern - model_files = self._get_artifact_files(path_artifact, run_type) - - if not model_files: - raise FileNotFoundError( - f"No model artifacts found for run type '{run_type}' in path '{path_artifact}'" - ) - - # Sort the files based on the timestamp embedded in the filename. With format %Y%m%d_%H%M%S For example, '20210831_123456.pt' - model_files.sort(reverse=True) - - # print statements for debugging - logger.info(f"artifact used: {model_files[0]}") - - return path_artifact / model_files[0] - - def _save_model_outputs( - self, - df_evaluation: pd.DataFrame, - df_output: pd.DataFrame, - path_generated: Union[str, Path], - ) -> None: - """ - Save the model outputs and evaluation metrics to the specified path. - - Args: - df_evaluation (pd.DataFrame): DataFrame containing evaluation metrics. - df_output (pd.DataFrame): DataFrame containing model outputs. - path_generated (str or Path): The path where the outputs should be saved. - """ - try: - Path(path_generated).mkdir(parents=True, exist_ok=True) - - outputs_path = ModelManager._generate_output_file_name(path_generated, - "output", - self.config["steps"][-1], - self.config["run_type"], - self.config["timestamp"]) - evaluation_path = ModelManager._generate_output_file_name(path_generated, - "evaluation", - self.config["steps"][-1], - self.config["run_type"], - self.config["timestamp"]) - - df_output.to_pickle(outputs_path) - df_evaluation.to_pickle(evaluation_path) - except Exception as e: - logger.error(f"Error saving model outputs: {e}") - - def _save_predictions( - self, df_predictions: pd.DataFrame, path_generated: Union[str, Path] - ) -> None: - """ - Save the model predictions to the specified path. 
- - Args: - df_predictions (pd.DataFrame): DataFrame containing model predictions. - path_generated (str or Path): The path where the predictions should be saved. - """ - try: - Path(path_generated).mkdir(parents=True, exist_ok=True) - - predictions_path = ModelManager._generate_output_file_name(path_generated, - "predictions", - self.config["steps"][-1], - self.config["run_type"], - self.config["timestamp"]) - df_predictions.to_pickle(predictions_path) - except Exception as e: - logger.error(f"Error saving predictions: {e}") - - @staticmethod - def _generate_model_file_name(run_type: str, timestamp: str) -> str: - """ - Generates a model file name based on the run type, and timestamp. - - Args: - run_type (str): The type of run (e.g., calibration, testing). - timestamp (str): The timestamp of the model file. - - Returns: - str: The generated model file name. - """ - - return f"{run_type}_model_{timestamp}.pkl" - - @staticmethod - def _generate_output_file_name( - path_generated: Union[str, Path], generated_file_type, steps: int, run_type: str, timestamp: str) -> str: + @property + def configs(self) -> Dict: """ - Generates a prediction file name based on the run type, generated file type, steps, and timestamp. - - Args: - path_generated (str or Path): The path where the predictions should be saved. - generated_file_type (str): The type of generated file (e.g., predictions, output, evaluation). - steps (int): The number of steps ahead for the forecast. - run_type (str): The type of run (e.g., calibration, testing). + Get the combined meta, deployment and hyperparameters configuration. Returns: - str: The generated prediction file name. + dict: The configuration object. """ - - path_generated = Path(path_generated) - - return f'{path_generated}/{generated_file_type}_{steps}_{run_type}_{timestamp}.pkl' - + + config = { + **self._config_hyperparameters, + **self._config_meta, + **self._config_deployment, + } + return config if __name__ == "__main__": diff --git a/views_pipeline_core/managers/path_manager.py b/views_pipeline_core/managers/path_manager.py index 3f809a4..9141d39 100644 --- a/views_pipeline_core/managers/path_manager.py +++ b/views_pipeline_core/managers/path_manager.py @@ -372,10 +372,8 @@ def _initialize_directories(self) -> None: self.data_generated = self._build_absolute_directory(Path("data/generated")) self.data_processed = self._build_absolute_directory(Path("data/processed")) self.reports = self._build_absolute_directory(Path("reports")) - self._sys_paths = None - self.queryset_path = self._build_absolute_directory(Path("configs/config_queryset.py")) + self._sys_paths = None self._queryset = None - print(self.queryset_path) # Initialize model-specific directories only if the class is ModelPath if self.__class__.__name__ == "ModelPath": self._initialize_model_specific_directories() @@ -396,7 +394,6 @@ def _initialize_scripts(self) -> None: self._build_absolute_directory(Path("configs/config_meta.py")), self._build_absolute_directory(Path("main.py")), self._build_absolute_directory(Path("README.md")), - self._build_absolute_directory(Path("configs/config_queryset.py")) ] # Initialize model-specific directories only if the class is ModelPath if self.__class__.__name__ == "ModelPath": @@ -410,16 +407,12 @@ def _initialize_model_specific_scripts(self) -> None: Returns: None """ + + self.queryset_path = self._build_absolute_directory(Path("configs/config_queryset.py")) self.scripts += [ - # self._build_absolute_directory(Path("configs/config_sweep.py")), - # 
self._build_absolute_directory(Path("src/dataloaders/get_data.py")), - # self._build_absolute_directory( - # Path("src/offline_evaluation/evaluate_model.py") - # ), - # self._build_absolute_directory( - # Path(f"src/training/train_{self.target}.py") - # ), - self.queryset_path + self.queryset_path, + self._build_absolute_directory(Path("configs/config_queryset.py")), + self._build_absolute_directory(Path("configs/config_sweep.py")) ] def _is_path(self, path_input: Union[str, Path]) -> bool: @@ -647,9 +640,9 @@ class EnsemblePath(ModelPath): _target = "ensemble" @classmethod - def _initialize_class_paths(cls): + def _initialize_class_paths(cls, current_path: Path = None) -> None: """Initialize class-level paths for ensemble.""" - super()._initialize_class_paths() + super()._initialize_class_paths(current_path=current_path) cls._models = cls._root / Path(cls._target + "s") # Additional ensemble-specific initialization... @@ -665,7 +658,7 @@ def __init__( """ super().__init__(ensemble_name_or_path, validate) # Additional ensemble-specific initialization... - print(self._validate) + # print(self._validate) def _initialize_directories(self) -> None: """ @@ -676,17 +669,17 @@ def _initialize_directories(self) -> None: # Call the parent class's _initialize_directories method super()._initialize_directories() # Initialize ensemble-specific directories only if the class is EnsemblePath - if self.__class__.__name__ == "EnsemblePath": - self._initialize_ensemble_specific_directories() - - def _initialize_ensemble_specific_directories(self): - self.reports_figures = self._build_absolute_directory(Path("reports/figures")) - self.reports_papers = self._build_absolute_directory(Path("reports/papers")) - self.reports_plots = self._build_absolute_directory(Path("reports/plots")) - self.reports_slides = self._build_absolute_directory(Path("reports/slides")) - self.reports_timelapse = self._build_absolute_directory( - Path("reports/timelapse") - ) + # if self.__class__.__name__ == "EnsemblePath": + # self._initialize_ensemble_specific_directories() + + # def _initialize_ensemble_specific_directories(self): + # self.reports_figures = self._build_absolute_directory(Path("reports/figures")) + # self.reports_papers = self._build_absolute_directory(Path("reports/papers")) + # self.reports_plots = self._build_absolute_directory(Path("reports/plots")) + # self.reports_slides = self._build_absolute_directory(Path("reports/slides")) + # self.reports_timelapse = self._build_absolute_directory( + # Path("reports/timelapse") + # ) def _initialize_scripts(self) -> None: """ @@ -696,26 +689,26 @@ def _initialize_scripts(self) -> None: """ super()._initialize_scripts() # Initialize ensemble-specific scripts only if the class is EnsemblePath - if self.__class__.__name__ == "EnsemblePath": - self._initialize_ensemble_specific_scripts() - - def _initialize_ensemble_specific_scripts(self): - """ - Initializes the ensemble-specific scripts by appending their absolute paths - to the `self.scripts` list. - - The paths are built using the `_build_absolute_directory` method. 
- - Returns: - None - """ - self.scripts += [ - self._build_absolute_directory(Path("artifacts/model_metadata_dict.py")), - self._build_absolute_directory( - Path("src/offline_evaluation/evaluate_ensemble.py") - ), - self._build_absolute_directory(Path("src/training/train_ensemble.py")), - self._build_absolute_directory(Path("src/utils/utils_check.py")), - self._build_absolute_directory(Path("src/utils/utils_run.py")), - self._build_absolute_directory(Path("src/visualization/visual.py")), - ] + # if self.__class__.__name__ == "EnsemblePath": + # self._initialize_ensemble_specific_scripts() + + # def _initialize_ensemble_specific_scripts(self): + # """ + # Initializes the ensemble-specific scripts by appending their absolute paths + # to the `self.scripts` list. + + # The paths are built using the `_build_absolute_directory` method. + + # Returns: + # None + # """ + # self.scripts += [ + # self._build_absolute_directory(Path("artifacts/model_metadata_dict.py")), + # self._build_absolute_directory( + # Path("src/offline_evaluation/evaluate_ensemble.py") + # ), + # self._build_absolute_directory(Path("src/training/train_ensemble.py")), + # self._build_absolute_directory(Path("src/utils/utils_check.py")), + # self._build_absolute_directory(Path("src/utils/utils_run.py")), + # self._build_absolute_directory(Path("src/visualization/visual.py")), + # ] diff --git a/views_pipeline_core/models/check.py b/views_pipeline_core/models/check.py new file mode 100644 index 0000000..2c2a4e7 --- /dev/null +++ b/views_pipeline_core/models/check.py @@ -0,0 +1,122 @@ +from datetime import datetime +import logging +from pathlib import Path +from views_pipeline_core.managers.path_manager import ModelPath +from views_pipeline_core.files.utils import read_log_file + +logger = logging.getLogger(__name__) + + +def check_model_conditions(path_generated, run_type): + """ + Checks if the single model meets the required conditions based on the log file. + + Args: + - model_folder (str): The path to the model-specific folder containing the log file. + - config (dict): The configuration dictionary containing the model details. + + Returns: + - bool: True if all conditions are met, False otherwise. + """ + + log_file_path = Path(path_generated) / f"{run_type}_log.txt" + try: + log_data = read_log_file(log_file_path) + except Exception as e: + logger.error(f"Error reading log file: {e}") + return False + + current_time = datetime.now() + current_year = current_time.year + current_month = current_time.month + + # Extract from log data + model_name = log_data["Single Model Name"] + model_timestamp = datetime.strptime(log_data["Single Model Timestamp"], "%Y%m%d_%H%M%S") + data_generation_timestamp = None if log_data["Data Generation Timestamp"] == "None" else ( + datetime.strptime(log_data["Data Generation Timestamp"], "%Y%m%d_%H%M%S")) + + data_fetch_timestamp = None if log_data["Data Fetch Timestamp"] == "None" else ( + datetime.strptime(log_data["Data Fetch Timestamp"], "%Y%m%d_%H%M%S")) + + # Condition 1: Model trained in the current year after July + if current_month >= 7: + if not (model_timestamp.year == current_year and model_timestamp.month >= 7): + logger.error(f"Model {model_name} was trained in {model_timestamp.year}_{model_timestamp.month}. " + f"Please use the latest model that is trained after {current_year}_07. 
Exiting.") + return False + elif current_month < 7: + if not ( + (model_timestamp.year == current_year - 1 and model_timestamp.month >= 7) or + (model_timestamp.year == current_year and model_timestamp.month < 7) + ): + logger.error(f"Model {model_name} was trained in {model_timestamp.year}_{model_timestamp.month}. " + f"Please use the latest model that is trained after {current_year - 1}_07. Exiting.") + return False + + # Condition 2: Data generated in the current month + if data_generation_timestamp and not ( + data_generation_timestamp.year == current_year and data_generation_timestamp.month == current_month): + logger.error(f"Data for model {model_name} was not generated in the current month. Exiting.") + return False + + # Condition 3: Raw data fetched in the current month + if data_fetch_timestamp and not ( + data_fetch_timestamp.year == current_year and data_fetch_timestamp.month == current_month): + logger.error(f"Raw data for model {model_name} was not fetched in the current month. Exiting.") + return False + + return True + + +def check_model_deployment_status(path_generated, run_type, deployment_status): + """ + Checks if the ensemble model meets the required deployment status conditions based on the log file. + + Args: + - model_folder (str): The path to the model-specific folder containing the log file. + + Returns: + - bool: True if all conditions are met, False otherwise. + """ + + log_file_path = Path(path_generated) / f"{run_type}_log.txt" + try: + log_data = read_log_file(log_file_path) + except Exception as e: + logger.error(f"Error reading log file: {e}. Exiting.") + return False + + model_name = log_data["Single Model Name"] + model_dp_status = log_data["Deployment Status"] + + # More check conditions can be added here + if model_dp_status == "Deployed" and deployment_status != "Deployed": + logger.error(f"Model {model_name} deployment status is deployed " + f"but the ensemble is not. Exiting.") + return False + + return True + + +def ensemble_model_check(config): + """ + Performs the ensemble model check based on the log files of individual models. + + Args: + - model_folders (list of str): A list of paths to model-specific folders containing log files. + + Returns: + - None: Shuts down if conditions are not met; proceeds otherwise. + """ + + for model_name in config["models"]: + model_path = ModelPath(model_name) + path_generated = model_path.data_generated + + if ( + (not check_model_conditions(path_generated, config["run_type"])) or + (not check_model_deployment_status(path_generated, config["run_type"], config["deployment_status"])) + ): + exit(1) # Shut down if conditions are not met + logger.info(f"Model {config['name']} meets the required conditions.") \ No newline at end of file