Log datafetch timestamp #115

Merged 6 commits on Oct 30, 2024

7 changes: 1 addition & 6 deletions .gitignore
@@ -149,9 +149,6 @@ venv.bak/
# Global cache
.global_cache.pkl

# Generated calibration logs
*calibration_log.txt

# mypy
.mypy_cache/
.dmypy.json
@@ -214,6 +211,4 @@ cython_debug/
*.bak

# txt logs
calibration_log.txt
testing_log.txt
forecasting_log.txt
*.txt
8 changes: 8 additions & 0 deletions common_utils/utils_cli_parser.py
@@ -128,3 +128,11 @@ def validate_arguments(args):
"Error: --aggregation flag cannot be used with --sweep. Exiting."
)
sys.exit(1)

if not args.train and not args.saved:
# if not training, previously saved data must be used
print(
"Error: if --train is not set, the --saved flag must be provided. Exiting."
)
print("To fix: add the --train or --saved flag.")
sys.exit(1)
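
To see the new guard in action, one can fake a parsed namespace (a sketch, not part of this PR: `SimpleNamespace` stands in for argparse's result, the bare import assumes `common_utils` is on `sys.path` as the pipeline scripts arrange, and the real parser may set more flags than shown):

```python
# Sketch: exercise the new --train/--saved guard in isolation.
from types import SimpleNamespace
from utils_cli_parser import validate_arguments  # assumes common_utils on sys.path

# Neither --train nor --saved is set, so validate_arguments
# prints both error lines and calls sys.exit(1).
args = SimpleNamespace(train=False, saved=False, sweep=False, aggregation=False)
validate_arguments(args)
```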
8 changes: 7 additions & 1 deletion common_utils/utils_dataloaders.py
@@ -4,10 +4,12 @@
import pandas as pd
import sys
import logging
from datetime import datetime

from set_partition import get_partitioner_dict
from common_configs import config_drift_detection
from utils_df_to_vol_conversion import df_to_vol
from utils_log_files import create_data_fetch_log_file
from viewser import Queryset, Column

import logging
@@ -26,7 +28,7 @@ def fetch_data_from_viewser(model_name, month_first, month_last, drift_config_di
pd.DataFrame: The prepared DataFrame with initial processing done.
"""
logger.info(f'Beginning file download through viewser with month range {month_first},{month_last}')
model_path = ModelPath(model_name, validate=True)
model_path = ModelPath(model_name)
queryset_base = model_path.get_queryset() # only used here
if queryset_base is None:
raise RuntimeError(f'Could not find queryset for {model_path.model_name} in common_querysets')
@@ -224,6 +226,10 @@ def fetch_or_load_views_df(model_name, partition, PATH_RAW, self_test=False, use
else:
logger.info(f'Fetching data...')
df, alerts = get_views_df(model_name, partition, override_month, self_test) # which is then used here

data_fetch_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
create_data_fetch_log_file(PATH_RAW, partition, model_name, data_fetch_timestamp)

logger.info(f'Saving data to {path_viewser_df}')
df.to_pickle(path_viewser_df)

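
With this hook, every fresh fetch now leaves a small audit file next to the raw pickle. For a hypothetical model `purple_alien` on a calibration run, `PATH_RAW/calibration_data_fetch_log.txt` would read (format per `create_data_fetch_log_file` in the next file; timestamp illustrative):

```
Single Model Name: purple_alien
Data Fetch Timestamp: 20241030_153045
```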
43 changes: 33 additions & 10 deletions common_utils/utils_log_files.py
@@ -1,6 +1,5 @@
import logging
from pathlib import Path
from set_path import setup_data_paths
from model_path import ModelPath

logger = logging.getLogger(__name__)
@@ -11,11 +10,12 @@ def read_log_file(log_file_path):
Reads the log file and returns a dictionary with the relevant information.

Args:
- log_file_path (str): The path to the log file.
- log_file_path (str or Path): The path to the log file.

Returns:
- dict: A dictionary containing the model name, model timestamp, data generation timestamp, and data fetch timestamp.
"""

log_data = {}
with open(log_file_path, "r") as file:
for line in file:
@@ -28,20 +28,44 @@
key, value = line.split(": ", 1)
# There are duplicated keys for ensemble models, but it's not a problem because these keys are not used
log_data[key] = value

return log_data


def create_data_fetch_log_file(path_raw,
run_type,
model_name,
data_fetch_timestamp):
"""
Creates a log file in the specified single model folder with details about the data fetch.

Args:
- path_raw (Path): The path to the folder where the log file will be created.
- run_type (str): The type of run.
- model_name (str): The name of the model.
- data_fetch_timestamp (str): The timestamp when the raw data was fetched from VIEWS.
"""

data_fetch_log_file_path = f"{path_raw}/{run_type}_data_fetch_log.txt"

with open(data_fetch_log_file_path, "w") as log_file:
log_file.write(f"Single Model Name: {model_name}\n")
log_file.write(f"Data Fetch Timestamp: {data_fetch_timestamp}\n\n")

logger.info(f"Data fetch log file created at {data_fetch_log_file_path}")


def create_specific_log_file(path_generated,
run_type,
model_name,
deployment_status,
model_timestamp,
data_generation_timestamp=None,
data_fetch_timestamp=None,
data_generation_timestamp,
data_fetch_timestamp,
model_type="single",
mode="w",):
"""
Creates a log file in the specified model-specific folder with details about the generated data.
Creates a log file in the specified model folder with details about the generated data.

Args:
- path_generated (Path): The path to the folder where the log file will be created.
@@ -55,7 +79,6 @@ def create_specific_log_file(path_generated,
- mode (str, optional): The mode in which the file will be opened. Default is "w".
"""

Path(path_generated).mkdir(parents=True, exist_ok=True)
log_file_path = f"{path_generated}/{run_type}_log.txt"

# Capitalize the first letter of the model type
@@ -72,8 +95,8 @@
def create_log_file(path_generated,
model_config,
model_timestamp,
data_generation_timestamp=None,
data_fetch_timestamp=None,
data_generation_timestamp,
data_fetch_timestamp,
model_type="single",
models=None):

Expand All @@ -85,8 +108,8 @@ def create_log_file(path_generated,
model_timestamp, data_generation_timestamp, data_fetch_timestamp, model_type)
if models:
for m_name in models:
model_path = ModelPath(m_name, validate=False).model_dir
_, _, model_path_generated = setup_data_paths(model_path)
model_path = ModelPath(m_name)
model_path_generated = model_path.data_generated
log_data = read_log_file(model_path_generated / f"{run_type}_log.txt")
create_specific_log_file(path_generated, run_type, m_name, log_data["Deployment Status"],
log_data["Single Model Timestamp"], log_data["Data Generation Timestamp"], log_data["Data Fetch Timestamp"], mode="a")
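
A minimal round trip of the new helper against `read_log_file` (a sketch: the model folder is hypothetical, the helper does not create directories itself, and it assumes the elided lines of `read_log_file` strip whitespace and skip blank lines, as the surrounding code suggests):

```python
# Sketch: write a fetch log, then read it back.
from pathlib import Path
from utils_log_files import create_data_fetch_log_file, read_log_file

path_raw = Path("models/purple_alien/data/raw")  # hypothetical model folder
path_raw.mkdir(parents=True, exist_ok=True)      # the helper does not mkdir

create_data_fetch_log_file(path_raw, "calibration", "purple_alien", "20241030_120000")
log_data = read_log_file(path_raw / "calibration_data_fetch_log.txt")
print(log_data.get("Data Fetch Timestamp"))      # -> "20241030_120000"
```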
13 changes: 7 additions & 6 deletions ensembles/cruel_summer/src/forecasting/generate_forecast.py
@@ -6,7 +6,7 @@
from model_path import ModelPath
from ensemble_path import EnsemblePath
from set_partition import get_partitioner_dict
from utils_log_files import create_log_file
from utils_log_files import create_log_file, read_log_file
from utils_outputs import save_predictions
from utils_run import get_standardized_df, get_aggregated_df, get_single_model_config
from utils_artifacts import get_latest_model_artifact
@@ -15,7 +15,7 @@


def forecast_ensemble(config):
ensemble_path = EnsemblePath(config["name"], validate=False)
ensemble_path = EnsemblePath(config["name"])
path_generated_e = ensemble_path.data_generated
run_type = config["run_type"]
steps = config["steps"]
@@ -25,7 +25,7 @@ def forecast_ensemble(config):
for model_name in config["models"]:
logger.info(f"Forecasting single model {model_name}...")

model_path = ModelPath(model_name, validate=False)
model_path = ModelPath(model_name)
path_raw = model_path.data_raw
path_generated = model_path.data_generated
path_artifacts = model_path.artifacts
@@ -56,19 +56,20 @@
df = get_standardized_df(df, model_config)

data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
data_fetch_timestamp = read_log_file(path_raw / f"{run_type}_data_fetch_log.txt").get("Data Fetch Timestamp", None)
save_predictions(df, path_generated, model_config)
create_log_file(path_generated, model_config, ts, data_generation_timestamp)
create_log_file(path_generated, model_config, ts, data_generation_timestamp, data_fetch_timestamp)

dfs.append(df)

df_prediction = get_aggregated_df(dfs, config["aggregation"])
data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# I don"t think current timestamp is useful here because timestamp of single models is more important.
# Timestamp of single models is more important than ensemble model timestamp
config["timestamp"] = timestamp[:-1]
save_predictions(df_prediction, path_generated_e, config)

# How to define an ensemble model timestamp? Currently set as data_generation_timestamp.
create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp,
create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp, data_fetch_timestamp=None,
model_type="ensemble", models=config["models"])
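
One caveat that applies here and to the evaluation and training scripts below: `read_log_file` opens the fetch log unconditionally, so a model folder populated before this PR (no `*_data_fetch_log.txt` yet) raises `FileNotFoundError`. A defensive wrapper, if backward compatibility matters (a sketch, not part of this PR):

```python
# Sketch: tolerate model folders that predate the data-fetch log.
from pathlib import Path
from utils_log_files import read_log_file

def get_data_fetch_timestamp(path_raw: Path, run_type: str):
    """Return the logged fetch timestamp, or None if no fetch log exists yet."""
    log_path = path_raw / f"{run_type}_data_fetch_log.txt"
    if not log_path.exists():
        return None
    return read_log_file(log_path).get("Data Fetch Timestamp", None)
```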

ensembles/cruel_summer/src/offline_evaluation/evaluate_ensemble.py
@@ -3,7 +3,7 @@
from pathlib import Path
from model_path import ModelPath
from ensemble_path import EnsemblePath
from utils_log_files import create_log_file
from utils_log_files import create_log_file, read_log_file
from utils_outputs import save_model_outputs, save_predictions
from utils_run import get_standardized_df, get_aggregated_df, get_single_model_config
from utils_evaluation_metrics import generate_metric_dict
@@ -16,7 +16,7 @@


def evaluate_ensemble(config):
ensemble_path = EnsemblePath(config["name"], validate=False)
ensemble_path = EnsemblePath(config["name"])
path_generated_e = ensemble_path.data_generated
run_type = config["run_type"]
steps = config["steps"]
@@ -26,7 +26,7 @@ def evaluate_ensemble(config):
for model_name in config["models"]:
logger.info(f"Evaluating single model {model_name}...")

model_path = ModelPath(model_name, validate=False)
model_path = ModelPath(model_name)
path_raw = model_path.data_raw
path_generated = model_path.data_generated
path_artifacts = model_path.artifacts
@@ -55,28 +55,29 @@
df = stepshift_model.predict(run_type, "predict", df_viewser)
df = get_standardized_df(df, model_config)
data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
data_fetch_timestamp = read_log_file(path_raw / f"{run_type}_data_fetch_log.txt").get("Data Fetch Timestamp", None)

_, df_output = generate_output_dict(df, model_config)
evaluation, df_evaluation = generate_metric_dict(df, model_config)
save_model_outputs(df_evaluation, df_output, path_generated, model_config)
save_predictions(df, path_generated, model_config)
create_log_file(path_generated, model_config, ts, data_generation_timestamp)
create_log_file(path_generated, model_config, ts, data_generation_timestamp, data_fetch_timestamp)

dfs.append(df)

df_agg = get_aggregated_df(dfs, config["aggregation"])
data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")


_, df_output = generate_output_dict(df_agg, config)
evaluation, df_evaluation = generate_metric_dict(df_agg, config)
log_wandb_log_dict(config, evaluation)

# I don"t think current timestamp is useful here.
# Timestamp of single models is more important but how should we register them in ensemble config?
# Timestamp of single models is more important than ensemble model timestamp
config["timestamp"] = timestamp[:-1]
save_model_outputs(df_evaluation, df_output, path_generated_e, config)
save_predictions(df_agg, path_generated_e, config)

# How to define an ensemble model timestamp? Currently set as data_generation_timestamp.
create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp,
create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp, data_fetch_timestamp=None,
model_type="ensemble", models=config["models"])
7 changes: 4 additions & 3 deletions ensembles/cruel_summer/src/training/train_ensemble.py
@@ -2,7 +2,7 @@
from datetime import datetime
from model_path import ModelPath
from set_partition import get_partitioner_dict
from utils_log_files import create_log_file
from utils_log_files import create_log_file, read_log_file
from utils_run import get_model, get_single_model_config
from views_stepshift.run import ViewsRun
from stepshift.views import StepshiftedModels
@@ -19,7 +19,7 @@ def train_ensemble(config):
for model_name in config["models"]:
logger.info(f"Training single model {model_name}...")

model_path = ModelPath(model_name, validate=False)
model_path = ModelPath(model_name)
path_raw = model_path.data_raw
path_generated = model_path.data_generated
path_artifacts = model_path.artifacts
@@ -32,7 +32,8 @@
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
model_filename = f"{run_type}_model_{timestamp}.pkl"
stepshift_model.save(path_artifacts / model_filename)
create_log_file(path_generated, model_config, timestamp)
data_fetch_timestamp = read_log_file(path_raw / f"{run_type}_data_fetch_log.txt").get("Data Fetch Timestamp", None)
create_log_file(path_generated, model_config, timestamp, None, data_fetch_timestamp)


def stepshift_training(config, partition_name, model, dataset):
2 changes: 1 addition & 1 deletion ensembles/cruel_summer/src/utils/utils_checks.py
@@ -111,7 +111,7 @@ def ensemble_model_check(config):
"""

for model_name in config["models"]:
model_path = ModelPath(model_name, validate=False)
model_path = ModelPath(model_name)
path_generated = model_path.data_generated

if (
2 changes: 1 addition & 1 deletion ensembles/cruel_summer/src/utils/utils_run.py
@@ -65,7 +65,7 @@ def update_config(hp_config, meta_config, dp_config, args):


def get_single_model_config(model_name):
model_path = ModelPath(model_name, validate=False)
model_path = ModelPath(model_name)
hp_config = runpy.run_path(model_path.configs / "config_hyperparameters.py")["get_hp_config"]()
meta_config = runpy.run_path(model_path.configs / "config_meta.py")["get_meta_config"]()
dp_config = runpy.run_path(model_path.configs / "config_deployment.py")["get_deployment_config"]()
13 changes: 7 additions & 6 deletions ensembles/white_mustang/src/forecasting/generate_forecast.py
@@ -6,7 +6,7 @@
from model_path import ModelPath
from ensemble_path import EnsemblePath
from set_partition import get_partitioner_dict
from utils_log_files import create_log_file
from utils_log_files import create_log_file, read_log_file
from utils_outputs import save_predictions
from utils_run import get_standardized_df, get_aggregated_df, get_single_model_config
from utils_artifacts import get_latest_model_artifact
@@ -15,7 +15,7 @@


def forecast_ensemble(config):
ensemble_path = EnsemblePath(config["name"], validate=False)
ensemble_path = EnsemblePath(config["name"])
path_generated_e = ensemble_path.data_generated
run_type = config["run_type"]
steps = config["steps"]
@@ -25,7 +25,7 @@ def forecast_ensemble(config):
for model_name in config["models"]:
logger.info(f"Forecasting single model {model_name}...")

model_path = ModelPath(model_name, validate=False)
model_path = ModelPath(model_name)
path_raw = model_path.data_raw
path_generated = model_path.data_generated
path_artifacts = model_path.artifacts
@@ -56,19 +56,20 @@
df = get_standardized_df(df, model_config)

data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
data_fetch_timestamp = read_log_file(path_raw / f"{run_type}_data_fetch_log.txt").get("Data Fetch Timestamp", None)
save_predictions(df, path_generated, model_config)
create_log_file(path_generated, model_config, ts, data_generation_timestamp)
create_log_file(path_generated, model_config, ts, data_generation_timestamp, data_fetch_timestamp)

dfs.append(df)

df_prediction = get_aggregated_df(dfs, config["aggregation"])
data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# I don"t think current timestamp is useful here because timestamp of single models is more important.
# Timestamp of single models is more important than ensemble model timestamp
config["timestamp"] = timestamp[:-1]
save_predictions(df_prediction, path_generated_e, config)

# How to define an ensemble model timestamp? Currently set as data_generation_timestamp.
create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp,
create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp, data_fetch_timestamp=None,
model_type="ensemble", models=config["models"])
