From 9d1f414b067cbb8411214fed88d45aaa8c6d5eae Mon Sep 17 00:00:00 2001 From: Shankari Date: Sat, 21 Dec 2024 18:30:33 -0800 Subject: [PATCH] Always run the computation of the user profile stats In https://github.com/e-mission/e-mission-server/pull/993 we expanded `_get_and_store_range` to store entries other than the pipeline range, notably the `last_call_ts`. However, adding it to the end of the pipeline causes it to be skipped most of the time. We have an early return from `run_intake_pipeline_for_user` if there is no new data, so the end of the pipeline (the code in #993) is effectively in the "else" That's fine as long as we were only caching pipeline state, because it wouldn't change if the pipeline was skipped But now that we are caching the last_call_ts the current location will not work. Concretely, if the app is contacting the server but is not uploading any data, last_call_ts will never be changed. Since this is a fix that I am implementing on a holiday weekend, I have not added unit tests to check this. @TeachMeTW please enhance the unit tests to verify that the stats are generated in both cases: (i) when there is new data and (ii) when there is no new data. I have also changed the two tests that were running `epi.run_intake_pipeline_for_user` and checking the profile for results, since that is not run in that function any more. I am a bit surprised that `TestUserStat` was calling `epi.run_intake_pipeline_for_user` although `etc.runIntakePipeline` was enhanced to run the new step. @TeachMeTW Why was the enhancement needed? We need to figure out which method to call and be consistent about it as part of adding new tests (@JGreenlee, @TeachMeTW) --- emission/pipeline/intake_stage.py | 13 ++++++------- .../tests/analysisTests/intakeTests/TestUserStat.py | 4 ++-- emission/tests/netTests/TestPipeline.py | 2 ++ 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index d58be2b25..9fb95c49a 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -76,6 +76,12 @@ def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False): try: run_intake_pipeline_for_user(uuid, skip_if_no_new_data) + with ect.Timer() as gsr: + logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) + eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") + esds.store_pipeline_time(uuid, 'STORE_USER_STATS', + time.time(), gsr.elapsed) except Exception as e: esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None) logging.exception("Found error %s while processing pipeline " @@ -200,10 +206,3 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_COMPOSITE_OBJECTS.name, time.time(), crt.elapsed) - with ect.Timer() as gsr: - logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) - eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") - - esds.store_pipeline_time(uuid, 'STORE_USER_STATS', - time.time(), gsr.elapsed) diff --git a/emission/tests/analysisTests/intakeTests/TestUserStat.py b/emission/tests/analysisTests/intakeTests/TestUserStat.py index 7b2243322..207aa0a98 100644 --- a/emission/tests/analysisTests/intakeTests/TestUserStat.py +++ b/emission/tests/analysisTests/intakeTests/TestUserStat.py @@ -44,7 +44,7 @@ def setUp(self): edb.get_profile_db().insert_one({"user_id": self.testUUID}) #etc.runIntakePipeline(self.testUUID) - epi.run_intake_pipeline_for_user(self.testUUID, skip_if_no_new_data = False) + etc.runIntakePipeline(self.testUUID) logging.debug("UUID = %s" % (self.testUUID)) def tearDown(self): @@ -118,4 +118,4 @@ def testLastCall(self): if __name__ == '__main__': # Configure logging for the test etc.configLogging() - unittest.main() \ No newline at end of file + unittest.main() diff --git a/emission/tests/netTests/TestPipeline.py b/emission/tests/netTests/TestPipeline.py index 54f81ba8e..d404ac6e2 100644 --- a/emission/tests/netTests/TestPipeline.py +++ b/emission/tests/netTests/TestPipeline.py @@ -7,6 +7,7 @@ import emission.core.wrapper.localdate as ecwl import emission.tests.common as etc import emission.pipeline.intake_stage as epi +import emission.analysis.result.user_stat as eaurs from emission.net.api import pipeline @@ -38,6 +39,7 @@ def testNoAnalysisResults(self): def testAnalysisResults(self): self.assertEqual(pipeline.get_range(self.testUUID), (None, None)) epi.run_intake_pipeline_for_user(self.testUUID, skip_if_no_new_data = False) + eaurs.get_and_store_user_stats(self.testUUID, "analysis/composite_trip") pr = pipeline.get_range(self.testUUID) self.assertAlmostEqual(pr[0], 1440688739.672) self.assertAlmostEqual(pr[1], 1440729142.709)