Skip to content

Commit

Permalink
Merge pull request #937 from MukuFlash03/master
Browse files Browse the repository at this point in the history
Added initial interface for fetching inferred section modes
  • Loading branch information
shankari authored Sep 29, 2023
2 parents f6bf89a + c252e1b commit 02f966f
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 15 deletions.
10 changes: 10 additions & 0 deletions emission/storage/decorations/section_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ def cleaned2inferred_section(user_id, section_id):
curr_predicted_entry = _get_inference_entry_for_section(user_id, section_id, "analysis/inferred_section", "data.cleaned_section")
return curr_predicted_entry

def cleaned2inferred_section_list(section_user_list):
curr_predicted_entries = {}
for section_userid in section_user_list:
matching_inferred_section = cleaned2inferred_section(section_userid.get('user_id'), section_userid.get('section'))
if matching_inferred_section is None:
curr_predicted_entries[str(section_userid.get('section'))] = ecwm.PredictedModeTypes.UNKNOWN
else:
curr_predicted_entries[str(section_userid.get('section'))] = matching_inferred_section.data.sensed_mode # PredictedModeTypes
return curr_predicted_entries

def _get_inference_entry_for_section(user_id, section_id, entry_key, section_id_key):
prediction_key_query = {"metadata.key": entry_key}
inference_query = {"user_id": user_id, section_id_key: section_id}
Expand Down
68 changes: 67 additions & 1 deletion emission/tests/storageTests/TestSectionQueries.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
import uuid
import json
import bson.objectid as boi

# Our imports
import emission.storage.decorations.section_queries as esds
Expand All @@ -20,6 +21,10 @@

import emission.core.wrapper.section as ecws
import emission.core.get_database as edb
import emission.core.wrapper.entry as ecwe
import emission.tests.common as etc
import emission.pipeline.intake_stage as epi
import emission.core.wrapper.modeprediction as ecwm

# Our testing imports
import emission.tests.storageTests.analysis_ts_common as etsa
Expand All @@ -30,12 +35,29 @@ def setUp(self):
edb.get_analysis_timeseries_db().delete_many({'user_id': self.testUserId})
self.test_trip_id = "test_trip_id"

self.testEmail = "user1"
etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-21")
self.testUUID1 = self.testUUID

self.testEmail = "user2"
etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-27")
self.testUUID2 = self.testUUID

def tearDown(self):
self.clearRelatedDb()

def clearRelatedDb(self):
edb.get_timeseries_db().delete_many({"user_id": self.testUserId})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUserId})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUserId})

edb.get_timeseries_db().delete_many({"user_id": self.testUUID1})
edb.get_timeseries_db().delete_many({"user_id": self.testUUID2})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID1})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID2})
edb.get_pipeline_state_db().delete_many({"user_id": self.testUUID1})
edb.get_pipeline_state_db().delete_many({"user_id": self.testUUID2})
edb.get_uuid_db().delete_one({"user_email": "user1"})
edb.get_uuid_db().delete_one({"user_email": "user2"})

def testQuerySections(self):
new_section = ecws.Section()
Expand All @@ -54,6 +76,50 @@ def testQuerySections(self):
estt.TimeQuery("data.start_ts", 4, 6))
self.assertEqual([entry.data for entry in ret_arr_list], ret_arr_time)

def testCleaned2InferredSectionList(self):

# Running the pipeline for the two user datasets
epi.run_intake_pipeline_for_user(self.testUUID1, skip_if_no_new_data = False)
epi.run_intake_pipeline_for_user(self.testUUID2, skip_if_no_new_data = False)

# Fetching the timeseries entries containing both raw data and analysis data after running intake pipeline
ts_agg = esta.TimeSeries.get_aggregate_time_series()

# Preparing section_user_list of sections and user_ids dictionary to be passed as function parameter
doc_cursor = ts_agg.find_entries([esda.CLEANED_SECTION_KEY])
sections_entries = [ecwe.Entry(doc) for doc in doc_cursor]
section_user_list = []
for i, section in enumerate(sections_entries):
section_id = section.get_id()
user_section_id = section['user_id']
section_dict = {'section' : section_id, 'user_id' : user_section_id}
section_user_list.append(section_dict)

# Testcase 1: Aggregate timeseries entries with list of sections-user dictionary for multiple users
# Number of predicted_entries based on the inferred sections should match the number of cleaned sections
# Total = 25 = 10 (UUID1) + 15 (UUID2)
curr_predicted_entries = esds.cleaned2inferred_section_list(section_user_list)
self.assertEqual(len(curr_predicted_entries), len(sections_entries))
print(curr_predicted_entries)

# Testcase 2: Null user_id value is passed
curr_predicted_entries = esds.cleaned2inferred_section_list([{'section' : section_id, 'user_id' : ''}])
self.assertEqual(curr_predicted_entries, {str(section_id): ecwm.PredictedModeTypes.UNKNOWN})

# Testcase 3: Null section_id value is passed
curr_predicted_entries = esds.cleaned2inferred_section_list([{'section' : '', 'user_id' : user_section_id}])
self.assertEqual(curr_predicted_entries, {'': ecwm.PredictedModeTypes.UNKNOWN})

# Testcase 4: Empty dictionary is passed
# Python assigns 'None' as the default key value for empty dict {}
curr_predicted_entries = esds.cleaned2inferred_section_list([{}])
self.assertEqual(curr_predicted_entries, {'None': ecwm.PredictedModeTypes.UNKNOWN})

# Testcase 5: Empty list is passed
curr_predicted_entries = esds.cleaned2inferred_section_list([])
self.assertEqual(curr_predicted_entries, {})


if __name__ == '__main__':
import emission.tests.common as etc
etc.configLogging()
Expand Down
24 changes: 10 additions & 14 deletions emission/tests/storageTests/TestTimeSeries.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ def setUp(self):
etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-27")

def tearDown(self):
edb.get_timeseries_db().delete_many({"user_id": self.testUUID1})
edb.get_timeseries_db().delete_many({"user_id": self.testUUID})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID1})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID})
edb.get_uuid_db().delete_one({"user_email": "user1"})
edb.get_uuid_db().delete_one({"user_email": "user2"})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID})

def testGetUUIDList(self):
uuid_list = esta.TimeSeries.get_uuid_list()
Expand Down Expand Up @@ -165,20 +167,14 @@ def testFindEntriesCount(self):
count_ts7 = ts_agg.find_entries_count(key_list=key_list1)
self.assertEqual(count_ts7, 2128)


# TODO: Update conversation in PR #935
# Code cleanup pending once debugging was complete for failing assertion
# Removed try/catch block -> code present in issue #933
# Test case: Aggregate timeseries DB User data passed as input with empty key_list
try:
ts_agg = esta.TimeSeries.get_aggregate_time_series()
count_ts8 = ts_agg.find_entries_count(key_list=key_list4)
self.assertEqual(count_ts8, 3607)
except AssertionError as e:
print(f"Assertion failed for 3607...")
for ct in count_ts8:
cte = ecwe.Entry(ct)
print(f"CTE = ")
print(cte.user_id)
print(cte.metadata.key)
print(cte)
print("=== Trip:", cte.data.start_loc, "->", cte.data.end_loc)
ts_agg = esta.TimeSeries.get_aggregate_time_series()
count_ts8 = ts_agg.find_entries_count(key_list=key_list4)
self.assertEqual(count_ts8, 3607)

# Test case: New User created with no data to check
self.testEmail = None
Expand Down

0 comments on commit 02f966f

Please sign in to comment.