From b4a2ced0ce47e87a25424462b717be4baa98e218 Mon Sep 17 00:00:00 2001
From: TeachMeTW
Date: Sat, 2 Nov 2024 20:45:30 -0700
Subject: [PATCH] added instrumentation to the _get_and_store_range

Discussed with Jack, approved with this change to clarify pipeline stage
Added total, labeled, and last call
Modified total and labeled trips to match op-admin implementation
---
 emission/pipeline/intake_stage.py | 95 ++++++++++++++++++++++++++----
 1 file changed, 83 insertions(+), 12 deletions(-)

diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py
index a93ba2996..4e80683a5 100644
--- a/emission/pipeline/intake_stage.py
+++ b/emission/pipeline/intake_stage.py
@@ -13,6 +13,7 @@ from uuid import UUID
 import time
 import pymongo
+from datetime import datetime
 
 import emission.core.get_database as edb
 import emission.core.timer as ect
 
@@ -198,17 +199,87 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
     esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_COMPOSITE_OBJECTS.name,
                              time.time(), crt.elapsed)
 
-    _get_and_store_range(uuid, "analysis/composite_trip")
+    with ect.Timer() as gsr:
+        logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
+        print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
+        _get_and_store_range(uuid, "analysis/composite_trip")
+
+    esds.store_pipeline_time(uuid, 'STORE_USER_STATS',
+                             time.time(), gsr.elapsed)
 
 def _get_and_store_range(user_id, trip_key):
-    ts = esta.TimeSeries.get_time_series(user_id)
-    start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING)
-    if start_ts == -1:
-        start_ts = None
-    end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING)
-    if end_ts == -1:
-        end_ts = None
-
-    user = ecwu.User(user_id)
-    user.update({"pipeline_range": {"start_ts": start_ts, "end_ts": end_ts}})
-    logging.debug("After updating, new profiles is %s" % user.getProfile())
+    """
+    Extend the user profile with pipeline_range, total_trips, labeled_trips, and last_call.
+
+    Any exception raised along the way is logged with its traceback and is not re-raised.
+
+    Parameters:
+    - user_id: The UUID of the user.
+    - trip_key (str): The key representing the trip data in the time series.
+    """
+    try:
+        logging.info(f"Starting _get_and_store_range for user_id: {user_id}, trip_key: {trip_key}")
+
+        # Fetch the time series for the user
+        ts = esta.TimeSeries.get_time_series(user_id)
+        logging.debug("Fetched time series data.")
+
+        # Get start timestamp; a result of -1 is stored as None
+        start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING)
+        start_ts = None if start_ts == -1 else start_ts
+        logging.debug(f"Start timestamp: {start_ts}")
+
+        # Get end timestamp, with the same -1 handling
+        end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING)
+        end_ts = None if end_ts == -1 else end_ts
+        logging.debug(f"End timestamp: {end_ts}")
+
+        # Count all confirmed trips, then those with a non-empty user_input
+        total_trips = ts.find_entries_count(
+            key_list=["analysis/confirmed_trip"],
+        )
+        labeled_trips = ts.find_entries_count(
+            key_list=["analysis/confirmed_trip"],
+            extra_query_list=[{'data.user_input': {'$ne': {}}}]
+        )
+        logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}")
+
+        # Find the most recent GET_/PUT_ call recorded under stats/server_api_time
+        docs_cursor = edb.get_timeseries_db().find({
+            "metadata.key": "stats/server_api_time",
+            "user_id": user_id
+        })
+        logging.debug("Fetched API call statistics.")
+
+        last_call_ts = None
+        for doc in docs_cursor:
+            call_data = doc.get("data", {})
+            api_call_ts = call_data.get("ts")
+            if api_call_ts is None:
+                logging.warning(f"Missing 'ts' in document: {doc}")
+                continue
+            # startswith accepts a tuple, so GET_ and PUT_ calls share one running maximum
+            if not call_data.get("name", "").startswith(("GET_", "PUT_")):
+                continue
+            if last_call_ts is None or api_call_ts > last_call_ts:
+                last_call_ts = api_call_ts
+
+        logging.info(f"Last call timestamp: {last_call_ts}")
+
+        # Update the user profile with pipeline_range, total_trips, labeled_trips, and last_call
+        user = ecwu.User.fromUUID(user_id)
+        user.update({
+            "pipeline_range": {
+                "start_ts": start_ts,
+                "end_ts": end_ts
+            },
+            "total_trips": total_trips,
+            "labeled_trips": labeled_trips,
+            "last_call": last_call_ts
+        })
+        logging.debug("User profile updated successfully.")
+        logging.debug("After updating, new profile is %s", user.getProfile())
+
+    except Exception:
+        # logging.exception keeps the traceback that a plain logging.error call drops
+        logging.exception(f"Error in _get_and_store_range for user_id {user_id}")
\ No newline at end of file