Scalability fixes - Load model once per user (#944)
* Scalability fixes - Load model once per user

Implements code changes to improve scalability, per the requirements in issue #950 in e-mission-docs.

The initial approach uses the Singleton design pattern: instance variables are checked to see whether the model has already been loaded before attempting to load it.

This should prevent the model from being reloaded when it is already present in the instance variable of the current BuiltinModelStorage class instance.
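
A minimal sketch of the singleton-style caching idea described above; the class body and the _load_model_from_db helper are hypothetical, the sketch uses a class-level cache for brevity, and note that this approach was removed later in this PR:

    import logging

    class BuiltinModelStorage:
        _loaded_model = None  # class-level cache, shared across calls

        def _load_model_from_db(self, user_id):
            # Hypothetical stand-in for the real storage lookup
            return {"user_id": user_id, "model": "trained-model-placeholder"}

        def load_model(self, user_id):
            # Reuse the cached model if one has already been loaded
            if BuiltinModelStorage._loaded_model is not None:
                logging.debug("Model already loaded; reusing cached instance")
                return BuiltinModelStorage._loaded_model
            BuiltinModelStorage._loaded_model = self._load_model_from_db(user_id)
            return BuiltinModelStorage._loaded_model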

* Missing imports and self keyword

Added the missing import for logging.
Added the self parameter to pass a reference to the current instance object.

* Refactoring label inference pipeline for scalability issues

Refactored the label inference pipeline to load the model only once. Will be merged with the previous PR branch.

* Removed singleton approach

Started from scratch to implement the refactored label pipeline approach.

* Changed results_list to result_dict

Use a dictionary of prediction lists, keyed by trip, instead of a simple list.
This keeps each trip's predictions separate while keeping all the algorithm-wise predictions for a trip together in one list (see the sketch below).
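
A runnable illustration of the result_dict shape, mirroring the compute_and_save_algorithms code in the diff below; the FakeTrip class and the algorithms are made-up stand-ins:

    # Stand-ins for trips and per-algorithm predictors (shapes only)
    class FakeTrip:
        def __init__(self, tid): self.tid = tid
        def get_id(self): return self.tid

    trip_list = [FakeTrip("trip_1"), FakeTrip("trip_2")]
    algorithms = {
        "algo_A": lambda trips: [{"p": 0.8} for _ in trips],
        "algo_B": lambda trips: [{"p": 0.2} for _ in trips],
    }

    # One key per trip id; each value collects that trip's predictions
    # from every algorithm, keeping them together in one list.
    result_dict = {trip.get_id(): [] for trip in trip_list}
    for algorithm_id, algorithm_fn in algorithms.items():
        for trip, prediction in zip(trip_list, algorithm_fn(trip_list)):
            result_dict[trip.get_id()].append(prediction)

    # result_dict == {"trip_1": [{"p": 0.8}, {"p": 0.2}],
    #                 "trip_2": [{"p": 0.8}, {"p": 0.2}]}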

* Fixed function call to predict_labels_with_n in TestRunGreedyModel

Updated the call to predict_labels_with_n() per the latest changes, passing trip_list and user_id.

Fixed typo in eacilp for trip.get_id()

* Fixed function signature in TestRunGreedyModel

eamur.predict_labels_with_n() now takes a trip_list instead of a single trip.
The previous commit passed a trip_list but did not update the parameter name.

* Updated TestRunGreedyModel + Debug Cleanup

Now receives a list of (prediction, n) pairs, one per trip in trip_list.

Then, for each prediction in that list, the assertion tests are run.

Also, cleaned up some debug print statements from earlier commits / PRs.

* Cleanup print statements

Cleaning up previously used debug statements

* Added debug statements to track time taken for operations.

* Cleaned up some comments and debug statements.

* Fixed failing analysis pipeline tests involved in Label Inference

A couple of pipeline tests were failing - TestLabelInferencePipeline and TestExpectationPipeline.
They failed because they were calling the placeholder_predict_n functions in eacili inferrers,
which had to be updated to receive a trip_list instead of a single trip.

* Removed user_id parameter

The user_id is now fetched from the trip_list itself.
Added an assertion to ensure that exactly one unique user_id is present across all trips.
The assertion fails if multiple user_ids are found in the trip_list (an equivalent set-based check is sketched below).
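
A minimal standalone sketch of the uniqueness check, written here with a set instead of the count-based assertion that appears in the diff below:

    trip_list = [{'user_id': 'u1'}, {'user_id': 'u1'}]  # illustrative trips

    user_ids = {trip['user_id'] for trip in trip_list}
    assert len(user_ids) == 1, \
        "Multiple user_ids found for trip_list, expected unique user_id for all trips"
    user_id = user_ids.pop()  # the single shared user_id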

* Removed user_id parameter from Tests

Removed it from the test files and from inferrers.py, which contains the placeholder_predictor functions.

* Moved model loading step out of predict_labels_with_n

Refactored code to pass in the trip model directly to predict_labels_with_n() in eamur.

Moved the load model step to eacil.inferrers by using load_model() of eamur.

Modified TestRunGreedyModel to use this refactored function (the resulting call pattern is sketched below).
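
A hedged sketch of the resulting call pattern, using names that appear in the diff below; user_id, model_type, model_storage, and trip_list are assumed to be in scope:

    import emission.analysis.modelling.trip_model.run_model as eamur

    # Load the trip model once per user ...
    model = eamur._load_stored_trip_model(user_id, model_type, model_storage)
    # ... then predict for every trip in the list with the preloaded model
    labels_n_list = eamur.predict_labels_with_n(trip_list, model)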

* Passed user_id in TestRunGreedyModel.py

The test was failing due to a missing user_id parameter.
Passed user_id to rectify.

---------

Co-authored-by: Mahadik, Mukul Chandrakant <[email protected]>
MukuFlash03 and Mahadik, Mukul Chandrakant authored Dec 2, 2023
1 parent 81c4314 commit 1a1b1e6
Showing 8 changed files with 210 additions and 149 deletions.
227 changes: 132 additions & 95 deletions emission/analysis/classification/inference/labels/inferrers.py
@@ -4,6 +4,8 @@
import logging
import random
import copy
import time
import arrow

import emission.analysis.modelling.tour_model_first_only.load_predict as lp
import emission.analysis.modelling.trip_model.run_model as eamur
@@ -20,109 +22,128 @@
# the user walks to the location it is to shop (e.g., because they don't have a basket on
# their bike), and the user bikes to the location four times more than they walk there.
# Obviously, it is a simplification.
def placeholder_predictor_0(trip):
    return [
        {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
        {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
    ]

def placeholder_predictor_0(trip_list):
    predictions_list = []
    for trip in trip_list:
        predictions = [
            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
        ]
        predictions_list.append(predictions)
    return predictions_list


# The next placeholder scenario provides that same set of labels in 75% of cases and no
# labels in the rest.
def placeholder_predictor_1(trip):
    return [
        {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
        {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
    ] if random.random() > 0.25 else []

def placeholder_predictor_1(trip_list):
    predictions_list = []
    for trip in trip_list:
        predictions = [
            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
        ] if random.random() > 0.25 else []
        predictions_list.append(predictions)
    return predictions_list



# This third scenario provides labels designed to test the soundness and resilience of
# the client-side inference processing algorithms.
def placeholder_predictor_2(trip):
    # Timestamp2index gives us a deterministic way to match test trips with labels
    # Hardcoded to match "test_july_22" -- clearly, this is just for testing
    timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
    timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
    index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
    return [
        [

        ],
        [
            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
        ],
        [
            {"labels": {"mode_confirm": "drove_alone"}, "p": 0.8},
        ],
        [
            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
        ],
        [
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
        ],
        [
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
        ]
    ][index]

def placeholder_predictor_2(trip_list):
    predictions_list = []
    for trip in trip_list:
        # Timestamp2index gives us a deterministic way to match test trips with labels
        # Hardcoded to match "test_july_22" -- clearly, this is just for testing
        timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
        timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
        index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
        predictions = [
            [

            ],
            [
                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
            ],
            [
                {"labels": {"mode_confirm": "drove_alone"}, "p": 0.8},
            ],
            [
                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
            ],
            [
                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
                {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
                {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
            ],
            [
                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
                {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
                {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
            ]
        ][index]
        predictions_list.append(predictions)
    return predictions_list


# This fourth scenario provides labels designed to test the expectation and notification system.
def placeholder_predictor_3(trip):
    timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
    timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
    index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
    return [
        [
            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
        ],
        [
            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
        ],
        [
            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "entertainment"}, "p": 0.70},
        ],
        [
            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.96},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.04}
        ],
        [
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
        ],
        [
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.60},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.25},
            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.11},
            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.04}
        ]
    ][index]

def placeholder_predictor_3(trip_list):
    predictions_list = []
    for trip in trip_list:
        timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
        timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
        index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
        predictions = [
            [
                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
            ],
            [
                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
            ],
            [
                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "entertainment"}, "p": 0.70},
            ],
            [
                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.96},
                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.04}
            ],
            [
                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
                {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
                {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
            ],
            [
                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.60},
                {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.25},
                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.11},
                {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.04}
            ]
        ][index]
        predictions_list.append(predictions)
    return predictions_list

# Placeholder that is suitable for a demo.
# Finds all unique label combinations for this user and picks one randomly
def placeholder_predictor_demo(trip):
    import random

    import emission.core.get_database as edb
    user = trip["user_id"]
    unique_user_inputs = edb.get_analysis_timeseries_db().find({"user_id": user}).distinct("data.user_input")
    if len(unique_user_inputs) == 0:
        return []
    random_user_input = random.choice(unique_user_inputs) if random.randrange(0,10) > 0 else []

    logging.debug(f"In placeholder_predictor_demo: found {len(unique_user_inputs)} for user {user}, returning value {random_user_input}")
    return [{"labels": random_user_input, "p": random.random()}]

def placeholder_predictor_demo(trip_list):
    import random

    import emission.core.get_database as edb
    user_id = trip_list[0]["user_id"]  # assumed: fetched from the trip_list itself, per the commit message
    unique_user_inputs = edb.get_analysis_timeseries_db().find({"user_id": user_id}).distinct("data.user_input")
    predictions_list = []
    if len(unique_user_inputs) == 0:
        predictions_list.append([])
        return predictions_list
    random_user_input = random.choice(unique_user_inputs) if random.randrange(0,10) > 0 else []

    logging.debug(f"In placeholder_predictor_demo: found {len(unique_user_inputs)} for user {user_id}, returning value {random_user_input}")
    predictions_list.append([{"labels": random_user_input, "p": random.random()}])
    return predictions_list

# Non-placeholder implementation. First bins the trips, and then clusters every bin
# See emission.analysis.modelling.tour_model for more details
@@ -141,18 +162,34 @@ def n_to_confidence_coeff(n, max_confidence=None, first_confidence=None, confidence_multiplier=None):
    return max_confidence-(max_confidence-first_confidence)*(1-confidence_multiplier)**(n-1) # This is the u = ... formula in the issue

# predict_two_stage_bin_cluster but with the above reduction in confidence
def predict_cluster_confidence_discounting(trip, max_confidence=None, first_confidence=None, confidence_multiplier=None):
    # load application config
    model_type = eamtc.get_model_type()
    model_storage = eamtc.get_model_storage()
    labels, n = eamur.predict_labels_with_n(trip, model_type, model_storage)
    if n <= 0: # No model data or trip didn't match a cluster
        logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is")
        return labels

    confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
    logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")

    labels = copy.deepcopy(labels)
    for l in labels: l["p"] *= confidence_coeff
    return labels

def predict_cluster_confidence_discounting(trip_list, max_confidence=None, first_confidence=None, confidence_multiplier=None):
    # load application config
    model_type = eamtc.get_model_type()
    model_storage = eamtc.get_model_storage()

    # assert and fetch unique user id for trip_list
    user_id_list = []
    for trip in trip_list:
        user_id_list.append(trip['user_id'])
    assert user_id_list.count(user_id_list[0]) == len(user_id_list), "Multiple user_ids found for trip_list, expected unique user_id for all trips"
    # Assertion successful, use unique user_id
    user_id = user_id_list[0]

    # load model
    start_model_load_time = time.process_time()
    model = eamur._load_stored_trip_model(user_id, model_type, model_storage)
    print(f"{arrow.now()} Inside predict_labels_n: Model load time = {time.process_time() - start_model_load_time}")

    labels_n_list = eamur.predict_labels_with_n(trip_list, model)
    predictions_list = []
    for labels, n in labels_n_list:
        if n <= 0: # No model data or trip didn't match a cluster
            logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is")
            predictions_list.append(labels)
            continue
        confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
        logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")
        labels = copy.deepcopy(labels)
        for l in labels: l["p"] *= confidence_coeff
        predictions_list.append(labels)
    return predictions_list
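
For intuition about the n_to_confidence_coeff discounting used above, here is a standalone numeric illustration; the parameter values are made up, since the real defaults come from the application config:

    # u = max - (max - first) * (1 - multiplier)^(n - 1), per the comment above
    def n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier):
        return max_confidence - (max_confidence - first_confidence) * (1 - confidence_multiplier)**(n - 1)

    for n in [1, 2, 5, 20]:
        print(n, round(n_to_confidence_coeff(n, 0.99, 0.80, 0.30), 3))
    # n=1 -> 0.8 (first_confidence); the coefficient approaches 0.99 as n grows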
72 changes: 47 additions & 25 deletions emission/analysis/classification/inference/labels/pipeline.py
@@ -2,6 +2,8 @@
import logging
import random
import copy
import time
import arrow

# Our imports
import emission.storage.pipeline_queries as epq
@@ -55,48 +57,68 @@ def run_prediction_pipeline(self, user_id, time_range):
        self.ts = esta.TimeSeries.get_time_series(user_id)
        self.toPredictTrips = esda.get_entries(
            esda.CLEANED_TRIP_KEY, user_id, time_query=time_range)

        for cleaned_trip in self.toPredictTrips:
            # Create an inferred trip
            cleaned_trip_dict = copy.copy(cleaned_trip)["data"]
            inferred_trip = ecwe.Entry.create_entry(user_id, "analysis/inferred_trip", cleaned_trip_dict)

            # Run the algorithms and the ensemble, store results
            results = self.compute_and_save_algorithms(inferred_trip)
            ensemble = self.compute_and_save_ensemble(inferred_trip, results)

            # Put final results into the inferred trip and store it
            inferred_trip["data"]["cleaned_trip"] = cleaned_trip.get_id()
            inferred_trip["data"]["inferred_labels"] = ensemble["prediction"]
            self.ts.insert(inferred_trip)

            if self._last_trip_done is None or self._last_trip_done["data"]["end_ts"] < cleaned_trip["data"]["end_ts"]:
                self._last_trip_done = cleaned_trip

        cleaned_trip_list = self.toPredictTrips
        inferred_trip_list = []
        results_dict = {}
        ensemble_list = []

        # Create list of inferred trips
        for cleaned_trip in cleaned_trip_list:
            cleaned_trip_dict = copy.copy(cleaned_trip)["data"]
            inferred_trip = ecwe.Entry.create_entry(user_id, "analysis/inferred_trip", cleaned_trip_dict)
            inferred_trip_list.append(inferred_trip)

        if inferred_trip_list:
            # Computing outside loop by passing trip_list to ensure model loads once
            # Run the algorithms and the ensemble, store results
            results_dict = self.compute_and_save_algorithms(inferred_trip_list)
            ensemble_list = self.compute_and_save_ensemble(inferred_trip_list, results_dict)

            start_insert_inferred_trip_time = time.process_time()
            for cleaned_trip, inferred_trip, ensemble in zip(cleaned_trip_list, inferred_trip_list, ensemble_list):
                # Put final results into the inferred trip and store it
                inferred_trip["data"]["cleaned_trip"] = cleaned_trip.get_id()
                inferred_trip["data"]["inferred_labels"] = ensemble["prediction"]
                self.ts.insert(inferred_trip)

                if self._last_trip_done is None or self._last_trip_done["data"]["end_ts"] < cleaned_trip["data"]["end_ts"]:
                    self._last_trip_done = cleaned_trip
            # print(f"{arrow.now()} Inside run_prediction_pipeline: Saving inferred_trip total time = {time.process_time() - start_insert_inferred_trip_time}")

# This is where the labels for a given trip are actually predicted.
# Though the only information passed in is the trip object, the trip object can provide the
# user_id and other potentially useful information.
def compute_and_save_algorithms(self, trip):
    predictions = []
    for algorithm_id, algorithm_fn in primary_algorithms.items():
        prediction = algorithm_fn(trip)
        lp = ecwl.Labelprediction()
        lp.trip_id = trip.get_id()
        lp.algorithm_id = algorithm_id
        lp.prediction = prediction
        lp.start_ts = trip["data"]["start_ts"]
        lp.end_ts = trip["data"]["end_ts"]
        self.ts.insert_data(self.user_id, "inference/labels", lp)
        predictions.append(lp)
    return predictions

def compute_and_save_algorithms(self, trip_list):
    predictions_dict = {trip.get_id(): [] for trip in trip_list}
    for algorithm_id, algorithm_fn in primary_algorithms.items():
        prediction_list = algorithm_fn(trip_list)
        start_insert_inference_labels_time = time.process_time()
        for trip, prediction in zip(trip_list, prediction_list):
            lp = ecwl.Labelprediction()
            lp.algorithm_id = algorithm_id
            lp.trip_id = trip.get_id()
            lp.prediction = prediction
            lp.start_ts = trip["data"]["start_ts"]
            lp.end_ts = trip["data"]["end_ts"]
            self.ts.insert_data(self.user_id, "inference/labels", lp)
            predictions_dict[trip.get_id()].append(lp)
        # print(f"{arrow.now()} Inside compute_and_save_algorithms: Saving inference/labels total time = {time.process_time() - start_insert_inference_labels_time}")
    return predictions_dict

# Combine all our predictions into a single ensemble prediction.
# As a placeholder, we just take the first prediction.
# TODO: implement a real combination algorithm.
def compute_and_save_ensemble(self, trip, predictions):
    il = ecwl.Labelprediction()
    il.trip_id = trip.get_id()
    il.start_ts = trip["data"]["start_ts"]
    il.end_ts = trip["data"]["end_ts"]
    (il.algorithm_id, il.prediction) = ensemble(trip, predictions)
    self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
    return il

def compute_and_save_ensemble(self, trip_list, predictions_dict):
    start_insert_inferred_labels_time = time.process_time()
    il_list = []
    for trip, key in zip(trip_list, predictions_dict):
        il = ecwl.Labelprediction()
        il.trip_id = trip.get_id()
        il.start_ts = trip["data"]["start_ts"]
        il.end_ts = trip["data"]["end_ts"]
        (il.algorithm_id, il.prediction) = ensemble(trip, predictions_dict[key])
        self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
        il_list.append(il)
    # print(f"{arrow.now()} Inside compute_and_save_ensemble: Saving inferred_labels total time = {time.process_time() - start_insert_inferred_labels_time}")
    return il_list