Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed Bottom 80% functions from latest snapshot. #998

Merged
merged 2 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 23 additions & 35 deletions emission/analysis/intake/segmentation/trip_segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,20 @@ def segment_into_trips(self, timeseries, time_query):
pass

def segment_current_trips(user_id):
with ect.Timer() as t_get_time_series:
ts = esta.TimeSeries.get_time_series(user_id)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_series", time.time(), t_get_time_series.elapsed)

with ect.Timer() as t_get_time_range:
time_query = epq.get_time_range_for_segmentation(user_id)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_range_for_segmentation", time.time(), t_get_time_range.elapsed)
ts = esta.TimeSeries.get_time_series(user_id)
time_query = epq.get_time_range_for_segmentation(user_id)

import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_time_filter as dstf
import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_dist_filter as dsdf

with ect.Timer() as t_create_time_filter:
dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins
point_threshold=9,
distance_threshold=100) # 100 m
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_time_filter", time.time(), t_create_time_filter.elapsed)

with ect.Timer() as t_create_dist_filter:
dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins
point_threshold=9,
distance_threshold=50) # 50 m
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_dist_filter", time.time(), t_create_dist_filter.elapsed)
dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins
point_threshold=9,
distance_threshold=100) # 100 m

dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins
point_threshold=9,
distance_threshold=50) # 50 m

filter_methods = {"time": dstfsm, "distance": dsdfsm}
filter_method_names = {"time": "DwellSegmentationTimeFilter", "distance": "DwellSegmentationDistFilter"}
Expand All @@ -89,24 +81,20 @@ def segment_current_trips(user_id):
epq.mark_segmentation_done(user_id, None)
return

with ect.Timer() as t_handle_out_of_order:
out_of_order_points = loc_df[loc_df.ts.diff() < 0]
if len(out_of_order_points) > 0:
logging.info("Found out of order points!")
logging.info("%s" % out_of_order_points)
# drop from the table
loc_df = loc_df.drop(out_of_order_points.index.tolist())
loc_df.reset_index(inplace=True)
# invalidate in the database.
out_of_order_id_list = out_of_order_points["_id"].tolist()
logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
for ooid in out_of_order_id_list:
ts.invalidate_raw_entry(ooid)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/handle_out_of_order_points", time.time(), t_handle_out_of_order.elapsed)

with ect.Timer() as t_get_filters:
filters_in_df = loc_df["filter"].dropna().unique()
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_filters_in_df", time.time(), t_get_filters.elapsed)
out_of_order_points = loc_df[loc_df.ts.diff() < 0]
if len(out_of_order_points) > 0:
logging.info("Found out of order points!")
logging.info("%s" % out_of_order_points)
# drop from the table
loc_df = loc_df.drop(out_of_order_points.index.tolist())
loc_df.reset_index(inplace=True)
# invalidate in the database.
out_of_order_id_list = out_of_order_points["_id"].tolist()
logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
for ooid in out_of_order_id_list:
ts.invalidate_raw_entry(ooid)

filters_in_df = loc_df["filter"].dropna().unique()

logging.debug("Filters in the dataframe = %s" % filters_in_df)
if len(filters_in_df) == 1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,9 @@ def segment_into_trips(self, timeseries, time_query):
t_get_filtered_points.elapsed
)

with ect.Timer() as t_mark_valid:
self.filtered_points_df.loc[:, "valid"] = True
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/mark_valid",
time.time(),
t_mark_valid.elapsed
)
self.filtered_points_df.loc[:, "valid"] = True

with ect.Timer() as t_get_transition_df:
self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_transition_df",
time.time(),
t_get_transition_df.elapsed
)
self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)

if len(self.transition_df) > 0:
logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]])
Expand All @@ -103,14 +89,7 @@ def segment_into_trips(self, timeseries, time_query):
# segmentation_points.append(currPoint)

if just_ended:
with ect.Timer() as t_continue_just_ended:
continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/continue_just_ended",
time.time(),
t_continue_just_ended.elapsed
)
continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df)

if continue_flag:
# We have "processed" the currPoint by deciding to glom it
Expand All @@ -119,14 +98,7 @@ def segment_into_trips(self, timeseries, time_query):
# else:
sel_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
with ect.Timer() as t_set_start_point:
curr_trip_start_point = sel_point
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/set_new_trip_start_point",
time.time(),
t_set_start_point.elapsed
)
curr_trip_start_point = sel_point
just_ended = False
else:
with ect.Timer() as t_process_trip:
Expand All @@ -137,106 +109,72 @@ def segment_into_trips(self, timeseries, time_query):
max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1
]
lastPoint = self.find_last_valid_point(idx)
with ect.Timer() as t_has_trip_ended:
trip_ended = self.has_trip_ended(lastPoint, currPoint, timeseries)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/has_trip_ended",
time.time(),
t_has_trip_ended.elapsed
)
trip_ended = self.has_trip_ended(lastPoint, currPoint, timeseries)

