Skip to content

Commit

Permalink
Separated Get and Store into a separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
TeachMeTW committed Nov 8, 2024
1 parent f1d2eb2 commit 89dfc9a
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 78 deletions.
160 changes: 160 additions & 0 deletions emission/analysis/result/user_stat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# emission/analysis/result/user_stats.py

import logging
import pymongo
import arrow
from typing import Optional, Dict, Any
import emission.storage.timeseries.abstract_timeseries as esta
import emission.core.wrapper.user as ecwu

TIME_FORMAT = 'YYYY-MM-DD HH:mm:ss'

def get_time_series(user_id: str) -> esta.TimeSeries:
"""
Fetches the time series data for a given user.
:param user_id: The UUID of the user.
:type user_id: str
:return: The time series object.
:rtype: esta.TimeSeries
"""
logging.debug(f"Fetching time series for user_id: {user_id}")
return esta.TimeSeries.get_time_series(user_id)


def get_timestamp(ts: esta.TimeSeries, trip_key: str, field: str, sort_order: int) -> Optional[int]:
"""
Retrieves a timestamp from the time series.
:param ts: The time series object.
:type ts: esta.TimeSeries
:param trip_key: The key representing the trip data.
:type trip_key: str
:param field: The specific field to retrieve.
:type field: str
:param sort_order: Sorting order (pymongo.ASCENDING or pymongo.DESCENDING).
:type sort_order: int
:return: The timestamp or None if not found.
:rtype: Optional[int]
"""
timestamp = ts.get_first_value_for_field(trip_key, field, sort_order)
logging.debug(f"Retrieved {field}: {timestamp}")
return None if timestamp == -1 else timestamp


def count_trips(ts: esta.TimeSeries, key_list: list, extra_query_list: Optional[list] = None) -> int:
"""
Counts the number of trips based on the provided query.
:param ts: The time series object.
:type ts: esta.TimeSeries
:param key_list: List of keys to filter trips.
:type key_list: list
:param extra_query_list: Additional queries, defaults to None.
:type extra_query_list: Optional[list], optional
:return: The count of trips.
:rtype: int
"""
count = ts.find_entries_count(key_list=key_list, extra_query_list=extra_query_list)
logging.debug(f"Counted {len(key_list)} trips with additional queries {extra_query_list}: {count}")
return count


def get_last_call_timestamp(ts: esta.TimeSeries) -> Optional[int]:
"""
Retrieves the last API call timestamp.
:param ts: The time series object.
:type ts: esta.TimeSeries
:return: The last call timestamp or None if not found.
:rtype: Optional[int]
"""
last_call_ts = ts.get_first_value_for_field(
key='stats/server_api_time',
field='data.ts',
sort_order=pymongo.DESCENDING
)
logging.debug(f"Last call timestamp: {last_call_ts}")
return None if last_call_ts == -1 else last_call_ts


def format_timestamp(ts: int) -> str:
"""
Formats a timestamp using the predefined time format.
:param ts: The timestamp to format.
:type ts: int
:return: The formatted timestamp.
:rtype: str
"""
formatted = arrow.get(ts).format(TIME_FORMAT)
logging.debug(f"Formatted timestamp: {formatted}")
return formatted


def update_user_profile(user_id: str, data: Dict[str, Any]) -> None:
"""
Updates the user profile with the provided data.
:param user_id: The UUID of the user.
:type user_id: str
:param data: The data to update in the user profile.
:type data: Dict[str, Any]
:return: None
"""
user = ecwu.User.fromUUID(user_id)
user.update(data)
logging.debug(f"User profile updated with data: {data}")
logging.debug(f"New profile: {user.getProfile()}")


def get_and_store_user_stats(user_id: str, trip_key: str) -> None:
"""
Aggregates and stores user statistics into the user profile.
:param user_id: The UUID of the user.
:type user_id: str
:param trip_key: The key representing the trip data in the time series.
:type trip_key: str
:return: None
"""
try:
logging.info(f"Starting get_and_store_user_stats for user_id: {user_id}, trip_key: {trip_key}")

ts = get_time_series(user_id)

