Skip to content

Commit

Permalink
Modified last call to mirror op-admin implementation
Browse files Browse the repository at this point in the history
Separated Get and Store into a separate file

Addressed comments, reduced overkill on refactor

Forgot to add last_call_ts
  • Loading branch information
TeachMeTW committed Dec 13, 2024
1 parent b4a2ced commit 0a39517
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 92 deletions.
112 changes: 112 additions & 0 deletions emission/analysis/result/user_stat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# emission/analysis/result/user_stat.py

import logging
import pymongo
import arrow
from typing import Optional, Dict, Any
import emission.storage.timeseries.abstract_timeseries as esta
import emission.core.wrapper.user as ecwu

# Date-time format string. The token style (YYYY/DD/HH) looks like the format
# tokens of the `arrow` library imported by this module -- TODO confirm.
# NOTE(review): not referenced by any function in the visible part of this file.
TIME_FORMAT = 'YYYY-MM-DD HH:mm:ss'

def count_trips(ts: esta.TimeSeries, key_list: list, extra_query_list: Optional[list] = None) -> int:
    """
    Count the time series entries that match the given keys and filters.

    This is a thin wrapper around ``ts.find_entries_count`` that also logs
    the result at DEBUG level.

    :param ts: The time series object to query.
    :type ts: esta.TimeSeries
    :param key_list: Keys of the entries to count; passed through unchanged.
    :type key_list: list
    :param extra_query_list: Additional query clauses passed through
        unchanged, defaults to None.
    :type extra_query_list: Optional[list], optional
    :return: The value returned by ``ts.find_entries_count``.
    :rtype: int
    """
    count = ts.find_entries_count(key_list=key_list, extra_query_list=extra_query_list)
    # Log the actual count. The number of keys (len(key_list)) is not a trip
    # count, and calling len() would also fail if key_list were None.
    logging.debug(
        "Counted %s trips for keys %s with additional queries %s",
        count, key_list, extra_query_list,
    )
    return count


def get_last_call_timestamp(ts: esta.TimeSeries) -> Optional[int]:
    """
    Look up the newest ``data.ts`` value stored under ``stats/server_api_time``.

    :param ts: The time series object to query.
    :type ts: esta.TimeSeries
    :return: The value found, or None when the lookup yields -1.
    :rtype: Optional[int]
    """
    newest_ts = ts.get_first_value_for_field(
        key='stats/server_api_time',
        field='data.ts',
        sort_order=pymongo.DESCENDING
    )
    logging.debug(f"Last call timestamp: {newest_ts}")
    # -1 is treated as the "nothing found" result and surfaced as None.
    if newest_ts == -1:
        return None
    return newest_ts


def update_user_profile(user_id: str, data: Dict[str, Any]) -> None:
    """
    Apply ``data`` to the profile of the user identified by ``user_id``.

    The user is resolved with ``ecwu.User.fromUUID`` and updated through its
    ``update`` method; the resulting profile is logged at DEBUG level.

    :param user_id: The UUID of the user.
    :type user_id: str
    :param data: The fields to write into the user profile.
    :type data: Dict[str, Any]
    :return: None
    """
    profile_owner = ecwu.User.fromUUID(user_id)
    profile_owner.update(data)
    logging.debug(f"User profile updated with data: {data}")
    # Read the profile back so the log shows what was actually stored.
    logging.debug(f"New profile: {profile_owner.getProfile()}")


def get_and_store_user_stats(user_id: str, trip_key: str) -> None:
    """
    Aggregate per-user statistics and store them in the user profile.

    The profile is updated with:

    * ``pipeline_range``: earliest ``data.start_ts`` and latest ``data.end_ts``
      of the entries stored under ``trip_key``
    * ``total_trips`` / ``labeled_trips``: counts of ``analysis/confirmed_trip``
      entries (independent of ``trip_key``); labeled means ``data.user_input``
      is not an empty document
    * ``last_call_ts``: newest ``stats/server_api_time`` timestamp

    This is best-effort: any exception is logged (with traceback) and
    swallowed, so a failure here never propagates to the caller.

    :param user_id: The UUID of the user.
    :type user_id: str
    :param trip_key: The time series key used to compute the pipeline range.
    :type trip_key: str
    :return: None
    """
    try:
        logging.info(f"Starting get_and_store_user_stats for user_id: {user_id}, trip_key: {trip_key}")

        ts = esta.TimeSeries.get_time_series(user_id)

        # A lookup result of -1 is treated as "no value found" and stored as None.
        start_ts_result = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING)
        start_ts = None if start_ts_result == -1 else start_ts_result

        end_ts_result = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING)
        end_ts = None if end_ts_result == -1 else end_ts_result

        total_trips = count_trips(ts, key_list=["analysis/confirmed_trip"])
        labeled_trips = count_trips(
            ts,
            key_list=["analysis/confirmed_trip"],
            extra_query_list=[{'data.user_input': {'$ne': {}}}]
        )

        logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}")
        # Diagnostic only, so it is logged at DEBUG rather than INFO.
        logging.debug(f"user_id type: {type(user_id)}")

        last_call_ts = get_last_call_timestamp(ts)
        logging.info(f"Last call timestamp: {last_call_ts}")

        update_data = {
            "pipeline_range": {
                "start_ts": start_ts,
                "end_ts": end_ts
            },
            "total_trips": total_trips,
            "labeled_trips": labeled_trips,
            "last_call_ts": last_call_ts
        }

        update_user_profile(user_id, update_data)

        logging.debug("User profile updated successfully.")

    except Exception as e:
        # logging.exception logs at ERROR level like logging.error, but also
        # records the traceback, which would otherwise be lost.
        logging.exception("Error in get_and_store_user_stats for user_id %s: %s", user_id, e)