if trip_ended:
with ect.Timer() as t_get_last_trip_end_point:
last_trip_end_point = lastPoint
logging.debug("Appending last_trip_end_point %s with index %s " %
(last_trip_end_point, idx - 1))
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_last_trip_end_point",
time.time(),
t_get_last_trip_end_point.elapsed
)

with ect.Timer() as t_handle_trip_end:
just_ended = True
# Now, we have finished processing the previous point as a trip
# end or not. But we still need to process this point by seeing
# whether it should represent a new trip start, or a glom to the
# previous trip
if not self.continue_just_ended(idx, currPoint, self.filtered_points_df):
sel_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
curr_trip_start_point = sel_point
just_ended = False
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/handle_trip_end",
time.time(),
t_handle_trip_end.elapsed
)
last_trip_end_point = lastPoint
logging.debug("Appending last_trip_end_point %s with index %s " %
(last_trip_end_point, idx - 1))
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts
just_ended = True
# Now, we have finished processing the previous point as a trip
# end or not. But we still need to process this point by seeing
# whether it should represent a new trip start, or a glom to the
# previous trip
if not self.continue_just_ended(idx, currPoint, self.filtered_points_df):
sel_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
curr_trip_start_point = sel_point
just_ended = False

esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/loop",
time.time(),
t_loop.elapsed
)

with ect.Timer() as t_post_loop:
# Since we only end a trip when we start a new trip, this means that
# the last trip that was pushed is ignored. Consider the example of
# 2016-02-22 when I took kids to karate. We arrived shortly after 4pm,
# so during that remote push, a trip end was not detected. And we got
# back home shortly after 5pm, so the trip end was only detected on the
# phone at 6pm. At that time, the following points were pushed:
# ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50,
# ..., 2016-02-22T16:57:04
# Then, on the server, while iterating through the points, we detected
# a trip end at 16:04, and a new trip start at 16:49. But we did not
# detect the trip end at 16:57, because we didn't start a new point.
# This has two issues:
# - we won't see this trip until the next trip start, which may be on the next day
# - we won't see this trip at all, because when we run the pipeline the
# next time, we will only look at points from that time onwards. These
# points have been marked as "processed", so they won't even be considered.

# There are multiple potential fixes:
# - we can mark only the completed trips as processed. This will solve (2) above, but not (1)
# - we can mark a trip end based on the fact that we only push data
# when a trip ends, so if we have data, it means that the trip has been
# detected as ended on the phone.
# This seems a bit fragile - what if we start pushing incomplete trip
# data for efficiency reasons? Therefore, we also check to see if there
# is a trip_end_detected in this timeframe after the last point. If so,
# then we end the trip at the last point that we have.
if not just_ended and len(self.transition_df) > 0:
with ect.Timer() as t_check_transitions:
stopped_moving_after_last = self.transition_df[
(self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2)
]
logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]])
if len(stopped_moving_after_last) > 0:
logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last))
segmentation_points.append((curr_trip_start_point, currPoint))
self.last_ts_processed = currPoint.metadata_write_ts
else:
logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last))
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/check_transitions_post_loop",
time.time(),
t_check_transitions.elapsed
)
esds.store_pipeline_time(
user_id,
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/post_loop",
time.time(),
t_post_loop.elapsed
)

# Since we only end a trip when we start a new trip, this means that
# the last trip that was pushed is ignored. Consider the example of
# 2016-02-22 when I took kids to karate. We arrived shortly after 4pm,
# so during that remote push, a trip end was not detected. And we got
# back home shortly after 5pm, so the trip end was only detected on the
# phone at 6pm. At that time, the following points were pushed:
# ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50,
# ..., 2016-02-22T16:57:04
# Then, on the server, while iterating through the points, we detected
# a trip end at 16:04, and a new trip start at 16:49. But we did not
# detect the trip end at 16:57, because we didn't start a new point.
# This has two issues:
# - we won't see this trip until the next trip start, which may be on the next day
# - we won't see this trip at all, because when we run the pipeline the
# next time, we will only look at points from that time onwards. These
# points have been marked as "processed", so they won't even be considered.

# There are multiple potential fixes:
# - we can mark only the completed trips as processed. This will solve (2) above, but not (1)
# - we can mark a trip end based on the fact that we only push data
# when a trip ends, so if we have data, it means that the trip has been
# detected as ended on the phone.
# This seems a bit fragile - what if we start pushing incomplete trip
# data for efficiency reasons? Therefore, we also check to see if there
# is a trip_end_detected in this timeframe after the last point. If so,
# then we end the trip at the last point that we have.
if not just_ended and len(self.transition_df) > 0:
stopped_moving_after_last = self.transition_df[
(self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2)
]
logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]])
if len(stopped_moving_after_last) > 0:
logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last))
segmentation_points.append((curr_trip_start_point, currPoint))
self.last_ts_processed = currPoint.metadata_write_ts
else:
logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last))

return segmentation_points

Expand Down
Loading
Loading