From bbfb32fb71ef349d8dff71b8a2fcac57cf404f3a Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Mon, 23 Dec 2024 19:07:04 +0000 Subject: [PATCH] use dict to map stop direction, backfill rt_stop_times/speedmap for single dates --- rt_segment_speeds/logs/profile1.txt | 66 ----- rt_segment_speeds/logs/profile2.txt | 203 --------------- rt_segment_speeds/logs/profile3.txt | 235 ------------------ .../scripts/nearest_vp_to_stop.py | 26 +- .../segment_speed_utils/neighbor.py | 23 +- 5 files changed, 24 insertions(+), 529 deletions(-) delete mode 100644 rt_segment_speeds/logs/profile1.txt delete mode 100644 rt_segment_speeds/logs/profile2.txt delete mode 100644 rt_segment_speeds/logs/profile3.txt diff --git a/rt_segment_speeds/logs/profile1.txt b/rt_segment_speeds/logs/profile1.txt deleted file mode 100644 index dab40bcbf..000000000 --- a/rt_segment_speeds/logs/profile1.txt +++ /dev/null @@ -1,66 +0,0 @@ -Line # Mem usage Increment Occurrences Line Contents -============================================================= - 122 311.5 MiB 311.5 MiB 1 @profile - 123 def nearest_neighbor_for_stop( - 124 analysis_date: str, - 125 segment_type: Literal[SEGMENT_TYPES], - 126 config_path: Optional[Path] = GTFS_DATA_DICT - 127 ): - 128 """ - 129 Set up nearest neighbors for RT stop times, which - 130 includes all trips. Use stop sequences for each trip. - 131 """ - 132 311.7 MiB 0.2 MiB 1 start = datetime.datetime.now() - 133 - 134 311.7 MiB 0.0 MiB 1 dict_inputs = config_path[segment_type] - 135 - 136 311.7 MiB 0.0 MiB 1 EXPORT_FILE = f'{dict_inputs["stage2"]}_{analysis_date}' - 137 311.7 MiB 0.0 MiB 1 trip_stop_cols = [*dict_inputs["trip_stop_cols"]] - 138 - 139 311.7 MiB 0.0 MiB 1 stop_time_col_order = [ - 140 'trip_instance_key', 'shape_array_key', - 141 'stop_sequence', 'stop_id', 'stop_pair', - 142 'stop_primary_direction', 'geometry' - 143 ] - 144 - 145 311.7 MiB 0.0 MiB 1 if segment_type == "stop_segments": - 146 stop_times = stop_times_for_shape_segments(analysis_date, dict_inputs) - 147 stop_times = stop_times.reindex(columns = stop_time_col_order) - 148 - 149 311.7 MiB 0.0 MiB 1 elif segment_type == "rt_stop_times": - 150 stop_times = stop_times_for_all_trips(analysis_date) - 151 stop_times = stop_times.reindex(columns = stop_time_col_order) - 152 - 153 311.7 MiB 0.0 MiB 1 elif segment_type == "speedmap_segments": - 154 638.8 MiB 327.1 MiB 1 stop_times = stop_times_for_speedmaps(analysis_date) - 155 - 156 else: - 157 print(f"{segment_type} is not valid") - 158 - 159 2730.2 MiB 2091.4 MiB 2 gdf = neighbor.merge_stop_vp_for_nearest_neighbor( - 160 638.8 MiB 0.0 MiB 1 stop_times, analysis_date) - 161 - 162 2008.1 MiB -722.1 MiB 1 results = neighbor.add_nearest_neighbor_result_array(gdf, analysis_date) - 163 - 164 1965.8 MiB -42.3 MiB 1 del gdf, stop_times - 165 - 166 # Keep columns from results that are consistent across segment types - 167 # use trip_stop_cols as a way to uniquely key into a row - 168 1965.8 MiB 0.0 MiB 1 keep_cols = trip_stop_cols + [ - 169 "shape_array_key", - 170 "stop_geometry", - 171 "nearest_vp_arr" - 172 ] - 173 - 174 1965.8 MiB -249.9 MiB 2 utils.geoparquet_gcs_export( - 175 1965.8 MiB 0.0 MiB 1 results[keep_cols], - 176 1965.8 MiB 0.0 MiB 1 SEGMENT_GCS, - 177 1965.8 MiB 0.0 MiB 1 EXPORT_FILE, - 178 ) - 179 - 180 1715.9 MiB -249.9 MiB 1 end = datetime.datetime.now() - 181 1715.9 MiB 0.0 MiB 3 logger.info(f"nearest neighbor for {segment_type} " - 182 1715.9 MiB 0.0 MiB 2 f"{analysis_date}: {end - start}") - 183 - 184 1485.6 MiB -230.3 MiB 1 del results - 185 1485.6 MiB 0.0 MiB 1 return \ No newline at end of file diff --git a/rt_segment_speeds/logs/profile2.txt b/rt_segment_speeds/logs/profile2.txt deleted file mode 100644 index 6fea19a83..000000000 --- a/rt_segment_speeds/logs/profile2.txt +++ /dev/null @@ -1,203 +0,0 @@ -Line # Mem usage Increment Occurrences Line Contents -============================================================= - 206 312.0 MiB 312.0 MiB 1 @profile - 207 def filter_to_nearest_two_vp( - 208 analysis_date: str, - 209 segment_type: Literal[SEGMENT_TYPES], - 210 config_path: Optional[Path] = GTFS_DATA_DICT - 211 ): - 212 312.0 MiB 0.0 MiB 1 dict_inputs = config_path[segment_type] - 213 312.0 MiB 0.0 MiB 1 trip_stop_cols = [*dict_inputs["trip_stop_cols"]] - 214 312.0 MiB 0.0 MiB 1 USABLE_VP_FILE = dict_inputs["stage1"] - 215 312.0 MiB 0.0 MiB 1 INPUT_FILE = dict_inputs["stage2"] - 216 312.0 MiB 0.0 MiB 1 EXPORT_FILE = dict_inputs["stage2b"] - 217 - 218 312.0 MiB 0.0 MiB 1 start = datetime.datetime.now() - 219 - 220 312.1 MiB 0.1 MiB 2 stop_meters_df = delayed(stops_projected_against_shape)( - 221 312.0 MiB 0.0 MiB 1 INPUT_FILE, analysis_date, trip_stop_cols) - 222 - 223 312.1 MiB 0.0 MiB 2 vp_nearest = delayed(explode_vp_nearest)( - 224 312.1 MiB 0.0 MiB 1 INPUT_FILE, analysis_date, trip_stop_cols) - 225 - 226 312.1 MiB 0.0 MiB 1 subset_vp = vp_nearest.vp_idx.unique() - 227 - 228 312.1 MiB 0.0 MiB 2 vp_meters_df = delayed(get_vp_projected_against_shape)( - 229 312.1 MiB 0.0 MiB 1 USABLE_VP_FILE, - 230 312.1 MiB 0.0 MiB 1 analysis_date, - 231 312.1 MiB 0.0 MiB 1 filters = [[("vp_idx", "in", subset_vp)]] - 232 ) - 233 - 234 312.1 MiB 0.0 MiB 3 gdf = delayed(pd.merge)( - 235 312.1 MiB 0.0 MiB 1 vp_nearest, - 236 312.1 MiB 0.0 MiB 1 stop_meters_df, - 237 312.1 MiB 0.0 MiB 1 on = trip_stop_cols, - 238 312.1 MiB 0.0 MiB 1 how = "inner" - 239 ).merge( - 240 312.1 MiB 0.0 MiB 1 vp_meters_df, - 241 312.1 MiB 0.0 MiB 1 on = "vp_idx", - 242 312.1 MiB 0.0 MiB 1 how = "inner" - 243 ) - 244 - 245 # Calculate the distance between the stop and vp position - 246 # This is used to find the minimum positive and minimum negative - 247 # distance (get at vp before and after stop) - 248 312.1 MiB 0.0 MiB 2 gdf = gdf.assign( - 249 312.1 MiB 0.0 MiB 1 stop_meters = gdf.stop_meters.round(3), - 250 312.1 MiB 0.0 MiB 1 shape_meters = gdf.shape_meters.round(3), - 251 312.1 MiB 0.0 MiB 1 stop_vp_distance_meters = (gdf.stop_meters - gdf.shape_meters).round(2) - 252 ) - 253 - 254 312.1 MiB 0.0 MiB 1 gdf2 = delayed(find_two_closest_vp)(gdf, trip_stop_cols) - 255 2321.5 MiB 2009.4 MiB 1 gdf2 = compute(gdf2)[0] - 256 - 257 2321.5 MiB 0.0 MiB 1 del subset_vp, vp_nearest, stop_meters_df, vp_meters_df, gdf - 258 - 259 2753.4 MiB 431.9 MiB 2 gdf2.to_parquet( - 260 2321.5 MiB 0.0 MiB 1 f"{SEGMENT_GCS}{EXPORT_FILE}_{analysis_date}.parquet", - 261 ) - 262 - 263 2568.7 MiB -184.7 MiB 1 del gdf2 - 264 - 265 2568.7 MiB 0.0 MiB 1 end = datetime.datetime.now() - 266 2568.7 MiB 0.0 MiB 3 logger.info(f"nearest 2 vp for {segment_type} " - 267 2568.7 MiB 0.0 MiB 2 f"{analysis_date}: {end - start}") - 268 - 269 2568.7 MiB 0.0 MiB 1 return - -## second date in loop - Line # Mem usage Increment Occurrences Line Contents -============================================================= - 206 2568.7 MiB 2568.7 MiB 1 @profile - 207 def filter_to_nearest_two_vp( - 208 analysis_date: str, - 209 segment_type: Literal[SEGMENT_TYPES], - 210 config_path: Optional[Path] = GTFS_DATA_DICT - 211 ): - 212 2568.7 MiB 0.0 MiB 1 dict_inputs = config_path[segment_type] - 213 2568.7 MiB 0.0 MiB 1 trip_stop_cols = [*dict_inputs["trip_stop_cols"]] - 214 2568.7 MiB 0.0 MiB 1 USABLE_VP_FILE = dict_inputs["stage1"] - 215 2568.7 MiB 0.0 MiB 1 INPUT_FILE = dict_inputs["stage2"] - 216 2568.7 MiB 0.0 MiB 1 EXPORT_FILE = dict_inputs["stage2b"] - 217 - 218 2568.7 MiB 0.0 MiB 1 start = datetime.datetime.now() - 219 - 220 2568.7 MiB 0.0 MiB 2 stop_meters_df = delayed(stops_projected_against_shape)( - 221 2568.7 MiB 0.0 MiB 1 INPUT_FILE, analysis_date, trip_stop_cols) - 222 - 223 2568.7 MiB 0.0 MiB 2 vp_nearest = delayed(explode_vp_nearest)( - 224 2568.7 MiB 0.0 MiB 1 INPUT_FILE, analysis_date, trip_stop_cols) - 225 - 226 2568.7 MiB 0.0 MiB 1 subset_vp = vp_nearest.vp_idx.unique() - 227 - 228 2568.7 MiB 0.0 MiB 2 vp_meters_df = delayed(get_vp_projected_against_shape)( - 229 2568.7 MiB 0.0 MiB 1 USABLE_VP_FILE, - 230 2568.7 MiB 0.0 MiB 1 analysis_date, - 231 2568.7 MiB 0.0 MiB 1 filters = [[("vp_idx", "in", subset_vp)]] - 232 ) - 233 - 234 2568.7 MiB 0.0 MiB 3 gdf = delayed(pd.merge)( - 235 2568.7 MiB 0.0 MiB 1 vp_nearest, - 236 2568.7 MiB 0.0 MiB 1 stop_meters_df, - 237 2568.7 MiB 0.0 MiB 1 on = trip_stop_cols, - 238 2568.7 MiB 0.0 MiB 1 how = "inner" - 239 ).merge( - 240 2568.7 MiB 0.0 MiB 1 vp_meters_df, - 241 2568.7 MiB 0.0 MiB 1 on = "vp_idx", - 242 2568.7 MiB 0.0 MiB 1 how = "inner" - 243 ) - 244 - 245 # Calculate the distance between the stop and vp position - 246 # This is used to find the minimum positive and minimum negative - 247 # distance (get at vp before and after stop) - 248 2568.7 MiB 0.0 MiB 2 gdf = gdf.assign( - 249 2568.7 MiB 0.0 MiB 1 stop_meters = gdf.stop_meters.round(3), - 250 2568.7 MiB 0.0 MiB 1 shape_meters = gdf.shape_meters.round(3), - 251 2568.7 MiB 0.0 MiB 1 stop_vp_distance_meters = (gdf.stop_meters - gdf.shape_meters).round(2) - 252 ) - 253 - 254 2568.7 MiB 0.0 MiB 1 gdf2 = delayed(find_two_closest_vp)(gdf, trip_stop_cols) - 255 3068.6 MiB 499.9 MiB 1 gdf2 = compute(gdf2)[0] - 256 - 257 3068.6 MiB 0.0 MiB 1 del subset_vp, vp_nearest, stop_meters_df, vp_meters_df, gdf - 258 - 259 3480.6 MiB 412.0 MiB 2 gdf2.to_parquet( - 260 3068.6 MiB 0.0 MiB 1 f"{SEGMENT_GCS}{EXPORT_FILE}_{analysis_date}.parquet", - 261 ) - 262 - 263 3261.2 MiB -219.4 MiB 1 del gdf2 - 264 - 265 3261.2 MiB 0.0 MiB 1 end = datetime.datetime.now() - 266 3261.2 MiB 0.0 MiB 3 logger.info(f"nearest 2 vp for {segment_type} " - 267 3261.2 MiB 0.0 MiB 2 f"{analysis_date}: {end - start}") - 268 - 269 3261.2 MiB 0.0 MiB 1 return - - -## this is after running nearest_vp_to_stop -Line # Mem usage Increment Occurrences Line Contents -============================================================= - 206 1485.6 MiB 1485.6 MiB 1 @profile - 207 def filter_to_nearest_two_vp( - 208 analysis_date: str, - 209 segment_type: Literal[SEGMENT_TYPES], - 210 config_path: Optional[Path] = GTFS_DATA_DICT - 211 ): - 212 1485.6 MiB 0.0 MiB 1 dict_inputs = config_path[segment_type] - 213 1485.6 MiB 0.0 MiB 1 trip_stop_cols = [*dict_inputs["trip_stop_cols"]] - 214 1485.6 MiB 0.0 MiB 1 USABLE_VP_FILE = dict_inputs["stage1"] - 215 1485.6 MiB 0.0 MiB 1 INPUT_FILE = dict_inputs["stage2"] - 216 1485.6 MiB 0.0 MiB 1 EXPORT_FILE = dict_inputs["stage2b"] - 217 - 218 1485.6 MiB 0.0 MiB 1 start = datetime.datetime.now() - 219 - 220 1485.7 MiB 0.1 MiB 2 stop_meters_df = delayed(stops_projected_against_shape)( - 221 1485.6 MiB 0.0 MiB 1 INPUT_FILE, analysis_date, trip_stop_cols) - 222 - 223 1485.7 MiB 0.0 MiB 2 vp_nearest = delayed(explode_vp_nearest)( - 224 1485.7 MiB 0.0 MiB 1 INPUT_FILE, analysis_date, trip_stop_cols) - 225 - 226 1485.7 MiB 0.0 MiB 1 subset_vp = vp_nearest.vp_idx.unique() - 227 - 228 1485.7 MiB 0.0 MiB 2 vp_meters_df = delayed(get_vp_projected_against_shape)( - 229 1485.7 MiB 0.0 MiB 1 USABLE_VP_FILE, - 230 1485.7 MiB 0.0 MiB 1 analysis_date, - 231 1485.7 MiB 0.0 MiB 1 filters = [[("vp_idx", "in", subset_vp)]] - 232 ) - 233 - 234 1485.7 MiB 0.0 MiB 3 gdf = delayed(pd.merge)( - 235 1485.7 MiB 0.0 MiB 1 vp_nearest, - 236 1485.7 MiB 0.0 MiB 1 stop_meters_df, - 237 1485.7 MiB 0.0 MiB 1 on = trip_stop_cols, - 238 1485.7 MiB 0.0 MiB 1 how = "inner" - 239 ).merge( - 240 1485.7 MiB 0.0 MiB 1 vp_meters_df, - 241 1485.7 MiB 0.0 MiB 1 on = "vp_idx", - 242 1485.7 MiB 0.0 MiB 1 how = "inner" - 243 ) - 244 - 245 # Calculate the distance between the stop and vp position - 246 # This is used to find the minimum positive and minimum negative - 247 # distance (get at vp before and after stop) - 248 1485.7 MiB 0.0 MiB 2 gdf = gdf.assign( - 249 1485.7 MiB 0.0 MiB 1 stop_meters = gdf.stop_meters.round(3), - 250 1485.7 MiB 0.0 MiB 1 shape_meters = gdf.shape_meters.round(3), - 251 1485.7 MiB 0.0 MiB 1 stop_vp_distance_meters = (gdf.stop_meters - gdf.shape_meters).round(2) - 252 ) - 253 - 254 1485.7 MiB 0.0 MiB 1 gdf2 = delayed(find_two_closest_vp)(gdf, trip_stop_cols) - 255 2492.7 MiB 1007.0 MiB 1 gdf2 = compute(gdf2)[0] - 256 - 257 2492.7 MiB 0.0 MiB 1 del subset_vp, vp_nearest, stop_meters_df, vp_meters_df, gdf - 258 - 259 2565.7 MiB 73.1 MiB 2 gdf2.to_parquet( - 260 2492.7 MiB 0.0 MiB 1 f"{SEGMENT_GCS}{EXPORT_FILE}_{analysis_date}.parquet", - 261 ) - 262 - 263 2561.5 MiB -4.2 MiB 1 del gdf2 - 264 - 265 2561.5 MiB 0.0 MiB 1 end = datetime.datetime.now() - 266 2561.5 MiB 0.0 MiB 3 logger.info(f"nearest 2 vp for {segment_type} " - 267 2561.5 MiB 0.0 MiB 2 f"{analysis_date}: {end - start}") - 268 - 269 2561.5 MiB 0.0 MiB 1 return \ No newline at end of file diff --git a/rt_segment_speeds/logs/profile3.txt b/rt_segment_speeds/logs/profile3.txt deleted file mode 100644 index 39dd7e3a5..000000000 --- a/rt_segment_speeds/logs/profile3.txt +++ /dev/null @@ -1,235 +0,0 @@ -2024-07-25 11:15:41.427 | INFO | interpolate_stop_arrival:interpolate_stop_arrivals:235 - interpolate arrivals for stop_segments 2024-04-17: 2024-04-17: 0:44:52.267225 -2024-07-25 at 11:15:41 | INFO | interpolate arrivals for stop_segments 2024-04-17: 2024-04-17: 0:44:52.267225 -Filename: /home/jovyan/data-analyses/rt_segment_speeds/scripts/interpolate_stop_arrival.py - -Line # Mem usage Increment Occurrences Line Contents -============================================================= - 206 311.7 MiB 311.7 MiB 1 @profile - 207 def interpolate_stop_arrivals( - 208 analysis_date: str, - 209 segment_type: Literal[SEGMENT_TYPES], - 210 config_path: Optional[Path] = GTFS_DATA_DICT - 211 ): - 212 311.7 MiB 0.0 MiB 1 dict_inputs = config_path[segment_type] - 213 311.7 MiB 0.0 MiB 1 trip_stop_cols = [*dict_inputs["trip_stop_cols"]] - 214 311.7 MiB 0.0 MiB 1 INPUT_FILE = dict_inputs["stage2b"] - 215 311.7 MiB 0.0 MiB 1 STOP_ARRIVALS_FILE = dict_inputs["stage3"] - 216 - 217 311.7 MiB 0.0 MiB 1 start = datetime.datetime.now() - 218 - 219 867.0 MiB 555.3 MiB 2 df = add_arrival_time( - 220 311.7 MiB 0.0 MiB 1 INPUT_FILE, - 221 311.7 MiB 0.0 MiB 1 analysis_date, - 222 311.7 MiB 0.0 MiB 1 trip_stop_cols + ["shape_array_key"] - 223 ) - 224 - 225 1314.0 MiB 447.0 MiB 2 results = enforce_monotonicity_and_interpolate_across_stops( - 226 867.0 MiB 0.0 MiB 1 df, trip_stop_cols) - 227 - 228 1530.5 MiB 216.5 MiB 2 results.to_parquet( - 229 1314.0 MiB 0.0 MiB 1 f"{SEGMENT_GCS}{STOP_ARRIVALS_FILE}_{analysis_date}.parquet" - 230 ) - 231 - 232 1171.5 MiB -359.0 MiB 1 del results, df - 233 - 234 1171.5 MiB 0.0 MiB 1 end = datetime.datetime.now() - 235 1171.5 MiB 0.0 MiB 4 logger.info(f"interpolate arrivals for {segment_type} " - 236 1171.5 MiB 0.0 MiB 3 f"{analysis_date}: {analysis_date}: {end - start}") - 237 - 238 1171.5 MiB 0.0 MiB 1 return - - -2024-07-25 at 11:18:32 | INFO | speeds by segment for stop_segments 2024-04-17: 0:02:50.738942 -Filename: /home/jovyan/data-analyses/rt_segment_speeds/scripts/stop_arrivals_to_speed.py - -Line # Mem usage Increment Occurrences Line Contents -============================================================= - 111 1171.5 MiB 1171.5 MiB 1 @profile - 112 def calculate_speed_from_stop_arrivals( - 113 analysis_date: str, - 114 segment_type: Literal[SEGMENT_TYPES], - 115 config_path: Optional[Path] = GTFS_DATA_DICT, - 116 ): - 117 """ - 118 Calculate speed between the interpolated stop arrivals of - 119 2 stops. Use current stop to subsequent stop, to match - 120 with the segments cut by gtfs_segments.create_segments - 121 """ - 122 1171.5 MiB 0.0 MiB 1 dict_inputs = config_path[segment_type] - 123 - 124 1171.5 MiB 0.0 MiB 1 trip_cols = ["trip_instance_key"] - 125 1171.5 MiB 0.0 MiB 1 trip_stop_cols = [*dict_inputs["trip_stop_cols"]] - 126 - 127 # speedmap segments shoulse the full concatenated one - 128 1171.5 MiB 0.0 MiB 1 if segment_type == "speedmap_segments": - 129 STOP_ARRIVALS_FILE = f"{dict_inputs['stage3b']}_{analysis_date}" - 130 else: - 131 1171.5 MiB 0.0 MiB 1 STOP_ARRIVALS_FILE = f"{dict_inputs['stage3']}_{analysis_date}" - 132 - 133 1171.5 MiB 0.0 MiB 1 SPEED_FILE = f"{dict_inputs['stage4']}_{analysis_date}" - 134 - 135 1171.5 MiB 0.0 MiB 1 start = datetime.datetime.now() - 136 - 137 1399.6 MiB 228.1 MiB 2 df = pd.read_parquet( - 138 1171.5 MiB 0.0 MiB 1 f"{SEGMENT_GCS}{STOP_ARRIVALS_FILE}.parquet" - 139 ) - 140 - 141 1399.6 MiB -484.4 MiB 4 df = segment_calcs.convert_timestamp_to_seconds( - 142 1399.6 MiB 0.0 MiB 1 df, ["arrival_time"] - 143 1239.7 MiB -460.2 MiB 2 ).sort_values(trip_stop_cols).reset_index(drop=True) - 144 - 145 1077.4 MiB -322.1 MiB 2 df = df.assign( - 146 1055.7 MiB -0.1 MiB 3 subseq_arrival_time_sec = (df.groupby(trip_cols, - 147 1055.7 MiB 0.0 MiB 1 observed=True, group_keys=False) - 148 .arrival_time_sec - 149 1055.7 MiB 0.0 MiB 1 .shift(-1) - 150 ), - 151 1055.6 MiB -43.7 MiB 3 subseq_stop_meters = (df.groupby(trip_cols, - 152 1055.6 MiB 0.0 MiB 1 observed=True, group_keys=False) - 153 .stop_meters - 154 1055.6 MiB 0.0 MiB 1 .shift(-1) - 155 ) - 156 ) - 157 - 158 2252.9 MiB 1175.5 MiB 4 speed = df.assign( - 159 1077.4 MiB 0.0 MiB 1 meters_elapsed = df.subseq_stop_meters - df.stop_meters, - 160 1077.4 MiB 0.0 MiB 1 sec_elapsed = df.subseq_arrival_time_sec - df.arrival_time_sec, - 161 ).pipe( - 162 1164.7 MiB 0.0 MiB 1 segment_calcs.derive_speed, - 163 1164.7 MiB 0.0 MiB 1 ("stop_meters", "subseq_stop_meters"), - 164 1164.7 MiB 0.0 MiB 1 ("arrival_time_sec", "subseq_arrival_time_sec") - 165 ).pipe( - 166 1339.1 MiB 0.0 MiB 1 attach_operator_natural_identifiers, - 167 1339.1 MiB 0.0 MiB 1 analysis_date, - 168 1339.1 MiB 0.0 MiB 1 segment_type - 169 ) - 170 - 171 2808.4 MiB 555.5 MiB 2 speed.to_parquet( - 172 2252.9 MiB 0.0 MiB 1 f"{SEGMENT_GCS}{SPEED_FILE}.parquet") - 173 - 174 2808.4 MiB 0.0 MiB 1 end = datetime.datetime.now() - 175 2808.4 MiB 0.0 MiB 3 logger.info(f"speeds by segment for {segment_type} " - 176 2808.4 MiB 0.0 MiB 2 f"{analysis_date}: {end - start}") - 177 - 178 - 179 2808.4 MiB 0.0 MiB 1 return - - -## another example - -2024-07-25 at 13:45:42 | INFO | interpolate arrivals for stop_segments 2024-06-12: 2024-06-12: 0:12:48.380165 -Filename: /home/jovyan/data-analyses/rt_segment_speeds/scripts/interpolate_stop_arrival.py - -Line # Mem usage Increment Occurrences Line Contents -============================================================= - 241 2605.8 MiB 2605.8 MiB 1 @profile - 242 def interpolate_stop_arrivals( - 243 analysis_date: str, - 244 segment_type: Literal[SEGMENT_TYPES], - 245 config_path: Optional[Path] = GTFS_DATA_DICT - 246 ): - 247 2605.8 MiB 0.0 MiB 1 dict_inputs = config_path[segment_type] - 248 2605.8 MiB 0.0 MiB 1 trip_stop_cols = [*dict_inputs["trip_stop_cols"]] - 249 2605.8 MiB 0.0 MiB 1 USABLE_VP_FILE = dict_inputs["stage1"] - 250 2605.8 MiB 0.0 MiB 1 INPUT_FILE = dict_inputs["stage2b"] - 251 2605.8 MiB 0.0 MiB 1 STOP_ARRIVALS_FILE = dict_inputs["stage3"] - 252 - 253 2605.8 MiB 0.0 MiB 1 start = datetime.datetime.now() - 254 - 255 2605.8 MiB 0.0 MiB 2 df = delayed(add_arrival_time)( - 256 2605.8 MiB 0.0 MiB 1 INPUT_FILE, - 257 2605.8 MiB 0.0 MiB 1 USABLE_VP_FILE, - 258 2605.8 MiB 0.0 MiB 1 analysis_date, - 259 2605.8 MiB 0.0 MiB 1 trip_stop_cols + ["shape_array_key"] - 260 ) - 261 - 262 2605.8 MiB 0.0 MiB 2 results = delayed(enforce_monotonicity_and_interpolate_across_stops)( - 263 2605.8 MiB 0.0 MiB 1 df, trip_stop_cols) - 264 - 265 2122.0 MiB -483.8 MiB 1 results = compute(results)[0] - 266 - 267 2295.0 MiB 173.0 MiB 2 results.to_parquet( - 268 2122.0 MiB 0.0 MiB 1 f"{SEGMENT_GCS}{STOP_ARRIVALS_FILE}_{analysis_date}.parquet" - 269 ) - 270 - 271 2295.0 MiB 0.0 MiB 1 end = datetime.datetime.now() - 272 2295.0 MiB 0.0 MiB 4 logger.info(f"interpolate arrivals for {segment_type} " - 273 2295.0 MiB 0.0 MiB 3 f"{analysis_date}: {analysis_date}: {end - start}") - 274 - 275 2295.0 MiB 0.0 MiB 1 return - - -2024-07-25 at 13:48:13 | INFO | speeds by segment for stop_segments 2024-06-12: 0:02:31.544118 -Filename: /home/jovyan/data-analyses/rt_segment_speeds/scripts/stop_arrivals_to_speed.py - -Line # Mem usage Increment Occurrences Line Contents -============================================================= - 111 2191.7 MiB 2191.7 MiB 1 @profile - 112 def calculate_speed_from_stop_arrivals( - 113 analysis_date: str, - 114 segment_type: Literal[SEGMENT_TYPES], - 115 config_path: Optional[Path] = GTFS_DATA_DICT, - 116 ): - 117 """ - 118 Calculate speed between the interpolated stop arrivals of - 119 2 stops. Use current stop to subsequent stop, to match - 120 with the segments cut by gtfs_segments.create_segments - 121 """ - 122 2191.7 MiB 0.0 MiB 1 dict_inputs = config_path[segment_type] - 123 - 124 2191.7 MiB 0.0 MiB 1 trip_cols = ["trip_instance_key"] - 125 2191.7 MiB 0.0 MiB 1 trip_stop_cols = [*dict_inputs["trip_stop_cols"]] - 126 - 127 # speedmap segments shoulse the full concatenated one - 128 2191.7 MiB 0.0 MiB 1 if segment_type == "speedmap_segments": - 129 STOP_ARRIVALS_FILE = f"{dict_inputs['stage3b']}_{analysis_date}" - 130 else: - 131 2191.7 MiB 0.0 MiB 1 STOP_ARRIVALS_FILE = f"{dict_inputs['stage3']}_{analysis_date}" - 132 - 133 2191.7 MiB 0.0 MiB 1 SPEED_FILE = f"{dict_inputs['stage4']}_{analysis_date}" - 134 - 135 2191.7 MiB 0.0 MiB 1 start = datetime.datetime.now() - 136 - 137 2336.1 MiB 144.4 MiB 2 df = pd.read_parquet( - 138 2191.7 MiB 0.0 MiB 1 f"{SEGMENT_GCS}{STOP_ARRIVALS_FILE}.parquet" - 139 ) - 140 - 141 2336.1 MiB -146.7 MiB 4 df = segment_calcs.convert_timestamp_to_seconds( - 142 2336.1 MiB 0.0 MiB 1 df, ["arrival_time"] - 143 2330.5 MiB -64.5 MiB 2 ).sort_values(trip_stop_cols).reset_index(drop=True) - 144 - 145 2289.1 MiB -58.8 MiB 2 df = df.assign( - 146 2242.8 MiB -0.1 MiB 3 subseq_arrival_time_sec = (df.groupby(trip_cols, - 147 2242.8 MiB 0.0 MiB 1 observed=True, group_keys=False) - 148 .arrival_time_sec - 149 2242.8 MiB 0.0 MiB 1 .shift(-1) - 150 ), - 151 2254.5 MiB 11.8 MiB 3 subseq_stop_meters = (df.groupby(trip_cols, - 152 2242.7 MiB 0.0 MiB 1 observed=True, group_keys=False) - 153 .stop_meters - 154 2242.7 MiB 0.0 MiB 1 .shift(-1) - 155 ) - 156 ) - 157 - 158 3186.1 MiB 897.0 MiB 4 speed = df.assign( - 159 2289.1 MiB 0.0 MiB 1 meters_elapsed = df.subseq_stop_meters - df.stop_meters, - 160 2289.1 MiB 0.0 MiB 1 sec_elapsed = df.subseq_arrival_time_sec - df.arrival_time_sec, - 161 ).pipe( - 162 2409.5 MiB 0.0 MiB 1 segment_calcs.derive_speed, - 163 2409.5 MiB 0.0 MiB 1 ("stop_meters", "subseq_stop_meters"), - 164 2409.5 MiB 0.0 MiB 1 ("arrival_time_sec", "subseq_arrival_time_sec") - 165 ).pipe( - 166 2478.4 MiB 0.0 MiB 1 attach_operator_natural_identifiers, - 167 2478.4 MiB 0.0 MiB 1 analysis_date, - 168 2478.4 MiB 0.0 MiB 1 segment_type - 169 ) - 170 - 171 3501.6 MiB 315.5 MiB 2 speed.to_parquet( - 172 3186.1 MiB 0.0 MiB 1 f"{SEGMENT_GCS}{SPEED_FILE}.parquet") - 173 - 174 3501.6 MiB 0.0 MiB 1 end = datetime.datetime.now() - 175 3501.6 MiB 0.0 MiB 3 logger.info(f"speeds by segment for {segment_type} " - 176 3501.6 MiB 0.0 MiB 2 f"{analysis_date}: {end - start}") - 177 - 178 - 179 3501.6 MiB 0.0 MiB 1 return \ No newline at end of file diff --git a/rt_segment_speeds/scripts/nearest_vp_to_stop.py b/rt_segment_speeds/scripts/nearest_vp_to_stop.py index 6706637b4..6e64d0a3f 100644 --- a/rt_segment_speeds/scripts/nearest_vp_to_stop.py +++ b/rt_segment_speeds/scripts/nearest_vp_to_stop.py @@ -12,8 +12,7 @@ from pathlib import Path from typing import Literal, Optional -from calitp_data_analysis.geography_utils import WGS84 -from segment_speed_utils import helpers, neighbor +from segment_speed_utils import helpers, neighbor, vp_transform from update_vars import SEGMENT_GCS, GTFS_DATA_DICT from segment_speed_utils.project_vars import SEGMENT_TYPES @@ -82,7 +81,6 @@ def stop_times_for_all_trips( "geometry"], with_direction = True, get_pandas = True, - crs = WGS84 ) return stop_times @@ -157,7 +155,10 @@ def nearest_neighbor_for_stop( gdf = neighbor.merge_stop_vp_for_nearest_neighbor(stop_times, analysis_date) - + gdf = gdf.assign( + opposite_direction = gdf.stop_primary_direction.map(vp_transform.OPPOSITE_DIRECTIONS) + ) + vp_before, vp_after, vp_before_meters, vp_after_meters = np.vectorize( neighbor.two_nearest_neighbor_near_stop )( @@ -165,25 +166,23 @@ def nearest_neighbor_for_stop( gdf.vp_geometry, gdf.vp_idx, gdf.stop_geometry, - gdf.stop_primary_direction, + gdf.opposite_direction, gdf.shape_geometry, gdf.stop_meters ) - gdf2 = gdf.assign( + gdf = gdf[trip_stop_cols + ["shape_array_key"]] + + gdf = gdf.assign( prior_vp_idx = vp_before, subseq_vp_idx = vp_after, prior_vp_meters = vp_before_meters, subseq_vp_meters = vp_after_meters - )[trip_stop_cols + [ - "shape_array_key", "stop_meters", - "prior_vp_idx", "subseq_vp_idx", - "prior_vp_meters", "subseq_vp_meters"] - ] + ) - del gdf, stop_times + del stop_times - gdf2.to_parquet(f"{SEGMENT_GCS}{EXPORT_FILE}.parquet") + gdf.to_parquet(f"{SEGMENT_GCS}{EXPORT_FILE}.parquet") end = datetime.datetime.now() logger.info(f"nearest neighbor for {segment_type} " @@ -191,6 +190,7 @@ def nearest_neighbor_for_stop( return + ''' if __name__ == "__main__": diff --git a/rt_segment_speeds/segment_speed_utils/neighbor.py b/rt_segment_speeds/segment_speed_utils/neighbor.py index ad08361e8..82b4d7fd3 100644 --- a/rt_segment_speeds/segment_speed_utils/neighbor.py +++ b/rt_segment_speeds/segment_speed_utils/neighbor.py @@ -6,11 +6,10 @@ import pandas as pd import shapely -from segment_speed_utils import helpers, vp_transform +from segment_speed_utils import helpers from segment_speed_utils.project_vars import SEGMENT_GCS, GTFS_DATA_DICT, PROJECT_CRS from shared_utils import geo_utils - def merge_stop_vp_for_nearest_neighbor( stop_times: gpd.GeoDataFrame, analysis_date: str, @@ -37,7 +36,9 @@ def merge_stop_vp_for_nearest_neighbor( crs = PROJECT_CRS, get_pandas = True, filters = [[("shape_array_key", "in", stop_times.shape_array_key.tolist())]] - ).rename(columns = {"geometry": "shape_geometry"}) + ).rename( + columns = {"geometry": "shape_geometry"} + ).dropna(subset="shape_geometry") gdf = pd.merge( stop_times.rename( @@ -57,7 +58,7 @@ def merge_stop_vp_for_nearest_neighbor( # projected onto shape_geometry and is interpreted as # stop X is Y meters along shape gdf = gdf.assign( - stop_meters = gdf.shape_geometry.project(gdf.stop_geometry) + stop_meters = gdf.shape_geometry.project(gdf.stop_geometry), ) return gdf @@ -141,13 +142,13 @@ def filter_to_nearest2_vp( return before_idx, after_idx, before_vp_meters, after_vp_meters - + def two_nearest_neighbor_near_stop( vp_direction_array: np.ndarray, vp_geometry: shapely.LineString, vp_idx_array: np.ndarray, stop_geometry: shapely.Point, - stop_direction: str, + opposite_stop_direction: str, shape_geometry: shapely.LineString, stop_meters: float ) -> np.ndarray: @@ -159,13 +160,11 @@ def two_nearest_neighbor_near_stop( When we're doing nearest neighbor search, we want to first filter the full array down to valid vp before snapping it. - """ - opposite_direction = vp_transform.OPPOSITE_DIRECTIONS[stop_direction] - + """ # These are the valid index values where opposite direction # is excluded - valid_indices = (vp_direction_array != opposite_direction).nonzero() - + valid_indices = (vp_direction_array != opposite_stop_direction).nonzero() + # These are vp coords where index values of opposite direction is excluded valid_vp_coords_array = np.array(vp_geometry.coords)[valid_indices] @@ -185,4 +184,4 @@ def two_nearest_neighbor_near_stop( stop_meters, ) - return before_vp, after_vp, before_meters, after_meters \ No newline at end of file + return before_vp, after_vp, before_meters, after_meters