Skip to content

Commit

Permalink
Add per-operation timing to segment_current_trips using ect.Timer
Browse files Browse the repository at this point in the history
- Wrapped each significant operation within the `segment_current_trips` function with `ect.Timer` context managers.
- Named each timer using the pattern `ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/operation"` for consistent identification.
- After each timed block, recorded the elapsed time by calling `esds.store_pipeline_time` with the appropriate parameters.
- Note: besides timing, this commit also changes the failure path — the exception handler in `segment_current_trips` now additionally calls `epq.mark_segmentation_failed(user_id)` — and adds logging around `_get_and_store_range` in `intake_stage.py`.

This enhancement enables granular performance monitoring of the trip segmentation process, allowing for better identification of potential bottlenecks and optimization opportunities.
  • Loading branch information
TeachMeTW committed Nov 2, 2024
1 parent b15fcb9 commit 013d099
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 53 deletions.
226 changes: 174 additions & 52 deletions emission/analysis/intake/segmentation/trip_segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from builtins import *
from builtins import object
import logging
import time

import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.decorations.place_queries as esdp
Expand All @@ -23,6 +24,9 @@
import emission.analysis.intake.segmentation.restart_checking as eaisr

import emission.core.common as ecc
import emission.storage.decorations.stats_queries as esds
import emission.core.timer as ect
import emission.core.wrapper.pipelinestate as ecwp

class TripSegmentationMethod(object):
def segment_into_trips(self, timeseries, time_query):
Expand All @@ -47,68 +51,186 @@ def segment_into_trips(self, timeseries, time_query):
pass

def segment_current_trips(user_id):
    """Segment any new location points for ``user_id`` into trips and places.

    Reads unprocessed ``background/filtered_location`` entries for the
    pipeline's current time range, chooses the segmentation method matching
    the data's ``filter`` value ("time" or "distance"), creates the resulting
    trips/places, and advances (or fails) the pipeline state accordingly.

    Every sub-step is wrapped in an ``ect.Timer`` and its elapsed time is
    recorded via ``esds.store_pipeline_time`` under
    ``TRIP_SEGMENTATION/<substage>`` for granular performance monitoring.

    :param user_id: the user whose data should be segmented
    :return: None (results are persisted through the storage layer)
    """
    stage = ecwp.PipelineStages.TRIP_SEGMENTATION.name

    def _record(substage, timer):
        # Persist the wall-clock elapsed time for one timed sub-step.
        # Called after the `with` block exits so `timer.elapsed` is final.
        esds.store_pipeline_time(user_id, stage + "/" + substage,
                                 time.time(), timer.elapsed)

    with ect.Timer(stage + "/get_time_series") as t_get_ts:
        ts = esta.TimeSeries.get_time_series(user_id)
    _record("get_time_series", t_get_ts)

    with ect.Timer(stage + "/get_time_range") as t_get_range:
        time_query = epq.get_time_range_for_segmentation(user_id)
    _record("get_time_range", t_get_range)

    # Imported at function scope rather than module scope — presumably to
    # avoid a circular import with the segmentation method modules.
    # NOTE(review): confirm the reason; hoist to module level if safe.
    import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_time_filter as dstf
    import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_dist_filter as dsdf

    with ect.Timer(stage + "/initialize_filters") as t_init_filters:
        dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60,  # 5 mins
                                                  point_threshold=9,
                                                  distance_threshold=100)  # 100 m
        dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60,  # 10 mins
                                                  point_threshold=9,
                                                  distance_threshold=50)  # 50 m
    _record("initialize_filters", t_init_filters)

    with ect.Timer(stage + "/setup_filter_methods") as t_setup:
        filter_methods = {"time": dstfsm, "distance": dsdfsm}
        filter_method_names = {"time": "DwellSegmentationTimeFilter",
                               "distance": "DwellSegmentationDistFilter"}
    _record("setup_filter_methods", t_setup)

    # We need to use the appropriate filter based on the incoming data,
    # so let's read in the location points for the specified query.
    with ect.Timer(stage + "/fetch_location_data") as t_fetch:
        loc_df = ts.get_data_df("background/filtered_location", time_query)
    _record("fetch_location_data", t_fetch)

    if len(loc_df) == 0:
        with ect.Timer(stage + "/early_return_no_data") as t_no_data:
            # no new segments, no need to keep looking at these again
            logging.debug("len(loc_df) == 0, early return")
            epq.mark_segmentation_done(user_id, None)
        _record("early_return_no_data", t_no_data)
        return

    with ect.Timer(stage + "/check_out_of_order_points") as t_check_ooo:
        # points whose timestamp goes backwards relative to the previous row
        out_of_order_points = loc_df[loc_df.ts.diff() < 0]
    _record("check_out_of_order_points", t_check_ooo)

    if len(out_of_order_points) > 0:
        with ect.Timer(stage + "/handle_out_of_order_points") as t_handle_ooo:
            logging.info("Found out of order points!")
            logging.info("%s" % out_of_order_points)
            # drop from the dataframe
            loc_df = loc_df.drop(out_of_order_points.index.tolist())
            loc_df.reset_index(inplace=True)
            # invalidate in the database.
            out_of_order_id_list = out_of_order_points["_id"].tolist()
            logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
            for ooid in out_of_order_id_list:
                ts.invalidate_raw_entry(ooid)
        _record("handle_out_of_order_points", t_handle_ooo)

    with ect.Timer(stage + "/identify_active_filters") as t_identify:
        filters_in_df = loc_df["filter"].dropna().unique()
        logging.debug("Filters in the dataframe = %s" % filters_in_df)
    _record("identify_active_filters", t_identify)

    if len(filters_in_df) == 1:
        # Common case - let's make it easy
        with ect.Timer(stage + "/segment_single_filter") as t_segment:
            segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts, time_query)
        _record("segment_single_filter", t_segment)
    else:
        with ect.Timer(stage + "/segment_combined_filters") as t_segment:
            segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query,
                                                                   filters_in_df,
                                                                   filter_methods)
        _record("segment_combined_filters", t_segment)

    # Create and store trips and places based on the segmentation points
    with ect.Timer(stage + "/process_segmentation_points") as t_process:
        if segmentation_points is None:
            epq.mark_segmentation_failed(user_id)
        elif len(segmentation_points) == 0:
            with ect.Timer(stage + "/early_return_no_segmentation") as t_no_seg:
                # no new segments, no need to keep looking at these again
                logging.debug("len(segmentation_points) == 0, early return")
                epq.mark_segmentation_done(user_id, None)
            _record("early_return_no_segmentation", t_no_seg)
        else:
            try:
                with ect.Timer(stage + "/create_places_and_trips") as t_create:
                    # NOTE(review): with multiple filters this still records the
                    # name of the *first* filter only — pre-existing behavior.
                    create_places_and_trips(user_id, segmentation_points,
                                            filter_method_names[filters_in_df[0]])
                _record("create_places_and_trips", t_create)

                with ect.Timer(stage + "/mark_segmentation_done") as t_done:
                    epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods))
                _record("mark_segmentation_done", t_done)
            except:
                # bare except kept deliberately: any failure during trip
                # creation must mark the stage failed, never propagate
                with ect.Timer(stage + "/handle_segmentation_failure") as t_fail:
                    logging.exception("Trip generation failed for user %s" % user_id)
                    epq.mark_segmentation_failed(user_id)
                _record("handle_segmentation_failure", t_fail)
    _record("process_segmentation_points", t_process)


def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods):
"""
Expand Down
8 changes: 7 additions & 1 deletion emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,13 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_COMPOSITE_OBJECTS.name,
time.time(), crt.elapsed)

_get_and_store_range(uuid, "analysis/composite_trip")
with ect.Timer() as gsr:
logging.info("*" * 10 + "UUID %s: generating store and range " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: generating store and range " % uuid + "*" * 10)
_get_and_store_range(uuid, "analysis/composite_trip")

esds.store_pipeline_time(uuid, 'GENERATE_STORE_AND_RANGE',
time.time(), gsr.elapsed)

def _get_and_store_range(user_id, trip_key):
ts = esta.TimeSeries.get_time_series(user_id)
Expand Down

0 comments on commit 013d099

Please sign in to comment.