Skip to content

Commit

Permalink
Merge pull request #130 from prio-data/drift_detection_all_models
Browse files Browse the repository at this point in the history
Drift detection all models
  • Loading branch information
Polichinel authored Oct 31, 2024
2 parents 8e01015 + 730b515 commit f67f11d
Show file tree
Hide file tree
Showing 56 changed files with 233 additions and 16,505 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/prevent_merge_when_branch_behind.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:
- production
- development
- fix_prevent_merge_when_branch_behind

workflow_dispatch: # enables manual triggering

jobs:
Expand All @@ -26,7 +27,7 @@ jobs:
uses: actions/checkout@v3
with:
ref: ${{ github.event.pull_request.head.ref }}

- name: Fetch comparison branch
run: |
git fetch --unshallow
Expand All @@ -38,5 +39,6 @@ jobs:
echo "::notice ::Branch is up-to-date with ${{ env.branch }}."
else
echo "::error ::Merge Blocked: Your branch is behind the latest commits on ${{ env.branch }}. Please update your branch before attempting to merge."
exit 1
fi
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,5 @@ cython_debug/
*.bak

# txt logs
*.txt

*.txt
41 changes: 41 additions & 0 deletions common_querysets/queryset_meow_meow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from viewser import Queryset, Column

def generate():
"""
Contains the configuration for the input data in the form of a viewser queryset. That is the data from viewser that is used to train the model.
This configuration is "behavioral" so modifying it will affect the model's runtime behavior and integration into the deployment system.
There is no guarantee that the model will work if the input data configuration is changed here without changing the model settings and algorithm accordingly.
Returns:
- queryset_base (Queryset): A queryset containing the base data for the model training.
"""

# VIEWSER 6, Example configuration. Modify as needed.

queryset_base = (Queryset("meow_meow", "priogrid_month")
# Create a new column 'ln_sb_best' using data from 'priogrid_month' and 'ged_sb_best_count_nokgi' column
# Apply logarithmic transformation, handle missing values by replacing them with NA
.with_column(Column("ln_sb_best", from_loa="priogrid_month", from_column="ged_sb_best_count_nokgi")
.transform.ops.ln().transform.missing.replace_na())

# Create a new column 'ln_ns_best' using data from 'priogrid_month' and 'ged_ns_best_count_nokgi' column
# Apply logarithmic transformation, handle missing values by replacing them with NA
.with_column(Column("ln_ns_best", from_loa="priogrid_month", from_column="ged_ns_best_count_nokgi")
.transform.ops.ln().transform.missing.replace_na())

# Create a new column 'ln_os_best' using data from 'priogrid_month' and 'ged_os_best_count_nokgi' column
# Apply logarithmic transformation, handle missing values by replacing them with NA
.with_column(Column("ln_os_best", from_loa="priogrid_month", from_column="ged_os_best_count_nokgi")
.transform.ops.ln().transform.missing.replace_na())

# Create columns for month and year_id
.with_column(Column("month", from_loa="month", from_column="month"))
.with_column(Column("year_id", from_loa="country_year", from_column="year_id"))

# Create columns for country_id, col, and row
.with_column(Column("c_id", from_loa="country_year", from_column="country_id"))
.with_column(Column("col", from_loa="priogrid", from_column="col"))
.with_column(Column("row", from_loa="priogrid", from_column="row"))
)

return queryset_base
5 changes: 5 additions & 0 deletions common_utils/utils_cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ def parse_args():
"-o", "--override_month", help="Over-ride use of current month", type=int
)

parser.add_argument(
"-dd", "--drift_self_test", action="store_true", default=False,
help="Enable drift-detection self_test at data-fetch"
)

return parser.parse_args()


Expand Down
28 changes: 23 additions & 5 deletions common_utils/utils_dataloaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import numpy as np
import pandas as pd

import sys
import logging
from datetime import datetime
Expand All @@ -13,10 +14,16 @@
from viewser import Queryset, Column

import logging

from model_path import ModelPath

logger = logging.getLogger(__name__)
#from utils_logger import setup_logging

#logging.basicConfig(
# level=logging.INFO, format="%(asctime)s %(name)s - %(levelname)s - %(message)s"
#)

logger = logging.getLogger(__name__)

def fetch_data_from_viewser(model_name, month_first, month_last, drift_config_dict, self_test):
"""
Expand All @@ -28,19 +35,27 @@ def fetch_data_from_viewser(model_name, month_first, month_last, drift_config_di
pd.DataFrame: The prepared DataFrame with initial processing done.
"""
logger.info(f'Beginning file download through viewser with month range {month_first},{month_last}')

model_path = ModelPath(model_name)

