From 1a1b1e63e8d6b1e1a578c6449cc2dc919de126e5 Mon Sep 17 00:00:00 2001
From: Mukul Chandrakant Mahadik
Date: Sat, 2 Dec 2023 00:17:04 -0700
Subject: [PATCH] Scalability fixes - Load model once per user (#944)

* Scalability fixes - Load model once per user

Implemented code changes to improve scalability, per the requirements in issue #950 in e-mission-docs. The initial approach used the Singleton design pattern on instance variables: check whether the model has already been loaded before attempting to load it. This prevents the model from being reloaded if it is already present in the instance variable of the current BuiltinModelStorage instance.

* Missing imports and self keyword

Added an import for logging. Added the self parameter to pass a reference to the current instance object.

* Refactoring label inference pipeline for scalability issues

Changes after refactoring the label inference pipeline to load the model only once. Will be merged with the previous PR branch.

* Removed singleton approach

Started from scratch to implement the refactored label pipeline approach.

* Changed results_list to result_dict

Use a dictionary of lists of predictions returned for each trip, instead of a simple list. This keeps the predictions for each trip separate while keeping all of the algorithm-wise predictions for a trip together in one list.

* Fixed function call to predict_labels_with_n in TestRunGreedyModel

Updated the call to predict_labels_with_n() per the latest changes to include trip_list and user_id. Fixed a typo in eacilp for trip.get_id().

* Fixed function signature in TestRunGreedyModel

eamur.predict_labels_with_n() now takes a trip_list instead of a trip. Passed a trip_list in the previous commit but forgot to update the parameter name.

* Updated TestRunGreedyModel + Debug Cleanup

Receive a prediction list of (prediction, n) tuples for each trip in trip_list, then run the assertion tests on each prediction in the list. Also cleaned up some debug print statements from earlier commits / PRs.

* Cleanup print statements

Cleaned up previously used debug statements.

* Added debug statements to track time taken for operations.

* Cleaned up some comments and debug statements.

* Fixed failing analysis pipeline tests involved in Label Inference

A couple of pipeline tests were failing - TestLabelInferencePipeline, TestExpectationPipeline. They failed because they were calling the placeholder predictor functions in eacili inferrers, which had to be updated to receive a trip_list instead of a single trip.

* Removed user_id parameter

Fetch it from the trip_list itself. Added an assertion to ensure that exactly one unique user_id is present across all trips; the assertion fails if multiple user_ids are found for the trip_list.

* Removed user_id parameter from Tests

Removed from the test files and from inferrers.py, which contains the placeholder_predictor functions.

* Moved model loading step out of predict_labels_with_n

Refactored the code to pass the trip model directly to predict_labels_with_n() in eamur. Moved the model loading step to eacil.inferrers, using eamur's model loading function. Modified TestRunGreedyModel to use this refactored function.

* Passed user_id in TestRunGreedyModel.py

The test was failing due to a missed user_id parameter. Passed user_id to rectify.
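The net effect of these commits is that the per-user trip model is loaded once and reused across all trips, instead of being reloaded for every trip. A minimal sketch of the resulting call pattern (the helper name infer_labels_for_user is hypothetical; the eamur / eamumt / eamums aliases and function signatures are the ones used in the diff below):

    import emission.analysis.modelling.trip_model.run_model as eamur
    import emission.analysis.modelling.trip_model.model_type as eamumt
    import emission.analysis.modelling.trip_model.model_storage as eamums

    def infer_labels_for_user(trip_list):
        # All trips are assumed to belong to the same user,
        # so the stored model is loaded exactly once...
        user_id = trip_list[0]['user_id']
        model = eamur._load_stored_trip_model(
            user_id,
            eamumt.ModelType.GREEDY_SIMILARITY_BINNING,
            eamums.ModelStorage.DOCUMENT_DATABASE)
        # ...and then applied to every trip in the list
        return eamur.predict_labels_with_n(trip_list, model)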
---------

Co-authored-by: Mahadik, Mukul Chandrakant
---
 .../inference/labels/inferrers.py                 | 227 ++++++++++--------
 .../inference/labels/pipeline.py                  |  72 ++++--
 .../modelling/trip_model/run_model.py             |  35 +--
 .../storage/timeseries/builtin_timeseries.py      |   2 -
 .../modellingTests/TestRunGreedyModel.py          |  17 +-
 .../TestLabelInferencePipeline.py                 |   2 +-
 .../tests/storageTests/TestSectionQueries.py      |   1 -
 emission/tests/storageTests/TestTimeSeries.py     |   3 -
 8 files changed, 210 insertions(+), 149 deletions(-)

diff --git a/emission/analysis/classification/inference/labels/inferrers.py b/emission/analysis/classification/inference/labels/inferrers.py
index c6b939671..04a59cdd9 100644
--- a/emission/analysis/classification/inference/labels/inferrers.py
+++ b/emission/analysis/classification/inference/labels/inferrers.py
@@ -4,6 +4,8 @@
 import logging
 import random
 import copy
+import time
+import arrow
 
 import emission.analysis.modelling.tour_model_first_only.load_predict as lp
 import emission.analysis.modelling.trip_model.run_model as eamur
@@ -20,109 +22,128 @@
 # the user walks to the location it is to shop (e.g., because they don't have a basket on
 # their bike), and the user bikes to the location four times more than they walk there.
 # Obviously, it is a simplification.
-def placeholder_predictor_0(trip):
-    return [
-        {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
-        {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
-    ]
+def placeholder_predictor_0(trip_list):
+    predictions_list = []
+    for trip in trip_list:
+        predictions = [
+            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
+        ]
+        predictions_list.append(predictions)
+    return predictions_list
 
 # The next placeholder scenario provides that same set of labels in 75% of cases and no
 # labels in the rest.
-def placeholder_predictor_1(trip):
-    return [
-        {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
-        {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
-    ] if random.random() > 0.25 else []
+def placeholder_predictor_1(trip_list):
+    predictions_list = []
+    for trip in trip_list:
+        predictions = [
+            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
+        ] if random.random() > 0.25 else []
+        predictions_list.append(predictions)
+    return predictions_list
 
 # This third scenario provides labels designed to test the soundness and resilience of
 # the client-side inference processing algorithms.
-def placeholder_predictor_2(trip):
-    # Timestamp2index gives us a deterministic way to match test trips with labels
-    # Hardcoded to match "test_july_22" -- clearly, this is just for testing
-    timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
-    timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
-    index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
-    return [
-        [
-
-        ],
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
-        ],
-        [
-            {"labels": {"mode_confirm": "drove_alone"}, "p": 0.8},
-        ],
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
-        ],
-        [
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
-            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
-        ],
-        [
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
-            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
-        ]
-    ][index]
+def placeholder_predictor_2(trip_list):
+    predictions_list = []
+    for trip in trip_list:
+        # Timestamp2index gives us a deterministic way to match test trips with labels
+        # Hardcoded to match "test_july_22" -- clearly, this is just for testing
+        timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
+        timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
+        index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
+        predictions = [
+            [
+
+            ],
+            [
+                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
+            ],
+            [
+                {"labels": {"mode_confirm": "drove_alone"}, "p": 0.8},
+            ],
+            [
+                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
+            ],
+            [
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
+                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
+                {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
+            ],
+            [
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
+                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
+                {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
+            ]
+        ][index]
+        predictions_list.append(predictions)
+    return predictions_list
 
 # This fourth scenario provides labels designed to test the expectation and notification system.
-def placeholder_predictor_3(trip):
-    timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
-    timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
-    index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
-    return [
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
-        ],
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
-        ],
-        [
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "entertainment"}, "p": 0.70},
-        ],
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.96},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.04}
-        ],
-        [
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
-            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
-        ],
-        [
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.60},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.25},
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.11},
-            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.04}
-        ]
-    ][index]
+def placeholder_predictor_3(trip_list):
+    predictions_list = []
+    for trip in trip_list:
+        timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
+        timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
+        index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
+        predictions = [
+            [
+                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
+            ],
+            [
+                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
+            ],
+            [
+                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "entertainment"}, "p": 0.70},
+            ],
+            [
+                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.96},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.04}
+            ],
+            [
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
+                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
+                {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
+            ],
+            [
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.60},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.25},
+                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.11},
+                {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.04}
+            ]
+        ][index]
+        predictions_list.append(predictions)
+    return predictions_list
 
 # Placeholder that is suitable for a demo.
 # Finds all unique label combinations for this user and picks one randomly
-def placeholder_predictor_demo(trip):
+def placeholder_predictor_demo(trip_list):
     import random
     import emission.core.get_database as edb
-    user = trip["user_id"]
-    unique_user_inputs = edb.get_analysis_timeseries_db().find({"user_id": user}).distinct("data.user_input")
+
+    # All trips in trip_list belong to the same user (see predict_cluster_confidence_discounting)
+    user_id = trip_list[0]["user_id"]
+    unique_user_inputs = edb.get_analysis_timeseries_db().find({"user_id": user_id}).distinct("data.user_input")
+    predictions_list = []
     if len(unique_user_inputs) == 0:
-        return []
+        # No labeled history for this user: return one empty prediction per trip
+        predictions_list = [[] for _ in trip_list]
+        return predictions_list
 
-    random_user_input = random.choice(unique_user_inputs) if random.randrange(0,10) > 0 else []
-    logging.debug(f"In placeholder_predictor_demo: found {len(unique_user_inputs)} for user {user}, returning value {random_user_input}")
-    return [{"labels": random_user_input, "p": random.random()}]
+    for trip in trip_list:
+        random_user_input = random.choice(unique_user_inputs) if random.randrange(0,10) > 0 else []
+        logging.debug(f"In placeholder_predictor_demo: found {len(unique_user_inputs)} for user {user_id}, returning value {random_user_input}")
+        predictions_list.append([{"labels": random_user_input, "p": random.random()}])
+    return predictions_list
 
 # Non-placeholder implementation. First bins the trips, and then clusters every bin
 # See emission.analysis.modelling.tour_model for more details
@@ -141,18 +162,34 @@ def n_to_confidence_coeff(n, max_confidence=None, first_confidence=None, confide
     return max_confidence-(max_confidence-first_confidence)*(1-confidence_multiplier)**(n-1) # This is the u = ... formula in the issue
 
 # predict_two_stage_bin_cluster but with the above reduction in confidence
-def predict_cluster_confidence_discounting(trip, max_confidence=None, first_confidence=None, confidence_multiplier=None):
+def predict_cluster_confidence_discounting(trip_list, max_confidence=None, first_confidence=None, confidence_multiplier=None):
     # load application config
     model_type = eamtc.get_model_type()
     model_storage = eamtc.get_model_storage()
-    labels, n = eamur.predict_labels_with_n(trip, model_type, model_storage)
-    if n <= 0: # No model data or trip didn't match a cluster
-        logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is")
-        return labels
-
-    confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
-    logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")
-    labels = copy.deepcopy(labels)
-    for l in labels: l["p"] *= confidence_coeff
-    return labels
+    # assert and fetch the unique user_id for trip_list
+    user_id_list = []
+    for trip in trip_list:
+        user_id_list.append(trip['user_id'])
+    assert user_id_list.count(user_id_list[0]) == len(user_id_list), "Multiple user_ids found for trip_list, expected unique user_id for all trips"
+    # Assertion successful, use the unique user_id
+    user_id = user_id_list[0]
+
+    # load the model once for the entire trip_list
+    start_model_load_time = time.process_time()
+    model = eamur._load_stored_trip_model(user_id, model_type, model_storage)
+    print(f"{arrow.now()} Inside predict_cluster_confidence_discounting: Model load time = {time.process_time() - start_model_load_time}")
+
+    labels_n_list = eamur.predict_labels_with_n(trip_list, model)
+    predictions_list = []
+    for labels, n in labels_n_list:
+        if n <= 0: # No model data or trip didn't match a cluster
+            logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is")
+            predictions_list.append(labels)
+            continue
+        confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
+        logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")
+        labels = copy.deepcopy(labels)
+        for l in labels: l["p"] *= confidence_coeff
+        predictions_list.append(labels)
+    return predictions_list
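With this change, callers hand over the whole trip list and receive one prediction list per trip. A hypothetical call (assuming trip_list holds confirmed trips for a single user, and the eacili alias mentioned in the commit message):

    import logging
    import emission.analysis.classification.inference.labels.inferrers as eacili

    predictions_list = eacili.predict_cluster_confidence_discounting(trip_list)
    # predictions_list[i] corresponds to trip_list[i]: a list of
    # {"labels": ..., "p": ...} dicts, with "p" discounted by the n-based coefficient
    for trip, predictions in zip(trip_list, predictions_list):
        logging.debug(f"{trip.get_id()}: {predictions}")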
diff --git a/emission/analysis/classification/inference/labels/pipeline.py b/emission/analysis/classification/inference/labels/pipeline.py
index e7bda7b2c..fddd2fb8c 100644
--- a/emission/analysis/classification/inference/labels/pipeline.py
+++ b/emission/analysis/classification/inference/labels/pipeline.py
@@ -2,6 +2,8 @@
 import logging
 import random
 import copy
+import time
+import arrow
 
 # Our imports
 import emission.storage.pipeline_queries as epq
@@ -55,15 +57,26 @@ def run_prediction_pipeline(self, user_id, time_range):
         self.ts = esta.TimeSeries.get_time_series(user_id)
         self.toPredictTrips = esda.get_entries(
             esda.CLEANED_TRIP_KEY, user_id, time_query=time_range)
-        for cleaned_trip in self.toPredictTrips:
-            # Create an inferred trip
+
+        cleaned_trip_list = self.toPredictTrips
+        inferred_trip_list = []
+        results_dict = {}
+        ensemble_list = []
+
+        # Create list of inferred trips
+        for cleaned_trip in cleaned_trip_list:
             cleaned_trip_dict = copy.copy(cleaned_trip)["data"]
             inferred_trip = ecwe.Entry.create_entry(user_id, "analysis/inferred_trip", cleaned_trip_dict)
-
+            inferred_trip_list.append(inferred_trip)
+
+        if inferred_trip_list:
+            # Compute outside the per-trip loop, passing the whole trip list, so that the model loads only once
             # Run the algorithms and the ensemble, store results
-            results = self.compute_and_save_algorithms(inferred_trip)
-            ensemble = self.compute_and_save_ensemble(inferred_trip, results)
+            results_dict = self.compute_and_save_algorithms(inferred_trip_list)
+            ensemble_list = self.compute_and_save_ensemble(inferred_trip_list, results_dict)
 
+        start_insert_inferred_trip_time = time.process_time()
+        for cleaned_trip, inferred_trip, ensemble in zip(cleaned_trip_list, inferred_trip_list, ensemble_list):
             # Put final results into the inferred trip and store it
             inferred_trip["data"]["cleaned_trip"] = cleaned_trip.get_id()
             inferred_trip["data"]["inferred_labels"] = ensemble["prediction"]
@@ -71,32 +84,41 @@ def run_prediction_pipeline(self, user_id, time_range):
             if self._last_trip_done is None or self._last_trip_done["data"]["end_ts"] < cleaned_trip["data"]["end_ts"]:
                 self._last_trip_done = cleaned_trip
+        # print(f"{arrow.now()} Inside run_prediction_pipeline: Saving inferred_trip total time = {time.process_time() - start_insert_inferred_trip_time}")
 
     # This is where the labels for a given trip are actually predicted.
     # Though the only information passed in is the trip object, the trip object can provide the
     # user_id and other potentially useful information.
-    def compute_and_save_algorithms(self, trip):
-        predictions = []
+    def compute_and_save_algorithms(self, trip_list):
+        predictions_dict = {trip.get_id(): [] for trip in trip_list}
         for algorithm_id, algorithm_fn in primary_algorithms.items():
-            prediction = algorithm_fn(trip)
-            lp = ecwl.Labelprediction()
-            lp.trip_id = trip.get_id()
-            lp.algorithm_id = algorithm_id
-            lp.prediction = prediction
-            lp.start_ts = trip["data"]["start_ts"]
-            lp.end_ts = trip["data"]["end_ts"]
-            self.ts.insert_data(self.user_id, "inference/labels", lp)
-            predictions.append(lp)
-        return predictions
+            prediction_list = algorithm_fn(trip_list)
+            start_insert_inference_labels_time = time.process_time()
+            for trip, prediction in zip(trip_list, prediction_list):
+                lp = ecwl.Labelprediction()
+                lp.algorithm_id = algorithm_id
+                lp.trip_id = trip.get_id()
+                lp.prediction = prediction
+                lp.start_ts = trip["data"]["start_ts"]
+                lp.end_ts = trip["data"]["end_ts"]
+                self.ts.insert_data(self.user_id, "inference/labels", lp)
+                predictions_dict[trip.get_id()].append(lp)
+            # print(f"{arrow.now()} Inside compute_and_save_algorithms: Saving inference/labels total time = {time.process_time() - start_insert_inference_labels_time}")
+        return predictions_dict
 
     # Combine all our predictions into a single ensemble prediction.
     # As a placeholder, we just take the first prediction.
     # TODO: implement a real combination algorithm.
-    def compute_and_save_ensemble(self, trip, predictions):
-        il = ecwl.Labelprediction()
-        il.trip_id = trip.get_id()
-        il.start_ts = trip["data"]["start_ts"]
-        il.end_ts = trip["data"]["end_ts"]
-        (il.algorithm_id, il.prediction) = ensemble(trip, predictions)
-        self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
-        return il
+    def compute_and_save_ensemble(self, trip_list, predictions_dict):
+        start_insert_inferred_labels_time = time.process_time()
+        il_list = []
+        for trip in trip_list:
+            il = ecwl.Labelprediction()
+            il.trip_id = trip.get_id()
+            il.start_ts = trip["data"]["start_ts"]
+            il.end_ts = trip["data"]["end_ts"]
+            # Look up this trip's predictions by trip id instead of relying on dict ordering
+            (il.algorithm_id, il.prediction) = ensemble(trip, predictions_dict[trip.get_id()])
+            self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
+            il_list.append(il)
+        # print(f"{arrow.now()} Inside compute_and_save_ensemble: Saving inferred_labels total time = {time.process_time() - start_insert_inferred_labels_time}")
+        return il_list
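The dict-of-lists returned by compute_and_save_algorithms keeps each trip's predictions separate while grouping the per-algorithm predictions for one trip together, which is exactly what the ensemble step needs. A schematic illustration with hypothetical ids and values (not code from this patch):

    predictions_dict = {}
    for algorithm_id in ["algo_A", "algo_B"]:       # outer pass per algorithm
        for trip_id in ["trip_1", "trip_2"]:        # inner pass per trip
            predictions_dict.setdefault(trip_id, []).append((algorithm_id, 0.8))
    # predictions_dict == {"trip_1": [("algo_A", 0.8), ("algo_B", 0.8)],
    #                      "trip_2": [("algo_A", 0.8), ("algo_B", 0.8)]}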
diff --git a/emission/analysis/modelling/trip_model/run_model.py b/emission/analysis/modelling/trip_model/run_model.py
index e3e2b1c4e..7356aa597 100644
--- a/emission/analysis/modelling/trip_model/run_model.py
+++ b/emission/analysis/modelling/trip_model/run_model.py
@@ -1,8 +1,9 @@
 import logging
 from typing import List, Optional
 from uuid import UUID
-
 import time
+import arrow
+
 import emission.storage.timeseries.timequery as estt
 import emission.analysis.modelling.trip_model.model_storage as eamums
 import emission.analysis.modelling.trip_model.model_type as eamumt
@@ -97,26 +98,28 @@ def update_trip_model(
 
 def predict_labels_with_n(
-        trip: ecwc.Confirmedtrip,
-        model_type = eamumt.ModelType.GREEDY_SIMILARITY_BINNING,
-        model_storage = eamums.ModelStorage.DOCUMENT_DATABASE,
-        model_config = None):
+        trip_list: List[ecwc.Confirmedtrip],
+        model: eamuu.TripModel):
     """
-    invoke the user label prediction model to predict labels for a trip.
+    invoke the user label prediction model to predict labels for a list of trips.
 
-    :param trip: the trip to predict labels for
-    :param model_type: type of prediction model to run
-    :param model_storage: location to read/write models
-    :param model_config: optional configuration for model, for debugging purposes
-    :return: a list of predictions
+    :param trip_list: the list of trips to predict labels for
+    :param model: trip model used for predictions
+    :return: a list of (predictions, n) tuples, one per trip in trip_list
     """
-    user_id = trip['user_id']
-    model = _load_stored_trip_model(user_id, model_type, model_storage, model_config)
-    if model is None:
-        return [], -1
-    else:
-        predictions, n = model.predict(trip)
-        return predictions, n
+
+    predictions_list = []
+    print(f"{arrow.now()} Inside predict_labels_with_n: Predicting...")
+    start_predict_time = time.process_time()
+    for trip in trip_list:
+        if model is None:
+            # No stored model for this user
+            predictions_list.append(([], -1))
+            continue
+        predictions, n = model.predict(trip)
+        predictions_list.append((predictions, n))
+    print(f"{arrow.now()} Inside predict_labels_with_n: Predictions complete for trip_list in time = {time.process_time() - start_predict_time}")
+    return predictions_list
 
 
 def _get_training_data(user_id: UUID, time_query: Optional[estt.TimeQuery]):
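Each element of the returned list is a (predictions, n) tuple, where ([], -1) signals that no model was available. A sketch of consuming the result (hypothetical handling, assuming the trip_list and model from the calls above):

    labels_n_list = eamur.predict_labels_with_n(trip_list, model)
    for predictions, n in labels_n_list:
        if n <= 0:
            # no model data, or the trip did not match a cluster
            continue
        best = max(predictions, key=lambda r: r["p"])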
diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py
index 0a09ee3ab..8204b4e29 100644
--- a/emission/storage/timeseries/builtin_timeseries.py
+++ b/emission/storage/timeseries/builtin_timeseries.py
@@ -461,8 +461,6 @@ def find_entries_count(self, key_list = None, time_query = None, geo_query = Non
         For key_list = None or empty, total count of all documents are returned considering the matching entries from entire dataset.
         """
-        print("builtin_timeseries.find_entries_count() called")
-
         orig_tsdb = self.timeseries_db
         analysis_tsdb = self.analysis_timeseries_db
 
diff --git a/emission/tests/modellingTests/TestRunGreedyModel.py b/emission/tests/modellingTests/TestRunGreedyModel.py
index 9e4431fa3..d00ae282e 100644
--- a/emission/tests/modellingTests/TestRunGreedyModel.py
+++ b/emission/tests/modellingTests/TestRunGreedyModel.py
@@ -163,14 +163,19 @@ def test1RoundTripGreedySimilarityBinning(self):
             origin=self.origin,
             destination=self.destination
         )
-        prediction, n = eamur.predict_labels_with_n(
-            trip = test,
+
+        model = eamur._load_stored_trip_model(
+            user_id=self.user_id,
             model_type=eamumt.ModelType.GREEDY_SIMILARITY_BINNING,
             model_storage=eamums.ModelStorage.DOCUMENT_DATABASE,
             model_config=greedy_model_config
         )
+
+        predictions_list = eamur.predict_labels_with_n(
+            trip_list = [test],
+            model = model
+        )
 
-        [logging.debug(p) for p in sorted(prediction, key=lambda r: r['p'], reverse=True)]
-
-        self.assertNotEqual(len(prediction), 0, "should have a prediction")
-
+        for prediction, n in predictions_list:
+            [logging.debug(p) for p in sorted(prediction, key=lambda r: r['p'], reverse=True)]
+            self.assertNotEqual(len(prediction), 0, "should have a prediction")
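Since a single-element trip_list is passed here, predictions_list contains exactly one (prediction, n) tuple; looping keeps the assertions valid if the test is later extended to multiple trips. A hypothetical invocation of just this module with the standard unittest runner:

    python -m unittest emission.tests.modellingTests.TestRunGreedyModel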
diff --git a/emission/tests/pipelineTests/TestLabelInferencePipeline.py b/emission/tests/pipelineTests/TestLabelInferencePipeline.py
index 97ad6982b..a661750fc 100644
--- a/emission/tests/pipelineTests/TestLabelInferencePipeline.py
+++ b/emission/tests/pipelineTests/TestLabelInferencePipeline.py
@@ -63,7 +63,7 @@ def testIndividualAlgorithms(self):
             self.assertEqual(entry["data"]["trip_id"], trip.get_id())
             this_algorithm = ecwl.AlgorithmTypes(entry["data"]["algorithm_id"])
             self.assertIn(this_algorithm, self.test_algorithms)
-            self.assertEqual(entry["data"]["prediction"], self.test_algorithms[this_algorithm](trip))
+            self.assertEqual(entry["data"]["prediction"], self.test_algorithms[this_algorithm]([trip])[0])
             self.assertEqual(entry["data"]["start_ts"], trip["data"]["start_ts"])
             self.assertEqual(entry["data"]["end_ts"], trip["data"]["end_ts"])
 
diff --git a/emission/tests/storageTests/TestSectionQueries.py b/emission/tests/storageTests/TestSectionQueries.py
index 0678b2e8e..01c6804c0 100644
--- a/emission/tests/storageTests/TestSectionQueries.py
+++ b/emission/tests/storageTests/TestSectionQueries.py
@@ -100,7 +100,6 @@ def testCleaned2InferredSectionList(self):
         # Total = 25 = 10 (UUID1) + 15 (UUID2)
         curr_predicted_entries = esds.cleaned2inferred_section_list(section_user_list)
         self.assertEqual(len(curr_predicted_entries), len(sections_entries))
-        print(curr_predicted_entries)
 
         # Testcase 2: Null user_id value is passed
         curr_predicted_entries = esds.cleaned2inferred_section_list([{'section' : section_id, 'user_id' : ''}])
 
diff --git a/emission/tests/storageTests/TestTimeSeries.py b/emission/tests/storageTests/TestTimeSeries.py
index 7b6751cf2..f12e27f01 100644
--- a/emission/tests/storageTests/TestTimeSeries.py
+++ b/emission/tests/storageTests/TestTimeSeries.py
@@ -184,9 +184,6 @@ def testFindEntriesCount(self):
         count_ts9 = ts_new_user.find_entries_count(key_list=key_list1)
         self.assertEqual(count_ts9, 0)
 
-        print("Assert Test for Count Data successful!")
-
-
 if __name__ == '__main__':
     import emission.tests.common as etc
     etc.configLogging()