refactor: to use dask_from_map
tiffanychu90 committed Dec 3, 2024
1 parent 16fff25 commit c6f6dc7
Showing 1 changed file with 153 additions and 94 deletions.
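The commit title refers to dask.dataframe.from_map. As orientation for the diff below, here is a minimal sketch of that pattern, assuming the shared dask_utils.get_ddf helper wraps it; the reader function, paths, and dates are hypothetical.

# --- hedged sketch, not part of this commit: the dd.from_map pattern ---
# Assumes dask_utils.get_ddf wraps something like this; paths/dates are hypothetical.
import dask.dataframe as dd
import pandas as pd

def read_one_date(path: str, analysis_date: str) -> pd.DataFrame:
    # Read one dated parquet file and tag its rows with their service_date.
    df = pd.read_parquet(f"{path}_{analysis_date}.parquet")
    return df.assign(service_date=pd.to_datetime(analysis_date))

dates = ["2024-10-16", "2024-11-13"]  # hypothetical analysis dates
paths = ["gs://bucket/speeds_route_dir_segments"] * len(dates)  # hypothetical GCS prefix

# One partition per (path, date) pair, replacing the older
# dd.from_delayed([delayed(read_one_date)(p, d) for p, d in zip(paths, dates)]) approach.
ddf = dd.from_map(read_one_date, paths, dates)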
247 changes: 153 additions & 94 deletions rt_segment_speeds/scripts/quarter_year_averages.py
@@ -3,62 +3,20 @@
a quarter or a year.
"""
import dask.dataframe as dd
import dask_geopandas as dg
import datetime
import geopandas as gpd
import pandas as pd

from dask import delayed, compute
from typing import Literal

from calitp_data_analysis import utils
from segment_speed_utils import time_series_utils
from shared_utils import dask_utils, rt_dates, time_helpers
from segment_speed_utils import helpers
from update_vars import SEGMENT_GCS, GTFS_DATA_DICT

def segment_speeds_one_day(
segment_type: str,
analysis_date: list,
segment_cols: list,
org_cols: list
):
"""
Concatenate segment geometry (from rt_segment_speeds)
for all the dates we have
and get it to route-direction-segment grain
"""
speed_file = GTFS_DATA_DICT[segment_type]["route_dir_single_segment"]
segment_file = GTFS_DATA_DICT[segment_type]["segments_file"]

speeds_df = pd.read_parquet(
f"{SEGMENT_GCS}{speed_file}_{analysis_date}.parquet",
columns = segment_cols + org_cols + [
"schedule_gtfs_dataset_key",
"p20_mph", "p50_mph", "p80_mph", "n_trips"]
).assign(
service_date = pd.to_datetime(analysis_date)
)

segment_gdf = gpd.read_parquet(
f"{SEGMENT_GCS}{segment_file}_{analysis_date}.parquet",
columns = segment_cols + [
"schedule_gtfs_dataset_key", "geometry"]
).drop_duplicates().reset_index(drop=True)

merge_cols = [c for c in speeds_df.columns if c in segment_gdf.columns]

df = pd.merge(
segment_gdf[merge_cols + ["geometry"]].drop_duplicates(),
speeds_df,
on = merge_cols,
how = "inner"
)

df = df.assign(
year = df.service_date.dt.year,
quarter = df.service_date.dt.quarter,
)

return df


def get_aggregation(df: pd.DataFrame, group_cols: list):
def get_aggregation(df: pd.DataFrame, group_cols: list) -> pd.DataFrame:
"""
    Aggregating across days, take the mean of the p20/p50/p80 speeds
    and count the number of trips across those days.
@@ -75,83 +33,184 @@ def get_aggregation(df: pd.DataFrame, group_cols: list):

return df2
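The body of get_aggregation is collapsed in this hunk; based on its docstring, a hedged sketch of the aggregation follows (column names are taken from the surrounding code, and the unweighted mean is an assumption).

# --- hedged sketch, not part of this commit: what get_aggregation likely does ---
import pandas as pd

def get_aggregation_sketch(df: pd.DataFrame, group_cols: list) -> pd.DataFrame:
    # Mean of the speed percentiles and total trip count across days;
    # the real body is hidden by the collapsed diff and may weight differently.
    df2 = (
        df.groupby(group_cols)
        .agg({"p20_mph": "mean", "p50_mph": "mean", "p80_mph": "mean", "n_trips": "sum"})
        .reset_index()
    )
    return df2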

def average_by_time(date_list: list, time_cols: list):

def get_segment_speed_ddf(
segment_type: str,
analysis_date_list: list,
time_cols: list
) -> dd.DataFrame:
"""
    Import segment speeds (p20/p50/p80)
across a list of dates.
Prep the df for aggregation by year-quarter or year.
"""
    # These columns define a segment: route-direction-stop_pair
segment_stop_cols = [
"route_id", "direction_id",
"stop_pair",
]
speed_file = GTFS_DATA_DICT[segment_type]["route_dir_single_segment"]
paths = [f"{SEGMENT_GCS}{speed_file}" for _ in analysis_date_list]

SEGMENT_COLS = [*GTFS_DATA_DICT[segment_type]["segment_cols"]]
SEGMENT_COLS = [i for i in SEGMENT_COLS if i != "geometry"]

    # These are the other columns we need from speeds, but not in segments
org_cols = [
"stop_pair_name",
"time_period",
"name",
'caltrans_district', 'organization_source_record_id',
'organization_name', 'base64_url'
OPERATOR_COLS = [
"name",
"caltrans_district",
    # TODO: in the future, we'll move how we choose the organization
    # to another approach, so don't include it here
]

delayed_dfs = [
delayed(segment_speeds_one_day)(
"stop_segments",
one_date,
segment_stop_cols,
org_cols
) for one_date in date_list
]
group_cols = helpers.unique_list(OPERATOR_COLS + SEGMENT_COLS + ["stop_pair_name"])

ddf = dd.from_delayed(delayed_dfs)
ddf = dask_utils.get_ddf(
paths,
analysis_date_list,
data_type = "df",
add_date = True,
columns = group_cols + [
"p20_mph", "p50_mph", "p80_mph", "n_trips",
"time_period"
]
)

group_cols = [
c for c in segment_stop_cols + org_cols
if c not in ["schedule_gtfs_dataset_key"]
] + time_cols
ddf = time_helpers.add_quarter(ddf).sort_values(
"year_quarter", ascending=False)

speed_averages = get_aggregation(ddf, group_cols)
speed_averages = speed_averages.compute()
ddf2 = get_aggregation(
ddf,
group_cols + time_cols
)

segment_geom = ddf[
["name", "geometry"] + segment_stop_cols + time_cols
].drop_duplicates().compute()
return ddf2


def get_segment_geom_gddf(
segment_type: str,
analysis_date_list: list,
time_cols: list
) -> dg.GeoDataFrame:
"""
Import segment geometry gdf
across a list of dates.
    Dedupe the gdf for aggregation by year-quarter or year.
"""
segment_file = GTFS_DATA_DICT[segment_type]["segments_file"]
segment_cols = [*GTFS_DATA_DICT[segment_type]["segment_cols"]]
paths = [f"{SEGMENT_GCS}{segment_file}" for _ in analysis_date_list]

# From the speed file, we want schedule_gtfs_dataset_key and name
# so we can dedupe by name over a longer time period
# name may not change, but key can change
speed_file = GTFS_DATA_DICT[segment_type]["route_dir_single_segment"]
operator_paths = [f"{SEGMENT_GCS}{speed_file}" for _ in analysis_date_list]

operator_ddf = dask_utils.get_ddf(
operator_paths,
analysis_date_list,
data_type = "df",
add_date = True,
columns = ["schedule_gtfs_dataset_key", "name"]
)

segment_gddf = dask_utils.get_ddf(
paths,
analysis_date_list,
data_type = "gdf",
add_date = True,
columns = ["schedule_gtfs_dataset_key"] + segment_cols
)

gddf = dd.merge(
segment_gddf,
operator_ddf,
on = ["schedule_gtfs_dataset_key", "service_date"],
how = "inner"
)

gddf = time_helpers.add_quarter(gddf).sort_values(
"year_quarter", ascending=False)

gddf2 = gddf[
["name"] + segment_cols + time_cols
].drop_duplicates()

return gddf2
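Both helpers above call time_helpers.add_quarter before sorting on year_quarter. Its implementation isn't shown in this diff; a hedged sketch consistent with the year / quarter / year_quarter columns used downstream follows (the year_quarter label format is an assumption).

# --- hedged sketch, not part of this commit: what add_quarter appears to provide ---
import pandas as pd

def add_quarter_sketch(df: pd.DataFrame) -> pd.DataFrame:
    # Derive year and quarter from service_date, plus a sortable year_quarter label.
    df = df.assign(
        year=df.service_date.dt.year,
        quarter=df.service_date.dt.quarter,
    )
    return df.assign(
        year_quarter=df.year.astype(str) + "_Q" + df.quarter.astype(str)
    )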


def average_segment_speeds_by_time_grain(
segment_type: str,
analysis_date_list: list,
time_grain: Literal["quarter", "year"]
) -> gpd.GeoDataFrame:
"""
Average segment speeds (tabular) by time grain
    and attach the segment geometry that's been deduped at
that time grain.
"""
if time_grain == "quarter":
time_cols = ["year", "quarter"]
elif time_grain == "year":
time_cols = ["year"]

SEGMENT_COLS = [*GTFS_DATA_DICT[segment_type]["segment_cols"]]
SEGMENT_COLS = [c for c in SEGMENT_COLS if c != "geometry"]

speeds = get_segment_speed_ddf(
segment_type, analysis_date_list, time_cols
).compute()

segment_geom = get_segment_geom_gddf(
segment_type, analysis_date_list, time_cols
).compute()

speed_gdf = pd.merge(
segment_geom,
speed_averages,
on = ["name"] + segment_stop_cols + time_cols,
speeds,
on = ["name"] + SEGMENT_COLS + time_cols,
how = "inner"
)

return speed_gdf


if __name__ == "__main__":
import datetime
from shared_utils import rt_dates

start = datetime.datetime.now()

segment_type = "stop_segments"
EXPORT = GTFS_DATA_DICT[segment_type]["route_dir_multi_segment"]
QUARTER_EXPORT = GTFS_DATA_DICT[segment_type]["route_dir_quarter_segment"]
YEAR_EXPORT = GTFS_DATA_DICT[segment_type]["route_dir_year_segment"]

all_dates = rt_dates.y2024_dates + rt_dates.y2023_dates
'''
# quarter averages take x min
speeds_by_quarter = average_by_time(all_dates, ["year", "quarter"])

# quarter averages take 11 min
speeds_by_quarter = average_segment_speeds_by_time_grain(
segment_type,
all_dates,
time_grain = "quarter"
)

utils.geoparquet_gcs_export(
speeds_by_quarter,
SEGMENT_GCS,
f"{EXPORT}_quarter"
QUARTER_EXPORT
)
del speeds_by_quarter
'''
# year averages take 14 min
t0 = datetime.datetime.now()
speeds_by_year = average_by_time(all_dates, ["year"])

time1 = datetime.datetime.now()
print(f"quarter averages: {time1 - start}")

# year averages take 10 min
speeds_by_year = average_segment_speeds_by_time_grain(
segment_type,
all_dates,
time_grain = "year"
)

utils.geoparquet_gcs_export(
speeds_by_year,
SEGMENT_GCS,
f"{EXPORT}_year"
YEAR_EXPORT
)
t1 = datetime.datetime.now()
print(f"execution: {t1 - t0}")

end = datetime.datetime.now()
print(f"year averages: {end - time1}")
print(f"execution time: {end - start}")
