diff --git a/rt_segment_speeds/logs/avg_speeds.log b/rt_segment_speeds/logs/avg_speeds.log index 866fd2852..d568f1f2b 100644 --- a/rt_segment_speeds/logs/avg_speeds.log +++ b/rt_segment_speeds/logs/avg_speeds.log @@ -12,3 +12,17 @@ 2023-08-18 14:53:19.222 | INFO | __main__::165 - execution time: 0:05:17.742302 2023-08-24 14:56:29.366 | INFO | __main__::143 - Analysis date: 2023-08-15 2023-08-24 15:02:21.592 | INFO | __main__::165 - execution time: 0:05:52.222770 +2023-09-21 14:37:32.197 | INFO | __main__::167 - Analysis date: 2023-09-13 +2023-09-21 14:43:35.784 | INFO | __main__::189 - execution time: 0:06:03.584427 +2023-09-21 15:49:19.146 | INFO | __main__::167 - Analysis date: 2023-06-14 +2023-09-21 15:54:51.743 | INFO | __main__::189 - execution time: 0:05:32.595873 +2023-09-21 17:54:03.991 | INFO | __main__::167 - Analysis date: 2023-05-17 +2023-09-21 18:00:12.730 | INFO | __main__::189 - execution time: 0:06:08.738161 +2023-09-21 18:40:52.924 | INFO | __main__::159 - Analysis date: 2023-04-12 +2023-09-21 18:47:14.179 | INFO | __main__::181 - execution time: 0:06:21.254666 +2023-09-21 19:28:44.034 | INFO | __main__::159 - Analysis date: 2023-03-15 +2023-09-21 19:34:39.639 | INFO | __main__::181 - execution time: 0:05:55.604394 +2023-09-21 19:36:10.116 | INFO | __main__::159 - Analysis date: 2023-07-12 +2023-09-21 19:41:33.957 | INFO | __main__::181 - execution time: 0:05:23.840096 +2023-09-21 19:43:00.511 | INFO | __main__::159 - Analysis date: 2023-08-15 +2023-09-21 19:49:14.412 | INFO | __main__::181 - execution time: 0:06:13.900161 diff --git a/rt_segment_speeds/logs/sjoin_vp_segments.log b/rt_segment_speeds/logs/sjoin_vp_segments.log index b1ea97d92..f6e542da7 100644 --- a/rt_segment_speeds/logs/sjoin_vp_segments.log +++ b/rt_segment_speeds/logs/sjoin_vp_segments.log @@ -42,3 +42,17 @@ 2023-09-14 13:06:15.758 | INFO | __main__::223 - execution time: 0:12:03.599629 2023-09-14 13:06:31.913 | INFO | __main__::457 - Analysis date: 2023-09-13 2023-09-14 13:14:08.656 | INFO | __main__::465 - remove erroneous sjoin results: 0:07:36.742381 +2023-09-21 10:58:58.711 | INFO | __main__::457 - Analysis date: 2023-09-13 +2023-09-21 11:05:58.377 | INFO | __main__::465 - remove erroneous sjoin results: 0:06:59.626650 +2023-09-21 12:11:55.875 | INFO | __main__::456 - Analysis date: 2023-08-15 +2023-09-21 12:19:13.216 | INFO | __main__::464 - remove erroneous sjoin results: 0:07:17.337816 +2023-09-21 12:58:44.489 | INFO | __main__::456 - Analysis date: 2023-07-12 +2023-09-21 13:07:33.610 | INFO | __main__::464 - remove erroneous sjoin results: 0:08:49.120980 +2023-09-21 15:22:02.693 | INFO | __main__::456 - Analysis date: 2023-06-14 +2023-09-21 15:29:14.363 | INFO | __main__::464 - remove erroneous sjoin results: 0:07:11.668827 +2023-09-21 17:26:50.733 | INFO | __main__::456 - Analysis date: 2023-05-17 +2023-09-21 17:33:37.420 | INFO | __main__::464 - remove erroneous sjoin results: 0:06:46.686094 +2023-09-21 18:03:52.838 | INFO | __main__::456 - Analysis date: 2023-04-12 +2023-09-21 18:11:04.091 | INFO | __main__::464 - remove erroneous sjoin results: 0:07:11.251722 +2023-09-21 18:55:33.230 | INFO | __main__::456 - Analysis date: 2023-03-15 +2023-09-21 19:05:26.800 | INFO | __main__::464 - remove erroneous sjoin results: 0:09:53.569193 diff --git a/rt_segment_speeds/logs/speeds_by_segment_trip.log b/rt_segment_speeds/logs/speeds_by_segment_trip.log index 0e9d05197..855f2090e 100644 --- a/rt_segment_speeds/logs/speeds_by_segment_trip.log +++ b/rt_segment_speeds/logs/speeds_by_segment_trip.log @@ -40,3 +40,59 @@ 2023-08-24 14:51:47.828 | INFO | __main__:linear_referencing_and_speed_by_segment:123 - calculate speeds: 0:00:00.006706 2023-08-24 14:56:14.082 | INFO | __main__::150 - speeds for stop segments: 0:04:32.610802 2023-08-24 14:56:14.083 | INFO | __main__::151 - execution time: 0:04:32.611824 +2023-09-21 11:53:11.705 | INFO | __main__::367 - Analysis date: 2023-09-13 +2023-09-21 11:53:18.285 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.152710 +2023-09-21 11:53:18.322 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.037063 +2023-09-21 11:57:20.821 | INFO | __main__:linear_referencing_and_speed_by_segment:300 - linear referencing: 0:04:09.111393 +2023-09-21 12:03:20.231 | INFO | __main__:linear_referencing_and_speed_by_segment:311 - make wide and get initial speeds: 0:05:59.409953 +2023-09-21 12:05:33.476 | INFO | __main__:linear_referencing_and_speed_by_segment:352 - recalculate speeds and get final: 0:02:13.244950 +2023-09-21 12:05:46.785 | INFO | __main__::375 - speeds for stop segments: 0:12:35.079026 +2023-09-21 12:05:46.788 | INFO | __main__::376 - execution time: 0:12:35.081785 +2023-09-21 12:28:12.385 | INFO | __main__::367 - Analysis date: 2023-08-15 +2023-09-21 12:28:19.027 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.162699 +2023-09-21 12:28:19.063 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.036557 +2023-09-21 12:32:49.165 | INFO | __main__:linear_referencing_and_speed_by_segment:300 - linear referencing: 0:04:36.761005 +2023-09-21 12:39:56.447 | INFO | __main__:linear_referencing_and_speed_by_segment:311 - make wide and get initial speeds: 0:07:07.282236 +2023-09-21 12:42:14.396 | INFO | __main__:linear_referencing_and_speed_by_segment:352 - recalculate speeds and get final: 0:02:17.948959 +2023-09-21 12:42:27.893 | INFO | __main__::375 - speeds for stop segments: 0:14:15.493395 +2023-09-21 12:42:27.894 | INFO | __main__::376 - execution time: 0:14:15.494456 +2023-09-21 13:16:38.177 | INFO | __main__::367 - Analysis date: 2023-07-12 +2023-09-21 13:16:44.655 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.156084 +2023-09-21 13:16:44.692 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.037555 +2023-09-21 13:20:41.231 | INFO | __main__:linear_referencing_and_speed_by_segment:300 - linear referencing: 0:04:03.035504 +2023-09-21 13:26:57.794 | INFO | __main__:linear_referencing_and_speed_by_segment:311 - make wide and get initial speeds: 0:06:16.562615 +2023-09-21 13:29:08.771 | INFO | __main__:linear_referencing_and_speed_by_segment:352 - recalculate speeds and get final: 0:02:10.977540 +2023-09-21 13:29:21.791 | INFO | __main__::375 - speeds for stop segments: 0:12:43.599595 +2023-09-21 13:29:21.792 | INFO | __main__::376 - execution time: 0:12:43.600710 +2023-09-21 15:36:54.009 | INFO | __main__::369 - Analysis date: 2023-06-14 +2023-09-21 15:36:59.961 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.156330 +2023-09-21 15:37:00.016 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.055084 +2023-09-21 15:40:58.894 | INFO | __main__:linear_referencing_and_speed_by_segment:302 - linear referencing: 0:04:04.880927 +2023-09-21 15:46:35.541 | INFO | __main__:linear_referencing_and_speed_by_segment:313 - make wide and get initial speeds: 0:05:36.646767 +2023-09-21 15:48:52.854 | INFO | __main__:linear_referencing_and_speed_by_segment:354 - recalculate speeds and get final: 0:02:17.313285 +2023-09-21 15:49:04.953 | INFO | __main__::377 - speeds for stop segments: 0:12:10.943637 +2023-09-21 15:49:04.954 | INFO | __main__::378 - execution time: 0:12:10.944518 +2023-09-21 17:41:01.371 | INFO | __main__::369 - Analysis date: 2023-05-17 +2023-09-21 17:41:07.436 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.155348 +2023-09-21 17:41:07.465 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.028611 +2023-09-21 17:45:03.306 | INFO | __main__:linear_referencing_and_speed_by_segment:302 - linear referencing: 0:04:01.914759 +2023-09-21 17:51:13.000 | INFO | __main__:linear_referencing_and_speed_by_segment:313 - make wide and get initial speeds: 0:06:09.693714 +2023-09-21 17:53:38.147 | INFO | __main__:linear_referencing_and_speed_by_segment:354 - recalculate speeds and get final: 0:02:25.147277 +2023-09-21 17:53:49.471 | INFO | __main__::377 - speeds for stop segments: 0:12:48.084151 +2023-09-21 17:53:49.474 | INFO | __main__::378 - execution time: 0:12:48.087006 +2023-09-21 18:19:33.226 | INFO | __main__::369 - Analysis date: 2023-04-12 +2023-09-21 18:19:39.961 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.173198 +2023-09-21 18:19:39.999 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.037629 +2023-09-21 18:23:57.932 | INFO | __main__:linear_referencing_and_speed_by_segment:302 - linear referencing: 0:04:24.702955 +2023-09-21 18:30:30.758 | INFO | __main__:linear_referencing_and_speed_by_segment:313 - make wide and get initial speeds: 0:06:32.825103 +2023-09-21 18:32:55.850 | INFO | __main__:linear_referencing_and_speed_by_segment:354 - recalculate speeds and get final: 0:02:25.091989 +2023-09-21 18:33:07.969 | INFO | __main__::377 - speeds for stop segments: 0:13:34.743167 +2023-09-21 18:33:07.972 | INFO | __main__::378 - execution time: 0:13:34.745381 +2023-09-21 19:14:16.648 | INFO | __main__::369 - Analysis date: 2023-03-15 +2023-09-21 19:14:22.748 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.170221 +2023-09-21 19:14:22.792 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.044492 +2023-09-21 19:18:37.589 | INFO | __main__:linear_referencing_and_speed_by_segment:302 - linear referencing: 0:04:20.919458 +2023-09-21 19:24:39.102 | INFO | __main__:linear_referencing_and_speed_by_segment:313 - make wide and get initial speeds: 0:06:01.513090 +2023-09-21 19:28:06.672 | INFO | __main__:linear_referencing_and_speed_by_segment:354 - recalculate speeds and get final: 0:03:27.569825 +2023-09-21 19:28:23.294 | INFO | __main__::377 - speeds for stop segments: 0:14:06.628501 +2023-09-21 19:28:23.296 | INFO | __main__::378 - execution time: 0:14:06.630732 diff --git a/rt_segment_speeds/logs/valid_vehicle_positions.log b/rt_segment_speeds/logs/valid_vehicle_positions.log index 6027f701c..684a0bcbe 100644 --- a/rt_segment_speeds/logs/valid_vehicle_positions.log +++ b/rt_segment_speeds/logs/valid_vehicle_positions.log @@ -85,3 +85,45 @@ 2023-09-14 13:20:47.965 | INFO | __main__:pare_down_vp_by_segment:123 - exported: 0:06:15.874961 2023-09-14 13:20:47.966 | INFO | __main__::148 - pare down vp by stop segments for all cases 0:06:17.237883 2023-09-14 13:20:47.967 | INFO | __main__::151 - execution time: 0:06:17.243094 +2023-09-21 11:06:27.840 | INFO | __main__::134 - Analysis date: 2023-09-13 +2023-09-21 11:06:29.244 | INFO | __main__:pare_down_vp_by_segment:103 - merge usable vp with sjoin results: 0:00:01.386494 +2023-09-21 11:06:29.323 | INFO | __main__:pare_down_vp_by_segment:112 - keep enter/exit points: 0:00:00.079269 +2023-09-21 11:13:01.939 | INFO | __main__:pare_down_vp_by_segment:123 - exported: 0:06:32.615887 +2023-09-21 11:13:01.940 | INFO | __main__::148 - pare down vp by stop segments for all cases 0:06:34.083216 +2023-09-21 11:13:01.941 | INFO | __main__::151 - execution time: 0:06:34.088864 +2023-09-21 12:19:30.995 | INFO | __main__::134 - Analysis date: 2023-08-15 +2023-09-21 12:19:32.432 | INFO | __main__:pare_down_vp_by_segment:103 - merge usable vp with sjoin results: 0:00:01.410420 +2023-09-21 12:19:32.535 | INFO | __main__:pare_down_vp_by_segment:112 - keep enter/exit points: 0:00:00.103218 +2023-09-21 12:27:56.376 | INFO | __main__:pare_down_vp_by_segment:123 - exported: 0:08:23.840718 +2023-09-21 12:27:56.378 | INFO | __main__::148 - pare down vp by stop segments for all cases 0:08:25.356390 +2023-09-21 12:27:56.379 | INFO | __main__::151 - execution time: 0:08:25.360301 +2023-09-21 13:07:51.526 | INFO | __main__::134 - Analysis date: 2023-07-12 +2023-09-21 13:07:52.848 | INFO | __main__:pare_down_vp_by_segment:103 - merge usable vp with sjoin results: 0:00:01.297768 +2023-09-21 13:07:52.939 | INFO | __main__:pare_down_vp_by_segment:112 - keep enter/exit points: 0:00:00.091154 +2023-09-21 13:16:20.491 | INFO | __main__:pare_down_vp_by_segment:123 - exported: 0:08:27.552189 +2023-09-21 13:16:20.493 | INFO | __main__::148 - pare down vp by stop segments for all cases 0:08:28.942646 +2023-09-21 13:16:20.493 | INFO | __main__::151 - execution time: 0:08:28.946383 +2023-09-21 15:29:36.092 | INFO | __main__::134 - Analysis date: 2023-06-14 +2023-09-21 15:29:37.363 | INFO | __main__:pare_down_vp_by_segment:103 - merge usable vp with sjoin results: 0:00:01.266538 +2023-09-21 15:29:37.460 | INFO | __main__:pare_down_vp_by_segment:112 - keep enter/exit points: 0:00:00.096562 +2023-09-21 15:36:38.508 | INFO | __main__:pare_down_vp_by_segment:123 - exported: 0:07:01.048429 +2023-09-21 15:36:38.510 | INFO | __main__::148 - pare down vp by stop segments for all cases 0:07:02.413058 +2023-09-21 15:36:38.510 | INFO | __main__::151 - execution time: 0:07:02.417441 +2023-09-21 17:33:53.360 | INFO | __main__::134 - Analysis date: 2023-05-17 +2023-09-21 17:33:54.621 | INFO | __main__:pare_down_vp_by_segment:103 - merge usable vp with sjoin results: 0:00:01.258114 +2023-09-21 17:33:54.702 | INFO | __main__:pare_down_vp_by_segment:112 - keep enter/exit points: 0:00:00.080633 +2023-09-21 17:40:45.339 | INFO | __main__:pare_down_vp_by_segment:123 - exported: 0:06:50.637139 +2023-09-21 17:40:45.341 | INFO | __main__::148 - pare down vp by stop segments for all cases 0:06:51.977448 +2023-09-21 17:40:45.341 | INFO | __main__::151 - execution time: 0:06:51.981031 +2023-09-21 18:11:20.644 | INFO | __main__::134 - Analysis date: 2023-04-12 +2023-09-21 18:11:21.907 | INFO | __main__:pare_down_vp_by_segment:103 - merge usable vp with sjoin results: 0:00:01.253286 +2023-09-21 18:11:21.982 | INFO | __main__:pare_down_vp_by_segment:112 - keep enter/exit points: 0:00:00.074969 +2023-09-21 18:19:17.733 | INFO | __main__:pare_down_vp_by_segment:123 - exported: 0:07:55.751630 +2023-09-21 18:19:17.735 | INFO | __main__::148 - pare down vp by stop segments for all cases 0:07:57.081328 +2023-09-21 18:19:17.735 | INFO | __main__::151 - execution time: 0:07:57.086467 +2023-09-21 19:05:57.111 | INFO | __main__::134 - Analysis date: 2023-03-15 +2023-09-21 19:05:58.586 | INFO | __main__:pare_down_vp_by_segment:103 - merge usable vp with sjoin results: 0:00:01.446362 +2023-09-21 19:05:58.673 | INFO | __main__:pare_down_vp_by_segment:112 - keep enter/exit points: 0:00:00.087041 +2023-09-21 19:14:00.946 | INFO | __main__:pare_down_vp_by_segment:123 - exported: 0:08:02.272685 +2023-09-21 19:14:00.947 | INFO | __main__::148 - pare down vp by stop segments for all cases 0:08:03.807576 +2023-09-21 19:14:00.947 | INFO | __main__::151 - execution time: 0:08:03.812106 diff --git a/rt_segment_speeds/scripts/A2_sjoin_postprocessing.py b/rt_segment_speeds/scripts/A2_sjoin_postprocessing.py index 32327dbb5..886dc7208 100644 --- a/rt_segment_speeds/scripts/A2_sjoin_postprocessing.py +++ b/rt_segment_speeds/scripts/A2_sjoin_postprocessing.py @@ -380,7 +380,6 @@ def remove_erroneous_sjoin_results( """ USABLE_VP = dict_inputs["stage1"] INPUT_FILE_PREFIX = dict_inputs["stage2"] - SEGMENT_IDENTIFIER_COLS = dict_inputs["segment_identifier_cols"] SEGMENT_TRIP_COLS = ["trip_instance_key"] + SEGMENT_IDENTIFIER_COLS GROUPING_COL = dict_inputs["grouping_col"] diff --git a/rt_segment_speeds/scripts/A3_loop_inlining.py b/rt_segment_speeds/scripts/A3_loop_inlining.py deleted file mode 100644 index ad4e64268..000000000 --- a/rt_segment_speeds/scripts/A3_loop_inlining.py +++ /dev/null @@ -1,168 +0,0 @@ -""" -Handle complex shapes and pare down -vehicle position points by first checking the dot product -to make sure we are keeping vehicle positions -running in the same direction as the segment. -""" -import dask.dataframe as dd -import dask_geopandas as dg -import datetime -import geopandas as gpd -import numpy as np -import pandas as pd -import shapely -import sys - -from loguru import logger - -from shared_utils.geography_utils import WGS84 -from segment_speed_utils import helpers, segment_calcs, wrangle_shapes -from segment_speed_utils.project_vars import (SEGMENT_GCS, analysis_date, - CONFIG_PATH, PROJECT_CRS) -from A3_valid_vehicle_positions import (identify_stop_segment_cases, - merge_usable_vp_with_sjoin_vpidx) - - - - -def find_errors_in_segment_groups( - vp_sjoin: dd.DataFrame, - segments: gpd.GeoDataFrame, - segment_identifier_cols: list, -) -> dd.DataFrame: - """ - For each sjoin result for each segment-trip: - (1) find the direction the segment is running - (2) use the mean timestamp to divide sjoin results into 2 groups - (3) for each group, find the first/last vp - (4) find the direction of each group of vp for segment-trip - (5) as long as vp are running in same direction as segment (dot product > 0), - keep those observations. - """ - group_cols = segment_identifier_cols + ["trip_instance_key"] - - segments = get_stop_segments_direction_vector( - segments) - - vp_grouped = split_vp_into_groups( - vp_sjoin, - group_cols, - col_to_find_groups = "location_timestamp_local" - ) - - vp_pared_by_group = get_first_last_position_in_group( - vp_grouped, group_cols) - - vp_with_segment_vec = pd.merge( - segments, - vp_pared_by_group, - on = segment_identifier_cols, - ) - - vp_dot_prod = find_vp_direction_vector( - vp_with_segment_vec, group_cols) - - # Only keep if vehicle positions are running in the same - # direction as the segment - # TODO: should we keep NaNs? NaNs weren't able to have a vector calculated, - # which could mean it's kind of an outlier in the segment, - # maybe should have been attached elsewhere - vp_same_direction = (vp_dot_prod[~(vp_dot_prod.dot_product < 0)] - [group_cols + ["group"]] - .drop_duplicates() - .reset_index(drop=True) - ) - - vp_to_keep = dd.merge( - vp_grouped, - vp_same_direction, - on = group_cols + ["group"], - how = "inner", - ).drop(columns = ["location_timestamp_local_sec", "group"]) - - return vp_to_keep - - -def pare_down_vp_for_special_cases( - analysis_date: str, - dict_inputs: dict = {} -): - """ - For special shapes, include a direction check where each - batch of vp have direction generated, and compare that against - the direction the segment is running. - """ - USABLE_VP = dict_inputs["stage1"] - INPUT_FILE_PREFIX = dict_inputs["stage2"] - SEGMENT_FILE = dict_inputs["segments_file"] - SEGMENT_IDENTIFIER_COLS = dict_inputs["segment_identifier_cols"] - GROUPING_COL = dict_inputs["grouping_col"] - TIMESTAMP_COL = dict_inputs["timestamp_col"] - EXPORT_FILE = dict_inputs["stage3"] - - - special_shapes = identify_stop_segment_cases( - analysis_date, GROUPING_COL, 1) - - vp_joined_to_segments = merge_usable_vp_with_sjoin_vpidx( - special_shapes, - f"{USABLE_VP}_{analysis_date}", - f"{INPUT_FILE_PREFIX}_{analysis_date}", - sjoin_filtering = [[(GROUPING_COL, "in", special_shapes)]], - columns = ["vp_idx", "trip_instance_key", TIMESTAMP_COL, - "x", "y"] - ) - - segments = helpers.import_segments( - file_name = f"{SEGMENT_FILE}_{analysis_date}", - filters = [[(GROUPING_COL, "in", special_shapes)]], - columns = SEGMENT_IDENTIFIER_COLS + ["geometry"], - partitioned = False - ) - - vp_pared_special = find_errors_in_segment_groups( - vp_joined_to_segments, - segments, - SEGMENT_IDENTIFIER_COLS - ) - - special_vp_to_keep = segment_calcs.keep_min_max_timestamps_by_segment( - vp_pared_special, - SEGMENT_IDENTIFIER_COLS + ["trip_instance_key"], - TIMESTAMP_COL - ) - - special_vp_to_keep = special_vp_to_keep.repartition(npartitions=1) - - special_vp_to_keep.to_parquet( - f"{SEGMENT_GCS}vp_pare_down/{EXPORT_FILE}_special_{analysis_date}", - overwrite = True) - - - -if __name__ == "__main__": - - LOG_FILE = "../logs/valid_vehicle_positions.log" - logger.add(LOG_FILE, retention="3 months") - logger.add(sys.stderr, - format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", - level="INFO") - - logger.info(f"Analysis date: {analysis_date}") - - start = datetime.datetime.now() - - STOP_SEG_DICT = helpers.get_parameters(CONFIG_PATH, "stop_segments") - - time1 = datetime.datetime.now() - - pare_down_vp_for_special_cases( - analysis_date, - dict_inputs = STOP_SEG_DICT - ) - - time2 = datetime.datetime.now() - logger.info(f"pare down vp by stop segments special cases {time2 - time1}") - - end = datetime.datetime.now() - logger.info(f"execution time: {end-start}") \ No newline at end of file diff --git a/rt_segment_speeds/scripts/A3_valid_vehicle_positions.py b/rt_segment_speeds/scripts/A3_valid_vehicle_positions.py index e77dc3dc5..880d4b6b0 100644 --- a/rt_segment_speeds/scripts/A3_valid_vehicle_positions.py +++ b/rt_segment_speeds/scripts/A3_valid_vehicle_positions.py @@ -116,7 +116,7 @@ def pare_down_vp_by_segment( .repartition(npartitions=3) ) vp_to_keep.to_parquet( - f"{SEGMENT_GCS}vp_pare_down/{EXPORT_FILE}_all_{analysis_date}", + f"{SEGMENT_GCS}{EXPORT_FILE}_{analysis_date}", overwrite=True ) diff --git a/rt_segment_speeds/scripts/A4_concatenate_vp_pared.py b/rt_segment_speeds/scripts/A4_concatenate_vp_pared.py deleted file mode 100644 index fd6db812b..000000000 --- a/rt_segment_speeds/scripts/A4_concatenate_vp_pared.py +++ /dev/null @@ -1,52 +0,0 @@ -""" -Concatenate pared down vp -from normal and special cases -and save as one file to use. -""" -import dask.dataframe as dd -import datetime - -from segment_speed_utils import helpers -from segment_speed_utils.project_vars import (analysis_date, SEGMENT_GCS, - CONFIG_PATH) - -if __name__ == "__main__": - - start = datetime.datetime.now() - - STOP_SEG_DICT = helpers.get_parameters(CONFIG_PATH, "stop_segments") - - VP_FULL_INFO = STOP_SEG_DICT["stage1"] - INPUT_FILE = STOP_SEG_DICT["stage3"] - SEGMENT_IDENTIFIER_COLS = STOP_SEG_DICT["segment_identifier_cols"] - TIMESTAMP_COL = STOP_SEG_DICT["timestamp_col"] - cases = ["normal", "special"] - - dfs = [ - dd.read_parquet( - f"{SEGMENT_GCS}vp_pare_down/{INPUT_FILE}_{c}_{analysis_date}", - columns = ["vp_idx"] + SEGMENT_IDENTIFIER_COLS - ) for c in cases - ] - - pared_down_vp = dd.multi.concat(dfs, axis=0).reset_index( - drop=True).set_index("vp_idx", sorted=False) - - vp_full_info = dd.read_parquet( - f"{SEGMENT_GCS}{VP_FULL_INFO}_{analysis_date}" - ).set_index("vp_idx", sorted=False) - - df = dd.merge( - vp_full_info, - pared_down_vp, - left_index = True, - right_index = True, - how = "inner" - ).reset_index() - - df = df.repartition(npartitions = 2) - df.to_parquet(f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}", - overwrite=True) - - end = datetime.datetime.now() - print(f"execution time: {end-start}") \ No newline at end of file diff --git a/rt_segment_speeds/scripts/B1_speeds_by_segment_trip.py b/rt_segment_speeds/scripts/B1_speeds_by_segment_trip.py index 07a4993bf..4078478f9 100644 --- a/rt_segment_speeds/scripts/B1_speeds_by_segment_trip.py +++ b/rt_segment_speeds/scripts/B1_speeds_by_segment_trip.py @@ -8,16 +8,252 @@ import dask.dataframe as dd import dask_geopandas as dg import datetime +import geopandas as gpd +import numpy as np import pandas as pd import sys from loguru import logger -from shared_utils.geography_utils import WGS84 +from shared_utils import geography_utils from segment_speed_utils import helpers, segment_calcs, wrangle_shapes from segment_speed_utils.project_vars import (SEGMENT_GCS, analysis_date, PROJECT_CRS, CONFIG_PATH) +from shared_utils.rt_utils import MPH_PER_MPS + + +def linear_referencing_vp_against_line( + vp: dd.DataFrame, + segments: gpd.GeoDataFrame, + segment_identifier_cols: list, + timestamp_col: str +) -> dd.DataFrame: + """ + Take the vp x,y columns, make into gdf. + Merge in segment geometry and do linear referencing. + Return just the shape_meters result and timestamp converted to seconds. + """ + time0 = datetime.datetime.now() + + # https://stackoverflow.com/questions/71685387/faster-methods-to-create-geodataframe-from-a-dask-or-pandas-dataframe + # https://github.com/geopandas/dask-geopandas/issues/197 + vp_gddf = dg.from_dask_dataframe( + vp, + geometry=dg.points_from_xy(vp, "x", "y") + ).set_crs(geography_utils.WGS84).to_crs(PROJECT_CRS).drop(columns = ["x", "y"]) + + vp_with_seg_geom = dd.merge( + vp_gddf, + segments, + on = segment_identifier_cols, + how = "inner" + ).rename(columns = { + "geometry_x": "vp_geometry", + "geometry_y": "segment_geometry"} + ).set_geometry("vp_geometry") + + vp_with_seg_geom = vp_with_seg_geom.repartition(npartitions=50) + + time1 = datetime.datetime.now() + logger.info(f"set up merged vp with segments: {time1 - time0}") + + shape_meters_series = vp_with_seg_geom.map_partitions( + wrangle_shapes.project_point_geom_onto_linestring, + "segment_geometry", + "vp_geometry", + meta = ("shape_meters", "float") + ) + + vp_with_seg_geom = segment_calcs.convert_timestamp_to_seconds( + vp_with_seg_geom, [timestamp_col]) + + vp_with_seg_geom = vp_with_seg_geom.assign( + shape_meters = shape_meters_series, + segment_meters = vp_with_seg_geom.segment_geometry.length + ) + + time2 = datetime.datetime.now() + logger.info(f"linear referencing: {time2 - time1}") + + drop_cols = [f"{timestamp_col}", "vp_geometry", "segment_geometry"] + vp_with_seg_geom2 = vp_with_seg_geom.drop(columns = drop_cols) + + return vp_with_seg_geom2 + + +def make_wide_get_speed( + df: dd.DataFrame, + group_cols: list, + timestamp_col: str +) -> dd.DataFrame: + """ + Get df wide and set up current vp_idx and get meters/sec_elapsed + against prior and calculate speed. + """ + vp2 = ( + df.groupby(group_cols, + observed=True, group_keys=False) + .agg({"vp_idx": "max"}) + .reset_index() + .merge( + df, + on = group_cols + ["vp_idx"], + how = "inner" + ) + ) + + vp1 = ( + df.groupby(group_cols, + observed=True, group_keys=False) + .agg({"vp_idx": "min"}) + .reset_index() + .merge( + df, + on = group_cols + ["vp_idx"], + how = "inner" + ).rename(columns = { + "vp_idx": "prior_vp_idx", + f"{timestamp_col}_sec": f"prior_{timestamp_col}_sec", + "shape_meters": "prior_shape_meters", + }) + ) + + df_wide = dd.merge( + vp2, + vp1, + on = group_cols, + how = "left" + ) + + speed = segment_calcs.derive_speed( + df_wide, + distance_cols = ("prior_shape_meters", "shape_meters"), + time_cols = (f"prior_{timestamp_col}_sec", f"{timestamp_col}_sec") + ) + + speed = speed.assign( + pct_segment = speed.meters_elapsed.divide(speed.segment_meters) + ) + + return speed + + +def filter_for_unstable_speeds( + df: pd.DataFrame, + pct_segment_threshold: float +) -> tuple[pd.DataFrame]: + ok_speeds = df[df.pct_segment > pct_segment_threshold] + low_speeds = df[df.pct_segment <= pct_segment_threshold] + + return ok_speeds, low_speeds + + +def recalculate_low_speeds_with_straight_distance( + low_speeds_df: pd.DataFrame, + group_cols: list, + timestamp_col: str +): + """ + For low speed segments, select a different vp_idx. + Use the current vp_idx and subtract by 1. + This will fill in something where the segment only had 1 point previously. + """ + keep_cols = group_cols + [ + "vp_idx", "location_timestamp_local_sec", + ] + + df1 = low_speeds_df[keep_cols].drop_duplicates().reset_index(drop=True) + + df1 = df1.assign( + prior_vp_idx = df1.vp_idx - 1 + ) + + usable_vp = dd.read_parquet( + f"{SEGMENT_GCS}vp_usable_{analysis_date}", + columns = ["trip_instance_key", + "vp_idx", timestamp_col, "x", "y"] + ) + + vp_idx_bounds = segment_calcs.get_usable_vp_bounds_by_trip(usable_vp) + + df2 = pd.merge( + df1, + vp_idx_bounds, + on = "trip_instance_key", + how = "inner" + ) + + # Check that the prior_vp_idx actually is on the same trip (must be within bounds) + # If not, select the next point + df2 = df2.assign( + prior_vp_idx = df2.apply( + lambda x: + x.vp_idx + 1 if (x.prior_vp_idx < x.min_vp_idx) and + (x.vp_idx + 1 <= x.max_vp_idx) + else x.prior_vp_idx, + axis=1) + ).drop(columns = ["trip_instance_key", "min_vp_idx", "max_vp_idx"]) + + # We will need point geom again, since we are using straight distance + subset_vp_idx = np.union1d( + df2.vp_idx.unique(), + df2.prior_vp_idx.unique() + ).tolist() + + usable_vp2 = usable_vp[usable_vp.vp_idx.isin(subset_vp_idx)].compute() + + usable_gdf = geography_utils.create_point_geometry( + usable_vp2, + longitude_col = "x", + latitude_col = "y", + crs = PROJECT_CRS + ).drop(columns = ["x", "y"]).reset_index(drop=True) + usable_gdf2 = segment_calcs.convert_timestamp_to_seconds( + usable_gdf, [timestamp_col]).drop(columns = timestamp_col) + + # Merge in coord for current_vp_idx + # we already have a timestamp_sec for current vp_idx + gdf = pd.merge( + usable_gdf2.drop(columns = f"{timestamp_col}_sec"), + df2, + on = "vp_idx", + how = "inner" + ) + + # Merge in coord for prior_vp_idx + gdf2 = pd.merge( + gdf, + usable_gdf2[ + ["vp_idx", f"{timestamp_col}_sec", "geometry"] + ].add_prefix("prior_"), + on = "prior_vp_idx", + how = "inner" + ) + + # should we do straight distance or interpolate against full shape? + # what if full shape is problematic? + # do we want to do a check against the scale? that's not very robust either though + + gdf2 = gdf2.assign( + straight_distance = gdf2.geometry.distance(gdf2.prior_geometry) + ) + + gdf2 = gdf2.assign( + sec_elapsed = (gdf2[f"{timestamp_col}_sec"] - + gdf2[f"prior_{timestamp_col}_sec"]).abs() + ) + + gdf2 = gdf2.assign( + speed_mph = gdf2.straight_distance.divide(gdf2.sec_elapsed) * MPH_PER_MPS + ) + + drop_cols = ["geometry", "prior_geometry"] + results = gdf2.drop(columns = drop_cols) + + return results + + def linear_referencing_and_speed_by_segment( analysis_date: str, dict_inputs: dict = {} @@ -25,6 +261,7 @@ def linear_referencing_and_speed_by_segment( """ With just enter / exit points on segments, do the linear referencing to get shape_meters, and then derive speed. + Do a second pass for low speed segments with straight distance. """ time0 = datetime.datetime.now() @@ -33,23 +270,19 @@ def linear_referencing_and_speed_by_segment( SEGMENT_IDENTIFIER_COLS = dict_inputs["segment_identifier_cols"] TIMESTAMP_COL = dict_inputs["timestamp_col"] EXPORT_FILE = dict_inputs["stage4"] + PCT_SEGMENT_MIN = dict_inputs["pct_segment_minimum"] # Keep subset of columns - don't need it all. we can get the # columns dropped through segments file vp_keep_cols = [ - 'gtfs_dataset_key', 'gtfs_dataset_name', - 'trip_id', 'trip_instance_key', - 'schedule_gtfs_dataset_key', + 'trip_instance_key', TIMESTAMP_COL, - 'x', 'y' + 'x', 'y', 'vp_idx' ] + SEGMENT_IDENTIFIER_COLS - vp = helpers.import_vehicle_positions( - SEGMENT_GCS, - f"{VP_FILE}_{analysis_date}/", - file_type = "df", - columns = vp_keep_cols, - partitioned = True + vp = dd.read_parquet( + f"{SEGMENT_GCS}{VP_FILE}_{analysis_date}", + columns = vp_keep_cols ) segments = helpers.import_segments( @@ -58,75 +291,72 @@ def linear_referencing_and_speed_by_segment( columns = SEGMENT_IDENTIFIER_COLS + ["geometry"] ).dropna(subset="geometry").reset_index(drop=True) - # https://stackoverflow.com/questions/71685387/faster-methods-to-create-geodataframe-from-a-dask-or-pandas-dataframe - # https://github.com/geopandas/dask-geopandas/issues/197 - vp_gddf = dg.from_dask_dataframe( - vp, - geometry=dg.points_from_xy(vp, "x", "y") - ).set_crs(WGS84).to_crs(PROJECT_CRS).drop(columns = ["x", "y"]) - - vp_with_seg_geom = dd.merge( - vp_gddf, + vp_with_seg_geom = linear_referencing_vp_against_line( + vp, segments, - on = SEGMENT_IDENTIFIER_COLS, - how = "inner" - ).rename(columns = { - "geometry_x": "vp_geometry", - "geometry_y": "segment_geometry"} - ).set_geometry("vp_geometry") + SEGMENT_IDENTIFIER_COLS, + TIMESTAMP_COL + ).persist() - vp_with_seg_geom = vp_with_seg_geom.repartition(npartitions=50) - time1 = datetime.datetime.now() - logger.info(f"set up merged vp with segments: {time1 - time0}") + logger.info(f"linear referencing: {time1 - time0}") - shape_meters_series = vp_with_seg_geom.map_partitions( - wrangle_shapes.project_point_geom_onto_linestring, - "segment_geometry", - "vp_geometry", - meta = ("shape_meters", "float") - ) + SEGMENT_TRIP_COLS = ["trip_instance_key", + "segment_meters"] + SEGMENT_IDENTIFIER_COLS + + initial_speeds = make_wide_get_speed( + vp_with_seg_geom, SEGMENT_TRIP_COLS, TIMESTAMP_COL + ).compute() - vp_with_seg_geom["shape_meters"] = shape_meters_series - vp_with_seg_geom = segment_calcs.convert_timestamp_to_seconds( - vp_with_seg_geom, [TIMESTAMP_COL]) time2 = datetime.datetime.now() - logger.info(f"linear referencing: {time2 - time1}") - - # set up metadata for columns in exact order output appears for map_partitions - dtypes_dict = vp_with_seg_geom[ - ["gtfs_dataset_key", "gtfs_dataset_name", - "trip_id", "trip_instance_key", - "schedule_gtfs_dataset_key" - ] + SEGMENT_IDENTIFIER_COLS - ].dtypes.to_dict() - - speeds = vp_with_seg_geom.map_partitions( - segment_calcs.calculate_speed_by_segment_trip, - SEGMENT_IDENTIFIER_COLS, - f"{TIMESTAMP_COL}_sec", - meta = { - **dtypes_dict, - "min_time": "float", - "min_dist": "float", - "max_time": "float", - "max_dist": "float", - "meters_elapsed": "float", - "sec_elapsed": "float", - "speed_mph": "float", - }) - - speeds = speeds.repartition(npartitions=2) + logger.info(f"make wide and get initial speeds: {time2 - time1}") + + ok_speeds, low_speeds = filter_for_unstable_speeds( + initial_speeds, + pct_segment_threshold = PCT_SEGMENT_MIN + ) + + low_speeds_recalculated = recalculate_low_speeds_with_straight_distance( + low_speeds, + SEGMENT_TRIP_COLS, + TIMESTAMP_COL + ) + + # Add a flag that tells us speed was recalculated + # Combine columns and rename straight distance as meters_elapsed + low_speeds_recalculated = low_speeds_recalculated.assign( + flag_recalculated = 1, + meters_elapsed = low_speeds_recalculated.straight_distance + ) + + keep_cols = SEGMENT_TRIP_COLS + [ + "vp_idx", "prior_vp_idx", + f"{TIMESTAMP_COL}_sec", f"prior_{TIMESTAMP_COL}_sec", + "meters_elapsed", + "sec_elapsed", + "pct_segment", + "speed_mph", + "flag_recalculated", + ] + + speeds = pd.concat([ + ok_speeds, + low_speeds_recalculated + ], axis=0).sort_values(SEGMENT_IDENTIFIER_COLS + ["trip_instance_key"] + ).reset_index(drop=True) + + speeds = speeds.assign( + flag_recalculated = speeds.flag_recalculated.fillna(0).astype("int8") + )[keep_cols] time3 = datetime.datetime.now() - logger.info(f"calculate speeds: {time3 - time2}") + logger.info(f"recalculate speeds and get final: {time3 - time2}") speeds.to_parquet( - f"{SEGMENT_GCS}{EXPORT_FILE}_{analysis_date}", - overwrite = True + f"{SEGMENT_GCS}{EXPORT_FILE}_{analysis_date}.parquet", ) - + if __name__ == "__main__": @@ -141,11 +371,8 @@ def linear_referencing_and_speed_by_segment( start = datetime.datetime.now() STOP_SEG_DICT = helpers.get_parameters(CONFIG_PATH, "stop_segments") - - linear_referencing_and_speed_by_segment( - analysis_date, - dict_inputs = STOP_SEG_DICT - ) + + linear_referencing_and_speed_by_segment(analysis_date, STOP_SEG_DICT) logger.info(f"speeds for stop segments: {datetime.datetime.now() - start}") logger.info(f"execution time: {datetime.datetime.now() - start}") diff --git a/rt_segment_speeds/scripts/B2_avg_speeds_by_segment.py b/rt_segment_speeds/scripts/B2_avg_speeds_by_segment.py index b06b0894b..657c9bdd8 100644 --- a/rt_segment_speeds/scripts/B2_avg_speeds_by_segment.py +++ b/rt_segment_speeds/scripts/B2_avg_speeds_by_segment.py @@ -29,7 +29,7 @@ def calculate_avg_speeds( avg = (grouped_df .agg({ "speed_mph": "median", - "trip_id": "nunique"}) + "trip_instance_key": "nunique"}) .reset_index() ) @@ -45,7 +45,7 @@ def calculate_avg_speeds( stats = pd.merge( avg.rename(columns = {"speed_mph": "p50_mph", - "trip_id": "n_trips"}), + "trip_instance_key": "n_trips"}), p20.rename(columns = {"speed_mph": "p20_mph"}), on = group_cols, how = "left" @@ -65,7 +65,6 @@ def speeds_with_segment_geom( analysis_date: str, max_speed_cutoff: int = 70, dict_inputs: dict = {}, - percent_segment_covered:float = 0.40, ) -> gpd.GeoDataFrame: """ Import the segment-trip table. @@ -90,53 +89,45 @@ def speeds_with_segment_geom( f"{SEGMENT_FILE}_{analysis_date}", columns = segment_cols_to_keep ) - - # CRS is 3310, calculate the length - segments["segment_length"] = segments.geometry.length - + # Read in speeds df = pd.read_parquet( - f"{SEGMENT_GCS}{SPEEDS_FILE}_{analysis_date}", - filters = [[("speed_mph", "<=", max_speed_cutoff)]]) + f"{SEGMENT_GCS}{SPEEDS_FILE}_{analysis_date}.parquet", + filters = [[ + ("speed_mph", "<=", max_speed_cutoff), + ("meters_elapsed", ">", 0), + ("sec_elapsed", ">", 0) + ]]) # Do a merge with segments - merge_cols = ['shape_array_key','stop_sequence','schedule_gtfs_dataset_key'] - df2 = pd.merge(segments, df, on = merge_cols, how = "inner") + df2 = pd.merge( + segments, + df, + on = SEGMENT_IDENTIFIER_COLS, + how = "inner" + ) # Keep only segments that have RT data. unique_segments = (df2[segment_cols_to_keep] .drop_duplicates() .reset_index(drop = True) - ) - - # Find percentage of meters elapsed vs. total segment length - df2 = df2.assign( - pct_seg = df2.meters_elapsed.divide(df2.segment_length) - ) - - # Filter out abnormally high and low speeds - # Threshold defaults to throwing away the bottom 20% of rows with low speeds - df3 = df2[(df2.pct_seg >= percent_segment_covered) & - (df2.speed_mph.notna()) & - (df2.sec_elapsed > 0) & - (df2.meters_elapsed > 0) - ] + ).to_crs(geography_utils.WGS84) time_of_day_df = sched_rt_utils.get_trip_time_buckets(analysis_date) - df4 = pd.merge( - df3, + df3 = pd.merge( + df2, time_of_day_df, on = "trip_instance_key", how = "inner" ) all_day = calculate_avg_speeds( - df4, + df3, SEGMENT_IDENTIFIER_COLS ) peak = calculate_avg_speeds( - df4[df4.time_of_day.isin(["AM Peak", "PM Peak"])], + df3[df3.time_of_day.isin(["AM Peak", "PM Peak"])], SEGMENT_IDENTIFIER_COLS ) @@ -145,29 +136,14 @@ def speeds_with_segment_geom( peak.assign(time_of_day = "peak") ], axis=0) - # Merge in segment geometry with a changed CRS - unique_segments = unique_segments.to_crs(geography_utils.WGS84) - - # Merge in segment geometry - segments = helpers.import_segments( - SEGMENT_GCS, - f"{SEGMENT_FILE}_{analysis_date}", - columns = SEGMENT_IDENTIFIER_COLS + [ - "schedule_gtfs_dataset_key", - "stop_id", - "loop_or_inlining", - "geometry", - "district_name" - ] - ).to_crs(geography_utils.WGS84) + # Merge in segment geometry gdf = pd.merge( unique_segments, stats, on = SEGMENT_IDENTIFIER_COLS, how = "left" - ) - + ).sort_values(SEGMENT_IDENTIFIER_COLS + ["time_of_day"]).reset_index(drop=True) return gdf @@ -187,7 +163,6 @@ def speeds_with_segment_geom( EXPORT_FILE = f'{STOP_SEG_DICT["stage5"]}_{analysis_date}' MAX_SPEED = 70 - MIN_SEGMENT_PERCENT = 0.40 # Average the speeds for segment for entire day # Drop speeds above our max cutoff @@ -195,7 +170,6 @@ def speeds_with_segment_geom( analysis_date, max_speed_cutoff = MAX_SPEED, dict_inputs = STOP_SEG_DICT, - percent_segment_covered = MIN_SEGMENT_PERCENT ) utils.geoparquet_gcs_export( diff --git a/rt_segment_speeds/scripts/B3_export.py b/rt_segment_speeds/scripts/B3_export.py index d51637287..17147560c 100644 --- a/rt_segment_speeds/scripts/B3_export.py +++ b/rt_segment_speeds/scripts/B3_export.py @@ -137,7 +137,7 @@ def finalize_df_for_export(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame: final_gdf.drop(columns = "geometry").to_parquet( f"{SEGMENT_GCS}export/{INPUT_FILE}_tabular.parquet" ) - + utils.geoparquet_gcs_export( final_gdf[keep_cols], f"{SEGMENT_GCS}export/", diff --git a/rt_segment_speeds/scripts/Makefile b/rt_segment_speeds/scripts/Makefile index 239111bc5..f81ef8ebb 100644 --- a/rt_segment_speeds/scripts/Makefile +++ b/rt_segment_speeds/scripts/Makefile @@ -4,37 +4,19 @@ segmentize: python cut_normal_stop_segments.py python cut_special_stop_segments.py python concatenate_stop_segments.py - - -get_speeds_by_segment: - python A0_preprocessing.py - python A1_sjoin_vp_segments.py - python A2_valid_vehicle_positions.py - python A3_loop_inlining.py - python A4_concatenate_vp_pared.py - python B1_speeds_by_segment_trip.py - -export_data: - python B2_avg_speeds_by_segment.py - python B3_export.py - python C2_triangulate_vp.py - python C3_trip_route_speed.py - #python C1_rt_trip_diagnostics.py - speeds_pipeline: #python A0_preprocessing.py - python A1_sjoin_vp_segments.py - python A2_valid_vehicle_positions.py - python A3_loop_inlining.py - python A4_concatenate_vp_pared.py + #python A1_sjoin_vp_segments.py + python A2_sjoin_postprocessing.py + python A3_valid_vehicle_positions.py python B1_speeds_by_segment_trip.py python B2_avg_speeds_by_segment.py python B3_export.py - python C2_triangulate_vp.py - python C3_trip_route_speed.py - + #python C2_triangulate_vp.py + #python C3_trip_route_speed.py + download_roads: #pip install esridump diff --git a/rt_segment_speeds/scripts/config.yml b/rt_segment_speeds/scripts/config.yml index 0e58d7c31..e69d68a9e 100644 --- a/rt_segment_speeds/scripts/config.yml +++ b/rt_segment_speeds/scripts/config.yml @@ -24,4 +24,5 @@ stop_segments: grouping_col: "shape_array_key" segment_identifier_cols: ["shape_array_key", "stop_sequence"] timestamp_col: "location_timestamp_local" - time_min_cutoff: 10 \ No newline at end of file + time_min_cutoff: 10 + pct_segment_minimum: 0.3 \ No newline at end of file diff --git a/rt_segment_speeds/scripts/test_split.py b/rt_segment_speeds/scripts/test_split.py deleted file mode 100644 index e6631a17e..000000000 --- a/rt_segment_speeds/scripts/test_split.py +++ /dev/null @@ -1,270 +0,0 @@ -""" -Transform df so that it is wide instead of long -prior to calculating speed. - -For segments with 2 vp, we can do this. -For segments with 1 vp...set a placeholder for how to -fill in the previous coord? - -Caveats to work into future function: -* pulling the prior vp can be from multiple segments ago -* we want to calculate distance between 2 points using shape and not segment -* the prior vp should just be vp_idx of current - 1 -* check that it falls between the bounds of a trip's min_vp_idx and max_vp_idx -""" -import dask.dataframe as dd -import dask_geopandas as dg -import geopandas as gpd -import pandas as pd - -from segment_speed_utils import helpers, segment_calcs, wrangle_shapes -from segment_speed_utils.project_vars import (SEGMENT_GCS, analysis_date, - CONFIG_PATH, PROJECT_CRS) -from shared_utils.geography_utils import WGS84 - -def get_prior_position_on_segment( - df: pd.DataFrame, - segment_identifier_cols: list, - time_col: str, -) -> gpd.GeoDataFrame: - """ - Get the prior vp on the segment. - If a segment has 2 points, this will fill it in with a value. - If it has 1 point, it returns NaN, so we will have to subset - to those rows and fix those separately. - """ - segment_trip_cols = ["trip_instance_key"] + segment_identifier_cols - - obs_per_segment_trip = ( - df.groupby(segment_trip_cols, - observed=True, group_keys=False) - .agg({"vp_idx": "count"}) - .reset_index() - .rename(columns = {"vp_idx": "n_vp_seg"}) - ) - - df2 = pd.merge( - df, - obs_per_segment_trip, - on = segment_trip_cols, - how = "inner" - ).sort_values( - segment_trip_cols + ["vp_idx"] - ).reset_index(drop=True) - - df2 = df2.assign( - prior_vp_idx = (df2.groupby(segment_trip_cols, - observed=True, group_keys=False) - .vp_idx - .shift(1) - ) - ) - - df2 = df2.assign( - prior_vp_idx = df2.prior_vp_idx.fillna(df2.vp_idx - 1).astype(int) - ) - - return df2 - - -def get_usable_vp_bounds_by_trip(df: dd.DataFrame) -> pd.DataFrame: - """ - Of all the usable vp, for each trip, find the min(vp_idx) - and max(vp_idx). - For the first stop, there will never be a previous vp to find, - because the previous vp_idx will belong to a different operator/trip. - But for segments in the middle of the shape, the previous vp can be anywhere, - maybe several segments away. - """ - - grouped_df = df.groupby("trip_instance_key", - observed=True, group_keys=False) - - start_vp = (grouped_df.vp_idx.min().reset_index() - .rename(columns = {"vp_idx": "min_vp_idx"}) - ) - end_vp = (grouped_df.vp_idx.max().reset_index() - .rename(columns = {"vp_idx": "max_vp_idx"}) - ) - - df2 = dd.merge( - start_vp, - end_vp, - on = "trip_instance_key", - how = "left" - ).reset_index(drop=True).compute() - - return df2 - - -def put_all_together( - analysis_date: str, - dict_inputs: dict = {} -): - USABLE_VP = dict_inputs["stage1"] - INPUT_FILE = dict_inputs["stage3"] - SEGMENT_FILE = dict_inputs["segments_file"] - SEGMENT_IDENTIFIER_COLS = dict_inputs["segment_identifier_cols"] - GROUPING_COL = dict_inputs["grouping_col"] - TIMESTAMP_COL = dict_inputs["timestamp_col"] - - # Import usable vp, which we'll use later for the x, y and timestamp - usable_vp = dd.read_parquet( - f"{SEGMENT_GCS}{USABLE_VP}_{analysis_date}", - columns = ["trip_instance_key", "vp_idx", TIMESTAMP_COL, "x", "y"] - ) - vp_idx_bounds = get_usable_vp_bounds_by_trip(usable_vp) - - # Start from pared down vp - df = pd.read_parquet( - f"{SEGMENT_GCS}vp_pare_down/{INPUT_FILE}_all_{analysis_date}", - columns = SEGMENT_IDENTIFIER_COLS + ["trip_instance_key", "vp_idx"] - ) - - # Make sure all segments have 2 points - # If it doesn't, fill it in with the previous vp_idx - df2 = get_prior_position_on_segment( - df, - SEGMENT_IDENTIFIER_COLS, - TIMESTAMP_COL - ) - - # Check that the previous vp_idx actually occurs on the same trip - df3 = pd.merge( - df2, - vp_idx_bounds, - on = "trip_instance_key", - how = "inner" - ) - - # For the first segment, if we only have 1 vp, we can't find a previous point - # We'll use the next point then. - # but make sure that we never use a point outside of that trip - # later, we will have to use absolute value of difference in shape_meters - # since distance must be positive - df3 = df3.assign( - prior_vp_idx = df3.apply( - lambda x: - x.vp_idx + 1 if (x.prior_vp_idx < x.min_vp_idx) and - (x.vp_idx + 1 <= x.max_vp_idx) - else x.prior_vp_idx, - axis=1) - ).drop(columns = ["trip_instance_key", "min_vp_idx", "max_vp_idx"]) - - # Merge in the timestamp and x, y coords - df_with_xy = dd.merge( - usable_vp, - df3, - on = "vp_idx", - how = "inner" - ) - - # Merge again to get timestamp and x, y coords of previous point - usable_vp2 = usable_vp.rename( - columns = { - "vp_idx": "prior_vp_idx", - TIMESTAMP_COL: f"prior_{TIMESTAMP_COL}", - "x": "prior_x", - "y": "prior_y", - } - ).drop(columns = "trip_instance_key") - - df_with_prior_xy = dd.merge( - df_with_xy, - usable_vp2, - on = "prior_vp_idx", - how = "inner" - ).compute() - - gdf = gpd.GeoDataFrame( - df_with_prior_xy, - geometry = gpd.points_from_xy(df_with_prior_xy.x, df_with_prior_xy.y), - crs = WGS84 - ).to_crs(PROJECT_CRS).drop(columns = ["x", "y"]) - - gdf2 = gdf.assign( - prior_geometry = gpd.points_from_xy( - gdf.prior_x, gdf.prior_y, crs = WGS84 - ).to_crs(PROJECT_CRS) - ).drop(columns = ["prior_x", "prior_y"]).set_geometry("geometry") - - return gdf2 - - - - -if __name__ == "__main__": - - - - gdf = gdf.assign( - prior_time = gdf.prior_time.fillna( - gdf.groupby("trip_instance_key", - observed=True, group_keys=False) - [time_col] - .shift(1) - ), - prior_coord = gdf.geometry.fillna( - gdf.groupby("trip_instance_key", - observed=True, group_keys=False) - .geometry - .shift(1) - ), - ).rename(columns = {"geometry": "vp_geometry"}) - - - segments = gpd.read_parquet( - f"{SEGMENT_GCS}stop_segments_{analysis_date}.parquet", - columns = SEGMENT_IDENTIFIER_COLS + ["geometry"] - ).rename(columns = {"geometry": "segment_geometry"}) - - two_obs_in_seg_gdf = pd.merge( - two_obs_in_seg, - segments, - on = SEGMENT_IDENTIFIER_COLS, - how = "inner" - ) - - two_obs_in_seg_gdf = dg.from_geopandas(two_obs_in_seg_gdf, npartitions=10 - ).set_geometry("vp_geometry") - - shapes_to_keep = one_obs_in_seg_one_outside.shape_array_key.unique().tolist() - - shapes = helpers.import_scheduled_shapes( - analysis_date, - columns = ["shape_array_key", "geometry"], - filters = [[("shape_array_key", "in", shapes_to_keep)]], - get_pandas = True, - crs = "EPSG:3310" - ).rename(columns = {"geometry": "shape_geometry"}) - - one_obs_in_seg_one_outside_gdf = pd.merge( - one_obs_in_seg_one_outside, - shapes, - on = "shape_array_key", - how = "inner" - ) - - shape_meters_series = two_obs_in_seg_gdf.map_partitions( - wrangle_shapes.project_point_geom_onto_linestring( - "segment_geometry", - "vp_geometry", - meta = ("shape_meters", "float") - )) - - two_obs_in_seg_gdf["shape_meters"] = shape_meters_series - - two_obs_in_seg_gdf = two_obs_in_seg_gdf.repartition(npartitions=5) - two_obs_in_seg_gdf.to_parquet("two_seg_test") - - shape_meters_series2 =wrangle_shapes.project_point_geom_onto_linestring( - one_obs_in_seg_one_outside_gdf, - "shape_geometry", - "vp_geometry", - #meta = ("shape_meters", "float") - ) - - one_obs_in_seg_one_outside_gdf["shape_meters"] = shape_meters_series2 - one_obs_in_seg_one_outside_gdf.to_parquet("one_seg_test.parquet") - - \ No newline at end of file diff --git a/rt_segment_speeds/segment_speed_utils/segment_calcs.py b/rt_segment_speeds/segment_speed_utils/segment_calcs.py index dfe7597ca..0befce432 100644 --- a/rt_segment_speeds/segment_speed_utils/segment_calcs.py +++ b/rt_segment_speeds/segment_speed_utils/segment_calcs.py @@ -59,8 +59,8 @@ def keep_min_max_timestamps_by_segment( def derive_speed( df: pd.DataFrame, - distance_cols: tuple = ("min_dist", "max_dist"), - time_cols: tuple = ("min_time", "max_time") + distance_cols: tuple = ("prior_shape_meters", "shape_meters"), + time_cols: tuple = ("prior_location_timestamp_local_sec", "location_timestamp_local_sec") ) -> pd.DataFrame: """ Derive meters and sec elapsed to calculate speed_mph. @@ -69,19 +69,19 @@ def derive_speed( min_time, max_time = time_cols[0], time_cols[1] df = df.assign( - meters_elapsed = df[max_dist] - df[min_dist] + meters_elapsed = (df[max_dist] - df[min_dist]).abs() ) if df[min_time].dtype in ["float", "int"]: # If 2 time cols are already converted to seconds, just take difference df = df.assign( - sec_elapsed = (df[max_time] - df[min_time]) + sec_elapsed = (df[max_time] - df[min_time]).abs() ) else: # If 2 time cols are datetime, convert timedelta to seconds df = df.assign( sec_elapsed = (df[max_time] - df[min_time]).divide( - np.timedelta64(1, 's')), + np.timedelta64(1, 's')).abs(), ) df = df.assign( @@ -176,4 +176,34 @@ def derive_stop_delay( actual_minus_scheduled_sec = df[actual] - df[scheduled] ) - return df \ No newline at end of file + return df + + +def get_usable_vp_bounds_by_trip(df: dd.DataFrame) -> pd.DataFrame: + """ + Of all the usable vp, for each trip, find the min(vp_idx) + and max(vp_idx). + For the first stop, there will never be a previous vp to find, + because the previous vp_idx will belong to a different operator/trip. + But for segments in the middle of the shape, the previous vp can be anywhere, + maybe several segments away. + """ + + grouped_df = df.groupby("trip_instance_key", + observed=True, group_keys=False) + + start_vp = (grouped_df.vp_idx.min().reset_index() + .rename(columns = {"vp_idx": "min_vp_idx"}) + ) + end_vp = (grouped_df.vp_idx.max().reset_index() + .rename(columns = {"vp_idx": "max_vp_idx"}) + ) + + df2 = dd.merge( + start_vp, + end_vp, + on = "trip_instance_key", + how = "left" + ).reset_index(drop=True).compute() + + return df2