Skip to content

Commit

Permalink
Merge pull request #1335 from cal-itp/gtfs-funnel-performance
Browse files Browse the repository at this point in the history
GTFS funnel performance improvements
  • Loading branch information
tiffanychu90 authored Dec 27, 2024
2 parents 2b18dd1 + a5baf6c commit 7bf7c63
Show file tree
Hide file tree
Showing 10 changed files with 404 additions and 547 deletions.
3 changes: 1 addition & 2 deletions gtfs_funnel/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ preprocess_schedule_vp_dependency:

preprocess_vp:
python vp_keep_usable.py
python vp_direction.py
python cleanup.py
python vp_dwell_time.py
python vp_condenser.py
Expand All @@ -22,7 +21,7 @@ preprocess_schedule_only:
make route_typologies_data
python operator_scheduled_stats.py
python schedule_stats_by_stop.py
#python track_publish_dates.py
python track_publish_dates.py

route_typologies_data:
python route_typologies.py
Expand Down
6 changes: 4 additions & 2 deletions gtfs_funnel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ Use `update_vars` and input one or several days to download.
1. Harmonize how route names are displayed in time-series: `make timeseries_preprocessing`
1. Download monthly aggregated schedule data: `make monthly_scheduled_data`

[![gtfs_funnel_mermaid1](https://mermaid.ink/img/pako:eNqlVX9v2jAQ_SqWJwSTgBESCk23SqUQ_to0qWiTllSRSS7EarAj22lLK777nKRAKL86DSmJ8b1393w--15xwEPANm61Wh5TVCVgo8nUuUNOxhgkqPFTQCp4AFJSNv_ssQJYq716DCHKqLJRMUSormJYQN1G9RmRUG9WZ38RQcksAVnfwLUpFXRBxPKWJ1zkvE_jntNzbtbULWIKz2qL6nQ6-5AhFyGIY6CEMjhmkxBwFu7qcJz-eFjBKBCK7kCiKKqX5lX-0a9VreYxj80FSWM0HZbG8i2zWTkd8ieWcKKVrl0jFCREyhFEKIxQRJPEfkvDAYQMBE3VGlWIzEOucTdGw11H8JWGyna6vP9s2_YbsdW6Rlt4QXEL3L3G6PD79hiSFIRs00XKhfJlEEOYJfDm_moLHlZjy5ikcD740C2BX2fi-vfkbmD9u4zSQUXHrVGRoXiZglMibt0C9j8acn5FwmgnFdroK7r4QDpG7hZ8VMWo654UUtJ3OU9UxX5IBQSKcoa-oanI4ItDEgn3Fd3jqu5HiGmQgJ9ySXPWefnjbsPVJykgCph-znrYZef-xu5j-m4frtb1DSxcDzenabteP63eUpWteKdzZDYqSfZ3E3NyhaPuSSmP6TEJ43JpOrDGPACkfibzq_B8Pq2C8jF5-c_J4aXzo-Uz7ulNSoCwLN31V6kDp-A5RhE-vxuBSRDnw3er-DDfSP2fAREglf7SeTzjIp_-cTMamJv93V6ThxLLdakTxYUfCC7lE0keKvfdARUTLXsD9ecqkn5IlG5ISid_6SvuczEnjL6QAzndL8mJW7jIqVUe2oTYL1PcxAsQC0JD3VaLZufhogl62NbDECKSJcrDumNoKMkUv1uyANtKH8omzlItF0aU6OUvsB3lh7SJIaQ6B9_LVl107CZOCcP2K37GdmvQbZuXZvfSMvsD87JrWE28xLZ50Tas3sC6sCyj1xlYxqqJXzjXXo1219BNsGP0TVOjrb5VuPtTGIuQq79773TT?type=png)](https://mermaid.live/edit#pako:eNqlVX9v2jAQ_SqWJwSTgBESCk23SqUQ_to0qWiTllSRSS7EarAj22lLK777nKRAKL86DSmJ8b1393w--15xwEPANm61Wh5TVCVgo8nUuUNOxhgkqPFTQCp4AFJSNv_ssQJYq716DCHKqLJRMUSormJYQN1G9RmRUG9WZ38RQcksAVnfwLUpFXRBxPKWJ1zkvE_jntNzbtbULWIKz2qL6nQ6-5AhFyGIY6CEMjhmkxBwFu7qcJz-eFjBKBCK7kCiKKqX5lX-0a9VreYxj80FSWM0HZbG8i2zWTkd8ieWcKKVrl0jFCREyhFEKIxQRJPEfkvDAYQMBE3VGlWIzEOucTdGw11H8JWGyna6vP9s2_YbsdW6Rlt4QXEL3L3G6PD79hiSFIRs00XKhfJlEEOYJfDm_moLHlZjy5ikcD740C2BX2fi-vfkbmD9u4zSQUXHrVGRoXiZglMibt0C9j8acn5FwmgnFdroK7r4QDpG7hZ8VMWo654UUtJ3OU9UxX5IBQSKcoa-oanI4ItDEgn3Fd3jqu5HiGmQgJ9ySXPWefnjbsPVJykgCph-znrYZef-xu5j-m4frtb1DSxcDzenabteP63eUpWteKdzZDYqSfZ3E3NyhaPuSSmP6TEJ43JpOrDGPACkfibzq_B8Pq2C8jF5-c_J4aXzo-Uz7ulNSoCwLN31V6kDp-A5RhE-vxuBSRDnw3er-DDfSP2fAREglf7SeTzjIp_-cTMamJv93V6ThxLLdakTxYUfCC7lE0keKvfdARUTLXsD9ecqkn5IlG5ISid_6SvuczEnjL6QAzndL8mJW7jIqVUe2oTYL1PcxAsQC0JD3VaLZufhogl62NbDECKSJcrDumNoKMkUv1uyANtKH8omzlI
tF0aU6OUvsB3lh7SJIaQ6B9_LVl107CZOCcP2K37GdmvQbZuXZvfSMvsD87JrWE28xLZ50Tas3sC6sCyj1xlYxqqJXzjXXo1219BNsGP0TVOjrb5VuPtTGIuQq79773TT)

[![gtfs_funnel_mermaid1](https://mermaid.ink/img/pako:eNqllW9v2jAQxr-K5QlBJWCEhJamW6VSSF9tmlS0SQtVZJILsWrsyHZKadXvPicpJLQFNu0FSYh_z92T8597xqGIALu40-nMuKaagYtupt4t8jLOgaHWDwmpFCEoRfniZMYLsNF4nnGEKKfaRcUjQk2dwBKaLmrOiYJmu_72J5GUzBmo5hY3Q6mkSyLX14IJmes-TQbewLvaSCtiCo-6onq93ntkJGQEch_EKId9YwpCwaNdH553NhnVGA1S0x0kjuNmOfyS38zlpdGY8RlfSJImaDoqB8uryubl60isOBPEON2ERihkRKkxxCiKUUwZc1_L8AGhQklTvaEKk3nKDXdltfxNhkAbVHXT9d2J67qvwk7nElV4IfEL7s4wJv378QRYClJ16TIVUgcqTCDKGLyGv6jgUT23SkgKx5OP_BL8MpeXv25uh86_2ygD1HxcWzUbWpQlOGTi2i-w__GQ62sWxjulMIOBpsu_KMfYr-C9LsZ9_6CRUr6rWVGdBBGVEGoqOPqKpjKDzx5hCu5qvid13w-Q0JBBkApFc9Vx-5N-yzc7KSQauPkdjbCrzuNN_If0zTxcbNY38GjzuN1N1fcGaf2Uqk3FG59ju1UrcrBbmINfOO4ftPKQ7rMwKT_NJDbMPUAaZCo_Co_X0ykk0QoYK_weVXg5X0YvZXtX0WRg5ooB4Vm6G7W2HLxC51mFi_yIBK5AHjfRz3kORILS5k4XyVzIfFq_X42H9nZaq9Pxo3oKs8KJFjIIpVBqRdh97Zj7IOuNsblFg4WOVRARbfqQNjVfB1oEQi4Ip0_kg5l-vxJv_CJELq3r0DbF-9WJ23gJckloZLpp0eNmuOh9M-yaxwhikjE9w6ZRGJRkWtyueYhdbfZiG0uRLRLsxvmebOMsNeZhTIkpxnL7NiX8txDVf4ioqdC3sn8XbbxgsPuMH7HbGfa79rndP3fss6F93recNl5j1z7tWs5g6Jw6jjXoDR3rpY2fiqhWt2-Zztizzmzb0M6Z8_IHVGx2Qw?type=png)](https://mermaid.live/edit#pako:eNqllW9v2jAQxr-K5QlBJWCEhJamW6VSSF9tmlS0SQtVZJILsWrsyHZKadXvPicpJLQFNu0FSYh_z92T8597xqGIALu40-nMuKaagYtupt4t8jLOgaHWDwmpFCEoRfniZMYLsNF4nnGEKKfaRcUjQk2dwBKaLmrOiYJmu_72J5GUzBmo5hY3Q6mkSyLX14IJmes-TQbewLvaSCtiCo-6onq93ntkJGQEch_EKId9YwpCwaNdH553NhnVGA1S0x0kjuNmOfyS38zlpdGY8RlfSJImaDoqB8uryubl60isOBPEON2ERihkRKkxxCiKUUwZc1_L8AGhQklTvaEKk3nKDXdltfxNhkAbVHXT9d2J67qvwk7nElV4IfEL7s4wJv378QRYClJ16TIVUgcqTCDKGLyGv6jgUT23SkgKx5OP_BL8MpeXv25uh86_2ygD1HxcWzUbWpQlOGTi2i-w__GQ62sWxjulMIOBpsu_KMfYr-C9LsZ9_6CRUr6rWVGdBBGVEGoqOPqKpjKDzx5hCu5qvid13w-Q0JBBkApFc9Vx-5N-yzc7KSQauPkdjbCrzuNN_If0zTxcbNY38GjzuN1N1fcGaf2Uqk3FG59ju1UrcrBbmINfOO4ftPKQ7rMwKT_NJDbMPUAaZCo_Co_X0ykk0QoYK_weVXg5X0YvZXtX0WRg5ooB4Vm6G7W2HLxC51mFi_yIBK5AHjfRz3kORILS5k4XyVzIfFq_X42H9nZaq9Pxo3oKs8KJFjIIpVBqRdh97Zj7IOuNsblFg4WOVRARbfqQNjVfB1oEQi4Ip0_kg5l-vxJv_CJELq3r0DbF-9WJ23gJckloZLpp0eNmuOh9M-yaxwhikjE9w6ZRGJRkWtyueYhdb
fZiG0uRLRLsxvmebOMsNeZhTIkpxnL7NiX8txDVf4ioqdC3sn8XbbxgsPuMH7HbGfa79rndP3fss6F93recNl5j1z7tWs5g6Jw6jjXoDR3rpY2fiqhWt2-Zztizzmzb0M6Z8_IHVGx2Qw)


[![speeds_stop_times_mermaid](https://mermaid.ink/img/pako:eNqlVV1v2jAU_SuWJwSTgPERCqTapEJh07buYUV7WEDITW7AmmNHttOWVvz3OU5YkkK7Lx7A-J577_Hxsf2IfREAdnGr1VpyTTUDF31doGvYRMA1uo4BAoUID-ysFjFa0AjUktuEWu1xyRGinGoX2SFCdb2FCOouqt8QBfVmefYbkZTcMFD1X3ATiiWNiNxNBRMyzXs1G8wH84tDaoFYwL0uUJ1O5xgyETIA-RyIUQ7PxRT4ggdVHvP5cDYpYTRITSuQMAzrWXif_pivfa225EseMnHnb4nUaDHJACq52UgSb5HKxKUPcCiMkM-IUpcQoiBEIWXMzUU4gVC-pLE-oCzFU6isyWIXQw6dT-f98bSJlJbiB7gp9ZToIfOi2_D8RK-V2eR1nq3a8W712nXdvGerhYqE9EMYQ9qEkM3SqTUM6N0T1IVXKboyBYPwvMBMTGsFDPy_6C44ZJ1jkEhtSQx1VWJRAJXIpmNCpXGyBGT2WVGlU38XMOJLoZQtaZdQYufZ8lVqq_OydtkIeHCYLPbaHqB1TGNIzVdOmjY8DoaP0utbw1rYBkdLfqrlZcOj3PgwFozonBSRkt4S9tvcWcMr45VtmhKsZp6jA82TK0ovgVyGfHlFl3kjj6y1Md7bimS2QcmVx_ymJU_MbPx9wyO3IMkGDmUywsfWKG3YB08KxhLTmfINg4Ds3uQTUcI0Nf9L7sj2R4rEiBlQ-YJJM0Ivi2MuyVMe_FiVReY-t6h_keVTSZYkSu-9P5Dl83_IsipRuPIK8us7qrcpwJxdKvjKsps9WcMXT_lbCBIG68rKizOUS4mbOAIZERqYF8m-D0ts340lds0wgJAYpktsLlkDJYkW1zvuY1fLBJo4iQNzIi4pMVsRYTc0BjezEFAt5FX2ytnHroljwrH7iO-x2xr12v1xvzd2-sNRf9zrOk28w27_rN11BiPnzHG6g87I6e6b-EEIU7Xb7nXNu9HpDvt9g3aGji333QZty_1PICdHvQ?type=png)](https://mermaid.live/edit#pako:eNqlVV1v2jAU_SuWJwSTgPERCqTapEJh07buYUV7WEDITW7AmmNHttOWVvz3OU5YkkK7Lx7A-J577_Hxsf2IfREAdnGr1VpyTTUDF31doGvYRMA1uo4BAoUID-ysFjFa0AjUktuEWu1xyRGinGoX2SFCdb2FCOouqt8QBfVmefYbkZTcMFD1X3ATiiWNiNxNBRMyzXs1G8wH84tDaoFYwL0uUJ1O5xgyETIA-RyIUQ7PxRT4ggdVHvP5cDYpYTRITSuQMAzrWXif_pivfa225EseMnHnb4nUaDHJACq52UgSb5HKxKUPcCiMkM-IUpcQoiBEIWXMzUU4gVC-pLE-oCzFU6isyWIXQw6dT-f98bSJlJbiB7gp9ZToIfOi2_D8RK-V2eR1nq3a8W712nXdvGerhYqE9EMYQ9qEkM3SqTUM6N0T1IVXKboyBYPwvMBMTGsFDPy_6C44ZJ1jkEhtSQx1VWJRAJXIpmNCpXGyBGT2WVGlU38XMOJLoZQtaZdQYufZ8lVqq_OydtkIeHCYLPbaHqB1TGNIzVdOmjY8DoaP0utbw1rYBkdLfqrlZcOj3PgwFozonBSRkt4S9tvcWcMr45VtmhKsZp6jA82TK0ovgVyGfHlFl3kjj6y1Md7bimS2QcmVx_ymJU_MbPx9wyO3IMkGDmUywsfWKG3YB08KxhLTmfINg4Ds3uQTUcI0Nf9L7sj2R4rEiBlQ-YJJM0Ivi2MuyVMe_FiVReY-t6h_keVTSZYkSu-9P5Dl83_IsipRuPIK8us7qrcpwJxdKvjKsps9WcMXT_lbCBIG68rKizOUS4mbOAIZERqYF8m-D0ts340lds0wgJAYpktsLlkDJYkW1zvuY1fLBJo4iQNzIi4pMVsRYTc0Bjez
EFAt5FX2ytnHroljwrH7iO-x2xr12v1xvzd2-sNRf9zrOk28w27_rN11BiPnzHG6g87I6e6b-EEIU7Xb7nXNu9HpDvt9g3aGji333QZty_1PICdHvQ)

Expand All @@ -24,5 +26,5 @@ Use `update_vars` and input one or several days to download.
| | | pipeline and workstream outputs available |
|---|---|---|
| Sampled Wednesdays Each Month for Time-Series<br>[rt_dates.py](../_shared_utils/shared_utils/rt_dates.py) | Mar 2023 - present | downloaded schedule tables (trips, shapes, stops, stop_times)<br>downloaded vehicle positions (vp)<br><br>`gtfs_funnel`: intermediate outputs for schedule and vp<br>* crosswalk<br>* schedule only metrics related to service availability<br>* operator aggregated metrics from schedule data<br>* route typologies<br><br><br>`rt_segment_speeds`: vp interpreted as speeds against <br>various segment types<br>* segment types: <br>(1) `stop segments` (shape-stop segments,<br>most common shape selected for a route-direction and all <br>trips aggregated to that shape)<br>(2) `rt_stop_times` (trip-stop segments, most granular, <br>cannot be aggregated, but used for rt_stop_times table)<br>(3) `speedmap segments`<br>(4) `road segments` (1 km road segments with all <br>transit across operators aggregated to the same physical <br>road space, currently WIP)<br>* interpolated stop arrivals <br>* speeds by trip<br>* segment and summary speeds for single day<br><br>`rt_vs_schedule`: <br>* RT vs schedule metrics<br>* rt_stop_times table (companion to scheduled stop_times)<br><br>`gtfs_digest`:<br>* downstream data product using all the outputs created in <br>gtfs_funnel, rt_segment_speeds, rt_vs_schedule. |
| Full Week for Weekly Averages<br>April / October each year | Apr 2023<br>Oct 2023<br>Apr 2024 | rt_segment_speeds:<br>* segment and summary speeds for a week<br><br>gtfs_digest<br>* service hours by hour for weekday / Saturday / Sunday |
| Full Week for Weekly Averages<br>April / October each year | Apr 2023<br>Oct 2023<br>Apr 2024<br>Oct 2024 | rt_segment_speeds:<br>* segment and summary speeds for a week<br><br>gtfs_digest<br>* service hours by hour for weekday / Saturday / Sunday |
| | | |
2 changes: 1 addition & 1 deletion gtfs_funnel/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
INPUT_FILE = GTFS_DATA_DICT.speeds_tables.usable_vp

publish_utils.if_exists_then_delete(
f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}_stage"
f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}_stage.parquet"
)
publish_utils.if_exists_then_delete(
f"{SEGMENT_GCS}vp_direction_{analysis_date}.parquet"
Expand Down
8 changes: 4 additions & 4 deletions gtfs_funnel/mermaid.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ graph TB
subgraph vp_preprocessing
E --> E3([vp_keep_usable.py]):::script -->
E4([vp_direction.py]):::script -->
F[vp_usable]:::df -->
E4([vp_dwell_time.py]):::script -->
F[vp_usable_dwell]:::df -->
E5([cleanup.py]):::script;
F --> F1([vp_condenser.py]):::script -->
F2[vp_condensed<br>vp_nearest_neighbor<br>NAD83]:::df;
F2[vp_nearest_neighbor<br>NAD83]:::df;
end
Expand Down Expand Up @@ -126,7 +126,7 @@ flowchart TB
end
subgraph RT stop_times
J(segment_type=rt_stop_times):::segmentType -->
J(segment_type=rt_stop_times<br>speedmap_segments):::segmentType -->
C;
E --> K([average_summary_speeds.py]):::script -->
L[rollup_singleday/rollup_multiday
Expand Down
210 changes: 96 additions & 114 deletions gtfs_funnel/stop_times_with_direction.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import geopandas as gpd
import numpy as np
import pandas as pd
import sys

from loguru import logger

from calitp_data_analysis import utils
from shared_utils import rt_utils
Expand Down Expand Up @@ -56,109 +59,114 @@ def prep_scheduled_stop_times(analysis_date: str) -> gpd.GeoDataFrame:
).drop(columns = ["trip_id"])

st_with_stop = gpd.GeoDataFrame(
st_with_stop, geometry = "geometry", crs = PROJECT_CRS)
st_with_stop, geometry = "geometry", crs = PROJECT_CRS
)

return st_with_stop


def get_projected_stop_meters(
stop_times: pd.DataFrame,
shapes: gpd.GeoDataFrame
) -> pd.DataFrame:
stop_times: gpd.GeoDataFrame,
analysis_date: str,
) -> pd.Series:
"""
Project the stop's position to the shape and
get stop_meters (meters from start of the shape).
Only return stop_meters as pd.Series to use as a column later.
"""
shapes = helpers.import_scheduled_shapes(
analysis_date,
columns = ["shape_array_key", "geometry"],
crs = PROJECT_CRS,
get_pandas=True
).dropna(subset="geometry")

