diff --git a/emission/analysis/result/user_stat.py b/emission/analysis/result/user_stat.py new file mode 100644 index 000000000..1fe983d51 --- /dev/null +++ b/emission/analysis/result/user_stat.py @@ -0,0 +1,160 @@ +# emission/analysis/result/user_stats.py + +import logging +import pymongo +import arrow +from typing import Optional, Dict, Any +import emission.storage.timeseries.abstract_timeseries as esta +import emission.core.wrapper.user as ecwu + +TIME_FORMAT = 'YYYY-MM-DD HH:mm:ss' + +def get_time_series(user_id: str) -> esta.TimeSeries: + """ + Fetches the time series data for a given user. + + :param user_id: The UUID of the user. + :type user_id: str + :return: The time series object. + :rtype: esta.TimeSeries + """ + logging.debug(f"Fetching time series for user_id: {user_id}") + return esta.TimeSeries.get_time_series(user_id) + + +def get_timestamp(ts: esta.TimeSeries, trip_key: str, field: str, sort_order: int) -> Optional[int]: + """ + Retrieves a timestamp from the time series. + + :param ts: The time series object. + :type ts: esta.TimeSeries + :param trip_key: The key representing the trip data. + :type trip_key: str + :param field: The specific field to retrieve. + :type field: str + :param sort_order: Sorting order (pymongo.ASCENDING or pymongo.DESCENDING). + :type sort_order: int + :return: The timestamp or None if not found. + :rtype: Optional[int] + """ + timestamp = ts.get_first_value_for_field(trip_key, field, sort_order) + logging.debug(f"Retrieved {field}: {timestamp}") + return None if timestamp == -1 else timestamp + + +def count_trips(ts: esta.TimeSeries, key_list: list, extra_query_list: Optional[list] = None) -> int: + """ + Counts the number of trips based on the provided query. + + :param ts: The time series object. + :type ts: esta.TimeSeries + :param key_list: List of keys to filter trips. + :type key_list: list + :param extra_query_list: Additional queries, defaults to None. + :type extra_query_list: Optional[list], optional + :return: The count of trips. + :rtype: int + """ + count = ts.find_entries_count(key_list=key_list, extra_query_list=extra_query_list) + logging.debug(f"Counted {len(key_list)} trips with additional queries {extra_query_list}: {count}") + return count + + +def get_last_call_timestamp(ts: esta.TimeSeries) -> Optional[int]: + """ + Retrieves the last API call timestamp. + + :param ts: The time series object. + :type ts: esta.TimeSeries + :return: The last call timestamp or None if not found. + :rtype: Optional[int] + """ + last_call_ts = ts.get_first_value_for_field( + key='stats/server_api_time', + field='data.ts', + sort_order=pymongo.DESCENDING + ) + logging.debug(f"Last call timestamp: {last_call_ts}") + return None if last_call_ts == -1 else last_call_ts + + +def format_timestamp(ts: int) -> str: + """ + Formats a timestamp using the predefined time format. + + :param ts: The timestamp to format. + :type ts: int + :return: The formatted timestamp. + :rtype: str + """ + formatted = arrow.get(ts).format(TIME_FORMAT) + logging.debug(f"Formatted timestamp: {formatted}") + return formatted + + +def update_user_profile(user_id: str, data: Dict[str, Any]) -> None: + """ + Updates the user profile with the provided data. + + :param user_id: The UUID of the user. + :type user_id: str + :param data: The data to update in the user profile. + :type data: Dict[str, Any] + :return: None + """ + user = ecwu.User.fromUUID(user_id) + user.update(data) + logging.debug(f"User profile updated with data: {data}") + logging.debug(f"New profile: {user.getProfile()}") + + +def get_and_store_user_stats(user_id: str, trip_key: str) -> None: + """ + Aggregates and stores user statistics into the user profile. + + :param user_id: The UUID of the user. + :type user_id: str + :param trip_key: The key representing the trip data in the time series. + :type trip_key: str + :return: None + """ + try: + logging.info(f"Starting get_and_store_user_stats for user_id: {user_id}, trip_key: {trip_key}") + + ts = get_time_series(user_id) + + start_ts = get_timestamp(ts, trip_key, "data.start_ts", pymongo.ASCENDING) + end_ts = get_timestamp(ts, trip_key, "data.end_ts", pymongo.DESCENDING) + + total_trips = count_trips(ts, key_list=["analysis/confirmed_trip"]) + labeled_trips = count_trips( + ts, + key_list=["analysis/confirmed_trip"], + extra_query_list=[{'data.user_input': {'$ne': {}}}] + ) + + logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") + logging.info(f"user_id type: {type(user_id)}") + + last_call_ts = get_last_call_timestamp(ts) + logging.info(f"Last call timestamp: {last_call_ts}") + + update_data = { + "pipeline_range": { + "start_ts": start_ts, + "end_ts": end_ts + }, + "total_trips": total_trips, + "labeled_trips": labeled_trips + } + + if last_call_ts is not None: + formatted_last_call = format_timestamp(last_call_ts) + update_data["last_call"] = formatted_last_call + + update_user_profile(user_id, update_data) + + logging.debug("User profile updated successfully.") + + except Exception as e: + logging.error(f"Error in get_and_store_user_stats for user_id {user_id}: {e}") \ No newline at end of file diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 2d85294ea..d58be2b25 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -40,6 +40,7 @@ import emission.storage.decorations.stats_queries as esds import emission.core.wrapper.user as ecwu +import emission.analysis.result.user_stat as eaurs def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False): """ @@ -202,83 +203,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): with ect.Timer() as gsr: logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) - _get_and_store_range(uuid, "analysis/composite_trip") + eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") esds.store_pipeline_time(uuid, 'STORE_USER_STATS', - time.time(), gsr.elapsed) - -def _get_and_store_range(user_id, trip_key): - """ - Extends the user profile with pipeline_range, total_trips, labeled_trips, and last_call. - - Parameters: - - user_id (str): The UUID of the user. - - trip_key (str): The key representing the trip data in the time series. - """ - time_format = 'YYYY-MM-DD HH:mm:ss' - try: - logging.info(f"Starting _get_and_store_range for user_id: {user_id}, trip_key: {trip_key}") - - # Fetch the time series for the user - ts = esta.TimeSeries.get_time_series(user_id) - logging.debug("Fetched time series data.") - - # Get start timestamp - start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING) - start_ts = None if start_ts == -1 else start_ts - logging.debug(f"Start timestamp: {start_ts}") - - # Get end timestamp - end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING) - end_ts = None if end_ts == -1 else end_ts - logging.debug(f"End timestamp: {end_ts}") - - # Retrieve trip entries - total_trips = ts.find_entries_count( - key_list=["analysis/confirmed_trip"], - ) - - labeled_trips = ts.find_entries_count( - key_list=["analysis/confirmed_trip"], - extra_query_list=[{'data.user_input': {'$ne': {}}}] - ) - - logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") - logging.info(type(user_id)) - logging.debug("Fetched API call statistics.") - - last_call_ts = ts.get_first_value_for_field( - key='stats/server_api_time', - field='data.ts', - sort_order=pymongo.DESCENDING - ) - - logging.info(f"Last call timestamp: {last_call_ts}") - - # Update the user profile with pipeline_range, total_trips, labeled_trips, and last_call - user = ecwu.User.fromUUID(user_id) - if last_call_ts != -1: - # Format the timestamp using arrow - formatted_last_call = arrow.get(last_call_ts).format(time_format) - # Assign using attribute access or the update method - # Option 1: Attribute Assignment (if supported) - # user.last_call = formatted_last_call - - # Option 2: Using the update method - user.update({ - "last_call": formatted_last_call - }) - user.update({ - "pipeline_range": { - "start_ts": start_ts, - "end_ts": end_ts - }, - "total_trips": total_trips, - "labeled_trips": labeled_trips, - "last_call": last_call_ts - }) - logging.debug("User profile updated successfully.") - logging.debug("After updating, new profile is %s", user.getProfile()) - - except Exception as e: - logging.error(f"Error in _get_and_store_range for user_id {user_id}: {e}") \ No newline at end of file + time.time(), gsr.elapsed)