Merge pull request #1248 from cal-itp/schedule-stop-metrics
Schedule stop metrics
tiffanychu90 authored Oct 8, 2024
2 parents b5e6658 + 4c52d0f commit d2f900d
Showing 28 changed files with 554 additions and 347 deletions.
2 changes: 1 addition & 1 deletion _shared_utils/setup.py
@@ -4,7 +4,7 @@
setup(
name="shared_utils",
packages=find_packages(),
version="2.6",
version="2.7",
description="Shared utility functions for data analyses",
author="Cal-ITP",
license="Apache",
18 changes: 0 additions & 18 deletions _shared_utils/shared_utils/catalog_utils.py
@@ -5,7 +5,6 @@
from typing import Literal

import intake
import yaml
from omegaconf import OmegaConf # this is yaml parser

repo_name = "data-analyses/"
@@ -22,20 +21,3 @@ def get_catalog(catalog_name: Literal["shared_data_catalog", "gtfs_analytics_dat

else:
return intake.open_catalog(catalog_path)


def get_parameters(config_file: str, key: str) -> dict:
"""
Parse the config.yml file to get the parameters needed
for working with route or stop segments.
These parameters will be passed through the scripts when working
with vehicle position data.
Returns a dictionary of parameters.
"""
# https://aaltoscicomp.github.io/python-for-scicomp/scripts/
with open(config_file) as f:
my_dict = yaml.safe_load(f)
params_dict = my_dict[key]

return params_dict
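The yaml-based get_parameters helper above is removed, and the remaining OmegaConf import suggests config parsing now goes through OmegaConf instead. A rough equivalent of the removed lookup, as a hedged sketch only (the config path and key below are hypothetical, not taken from the repo):

from omegaconf import OmegaConf

# Load a config yml and pull one top-level key's parameters (hypothetical names)
cfg = OmegaConf.load("config.yml")
params_dict = OmegaConf.to_container(cfg["stop_segments"])
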
2 changes: 2 additions & 0 deletions _shared_utils/shared_utils/gtfs_analytics_data.yml
@@ -53,6 +53,8 @@ rt_vs_schedule_tables:
vp_trip_metrics: "vp_trip/trip_metrics"
vp_route_direction_metrics: "vp_route_dir/route_direction_metrics"
vp_operator_metrics: "vp_operator/operator_metrics"
sched_stop_metrics: "schedule_stop/schedule_stop_metrics"
#vp_stop_metrics: "vp_stop/vp_stop_metrics" # WIP: transit bunching
schedule_rt_stop_times: "schedule_rt_stop_times"
early_trip_minutes: -5
late_trip_minutes: 5
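For reference, the new sched_stop_metrics key is read through the shared catalog helper; a minimal sketch, assuming shared_utils is importable:

from shared_utils import catalog_utils

GTFS_DATA_DICT = catalog_utils.get_catalog("gtfs_analytics_data")

# Resolves to "schedule_stop/schedule_stop_metrics" per the yml entry above
EXPORT_FILE = GTFS_DATA_DICT.rt_vs_schedule_tables.sched_stop_metrics

# Downstream code appends the analysis date when exporting,
# e.g. f"{EXPORT_FILE}_{analysis_date}" as in gtfs_funnel/schedule_stats_by_stop.py
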
32 changes: 31 additions & 1 deletion _shared_utils/shared_utils/publish_utils.py
@@ -1,12 +1,16 @@
import os
from pathlib import Path
from typing import Union
from typing import Literal, Union

import gcsfs
import geopandas as gpd
import pandas as pd
from shared_utils import catalog_utils

fs = gcsfs.GCSFileSystem()
SCHED_GCS = "gs://calitp-analytics-data/data-analyses/gtfs_schedule/"
PUBLIC_BUCKET = "gs://calitp-publish-data-analysis/"
GTFS_DATA_DICT = catalog_utils.get_catalog("gtfs_analytics_data")


def write_to_public_gcs(
@@ -59,3 +63,29 @@ def exclude_private_datasets(
Filter out private datasets.
"""
return df[df[col].isin(public_gtfs_dataset_keys)].reset_index(drop=True)


def subset_table_from_previous_date(
gcs_bucket: str,
filename: Union[str, Path],
operator_and_dates_dict: dict,
date: str,
crosswalk_col: str = "schedule_gtfs_dataset_key",
data_type: Literal["df", "gdf"] = "df",
) -> pd.DataFrame:
CROSSWALK_FILE = GTFS_DATA_DICT.schedule_tables.gtfs_key_crosswalk

crosswalk = pd.read_parquet(f"{SCHED_GCS}{CROSSWALK_FILE}_{date}.parquet", columns=["name", crosswalk_col])

subset_keys = crosswalk[crosswalk.name.isin(operator_and_dates_dict[date])][crosswalk_col].unique()

if data_type == "df":
past_df = pd.read_parquet(
f"{gcs_bucket}{filename}_{date}.parquet", filters=[[(crosswalk_col, "in", subset_keys)]]
)
else:
past_df = gpd.read_parquet(
f"{gcs_bucket}{filename}_{date}.parquet", filters=[[(crosswalk_col, "in", subset_keys)]]
)

return past_df
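A hedged usage sketch for the new helper; the operator names and date mirror entries in published_operators.yml, while the filename is a hypothetical placeholder:

# Hypothetical inputs: which operators were last published on which date
operator_and_dates_dict = {
    "2023-12-13": ["DowneyLINK Schedule", "Spirit Bus Schedule"],
}

past_df = subset_table_from_previous_date(
    gcs_bucket=SCHED_GCS,
    filename="some_schedule_table",  # hypothetical filename prefix
    operator_and_dates_dict=operator_and_dates_dict,
    date="2023-12-13",
    crosswalk_col="schedule_gtfs_dataset_key",
    data_type="df",
)
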
3 changes: 2 additions & 1 deletion gtfs_funnel/Makefile
@@ -21,7 +21,8 @@ preprocess_vp:
preprocess_schedule_only:
make route_typologies_data
python operator_scheduled_stats.py

python schedule_stats_by_stop.py

route_typologies_data:
python route_typologies.py
python schedule_stats_by_route_direction.py
19 changes: 0 additions & 19 deletions gtfs_funnel/published_operators.yml
@@ -2,7 +2,6 @@
- Alhambra Schedule
- Amador Schedule
- Anaheim Resort Schedule
- Anaheim Resort Schedule v2
- Antelope Valley Transit Authority Schedule
- Arcadia Schedule
- Arvin Schedule
@@ -51,7 +50,6 @@
- Bell Gardens Schedule
- Bellflower Bus Schedule
- Big Blue Bus Schedule
- Big Blue Bus Swiftly Schedule
- BruinBus Schedule
- Burbank Schedule
- Calabasas Schedule
@@ -193,7 +191,6 @@
- Santa Cruz Schedule
2024-06-12:
- Anteater Express Schedule
- Lassen Flex
- Lynwood Schedule
- Manteca Schedule
2024-05-22:
@@ -207,29 +204,13 @@
- Rosemead Schedule
2023-12-13:
- DowneyLINK Schedule
- Humboldt Flex
- Laguna Beach Flex
- Manteca Flex
- Placer Flex
- San Joaquin Flex
- Spirit Bus Schedule
- StanRTA Flex
- TART Flex
- Thousand Oaks Flex
- Tracy Flex
- Turlock Flex
- Union City Flex
- VCTC Flex
- WestCAT Flex
2023-11-15:
- Amtrak Schedule
- Mission Bay Schedule
2023-08-15:
- Blossom Express Schedule
- Eastern Sierra Flex
2023-06-14:
- Tuolumne Schedule
2023-04-12:
- Guadalupe Flex
2023-03-15:
- TIME GMV Schedule
143 changes: 143 additions & 0 deletions gtfs_funnel/schedule_stats_by_stop.py
@@ -0,0 +1,143 @@
"""
Add some GTFS schedule-derived metrics
by stop (arrivals, number of trips/routes served,
service hours).
This is the stop-grain version of schedule_stats_by_route_direction.
Grain: schedule_gtfs_dataset_key-stop_id
"""
import datetime
import geopandas as gpd
import pandas as pd

from calitp_data_analysis.geography_utils import WGS84
from calitp_data_analysis import utils
from segment_speed_utils import helpers

def stats_for_stop(
df: pd.DataFrame,
group_cols: list
) -> pd.DataFrame:
"""
List the stats we'd like to calculate for each stop.
"""
df2 = (
df
.groupby(group_cols, group_keys=False)
.agg({
"route_id": lambda x: list(sorted(set(x))),
"route_type": lambda x: list(sorted(set(x))),
"departure_sec": "count",
"departure_hour": "nunique"
}).reset_index()
.rename(columns = {
"departure_sec": "n_arrivals",
"departure_hour": "n_hours_in_service",
"route_id": "route_ids_served",
"route_type": "route_types_served"
})
)

df2 = df2.assign(
n_routes = df2.apply(lambda x: len(x.route_ids_served), axis=1)
)

    # Instead of producing a list, show values like "0, 3" rather than [0, 3]
    # so portal users can see combinations more quickly
    # and access particular rows using str.contains
df2 = df2.assign(
route_types_served = df2.route_types_served.str.join(", "),
route_ids_served = df2.route_ids_served.str.join(", "),
)

return df2


def schedule_stats_by_stop(
analysis_date: str
) -> gpd.GeoDataFrame:
"""
Import stop_times, trips, and stops.
Merge and aggregate for stop-level schedule stats.
Calculate some extra stats from other schedule tables,
such as how many route_ids and route_types the
stop shares.
"""
# departure hour nunique values can let us know span of service
stop_times = helpers.import_scheduled_stop_times(
analysis_date,
columns = ["feed_key", "stop_id", "trip_id",
"departure_sec", "departure_hour"],
with_direction = False,
get_pandas = True
)

# include route info so we know how many trips, routes,
# route_types that the stop serves
# stop can serve 1 light rail + 5 bus routes vs 6 bus routes
trips = helpers.import_scheduled_trips(
analysis_date,
columns = ["gtfs_dataset_key", "feed_key",
"trip_id",
"route_id", "route_type"],
get_pandas = True,
)

stops = helpers.import_scheduled_stops(
analysis_date,
columns = ["feed_key", "stop_id", "stop_name", "geometry"],
get_pandas = True,
crs = WGS84
)

stop_df = pd.merge(
stop_times,
trips,
on = ["feed_key", "trip_id"],
how = "inner"
).pipe(
stats_for_stop,
group_cols = ["schedule_gtfs_dataset_key", "feed_key", "stop_id"]
)


stop_gdf = pd.merge(
stops,
stop_df,
on = ["feed_key", "stop_id"],
how = "inner"
).drop(columns = "feed_key")

# Fix order of columns
col_order = [
c for c in stop_gdf.columns
if c not in ["schedule_gtfs_dataset_key", "geometry"]
]

stop_gdf = stop_gdf.reindex(
columns = ["schedule_gtfs_dataset_key", *col_order, "geometry"]
)

return stop_gdf


if __name__ == "__main__":

from update_vars import analysis_date_list, RT_SCHED_GCS, GTFS_DATA_DICT

EXPORT_FILE = GTFS_DATA_DICT.rt_vs_schedule_tables.sched_stop_metrics

for analysis_date in analysis_date_list:
start = datetime.datetime.now()

gdf = schedule_stats_by_stop(analysis_date)

utils.geoparquet_gcs_export(
gdf,
RT_SCHED_GCS,
f"{EXPORT_FILE}_{analysis_date}"
)

end = datetime.datetime.now()
print(f"schedule stop stats for {analysis_date}: {end - start}")
17 changes: 14 additions & 3 deletions gtfs_funnel/track_publish_dates.py
@@ -11,7 +11,7 @@
from pathlib import Path
from typing import Union

from shared_utils import rt_dates
from shared_utils import gtfs_utils_v2, rt_dates
from segment_speed_utils import time_series_utils

def filter_to_recent_date(df: pd.DataFrame) -> pd.DataFrame:
@@ -29,6 +29,7 @@ def filter_to_recent_date(df: pd.DataFrame) -> pd.DataFrame:
)
return df2


def export_results_yml(
df: pd.DataFrame,
export_yaml: Union[str, Path]
@@ -41,18 +42,25 @@ def export_results_yml(
# operator names that have more recent names that we are keeping,
# so we can remove these from our yaml
exclude_me = [
"TIME GMV"
"Flex",
]

df2 = df.copy()

for exclude_word in exclude_me:

df2 = df[~df.name.isin(exclude_me)]
df2 = df2[~df2.name.str.contains(exclude_word)]

# yaml export can have date as string
# but yaml safe_load will automatically parse as datetime again
my_dict = {
**{
date_key: df2[df2.service_date==date_key].name.tolist()
for date_key in df2.service_date.unique()
}
}


    # sort_keys=False to prevent alphabetical sort (earliest date first)
    # because we want to maintain our results and yaml with most recent date first
output = pyaml.dump(my_dict, sort_keys=False)
@@ -73,12 +81,15 @@

TABLE = GTFS_DATA_DICT.schedule_downloads.trips

public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()

operators = time_series_utils.concatenate_datasets_across_dates(
COMPILED_CACHED_VIEWS,
TABLE,
rt_dates.y2024_dates + rt_dates.y2023_dates,
data_type = "df",
get_pandas = True,
filters = [[("gtfs_dataset_key", "in", public_feeds)]],
columns = ["name"]
).drop_duplicates().pipe(filter_to_recent_date)

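The exclusion list now works by substring rather than exact name, so any operator whose name contains "Flex" drops out of the yml. A small sketch of that behavior, using names that appear in the published_operators.yml entries above:

import pandas as pd

names = pd.Series(["Placer Flex", "Big Blue Bus Schedule", "VCTC Flex"])

exclude_me = ["Flex"]
keep = names.copy()
for exclude_word in exclude_me:
    # str.contains drops any name containing the excluded word
    keep = keep[~keep.str.contains(exclude_word)]

# keep now holds only "Big Blue Bus Schedule"
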
2 changes: 1 addition & 1 deletion open_data/Makefile
@@ -11,5 +11,5 @@ compile_open_data_portal:
#python arcgis_script_pro.py #(in ESRI!)
python update_data_dict.py # check if columns are missing in data_dictionary yml
python update_fields_fgdc.py # populate fields with data dictionary yml values, run if update_data_dict had changes to incorporate
python open_data.py # go back into ESRI and update xml
python metadata_update_pro.py # go back into ESRI and update xml
python cleanup.py # run after ESRI work done