Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added initial interface for fetching inferred section modes #937

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions emission/storage/decorations/section_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ def cleaned2inferred_section(user_id, section_id):
curr_predicted_entry = _get_inference_entry_for_section(user_id, section_id, "analysis/inferred_section", "data.cleaned_section")
return curr_predicted_entry

def cleaned2inferred_section_list(section_user_list):
curr_predicted_entries = {}
for section_userid in section_user_list:
matching_inferred_section = cleaned2inferred_section(section_userid.get('user_id'), section_userid.get('section'))
if matching_inferred_section is None:
curr_predicted_entries[str(section_userid.get('section'))] = ecwm.PredictedModeTypes.UNKNOWN
else:
curr_predicted_entries[str(section_userid.get('section'))] = matching_inferred_section.data.sensed_mode # PredictedModeTypes
return curr_predicted_entries

def _get_inference_entry_for_section(user_id, section_id, entry_key, section_id_key):
prediction_key_query = {"metadata.key": entry_key}
inference_query = {"user_id": user_id, section_id_key: section_id}
Expand Down
68 changes: 67 additions & 1 deletion emission/tests/storageTests/TestSectionQueries.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
import uuid
import json
import bson.objectid as boi

# Our imports
import emission.storage.decorations.section_queries as esds
Expand All @@ -20,6 +21,10 @@

import emission.core.wrapper.section as ecws
import emission.core.get_database as edb
import emission.core.wrapper.entry as ecwe
import emission.tests.common as etc
import emission.pipeline.intake_stage as epi
import emission.core.wrapper.modeprediction as ecwm

# Our testing imports
import emission.tests.storageTests.analysis_ts_common as etsa
Expand All @@ -30,12 +35,29 @@ def setUp(self):
edb.get_analysis_timeseries_db().delete_many({'user_id': self.testUserId})
self.test_trip_id = "test_trip_id"

self.testEmail = "user1"
etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-21")
self.testUUID1 = self.testUUID

self.testEmail = "user2"
etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-27")
self.testUUID2 = self.testUUID

def tearDown(self):
self.clearRelatedDb()

def clearRelatedDb(self):
edb.get_timeseries_db().delete_many({"user_id": self.testUserId})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUserId})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUserId})

edb.get_timeseries_db().delete_many({"user_id": self.testUUID1})
edb.get_timeseries_db().delete_many({"user_id": self.testUUID2})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID1})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID2})
edb.get_pipeline_state_db().delete_many({"user_id": self.testUUID1})
edb.get_pipeline_state_db().delete_many({"user_id": self.testUUID2})
edb.get_uuid_db().delete_one({"user_email": "user1"})
edb.get_uuid_db().delete_one({"user_email": "user2"})

def testQuerySections(self):
new_section = ecws.Section()
Expand All @@ -54,6 +76,50 @@ def testQuerySections(self):
estt.TimeQuery("data.start_ts", 4, 6))
self.assertEqual([entry.data for entry in ret_arr_list], ret_arr_time)

def testCleaned2InferredSectionList(self):

# Running the pipeline for the two user datasets
epi.run_intake_pipeline_for_user(self.testUUID1, skip_if_no_new_data = False)
epi.run_intake_pipeline_for_user(self.testUUID2, skip_if_no_new_data = False)

# Fetching the timeseries entries containing both raw data and analysis data after running intake pipeline
ts_agg = esta.TimeSeries.get_aggregate_time_series()
Comment on lines +85 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for clarification: this doesn't actually fetch the user entries - it just fetches the appropriate access layer.


# Preparing section_user_list of sections and user_ids dictionary to be passed as function parameter
doc_cursor = ts_agg.find_entries([esda.CLEANED_SECTION_KEY])
sections_entries = [ecwe.Entry(doc) for doc in doc_cursor]
section_user_list = []
for i, section in enumerate(sections_entries):
section_id = section.get_id()
user_section_id = section['user_id']
section_dict = {'section' : section_id, 'user_id' : user_section_id}
section_user_list.append(section_dict)

# Testcase 1: Aggregate timeseries entries with list of sections-user dictionary for multiple users
# Number of predicted_entries based on the inferred sections should match the number of cleaned sections
# Total = 25 = 10 (UUID1) + 15 (UUID2)
curr_predicted_entries = esds.cleaned2inferred_section_list(section_user_list)
self.assertEqual(len(curr_predicted_entries), len(sections_entries))
print(curr_predicted_entries)

# Testcase 2: Null user_id value is passed
curr_predicted_entries = esds.cleaned2inferred_section_list([{'section' : section_id, 'user_id' : ''}])
self.assertEqual(curr_predicted_entries, {str(section_id): ecwm.PredictedModeTypes.UNKNOWN})

# Testcase 3: Null section_id value is passed
curr_predicted_entries = esds.cleaned2inferred_section_list([{'section' : '', 'user_id' : user_section_id}])
self.assertEqual(curr_predicted_entries, {'': ecwm.PredictedModeTypes.UNKNOWN})

# Testcase 4: Empty dictionary is passed
# Python assigns 'None' as the default key value for empty dict {}
curr_predicted_entries = esds.cleaned2inferred_section_list([{}])
self.assertEqual(curr_predicted_entries, {'None': ecwm.PredictedModeTypes.UNKNOWN})

# Testcase 5: Empty list is passed
curr_predicted_entries = esds.cleaned2inferred_section_list([])
self.assertEqual(curr_predicted_entries, {})


if __name__ == '__main__':
import emission.tests.common as etc
etc.configLogging()
Expand Down
24 changes: 10 additions & 14 deletions emission/tests/storageTests/TestTimeSeries.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ def setUp(self):
etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-27")

def tearDown(self):
edb.get_timeseries_db().delete_many({"user_id": self.testUUID1})
edb.get_timeseries_db().delete_many({"user_id": self.testUUID})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID1})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID})
Comment on lines +41 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job cleaning this up. I am not going to insist on it this time, but in the future, you should really put these unrelated changes into a separate commit to keep the commit history clean. You can have multiple commits in a PR, and having separate commits will keep the "git blame" more relevant.

edb.get_uuid_db().delete_one({"user_email": "user1"})
edb.get_uuid_db().delete_one({"user_email": "user2"})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID})

def testGetUUIDList(self):
uuid_list = esta.TimeSeries.get_uuid_list()
Expand Down Expand Up @@ -165,20 +167,14 @@ def testFindEntriesCount(self):
count_ts7 = ts_agg.find_entries_count(key_list=key_list1)
self.assertEqual(count_ts7, 2128)


# TODO: Update conversation in PR #935
# Code cleanup pending once debugging was complete for failing assertion
# Removed try/catch block -> code present in issue #933
# Test case: Aggregate timeseries DB User data passed as input with empty key_list
try:
ts_agg = esta.TimeSeries.get_aggregate_time_series()
count_ts8 = ts_agg.find_entries_count(key_list=key_list4)
self.assertEqual(count_ts8, 3607)
except AssertionError as e:
print(f"Assertion failed for 3607...")
for ct in count_ts8:
cte = ecwe.Entry(ct)
print(f"CTE = ")
print(cte.user_id)
print(cte.metadata.key)
print(cte)
print("=== Trip:", cte.data.start_loc, "->", cte.data.end_loc)
ts_agg = esta.TimeSeries.get_aggregate_time_series()
count_ts8 = ts_agg.find_entries_count(key_list=key_list4)
self.assertEqual(count_ts8, 3607)

# Test case: New User created with no data to check
self.testEmail = None
Expand Down
Loading