queryset_base = model_path.get_queryset() # just used here..
if queryset_base is None:
raise RuntimeError(f'Could not find queryset for {model_path.model_name} in common_querysets')
else:
logger.info(f'Found queryset for {model_path.model_name} in common_querysets')
del model_path

df, alerts = queryset_base.publish().fetch_with_drift_detection(start_date=month_first,
end_date=month_last - 1,
drift_config_dict=drift_config_dict,
self_test=self_test)

df = ensure_float64(df) # The dataframe must contain only np.float64 floats
# with wandb.init(project=f'{model_path.model_name}', entity="views_pipeline"):
for ialert, alert in enumerate(str(alerts).strip('[').strip(']').split('Input')):
if 'offender' in alert:
logger.warning({f"{model_path.model_name} data alert {ialert}": str(alert)})

del model_path

# Not required for stepshift model
# df.reset_index(inplace=True)
Expand Down Expand Up @@ -147,7 +162,8 @@ def filter_dataframe_by_month_range(df, month_first, month_last):
return df[df['month_id'].isin(month_range)].copy()


def get_views_df(model_name, partition, override_month=None, self_test=False):
def get_views_df(model_name, partition, self_test, override_month=None):

"""
Fetches and processes a DataFrame containing spatial-temporal data for the specified partition type.
Expand Down Expand Up @@ -187,8 +203,8 @@ def get_views_df(model_name, partition, override_month=None, self_test=False):

return df, alerts

def fetch_or_load_views_df(model_name, partition, PATH_RAW, self_test, use_saved=False, override_month=None):

def fetch_or_load_views_df(model_name, partition, PATH_RAW, self_test=False, use_saved=False, override_month=None):
"""
Fetches or loads a DataFrame for a given partition from viewser.
Expand Down Expand Up @@ -225,12 +241,14 @@ def fetch_or_load_views_df(model_name, partition, PATH_RAW, self_test=False, use

else:
logger.info(f'Fetching data...')
df, alerts = get_views_df(model_name, partition, override_month, self_test) # which is then used here

df, alerts = get_views_df(model_name, partition, self_test, override_month) # which is then used here

data_fetch_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
create_data_fetch_log_file(PATH_RAW, partition, model_name, data_fetch_timestamp)

logger.info(f'Saving data to {path_viewser_df}')

df.to_pickle(path_viewser_df)

if validate_df_partition(df, partition, override_month):
Expand Down
4 changes: 3 additions & 1 deletion common_utils/utils_log_files.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from pathlib import Path

from model_path import ModelPath

logger = logging.getLogger(__name__)
Expand All @@ -10,6 +11,7 @@ def read_log_file(log_file_path):
Reads the log file and returns a dictionary with the relevant information.
Args:
- log_file_path (str or Path): The path to the log file.
Returns:
Expand Down Expand Up @@ -96,7 +98,7 @@ def create_log_file(path_generated,
model_config,
model_timestamp,
data_generation_timestamp,
data_fetch_timestamp,
data_fetch_timestamp,
model_type="single",
models=None):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def forecast_ensemble(config):

for model_name in config["models"]:
logger.info(f"Forecasting single model {model_name}...")

model_path = ModelPath(model_name)
path_raw = model_path.data_raw
path_generated = model_path.data_generated
Expand Down Expand Up @@ -70,6 +69,7 @@ def forecast_ensemble(config):
save_predictions(df_prediction, path_generated_e, config)

# How to define an ensemble model timestamp? Currently set as data_generation_timestamp.

create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp, date_fetch_timestamp=None,
model_type="ensemble", models=config["models"])

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import pickle
import pandas as pd
import datetime
from pathlib import Path
from model_path import ModelPath
from ensemble_path import EnsemblePath
Expand All @@ -25,7 +27,6 @@ def evaluate_ensemble(config):

for model_name in config["models"]:
logger.info(f"Evaluating single model {model_name}...")

model_path = ModelPath(model_name)
path_raw = model_path.data_raw
path_generated = model_path.data_generated
Expand Down Expand Up @@ -79,5 +80,6 @@ def evaluate_ensemble(config):
save_predictions(df_agg, path_generated_e, config)

# How to define an ensemble model timestamp? Currently set as data_generation_timestamp.

create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp, data_fetch_timestamp=None,
model_type="ensemble", models=config["models"])
3 changes: 2 additions & 1 deletion ensembles/cruel_summer/src/training/train_ensemble.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import pandas as pd
from datetime import datetime
from model_path import ModelPath
from set_partition import get_partitioner_dict
Expand All @@ -18,7 +19,7 @@ def train_ensemble(config):

for model_name in config["models"]:
logger.info(f"Training single model {model_name}...")

model_path = ModelPath(model_name)
path_raw = model_path.data_raw
path_generated = model_path.data_generated
Expand Down
1 change: 1 addition & 0 deletions ensembles/cruel_summer/src/utils/utils_run.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import numpy as np
import pandas as pd
import runpy
import logging
from model_path import ModelPath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def forecast_ensemble(config):