95 changes: 3 additions & 92 deletions emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import emission.storage.decorations.stats_queries as esds

import emission.core.wrapper.user as ecwu
import emission.analysis.result.user_stat as eaurs

def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False):
"""
Expand Down Expand Up @@ -202,97 +203,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
with ect.Timer() as gsr:
logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
_get_and_store_range(uuid, "analysis/composite_trip")
eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip")

esds.store_pipeline_time(uuid, 'STORE_USER_STATS',
time.time(), gsr.elapsed)

def _get_and_store_range(user_id, trip_key):
    """
    Store pipeline_range, total_trips, labeled_trips and last_call in the user profile.

    Any exception raised along the way is logged and swallowed.

    Parameters:
    - user_id (str): The UUID of the user.
    - trip_key (str): The key representing the trip data in the time series.
    """
    confirmed_key = "analysis/confirmed_trip"

    try:
        logging.info(f"Starting _get_and_store_range for user_id: {user_id}, trip_key: {trip_key}")

        timeseries = esta.TimeSeries.get_time_series(user_id)
        logging.debug("Fetched time series data.")

        # Pipeline range: a lookup result of -1 is stored as None.
        start_ts = timeseries.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING)
        if start_ts == -1:
            start_ts = None
        logging.debug(f"Start timestamp: {start_ts}")

        end_ts = timeseries.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING)
        if end_ts == -1:
            end_ts = None
        logging.debug(f"End timestamp: {end_ts}")

        # Trip counts always come from the confirmed-trip key, not trip_key.
        total_trips = timeseries.find_entries_count(key_list=[confirmed_key])
        labeled_trips = timeseries.find_entries_count(
            key_list=[confirmed_key],
            extra_query_list=[{'data.user_input': {'$ne': {}}}],
        )

        logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}")
        logging.info(type(user_id))

        # Scan this user's stats/server_api_time documents for the newest
        # GET_* and PUT_* call timestamps.
        api_call_docs = edb.get_timeseries_db().find({
            "metadata.key": "stats/server_api_time",
            "user_id": user_id,
        })
        logging.debug("Fetched API call statistics.")

        last_get = last_put = None

        for doc in api_call_docs:
            payload = doc.get("data", {})
            call_name = payload.get("name", "")
            call_ts = payload.get("ts")

            if not call_ts:
                logging.warning(f"Missing 'ts' in document: {doc}")
                continue

            is_get = call_name.startswith("GET_")
            if is_get:
                if not last_get or call_ts > last_get:
                    last_get = call_ts
                    logging.debug(f"Updated last_get to: {last_get}")
            elif call_name.startswith("PUT_"):
                if not last_put or call_ts > last_put:
                    last_put = call_ts
                    logging.debug(f"Updated last_put to: {last_put}")

        # Newest of the two when both exist, otherwise whichever one is set.
        last_call_ts = max(last_get, last_put) if (last_get and last_put) else (last_get or last_put)

        logging.info(f"Last call timestamp: {last_call_ts}")

        profile_fields = {
            "pipeline_range": {
                "start_ts": start_ts,
                "end_ts": end_ts
            },
            "total_trips": total_trips,
            "labeled_trips": labeled_trips,
            "last_call": last_call_ts
        }
        profile_owner = ecwu.User.fromUUID(user_id)
        profile_owner.update(profile_fields)
        logging.debug("User profile updated successfully.")
        logging.debug("After updating, new profile is %s", profile_owner.getProfile())

    except Exception as e:
        logging.error(f"Error in _get_and_store_range for user_id {user_id}: {e}")
time.time(), gsr.elapsed)

0 comments on commit 0a39517

Please sign in to comment.