Merge pull request #47 from prio-data/orchestration_new
Orchestration, adding logging, ensemble model check, ADR
Polichinel authored Sep 25, 2024
2 parents 7504d17 + e683f75 commit 7165a1f
Showing 141 changed files with 3,246 additions and 1,885 deletions.
72 changes: 57 additions & 15 deletions common_utils/set_path.py
@@ -32,9 +32,32 @@ def setup_model_paths(PATH):
Returns:
PATH_MODEL: The path (pathlib path object) including the "models" directory and its immediate subdirectory, or None if "models" is not in the path.
"""

PATH_MODEL = Path(*[i for i in PATH.parts[:PATH.parts.index("models")+2]]) # The +2 is to include the "models" and the individual model name in the path
return PATH_MODEL

try:
PATH_MODEL = Path(*[i for i in PATH.parts[:PATH.parts.index("models") + 2]])
return PATH_MODEL
except ValueError:
return None


def setup_ensemble_paths(PATH):
"""
Extracts and returns the ensemble-specific path (pathlib path object) including the "ensembles" directory and its immediate subdirectory.
This function identifies the "ensembles" directory within the provided path and constructs a new path up to and including the next subdirectory after "ensembles" (e.g. white_mustang).
This is useful for setting up paths specific to an ensemble within the project.
Args:
PATH (Path): The base path, typically the path of the script invoking this function (e.g., `PATH = Path(__file__)`).
Returns:
PATH_ENSEMBLE: The path (pathlib path object) including the "ensembles" directory and its immediate subdirectory, or None if "ensembles" is not in the path.
"""

try:
PATH_ENSEMBLE = Path(*[i for i in PATH.parts[:PATH.parts.index("ensembles") + 2]])
return PATH_ENSEMBLE
except ValueError:
return None


def setup_project_paths(PATH) -> None:
@@ -71,6 +94,7 @@ def setup_project_paths(PATH) -> None:

PATH_ROOT = setup_root_paths(PATH)
PATH_MODEL = setup_model_paths(PATH)
PATH_ENSEMBLE = setup_ensemble_paths(PATH)

# print(f"Root path: {PATH_ROOT}") # debug
# print(f"Model path: {PATH_MODEL}") # debug
@@ -84,17 +108,34 @@ def setup_project_paths(PATH) -> None:
# print(f"Common configs path: {PATH_COMMON_CONFIGS}") # debug

# Define model-specific paths
PATH_CONFIGS = PATH_MODEL / "configs"
PATH_SRC = PATH_MODEL / "src"
PATH_UTILS = PATH_SRC / "utils"
PATH_MANAGEMENT = PATH_SRC / "management" # added to keep the management scripts in a separate folder from the utils, per Sara's point
PATH_ARCHITECTURES = PATH_SRC / "architectures"
PATH_TRAINING = PATH_SRC / "training"
PATH_FORECASTING = PATH_SRC / "forecasting"
PATH_OFFLINE_EVALUATION = PATH_SRC / "offline_evaluation"
PATH_DATALOADERS = PATH_SRC / "dataloaders"

paths_to_add = [PATH_ROOT, PATH_COMMON_UTILS, PATH_COMMON_CONFIGS, PATH_COMMON_QUERYSETS, PATH_CONFIGS, PATH_UTILS, PATH_MANAGEMENT, PATH_ARCHITECTURES, PATH_TRAINING, PATH_FORECASTING, PATH_OFFLINE_EVALUATION, PATH_DATALOADERS]
if PATH_MODEL:
PATH_CONFIGS = PATH_MODEL / "configs"
PATH_SRC = PATH_MODEL / "src"
PATH_UTILS = PATH_SRC / "utils"
PATH_MANAGEMENT = PATH_SRC / "management" # added to keep the management scripts in a separate folder from the utils, per Sara's point
PATH_ARCHITECTURES = PATH_SRC / "architectures"
PATH_TRAINING = PATH_SRC / "training"
PATH_FORECASTING = PATH_SRC / "forecasting"
PATH_OFFLINE_EVALUATION = PATH_SRC / "offline_evaluation"
PATH_DATALOADERS = PATH_SRC / "dataloaders"
paths_to_add = [PATH_ROOT, PATH_COMMON_UTILS, PATH_COMMON_CONFIGS, PATH_COMMON_QUERYSETS,
PATH_CONFIGS, PATH_UTILS, PATH_MANAGEMENT, PATH_ARCHITECTURES, PATH_TRAINING,
PATH_FORECASTING, PATH_OFFLINE_EVALUATION, PATH_DATALOADERS]

# Define ensemble paths
if PATH_ENSEMBLE:
PATH_CONFIGS_E = PATH_ENSEMBLE / "configs"
PATH_SRC_E = PATH_ENSEMBLE / "src"
PATH_UTILS_E = PATH_SRC_E / "utils"
PATH_MANAGEMENT_E = PATH_SRC_E / "management" # added to keep the management scripts in a separate folder from the utils, per Sara's point
PATH_ARCHITECTURES_E = PATH_SRC_E / "architectures"
PATH_TRAINING_E = PATH_SRC_E / "training"
PATH_FORECASTING_E = PATH_SRC_E / "forecasting"
PATH_OFFLINE_EVALUATION_E = PATH_SRC_E / "offline_evaluation"
PATH_DATALOADERS_E = PATH_SRC_E / "dataloaders"
paths_to_add = [PATH_ROOT, PATH_COMMON_UTILS, PATH_COMMON_CONFIGS, PATH_COMMON_QUERYSETS,
PATH_CONFIGS_E, PATH_UTILS_E, PATH_MANAGEMENT_E, PATH_ARCHITECTURES_E, PATH_TRAINING_E,
PATH_FORECASTING_E, PATH_OFFLINE_EVALUATION_E, PATH_DATALOADERS_E]

for path in paths_to_add:
path_str = str(path)
@@ -116,8 +157,9 @@ def setup_data_paths(PATH) -> Path:

#PATH_MODEL = Path(*[i for i in PATH.parts[:PATH.parts.index("models")+2]]) # The +2 is to include the "models" and the individual model name in the path
PATH_MODEL = setup_model_paths(PATH)
PATH_ENSEMBLE = setup_ensemble_paths(PATH)

PATH_DATA = PATH_MODEL / "data"
PATH_DATA = PATH_MODEL / "data" if PATH_MODEL else PATH_ENSEMBLE / "data"
PATH_RAW = PATH_DATA / "raw"
PATH_PROCCEDS = PATH_DATA / "processed"
PATH_GENERATED = PATH_DATA / "generated"
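Note on the reworked path helpers: setup_model_paths() and setup_ensemble_paths() now fail soft, returning None when their marker directory is absent, so setup_project_paths() and setup_data_paths() can branch on whichever one resolves. A minimal sketch of the intended behaviour, assuming the flat import style used elsewhere in the repo (the file paths and the purple_alien model name are hypothetical, not from this commit):

from pathlib import Path
from set_path import setup_model_paths, setup_ensemble_paths

# A script inside a model directory: "models" is in the path, "ensembles" is not.
PATH = Path("/home/user/views_pipeline/models/purple_alien/main.py")
print(setup_model_paths(PATH))     # /home/user/views_pipeline/models/purple_alien
print(setup_ensemble_paths(PATH))  # None -> setup_project_paths() takes the model branch

# A script inside an ensemble directory: the roles are reversed.
PATH = Path("/home/user/views_pipeline/ensembles/white_mustang/main.py")
print(setup_model_paths(PATH))     # None
print(setup_ensemble_paths(PATH))  # /home/user/views_pipeline/ensembles/white_mustang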
28 changes: 17 additions & 11 deletions common_utils/utils_cli_parser.py
@@ -38,7 +38,6 @@ def parse_args():
'Note: If --sweep is specified, --forecast will also automatically be flagged. '
'Can only be used with --run_type forecasting.')


parser.add_argument('-a', '--artifact_name',
type=str,
help='Specify the name of the model artifact to be used for evaluation. '
@@ -47,8 +46,20 @@ def parse_args():
'where <run_type> is calibration, testing, or forecasting, and <timestamp> is in the format YMD_HMS.'
'If not provided, the latest artifact will be used by default.')

parser.add_argument('-en', '--ensemble',
action='store_true',
help='Flag to indicate if the model is an ensemble.')

parser.add_argument('-sa', '--saved',
action='store_true',
help='Use locally stored data')

parser.add_argument('-o', '--override_month',
help='Override use of the current month', type=int)

return parser.parse_args()


def validate_arguments(args):
if args.sweep and args.run_type != 'calibration':
print("Error: Sweep runs must have --run_type set to 'calibration'. Exiting.")
@@ -76,13 +87,8 @@ def validate_arguments(args):
print("To fix: Set --run_type to forecasting if --forecast is flagged.")
sys.exit(1)


# notes on stepshifted models:
# Some thought is needed on how we store, denote (naming convention), and retrieve the model artifacts from stepshifted models.
# It is not a big issue, but it is something to consider so we don't do something careless.
# A possible format could be: <run_type>_model_s<step>_<timestamp>.pt, e.g. calibration_model_s00_20210831_123456.pt, calibration_model_s01_20210831_123456.pt, etc.
# The rest of the code would then be made to handle this naming convention without any issues. Could be a simple fix.
# Alternatively, we could store the model artifacts in a subfolder for each stepshifted model. This would make it easier to handle the artifacts, but harder to retrieve the latest artifact for a given run type.
# Lastly, the solution Xiaolong is working on might allow us to store multiple models (steps) in one artifact, which would make this whole discussion obsolete and be the best solution.


if args.ensemble:
# This is a temporary solution. In the future we might need to train and sweep the ensemble models.
if args.sweep or args.train:
print("Error: --aggregation flag cannot be used with --sweep or --train. Exiting.")
sys.exit(1)
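Note on the new flags: --ensemble, --saved, and --override_month compose with the existing arguments, and the check above is the only ensemble-specific constraint this diff enforces. A hedged sketch of intended usage (main.py and the month value 540 are illustrative, not from this commit):

# python main.py --run_type forecasting --ensemble --saved --override_month 540   # accepted
# python main.py --run_type calibration --ensemble --sweep                        # rejected: ensembles cannot be swept or trained yet
from utils_cli_parser import parse_args, validate_arguments

args = parse_args()
validate_arguments(args)  # calls sys.exit(1) on any invalid combination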
83 changes: 53 additions & 30 deletions common_utils/utils_dataloaders.py
@@ -3,15 +3,14 @@
import numpy as np
import pandas as pd


#from config_partitioner import get_partitioner_dict
# from config_partitioner import get_partitioner_dict
from set_partition import get_partitioner_dict
from config_input_data import get_input_data_config # this is model specific
from config_input_data import get_input_data_config # this is model specific
from common_configs import config_drift_detection
from utils_df_to_vol_conversion import df_to_vol


def fetch_data_from_viewser(month_first,month_last,drift_config_dict):
def fetch_data_from_viewser(month_first, month_last, drift_config_dict):
"""
Fetches and prepares the initial DataFrame from viewser.
@@ -21,11 +20,14 @@ def fetch_data_from_viewser(month_first,month_last,drift_config_dict):
pd.DataFrame: The prepared DataFrame with initial processing done.
"""
print(f'Beginning file download through viewser with month range {month_first},{month_last}')
queryset_base = get_input_data_config() # just used here..
df, alerts = queryset_base.publish().fetch_with_drift_detection(month_first, month_last-1, drift_config_dict)
df.reset_index(inplace=True)
df.rename(columns={'priogrid_gid': 'pg_id'}, inplace=True) # arguably HydraNet or at least vol specific
df['in_viewser'] = True # arguably HydraNet or at least vol specific
queryset_base = get_input_data_config() # just used here..
df, alerts = queryset_base.publish().fetch_with_drift_detection(month_first, month_last - 1, drift_config_dict)
df = ensure_float64(df) # The dataframe must contain only np.float64 floats

# Not required for stepshift model
# df.reset_index(inplace=True)
# df.rename(columns={'priogrid_gid': 'pg_id'}, inplace=True) # arguably HydraNet or at least vol specific
# df['in_viewser'] = True # arguably HydraNet or at least vol specific

return df, alerts

@@ -55,8 +57,8 @@ def get_month_range(partition):

return month_first, month_last

def get_drift_config_dict(partition):
"""
Gets the drift-detection configuration dictionary for the pertinent partition from the
drift detection configs
@@ -73,8 +75,8 @@ def get_drift_config_dict(partition):

return drift_config_dict

def validate_df_partition(df,partition,override_month=None):

def validate_df_partition(df, partition, override_month=None):
"""
Checks to see if the min and max months in the input dataframe are the same as the min
month in the train and max month in the predict portions (or min and max months in the train portion for
@@ -91,7 +93,10 @@ def validate_df_partition(df,partition,override_month=None):
"""

df_time_units = df['month_id'].values
if 'month_id' in df.columns:
df_time_units = df['month_id'].values
else:
df_time_units = df.index.get_level_values('month_id').values
partitioner_dict = get_partitioner_dict(partition)
if partition in ['calibration', 'testing']:
first_month = partitioner_dict['train'][0]
@@ -100,13 +105,14 @@
first_month = partitioner_dict['train'][0]
last_month = partitioner_dict['train'][1]
if override_month is not None:
last_month=override_month-1
last_month = override_month - 1

if [np.min(df_time_units), np.max(df_time_units)] != [first_month,last_month]:
if [np.min(df_time_units), np.max(df_time_units)] != [first_month, last_month]:
return False
else:
return True


def filter_dataframe_by_month_range(df, month_first, month_last):
"""
Filters the DataFrame to include only the specified month range.
@@ -123,15 +129,15 @@ def filter_dataframe_by_month_range(df, month_first, month_last):
return df[df['month_id'].isin(month_range)].copy()


def get_views_df(partition,override_month=None):
def get_views_df(partition, override_month=None):
"""
Fetches and processes a DataFrame containing spatial-temporal data for the specified partition type.
This function combines fetching data, determining the month range, filtering the DataFrame,
and calculating absolute indices based on the provided partition type ('calibration', 'testing', or 'forecasting').
Args:
partition (str): Specifies the type of partition to retrieve. Must be one of 'calibration', 'testing',
or 'forecasting'.
- 'calibration': Use months specified for calibration.
- 'testing': Use months specified for testing.
@@ -158,14 +164,12 @@
month_last = override_month
print(f'\n ***Warning: overriding end month in forecasting partition to {month_last} ***\n')


df, alerts = fetch_data_from_viewser(month_first, month_last, drift_config_dict)

return df, alerts


def fetch_or_load_views_df(partition, PATH_RAW, use_saved=False, override_month=None):

"""
Fetches or loads a DataFrame for a given partition from viewser.
@@ -182,13 +186,13 @@ def fetch_or_load_views_df(partition, PATH_RAW, use_saved=False, override_month=
pd.DataFrame: The DataFrame fetched or loaded from viewser, with minimum preprocessing applied.
"""

path_viewser_df = os.path.join(str(PATH_RAW), f'{partition}_viewser_df.pkl') # maybe change to df...

# Create the folders if they don't exist
os.makedirs(str(PATH_RAW), exist_ok=True)
#os.makedirs(str(PATH_PROCESSED), exist_ok=True)
# os.makedirs(str(PATH_PROCESSED), exist_ok=True)

alerts=None
alerts = None

if use_saved:
# Check if the VIEWSER data file exists
@@ -201,7 +205,7 @@

else:
print(f'Fetching file...')
df, alerts = get_views_df(partition, override_month) # which is then used here
df, alerts = get_views_df(partition, override_month) # which is then used here
print(f'Saving file to {path_viewser_df}')
df.to_pickle(path_viewser_df)

@@ -212,13 +216,13 @@ def fetch_or_load_views_df(partition, PATH_RAW, use_saved=False, override_month=
else:
raise RuntimeError(f'file at {path_viewser_df} incompatible with partition {partition}')


# could be moved to common_utils/utils_df_to_vol_conversion.py but it is not really a conversion function so I would keep it here for now.
def create_or_load_views_vol(partition, PATH_PROCESSED, PATH_RAW):

"""
Creates or loads a volume from a DataFrame for a specified partition.
This function manages the creation or loading of a 4D volume array based on the DataFrame
This function manages the creation or loading of a 4D volume array based on the DataFrame
associated with the given partition. It ensures that the volume file is available locally,
either by loading it if it exists or creating it from the DataFrame if it does not.
This volume array is used as input data for CNN-based models such as HydraNet.
@@ -228,7 +232,7 @@ def create_or_load_views_vol(partition, PATH_PROCESSED, PATH_RAW):
PATH_PROCESSED (str or Path): The path to the directory where processed volume data should be stored.
Returns:
np.ndarray: The 4D volume array created or loaded from the DataFrame, with shape
[n_months, height, width, n_features].
"""
@@ -254,9 +258,9 @@ def create_or_load_views_vol(partition, PATH_PROCESSED, PATH_RAW):

return vol

def get_alert_help_string():
help_string = (f"""
# Data fetching and drift detection run
Issues alerts if drift detection algorithms selected in config\_drift\_detection
are triggered\n
@@ -272,6 +276,26 @@ def get_alert_help_string():

return help_string


def ensure_float64(df):
"""
Check if the DataFrame only contains np.float64 types. If not, raise a warning
and convert the DataFrame to use np.float64 for all its numeric columns.
"""

non_float64_cols = df.select_dtypes(include=['number']).columns[
df.select_dtypes(include=['number']).dtypes != np.float64]

if len(non_float64_cols) > 0:
print(
f"Warning: DataFrame contains non-np.float64 numeric columns. Converting the following columns: {', '.join(non_float64_cols)}")

for col in non_float64_cols:
df[col] = df[col].astype(np.float64)

return df


def parse_args():
parser = argparse.ArgumentParser(description='Fetch data for different partitions')

@@ -282,5 +306,4 @@ def parse_args():
parser.add_argument('-s', '--saved', action='store_true', help='Use locally stored data')
parser.add_argument('-o', '--override_month', help='Override use of the current month', type=int)


return parser.parse_args()
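Taken together, the intended flow is: fetch_or_load_views_df() pulls from viewser (or reuses the pickle cached under PATH_RAW), validates the cached months against the partition, and ensure_float64() upcasts stray numeric dtypes on the way in. A short sketch under assumed paths (data/raw and the calibration call are illustrative; the live fetch requires viewser access):

from pathlib import Path
import numpy as np
import pandas as pd
from utils_dataloaders import fetch_or_load_views_df, ensure_float64

PATH_RAW = Path("data/raw")  # hypothetical location
df, alerts = fetch_or_load_views_df("calibration", PATH_RAW, use_saved=True)  # reuse the cached pickle if present

# ensure_float64 warns about and upcasts any non-float64 numeric column.
mixed = pd.DataFrame({"a": np.array([1, 2], dtype=np.int32), "b": [0.5, 1.5]})
assert ensure_float64(mixed).dtypes.eq(np.float64).all()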