diff --git a/emission/analysis/intake/segmentation/trip_segmentation.py b/emission/analysis/intake/segmentation/trip_segmentation.py index 7b47bd49b..5ce46cf54 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation.py +++ b/emission/analysis/intake/segmentation/trip_segmentation.py @@ -209,13 +209,22 @@ def create_places_and_trips(user_id, segmentation_points, segmentation_method_na # description of dealing with gaps in tracking can be found in the wiki. # Let us first deal with the easy case. # restart_events_df = get_restart_events(ts, time_query) - ts = esta.TimeSeries.get_time_series(user_id) - last_place_entry = esdp.get_last_place_entry(esda.RAW_PLACE_KEY, user_id) + + with ect.Timer() as t_get_time_series: + ts = esta.TimeSeries.get_time_series(user_id) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_places_and_trips/get_time_series", time.time(), t_get_time_series.elapsed) + + with ect.Timer() as t_get_last_place_entry: + last_place_entry = esdp.get_last_place_entry(esda.RAW_PLACE_KEY, user_id) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_places_and_trips/get_last_place_entry", time.time(), t_get_last_place_entry.elapsed) + if last_place_entry is None: - last_place = start_new_chain(user_id) - last_place.source = segmentation_method_name - last_place_entry = ecwe.Entry.create_entry(user_id, - "segmentation/raw_place", last_place, create_id = True) + with ect.Timer() as t_start_new_chain: + last_place = start_new_chain(user_id) + last_place.source = segmentation_method_name + last_place_entry = ecwe.Entry.create_entry(user_id, + "segmentation/raw_place", last_place, create_id=True) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_places_and_trips/start_new_chain", time.time(), t_start_new_chain.elapsed) else: last_place = last_place_entry.data @@ -223,54 +232,67 @@ def create_places_and_trips(user_id, segmentation_points, segmentation_method_na # Theoretically, we can do some sanity checks here to make sure # that we are fairly close to the last point. Maybe mark some kind # of confidence level based on that? - logging.debug("segmentation_point_list has length %s" % len(segmentation_points)) - for (start_loc_doc, end_loc_doc) in segmentation_points: - logging.debug("start_loc_doc = %s, end_loc_doc = %s" % (start_loc_doc, end_loc_doc)) - get_loc_for_row = lambda row: ts.df_row_to_entry("background/filtered_location", row).data - start_loc = get_loc_for_row(start_loc_doc) - end_loc = get_loc_for_row(end_loc_doc) - logging.debug("start_loc = %s, end_loc = %s" % (start_loc, end_loc)) - - # Stitch together the last place and the current trip - curr_trip = ecwrt.Rawtrip() - curr_trip.source = segmentation_method_name - curr_trip_entry = ecwe.Entry.create_entry(user_id, - "segmentation/raw_trip", curr_trip, create_id = True) - - new_place = ecwrp.Rawplace() - new_place.source = segmentation_method_name - new_place_entry = ecwe.Entry.create_entry(user_id, - "segmentation/raw_place", new_place, create_id = True) - - if found_untracked_period(ts, last_place_entry.data, start_loc, segmentation_method_name): - # Fill in the gap in the chain with an untracked period - curr_untracked = ecwut.Untrackedtime() - curr_untracked.source = segmentation_method_name - curr_untracked_entry = ecwe.Entry.create_entry(user_id, - "segmentation/raw_untracked", curr_untracked, create_id=True) - - restarted_place = ecwrp.Rawplace() - restarted_place.source = segmentation_method_name - restarted_place_entry = ecwe.Entry.create_entry(user_id, - "segmentation/raw_place", restarted_place, create_id=True) - - untracked_start_loc = ecwe.Entry(ts.get_entry_at_ts("background/filtered_location", - "data.ts", last_place_entry.data.enter_ts)).data - untracked_start_loc["ts"] = untracked_start_loc.ts + epq.END_FUZZ_AVOID_LTE - _link_and_save(ts, last_place_entry, curr_untracked_entry, restarted_place_entry, - untracked_start_loc, start_loc) - logging.debug("Created untracked period %s from %s to %s" % - (curr_untracked_entry.get_id(), curr_untracked_entry.data.start_ts, curr_untracked_entry.data.end_ts)) - logging.debug("Resetting last_place_entry from %s to %s" % - (last_place_entry, restarted_place_entry)) - last_place_entry = restarted_place_entry - - _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start_loc, end_loc) - last_place_entry = new_place_entry - + + with ect.Timer() as t_loop_segmentation_points: + logging.debug("segmentation_point_list has length %s" % len(segmentation_points)) + for (start_loc_doc, end_loc_doc) in segmentation_points: + logging.debug("start_loc_doc = %s, end_loc_doc = %s" % (start_loc_doc, end_loc_doc)) + get_loc_for_row = lambda row: ts.df_row_to_entry("background/filtered_location", row).data + start_loc = get_loc_for_row(start_loc_doc) + end_loc = get_loc_for_row(end_loc_doc) + logging.debug("start_loc = %s, end_loc = %s" % (start_loc, end_loc)) + + # Stitch together the last place and the current trip + with ect.Timer() as t_create_raw_trip: + curr_trip = ecwrt.Rawtrip() + curr_trip.source = segmentation_method_name + curr_trip_entry = ecwe.Entry.create_entry(user_id, + "segmentation/raw_trip", curr_trip, create_id=True) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_places_and_trips/create_raw_trip", time.time(), t_create_raw_trip.elapsed) + + with ect.Timer() as t_create_new_place: + new_place = ecwrp.Rawplace() + new_place.source = segmentation_method_name + new_place_entry = ecwe.Entry.create_entry(user_id, + "segmentation/raw_place", new_place, create_id=True) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_places_and_trips/create_new_place", time.time(), t_create_new_place.elapsed) + + if found_untracked_period(ts, last_place_entry.data, start_loc, segmentation_method_name): + with ect.Timer() as t_handle_untracked_period: + # Fill in the gap in the chain with an untracked period + curr_untracked = ecwut.Untrackedtime() + curr_untracked.source = segmentation_method_name + curr_untracked_entry = ecwe.Entry.create_entry(user_id, + "segmentation/raw_untracked", curr_untracked, create_id=True) + + restarted_place = ecwrp.Rawplace() + restarted_place.source = segmentation_method_name + restarted_place_entry = ecwe.Entry.create_entry(user_id, + "segmentation/raw_place", restarted_place, create_id=True) + + untracked_start_loc = ecwe.Entry(ts.get_entry_at_ts("background/filtered_location", + "data.ts", last_place_entry.data.enter_ts)).data + untracked_start_loc["ts"] = untracked_start_loc.ts + epq.END_FUZZ_AVOID_LTE + _link_and_save(ts, last_place_entry, curr_untracked_entry, restarted_place_entry, + untracked_start_loc, start_loc) + logging.debug("Created untracked period %s from %s to %s" % + (curr_untracked_entry.get_id(), curr_untracked_entry.data.start_ts, curr_untracked_entry.data.end_ts)) + logging.debug("Resetting last_place_entry from %s to %s" % + (last_place_entry, restarted_place_entry)) + last_place_entry = restarted_place_entry + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_places_and_trips/handle_untracked_period", time.time(), t_handle_untracked_period.elapsed) + + with ect.Timer() as t_link_and_save: + _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start_loc, end_loc) + last_place_entry = new_place_entry + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_places_and_trips/link_and_save", time.time(), t_link_and_save.elapsed) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_places_and_trips/loop_segmentation_points", time.time(), t_loop_segmentation_points.elapsed) + # The last last_place hasn't been stitched together yet, but we # need to save it so that it can be the last_place for the next run - ts.insert(last_place_entry) + with ect.Timer() as t_insert_last_place: + ts.insert(last_place_entry) + esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_places_and_trips/insert_last_place", time.time(), t_insert_last_place.elapsed) def _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start_loc, end_loc): stitch_together_start(last_place_entry, curr_trip_entry, start_loc)