diff --git a/emission/analysis/modelling/trip_model/forest_classifier.py b/emission/analysis/modelling/trip_model/forest_classifier.py
index 0816717db..8041ecc44 100644
--- a/emission/analysis/modelling/trip_model/forest_classifier.py
+++ b/emission/analysis/modelling/trip_model/forest_classifier.py
@@ -2,37 +2,64 @@
 from sklearn.preprocessing import OneHotEncoder
 import joblib
+from io import BytesIO
 from typing import Dict, List, Optional, Tuple
+from sklearn.metrics.pairwise import haversine_distances
 import emission.core.wrapper.confirmedtrip as ecwc
 import logging
 import numpy as np
-
+import copy
 import emission.analysis.modelling.trip_model.trip_model as eamuu
 import emission.analysis.modelling.trip_model.dbscan_svm as eamtd
 import emission.analysis.modelling.trip_model.util as eamtu
 import emission.analysis.modelling.trip_model.config as eamtc
+import emission.storage.timeseries.builtin_timeseries as estb
+from sklearn.exceptions import NotFittedError
 from sklearn.ensemble import RandomForestClassifier
+
+EARTH_RADIUS = 6371000
+
 class ForestClassifier(eamuu.TripModel):
 
     def __init__(self,config=None):
-        # expected_keys = [
-        #     'metric',
-        #     'similarity_threshold_meters',
-        #     'apply_cutoff',
-        #     'incremental_evaluation'
-        # ]
-        # for k in expected_keys:
-        #     if config.get(k) is None:
-        #         msg = f"greedy trip model config missing expected key {k}"
-        #         raise KeyError(msg)
         if config is None:
             config = eamtc.get_config_value_or_raise('model_parameters.forest')
             logging.debug(f'ForestClassifier loaded model config from file')
         else:
             logging.debug(f'ForestClassifier using model config argument')
+
+        random_forest_expected_keys = [
+            'loc_feature',
+            'n_estimators',
+            'criterion',
+            'max_depth',
+            'min_samples_split',
+            'min_samples_leaf',
+            'max_features',
+            'bootstrap',
+        ]
+        cluster_expected_keys = [
+            'radius',
+            'size_thresh',
+            'purity_thresh',
+            'gamma',
+            'C',
+            'use_start_clusters',
+            'use_trip_clusters',
+        ]
+
+        for k in random_forest_expected_keys:
+            if config.get(k) is None:
+                msg = f"forest trip model config missing expected key {k}"
+                raise KeyError(msg)
+        if config['loc_feature'] == 'cluster':
+            for k in cluster_expected_keys:
+                if config.get(k) is None:
+                    msg = f"cluster trip model config missing expected key {k}"
+                    raise KeyError(msg)
+
         self.loc_feature = config['loc_feature']
         self.radius = config['radius']
         self.size_thresh = config['size_thresh']
@@ -50,6 +77,21 @@ def __init__(self,config=None):
         # self.drop_unclustered = drop_unclustered
         self.use_start_clusters = config['use_start_clusters']
         self.use_trip_clusters = config['use_trip_clusters']
+        self.base_features = [
+            'duration',
+            'distance',
+            'start_local_dt_year',
+            'start_local_dt_month',
+            'start_local_dt_day',
+            'start_local_dt_hour',
+            'start_local_dt_weekday',
+            'end_local_dt_year',  # most likely the same as the start year
+            'end_local_dt_month',  # most likely the same as the start month
+            'end_local_dt_day',
+            'end_local_dt_hour',
+            'end_local_dt_weekday',
+        ]
+        self.targets = ['mode_true', 'purpose_true', 'replaced_true']
 
         if self.loc_feature == 'cluster':
             # clustering algorithm to generate end clusters
@@ -119,8 +161,16 @@ def __init__(self,config=None):
                 random_state=self.random_state)
 
-    def fit(self,data: List[ecwc.Confirmedtrip],data_df=None):
+    def fit(self, trips: List[ecwc.Confirmedtrip]):
         # get location features
+        logging.debug(f'fit called with {len(trips)} trips')
+
+        unlabeled = list(filter(lambda t: len(t['data']['user_input']) == 0, trips))
+        if len(unlabeled) > 0:
+            msg = f'model.fit cannot be called with unlabeled trips, found {len(unlabeled)}'
+            raise Exception(msg)
+        data_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip", trips)
+
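+        # location features are built in one of two ways, depending on
+        # config: 'cluster' one-hot encodes the indices produced by the
+        # cluster models, while 'coordinates' feeds the raw start/end
+        # lat/lon columns to the classifiers directly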
         if self.loc_feature == 'cluster':
             # fit clustering model(s) and one-hot encode their indices
             # TODO: consolidate start/end_cluster_model in a single instance
@@ -238,22 +288,196 @@ def fit(self,data: List[ecwc.Confirmedtrip],data_df=None):
             self.mode_predictor.fit(self.X_mode, self.y_mode)
         if len(self.X_replaced) > 0:
             self.replaced_predictor.fit(self.X_replaced, self.y_replaced)
-
-    def predict(self, data: List[float]) -> Tuple[List[Dict], int]:
-        pass
+        logging.info(f"Forest model fit to {len(trips)} rows of trip data")
 
-    def to_dict(self) -> Dict:
-        return joblib.dump(self,compress=3)
-
-    def from_dict(self, model: Dict):
-        pass
+    def predict(self, trips: List[ecwc.Confirmedtrip]) -> pd.DataFrame:
+        logging.debug(f"forest classifier predict called with {len(trips)} trips")
 
-    def is_incremental(self) -> bool:
-        pass
+        if len(trips) == 0:
+            msg = 'model.predict cannot be called with 0 trips'
+            raise Exception(msg)
+
+        # convert the trips to a DataFrame
+        test_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip", trips)
 
-    def extract_features(self, trip: ecwc.Confirmedtrip) -> List[float]:
-        pass
+        self.X_test_for_purpose = self._get_X_test_for_purpose(test_df)
+
+        ########################
+        ### make predictions ###
+        ########################
+        # note that we want to use purpose data to aid our mode predictions,
+        # and use both purpose and mode data to aid our replaced-mode
+        # predictions
+        try:
+            purpose_proba_raw = self.purpose_predictor.predict_proba(
+                self.X_test_for_purpose)
+            purpose_proba = pd.DataFrame(
+                purpose_proba_raw, columns=self.purpose_predictor.classes_)
+            purpose_pred = purpose_proba.idxmax(axis=1)
+
+            # update X_test with one-hot-encoded purpose predictions to aid
+            # the mode predictor
+            onehot_purpose_df = self.purpose_enc.transform(
+                pd.DataFrame(purpose_pred).set_index(
+                    self.X_test_for_purpose.index))
+            self.X_test_for_mode = pd.concat(
+                [self.X_test_for_purpose, onehot_purpose_df], axis=1)
+
+            mode_proba, replaced_proba = self._try_predict_proba_mode_replaced()
+
+        except NotFittedError as e:
+            # if we can't predict purpose, we can still try to predict mode and
+            # replaced-mode without one-hot encoding the purpose
+
+            purpose_pred = np.full((len(self.X_test_for_purpose), ), np.nan)
+            purpose_proba_raw = np.full((len(self.X_test_for_purpose), 1), 0)
+            purpose_proba = pd.DataFrame(purpose_proba_raw, columns=[np.nan])
+
+            self.X_test_for_mode = self.X_test_for_purpose
+            mode_proba, replaced_proba = self._try_predict_proba_mode_replaced()
+
+        mode_pred = mode_proba.idxmax(axis=1)
+        replaced_pred = replaced_proba.idxmax(axis=1)
+
+        if (purpose_pred.dtype == np.float64 and mode_pred.dtype == np.float64
+                and replaced_pred.dtype == np.float64):
+            # this indicates that all the predictions are np.nan, so none of
+            # the random forest classifiers were fitted
+            raise NotFittedError
+
+        proba_dfs = []
+        for label_type, proba in zip(
+                ['purpose', 'mode', 'replaced'],
+                [purpose_proba, mode_proba, replaced_proba]):
+            proba['top_pred'] = proba.idxmax(axis=1)
+            proba['top_proba'] = proba.max(axis=1, skipna=True)
+            proba['clusterable'] = self._clusterable(
+                self.X_test_for_purpose).astype(bool)
+            proba = pd.concat([proba], keys=[label_type], axis=1)
+            proba_dfs += [proba]
+
+        self.proba_df = pd.concat(proba_dfs, axis=1)
+        return self.proba_df
+
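+    # NOTE: the three predictors are chained: purpose predictions are one-hot
+    # encoded into the mode features, and mode predictions into the
+    # replaced-mode features, so the _try_* helpers below degrade gracefully
+    # when an upstream predictor has not been fitted yet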
+    def _try_predict_proba_mode_replaced(self):
+        """ Try to predict mode and replaced-mode. Handles the error in case
+            the ensemble algorithms were not fitted.
+
+            Requires self.X_test_for_mode to have already been set. (This is
+            the DataFrame containing the test data to be passed into
+            self.mode_predictor.)
+
+            Returns: mode_proba and replaced_proba, two DataFrames containing
+                class probabilities for mode and replaced-mode respectively
+        """
+        try:
+            # predict mode
+            mode_proba_raw = self.mode_predictor.predict_proba(
+                self.X_test_for_mode)
+            mode_proba = pd.DataFrame(mode_proba_raw,
+                                      columns=self.mode_predictor.classes_)
+            mode_pred = mode_proba.idxmax(axis=1)
+
+            # update X_test with one-hot-encoded mode predictions to aid
+            # the replaced-mode predictor
+            onehot_mode_df = self.mode_enc.transform(
+                pd.DataFrame(mode_pred).set_index(self.X_test_for_mode.index))
+            self.X_test_for_replaced = pd.concat(
+                [self.X_test_for_mode, onehot_mode_df], axis=1)
+            replaced_proba = self._try_predict_proba_replaced()
+
+        except NotFittedError as e:
+            mode_proba_raw = np.full((len(self.X_test_for_mode), 1), 0)
+            mode_proba = pd.DataFrame(mode_proba_raw, columns=[np.nan])
+
+            # if we don't have mode predictions, we *could* still try to
+            # predict replaced mode (but if the user didn't input mode labels
+            # then it's unlikely they would input replaced-mode)
+            self.X_test_for_replaced = self.X_test_for_mode
+            replaced_proba = self._try_predict_proba_replaced()
+
+        return mode_proba, replaced_proba
+
+    def _get_X_test_for_purpose(self, test_df):
+        """ Do the pre-processing to get data that we can then pass into the
+            ensemble classifiers.
+        """
+        if self.loc_feature == 'cluster':
+            # get clusters
+            self.end_cluster_model.predict(test_df)
+            clusters_to_encode = self.end_cluster_model.test_df[[
+                'end_cluster_idx'
+            ]].copy()  # copy is to avoid SettingWithCopyWarning
+
+            if self.use_start_clusters or self.use_trip_clusters:
+                self.start_cluster_model.predict(test_df)
+
+                if self.use_start_clusters:
+                    clusters_to_encode = pd.concat([
+                        clusters_to_encode,
+                        self.start_cluster_model.test_df[['start_cluster_idx']]
+                    ], axis=1)
+                if self.use_trip_clusters:
+                    start_end_clusters = pd.concat([
+                        self.end_cluster_model.test_df[['end_cluster_idx']],
+                        self.start_cluster_model.test_df[['start_cluster_idx']]
+                    ], axis=1)
+                    trip_cluster_idx = self.trip_grouper.transform(
+                        start_end_clusters)
+                    clusters_to_encode.loc[:, 'trip_cluster_idx'] = trip_cluster_idx
+
+            # one-hot encode the cluster indices
+            loc_features_df = self.cluster_enc.transform(clusters_to_encode)
+        else:  # self.loc_feature == 'coordinates'
+            test_df = self._clean_data(test_df)
+            loc_features_df = test_df[[
+                'start_lon', 'start_lat', 'end_lon', 'end_lat'
+            ]]
+
+        # extract the desired data
+        X_test = pd.concat([
+            test_df[self.base_features].reset_index(drop=True),
+            loc_features_df.reset_index(drop=True)
+        ], axis=1)
+        return X_test
+
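+    # helper for predict(): flags, per test trip, whether the end point falls
+    # within self.radius meters of any training end point, i.e. whether the
+    # trip could have been assigned to a cluster at all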
+    def _clusterable(self, test_df):
+        """ Check if the end points can be clustered (i.e. are within
+            self.radius meters of an end point from the training set)
+        """
+        if self.loc_feature == 'cluster':
+            return self.end_cluster_model.test_df.end_cluster_idx >= 0
+
+        n_samples = test_df.shape[0]
+        clustered = np.zeros(shape=n_samples, dtype=bool)
+
+        train_coordinates = self.train_df[['end_lat', 'end_lon']]
+        train_radians = np.radians(train_coordinates)
+
+        for idx, row in test_df.reset_index(drop=True).iterrows():
+            # calculate the distances between the ith test data and all points,
+            # then find the minimum distance for each point and check if it's
+            # within the distance threshold.
+            # unfortunately, pairwise_distances_argmin() does not support
+            # haversine distance, so we have to reimplement it ourselves
+            new_loc_radians = np.radians(row[["end_lat", "end_lon"]].to_list())
+            new_loc_radians = np.reshape(new_loc_radians, (1, 2))
+            dist_matrix_meters = haversine_distances(
+                new_loc_radians, train_radians) * EARTH_RADIUS
+
+            shortest_dist = np.min(dist_matrix_meters)
+            if shortest_dist < self.radius:
+                clustered[idx] = True
+
+        return clustered
+
     def _clean_data(self, df):
         """ Clean a dataframe of trips.
             (Drop trips with missing start/end locations, expand the user input
@@ -332,4 +556,10 @@ def expand_coords(exp_df, purpose=None):
         dfs.append(df)
 
     # display.display(end_loc_df.head())
-    return pd.concat(dfs, axis=1)
\ No newline at end of file
+    return pd.concat(dfs, axis=1)
+
+    def to_dict(self) -> Dict:
+        # joblib.dump requires a file target, so serialize via an in-memory buffer
+        model_buffer = BytesIO()
+        joblib.dump(self, model_buffer, compress=3)
+        return {'serialized_model': model_buffer.getvalue()}
+
+    def from_dict(self, model: Dict):
+        # inverse of to_dict: rebuild fitted state from the stored bytes
+        deserialized = joblib.load(BytesIO(model['serialized_model']))
+        self.__dict__.update(deserialized.__dict__)
diff --git a/emission/analysis/modelling/trip_model/greedy_similarity_binning.py b/emission/analysis/modelling/trip_model/greedy_similarity_binning.py
index 4e9ee6fad..a19f5e5c0 100644
--- a/emission/analysis/modelling/trip_model/greedy_similarity_binning.py
+++ b/emission/analysis/modelling/trip_model/greedy_similarity_binning.py
@@ -128,7 +128,7 @@ class label to apply:
 
         self.bins: Dict[str, Dict] = {}
 
-    def fit(self, trips: List[ecwc.Confirmedtrip],tripsdf=None):
+    def fit(self, trips: List[ecwc.Confirmedtrip]):
         """train the model by passing data, where each row in the data
         corresponds to a label at the matching index of the label input
diff --git a/emission/analysis/modelling/trip_model/run_model.py b/emission/analysis/modelling/trip_model/run_model.py
index 45b524f9a..63f1f2ef0 100644
--- a/emission/analysis/modelling/trip_model/run_model.py
+++ b/emission/analysis/modelling/trip_model/run_model.py
@@ -56,10 +56,7 @@ def update_trip_model(
     logging.debug(f'model type {model_type.name} is incremental? {model.is_incremental}')
     logging.debug(f'time query for training data collection: {time_query}')
-    ts = esta.TimeSeries.get_time_series(user_id)
     trips = _get_training_data(user_id, time_query)
-
-    trips_df = ts.to_data_df("analysis/confirmed_trip",trips)
 
     # don't start training for a user that doesn't have at least $trips many trips
     # (assume if a stored model exists for the user, that they met this requirement previously)
     if len(trips) == 0:
@@ -77,7 +74,7 @@ def update_trip_model(
-    # train and store the model. pass both List of event and dataframe time data
-    # that both standard( which mostly work on df) and self implemented models can use.
-    model.fit(trips,trips_df)
+    # train and store the model; models that need a dataframe build one
+    # internally from the trip list
+    model.fit(trips)
 
     model_data_next = model.to_dict()
 
     if len(model_data_next) == 0: