From 0d2a2ed3072b49f3d031559379479ff6c2c2ade3 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Sat, 23 Nov 2024 10:01:37 -0800 Subject: [PATCH 1/2] Reduce timing measurement to bottom 80% of tracked functions - Removed timing for less significant functions in TRIP_SEGMENTATION pipeline - Focused retained measurements on key contributors - Prepare for local testing to validate `create_time_filter` triggering - Will explore top 20% timing optimizations in a follow-up commit --- .../intake/segmentation/trip_segmentation.py | 61 ++++----- .../dwell_segmentation_dist_filter.py | 116 +++++++----------- 2 files changed, 65 insertions(+), 112 deletions(-) diff --git a/emission/analysis/intake/segmentation/trip_segmentation.py b/emission/analysis/intake/segmentation/trip_segmentation.py index 7b47bd49b..50534f34c 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation.py +++ b/emission/analysis/intake/segmentation/trip_segmentation.py @@ -51,37 +51,27 @@ def segment_into_trips(self, timeseries, time_query): pass def segment_current_trips(user_id): - with ect.Timer() as t_get_time_series: - ts = esta.TimeSeries.get_time_series(user_id) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_series", time.time(), t_get_time_series.elapsed) + ts = esta.TimeSeries.get_time_series(user_id) - with ect.Timer() as t_get_time_range: - time_query = epq.get_time_range_for_segmentation(user_id) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_range_for_segmentation", time.time(), t_get_time_range.elapsed) + time_query = epq.get_time_range_for_segmentation(user_id) import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_time_filter as dstf import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_dist_filter as dsdf - with ect.Timer() as t_create_time_filter: - dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins - point_threshold=9, - distance_threshold=100) # 100 m - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_time_filter", time.time(), t_create_time_filter.elapsed) + dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins + point_threshold=9, + distance_threshold=100) # 100 m - with ect.Timer() as t_create_dist_filter: - dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins - point_threshold=9, - distance_threshold=50) # 50 m - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_dist_filter", time.time(), t_create_dist_filter.elapsed) + dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins + point_threshold=9, + distance_threshold=50) # 50 m filter_methods = {"time": dstfsm, "distance": dsdfsm} filter_method_names = {"time": "DwellSegmentationTimeFilter", "distance": "DwellSegmentationDistFilter"} # We need to use the appropriate filter based on the incoming data # So let's read in the location points for the specified query - with ect.Timer() as t_get_data_df: - loc_df = ts.get_data_df("background/filtered_location", time_query) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_data_df", time.time(), t_get_data_df.elapsed) + loc_df = ts.get_data_df("background/filtered_location", time_query) if len(loc_df) == 0: # no new segments, no need to keep looking at these again @@ -89,24 +79,21 @@ def segment_current_trips(user_id): epq.mark_segmentation_done(user_id, None) return - with ect.Timer() as t_handle_out_of_order: - out_of_order_points = loc_df[loc_df.ts.diff() < 0] - if len(out_of_order_points) > 0: - logging.info("Found out of order points!") - logging.info("%s" % out_of_order_points) - # drop from the table - loc_df = loc_df.drop(out_of_order_points.index.tolist()) - loc_df.reset_index(inplace=True) - # invalidate in the database. - out_of_order_id_list = out_of_order_points["_id"].tolist() - logging.debug("out_of_order_id_list = %s" % out_of_order_id_list) - for ooid in out_of_order_id_list: - ts.invalidate_raw_entry(ooid) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/handle_out_of_order_points", time.time(), t_handle_out_of_order.elapsed) - - with ect.Timer() as t_get_filters: - filters_in_df = loc_df["filter"].dropna().unique() - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_filters_in_df", time.time(), t_get_filters.elapsed) + out_of_order_points = loc_df[loc_df.ts.diff() < 0] + if len(out_of_order_points) > 0: + logging.info("Found out of order points!") + logging.info("%s" % out_of_order_points) + # drop from the table + loc_df = loc_df.drop(out_of_order_points.index.tolist()) + loc_df.reset_index(inplace=True) + # invalidate in the database. + out_of_order_id_list = out_of_order_points["_id"].tolist() + logging.debug("out_of_order_id_list = %s" % out_of_order_id_list) + for ooid in out_of_order_id_list: + ts.invalidate_raw_entry(ooid) + + + filters_in_df = loc_df["filter"].dropna().unique() logging.debug("Filters in the dataframe = %s" % filters_in_df) if len(filters_in_df) == 1: diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py index 2ffd6f058..eda5f19f0 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py @@ -60,23 +60,9 @@ def segment_into_trips(self, timeseries, time_query): t_get_filtered_points.elapsed ) - with ect.Timer() as t_mark_valid: - self.filtered_points_df.loc[:, "valid"] = True - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/mark_valid", - time.time(), - t_mark_valid.elapsed - ) + self.filtered_points_df.loc[:, "valid"] = True - with ect.Timer() as t_get_transition_df: - self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_transition_df", - time.time(), - t_get_transition_df.elapsed - ) + self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) if len(self.transition_df) > 0: logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]]) @@ -119,14 +105,7 @@ def segment_into_trips(self, timeseries, time_query): # else: sel_point = currPoint logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) - with ect.Timer() as t_set_start_point: - curr_trip_start_point = sel_point - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/set_new_trip_start_point", - time.time(), - t_set_start_point.elapsed - ) + curr_trip_start_point = sel_point just_ended = False else: with ect.Timer() as t_process_trip: @@ -186,57 +165,44 @@ def segment_into_trips(self, timeseries, time_query): t_loop.elapsed ) - with ect.Timer() as t_post_loop: - # Since we only end a trip when we start a new trip, this means that - # the last trip that was pushed is ignored. Consider the example of - # 2016-02-22 when I took kids to karate. We arrived shortly after 4pm, - # so during that remote push, a trip end was not detected. And we got - # back home shortly after 5pm, so the trip end was only detected on the - # phone at 6pm. At that time, the following points were pushed: - # ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50, - # ..., 2016-02-22T16:57:04 - # Then, on the server, while iterating through the points, we detected - # a trip end at 16:04, and a new trip start at 16:49. But we did not - # detect the trip end at 16:57, because we didn't start a new point. - # This has two issues: - # - we won't see this trip until the next trip start, which may be on the next day - # - we won't see this trip at all, because when we run the pipeline the - # next time, we will only look at points from that time onwards. These - # points have been marked as "processed", so they won't even be considered. - - # There are multiple potential fixes: - # - we can mark only the completed trips as processed. This will solve (2) above, but not (1) - # - we can mark a trip end based on the fact that we only push data - # when a trip ends, so if we have data, it means that the trip has been - # detected as ended on the phone. - # This seems a bit fragile - what if we start pushing incomplete trip - # data for efficiency reasons? Therefore, we also check to see if there - # is a trip_end_detected in this timeframe after the last point. If so, - # then we end the trip at the last point that we have. - if not just_ended and len(self.transition_df) > 0: - with ect.Timer() as t_check_transitions: - stopped_moving_after_last = self.transition_df[ - (self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2) - ] - logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]]) - if len(stopped_moving_after_last) > 0: - logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last)) - segmentation_points.append((curr_trip_start_point, currPoint)) - self.last_ts_processed = currPoint.metadata_write_ts - else: - logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last)) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/check_transitions_post_loop", - time.time(), - t_check_transitions.elapsed - ) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/post_loop", - time.time(), - t_post_loop.elapsed - ) + + # Since we only end a trip when we start a new trip, this means that + # the last trip that was pushed is ignored. Consider the example of + # 2016-02-22 when I took kids to karate. We arrived shortly after 4pm, + # so during that remote push, a trip end was not detected. And we got + # back home shortly after 5pm, so the trip end was only detected on the + # phone at 6pm. At that time, the following points were pushed: + # ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50, + # ..., 2016-02-22T16:57:04 + # Then, on the server, while iterating through the points, we detected + # a trip end at 16:04, and a new trip start at 16:49. But we did not + # detect the trip end at 16:57, because we didn't start a new point. + # This has two issues: + # - we won't see this trip until the next trip start, which may be on the next day + # - we won't see this trip at all, because when we run the pipeline the + # next time, we will only look at points from that time onwards. These + # points have been marked as "processed", so they won't even be considered. + + # There are multiple potential fixes: + # - we can mark only the completed trips as processed. This will solve (2) above, but not (1) + # - we can mark a trip end based on the fact that we only push data + # when a trip ends, so if we have data, it means that the trip has been + # detected as ended on the phone. + # This seems a bit fragile - what if we start pushing incomplete trip + # data for efficiency reasons? Therefore, we also check to see if there + # is a trip_end_detected in this timeframe after the last point. If so, + # then we end the trip at the last point that we have. + if not just_ended and len(self.transition_df) > 0: + stopped_moving_after_last = self.transition_df[ + (self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2) + ] + logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]]) + if len(stopped_moving_after_last) > 0: + logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last)) + segmentation_points.append((curr_trip_start_point, currPoint)) + self.last_ts_processed = currPoint.metadata_write_ts + else: + logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last)) return segmentation_points From 87dad3c5f763bcac966d92654c0853c5571cf9c7 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Sat, 23 Nov 2024 10:35:33 -0800 Subject: [PATCH 2/2] Refine timing measurement by removing low-significance functions - Removed tracking for additional functions identified as low-impact during iOS and Android local testing. - Retained tracking for key contributors to overall execution time. - Suggested broader retesting on staging, production, or different datasets to validate changes. --- .../dwell_segmentation_dist_filter.py | 64 +++++-------- .../dwell_segmentation_time_filter.py | 89 +++++++------------ 2 files changed, 56 insertions(+), 97 deletions(-) diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py index eda5f19f0..0363f4b74 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py @@ -89,14 +89,8 @@ def segment_into_trips(self, timeseries, time_query): # segmentation_points.append(currPoint) if just_ended: - with ect.Timer() as t_continue_just_ended: - continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/continue_just_ended", - time.time(), - t_continue_just_ended.elapsed - ) + + continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df) if continue_flag: # We have "processed" the currPoint by deciding to glom it @@ -126,38 +120,28 @@ def segment_into_trips(self, timeseries, time_query): ) if trip_ended: - with ect.Timer() as t_get_last_trip_end_point: - last_trip_end_point = lastPoint - logging.debug("Appending last_trip_end_point %s with index %s " % - (last_trip_end_point, idx - 1)) - segmentation_points.append((curr_trip_start_point, last_trip_end_point)) - logging.info("Found trip end at %s" % last_trip_end_point.fmt_time) - # We have processed everything up to the trip end by marking it as a completed trip - self.last_ts_processed = currPoint.metadata_write_ts - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_last_trip_end_point", - time.time(), - t_get_last_trip_end_point.elapsed - ) - - with ect.Timer() as t_handle_trip_end: - just_ended = True - # Now, we have finished processing the previous point as a trip - # end or not. But we still need to process this point by seeing - # whether it should represent a new trip start, or a glom to the - # previous trip - if not self.continue_just_ended(idx, currPoint, self.filtered_points_df): - sel_point = currPoint - logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) - curr_trip_start_point = sel_point - just_ended = False - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/handle_trip_end", - time.time(), - t_handle_trip_end.elapsed - ) + + last_trip_end_point = lastPoint + logging.debug("Appending last_trip_end_point %s with index %s " % + (last_trip_end_point, idx - 1)) + segmentation_points.append((curr_trip_start_point, last_trip_end_point)) + logging.info("Found trip end at %s" % last_trip_end_point.fmt_time) + # We have processed everything up to the trip end by marking it as a completed trip + self.last_ts_processed = currPoint.metadata_write_ts + + + + just_ended = True + # Now, we have finished processing the previous point as a trip + # end or not. But we still need to process this point by seeing + # whether it should represent a new trip start, or a glom to the + # previous trip + if not self.continue_just_ended(idx, currPoint, self.filtered_points_df): + sel_point = currPoint + logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) + curr_trip_start_point = sel_point + just_ended = False + esds.store_pipeline_time( user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/loop", diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py index 3febdca20..9703197f0 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py @@ -65,40 +65,20 @@ def segment_into_trips(self, timeseries, time_query): data that they want from the sensor streams in order to determine the segmentation points. """ - with ect.Timer() as t_get_filtered_points_pre: - filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query) - user_id = filtered_points_pre_ts_diff_df["user_id"].iloc[0] - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_filtered_points_pre_ts_diff_df", - time.time(), - t_get_filtered_points_pre.elapsed - ) - with ect.Timer() as t_filter_bogus_points: - # Sometimes, we can get bogus points because data.ts and - # metadata.write_ts are off by a lot. If we don't do this, we end up - # appearing to travel back in time - # https://github.com/e-mission/e-mission-server/issues/457 - filtered_points_df = filtered_points_pre_ts_diff_df[ - (filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000 - ] - filtered_points_df.reset_index(inplace=True) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/filter_bogus_points", - time.time(), - t_filter_bogus_points.elapsed - ) + filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query) + user_id = filtered_points_pre_ts_diff_df["user_id"].iloc[0] - with ect.Timer() as t_get_transition_df: - transition_df = timeseries.get_data_df("statemachine/transition", time_query) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_transition_df", - time.time(), - t_get_transition_df.elapsed - ) + # Sometimes, we can get bogus points because data.ts and + # metadata.write_ts are off by a lot. If we don't do this, we end up + # appearing to travel back in time + # https://github.com/e-mission/e-mission-server/issues/457 + filtered_points_df = filtered_points_pre_ts_diff_df[ + (filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000 + ] + filtered_points_df.reset_index(inplace=True) + + transition_df = timeseries.get_data_df("statemachine/transition", time_query) if len(transition_df) > 0: logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]]) @@ -216,31 +196,26 @@ def segment_into_trips(self, timeseries, time_query): t_loop.elapsed ) - with ect.Timer() as t_post_loop: - logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" % - (just_ended, len(transition_df))) - if not just_ended and len(transition_df) > 0: - stopped_moving_after_last = transition_df[ - (transition_df.ts > currPoint.ts) & (transition_df.transition == 2) - ] - logging.debug("looking after %s, found transitions %s" % - (currPoint.ts, stopped_moving_after_last)) - if len(stopped_moving_after_last) > 0: - (unused, last_trip_end_point) = self.get_last_trip_end_point( - filtered_points_df, - last10Points_df, - None - ) - segmentation_points.append((curr_trip_start_point, last_trip_end_point)) - logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time) - # We have processed everything up to the trip end by marking it as a completed trip - self.last_ts_processed = currPoint.metadata_write_ts - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/post_loop", - time.time(), - t_post_loop.elapsed - ) + + logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" % + (just_ended, len(transition_df))) + if not just_ended and len(transition_df) > 0: + stopped_moving_after_last = transition_df[ + (transition_df.ts > currPoint.ts) & (transition_df.transition == 2) + ] + logging.debug("looking after %s, found transitions %s" % + (currPoint.ts, stopped_moving_after_last)) + if len(stopped_moving_after_last) > 0: + (unused, last_trip_end_point) = self.get_last_trip_end_point( + filtered_points_df, + last10Points_df, + None + ) + segmentation_points.append((curr_trip_start_point, last_trip_end_point)) + logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time) + # We have processed everything up to the trip end by marking it as a completed trip + self.last_ts_processed = currPoint.metadata_write_ts + return segmentation_points