start_ts = get_timestamp(ts, trip_key, "data.start_ts", pymongo.ASCENDING)
end_ts = get_timestamp(ts, trip_key, "data.end_ts", pymongo.DESCENDING)

total_trips = count_trips(ts, key_list=["analysis/confirmed_trip"])
labeled_trips = count_trips(
ts,
key_list=["analysis/confirmed_trip"],
extra_query_list=[{'data.user_input': {'$ne': {}}}]
)

logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}")
logging.info(f"user_id type: {type(user_id)}")

last_call_ts = get_last_call_timestamp(ts)
logging.info(f"Last call timestamp: {last_call_ts}")

update_data = {
"pipeline_range": {
"start_ts": start_ts,
"end_ts": end_ts
},
"total_trips": total_trips,
"labeled_trips": labeled_trips
}

if last_call_ts is not None:
formatted_last_call = format_timestamp(last_call_ts)
update_data["last_call"] = formatted_last_call

update_user_profile(user_id, update_data)

logging.debug("User profile updated successfully.")

except Exception as e:
logging.error(f"Error in get_and_store_user_stats for user_id {user_id}: {e}")
81 changes: 3 additions & 78 deletions emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import emission.storage.decorations.stats_queries as esds

import emission.core.wrapper.user as ecwu
import emission.analysis.result.user_stat as eaurs

def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False):
"""
Expand Down Expand Up @@ -202,83 +203,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
with ect.Timer() as gsr:
logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
_get_and_store_range(uuid, "analysis/composite_trip")
eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip")

esds.store_pipeline_time(uuid, 'STORE_USER_STATS',
time.time(), gsr.elapsed)

def _get_and_store_range(user_id, trip_key):
"""
Extends the user profile with pipeline_range, total_trips, labeled_trips, and last_call.
Parameters:
- user_id (str): The UUID of the user.
- trip_key (str): The key representing the trip data in the time series.
"""
time_format = 'YYYY-MM-DD HH:mm:ss'
try:
logging.info(f"Starting _get_and_store_range for user_id: {user_id}, trip_key: {trip_key}")

# Fetch the time series for the user
ts = esta.TimeSeries.get_time_series(user_id)
logging.debug("Fetched time series data.")

# Get start timestamp
start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING)
start_ts = None if start_ts == -1 else start_ts
logging.debug(f"Start timestamp: {start_ts}")

# Get end timestamp
end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING)
end_ts = None if end_ts == -1 else end_ts
logging.debug(f"End timestamp: {end_ts}")

# Retrieve trip entries
total_trips = ts.find_entries_count(
key_list=["analysis/confirmed_trip"],
)

labeled_trips = ts.find_entries_count(
key_list=["analysis/confirmed_trip"],
extra_query_list=[{'data.user_input': {'$ne': {}}}]
)

logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}")
logging.info(type(user_id))
logging.debug("Fetched API call statistics.")

last_call_ts = ts.get_first_value_for_field(
key='stats/server_api_time',
field='data.ts',
sort_order=pymongo.DESCENDING
)

logging.info(f"Last call timestamp: {last_call_ts}")

# Update the user profile with pipeline_range, total_trips, labeled_trips, and last_call
user = ecwu.User.fromUUID(user_id)
if last_call_ts != -1:
# Format the timestamp using arrow
formatted_last_call = arrow.get(last_call_ts).format(time_format)
# Assign using attribute access or the update method
# Option 1: Attribute Assignment (if supported)
# user.last_call = formatted_last_call

# Option 2: Using the update method
user.update({
"last_call": formatted_last_call
})
user.update({
"pipeline_range": {
"start_ts": start_ts,
"end_ts": end_ts
},
"total_trips": total_trips,
"labeled_trips": labeled_trips,
"last_call": last_call_ts
})
logging.debug("User profile updated successfully.")
logging.debug("After updating, new profile is %s", user.getProfile())

except Exception as e:
logging.error(f"Error in _get_and_store_range for user_id {user_id}: {e}")
time.time(), gsr.elapsed)

0 comments on commit 89dfc9a

Please sign in to comment.