diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py index e6349f1ae..cecaf028b 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py @@ -69,7 +69,12 @@ def segment_into_trips(self, timeseries, time_query): with ect.Timer() as t_get_filtered_points_pre_ts_diff_df: filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query) user_id = filtered_points_pre_ts_diff_df.iloc[0]['user_id'] if not filtered_points_pre_ts_diff_df.empty else None - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_filtered_location", time.time(), t_get_filtered_points_pre_ts_diff_df.elapsed) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_filtered_location", + time.time(), + t_get_filtered_points_pre_ts_diff_df.elapsed + ) # Timer for filtering out bogus points with ect.Timer() as t_filter_bogus_points: @@ -77,14 +82,26 @@ def segment_into_trips(self, timeseries, time_query): # metadata.write_ts are off by a lot. If we don't do this, we end up # appearing to travel back in time # https://github.com/e-mission/e-mission-server/issues/457 - filtered_points_df = filtered_points_pre_ts_diff_df[(filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000] + filtered_points_df = filtered_points_pre_ts_diff_df[ + (filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000 + ] filtered_points_df.reset_index(inplace=True) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/filter_bogus_points", time.time(), t_filter_bogus_points.elapsed) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/filter_bogus_points", + time.time(), + t_filter_bogus_points.elapsed + ) # Timer for fetching transition data with ect.Timer() as t_get_transition_df: transition_df = timeseries.get_data_df("statemachine/transition", time_query) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_transition_df", time.time(), t_get_transition_df.elapsed) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_transition_df", + time.time(), + t_get_transition_df.elapsed + ) if len(transition_df) > 0: logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]]) @@ -127,11 +144,18 @@ def segment_into_trips(self, timeseries, time_query): # Timer for calculating last 5 minutes points with ect.Timer() as t_calculate_last5MinsPoints: last5MinsPoints_df = filtered_points_df[np.logical_and( - np.logical_and( - filtered_points_df.ts > currPoint.ts - self.time_threshold, - filtered_points_df.ts < currPoint.ts), - filtered_points_df.ts >= curr_trip_start_point.ts)] - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinsPoints", time.time(), t_calculate_last5MinsPoints.elapsed) + np.logical_and( + filtered_points_df.ts > currPoint.ts - self.time_threshold, + filtered_points_df.ts < currPoint.ts + ), + filtered_points_df.ts >= curr_trip_start_point.ts + )] + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinsPoints", + time.time(), + t_calculate_last5MinsPoints.elapsed + ) # Timer for selecting last 10 points with ect.Timer() as t_select_last10Points: @@ -140,8 +164,13 @@ def segment_into_trips(self, timeseries, time_query): # So we reset_index upstream and use it here. # We are going to use the last 8 points for now. # TODO: Change this back to last 10 points once we normalize phone and this - last10Points_df = filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1] - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/select_last10Points", time.time(), t_select_last10Points.elapsed) + last10Points_df = filtered_points_df.iloc[max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1] + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/select_last10Points", + time.time(), + t_select_last10Points.elapsed + ) distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint) timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts @@ -149,14 +178,24 @@ def segment_into_trips(self, timeseries, time_query): # Timer for calculating last 5 minutes distances with ect.Timer() as t_calculate_last5MinsDistances: last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinsDistances", time.time(), t_calculate_last5MinsDistances.elapsed) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinsDistances", + time.time(), + t_calculate_last5MinsDistances.elapsed + ) logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances))) # Timer for calculating last 10 points distances with ect.Timer() as t_calculate_last10PointsDistances: last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last10PointsDistances", time.time(), t_calculate_last10PointsDistances.elapsed) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last10PointsDistances", + time.time(), + t_calculate_last10PointsDistances.elapsed + ) logging.debug("last10PointsDistances = %s with length %d, shape %s" % (last10PointsDistances.to_numpy(), len(last10PointsDistances), @@ -166,7 +205,12 @@ def segment_into_trips(self, timeseries, time_query): with ect.Timer() as t_calculate_last5MinTimes: # Fix for https://github.com/e-mission/e-mission-server/issues/348 last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinTimes", time.time(), t_calculate_last5MinTimes.elapsed) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinTimes", + time.time(), + t_calculate_last5MinTimes.elapsed + ) logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" % (len(last10PointsDistances), len(last5MinsDistances))) @@ -176,14 +220,24 @@ def segment_into_trips(self, timeseries, time_query): # Timer for checking if trip has ended with ect.Timer() as t_has_trip_ended: trip_ended = self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/has_trip_ended", time.time(), t_has_trip_ended.elapsed) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/has_trip_ended", + time.time(), + t_has_trip_ended.elapsed + ) if trip_ended: # Timer for getting last trip end point with ect.Timer() as t_get_last_trip_end_point: (ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df, last10Points_df, last5MinsPoints_df) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_last_trip_end_point", time.time(), t_get_last_trip_end_point.elapsed) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_last_trip_end_point", + time.time(), + t_get_last_trip_end_point.elapsed + ) segmentation_points.append((curr_trip_start_point, last_trip_end_point)) logging.info("Found trip end at %s" % last_trip_end_point.fmt_time) @@ -200,20 +254,40 @@ def segment_into_trips(self, timeseries, time_query): curr_trip_start_point = currPoint logging.debug("Setting new trip start point %s with idx %s" % (currPoint, currPoint.idx)) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/set_new_trip_start_before", time.time(), t_set_new_trip_start_before.elapsed) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/set_new_trip_start_before", + time.time(), + t_set_new_trip_start_before.elapsed + ) else: # Timer for setting new trip start point in else case with ect.Timer() as t_set_new_trip_start_else: # We end a trip at the current point, and the next trip starts at the next point just_ended = True prevPoint = None - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/set_new_trip_start_else", time.time(), t_set_new_trip_start_else.elapsed) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/set_new_trip_start_else", + time.time(), + t_set_new_trip_start_else.elapsed + ) else: prevPoint = currPoint # Store elapsed time for processing the row - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/process_row", time.time(), t_process_row.elapsed) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/loop_over_points", time.time(), t_loop_over_points.elapsed) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/process_row", + time.time(), + t_process_row.elapsed + ) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/loop_over_points", + time.time(), + t_loop_over_points.elapsed + ) logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" % (just_ended, len(transition_df))) @@ -230,10 +304,16 @@ def segment_into_trips(self, timeseries, time_query): logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time) # We have "processed" everything up to the trip end by marking it as a completed trip self.last_ts_processed = currPoint.metadata_write_ts - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/handle_final_trip_end", time.time(), t_handle_final_trip_end.elapsed) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/handle_final_trip_end", + time.time(), + t_handle_final_trip_end.elapsed + ) return segmentation_points + def continue_just_ended(self, idx, currPoint, filtered_points_df): """ Normally, since the logic here and the