From 9cca1e97a1b6ce403e67bb525f333d4e8863f477 Mon Sep 17 00:00:00 2001
From: tiffanychu90
Date: Tue, 19 Nov 2024 18:01:06 +0000
Subject: [PATCH 1/3] get segment geom with averages

---
 .../scripts/quarter_year_averages.py          | 169 ++++++++++++------
 1 file changed, 117 insertions(+), 52 deletions(-)

diff --git a/rt_segment_speeds/scripts/quarter_year_averages.py b/rt_segment_speeds/scripts/quarter_year_averages.py
index 7d2d2bd13..c4cf1344e 100644
--- a/rt_segment_speeds/scripts/quarter_year_averages.py
+++ b/rt_segment_speeds/scripts/quarter_year_averages.py
@@ -1,49 +1,67 @@
+"""
+Average segment speeds over longer time periods,
+a quarter or a year.
+"""
+import dask.dataframe as dd
 import geopandas as gpd
 import pandas as pd
 
 from dask import delayed, compute
 
+from calitp_data_analysis import utils
 from update_vars import SEGMENT_GCS, GTFS_DATA_DICT
-from segment_speed_utils import time_series_utils
-from average_segment_speeds import concatenate_trip_segment_speeds
-
 
-def concatenate_single_day_summaries(
-    speed_file: str,
-    analysis_date_list: list,
-    group_cols: list
+def segment_speeds_one_day(
+    segment_type: str,
+    analysis_date: str,
+    segment_cols: list,
+    org_cols: list
 ):
     """
-    Concatenate several single day averages
-    and we'll take the average over that.
+    For one analysis date, merge segment geometry
+    (from rt_segment_speeds) onto segment speed averages
+    at route-direction-segment grain.
+    """
+    speed_file = GTFS_DATA_DICT[segment_type]["route_dir_single_segment"]
+    segment_file = GTFS_DATA_DICT[segment_type]["segments_file"]
 
-    If we have 6 dates of segment p20/p50/p80 speeds,
-    we'll treat each date as an independent entity
-    and average the p20/p50/p80 speeds over that time period.
+    speeds_df = pd.read_parquet(
+        f"{SEGMENT_GCS}{speed_file}_{analysis_date}.parquet",
+        columns = segment_cols + org_cols + [
+            "schedule_gtfs_dataset_key",
+            "p20_mph", "p50_mph", "p80_mph", "n_trips"]
+    ).assign(
+        service_date = pd.to_datetime(analysis_date)
+    )
 
-    We will not go back to trip segment speeds for each date
-    and do a weighted average.
-    In an extreme case, if one date had 1,000 trips and another date
-    had 100 trips, one date would have 10x the weight of another date,
-    and here, we just want to see where the p20 speed typically is.
-    """
-    df = time_series_utils.concatenate_datasets_across_dates(
-        SEGMENT_GCS,
-        speed_file,
-        analysis_date_list,
-        data_type = "df",
-        columns = group_cols + ["p20_mph", "p50_mph", "p80_mph", "n_trips"],
-        get_pandas = False
+    segment_gdf = gpd.read_parquet(
+        f"{SEGMENT_GCS}{segment_file}_{analysis_date}.parquet",
+        columns = segment_cols + [
+            "schedule_gtfs_dataset_key", "geometry"]
+    ).drop_duplicates().reset_index(drop=True)
+
+    merge_cols = [c for c in speeds_df.columns if c in segment_gdf.columns]
+
+    df = pd.merge(
+        segment_gdf[merge_cols + ["geometry"]].drop_duplicates(),
+        speeds_df,
+        on = merge_cols,
+        how = "inner"
     )
 
     df = df.assign(
         year = df.service_date.dt.year,
         quarter = df.service_date.dt.quarter,
     )
-    
+    
     return df
 
+
 def get_aggregation(df: pd.DataFrame, group_cols: list):
+    """
+    Aggregating across days, take the mean of the p20/p50/p80 speeds
+    and count the number of trips across those days.
+    """
     speed_cols = [c for c in df.columns if "_mph" in c]
     
     df2 = (df
@@ -56,36 +74,83 @@ def get_aggregation(df: pd.DataFrame, group_cols: list):
     
     return df2
 
-if __name__ == "__main__":
-    
-    from shared_utils import rt_dates
-    
-    group_cols = [
+def average_by_time(date_list: list, time_cols: list):
+    """
+    Average p20/p50/p80 speeds across multiple dates,
+    grouped by time_cols (year, or year + quarter).
+    """
+    # These define segments, it's route-dir-stop_pair
+    segment_stop_cols = [
         "route_id", "direction_id",
-        "stop_pair", "stop_pair_name",
+        "stop_pair",
+    ]
+    
+    # These are the other columns we need, from speeds, but not in segments
+    org_cols = [
+        "stop_pair_name",
         "time_period",
-        'name', # do not use schedule_gtfs_dataset_key, which can differ over time
+        "name",
         'caltrans_district', 'organization_source_record_id',
         'organization_name', 'base64_url'
-    ]
+    ]
 
-    FILE = GTFS_DATA_DICT["stop_segments"]["route_dir_single_segment"]
-    
-    quarter_df = concatenate_single_day_summaries(
-        FILE,
-        all_dates,
-        group_cols
-    ).pipe(get_aggregation, group_cols + ["year", "quarter"])
-    
-    quarter_df = compute(quarter_df)[0]
-    quarter_df.to_parquet(f"{SEGMENT_GCS}{FILE}_quarter.parquet")
+    delayed_dfs = [
+        delayed(segment_speeds_one_day)(
+            "stop_segments",
+            one_date,
+            segment_stop_cols,
+            org_cols
+        ) for one_date in date_list
+    ]
+    
+    ddf = dd.from_delayed(delayed_dfs)
+    
+    group_cols = [
+        c for c in segment_stop_cols + org_cols
+        if c not in ["schedule_gtfs_dataset_key"]
+    ] + time_cols
+    
+    speed_averages = get_aggregation(ddf, group_cols)
+    speed_averages = speed_averages.compute()
 
-    year_df = concatenate_single_day_summaries(
-        FILE,
-        all_dates,
-        group_cols
-    ).pipe(get_aggregation, group_cols + ["year"])
+    segment_geom = ddf[
+        ["name", "geometry"] + segment_stop_cols + time_cols
+    ].drop_duplicates().compute()
 
-    year_df = compute(year_df)[0]
-    year_df.to_parquet(f"{SEGMENT_GCS}{FILE}_year.parquet")
-    
\ No newline at end of file
+    speed_gdf = pd.merge(
+        segment_geom,
+        speed_averages,
+        on = ["name"] + segment_stop_cols + time_cols,
+        how = "inner"
+    )
+    
+    return speed_gdf
+
+
+if __name__ == "__main__":
+    import datetime
+    from shared_utils import rt_dates
+    
+    segment_type = "stop_segments"
+    EXPORT = GTFS_DATA_DICT[segment_type]["route_dir_multi_segment"]
+    all_dates = rt_dates.y2024_dates + rt_dates.y2023_dates
+    '''
+    # quarter averages take x min
+    speeds_by_quarter = average_by_time(all_dates, ["year", "quarter"])
+    
+    utils.geoparquet_gcs_export(
+        speeds_by_quarter,
+        SEGMENT_GCS,
+        f"{EXPORT}_quarter"
+    )
+    del speeds_by_quarter
+    '''
+    # year averages take 14 min
+    t0 = datetime.datetime.now()
+    speeds_by_year = average_by_time(all_dates, ["year"])
+    
+    utils.geoparquet_gcs_export(
+        speeds_by_year,
+        SEGMENT_GCS,
+        f"{EXPORT}_year"
+    )
+    t1 = datetime.datetime.now()
+    print(f"execution: {t1 - t0}")
\ No newline at end of file

From db5be6ff9006df8cac79be0b29f7ca53630bd7d3 Mon Sep 17 00:00:00 2001
From: tiffanychu90
Date: Tue, 19 Nov 2024 19:46:49 +0000
Subject: [PATCH 2/3] break out segment geom for time-series

---
 .../scripts/quarter_year_averages.py          | 52 +++++++++++++++++++
 1 file changed, 52 insertions(+)

diff --git a/rt_segment_speeds/scripts/quarter_year_averages.py b/rt_segment_speeds/scripts/quarter_year_averages.py
index c4cf1344e..d08b0fff1 100644
--- a/rt_segment_speeds/scripts/quarter_year_averages.py
+++ b/rt_segment_speeds/scripts/quarter_year_averages.py
@@ -9,6 +9,7 @@
 from dask import delayed, compute
 
 from calitp_data_analysis import utils
+from segment_speed_utils import time_series_utils
 from update_vars import SEGMENT_GCS, GTFS_DATA_DICT
 
 def segment_speeds_one_day(
@@ -57,6 +58,57 @@ def segment_speeds_one_day(
     return df
 
 
+def segment_geom_time_series(
+    segment_type: str,
+    analysis_date_list: list
+):
+    """
+    One challenge with pulling segment geometry
+    over a longer period is that we can get duplicates.
+    Segment geom uses schedule_gtfs_dataset_key,
+    which, over a long enough period, can also change.
+    
+    We need a way to get rid of dupes,
+    and a way to merge this back onto
+    segment speed averages.
+    """
+    speed_file = GTFS_DATA_DICT[segment_type]["route_dir_single_segment"]
+    segment_file = GTFS_DATA_DICT[segment_type]["segments_file"]
+    
+    operator_df = time_series_utils.concatenate_datasets_across_dates(
+        SEGMENT_GCS,
+        speed_file,
+        analysis_date_list,
+        data_type = "df",
+        columns = ["schedule_gtfs_dataset_key", "name", "organization_name"],
+        get_pandas = True
+    ).drop_duplicates()
+    
+    segment_gdf = time_series_utils.concatenate_datasets_across_dates(
+        SEGMENT_GCS,
+        segment_file,
+        analysis_date_list,
+        data_type = "gdf",
+        get_pandas = False
+    )
+    
+    gdf = delayed(pd.merge)(
+        segment_gdf,
+        operator_df,
+        on = ["schedule_gtfs_dataset_key", "service_date"],
+        how = "inner"
+    ).sort_values(
+        by=["name", "service_date"], ascending=[True, False]
+    ).drop(
+        columns = ["schedule_gtfs_dataset_key", "service_date"]
+    ).drop_duplicates().reset_index(drop=True)
+    # this is dropping dupes with gtfs_dataset_name and organization_name
+    
+    gdf = compute(gdf)[0]
+    
+    return gdf
+
+
 def get_aggregation(df: pd.DataFrame, group_cols: list):
     """
     Aggregating across days, take the mean of the p20/p50/p80 speeds

From dad53acff08b7f4086752883257d3aa52b8b7c09 Mon Sep 17 00:00:00 2001
From: tiffanychu90
Date: Wed, 20 Nov 2024 01:25:14 +0000
Subject: [PATCH 3/3] test year averages with segment geom

---
 .../shared_utils/gtfs_analytics_data.yml      |  1 +
 .../scripts/quarter_year_averages.py          | 51 -------------------
 2 files changed, 1 insertion(+), 51 deletions(-)

diff --git a/_shared_utils/shared_utils/gtfs_analytics_data.yml b/_shared_utils/shared_utils/gtfs_analytics_data.yml
index db0b364d4..a745e0e1e 100644
--- a/_shared_utils/shared_utils/gtfs_analytics_data.yml
+++ b/_shared_utils/shared_utils/gtfs_analytics_data.yml
@@ -81,6 +81,7 @@ stop_segments:
   shape_stop_cols: ["shape_array_key", "shape_id", "stop_sequence"]
   stop_pair_cols: ["stop_pair", "stop_pair_name"]
   route_dir_cols: ["route_id", "direction_id"]
+  segment_cols: ["schedule_gtfs_dataset_key", "route_id", "direction_id", "stop_pair", "geometry"]
   shape_stop_single_segment: "rollup_singleday/speeds_shape_stop_segments" #-- stop after Oct 2024
   route_dir_single_segment: "rollup_singleday/speeds_route_dir_segments"
   route_dir_multi_segment: "rollup_multiday/speeds_route_dir_segments"
diff --git a/rt_segment_speeds/scripts/quarter_year_averages.py b/rt_segment_speeds/scripts/quarter_year_averages.py
index d08b0fff1..afe2ec907 100644
--- a/rt_segment_speeds/scripts/quarter_year_averages.py
+++ b/rt_segment_speeds/scripts/quarter_year_averages.py
@@ -58,57 +58,6 @@ def segment_speeds_one_day(
     return df
 
 
-def segment_geom_time_series(
-    segment_type: str,
-    analysis_date_list: list
-):
-    """
-    One challenge with pulling segment geometry
-    over a longer period is that we can get duplicates.
-    Segment geom uses schedule_gtfs_dataset_key,
-    which, over a long enough period, can also change.
-    
-    We need a way to get rid of dupes,
-    and a way to merge this back onto
-    segment speed averages.
-    """
-    speed_file = GTFS_DATA_DICT[segment_type]["route_dir_single_segment"]
-    segment_file = GTFS_DATA_DICT[segment_type]["segments_file"]
-    
-    operator_df = time_series_utils.concatenate_datasets_across_dates(
-        SEGMENT_GCS,
-        speed_file,
-        analysis_date_list,
-        data_type = "df",
-        columns = ["schedule_gtfs_dataset_key", "name", "organization_name"],
-        get_pandas = True
-    ).drop_duplicates()
-    
-    segment_gdf = time_series_utils.concatenate_datasets_across_dates(
-        SEGMENT_GCS,
-        segment_file,
-        analysis_date_list,
-        data_type = "gdf",
-        get_pandas = False
-    )
-    
-    gdf = delayed(pd.merge)(
-        segment_gdf,
-        operator_df,
-        on = ["schedule_gtfs_dataset_key", "service_date"],
-        how = "inner"
-    ).sort_values(
-        by=["name", "service_date"], ascending=[True, False]
-    ).drop(
-        columns = ["schedule_gtfs_dataset_key", "service_date"]
-    ).drop_duplicates().reset_index(drop=True)
-    # this is dropping dupes with gtfs_dataset_name and organization_name
-    
-    gdf = compute(gdf)[0]
-    
-    return gdf
-
-
 def get_aggregation(df: pd.DataFrame, group_cols: list):
     """
     Aggregating across days, take the mean of the p20/p50/p80 speeds
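
A note for reviewers, not part of the patches: the docstring removed in patch 1 explains that each service date is treated as an independent observation, so p20/p50/p80 speeds are averaged across dates without weighting by trip counts. A minimal pandas sketch with toy numbers illustrates the difference; the column names follow the patch, but the exact aggregation spec is an assumption, since the full body of get_aggregation is not shown in these hunks.

import pandas as pd

# One row per segment per service date, roughly what segment_speeds_one_day()
# produces (geometry and operator columns omitted for brevity).
daily = pd.DataFrame({
    "route_id": ["A", "A"],
    "direction_id": [0, 0],
    "stop_pair": ["stop1__stop2", "stop1__stop2"],
    "year": [2024, 2024],
    "p50_mph": [10.0, 20.0],
    "n_trips": [1000, 100],
})

group_cols = ["route_id", "direction_id", "stop_pair", "year"]

# Unweighted across dates: each date counts once, so p50 = (10 + 20) / 2 = 15 mph.
unweighted = (
    daily.groupby(group_cols)
    .agg(p50_mph=("p50_mph", "mean"), n_trips=("n_trips", "sum"))
    .reset_index()
)

# Trip-weighted alternative, which the docstring deliberately avoids:
# (10*1000 + 20*100) / 1100 is roughly 10.9 mph, dominated by the busier date.
weighted_p50 = (daily.p50_mph * daily.n_trips).sum() / daily.n_trips.sum()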
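Once the year-level run in patch 1's __main__ block finishes, a quick spot-check of the exported multiday file could look like the sketch below. This is hypothetical and not part of the patches: it assumes geoparquet_gcs_export() writes f"{SEGMENT_GCS}{EXPORT}_year.parquet", and the key columns listed are a subset of the grouping columns used in average_by_time.

import geopandas as gpd

from update_vars import SEGMENT_GCS, GTFS_DATA_DICT

EXPORT = GTFS_DATA_DICT["stop_segments"]["route_dir_multi_segment"]

# Assumed output path; adjust if geoparquet_gcs_export() names files differently.
gdf = gpd.read_parquet(f"{SEGMENT_GCS}{EXPORT}_year.parquet")

# Expected grain: one row per operator, route, direction, stop_pair,
# time_period, and year.
key_cols = [
    "name", "organization_name", "route_id", "direction_id",
    "stop_pair", "time_period", "year",
]
print(gdf[key_cols + ["p20_mph", "p50_mph", "p80_mph", "n_trips"]].head())
print("rows sharing the same key:", gdf.duplicated(subset=key_cols).sum())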