diff --git a/emission/analysis/modelling/trip_model/clustering.py b/emission/analysis/modelling/trip_model/clustering.py
new file mode 100644
index 000000000..d3924f32a
--- /dev/null
+++ b/emission/analysis/modelling/trip_model/clustering.py
@@ -0,0 +1,410 @@
+# helper functions to streamline the use and comparison of clustering algs
+
+# basic imports
+import pandas as pd
+import numpy as np
+import logging
+
+# import clustering algorithms
+import sklearn.metrics.pairwise as smp
+import sklearn.cluster as sc
+from sklearn import metrics
+from sklearn import svm
+from sklearn.pipeline import make_pipeline
+from sklearn.preprocessing import StandardScaler
+
+# our imports
+# NOTE: this requires changing the branch of e-mission-server to
+# eval-private-data-compatibility
+import emission.storage.decorations.trip_queries as esdtq
+import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamtg
+
+EARTH_RADIUS = 6371000
+ALG_OPTIONS = [
+ 'DBSCAN',
+ 'naive',
+ 'OPTICS',
+ # 'fuzzy',
+ 'mean_shift'
+]
+
+def cleanEntryTypeData(loc_df, trip_entry):
+    """
+    Weeds out entries from the Entry-type list that were removed from the df
+    by esdtq.filter_labeled_trips() and esdtq.expand_userinputs()
+
+    loc_df : dataframe made from Entry-type data
+    trip_entry : the Entry-type equivalent of loc_df,
+                 which was passed alongside the dataframe while loading the data
+    """
+
+    ids_in_df = loc_df['_id']
+ filtered_trip_entry = list(filter(lambda entry: entry['_id'] in ids_in_df.values, trip_entry))
+ return filtered_trip_entry
+
+
+def add_loc_clusters(
+ loc_df,
+ trip_entry,
+ clustering_way,
+ radii,
+ loc_type,
+ alg,
+ SVM=False,
+ # cluster_unlabeled=False,
+ min_samples=1,
+ optics_min_samples=None,
+ optics_xi=0.05,
+ optics_cluster_method='xi',
+ svm_min_size=6,
+ svm_purity_thresh=0.7,
+ svm_gamma=0.05,
+ svm_C=1):
+ """ Given a dataframe of trips, cluster the locations (either start or end
+ locations) using the desired algorithm & parameters.
+
+ Returns:
+ Same dataframe, with appended columns that contain the resulting cluster indices.
+
+ Args:
+ loc_df (dataframe): must have columns 'start_lat' and 'start_lon'
+ or 'end_lat' and 'end_lon'
+        trip_entry (list of Entry/confirmedTrip): list containing all entries from
+            when the data was loaded. loc_df was obtained from this by converting
+            to a df, then filtering to labeled trips and expanding user_inputs
+ radii (int list): list of radii to run the clustering algs with
+ loc_type (str): 'start' or 'end'
+ alg (str): 'DBSCAN', 'naive', 'OPTICS', 'SVM', 'fuzzy', or
+ 'mean_shift'
+ SVM (bool): whether or not to sub-divide clusters with SVM
+ # cluster_unlabeled (bool): whether or not unlabeled points are used
+ # to generate clusters.
+ min_samples (int): min samples per cluster. used in DBSCAN (and
+ therefore also SVM and fuzzy, for now)
+ optics_min_samples (int): min samples per cluster, if using OPTICS.
+ optics_xi (float): xi value if using the xi method of OPTICS.
+ optics_cluster_method (str): method to use for the OPTICS
+ algorithm. either 'xi' or 'dbscan'
+ svm_min_size (int): the min number of trips a cluster must have to
+ be considered for sub-division, if using SVM
+ svm_purity_thresh (float): the min purity a cluster must have to be
+ sub-divided, if using SVM
+ svm_gamma (float): if using SVM, the gamma hyperparameter
+ svm_C (float): if using SVM, the C hyperparameter
+ """
+ assert loc_type == 'start' or loc_type == 'end'
+ assert alg in ALG_OPTIONS
+
+ # if using SVM, we get the initial clusters with DBSCAN, then sub-divide
+ if alg == 'DBSCAN':
+ dist_matrix_meters = get_distance_matrix(loc_df, loc_type)
+
+ for r in radii:
+ model = sc.DBSCAN(r, metric="precomputed",
+ min_samples=min_samples).fit(dist_matrix_meters)
+ labels = model.labels_
+ # print(model.n_features_in_)
+ # print(model.components_.shape)
+ # print(model.components_)
+
+ # pd.Categorical converts the type from int to category (so
+ # numerical operations aren't possible)
+ # loc_df.loc[:,
+ # f"{loc_type}_DBSCAN_clusters_{r}_m"] = pd.Categorical(
+ # labels)
+ # TODO: fix this and make it Categorical again (right now labels are
+ # ints)
+ loc_df.loc[:, f"{loc_type}_DBSCAN_clusters_{r}_m"] = labels
+
+ elif alg == 'naive':
+
+        cleaned_trip_entry = cleanEntryTypeData(loc_df, trip_entry)
+
+ for r in radii:
+ # this is using a modified Similarity class that bins start/end
+ # points separately before creating trip-level bins
+
+ model_config = {
+ "metric": "od_similarity",
+ "similarity_threshold_meters": r, # meters,
+ "apply_cutoff": False,
+ "clustering_way": clustering_way,
+ "shouldFilter":False,
+ "incremental_evaluation": False
+ }
+
+ sim_model = eamtg.GreedySimilarityBinning(model_config)
+ sim_model.fit(cleaned_trip_entry)
+ labels = [int(l) for l in sim_model.tripLabels]
+ # # pd.Categorical converts the type from int to category (so
+ # # numerical operations aren't possible)
+ # loc_df.loc[:, f"{loc_type}_{alg}_clusters_{r}_m"] = pd.Categorical(
+ # labels)
+ loc_df.loc[:, f"{loc_type}_{alg}_clusters_{r}_m"] = labels
+
+ elif alg == 'OPTICS':
+        if optics_min_samples is None:
+ optics_min_samples = 2
+ dist_matrix_meters = get_distance_matrix(loc_df, loc_type)
+
+ for r in radii:
+ labels = sc.OPTICS(
+ min_samples=optics_min_samples,
+ max_eps=r,
+ xi=optics_xi,
+ cluster_method=optics_cluster_method,
+ metric="precomputed").fit(dist_matrix_meters).labels_
+
+ # # pd.Categorical converts the type from int to category (so
+ # # numerical operations aren't possible)
+ # loc_df.loc[:, f"{loc_type}_{alg}_clusters_{r}_m"] = pd.Categorical(
+ # labels)
+ loc_df.loc[:, f"{loc_type}_{alg}_clusters_{r}_m"] = labels
+
+ elif alg == 'fuzzy':
+ # create clusters with completely homogeneous purpose labels
+ # I'm calling this 'fuzzy' for now since the clusters overlap, but I
+ # need to think of a better name
+ logging.warning(
+ 'This alg is not properly implemented and will not generate clusters for unlabeled trips!'
+ )
+
+ purpose_list = loc_df.purpose_confirm.dropna().unique()
+
+ for p in purpose_list:
+ p_loc_df = loc_df.loc[loc_df['purpose_confirm'] == p]
+ dist_matrix_meters = get_distance_matrix(p_loc_df, loc_type)
+
+ for r in radii:
+ labels = sc.DBSCAN(
+ r, metric="precomputed",
+ min_samples=min_samples).fit(dist_matrix_meters).labels_
+
+ # pd.Categorical converts the type from int to category (so
+ # numerical operations aren't possible)
+ # loc_df.loc[:,
+ # f"{loc_type}_DBSCAN_clusters_{r}_m"] = pd.Categorical(
+ # labels)
+ loc_df.loc[loc_df['purpose_confirm'] == p,
+ f"{loc_type}_{alg}_clusters_{r}_m"] = labels
+
+ # move "noisy" trips to their own single-trip clusters
+ noisy_trips = loc_df.loc[(loc_df['purpose_confirm'] == p) & (
+ loc_df[f"{loc_type}_{alg}_clusters_{r}_m"] == -1)]
+ for idx in noisy_trips.index.values:
+ max_idx_inside_p = loc_df.loc[
+ (loc_df['purpose_confirm'] == p),
+ f"{loc_type}_{alg}_clusters_{r}_m"].max()
+ loc_df.loc[
+ idx,
+ f"{loc_type}_{alg}_clusters_{r}_m"] = 1 + max_idx_inside_p
+
+ # we offset all cluster indices for purpose p by the max
+ # existing index excluding purpose p
+ # so that we don't run into any duplicate trouble
+ max_idx_outside_p = loc_df.loc[
+ (loc_df['purpose_confirm'] != p),
+ f"{loc_type}_{alg}_clusters_{r}_m"].max(skipna=True)
+
+ if np.isnan(max_idx_outside_p):
+ # can happen if column is empty, e.g. if this is the first
+ # purpose in the list that we are iterating over
+ max_idx_outside_p = -1
+
+                logging.debug('max_idx_outside_p %s at radius %s',
+                              max_idx_outside_p, r)
+
+ loc_df.loc[
+ loc_df['purpose_confirm'] == p,
+ f"{loc_type}_{alg}_clusters_{r}_m"] += 1 + max_idx_outside_p
+
+ elif alg == 'mean_shift':
+ for r in radii:
+ # seems like the bandwidth is based on the raw lat/lon data (we
+ # never pass in a distance matrix), so we want a conversion factor
+ # from meters to degrees. Since 100-500m corresponds to such a
+ # small degree change, we can rely on the small angle approximation
+ # and just use a linear multiplier. This conversion factor doesn't
+            # have to be *super* accurate, it's just so we can get a sense of
+ # what the bandwidth roughly corresponds to in the real world/make
+ # the value a little more interpretable.
+ LATLON_TO_M = 1 / 111139
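+            # e.g. r = 150 m corresponds to a bandwidth of roughly
+            # 150 / 111139 ~ 0.00135 degrees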
+ labels = sc.MeanShift(
+ bandwidth=LATLON_TO_M * r,
+ min_bin_freq=min_samples,
+ cluster_all=False,
+ ).fit(loc_df[[f"{loc_type}_lon", f"{loc_type}_lat"]]).labels_
+
+ # pd.Categorical converts the type from int to category (so
+ # numerical operations aren't possible)
+ # loc_df.loc[:,
+ # f"{loc_type}_DBSCAN_clusters_{r}_m"] = pd.Categorical(
+ # labels)
+ # TODO: fix this and make it Categorical again (right now labels are
+ # ints)
+ loc_df.loc[:, f"{loc_type}_mean_shift_clusters_{r}_m"] = labels
+
+ # move "noisy" trips to their own single-trip clusters
+ for idx in loc_df.loc[
+ loc_df[f"{loc_type}_mean_shift_clusters_{r}_m"] ==
+ -1].index.values:
+ loc_df.loc[
+ idx, f"{loc_type}_mean_shift_clusters_{r}_m"] = 1 + loc_df[
+ f"{loc_type}_mean_shift_clusters_{r}_m"].max()
+
+ if SVM:
+ loc_df = add_loc_SVM(loc_df, radii, alg, loc_type, svm_min_size,
+ svm_purity_thresh, svm_gamma, svm_C)
+ return loc_df
+
+
+def add_loc_SVM(loc_df,
+ radii,
+ alg,
+ loc_type,
+ svm_min_size=6,
+ svm_purity_thresh=0.7,
+ svm_gamma=0.05,
+ svm_C=1,
+ cluster_cols=None):
+ """ Sub-divide base clusters using SVM.
+
+ Args:
+ loc_df (dataframe): must have columns 'start_lat' and 'start_lon'
+ or 'end_lat' and 'end_lon', as well as
+ '{loc_type}_{base_alg}_SVM_clusters_{r}_m', containing cluster indices generated by the base clustering alg
+ radii (int list): list of radii to run the clustering algs with
+ loc_type (str): 'start' or 'end'
+ svm_min_size (int): the min number of trips a cluster must have to
+ be considered for sub-division
+ svm_purity_thresh (float): the min purity a cluster must have to be
+ sub-divided
+ svm_gamma (float): the gamma hyperparameter
+        svm_C (float): the C hyperparameter
+        cluster_cols (str list): names of the columns containing the cluster
+            indices of interest
+ """
+ assert loc_type == 'start' or loc_type == 'end'
+ assert f'{loc_type}_lat' in loc_df.columns
+ assert f'{loc_type}_lon' in loc_df.columns
+
+ for i in range(len(radii)):
+ r = radii[i]
+        if cluster_cols is None:
+ cluster_col = f"{loc_type}_{alg}_clusters_{r}_m"
+ else:
+ cluster_col = cluster_cols[i]
+ assert cluster_col in loc_df.columns
+
+ # c is the count of how many clusters we have iterated over
+ c = 0
+ # iterate over all clusters and subdivide them with SVM. The while loop
+ # is so we can do multiple iterations of subdividing if needed
+ while c < loc_df[cluster_col].max():
+ points_in_cluster = loc_df.loc[loc_df[cluster_col] == c]
+
+ labeled_points_in_cluster = points_in_cluster.dropna(
+ subset=['purpose_confirm'])
+
+            # only do SVM if we have at least 6 labeled points in the cluster
+            # (or the custom svm_min_size)
+ if len(labeled_points_in_cluster) < svm_min_size:
+ c += 1
+ continue
+
+ # only do SVM if purity is below threshold
+ purity = single_cluster_purity(labeled_points_in_cluster)
+ if purity < svm_purity_thresh:
+ X_train = labeled_points_in_cluster[[
+ f"{loc_type}_lon", f"{loc_type}_lat"
+ ]]
+ X_all = points_in_cluster[[
+ f"{loc_type}_lon", f"{loc_type}_lat"
+ ]]
+ y_train = labeled_points_in_cluster.purpose_confirm.to_list()
+
+ labels = make_pipeline(
+ StandardScaler(),
+ svm.SVC(
+ kernel='rbf',
+ gamma=svm_gamma,
+ C=svm_C,
+ )).fit(X_train, y_train).predict(X_all)
+
+ unique_labels = np.unique(labels)
+
+ # map from purpose labels to new cluster indices
+ # we offset indices by the max existing index so that
+ # we don't run into any duplicate trouble
+ max_existing_idx = loc_df[cluster_col].max()
+
+ # # if the indices are Categorical, need to convert to
+ # # ordered values
+ # max_existing_idx = np.amax(
+ # existing_cluster_indices.as_ordered())
+
+ # labels = np.array(svc.predict(X))
+ label_to_cluster = {
+ unique_labels[i]: i + max_existing_idx + 1
+ for i in range(len(unique_labels))
+ }
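+                # e.g. with max_existing_idx = 9 and SVM labels
+                # ['home', 'work'] (illustrative values only), this gives
+                # {'home': 10, 'work': 11}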
+
+ # if the SVM predicts everything with the same label, just
+ # ignore it and don't reindex.
+
+ # this also helps us to handle the possibility that a cluster
+ # may be impure but inherently inseparable, e.g. an end cluster
+ # containing 50% 'home' trips and 50% round trips to pick up/
+ # drop off. we don't want to reindex otherwise the low purity
+ # will trigger SVM again, and we will attempt & fail to split
+ # the cluster ad infinitum
+ if len(unique_labels) > 1:
+ indices = np.array([label_to_cluster[l] for l in labels])
+
+ loc_df.loc[loc_df[cluster_col] == c, cluster_col] = indices
+
+ c += 1
+
+ return loc_df
+
+
+def get_distance_matrix(loc_df, loc_type):
+ """ Args:
+ loc_df (dataframe): must have columns 'start_lat' and 'start_lon'
+ or 'end_lat' and 'end_lon'
+ loc_type (str): 'start' or 'end'
+ """
+ assert loc_type == 'start' or loc_type == 'end'
+
+ radians_lat_lon = np.radians(loc_df[[loc_type + "_lat", loc_type + "_lon"]])
+
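+    # sklearn's haversine_distances returns great-circle distances on the unit
+    # sphere (i.e. in radians), so multiplying by EARTH_RADIUS yields meters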
+ dist_matrix_meters = pd.DataFrame(
+ smp.haversine_distances(radians_lat_lon, radians_lat_lon) *
+ EARTH_RADIUS)
+ return dist_matrix_meters
+
+
+def single_cluster_purity(points_in_cluster, label_col='purpose_confirm'):
+ """ Calculates purity of a cluster (i.e. % of trips that have the most
+ common label)
+
+ Args:
+ points_in_cluster (df): dataframe containing points in the same
+ cluster
+ label_col (str): column in the dataframe containing labels
+ """
+ assert label_col in points_in_cluster.columns
+
+ most_freq_label = points_in_cluster[label_col].mode()[0]
+ purity = len(points_in_cluster[points_in_cluster[label_col] ==
+ most_freq_label]) / len(points_in_cluster)
+ return purity
+
+
+def purity_score(y_true, y_pred):
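+    """ Calculates purity over all clusters, i.e. the fraction of points whose
+        true label matches the most common true label within their assigned
+        cluster.
+
+    Args:
+        y_true (array-like): true labels for each point
+        y_pred (array-like): predicted cluster indices for each point
+    """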
+ contingency_matrix = metrics.cluster.contingency_matrix(y_true, y_pred)
+ purity = np.sum(np.amax(contingency_matrix,
+ axis=0)) / np.sum(contingency_matrix)
+ return purity
diff --git a/emission/analysis/modelling/trip_model/data_wrangling.py b/emission/analysis/modelling/trip_model/data_wrangling.py
new file mode 100644
index 000000000..137886190
--- /dev/null
+++ b/emission/analysis/modelling/trip_model/data_wrangling.py
@@ -0,0 +1,238 @@
+import pandas as pd
+import numpy as np
+import logging
+
+
+def expand_df_dict(df, column_name):
+ """
+ df: a dataframe that contains a column whose values are dictionaries
+ column_name: name of the df's column containing dictionary entries
+
+ Returns a dataframe with the desired column expanded into the main dataframe
+
+ This is a generalized version of the expand_userinputs() function from
+ e-mission-server/emission/storage/decorations/trip_queries.py
+ """
+ if len(df) == 0:
+ return df
+ expanded_col = pd.DataFrame(df.loc[:, column_name].to_list(),
+ index=df.index)
+ logging.debug(expanded_col.head())
+ df = df.drop(columns=[column_name])
+ expanded_df = pd.concat([df, expanded_col], axis=1)
+ assert len(expanded_df) == len(df), \
+ ("Mismatch after expanding labels, expanded_df.rows = %s != df.columns %s" %
+ (len(expanded_df), len(df)))
+ logging.debug("After expanding, columns went from %s -> %s" %
+ (len(df.columns), len(expanded_df.columns)))
+ logging.debug(expanded_df.head())
+ return expanded_df
+
+
+# NOTE: this is effectively the same as pandas' DataFrame.explode()
+def expand_df_list_vert(df, column_name):
+ """
+ df: a dataframe that contains a column whose values are lists
+ column_name: name of the df's column containing list entries. (the
+ length of the list entry can vary from row to row.)
+
+ Returns a dataframe with the desired column expanded vertically into
+ the main dataframe, i.e. for each row in the original dataframe, there
+ will be n rows in the expanded dataframe where n is the length of its
+ list entry under 'column_name'
+ """
+ if len(df) == 0:
+ return df
+
+ expanded_df_list = []
+ for i in range(len(df)):
+ col_list = df.loc[i, column_name]
+ for e in col_list:
+ # add new row to new_df
+ new_row = df.loc[i].to_dict()
+ new_row[column_name] = e
+ expanded_df_list += [new_row]
+
+ if len(expanded_df_list) == 0:
+ logging.debug(
+ '{} only has empty lists; expansion failed.'.format(column_name))
+ raise Exception('expansion failed; empty lists')
+
+ expanded_df = pd.DataFrame(expanded_df_list)
+
+ assert len(expanded_df.columns) == len(df.columns), \
+ ("Mismatch after expanding labels, expanded_df.columns = %s != df.columns %s" %
+ (len(expanded_df.columns), len(df.columns)))
+ logging.debug("After expanding, rows went from %s -> %s" %
+ (len(df), len(expanded_df)))
+
+ return expanded_df
+
+
+def expand_df_list_horiz(df, column_name):
+ """
+ df: a dataframe that contains a column whose values are lists
+ column_name: name of the df's column containing list entries. (the
+ length of the list entry must be consistent for all rows.)
+
+ Returns a dataframe with the desired column expanded horizontally into
+ the main dataframe, i.e. 'column_name' will be replaced by n columns
+ where n is the length of each list entry
+ """
+ if len(df) == 0:
+ return df
+ expanded_col = pd.DataFrame(df.loc[:, column_name].to_list(),
+ index=df.index)
+ logging.debug(expanded_col.head())
+ df = df.drop(columns=[column_name])
+ expanded_df = pd.concat([df, expanded_col], axis=1)
+ assert len(expanded_df) == len(df), \
+ ("Mismatch after expanding labels, expanded_df.rows = %s != df.columns %s" %
+ (len(expanded_df), len(df)))
+ logging.debug("After expanding, columns went from %s -> %s" %
+ (len(df.columns), len(expanded_df.columns)))
+ logging.debug(expanded_df.head())
+ return expanded_df
+
+
+def add_top_pred(df, trip_id_column='trip_id', pred_conf_column='pred_conf'):
+ """ df: dataframe containing trip ids, predicted labels and confidence level
+ trip_id_column: string, the name of the column containing trip ids
+ pred_conf_column: string, the name of the column containing prediction confidence
+ """
+ df['top_pred'] = False
+ for trip_id in df[trip_id_column].unique():
+ id_max = df[df[trip_id_column] == trip_id][pred_conf_column].idxmax(
+ skipna=True)
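+        # Series.idxmax returns NaN when every pred_conf value for this trip is NaN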
+ if not np.isnan(id_max):
+ df.loc[id_max, 'top_pred'] = True
+
+ return df
+
+
+def trips_to_df(trips, user_id, os_df=None):
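+    """ Convert a list of trip dicts into a dataframe with one row per
+        (trip, inferred label) pair, holding the predicted and true labels and
+        the prediction confidence. Trips without inferred labels get a single
+        row containing only the true labels.
+
+    Args:
+        trips (list): list of trip dicts with 'user_input' and, optionally,
+            'inferred_labels' under 'data'
+        user_id: id of the user the trips belong to
+        os_df (dataframe): optional, maps 'user_id' to 'curr_platform'
+    """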
+ datas = []
+ for i in range(len(trips)):
+ t = trips[i]
+ if 'inferred_labels' not in t['data'] or t['data'][
+ 'inferred_labels'] == []:
+ data = {'trip_id': t['_id']}
+ data = update_labels(data, t['data']['user_input'], 'true')
+ datas.append(data)
+
+ else:
+ for label in t['data']['inferred_labels']:
+ data = {'trip_id': t['_id']}
+ data['pred_conf'] = label['p']
+ data = update_labels(data, label['labels'], 'pred')
+ data = update_labels(data, t['data']['user_input'], 'true')
+ datas.append(data)
+
+ df = pd.DataFrame(datas,
+ columns=[
+ 'user_id', 'trip_id', 'mode_pred', 'replaced_pred',
+ 'purpose_pred', 'tuple_pred', 'pred_conf',
+ 'mode_true', 'replaced_true', 'purpose_true',
+ 'tuple_true'
+ ])
+ df['user_id'] = user_id
+ if os_df:
+ df['os'] = os_df[os_df.user_id == user_id]['curr_platform'].item()
+ df['tuple_pred'] = df.mode_pred.astype(
+ str) + ', ' + df.purpose_pred.astype(
+ str) + ', ' + df.replaced_pred.astype(str)
+ df['tuple_true'] = df.mode_true.astype(
+ str) + ', ' + df.purpose_true.astype(
+ str) + ', ' + df.replaced_true.astype(str)
+
+ # df['tuple_pred'] = list(zip(df.mode_pred, df.purpose_pred, df.replaced_pred))
+ # df['tuple_true'] = list(zip(df.mode_true, df.purpose_true, df.replaced_true))
+
+ # indicates if the predicted label was the top choice (i.e. the first suggestion to the user)
+ df = add_top_pred(df)
+
+ return df
+
+
+def update_labels(data, user_input, label_type):
+ """ helper function to populate a dictionary with trip labels.
+
+ Args:
+ data (dict): dictionary that we want to populate
+ user_input (dict): the dictionary containing mode_confirm,
+ purpose_confirm, and replaced_mode information (e.g.
+ t['data']['user_input'] or t['data']['inferred_labels'][i] )
+ label_type (str): 'true' or 'pred'
+ """
+ if user_input != {}:
+ if 'mode_confirm' in user_input.keys():
+ data['mode_' + label_type] = user_input['mode_confirm']
+ if data['mode_' + label_type] == 'not_a_trip':
+ data['replaced_' + label_type] = 'not_a_trip'
+ data['purpose_' + label_type] = 'not_a_trip'
+
+ else:
+ if 'replaced_mode' in user_input.keys():
+ data['replaced_' +
+ label_type] = user_input['replaced_mode']
+ if 'purpose_confirm' in user_input.keys():
+ data['purpose_' +
+ label_type] = user_input['purpose_confirm']
+
+ return data
+
+
+def get_labels(trips):
+ """ helper function to get lists of trip labels from a list of trip dicts."""
+ mode_true = []
+ purpose_true = []
+ replaced_true = []
+
+ for t in trips:
+ if 'mode_confirm' in t['data']['user_input']:
+ mode_true.append(t['data']['user_input']['mode_confirm'])
+ else:
+ mode_true.append(None)
+
+ if 'purpose_confirm' in t['data']['user_input']:
+ purpose_true.append(t['data']['user_input']['purpose_confirm'])
+ else:
+ purpose_true.append(None)
+
+ if 'replaced_mode' in t['data']['user_input']:
+ replaced_true.append(t['data']['user_input']['replaced_mode'])
+ else:
+ replaced_true.append(None)
+
+ return mode_true, purpose_true, replaced_true
+
+
+def get_trip_index(trips):
+ """ helper function to get list of trip indices from a list of trip dicts."""
+ trip_indices = []
+ for t in trips:
+ trip_indices.append(t['_id'])
+
+ return trip_indices
+
+
+def expand_coords(exp_df, purpose=None):
+ """
+    copied and modified from get_loc_df_for_purpose() in the 'Radius
+ selection' notebook
+ """
+ purpose_trips = exp_df
+ if purpose is not None:
+ purpose_trips = exp_df[exp_df.purpose_confirm == purpose]
+
+ dfs = [purpose_trips]
+ for loc_type in ['start', 'end']:
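+        # each '<loc_type>_loc' value is a GeoJSON point, so its 'coordinates'
+        # are stored in [lon, lat] order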
+ df = pd.DataFrame(
+ purpose_trips[loc_type +
+ "_loc"].apply(lambda p: p["coordinates"]).to_list(),
+ columns=[loc_type + "_lon", loc_type + "_lat"])
+ df = df.set_index(purpose_trips.index)
+ dfs.append(df)
+
+ # display.display(end_loc_df.head())
+ return pd.concat(dfs, axis=1)
\ No newline at end of file
diff --git a/emission/analysis/modelling/trip_model/mapping.py b/emission/analysis/modelling/trip_model/mapping.py
new file mode 100644
index 000000000..06f0614ea
--- /dev/null
+++ b/emission/analysis/modelling/trip_model/mapping.py
@@ -0,0 +1,429 @@
+# This file contains helper functions for plotting maps.
+import pandas as pd
+import numpy as np
+
+import folium
+import branca.element as bre
+from scipy.spatial import ConvexHull
+
+import emission.analysis.modelling.trip_model.data_wrangling as eamtd
+from emission.analysis.modelling.trip_model.clustering import add_loc_clusters, ALG_OPTIONS
+
+DENVER_COORD = [39.7392, -104.9903]
+MTV_COORD = [37.3861, -122.0839]
+CLM_COORD = [34.0967, -117.7198]
+
+# list of valid default colors in folium
+COLORS = [
+ 'darkred',
+ 'orange',
+ 'gray',
+ # 'green', # reserved for correct labels
+ 'darkblue',
+ # 'lightblue', # too hard to see on map
+ 'purple',
+ # 'pink', # too hard to see on map
+ 'darkgreen',
+ 'lightgreen',
+ # 'darkpurple', # this color does not exist?
+ 'cadetblue',
+ # 'lightgray', # too hard to see point on map
+ # 'black', # reserved for no_pred
+ 'blue',
+ # 'red', # reserved for noise/unlabeled data/incorrect labels
+ # 'lightred', # this color does not exist in folium?
+ # 'beige', # too hard to see point on map
+]
+
+
+def find_plot_clusters(user_df,
+ user_entry,
+ loc_type,
+ alg,
+ clustering_way,
+ SVM=False,
+ radii=[50, 100, 150, 200],
+ cluster_unlabeled=False,
+ plot_unlabeled=False,
+ optics_min_samples=None,
+ optics_xi=0.05,
+ optics_cluster_method='xi',
+ svm_min_size=6,
+ svm_purity_thresh=0.7,
+ svm_gamma=0.05,
+ svm_C=1,
+ map_loc=MTV_COORD):
+ """ Plot points and clusters on a folium map.
+
+        Points with the same purpose will have the same color (unless there
+        are more purposes than available colors in folium, in which case some
+        colors may be duplicated). Hovering over a point will also reveal the
+        purpose in the tooltip.
+
+        The clusters are visualized as convex hulls; their color doesn't mean
+        anything for now, it's simply so we can distinguish between distinct
+        clusters (which will be helpful when clusters overlap).
+
+ Args:
+ user_df (dataframe): must contain the following columns:
+ 'start_loc', 'end_loc', 'user_input'
+ loc_type (str): 'start' or 'end', the type of points to cluster
+ alg (str): the clustering algorithm to be used. must be one of the
+ following: 'DBSCAN', 'naive', 'OPTICS', 'SVM', 'fuzzy' or
+ 'mean_shift'
+        clustering_way (str): 'origin', 'destination', or 'origin-destination'.
+ Decides the way we can cluster trips geospatially.
+ SVM (bool): whether or not to sub-divide clusters with SVM
+ radii (int list): list of radii to pass to the clustering alg
+ cluster_unlabeled (bool): whether or not unlabeled points are used
+ to generate clusters.
+ plot_unlabeled (bool): whether or not to plot unlabeled points. If
+ True, they will be plotted as red points.
+ optics_min_samples (int): number of min samples if using the OPTICS
+ algorithm.
+ optics_xi (float): xi value if using the xi method of the OPTICS algorithm.
+ optics_cluster_method (str): method to use for the OPTICS
+ algorithm. either 'xi' or 'dbscan'
+ svm_min_size (int): the min number of trips a cluster must have to
+ be considered for sub-division, if using SVM
+ svm_purity_thresh (float): the min purity a cluster must have to be
+ sub-divided, if using SVM
+ svm_gamma (float): if using SVM, the gamma hyperparameter
+ svm_C (float): if using SVM, the C hyperparameter
+ map_loc (array-like): lat and lon coordinate for the default folium
+ map position.
+
+ """
+ # TODO: refactor to take in kwargs so we can remove the mess of optics_*
+ # variables that I was using when manually tuning that algorithm
+ assert loc_type == 'start' or loc_type == 'end'
+ assert 'start_loc' in user_df.columns
+ assert 'end_loc' in user_df.columns
+ assert 'user_input' in user_df.columns
+ assert clustering_way in ['origin','destination','origin-destination']
+ assert alg in ALG_OPTIONS
+
+ fig = bre.Figure(figsize=(20, 20))
+ fig_index = 0
+
+ # clean up the dataframe by dropping entries with NaN locations and
+ # reset index (because naive needs the position of each trip to match
+ # its nominal index)
+ all_trips_df = user_df.dropna(subset=['start_loc', 'end_loc']).reset_index(
+ drop=True)
+
+ # expand the 'start_loc' and 'end_loc' column into 'start_lat',
+ # 'start_lon', 'end_lat', and 'end_lon' columns
+ all_trips_df = eamtd.expand_coords(all_trips_df)
+
+ labeled_trips_df = all_trips_df.loc[all_trips_df.user_input != {}].dropna(
+ subset=['purpose_confirm'])
+
+ if cluster_unlabeled:
+ df_for_cluster = all_trips_df
+ else:
+ df_for_cluster = labeled_trips_df
+
+ df_for_cluster = add_loc_clusters(
+ df_for_cluster,
+ user_entry,
+ clustering_way,
+ radii=radii,
+ alg=alg,
+ SVM=SVM,
+ # cluster_unlabeled=cluster_unlabeled,
+ loc_type=loc_type,
+ min_samples=1,
+ optics_min_samples=optics_min_samples,
+ optics_xi=optics_xi,
+ optics_cluster_method=optics_cluster_method,
+ svm_min_size=svm_min_size,
+ svm_purity_thresh=svm_purity_thresh,
+ svm_gamma=svm_gamma,
+ svm_C=svm_C)
+
+ for r in radii:
+ fig_index = fig_index + 1
+ m = folium.Map(
+ location=map_loc,
+ zoom_start=12,
+ tiles=
+ 'https://{s}.basemaps.cartocdn.com/rastertiles/voyager_nolabels/{z}/{x}/{y}{r}.png',
+ attr=
+ '© OpenStreetMap contributors © CARTO'
+ )
+ folium.TileLayer(
+ tiles=
+ 'https://stamen-tiles-{s}.a.ssl.fastly.net/toner-lines/{z}/{x}/{y}{r}.png',
+ attr=
+ 'Map tiles by Stamen Design, CC BY 3.0 — Map data © OpenStreetMap contributors'
+ ).add_to(m)
+ # folium.TileLayer('Stamen Toner').add_to(m)
+
+ cluster_ids = df_for_cluster[
+ f"{loc_type}_{alg}_clusters_{r}_m"].unique()
+
+ # draw the convex hull of the clusters
+ for i in range(len(cluster_ids)):
+ c = cluster_ids[i]
+ if c == -1:
+ print(
+ 'we should never get here because we want to convert the -1 cluster into single-trip clusters'
+ )
+ continue
+
+ points_in_cluster = df_for_cluster[
+ df_for_cluster[f"{loc_type}_{alg}_clusters_{r}_m"] == c]
+
+ if np.isnan(c):
+ # if False:
+ if len(points_in_cluster) == 0:
+ continue
+ else:
+ print(points_in_cluster)
+ print(df_for_cluster[df_for_cluster[
+ f"{loc_type}_{alg}_clusters_{r}_m"].isnull()])
+ raise Exception(
+ 'nan cluster detected; all trips should have a proper cluster index'
+ )
+ m = plot_cluster_border(
+ points_in_cluster,
+ loc_type=loc_type,
+ m=m,
+ color='gray',
+ # color=COLORS[i % (len(COLORS) - 1)],
+ cluster_idx=c)
+
+ # plot all the destinations, color-coordinated by purpose
+ # we want to plot these on *top* of the cluster circles so that we can
+ # hover over the points and see the purpose on the tooltip
+ m = plot_user_trips(user_df,
+ loc_type,
+ plot_100=False,
+ plot_unlabeled=plot_unlabeled,
+ m=m)
+
+ # add plot to the figure
+ fig.add_subplot(len(radii) / 2 + len(radii) % 2, 2,
+ fig_index).add_child(m)
+
+ return fig
+
+
+def plot_model_clusters(
+ model,
+ category,
+ # purpose_col='purpose_confirm',
+ m=None,
+ map_loc=CLM_COORD):
+ """ category (str): 'test' or 'train' """
+ loc_type = 'end'
+
+    if m is None:
+ m = folium.Map(location=map_loc, zoom_start=12)
+
+ if category == 'test':
+ df = model.test_df
+ elif category == 'train':
+ df = model.train_df
+
+ cluster_ids = df['final_cluster_idx'].unique()
+
+ # draw the convex hull of the clusters
+ for i in range(len(cluster_ids)):
+ c = cluster_ids[i]
+ if c == -1:
+ print(
+ 'we should never get here because we want to convert the -1 cluster into single-trip clusters'
+ )
+ continue
+
+ points_in_cluster = df[df['final_cluster_idx'] == c]
+
+ if np.isnan(c):
+ print(points_in_cluster)
+ print(df[df['final_cluster_idx'].isnull()])
+ raise Exception(
+ 'nan cluster detected; all trips should have a proper cluster index'
+ )
+ m = plot_cluster_border(points_in_cluster,
+ loc_type=loc_type,
+ m=m,
+ color=COLORS[i % (len(COLORS) - 1)],
+ cluster_idx=c)
+
+ # plot all the destinations, color-coordinated by purpose
+ # we want to plot these on *top* of the cluster circles so that we can
+ # hover over the points and see the purpose on the tooltip
+ # m = plot_user_trips(df, loc_type, plot_100=False, plot_unlabeled=True, m=m)
+
+ return m
+
+
+def plot_user_trips(user_df,
+ loc_type,
+ plot_100=True,
+ plot_500=False,
+ plot_unlabeled=False,
+ purpose_col='purpose_confirm',
+ color=None,
+ m=None):
+ """ Args:
+ user_df (dataframe): must contain the columns 'start/end_lat/lon'
+ loc_type (str): 'start' or 'end'
+ plot_100 (bool): whether or not to plot 100m radius circles around
+ each location point
+ plot_500 (bool): whether or not to plot 500m radius circles around
+ each location point
+ plot_unlabeled (bool): whether or not to plot unlabeled points (if
+ so, they will be red)
+ m (folium.Map): optional, an existing map onto which this function
+ will plot markers
+ """
+ assert loc_type == 'start' or loc_type == 'end'
+
+ if m is None:
+ m = folium.Map(location=MTV_COORD, zoom_start=13)
+
+ purpose_list = user_df[purpose_col].dropna().unique()
+
+ # plot circles with a 500m radius around each point
+ if plot_500:
+ for i, purpose in enumerate(purpose_list):
+ if color is None and i < len(COLORS):
+ color = COLORS[i]
+ elif color is None:
+ color = COLORS[len(COLORS) - 1]
+ purpose_trips = user_df[user_df[purpose_col] == purpose]
+ for j in range(len(purpose_trips)):
+ coords = purpose_trips[loc_type +
+ '_loc'].iloc[j]['coordinates']
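+                # GeoJSON coordinates are [lon, lat]; folium expects [lat, lon]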
+ folium.Circle([coords[1], coords[0]],
+ radius=500,
+ color=color,
+ opacity=0.2,
+ fill=True,
+ fill_opacity=0.1,
+ weight=1).add_to(m)
+ if plot_unlabeled:
+ unlabeled_trips = user_df[user_df[purpose_col].isna()]
+ for j in range(len(unlabeled_trips)):
+ coords = unlabeled_trips[loc_type +
+ '_loc'].iloc[j]['coordinates']
+ folium.Circle([coords[1], coords[0]],
+ radius=500,
+ color='red',
+ opacity=0.2,
+ fill=True,
+ fill_opacity=0.1,
+ weight=1).add_to(m)
+
+
+# plot circles with a 100m radius around each point
+ if plot_100:
+ for i, purpose in enumerate(purpose_list):
+ if i < len(COLORS):
+ color = COLORS[i]
+ else:
+ color = COLORS[len(COLORS) - 1]
+ purpose_trips = user_df[user_df[purpose_col] == purpose]
+ for j in range(len(purpose_trips)):
+ coords = purpose_trips[loc_type +
+ '_loc'].iloc[j]['coordinates']
+ folium.Circle([coords[1], coords[0]],
+ radius=100,
+ color=color,
+ opacity=0.2,
+ fill=True,
+ fill_opacity=0.1,
+ weight=1).add_to(m)
+ if plot_unlabeled:
+ unlabeled_trips = user_df[user_df[purpose_col].isna()]
+ for j in range(len(unlabeled_trips)):
+ coords = unlabeled_trips[loc_type +
+ '_loc'].iloc[j]['coordinates']
+ folium.Circle([coords[1], coords[0]],
+ radius=100,
+ color='red',
+ opacity=0.2,
+ fill=True,
+ fill_opacity=0.1,
+ weight=1).add_to(m)
+
+ # plot small circle marker on the very top so it doesn't get obscured by
+ # the layers of 100m/500m circles
+ for i, purpose in enumerate(purpose_list):
+ if i < len(COLORS):
+ color = COLORS[i]
+ else:
+ color = COLORS[len(COLORS) - 1]
+ # print('{:<15} {:<15}'.format(color, purpose))
+ purpose_trips = user_df[user_df[purpose_col] == purpose]
+ # print(purpose_trips)
+ for j in range(len(purpose_trips)):
+ coords = purpose_trips[loc_type + '_loc'].iloc[j]['coordinates']
+ # print(purpose_trips.iloc[j])
+ # print(purpose_trips.iloc[j].index)
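+            # .name is the row's index label in user_df, shown in the tooltip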
+ trip_idx = purpose_trips.iloc[j].name
+ folium.CircleMarker([coords[1], coords[0]],
+ radius=2.5,
+ color=color,
+ tooltip=purpose + ' ' +
+ str(trip_idx)).add_to(m)
+ if plot_unlabeled:
+ unlabeled_trips = user_df[user_df[purpose_col].isna()]
+ for j in range(len(unlabeled_trips)):
+ coords = unlabeled_trips[loc_type + '_loc'].iloc[j]['coordinates']
+ trip_idx = unlabeled_trips.iloc[j].name
+ folium.CircleMarker([coords[1], coords[0]],
+ radius=2.5,
+ color='red',
+ tooltip='UNLABELED' + ' ' +
+ str(trip_idx)).add_to(m)
+
+ return m
+
+
+def plot_cluster_border(points_df,
+ loc_type,
+ m=None,
+ color='green',
+ cluster_idx=None):
+ """ plots a convex hull around the given points.
+
+ Args:
+ points_df: dataframe with columns 'xxx_lat' and 'xxx_lon'
+ loc_type (str): 'start' or 'end', the type of points to cluster
+ m (folium.Map): optional, an existing map onto which this function
+ will plot markers
+ color (str): cluster color. must be valid in folium.
+ cluster_idx (int): cluster index, to be added to tooltip
+ """
+ assert loc_type == 'start' or loc_type == 'end'
+ if m is None:
+ m = folium.Map(location=MTV_COORD, zoom_start=12)
+
+ lats = points_df[loc_type + '_lat'].tolist()
+ lons = points_df[loc_type + '_lon'].tolist()
+ points = np.array([lats, lons]).T
+
+ if len(points) > 2:
+ hull = ConvexHull(points)
+ border_points = points[hull.vertices]
+ else:
+ border_points = points
+
+ if cluster_idx is not None:
+ folium.Polygon(
+ border_points, # list of points (latitude, longitude)
+ color=color,
+ weight=15,
+ opacity=0.6,
+ fill=True,
+ fill_opacity=0.5,
+ tooltip=f'cluster {cluster_idx}').add_to(m)
+ else:
+ folium.Polygon(
+ border_points, # list of points (latitude, longitude)
+ color=color,
+ weight=20,
+ opacity=0.6,
+ fill=True,
+ fill_opacity=0.5).add_to(m)
+
+ return m
diff --git a/emission/analysis/modelling/trip_model/models.py b/emission/analysis/modelling/trip_model/models.py
new file mode 100644
index 000000000..e5fc08b46
--- /dev/null
+++ b/emission/analysis/modelling/trip_model/models.py
@@ -0,0 +1,2092 @@
+import pandas as pd
+import numpy as np
+from abc import ABCMeta, abstractmethod # to define abstract class "blueprints"
+import logging
+import copy
+
+# sklearn imports
+from sklearn.pipeline import make_pipeline
+from sklearn.preprocessing import StandardScaler, OneHotEncoder
+from sklearn.impute import SimpleImputer
+from sklearn.metrics.pairwise import haversine_distances
+from sklearn.cluster import DBSCAN
+from sklearn import svm
+from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
+from sklearn.tree import DecisionTreeClassifier
+from sklearn.exceptions import NotFittedError
+
+# our imports
+from emission.analysis.modelling.trip_model.clustering import get_distance_matrix, single_cluster_purity
+import emission.analysis.modelling.trip_model.data_wrangling as eamtd
+import emission.storage.decorations.trip_queries as esdtq
+from emission.analysis.classification.inference.labels.inferrers import predict_cluster_confidence_discounting
+import emission.core.wrapper.entry as ecwe
+import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamtg
+import emission.core.common as ecc
+import emission.analysis.modelling.trip_model.model_storage as eamums
+import emission.analysis.modelling.trip_model.model_type as eamumt
+import emission.analysis.modelling.trip_model.run_model as eamur
+
+
+import emission.analysis.modelling.trip_model.clustering as eamtc
+# NOTE: tour_model_extended.similarity is on the
+# eval-private-data-compatibility branch in e-mission-server
+
+# logging.basicConfig(level=logging.DEBUG)
+
+EARTH_RADIUS = 6371000
+
+#############################
+## define abstract classes ##
+#############################
+
+
+class SetupMixin(metaclass=ABCMeta):
+ """ class containing code to be reused when setting up estimators. """
+
+ @abstractmethod
+ def set_params(self, params):
+ """ Set the parameters of the estimator.
+
+ Args:
+ params (dict): dictionary where the keys are the param names
+ (strings) and the values are the parameter inputs
+
+ Returns:
+ self
+ """
+ raise NotImplementedError
+
+ def _clean_data(self, df):
+ """ Clean a dataframe of trips.
+ (Drop trips with missing start/end locations, expand the user input
+ columns, ensure all essential columns are present)
+
+ Args:
+ df: a dataframe of trips. must contain the columns 'start_loc',
+ 'end_loc', and should also contain the user input columns
+ ('mode_confirm', 'purpose_confirm', 'replaced_mode') if
+ available
+ """
+ assert 'start_loc' in df.columns and 'end_loc' in df.columns
+
+ # clean up the dataframe by dropping entries with NaN locations and
+ # reset index
+ num_nan = 0
+ if df.start_loc.isna().any():
+ num_nan += df.start_loc.value_counts(dropna=False).loc[np.nan]
+ df = df.dropna(subset=['start_loc'])
+ if df.end_loc.isna().any():
+ num_nan += df.end_loc.value_counts(dropna=False).loc[np.nan]
+ df = df.dropna(subset=['end_loc'])
+
+ # expand the 'start_loc' and 'end_loc' column into 'start_lat',
+ # 'start_lon', 'end_lat', and 'end_lon' columns
+ df = eamtd.expand_coords(df)
+
+ # drop trips with missing coordinates
+ if df.start_lat.isna().any():
+ num_nan += df.start_lat.value_counts(dropna=False).loc[np.nan]
+ df = df.dropna(subset=['start_lat'])
+ if df.start_lon.isna().any():
+ num_nan += df.start_lon.value_counts(dropna=False).loc[np.nan]
+ df = df.dropna(subset=['start_lon'])
+ if df.end_lat.isna().any():
+ num_nan += df.end_lat.value_counts(dropna=False).loc[np.nan]
+ df = df.dropna(subset=['end_lat'])
+ if df.end_lon.isna().any():
+            num_nan += df.end_lon.value_counts(dropna=False).loc[np.nan]
+            df = df.dropna(subset=['end_lon'])
+ if num_nan > 0:
+ logging.info(
+ f'dropped {num_nan} trips that are missing location coordinates'
+ )
+
+ df = df.rename(
+ columns={
+ 'mode_confirm': 'mode_true',
+ 'purpose_confirm': 'purpose_true',
+ 'replaced_mode': 'replaced_true'
+ })
+
+ for category in ['mode_true', 'purpose_true', 'replaced_true']:
+ if category not in df.columns:
+ # for example, if a user labels all their trip modes but none of their trip purposes
+ df.loc[:, category] = np.nan
+
+ return df.reset_index(drop=True)
+
+
+class Cluster(SetupMixin, metaclass=ABCMeta):
+ """ blueprint for clustering models. """
+
+ @abstractmethod
+ def fit(self, train_df,train_entry_list):
+ """ Fit the clustering algorithm.
+
+ Args:
+ train_df (DataFrame): dataframe of labeled trips
+ train_entry_list (List) : A list of trips where each element is of Entry type
+ Returns:
+ self
+ """
+ raise NotImplementedError
+
+ @abstractmethod
+ def predict(self, test_df):
+ """ Predict cluster indices for trips, if possible. Trips that could
+ not be clustered will have the index -1.
+
+ Args:
+ test_df (DataFrame): dataframe of test trips
+
+ Returns:
+ pd DataFrame containing one column, 'start_cluster_idx' or
+ 'end_cluster_idx'
+ """
+ raise NotImplementedError
+
+ def fit_predict(self, train_df):
+ """ Fit the clustering algorithm and predict cluster indices for trips,
+ if possible. Trips that could not be clustered will have the index -1.
+
+ Args:
+ train_df (DataFrame): dataframe of labeled trips
+
+ Returns:
+ pd DataFrame containing one column, 'start_cluster_idx' or
+ 'end_cluster_idx'
+ """
+ self.fit(train_df)
+ return self.predict(train_df)
+
+
+class TripClassifier(SetupMixin, metaclass=ABCMeta):
+
+ @abstractmethod
+ def fit(self, train_df,unused=None):
+ """ Fit a classification model.
+
+ Args:
+ train_df (DataFrame): dataframe of labeled trips
+            unused (List): list of Entry-type labeled and unlabeled trips; not
+                used by this function, only accepted to keep the fit signature
+                generic.
+ Returns:
+ self
+ """
+ raise NotImplementedError
+
+ def predict(self, test_df):
+ """ Predict trip labels.
+
+ Args:
+ test_df (DataFrame): dataframe of trips
+
+ Returns:
+ DataFrame containing the following columns:
+ 'purpose_pred', 'mode_pred', 'replaced_pred',
+ 'purpose_proba', 'mode_proba', 'replaced_proba'
+ the *_pred columns contain the most-likely label prediction
+ (string for a label or float for np.nan).
+ the *_proba columns contain the probability of the most-likely
+ prediction.
+ """
+ proba_df = self.predict_proba(test_df)
+ prediction_df = proba_df.loc[:, [('purpose', 'top_pred'),
+ ('purpose', 'top_proba'),
+ ('mode', 'top_pred'),
+ ('mode', 'top_proba'),
+ ('replaced', 'top_pred'),
+ ('replaced', 'top_proba')]]
+
+ prediction_df.columns = prediction_df.columns.to_flat_index()
+ prediction_df = prediction_df.rename(
+ columns={
+ ('purpose', 'top_pred'): 'purpose_pred',
+ ('purpose', 'top_proba'): 'purpose_proba',
+ ('mode', 'top_pred'): 'mode_pred',
+ ('mode', 'top_proba'): 'mode_proba',
+ ('replaced', 'top_pred'): 'replaced_pred',
+ ('replaced', 'top_proba'): 'replaced_proba',
+ })
+
+ return prediction_df
+
+ def fit_predict(self, train_df):
+ """ Fit a classification model and predict trip labels.
+
+ Args:
+ train_df (DataFrame): dataframe of labeled trips
+
+ Returns:
+ DataFrame containing the following columns:
+ 'purpose_pred', 'mode_pred', 'replaced_pred',
+ 'purpose_proba', 'mode_proba', 'replaced_proba'
+ the *_pred columns contain the most-likely label prediction
+ (string for a label or float for np.nan).
+ the *_proba columns contain the probability of the most-likely
+ prediction.
+ """
+ self.fit(train_df)
+ return self.predict(train_df)
+
+ @abstractmethod
+ def predict_proba(self, test_df):
+ """ Predict class probabilities for each trip.
+
+ NOTE: check the specific model to see if the class probabilities
+ have confidence-discounting or not.
+
+ Args:
+ test_df (DataFrame): dataframe of trips
+
+ Returns:
+ DataFrame with multiindexing. Each row represents a trip. There
+ are 3 columns at level 1, one for each label category
+ ('purpose', 'mode', 'replaced'). Within each category, there is
+ a column for each label, with the row's entry being the
+ probability that the trip has the label. There are three
+ additional columns within each category, one indicating the
+ most-likely label, one indicating the probability of the
+ most-likely label, and one indicating whether or not the trip
+ can be clustered.
+ TODO: add a fourth optional column for the number of trips in
+ the cluster (if clusterable)
+
+ Level 1 columns are: purpose, mode, replaced
+                Level 2 columns are:
+                    <purpose1>, <purpose2>, ... top_pred, top_proba, clusterable
+                    <mode1>, <mode2>, ... top_pred, top_proba, clusterable
+                    <replaced1>, <replaced2>, ... top_pred, top_proba, clusterable
+ """
+ raise NotImplementedError
+
+
+########################
+## clustering classes ##
+########################
+
+
+class RefactoredNaiveCluster(Cluster):
+ """ Naive fixed-width clustering algorithm.
+ Refactored from the existing Similarity class to take in dataframes for
+ consistency, and allows for separate clustering of start and end
+ clusters.
+
+ WARNING: this algorithm is *extremely* slow.
+
+ Args:
+ loc_type (str): 'start' or 'end', the type of point to cluster
+ radius (int): max distance between all pairs of points in a
+ cluster, i.e. strict maximum cluster width.
+
+ Attributes:
+ loc_type (str)
+ radius (int)
+ train_df (DataFrame)
+ test_df (DataFrame)
+ sim_model (Similarity object)
+ """
+
+ def __init__(self, loc_type='end', radius=100):
+ logging.info("PERF: Initializing RefactoredNaiveCluster")
+ self.loc_type = loc_type
+ self.radius = radius
+
+ def set_params(self, params):
+ if 'loc_type' in params.keys(): self.loc_type = params['loc_type']
+ if 'radius' in params.keys(): self.radius = params['radius']
+
+ return self
+
+    def fit(self, train_df, train_entry_list=None):
+        # clean data
+        logging.info("PERF: Fitting RefactoredNaiveCluster with size %s" % len(train_df))
+        self.train_df = self._clean_data(train_df)
+
+ # we can use all trips as long as they have purpose labels. it's ok if
+ # they're missing mode/replaced-mode labels, because they aren't as
+ # strongly correlated with location compared to purpose
+ # TODO: actually, we may want to rethink this. for example, it will
+ # probably be helpful to include trips that are missing purpose labels
+ # but still have mode labels.
+ if self.train_df.purpose_true.isna().any():
+ num_nan = self.train_df.purpose_true.value_counts(
+ dropna=False).loc[np.nan]
+ logging.info(
+ f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels'
+ )
+ self.train_df = self.train_df.dropna(
+ subset=['purpose_true']).reset_index(drop=True)
+ if len(self.train_df) == 0:
+ # i.e. no valid trips after removing all nans
+ raise Exception('no valid trips; nothing to fit')
+
+ model_config = {
+ "metric": "od_similarity",
+ "similarity_threshold_meters": self.radius, # meters,
+ "apply_cutoff": False,
+ "clustering_way":'origin' if self.loc_type=='start'
+ else 'destination' if self.loc_type =='end'
+ else 'origin-destination',
+ "incremental_evaluation": False
+ }
+
+ # fit the bins
+ self.sim_model= eamtg.GreedySimilarityBinning(model_config)
+ cleaned_trip_entry= eamtc.cleanEntryTypeData(self.train_df,train_entry_list)
+ self.sim_model.fit(cleaned_trip_entry)
+
+ labels = [int(l) for l in self.sim_model.tripLabels]
+ self.train_df.loc[:, f'{self.loc_type}_cluster_idx'] = labels
+ return self
+
+ def predict(self, test_df):
+ logging.info("PERF: Predicting RefactoredNaiveCluster for %s" % len(test_df))
+ self.test_df = self._clean_data(test_df)
+
+        # the same fitted model is used for either loc_type; the clustering_way
+        # chosen in fit() already determines how start/end points were binned
+        bins = self.sim_model.bins
+
+ # This looks weird but works
+ # >>> x = [(1, 'a'), (2, 'b'), (3, 'c')]
+ # >>> {int(key):value for key,value in x}
+ # {1: 'a', 2: 'b', 3: 'c'}
+ #
+        # bins = { '1': {'key1': [...], 'key2': [...], ...},
+        #          '2': {'key1': [...], 'key2': [...], ...},
+        #          '3': {'key1': [...], 'key2': [...], ...}, ... }
+        #
+        # the code below converts the above to
+        #
+        # bins = { 1: {'key1': [...], 'key2': [...], ...},
+        #          2: {'key1': [...], 'key2': [...], ...},
+        #          3: {'key1': [...], 'key2': [...], ...}, ... }
+        #
+        # This works because we:
+        # 1. iterate over the (key, value) pairs in bins.items()
+        # 2. convert each string key to an integer with int(key)
+        # 3. build a new dictionary (via the dict comprehension) whose keys
+        #    are integers while the values stay the same
+
+ bins = {int(key):value for key,value in bins.items()}
+ labels = []
+
+ # for each trip in the test list:
+ for idx, row in self.test_df.iterrows():
+ if idx % 100 == 0:
+ logging.info("PERF: RefactoredNaiveCluster Working on trip %s/%s" % (idx, len(self.test_df)))
+ # iterate over all bins
+ trip_binned = False
+ for i in bins:
+ # check if the trip can fit in the bin
+ # if so, get the bin index.
+ #
+ # 'feature_rows' is the key that contains the list of list where
+ # each of the inner list takes the form :
+ #
+ # [ start_lon,start_lat,end_lon,end_lat]
+ if self._match(row, bins[i]['feature_rows'], self.loc_type):
+ labels += [i]
+ trip_binned = True
+ break
+ # if not, return -1
+ if not trip_binned:
+ labels += [-1]
+
+ self.test_df.loc[:, f'{self.loc_type}_cluster_idx'] = labels
+
+ return self.test_df[[f'{self.loc_type}_cluster_idx']]
+
+ def _match(self, trip, bin, loc_type):
+ """ Check if a trip can fit into an existing bin.
+
+ copied from the Similarity class on the e-mission-server.
+ """
+ for trip_in_bin in bin:
+ if not self._distance_helper(trip, trip_in_bin, loc_type):
+ return False
+ return True
+
+ def _distance_helper(self, tripa, tripb, loc_type):
+ """ Check if two trips have start/end points within the distance
+ threshold.
+ """
+        # tripa is taken from the test dataframe.
+        # tripb is taken from the stored bin list.
+        pta_lat = tripa[loc_type + '_lat']
+        pta_lon = tripa[loc_type + '_lon']
+ if loc_type == 'start':
+ ptb_lat = tripb[1]
+ ptb_lon = tripb[0]
+ elif loc_type == 'end':
+ ptb_lat = tripb[3]
+ ptb_lon = tripb[2]
+
+ dist= ecc.calDistance([pta_lon,pta_lat],[ptb_lon,ptb_lat])
+ return dist <= self.radius
+
+
+class DBSCANSVMCluster(Cluster):
+ """ DBSCAN-based clustering algorithm that optionally implements SVM
+ sub-clustering.
+
+ Args:
+ loc_type (str): 'start' or 'end', the type of point to cluster
+ radius (int): max distance between two points in each other's
+ neighborhood, i.e. DBSCAN's eps value. does not strictly
+ dictate final cluster size
+ size_thresh (int): the min number of trips a cluster must have
+ to be considered for SVM sub-division
+ purity_thresh (float): the min purity a cluster must have
+ to be sub-divided using SVM
+ gamma (float): coefficient for the rbf kernel in SVM
+ C (float): regularization hyperparameter for SVM
+
+ Attributes:
+ loc_type (str)
+ radius (int)
+ size_thresh (int)
+ purity_thresh (float)
+ gamma (float)
+ C (float)
+ train_df (DataFrame)
+ test_df (DataFrame)
+ base_model (sklearn Estimator)
+ """
+
+ def __init__(self,
+ loc_type='end',
+ radius=100,
+ svm=True,
+ size_thresh=1,
+ purity_thresh=1.0,
+ gamma=0.05,
+ C=1):
+ logging.info("PERF: Initializing DBSCANSVMCluster")
+ self.loc_type = loc_type
+ self.radius = radius
+ self.svm = svm
+ self.size_thresh = size_thresh
+ self.purity_thresh = purity_thresh
+ self.gamma = gamma
+ self.C = C
+
+ def set_params(self, params):
+ if 'loc_type' in params.keys(): self.loc_type = params['loc_type']
+ if 'radius' in params.keys(): self.radius = params['radius']
+ if 'svm' in params.keys(): self.svm = params['svm']
+ if 'size_thresh' in params.keys():
+ self.size_thresh = params['size_thresh']
+ if 'purity_thresh' in params.keys():
+ self.purity_thresh = params['purity_thresh']
+        if 'gamma' in params.keys(): self.gamma = params['gamma']
+        if 'C' in params.keys(): self.C = params['C']
+
+ return self
+
+ def fit(self, train_df,unused=None):
+ """ Creates clusters of trip points.
+ self.train_df will be updated with columns containing base and
+ final clusters.
+
+ TODO: perhaps move the loc_type argument to fit() so we can use a
+ single class instance to cluster both start and end points. This
+ will also help us reduce duplicate data.
+
+ Args:
+ train_df (dataframe): dataframe of labeled trips
+            unused (List): list of Entry-type labeled and unlabeled trips; not
+                used by this function, only accepted to keep the fit signature
+                generic. """
+ ##################
+ ### clean data ###
+ ##################
+ logging.info("PERF: Fitting DBSCANSVMCluster")
+ self.train_df = self._clean_data(train_df)
+
+ # we can use all trips as long as they have purpose labels. it's ok if
+ # they're missing mode/replaced-mode labels, because they aren't as
+ # strongly correlated with location compared to purpose
+ # TODO: actually, we may want to rethink this. for example, it will
+ # probably be helpful to include trips that are missing purpose labels
+ # but still have mode labels.
+ if self.train_df.purpose_true.isna().any():
+ num_nan = self.train_df.purpose_true.value_counts(
+ dropna=False).loc[np.nan]
+ logging.info(
+ f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels'
+ )
+ self.train_df = self.train_df.dropna(
+ subset=['purpose_true']).reset_index(drop=True)
+ if len(self.train_df) == 0:
+ # i.e. no valid trips after removing all nans
+ raise Exception('no valid trips; nothing to fit')
+
+ #########################
+ ### get base clusters ###
+ #########################
+ dist_matrix_meters = get_distance_matrix(self.train_df, self.loc_type)
+ self.base_model = DBSCAN(self.radius,
+ metric="precomputed",
+ min_samples=1).fit(dist_matrix_meters)
+ base_clusters = self.base_model.labels_
+
+ self.train_df.loc[:,
+ f'{self.loc_type}_base_cluster_idx'] = base_clusters
+
+ ########################
+ ### get sub-clusters ###
+ ########################
+ # copy base cluster column into final cluster column
+ self.train_df.loc[:, f'{self.loc_type}_cluster_idx'] = self.train_df[
+ f'{self.loc_type}_base_cluster_idx']
+
+ if self.svm:
+ c = 0 # count of how many clusters we have iterated over
+
+ # iterate over all clusters and subdivide them with SVM. the while
+ # loop is so we can do multiple iterations of subdividing if needed
+ while c < self.train_df[f'{self.loc_type}_cluster_idx'].max():
+ points_in_cluster = self.train_df[
+ self.train_df[f'{self.loc_type}_cluster_idx'] == c]
+
+ # only do SVM if we have the minimum num of trips in the cluster
+ if len(points_in_cluster) < self.size_thresh:
+ c += 1
+ continue
+
+ # only do SVM if purity is below threshold
+ purity = single_cluster_purity(points_in_cluster,
+ label_col='purpose_true')
+ if purity < self.purity_thresh:
+ X = points_in_cluster[[
+ f"{self.loc_type}_lon", f"{self.loc_type}_lat"
+ ]]
+ y = points_in_cluster.purpose_true.to_list()
+
+ svm_model = make_pipeline(
+ StandardScaler(),
+ svm.SVC(
+ kernel='rbf',
+ gamma=self.gamma,
+ C=self.C,
+ )).fit(X, y)
+ labels = svm_model.predict(X)
+ unique_labels = np.unique(labels)
+
+ # if the SVM predicts that all points in the cluster have
+ # the same label, just ignore it and don't reindex.
+ # this also helps us to handle the possibility that a
+ # cluster may be impure but inherently inseparable, e.g. an
+ # end cluster at a user's home, containing 50% trips from
+ # work to home and 50% round trips that start and end at
+ # home. we don't want to reindex otherwise the low purity
+ # will trigger SVM again, and we will attempt & fail to
+ # split the cluster ad infinitum
+ if len(unique_labels) > 1:
+ # map purpose labels to new cluster indices
+ # we offset indices by the max existing index so that we
+ # don't run into any duplicate indices
+ max_existing_idx = self.train_df[
+ f'{self.loc_type}_cluster_idx'].max()
+ label_to_cluster = {
+ unique_labels[i]: i + max_existing_idx + 1
+ for i in range(len(unique_labels))
+ }
+ # update trips with their new cluster indices
+ indices = np.array(
+ [label_to_cluster[l] for l in labels])
+ self.train_df.loc[
+ self.train_df[f'{self.loc_type}_cluster_idx'] == c,
+ f'{self.loc_type}_cluster_idx'] = indices
+
+ c += 1
+ # TODO: make things categorical at the end? or maybe at the start of the decision tree pipeline
+
+ return self
+
+ def fit_predict(self, train_df):
+ """ Override to avoid unnecessarily computation of distance matrices.
+ """
+ self.fit(train_df)
+ return self.train_df[[f'{self.loc_type}_cluster_idx']]
+
+ def predict(self, test_df):
+ logging.info("PERF: Predicting DBSCANSVMCluster")
+ # TODO: store clusters as polygons so the prediction is faster
+ # TODO: we probably don't want to store test_df in self to be more memory-efficient
+ self.test_df = self._clean_data(test_df)
+ pred_clusters = self._NN_predict(self.test_df)
+
+ self.test_df.loc[:, f'{self.loc_type}_cluster_idx'] = pred_clusters
+
+ return self.test_df[[f'{self.loc_type}_cluster_idx']]
+
+ def _NN_predict(self, test_df):
+ """ Generate base-cluster predictions for the test data using a
+ nearest-neighbor approach.
+
+ sklearn doesn't implement predict() for DBSCAN, which is why we
+ need a custom method.
+ """
+ logging.info("PERF: NN_predicting DBSCANSVMCluster")
+ n_samples = test_df.shape[0]
+ labels = np.ones(shape=n_samples, dtype=int) * -1
+
+ # get coordinates of core points (we can't use model.components_
+ # because our input feature was a distance matrix and doesn't contain
+ # info about the raw coordinates)
+ # NOTE: technically, every single point in a cluster is a core point
+ # because it has at least minPts (2) points, including itself, in its
+ # radius
+ train_coordinates = self.train_df[[
+ f'{self.loc_type}_lat', f'{self.loc_type}_lon'
+ ]]
+ train_radians = np.radians(train_coordinates)
+
+ for idx, row in test_df.reset_index(drop=True).iterrows():
+ # calculate the distances between the ith test data and all points,
+ # then find the index of the closest point. if the ith test data is
+ # within epsilon of the point, then assign its cluster to the ith
+ # test data (otherwise, leave it as -1, indicating noise).
+ # unfortunately, pairwise_distances_argmin() does not support
+ # haversine distance, so we have to reimplement it ourselves
+ new_loc_radians = np.radians(
+ row[[self.loc_type + "_lat", self.loc_type + "_lon"]].to_list())
+ new_loc_radians = np.reshape(new_loc_radians, (1, 2))
+ dist_matrix_meters = haversine_distances(
+ new_loc_radians, train_radians) * EARTH_RADIUS
+
+ shortest_dist_idx = np.argmin(dist_matrix_meters)
+ if dist_matrix_meters[0, shortest_dist_idx] < self.radius:
+ labels[idx] = self.train_df.reset_index(
+ drop=True).loc[shortest_dist_idx,
+ f'{self.loc_type}_cluster_idx']
+
+ return labels
+
+
+######################
+## trip classifiers ##
+######################
+
+
+class NaiveBinningClassifier(TripClassifier):
+ """ Trip classifier using the existing Similarity class and associated
+ functions without refactoring them. Essentially a wrapper for the
+ existing code on e-mission-server.
+
+ Args:
+ radius (int): maximum distance between any two points in the same
+ cluster
+ """
+
+ def __init__(self, radius=500):
+ logging.info("PERF: Initializing NaiveBinningClassifier")
+ self.radius = radius
+
+ def set_params(self, params):
+ if 'radius' in params.keys(): self.radius = params['radius']
+
+ return self
+
+ def fit(self, train_df, unused=None):
+ logging.info("PERF: Fitting NaiveBinningClassifier")
+ # (copied from bsm.build_user_model())
+
+ # convert train_df to a list because the existing binning algorithm
+ # only accepts lists of Entry objects
+ train_trips = self._trip_df_to_list(train_df)
+
+
+ model_config = {
+ "metric": "od_similarity",
+ "similarity_threshold_meters": self.radius, # meters,
+ "apply_cutoff": False,
+ "clustering_way": "origin-destination", #cause thats what is set in performance_eval.py for this model
+ "incremental_evaluation": False
+ }
+
+ sim_model = eamtg.GreedySimilarityBinning(model_config)
+ sim_model.fit(train_trips)
+ # set instance variables so we can access results later as well
+ self.sim = sim_model
+ self.bins = sim_model.bins
+
+ # save all user labels
+ user_id = train_df.user_id.iloc[0]
+ model_type = eamumt.ModelType.GREEDY_SIMILARITY_BINNING
+ model_storage = eamums.ModelStorage.DOCUMENT_DATABASE
+ model_data_next = sim_model.to_dict()
+ last_done_ts = eamur._latest_timestamp(train_trips)
+ eamums.save_model(user_id, model_type, model_data_next, last_done_ts, model_storage)
+
+ return self
+
+ def predict_proba(self, test_df):
+ """ NOTE: these class probabilities have the confidence-discounting
+ heuristic applied.
+ """
+ # convert test_df to a list because the existing binning algorithm
+ # only accepts lists of Entry objects
+ logging.info("PERF: Predicting NaiveBinningClassifier")
+ test_trips = self._trip_df_to_list(test_df)
+
+ purpose_distribs = []
+ mode_distribs = []
+ replaced_distribs = []
+
+ for trip in test_trips:
+ trip_prediction = predict_cluster_confidence_discounting(trip)
+
+ if len(trip_prediction) == 0:
+ # model could not find cluster for the trip
+ purpose_distribs += [{}]
+ mode_distribs += [{}]
+ replaced_distribs += [{}]
+
+ else:
+ trip_prediction_df = pd.DataFrame(trip_prediction).rename(
+ columns={'labels': 'user_input'})
+ # renaming is simply so we can use the expand_userinputs
+ # function
+
+ expand_prediction = esdtq.expand_userinputs(trip_prediction_df)
+ # converts the 'labels' dictionaries into individual columns
+
+ # sum up probability for each label
+ for label_type, label_distribs in zip(
+ ['purpose_confirm', 'mode_confirm', 'replaced_mode'],
+ [purpose_distribs, mode_distribs, replaced_distribs]):
+ label_distrib = {}
+ if label_type in expand_prediction.columns:
+ for label in expand_prediction[label_type].unique():
+ label_distrib[label] = expand_prediction.loc[
+ expand_prediction[label_type] == label,
+ 'p'].sum()
+ label_distribs += [label_distrib]
+
+ proba_dfs = []
+ for label_type, label_distribs in zip(
+ ['purpose', 'mode', 'replaced'],
+ [purpose_distribs, mode_distribs, replaced_distribs]):
+
+ proba = pd.DataFrame(label_distribs)
+ proba['clusterable'] = proba.sum(axis=1) > 0
+ proba['top_pred'] = proba.drop(columns=['clusterable']).idxmax(
+ axis=1)
+ proba['top_proba'] = proba.drop(
+ columns=['clusterable', 'top_pred']).max(axis=1, skipna=True)
+ classes = proba.columns[:-3]
+ proba.loc[:, classes] = proba.loc[:, classes].fillna(0)
+ proba = pd.concat([proba], keys=[label_type], axis=1)
+ proba_dfs += [proba]
+
+ self.proba_df = pd.concat(proba_dfs, axis=1)
+ return self.proba_df
+
+ def _trip_df_to_list(self, trip_df):
+ """ Converts a dataframe of trips into a list of trip Entry objects.
+
+ Allows this class to accept DataFrames (which are used by the new
+ clustering algorithms) without having to refactor the old
+ clustering algorithm.
+
+ Args:
+ trip_df: DataFrame containing trips. See code below for the
+ expected columns.
+
+ """
+ trips_list = []
+
+ for idx, row in trip_df.iterrows():
+ data = {
+ 'source': row['source'],
+ 'end_ts': row['end_ts'],
+ # 'end_local_dt':row['end_local_dt'], # this attribute doesn't seem to appear in the dataframes I've tested with
+ 'end_fmt_time': row['end_fmt_time'],
+ 'end_loc': row['end_loc'],
+ 'raw_trip': row['raw_trip'],
+ 'start_ts': row['start_ts'],
+ # 'start_local_dt':row['start_local_dt'], # this attribute doesn't seem to appear in the dataframes I've tested with
+ 'start_fmt_time': row['start_fmt_time'],
+ 'start_loc': row['start_loc'],
+ 'duration': row['duration'],
+ 'distance': row['distance'],
+ 'start_place': row['start_place'],
+ 'end_place': row['end_place'],
+ 'cleaned_trip': row['cleaned_trip'],
+ 'inferred_labels': row['inferred_labels'],
+ 'inferred_trip': row['inferred_trip'],
+ 'expectation': row['expectation'],
+ 'confidence_threshold': row['confidence_threshold'],
+ 'expected_trip': row['expected_trip'],
+ 'user_input': row['user_input']
+ }
+ trip = ecwe.Entry.create_entry(user_id=row['user_id'],
+ key='analysis/confirmed_trip',
+ data=data)
+ trips_list += [trip]
+
+ return trips_list
+
+
+class ClusterExtrapolationClassifier(TripClassifier):
+ """ Classifier that extrapolates labels from a trip's cluster.
+
+ Args:
+ alg (str): clustering algorithm to use; either 'DBSCAN' or 'naive'
+ radius (int): radius for the clustering algorithm
+ svm (bool): whether or not to use SVM sub-clustering. (only when
+ alg=='DBSCAN')
+ size_thresh (int): the min number of trips a cluster must have
+ to be considered for SVM sub-division
+ purity_thresh (float): the min purity a cluster must have
+ to be sub-divided using SVM
+ gamma (float): coefficient for the rbf kernel in SVM
+ C (float): regularization hyperparameter for SVM
+ cluster_method (str): 'end', 'trip', 'combination'. whether to
+ extrapolate labels from only end clusters, only trip clusters,
+ or both end and trip clusters when available.
+ """
+
+ def __init__(
+ self,
+ alg='DBSCAN',
+ radius=100, # TODO: add diff start and end radii
+ svm=True,
+ size_thresh=1,
+ purity_thresh=1.0,
+ gamma=0.05,
+ C=1,
+ cluster_method='end'):
+ assert cluster_method in ['end', 'trip', 'combination']
+ assert alg in ['DBSCAN', 'naive']
+ self.alg = alg
+ self.radius = radius
+ self.svm = svm
+ self.size_thresh = size_thresh
+ self.purity_thresh = purity_thresh
+ self.gamma = gamma
+ self.C = C
+ self.cluster_method = cluster_method
+
+ if self.alg == 'DBSCAN':
+ self.end_cluster_model = DBSCANSVMCluster(
+ loc_type='end',
+ radius=self.radius,
+ svm=self.svm,
+ size_thresh=self.size_thresh,
+ purity_thresh=self.purity_thresh,
+ gamma=self.gamma,
+ C=self.C)
+ elif self.alg == 'naive':
+ self.end_cluster_model = RefactoredNaiveCluster(loc_type='end',
+ radius=self.radius)
+
+ if self.cluster_method in ['trip', 'combination']:
+ if self.alg == 'DBSCAN':
+ self.start_cluster_model = DBSCANSVMCluster(
+ loc_type='start',
+ radius=self.radius,
+ svm=self.svm,
+ size_thresh=self.size_thresh,
+ purity_thresh=self.purity_thresh,
+ gamma=self.gamma,
+ C=self.C)
+ elif self.alg == 'naive':
+ self.start_cluster_model = RefactoredNaiveCluster(
+ loc_type='start', radius=self.radius)
+
+ self.trip_grouper = TripGrouper(
+ start_cluster_col='start_cluster_idx',
+ end_cluster_col='end_cluster_idx')
+
+ def set_params(self, params):
+ """ hacky code that mimics the set_params of an sklearn Estimator class
+ so that we can pass params during randomizedsearchCV
+
+ Args:
+ params (dict): a dictionary where the keys are the parameter
+ names and the values are the parameter values
+ """
+ alg = params['alg'] if 'alg' in params.keys() else self.alg
+ radius = params['radius'] if 'radius' in params.keys() else self.radius
+ svm = params['svm'] if 'svm' in params.keys() else self.svm
+ size_thresh = params['size_thresh'] if 'size_thresh' in params.keys(
+ ) else self.size_thresh
+ purity_thresh = params[
+ 'purity_thresh'] if 'purity_thresh' in params.keys(
+ ) else self.purity_thresh
+ gamma = params['gamma'] if 'gamma' in params.keys() else self.gamma
+ C = params['C'] if 'C' in params.keys() else self.C
+ cluster_method = params[
+ 'cluster_method'] if 'cluster_method' in params.keys(
+ ) else self.cluster_method
+
+ # calling __init__ again is not good practice, I know...
+ self.__init__(alg, radius, svm, size_thresh, purity_thresh, gamma, C,
+ cluster_method)
+
+ return self
+
+ def fit(self, train_df, train_entry_list=None):
+ # fit clustering model
+ self.end_cluster_model.fit(train_df, train_entry_list)
+ self.train_df = self.end_cluster_model.train_df
+
+ if self.cluster_method in ['trip', 'combination']:
+ self.start_cluster_model.fit(train_df, train_entry_list)
+ self.train_df.loc[:, ['start_cluster_idx'
+ ]] = self.start_cluster_model.train_df[[
+ 'start_cluster_idx'
+ ]]
+
+ # create trip-level clusters
+ trip_cluster_idx = self.trip_grouper.fit_transform(self.train_df)
+ self.train_df.loc[:, 'trip_cluster_idx'] = trip_cluster_idx
+
+ return self
+
+ def predict_proba(self, test_df):
+ """ NOTE: these class probabilities do NOT have a
+ confidence-discounting heuristic applied.
+ """
+ self.end_cluster_model.predict(test_df)
+ # store a copy of test_df for now (TODO: make this more efficient since
+ # the data is duplicated)
+ self.test_df = self.end_cluster_model.test_df
+
+ if self.cluster_method in ['trip', 'combination']:
+ self.start_cluster_model.predict(test_df)
+ # append the start cluster indices
+ self.test_df.loc[:, [
+ 'start_cluster_idx'
+ ]] = self.start_cluster_model.test_df.loc[:, ['start_cluster_idx']]
+
+ # create trip-level clusters
+ trip_cluster_idx = self.trip_grouper.transform(self.test_df)
+ self.test_df.loc[:, 'trip_cluster_idx'] = trip_cluster_idx
+
+ # extrapolate label distributions from cluster information
+ self.test_df.loc[:, [
+ 'mode_distrib', 'purpose_distrib', 'replaced_distrib'
+ ]] = np.nan
+
+ if self.cluster_method in ['end', 'trip']:
+ cluster_col = f'{self.cluster_method}_cluster_idx'
+ self.test_df = self._add_label_distributions(
+ self.test_df, cluster_col)
+
+ else: # self.cluster_method == 'combination'
+ # try to get label distributions from trip-level clusters first,
+ # because trip-level clusters tend to be more homogeneous and will
+ # yield more accurate predictions
+ self.test_df = self._add_label_distributions(
+ self.test_df, 'trip_cluster_idx')
+
+ # for trips that have an empty label-distribution after the first
+ # pass using trip clusters, try to get a distribution from the
+ # destination cluster (this includes both trips that *don't* fall
+ # into a trip cluster, as well as trips that *do* fall into a trip
+ # cluster but are missing some/all categories of labels due to
+ # missing user inputs.)
+
+ # fill in missing label-distributions by the label_type
+ # (we want to iterate by label_type rather than check cluster idx
+ # because it's possible that some trips in a trip-cluster have
+ # predictions for one label_type but not another)
+ for label_type in ['mode', 'purpose', 'replaced']:
+ self.test_df.loc[self.test_df[f'{label_type}_distrib'] ==
+ {}] = self._add_label_distributions(
+ self.test_df.loc[
+ self.test_df[f'{label_type}_distrib']
+ == {}],
+ 'end_cluster_idx',
+ label_types=[label_type])
+
+ # create the dataframe of probabilities
+ proba_dfs = []
+ for label_type in ['purpose', 'mode', 'replaced']:
+ classes = self.train_df[f'{label_type}_true'].dropna().unique()
+ proba = pd.DataFrame(
+ self.test_df[f'{label_type}_distrib'].to_list(),
+ columns=classes)
+ proba['top_pred'] = proba.idxmax(axis=1)
+ proba['top_proba'] = proba.max(axis=1, skipna=True)
+ proba['clusterable'] = self.test_df.end_cluster_idx >= 0
+ proba.loc[:, classes] = proba.loc[:, classes].fillna(0)
+ proba = pd.concat([proba], keys=[label_type], axis=1)
+ proba_dfs += [proba]
+
+ self.proba_df = pd.concat(proba_dfs, axis=1)
+ return self.proba_df
+
+ def _add_label_distributions(self,
+ df,
+ cluster_col,
+ label_types=['mode', 'purpose', 'replaced']):
+ """ Add label distributions to a DataFrame.
+
+ Args:
+ df (DataFrame): DataFrame containing a column of clusters
+ cluster_col (str): name of column in df containing clusters
+ label_types (str list): the categories of labels to retrieve
+ distributions for.
+
+ Returns:
+ a DataFrame with additional columns in which the entries are
+ dictionaries containing label distributions.
+ """
+ df = df.copy() # to avoid SettingWithCopyWarning
+ for c in df.loc[:, cluster_col].unique():
+ labeled_trips_in_cluster = self.train_df.loc[
+ self.train_df[cluster_col] == c]
+ unlabeled_trips_in_cluster = df.loc[df[cluster_col] == c]
+
+ cluster_size = len(unlabeled_trips_in_cluster)
+
+ for label_type in label_types:
+ assert label_type in ['mode', 'purpose', 'replaced']
+
+ # get distribution of label_type labels in this cluster
+ distrib = labeled_trips_in_cluster[
+ f'{label_type}_true'].value_counts(normalize=True,
+ dropna=True).to_dict()
+ # TODO: add confidence discounting
+
+ # update predictions
+ # convert the dict into a list of dicts to work around pandas
+ # thinking we're trying to insert information according to a
+ # key-value map
+ # TODO: this is the line throwing the set on slice warning
+ df.loc[df[cluster_col] == c,
+ f'{label_type}_distrib'] = [distrib] * cluster_size
+
+ return df
+
+
+class EnsembleClassifier(TripClassifier, metaclass=ABCMeta):
+ """ Template class for trip classifiers using ensemble algorithms.
+
+ Required args:
+ loc_feature (str): 'coordinates' or 'cluster'
+ """
+ base_features = [
+ 'duration',
+ 'distance',
+ 'start_local_dt_year',
+ 'start_local_dt_month',
+ 'start_local_dt_day',
+ 'start_local_dt_hour',
+ # 'start_local_dt_minute',
+ 'start_local_dt_weekday',
+ 'end_local_dt_year', # most likely the same as the start year
+ 'end_local_dt_month', # most likely the same as the start month
+ 'end_local_dt_day',
+ 'end_local_dt_hour',
+ # 'end_local_dt_minute',
+ 'end_local_dt_weekday',
+ ]
+ targets = ['mode_true', 'purpose_true', 'replaced_true']
+
+ # required instance attributes
+ loc_feature = NotImplemented
+ purpose_enc = NotImplemented
+ mode_enc = NotImplemented
+ purpose_predictor = NotImplemented
+ mode_predictor = NotImplemented
+ replaced_predictor = NotImplemented
+
+ # required methods
+ def fit(self, train_df, unused=None):
+ # get location features
+ if self.loc_feature == 'cluster':
+ # fit clustering model(s) and one-hot encode their indices
+ # TODO: consolidate start/end_cluster_model in a single instance
+ # that has a location_type parameter in the fit() method
+ self.end_cluster_model.fit(train_df)
+
+ clusters_to_encode = self.end_cluster_model.train_df[[
+ 'end_cluster_idx'
+ ]].copy() # copy is to avoid SettingWithCopyWarning
+
+ if self.use_start_clusters or self.use_trip_clusters:
+ self.start_cluster_model.fit(train_df)
+
+ if self.use_start_clusters:
+ clusters_to_encode = pd.concat([
+ clusters_to_encode,
+ self.start_cluster_model.train_df[['start_cluster_idx']]
+ ],
+ axis=1)
+ if self.use_trip_clusters:
+ start_end_clusters = pd.concat([
+ self.end_cluster_model.train_df[['end_cluster_idx']],
+ self.start_cluster_model.train_df[['start_cluster_idx']]
+ ],
+ axis=1)
+ trip_cluster_idx = self.trip_grouper.fit_transform(
+ start_end_clusters)
+ clusters_to_encode.loc[:,
+ 'trip_cluster_idx'] = trip_cluster_idx
+
+ loc_features_df = self.cluster_enc.fit_transform(
+ clusters_to_encode.astype(int))
+
+ # clean the df again because we need it in the next step
+ # TODO: remove redundancy
+ self.train_df = self._clean_data(train_df)
+
+ # TODO: move below code into a reusable function
+ if self.train_df.purpose_true.isna().any():
+ num_nan = self.train_df.purpose_true.value_counts(
+ dropna=False).loc[np.nan]
+ logging.info(
+ f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels'
+ )
+ self.train_df = self.train_df.dropna(
+ subset=['purpose_true']).reset_index(drop=True)
+ if len(self.train_df) == 0:
+ # i.e. no valid trips after removing all nans
+ raise Exception('no valid trips; nothing to fit')
+
+ else: # self.loc_feature == 'coordinates'
+ self.train_df = self._clean_data(train_df)
+
+ # TODO: move below code into a reusable function
+ if self.train_df.purpose_true.isna().any():
+ num_nan = self.train_df.purpose_true.value_counts(
+ dropna=False).loc[np.nan]
+ logging.info(
+ f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels'
+ )
+ self.train_df = self.train_df.dropna(
+ subset=['purpose_true']).reset_index(drop=True)
+ if len(self.train_df) == 0:
+ # i.e. no valid trips after removing all nans
+ raise Exception('no valid trips; nothing to fit')
+
+ loc_features_df = self.train_df[[
+ 'start_lon', 'start_lat', 'end_lon', 'end_lat'
+ ]]
+
+ # prepare data for the ensemble classifiers
+
+ # note that we want to use purpose data to aid our mode predictions,
+ # and use both purpose and mode data to aid our replaced-mode
+ # predictions
+ # thus, we want to one-hot encode the purpose and mode as data
+ # features, but also preserve an unencoded copy for the target columns
+
+ # dataframe holding all features and targets
+ self.Xy_train = pd.concat(
+ [self.train_df[self.base_features + self.targets], loc_features_df],
+ axis=1)
+
+ # encode purposes and modes
+ onehot_purpose_df = self.purpose_enc.fit_transform(
+ self.Xy_train[['purpose_true']], output_col_prefix='purpose')
+ onehot_mode_df = self.mode_enc.fit_transform(
+ self.Xy_train[['mode_true']], output_col_prefix='mode')
+ self.Xy_train = pd.concat(
+ [self.Xy_train, onehot_purpose_df, onehot_mode_df], axis=1)
+
+ # for predicting purpose, drop encoded purpose and mode features, as
+ # well as all target labels
+ self.X_purpose = self.Xy_train.dropna(subset=['purpose_true']).drop(
+ labels=self.targets + self.purpose_enc.onehot_encoding_cols +
+ self.mode_enc.onehot_encoding_cols,
+ axis=1)
+
+ # for predicting mode, we want to keep purpose data
+ self.X_mode = self.Xy_train.dropna(subset=['mode_true']).drop(
+ labels=self.targets + self.mode_enc.onehot_encoding_cols, axis=1)
+
+ # for predicting replaced-mode, we want to keep purpose and mode data
+ self.X_replaced = self.Xy_train.dropna(subset=['replaced_true']).drop(
+ labels=self.targets, axis=1)
+
+ self.y_purpose = self.Xy_train['purpose_true'].dropna()
+ self.y_mode = self.Xy_train['mode_true'].dropna()
+ self.y_replaced = self.Xy_train['replaced_true'].dropna()
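+ # to illustrate the cascade: X_purpose holds only the base + location
+ # features; X_mode additionally holds the one-hot purpose columns; and
+ # X_replaced holds base + location + one-hot purpose + one-hot mode
+ # columns, mirroring how predictions are chained in predict_proba()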
+
+ # fit classifiers
+ if len(self.X_purpose) > 0:
+ self.purpose_predictor.fit(self.X_purpose, self.y_purpose)
+ if len(self.X_mode) > 0:
+ self.mode_predictor.fit(self.X_mode, self.y_mode)
+ if len(self.X_replaced) > 0:
+ self.replaced_predictor.fit(self.X_replaced, self.y_replaced)
+
+ return self
+
+ def predict_proba(self, test_df):
+ """ NOTE: these class probabilities do NOT have a
+ confidence-discounting heuristic applied.
+ """
+ ################
+ ### get data ###
+ ################
+ self.X_test_for_purpose = self._get_X_test_for_purpose(test_df)
+
+ ########################
+ ### make predictions ###
+ ########################
+ # note that we want to use purpose data to aid our mode predictions,
+ # and use both purpose and mode data to aid our replaced-mode
+ # predictions
+
+ # TODO: some of the code across the try and except blocks can be
+ # consolidated by considering one-hot encoding fully np.nan arrays
+ try:
+ purpose_proba_raw = self.purpose_predictor.predict_proba(
+ self.X_test_for_purpose)
+ purpose_proba = pd.DataFrame(
+ purpose_proba_raw, columns=self.purpose_predictor.classes_)
+ purpose_pred = purpose_proba.idxmax(axis=1)
+
+ # update X_test with one-hot-encoded purpose predictions to aid
+ # mode predictor
+ # TODO: converting purpose_pred to a DataFrame feels super
+ # unnecessary, make this more efficient
+ onehot_purpose_df = self.purpose_enc.transform(
+ pd.DataFrame(purpose_pred).set_index(
+ self.X_test_for_purpose.index))
+ self.X_test_for_mode = pd.concat(
+ [self.X_test_for_purpose, onehot_purpose_df], axis=1)
+
+ mode_proba, replaced_proba = self._try_predict_proba_mode_replaced()
+
+ except NotFittedError as e:
+ # if we can't predict purpose, we can still try to predict mode and
+ # replaced-mode without one-hot encoding the purpose
+
+ purpose_pred = np.full((len(self.X_test_for_purpose), ), np.nan)
+ purpose_proba_raw = np.full((len(self.X_test_for_purpose), 1), 0)
+ purpose_proba = pd.DataFrame(purpose_proba_raw, columns=[np.nan])
+
+ self.X_test_for_mode = self.X_test_for_purpose
+ mode_proba, replaced_proba = self._try_predict_proba_mode_replaced()
+
+ mode_pred = mode_proba.idxmax(axis=1)
+ replaced_pred = replaced_proba.idxmax(axis=1)
+
+ if (purpose_pred.dtype == np.float64 and mode_pred.dtype == np.float64
+ and replaced_pred.dtype == np.float64):
+ # this indicates that all the predictions are np.nan so none of the
+ # random forest classifiers were fitted
+ raise NotFittedError
+
+ # TODO: move this to a Mixin for cluster-based predictors and use the
+ # 'cluster' column of the proba_df outputs
+ # if self.drop_unclustered:
+ # # TODO: actually, we should only drop purpose predictions. we can
+ # # then impute the missing entries in the purpose feature and still
+ # # try to predict mode and replaced-mode without it
+ # self.predictions.loc[
+ # self.end_cluster_model.test_df['end_cluster_idx'] == -1,
+ # ['purpose_pred', 'mode_pred', 'replaced_pred']] = np.nan
+
+ proba_dfs = []
+ for label_type, proba in zip(
+ ['purpose', 'mode', 'replaced'],
+ [purpose_proba, mode_proba, replaced_proba]):
+ proba['top_pred'] = proba.idxmax(axis=1)
+ proba['top_proba'] = proba.max(axis=1, skipna=True)
+ proba['clusterable'] = self._clusterable(
+ self.X_test_for_purpose).astype(bool)
+ proba = pd.concat([proba], keys=[label_type], axis=1)
+ proba_dfs += [proba]
+
+ self.proba_df = pd.concat(proba_dfs, axis=1)
+ return self.proba_df
+
+ def _get_X_test_for_purpose(self, test_df):
+ """ Do the pre-processing to get data that we can then pass into the
+ ensemble classifiers.
+ """
+ if self.loc_feature == 'cluster':
+ # get clusters
+ self.end_cluster_model.predict(test_df)
+ clusters_to_encode = self.end_cluster_model.test_df[[
+ 'end_cluster_idx'
+ ]].copy() # copy is to avoid SettingWithCopyWarning
+
+ if self.use_start_clusters or self.use_trip_clusters:
+ self.start_cluster_model.predict(test_df)
+
+ if self.use_start_clusters:
+ clusters_to_encode = pd.concat([
+ clusters_to_encode,
+ self.start_cluster_model.test_df[['start_cluster_idx']]
+ ],
+ axis=1)
+ if self.use_trip_clusters:
+ start_end_clusters = pd.concat([
+ self.end_cluster_model.test_df[['end_cluster_idx']],
+ self.start_cluster_model.test_df[['start_cluster_idx']]
+ ],
+ axis=1)
+ trip_cluster_idx = self.trip_grouper.transform(
+ start_end_clusters)
+ clusters_to_encode.loc[:,
+ 'trip_cluster_idx'] = trip_cluster_idx
+
+ # one-hot encode the cluster indices
+ loc_features_df = self.cluster_enc.transform(clusters_to_encode)
+ else: # self.loc_feature == 'coordinates'
+ test_df = self._clean_data(test_df)
+ loc_features_df = test_df[[
+ 'start_lon', 'start_lat', 'end_lon', 'end_lat'
+ ]]
+
+ # extract the desired data
+ X_test = pd.concat([
+ test_df[self.base_features].reset_index(drop=True),
+ loc_features_df.reset_index(drop=True)
+ ],
+ axis=1)
+
+ return X_test
+
+ def _try_predict_proba_mode_replaced(self):
+ """ Try to predict mode and replaced-mode. Handles error in case the
+ ensemble algorithms were not fitted.
+
+ Requires self.X_test_for_mode to have already been set. (This is
+ the DataFrame containing the test data to be passed into
+ self.mode_predictor.)
+
+ Returns: mode_proba and replaced_proba, two DataFrames containing
+ class probabilities for mode and replaced-mode respectively
+ """
+
+ try:
+ # predict mode
+ mode_proba_raw = self.mode_predictor.predict_proba(
+ self.X_test_for_mode)
+ mode_proba = pd.DataFrame(mode_proba_raw,
+ columns=self.mode_predictor.classes_)
+ mode_pred = mode_proba.idxmax(axis=1)
+
+ # update X_test with one-hot-encoded mode predictions to aid
+ # replaced-mode predictor
+ onehot_mode_df = self.mode_enc.transform(
+ pd.DataFrame(mode_pred).set_index(self.X_test_for_mode.index))
+ self.X_test_for_replaced = pd.concat(
+ [self.X_test_for_mode, onehot_mode_df], axis=1)
+ replaced_proba = self._try_predict_proba_replaced()
+
+ except NotFittedError as e:
+ mode_proba_raw = np.full((len(self.X_test_for_mode), 1), 0)
+ mode_proba = pd.DataFrame(mode_proba_raw, columns=[np.nan])
+
+ # if we don't have mode predictions, we *could* still try to
+ # predict replaced mode (but if the user didn't input mode labels
+ # then it's unlikely they would input replaced-mode)
+ self.X_test_for_replaced = self.X_test_for_mode
+ replaced_proba = self._try_predict_proba_replaced()
+
+ return mode_proba, replaced_proba
+
+ def _try_predict_proba_replaced(self):
+ """ Try to predict replaced mode. Handles error in case the
+ replaced_predictor was not fitted.
+
+ Requires self.X_test_for_replaced to have already been set. (This
+ is the DataFrame containing the test data to be passed into self.
+ replaced_predictor.)
+
+ Returns: replaced_proba, DataFrame containing class probabilities
+ for replaced-mode
+ """
+ try:
+ replaced_proba_raw = self.replaced_predictor.predict_proba(
+ self.X_test_for_replaced
+ ) # has shape (len_trips, number of replaced_mode classes)
+ replaced_proba = pd.DataFrame(
+ replaced_proba_raw, columns=self.replaced_predictor.classes_)
+
+ except NotFittedError as e:
+ replaced_proba_raw = np.full((len(self.X_test_for_replaced), 1), 0)
+ replaced_proba = pd.DataFrame(replaced_proba_raw, columns=[np.nan])
+
+ return replaced_proba
+
+ def _clusterable(self, test_df):
+ """ Check if the end points can be clustered (i.e. are within
+ meters of an end point from the training set)
+ """
+ if self.loc_feature == 'cluster':
+ return self.end_cluster_model.test_df.end_cluster_idx >= 0
+
+ n_samples = test_df.shape[0]
+ clustered = np.zeros(shape=n_samples, dtype=bool)
+
+ train_coordinates = self.train_df[['end_lat', 'end_lon']]
+ train_radians = np.radians(train_coordinates)
+
+ for idx, row in test_df.reset_index(drop=True).iterrows():
+ # calculate the distances between the ith test data and all points,
+ # then find the minimum distance for each point and check if it's
+ # within the distance threshold.
+ # unfortunately, pairwise_distances_argmin() does not support
+ # haversine distance, so we have to reimplement it ourselves
+ new_loc_radians = np.radians(row[["end_lat", "end_lon"]].to_list())
+ new_loc_radians = np.reshape(new_loc_radians, (1, 2))
+ dist_matrix_meters = haversine_distances(
+ new_loc_radians, train_radians) * EARTH_RADIUS
+
+ shortest_dist = np.min(dist_matrix_meters)
+ if shortest_dist < self.radius:
+ clustered[idx] = True
+
+ return clustered
+
+
+class ForestClassifier(EnsembleClassifier):
+ """ Random forest-based trip classifier.
+
+ Args:
+ loc_feature (str): 'coordinates' or 'cluster'; whether to use lat/
+ lon coordinates or cluster indices for the location feature
+ radius (int): radius for DBSCAN clustering. only if
+ loc_feature=='cluster'
+ size_thresh (int): the min number of trips a cluster must have to
+ be considered for sub-division via SVM. only if
+ loc_feature=='cluster'
+ purity_thresh (float): the min purity a cluster must have to be
+ sub-divided via SVM. only if loc_feature=='cluster'
+ gamma (float): coefficient for the rbf kernel in SVM. only if
+ loc_feature=='cluster'
+ C (float): regularization hyperparameter for SVM. only if
+ loc_feature=='cluster'
+ n_estimators (int): number of estimators in the random forest
+ criterion (str): function to measure the quality of a split in the
+ random forest
+ max_depth (int): max depth of a tree in the random forest.
+ unlimited if None.
+ min_samples_split (int): min number of samples required to split an
+ internal node in a decision tree
+ min_samples_leaf (int): min number of samples required for a leaf
+ node in a decision tree
+ max_features (str): number of features to consider when looking for
+ the best split in a decision tree
+ bootstrap (bool): whether bootstrap samples are used when building
+ decision trees
+ random_state (int): random state for deterministic random forest
+ construction
+ use_start_clusters (bool): whether or not to use start clusters as
+ input features to the ensemble classifier. only if
+ loc_feature=='cluster'
+ use_trip_clusters (bool): whether or not to use trip-level clusters
+ as input features to the ensemble classifier. only if
+ loc_feature=='cluster'
+ """
+
+ def __init__(
+ self,
+ loc_feature='coordinates',
+ radius=100, # TODO: add different start and end radii
+ size_thresh=1,
+ purity_thresh=1.0,
+ gamma=0.05,
+ C=1,
+ n_estimators=100,
+ criterion='gini',
+ max_depth=None,
+ min_samples_split=2,
+ min_samples_leaf=1,
+ max_features='sqrt',
+ bootstrap=True,
+ random_state=42,
+ # drop_unclustered=False,
+ use_start_clusters=False,
+ use_trip_clusters=True):
+ assert loc_feature in ['cluster', 'coordinates']
+ self.loc_feature = loc_feature
+ self.radius = radius
+ self.size_thresh = size_thresh
+ self.purity_thresh = purity_thresh
+ self.gamma = gamma
+ self.C = C
+ self.n_estimators = n_estimators
+ self.criterion = criterion
+ self.max_depth = max_depth
+ self.min_samples_split = min_samples_split
+ self.min_samples_leaf = min_samples_leaf
+ self.max_features = max_features
+ self.bootstrap = bootstrap
+ self.random_state = random_state
+ # self.drop_unclustered = drop_unclustered
+ self.use_start_clusters = use_start_clusters
+ self.use_trip_clusters = use_trip_clusters
+
+ if self.loc_feature == 'cluster':
+ # clustering algorithm to generate end clusters
+ self.end_cluster_model = DBSCANSVMCluster(
+ loc_type='end',
+ radius=self.radius,
+ size_thresh=self.size_thresh,
+ purity_thresh=self.purity_thresh,
+ gamma=self.gamma,
+ C=self.C)
+
+ if self.use_start_clusters or self.use_trip_clusters:
+ # clustering algorithm to generate start clusters
+ self.start_cluster_model = DBSCANSVMCluster(
+ loc_type='start',
+ radius=self.radius,
+ size_thresh=self.size_thresh,
+ purity_thresh=self.purity_thresh,
+ gamma=self.gamma,
+ C=self.C)
+
+ if self.use_trip_clusters:
+ # helper class to generate trip-level clusters
+ self.trip_grouper = TripGrouper(
+ start_cluster_col='start_cluster_idx',
+ end_cluster_col='end_cluster_idx')
+
+ # wrapper class to generate one-hot encodings for cluster indices
+ self.cluster_enc = OneHotWrapper(sparse=False,
+ handle_unknown='ignore')
+
+ # wrapper class to generate one-hot encodings for purposes and modes
+ self.purpose_enc = OneHotWrapper(impute_missing=True,
+ sparse=False,
+ handle_unknown='error')
+ self.mode_enc = OneHotWrapper(impute_missing=True,
+ sparse=False,
+ handle_unknown='error')
+
+ # ensemble classifiers for each label category
+ self.purpose_predictor = RandomForestClassifier(
+ n_estimators=self.n_estimators,
+ criterion=self.criterion,
+ max_depth=self.max_depth,
+ min_samples_split=self.min_samples_split,
+ min_samples_leaf=self.min_samples_leaf,
+ max_features=self.max_features,
+ bootstrap=self.bootstrap,
+ random_state=self.random_state)
+ self.mode_predictor = RandomForestClassifier(
+ n_estimators=self.n_estimators,
+ criterion=self.criterion,
+ max_depth=self.max_depth,
+ min_samples_split=self.min_samples_split,
+ min_samples_leaf=self.min_samples_leaf,
+ max_features=self.max_features,
+ bootstrap=self.bootstrap,
+ random_state=self.random_state)
+ self.replaced_predictor = RandomForestClassifier(
+ n_estimators=self.n_estimators,
+ criterion=self.criterion,
+ max_depth=self.max_depth,
+ min_samples_split=self.min_samples_split,
+ min_samples_leaf=self.min_samples_leaf,
+ max_features=self.max_features,
+ bootstrap=self.bootstrap,
+ random_state=self.random_state)
+
+ def set_params(self, params):
+ """ hacky code that mimics the set_params of an sklearn Estimator class
+ so that we can pass params during randomizedsearchCV
+
+ Args:
+ params (dict): a dictionary where the keys are the parameter
+ names and the values are the parameter values
+ """
+ loc_feature = params['loc_feature'] if 'loc_feature' in params.keys(
+ ) else self.loc_feature
+ radius = params['radius'] if 'radius' in params.keys() else self.radius
+ size_thresh = params['size_thresh'] if 'size_thresh' in params.keys(
+ ) else self.size_thresh
+ purity_thresh = params[
+ 'purity_thresh'] if 'purity_thresh' in params.keys(
+ ) else self.purity_thresh
+ gamma = params['gamma'] if 'gamma' in params.keys() else self.gamma
+ C = params['C'] if 'C' in params.keys() else self.C
+ n_estimators = params['n_estimators'] if 'n_estimators' in params.keys(
+ ) else self.n_estimators
+ criterion = params['criterion'] if 'criterion' in params.keys(
+ ) else self.criterion
+ max_depth = params['max_depth'] if 'max_depth' in params.keys(
+ ) else self.max_depth
+ min_samples_split = params[
+ 'min_samples_split'] if 'min_samples_split' in params.keys(
+ ) else self.min_samples_split
+ min_samples_leaf = params[
+ 'min_samples_leaf'] if 'min_samples_leaf' in params.keys(
+ ) else self.min_samples_leaf
+ max_features = params['max_features'] if 'max_features' in params.keys(
+ ) else self.max_features
+ bootstrap = params['bootstrap'] if 'bootstrap' in params.keys(
+ ) else self.bootstrap
+ random_state = params['random_state'] if 'random_state' in params.keys(
+ ) else self.random_state
+ use_start_clusters = params[
+ 'use_start_clusters'] if 'use_start_clusters' in params.keys(
+ ) else self.use_start_clusters
+ # drop_unclustered = params[
+ # 'drop_unclustered'] if 'drop_unclustered' in params.keys(
+ # ) else self.drop_unclustered
+ use_trip_clusters = params[
+ 'use_trip_clusters'] if 'use_trip_clusters' in params.keys(
+ ) else self.use_trip_clusters
+
+ # yes, calling __init__ again is not good practice...
+ self.__init__(loc_feature, radius, size_thresh, purity_thresh, gamma, C,
+ n_estimators, criterion, max_depth, min_samples_split,
+ min_samples_leaf, max_features, bootstrap, random_state,
+ use_start_clusters, use_trip_clusters)
+ return self
+
+
+class ClusterForestSlimPredictor(ForestClassifier):
+ """ This is the same as ForestClassifier, just with fewer base
+ features.
+
+ Args:
+ loc_feature (str): 'coordinates' or 'cluster'; whether to use lat/
+ lon coordinates or cluster indices for the location feature
+ radius (int): radius for DBSCAN clustering. only if
+ loc_feature=='cluster'
+ size_thresh (int): the min number of trips a cluster must have to
+ be considered for sub-division via SVM. only if
+ loc_feature=='cluster'
+ purity_thresh (float): the min purity a cluster must have to be
+ sub-divided via SVM. only if loc_feature=='cluster'
+ gamma (float): coefficient for the rbf kernel in SVM. only if
+ loc_feature=='cluster'
+ C (float): regularization hyperparameter for SVM. only if
+ loc_feature=='cluster'
+ n_estimators (int): number of estimators in the random forest
+ criterion (str): function to measure the quality of a split in the
+ random forest
+ max_depth (int): max depth of a tree in the random forest.
+ unlimited if None.
+ min_samples_split (int): min number of samples required to split an
+ internal node in a decision tree
+ min_samples_leaf (int): min number of samples required for a leaf
+ node in a decision tree
+ max_features (str): number of features to consider when looking for
+ the best split in a decision tree
+ bootstrap (bool): whether bootstrap samples are used when building
+ decision trees
+ random_state (int): random state for deterministic random forest
+ construction
+ use_start_clusters (bool): whether or not to use start clusters as
+ input features to the ensemble classifier. only if
+ loc_feature=='cluster'
+ use_trip_clusters (bool): whether or not to use trip-level clusters
+ as input features to the ensemble classifier. only if
+ loc_feature=='cluster'
+ """
+
+ def __init__(
+ self,
+ loc_feature='coordinates',
+ radius=100, # TODO: add different start and end radii
+ size_thresh=1,
+ purity_thresh=1.0,
+ gamma=0.05,
+ C=1,
+ n_estimators=100,
+ criterion='gini',
+ max_depth=None,
+ min_samples_split=2,
+ min_samples_leaf=1,
+ max_features='sqrt',
+ bootstrap=True,
+ random_state=42,
+ # drop_unclustered=False,
+ use_start_clusters=False,
+ use_trip_clusters=True):
+
+ super().__init__(loc_feature, radius, size_thresh, purity_thresh, gamma,
+ C, n_estimators, criterion, max_depth,
+ min_samples_split, min_samples_leaf, max_features,
+ bootstrap, random_state, use_start_clusters,
+ use_trip_clusters)
+
+ self.base_features = [
+ 'duration',
+ 'distance',
+ ]
+
+
+class AdaBoostClassifier(EnsembleClassifier):
+ """ AdaBoost-based trip classifier.
+
+ Args:
+ loc_feature (str): 'coordinates' or 'cluster'; whether to use lat/
+ lon coordinates or cluster indices for the location feature
+ radius (int): radius for DBSCAN clustering. only if
+ loc_feature=='cluster'
+ size_thresh (int): the min number of trips a cluster must have to
+ be considered for sub-division via SVM. only if
+ loc_feature=='cluster'
+ purity_thresh (float): the min purity a cluster must have to be
+ sub-divided via SVM. only if loc_feature=='cluster'
+ gamma (float): coefficient for the rbf kernel in SVM. only if
+ loc_feature=='cluster'
+ C (float): regularization hyperparameter for SVM. only if
+ loc_feature=='cluster'
+ n_estimators (int): number of estimators
+ criterion (str): function to measure the quality of a split in a
+ decision tree
+ max_depth (int): max depth of each base decision tree.
+ unlimited if None.
+ min_samples_split (int): min number of samples required to split an
+ internal node in a decision tree
+ min_samples_leaf (int): min number of samples required for a leaf
+ node in a decision tree
+ max_features (str): number of features to consider when looking for
+ the best split in a decision tree
+ random_state (int): random state for deterministic construction
+ of the ensemble
+ use_start_clusters (bool): whether or not to use start clusters as
+ input features to the ensemble classifier. only if
+ loc_feature=='cluster'
+ use_trip_clusters (bool): whether or not to use trip-level clusters
+ as input features to the ensemble classifier. only if
+ loc_feature=='cluster'
+ learning_rate (float): weight applied to each decision tree at each
+ boosting iteration
+ """
+
+ def __init__(
+ self,
+ loc_feature='coordinates',
+ radius=100, # TODO: add different start and end radii
+ size_thresh=1,
+ purity_thresh=1.0,
+ gamma=0.05,
+ C=1,
+ n_estimators=100,
+ criterion='gini',
+ max_depth=None,
+ min_samples_split=2,
+ min_samples_leaf=1,
+ max_features='sqrt',
+ random_state=42,
+ # drop_unclustered=False,
+ use_start_clusters=False,
+ use_trip_clusters=True,
+ use_base_clusters=True,
+ learning_rate=1.0):
+ assert loc_feature in ['cluster', 'coordinates']
+ self.loc_feature = loc_feature
+ self.radius = radius
+ self.size_thresh = size_thresh
+ self.purity_thresh = purity_thresh
+ self.gamma = gamma
+ self.C = C
+ self.n_estimators = n_estimators
+ self.criterion = criterion
+ self.max_depth = max_depth
+ self.min_samples_split = min_samples_split
+ self.min_samples_leaf = min_samples_leaf
+ self.max_features = max_features
+ self.random_state = random_state
+ # self.drop_unclustered = drop_unclustered
+ self.use_start_clusters = use_start_clusters
+ self.use_trip_clusters = use_trip_clusters
+ self.use_base_clusters = use_base_clusters
+ self.learning_rate = learning_rate
+
+ if self.loc_feature == 'cluster':
+ # clustering algorithm to generate end clusters
+ self.end_cluster_model = DBSCANSVMCluster(
+ loc_type='end',
+ radius=self.radius,
+ size_thresh=self.size_thresh,
+ purity_thresh=self.purity_thresh,
+ gamma=self.gamma,
+ C=self.C)
+
+ if self.use_start_clusters or self.use_trip_clusters:
+ # clustering algorithm to generate start clusters
+ self.start_cluster_model = DBSCANSVMCluster(
+ loc_type='start',
+ radius=self.radius,
+ size_thresh=self.size_thresh,
+ purity_thresh=self.purity_thresh,
+ gamma=self.gamma,
+ C=self.C)
+
+ if self.use_trip_clusters:
+ # helper class to generate trip-level clusters
+ self.trip_grouper = TripGrouper(
+ start_cluster_col='start_cluster_idx',
+ end_cluster_col='end_cluster_idx')
+
+ # wrapper class to generate one-hot encodings for cluster indices
+ self.cluster_enc = OneHotWrapper(sparse=False,
+ handle_unknown='ignore')
+
+ # wrapper class to generate one-hot encodings for purposes and modes
+ self.purpose_enc = OneHotWrapper(impute_missing=True,
+ sparse=False,
+ handle_unknown='error')
+ self.mode_enc = OneHotWrapper(impute_missing=True,
+ sparse=False,
+ handle_unknown='error')
+
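+ # NOTE: this class shares its name with sklearn.ensemble's
+ # AdaBoostClassifier, so the name used in the constructor calls below
+ # resolves to whatever is bound at module level; the sklearn estimator
+ # needs to be imported under a distinct alias for these calls to build
+ # the intended boosted ensembles rather than recurse into this wrapper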
+ self.purpose_predictor = AdaBoostClassifier(
+ n_estimators=self.n_estimators,
+ learning_rate=self.learning_rate,
+ random_state=self.random_state,
+ base_estimator=DecisionTreeClassifier(
+ criterion=self.criterion,
+ max_depth=self.max_depth,
+ min_samples_split=self.min_samples_split,
+ min_samples_leaf=self.min_samples_leaf,
+ max_features=self.max_features,
+ random_state=self.random_state))
+ self.mode_predictor = AdaBoostClassifier(
+ n_estimators=self.n_estimators,
+ learning_rate=self.learning_rate,
+ random_state=self.random_state,
+ base_estimator=DecisionTreeClassifier(
+ criterion=self.criterion,
+ max_depth=self.max_depth,
+ min_samples_split=self.min_samples_split,
+ min_samples_leaf=self.min_samples_leaf,
+ max_features=self.max_features,
+ random_state=self.random_state))
+ self.replaced_predictor = AdaBoostClassifier(
+ n_estimators=self.n_estimators,
+ learning_rate=self.learning_rate,
+ random_state=self.random_state,
+ base_estimator=DecisionTreeClassifier(
+ criterion=self.criterion,
+ max_depth=self.max_depth,
+ min_samples_split=self.min_samples_split,
+ min_samples_leaf=self.min_samples_leaf,
+ max_features=self.max_features,
+ random_state=self.random_state))
+
+ def set_params(self, params):
+ """ hacky code that mimics the set_params of an sklearn Estimator class
+ so that we can pass params during randomizedsearchCV
+
+ Args:
+ params (dict): a dictionary where the keys are the parameter
+ names and the values are the parameter values
+ """
+ radius = params['radius'] if 'radius' in params.keys() else self.radius
+ size_thresh = params['size_thresh'] if 'size_thresh' in params.keys(
+ ) else self.size_thresh
+ purity_thresh = params[
+ 'purity_thresh'] if 'purity_thresh' in params.keys(
+ ) else self.purity_thresh
+ gamma = params['gamma'] if 'gamma' in params.keys() else self.gamma
+ C = params['C'] if 'C' in params.keys() else self.C
+ n_estimators = params['n_estimators'] if 'n_estimators' in params.keys(
+ ) else self.n_estimators
+ criterion = params['criterion'] if 'criterion' in params.keys(
+ ) else self.criterion
+ max_depth = params['max_depth'] if 'max_depth' in params.keys(
+ ) else self.max_depth
+ min_samples_split = params[
+ 'min_samples_split'] if 'min_samples_split' in params.keys(
+ ) else self.min_samples_split
+ min_samples_leaf = params[
+ 'min_samples_leaf'] if 'min_samples_leaf' in params.keys(
+ ) else self.min_samples_leaf
+ max_features = params['max_features'] if 'max_features' in params.keys(
+ ) else self.max_features
+ random_state = params['random_state'] if 'random_state' in params.keys(
+ ) else self.random_state
+ use_start_clusters = params[
+ 'use_start_clusters'] if 'use_start_clusters' in params.keys(
+ ) else self.use_start_clusters
+ # drop_unclustered = params[
+ # 'drop_unclustered'] if 'drop_unclustered' in params.keys(
+ # ) else self.drop_unclustered
+ use_trip_clusters = params[
+ 'use_trip_clusters'] if 'use_trip_clusters' in params.keys(
+ ) else self.use_trip_clusters
+ learning_rate = params[
+ 'learning_rate'] if 'learning_rate' in params.keys(
+ ) else self.learning_rate
+
+ # calling __init__ again is not good practice, I know...
+ self.__init__(self.loc_feature, radius, size_thresh, purity_thresh,
+ gamma, C, n_estimators, criterion, max_depth,
+ min_samples_split, min_samples_leaf, max_features,
+ random_state, use_start_clusters, use_trip_clusters,
+ learning_rate=learning_rate)
+ return self
+
+
+class TripGrouper():
+ """ Helper class to get trip clusters from start and end clusters.
+
+ Args:
+ start_cluster_col (str): name of the column containing start
+ cluster indices
+ end_cluster_col (str): name of the column containing end cluster
+ indices
+ """
+
+ def __init__(self,
+ start_cluster_col='start_cluster_idx',
+ end_cluster_col='end_cluster_idx'):
+ self.start_cluster_col = start_cluster_col
+ self.end_cluster_col = end_cluster_col
+
+ def fit_transform(self, trip_df):
+ """ Fit and remember possible trip clusters.
+
+ Args:
+ trip_df (DataFrame): DataFrame containing trips. must have
+ the columns named by start_cluster_col and end_cluster_col
+ """
+ trip_groups = trip_df.groupby(
+ [self.start_cluster_col, self.end_cluster_col])
+
+ # need dict so we can access the trip indices of all the trips in each
+ # group. the key is the group tuple and the value is the list of trip
+ # indices in the group.
+ self.trip_groups_dict = dict(trip_groups.groups)
+
+ # we want to convert trip-group tuples to trip-cluster indices,
+ # hence the pd Series
+ trip_groups_series = pd.Series(list(self.trip_groups_dict.keys()))
+
+ trip_cluster_idx = np.empty(len(trip_df))
+
+ for group_idx in range(len(trip_groups_series)):
+ group_tuple = trip_groups_series[group_idx]
+ trip_idxs_in_group = self.trip_groups_dict[group_tuple]
+ trip_cluster_idx[trip_idxs_in_group] = group_idx
+
+ return trip_cluster_idx
+
+ def transform(self, new_trip_df):
+ """ Get trip clusters for a new set of trips.
+
+ Args:
+ new_trip_df (DataFrame): DataFrame containing trips. must have
+ the columns named by start_cluster_col and end_cluster_col
+ """
+ prediction_trip_groups = new_trip_df.groupby(
+ [self.start_cluster_col, self.end_cluster_col])
+
+ # need dict so we can access the trip indices of all the trips in each
+ # group. the key is the group tuple and the value is the list of trip
+ # indices in the group.
+ prediction_trip_groups_dict = dict(prediction_trip_groups.groups)
+ trip_groups_series = pd.Series(list(self.trip_groups_dict.keys()))
+ trip_cluster_idx = np.empty(len(new_trip_df))
+
+ for group_tuple in dict(prediction_trip_groups.groups).keys():
+ # check if the trip cluster exists in the training set
+ trip_idxs_in_group = prediction_trip_groups_dict[group_tuple]
+ if group_tuple in self.trip_groups_dict.keys():
+ # look up the group index from the series we created when we
+ # fit the model
+ group_idx = trip_groups_series[trip_groups_series ==
+ group_tuple].index[0]
+ else:
+ group_idx = -1
+
+ trip_cluster_idx[trip_idxs_in_group] = group_idx
+
+ return trip_cluster_idx
+
+
+class OneHotWrapper():
+ """ Helper class to streamline one-hot encoding.
+
+ Args:
+ impute_missing (bool): whether or not to impute np.nan values.
+ sparse (bool): whether or not to return a sparse matrix.
+ handle_unknown (str): specifies the way unknown categories are
+ handled during transform.
+ """
+
+ def __init__(
+ self,
+ impute_missing=False,
+ sparse=False,
+ handle_unknown='ignore',
+ ):
+ self.impute_missing = impute_missing
+ if self.impute_missing:
+ self.encoder = make_pipeline(
+ SimpleImputer(missing_values=np.nan,
+ strategy='constant',
+ fill_value='missing'),
+ OneHotEncoder(sparse=False, handle_unknown=handle_unknown))
+ else:
+ self.encoder = OneHotEncoder(sparse=sparse,
+ handle_unknown=handle_unknown)
+
+ def fit_transform(self, train_df, output_col_prefix=None):
+ """ Establish one-hot encoded variables.
+
+ Args:
+ train_df (DataFrame): DataFrame containing train trips.
+ output_col_prefix (str): prefix for the output column names;
+ only used if train_df is a single column
+ """
+ # TODO: handle pd series
+
+ train_df = train_df.copy() # to avoid SettingWithCopyWarning
+
+ # if imputing, the dtype of each column must be string/object and not
+ # numerical, otherwise the SimpleImputer will fail
+ if self.impute_missing:
+ for col in train_df.columns:
+ train_df[col] = train_df[col].astype(object)
+ onehot_encoding = self.encoder.fit_transform(train_df)
+ self.onehot_encoding_cols_all = []
+ for col in train_df.columns:
+ if train_df.shape[1] > 1 or output_col_prefix is None:
+ output_col_prefix = col
+ self.onehot_encoding_cols_all += [
+ f'{output_col_prefix}_{val}'
+ for val in np.sort(train_df[col].dropna().unique())
+ ]
+ # we handle np.nan separately because it is of type float, and may
+ # cause issues with np.sort if the rest of the unique values are
+ # strings
+ if any((train_df[col].isna())):
+ self.onehot_encoding_cols_all += [f'{output_col_prefix}_nan']
+
+ onehot_encoding_df = pd.DataFrame(
+ onehot_encoding,
+ columns=self.onehot_encoding_cols_all).set_index(train_df.index)
+
+ # ignore the encoded columns for missing entries
+ self.onehot_encoding_cols = copy.deepcopy(self.onehot_encoding_cols_all)
+ for col in self.onehot_encoding_cols_all:
+ if col.endswith('_nan'):
+ onehot_encoding_df = onehot_encoding_df.drop(columns=[col])
+ self.onehot_encoding_cols.remove(col)
+
+ return onehot_encoding_df.astype(int)
+
+ def transform(self, test_df):
+ """ One-hot encoded features in accordance with features seen in the
+ train set.
+
+ Args:
+ test_df (DataFrame): DataFrame of trips.
+ """
+ # TODO: rename test_df, this one doesn't necessarily need to be a df
+ onehot_encoding = self.encoder.transform(test_df)
+ onehot_encoding_df = pd.DataFrame(
+ onehot_encoding,
+ columns=self.onehot_encoding_cols_all).set_index(test_df.index)
+
+ # ignore the encoded columns for missing entries
+ for col in self.onehot_encoding_cols_all:
+ if col.endswith('_nan'):
+ onehot_encoding_df = onehot_encoding_df.drop(columns=[col])
+
+ return onehot_encoding_df.astype(int)