Skip to content

Commit

Permalink
(dask_utils): use from_map
Browse files Browse the repository at this point in the history
  • Loading branch information
tiffanychu90 committed Dec 3, 2024
1 parent dfc33a6 commit 16fff25
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
16 changes: 13 additions & 3 deletions _shared_utils/shared_utils/dask_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from calitp_data_analysis import utils
from dask import compute, delayed
from dask.delayed import Delayed # type hint
from shared_utils import time_helpers

fs = gcsfs.GCSFileSystem()

Expand Down Expand Up @@ -108,10 +109,11 @@ def concatenate_list_of_files(
return full_df


def func(
def import_df_func(
path: str,
one_date: str,
data_type: Literal["df", "gdf"] = "df",
add_date: bool = False,
**kwargs,
):
"""
Expand All @@ -134,10 +136,13 @@ def func(
**kwargs,
).drop_duplicates()

if add_date:
df = time_helpers.add_service_date(df, one_date)

return df


def get_ddf(paths, date_list, data_type, **kwargs):
def get_ddf(paths, date_list, data_type, get_pandas: bool = False, **kwargs):
"""
Set up function with little modifications based on
the dask docs. Modifications are that we want to read in
Expand All @@ -146,4 +151,9 @@ def get_ddf(paths, date_list, data_type, **kwargs):
https://docs.dask.org/en/latest/generated/dask.dataframe.from_map.html
https://blog.dask.org/2023/04/12/from-map
"""
return dd.from_map(func, paths, date_list, data_type=data_type, **kwargs).drop_duplicates()
ddf = dd.from_map(import_df_func, paths, date_list, data_type=data_type, **kwargs).drop_duplicates()

if get_pandas:
ddf = ddf.compute()

return ddf
4 changes: 3 additions & 1 deletion _shared_utils/shared_utils/gtfs_analytics_data.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,15 @@ stop_segments:
shape_stop_cols: ["shape_array_key", "shape_id", "stop_sequence"]
stop_pair_cols: ["stop_pair", "stop_pair_name"]
route_dir_cols: ["route_id", "direction_id"]
segment_cols: ["schedule_gtfs_dataset_key", "route_id", "direction_id", "stop_pair", "geometry"]
segment_cols: ["route_id", "direction_id", "stop_pair", "geometry"]
shape_stop_single_segment: "rollup_singleday/speeds_shape_stop_segments" #-- stop after Oct 2024
route_dir_single_segment: "rollup_singleday/speeds_route_dir_segments"
route_dir_single_segment_detail: "rollup_singleday/speeds_route_dir_segments_detail" # interim for speedmaps
route_dir_multi_segment: "rollup_multiday/speeds_route_dir_segments"
segments_file: "segment_options/shape_stop_segments"
max_speed: ${speed_vars.max_speed}
route_dir_quarter_segment: "rollup_multiday/quarter_speeds_route_dir_segments"
route_dir_year_segment: "rollup_multiday/year_speeds_route_dir_segments"

rt_stop_times:
dir: ${gcs_paths.SEGMENT_GCS}
Expand Down
8 changes: 7 additions & 1 deletion _shared_utils/shared_utils/time_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,11 @@ def add_quarter(df: pd.DataFrame, date_col: str = "service_date") -> pd.DataFram
Parse a date column for the year, quarter it is in.
Pipe this function when we want to use dask_utils.
"""
df = df.assign(year=df[date_col].dt.year, quarter=df[date_col].dt.quarter)
df = df.assign(
year=df[date_col].dt.year,
quarter=df[date_col].dt.quarter,
)

df = df.assign(year_quarter=df.year.astype(str) + "_Q" + df.quarter.astype(str))

return df

0 comments on commit 16fff25

Please sign in to comment.