Skip to content

Commit

Permalink
Always run the computation of the user profile stats
Browse files Browse the repository at this point in the history
In #993
we expanded `_get_and_store_range` to store entries other than the pipeline
range, notably the `last_call_ts`.

However, adding it to the end of the pipeline causes it to be skipped most
of the time.

We have an early return from `run_intake_pipeline_for_user` if there is no
new data, so the end of the pipeline (the code in #993) is effectively in
the "else"

That's fine as long as we were only caching pipeline state, because it
wouldn't change if the pipeline was skipped
But now that we are caching the last_call_ts the current location will not
work.  Concretely, if the app is contacting the server but is not uploading
any data, last_call_ts will never be changed.

Since this is a fix that I am implementing on a holiday weekend, I have not
added unit tests to check this. @TeachMeTW please enhance the unit tests to
verify that the stats are generated in both cases: (i) when there is new
data and (ii) when there is no new data.

I have also changed the two tests that were running
`epi.run_intake_pipeline_for_user` and checking the profile for results,
since that is not run in that function any more.

I am a bit surprised that `TestUserStat` was calling
`epi.run_intake_pipeline_for_user` although `etc.runIntakePipeline` was
enhanced to run the new step. @TeachMeTW Why was the enhancement needed?

We need to figure out which method to call and be consistent about it as
part of adding new tests (@JGreenlee, @TeachMeTW)
  • Loading branch information
shankari committed Dec 22, 2024
1 parent fee1bef commit 9d1f414
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 9 deletions.
13 changes: 6 additions & 7 deletions emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False):

try:
run_intake_pipeline_for_user(uuid, skip_if_no_new_data)
with ect.Timer() as gsr:
logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip")
esds.store_pipeline_time(uuid, 'STORE_USER_STATS',
time.time(), gsr.elapsed)
except Exception as e:
esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None)
logging.exception("Found error %s while processing pipeline "
Expand Down Expand Up @@ -200,10 +206,3 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_COMPOSITE_OBJECTS.name,
time.time(), crt.elapsed)

with ect.Timer() as gsr:
logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip")

esds.store_pipeline_time(uuid, 'STORE_USER_STATS',
time.time(), gsr.elapsed)
4 changes: 2 additions & 2 deletions emission/tests/analysisTests/intakeTests/TestUserStat.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def setUp(self):
edb.get_profile_db().insert_one({"user_id": self.testUUID})

#etc.runIntakePipeline(self.testUUID)
epi.run_intake_pipeline_for_user(self.testUUID, skip_if_no_new_data = False)
etc.runIntakePipeline(self.testUUID)
logging.debug("UUID = %s" % (self.testUUID))

def tearDown(self):
Expand Down Expand Up @@ -118,4 +118,4 @@ def testLastCall(self):
if __name__ == '__main__':
# Configure logging for the test
etc.configLogging()
unittest.main()
unittest.main()
2 changes: 2 additions & 0 deletions emission/tests/netTests/TestPipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import emission.core.wrapper.localdate as ecwl
import emission.tests.common as etc
import emission.pipeline.intake_stage as epi
import emission.analysis.result.user_stat as eaurs

from emission.net.api import pipeline

Expand Down Expand Up @@ -38,6 +39,7 @@ def testNoAnalysisResults(self):
def testAnalysisResults(self):
self.assertEqual(pipeline.get_range(self.testUUID), (None, None))
epi.run_intake_pipeline_for_user(self.testUUID, skip_if_no_new_data = False)
eaurs.get_and_store_user_stats(self.testUUID, "analysis/composite_trip")
pr = pipeline.get_range(self.testUUID)
self.assertAlmostEqual(pr[0], 1440688739.672)
self.assertAlmostEqual(pr[1], 1440729142.709)
Expand Down

0 comments on commit 9d1f414

Please sign in to comment.