diff --git a/_shared_utils/shared_utils/dask_utils.py b/_shared_utils/shared_utils/dask_utils.py
index 3a719dbd8..3f5493c56 100644
--- a/_shared_utils/shared_utils/dask_utils.py
+++ b/_shared_utils/shared_utils/dask_utils.py
@@ -11,6 +11,7 @@
 from calitp_data_analysis import utils
 from dask import compute, delayed
 from dask.delayed import Delayed  # type hint
+from shared_utils import time_helpers
 
 fs = gcsfs.GCSFileSystem()
 
@@ -108,10 +109,11 @@ def concatenate_list_of_files(
     return full_df
 
 
-def func(
+def import_df_func(
     path: str,
     one_date: str,
     data_type: Literal["df", "gdf"] = "df",
+    add_date: bool = False,
     **kwargs,
 ):
     """
@@ -134,10 +136,13 @@ def func(
         **kwargs,
     ).drop_duplicates()
 
+    if add_date:
+        df = time_helpers.add_service_date(df, one_date)
+
     return df
 
 
-def get_ddf(paths, date_list, data_type, **kwargs):
+def get_ddf(paths, date_list, data_type, get_pandas: bool = False, **kwargs):
     """
     Set up function with little modifications based on the dask docs.
     Modifications are that we want to read in
@@ -146,4 +151,9 @@ def get_ddf(paths, date_list, data_type, **kwargs):
     https://docs.dask.org/en/latest/generated/dask.dataframe.from_map.html
     https://blog.dask.org/2023/04/12/from-map
     """
-    return dd.from_map(func, paths, date_list, data_type=data_type, **kwargs).drop_duplicates()
+    ddf = dd.from_map(import_df_func, paths, date_list, data_type=data_type, **kwargs).drop_duplicates()
+
+    if get_pandas:
+        ddf = ddf.compute()
+
+    return ddf
diff --git a/_shared_utils/shared_utils/gtfs_analytics_data.yml b/_shared_utils/shared_utils/gtfs_analytics_data.yml
index 08a2124c9..daf8a6561 100644
--- a/_shared_utils/shared_utils/gtfs_analytics_data.yml
+++ b/_shared_utils/shared_utils/gtfs_analytics_data.yml
@@ -81,13 +81,15 @@ stop_segments:
   shape_stop_cols: ["shape_array_key", "shape_id", "stop_sequence"]
   stop_pair_cols: ["stop_pair", "stop_pair_name"]
   route_dir_cols: ["route_id", "direction_id"]
-  segment_cols: ["schedule_gtfs_dataset_key", "route_id", "direction_id", "stop_pair", "geometry"]
+  segment_cols: ["route_id", "direction_id", "stop_pair", "geometry"]
   shape_stop_single_segment: "rollup_singleday/speeds_shape_stop_segments" #-- stop after Oct 2024
   route_dir_single_segment: "rollup_singleday/speeds_route_dir_segments"
   route_dir_single_segment_detail: "rollup_singleday/speeds_route_dir_segments_detail" # interim for speedmaps
   route_dir_multi_segment: "rollup_multiday/speeds_route_dir_segments"
   segments_file: "segment_options/shape_stop_segments"
   max_speed: ${speed_vars.max_speed}
+  route_dir_quarter_segment: "rollup_multiday/quarter_speeds_route_dir_segments"
+  route_dir_year_segment: "rollup_multiday/year_speeds_route_dir_segments"
 
 rt_stop_times:
   dir: ${gcs_paths.SEGMENT_GCS}
diff --git a/_shared_utils/shared_utils/time_helpers.py b/_shared_utils/shared_utils/time_helpers.py
index c5ab23972..c408e3efe 100644
--- a/_shared_utils/shared_utils/time_helpers.py
+++ b/_shared_utils/shared_utils/time_helpers.py
@@ -92,5 +92,11 @@ def add_quarter(df: pd.DataFrame, date_col: str = "service_date") -> pd.DataFrame:
     Parse a date column for the year, quarter it is in.
     Pipe this function when we want to use dask_utils.
     """
-    df = df.assign(year=df[date_col].dt.year, quarter=df[date_col].dt.quarter)
+    df = df.assign(
+        year=df[date_col].dt.year,
+        quarter=df[date_col].dt.quarter,
+    )
+
+    df = df.assign(year_quarter=df.year.astype(str) + "_Q" + df.quarter.astype(str))
+
     return df