From 01402a9b52ba6968fc69b00c98e434e271cbece1 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Thu, 5 Dec 2024 08:56:07 -0800 Subject: [PATCH 1/7] Removed Bottom 80% functions from latest snapshot. All values have times <0.5s --- .../intake/segmentation/trip_segmentation.py | 58 +++--- .../dwell_segmentation_dist_filter.py | 175 ++++++------------ 2 files changed, 83 insertions(+), 150 deletions(-) diff --git a/emission/analysis/intake/segmentation/trip_segmentation.py b/emission/analysis/intake/segmentation/trip_segmentation.py index 7b47bd49b..addac70e7 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation.py +++ b/emission/analysis/intake/segmentation/trip_segmentation.py @@ -51,28 +51,20 @@ def segment_into_trips(self, timeseries, time_query): pass def segment_current_trips(user_id): - with ect.Timer() as t_get_time_series: - ts = esta.TimeSeries.get_time_series(user_id) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_series", time.time(), t_get_time_series.elapsed) - - with ect.Timer() as t_get_time_range: - time_query = epq.get_time_range_for_segmentation(user_id) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_range_for_segmentation", time.time(), t_get_time_range.elapsed) + ts = esta.TimeSeries.get_time_series(user_id) + time_query = epq.get_time_range_for_segmentation(user_id) import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_time_filter as dstf import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_dist_filter as dsdf - with ect.Timer() as t_create_time_filter: - dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins - point_threshold=9, - distance_threshold=100) # 100 m - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_time_filter", time.time(), t_create_time_filter.elapsed) - with ect.Timer() as t_create_dist_filter: - dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins - point_threshold=9, - distance_threshold=50) # 50 m - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_dist_filter", time.time(), t_create_dist_filter.elapsed) + dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins + point_threshold=9, + distance_threshold=100) # 100 m + + dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins + point_threshold=9, + distance_threshold=50) # 50 m filter_methods = {"time": dstfsm, "distance": dsdfsm} filter_method_names = {"time": "DwellSegmentationTimeFilter", "distance": "DwellSegmentationDistFilter"} @@ -89,24 +81,20 @@ def segment_current_trips(user_id): epq.mark_segmentation_done(user_id, None) return - with ect.Timer() as t_handle_out_of_order: - out_of_order_points = loc_df[loc_df.ts.diff() < 0] - if len(out_of_order_points) > 0: - logging.info("Found out of order points!") - logging.info("%s" % out_of_order_points) - # drop from the table - loc_df = loc_df.drop(out_of_order_points.index.tolist()) - loc_df.reset_index(inplace=True) - # invalidate in the database. 
- out_of_order_id_list = out_of_order_points["_id"].tolist() - logging.debug("out_of_order_id_list = %s" % out_of_order_id_list) - for ooid in out_of_order_id_list: - ts.invalidate_raw_entry(ooid) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/handle_out_of_order_points", time.time(), t_handle_out_of_order.elapsed) - - with ect.Timer() as t_get_filters: - filters_in_df = loc_df["filter"].dropna().unique() - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_filters_in_df", time.time(), t_get_filters.elapsed) + out_of_order_points = loc_df[loc_df.ts.diff() < 0] + if len(out_of_order_points) > 0: + logging.info("Found out of order points!") + logging.info("%s" % out_of_order_points) + # drop from the table + loc_df = loc_df.drop(out_of_order_points.index.tolist()) + loc_df.reset_index(inplace=True) + # invalidate in the database. + out_of_order_id_list = out_of_order_points["_id"].tolist() + logging.debug("out_of_order_id_list = %s" % out_of_order_id_list) + for ooid in out_of_order_id_list: + ts.invalidate_raw_entry(ooid) + + filters_in_df = loc_df["filter"].dropna().unique() logging.debug("Filters in the dataframe = %s" % filters_in_df) if len(filters_in_df) == 1: diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py index 2ffd6f058..5494920ec 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py @@ -60,14 +60,7 @@ def segment_into_trips(self, timeseries, time_query): t_get_filtered_points.elapsed ) - with ect.Timer() as t_mark_valid: - self.filtered_points_df.loc[:, "valid"] = True - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/mark_valid", - time.time(), - t_mark_valid.elapsed - ) + self.filtered_points_df.loc[:, "valid"] = True with ect.Timer() as t_get_transition_df: self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) @@ -103,14 +96,7 @@ def segment_into_trips(self, timeseries, time_query): # segmentation_points.append(currPoint) if just_ended: - with ect.Timer() as t_continue_just_ended: - continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/continue_just_ended", - time.time(), - t_continue_just_ended.elapsed - ) + continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df) if continue_flag: # We have "processed" the currPoint by deciding to glom it @@ -119,14 +105,7 @@ def segment_into_trips(self, timeseries, time_query): # else: sel_point = currPoint logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) - with ect.Timer() as t_set_start_point: - curr_trip_start_point = sel_point - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/set_new_trip_start_point", - time.time(), - t_set_start_point.elapsed - ) + curr_trip_start_point = sel_point just_ended = False else: with ect.Timer() as t_process_trip: @@ -137,48 +116,27 @@ def segment_into_trips(self, timeseries, time_query): max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1 ] lastPoint = 
self.find_last_valid_point(idx) - with ect.Timer() as t_has_trip_ended: - trip_ended = self.has_trip_ended(lastPoint, currPoint, timeseries) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/has_trip_ended", - time.time(), - t_has_trip_ended.elapsed - ) + trip_ended = self.has_trip_ended(lastPoint, currPoint, timeseries) if trip_ended: - with ect.Timer() as t_get_last_trip_end_point: - last_trip_end_point = lastPoint - logging.debug("Appending last_trip_end_point %s with index %s " % - (last_trip_end_point, idx - 1)) - segmentation_points.append((curr_trip_start_point, last_trip_end_point)) - logging.info("Found trip end at %s" % last_trip_end_point.fmt_time) - # We have processed everything up to the trip end by marking it as a completed trip - self.last_ts_processed = currPoint.metadata_write_ts - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_last_trip_end_point", - time.time(), - t_get_last_trip_end_point.elapsed - ) - - with ect.Timer() as t_handle_trip_end: - just_ended = True - # Now, we have finished processing the previous point as a trip - # end or not. But we still need to process this point by seeing - # whether it should represent a new trip start, or a glom to the - # previous trip - if not self.continue_just_ended(idx, currPoint, self.filtered_points_df): - sel_point = currPoint - logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) - curr_trip_start_point = sel_point - just_ended = False - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/handle_trip_end", - time.time(), - t_handle_trip_end.elapsed - ) + last_trip_end_point = lastPoint + logging.debug("Appending last_trip_end_point %s with index %s " % + (last_trip_end_point, idx - 1)) + segmentation_points.append((curr_trip_start_point, last_trip_end_point)) + logging.info("Found trip end at %s" % last_trip_end_point.fmt_time) + # We have processed everything up to the trip end by marking it as a completed trip + self.last_ts_processed = currPoint.metadata_write_ts + just_ended = True + # Now, we have finished processing the previous point as a trip + # end or not. But we still need to process this point by seeing + # whether it should represent a new trip start, or a glom to the + # previous trip + if not self.continue_just_ended(idx, currPoint, self.filtered_points_df): + sel_point = currPoint + logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) + curr_trip_start_point = sel_point + just_ended = False + esds.store_pipeline_time( user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/loop", @@ -186,57 +144,44 @@ def segment_into_trips(self, timeseries, time_query): t_loop.elapsed ) - with ect.Timer() as t_post_loop: - # Since we only end a trip when we start a new trip, this means that - # the last trip that was pushed is ignored. Consider the example of - # 2016-02-22 when I took kids to karate. We arrived shortly after 4pm, - # so during that remote push, a trip end was not detected. And we got - # back home shortly after 5pm, so the trip end was only detected on the - # phone at 6pm. 
At that time, the following points were pushed: - # ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50, - # ..., 2016-02-22T16:57:04 - # Then, on the server, while iterating through the points, we detected - # a trip end at 16:04, and a new trip start at 16:49. But we did not - # detect the trip end at 16:57, because we didn't start a new point. - # This has two issues: - # - we won't see this trip until the next trip start, which may be on the next day - # - we won't see this trip at all, because when we run the pipeline the - # next time, we will only look at points from that time onwards. These - # points have been marked as "processed", so they won't even be considered. - - # There are multiple potential fixes: - # - we can mark only the completed trips as processed. This will solve (2) above, but not (1) - # - we can mark a trip end based on the fact that we only push data - # when a trip ends, so if we have data, it means that the trip has been - # detected as ended on the phone. - # This seems a bit fragile - what if we start pushing incomplete trip - # data for efficiency reasons? Therefore, we also check to see if there - # is a trip_end_detected in this timeframe after the last point. If so, - # then we end the trip at the last point that we have. - if not just_ended and len(self.transition_df) > 0: - with ect.Timer() as t_check_transitions: - stopped_moving_after_last = self.transition_df[ - (self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2) - ] - logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]]) - if len(stopped_moving_after_last) > 0: - logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last)) - segmentation_points.append((curr_trip_start_point, currPoint)) - self.last_ts_processed = currPoint.metadata_write_ts - else: - logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last)) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/check_transitions_post_loop", - time.time(), - t_check_transitions.elapsed - ) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/post_loop", - time.time(), - t_post_loop.elapsed - ) + + # Since we only end a trip when we start a new trip, this means that + # the last trip that was pushed is ignored. Consider the example of + # 2016-02-22 when I took kids to karate. We arrived shortly after 4pm, + # so during that remote push, a trip end was not detected. And we got + # back home shortly after 5pm, so the trip end was only detected on the + # phone at 6pm. At that time, the following points were pushed: + # ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50, + # ..., 2016-02-22T16:57:04 + # Then, on the server, while iterating through the points, we detected + # a trip end at 16:04, and a new trip start at 16:49. But we did not + # detect the trip end at 16:57, because we didn't start a new point. + # This has two issues: + # - we won't see this trip until the next trip start, which may be on the next day + # - we won't see this trip at all, because when we run the pipeline the + # next time, we will only look at points from that time onwards. These + # points have been marked as "processed", so they won't even be considered. + + # There are multiple potential fixes: + # - we can mark only the completed trips as processed. 
This will solve (2) above, but not (1) + # - we can mark a trip end based on the fact that we only push data + # when a trip ends, so if we have data, it means that the trip has been + # detected as ended on the phone. + # This seems a bit fragile - what if we start pushing incomplete trip + # data for efficiency reasons? Therefore, we also check to see if there + # is a trip_end_detected in this timeframe after the last point. If so, + # then we end the trip at the last point that we have. + if not just_ended and len(self.transition_df) > 0: + stopped_moving_after_last = self.transition_df[ + (self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2) + ] + logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]]) + if len(stopped_moving_after_last) > 0: + logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last)) + segmentation_points.append((curr_trip_start_point, currPoint)) + self.last_ts_processed = currPoint.metadata_write_ts + else: + logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last)) return segmentation_points From f497ddf43b9cc17d071c430dcf7a1dcd22cf7681 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Sat, 7 Dec 2024 17:50:43 -0800 Subject: [PATCH 2/7] Defluffed time filter and new one from dist filter --- .../dwell_segmentation_dist_filter.py | 9 +- .../dwell_segmentation_time_filter.py | 161 +++++++----------- 2 files changed, 63 insertions(+), 107 deletions(-) diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py index 5494920ec..388d23848 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py @@ -62,14 +62,7 @@ def segment_into_trips(self, timeseries, time_query): self.filtered_points_df.loc[:, "valid"] = True - with ect.Timer() as t_get_transition_df: - self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_transition_df", - time.time(), - t_get_transition_df.elapsed - ) + self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) if len(self.transition_df) > 0: logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]]) diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py index 3febdca20..4cf216e58 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py @@ -65,40 +65,17 @@ def segment_into_trips(self, timeseries, time_query): data that they want from the sensor streams in order to determine the segmentation points. 
""" - with ect.Timer() as t_get_filtered_points_pre: - filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query) - user_id = filtered_points_pre_ts_diff_df["user_id"].iloc[0] - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_filtered_points_pre_ts_diff_df", - time.time(), - t_get_filtered_points_pre.elapsed - ) - - with ect.Timer() as t_filter_bogus_points: - # Sometimes, we can get bogus points because data.ts and - # metadata.write_ts are off by a lot. If we don't do this, we end up - # appearing to travel back in time - # https://github.com/e-mission/e-mission-server/issues/457 - filtered_points_df = filtered_points_pre_ts_diff_df[ - (filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000 - ] - filtered_points_df.reset_index(inplace=True) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/filter_bogus_points", - time.time(), - t_filter_bogus_points.elapsed - ) - - with ect.Timer() as t_get_transition_df: - transition_df = timeseries.get_data_df("statemachine/transition", time_query) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_transition_df", - time.time(), - t_get_transition_df.elapsed - ) + filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query) + user_id = filtered_points_pre_ts_diff_df["user_id"].iloc[0] + # Sometimes, we can get bogus points because data.ts and + # metadata.write_ts are off by a lot. If we don't do this, we end up + # appearing to travel back in time + # https://github.com/e-mission/e-mission-server/issues/457 + filtered_points_df = filtered_points_pre_ts_diff_df[ + (filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000 + ] + filtered_points_df.reset_index(inplace=True) + transition_df = timeseries.get_data_df("statemachine/transition", time_query) if len(transition_df) > 0: logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]]) @@ -135,47 +112,40 @@ def segment_into_trips(self, timeseries, time_query): curr_trip_start_point = sel_point just_ended = False - with ect.Timer() as t_calculations: - last5MinsPoints_df = filtered_points_df[np.logical_and( - np.logical_and( - filtered_points_df.ts > currPoint.ts - self.time_threshold, - filtered_points_df.ts < currPoint.ts - ), - filtered_points_df.ts >= curr_trip_start_point.ts - )] - # Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive. - # Using .iloc just ends up including points after this one. - # So we reset_index upstream and use it here. - # We are going to use the last 8 points for now. 
- # TODO: Change this back to last 10 points once we normalize phone and this - last10Points_df = filtered_points_df.iloc[ - max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1 - ] - distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint) - timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts - last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1) - logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances))) - last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1) - logging.debug("last10PointsDistances = %s with length %d, shape %s" % ( - last10PointsDistances.to_numpy(), - len(last10PointsDistances), - last10PointsDistances.shape - )) - - # Fix for https://github.com/e-mission/e-mission-server/issues/348 - last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1) - - logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" % - (len(last10PointsDistances), len(last5MinsDistances))) - logging.debug("last5MinTimes.max() = %s, time_threshold = %s" % - (last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold)) + last5MinsPoints_df = filtered_points_df[np.logical_and( + np.logical_and( + filtered_points_df.ts > currPoint.ts - self.time_threshold, + filtered_points_df.ts < currPoint.ts + ), + filtered_points_df.ts >= curr_trip_start_point.ts + )] + # Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive. + # Using .iloc just ends up including points after this one. + # So we reset_index upstream and use it here. + # We are going to use the last 8 points for now. + # TODO: Change this back to last 10 points once we normalize phone and this + last10Points_df = filtered_points_df.iloc[ + max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1 + ] + distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint) + timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts + last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1) + logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances))) + last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1) + logging.debug("last10PointsDistances = %s with length %d, shape %s" % ( + last10PointsDistances.to_numpy(), + len(last10PointsDistances), + last10PointsDistances.shape + )) + + # Fix for https://github.com/e-mission/e-mission-server/issues/348 + last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1) + + logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" % + (len(last10PointsDistances), len(last5MinsDistances))) + logging.debug("last5MinTimes.max() = %s, time_threshold = %s" % + (last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold)) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculations_per_iteration", - time.time(), - t_calculations.elapsed - ) with ect.Timer() as t_has_trip_ended: if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes): @@ -216,31 +186,24 @@ def segment_into_trips(self, timeseries, time_query): t_loop.elapsed ) - with ect.Timer() as t_post_loop: - logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" % - (just_ended, len(transition_df))) - if not just_ended and len(transition_df) > 0: - 
stopped_moving_after_last = transition_df[ - (transition_df.ts > currPoint.ts) & (transition_df.transition == 2) - ] - logging.debug("looking after %s, found transitions %s" % - (currPoint.ts, stopped_moving_after_last)) - if len(stopped_moving_after_last) > 0: - (unused, last_trip_end_point) = self.get_last_trip_end_point( - filtered_points_df, - last10Points_df, - None - ) - segmentation_points.append((curr_trip_start_point, last_trip_end_point)) - logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time) - # We have processed everything up to the trip end by marking it as a completed trip - self.last_ts_processed = currPoint.metadata_write_ts - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/post_loop", - time.time(), - t_post_loop.elapsed - ) + logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" % + (just_ended, len(transition_df))) + if not just_ended and len(transition_df) > 0: + stopped_moving_after_last = transition_df[ + (transition_df.ts > currPoint.ts) & (transition_df.transition == 2) + ] + logging.debug("looking after %s, found transitions %s" % + (currPoint.ts, stopped_moving_after_last)) + if len(stopped_moving_after_last) > 0: + (unused, last_trip_end_point) = self.get_last_trip_end_point( + filtered_points_df, + last10Points_df, + None + ) + segmentation_points.append((curr_trip_start_point, last_trip_end_point)) + logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time) + # We have processed everything up to the trip end by marking it as a completed trip + self.last_ts_processed = currPoint.metadata_write_ts return segmentation_points From 65042c63fdd2e6791704f7a4cf3aa63428a82ef6 Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Mon, 16 Dec 2024 11:55:15 -0800 Subject: [PATCH 3/7] Fix file handling to prevent ValueError and terminal crashes - Replaced manual file open/close with `with` statement to ensure proper resource management. - Removed redundant file read operation after the file was closed. - Resolved `ValueError: I/O operation on closed file` and addressed terminal crashing issue during execution. 
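
For context, a minimal sketch of the pattern this change applies (the `load_query_template` helper name is hypothetical; the actual change is module-level code shown in the diff below):

```
def load_query_template():
    try:
        # Reading inside the `with` block guarantees the handle is still open,
        # and it is closed automatically when the block exits.
        with open('conf/net/ext_service/overpass_transit_stops_query_template',
                  'r', encoding='UTF-8') as query_file:
            return "".join(query_file.readlines())
    except FileNotFoundError:
        # Fall back to the sample template only when the primary file is
        # missing, instead of swallowing every error with a bare `except`.
        print("transit stops query not configured, falling back to default")
        with open('conf/net/ext_service/overpass_transit_stops_query_template.sample',
                  'r', encoding='UTF-8') as query_file:
            return "".join(query_file.readlines())
```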
--- .../net/ext_service/transit_matching/match_stops.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/emission/net/ext_service/transit_matching/match_stops.py b/emission/net/ext_service/transit_matching/match_stops.py index afa5d6abd..3e6deb1c3 100644 --- a/emission/net/ext_service/transit_matching/match_stops.py +++ b/emission/net/ext_service/transit_matching/match_stops.py @@ -15,12 +15,12 @@ url = "https://lz4.overpass-api.de/" try: - query_file = open('conf/net/ext_service/overpass_transit_stops_query_template') -except: + with open('conf/net/ext_service/overpass_transit_stops_query_template', 'r', encoding='UTF-8') as query_file: + query_string = "".join(query_file.readlines()) +except FileNotFoundError: print("transit stops query not configured, falling back to default") - query_file = open('conf/net/ext_service/overpass_transit_stops_query_template.sample') - -query_string = "".join(query_file.readlines()) + with open('conf/net/ext_service/overpass_transit_stops_query_template.sample', 'r', encoding='UTF-8') as query_file: + query_string = "".join(query_file.readlines()) RETRY = -1 From 23e60d577c279403c49aa7aeb3250913c9ae36dc Mon Sep 17 00:00:00 2001 From: Shankari Date: Mon, 16 Dec 2024 13:55:08 -0800 Subject: [PATCH 4/7] =?UTF-8?q?=F0=9F=94=8A=20Turn=20on=20debug=20logging?= =?UTF-8?q?=20for=20the=20silent=20push=20notifications=20as=20well?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We had already turned on debug logging for the "remind" case acf6864b7f168771198d1774799129c3d79b437a so that we can more easily debug errors It looks like we have had multiple issues with FCM deprecation/migrations recently and this will be helpful until the APIs stabilize. --- bin/push/silent_ios_push.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bin/push/silent_ios_push.py b/bin/push/silent_ios_push.py index c4887abc9..03c7da5b2 100644 --- a/bin/push/silent_ios_push.py +++ b/bin/push/silent_ios_push.py @@ -7,6 +7,7 @@ from builtins import * import json import logging +logging.basicConfig(level=logging.DEBUG) import argparse import emission.net.ext_service.push.notify_usage as pnu From 400f9fa95f9cd1d8e64e8217cf27e7bfa8361112 Mon Sep 17 00:00:00 2001 From: Shankari Date: Mon, 16 Dec 2024 14:03:47 -0800 Subject: [PATCH 5/7] =?UTF-8?q?=F0=9F=9B=82=20Login=20to=20the=20FCM=20map?= =?UTF-8?q?ping=20service=20using=20OAuth2=20tokens?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FCM is doubling down on the "I'm going to change my API and break everything" approach. We made one round of fixes in: https://github.com/e-mission/e-mission-docs/issues/1094#issuecomment-2395599699 at which time the mapping to convert APNS tokens to FCM was working However, in the ~ 2 months since, that has also regressed, and we are now getting a 401 error with the old code. 
The new requirements include: - using an OAuth2 token instead of the server API key - passing in `"access_token_auth": "true"` as a header We already use an OAuth2 token to log in and actually send the messages ``` DEBUG:google.auth.transport.requests:Making request: POST https://oauth2.googleapis.com/token DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): oauth2.googleapis.com:443 DEBUG:urllib3.connectionpool:https://oauth2.googleapis.com:443 "POST /token HTTP/1.1" 200 None DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): fcm.googleapis.com:443 ``` So it seems like it would be best to just reuse it for this call as well. However, that token is retrieved from within the pyfcm library and is not easily exposed outside the library. Instead of retrieving the token, this change retrieves the entire authorization header. This header includes the token, but is also formatted correctly with the `Bearer` prefix and is accessible through the `requests_session` property. With this change, the mapping is successful and both silent and visible push notification are sent to iOS phones. Before the change: ``` DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): iid.googleapis.com:443 DEBUG:urllib3.connectionpool:https://iid.googleapis.com:443 "POST /iid/v1:batchImport HTTP/1.1" 401 None DEBUG:root:Response = Received invalid result for batch starting at = 0 after mapping iOS tokens, imported 0 -> processed 0 ``` After the change ``` DEBUG:root:Reading existing headers from current session {'User-Agent': 'python-requests/2.28.2', 'Accept-Encoding': 'gzip, deflate, br', 'Accept': '*/*', 'Connection': 'keep-alive', 'Content-Type': 'application/json', 'Authorization': 'Bearer ...'} DEBUG:root:About to send message {'application': 'gov.nrel.cims.openpath', 'sandbox': False, 'apns_tokens': [.... DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): iid.googleapis.com:443 DEBUG:urllib3.connectionpool:https://iid.googleapis.com:443 "POST /iid/v1:batchImport HTTP/1.1" 200 None DEBUG:root:Response = DEBUG:root:Found firebase mapping from ... at index 0 DEBUG:root:Found firebase mapping from ... at index 1 DEBUG:root:Found firebase mapping from ... at index 2 ... ``` Visible push ``` ... s see if the fix actually worked" -e nrelop_open-access_default_1hITb1CUmGT4iNqUgnifhDreySbQUrtP WARNING:root:Push configured for app gov.nrel.cims.openpath using platform firebase with token AAAAsojuOg... of length 152 after mapping iOS tokens, imported 0 -> processed 0 combo token map has 1 ios entries and 0 android entries {'success': 0, 'failure': 0, 'results': {}} Successfully sent to cK0jHHKUjS... 
{'success': 1, 'failure': 0, 'results': {'cK0jHHKUjS': 'projects/nrel-openpath/messages/1734384976007500'}} ``` --- .../push/notify_interface_impl/firebase.py | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/emission/net/ext_service/push/notify_interface_impl/firebase.py b/emission/net/ext_service/push/notify_interface_impl/firebase.py index d0b0671f2..4e6bc1b92 100644 --- a/emission/net/ext_service/push/notify_interface_impl/firebase.py +++ b/emission/net/ext_service/push/notify_interface_impl/firebase.py @@ -91,12 +91,17 @@ def map_existing_fcm_tokens(self, token_map): unmapped_token_list.append(token) return (mapped_token_map, unmapped_token_list) - def retrieve_fcm_tokens(self, token_list, dev): + def retrieve_fcm_tokens(self, push_service, token_list, dev): if len(token_list) == 0: logging.debug("len(token_list) == 0, skipping fcm token mapping to save API call") return [] importedResultList = [] - importHeaders = {"Authorization": "key=%s" % self.server_auth_token, + existing_headers = push_service.requests_session.headers + logging.debug(f"Reading existing headers from current session {existing_headers}") + # Copying over the authorization from existing headers since, as of Dec + # 2024, we cannot use the server API key and must use an OAuth2 token instead + importHeaders = {"Authorization": existing_headers['Authorization'], + "access_token_auth": "true", "Content-Type": "application/json"} for curr_first in range(0, len(token_list), 100): curr_batch = token_list[curr_first:curr_first + 100] @@ -115,7 +120,7 @@ def retrieve_fcm_tokens(self, token_list, dev): print("After appending result of size %s, total size = %s" % (len(importedResult), len(importedResultList))) else: - print(f"Received invalid result for batch starting at = {curr_first}") + print(f"Received invalid response {importResponse} for batch starting at = {curr_first}") return importedResultList def process_fcm_token_result(self, importedResultList): @@ -133,9 +138,9 @@ def process_fcm_token_result(self, importedResultList): (result, i)); return ret_list - def convert_to_fcm_if_necessary(self, token_map, dev): + def convert_to_fcm_if_necessary(self, push_service, token_map, dev): (mapped_token_map, unmapped_token_list) = self.map_existing_fcm_tokens(token_map) - importedResultList = self.retrieve_fcm_tokens(unmapped_token_list, dev) + importedResultList = self.retrieve_fcm_tokens(push_service, unmapped_token_list, dev) newly_mapped_token_list = self.process_fcm_token_result(importedResultList) print("after mapping iOS tokens, imported %s -> processed %s" % (len(importedResultList), len(newly_mapped_token_list))) @@ -152,15 +157,15 @@ def send_visible_notification(self, token_map, title, message, json_data, dev=Fa logging.info("len(token_map) == 0, early return to save api calls") return - # convert tokens if necessary - fcm_token_map = self.convert_to_fcm_if_necessary(token_map, dev) - push_service = FCMNotification( service_account_file=self.service_account_file, project_id=self.project_id) # Send android and iOS messages separately because they have slightly # different formats # https://github.com/e-mission/e-mission-server/issues/564#issuecomment-360720598 + # convert tokens if necessary + fcm_token_map = self.convert_to_fcm_if_necessary(push_service, token_map, dev) + android_response = self.notify_multiple_devices(push_service, fcm_token_map["android"], notification_body = message, @@ -192,7 +197,7 @@ def send_silent_notification(self, token_map, json_data, dev=False): 
project_id=self.project_id) # convert tokens if necessary - fcm_token_map = self.convert_to_fcm_if_necessary(token_map, dev) + fcm_token_map = self.convert_to_fcm_if_necessary(push_service, token_map, dev) response = {} response["ios"] = self.notify_multiple_devices(push_service, From 7f2e460f38169aee1c7adfe808b0ded87e6bf0bd Mon Sep 17 00:00:00 2001 From: Jack Greenlee Date: Thu, 19 Dec 2024 01:22:31 -0500 Subject: [PATCH 6/7] =?UTF-8?q?clear=20AND=20reload=20analysis=20config=20?= =?UTF-8?q?on=20tests=E2=80=99=20tearDown?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There are several tests that call `etc.set_analysis_config` in their `setUp()` to set config options in certain testing scenarios. This creates a config file, and in tearDown of each test the config file is deleted via os.remove. However, just deleting the file is not enough! When emission.analysis.config is intialized, we read the config and cache it in a variable `config_data` https://github.com/e-mission/e-mission-server/blob/4034533bfc95dbd7c976146341acd3527f9bc7c9/emission/analysis/config.py#L19 When get_config() is called while the pipeline is running, it returns the value of `config_data`, which can be out of sync with the contents of the config file if the config file was added/removed/modified and `reload_config()` was not called. So instead of just calling `os.remove()` these tests also need to call `reload_config()` in their `tearDown()`. Added this by way of a new function in `emission.tests.common` Also, I made the config file pathnames into constants for tidiness and to eliminate the risk of typo-related bugs --- emission/analysis/config.py | 10 +++++--- .../intakeTests/TestFilterAccuracy.py | 5 ++-- .../intakeTests/TestPipelineRealData.py | 9 +++---- .../intakeTests/TestSectionSegmentation.py | 6 ++--- .../intakeTests/TestTripSegmentation.py | 5 ++-- emission/tests/common.py | 24 +++++++++++-------- .../netTests/TestMetricsCleanedSections.py | 5 ++-- .../netTests/TestMetricsConfirmedTrips.py | 5 ++-- 8 files changed, 34 insertions(+), 35 deletions(-) diff --git a/emission/analysis/config.py b/emission/analysis/config.py index d484e5354..8dc514dcf 100644 --- a/emission/analysis/config.py +++ b/emission/analysis/config.py @@ -1,17 +1,21 @@ import json import os +ANALYSIS_CONF_PATH = "conf/analysis/debug.conf.json" +ANALYSIS_CONF_PROD_PATH = "conf/analysis/debug.conf.prod.json" +ANALYSIS_CONF_DEV_PATH = "conf/analysis/debug.conf.dev.json" + def get_config_data(): try: print("Trying to open debug.conf.json") - config_file = open('conf/analysis/debug.conf.json') + config_file = open(ANALYSIS_CONF_PATH) except: if os.getenv("PROD_STAGE") == "TRUE": print("In production environment, config not overridden, using default production debug.conf") - config_file = open('conf/analysis/debug.conf.prod.json') + config_file = open(ANALYSIS_CONF_PROD_PATH) else: print("analysis.debug.conf.json not configured, falling back to sample, default configuration") - config_file = open('conf/analysis/debug.conf.dev.json') + config_file = open(ANALYSIS_CONF_DEV_PATH) ret_val = json.load(config_file) config_file.close() return ret_val diff --git a/emission/tests/analysisTests/intakeTests/TestFilterAccuracy.py b/emission/tests/analysisTests/intakeTests/TestFilterAccuracy.py index 55ae19d90..e51e881a1 100644 --- a/emission/tests/analysisTests/intakeTests/TestFilterAccuracy.py +++ b/emission/tests/analysisTests/intakeTests/TestFilterAccuracy.py @@ -34,15 +34,14 @@ def setUp(self): import 
emission.core.get_database as edb import uuid - self.analysis_conf_path = \ - etc.set_analysis_config("intake.cleaning.filter_accuracy.enable", True) + etc.set_analysis_config("intake.cleaning.filter_accuracy.enable", True) self.testUUID = None def tearDown(self): import emission.core.get_database as edb edb.get_timeseries_db().delete_many({"user_id": self.testUUID}) edb.get_pipeline_state_db().delete_many({"user_id": self.testUUID}) - os.remove(self.analysis_conf_path) + etc.clear_analysis_config() def checkSuccessfulRun(self): pipelineState = edb.get_pipeline_state_db().find_one({"user_id": self.testUUID, diff --git a/emission/tests/analysisTests/intakeTests/TestPipelineRealData.py b/emission/tests/analysisTests/intakeTests/TestPipelineRealData.py index f01cdc042..35808bbcf 100644 --- a/emission/tests/analysisTests/intakeTests/TestPipelineRealData.py +++ b/emission/tests/analysisTests/intakeTests/TestPipelineRealData.py @@ -60,8 +60,7 @@ class TestPipelineRealData(unittest.TestCase): def setUp(self): # Thanks to M&J for the number! np.random.seed(61297777) - self.analysis_conf_path = \ - etc.set_analysis_config("analysis.result.section.key", "analysis/cleaned_section") + etc.set_analysis_config("analysis.result.section.key", "analysis/cleaned_section") logging.info("setUp complete") def tearDown(self): @@ -76,8 +75,7 @@ def tearDown(self): # to determine whether to switch to a new implementation if not hasattr(self, "evaluation") or not self.evaluation: self.clearRelatedDb() - if hasattr(self, "analysis_conf_path"): - os.remove(self.analysis_conf_path) + etc.clear_analysis_config() if hasattr(self, "seed_mode_path"): os.remove(self.seed_mode_path) logging.info("tearDown complete") @@ -744,8 +742,7 @@ def testJackUntrackedTimeMar12(self): def testJackUntrackedTimeMar12InferredSections(self): # Setup to use the inferred sections - self.analysis_conf_path = \ - etc.set_analysis_config("analysis.result.section.key", "analysis/inferred_section") + etc.set_analysis_config("analysis.result.section.key", "analysis/inferred_section") # along with the proper random seed self.seed_mode_path = etc.copy_dummy_seed_for_inference() dataFile = "emission/tests/data/real_examples/jack_untracked_time_2023-03-12" diff --git a/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py b/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py index df37a3c77..4162962fd 100644 --- a/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py +++ b/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py @@ -41,8 +41,7 @@ class TestSectionSegmentation(unittest.TestCase): def setUp(self): - self.analysis_conf_path = \ - etc.set_analysis_config("intake.cleaning.filter_accuracy.enable", True) + etc.set_analysis_config("intake.cleaning.filter_accuracy.enable", True) etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-27") self.androidUUID = self.testUUID @@ -58,8 +57,7 @@ def setUp(self): def tearDown(self): if not hasattr(self, "evaluation") or not self.evaluation: self.clearRelatedDb() - if hasattr(self, "analysis_conf_path"): - os.remove(self.analysis_conf_path) + etc.clear_analysis_config() def clearRelatedDb(self): edb.get_timeseries_db().delete_many({"user_id": self.androidUUID}) diff --git a/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py b/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py index 0cc469fea..37af7bea6 100644 --- a/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py +++ 
b/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py @@ -36,8 +36,7 @@ class TestTripSegmentation(unittest.TestCase): def setUp(self): - self.analysis_conf_path = \ - etc.set_analysis_config("intake.cleaning.filter_accuracy.enable", True) + etc.set_analysis_config("intake.cleaning.filter_accuracy.enable", True) etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-27") self.androidUUID = self.testUUID @@ -51,7 +50,7 @@ def setUp(self): logging.debug("androidUUID = %s, iosUUID = %s" % (self.androidUUID, self.iosUUID)) def tearDown(self): - os.remove(self.analysis_conf_path) + etc.clear_analysis_config() edb.get_timeseries_db().delete_many({"user_id": self.androidUUID}) edb.get_timeseries_db().delete_many({"user_id": self.iosUUID}) edb.get_pipeline_state_db().delete_many({"user_id": self.androidUUID}) diff --git a/emission/tests/common.py b/emission/tests/common.py index baae6053c..76e745555 100644 --- a/emission/tests/common.py +++ b/emission/tests/common.py @@ -263,25 +263,29 @@ def createDummyRequestEnviron(self, addl_headers, request_body): return test_environ def set_analysis_config(key, value): + """ + Tests that call this in their setUp must call clear_analysis_config in their tearDown + """ import emission.analysis.config as eac import shutil - analysis_conf_path = "conf/analysis/debug.conf.json" - shutil.copyfile("conf/analysis/debug.conf.dev.json", - analysis_conf_path) - with open(analysis_conf_path) as fd: + shutil.copyfile(eac.ANALYSIS_CONF_DEV_PATH, eac.ANALYSIS_CONF_PATH) + with open(eac.ANALYSIS_CONF_PATH) as fd: curr_config = json.load(fd) curr_config[key] = value - with open(analysis_conf_path, "w") as fd: + with open(eac.ANALYSIS_CONF_PATH, "w") as fd: json.dump(curr_config, fd, indent=4) - logging.debug("Finished setting up %s" % analysis_conf_path) - with open(analysis_conf_path) as fd: + logging.debug("Finished setting up %s" % eac.ANALYSIS_CONF_PATH) + with open(eac.ANALYSIS_CONF_PATH) as fd: logging.debug("Current values are %s" % json.load(fd)) eac.reload_config() - - # Return this so that we can delete it in the teardown - return analysis_conf_path + +def clear_analysis_config(): + import emission.analysis.config as eac + if os.path.exists(eac.ANALYSIS_CONF_PATH): + os.remove(eac.ANALYSIS_CONF_PATH) + eac.reload_config() def copy_dummy_seed_for_inference(): import shutil diff --git a/emission/tests/netTests/TestMetricsCleanedSections.py b/emission/tests/netTests/TestMetricsCleanedSections.py index 24eac37a6..66f4f08e6 100644 --- a/emission/tests/netTests/TestMetricsCleanedSections.py +++ b/emission/tests/netTests/TestMetricsCleanedSections.py @@ -23,8 +23,7 @@ class TestMetrics(unittest.TestCase): def setUp(self): - self.analysis_conf_path = \ - etc.set_analysis_config("analysis.result.section.key", "analysis/cleaned_section") + etc.set_analysis_config("analysis.result.section.key", "analysis/cleaned_section") etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-21") self.testUUID1 = self.testUUID @@ -41,7 +40,7 @@ def setUp(self): def tearDown(self): self.clearRelatedDb() - os.remove(self.analysis_conf_path) + etc.clear_analysis_config() def clearRelatedDb(self): edb.get_timeseries_db().delete_many({"user_id": self.testUUID}) diff --git a/emission/tests/netTests/TestMetricsConfirmedTrips.py b/emission/tests/netTests/TestMetricsConfirmedTrips.py index 27a91dda0..444c5ed60 100644 --- a/emission/tests/netTests/TestMetricsConfirmedTrips.py +++ b/emission/tests/netTests/TestMetricsConfirmedTrips.py @@ 
-17,8 +17,7 @@ class TestMetrics(unittest.TestCase): def setUp(self): - self.analysis_conf_path = \ - etc.set_analysis_config("analysis.result.section.key", "analysis/confirmed_trip") + etc.set_analysis_config("analysis.result.section.key", "analysis/confirmed_trip") self._loadDataFileAndInputs("emission/tests/data/real_examples/shankari_2016-06-20") self.testUUID1 = self.testUUID self._loadDataFileAndInputs("emission/tests/data/real_examples/shankari_2016-06-21") @@ -39,7 +38,7 @@ def _loadDataFileAndInputs(self, dataFile): def tearDown(self): self.clearRelatedDb() - os.remove(self.analysis_conf_path) + etc.clear_analysis_config() def clearRelatedDb(self): edb.get_timeseries_db().delete_many({"user_id": self.testUUID1}) From 09a27469d8a119c8d5465f691649309407f433c3 Mon Sep 17 00:00:00 2001 From: Jack Greenlee Date: Thu, 19 Dec 2024 23:05:02 -0500 Subject: [PATCH 7/7] fix EnvironmentLocationNotFound error during test-with-manual-install "Teardown the test environment" has been consistently failing. ``` Run source setup/teardown_tests.sh Removing environment from CondaError: Run 'conda init' before 'conda deactivate' EnvironmentLocationNotFound: Not a conda environment: /usr/share/miniconda/envs/emissiontest Error: Process completed with exit code 1. ``` I noticed that test-with-manual-install.yml follows a similar teardown procedure (with 'emission' as the environment instead of 'emissiontest') I saw that test-with-manual-install.yml invokes activates_conda.sh again right before calling the teardown script --- .github/workflows/test-with-manual-install.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test-with-manual-install.yml b/.github/workflows/test-with-manual-install.yml index 4a81eb000..bfb8b530b 100644 --- a/.github/workflows/test-with-manual-install.yml +++ b/.github/workflows/test-with-manual-install.yml @@ -70,4 +70,6 @@ jobs: - name: Teardown the test environment shell: bash -l {0} - run: source setup/teardown_tests.sh + run: | + source setup/activate_conda.sh + source setup/teardown_tests.sh