Skip to content

Commit

Permalink
Added total, labeled, and last call
Browse files Browse the repository at this point in the history
  • Loading branch information
TeachMeTW committed Nov 8, 2024
1 parent ab3997e commit a5d15cd
Showing 1 changed file with 90 additions and 11 deletions.
101 changes: 90 additions & 11 deletions emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from uuid import UUID
import time
import pymongo
from datetime import datetime

import emission.core.get_database as edb
import emission.core.timer as ect
Expand Down Expand Up @@ -207,14 +208,92 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
time.time(), gsr.elapsed)

def _count_trips(ts, trip_key):
    """Return ``(total_trips, labeled_trips)`` for the entries under ``trip_key``.

    A trip counts as labeled when its ``data.user_input`` value is truthy.
    The entries are streamed once so that both counts come from a single pass.
    """
    total_trips = 0
    labeled_trips = 0
    for trip in ts.find_entries([trip_key], time_query=None):
        total_trips += 1
        if trip.get('data', {}).get('user_input'):
            labeled_trips += 1
    return total_trips, labeled_trips


def _get_last_call_ts(ts):
    """Return the largest ``data.ts`` among GET_/PUT_ API-call stats, or None.

    The entries are read through the user's own time series (the same
    ``find_entries`` call used for trips) rather than through an unfiltered
    scan of the whole collection, so the result only reflects this user's
    calls.
    NOTE(review): assumes "stats/server_api_time" is a key that
    ``ts.find_entries`` can serve -- confirm against the timeseries
    implementation.
    """
    last_call_ts = None
    for doc in ts.find_entries(["stats/server_api_time"], time_query=None):
        data = doc.get("data", {})
        api_call_ts = data.get("ts")

        # Compare against None explicitly so a timestamp of 0 is not
        # mistaken for a missing value.
        if api_call_ts is None:
            logging.warning(f"Missing 'ts' in document: {doc}")
            continue

        # Only GET_* and PUT_* calls count towards the "last call".
        api_call_name = data.get("name") or ""
        if not api_call_name.startswith(("GET_", "PUT_")):
            continue

        if last_call_ts is None or api_call_ts > last_call_ts:
            last_call_ts = api_call_ts
            logging.debug(f"Updated last call timestamp to: {last_call_ts}")
    return last_call_ts


def _get_and_store_range(user_id, trip_key):
    """
    Extends the user profile with pipeline_range, total_trips, labeled_trips, and last_call.

    This is a best-effort update: any exception is logged (with its traceback)
    and swallowed, so a failure here does not propagate to the caller.

    Parameters:
    - user_id: The UUID of the user whose profile is updated.
    - trip_key (str): The key representing the trip data in the time series.

    Returns:
    - None. The results are written to the user profile.
    """
    try:
        logging.info(f"Starting _get_and_store_range for user_id: {user_id}, trip_key: {trip_key}")

        # Fetch the time series for the user
        ts = esta.TimeSeries.get_time_series(user_id)
        logging.debug("Fetched time series data.")

        # get_first_value_for_field signals "no entries" with -1; store None instead
        start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING)
        start_ts = None if start_ts == -1 else start_ts
        logging.debug(f"Start timestamp: {start_ts}")

        end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING)
        end_ts = None if end_ts == -1 else end_ts
        logging.debug(f"End timestamp: {end_ts}")

        total_trips, labeled_trips = _count_trips(ts, trip_key)
        logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}")

        last_call_ts = _get_last_call_ts(ts)
        logging.info(f"Last call timestamp: {last_call_ts}")

        # Write everything in a single profile update
        user = ecwu.User.fromUUID(user_id)
        user.update({
            "pipeline_range": {
                "start_ts": start_ts,
                "end_ts": end_ts
            },
            "total_trips": total_trips,
            "labeled_trips": labeled_trips,
            "last_call": last_call_ts
        })
        logging.debug("User profile updated successfully.")
        logging.debug("After updating, new profile is %s", user.getProfile())

    except Exception:
        # logging.exception keeps the traceback, which logging.error(str(e)) dropped
        logging.exception(f"Error in _get_and_store_range for user_id {user_id}")

0 comments on commit a5d15cd

Please sign in to comment.