Skip to content

Commit

Permalink
Time Filter
Browse files Browse the repository at this point in the history
  • Loading branch information
TeachMeTW committed Nov 6, 2024
1 parent f4f6d19 commit eace26b
Showing 1 changed file with 153 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
import numpy as np
import pandas as pd
import datetime as pydt
import time

# Our imports
import emission.analysis.point_features as pf
import emission.analysis.intake.segmentation.trip_segmentation as eaist
import emission.core.wrapper.location as ecwl

import emission.analysis.intake.segmentation.restart_checking as eaisr
import emission.core.timer as ect
import emission.core.wrapper.pipelinestate as ecwp
import emission.storage.decorations.stats_queries as esds

class DwellSegmentationTimeFilter(eaist.TripSegmentationMethod):
def __init__(self, time_threshold, point_threshold, distance_threshold):
Expand Down Expand Up @@ -61,14 +65,27 @@ def segment_into_trips(self, timeseries, time_query):
data that they want from the sensor streams in order to determine the
segmentation points.
"""
filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query)
# Sometimes, we can get bogus points because data.ts and
# metadata.write_ts are off by a lot. If we don't do this, we end up
# appearing to travel back in time
# https://github.com/e-mission/e-mission-server/issues/457
filtered_points_df = filtered_points_pre_ts_diff_df[(filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000]
filtered_points_df.reset_index(inplace=True)
transition_df = timeseries.get_data_df("statemachine/transition", time_query)
# Timer for fetching filtered location points
with ect.Timer() as t_get_filtered_points_pre_ts_diff_df:
filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query)
user_id = filtered_points_pre_ts_diff_df.iloc[0]['user_id'] if not filtered_points_pre_ts_diff_df.empty else None
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_filtered_location", time.time(), t_get_filtered_points_pre_ts_diff_df.elapsed)

# Timer for filtering out bogus points
with ect.Timer() as t_filter_bogus_points:
# Sometimes, we can get bogus points because data.ts and
# metadata.write_ts are off by a lot. If we don't do this, we end up
# appearing to travel back in time
# https://github.com/e-mission/e-mission-server/issues/457
filtered_points_df = filtered_points_pre_ts_diff_df[(filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000]
filtered_points_df.reset_index(inplace=True)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/filter_bogus_points", time.time(), t_filter_bogus_points.elapsed)

# Timer for fetching transition data
with ect.Timer() as t_get_transition_df:
transition_df = timeseries.get_data_df("statemachine/transition", time_query)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_transition_df", time.time(), t_get_transition_df.elapsed)

if len(transition_df) > 0:
logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]])
else:
Expand All @@ -83,88 +100,137 @@ def segment_into_trips(self, timeseries, time_query):
curr_trip_start_point = None
just_ended = True
prevPoint = None
for idx, row in filtered_points_df.iterrows():
currPoint = ad.AttrDict(row)
currPoint.update({"idx": idx})
logging.debug("-" * 30 + str(currPoint.fmt_time) + "-" * 30)
if curr_trip_start_point is None:
logging.debug("Appending currPoint because the current start point is None")
# segmentation_points.append(currPoint)

if just_ended:
if self.continue_just_ended(idx, currPoint, filtered_points_df):
# We have "processed" the currPoint by deciding to glom it
self.last_ts_processed = currPoint.metadata_write_ts
continue
# else:
sel_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
curr_trip_start_point = sel_point
just_ended = False

last5MinsPoints_df = filtered_points_df[np.logical_and(
np.logical_and(
filtered_points_df.ts > currPoint.ts - self.time_threshold,
filtered_points_df.ts < currPoint.ts),
filtered_points_df.ts >= curr_trip_start_point.ts)]
# Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive.
# Using .iloc just ends up including points after this one.
# So we reset_index upstream and use it here.
# We are going to use the last 8 points for now.
# TODO: Change this back to last 10 points once we normalize phone and this
last10Points_df = filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1]
distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint)
timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts
last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1)
logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances)))
last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1)
logging.debug("last10PointsDistances = %s with length %d, shape %s" % (last10PointsDistances.to_numpy(),
len(last10PointsDistances),
last10PointsDistances.shape))

# Fix for https://github.com/e-mission/e-mission-server/issues/348
last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1)

logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" %
(len(last10PointsDistances), len(last5MinsDistances)))
logging.debug("last5MinsTimes.max() = %s, time_threshold = %s" %
(last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold))

if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes):
(ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df,
last10Points_df, last5MinsPoints_df)
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts
if ended_before_this:
# in this case, we end a trip at the previous point, and the next trip starts at this
# point, not the next one
just_ended = False
prevPoint = currPoint
curr_trip_start_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" %
(currPoint, currPoint.idx))
else:
# We end a trip at the current point, and the next trip starts at the next point
just_ended = True
prevPoint = None
else:
prevPoint = currPoint

# Timer for the entire loop over filtered points
with ect.Timer() as t_loop_over_points:
for idx, row in filtered_points_df.iterrows():
# Timer for processing each row
with ect.Timer() as t_process_row:
currPoint = ad.AttrDict(row)
currPoint.update({"idx": idx})
logging.debug("-" * 30 + str(currPoint.fmt_time) + "-" * 30)
if curr_trip_start_point is None:
logging.debug("Appending currPoint because the current start point is None")
# segmentation_points.append(currPoint)

if just_ended:
if self.continue_just_ended(idx, currPoint, filtered_points_df):
# We have "processed" the currPoint by deciding to glom it
self.last_ts_processed = currPoint.metadata_write_ts
continue
# else:
sel_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
curr_trip_start_point = sel_point
just_ended = False

# Timer for calculating last 5 minutes points
with ect.Timer() as t_calculate_last5MinsPoints:
last5MinsPoints_df = filtered_points_df[np.logical_and(
np.logical_and(
filtered_points_df.ts > currPoint.ts - self.time_threshold,
filtered_points_df.ts < currPoint.ts),
filtered_points_df.ts >= curr_trip_start_point.ts)]
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinsPoints", time.time(), t_calculate_last5MinsPoints.elapsed)

# Timer for selecting last 10 points
with ect.Timer() as t_select_last10Points:
# Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive.
# Using .iloc just ends up including points after this one.
# So we reset_index upstream and use it here.
# We are going to use the last 8 points for now.
# TODO: Change this back to last 10 points once we normalize phone and this
last10Points_df = filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1]
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/select_last10Points", time.time(), t_select_last10Points.elapsed)

distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint)
timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts

# Timer for calculating last 5 minutes distances
with ect.Timer() as t_calculate_last5MinsDistances:
last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinsDistances", time.time(), t_calculate_last5MinsDistances.elapsed)

logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances)))

# Timer for calculating last 10 points distances
with ect.Timer() as t_calculate_last10PointsDistances:
last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last10PointsDistances", time.time(), t_calculate_last10PointsDistances.elapsed)

logging.debug("last10PointsDistances = %s with length %d, shape %s" % (last10PointsDistances.to_numpy(),
len(last10PointsDistances),
last10PointsDistances.shape))

# Timer for calculating last 5 minutes times
with ect.Timer() as t_calculate_last5MinTimes:
# Fix for https://github.com/e-mission/e-mission-server/issues/348
last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculate_last5MinTimes", time.time(), t_calculate_last5MinTimes.elapsed)

logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" %
(len(last10PointsDistances), len(last5MinsDistances)))
logging.debug("last5MinsTimes.max() = %s, time_threshold = %s" %
(last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold))

# Timer for checking if trip has ended
with ect.Timer() as t_has_trip_ended:
trip_ended = self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/has_trip_ended", time.time(), t_has_trip_ended.elapsed)

if trip_ended:
# Timer for getting last trip end point
with ect.Timer() as t_get_last_trip_end_point:
(ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df,
last10Points_df, last5MinsPoints_df)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_last_trip_end_point", time.time(), t_get_last_trip_end_point.elapsed)

segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have "processed" everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts

if ended_before_this:
# Timer for setting new trip start point in ended_before_this case
with ect.Timer() as t_set_new_trip_start_before:
# in this case, we end a trip at the previous point, and the next trip starts at this
# point, not the next one
just_ended = False
prevPoint = currPoint
curr_trip_start_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" %
(currPoint, currPoint.idx))
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/set_new_trip_start_before", time.time(), t_set_new_trip_start_before.elapsed)
else:
# Timer for setting new trip start point in else case
with ect.Timer() as t_set_new_trip_start_else:
# We end a trip at the current point, and the next trip starts at the next point
just_ended = True
prevPoint = None
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/set_new_trip_start_else", time.time(), t_set_new_trip_start_else.elapsed)
else:
prevPoint = currPoint

# Store elapsed time for processing the row
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/process_row", time.time(), t_process_row.elapsed)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/loop_over_points", time.time(), t_loop_over_points.elapsed)

logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" %
(just_ended, len(transition_df)))
(just_ended, len(transition_df)))
if not just_ended and len(transition_df) > 0:
stopped_moving_after_last = transition_df[(transition_df.ts > currPoint.ts) & (transition_df.transition == 2)]
logging.debug("looking after %s, found transitions %s" %
(currPoint.ts, stopped_moving_after_last))
if len(stopped_moving_after_last) > 0:
(unused, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df,
last10Points_df, None)
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts
# Timer for handling the final trip end point
with ect.Timer() as t_handle_final_trip_end:
stopped_moving_after_last = transition_df[(transition_df.ts > currPoint.ts) & (transition_df.transition == 2)]
logging.debug("looking after %s, found transitions %s" %
(currPoint.ts, stopped_moving_after_last))
if len(stopped_moving_after_last) > 0:
(unused, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df,
last10Points_df, None)
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have "processed" everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/handle_final_trip_end", time.time(), t_handle_final_trip_end.elapsed)

return segmentation_points

Expand Down

0 comments on commit eace26b

Please sign in to comment.