Skip to content

Commit

Permalink
Fixed the loop placement to access timer data
Browse files (browse the repository at this point in the history)
  • Loading branch information
TeachMeTW committed Nov 6, 2024
1 parent eace26b commit cc2a352
Showing 1 changed file with 101 additions and 21 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -69,22 +69,39 @@ def segment_into_trips(self, timeseries, time_query):
with ect.Timer() as t_get_filtered_points_pre_ts_diff_df:
filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query)
user_id = filtered_points_pre_ts_diff_df.iloc[0]['user_id'] if not filtered_points_pre_ts_diff_df.empty else None
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_filtered_location", time.time(), t_get_filtered_points_pre_ts_diff_df.elapsed)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_filtered_location",
time.time(),
t_get_filtered_points_pre_ts_diff_df.elapsed
)

# Timer for filtering out bogus points
with ect.Timer() as t_filter_bogus_points:
# Sometimes, we can get bogus points because data.ts and
# metadata.write_ts are off by a lot. If we don't do this, we end up
# appearing to travel back in time
# https://github.com/e-mission/e-mission-server/issues/457
filtered_points_df = filtered_points_pre_ts_diff_df[(filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000]
filtered_points_df = filtered_points_pre_ts_diff_df[
(filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000
]
filtered_points_df.reset_index(inplace=True)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/filter_bogus_points", time.time(), t_filter_bogus_points.elapsed)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/filter_bogus_points",
time.time(),
t_filter_bogus_points.elapsed
)

# Timer for fetching transition data
with ect.Timer() as t_get_transition_df:
transition_df = timeseries.get_data_df("statemachine/transition", time_query)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_transition_df", time.time(), t_get_transition_df.elapsed)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_transition_df",
time.time(),
t_get_transition_df.elapsed
)

if len(transition_df) > 0:
logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]])
Expand Down Expand Up @@ -127,11 +144,18 @@ def segment_into_trips(self, timeseries, time_query):
# Timer for calculating last 5 minutes points
with ect.Timer() as t_calculate_last5MinsPoints:
last5MinsPoints_df = filtered_points_df[np.logical_and(
np.logical_and(
filtered_points_df.ts > currPoint.ts - self.time_threshold,
filtered_points_df.ts < currPoint.ts),
filtered_points_df.ts >= curr_trip_start_point.ts)]
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinsPoints", time.time(), t_calculate_last5MinsPoints.elapsed)
np.logical_and(
filtered_points_df.ts > currPoint.ts - self.time_threshold,
filtered_points_df.ts < currPoint.ts
),
filtered_points_df.ts >= curr_trip_start_point.ts
)]
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinsPoints",
time.time(),
t_calculate_last5MinsPoints.elapsed
)

# Timer for selecting last 10 points
with ect.Timer() as t_select_last10Points:
Expand All @@ -140,23 +164,38 @@ def segment_into_trips(self, timeseries, time_query):
# So we reset_index upstream and use it here.
# We are going to use the last 8 points for now.
# TODO: Change this back to last 10 points once we normalize phone and this
last10Points_df = filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1]
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/select_last10Points", time.time(), t_select_last10Points.elapsed)
last10Points_df = filtered_points_df.iloc[max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1]
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/select_last10Points",
time.time(),
t_select_last10Points.elapsed
)

distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint)
timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts

# Timer for calculating last 5 minutes distances
with ect.Timer() as t_calculate_last5MinsDistances:
last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinsDistances", time.time(), t_calculate_last5MinsDistances.elapsed)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinsDistances",
time.time(),
t_calculate_last5MinsDistances.elapsed
)

logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances)))

# Timer for calculating last 10 points distances
with ect.Timer() as t_calculate_last10PointsDistances:
last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last10PointsDistances", time.time(), t_calculate_last10PointsDistances.elapsed)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last10PointsDistances",
time.time(),
t_calculate_last10PointsDistances.elapsed
)

logging.debug("last10PointsDistances = %s with length %d, shape %s" % (last10PointsDistances.to_numpy(),
len(last10PointsDistances),
Expand All @@ -166,7 +205,12 @@ def segment_into_trips(self, timeseries, time_query):
with ect.Timer() as t_calculate_last5MinTimes:
# Fix for https://github.com/e-mission/e-mission-server/issues/348
last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinTimes", time.time(), t_calculate_last5MinTimes.elapsed)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinTimes",
time.time(),
t_calculate_last5MinTimes.elapsed
)

logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" %
(len(last10PointsDistances), len(last5MinsDistances)))
Expand All @@ -176,14 +220,24 @@ def segment_into_trips(self, timeseries, time_query):
# Timer for checking if trip has ended
with ect.Timer() as t_has_trip_ended:
trip_ended = self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/has_trip_ended", time.time(), t_has_trip_ended.elapsed)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/has_trip_ended",
time.time(),
t_has_trip_ended.elapsed
)

if trip_ended:
# Timer for getting last trip end point
with ect.Timer() as t_get_last_trip_end_point:
(ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df,
last10Points_df, last5MinsPoints_df)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_last_trip_end_point", time.time(), t_get_last_trip_end_point.elapsed)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_last_trip_end_point",
time.time(),
t_get_last_trip_end_point.elapsed
)

segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
Expand All @@ -200,20 +254,40 @@ def segment_into_trips(self, timeseries, time_query):
curr_trip_start_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" %
(currPoint, currPoint.idx))
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/set_new_trip_start_before", time.time(), t_set_new_trip_start_before.elapsed)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/set_new_trip_start_before",
time.time(),
t_set_new_trip_start_before.elapsed
)
else:
# Timer for setting new trip start point in else case
with ect.Timer() as t_set_new_trip_start_else:
# We end a trip at the current point, and the next trip starts at the next point
just_ended = True
prevPoint = None
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/set_new_trip_start_else", time.time(), t_set_new_trip_start_else.elapsed)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/set_new_trip_start_else",
time.time(),
t_set_new_trip_start_else.elapsed
)
else:
prevPoint = currPoint

# Store elapsed time for processing the row
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/process_row", time.time(), t_process_row.elapsed)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/loop_over_points", time.time(), t_loop_over_points.elapsed)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/process_row",
time.time(),
t_process_row.elapsed
)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/loop_over_points",
time.time(),
t_loop_over_points.elapsed
)

logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" %
(just_ended, len(transition_df)))
Expand All @@ -230,10 +304,16 @@ def segment_into_trips(self, timeseries, time_query):
logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have "processed" everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/handle_final_trip_end", time.time(), t_handle_final_trip_end.elapsed)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/handle_final_trip_end",
time.time(),
t_handle_final_trip_end.elapsed
)

return segmentation_points


def continue_just_ended(self, idx, currPoint, filtered_points_df):
"""
Normally, since the logic here and the
Expand Down

0 comments on commit cc2a352

Please sign in to comment.