[NOT TESTED] Predict implemented
Predict is now included. Just need to figure out model storage and model testing.
humbleOldSage committed Oct 3, 2023
1 parent e9abd51 commit 3820d87
Showing 3 changed files with 257 additions and 30 deletions.
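For orientation, the intended call pattern for the new code is roughly the following sketch. The trip lists are placeholders; with config=None the constructor falls back to the 'model_parameters.forest' section of the config file, as the __init__ below shows.

    import emission.analysis.modelling.trip_model.forest_classifier as eamtf

    model = eamtf.ForestClassifier()      # loads model_parameters.forest from config
    model.fit(labeled_trips)              # List[Confirmedtrip], all with user_input
    proba_df = model.predict(test_trips)  # DataFrame of per-class probabilities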
280 changes: 255 additions & 25 deletions emission/analysis/modelling/trip_model/forest_classifier.py
@@ -2,37 +2,64 @@
from sklearn.preprocessing import OneHotEncoder
import joblib
from typing import Dict, List, Optional, Tuple
from sklearn.metrics.pairwise import haversine_distances
import emission.core.wrapper.confirmedtrip as ecwc
import logging
import numpy as np

import copy

import emission.analysis.modelling.trip_model.trip_model as eamuu
import emission.analysis.modelling.trip_model.dbscan_svm as eamtd
import emission.analysis.modelling.trip_model.util as eamtu
import emission.analysis.modelling.trip_model.config as eamtc
import emission.storage.timeseries.builtin_timeseries as estb
from sklearn.exceptions import NotFittedError

from sklearn.ensemble import RandomForestClassifier

EARTH_RADIUS = 6371000

class ForestClassifier(eamuu.TripModel):

def __init__(self,config=None):

        if config is None:
            config = eamtc.get_config_value_or_raise('model_parameters.forest')
            logging.debug('ForestClassifier loaded model config from file')
        else:
            logging.debug('ForestClassifier using model config argument')

random_forest_expected_keys = [
'loc_feature',
'n_estimators',
'criterion',
'max_depth',
'min_samples_split',
'min_samples_leaf',
'max_features',
'bootstrap',
]
cluster_expected_keys= [
'radius',
'size_thresh',
'purity_thresh',
'gamma',
'C',
'use_start_clusters',
'use_trip_clusters',
]

for k in random_forest_expected_keys:
if config.get(k) is None:
msg = f"forest trip model config missing expected key {k}"
raise KeyError(msg)

if config['loc_feature'] == 'cluster':
for k in cluster_expected_keys:
if config.get(k) is None:
msg = f"cluster trip model config missing expected key {k}"
raise KeyError(msg)
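        # For reference, a config that satisfies these checks might look
        # like the following (illustrative values, not the defaults from
        # the config file):
        #
        #   {
        #       'loc_feature': 'cluster',
        #       'n_estimators': 100, 'criterion': 'gini', 'max_depth': None,
        #       'min_samples_split': 2, 'min_samples_leaf': 1,
        #       'max_features': 'sqrt', 'bootstrap': True,
        #       'radius': 100, 'size_thresh': 1, 'purity_thresh': 1.0,
        #       'gamma': 0.05, 'C': 1,
        #       'use_start_clusters': False, 'use_trip_clusters': True,
        #   }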

self.loc_feature = config['loc_feature']
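        # note: the cluster parameters below are read unconditionally, so a
        # config with loc_feature == 'coordinates' must still carry them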
self.radius = config['radius']
self.size_thresh = config['size_thresh']
@@ -50,6 +77,21 @@ def __init__(self,config=None):
# self.drop_unclustered = drop_unclustered
self.use_start_clusters = config['use_start_clusters']
self.use_trip_clusters = config['use_trip_clusters']
self.base_features = [
'duration',
'distance',
'start_local_dt_year',
'start_local_dt_month',
'start_local_dt_day',
'start_local_dt_hour',
'start_local_dt_weekday',
'end_local_dt_year', # most likely the same as the start year
'end_local_dt_month', # most likely the same as the start month
'end_local_dt_day',
'end_local_dt_hour',
'end_local_dt_weekday',
]
self.targets = ['mode_true', 'purpose_true', 'replaced_true']

if self.loc_feature == 'cluster':
# clustering algorithm to generate end clusters
@@ -119,8 +161,16 @@ def __init__(self,config=None):
random_state=self.random_state)


    def fit(self, trips: List[ecwc.Confirmedtrip]):
        logging.debug(f'fit called with {len(trips)} trips')

        unlabeled = list(filter(lambda t: len(t['data']['user_input']) == 0, trips))
        if len(unlabeled) > 0:
            msg = f'model.fit cannot be called with unlabeled trips, found {len(unlabeled)}'
            raise Exception(msg)
        data_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip", trips)

        # get location features
        if self.loc_feature == 'cluster':
# fit clustering model(s) and one-hot encode their indices
# TODO: consolidate start/end_cluster_model in a single instance
@@ -238,22 +288,196 @@ def fit(self,data: List[ecwc.Confirmedtrip],data_df=None):
self.mode_predictor.fit(self.X_mode, self.y_mode)
if len(self.X_replaced) > 0:
self.replaced_predictor.fit(self.X_replaced, self.y_replaced)

        logging.info(f"Forest model fit to {len(trips)} rows of trip data")

    def predict(self, trips: List[ecwc.Confirmedtrip]) -> pd.DataFrame:
        logging.debug(f"forest classifier predict called with {len(trips)} trips")

        if len(trips) == 0:
            msg = 'model.predict cannot be called with 0 trips'
            raise Exception(msg)

        # convert the trips to a dataframe
        test_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip", trips)

        self.X_test_for_purpose = self._get_X_test_for_purpose(test_df)

########################
### make predictions ###
########################
# note that we want to use purpose data to aid our mode predictions,
# and use both purpose and mode data to aid our replaced-mode
# predictions
try:
purpose_proba_raw = self.purpose_predictor.predict_proba(
self.X_test_for_purpose)
purpose_proba = pd.DataFrame(
purpose_proba_raw, columns=self.purpose_predictor.classes_)
purpose_pred = purpose_proba.idxmax(axis=1)

# update X_test with one-hot-encoded purpose predictions to aid
# mode predictor
onehot_purpose_df = self.purpose_enc.transform(
pd.DataFrame(purpose_pred).set_index(
self.X_test_for_purpose.index))
self.X_test_for_mode = pd.concat(
[self.X_test_for_purpose, onehot_purpose_df], axis=1)

mode_proba, replaced_proba = self._try_predict_proba_mode_replaced()

except NotFittedError as e:
# if we can't predict purpose, we can still try to predict mode and
# replaced-mode without one-hot encoding the purpose

purpose_pred = np.full((len(self.X_test_for_purpose), ), np.nan)
purpose_proba_raw = np.full((len(self.X_test_for_purpose), 1), 0)
purpose_proba = pd.DataFrame(purpose_proba_raw, columns=[np.nan])

self.X_test_for_mode = self.X_test_for_purpose
mode_proba, replaced_proba = self._try_predict_proba_mode_replaced()

mode_pred = mode_proba.idxmax(axis=1)
replaced_pred = replaced_proba.idxmax(axis=1)

if (purpose_pred.dtype == np.float64 and mode_pred.dtype == np.float64
and replaced_pred.dtype == np.float64):
# this indicates that all the predictions are np.nan so none of the
# random forest classifiers were fitted
raise NotFittedError

proba_dfs = []
for label_type, proba in zip(
['purpose', 'mode', 'replaced'],
[purpose_proba, mode_proba, replaced_proba]):
proba['top_pred'] = proba.idxmax(axis=1)
proba['top_proba'] = proba.max(axis=1, skipna=True)
proba['clusterable'] = self._clusterable(
self.X_test_for_purpose).astype(bool)
proba = pd.concat([proba], keys=[label_type], axis=1)
proba_dfs += [proba]

self.proba_df = pd.concat(proba_dfs, axis=1)
return self.proba_df
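    # The returned proba_df is column-multi-indexed by label type; a sketch
    # of its layout (class names purely illustrative):
    #
    #   purpose                                       mode        ...
    #   home  work  top_pred  top_proba  clusterable  walk  bike  ...
    #   0.8   0.2   home      0.8        True         0.6   0.4   ...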

def _try_predict_proba_mode_replaced(self):
""" Try to predict mode and replaced-mode. Handles error in case the
ensemble algorithms were not fitted.
Requires self.X_test_for_mode to have already been set. (These are
the DataFrames containing the test data to be passed into self.
mode_predictor.)
Returns: mode_proba and replaced_proba, two DataFrames containing
class probabilities for mode and replaced-mode respectively
"""

try:
# predict mode
mode_proba_raw = self.mode_predictor.predict_proba(
self.X_test_for_mode)
mode_proba = pd.DataFrame(mode_proba_raw,
columns=self.mode_predictor.classes_)
mode_pred = mode_proba.idxmax(axis=1)

# update X_test with one-hot-encoded mode predictions to aid
# replaced-mode predictor
onehot_mode_df = self.mode_enc.transform(
pd.DataFrame(mode_pred).set_index(self.X_test_for_mode.index))
self.X_test_for_replaced = pd.concat(
[self.X_test_for_mode, onehot_mode_df], axis=1)
replaced_proba = self._try_predict_proba_replaced()

except NotFittedError as e:
mode_proba_raw = np.full((len(self.X_test_for_mode), 1), 0)
mode_proba = pd.DataFrame(mode_proba_raw, columns=[np.nan])

# if we don't have mode predictions, we *could* still try to
# predict replaced mode (but if the user didn't input mode labels
# then it's unlikely they would input replaced-mode)
self.X_test_for_replaced = self.X_test_for_mode
replaced_proba = self._try_predict_proba_replaced()

return mode_proba, replaced_proba

def _get_X_test_for_purpose(self, test_df):
""" Do the pre-processing to get data that we can then pass into the
ensemble classifiers.
"""
if self.loc_feature == 'cluster':
# get clusters
self.end_cluster_model.predict(test_df)
clusters_to_encode = self.end_cluster_model.test_df[[
'end_cluster_idx'
]].copy() # copy is to avoid SettingWithCopyWarning

if self.use_start_clusters or self.use_trip_clusters:
self.start_cluster_model.predict(test_df)

if self.use_start_clusters:
clusters_to_encode = pd.concat([
clusters_to_encode,
self.start_cluster_model.test_df[['start_cluster_idx']]
],
axis=1)
if self.use_trip_clusters:
start_end_clusters = pd.concat([
self.end_cluster_model.test_df[['end_cluster_idx']],
self.start_cluster_model.test_df[['start_cluster_idx']]
],
axis=1)
trip_cluster_idx = self.trip_grouper.transform(
start_end_clusters)
                clusters_to_encode.loc[:, 'trip_cluster_idx'] = trip_cluster_idx

# one-hot encode the cluster indices
loc_features_df = self.cluster_enc.transform(clusters_to_encode)
else: # self.loc_feature == 'coordinates'
test_df = self._clean_data(test_df)
loc_features_df = test_df[[
'start_lon', 'start_lat', 'end_lon', 'end_lat'
]]

# extract the desired data
X_test = pd.concat([
test_df[self.base_features].reset_index(drop=True),
loc_features_df.reset_index(drop=True)
],
axis=1)

return X_test


def _clusterable(self, test_df):
""" Check if the end points can be clustered (i.e. are within <radius>
meters of an end point from the training set)
"""
if self.loc_feature == 'cluster':
return self.end_cluster_model.test_df.end_cluster_idx >= 0

        n_samples = test_df.shape[0]
        clustered = np.zeros(n_samples, dtype=bool)

train_coordinates = self.train_df[['end_lat', 'end_lon']]
train_radians = np.radians(train_coordinates)

for idx, row in test_df.reset_index(drop=True).iterrows():
# calculate the distances between the ith test data and all points,
# then find the minimum distance for each point and check if it's
# within the distance threshold.
# unfortunately, pairwise_distances_argmin() does not support
# haversine distance, so we have to reimplement it ourselves
new_loc_radians = np.radians(row[["end_lat", "end_lon"]].to_list())
new_loc_radians = np.reshape(new_loc_radians, (1, 2))
dist_matrix_meters = haversine_distances(
new_loc_radians, train_radians) * EARTH_RADIUS

shortest_dist = np.min(dist_matrix_meters)
if shortest_dist < self.radius:
clustered[idx] = True

return clustered
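        # An equivalent vectorized formulation (a sketch, not part of this
        # commit) could replace the row-wise loop above, assuming test_df
        # carries 'end_lat'/'end_lon' columns like train_df:
        #
        #   test_radians = np.radians(test_df[['end_lat', 'end_lon']].to_numpy())
        #   dists = haversine_distances(test_radians, train_radians) * EARTH_RADIUS
        #   clustered = dists.min(axis=1) < self.radius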

def _clean_data(self, df):
""" Clean a dataframe of trips.
(Drop trips with missing start/end locations, expand the user input
@@ -332,4 +556,10 @@ def expand_coords(exp_df, purpose=None):
dfs.append(df)

# display.display(end_loc_df.head())
    return pd.concat(dfs, axis=1)

def to_dict(self) -> Dict:
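        # note: joblib.dump requires a filename or file object as its second
        # argument, so this call raises TypeError as written; the commit
        # message flags model storage as still to be figured out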
return joblib.dump(self,compress=3)

def from_dict(self, model: Dict):
pass
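    # One possible direction for the outstanding storage work (a sketch, not
    # what this commit implements): serialize through an in-memory buffer so
    # the fitted model can live inside a plain dict/document store.
    #
    #   from io import BytesIO
    #
    #   def to_dict(self) -> Dict:
    #       buf = BytesIO()
    #       joblib.dump(self, buf, compress=3)
    #       return {'forest_model': buf.getvalue()}
    #
    #   def from_dict(self, model: Dict):
    #       return joblib.load(BytesIO(model['forest_model']))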
2 changes: 1 addition & 1 deletion emission/analysis/modelling/trip_model/greedy_similarity_binning.py
@@ -128,7 +128,7 @@ class label to apply:
self.bins: Dict[str, Dict] = {}


    def fit(self, trips: List[ecwc.Confirmedtrip]):
"""train the model by passing data, where each row in the data
corresponds to a label at the matching index of the label input
5 changes: 1 addition & 4 deletions emission/analysis/modelling/trip_model/run_model.py
@@ -56,10 +56,7 @@ def update_trip_model(
logging.debug(f'model type {model_type.name} is incremental? {model.is_incremental}')
logging.debug(f'time query for training data collection: {time_query}')

    trips = _get_training_data(user_id, time_query)
# don't start training for a user that doesn't have at least $trips many trips
# (assume if a stored model exists for the user, that they met this requirement previously)
if len(trips) == 0:
@@ -77,7 +74,7 @@

    # train and store the model
    model.fit(trips)
model_data_next = model.to_dict()

if len(model_data_next) == 0:
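Since the commit message flags model testing as outstanding, a minimal smoke test for the new predict path might look like this sketch (generate_mock_trips is a hypothetical stand-in for whatever labeled-trip fixture the repo's modelling tests use):

    import emission.analysis.modelling.trip_model.forest_classifier as eamtf

    def test_forest_predict_smoke():
        trips = generate_mock_trips()   # hypothetical fixture: labeled Confirmedtrips
        model = eamtf.ForestClassifier()
        model.fit(trips)
        proba_df = model.predict(trips)
        # one row of class probabilities per input trip
        assert len(proba_df) == len(trips)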