-
Notifications
You must be signed in to change notification settings - Fork 119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improving Trip Segmentation by reducing DB calls #958
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,36 +9,40 @@ | |
import emission.core.wrapper.transition as ecwt | ||
import emission.storage.timeseries.timequery as estt | ||
|
||
def is_tracking_restarted_in_range(start_ts, end_ts, timeseries): | ||
def is_tracking_restarted_in_range(start_ts, end_ts, transition_df): | ||
""" | ||
Check to see if tracing was restarted between the times specified | ||
:param start_ts: the start of the time range to check | ||
:param end_ts: the end of the time range to check | ||
:param timeseries: the timeseries to use for checking | ||
:return: | ||
""" | ||
import emission.storage.timeseries.timequery as estt | ||
transition_df_start_idx=transition_df.ts.searchsorted(start_ts,side='left') | ||
transition_df_end_idx=transition_df.ts.searchsorted(end_ts,side='right') | ||
transition_df_for_current=transition_df.iloc[transition_df_start_idx:transition_df_end_idx] | ||
|
||
tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts, | ||
endTs=end_ts) | ||
transition_df = timeseries.get_data_df("statemachine/transition", tq) | ||
if len(transition_df) == 0: | ||
if len(transition_df_for_current) == 0: | ||
logging.debug("In range %s -> %s found no transitions" % | ||
(tq.startTs, tq.endTs)) | ||
(start_ts, end_ts)) | ||
return False | ||
logging.debug("In range %s -> %s found transitions %s" % | ||
(tq.startTs, tq.endTs, transition_df[["fmt_time", "curr_state", "transition"]])) | ||
return _is_tracking_restarted_android(transition_df) or \ | ||
_is_tracking_restarted_ios(transition_df) | ||
(start_ts, end_ts, transition_df_for_current[["fmt_time", "curr_state", "transition"]])) | ||
return _is_tracking_restarted_android(transition_df_for_current) or \ | ||
_is_tracking_restarted_ios(transition_df_for_current) | ||
|
||
def get_ongoing_motion_in_range(start_ts, end_ts, timeseries): | ||
tq = estt.TimeQuery(timeType = "data.ts", startTs = start_ts, | ||
endTs = end_ts) | ||
motion_list = list(timeseries.find_entries(["background/motion_activity"], tq)) | ||
def get_ongoing_motion_in_range(start_ts, end_ts, motion_df): | ||
## in case when we receive an empty dataframe, there's nothing to | ||
## process | ||
if motion_df.shape == (0,0): | ||
return motion_df | ||
|
||
motion_df_start_idx=motion_df.ts.searchsorted(start_ts,side='left') | ||
motion_df_end_idx=motion_df.ts.searchsorted(end_ts,side='right') | ||
filtered_motion_df=motion_df.iloc[motion_df_start_idx:motion_df_end_idx] | ||
Comment on lines
+39
to
+41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto |
||
logging.debug("Found %s motion_activity entries in range %s -> %s" % | ||
(len(motion_list), tq.startTs, tq.endTs)) | ||
logging.debug("sample activities are %s" % motion_list[0:5]) | ||
return motion_list | ||
(len(filtered_motion_df),start_ts, end_ts)) | ||
#logging.debug("sample activities are %s" % filtered_motion_df.head()) | ||
return filtered_motion_df | ||
|
||
def _is_tracking_restarted_android(transition_df): | ||
""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
from builtins import * | ||
from builtins import object | ||
import logging | ||
import pandas as pd | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why do we need to add this import? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Unused currently. Removed. |
||
|
||
import emission.storage.timeseries.abstract_timeseries as esta | ||
import emission.storage.decorations.place_queries as esdp | ||
|
@@ -65,6 +66,12 @@ def segment_current_trips(user_id): | |
# We need to use the appropriate filter based on the incoming data | ||
# So let's read in the location points for the specified query | ||
loc_df = ts.get_data_df("background/filtered_location", time_query) | ||
transition_df = ts.get_data_df("statemachine/transition", time_query) | ||
motion_df = ts.get_data_df("background/motion_activity",time_query) | ||
if len(transition_df) > 0: | ||
logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]]) | ||
else: | ||
logging.debug("no transitions found. This can happen for continuous sensing") | ||
if len(loc_df) == 0: | ||
# no new segments, no need to keep looking at these again | ||
logging.debug("len(loc_df) == 0, early return") | ||
|
@@ -89,12 +96,12 @@ def segment_current_trips(user_id): | |
if len(filters_in_df) == 1: | ||
# Common case - let's make it easy | ||
|
||
segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts, | ||
segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(transition_df,motion_df,ts, | ||
time_query) | ||
else: | ||
segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query, | ||
filters_in_df, | ||
filter_methods) | ||
filter_methods,transition_df,motion_df) | ||
# Create and store trips and places based on the segmentation points | ||
if segmentation_points is None: | ||
epq.mark_segmentation_failed(user_id) | ||
|
@@ -104,13 +111,13 @@ def segment_current_trips(user_id): | |
epq.mark_segmentation_done(user_id, None) | ||
else: | ||
try: | ||
create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]]) | ||
create_places_and_trips(user_id, transition_df, segmentation_points, filter_method_names[filters_in_df[0]]) | ||
epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods)) | ||
except: | ||
logging.exception("Trip generation failed for user %s" % user_id) | ||
epq.mark_segmentation_failed(user_id) | ||
|
||
def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods): | ||
def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods,transition_df, motion_df): | ||
""" | ||
We can have mixed filters in a particular time range for multiple reasons. | ||
a) user switches phones from one platform to another | ||
|
@@ -149,7 +156,7 @@ def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filt | |
time_query.endTs = loc_df.iloc[endIndex+1].ts | ||
logging.debug("for filter %s, startTs = %d and endTs = %d" % | ||
(curr_filter, time_query.startTs, time_query.endTs)) | ||
segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(ts, time_query) | ||
segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(transition_df,motion_df,ts, time_query) | ||
logging.debug("After filtering, segmentation_map has keys %s" % list(segmentation_map.keys())) | ||
sortedStartTsList = sorted(segmentation_map.keys()) | ||
segmentation_points = [] | ||
|
@@ -171,7 +178,7 @@ def get_last_ts_processed(filter_methods): | |
logging.info("Returning last_ts_processed = %s" % last_ts_processed) | ||
return last_ts_processed | ||
|
||
def create_places_and_trips(user_id, segmentation_points, segmentation_method_name): | ||
def create_places_and_trips(user_id,transition_df, segmentation_points, segmentation_method_name): | ||
# new segments, need to deal with them | ||
# First, retrieve the last place so that we can stitch it to the newly created trip. | ||
# Again, there are easy and hard. In the easy case, the trip was | ||
|
@@ -214,7 +221,7 @@ def create_places_and_trips(user_id, segmentation_points, segmentation_method_na | |
new_place_entry = ecwe.Entry.create_entry(user_id, | ||
"segmentation/raw_place", new_place, create_id = True) | ||
|
||
if found_untracked_period(ts, last_place_entry.data, start_loc, segmentation_method_name): | ||
if found_untracked_period(transition_df, last_place_entry.data, start_loc, segmentation_method_name): | ||
# Fill in the gap in the chain with an untracked period | ||
curr_untracked = ecwut.Untrackedtime() | ||
curr_untracked.source = segmentation_method_name | ||
|
@@ -254,7 +261,7 @@ def _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start | |
# it will be lost | ||
ts.update(last_place_entry) | ||
|
||
def found_untracked_period(timeseries, last_place, start_loc, segmentation_method_name): | ||
def found_untracked_period(transition_df, last_place, start_loc, segmentation_method_name): | ||
""" | ||
Check to see whether the two places are the same. | ||
This is a fix for https://github.com/e-mission/e-mission-server/issues/378 | ||
|
@@ -270,7 +277,7 @@ def found_untracked_period(timeseries, last_place, start_loc, segmentation_metho | |
logging.debug("start of a chain, unable to check for restart from previous trip end, assuming not restarted") | ||
return False | ||
|
||
if _is_tracking_restarted(last_place, start_loc, timeseries): | ||
if _is_tracking_restarted(last_place, start_loc, transition_df): | ||
logging.debug("tracking has been restarted, returning True") | ||
return True | ||
|
||
|
@@ -378,6 +385,6 @@ def stitch_together_end(new_place_entry, curr_trip_entry, end_loc): | |
new_place_entry["data"] = new_place | ||
curr_trip_entry["data"] = curr_trip | ||
|
||
def _is_tracking_restarted(last_place, start_loc, timeseries): | ||
return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, timeseries) | ||
def _is_tracking_restarted(last_place, start_loc, transition_df): | ||
return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, transition_df) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,7 +38,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold): | |
self.point_threshold = point_threshold | ||
self.distance_threshold = distance_threshold | ||
|
||
def segment_into_trips(self, timeseries, time_query): | ||
def segment_into_trips(self, transition_df, motion_df, timeseries, time_query): | ||
""" | ||
Examines the timeseries database for a specific range and returns the | ||
segmentation points. Note that the input is the entire timeseries and | ||
|
@@ -48,7 +48,7 @@ def segment_into_trips(self, timeseries, time_query): | |
""" | ||
self.filtered_points_df = timeseries.get_data_df("background/filtered_location", time_query) | ||
self.filtered_points_df.loc[:,"valid"] = True | ||
self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) | ||
self.transition_df = transition_df | ||
if len(self.transition_df) > 0: | ||
logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]]) | ||
else: | ||
|
@@ -88,7 +88,7 @@ def segment_into_trips(self, timeseries, time_query): | |
# So we reset_index upstream and use it here. | ||
last10Points_df = self.filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1] | ||
lastPoint = self.find_last_valid_point(idx) | ||
if self.has_trip_ended(lastPoint, currPoint, timeseries): | ||
if self.has_trip_ended(lastPoint, currPoint, timeseries, motion_df): | ||
last_trip_end_point = lastPoint | ||
logging.debug("Appending last_trip_end_point %s with index %s " % | ||
(last_trip_end_point, idx-1)) | ||
|
@@ -144,7 +144,7 @@ def segment_into_trips(self, timeseries, time_query): | |
logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last)) | ||
return segmentation_points | ||
|
||
def has_trip_ended(self, lastPoint, currPoint, timeseries): | ||
def has_trip_ended(self, lastPoint, currPoint, timeseries, motion_df): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If you are already passing in the [inline code and remainder of the comment lost in extraction] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's being used here: [code snippet lost in extraction]
|
||
# So we must not have been moving for the last _time filter_ | ||
# points. So the trip must have ended | ||
# Since this is a distance filter, we detect that the last | ||
|
@@ -173,14 +173,14 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries): | |
# for this kind of test | ||
speedThreshold = old_div(float(self.distance_threshold * 2), (old_div(self.time_threshold, 2))) | ||
|
||
if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, timeseries): | ||
if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, self.transition_df): | ||
Comment on lines
-176
to
+177
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is |
||
logging.debug("tracking was restarted, ending trip") | ||
return True | ||
|
||
# In general, we get multiple locations between each motion activity. If we see a bunch of motion activities | ||
# between two location points, and there is a large gap between the last location and the first | ||
# motion activity as well, let us just assume that there was a restart | ||
ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, timeseries) | ||
ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, motion_df) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. while motion_df is not. They are both read upfront and should be treated the same way for clarity |
||
ongoing_motion_check = len(ongoing_motion_in_range) > 0 | ||
if timeDelta > self.time_threshold and not ongoing_motion_check: | ||
logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" % | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,7 +53,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold): | |
self.point_threshold = point_threshold | ||
self.distance_threshold = distance_threshold | ||
|
||
def segment_into_trips(self, timeseries, time_query): | ||
def segment_into_trips(self, transition_df,motion_df, timeseries, time_query): | ||
""" | ||
Examines the timeseries database for a specific range and returns the | ||
segmentation points. Note that the input is the entire timeseries and | ||
|
@@ -130,7 +130,7 @@ def segment_into_trips(self, timeseries, time_query): | |
logging.debug("last5MinsTimes.max() = %s, time_threshold = %s" % | ||
(last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold)) | ||
|
||
if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes): | ||
if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes,transition_df, motion_df): | ||
(ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df, | ||
last10Points_df, last5MinsPoints_df) | ||
segmentation_points.append((curr_trip_start_point, last_trip_end_point)) | ||
|
@@ -199,7 +199,7 @@ def continue_just_ended(self, idx, currPoint, filtered_points_df): | |
else: | ||
return False | ||
|
||
def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes): | ||
def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes, transition_df, motion_df): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ditto. If we are passing in the transition_df and motion_df, why do we also need the timeseries? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Here, it's unused. Cleared. |
||
# Another mismatch between phone and server. Phone stops tracking too soon, | ||
# so the distance is still greater than the threshold at the end of the trip. | ||
# But then the next point is a long time away, so we can split again (similar to a distance filter) | ||
|
@@ -214,11 +214,11 @@ def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistanc | |
speedDelta = np.nan | ||
speedThreshold = old_div(float(self.distance_threshold), self.time_threshold) | ||
|
||
if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, timeseries): | ||
if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, transition_df): | ||
logging.debug("tracking was restarted, ending trip") | ||
return True | ||
|
||
ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, timeseries)) > 0 | ||
ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, motion_df)) > 0 | ||
if timeDelta > 2 * self.time_threshold and not ongoing_motion_check: | ||
logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" % | ||
(prev_point.ts, curr_point.ts,self.time_threshold, curr_point.ts - prev_point.ts, ongoing_motion_check)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,8 +23,8 @@ def is_huge_invalid_ts_offset(filterMethod, lastPoint, currPoint, timeseries, | |
ecwm.MotionTypes.NONE.value, | ||
ecwm.MotionTypes.STOPPED_WHILE_IN_VEHICLE.value] | ||
|
||
non_still_motions = [ma for ma in motionInRange if ma["data"]["type"] not in ignore_modes_list and ma["data"]["confidence"] == 100] | ||
logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions]) | ||
non_still_motions=motionInRange[~motionInRange['type'].isin(ignore_modes_list) & (motionInRange['confidence'] ==100)] | ||
#logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions]) logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this change doesn't appear to be related to this fix. |
||
|
||
non_still_motions_rate = len(non_still_motions) / (currPoint.ts - lastPoint.ts) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
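Is there a reason you are using this instead of something like [code snippet lost in extraction; judging by the reply, presumably `DataFrame.query()`]
or [code snippet lost in extraction; presumably boolean-mask indexing on `ts`]?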
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First was the performance reason: O(log n) here vs. O(n) in the others.
Second, query() internally creates a boolean Series that marks which rows to keep or discard, which increases temporary memory usage. For very large DataFrames, this can be an issue.
We can use this one instead, if O(log n) vs. O(n) is not an issue:
transition_df_for_current = transition_df[(transition_df.ts >= start_ts) & (transition_df.ts <= end_ts)]