From c6f6dc7e6fc6ec0b6fc47129401ea5162000a4e9 Mon Sep 17 00:00:00 2001
From: tiffanychu90
Date: Tue, 26 Nov 2024 21:41:14 +0000
Subject: [PATCH] refactor: to use dask_from_map

---
 .../scripts/quarter_year_averages.py | 247 +++++++++++-------
 1 file changed, 153 insertions(+), 94 deletions(-)

diff --git a/rt_segment_speeds/scripts/quarter_year_averages.py b/rt_segment_speeds/scripts/quarter_year_averages.py
index afe2ec907..f7392275c 100644
--- a/rt_segment_speeds/scripts/quarter_year_averages.py
+++ b/rt_segment_speeds/scripts/quarter_year_averages.py
@@ -3,62 +3,20 @@
 a quarter or a year.
 """
 import dask.dataframe as dd
+import dask_geopandas as dg
+import datetime
 import geopandas as gpd
 import pandas as pd
-from dask import delayed, compute
+from typing import Literal
 
 from calitp_data_analysis import utils
-from segment_speed_utils import time_series_utils
+from shared_utils import dask_utils, rt_dates, time_helpers
+from segment_speed_utils import helpers
 from update_vars import SEGMENT_GCS, GTFS_DATA_DICT
 
-def segment_speeds_one_day(
-    segment_type: str,
-    analysis_date: list,
-    segment_cols: list,
-    org_cols: list
-):
-    """
-    Concatenate segment geometry (from rt_segment_speeds)
-    for all the dates we have
-    and get it to route-direction-segment grain
-    """
-    speed_file = GTFS_DATA_DICT[segment_type]["route_dir_single_segment"]
-    segment_file = GTFS_DATA_DICT[segment_type]["segments_file"]
-
-    speeds_df = pd.read_parquet(
-        f"{SEGMENT_GCS}{speed_file}_{analysis_date}.parquet",
-        columns = segment_cols + org_cols + [
-            "schedule_gtfs_dataset_key",
-            "p20_mph", "p50_mph", "p80_mph", "n_trips"]
-    ).assign(
-        service_date = pd.to_datetime(analysis_date)
-    )
-
-    segment_gdf = gpd.read_parquet(
-        f"{SEGMENT_GCS}{segment_file}_{analysis_date}.parquet",
-        columns = segment_cols + [
-            "schedule_gtfs_dataset_key", "geometry"]
-    ).drop_duplicates().reset_index(drop=True)
-
-    merge_cols = [c for c in speeds_df.columns if c in segment_gdf.columns]
-
-    df = pd.merge(
-        segment_gdf[merge_cols + ["geometry"]].drop_duplicates(),
-        speeds_df,
-        on = merge_cols,
-        how = "inner"
-    )
-
-    df = df.assign(
-        year = df.service_date.dt.year,
-        quarter = df.service_date.dt.quarter,
-    )
-
-    return df
-
-def get_aggregation(df: pd.DataFrame, group_cols: list):
+def get_aggregation(df: pd.DataFrame, group_cols: list) -> pd.DataFrame:
     """
     Aggregating across days, take the (mean)p20/p50/p80 speed
     and count number of trips across those days.
@@ -75,51 +33,137 @@ def get_aggregation(df: pd.DataFrame, group_cols: list):
     return df2
 
