Skip to content

Commit

Permalink
Added instrumentation to _get_and_store_range
Browse files Browse the repository at this point in the history
Discussed with Jack, approved with this change to clarify pipeline stage

Added total trips, labeled trips, and last call to the user profile

Modified total and labeled trips to match op-admin implementation
  • Loading branch information
TeachMeTW committed Dec 13, 2024
1 parent b15fcb9 commit b4a2ced
Showing 1 changed file with 96 additions and 12 deletions.
108 changes: 96 additions & 12 deletions emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from uuid import UUID
import time
import pymongo
from datetime import datetime

import emission.core.get_database as edb
import emission.core.timer as ect
Expand Down Expand Up @@ -198,17 +199,100 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_COMPOSITE_OBJECTS.name,
time.time(), crt.elapsed)

_get_and_store_range(uuid, "analysis/composite_trip")
with ect.Timer() as gsr:
logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
_get_and_store_range(uuid, "analysis/composite_trip")

esds.store_pipeline_time(uuid, 'STORE_USER_STATS',
time.time(), gsr.elapsed)

def _get_and_store_range(user_id, trip_key):
    """
    Extend the user profile with pipeline_range, total_trips, labeled_trips, and last_call.

    Parameters:
    - user_id: The UUID of the user (passed to ``ecwu.User.fromUUID``).
    - trip_key (str): The time series key whose ``data.start_ts`` /
      ``data.end_ts`` values define the pipeline range.

    The profile fields written are:
    - pipeline_range: ``{"start_ts": ..., "end_ts": ...}``; a lookup result
      of -1 is stored as None.
    - total_trips: count of ``analysis/confirmed_trip`` entries.
    - labeled_trips: count of ``analysis/confirmed_trip`` entries whose
      ``data.user_input`` is not the empty dict.
    - last_call: largest ``data.ts`` among the user's
      ``stats/server_api_time`` entries whose name starts with ``GET_`` or
      ``PUT_``, or None if there are none.

    Any exception is logged (with traceback) and swallowed so that a failure
    to store the stats does not propagate to the caller.
    """
    try:
        logging.info(f"Starting _get_and_store_range for user_id: {user_id}, trip_key: {trip_key}")

        # Fetch the time series for the user
        ts = esta.TimeSeries.get_time_series(user_id)
        logging.debug("Fetched time series data.")

        # Earliest start: first value when sorted ascending; -1 means "no value"
        start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING)
        start_ts = None if start_ts == -1 else start_ts
        logging.debug(f"Start timestamp: {start_ts}")

        # Latest end: first value when sorted descending; -1 means "no value"
        end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING)
        end_ts = None if end_ts == -1 else end_ts
        logging.debug(f"End timestamp: {end_ts}")

        # Trip counts are always taken from confirmed trips, independent of trip_key
        total_trips = ts.find_entries_count(
            key_list=["analysis/confirmed_trip"],
        )
        labeled_trips = ts.find_entries_count(
            key_list=["analysis/confirmed_trip"],
            extra_query_list=[{'data.user_input': {'$ne': {}}}]
        )
        logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}")

        # Retrieve last GET and PUT calls from stats/server_api_time
        # NOTE(review): this scans every matching document for the user; a
        # sorted, limited query may be cheaper - confirm against the DB indexes.
        docs_cursor = edb.get_timeseries_db().find({
            "metadata.key": "stats/server_api_time",
            "user_id": user_id
        })
        logging.debug("Fetched API call statistics.")

        last_call_ts = _latest_api_call_ts(docs_cursor)
        logging.info(f"Last call timestamp: {last_call_ts}")

        # Single profile update carrying the range and all the stats
        user = ecwu.User.fromUUID(user_id)
        user.update({
            "pipeline_range": {
                "start_ts": start_ts,
                "end_ts": end_ts
            },
            "total_trips": total_trips,
            "labeled_trips": labeled_trips,
            "last_call": last_call_ts
        })
        logging.debug("User profile updated successfully.")
        logging.debug("After updating, new profile is %s", user.getProfile())

    except Exception as e:
        # Best-effort: keep the pipeline going, but record the full traceback
        logging.exception(f"Error in _get_and_store_range for user_id {user_id}: {e}")


def _latest_api_call_ts(api_docs):
    """
    Return the most recent ``data.ts`` among GET_*/PUT_* API call documents.

    Parameters:
    - api_docs: iterable of dict-like documents; each is read via
      ``doc.get("data", {})`` for its ``name`` and ``ts`` fields.

    Returns the largest timestamp seen for a name starting with ``GET_`` or
    ``PUT_``, or None when no such document has a timestamp.
    """
    last_get = None
    last_put = None

    for doc in api_docs:
        data = doc.get("data", {})
        api_call_name = data.get("name", "")
        api_call_ts = data.get("ts")

        # Compare against None explicitly so a timestamp of 0 is not dropped
        if api_call_ts is None:
            logging.warning(f"Missing 'ts' in document: {doc}")
            continue

        if api_call_name.startswith("GET_"):
            if last_get is None or api_call_ts > last_get:
                last_get = api_call_ts
                logging.debug(f"Updated last_get to: {last_get}")
        elif api_call_name.startswith("PUT_"):
            if last_put is None or api_call_ts > last_put:
                last_put = api_call_ts
                logging.debug(f"Updated last_put to: {last_put}")

    # Determine the most recent call across both kinds
    candidates = [t for t in (last_get, last_put) if t is not None]
    return max(candidates) if candidates else None

0 comments on commit b4a2ced

Please sign in to comment.