diff --git a/_shared_utils/shared_utils/gtfs_analytics_data.yml b/_shared_utils/shared_utils/gtfs_analytics_data.yml
index db0b364d4..a745e0e1e 100644
--- a/_shared_utils/shared_utils/gtfs_analytics_data.yml
+++ b/_shared_utils/shared_utils/gtfs_analytics_data.yml
@@ -81,6 +81,7 @@ stop_segments:
   shape_stop_cols: ["shape_array_key", "shape_id", "stop_sequence"]
   stop_pair_cols: ["stop_pair", "stop_pair_name"]
   route_dir_cols: ["route_id", "direction_id"]
+  segment_cols: ["schedule_gtfs_dataset_key", "route_id", "direction_id", "stop_pair", "geometry"]
   shape_stop_single_segment: "rollup_singleday/speeds_shape_stop_segments" #-- stop after Oct 2024
   route_dir_single_segment: "rollup_singleday/speeds_route_dir_segments"
   route_dir_multi_segment: "rollup_multiday/speeds_route_dir_segments"
diff --git a/rt_segment_speeds/scripts/quarter_year_averages.py b/rt_segment_speeds/scripts/quarter_year_averages.py
index 7d2d2bd13..afe2ec907 100644
--- a/rt_segment_speeds/scripts/quarter_year_averages.py
+++ b/rt_segment_speeds/scripts/quarter_year_averages.py
@@ -1,49 +1,68 @@
+"""
+Average segment speeds over longer time periods,
+a quarter or a year.
+"""
+import dask.dataframe as dd
 import geopandas as gpd
 import pandas as pd
 
 from dask import delayed, compute
 
-from update_vars import SEGMENT_GCS, GTFS_DATA_DICT
+from calitp_data_analysis import utils
 from segment_speed_utils import time_series_utils
+from update_vars import SEGMENT_GCS, GTFS_DATA_DICT
 
