From 13fc6acfc04a7345ed9b5c8745a954282323c627 Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Wed, 27 Nov 2024 18:29:53 +0000 Subject: [PATCH 1/7] (remove): vp_nn and rework nearest neighbor to consolidate subsetting --- gtfs_funnel/vp_condenser.py | 57 +----------- .../segment_speed_utils/neighbor.py | 93 +++++++++++++++++-- .../segment_speed_utils/vp_transform.py | 3 +- 3 files changed, 87 insertions(+), 66 deletions(-) diff --git a/gtfs_funnel/vp_condenser.py b/gtfs_funnel/vp_condenser.py index 2ec815b24..80e06ea1f 100644 --- a/gtfs_funnel/vp_condenser.py +++ b/gtfs_funnel/vp_condenser.py @@ -58,49 +58,6 @@ def condense_vp_to_linestring( return -def prepare_vp_for_all_directions( - analysis_date: str, - dict_inputs: dict -) -> gpd.GeoDataFrame: - """ - For each direction, exclude one the opposite direction and - save out the arrays of valid indices. - Every trip will have 4 rows, 1 row corresponding to each direction. - - Ex: for a given trip's northbound points, exclude southbound vp. - Subset vp_idx, location_timestamp_local and coordinate arrays - to exclude southbound. - """ - INPUT_FILE = dict_inputs.speeds_tables.vp_condensed_line - EXPORT_FILE = dict_inputs.speeds_tables.vp_nearest_neighbor - - vp = delayed(gpd.read_parquet)( - f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}.parquet", - ) - - dfs = [ - delayed(vp_transform.combine_valid_vp_for_direction)( - vp, direction) - for direction in vp_transform.ALL_DIRECTIONS - ] - - results = [compute(i)[0] for i in dfs] - - gdf = pd.concat( - results, axis=0, ignore_index=True - ).sort_values( - ["trip_instance_key", "vp_primary_direction"] - ).reset_index(drop=True) - - utils.geoparquet_gcs_export( - gdf, - SEGMENT_GCS, - f"{EXPORT_FILE}_{analysis_date}" - ) - - return - - if __name__ == "__main__": from update_vars import analysis_date_list @@ -116,17 +73,9 @@ def prepare_vp_for_all_directions( condense_vp_to_linestring(analysis_date, GTFS_DATA_DICT) - time1 = datetime.datetime.now() + end = datetime.datetime.now() logger.info( f"{analysis_date}: condense vp for trip " - f"{time1 - start}" - ) - - prepare_vp_for_all_directions(analysis_date, GTFS_DATA_DICT) - - end = datetime.datetime.now() - logger.info( - f"{analysis_date}: prepare vp to use in nearest neighbor: " - f"{end - time1}" - ) \ No newline at end of file + f"{end - start}" + ) \ No newline at end of file diff --git a/rt_segment_speeds/segment_speed_utils/neighbor.py b/rt_segment_speeds/segment_speed_utils/neighbor.py index 54ee5d8d4..4a3e9a75a 100644 --- a/rt_segment_speeds/segment_speed_utils/neighbor.py +++ b/rt_segment_speeds/segment_speed_utils/neighbor.py @@ -7,7 +7,7 @@ import shapely from calitp_data_analysis.geography_utils import WGS84 -from segment_speed_utils import gtfs_schedule_wrangling +from segment_speed_utils import gtfs_schedule_wrangling, vp_transform from segment_speed_utils.project_vars import SEGMENT_GCS, GTFS_DATA_DICT from shared_utils import geo_utils @@ -31,7 +31,7 @@ def merge_stop_vp_for_nearest_neighbor( analysis_date: str, **kwargs ) -> gpd.GeoDataFrame: - VP_NN = GTFS_DATA_DICT.speeds_tables.vp_nearest_neighbor + VP_NN = GTFS_DATA_DICT.speeds_tables.vp_condensed_line vp_condensed = gpd.read_parquet( f"{SEGMENT_GCS}{VP_NN}_{analysis_date}.parquet", @@ -43,20 +43,54 @@ def merge_stop_vp_for_nearest_neighbor( gdf = pd.merge( stop_times.rename( - columns = { - "geometry": "stop_geometry"} + columns = {"geometry": "stop_geometry"} ).set_geometry("stop_geometry").to_crs(WGS84), vp_condensed.rename( columns = { - "vp_primary_direction": "stop_primary_direction", "geometry": "vp_geometry" }), - on = ["trip_instance_key", "stop_primary_direction"], + on = "trip_instance_key", how = "inner" ) return gdf +def subset_arrays_to_valid_directions( + vp_direction_array: np.ndarray, + vp_geometry: shapely.LineString, + vp_idx_array: np.ndarray, + stop_geometry: shapely.Point, + stop_direction: str, +) -> np.ndarray: + """ + """ + opposite_direction = vp_transform.OPPOSITE_DIRECTIONS[stop_direction] + + # These are the valid index values where opposite direction + # is excluded + valid_indices = (vp_direction_array != opposite_direction).nonzero() + + vp_coords_line = shapely.LineString( + np.array(vp_geometry.coords)[valid_indices] + ) + vp_idx_arr = np.asarray(vp_idx_array)[valid_indices] + + np_inds = geo_utils.nearest_snap( + vp_coords_line, stop_geometry, N_NEAREST_POINTS + ) + + # nearest neighbor returns self.N + # if there are no nearest neighbor results found + # if we want 10 nearest neighbors and 8th, 9th, 10th are all + # the same result, the 8th will have a result, then 9th and 10th will + # return the length of the array (which is out-of-bounds) + + np_inds2 = np_inds[np_inds < vp_idx_arr.size] + + nearest_vp_arr = vp_idx_arr[np_inds2] + + return nearest_vp_arr + def add_nearest_neighbor_result_array( gdf: gpd.GeoDataFrame, @@ -71,10 +105,24 @@ def add_nearest_neighbor_result_array( nearest_vp_arr_series = [] for row in gdf.itertuples(): - vp_coords_line = getattr(row, "vp_geometry") - stop_geometry = getattr(row, "stop_geometry") - vp_idx_arr = getattr(row, "vp_idx") + # These are the ones that do not run in the + # opposite direction as stop_primary_direction + # Ex: if stop1 to stop2 is going eastbound, + # we do not allow westbound traveling vp + vp_coords_line = getattr(row, "vp_primary_direction") + stop_direction = getattr(row, "stop_primary_direction") + opposite_direction = vp_transform.OPPOSITE_DIRECTIONS[stop_direction] + + # These are the valid index values where opposite direction + # is excluded + valid_indices = (vp_coords_line != opposite_direction).nonzero() + + vp_coords_line = np.array(getattr(row, "vp_geometry").coords)[valid_indices] + vp_coords_line = shapely.LineString(vp_coords_line) + stop_geometry = getattr(row, "stop_geometry") + vp_idx_arr = np.asarray(getattr(row, "vp_idx"))[valid_indices] + np_inds = geo_utils.nearest_snap( vp_coords_line, stop_geometry, N_NEAREST_POINTS ) @@ -93,6 +141,31 @@ def add_nearest_neighbor_result_array( gdf2 = gdf.assign( nearest_vp_arr = nearest_vp_arr_series - ).drop(columns = ["vp_idx", "vp_geometry"]) + ).drop(columns = ["vp_primary_direction", "vp_idx", "vp_geometry"]) + + return gdf2 + +def add_nearest_neighbor_result_array2( + gdf: gpd.GeoDataFrame, + analysis_date: str, + **kwargs +) -> pd.DataFrame: + """ + Add the nearest k_neighbors result. + """ + N_NEAREST_POINTS = 10 + + nearest_vp_arr = np.vectorize( + subset_arrays_to_valid_directions)( + gdf.vp_primary_direction, + gdf.vp_geometry, + gdf.vp_idx, + gdf.stop_geometry, + gdf.stop_primary_direction, + ) + + gdf2 = gdf.assign( + nearest_vp_arr = nearest_vp_arr_series + ).drop(columns = ["vp_primary_direction", "vp_idx", "vp_geometry"]) return gdf2 \ No newline at end of file diff --git a/rt_segment_speeds/segment_speed_utils/vp_transform.py b/rt_segment_speeds/segment_speed_utils/vp_transform.py index 154a5b040..3a3703f92 100644 --- a/rt_segment_speeds/segment_speed_utils/vp_transform.py +++ b/rt_segment_speeds/segment_speed_utils/vp_transform.py @@ -86,8 +86,7 @@ def combine_valid_vp_for_direction( for row in vp_condensed.itertuples(): vp_dir_arr = np.asarray(getattr(row, "vp_primary_direction")) - # These are the valid index values where opposite direction - # is excluded + valid_indices = (vp_dir_arr != opposite_direction).nonzero() # Subset all the other arrays to these indices From 753237a86d7e2027907cb18d6664b5fead5dcfdf Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Wed, 27 Nov 2024 21:51:31 +0000 Subject: [PATCH 2/7] refactor nearest neighbor to use vp_condensed --- _shared_utils/shared_utils/geo_utils.py | 9 +- rt_segment_speeds/42_switch_vp_nn_file.ipynb | 493 ++++++++++++++++++ rt_segment_speeds/logs/nearest_vp.log | 1 + rt_segment_speeds/scripts/test_new_vp_near.py | 98 ++++ .../segment_speed_utils/neighbor.py | 86 +-- .../segment_speed_utils/vp_transform.py | 71 +-- 6 files changed, 622 insertions(+), 136 deletions(-) create mode 100644 rt_segment_speeds/42_switch_vp_nn_file.ipynb create mode 100644 rt_segment_speeds/scripts/test_new_vp_near.py diff --git a/_shared_utils/shared_utils/geo_utils.py b/_shared_utils/shared_utils/geo_utils.py index c030f1fd7..26ef4c0cb 100644 --- a/_shared_utils/shared_utils/geo_utils.py +++ b/_shared_utils/shared_utils/geo_utils.py @@ -1,6 +1,8 @@ """ Geospatial utility functions """ +from typing import Union + import geopandas as gpd import numpy as np import pandas as pd @@ -17,13 +19,16 @@ geo_const_miles = 3_959_000 * np.pi / 180 -def nearest_snap(line: shapely.LineString, point: shapely.Point, k_neighbors: int = 1) -> np.ndarray: +def nearest_snap(line: Union[shapely.LineString, np.ndarray], point: shapely.Point, k_neighbors: int = 1) -> np.ndarray: """ Based off of this function, but we want to return the index value, rather than the point. https://github.com/UTEL-UIUC/gtfs_segments/blob/main/gtfs_segments/geom_utils.py """ - line = np.asarray(line.coords) + if isinstance(line, shapely.LineString): + line = np.asarray(line.coords) + elif isinstance(line, np.ndarray): + line = line point = np.asarray(point.coords) tree = KDTree(line) diff --git a/rt_segment_speeds/42_switch_vp_nn_file.ipynb b/rt_segment_speeds/42_switch_vp_nn_file.ipynb new file mode 100644 index 000000000..bb4d06584 --- /dev/null +++ b/rt_segment_speeds/42_switch_vp_nn_file.ipynb @@ -0,0 +1,493 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "ce9e058e-c60e-4f4f-ab10-0ff543008e4e", + "metadata": {}, + "source": [ + "## vp_condenser...no direction\n", + "\n", + "Let's see if we can get vp_condensed version working with nearest neighbor.\n", + "\n", + "We want to look for only the valid directions and do nearest snap, and correctly index back into the whole linestring.\n", + "\n", + "If done correctly, can get an entire function removed in `gtfs_funnel`\n", + "and have different starting point in `rt_segment_speeds` for `nearest_vp_to_stop`.\n", + "\n", + "\n", + "Things to update:\n", + "1. remove vp_nn from `gtfs_funnel`\n", + "2. In `vp_transform`, use vp_condensed_line, remove merging on vp_primary_direction\n", + "3. Re-jig the function to subset for valid indices first. But we need to add back all the columns we need at the end of nearest_vp_to_stop.\n", + "3a. maybe if the function for nearest_snap only takes shapely, we can coerce any arrays into that \n", + "4. nearest_vp_to_stop has very sparse columns" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "6ad4dd53-bcdc-4fbf-ab6b-5073101086e1", + "metadata": {}, + "outputs": [], + "source": [ + "import geopandas as gpd\n", + "import pandas as pd\n", + "\n", + "from update_vars import SEGMENT_GCS, GTFS_DATA_DICT\n", + "from shared_utils import rt_dates" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "6c2300f5-f6d2-4927-866a-4df88b983e73", + "metadata": {}, + "outputs": [], + "source": [ + "dict_inputs = GTFS_DATA_DICT[\"stop_segments\"]\n", + "analysis_date = rt_dates.DATES[\"oct2024\"]" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "c00e4e3f-2ae8-4f9c-b370-f8f835c2d591", + "metadata": {}, + "outputs": [], + "source": [ + "file = dict_inputs[\"stage2\"]\n", + "df1 = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}{file}_{analysis_date}.parquet\")\n", + "\n", + "df2 = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}{file}_{analysis_date}_test.parquet\")" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "505ce5df-c2e7-4f78-8d6c-f718a7860a36", + "metadata": {}, + "outputs": [], + "source": [ + "df = pd.merge(\n", + " df1,\n", + " df2,\n", + " on = [\"trip_instance_key\", \"stop_sequence\", \"shape_array_key\", \"stop_geometry\"],\n", + " how = \"inner\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "54488a53-ee01-458e-98f2-aa6e27718a84", + "metadata": {}, + "outputs": [], + "source": [ + "df = df.assign(\n", + " different = df.apply(\n", + " lambda x: True if set(x.nearest_vp_arr_x) != set(x.nearest_vp_arr_y) \n", + " else False, axis=1\n", + " )\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "bb57c231-9eb8-462d-a7ff-4cef3f350709", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "False 2873531\n", + "True 11\n", + "Name: different, dtype: int64" + ] + }, + "execution_count": 23, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.different.value_counts()" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "05a8d0cd-f76b-4b8c-891a-5898ce4e5804", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "0.9999961719717338" + ] + }, + "execution_count": 25, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "2873531/(2873531+11)" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "id": "3afd3141-82e9-4b20-8415-330be3f72e53", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "3.828028266160717e-06" + ] + }, + "execution_count": 26, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "11/(2873531+11)" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "4dfb832c-0d34-429e-af74-61a3466af9b1", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
trip_instance_keystop_sequenceshape_array_keystop_geometrynearest_vp_arr_xnearest_vp_arr_ydifferent
2647784446add580d803889d500434f9ece4e7632ad7711dbb909b690ee6c2a00fd96219eb'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x...[5638440, 5638441, 5638439, 5638442, 5638435, ...[5638440, 5638441, 5638439, 5638442, 5638435, ...True
264779253daf28e5f0e5af189abbc99b3fe8e5332ad7711dbb909b690ee6c2a00fd96219eb'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x...[5640985, 5640986, 5640984, 5640975, 5640983, ...[5640985, 5640986, 5640984, 5640975, 5640983, ...True
2647800553ec9026070f40c8487751635a8ccfe32ad7711dbb909b690ee6c2a00fd96219eb'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x...[5640664, 5640665, 5640666, 5640663, 5640662, ...[5640664, 5640665, 5640666, 5640663, 5640662, ...True
264782671a4480b5784014caaf9b21e0328e94e32ad7711dbb909b690ee6c2a00fd96219eb'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x...[5640483, 5640482, 5640484, 5640481, 5640477, ...[5640483, 5640482, 5640484, 5640501, 5640481, ...True
26478427bba7220b0cd77563f2a6c9c82ad976932ad7711dbb909b690ee6c2a00fd96219eb'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x...[5639651, 5639652, 5639650, 5639649, 5639653, ...[5639651, 5639652, 5639650, 5639649, 5639653, ...True
26478507cfa9724d2b8274d633ab3dfb21a7f8d32ad7711dbb909b690ee6c2a00fd96219eb'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x...[5639970, 5639971, 5639972, 5639969, 5639966, ...[5639970, 5639971, 5639972, 5639969, 5639966, ...True
26478589410c17c4f154ae3aa4c25bd98096de632ad7711dbb909b690ee6c2a00fd96219eb'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x...[5640815, 5640816, 5640814, 5640817, 5640813, ...[5640815, 5640816, 5640814, 5640817, 5640813, ...True
2647874a1cba44baf1f12ca2c06464374ff527232ad7711dbb909b690ee6c2a00fd96219eb'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x...[5640140, 5640141, 5640139, 5640142, 5640136, ...[5640140, 5640141, 5640139, 5640142, 5640136, ...True
2647904d68e3b7a03a0c31ca8efe8941d74888c32ad7711dbb909b690ee6c2a00fd96219eb'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x...[5639810, 5639809, 5639811, 5639808, 5639804, ...[5639810, 5639809, 5639811, 5639808, 5639804, ...True
2648084d94d1eb5a31337b8f938aeaf50b967e032ad7711dbb909b690ee6c2a00fd96219eb'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x...[5638755, 5638754, 5638756, 5638753, 5638750, ...[5638755, 5638754, 5638756, 5638753, 5638750, ...True
26481367066ce97630ef8ba6666c19d2f0fbcf128ad7711dbb909b690ee6c2a00fd96219eb'\\x01\\x01\\x00\\x00\\x00+oG8-*^\\xc0\\x93\\x9b\\xe1\\...[5639106, 5639107, 5639115, 5639105, 5639104, ...[5639106, 5639107, 5639115, 5639105, 5639116, ...True
\n", + "
" + ], + "text/plain": [ + " trip_instance_key stop_sequence \\\n", + "2647784 446add580d803889d500434f9ece4e76 32 \n", + "2647792 53daf28e5f0e5af189abbc99b3fe8e53 32 \n", + "2647800 553ec9026070f40c8487751635a8ccfe 32 \n", + "2647826 71a4480b5784014caaf9b21e0328e94e 32 \n", + "2647842 7bba7220b0cd77563f2a6c9c82ad9769 32 \n", + "2647850 7cfa9724d2b8274d633ab3dfb21a7f8d 32 \n", + "2647858 9410c17c4f154ae3aa4c25bd98096de6 32 \n", + "2647874 a1cba44baf1f12ca2c06464374ff5272 32 \n", + "2647904 d68e3b7a03a0c31ca8efe8941d74888c 32 \n", + "2648084 d94d1eb5a31337b8f938aeaf50b967e0 32 \n", + "2648136 7066ce97630ef8ba6666c19d2f0fbcf1 28 \n", + "\n", + " shape_array_key \\\n", + "2647784 ad7711dbb909b690ee6c2a00fd96219e \n", + "2647792 ad7711dbb909b690ee6c2a00fd96219e \n", + "2647800 ad7711dbb909b690ee6c2a00fd96219e \n", + "2647826 ad7711dbb909b690ee6c2a00fd96219e \n", + "2647842 ad7711dbb909b690ee6c2a00fd96219e \n", + "2647850 ad7711dbb909b690ee6c2a00fd96219e \n", + "2647858 ad7711dbb909b690ee6c2a00fd96219e \n", + "2647874 ad7711dbb909b690ee6c2a00fd96219e \n", + "2647904 ad7711dbb909b690ee6c2a00fd96219e \n", + "2648084 ad7711dbb909b690ee6c2a00fd96219e \n", + "2648136 ad7711dbb909b690ee6c2a00fd96219e \n", + "\n", + " stop_geometry \\\n", + "2647784 b'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x... \n", + "2647792 b'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x... \n", + "2647800 b'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x... \n", + "2647826 b'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x... \n", + "2647842 b'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x... \n", + "2647850 b'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x... \n", + "2647858 b'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x... \n", + "2647874 b'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x... \n", + "2647904 b'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x... \n", + "2648084 b'\\x01\\x01\\x00\\x00\\x00\\xe0Jvl\\x04*^\\xc0d\\x92\\x... \n", + "2648136 b'\\x01\\x01\\x00\\x00\\x00+oG8-*^\\xc0\\x93\\x9b\\xe1\\... \n", + "\n", + " nearest_vp_arr_x \\\n", + "2647784 [5638440, 5638441, 5638439, 5638442, 5638435, ... \n", + "2647792 [5640985, 5640986, 5640984, 5640975, 5640983, ... \n", + "2647800 [5640664, 5640665, 5640666, 5640663, 5640662, ... \n", + "2647826 [5640483, 5640482, 5640484, 5640481, 5640477, ... \n", + "2647842 [5639651, 5639652, 5639650, 5639649, 5639653, ... \n", + "2647850 [5639970, 5639971, 5639972, 5639969, 5639966, ... \n", + "2647858 [5640815, 5640816, 5640814, 5640817, 5640813, ... \n", + "2647874 [5640140, 5640141, 5640139, 5640142, 5640136, ... \n", + "2647904 [5639810, 5639809, 5639811, 5639808, 5639804, ... \n", + "2648084 [5638755, 5638754, 5638756, 5638753, 5638750, ... \n", + "2648136 [5639106, 5639107, 5639115, 5639105, 5639104, ... \n", + "\n", + " nearest_vp_arr_y different \n", + "2647784 [5638440, 5638441, 5638439, 5638442, 5638435, ... True \n", + "2647792 [5640985, 5640986, 5640984, 5640975, 5640983, ... True \n", + "2647800 [5640664, 5640665, 5640666, 5640663, 5640662, ... True \n", + "2647826 [5640483, 5640482, 5640484, 5640501, 5640481, ... True \n", + "2647842 [5639651, 5639652, 5639650, 5639649, 5639653, ... True \n", + "2647850 [5639970, 5639971, 5639972, 5639969, 5639966, ... True \n", + "2647858 [5640815, 5640816, 5640814, 5640817, 5640813, ... True \n", + "2647874 [5640140, 5640141, 5640139, 5640142, 5640136, ... True \n", + "2647904 [5639810, 5639809, 5639811, 5639808, 5639804, ... True \n", + "2648084 [5638755, 5638754, 5638756, 5638753, 5638750, ... True \n", + "2648136 [5639106, 5639107, 5639115, 5639105, 5639116, ... True " + ] + }, + "execution_count": 24, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df[df.different==True]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b6efaa60-12a4-4f51-86fd-efe39d7099d0", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "96d87666-9c25-4692-a6c5-c4ce90e02679", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ac571393-c54e-4c51-bb20-6993a55f961b", + "metadata": {}, + "outputs": [], + "source": [ + "def check_value(gdf: gpd.GeoDataFrame, x):\n", + " one_direction_arr = gdf.vp_primary_direction.iloc[x]\n", + " one_stop_direction = gdf.stop_primary_direction.iloc[x]\n", + " one_near_vp_arr = gdf.nearest_vp_arr.iloc[x]\n", + " one_orig_vp_arr = gdf.vp_idx.iloc[x]\n", + "\n", + " for i in one_near_vp_arr:\n", + " this_index = np.where(one_orig_vp_arr == i)[0]\n", + " this_direction = one_direction_arr[this_index]\n", + " print(one_stop_direction, this_index, this_direction)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "73b1ed84-1225-4a5a-9dc2-373ee0011abe", + "metadata": {}, + "outputs": [], + "source": [ + "check_value(gdf2, 3)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6af227e4-1e67-4498-90a5-af4a4a78b4d3", + "metadata": {}, + "outputs": [], + "source": [ + "check_value(gdf2, 10)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e798f140-5f95-4a85-8290-906f07795480", + "metadata": {}, + "outputs": [], + "source": [ + "check_value(gdf2, 64)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a81c9ab7-8165-45c4-9479-dd8a15bf8655", + "metadata": {}, + "outputs": [], + "source": [ + "check_value(gdf2, 1_000)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c9f91d2b-0010-43c9-bcec-564414ac85c5", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/rt_segment_speeds/logs/nearest_vp.log b/rt_segment_speeds/logs/nearest_vp.log index 8c02fb8b1..56c0d1d14 100644 --- a/rt_segment_speeds/logs/nearest_vp.log +++ b/rt_segment_speeds/logs/nearest_vp.log @@ -270,3 +270,4 @@ 2024-11-25 20:41:58.771 | INFO | vp_around_stops:filter_to_nearest_two_vp:248 - nearest 2 vp for rt_stop_times 2024-07-17: 0:09:44.129455 2024-11-25 21:00:54.275 | INFO | nearest_vp_to_stop:nearest_neighbor_for_stop:178 - nearest neighbor for speedmap_segments 2024-07-17: 0:02:16.592519 2024-11-25 21:04:16.776 | INFO | vp_around_stops:filter_to_nearest_two_vp:248 - nearest 2 vp for speedmap_segments 2024-07-17: 0:03:22.103136 +2024-11-27 13:35:33.184 | INFO | __main__:nearest_neighbor_for_stop:66 - nearest neighbor for stop_segments 2024-10-16: 0:14:44.207350 diff --git a/rt_segment_speeds/scripts/test_new_vp_near.py b/rt_segment_speeds/scripts/test_new_vp_near.py new file mode 100644 index 000000000..f50bee800 --- /dev/null +++ b/rt_segment_speeds/scripts/test_new_vp_near.py @@ -0,0 +1,98 @@ +import datetime +import sys + +from dask import delayed, compute +from loguru import logger + +from update_vars import GTFS_DATA_DICT, SEGMENT_GCS + +# Add me for this script +from calitp_data_analysis import utils +from typing import Literal, Optional +from segment_speed_utils.project_vars import SEGMENT_TYPES +from segment_speed_utils import neighbor +from pathlib import Path +from nearest_vp_to_stop import stop_times_for_shape_segments + +def nearest_neighbor_for_stop( + analysis_date: str, + segment_type: Literal[SEGMENT_TYPES], + config_path: Optional[Path] = GTFS_DATA_DICT +): + """ + Set up nearest neighbors for RT stop times, which + includes all trips. Use stop sequences for each trip. + """ + start = datetime.datetime.now() + + dict_inputs = config_path[segment_type] + + EXPORT_FILE = f'{dict_inputs["stage2"]}_{analysis_date}_test' + trip_stop_cols = [*dict_inputs["trip_stop_cols"]] + + stop_time_col_order = [ + 'trip_instance_key', 'shape_array_key', + 'stop_sequence', 'stop_id', 'stop_pair', + 'stop_primary_direction', 'geometry' + ] + + if segment_type == "stop_segments": + stop_times = stop_times_for_shape_segments(analysis_date, dict_inputs) + stop_times = stop_times.reindex(columns = stop_time_col_order) + + else: + print(f"{segment_type} is not valid") + + gdf = neighbor.merge_stop_vp_for_nearest_neighbor( + stop_times, analysis_date) + + results = neighbor.add_nearest_neighbor_result_array2(gdf, analysis_date) + + # Keep columns from results that are consistent across segment types + # use trip_stop_cols as a way to uniquely key into a row + keep_cols = trip_stop_cols + [ + "shape_array_key", + "stop_geometry", + "nearest_vp_arr" + ] + + utils.geoparquet_gcs_export( + results[keep_cols], + SEGMENT_GCS, + EXPORT_FILE, + ) + + end = datetime.datetime.now() + logger.info(f"nearest neighbor for {segment_type} " + f"{analysis_date}: {end - start}") + + del gdf, stop_times, results + + return + +if __name__ == "__main__": + + #from segment_speed_utils.project_vars import analysis_date_list + from shared_utils import rt_dates + analysis_date_list = [rt_dates.DATES["oct2024"]] + + segment_type = "stop_segments" + print(f"segment_type: {segment_type}") + + LOG_FILE = "../logs/nearest_vp.log" + logger.add(LOG_FILE, retention="3 months") + logger.add(sys.stderr, + format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", + level="INFO") + + delayed_dfs = [ + delayed(nearest_neighbor_for_stop)( + analysis_date = analysis_date, + segment_type = segment_type, + config_path = GTFS_DATA_DICT + ) for analysis_date in analysis_date_list + ] + + [compute(i)[0] for i in delayed_dfs] + + del delayed_dfs \ No newline at end of file diff --git a/rt_segment_speeds/segment_speed_utils/neighbor.py b/rt_segment_speeds/segment_speed_utils/neighbor.py index 4a3e9a75a..ffa197f93 100644 --- a/rt_segment_speeds/segment_speed_utils/neighbor.py +++ b/rt_segment_speeds/segment_speed_utils/neighbor.py @@ -55,6 +55,7 @@ def merge_stop_vp_for_nearest_neighbor( return gdf + def subset_arrays_to_valid_directions( vp_direction_array: np.ndarray, vp_geometry: shapely.LineString, @@ -63,16 +64,24 @@ def subset_arrays_to_valid_directions( stop_direction: str, ) -> np.ndarray: """ + Each row stores several arrays related to vp. + vp_direction is an array, vp_idx is an array, + and the linestring of vp coords can be coerced into an array. + + When we're doing nearest neighbor search, we want to + first filter the full array down to valid vp + before snapping it. """ + N_NEAREST_POINTS = 10 + opposite_direction = vp_transform.OPPOSITE_DIRECTIONS[stop_direction] # These are the valid index values where opposite direction # is excluded valid_indices = (vp_direction_array != opposite_direction).nonzero() - vp_coords_line = shapely.LineString( - np.array(vp_geometry.coords)[valid_indices] - ) + vp_coords_line = np.array(vp_geometry.coords)[valid_indices] + vp_idx_arr = np.asarray(vp_idx_array)[valid_indices] np_inds = geo_utils.nearest_snap( @@ -84,11 +93,10 @@ def subset_arrays_to_valid_directions( # if we want 10 nearest neighbors and 8th, 9th, 10th are all # the same result, the 8th will have a result, then 9th and 10th will # return the length of the array (which is out-of-bounds) - np_inds2 = np_inds[np_inds < vp_idx_arr.size] - + nearest_vp_arr = vp_idx_arr[np_inds2] - + return nearest_vp_arr @@ -99,71 +107,21 @@ def add_nearest_neighbor_result_array( ) -> pd.DataFrame: """ Add the nearest k_neighbors result. - """ - N_NEAREST_POINTS = 10 - + """ nearest_vp_arr_series = [] for row in gdf.itertuples(): - # These are the ones that do not run in the - # opposite direction as stop_primary_direction - # Ex: if stop1 to stop2 is going eastbound, - # we do not allow westbound traveling vp - vp_coords_line = getattr(row, "vp_primary_direction") - stop_direction = getattr(row, "stop_primary_direction") - opposite_direction = vp_transform.OPPOSITE_DIRECTIONS[stop_direction] - - # These are the valid index values where opposite direction - # is excluded - valid_indices = (vp_coords_line != opposite_direction).nonzero() - - vp_coords_line = np.array(getattr(row, "vp_geometry").coords)[valid_indices] - vp_coords_line = shapely.LineString(vp_coords_line) - stop_geometry = getattr(row, "stop_geometry") - vp_idx_arr = np.asarray(getattr(row, "vp_idx"))[valid_indices] - - np_inds = geo_utils.nearest_snap( - vp_coords_line, stop_geometry, N_NEAREST_POINTS + nearest_vp_arr = subset_arrays_to_valid_directions( + getattr(row, "vp_primary_direction"), + getattr(row, "vp_geometry"), + getattr(row, "vp_idx"), + getattr(row, "stop_geometry"), + getattr(row, "stop_primary_direction"), ) - # nearest neighbor returns self.N - # if there are no nearest neighbor results found - # if we want 10 nearest neighbors and 8th, 9th, 10th are all - # the same result, the 8th will have a result, then 9th and 10th will - # return the length of the array (which is out-of-bounds) - - np_inds2 = np_inds[np_inds < vp_idx_arr.size] - - nearest_vp_arr = vp_idx_arr[np_inds2] - nearest_vp_arr_series.append(nearest_vp_arr) - - gdf2 = gdf.assign( - nearest_vp_arr = nearest_vp_arr_series - ).drop(columns = ["vp_primary_direction", "vp_idx", "vp_geometry"]) - - return gdf2 - -def add_nearest_neighbor_result_array2( - gdf: gpd.GeoDataFrame, - analysis_date: str, - **kwargs -) -> pd.DataFrame: - """ - Add the nearest k_neighbors result. - """ - N_NEAREST_POINTS = 10 - - nearest_vp_arr = np.vectorize( - subset_arrays_to_valid_directions)( - gdf.vp_primary_direction, - gdf.vp_geometry, - gdf.vp_idx, - gdf.stop_geometry, - gdf.stop_primary_direction, - ) - + gdf2 = gdf.assign( nearest_vp_arr = nearest_vp_arr_series ).drop(columns = ["vp_primary_direction", "vp_idx", "vp_geometry"]) diff --git a/rt_segment_speeds/segment_speed_utils/vp_transform.py b/rt_segment_speeds/segment_speed_utils/vp_transform.py index 3a3703f92..48694b585 100644 --- a/rt_segment_speeds/segment_speed_utils/vp_transform.py +++ b/rt_segment_speeds/segment_speed_utils/vp_transform.py @@ -68,73 +68,4 @@ def sort_by_vp_idx_order( geom_sorted = np.take_along_axis(geometry_array, sort_order, axis=0) timestamp_sorted = np.take_along_axis(timestamp_array, sort_order, axis=0) - return vp_sorted, geom_sorted, timestamp_sorted - - -def combine_valid_vp_for_direction( - vp_condensed: gpd.GeoDataFrame, - direction: str -) -> gpd.GeoDataFrame: - - opposite_direction = OPPOSITE_DIRECTIONS[direction] - - coords_series = [] - vp_idx_series = [] - timestamp_series = [] - moving_timestamp_series = [] - - for row in vp_condensed.itertuples(): - vp_dir_arr = np.asarray(getattr(row, "vp_primary_direction")) - - - valid_indices = (vp_dir_arr != opposite_direction).nonzero() - - # Subset all the other arrays to these indices - vp_idx_arr = np.asarray(getattr(row, "vp_idx")) - coords_arr = np.array(getattr(row, "geometry").coords) - - timestamp_arr = np.asarray( - getattr(row, "location_timestamp_local")) - - moving_timestamp_arr = np.asarray( - getattr(row, "moving_timestamp_local") - ) - - vp_linestring = coords_arr[valid_indices] - - if len(vp_linestring) > 1: - valid_vp_line = shapely.LineString([shapely.Point(p) - for p in vp_linestring]) - elif len(vp_linestring) == 1: - valid_vp_line = shapely.Point([p for p in vp_linestring]) - else: - valid_vp_line = shapely.LineString() - - coords_series.append(valid_vp_line) - vp_idx_series.append(vp_idx_arr[valid_indices]) - timestamp_series.append(timestamp_arr[valid_indices]) - moving_timestamp_series.append(moving_timestamp_arr[valid_indices]) - - - vp_condensed = vp_condensed.assign( - vp_primary_direction = direction, - geometry = coords_series, - vp_idx = vp_idx_series, - location_timestamp_local = timestamp_series, - moving_timestamp_local = moving_timestamp_series, - )[["trip_instance_key", "vp_primary_direction", - "geometry", "vp_idx", - "location_timestamp_local", - "moving_timestamp_local" - ]].reset_index(drop=True) - - gdf = gpd.GeoDataFrame( - vp_condensed, - geometry = "geometry", - crs = WGS84 - ) - - del coords_series, vp_idx_series, timestamp_series - del vp_condensed - - return gdf \ No newline at end of file + return vp_sorted, geom_sorted, timestamp_sorted \ No newline at end of file From 87bbad6432b6754f6e2e23097bebcfb661568b25 Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Fri, 6 Dec 2024 21:28:00 +0000 Subject: [PATCH 3/7] update segment cutting once gtfs-segments pinned to 2.1.7 --- rt_segment_speeds/scripts/cut_stop_segments.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/rt_segment_speeds/scripts/cut_stop_segments.py b/rt_segment_speeds/scripts/cut_stop_segments.py index a04bc295c..a5ef2d2f6 100644 --- a/rt_segment_speeds/scripts/cut_stop_segments.py +++ b/rt_segment_speeds/scripts/cut_stop_segments.py @@ -74,7 +74,10 @@ def cut_stop_segments(analysis_date: str) -> gpd.GeoDataFrame: # so let's partition it with a lot of npartitions ddf = ddf.repartition(npartitions=150).persist() - renamed_ddf = ddf.rename(columns = {"stop_id": "stop_id1"}) + renamed_ddf = ddf.rename(columns = { + "stop_id": "stop_id1", + "arrival_time": "arrival_time1" + }) orig_dtypes = renamed_ddf.dtypes.to_dict() segments = ddf.map_partitions( @@ -85,15 +88,19 @@ def cut_stop_segments(analysis_date: str) -> gpd.GeoDataFrame: "stop_id2": "str", "end": "geometry", "snap_end_id": "int", + "arrival_time2": "int", "segment_id": "str" }, align_dataframes = False ) - + # We don't need several of these columns, esp 3 geometry columns segments = (segments.drop( - columns = ["start", "end", - "snap_start_id", "snap_end_id"] + columns = [ + "start", "end", + "snap_start_id", "snap_end_id", + "arrival_time1", "arrival_time2" + ] ).pipe( gtfs_schedule_wrangling.gtfs_segments_rename_cols, natural_identifier = False From e8bd1d5cac529fc9d68213683b22a453e4b39d95 Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Mon, 9 Dec 2024 18:29:32 +0000 Subject: [PATCH 4/7] bump gtfs-segments version --- _shared_utils/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_shared_utils/requirements.txt b/_shared_utils/requirements.txt index 790acea4b..5f80c78b3 100644 --- a/_shared_utils/requirements.txt +++ b/_shared_utils/requirements.txt @@ -1,7 +1,7 @@ -e . altair==5.3.0 altair-transform==0.2.0 -gtfs-segments==0.1.0 +gtfs-segments==2.1.7 pyairtable==2.2.2 great_tables==0.14.0 omegaconf==2.3.0 # better yaml configuration From fa544a5c2609a9d3c0280809c80c966b984643cb Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Mon, 9 Dec 2024 18:29:53 +0000 Subject: [PATCH 5/7] remove redundant line --- .../segment_speed_utils/gtfs_schedule_wrangling.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rt_segment_speeds/segment_speed_utils/gtfs_schedule_wrangling.py b/rt_segment_speeds/segment_speed_utils/gtfs_schedule_wrangling.py index aee49b8a5..27120c2a7 100644 --- a/rt_segment_speeds/segment_speed_utils/gtfs_schedule_wrangling.py +++ b/rt_segment_speeds/segment_speed_utils/gtfs_schedule_wrangling.py @@ -548,8 +548,7 @@ def get_sched_trips_hr(analysis_date: str) -> pd.DataFrame: keep_trip_cols = ['trip_instance_key', 'gtfs_dataset_key', 'route_id', 'shape_id'] trips = helpers.import_scheduled_trips(analysis_date, columns=keep_trip_cols) - trips = trips.rename( - columns={'gtfs_dataset_key': 'schedule_gtfs_dataset_key'}) + time_buckets = get_trip_time_buckets(analysis_date) trips = pd.merge(trips, time_buckets, on='trip_instance_key', how='inner') schedule_trip_counts = count_trips_by_group(trips, From 6764dcf2057d21e8d5d2f0fd359be6847a277e50 Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Mon, 9 Dec 2024 19:36:40 +0000 Subject: [PATCH 6/7] remove vp_nn entry in catalog --- _shared_utils/shared_utils/gtfs_analytics_data.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/_shared_utils/shared_utils/gtfs_analytics_data.yml b/_shared_utils/shared_utils/gtfs_analytics_data.yml index daf8a6561..5391c75b8 100644 --- a/_shared_utils/shared_utils/gtfs_analytics_data.yml +++ b/_shared_utils/shared_utils/gtfs_analytics_data.yml @@ -32,7 +32,6 @@ speeds_tables: usable_vp: vp_usable vp_dwell: vp_usable_dwell vp_condensed_line: condensed/vp_condensed - vp_nearest_neighbor: condensed/vp_nearest_neighbor timestamp_col: ${speed_vars.timestamp_col} time_min_cutoff: ${speed_vars.time_min_cutoff} From ebd0676064df1e47f27cceebc9ec21508707fe8f Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Mon, 9 Dec 2024 19:41:39 +0000 Subject: [PATCH 7/7] (remove): test new nearest vp script --- rt_segment_speeds/scripts/test_new_vp_near.py | 98 ------------------- 1 file changed, 98 deletions(-) delete mode 100644 rt_segment_speeds/scripts/test_new_vp_near.py diff --git a/rt_segment_speeds/scripts/test_new_vp_near.py b/rt_segment_speeds/scripts/test_new_vp_near.py deleted file mode 100644 index f50bee800..000000000 --- a/rt_segment_speeds/scripts/test_new_vp_near.py +++ /dev/null @@ -1,98 +0,0 @@ -import datetime -import sys - -from dask import delayed, compute -from loguru import logger - -from update_vars import GTFS_DATA_DICT, SEGMENT_GCS - -# Add me for this script -from calitp_data_analysis import utils -from typing import Literal, Optional -from segment_speed_utils.project_vars import SEGMENT_TYPES -from segment_speed_utils import neighbor -from pathlib import Path -from nearest_vp_to_stop import stop_times_for_shape_segments - -def nearest_neighbor_for_stop( - analysis_date: str, - segment_type: Literal[SEGMENT_TYPES], - config_path: Optional[Path] = GTFS_DATA_DICT -): - """ - Set up nearest neighbors for RT stop times, which - includes all trips. Use stop sequences for each trip. - """ - start = datetime.datetime.now() - - dict_inputs = config_path[segment_type] - - EXPORT_FILE = f'{dict_inputs["stage2"]}_{analysis_date}_test' - trip_stop_cols = [*dict_inputs["trip_stop_cols"]] - - stop_time_col_order = [ - 'trip_instance_key', 'shape_array_key', - 'stop_sequence', 'stop_id', 'stop_pair', - 'stop_primary_direction', 'geometry' - ] - - if segment_type == "stop_segments": - stop_times = stop_times_for_shape_segments(analysis_date, dict_inputs) - stop_times = stop_times.reindex(columns = stop_time_col_order) - - else: - print(f"{segment_type} is not valid") - - gdf = neighbor.merge_stop_vp_for_nearest_neighbor( - stop_times, analysis_date) - - results = neighbor.add_nearest_neighbor_result_array2(gdf, analysis_date) - - # Keep columns from results that are consistent across segment types - # use trip_stop_cols as a way to uniquely key into a row - keep_cols = trip_stop_cols + [ - "shape_array_key", - "stop_geometry", - "nearest_vp_arr" - ] - - utils.geoparquet_gcs_export( - results[keep_cols], - SEGMENT_GCS, - EXPORT_FILE, - ) - - end = datetime.datetime.now() - logger.info(f"nearest neighbor for {segment_type} " - f"{analysis_date}: {end - start}") - - del gdf, stop_times, results - - return - -if __name__ == "__main__": - - #from segment_speed_utils.project_vars import analysis_date_list - from shared_utils import rt_dates - analysis_date_list = [rt_dates.DATES["oct2024"]] - - segment_type = "stop_segments" - print(f"segment_type: {segment_type}") - - LOG_FILE = "../logs/nearest_vp.log" - logger.add(LOG_FILE, retention="3 months") - logger.add(sys.stderr, - format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", - level="INFO") - - delayed_dfs = [ - delayed(nearest_neighbor_for_stop)( - analysis_date = analysis_date, - segment_type = segment_type, - config_path = GTFS_DATA_DICT - ) for analysis_date in analysis_date_list - ] - - [compute(i)[0] for i in delayed_dfs] - - del delayed_dfs \ No newline at end of file