From aac991a06085fce5901d570c34d57e4486c77f95 Mon Sep 17 00:00:00 2001 From: Shankari Date: Sat, 21 Dec 2024 18:30:33 -0800 Subject: [PATCH] Always run the computation of the user profile stats In https://github.com/e-mission/e-mission-server/pull/993 we expanded `_get_and_store_range` to store entries other than the pipeline range, notably the `last_call_ts`. However, adding it to the end of the pipeline causes it to be skipped most of the time. We have an early return from `run_intake_pipeline_for_user` if there is no new data, so the end of the pipeline (the code in #993) is effectively in the "else" That's fine as long as we were only caching pipeline state, because it wouldn't change if the pipeline was skipped But now that we are caching the last_call_ts the current location will not work. Concretely, if the app is contacting the server but is not uploading any data, last_call_ts will never be changed. Since this is a fix that I am implementing on a holiday weekend, I have not added unit tests to check this. @TeachMeTW please enhance the unit tests to verify that the stats are generated in both cases: (i) when there is new data and (ii) when there is no new data. --- emission/pipeline/intake_stage.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index d58be2b25..9fb95c49a 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -76,6 +76,12 @@ def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False): try: run_intake_pipeline_for_user(uuid, skip_if_no_new_data) + with ect.Timer() as gsr: + logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) + eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") + esds.store_pipeline_time(uuid, 'STORE_USER_STATS', + time.time(), gsr.elapsed) except Exception as e: esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None) logging.exception("Found error %s while processing pipeline " @@ -200,10 +206,3 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_COMPOSITE_OBJECTS.name, time.time(), crt.elapsed) - with ect.Timer() as gsr: - logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) - eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") - - esds.store_pipeline_time(uuid, 'STORE_USER_STATS', - time.time(), gsr.elapsed)