-from average_segment_speeds import concatenate_trip_segment_speeds
-
-def concatenate_single_day_summaries(
-    speed_file: str,
-    analysis_date_list: list,
-    group_cols: list
+def segment_speeds_one_day(
+    segment_type: str,
+    analysis_date: str,
+    segment_cols: list,
+    org_cols: list
):
     """
-    Concatenate several single day averages
-    and we'll take the average over that.
+    Merge single-day segment speeds with segment geometry
+    (from rt_segment_speeds) for one analysis date
+    and get it to route-direction-segment grain.
+    """
+    speed_file = GTFS_DATA_DICT[segment_type]["route_dir_single_segment"]
+    segment_file = GTFS_DATA_DICT[segment_type]["segments_file"]
+
+    speeds_df = pd.read_parquet(
+        f"{SEGMENT_GCS}{speed_file}_{analysis_date}.parquet",
+        columns = segment_cols + org_cols + [
+            "schedule_gtfs_dataset_key",
+            "p20_mph", "p50_mph", "p80_mph", "n_trips"]
+    ).assign(
+        service_date = pd.to_datetime(analysis_date)
+    )
 
-    If we have 6 dates of segment p20/p50/p80 speeds,
-    we'll treat each date as an independent entity
-    and average the p20/p50/p80 speeds over that time period.
+    segment_gdf = gpd.read_parquet(
+        f"{SEGMENT_GCS}{segment_file}_{analysis_date}.parquet",
+        columns = segment_cols + [
+            "schedule_gtfs_dataset_key", "geometry"]
+    ).drop_duplicates().reset_index(drop=True)
+
+    merge_cols = [c for c in speeds_df.columns if c in segment_gdf.columns]
 
-    We will not go back to trip segment speeds for each date
-    and do a weighted average.
-    In an extreme case, if one date had 1,000 trips and another date
-    had 100 trips, one date would have 10x the weight of another date,
-    and here, we just want to see where the p20 speed typically is.
-    """
-    df = time_series_utils.concatenate_datasets_across_dates(
-        SEGMENT_GCS,
-        speed_file,
-        analysis_date_list,
-        data_type = "df",
-        columns = group_cols + ["p20_mph", "p50_mph", "p80_mph", "n_trips"],
-        get_pandas = False
+    df = pd.merge(
+        segment_gdf[merge_cols + ["geometry"]].drop_duplicates(),
+        speeds_df,
+        on = merge_cols,
+        how = "inner"
     )
 
     df = df.assign(
         year = df.service_date.dt.year,
         quarter = df.service_date.dt.quarter,
     )
-    
+    return df
 
+
 def get_aggregation(df: pd.DataFrame, group_cols: list):
+    """
+    Aggregating across days, take the mean p20/p50/p80 speed
+    and count the number of trips across those days.
+    """
     speed_cols = [c for c in df.columns if "_mph" in c]
 
     df2 = (df
@@ -56,36 +75,83 @@ def get_aggregation(df: pd.DataFrame, group_cols: list):
 
     return df2
 
-if __name__ == "__main__":
-    
-    from shared_utils import rt_dates
-    
-    group_cols = [
+def average_by_time(date_list: list, time_cols: list):
+    """
+    Average the single-day speeds across all dates,
+    grouped by the time columns given (year, or year + quarter).
+    """
+    # These define segments: route-dir-stop_pair
+    segment_stop_cols = [
         "route_id", "direction_id",
-        "stop_pair", "stop_pair_name",
+        "stop_pair",
+    ]
+
+    # These are the other columns we need, from speeds, but not in segments
+    org_cols = [
+        "stop_pair_name",
         "time_period",
-        'name', # do not use schedule_gtfs_dataset_key, which can differ over time
+        "name",
         'caltrans_district',
         'organization_source_record_id',
         'organization_name',
         'base64_url'
-    ] 
+    ]
 
-    FILE = GTFS_DATA_DICT["stop_segments"]["route_dir_single_segment"]
-    
-    quarter_df = concatenate_single_day_summaries(
-        FILE,
-        all_dates,
-        group_cols
-    ).pipe(get_aggregation, group_cols + ["year", "quarter"])
-    
-    quarter_df = compute(quarter_df)[0]
-    quarter_df.to_parquet(f"{SEGMENT_GCS}{FILE}_quarter.parquet")
+    delayed_dfs = [
+        delayed(segment_speeds_one_day)(
+            "stop_segments",
+            one_date,
+            segment_stop_cols,
+            org_cols
+        ) for one_date in date_list
+    ]
+
+    ddf = dd.from_delayed(delayed_dfs)
 
-    year_df = concatenate_single_day_summaries(
-        FILE,
-        all_dates,
-        group_cols
-    ).pipe(get_aggregation, group_cols + ["year"])
+    group_cols = [
+        c for c in segment_stop_cols + org_cols
+        if c not in ["schedule_gtfs_dataset_key"]
+    ] + time_cols
+
+    speed_averages = get_aggregation(ddf, group_cols)
+    speed_averages = speed_averages.compute()
+
+    segment_geom = ddf[
+        ["name", "geometry"] + segment_stop_cols + time_cols
+    ].drop_duplicates().compute()
+
+    speed_gdf = pd.merge(
+        segment_geom,
+        speed_averages,
+        on = ["name"] + segment_stop_cols + time_cols,
+        how = "inner"
+    )
 
-    year_df = compute(year_df)[0]
-    year_df.to_parquet(f"{SEGMENT_GCS}{FILE}_year.parquet")
-    
\ No newline at end of file
+    return speed_gdf
+
+
+if __name__ == "__main__":
+    import datetime
+    from shared_utils import rt_dates
+
+    segment_type = "stop_segments"
+    EXPORT = GTFS_DATA_DICT[segment_type]["route_dir_multi_segment"]
+    all_dates = rt_dates.y2024_dates + rt_dates.y2023_dates
+    '''
+    # quarter averages take x min
+    speeds_by_quarter = average_by_time(all_dates, ["year", "quarter"])
+
+    utils.geoparquet_gcs_export(
+        speeds_by_quarter,
+        SEGMENT_GCS,
+        f"{EXPORT}_quarter"
+    )
+    del speeds_by_quarter
+    '''
+    # year averages take 14 min
+    t0 = datetime.datetime.now()
+    speeds_by_year = average_by_time(all_dates, ["year"])
+
+    utils.geoparquet_gcs_export(
+        speeds_by_year,
+        SEGMENT_GCS,
+        f"{EXPORT}_year"
+    )
+    t1 = datetime.datetime.now()
+    print(f"execution: {t1 - t0}")
\ No newline at end of file