for model_name in config["models"]:
logger.info(f"Forecasting single model {model_name}...")

model_path = ModelPath(model_name)
path_raw = model_path.data_raw
path_generated = model_path.data_generated
Expand Down Expand Up @@ -70,6 +69,7 @@ def forecast_ensemble(config):
save_predictions(df_prediction, path_generated_e, config)

# How to define an ensemble model timestamp? Currently set as data_generation_timestamp.

create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp, date_fetch_timestamp=None,
model_type="ensemble", models=config["models"])

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import pickle
import pandas as pd
import datetime
from pathlib import Path
from model_path import ModelPath
from ensemble_path import EnsemblePath
Expand All @@ -25,7 +27,6 @@ def evaluate_ensemble(config):

for model_name in config["models"]:
logger.info(f"Evaluating single model {model_name}...")

model_path = ModelPath(model_name)
path_raw = model_path.data_raw
path_generated = model_path.data_generated
Expand Down Expand Up @@ -79,5 +80,6 @@ def evaluate_ensemble(config):
save_predictions(df_agg, path_generated_e, config)

# How to define an ensemble model timestamp? Currently set as data_generation_timestamp.

create_log_file(path_generated_e, config, data_generation_timestamp, data_generation_timestamp, data_fetch_timestamp=None,
model_type="ensemble", models=config["models"])
2 changes: 1 addition & 1 deletion ensembles/white_mustang/src/training/train_ensemble.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import pandas as pd
from datetime import datetime
from model_path import ModelPath
from set_partition import get_partitioner_dict
Expand All @@ -18,7 +19,6 @@ def train_ensemble(config):

for model_name in config["models"]:
logger.info(f"Training single model {model_name}...")

model_path = ModelPath(model_name)
path_raw = model_path.data_raw
path_generated = model_path.data_generated
Expand Down
1 change: 1 addition & 0 deletions ensembles/white_mustang/src/utils/utils_run.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import numpy as np
import pandas as pd
import runpy
import logging
from model_path import ModelPath
Expand Down
2 changes: 2 additions & 0 deletions models/blank_space/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import wandb
import sys
import warnings
import time

from pathlib import Path
PATH = Path(__file__)
Expand All @@ -19,6 +20,7 @@


if __name__ == "__main__":
start_t = time.time()
wandb.login()

args = parse_args()
Expand Down
11 changes: 3 additions & 8 deletions models/blank_space/src/dataloaders/get_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@

logger = logging.getLogger(__name__)


def get_data(args, model_name):
model_path = ModelPath(model_name)
def get_data(args, model_name, self_test):
model_path = ModelPath(model_name, validate=False)
path_raw = model_path.data_raw

data, alerts = fetch_or_load_views_df(model_name, args.run_type, path_raw, use_saved=args.saved)
data, alerts = fetch_or_load_views_df(model_name, args.run_type, path_raw, self_test, use_saved=args.saved)
logger.debug(f"DataFrame shape: {data.shape if data is not None else 'None'}")

for ialert, alert in enumerate(str(alerts).strip('[').strip(']').split('Input')):
if 'offender' in alert:
logger.warning({f"{args.run_type} data alert {ialert}": str(alert)})

return data
20 changes: 15 additions & 5 deletions models/blank_space/src/management/execute_model_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,34 @@ def execute_sweep_run(args):
meta_config = get_meta_config()
update_sweep_config(sweep_config, args, meta_config)

get_data(args, sweep_config["name"])

project = f"{sweep_config['name']}_sweep" # we can name the sweep in the config file

with wandb.init(project=f'{project}_fetch', entity="views_pipeline"):
get_data(args, sweep_config["name"], args.drift_self_test)

wandb.finish()

sweep_id = wandb.sweep(sweep_config, project=project, entity="views_pipeline")
wandb.agent(sweep_id, execute_model_tasks, entity="views_pipeline")


def execute_single_run(args):

hp_config = get_hp_config()
meta_config = get_meta_config()
dp_config = get_deployment_config()
config = update_config(hp_config, meta_config, dp_config, args)

get_data(args, config["name"])

project = f"{config['name']}_{args.run_type}"

if args.run_type == "calibration" or args.run_type == "testing":
with wandb.init(project=f'{project}_fetch', entity="views_pipeline"):

get_data(args, config["name"], args.drift_self_test)

wandb.finish()

if args.run_type == 'calibration' or args.run_type == 'testing':

execute_model_tasks(config=config, project=project, train=args.train, eval=args.evaluate,
forecast=False, artifact_name=args.artifact_name)

Expand Down
Loading

0 comments on commit f67f11d

Please sign in to comment.