gdf = pd.merge(
stop_times,
shapes.rename(columns = {"geometry": "shape_geometry"}),
stop_times.to_crs(PROJECT_CRS),
shapes.to_crs(PROJECT_CRS).rename(columns = {"geometry": "shape_geometry"}),
on = "shape_array_key",
how = "inner"
)
).set_geometry("geometry")

gdf = gdf.assign(
stop_meters = gdf.shape_geometry.project(gdf.geometry)
).drop(columns = "shape_geometry").drop_duplicates()

return gdf
stop_meters = gdf.shape_geometry.project(gdf.geometry)

return stop_meters


def find_prior_subseq_stop(
stop_times: gpd.GeoDataFrame,
trip_stop_cols: list
def find_prior_subseq_stop_info(
stop_times: gpd.GeoDataFrame,
analysis_date: str,
trip_cols: list = ["trip_instance_key"],
trip_stop_cols: list = ["trip_instance_key", "stop_sequence"]
) -> gpd.GeoDataFrame:
"""
For trip-stop, find the previous stop (using stop sequence).
Attach the previous stop's geometry.
This will determine the direction for the stop (it's from prior stop).
Add in subseq stop information too.
Create columns related to comparing current to prior stop.
- stop_pair (stop_id1_stop_id2)
- stop_pair_name (stop_name1__stop_name2)
"""
prior_stop = stop_times[trip_stop_cols].sort_values(
trip_stop_cols).reset_index(drop=True)

prior_stop = prior_stop.assign(
prior_stop_sequence = (prior_stop.groupby("trip_instance_key")
stop_meters = get_projected_stop_meters(stop_times, analysis_date)

gdf = stop_times[
trip_stop_cols + ["stop_id", "stop_name", "geometry"]
].assign(
stop_meters = stop_meters
)

gdf = gdf.assign(
prior_geometry = (gdf.groupby(trip_cols)
.geometry
.shift(1)),
prior_stop_sequence = (gdf.groupby(trip_cols)
.stop_sequence
.shift(1)),
# add subseq stop info here
subseq_stop_sequence = (prior_stop.groupby("trip_instance_key")
subseq_stop_sequence = (gdf.groupby(trip_cols)
.stop_sequence
.shift(-1)),
subseq_stop_id = (prior_stop.groupby("trip_instance_key")
subseq_stop_id = (gdf.groupby(trip_cols)
.stop_id
.shift(-1)),
subseq_stop_name = (prior_stop.groupby("trip_instance_key")
subseq_stop_name = (gdf.groupby(trip_cols)
.stop_name
.shift(-1))
.shift(-1)),
).fillna({
**{c: "" for c in ["subseq_stop_id", "subseq_stop_name"]}
})


stop_direction = np.vectorize(rt_utils.primary_cardinal_direction)(
gdf.prior_geometry.fillna(gdf.geometry), gdf.geometry)

# Just keep subset of columns because we'll get other stop columns back when we merge with stop_times
keep_cols = [
"trip_instance_key", "stop_sequence",
"stop_meters",
"prior_stop_sequence", "subseq_stop_sequence"
]

# Create stop pair with underscores, since stop_id
# can contain hyphens
gdf2 = gdf[keep_cols].assign(
stop_primary_direction = stop_direction,
stop_pair = gdf.stop_id.astype(str).str.cat(
gdf.subseq_stop_id.astype(str)),
stop_pair_name = gdf.stop_name.astype(str).str.cat(
gdf.subseq_stop_name.astype(str)),
)

# Merge in prior stop geom as a separate column so we can
# calculate distance / direction
prior_stop_geom = (stop_times[["trip_instance_key",
"stop_sequence", "geometry"]]
.rename(columns = {
"stop_sequence": "prior_stop_sequence",
"geometry": "prior_geometry"
})
.set_geometry("prior_geometry")
)

stop_times_with_prior = pd.merge(
stop_times_geom_direction = pd.merge(
stop_times,
prior_stop,
gdf2,
on = trip_stop_cols,
how = "left"
how = "inner"
)

stop_times_with_prior_geom = pd.merge(
stop_times_with_prior,
prior_stop_geom,
on = ["trip_instance_key", "prior_stop_sequence"],
how = "left"
).astype({
"prior_stop_sequence": "Int64",
"subseq_stop_sequence": "Int64"
}).fillna({
"subseq_stop_id": "",
"subseq_stop_name": ""
})

# Create stop pair with underscores, since stop_id
# can contain hyphens
stop_times_with_prior_geom = stop_times_with_prior_geom.assign(
stop_pair = stop_times_with_prior_geom.apply(
lambda x:
str(x.stop_id) + "__" + str(x.subseq_stop_id),
axis=1,
),
stop_pair_name = stop_times_with_prior_geom.apply(
lambda x:
x.stop_name + "__" + x.subseq_stop_name,
axis=1,
),
).drop(columns = ["subseq_stop_id", "subseq_stop_name"])

return stop_times_with_prior_geom

return stop_times_geom_direction


def assemble_stop_times_with_direction(
Expand All @@ -179,50 +187,17 @@ def assemble_stop_times_with_direction(

scheduled_stop_times = prep_scheduled_stop_times(analysis_date)

trip_stop_cols = ["trip_instance_key", "stop_sequence",
"stop_id", "stop_name"]
trip_cols = ["trip_instance_key"]
trip_stop_cols = ["trip_instance_key", "stop_sequence"]

scheduled_stop_times2 = find_prior_subseq_stop(
scheduled_stop_times, trip_stop_cols
)

other_stops = scheduled_stop_times2[
~(scheduled_stop_times2.prior_geometry.isna())
]

first_stop = scheduled_stop_times2[
scheduled_stop_times2.prior_geometry.isna()
]

first_stop = first_stop.assign(
stop_primary_direction = "Unknown"
).drop(columns = "prior_geometry")

other_stops_no_geom = other_stops.drop(columns = ["prior_geometry"])

prior_geom = other_stops.prior_geometry
current_geom = other_stops.geometry

# Create a column with readable direction like westbound, eastbound, etc
stop_direction = np.vectorize(
rt_utils.primary_cardinal_direction)(prior_geom, current_geom)
stop_distance = prior_geom.distance(current_geom)

other_stops_no_geom = other_stops_no_geom.assign(
stop_primary_direction = stop_direction,
stop_meters = stop_distance,
)

scheduled_stop_times_with_direction = pd.concat(
[first_stop, other_stops_no_geom],
axis=0
)

df = scheduled_stop_times_with_direction.sort_values(
trip_stop_cols).reset_index(drop=True)

time1 = datetime.datetime.now()
print(f"get scheduled stop times with direction: {time1 - start}")
df = find_prior_subseq_stop_info(
scheduled_stop_times,
analysis_date,
trip_cols = trip_cols,
trip_stop_cols = trip_stop_cols
).sort_values(
trip_stop_cols
).reset_index(drop=True)

utils.geoparquet_gcs_export(
df,
Expand All @@ -231,15 +206,22 @@ def assemble_stop_times_with_direction(
)

end = datetime.datetime.now()
print(f"execution time: {end - start}")
logger.info(
f"scheduled stop times with direction {analysis_date}: {end - start}"
)

return


if __name__ == "__main__":

from update_vars import analysis_date_list


LOG_FILE = "./logs/preprocessing.log"
logger.add(LOG_FILE, retention="3 months")
logger.add(sys.stderr,
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
level="INFO")

for date in analysis_date_list:
print(date)
assemble_stop_times_with_direction(date, GTFS_DATA_DICT)
Loading

0 comments on commit 7bf7c63

Please sign in to comment.