diff --git a/conf/analysis/trip_model.conf.json.sample b/conf/analysis/trip_model.conf.json.sample index 845e67a6a..4851be5d6 100644 --- a/conf/analysis/trip_model.conf.json.sample +++ b/conf/analysis/trip_model.conf.json.sample @@ -1,5 +1,5 @@ { - "model_type": "greedy", + "model_type": "forest", "model_storage": "document_database", "minimum_trips": 14, "model_parameters": { @@ -8,6 +8,24 @@ "similarity_threshold_meters": 500, "apply_cutoff": false, "incremental_evaluation": false + }, + "forest": { + "loc_feature" : "coordinates", + "radius": 100, + "size_thresh":1, + "purity_thresh":1.0, + "gamma":0.05, + "C":1, + "n_estimators":100, + "criterion":"gini", + "max_depth":null, + "min_samples_split":2, + "min_samples_leaf":1, + "max_features":"sqrt", + "bootstrap":true, + "random_state":42, + "use_start_clusters":false, + "use_trip_clusters":true } } } \ No newline at end of file diff --git a/emission/analysis/modelling/trip_model/dbscan_svm.py b/emission/analysis/modelling/trip_model/dbscan_svm.py new file mode 100644 index 000000000..58cd8f7e0 --- /dev/null +++ b/emission/analysis/modelling/trip_model/dbscan_svm.py @@ -0,0 +1,250 @@ +import emission.analysis.modelling.trip_model.trip_model as eamuu +from sklearn.cluster import DBSCAN +import logging +import numpy as np +import pandas as pd +import emission.analysis.modelling.trip_model.util as eamtu +from sklearn.preprocessing import StandardScaler +from sklearn.pipeline import make_pipeline +from sklearn import svm +from sklearn.metrics.pairwise import haversine_distances + +EARTH_RADIUS = 6371000 + +class DBSCANSVMCluster(eamuu.TripModel): + """ DBSCAN-based clustering algorithm that optionally implements SVM + sub-clustering. + + Args: + loc_type (str): 'start' or 'end', the type of point to cluster + radius (int): max distance between two points in each other's + neighborhood, i.e. DBSCAN's eps value. does not strictly + dictate final cluster size + size_thresh (int): the min number of trips a cluster must have + to be considered for SVM sub-division + purity_thresh (float): the min purity a cluster must have + to be sub-divided using SVM + gamma (float): coefficient for the rbf kernel in SVM + C (float): regularization hyperparameter for SVM + + Attributes: + loc_type (str) + radius (int) + size_thresh (int) + purity_thresh (float) + gamma (float) + C (float) + train_df (DataFrame) + test_df (DataFrame) + base_model (sklearn Estimator) + """ + + def __init__(self, + loc_type='end', + radius=100, + svm=True, + size_thresh=1, + purity_thresh=1.0, + gamma=0.05, + C=1): + logging.info("PERF: Initializing DBSCANSVMCluster") + self.loc_type = loc_type + self.radius = radius + self.svm = svm + self.size_thresh = size_thresh + self.purity_thresh = purity_thresh + self.gamma = gamma + self.C = C + + def set_params(self, params): + if 'loc_type' in params.keys(): self.loc_type = params['loc_type'] + if 'radius' in params.keys(): self.radius = params['radius'] + if 'svm' in params.keys(): self.svm = params['svm'] + if 'size_thresh' in params.keys(): + self.size_thresh = params['size_thresh'] + if 'purity_thresh' in params.keys(): + self.purity_thresh = params['purity_thresh'] + if 'gamma' in params.keys(): self.gamma = params['gamma'] + + return self + + def fit(self, train_df,ct_entry=None): + """ Creates clusters of trip points. + self.train_df will be updated with columns containing base and + final clusters. + + TODO: perhaps move the loc_type argument to fit() so we can use a + single class instance to cluster both start and end points. 
This + will also help us reduce duplicate data. + + Args: + train_df (dataframe): dataframe of labeled trips + ct_entry (List) : A list of Entry type of labeled and unlabeled trips + """ + ################## + ### clean data ### + ################## + logging.info("PERF: Fitting DBSCANSVMCluster") + self.train_df = self._clean_data(train_df) + + # we can use all trips as long as they have purpose labels. it's ok if + # they're missing mode/replaced-mode labels, because they aren't as + # strongly correlated with location compared to purpose + # TODO: actually, we may want to rethink this. for example, it will + # probably be helpful to include trips that are missing purpose labels + # but still have mode labels. + if self.train_df.purpose_true.isna().any(): + num_nan = self.train_df.purpose_true.value_counts( + dropna=False).loc[np.nan] + logging.info( + f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' + ) + self.train_df = self.train_df.dropna( + subset=['purpose_true']).reset_index(drop=True) + if len(self.train_df) == 0: + # i.e. no valid trips after removing all nans + raise Exception('no valid trips; nothing to fit') + + ######################### + ### get base clusters ### + ######################### + dist_matrix_meters = eamtu.get_distance_matrix(self.train_df, self.loc_type) + self.base_model = DBSCAN(self.radius, + metric="precomputed", + min_samples=1).fit(dist_matrix_meters) + base_clusters = self.base_model.labels_ + + self.train_df.loc[:, + f'{self.loc_type}_base_cluster_idx'] = base_clusters + + ######################## + ### get sub-clusters ### + ######################## + # copy base cluster column into final cluster column + self.train_df.loc[:, f'{self.loc_type}_cluster_idx'] = self.train_df[ + f'{self.loc_type}_base_cluster_idx'] + + if self.svm: + c = 0 # count of how many clusters we have iterated over + + # iterate over all clusters and subdivide them with SVM. the while + # loop is so we can do multiple iterations of subdividing if needed + while c < self.train_df[f'{self.loc_type}_cluster_idx'].max(): + points_in_cluster = self.train_df[ + self.train_df[f'{self.loc_type}_cluster_idx'] == c] + + # only do SVM if we have the minimum num of trips in the cluster + if len(points_in_cluster) < self.size_thresh: + c += 1 + continue + + # only do SVM if purity is below threshold + purity = eamtu.single_cluster_purity(points_in_cluster, + label_col='purpose_true') + if purity < self.purity_thresh: + X = points_in_cluster[[ + f"{self.loc_type}_lon", f"{self.loc_type}_lat" + ]] + y = points_in_cluster.purpose_true.to_list() + + svm_model = make_pipeline( + StandardScaler(), + svm.SVC( + kernel='rbf', + gamma=self.gamma, + C=self.C, + )).fit(X, y) + labels = svm_model.predict(X) + unique_labels = np.unique(labels) + + # if the SVM predicts that all points in the cluster have + # the same label, just ignore it and don't reindex. + # this also helps us to handle the possibility that a + # cluster may be impure but inherently inseparable, e.g. an + # end cluster at a user's home, containing 50% trips from + # work to home and 50% round trips that start and end at + # home. 
we don't want to reindex otherwise the low purity + # will trigger SVM again, and we will attempt & fail to + # split the cluster ad infinitum + if len(unique_labels) > 1: + # map purpose labels to new cluster indices + # we offset indices by the max existing index so that we + # don't run into any duplicate indices + max_existing_idx = self.train_df[ + f'{self.loc_type}_cluster_idx'].max() + label_to_cluster = { + unique_labels[i]: i + max_existing_idx + 1 + for i in range(len(unique_labels)) + } + # update trips with their new cluster indices + indices = np.array( + [label_to_cluster[l] for l in labels]) + self.train_df.loc[ + self.train_df[f'{self.loc_type}_cluster_idx'] == c, + f'{self.loc_type}_cluster_idx'] = indices + + c += 1 + # TODO: make things categorical at the end? or maybe at the start of the decision tree pipeline + + return self + + def fit_predict(self, train_df): + """ Override to avoid unnecessarily computation of distance matrices. + """ + self.fit(train_df) + return self.train_df[[f'{self.loc_type}_cluster_idx']] + + def predict(self, test_df): + logging.info("PERF: Predicting DBSCANSVMCluster") + # TODO: store clusters as polygons so the prediction is faster + # TODO: we probably don't want to store test_df in self to be more memory-efficient + self.test_df = self._clean_data(test_df) + pred_clusters = self._NN_predict(self.test_df) + + self.test_df.loc[:, f'{self.loc_type}_cluster_idx'] = pred_clusters + + return self.test_df[[f'{self.loc_type}_cluster_idx']] + + def _NN_predict(self, test_df): + """ Generate base-cluster predictions for the test data using a + nearest-neighbor approach. + + sklearn doesn't implement predict() for DBSCAN, which is why we + need a custom method. + """ + logging.info("PERF: NN_predicting DBSCANSVMCluster") + n_samples = test_df.shape[0] + labels = np.ones(shape=n_samples, dtype=int) * -1 + + # get coordinates of core points (we can't use model.components_ + # because our input feature was a distance matrix and doesn't contain + # info about the raw coordinates) + # NOTE: technically, every single point in a cluster is a core point + # because it has at least minPts (2) points, including itself, in its + # radius + train_coordinates = self.train_df[[ + f'{self.loc_type}_lat', f'{self.loc_type}_lon' + ]] + train_radians = np.radians(train_coordinates) + + for idx, row in test_df.reset_index(drop=True).iterrows(): + # calculate the distances between the ith test data and all points, + # then find the index of the closest point. if the ith test data is + # within epsilon of the point, then assign its cluster to the ith + # test data (otherwise, leave it as -1, indicating noise). 
+ # unfortunately, pairwise_distances_argmin() does not support + # haversine distance, so we have to reimplement it ourselves + new_loc_radians = np.radians( + row[[self.loc_type + "_lat", self.loc_type + "_lon"]].to_list()) + new_loc_radians = np.reshape(new_loc_radians, (1, 2)) + dist_matrix_meters = haversine_distances( + new_loc_radians, train_radians) * EARTH_RADIUS + + shortest_dist_idx = np.argmin(dist_matrix_meters) + if dist_matrix_meters[0, shortest_dist_idx] < self.radius: + labels[idx] = self.train_df.reset_index( + drop=True).loc[shortest_dist_idx, + f'{self.loc_type}_cluster_idx'] + + return labels + diff --git a/emission/analysis/modelling/trip_model/forest_classifier.py b/emission/analysis/modelling/trip_model/forest_classifier.py new file mode 100644 index 000000000..0816717db --- /dev/null +++ b/emission/analysis/modelling/trip_model/forest_classifier.py @@ -0,0 +1,335 @@ +import pandas as pd +from sklearn.preprocessing import OneHotEncoder +import joblib +from typing import Dict, List, Optional, Tuple +import emission.core.wrapper.confirmedtrip as ecwc +import logging +import numpy as np + + +import emission.analysis.modelling.trip_model.trip_model as eamuu +import emission.analysis.modelling.trip_model.dbscan_svm as eamtd +import emission.analysis.modelling.trip_model.util as eamtu +import emission.analysis.modelling.trip_model.config as eamtc + +from sklearn.ensemble import RandomForestClassifier +class ForestClassifier(eamuu.TripModel): + + def __init__(self,config=None): + + # expected_keys = [ + # 'metric', + # 'similarity_threshold_meters', + # 'apply_cutoff', + # 'incremental_evaluation' + # ] + # for k in expected_keys: + # if config.get(k) is None: + # msg = f"greedy trip model config missing expected key {k}" + # raise KeyError(msg) + if config is None: + config = eamtc.get_config_value_or_raise('model_parameters.forest') + logging.debug(f'ForestClassifier loaded model config from file') + else: + logging.debug(f'ForestClassifier using model config argument') + + self.loc_feature = config['loc_feature'] + self.radius = config['radius'] + self.size_thresh = config['size_thresh'] + self.purity_thresh = config['purity_thresh'] + self.gamma = config['gamma'] + self.C = config['C'] + self.n_estimators = config['n_estimators'] + self.criterion =config['criterion'] + self.max_depth = config['max_depth'] if config['max_depth'] != 'null' else None + self.min_samples_split = config['min_samples_split'] + self.min_samples_leaf = config['min_samples_leaf'] + self.max_features = config['max_features'] + self.bootstrap = config['bootstrap'] + self.random_state = config['random_state'] + # self.drop_unclustered = drop_unclustered + self.use_start_clusters = config['use_start_clusters'] + self.use_trip_clusters = config['use_trip_clusters'] + + if self.loc_feature == 'cluster': + # clustering algorithm to generate end clusters + self.end_cluster_model = eamtd.DBSCANSVMCluster( + loc_type='end', + radius=self.radius, + size_thresh=self.size_thresh, + purity_thresh=self.purity_thresh, + gamma=self.gamma, + C=self.C) + + if self.use_start_clusters or self.use_trip_clusters: + # clustering algorithm to generate start clusters + self.start_cluster_model = eamtd.DBSCANSVMCluster( + loc_type='start', + radius=self.radius, + size_thresh=self.size_thresh, + purity_thresh=self.purity_thresh, + gamma=self.gamma, + C=self.C) + + if self.use_trip_clusters: + # helper class to generate trip-level clusters + self.trip_grouper = eamtd.TripGrouper( + start_cluster_col='start_cluster_idx', + 
end_cluster_col='end_cluster_idx') + + # wrapper class to generate one-hot encodings for cluster indices + self.cluster_enc = eamtu.OneHotWrapper(sparse=False, + handle_unknown='ignore') + + # wrapper class to generate one-hot encodings for purposes and modes + self.purpose_enc = eamtu.OneHotWrapper(impute_missing=True, + sparse=False, + handle_unknown='error') + self.mode_enc = eamtu.OneHotWrapper(impute_missing=True, + sparse=False, + handle_unknown='error') + + # ensemble classifiers for each label category + self.purpose_predictor = RandomForestClassifier( + n_estimators=self.n_estimators, + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + max_features=self.max_features, + bootstrap=self.bootstrap, + random_state=self.random_state) + self.mode_predictor = RandomForestClassifier( + n_estimators=self.n_estimators, + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + max_features=self.max_features, + bootstrap=self.bootstrap, + random_state=self.random_state) + self.replaced_predictor = RandomForestClassifier( + n_estimators=self.n_estimators, + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + max_features=self.max_features, + bootstrap=self.bootstrap, + random_state=self.random_state) + + + def fit(self,data: List[ecwc.Confirmedtrip],data_df=None): + # get location features + if self.loc_feature == 'cluster': + # fit clustering model(s) and one-hot encode their indices + # TODO: consolidate start/end_cluster_model in a single instance + # that has a location_type parameter in the fit() method + self.end_cluster_model.fit(data_df) + + clusters_to_encode = self.end_cluster_model.train_df[[ + 'end_cluster_idx' + ]].copy() # copy is to avoid SettingWithCopyWarning + + if self.use_start_clusters or self.use_trip_clusters: + self.start_cluster_model.fit(data_df) + + if self.use_start_clusters: + clusters_to_encode = pd.concat([ + clusters_to_encode, + self.start_cluster_model.train_df[['start_cluster_idx']] + ], + axis=1) + if self.use_trip_clusters: + start_end_clusters = pd.concat([ + self.end_cluster_model.train_df[['end_cluster_idx']], + self.start_cluster_model.train_df[['start_cluster_idx']] + ], + axis=1) + trip_cluster_idx = self.trip_grouper.fit_transform( + start_end_clusters) + clusters_to_encode.loc[:, + 'trip_cluster_idx'] = trip_cluster_idx + + loc_features_df = self.cluster_enc.fit_transform( + clusters_to_encode.astype(int)) + + # clean the df again because we need it in the next step + # TODO: remove redundancy + self.train_df = self._clean_data(data_df) + + # TODO: move below code into a reusable function + if self.train_df.purpose_true.isna().any(): + num_nan = self.train_df.purpose_true.value_counts( + dropna=False).loc[np.nan] + logging.info( + f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' + ) + self.train_df = self.train_df.dropna( + subset=['purpose_true']).reset_index(drop=True) + if len(self.train_df) == 0: + # i.e. 
no valid trips after removing all nans + raise Exception('no valid trips; nothing to fit') + + else: # self.loc_feature == 'coordinates' + self.train_df = self._clean_data(data_df) + + # TODO: move below code into a reusable function + if self.train_df.purpose_true.isna().any(): + num_nan = self.train_df.purpose_true.value_counts( + dropna=False).loc[np.nan] + logging.info( + f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' + ) + self.train_df = self.train_df.dropna( + subset=['purpose_true']).reset_index(drop=True) + if len(self.train_df) == 0: + # i.e. no valid trips after removing all nans + raise Exception('no valid trips; nothing to fit') + + loc_features_df = self.train_df[[ + 'start_lon', 'start_lat', 'end_lon', 'end_lat' + ]] + + # prepare data for the ensemble classifiers + + # note that we want to use purpose data to aid our mode predictions, + # and use both purpose and mode data to aid our replaced-mode + # predictions + # thus, we want to one-hot encode the purpose and mode as data + # features, but also preserve an unencoded copy for the target columns + + # dataframe holding all features and targets + self.Xy_train = pd.concat( + [self.train_df[self.base_features + self.targets], loc_features_df], + axis=1) + + # encode purposes and modes + onehot_purpose_df = self.purpose_enc.fit_transform( + self.Xy_train[['purpose_true']], output_col_prefix='purpose') + onehot_mode_df = self.mode_enc.fit_transform( + self.Xy_train[['mode_true']], output_col_prefix='mode') + self.Xy_train = pd.concat( + [self.Xy_train, onehot_purpose_df, onehot_mode_df], axis=1) + + # for predicting purpose, drop encoded purpose and mode features, as + # well as all target labels + self.X_purpose = self.Xy_train.dropna(subset=['purpose_true']).drop( + labels=self.targets + self.purpose_enc.onehot_encoding_cols + + self.mode_enc.onehot_encoding_cols, + axis=1) + + # for predicting mode, we want to keep purpose data + self.X_mode = self.Xy_train.dropna(subset=['mode_true']).drop( + labels=self.targets + self.mode_enc.onehot_encoding_cols, axis=1) + + # for predicting replaced-mode, we want to keep purpose and mode data + self.X_replaced = self.Xy_train.dropna(subset=['replaced_true']).drop( + labels=self.targets, axis=1) + + self.y_purpose = self.Xy_train['purpose_true'].dropna() + self.y_mode = self.Xy_train['mode_true'].dropna() + self.y_replaced = self.Xy_train['replaced_true'].dropna() + + # fit classifiers + if len(self.X_purpose) > 0: + self.purpose_predictor.fit(self.X_purpose, self.y_purpose) + if len(self.X_mode) > 0: + self.mode_predictor.fit(self.X_mode, self.y_mode) + if len(self.X_replaced) > 0: + self.replaced_predictor.fit(self.X_replaced, self.y_replaced) + + def predict(self, data: List[float]) -> Tuple[List[Dict], int]: + pass + + def to_dict(self) -> Dict: + return joblib.dump(self,compress=3) + + def from_dict(self, model: Dict): + pass + + def is_incremental(self) -> bool: + pass + + def extract_features(self, trip: ecwc.Confirmedtrip) -> List[float]: + pass + + def _clean_data(self, df): + """ Clean a dataframe of trips. + (Drop trips with missing start/end locations, expand the user input + columns, ensure all essential columns are present) + + Args: + df: a dataframe of trips. 
must contain the columns 'start_loc',
+                'end_loc', and should also contain the user input columns
+                ('mode_confirm', 'purpose_confirm', 'replaced_mode') if
+                available
+        """
+        assert 'start_loc' in df.columns and 'end_loc' in df.columns
+
+        # clean up the dataframe by dropping entries with NaN locations and
+        # reset index
+        num_nan = 0
+        if df.start_loc.isna().any():
+            num_nan += df.start_loc.value_counts(dropna=False).loc[np.nan]
+            df = df.dropna(subset=['start_loc'])
+        if df.end_loc.isna().any():
+            num_nan += df.end_loc.value_counts(dropna=False).loc[np.nan]
+            df = df.dropna(subset=['end_loc'])
+
+        # expand the 'start_loc' and 'end_loc' column into 'start_lat',
+        # 'start_lon', 'end_lat', and 'end_lon' columns
+        df = self.expand_coords(df)
+
+        # drop trips with missing coordinates
+        if df.start_lat.isna().any():
+            num_nan += df.start_lat.value_counts(dropna=False).loc[np.nan]
+            df = df.dropna(subset=['start_lat'])
+        if df.start_lon.isna().any():
+            num_nan += df.start_lon.value_counts(dropna=False).loc[np.nan]
+            df = df.dropna(subset=['start_lon'])
+        if df.end_lat.isna().any():
+            num_nan += df.end_lat.value_counts(dropna=False).loc[np.nan]
+            df = df.dropna(subset=['end_lat'])
+        if df.end_lon.isna().any():
+            num_nan += df.end_lon.value_counts(dropna=False).loc[np.nan]
+            df = df.dropna(subset=['end_lon'])
+        if num_nan > 0:
+            logging.info(
+                f'dropped {num_nan} trips that are missing location coordinates'
+            )
+
+        df = df.rename(
+            columns={
+                'mode_confirm': 'mode_true',
+                'purpose_confirm': 'purpose_true',
+                'replaced_mode': 'replaced_true'
+            })
+
+        for category in ['mode_true', 'purpose_true', 'replaced_true']:
+            if category not in df.columns:
+                # for example, if a user labels all their trip modes but none of their trip purposes
+                df.loc[:, category] = np.nan
+
+        return df.reset_index(drop=True)
+
+    def expand_coords(self, exp_df, purpose=None):
+        """
+        copied and modified from get_loc_df_for_purpose() in the 'Radius
+        selection' notebook
+        """
+        purpose_trips = exp_df
+        if purpose is not None:
+            purpose_trips = exp_df[exp_df.purpose_confirm == purpose]
+
+        dfs = [purpose_trips]
+        for loc_type in ['start', 'end']:
+            df = pd.DataFrame(
+                purpose_trips[loc_type +
+                              "_loc"].apply(lambda p: p["coordinates"]).to_list(),
+                columns=[loc_type + "_lon", loc_type + "_lat"])
+            df = df.set_index(purpose_trips.index)
+            dfs.append(df)
+
+        # display.display(end_loc_df.head())
+        return pd.concat(dfs, axis=1)
\ No newline at end of file
diff --git a/emission/analysis/modelling/trip_model/greedy_similarity_binning.py b/emission/analysis/modelling/trip_model/greedy_similarity_binning.py
index 226fdefb5..4e9ee6fad 100644
--- a/emission/analysis/modelling/trip_model/greedy_similarity_binning.py
+++ b/emission/analysis/modelling/trip_model/greedy_similarity_binning.py
@@ -128,11 +128,12 @@ class label to apply:
 
         self.bins: Dict[str, Dict] = {}
 
-    def fit(self, trips: List[ecwc.Confirmedtrip]):
+    def fit(self, trips: List[ecwc.Confirmedtrip], tripsdf=None):
         """train the model by passing data, where each row in the data
         corresponds to a label at the matching index of the label input
 
         :param trips: 2D array of features to train from
+        :param tripsdf: the same trips represented as a dataframe (optional)
         """
         logging.debug(f'fit called with {len(trips)} trips')
 
diff --git a/emission/analysis/modelling/trip_model/model_type.py b/emission/analysis/modelling/trip_model/model_type.py
index b5e761fb0..2d7e6f743 100644
--- a/emission/analysis/modelling/trip_model/model_type.py
+++ b/emission/analysis/modelling/trip_model/model_type.py
@@ -3,6 +3,7 @@
 import emission.analysis.modelling.trip_model.trip_model as eamuu
 import emission.analysis.modelling.similarity.od_similarity as eamso
 import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamug
+import emission.analysis.modelling.trip_model.forest_classifier as eamuf
 
 SIMILARITY_THRESHOLD_METERS=500
 
@@ -11,6 +12,7 @@ class ModelType(Enum):
     # ENUM_NAME_CAPS = 'SHORTHAND_NAME_CAPS'
     GREEDY_SIMILARITY_BINNING = 'GREEDY'
+    RANDOM_FOREST_CLASSIFIER = 'FOREST'
 
     def build(self, config=None) -> eamuu.TripModel:
         """
@@ -25,7 +27,8 @@ def build(self, config=None) -> eamuu.TripModel:
         """
         # Dict[ModelType, TripModel]
         MODELS = {
-            ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning(config)
+            ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning(config),
+            ModelType.RANDOM_FOREST_CLASSIFIER: eamuf.ForestClassifier(config)
         }
         model = MODELS.get(self)
         if model is None:
diff --git a/emission/analysis/modelling/trip_model/run_model.py b/emission/analysis/modelling/trip_model/run_model.py
index e3e2b1c4e..45b524f9a 100644
--- a/emission/analysis/modelling/trip_model/run_model.py
+++ b/emission/analysis/modelling/trip_model/run_model.py
@@ -56,8 +56,10 @@ def update_trip_model(
     logging.debug(f'model type {model_type.name} is incremental? {model.is_incremental}')
     logging.debug(f'time query for training data collection: {time_query}')
 
+    ts = esta.TimeSeries.get_time_series(user_id)
     trips = _get_training_data(user_id, time_query)
-
+
+    trips_df = ts.to_data_df("analysis/confirmed_trip", trips)
     # don't start training for a user that doesn't have at least $trips many trips
     # (assume if a stored model exists for the user, that they met this requirement previously)
     if len(trips) == 0:
@@ -73,8 +75,9 @@ def update_trip_model(
             epq.mark_trip_model_failed(user_id)
         else:
-            # train and store the model
-            model.fit(trips)
+            # train and store the model. pass the trips both as a list of entries and as a dataframe,
+            # so that sklearn-based models (which use dataframes) and the self-implemented models can each use the form they need.
+ model.fit(trips,trips_df) model_data_next = model.to_dict() if len(model_data_next) == 0: diff --git a/emission/analysis/modelling/trip_model/util.py b/emission/analysis/modelling/trip_model/util.py index 7d22b5d22..0728fb702 100644 --- a/emission/analysis/modelling/trip_model/util.py +++ b/emission/analysis/modelling/trip_model/util.py @@ -1,7 +1,13 @@ from typing import List, Tuple from past.utils import old_div -import numpy +import numpy as np +import pandas as pd from numpy.linalg import norm +import copy + +from sklearn.preprocessing import OneHotEncoder +from sklearn.pipeline import make_pipeline +from sklearn.impute import SimpleImputer def find_knee_point(values: List[float]) -> Tuple[float, int]: @@ -26,16 +32,139 @@ def find_knee_point(values: List[float]) -> Tuple[float, int]: x = list(range(N)) max = 0 index = -1 - a = numpy.array([x[0], values[0]]) - b = numpy.array([x[-1], values[-1]]) + a = np.array([x[0], values[0]]) + b = np.array([x[-1], values[-1]]) n = norm(b - a) new_y = [] for i in range(0, N): - p = numpy.array([x[i], values[i]]) - dist = old_div(norm(numpy.cross(p - a, p - b)), n) + p = np.array([x[i], values[i]]) + dist = old_div(norm(np.cross(p - a, p - b)), n) new_y.append(dist) if dist > max: max = dist index = i value = values[index] return [index, value] + + def get_distance_matrix(loc_df, loc_type): + """ Args: + loc_df (dataframe): must have columns 'start_lat' and 'start_lon' + or 'end_lat' and 'end_lon' + loc_type (str): 'start' or 'end' + """ + assert loc_type == 'start' or loc_type == 'end' + + radians_lat_lon = np.radians(loc_df[[loc_type + "_lat", loc_type + "_lon"]]) + + dist_matrix_meters = pd.DataFrame( + smp.haversine_distances(radians_lat_lon, radians_lat_lon) * + EARTH_RADIUS) + return dist_matrix_meters + +def single_cluster_purity(points_in_cluster, label_col='purpose_confirm'): + """ Calculates purity of a cluster (i.e. % of trips that have the most + common label) + + Args: + points_in_cluster (df): dataframe containing points in the same + cluster + label_col (str): column in the dataframe containing labels + """ + assert label_col in points_in_cluster.columns + + most_freq_label = points_in_cluster[label_col].mode()[0] + purity = len(points_in_cluster[points_in_cluster[label_col] == + most_freq_label]) / len(points_in_cluster) + return purity + + +class OneHotWrapper(): + """ Helper class to streamline one-hot encoding. + + Args: + impute_missing (bool): whether or not to impute np.nan values. + sparse (bool): whether or not to return a sparse matrix. + handle_unknown (str): specifies the way unknown categories are + handled during transform. + """ + + def __init__( + self, + impute_missing=False, + sparse=False, + handle_unknown='ignore', + ): + self.impute_missing = impute_missing + if self.impute_missing: + self.encoder = make_pipeline( + SimpleImputer(missing_values=np.nan, + strategy='constant', + fill_value='missing'), + OneHotEncoder(sparse=False, handle_unknown=handle_unknown)) + else: + self.encoder = OneHotEncoder(sparse=sparse, + handle_unknown=handle_unknown) + + def fit_transform(self, train_df, output_col_prefix=None): + """ Establish one-hot encoded variables. + + Args: + train_df (DataFrame): DataFrame containing train trips. 
+ output_col_prefix (str): only if train_df is a single column + """ + # TODO: handle pd series + + train_df = train_df.copy() # to avoid SettingWithCopyWarning + + # if imputing, the dtype of each column must be string/object and not + # numerical, otherwise the SimpleImputer will fail + if self.impute_missing: + for col in train_df.columns: + train_df[col] = train_df[col].astype(object) + onehot_encoding = self.encoder.fit_transform(train_df) + self.onehot_encoding_cols_all = [] + for col in train_df.columns: + if train_df.shape[1] > 1 or output_col_prefix is None: + output_col_prefix = col + self.onehot_encoding_cols_all += [ + f'{output_col_prefix}_{val}' + for val in np.sort(train_df[col].dropna().unique()) + ] + # we handle np.nan separately because it is of type float, and may + # cause issues with np.sort if the rest of the unique values are + # strings + if any((train_df[col].isna())): + self.onehot_encoding_cols_all += [f'{output_col_prefix}_nan'] + + onehot_encoding_df = pd.DataFrame( + onehot_encoding, + columns=self.onehot_encoding_cols_all).set_index(train_df.index) + + # ignore the encoded columns for missing entries + self.onehot_encoding_cols = copy.deepcopy(self.onehot_encoding_cols_all) + for col in self.onehot_encoding_cols_all: + if col.endswith('_nan'): + onehot_encoding_df = onehot_encoding_df.drop(columns=[col]) + self.onehot_encoding_cols.remove(col) + + return onehot_encoding_df.astype(int) + + def transform(self, test_df): + """ One-hot encoded features in accordance with features seen in the + train set. + + Args: + test_df (DataFrame): DataFrame of trips. + """ + # TODO: rename test_df, this one doesn't necessarily need to be a df + onehot_encoding = self.encoder.transform(test_df) + onehot_encoding_df = pd.DataFrame( + onehot_encoding, + columns=self.onehot_encoding_cols_all).set_index(test_df.index) + + # ignore the encoded columns for missing entries + for col in self.onehot_encoding_cols_all: + if col.endswith('_nan'): + onehot_encoding_df = onehot_encoding_df.drop(columns=[col]) + + return onehot_encoding_df.astype(int) \ No newline at end of file diff --git a/emission/tests/modellingTests/TestRunForestModel.py b/emission/tests/modellingTests/TestRunForestModel.py new file mode 100644 index 000000000..b668c22b3 --- /dev/null +++ b/emission/tests/modellingTests/TestRunForestModel.py @@ -0,0 +1,200 @@ +import unittest +import logging + +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.analysis.modelling.trip_model.model_type as eamumt +import emission.analysis.modelling.trip_model.run_model as eamur +import emission.storage.timeseries.abstract_timeseries as esta +import emission.tests.modellingTests.modellingTestAssets as etmm +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.core.get_database as edb +import emission.storage.pipeline_queries as epq +import emission.core.wrapper.pipelinestate as ecwp + + +class TestRunForestModel(unittest.TestCase): + """these tests were copied forward during a refactor of the tour model + [https://github.com/e-mission/e-mission-server/blob/10772f892385d44e11e51e796b0780d8f6609a2c/emission/analysis/modelling/tour_model_first_only/load_predict.py#L114] + + it's uncertain what condition they are in besides having been refactored to + use the more recent tour modeling code. 
+ """ + + def setUp(self): + """ + sets up the end-to-end run model test with Confirmedtrip data + """ + logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', + level=logging.DEBUG) + + # configuration for randomly-generated test data + self.user_id = user_id = 'TestRunForestModel-TestData' + self.origin = (-105.1705977, 39.7402654,) + self.destination = (-105.1755606, 39.7673075) + self.min_trips = 14 + self.total_trips = 100 + self.clustered_trips = 33 # must have at least self.min_trips similar trips by default + self.has_label_percent = 0.9 # let's make a few that don't have a label, but invariant + # $clustered_trips * $has_label_percent > self.min_trips + # must be correct or else this test could fail under some random test cases. + + # for a negative test, below + self.unused_user_id = 'asdjfkl;asdfjkl;asd08234ur13fi4jhf2103mkl' + + # test data can be saved between test invocations, check if data exists before generating + ts = esta.TimeSeries.get_time_series(user_id) + test_data = list(ts.find_entries(["analysis/confirmed_trip"])) + if len(test_data) == 0: + # generate test data for the database + logging.debug(f"inserting mock Confirmedtrips into database") + + # generate labels with a known sample weight that we can rely on in the test + label_data = { + "mode_confirm": ['ebike', 'bike'], + "purpose_confirm": ['happy-hour', 'dog-park'], + "replaced_mode": ['walk'], + "mode_weights": [0.9, 0.1], + "purpose_weights": [0.1, 0.9] + } + + train = etmm.generate_mock_trips( + user_id=user_id, + trips=self.total_trips, + origin=self.origin, + destination=self.destination, + trip_part='od', + label_data=label_data, + within_threshold=self.clustered_trips, + threshold=0.004, # ~400m + has_label_p=self.has_label_percent + ) + + ts.bulk_insert(train) + + # confirm data write did not fail + test_data = esda.get_entries(key="analysis/confirmed_trip", user_id=user_id, time_query=None) + if len(test_data) != self.total_trips: + logging.debug(f'test invariant failed after generating test data') + self.fail() + else: + logging.debug(f'found {self.total_trips} trips in database') + + def tearDown(self): + """ + clean up database + """ + edb.get_analysis_timeseries_db().delete_many({'user_id': self.user_id}) + edb.get_model_db().delete_many({'user_id': self.user_id}) + edb.get_pipeline_state_db().delete_many({'user_id': self.user_id}) + + def testBuildForestModelFromConfig(self): + """ + forest model takes config arguments via the constructor for testing + purposes but will load from a file in /conf/analysis/ which is tested here + """ + + eamumt.ModelType.RANDOM_FOREST_CLASSIFIER.build() + # success if it didn't throw + + def testTrainForestModelWithZeroTrips(self): + """ + forest model takes config arguments via the constructor for testing + purposes but will load from a file in /conf/analysis/ which is tested here + """ + + # pass along debug model configuration + forest_model_config= { + "loc_feature" : "coordinates", + "radius": 500, + "size_thresh":1, + "purity_thresh":1.0, + "gamma":0.05, + "C":1, + "n_estimators":100, + "criterion":"gini", + "max_depth":'null', + "min_samples_split":2, + "min_samples_leaf":1, + "max_features":"sqrt", + "bootstrap":True, + "random_state":42, + "use_start_clusters":False, + "use_trip_clusters":True + } + + logging.debug(f'~~~~ do nothing ~~~~') + eamur.update_trip_model( + user_id=self.unused_user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=self.min_trips, + 
model_config=forest_model_config + ) + + # user had no entries so their pipeline state should not have been set + # if it was set, the time query here would + stage = ecwp.PipelineStages.TRIP_MODEL + pipeline_state = epq.get_current_state(self.unused_user_id, stage) + self.assertIsNone( + pipeline_state['curr_run_ts'], + "pipeline should not have a current timestamp for the test user") + +# TODO :complete this test once prediction part is done + +''' + def test1RoundTripGreedySimilarityBinning(self): + """ + train a model, save it, load it, and use it for prediction, using + the high-level training/testing API provided via + run_model.py:update_trip_model() # train + run_model.py:predict_labels_with_n() # test + + for clustering, use the default greedy similarity binning model + """ + + # pass along debug model configuration + forest_model_config= { + "loc_feature" : "coordinates", + "radius": 500, + "size_thresh":1, + "purity_thresh":1.0, + "gamma":0.05, + "C":1, + "n_estimators":100, + "criterion":"gini", + "max_depth":'null', + "min_samples_split":2, + "min_samples_leaf":1, + "max_features":"sqrt", + "bootstrap":True, + "random_state":42, + "use_start_clusters":False, + "use_trip_clusters":True + } + + logging.debug(f'(TRAIN) creating a model based on trips in database') + eamur.update_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=self.min_trips, + model_config=forest_model_config + ) + + logging.debug(f'(TEST) testing prediction of stored model') + test = etmm.build_mock_trip( + user_id=self.user_id, + origin=self.origin, + destination=self.destination + ) + prediction, n = eamur.predict_labels_with_n( + trip = test, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + model_config=forest_model_config + ) + + [logging.debug(p) for p in sorted(prediction, key=lambda r: r['p'], reverse=True)] + + self.assertNotEqual(len(prediction), 0, "should have a prediction") +'''
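
For reviewers, a minimal sketch of how the pieces added in this patch are expected to fit together: the `forest` block added to `conf/analysis/trip_model.conf.json.sample` (or an equivalent dict, as in the tests above) is handed to `ModelType.RANDOM_FOREST_CLASSIFIER.build()`, which constructs a `ForestClassifier`, and `fit()` takes both the entry list and the dataframe form of the trips, mirroring the call added in `run_model.update_trip_model()`. This is an illustrative sketch only, not part of the diff; the config values below are copied from the test configuration, and the `fit()` call is left commented out because it needs real labeled `Confirmedtrip` data.

```python
# Illustrative sketch (not part of the diff): building the new forest model
# directly from a config dict using the keys added in this patch.
import emission.analysis.modelling.trip_model.model_type as eamumt

forest_model_config = {
    "loc_feature": "coordinates",
    "radius": 500,
    "size_thresh": 1,
    "purity_thresh": 1.0,
    "gamma": 0.05,
    "C": 1,
    "n_estimators": 100,
    "criterion": "gini",
    "max_depth": None,   # the sample config file expresses this as JSON null
    "min_samples_split": 2,
    "min_samples_leaf": 1,
    "max_features": "sqrt",
    "bootstrap": True,
    "random_state": 42,
    "use_start_clusters": False,
    "use_trip_clusters": True,
}

# build() looks the model type up in the MODELS dict in model_type.py and
# returns a ForestClassifier configured from the dict above
model = eamumt.ModelType.RANDOM_FOREST_CLASSIFIER.build(forest_model_config)

# fit() accepts both the list of Confirmedtrip entries and their dataframe
# form, matching the model.fit(trips, trips_df) call in update_trip_model():
# model.fit(trips, trips_df)
```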