-def average_by_time(date_list: list, time_cols: list):
+
+def get_segment_speed_ddf(
+    segment_type: str,
+    analysis_date_list: list,
+    time_cols: list
+) -> dd.DataFrame:
     """
+    Import segment speeds (p20/p50/p80)
+    across a list of dates.
+    Prep the df for aggregation by year-quarter or year.
""" - # These define segments, it's route-dir-stop_pair - segment_stop_cols = [ - "route_id", "direction_id", - "stop_pair", - ] + speed_file = GTFS_DATA_DICT[segment_type]["route_dir_single_segment"] + paths = [f"{SEGMENT_GCS}{speed_file}" for _ in analysis_date_list] + + SEGMENT_COLS = [*GTFS_DATA_DICT[segment_type]["segment_cols"]] + SEGMENT_COLS = [i for i in SEGMENT_COLS if i != "geometry"] - # These are the other columns we need, from speeds, but not in segments - org_cols = [ - "stop_pair_name", - "time_period", - "name", - 'caltrans_district', 'organization_source_record_id', - 'organization_name', 'base64_url' + OPERATOR_COLS = [ + "name", + "caltrans_district", + # TODO: in future, moving how we choose organization + # to another approach, so let's not even include it here ] - delayed_dfs = [ - delayed(segment_speeds_one_day)( - "stop_segments", - one_date, - segment_stop_cols, - org_cols - ) for one_date in date_list - ] + group_cols = helpers.unique_list(OPERATOR_COLS + SEGMENT_COLS + ["stop_pair_name"]) - ddf = dd.from_delayed(delayed_dfs) + ddf = dask_utils.get_ddf( + paths, + analysis_date_list, + data_type = "df", + add_date = True, + columns = group_cols + [ + "p20_mph", "p50_mph", "p80_mph", "n_trips", + "time_period" + ] + ) - group_cols = [ - c for c in segment_stop_cols + org_cols - if c not in ["schedule_gtfs_dataset_key"] - ] + time_cols + ddf = time_helpers.add_quarter(ddf).sort_values( + "year_quarter", ascending=False) - speed_averages = get_aggregation(ddf, group_cols) - speed_averages = speed_averages.compute() + ddf2 = get_aggregation( + ddf, + group_cols + time_cols + ) - segment_geom = ddf[ - ["name", "geometry"] + segment_stop_cols + time_cols - ].drop_duplicates().compute() + return ddf2 + + +def get_segment_geom_gddf( + segment_type: str, + analysis_date_list: list, + time_cols: list +) -> dg.GeoDataFrame: + """ + Import segment geometry gdf + across a list of dates. + Dedupe the df for aggregation by year-quarter or year. + """ + segment_file = GTFS_DATA_DICT[segment_type]["segments_file"] + segment_cols = [*GTFS_DATA_DICT[segment_type]["segment_cols"]] + paths = [f"{SEGMENT_GCS}{segment_file}" for _ in analysis_date_list] + + # From the speed file, we want schedule_gtfs_dataset_key and name + # so we can dedupe by name over a longer time period + # name may not change, but key can change + speed_file = GTFS_DATA_DICT[segment_type]["route_dir_single_segment"] + operator_paths = [f"{SEGMENT_GCS}{speed_file}" for _ in analysis_date_list] + + operator_ddf = dask_utils.get_ddf( + operator_paths, + analysis_date_list, + data_type = "df", + add_date = True, + columns = ["schedule_gtfs_dataset_key", "name"] + ) + + segment_gddf = dask_utils.get_ddf( + paths, + analysis_date_list, + data_type = "gdf", + add_date = True, + columns = ["schedule_gtfs_dataset_key"] + segment_cols + ) + + gddf = dd.merge( + segment_gddf, + operator_ddf, + on = ["schedule_gtfs_dataset_key", "service_date"], + how = "inner" + ) + + gddf = time_helpers.add_quarter(gddf).sort_values( + "year_quarter", ascending=False) + + gddf2 = gddf[ + ["name"] + segment_cols + time_cols + ].drop_duplicates() + + return gddf2 + + +def average_segment_speeds_by_time_grain( + segment_type: str, + analysis_date_list: list, + time_grain: Literal["quarter", "year"] +) -> gpd.GeoDataFrame: + """ + Average segment speeds (tabular) by time grain + and attach the geometry for segment that's been deduped at + that time grain. 
+ """ + if time_grain == "quarter": + time_cols = ["year", "quarter"] + elif time_grain == "year": + time_cols = ["year"] + + SEGMENT_COLS = [*GTFS_DATA_DICT[segment_type]["segment_cols"]] + SEGMENT_COLS = [c for c in SEGMENT_COLS if c != "geometry"] + speeds = get_segment_speed_ddf( + segment_type, analysis_date_list, time_cols + ).compute() + + segment_geom = get_segment_geom_gddf( + segment_type, analysis_date_list, time_cols + ).compute() + speed_gdf = pd.merge( segment_geom, - speed_averages, - on = ["name"] + segment_stop_cols + time_cols, + speeds, + on = ["name"] + SEGMENT_COLS + time_cols, how = "inner" ) @@ -127,31 +171,46 @@ def average_by_time(date_list: list, time_cols: list): if __name__ == "__main__": - import datetime from shared_utils import rt_dates + start = datetime.datetime.now() + segment_type = "stop_segments" - EXPORT = GTFS_DATA_DICT[segment_type]["route_dir_multi_segment"] + QUARTER_EXPORT = GTFS_DATA_DICT[segment_type]["route_dir_quarter_segment"] + YEAR_EXPORT = GTFS_DATA_DICT[segment_type]["route_dir_year_segment"] + all_dates = rt_dates.y2024_dates + rt_dates.y2023_dates - ''' - # quarter averages take x min - speeds_by_quarter = average_by_time(all_dates, ["year", "quarter"]) + + # quarter averages take 11 min + speeds_by_quarter = average_segment_speeds_by_time_grain( + segment_type, + all_dates, + time_grain = "quarter" + ) utils.geoparquet_gcs_export( speeds_by_quarter, SEGMENT_GCS, - f"{EXPORT}_quarter" + QUARTER_EXPORT ) del speeds_by_quarter - ''' - # year averages take 14 min - t0 = datetime.datetime.now() - speeds_by_year = average_by_time(all_dates, ["year"]) + + time1 = datetime.datetime.now() + print(f"quarter averages: {time1 - start}") + + # year averages take 10 min + speeds_by_year = average_segment_speeds_by_time_grain( + segment_type, + all_dates, + time_grain = "year" + ) utils.geoparquet_gcs_export( speeds_by_year, SEGMENT_GCS, - f"{EXPORT}_year" + YEAR_EXPORT ) - t1 = datetime.datetime.now() - print(f"execution: {t1 - t0}") \ No newline at end of file + + end = datetime.datetime.now() + print(f"year averages: {end - time1}") + print(f"execution time: {end - start}") \ No newline at end of file