
Merge pull request #115 from prio-data/log_datafetch_timestamp
Log datafetch timestamp
Polichinel authored Oct 30, 2024
2 parents b9fb7e4 + 48fc9cf commit 8bedc53
Showing 49 changed files with 189 additions and 130 deletions.
7 changes: 1 addition & 6 deletions .gitignore
@@ -149,9 +149,6 @@ venv.bak/
# Global cache
.global_cache.pkl

# Generated calibration logs
*calibration_log.txt

# mypy
.mypy_cache/
.dmypy.json
@@ -214,6 +211,4 @@ cython_debug/
*.bak

# txt logs
calibration_log.txt
testing_log.txt
forecasting_log.txt
*.txt
8 changes: 8 additions & 0 deletions common_utils/utils_cli_parser.py
@@ -128,3 +128,11 @@ def validate_arguments(args):
"Error: --aggregation flag cannot be used with --sweep. Exiting."
)
sys.exit(1)

if not args.train and not args.saved:
# if not training, then we need to use saved data
print(
"Error: if --train is not set, you should only use --saved flag. Exiting."
)
print("To fix: Add --train or --saved flag.")
sys.exit(1)
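
For reference, a minimal sketch of how this new check behaves, assuming an argparse setup with boolean --train and --saved flags (the parser construction itself is outside this diff):

import argparse
import sys

# Hypothetical parser for illustration; the real flags are defined
# elsewhere in utils_cli_parser.py.
parser = argparse.ArgumentParser()
parser.add_argument("--train", action="store_true")
parser.add_argument("--saved", action="store_true")

args = parser.parse_args([])  # neither flag set
if not args.train and not args.saved:
    print("Error: if --train is not set, the --saved flag must be used. Exiting.")
    sys.exit(1)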
8 changes: 7 additions & 1 deletion common_utils/utils_dataloaders.py
@@ -4,10 +4,12 @@
import pandas as pd
import sys
import logging
from datetime import datetime

from set_partition import get_partitioner_dict
from common_configs import config_drift_detection
from utils_df_to_vol_conversion import df_to_vol
from utils_log_files import create_data_fetch_log_file
from viewser import Queryset, Column

import logging
@@ -26,7 +28,7 @@ def fetch_data_from_viewser(model_name, month_first, month_last, drift_config_di
pd.DataFrame: The prepared DataFrame with initial processing done.
"""
logger.info(f'Beginning file download through viewser with month range {month_first},{month_last}')
model_path = ModelPath(model_name, validate=True)
model_path = ModelPath(model_name)
queryset_base = model_path.get_queryset() # just used here..
if queryset_base is None:
raise RuntimeError(f'Could not find queryset for {model_path.model_name} in common_querysets')
@@ -224,6 +226,10 @@ def fetch_or_load_views_df(model_name, partition, PATH_RAW, self_test=False, use
else:
logger.info(f'Fetching data...')
df, alerts = get_views_df(model_name, partition, override_month, self_test) # which is then used here

data_fetch_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
create_data_fetch_log_file(PATH_RAW, partition, model_name, data_fetch_timestamp)

logger.info(f'Saving data to {path_viewser_df}')
df.to_pickle(path_viewser_df)

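The upshot: whenever data is actually fetched through viewser (rather than loaded from a cached pickle), a timestamped fetch log now lands next to the raw data. A quick sketch of the timestamp format and the resulting file name, under the paths shown above:

from datetime import datetime

# Same format string as in fetch_or_load_views_df.
data_fetch_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
print(data_fetch_timestamp)  # e.g. 20241030_141530

# create_data_fetch_log_file(PATH_RAW, partition, model_name, data_fetch_timestamp)
# then writes {partition}_data_fetch_log.txt under PATH_RAW.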
43 changes: 33 additions & 10 deletions common_utils/utils_log_files.py
@@ -1,6 +1,5 @@
import logging
from pathlib import Path
from set_path import setup_data_paths
from model_path import ModelPath

logger = logging.getLogger(__name__)
@@ -11,11 +10,12 @@ def read_log_file(log_file_path):
Reads the log file and returns a dictionary with the relevant information.
Args:
- log_file_path (str): The path to the log file.
- log_file_path (str or Path): The path to the log file.
Returns:
- dict: A dictionary containing the model name, model timestamp, data generation timestamp, and data fetch timestamp.
"""

log_data = {}
with open(log_file_path, "r") as file:
for line in file:
@@ -28,20 +28,44 @@
key, value = line.split(": ", 1)
# There are duplicated keys for ensemble models, but it's not a problem because these keys are not used
log_data[key] = value

return log_data


def create_data_fetch_log_file(path_raw,
run_type,
model_name,
data_fetch_timestamp):
"""
Creates a log file in the specified single model folder with details about the data fetch.
Args:
- path_raw (Path): The path to the folder where the log file will be created.
- run_type (str): The type of run.
- model_name (str): The name of the model.
- data_fetch_timestamp (str): The timestamp when the raw data was fetched from VIEWS.
"""

data_fetch_log_file_path = f"{path_raw}/{run_type}_data_fetch_log.txt"

with open(data_fetch_log_file_path, "w") as log_file:
log_file.write(f"Single Model Name: {model_name}\n")
log_file.write(f"Data Fetch Timestamp: {data_fetch_timestamp}\n\n")

logger.info(f"Data fetch log file created at {data_fetch_log_file_path}")


def create_specific_log_file(path_generated,
run_type,
model_name,
deployment_status,
model_timestamp,
data_generation_timestamp=None,
data_fetch_timestamp=None,
data_generation_timestamp,
data_fetch_timestamp,
model_type="single",
mode="w",):
"""
Creates a log file in the specified model-specific folder with details about the generated data.
Creates a log file in the specified model folder with details about the generated data.
Args:
- path_generated (Path): The path to the folder where the log file will be created.
@@ -55,7 +79,6 @@ def create_specific_log_file(path_generated,
- mode (str, optional): The mode in which the file will be opened. Default is "w".
"""

Path(path_generated).mkdir(parents=True, exist_ok=True)
log_file_path = f"{path_generated}/{run_type}_log.txt"

# Capitalize the first letter of the model type
@@ -72,8 +95,8 @@
def create_log_file(path_generated,
model_config,
model_timestamp,
data_generation_timestamp=None,
data_fetch_timestamp=None,
data_generation_timestamp,
data_fetch_timestamp,
model_type="single",
models=None):

@@ -85,8 +108,8 @@
model_timestamp, data_generation_timestamp, data_fetch_timestamp, model_type)
if models:
for m_name in models:
model_path = ModelPath(m_name, validate=False).model_dir
_, _, model_path_generated = setup_data_paths(model_path)
model_path = ModelPath(m_name)
model_path_generated = model_path.data_generated
log_data = read_log_file(model_path_generated / f"{run_type}_log.txt")
create_specific_log_file(path_generated, run_type, m_name, log_data["Deployment Status"],
log_data["Single Model Timestamp"], log_data["Data Generation Timestamp"], log_data["Data Fetch Timestamp"], mode="a")
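With data_generation_timestamp and data_fetch_timestamp now required rather than defaulted, ensemble-level calls spell every timestamp out explicitly. A sketch of the intended call shape (config fields and paths are illustrative; in the pipeline they come from the ensemble's config files):

from datetime import datetime
from utils_log_files import create_log_file

config = {"name": "cruel_summer", "run_type": "forecasting",
          "deployment_status": "shadow",  # illustrative
          "models": ["purple_alien", "yellow_pikachu"]}  # illustrative members
path_generated_e = "ensembles/cruel_summer/data/generated"  # illustrative
data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# For ensembles, the model timestamp is currently set to the data generation
# timestamp; each member model's own {run_type}_log.txt must already exist,
# since it is read back and appended to the ensemble log (mode="a").
create_log_file(path_generated_e, config, data_generation_timestamp,
                data_generation_timestamp, data_fetch_timestamp=None,
                model_type="ensemble", models=config["models"])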
13 changes: 7 additions & 6 deletions ensembles/cruel_summer/src/forecasting/generate_forecast.py
@@ -6,7 +6,7 @@
from model_path import ModelPath
from ensemble_path import EnsemblePath
from set_partition import get_partitioner_dict
from utils_log_files import create_log_file
from utils_log_files import create_log_file, read_log_file
from utils_outputs import save_predictions
from utils_run import get_standardized_df, get_aggregated_df, get_single_model_config
from utils_artifacts import get_latest_model_artifact
@@ -15,7 +15,7 @@


def forecast_ensemble(config):
ensemble_path = EnsemblePath(config["name"], validate=False)
ensemble_path = EnsemblePath(config["name"])
path_generated_e = ensemble_path.data_generated
run_type = config["run_type"]
steps = config["steps"]
@@ -25,7 +25,7 @@ def forecast_ensemble(config):
for model_name in config["models"]:
logger.info(f"Forecasting single model {model_name}...")

model_path = ModelPath(model_name, validate=False)
model_path = ModelPath(model_name)
path_raw = model_path.data_raw
path_generated = model_path.data_generated
path_artifacts = model_path.artifacts
@@ -56,19 +56,20 @@ def forecast_ensemble(config):
df = get_standardized_df(df, model_config)

data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
data_fetch_timestamp = read_log_file(path_raw / f"{run_type}_data_fetch_log.txt").get("Data Fetch Timestamp", None)
save_predictions(df, path_generated, model_config)
create_log_file(path_generated, model_config, ts, data_generation_timestamp)
create_log_file(path_generated, model_config, ts, data_generation_timestamp, data_fetch_timestamp)

dfs.append(df)

df_prediction = get_aggregated_df(dfs, config["aggregation"])
data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# I don"t think current timestamp is useful here because timestamp of single models is more important.
# Timestamp of single models is more important than ensemble model timestamp
config["timestamp"] = timestamp[:-1]
save_predictions(df_prediction, path_generated_e, config)

# How to define an ensemble model timestamp? Currently set as data_generation_timestamp.
create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp,
create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp, data_fetch_timestamp=None,
model_type="ensemble", models=config["models"])

@@ -3,7 +3,7 @@
from pathlib import Path
from model_path import ModelPath
from ensemble_path import EnsemblePath
from utils_log_files import create_log_file
from utils_log_files import create_log_file, read_log_file
from utils_outputs import save_model_outputs, save_predictions
from utils_run import get_standardized_df, get_aggregated_df, get_single_model_config
from utils_evaluation_metrics import generate_metric_dict
@@ -16,7 +16,7 @@


def evaluate_ensemble(config):
ensemble_path = EnsemblePath(config["name"], validate=False)
ensemble_path = EnsemblePath(config["name"])
path_generated_e = ensemble_path.data_generated
run_type = config["run_type"]
steps = config["steps"]
@@ -26,7 +26,7 @@ def evaluate_ensemble(config):
for model_name in config["models"]:
logger.info(f"Evaluating single model {model_name}...")

model_path = ModelPath(model_name, validate=False)
model_path = ModelPath(model_name)
path_raw = model_path.data_raw
path_generated = model_path.data_generated
path_artifacts = model_path.artifacts
@@ -55,28 +55,29 @@ def evaluate_ensemble(config):
df = stepshift_model.predict(run_type, "predict", df_viewser)
df = get_standardized_df(df, model_config)
data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
data_fetch_timestamp = read_log_file(path_raw / f"{run_type}_data_fetch_log.txt").get("Data Fetch Timestamp", None)

_, df_output = generate_output_dict(df, model_config)
evaluation, df_evaluation = generate_metric_dict(df, model_config)
save_model_outputs(df_evaluation, df_output, path_generated, model_config)
save_predictions(df, path_generated, model_config)
create_log_file(path_generated, model_config, ts, data_generation_timestamp)
create_log_file(path_generated, model_config, ts, data_generation_timestamp, data_fetch_timestamp)

dfs.append(df)

df_agg = get_aggregated_df(dfs, config["aggregation"])
data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")


_, df_output = generate_output_dict(df_agg, config)
evaluation, df_evaluation = generate_metric_dict(df_agg, config)
log_wandb_log_dict(config, evaluation)

# I don"t think current timestamp is useful here.
# Timestamp of single models is more important but how should we register them in ensemble config?
# Timestamp of single models is more important than ensemble model timestamp
config["timestamp"] = timestamp[:-1]
save_model_outputs(df_evaluation, df_output, path_generated_e, config)
save_predictions(df_agg, path_generated_e, config)

# How to define an ensemble model timestamp? Currently set as data_generation_timestamp.
create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp,
create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp, data_fetch_timestamp=None,
model_type="ensemble", models=config["models"])
7 changes: 4 additions & 3 deletions ensembles/cruel_summer/src/training/train_ensemble.py
@@ -2,7 +2,7 @@
from datetime import datetime
from model_path import ModelPath
from set_partition import get_partitioner_dict
from utils_log_files import create_log_file
from utils_log_files import create_log_file, read_log_file
from utils_run import get_model, get_single_model_config
from views_stepshift.run import ViewsRun
from stepshift.views import StepshiftedModels
@@ -19,7 +19,7 @@ def train_ensemble(config):
for model_name in config["models"]:
logger.info(f"Training single model {model_name}...")

model_path = ModelPath(model_name, validate=False)
model_path = ModelPath(model_name)
path_raw = model_path.data_raw
path_generated = model_path.data_generated
path_artifacts = model_path.artifacts
@@ -32,7 +32,8 @@
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
model_filename = f"{run_type}_model_{timestamp}.pkl"
stepshift_model.save(path_artifacts / model_filename)
create_log_file(path_generated, model_config, timestamp)
data_fetch_timestamp = read_log_file(path_raw / f"{run_type}_data_fetch_log.txt").get("Data Fetch Timestamp", None)
create_log_file(path_generated, model_config, timestamp, None, data_fetch_timestamp)


def stepshift_training(config, partition_name, model, dataset):
2 changes: 1 addition & 1 deletion ensembles/cruel_summer/src/utils/utils_checks.py
@@ -111,7 +111,7 @@ def ensemble_model_check(config):
"""

for model_name in config["models"]:
model_path = ModelPath(model_name, validate=False)
model_path = ModelPath(model_name)
path_generated = model_path.data_generated

if (
2 changes: 1 addition & 1 deletion ensembles/cruel_summer/src/utils/utils_run.py
@@ -65,7 +65,7 @@ def update_config(hp_config, meta_config, dp_config, args):


def get_single_model_config(model_name):
model_path = ModelPath(model_name, validate=False)
model_path = ModelPath(model_name)
hp_config = runpy.run_path(model_path.configs / "config_hyperparameters.py")["get_hp_config"]()
meta_config = runpy.run_path(model_path.configs / "config_meta.py")["get_meta_config"]()
dp_config = runpy.run_path(model_path.configs / "config_deployment.py")["get_deployment_config"]()
13 changes: 7 additions & 6 deletions ensembles/white_mustang/src/forecasting/generate_forecast.py
@@ -6,7 +6,7 @@
from model_path import ModelPath
from ensemble_path import EnsemblePath
from set_partition import get_partitioner_dict
from utils_log_files import create_log_file
from utils_log_files import create_log_file, read_log_file
from utils_outputs import save_predictions
from utils_run import get_standardized_df, get_aggregated_df, get_single_model_config
from utils_artifacts import get_latest_model_artifact
@@ -15,7 +15,7 @@


def forecast_ensemble(config):
ensemble_path = EnsemblePath(config["name"], validate=False)
ensemble_path = EnsemblePath(config["name"])
path_generated_e = ensemble_path.data_generated
run_type = config["run_type"]
steps = config["steps"]
@@ -25,7 +25,7 @@ def forecast_ensemble(config):
for model_name in config["models"]:
logger.info(f"Forecasting single model {model_name}...")

model_path = ModelPath(model_name, validate=False)
model_path = ModelPath(model_name)
path_raw = model_path.data_raw
path_generated = model_path.data_generated
path_artifacts = model_path.artifacts
@@ -56,19 +56,20 @@ def forecast_ensemble(config):
df = get_standardized_df(df, model_config)

data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
data_fetch_timestamp = read_log_file(path_raw / f"{run_type}_data_fetch_log.txt").get("Data Fetch Timestamp", None)
save_predictions(df, path_generated, model_config)
create_log_file(path_generated, model_config, ts, data_generation_timestamp)
create_log_file(path_generated, model_config, ts, data_generation_timestamp, data_fetch_timestamp)

dfs.append(df)

df_prediction = get_aggregated_df(dfs, config["aggregation"])
data_generation_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# I don"t think current timestamp is useful here because timestamp of single models is more important.
# Timestamp of single models is more important than ensemble model timestamp
config["timestamp"] = timestamp[:-1]
save_predictions(df_prediction, path_generated_e, config)

# How to define an ensemble model timestamp? Currently set as data_generation_timestamp.
create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp,
create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp, data_fetch_timestamp=None,
model_type="ensemble", models=config["models"])
