-
Notifications
You must be signed in to change notification settings - Fork 119
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add initial unit tests for
get_and_store_user_stats
- Introduced comprehensive test suite to validate functionality of the `get_and_store_user_stats` method. - Covered core scenarios including: - Correct data aggregation and storage. - Handling cases with no trips or missing data. - Verifying behavior with partial data and multiple invocations. - handling of invalid UUID inputs. - setup and teardown to ensure test isolation and clean up user data. - Included data insertion for confirmed trips, composite trips, and server API timestamps to simulate realistic scenarios. This initial test suite establishes a baseline for ensuring reliability of the `get_and_store_user_stats` function. remove Modified test Refactored the test and simplified it; validated all new user stats
- Loading branch information
Showing
1 changed file
with
141 additions
and
0 deletions.
There are no files selected for viewing
141 changes: 141 additions & 0 deletions
141
emission/tests/analysisTests/intakeTests/TestUserStat.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
from __future__ import unicode_literals, print_function, division, absolute_import | ||
import unittest | ||
import uuid | ||
import logging | ||
import json | ||
import os | ||
import time | ||
import pandas as pd | ||
|
||
from builtins import * | ||
from future import standard_library | ||
standard_library.install_aliases() | ||
|
||
# Standard imports | ||
import emission.storage.json_wrappers as esj | ||
|
||
# Our imports | ||
import emission.core.get_database as edb | ||
import emission.storage.timeseries.timequery as estt | ||
import emission.storage.timeseries.abstract_timeseries as esta | ||
import emission.storage.decorations.analysis_timeseries_queries as esda | ||
import emission.core.wrapper.user as ecwu | ||
import emission.net.api.stats as enac | ||
|
||
# Test imports | ||
import emission.tests.common as etc | ||
|
||
|
||
class TestUserStats(unittest.TestCase): | ||
def setUp(self): | ||
""" | ||
Set up the test environment by loading real example data for both Android and users. | ||
""" | ||
# Configure logging for the test | ||
etc.configLogging() | ||
|
||
# Retrieve a test user UUID from the database | ||
get_example = pd.DataFrame(list(edb.get_uuid_db().find({}, {"user_email":1, "uuid": 1, "_id": 0}))) | ||
if get_example.empty: | ||
self.fail("No users found in the database to perform tests.") | ||
test_user_id = get_example.iloc[0].uuid | ||
self.testUUID = test_user_id | ||
self.UUID = test_user_id | ||
# Load example entries from a JSON file | ||
with open("emission/tests/data/real_examples/shankari_2015-aug-27") as fp: | ||
self.entries = json.load(fp, object_hook=esj.wrapped_object_hook) | ||
|
||
# Set up the real example data with entries | ||
etc.setupRealExampleWithEntries(self) | ||
|
||
# Retrieve the user profile | ||
profile = edb.get_profile_db().find_one({"user_id": self.UUID}) | ||
if profile is None: | ||
# Initialize the profile if it does not exist | ||
edb.get_profile_db().insert_one({"user_id": self.UUID}) | ||
|
||
etc.runIntakePipeline(self.UUID) | ||
|
||
logging.debug("UUID = %s" % (self.UUID)) | ||
|
||
def tearDown(self): | ||
""" | ||
Clean up the test environment by removing analysis configuration and deleting test data from databases. | ||
""" | ||
|
||
# Delete all time series entries for users | ||
tsdb = edb.get_timeseries_db() | ||
tsdb.delete_many({"user_id": self.UUID}) | ||
|
||
# Delete all pipeline state entries for users | ||
pipeline_db = edb.get_pipeline_state_db() | ||
pipeline_db.delete_many({"user_id": self.UUID}) | ||
|
||
# Delete all analysis time series entries for users | ||
analysis_ts_db = edb.get_analysis_timeseries_db() | ||
analysis_ts_db.delete_many({"user_id": self.UUID}) | ||
|
||
# Delete user profiles | ||
profile_db = edb.get_profile_db() | ||
profile_db.delete_one({"user_id": self.UUID}) | ||
|
||
def testGetAndStoreUserStats(self): | ||
""" | ||
Test get_and_store_user_stats for the user to ensure that user statistics | ||
are correctly aggregated and stored in the user profile. | ||
""" | ||
|
||
# Retrieve the updated user profile from the database | ||
profile = edb.get_profile_db().find_one({"user_id": self.UUID}) | ||
|
||
# Ensure that the profile exists | ||
self.assertIsNotNone(profile, "User profile should exist after storing stats.") | ||
|
||
# Verify that the expected fields are present | ||
self.assertIn("total_trips", profile, "User profile should contain 'total_trips'.") | ||
self.assertIn("labeled_trips", profile, "User profile should contain 'labeled_trips'.") | ||
self.assertIn("pipeline_range", profile, "User profile should contain 'pipeline_range'.") | ||
self.assertIn("last_call_ts", profile, "User profile should contain 'last_call_ts'.") | ||
|
||
expected_total_trips = 8 | ||
expected_labeled_trips = 0 | ||
|
||
self.assertEqual(profile["total_trips"], expected_total_trips, | ||
f"Expected total_trips to be {expected_total_trips}, got {profile['total_trips']}") | ||
self.assertEqual(profile["labeled_trips"], expected_labeled_trips, | ||
f"Expected labeled_trips to be {expected_labeled_trips}, got {profile['labeled_trips']}") | ||
|
||
# Verify pipeline range | ||
pipeline_range = profile.get("pipeline_range", {}) | ||
self.assertIn("start_ts", pipeline_range, "Pipeline range should contain 'start_ts'.") | ||
self.assertIn("end_ts", pipeline_range, "Pipeline range should contain 'end_ts'.") | ||
|
||
expected_start_ts = 1440688739.672 | ||
expected_end_ts = 1440729142.709 | ||
|
||
self.assertEqual(pipeline_range["start_ts"], expected_start_ts, | ||
f"Expected start_ts to be {expected_start_ts}, got {pipeline_range['start_ts']}") | ||
self.assertEqual(pipeline_range["end_ts"], expected_end_ts, | ||
f"Expected end_ts to be {expected_end_ts}, got {pipeline_range['end_ts']}") | ||
|
||
def testLastCall(self): | ||
# Call the function with all required arguments | ||
enac.store_server_api_time(self.UUID, "test_call", 1440729142.709, 69) | ||
|
||
etc.runIntakePipeline(self.UUID) | ||
|
||
# Retrieve the profile from the database | ||
profile = edb.get_profile_db().find_one({"user_id": self.UUID}) | ||
|
||
# Verify that last_call_ts is updated correctly | ||
expected_last_call_ts = 1440729142.709 | ||
actual_last_call_ts = profile.get("last_call_ts") | ||
|
||
self.assertEqual( | ||
actual_last_call_ts, | ||
expected_last_call_ts, | ||
f"Expected last_call_ts to be {expected_last_call_ts}, got {actual_last_call_ts}" | ||
) | ||
|
||
if __name__ == '__main__': | ||
unittest.main() |