diff --git a/emission/analysis/intake/segmentation/trip_segmentation.py b/emission/analysis/intake/segmentation/trip_segmentation.py index d6828af77..7b47bd49b 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation.py +++ b/emission/analysis/intake/segmentation/trip_segmentation.py @@ -7,6 +7,7 @@ from builtins import * from builtins import object import logging +import time import emission.storage.timeseries.abstract_timeseries as esta import emission.storage.decorations.place_queries as esdp @@ -23,6 +24,9 @@ import emission.analysis.intake.segmentation.restart_checking as eaisr import emission.core.common as ecc +import emission.storage.decorations.stats_queries as esds +import emission.core.timer as ect +import emission.core.wrapper.pipelinestate as ecwp class TripSegmentationMethod(object): def segment_into_trips(self, timeseries, time_query): @@ -47,54 +51,76 @@ def segment_into_trips(self, timeseries, time_query): pass def segment_current_trips(user_id): - ts = esta.TimeSeries.get_time_series(user_id) - time_query = epq.get_time_range_for_segmentation(user_id) + with ect.Timer() as t_get_time_series: + ts = esta.TimeSeries.get_time_series(user_id) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_series", time.time(), t_get_time_series.elapsed) + + with ect.Timer() as t_get_time_range: + time_query = epq.get_time_range_for_segmentation(user_id) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_range_for_segmentation", time.time(), t_get_time_range.elapsed) import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_time_filter as dstf import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_dist_filter as dsdf - dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold = 5 * 60, # 5 mins - point_threshold = 9, - distance_threshold = 100) # 100 m - dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold = 10 * 60, # 10 mins - point_threshold = 9, - distance_threshold = 50) # 50 m + with ect.Timer() as t_create_time_filter: + dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins + point_threshold=9, + distance_threshold=100) # 100 m + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_time_filter", time.time(), t_create_time_filter.elapsed) + + with ect.Timer() as t_create_dist_filter: + dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins + point_threshold=9, + distance_threshold=50) # 50 m + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_dist_filter", time.time(), t_create_dist_filter.elapsed) filter_methods = {"time": dstfsm, "distance": dsdfsm} filter_method_names = {"time": "DwellSegmentationTimeFilter", "distance": "DwellSegmentationDistFilter"} + # We need to use the appropriate filter based on the incoming data # So let's read in the location points for the specified query - loc_df = ts.get_data_df("background/filtered_location", time_query) + with ect.Timer() as t_get_data_df: + loc_df = ts.get_data_df("background/filtered_location", time_query) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_data_df", time.time(), t_get_data_df.elapsed) + if len(loc_df) == 0: # no new segments, no need to keep looking at these again logging.debug("len(loc_df) == 0, early return") epq.mark_segmentation_done(user_id, None) return - out_of_order_points = loc_df[loc_df.ts.diff() < 0] - if len(out_of_order_points) > 0: - logging.info("Found out of order points!") - logging.info("%s" % out_of_order_points) - # drop from the table - loc_df = loc_df.drop(out_of_order_points.index.tolist()) - loc_df.reset_index(inplace=True) - # invalidate in the database. - out_of_order_id_list = out_of_order_points["_id"].tolist() - logging.debug("out_of_order_id_list = %s" % out_of_order_id_list) - for ooid in out_of_order_id_list: - ts.invalidate_raw_entry(ooid) - - filters_in_df = loc_df["filter"].dropna().unique() + with ect.Timer() as t_handle_out_of_order: + out_of_order_points = loc_df[loc_df.ts.diff() < 0] + if len(out_of_order_points) > 0: + logging.info("Found out of order points!") + logging.info("%s" % out_of_order_points) + # drop from the table + loc_df = loc_df.drop(out_of_order_points.index.tolist()) + loc_df.reset_index(inplace=True) + # invalidate in the database. + out_of_order_id_list = out_of_order_points["_id"].tolist() + logging.debug("out_of_order_id_list = %s" % out_of_order_id_list) + for ooid in out_of_order_id_list: + ts.invalidate_raw_entry(ooid) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/handle_out_of_order_points", time.time(), t_handle_out_of_order.elapsed) + + with ect.Timer() as t_get_filters: + filters_in_df = loc_df["filter"].dropna().unique() + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_filters_in_df", time.time(), t_get_filters.elapsed) + logging.debug("Filters in the dataframe = %s" % filters_in_df) if len(filters_in_df) == 1: # Common case - let's make it easy - - segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts, - time_query) + with ect.Timer() as t_segment_trips: + segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts, time_query) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips", time.time(), t_segment_trips.elapsed) else: - segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query, - filters_in_df, - filter_methods) + with ect.Timer() as t_get_combined_segmentation: + segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query, + filters_in_df, + filter_methods) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_combined_segmentation_points", time.time(), t_get_combined_segmentation.elapsed) + # Create and store trips and places based on the segmentation points if segmentation_points is None: epq.mark_segmentation_failed(user_id) @@ -103,13 +129,15 @@ def segment_current_trips(user_id): logging.debug("len(segmentation_points) == 0, early return") epq.mark_segmentation_done(user_id, None) else: - try: - create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]]) - epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods)) - except: - logging.exception("Trip generation failed for user %s" % user_id) - epq.mark_segmentation_failed(user_id) - + with ect.Timer() as t_create_places_trips: + try: + create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]]) + epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods)) + except: + logging.exception("Trip generation failed for user %s" % user_id) + epq.mark_segmentation_failed(user_id) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_places_and_trips", time.time(), t_create_places_trips.elapsed) + def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods): """ We can have mixed filters in a particular time range for multiple reasons.