From 5ed7cc59e4024c7567ffcaaf18aa8592cd5bad14 Mon Sep 17 00:00:00 2001 From: Vishal Date: Wed, 5 Jun 2024 13:38:00 +0200 Subject: [PATCH] feat: working pipeline with configs (baseline model: knn) --- src/call_data_api.py | 11 ++-- src/{config.py => config_data.py} | 66 +++++++++++++----------- src/config_model.py | 86 +++++++++++++++++++++++++++++++ src/frontend.py | 10 ++-- src/models.py | 37 ++++++++++++- src/predict.py | 43 ++++++++++++---- src/process_data.py | 6 +-- src/train.py | 63 ++++++++++++++++------ 8 files changed, 252 insertions(+), 70 deletions(-) rename src/{config.py => config_data.py} (71%) create mode 100644 src/config_model.py diff --git a/src/call_data_api.py b/src/call_data_api.py index ed65b34..3017e51 100644 --- a/src/call_data_api.py +++ b/src/call_data_api.py @@ -7,13 +7,12 @@ import os from dataclasses import dataclass import requests -from tqdm import tqdm import pandas as pd from utils import setup_logging -from config import URL, LINKS, INFERENCE_DATA_DATE, data_folder +from config_data import URL, LINKS, input_date_formatted, BASE_PATH_DATA -temp_path = data_folder / "raw_data" +temp_path = BASE_PATH_DATA / "raw_data" logging = setup_logging(file_name="call_data_api.log") @@ -65,7 +64,7 @@ def merge_data(self): df = pd.read_csv(f"{self.read_path}/raw_data_{i}.csv") df["t_1h"] = pd.to_datetime(df["t_1h"]) - if str(df["t_1h"].dt.date.min()) == INFERENCE_DATA_DATE: + if str(df["t_1h"].dt.date.min()) == input_date_formatted: full_data_list.append(df) else: logging.info("Data for %s detector is not available", i) @@ -89,7 +88,7 @@ def clean_data(self): def data_collector(limit=24, offset=0, timezone="Europe/Berlin"): """Wrapper fucntion to collect and save the data""" - for link in tqdm(LINKS): + for link in LINKS: # Define the query parameters params = { @@ -109,7 +108,7 @@ def data_collector(limit=24, offset=0, timezone="Europe/Berlin"): api_handler.call_open_api() data_merger = DataMerger( - path=f"{temp_path}/raw_data_{INFERENCE_DATA_DATE}.csv", + path=f"{temp_path}/raw_data_{input_date_formatted}.csv", list_links=LINKS, read_path=temp_path, ) diff --git a/src/config.py b/src/config_data.py similarity index 71% rename from src/config.py rename to src/config_data.py index adb19d6..ab4d5bb 100644 --- a/src/config.py +++ b/src/config_data.py @@ -1,5 +1,4 @@ from datetime import datetime, timedelta -import os from pathlib import Path # API URL @@ -8,35 +7,6 @@ "comptages-routiers-permanents/records" ) -# Previous day's input data i.e., to make predictions for today, we use yesterday's data -INFERENCE_DATA_DATE = (datetime.today() - timedelta(1)).strftime("%Y-%m-%d") - -# Path handling - -# Define the base paths -data_folder = Path("../data") - -# Define the specific paths using the base paths -file_raw_input = data_folder / "raw_data" / f"raw_data_{INFERENCE_DATA_DATE}.csv" -file_train_input = data_folder / "historical_data" / "paris_trunk_june_july.csv" -file_static_attributes = data_folder / "processed_data" / "link_static_attributes.csv" -file_historical_trends = data_folder / "processed_data" / "link_historical_trends.csv" -file_processed_input = ( - data_folder / "processed_data" / f"inference_data_{INFERENCE_DATA_DATE}.csv" -) - -# column names -list_column_order = [ - "time_idx", - "day", - "hour", - "maxspeed", - "length", - "lanes", - "paris_id", - "q", -] - # Network detector IDs used to query the data LINKS = [ "5169", @@ -155,3 +125,39 @@ "5455", "5456", ] + +# Define the base paths +BASE_PATH_DATA = Path("../data") + +# column names +LIST_COLUMN_ORDER = [ + "time_idx", + "day", + "hour", + "maxspeed", + "length", + "lanes", + "paris_id", + "q", +] + + +# Previous day's input data i.e., to make predictions for today, we use yesterday's data +input_date = datetime.today() - timedelta(1) +input_date_formatted = input_date.strftime("%Y-%m-%d") +prediction_date = datetime.today() +prediction_date_formatted = prediction_date.strftime("%Y-%m-%d") + +# Define the specific paths using the base paths +file_raw_input = BASE_PATH_DATA / "raw_data" / f"raw_data_{input_date_formatted}.csv" +file_train_input = BASE_PATH_DATA / "historical_data" / "paris_trunk_june_july.csv" +file_model_train = BASE_PATH_DATA / "historical_data" / "paris_trunk_june_july.csv" +file_static_attributes = ( + BASE_PATH_DATA / "processed_data" / "link_static_attributes.csv" +) +file_historical_trends = ( + BASE_PATH_DATA / "processed_data" / "link_historical_trends.csv" +) +file_processed_input = ( + BASE_PATH_DATA / "processed_data" / f"inference_data_{input_date_formatted}.csv" +) diff --git a/src/config_model.py b/src/config_model.py new file mode 100644 index 0000000..cc843de --- /dev/null +++ b/src/config_model.py @@ -0,0 +1,86 @@ +from config_data import file_model_train + +TRAINING_PARAMS = { + "metric": "smape", + "training": True, + "data_path": file_model_train, + "model_output_dir": "modeloutput/", + "seed": 46, + "test_proportion": 0.15, + "validation_proportion": 0.15, + "patience": 25, + "train_episodes": 1000, + "batch_size": 512, +} + +FORECASTING_PARAMS = { + "lb": 24, + "ph": 24, +} + +## month and day are considered as static for the forecasting horizon, short-term forecasting +FEATURE_SETTINGS = { + "dyn_to_static": ["day", "hour"], + "include_occupancy": False, + "dynamic_categorical_features": ["day", "hour"], + "static_categorical_features": [], + "dynamic_continous_features": [ + # "speed_kph_mean", + # "speed_kph_stddev", + "q", + ], + "static_continous_features": [ + "maxspeed", + "lanes", + "length", + ], + "other_columns": ["time_idx", "paris_id"], + "occupancy_column": ["k"], + "target_as_autoregressive_feature": ["q"], + "target_column": ["qt"], +} + +# Using the CONFIG dictionary +metric = TRAINING_PARAMS["metric"] +training = TRAINING_PARAMS["training"] +data_path = TRAINING_PARAMS["data_path"] +train_episodes = TRAINING_PARAMS["train_episodes"] + +dynamic_continous_features = FEATURE_SETTINGS["dynamic_continous_features"] +dynamic_categorical_features = FEATURE_SETTINGS["dynamic_categorical_features"] +static_continous_features = FEATURE_SETTINGS["static_continous_features"] +static_categorical_features = FEATURE_SETTINGS["static_categorical_features"] + +continous_features = [*static_continous_features, *dynamic_continous_features] +categorical_features = [*static_categorical_features, *dynamic_categorical_features] + +dyn_to_static = FEATURE_SETTINGS["dyn_to_static"] + +occupancy_column = FEATURE_SETTINGS["occupancy_column"] +other_columns = FEATURE_SETTINGS["other_columns"] +target_as_autoregressive_feature = FEATURE_SETTINGS["target_as_autoregressive_feature"] +target_column = FEATURE_SETTINGS["target_column"] + +if not FEATURE_SETTINGS["include_occupancy"]: + pass +else: + dynamic_continous_features.append(*occupancy_column) + +if FEATURE_SETTINGS["include_occupancy"]: + dynamic_features = [ + # TODO: ordering as first: categorical and second: continous in the list + # is important for preprocess data fucntion + *dynamic_categorical_features, + *dynamic_continous_features, + *occupancy_column, + *target_column, + ] +else: + dynamic_features = [ + # TODO: ordering as first: categorical and second: continous in the list + # is important for preprocess data fucntion + *dynamic_categorical_features, + *dynamic_continous_features, + *target_column, + ] +static_features = [*static_categorical_features, *static_continous_features] diff --git a/src/frontend.py b/src/frontend.py index fa26a81..d10a0c0 100644 --- a/src/frontend.py +++ b/src/frontend.py @@ -7,7 +7,7 @@ import numpy as np import pandas as pd -from config_interface import INFERENCE_PREDICTION_DATE, INFERENCE_INPUT_DATE +from config_data import prediction_date, input_date class PlotFormatting(ABC): @@ -20,8 +20,8 @@ def read_data(): class DashboardData(PlotFormatting): - prediction_date = INFERENCE_PREDICTION_DATE - input_date = INFERENCE_INPUT_DATE + _prediction_date = prediction_date + _input_date = input_date def __init__(self, path_pt, path_pt_1, path_o_t_1, path_variance=None) -> None: self.path_predictions_t = path_pt @@ -33,8 +33,8 @@ def __init__(self, path_pt, path_pt_1, path_o_t_1, path_variance=None) -> None: @staticmethod def create_date_strings(): - current_date = DashboardData.prediction_date.strftime("%d-%m-%Y") - previous_date = DashboardData.input_date.strftime("%d-%m-%Y") + current_date = DashboardData._prediction_date.strftime("%d-%m-%Y") + previous_date = DashboardData._input_date.strftime("%d-%m-%Y") return previous_date, current_date @classmethod diff --git a/src/models.py b/src/models.py index e5bf75e..55b1eb5 100644 --- a/src/models.py +++ b/src/models.py @@ -1,4 +1,3 @@ -from __future__ import annotations from abc import ABC, abstractmethod from typing import Any @@ -7,7 +6,9 @@ from sklearn.neighbors import KNeighborsRegressor from sklearn.metrics import mean_squared_error -from sklearn.model_selection import cross_val_score +from sklearn.model_selection import cross_val_score, RepeatedKFold + +import xgboost as xgb class Model(ABC): @@ -74,6 +75,38 @@ def load_model(self, path): self.model = pickle.load(open(path, "rb")) +class XGBoostModel(Model): + + def __init__(self, **kwargs) -> None: + self.model = xgb.XGBRegressor(**kwargs) + + def train_model(self, X, y): + self.model.fit( + X, + y, + verbose=2, + ) + return self.model + + def predict_model(self, X): + return self.model.predict(X) + + def cross_validation(self, X, y): + cv = RepeatedKFold(n_splits=10, n_repeats=3, random_state=1) + self.cv_mocel = cross_val_score( + self.model, X, y, scoring="neg_mean_absolute_error", cv=cv, n_jobs=-1 + ) + return self.cv_model + + def save_model(self, path): + saved_model = open(path, "wb") + pickle.dump(self.model, saved_model) + saved_model.close() + + def load_model(self, path): + self.model = pickle.load(open(path, "rb")) + + class EvaluationMetrics: def __init__(self, y, y_hat) -> None: self.y = y diff --git a/src/predict.py b/src/predict.py index ca97199..00a9d3f 100644 --- a/src/predict.py +++ b/src/predict.py @@ -1,19 +1,40 @@ from pathlib import Path -from config_model import * -from models import KNNModel +import argparse + +from config_model import FORECASTING_PARAMS +from config_model import ( + continous_features, + categorical_features, + other_columns, + target_as_autoregressive_feature, + target_column, + static_features, + dynamic_features, +) +from config_data import prediction_date_formatted, file_processed_input + +from models import KNNModel, XGBoostModel from dataset import DataSplitter, TimeSeriesScaler, TimeSeriesFormatter from utils import setup_logging, predicitons_to_df -inference_date = INFERENCE_INPUT_DATE -inference_file = file_processed_input +lb, ph = (FORECASTING_PARAMS["lb"], FORECASTING_PARAMS["ph"]) + +parser = argparse.ArgumentParser() +parser.add_argument( + "-m", + "--model", + help="type of machine learning model", + choices=["knn", "xgboost"], + default="knn", +) +args = parser.parse_args() -lb, ph = (MODEL_PARAMS["lb"], MODEL_PARAMS["ph"]) # Set up logging logging = setup_logging("predict.log") if __name__ == "__main__": - data_object = DataSplitter(inference_file) + data_object = DataSplitter(file_processed_input) X_formatted = data_object.df time_series_object = TimeSeriesScaler( @@ -33,12 +54,16 @@ W_test, X_test, z_test = series_formatter_obj.format_data(scaled_test) X_test = TimeSeriesFormatter.reshape_x(X_test) - traffic_model = KNNModel() - traffic_model.load_model("artifacts/knn_model") + if args.model == "knn": + traffic_model = KNNModel() + traffic_model.load_model(f"artifacts/{args.model}_model") + elif args.model == "xgboost": + traffic_model = XGBoostModel() + traffic_model.load_model(f"artifacts/{args.model}_model") y_test_hat = traffic_model.predict_model(X_test) df_test = predicitons_to_df(ph, z_test, y_test_hat) df_test.to_csv( - Path("..") / "predictions" / f"knn_{INFERENCE_PREDICTION_DATE}.csv", index=False + Path("..") / "predictions" / f"knn_{prediction_date_formatted}.csv", index=False ) diff --git a/src/process_data.py b/src/process_data.py index 711f960..ad713a2 100644 --- a/src/process_data.py +++ b/src/process_data.py @@ -6,12 +6,12 @@ from utils import setup_logging -from config import ( +from config_data import ( file_static_attributes, file_train_input, file_historical_trends, file_raw_input, - list_column_order, + LIST_COLUMN_ORDER, file_processed_input, ) @@ -113,7 +113,7 @@ def fill_missing_values(_df): file_static_attributes, file_raw_input, file_historical_trends, - list_column_order, + LIST_COLUMN_ORDER, ) merged_df = fill_missing_values(df) diff --git a/src/train.py b/src/train.py index 5b3537a..06a1bc5 100644 --- a/src/train.py +++ b/src/train.py @@ -1,23 +1,45 @@ -from config_model import * +import argparse + +from config_model import TRAINING_PARAMS, FORECASTING_PARAMS +from config_model import ( + continous_features, + categorical_features, + other_columns, + target_as_autoregressive_feature, + target_column, + static_features, + dynamic_features, +) +from config_data import prediction_date_formatted from utils import setup_logging -from models import KNNModel, EvaluationMetrics +from models import KNNModel, EvaluationMetrics, XGBoostModel from dataset import DataSplitter, TimeSeriesScaler, TimeSeriesFormatter # Set up logging logging = setup_logging("train.log") +parser = argparse.ArgumentParser() +parser.add_argument( + "-m", + "--model", + help="type of machine learning model", + choices=["knn", "xgboost"], + default="knn", +) +args = parser.parse_args() + if __name__ == "__main__": - data_object = DataSplitter(data_path) + data_object = DataSplitter(TRAINING_PARAMS["data_path"]) X_formatted = data_object.df - for lb, ph in [(MODEL_PARAMS["lb"], MODEL_PARAMS["ph"])]: + for lb, ph in [(FORECASTING_PARAMS["lb"], FORECASTING_PARAMS["ph"])]: det_ids = data_object.get_groups - seed = CONFIG["seed"] - validation_prop = CONFIG["validation_proportion"] - test_prop = CONFIG["test_proportion"] + seed = TRAINING_PARAMS["seed"] + validation_prop = TRAINING_PARAMS["validation_proportion"] + test_prop = TRAINING_PARAMS["test_proportion"] data_object.split_groups(seed, validation_prop, test_prop) @@ -63,24 +85,35 @@ X_val = TimeSeriesFormatter.reshape_x(X_val) X_test = TimeSeriesFormatter.reshape_x(X_test) - optimal_k = 8 + if args.model == "knn": + optimal_k = 2 + traffic_model = KNNModel( + n_neighbors=optimal_k, weights="uniform", algorithm="kd_tree", p=2 + ) + elif args.model == "xgboost": + traffic_model = XGBoostModel( + n_estimators=300, + max_depth=5, + eta=0.1, + subsample=0.7, + colsample_bytree=0.8, + ) + else: + pass - traffic_model = KNNModel( - n_neighbors=optimal_k, weights="uniform", algorithm="kd_tree", p=2 - ) traffic_model.train_model(X_train, y_train) y_train_hat = traffic_model.predict_model(X_train) train_rmse = EvaluationMetrics(y_train, y_train_hat).rmse() - print("RMSE on Train Set:", train_rmse) + # logging.info("RMSE on Train Set:", train_rmse) y_val_hat = traffic_model.predict_model(X_val) val_rmse = EvaluationMetrics(y_val, y_val_hat).rmse() - print("RMSE on Validation Set:", val_rmse) + logging.info(f"RMSE on Validation Set: {val_rmse}") y_test_hat = traffic_model.predict_model(X_test) test_rmse = EvaluationMetrics(y_test, y_test_hat).rmse() - print("RMSE on Test Set:", test_rmse) + logging.info(f"RMSE on Test Set: {test_rmse}") - traffic_model.save_model("artifacts/knn_model") + traffic_model.save_model(f"artifacts/{args.model}_model") time_series_object.save_scaler("artifacts/minmax_scaler.gz")