Skip to content

Commit

Permalink
Always run the computation of the user profile stats
Browse files Browse the repository at this point in the history
In #993
we expanded `_get_and_store_range` to store entries other than the pipeline
range, notably the `last_call_ts`.

However, adding it to the end of the pipeline causes it to be skipped most
of the time.

We have an early return from `run_intake_pipeline_for_user` if there is no
new data, so the end of the pipeline (the code in #993) is effectively in
the "else"

That's fine as long as we were only caching pipeline state, because it
wouldn't change if the pipeline was skipped
But now that we are caching the last_call_ts the current location will not
work.  Concretely, if the app is contacting the server but is not uploading
any data, last_call_ts will never be changed.

Since this is a fix that I am implementing on a holiday weekend, I have not
added unit tests to check this. @TeachMeTW please enhance the unit tests to
verify that the stats are generated in both cases: (i) when there is new
data and (ii) when there is no new data.
  • Loading branch information
shankari committed Dec 22, 2024
1 parent fee1bef commit aac991a
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False):

try:
run_intake_pipeline_for_user(uuid, skip_if_no_new_data)
with ect.Timer() as gsr:
logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip")
esds.store_pipeline_time(uuid, 'STORE_USER_STATS',
time.time(), gsr.elapsed)
except Exception as e:
esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None)
logging.exception("Found error %s while processing pipeline "
Expand Down Expand Up @@ -200,10 +206,3 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_COMPOSITE_OBJECTS.name,
time.time(), crt.elapsed)

with ect.Timer() as gsr:
logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip")

esds.store_pipeline_time(uuid, 'STORE_USER_STATS',
time.time(), gsr.elapsed)

0 comments on commit aac991a

Please sign in to comment.