From 2ac008c06fb481683e82facdde62db415eddc88a Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Thu, 26 Dec 2024 20:41:13 +0000 Subject: [PATCH 01/11] refactor stop_times_with_direction, simplify functions --- gtfs_funnel/stop_times_with_direction.py | 210 +++++++++++------------ 1 file changed, 96 insertions(+), 114 deletions(-) diff --git a/gtfs_funnel/stop_times_with_direction.py b/gtfs_funnel/stop_times_with_direction.py index 284651297..4ff0383c4 100644 --- a/gtfs_funnel/stop_times_with_direction.py +++ b/gtfs_funnel/stop_times_with_direction.py @@ -6,6 +6,9 @@ import geopandas as gpd import numpy as np import pandas as pd +import sys + +from loguru import logger from calitp_data_analysis import utils from shared_utils import rt_utils @@ -56,109 +59,114 @@ def prep_scheduled_stop_times(analysis_date: str) -> gpd.GeoDataFrame: ).drop(columns = ["trip_id"]) st_with_stop = gpd.GeoDataFrame( - st_with_stop, geometry = "geometry", crs = PROJECT_CRS) + st_with_stop, geometry = "geometry", crs = PROJECT_CRS + ) return st_with_stop def get_projected_stop_meters( - stop_times: pd.DataFrame, - shapes: gpd.GeoDataFrame -) -> pd.DataFrame: + stop_times: gpd.GeoDataFrame, + analysis_date: str, +) -> pd.Series: """ Project the stop's position to the shape and get stop_meters (meters from start of the shape). + Only return stop_meters as pd.Series to use as a column later. """ + shapes = helpers.import_scheduled_shapes( + analysis_date, + columns = ["shape_array_key", "geometry"], + crs = PROJECT_CRS, + get_pandas=True + ).dropna(subset="geometry") + gdf = pd.merge( - stop_times, - shapes.rename(columns = {"geometry": "shape_geometry"}), + stop_times.to_crs(PROJECT_CRS), + shapes.to_crs(PROJECT_CRS).rename(columns = {"geometry": "shape_geometry"}), on = "shape_array_key", how = "inner" - ) + ).set_geometry("geometry") - gdf = gdf.assign( - stop_meters = gdf.shape_geometry.project(gdf.geometry) - ).drop(columns = "shape_geometry").drop_duplicates() - - return gdf + stop_meters = gdf.shape_geometry.project(gdf.geometry) + + return stop_meters -def find_prior_subseq_stop( - stop_times: gpd.GeoDataFrame, - trip_stop_cols: list +def find_prior_subseq_stop_info( + stop_times: gpd.GeoDataFrame, + analysis_date: str, + trip_cols: list = ["trip_instance_key"], + trip_stop_cols: list = ["trip_instance_key", "stop_sequence"] ) -> gpd.GeoDataFrame: """ For trip-stop, find the previous stop (using stop sequence). Attach the previous stop's geometry. This will determine the direction for the stop (it's from prior stop). Add in subseq stop information too. + + Create columns related to comparing current to prior stop. 
+    - stop_pair (stop_id1__stop_id2)
+    - stop_pair_name (stop_name1__stop_name2)
     """
-    prior_stop = stop_times[trip_stop_cols].sort_values(
-        trip_stop_cols).reset_index(drop=True)
-    
-    prior_stop = prior_stop.assign(
-        prior_stop_sequence = (prior_stop.groupby("trip_instance_key")
+    stop_meters = get_projected_stop_meters(stop_times, analysis_date)
+    
+    gdf = stop_times[
+        trip_stop_cols + ["stop_id", "stop_name", "geometry"]
+    ].assign(
+        stop_meters = stop_meters
+    )
+    
+    gdf = gdf.assign(
+        prior_geometry = (gdf.groupby(trip_cols)
+                          .geometry
+                          .shift(1)),
+        prior_stop_sequence = (gdf.groupby(trip_cols)
                                .stop_sequence
                                .shift(1)),
         # add subseq stop info here
-        subseq_stop_sequence = (prior_stop.groupby("trip_instance_key")
+        subseq_stop_sequence = (gdf.groupby(trip_cols)
                                 .stop_sequence
                                 .shift(-1)),
-        subseq_stop_id = (prior_stop.groupby("trip_instance_key")
+        subseq_stop_id = (gdf.groupby(trip_cols)
                           .stop_id
                           .shift(-1)),
-        subseq_stop_name = (prior_stop.groupby("trip_instance_key")
+        subseq_stop_name = (gdf.groupby(trip_cols)
                             .stop_name
-                            .shift(-1))
+                            .shift(-1)),
+    ).fillna({
+        **{c: "" for c in ["subseq_stop_id", "subseq_stop_name"]}
+    })
+    
+    
+    stop_direction = np.vectorize(rt_utils.primary_cardinal_direction)(
+        gdf.prior_geometry.fillna(gdf.geometry), gdf.geometry)
+    
+    # Just keep subset of columns because we'll get other stop columns back when we merge with stop_times
+    keep_cols = [
+        "trip_instance_key", "stop_sequence", 
+        "stop_meters",
+        "prior_stop_sequence", "subseq_stop_sequence"
+    ]
+    
+    # Create stop pair with underscores, since stop_id
+    # can contain hyphens
+    gdf2 = gdf[keep_cols].assign(
+        stop_primary_direction = stop_direction,
+        stop_pair = gdf.stop_id.astype(str).str.cat(
+            gdf.subseq_stop_id.astype(str), sep = "__"),
+        stop_pair_name = gdf.stop_name.astype(str).str.cat(
+            gdf.subseq_stop_name.astype(str), sep = "__"),
     )
     
-    # Merge in prior stop geom as a separate column so we can
-    # calculate distance / direction
-    prior_stop_geom = (stop_times[["trip_instance_key", 
-                                   "stop_sequence", "geometry"]]
-                       .rename(columns = {
-                           "stop_sequence": "prior_stop_sequence",
-                           "geometry": "prior_geometry"
-                       })
-                       .set_geometry("prior_geometry")
-                      )
-    
-    stop_times_with_prior = pd.merge(
+    stop_times_geom_direction = pd.merge(
         stop_times,
-        prior_stop,
+        gdf2,
         on = trip_stop_cols,
-        how = "left"
+        how = "inner"
     )
-        
-    stop_times_with_prior_geom = pd.merge(
-        stop_times_with_prior,
-        prior_stop_geom,
-        on = ["trip_instance_key", "prior_stop_sequence"],
-        how = "left"
-    ).astype({
-        "prior_stop_sequence": "Int64",
-        "subseq_stop_sequence": "Int64"
-    }).fillna({
-        "subseq_stop_id": "",
-        "subseq_stop_name": ""
-    })
-    
-    # Create stop pair with underscores, since stop_id
-    # can contain hyphens
-    stop_times_with_prior_geom = stop_times_with_prior_geom.assign(
-        stop_pair = stop_times_with_prior_geom.apply(
-            lambda x: 
-            str(x.stop_id) + "__" + str(x.subseq_stop_id), 
-            axis=1, 
-        ),
-        stop_pair_name = stop_times_with_prior_geom.apply(
-            lambda x: 
-            x.stop_name + "__" + x.subseq_stop_name, 
-            axis=1, 
-        ),
-    ).drop(columns = ["subseq_stop_id", "subseq_stop_name"])
-        
-    return stop_times_with_prior_geom
+    
+    return stop_times_geom_direction
 
 
 def assemble_stop_times_with_direction(
@@ -179,50 +187,17 @@ def assemble_stop_times_with_direction(
     
     scheduled_stop_times = prep_scheduled_stop_times(analysis_date)
 
-    trip_stop_cols = ["trip_instance_key", "stop_sequence", 
-                      "stop_id", "stop_name"]
+    trip_cols = ["trip_instance_key"]
+    trip_stop_cols = ["trip_instance_key", "stop_sequence"]
     
-    scheduled_stop_times2 = 
find_prior_subseq_stop( - scheduled_stop_times, trip_stop_cols - ) - - other_stops = scheduled_stop_times2[ - ~(scheduled_stop_times2.prior_geometry.isna()) - ] - - first_stop = scheduled_stop_times2[ - scheduled_stop_times2.prior_geometry.isna() - ] - - first_stop = first_stop.assign( - stop_primary_direction = "Unknown" - ).drop(columns = "prior_geometry") - - other_stops_no_geom = other_stops.drop(columns = ["prior_geometry"]) - - prior_geom = other_stops.prior_geometry - current_geom = other_stops.geometry - - # Create a column with readable direction like westbound, eastbound, etc - stop_direction = np.vectorize( - rt_utils.primary_cardinal_direction)(prior_geom, current_geom) - stop_distance = prior_geom.distance(current_geom) - - other_stops_no_geom = other_stops_no_geom.assign( - stop_primary_direction = stop_direction, - stop_meters = stop_distance, - ) - - scheduled_stop_times_with_direction = pd.concat( - [first_stop, other_stops_no_geom], - axis=0 - ) - - df = scheduled_stop_times_with_direction.sort_values( - trip_stop_cols).reset_index(drop=True) - - time1 = datetime.datetime.now() - print(f"get scheduled stop times with direction: {time1 - start}") + df = find_prior_subseq_stop_info( + scheduled_stop_times, + analysis_date, + trip_cols = trip_cols, + trip_stop_cols = trip_stop_cols + ).sort_values( + trip_stop_cols + ).reset_index(drop=True) utils.geoparquet_gcs_export( df, @@ -231,7 +206,9 @@ def assemble_stop_times_with_direction( ) end = datetime.datetime.now() - print(f"execution time: {end - start}") + logger.info( + f"scheduled stop times with direction {analysis_date}: {end - start}" + ) return @@ -239,7 +216,12 @@ def assemble_stop_times_with_direction( if __name__ == "__main__": from update_vars import analysis_date_list - + + LOG_FILE = "./logs/preprocessing.log" + logger.add(LOG_FILE, retention="3 months") + logger.add(sys.stderr, + format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", + level="INFO") + for date in analysis_date_list: - print(date) assemble_stop_times_with_direction(date, GTFS_DATA_DICT) \ No newline at end of file From de1f8f96ffc952379865fb07887bb2a3bfbb45f2 Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Fri, 27 Dec 2024 04:51:43 +0000 Subject: [PATCH 02/11] (refactor): vp dwell time script, combine steps and use arrays --- gtfs_funnel/vp_dwell_time.py | 191 +++++++++++++++++------------------ 1 file changed, 91 insertions(+), 100 deletions(-) diff --git a/gtfs_funnel/vp_dwell_time.py b/gtfs_funnel/vp_dwell_time.py index 68b08359c..4a4261896 100644 --- a/gtfs_funnel/vp_dwell_time.py +++ b/gtfs_funnel/vp_dwell_time.py @@ -2,33 +2,58 @@ Add dwell time to vp """ import datetime +import numpy as np import pandas as pd import sys -from dask import delayed, compute from loguru import logger -from segment_speed_utils import segment_calcs from segment_speed_utils.project_vars import SEGMENT_GCS from shared_utils import publish_utils from update_vars import GTFS_DATA_DICT -def import_vp(analysis_date: str) -> pd.DataFrame: + +def import_vp(analysis_date: str, **kwargs) -> pd.DataFrame: """ - Import vehicle positions with a subset of columns - we need to check whether bus is dwelling - at a location. + Import vehicle positions for this script, + and allow for kwargs for filtering columns or rows from + the partitioned parquet. 
""" USABLE_VP = GTFS_DATA_DICT.speeds_tables.usable_vp - vp = pd.read_parquet( - f"{SEGMENT_GCS}{USABLE_VP}_{analysis_date}", + + df = pd.read_parquet( + f"{SEGMENT_GCS}{USABLE_VP}_{analysis_date}/", + **kwargs + ) + + return df + + +def filter_to_not_moving_vp(analysis_date: str) -> pd.DataFrame: + """ + Filter down to vp that aren't moving, + because they have vp_primary_direction == Unknown. + Exclude first vp of each trip. + """ + first_vp = import_vp( + analysis_date, + columns = ["trip_instance_key", "vp_idx"] + ).groupby("trip_instance_key").vp_idx.min().tolist() + + # If the direction is Unknown, that means vp hasn't moved + # from prior point + # filter out the first vp for each trip (since that one row always has Unknown direction) + vp_staying = import_vp( + analysis_date, columns = [ "trip_instance_key", "vp_idx", "location_timestamp_local", "vp_primary_direction" ], + filters = [[("vp_primary_direction", "==", "Unknown"), + ("vp_idx", "not in", first_vp)]] ) - - return vp + + return vp_staying def group_vp_dwelling_rows(df: pd.DataFrame) -> pd.DataFrame: @@ -43,100 +68,68 @@ def group_vp_dwelling_rows(df: pd.DataFrame) -> pd.DataFrame: and it can stop at a plaza, go on elsewhere, and come back to a plaza, and we don't want to mistakenly group non-consecutive vp. """ + prior_expected = (df.vp_idx - 1).to_numpy() + prior = (df.groupby("trip_instance_key").vp_idx.shift(1)).to_numpy() + + # Assign 0 if it seems to be dwelling (we want it to be grouped together) + # 1 if it's moving + same_group = np.where(prior == prior_expected, 0, 1) + df = df.assign( - #prior_expected = df.vp_idx - 1, - prior = (df.sort_values(["trip_instance_key", "vp_idx"]) - .groupby("trip_instance_key", observed=True, group_keys=False) - .vp_idx - .apply(lambda x: x.shift(1)) - ) - ) - - - df = df.assign( - # flag whether it is moving (we want 0's to show up for dwelling vp - # because this will get the cumcount() to work - is_moving = df.apply( - lambda x: - 0 if x.prior == x.prior_expected - else 1, axis=1).astype("int8") + same_group = same_group ) return df -def split_into_moving_and_dwelling(vp: pd.DataFrame): - """ - Use vp_primary_direction to split vp into either moving vp or dwelling vp. - Dwelling vp need extra transforms to figure how long it dwelled. - It's unknown if there was no movement, because the x, y is the - same, so direction was not able to be calculated. - The only exception is the first vp, because there is no prior point against which - to calculate direction. +def assign_vp_groupings( + unknown_vp: pd.DataFrame, + analysis_date: str +): """ - usable_bounds = segment_calcs.get_usable_vp_bounds_by_trip( - vp - ).drop(columns = "max_vp_idx") - - vp2 = pd.merge( - vp, - usable_bounds, - on = "trip_instance_key", - how = "inner" + Concatenate the unknown-direction vp with known-direction vp. + A portion of unknown direction vps were flagged as being in the same + group (consecutive timestamps and locations) or not. + + Use this to set a vp_grouping column that can help us do a more + nuanced grouping. + Need to do this because it's possible for vp to show up in the same location + but with timestamps far apart. 
+ """ + subset_vp_idx = unknown_vp.vp_idx.tolist() + + known_vp = import_vp( + analysis_date, + columns = [ + "trip_instance_key", "vp_idx", + "location_timestamp_local", "vp_primary_direction" + ], + filters = [[("vp_idx", "not in", subset_vp_idx)]] ) - vp2 = vp2.assign( - prior_expected = vp2.vp_idx - 1, + vp = pd.concat( + [known_vp, unknown_vp], + axis=0 + ).sort_values(["trip_instance_key", "vp_idx"]).reset_index(drop=True) + + vp = vp.assign( + same_group = vp.same_group.fillna(0).astype("int8") ) - # keep subset of prior vp when we have unknowns, - #then we want to grab just the one above - subset_vp_prior = vp2[ - vp2.vp_primary_direction=="Unknown" - ].prior_expected.unique().tolist() - - subset_unknown_vp = vp2[ - vp2.vp_primary_direction=="Unknown" - ].vp_idx.unique().tolist() - - # These vp have unknowns and may need to consolidate - # leave first vp in, just in case the second vp is unknown - vp_unknowns = vp2.loc[ - vp2.vp_idx.isin(subset_vp_prior + subset_unknown_vp) - ] - - # Vast majority of vp should be here, and we want to - # separate these out because no change is happening - # and we don't want to do an expensive row-wise shift on these - vp_knowns = vp2.loc[~vp2.vp_idx.isin(subset_vp_prior + subset_unknown_vp)] - - vp_unknowns2 = group_vp_dwelling_rows(vp_unknowns) - - vp3 = pd.concat( - [vp_knowns, vp_unknowns2], - axis=0, ignore_index=True - ).drop( - columns = ["prior", "prior_expected"] - ).fillna( - {"is_moving": 1} - ).astype( - {"is_moving": "int8"} - ).sort_values("vp_idx").reset_index(drop=True) - - vp3 = vp3.assign( - # since is_moving=0 if the vp is dwelling, + + vp = vp.assign( + # since same_group=0 if the vp is dwelling, # cumsum() will not change from the prior vp # and a set of 2 or 3 will hold the same vp_grouping value # once the vp moves and is_moving=1, then cumsum() will increase again - vp_grouping = (vp3.groupby("trip_instance_key", - observed=True, group_keys=False) - .is_moving + vp_grouping = (vp.groupby("trip_instance_key") + .same_group .cumsum() ) ) - return vp3 - + return vp + def add_dwell_time( vp_grouped: pd.DataFrame, @@ -149,7 +142,7 @@ def add_dwell_time( group_cols = ["trip_instance_key", "vp_grouping"] start_vp = (vp_grouped - .groupby(group_cols, observed=True, group_keys=False) + .groupby(group_cols, group_keys=False) .agg({ "vp_idx": "min", "location_timestamp_local": "min", @@ -159,7 +152,7 @@ def add_dwell_time( ) end_vp = (vp_grouped - .groupby(group_cols, observed=True, group_keys=False) + .groupby(group_cols, group_keys=False) .agg({ "vp_idx": "max", "location_timestamp_local": "max" @@ -183,7 +176,7 @@ def add_dwell_time( ) return df - + if __name__ == "__main__": from update_vars import analysis_date_list @@ -200,21 +193,19 @@ def add_dwell_time( EXPORT_FILE = GTFS_DATA_DICT.speeds_tables.vp_dwell start = datetime.datetime.now() - - vp = delayed(import_vp)(analysis_date) - - vp_grouped = delayed(split_into_moving_and_dwelling)(vp) - - vp_with_dwell = delayed(add_dwell_time)(vp_grouped) + + vp_unknowns = filter_to_not_moving_vp(analysis_date).pipe(group_vp_dwelling_rows) + + vp_grouped = assign_vp_groupings( + vp_unknowns, analysis_date + ) - vp_with_dwell = compute(vp_with_dwell)[0] + vp_with_dwell = add_dwell_time(vp_grouped) time1 = datetime.datetime.now() logger.info(f"compute dwell df: {time1 - start}") - vp_usable = pd.read_parquet( - f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}", - ) + vp_usable = import_vp(analysis_date) vp_usable_with_dwell = pd.merge( vp_usable, @@ -233,4 +224,4 @@ def add_dwell_time( end = 
datetime.datetime.now() logger.info(f"merge with original and export: {end - time1}") - logger.info(f"vp with dwell time {analysis_date}: {end - start}") \ No newline at end of file + logger.info(f"vp with dwell time {analysis_date}: {end - start}") From f81697f1a812f2c69a8f14fb6187f9f3443e87fb Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Fri, 27 Dec 2024 04:55:31 +0000 Subject: [PATCH 03/11] (refactor): combine vp_keep_usable and vp_direction into 1 script --- gtfs_funnel/vp_keep_usable.py | 245 ++++++++++++++++++++-------------- 1 file changed, 148 insertions(+), 97 deletions(-) diff --git a/gtfs_funnel/vp_keep_usable.py b/gtfs_funnel/vp_keep_usable.py index 7b210c878..f63399d82 100644 --- a/gtfs_funnel/vp_keep_usable.py +++ b/gtfs_funnel/vp_keep_usable.py @@ -1,140 +1,137 @@ """ Pre-processing vehicle positions. Drop all RT trips with less than 10 min of data. +Add direction to vp. + +Doing this with geopandas gdfs will crash kernel (2 geom cols too much). +Doing this with dask_geopandas gddfs takes ~25 min. +Doing this with dask ddfs (x, y) coords takes ~7 min. +Doing this with dask ddfs + np arrays takes ~4 min. (but persisting takes another 4 min) +Doing this with pandas and numpy arrays takes ~8 min. """ -import dask.dataframe as dd import datetime import geopandas as gpd -import gcsfs import numpy as np import pandas as pd import sys from loguru import logger -from shared_utils import publish_utils +from calitp_data_analysis import utils +from calitp_data_analysis.geography_utils import WGS84 +from segment_speed_utils.project_vars import PROJECT_CRS +from shared_utils import geo_utils, publish_utils, rt_utils from update_vars import GTFS_DATA_DICT, SEGMENT_GCS - -fs = gcsfs.GCSFileSystem() +import vp_transform -def trip_time_elapsed( - ddf: dd.DataFrame, - group_cols: list, - timestamp_col: str -): +def find_valid_trips( + vp: pd.DataFrame, + timestamp_col: str, + trip_time_min_cutoff: int +) -> list: """ Group by trip and calculate the time elapsed (max_time-min_time) for RT vp observed. """ - min_time = (ddf.groupby(group_cols, observed=True, group_keys=False) - [timestamp_col] - .min() - .dropna() - .reset_index() - .rename(columns = {timestamp_col: "min_time"}) - ) - - - max_time = (ddf.groupby(group_cols, observed=True, group_keys=False) - [timestamp_col] - .max() - .dropna() - .reset_index() - .rename(columns = {timestamp_col: "max_time"}) - ) - - df = dd.merge( - min_time, - max_time, - on = group_cols, - how = "outer" - ) - - df = df.assign( - trip_time_sec = (df.max_time - df.min_time) / np.timedelta64(1, "s") - ) - - return df - + min_time = vp.groupby("trip_instance_key")[timestamp_col].min() + max_time = vp.groupby("trip_instance_key")[timestamp_col].max() -def get_valid_trips_by_time_cutoff( - ddf: dd.DataFrame, - timestamp_col: str, - trip_time_min_cutoff: int -)-> pd.DataFrame: - """ - Filter down trips by trip time elapsed. - Set the number of minutes to do cut-off for at least x min of RT. 
- """ - trip_cols = ["trip_instance_key"] - trip_stats = trip_time_elapsed( - ddf, - trip_cols, - timestamp_col - ) + # This is a pd.Series that calculates the trip time elapsed + # in vp df + trip_times = ((max_time - min_time) / np.timedelta64(1, "s")) - usable_trips = (trip_stats[ - trip_stats.trip_time_sec >= trip_time_min_cutoff * 60] - [trip_cols] - .drop_duplicates() - .reset_index(drop=True) - ) + # Subset it to trips that meet our minimum time cut-off + # index is the trip_instance_key + usable_trips = trip_times.loc[ + trip_times > (trip_time_min_cutoff * 60) + ].index.tolist() return usable_trips -def pare_down_vp_to_valid_trips( +def pare_down_to_valid_trips( analysis_date: str, dict_inputs: dict = {} ): - """ - Pare down vehicle positions that have been joined to segments - to keep the enter / exit timestamps. - Also, exclude any bad batches of trips. - """ + time0 = datetime.datetime.now() + INPUT_FILE = dict_inputs.speeds_tables.raw_vp TIMESTAMP_COL = dict_inputs.speeds_tables.timestamp_col TIME_CUTOFF = dict_inputs.speeds_tables.time_min_cutoff EXPORT_FILE = dict_inputs.speeds_tables.usable_vp vp = gpd.read_parquet( - f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}.parquet" - ) + f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}.parquet", + ).to_crs(WGS84) - usable_trips = get_valid_trips_by_time_cutoff( - vp, - TIMESTAMP_COL, - TIME_CUTOFF - ) + usable_trips = find_valid_trips(vp, TIMESTAMP_COL, TIME_CUTOFF) - usable_vp = pd.merge( - vp, - usable_trips, - on = "trip_instance_key", - how = "inner" - ).sort_values( + vp = vp[vp.trip_instance_key.isin(usable_trips)].sort_values( ["gtfs_dataset_key", "trip_id", TIMESTAMP_COL] ).drop_duplicates( subset=["trip_instance_key", TIMESTAMP_COL] ).reset_index(drop=True) + - # Let's convert to tabular now, make use of partitioning - # We want to break up sjoins, so we can wrangle it to points on-the-fly - usable_vp = usable_vp.assign( - x = usable_vp.geometry.x, - y = usable_vp.geometry.y, - vp_idx = usable_vp.index - ).drop(columns = "geometry") + vp = vp.assign( + vp_idx = vp.index, + ) + + utils.geoparquet_gcs_export( + vp, + SEGMENT_GCS, + f"{EXPORT_FILE}_{analysis_date}_stage" + ) + + time1 = datetime.datetime.now() + logger.info(f"pare down vp: {time1 - time0}") + + # Add in direction of travel + get_vp_direction_column(vp) + + time2 = datetime.datetime.now() + logger.info(f"export vp direction: {time2 - time1}") + + return + + +def merge_in_vp_direction( + analysis_date: str, + dict_inputs: dict = {} +): + """ + Merge staged vp_usable with the vp direction results + and export. 
+ """ + time0 = datetime.datetime.now() + INPUT_FILE = dict_inputs.speeds_tables.usable_vp + + vp_direction = pd.read_parquet( + f"{SEGMENT_GCS}vp_direction_{analysis_date}.parquet" + ) + + # By the end of add_vp_direction, we return df, not gdf + # Let's convert to tabular now, make use of partitioning + vp = gpd.read_parquet( + f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}_stage.parquet", + ).to_crs(WGS84).merge( + vp_direction, + on = "vp_idx", + how = "inner" + ) - # Either use dask (which kills kernel here) or remove the existing folder of output - # https://stackoverflow.com/questions/69092126/is-it-possible-to-change-the-output-filenames-when-saving-as-partitioned-parquet - export_path = f"{SEGMENT_GCS}{EXPORT_FILE}_{analysis_date}_stage" + vp = vp.assign( + x = vp.geometry.x, + y = vp.geometry.y + ).drop(columns = "geometry") + export_path = f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}" + publish_utils.if_exists_then_delete(export_path) - usable_vp.to_parquet( + vp.to_parquet( export_path, partition_cols = "gtfs_dataset_key", # if we don't delete the entire folder of partitioned parquets, this @@ -142,11 +139,58 @@ def pare_down_vp_to_valid_trips( #existing_data_behavior = "delete_matching" ) - del vp, usable_trips, usable_vp + time1 = datetime.datetime.now() + logger.info(f"{analysis_date}: export usable vp with direction: {time1 - time0}") + + return - return - +def get_vp_direction_column( + vp_gdf: gpd.GeoDataFrame, +) -> pd.DataFrame: + """ + """ + vp_gdf = vp_gdf[ + ["trip_instance_key", "vp_idx", "geometry"] + ].to_crs(PROJECT_CRS) + + vp_condensed = vp_transform.condense_by_trip( + vp_gdf, + group_cols = ["trip_instance_key"], + sort_cols = ["trip_instance_key", "vp_idx"], + array_cols = ["vp_idx", "geometry"] + ) + + vp_direction_series = [] + + for row in vp_condensed.itertuples(): + vp_geom = np.array(getattr(row, "geometry")) + next_vp_geom = vp_geom[1:] + + vp_direction = np.array( + ["Unknown"] + + [rt_utils.primary_cardinal_direction(prior_vp, current_vp) + for prior_vp, current_vp in zip(vp_geom, next_vp_geom) + ]) + + vp_direction_series.append(vp_direction) + + keep_cols = ["vp_idx", "vp_primary_direction"] + + vp_condensed = vp_condensed.assign( + vp_primary_direction = vp_direction_series + )[keep_cols].pipe( + vp_transform.explode_arrays, + array_cols = keep_cols + ) + + vp_condensed.to_parquet( + f"{SEGMENT_GCS}vp_direction_{analysis_date}.parquet" + ) + + return + + if __name__ == "__main__": from update_vars import analysis_date_list @@ -160,12 +204,19 @@ def pare_down_vp_to_valid_trips( for analysis_date in analysis_date_list: start = datetime.datetime.now() - pare_down_vp_to_valid_trips( + pare_down_to_valid_trips( analysis_date, GTFS_DATA_DICT ) - end = datetime.datetime.now() - logger.info(f"{analysis_date}: pare down vp: {end - start}") + merge_in_vp_direction( + analysis_date, + GTFS_DATA_DICT + ) + end = datetime.datetime.now() + logger.info( + f"{analysis_date}: pare down vp, add direction execution time: " + f"{end - start}" + ) \ No newline at end of file From 8330d8c4863c3c4cae14f2bd7cb65ede80bd7c64 Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Fri, 27 Dec 2024 04:56:30 +0000 Subject: [PATCH 04/11] staging usable vp is no longer partitioned --- gtfs_funnel/cleanup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gtfs_funnel/cleanup.py b/gtfs_funnel/cleanup.py index de31a7af5..bf6c81ae1 100644 --- a/gtfs_funnel/cleanup.py +++ b/gtfs_funnel/cleanup.py @@ -15,7 +15,7 @@ INPUT_FILE = GTFS_DATA_DICT.speeds_tables.usable_vp 
publish_utils.if_exists_then_delete( - f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}_stage" + f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}_stage.parquet" ) publish_utils.if_exists_then_delete( f"{SEGMENT_GCS}vp_direction_{analysis_date}.parquet" From bbc8dadff4e600fcd115abe6cc667abedadc7421 Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Fri, 27 Dec 2024 04:57:44 +0000 Subject: [PATCH 05/11] (remove): vp_direction script --- gtfs_funnel/vp_direction.py | 205 ------------------------------------ 1 file changed, 205 deletions(-) delete mode 100644 gtfs_funnel/vp_direction.py diff --git a/gtfs_funnel/vp_direction.py b/gtfs_funnel/vp_direction.py deleted file mode 100644 index 21bf97145..000000000 --- a/gtfs_funnel/vp_direction.py +++ /dev/null @@ -1,205 +0,0 @@ -""" -Add direction to vp. - -Doing this with geopandas gdfs will crash kernel (2 geom cols too much). -Doing this with dask_geopandas gddfs takes ~25 min. -Doing this with dask ddfs (x, y) coords takes ~7 min. -Doing this with dask ddfs + np arrays takes ~4 min. -""" -import dask.dataframe as dd -import dask_geopandas as dg -import datetime -import gcsfs -import geopandas as gpd -import numpy as np -import pandas as pd -import sys - -from loguru import logger - -from calitp_data_analysis.geography_utils import WGS84 -from segment_speed_utils import segment_calcs -from segment_speed_utils.project_vars import PROJECT_CRS -from shared_utils import publish_utils, rt_utils -from update_vars import GTFS_DATA_DICT, SEGMENT_GCS - -fs = gcsfs.GCSFileSystem() - -def attach_prior_vp_add_direction( - analysis_date: str, - dict_inputs: dict = {} -): - """ - For each vp, attach the prior_vp, and use - the 2 geometry columns to find the direction - the vp is traveling. - Since export takes awhile, - save out a parquet and read it in to merge later. 
- """ - time0 = datetime.datetime.now() - INPUT_FILE = dict_inputs.speeds_tables.usable_vp - - vp = dd.read_parquet( - f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}_stage", - columns = ["trip_instance_key", "vp_idx", "x", "y"], - ).repartition(npartitions=20) - - usable_bounds = segment_calcs.get_usable_vp_bounds_by_trip(vp) - - vp2 = dd.merge( - vp, - usable_bounds, - on = "trip_instance_key", - how = "inner" - ) - - # Convert the x, y into gdf, project it, since direction has to be - # calculated in projected CRS, so save out the projected CRS's x and y - vp_gddf = dg.from_dask_dataframe( - vp2, - geometry = dg.points_from_xy(vp2, x="x", y="y") - ).set_crs(WGS84).to_crs(PROJECT_CRS) - - vp_ddf = vp_gddf.assign( - x = vp_gddf.geometry.x, - y = vp_gddf.geometry.y, - prior_vp_idx = vp_gddf.vp_idx - 1 - ).drop(columns = "geometry") - - # Dask gdf doesn't like to be renamed on-the-fly - # Make 1 partition for faster merging - vp_ddf_renamed = vp_ddf[ - ["vp_idx", "x", "y"] - ].add_prefix("prior_").repartition(npartitions=1) - - # Merge on prior_vp_idx's geometry, and filter to those rows - # whose prior_vp_idx is from the same trip_instance_key - full_df = dd.merge( - vp_ddf, - vp_ddf_renamed, - on = "prior_vp_idx", - how = "inner" - ).query('prior_vp_idx >= min_vp_idx')[ - ["vp_idx", "prior_x", "prior_y", "x", "y"] - ].reset_index(drop=True) - - keep_cols = ["vp_idx", "prior_x", "prior_y", "x", "y"] - full_df = full_df[keep_cols].compute() - - time1 = datetime.datetime.now() - logger.info(f"persist vp gddf: {time1 - time0}") - - vp_indices = full_df.vp_idx.to_numpy() - distance_east = full_df.x - full_df.prior_x - distance_north = full_df.y - full_df.prior_y - - # Stack our results and convert to df - results_array = np.column_stack(( - vp_indices, - distance_east, - distance_north - )) - - vp_direction = pd.DataFrame( - results_array, - columns = ["vp_idx", "distance_east", "distance_north"] - ).astype({ - "vp_idx": "int64", - "distance_east": "float", - "distance_north": "float" - }) - - # Get a readable direction (westbound, eastbound) - vp_direction = vp_direction.assign( - vp_primary_direction = vp_direction.apply( - lambda x: - rt_utils.cardinal_definition_rules(x.distance_east, x.distance_north), - axis=1 - ) - ).drop(columns = ["distance_east", "distance_north"]) - - time2 = datetime.datetime.now() - logger.info(f"np vectorize arrays for direction: {time2 - time1}") - - vp_direction.to_parquet( - f"{SEGMENT_GCS}vp_direction_{analysis_date}.parquet") - - del vp_direction, full_df, usable_bounds, vp, vp2 - - return - - -def add_direction_to_usable_vp( - analysis_date: str, - dict_inputs: dict = {} -): - """ - Merge staged vp_usable (partitioned by gtfs_dataset_key) - to the vp direction results. 
- """ - INPUT_FILE = dict_inputs.speeds_tables.usable_vp - - usable_vp = pd.read_parquet( - f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}_stage" - ) - - vp_direction = pd.read_parquet( - f"{SEGMENT_GCS}vp_direction_{analysis_date}.parquet" - ) - - # Do a left merge so that rows (esp first vp for each trip) can be filled in - # with Unknowns later - vp_with_dir = pd.merge( - usable_vp, - vp_direction, - on = "vp_idx", - how = "left" - ) - - vp_with_dir = vp_with_dir.assign( - vp_primary_direction = vp_with_dir.vp_primary_direction.fillna("Unknown"), - ).drop_duplicates(subset=["vp_idx", "vp_primary_direction"]) - - export_path = f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}" - - publish_utils.if_exists_then_delete(export_path) - - vp_with_dir.to_parquet( - export_path, - partition_cols = "gtfs_dataset_key", - ) - - del usable_vp, vp_direction, vp_with_dir - - return - - -if __name__ == "__main__": - - from update_vars import analysis_date_list - - LOG_FILE = "./logs/vp_preprocessing.log" - logger.add(LOG_FILE, retention="3 months") - logger.add(sys.stderr, - format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", - level="INFO") - - for analysis_date in analysis_date_list: - - start = datetime.datetime.now() - - attach_prior_vp_add_direction(analysis_date, GTFS_DATA_DICT) - - time1 = datetime.datetime.now() - logger.info(f"{analysis_date}: export vp direction: {time1 - start}") - - add_direction_to_usable_vp(analysis_date, GTFS_DATA_DICT) - - end = datetime.datetime.now() - - logger.info( - f"{analysis_date}: export usable vp with direction: " - f"{end - time1}") - logger.info( - f"{analysis_date}: vp_direction script execution time: " - f"{end - start}") \ No newline at end of file From 0f4c80daa5e4ba26fb59d3f2ef32e867f56e4c89 Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Fri, 27 Dec 2024 04:57:59 +0000 Subject: [PATCH 06/11] remove vp_direction from Makefile --- gtfs_funnel/Makefile | 1 - 1 file changed, 1 deletion(-) diff --git a/gtfs_funnel/Makefile b/gtfs_funnel/Makefile index a98a88e67..aa42b0874 100644 --- a/gtfs_funnel/Makefile +++ b/gtfs_funnel/Makefile @@ -13,7 +13,6 @@ preprocess_schedule_vp_dependency: preprocess_vp: python vp_keep_usable.py - python vp_direction.py python cleanup.py python vp_dwell_time.py python vp_condenser.py From 4e99fdd15e5b21f0dc9abb80a09cf9ca957e496a Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Fri, 27 Dec 2024 21:50:00 +0000 Subject: [PATCH 07/11] (refacotor): fix vp_dwell_time, check vp_condenser --- gtfs_funnel/Makefile | 2 +- gtfs_funnel/vp_condenser.py | 14 +++--- gtfs_funnel/vp_dwell_time.py | 31 +++++++++---- .../segment_speed_utils/vp_transform.py | 46 +++++++++++++------ 4 files changed, 65 insertions(+), 28 deletions(-) diff --git a/gtfs_funnel/Makefile b/gtfs_funnel/Makefile index aa42b0874..d17bc190a 100644 --- a/gtfs_funnel/Makefile +++ b/gtfs_funnel/Makefile @@ -21,7 +21,7 @@ preprocess_schedule_only: make route_typologies_data python operator_scheduled_stats.py python schedule_stats_by_stop.py - #python track_publish_dates.py + python track_publish_dates.py route_typologies_data: python route_typologies.py diff --git a/gtfs_funnel/vp_condenser.py b/gtfs_funnel/vp_condenser.py index 80e06ea1f..4abcc31da 100644 --- a/gtfs_funnel/vp_condenser.py +++ b/gtfs_funnel/vp_condenser.py @@ -29,24 +29,26 @@ def condense_vp_to_linestring( EXPORT_FILE = dict_inputs.speeds_tables.vp_condensed_line vp = delayed(pd.read_parquet)( - f"{SEGMENT_GCS}{USABLE_VP}_{analysis_date}", + f"{SEGMENT_GCS}{USABLE_VP}_{analysis_date}/", columns = 
["trip_instance_key", "x", "y", "vp_idx", "vp_primary_direction", "location_timestamp_local", "moving_timestamp_local", ], - ).pipe(geo_utils.vp_as_gdf, crs = WGS84) - + ).pipe( + geo_utils.vp_as_gdf, crs = WGS84 + ) + vp_condensed = delayed(vp_transform.condense_point_geom_to_line)( vp, group_cols = ["trip_instance_key"], geom_col = "geometry", - other_cols = ["vp_idx", "location_timestamp_local", - "moving_timestamp_local", + array_cols = ["vp_idx", + "location_timestamp_local", "moving_timestamp_local", "vp_primary_direction", ], ).set_geometry("geometry").set_crs(WGS84) - + vp_condensed = compute(vp_condensed)[0] utils.geoparquet_gcs_export( diff --git a/gtfs_funnel/vp_dwell_time.py b/gtfs_funnel/vp_dwell_time.py index 4a4261896..fe8502ab4 100644 --- a/gtfs_funnel/vp_dwell_time.py +++ b/gtfs_funnel/vp_dwell_time.py @@ -51,7 +51,7 @@ def filter_to_not_moving_vp(analysis_date: str) -> pd.DataFrame: ], filters = [[("vp_primary_direction", "==", "Unknown"), ("vp_idx", "not in", first_vp)]] - ) + ).sort_values("vp_idx").reset_index(drop=True) return vp_staying @@ -112,8 +112,11 @@ def assign_vp_groupings( axis=0 ).sort_values(["trip_instance_key", "vp_idx"]).reset_index(drop=True) + # Before, we assigned 0 if vp appeared to be dwelling at the location + # Now, for the other portion, we assign 1 because we do not want those vps to be in same group + # so we want the cumsum to move up in value vp = vp.assign( - same_group = vp.same_group.fillna(0).astype("int8") + same_group = vp.same_group.fillna(1).astype("int8") ) @@ -169,14 +172,19 @@ def add_dwell_time( on = group_cols, how = "inner" ) + + # We'll get these back when we merge on vp_idx later + # Use it to get dwell_sec and then drop + drop_cols = ["trip_instance_key", "location_timestamp_local"] df = df.assign( dwell_sec = (df.moving_timestamp_local - df.location_timestamp_local).dt.total_seconds().astype("int") - ) - + ).drop(columns = drop_cols) + return df + if __name__ == "__main__": from update_vars import analysis_date_list @@ -194,7 +202,9 @@ def add_dwell_time( start = datetime.datetime.now() - vp_unknowns = filter_to_not_moving_vp(analysis_date).pipe(group_vp_dwelling_rows) + vp_unknowns = filter_to_not_moving_vp( + analysis_date + ).pipe(group_vp_dwelling_rows) vp_grouped = assign_vp_groupings( vp_unknowns, analysis_date @@ -205,14 +215,19 @@ def add_dwell_time( time1 = datetime.datetime.now() logger.info(f"compute dwell df: {time1 - start}") - vp_usable = import_vp(analysis_date) + remaining_vp = vp_with_dwell.vp_idx.tolist() + + vp_usable = import_vp( + analysis_date, + filters = [[("vp_idx", "in", remaining_vp)]] + ) vp_usable_with_dwell = pd.merge( vp_usable, vp_with_dwell, - on = ["trip_instance_key", "vp_idx", "location_timestamp_local"], + on = "vp_idx", how = "inner" - ) + ).sort_values("vp_idx").reset_index(drop=True) publish_utils.if_exists_then_delete( f"{SEGMENT_GCS}{EXPORT_FILE}_{analysis_date}") diff --git a/rt_segment_speeds/segment_speed_utils/vp_transform.py b/rt_segment_speeds/segment_speed_utils/vp_transform.py index bc41a9a90..01795b48c 100644 --- a/rt_segment_speeds/segment_speed_utils/vp_transform.py +++ b/rt_segment_speeds/segment_speed_utils/vp_transform.py @@ -15,13 +15,14 @@ "Unknown": "", } + def condense_point_geom_to_line( - df: pd.DataFrame, + df: gpd.GeoDataFrame, group_cols: list, geom_col: str = "geometry", - other_cols: list = [] + array_cols: list: = [] ) -> gpd.GeoDataFrame: - """ + """ To apply nearest neighbors, we need to create our equivalent line with coords, out of which we can select 
some nearest neighbors in `gtfs_segments.geom_utils.nearest_points`. @@ -30,13 +31,31 @@ def condense_point_geom_to_line( vp that occurred for a trip into a list and create that as a shapely.LineString object. """ - valid_groups = (df.groupby(group_cols, - observed=True, group_keys=False) + valid_groups = (df.groupby(group_cols, group_keys=False) .agg({geom_col: "count"}) .reset_index() .query(f'{geom_col} > 1') )[group_cols].drop_duplicates() + ''' + # If we want to support multiple geometry columns...but how often will this happen? + # Find which columns are geometry + # Keep only groups that can be constructed as linestrings (have more than 1 coord) + check_me = df[group_cols + array_cols] + geo_cols = list(check_me.columns[check_me.dtypes == "geometry"]) + + valid_groups = (df.groupby(group_cols) + .agg({ + **{c: "count" for c in geo_cols}} + ) + .reset_index() + ) + + valid_groups["obs"] = valid_groups[geo_cols].sum(axis=1) + valid_groups = valid_groups[ + valid_groups.obs >= (2 * len(geo_cols)) + ][group_cols].drop_duplicates() + ''' df2 = pd.merge( df, valid_groups, @@ -44,13 +63,14 @@ def condense_point_geom_to_line( how = "inner" ) - df3 = (df2.groupby(group_cols, - observed=True, group_keys=False) - .agg({ - geom_col: lambda x: shapely.LineString(list(x)), - **{k: lambda x: list(x) for k in other_cols} - }) - .reset_index() - ) + df3 = ( + df2 + .groupby(group_cols) + .agg({ + geom_col: lambda x: shapely.LineString(list(x)), + **{c: lambda x: list(x) for c in array_cols}, + }) + .reset_index() + ) return df3 \ No newline at end of file From 4aa87c2e40e4f347d0cb5d2f1a162630518e5813 Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Fri, 27 Dec 2024 22:00:26 +0000 Subject: [PATCH 08/11] update mermaid diagrams --- gtfs_funnel/mermaid.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gtfs_funnel/mermaid.md b/gtfs_funnel/mermaid.md index 45e19ca68..1f002d51f 100644 --- a/gtfs_funnel/mermaid.md +++ b/gtfs_funnel/mermaid.md @@ -53,11 +53,11 @@ graph TB subgraph vp_preprocessing E --> E3([vp_keep_usable.py]):::script --> - E4([vp_direction.py]):::script --> - F[vp_usable]:::df --> + E4([vp_dwell_time.py]):::script --> + F[vp_usable_dwell]:::df --> E5([cleanup.py]):::script; F --> F1([vp_condenser.py]):::script --> - F2[vp_condensed
vp_nearest_neighbor
NAD83]:::df; + F2[vp_nearest_neighbor
NAD83]:::df; end From 18a11be1f017a239b3db7d25bb648406ff3739dd Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Fri, 27 Dec 2024 22:05:12 +0000 Subject: [PATCH 09/11] update mermaid link in readme --- gtfs_funnel/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gtfs_funnel/README.md b/gtfs_funnel/README.md index 5f914c37d..c4076e716 100644 --- a/gtfs_funnel/README.md +++ b/gtfs_funnel/README.md @@ -11,7 +11,7 @@ Use `update_vars` and input one or several days to download. 1. Harmonize how route names are displayed in time-series: `make timeseries_preprocessing` 1. Download monthly aggregated schedule data: `make monthly_scheduled_data` -[![gtfs_funnel_mermaid1](https://mermaid.ink/img/pako:eNqlVX9v2jAQ_SqWJwSTgBESCk23SqUQ_to0qWiTllSRSS7EarAj22lLK777nKRAKL86DSmJ8b1393w--15xwEPANm61Wh5TVCVgo8nUuUNOxhgkqPFTQCp4AFJSNv_ssQJYq716DCHKqLJRMUSormJYQN1G9RmRUG9WZ38RQcksAVnfwLUpFXRBxPKWJ1zkvE_jntNzbtbULWIKz2qL6nQ6-5AhFyGIY6CEMjhmkxBwFu7qcJz-eFjBKBCK7kCiKKqX5lX-0a9VreYxj80FSWM0HZbG8i2zWTkd8ieWcKKVrl0jFCREyhFEKIxQRJPEfkvDAYQMBE3VGlWIzEOucTdGw11H8JWGyna6vP9s2_YbsdW6Rlt4QXEL3L3G6PD79hiSFIRs00XKhfJlEEOYJfDm_moLHlZjy5ikcD740C2BX2fi-vfkbmD9u4zSQUXHrVGRoXiZglMibt0C9j8acn5FwmgnFdroK7r4QDpG7hZ8VMWo654UUtJ3OU9UxX5IBQSKcoa-oanI4ItDEgn3Fd3jqu5HiGmQgJ9ySXPWefnjbsPVJykgCph-znrYZef-xu5j-m4frtb1DSxcDzenabteP63eUpWteKdzZDYqSfZ3E3NyhaPuSSmP6TEJ43JpOrDGPACkfibzq_B8Pq2C8jF5-c_J4aXzo-Uz7ulNSoCwLN31V6kDp-A5RhE-vxuBSRDnw3er-DDfSP2fAREglf7SeTzjIp_-cTMamJv93V6ThxLLdakTxYUfCC7lE0keKvfdARUTLXsD9ecqkn5IlG5ISid_6SvuczEnjL6QAzndL8mJW7jIqVUe2oTYL1PcxAsQC0JD3VaLZufhogl62NbDECKSJcrDumNoKMkUv1uyANtKH8omzlItF0aU6OUvsB3lh7SJIaQ6B9_LVl107CZOCcP2K37GdmvQbZuXZvfSMvsD87JrWE28xLZ50Tas3sC6sCyj1xlYxqqJXzjXXo1219BNsGP0TVOjrb5VuPtTGIuQq79773TT?type=png)](https://mermaid.live/edit#pako:eNqlVX9v2jAQ_SqWJwSTgBESCk23SqUQ_to0qWiTllSRSS7EarAj22lLK777nKRAKL86DSmJ8b1393w--15xwEPANm61Wh5TVCVgo8nUuUNOxhgkqPFTQCp4AFJSNv_ssQJYq716DCHKqLJRMUSormJYQN1G9RmRUG9WZ38RQcksAVnfwLUpFXRBxPKWJ1zkvE_jntNzbtbULWIKz2qL6nQ6-5AhFyGIY6CEMjhmkxBwFu7qcJz-eFjBKBCK7kCiKKqX5lX-0a9VreYxj80FSWM0HZbG8i2zWTkd8ieWcKKVrl0jFCREyhFEKIxQRJPEfkvDAYQMBE3VGlWIzEOucTdGw11H8JWGyna6vP9s2_YbsdW6Rlt4QXEL3L3G6PD79hiSFIRs00XKhfJlEEOYJfDm_moLHlZjy5ikcD740C2BX2fi-vfkbmD9u4zSQUXHrVGRoXiZglMibt0C9j8acn5FwmgnFdroK7r4QDpG7hZ8VMWo654UUtJ3OU9UxX5IBQSKcoa-oanI4ItDEgn3Fd3jqu5HiGmQgJ9ySXPWefnjbsPVJykgCph-znrYZef-xu5j-m4frtb1DSxcDzenabteP63eUpWteKdzZDYqSfZ3E3NyhaPuSSmP6TEJ43JpOrDGPACkfibzq_B8Pq2C8jF5-c_J4aXzo-Uz7ulNSoCwLN31V6kDp-A5RhE-vxuBSRDnw3er-DDfSP2fAREglf7SeTzjIp_-cTMamJv93V6ThxLLdakTxYUfCC7lE0keKvfdARUTLXsD9ecqkn5IlG5ISid_6SvuczEnjL6QAzndL8mJW7jIqVUe2oTYL1PcxAsQC0JD3VaLZufhogl62NbDECKSJcrDumNoKMkUv1uyANtKH8omzlItF0aU6OUvsB3lh7SJIaQ6B9_LVl107CZOCcP2K37GdmvQbZuXZvfSMvsD87JrWE28xLZ50Tas3sC6sCyj1xlYxqqJXzjXXo1219BNsGP0TVOjrb5VuPtTGIuQq79773TT) 
+[![gtfs_funnel_mermaid1](https://mermaid.live/edit#pako:eNqlVX9v2jAQ_SqWJwSTgBESCk23SqUQ_to0qWiTllSRSS7EarAj22lLK777nKRAKL86DSmJ8b1393w--15xwEPANm61Wh5TVCVgo8nUuUNOxhgkqPFTQCp4AFJSNv_ssQJYq716DCHKqLJRMUSormJYQN1G9RmRUG9WZ38RQcksAVnfwLUpFXRBxPKWJ1zkvE_jntNzbtbULWIKz2qL6nQ6-5AhFyGIY6CEMjhmkxBwFu7qcJz-eFjBKBCK7kCiKKqX5lX-0a9VreYxj80FSWM0HZbG8i2zWTkd8ieWcKKVrl0jFCREyhFEKIxQRJPEfkvDAYQMBE3VGlWIzEOucTdGw11H8JWGyna6vP9s2_YbsdW6Rlt4QXEL3L3G6PD79hiSFIRs00XKhfJlEEOYJfDm_moLHlZjy5ikcD740C2BX2fi-vfkbmD9u4zSQUXHrVGRoXiZglMibt0C9j8acn5FwmgnFdroK7r4QDpG7hZ8VMWo654UUtJ3OU9UxX5IBQSKcoa-oanI4ItDEgn3Fd3jqu5HiGmQgJ9ySXPWefnjbsPVJykgCph-znrYZef-xu5j-m4frtb1DSxcDzenabteP63eUpWteKdzZDYqSfZ3E3NyhaPuSSmP6TEJ43JpOrDGPACkfibzq_B8Pq2C8jF5-c_J4aXzo-Uz7ulNSoCwLN31V6kDp-A5RhE-vxuBSRDnw3er-DDfSP2fAREglf7SeTzjIp_-cTMamJv93V6ThxLLdakTxYUfCC7lE0keKvfdARUTLXsD9ecqkn5IlG5ISid_6SvuczEnjL6QAzndL8mJW7jIqVUe2oTYL1PcxAsQC0JD3VaLZufhogl62NbDECKSJcrDumNoKMkUv1uyANtKH8omzlItF0aU6OUvsB3lh7SJIaQ6B9_LVl107CZOCcP2K37GdmvQbZuXZvfSMvsD87JrWE28xLZ50Tas3sC6sCyj1xlYxqqJXzjXXo1219BNsGP0TVOjrb5VuPtTGIuQq79773TT) [![speeds_stop_times_mermaid](https://mermaid.ink/img/pako:eNqlVV1v2jAU_SuWJwSTgPERCqTapEJh07buYUV7WEDITW7AmmNHttOWVvz3OU5YkkK7Lx7A-J577_Hxsf2IfREAdnGr1VpyTTUDF31doGvYRMA1uo4BAoUID-ysFjFa0AjUktuEWu1xyRGinGoX2SFCdb2FCOouqt8QBfVmefYbkZTcMFD1X3ATiiWNiNxNBRMyzXs1G8wH84tDaoFYwL0uUJ1O5xgyETIA-RyIUQ7PxRT4ggdVHvP5cDYpYTRITSuQMAzrWXif_pivfa225EseMnHnb4nUaDHJACq52UgSb5HKxKUPcCiMkM-IUpcQoiBEIWXMzUU4gVC-pLE-oCzFU6isyWIXQw6dT-f98bSJlJbiB7gp9ZToIfOi2_D8RK-V2eR1nq3a8W712nXdvGerhYqE9EMYQ9qEkM3SqTUM6N0T1IVXKboyBYPwvMBMTGsFDPy_6C44ZJ1jkEhtSQx1VWJRAJXIpmNCpXGyBGT2WVGlU38XMOJLoZQtaZdQYufZ8lVqq_OydtkIeHCYLPbaHqB1TGNIzVdOmjY8DoaP0utbw1rYBkdLfqrlZcOj3PgwFozonBSRkt4S9tvcWcMr45VtmhKsZp6jA82TK0ovgVyGfHlFl3kjj6y1Md7bimS2QcmVx_ymJU_MbPx9wyO3IMkGDmUywsfWKG3YB08KxhLTmfINg4Ds3uQTUcI0Nf9L7sj2R4rEiBlQ-YJJM0Ivi2MuyVMe_FiVReY-t6h_keVTSZYkSu-9P5Dl83_IsipRuPIK8us7qrcpwJxdKvjKsps9WcMXT_lbCBIG68rKizOUS4mbOAIZERqYF8m-D0ts340lds0wgJAYpktsLlkDJYkW1zvuY1fLBJo4iQNzIi4pMVsRYTc0BjezEFAt5FX2ytnHroljwrH7iO-x2xr12v1xvzd2-sNRf9zrOk28w27_rN11BiPnzHG6g87I6e6b-EEIU7Xb7nXNu9HpDvt9g3aGji333QZty_1PICdHvQ?type=png)](https://mermaid.live/edit#pako:eNqlVV1v2jAU_SuWJwSTgPERCqTapEJh07buYUV7WEDITW7AmmNHttOWVvz3OU5YkkK7Lx7A-J577_Hxsf2IfREAdnGr1VpyTTUDF31doGvYRMA1uo4BAoUID-ysFjFa0AjUktuEWu1xyRGinGoX2SFCdb2FCOouqt8QBfVmefYbkZTcMFD1X3ATiiWNiNxNBRMyzXs1G8wH84tDaoFYwL0uUJ1O5xgyETIA-RyIUQ7PxRT4ggdVHvP5cDYpYTRITSuQMAzrWXif_pivfa225EseMnHnb4nUaDHJACq52UgSb5HKxKUPcCiMkM-IUpcQoiBEIWXMzUU4gVC-pLE-oCzFU6isyWIXQw6dT-f98bSJlJbiB7gp9ZToIfOi2_D8RK-V2eR1nq3a8W712nXdvGerhYqE9EMYQ9qEkM3SqTUM6N0T1IVXKboyBYPwvMBMTGsFDPy_6C44ZJ1jkEhtSQx1VWJRAJXIpmNCpXGyBGT2WVGlU38XMOJLoZQtaZdQYufZ8lVqq_OydtkIeHCYLPbaHqB1TGNIzVdOmjY8DoaP0utbw1rYBkdLfqrlZcOj3PgwFozonBSRkt4S9tvcWcMr45VtmhKsZp6jA82TK0ovgVyGfHlFl3kjj6y1Md7bimS2QcmVx_ymJU_MbPx9wyO3IMkGDmUywsfWKG3YB08KxhLTmfINg4Ds3uQTUcI0Nf9L7sj2R4rEiBlQ-YJJM0Ivi2MuyVMe_FiVReY-t6h_keVTSZYkSu-9P5Dl83_IsipRuPIK8us7qrcpwJxdKvjKsps9WcMXT_lbCBIG68rKizOUS4mbOAIZERqYF8m-D0ts340lds0wgJAYpktsLlkDJYkW1zvuY1fLBJo4iQNzIi4pMVsRYTc0BjezEFAt5FX2ytnHroljwrH7iO-x2xr12v1xvzd2-sNRf9zrOk28w27_rN11BiPnzHG6g87I6e6b-EEIU7Xb7nXNu9HpDvt9g3aGji333QZty_1PICdHvQ) From df23a9111ef6b27e5a54e9a5f6dfb19e607904cd Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Fri, 27 Dec 2024 22:09:31 +0000 Subject: [PATCH 10/11] update mermaid link in readme --- gtfs_funnel/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gtfs_funnel/README.md b/gtfs_funnel/README.md index c4076e716..9ef241385 100644 --- 
a/gtfs_funnel/README.md +++ b/gtfs_funnel/README.md @@ -11,7 +11,9 @@ Use `update_vars` and input one or several days to download. 1. Harmonize how route names are displayed in time-series: `make timeseries_preprocessing` 1. Download monthly aggregated schedule data: `make monthly_scheduled_data` -[![gtfs_funnel_mermaid1](https://mermaid.live/edit#pako:eNqlVX9v2jAQ_SqWJwSTgBESCk23SqUQ_to0qWiTllSRSS7EarAj22lLK777nKRAKL86DSmJ8b1393w--15xwEPANm61Wh5TVCVgo8nUuUNOxhgkqPFTQCp4AFJSNv_ssQJYq716DCHKqLJRMUSormJYQN1G9RmRUG9WZ38RQcksAVnfwLUpFXRBxPKWJ1zkvE_jntNzbtbULWIKz2qL6nQ6-5AhFyGIY6CEMjhmkxBwFu7qcJz-eFjBKBCK7kCiKKqX5lX-0a9VreYxj80FSWM0HZbG8i2zWTkd8ieWcKKVrl0jFCREyhFEKIxQRJPEfkvDAYQMBE3VGlWIzEOucTdGw11H8JWGyna6vP9s2_YbsdW6Rlt4QXEL3L3G6PD79hiSFIRs00XKhfJlEEOYJfDm_moLHlZjy5ikcD740C2BX2fi-vfkbmD9u4zSQUXHrVGRoXiZglMibt0C9j8acn5FwmgnFdroK7r4QDpG7hZ8VMWo654UUtJ3OU9UxX5IBQSKcoa-oanI4ItDEgn3Fd3jqu5HiGmQgJ9ySXPWefnjbsPVJykgCph-znrYZef-xu5j-m4frtb1DSxcDzenabteP63eUpWteKdzZDYqSfZ3E3NyhaPuSSmP6TEJ43JpOrDGPACkfibzq_B8Pq2C8jF5-c_J4aXzo-Uz7ulNSoCwLN31V6kDp-A5RhE-vxuBSRDnw3er-DDfSP2fAREglf7SeTzjIp_-cTMamJv93V6ThxLLdakTxYUfCC7lE0keKvfdARUTLXsD9ecqkn5IlG5ISid_6SvuczEnjL6QAzndL8mJW7jIqVUe2oTYL1PcxAsQC0JD3VaLZufhogl62NbDECKSJcrDumNoKMkUv1uyANtKH8omzlItF0aU6OUvsB3lh7SJIaQ6B9_LVl107CZOCcP2K37GdmvQbZuXZvfSMvsD87JrWE28xLZ50Tas3sC6sCyj1xlYxqqJXzjXXo1219BNsGP0TVOjrb5VuPtTGIuQq79773TT) + +[![gtfs_funnel_mermaid1](https://mermaid.ink/img/pako:eNqllW9v2jAQxr-K5QlBJWCEhJamW6VSSF9tmlS0SQtVZJILsWrsyHZKadXvPicpJLQFNu0FSYh_z92T8597xqGIALu40-nMuKaagYtupt4t8jLOgaHWDwmpFCEoRfniZMYLsNF4nnGEKKfaRcUjQk2dwBKaLmrOiYJmu_72J5GUzBmo5hY3Q6mkSyLX14IJmes-TQbewLvaSCtiCo-6onq93ntkJGQEch_EKId9YwpCwaNdH553NhnVGA1S0x0kjuNmOfyS38zlpdGY8RlfSJImaDoqB8uryubl60isOBPEON2ERihkRKkxxCiKUUwZc1_L8AGhQklTvaEKk3nKDXdltfxNhkAbVHXT9d2J67qvwk7nElV4IfEL7s4wJv378QRYClJ16TIVUgcqTCDKGLyGv6jgUT23SkgKx5OP_BL8MpeXv25uh86_2ygD1HxcWzUbWpQlOGTi2i-w__GQ62sWxjulMIOBpsu_KMfYr-C9LsZ9_6CRUr6rWVGdBBGVEGoqOPqKpjKDzx5hCu5qvid13w-Q0JBBkApFc9Vx-5N-yzc7KSQauPkdjbCrzuNN_If0zTxcbNY38GjzuN1N1fcGaf2Uqk3FG59ju1UrcrBbmINfOO4ftPKQ7rMwKT_NJDbMPUAaZCo_Co_X0ykk0QoYK_weVXg5X0YvZXtX0WRg5ooB4Vm6G7W2HLxC51mFi_yIBK5AHjfRz3kORILS5k4XyVzIfFq_X42H9nZaq9Pxo3oKs8KJFjIIpVBqRdh97Zj7IOuNsblFg4WOVRARbfqQNjVfB1oEQi4Ip0_kg5l-vxJv_CJELq3r0DbF-9WJ23gJckloZLpp0eNmuOh9M-yaxwhikjE9w6ZRGJRkWtyueYhdbfZiG0uRLRLsxvmebOMsNeZhTIkpxnL7NiX8txDVf4ioqdC3sn8XbbxgsPuMH7HbGfa79rndP3fss6F93recNl5j1z7tWs5g6Jw6jjXoDR3rpY2fiqhWt2-Zztizzmzb0M6Z8_IHVGx2Qw?type=png)](https://mermaid.live/edit#pako:eNqllW9v2jAQxr-K5QlBJWCEhJamW6VSSF9tmlS0SQtVZJILsWrsyHZKadXvPicpJLQFNu0FSYh_z92T8597xqGIALu40-nMuKaagYtupt4t8jLOgaHWDwmpFCEoRfniZMYLsNF4nnGEKKfaRcUjQk2dwBKaLmrOiYJmu_72J5GUzBmo5hY3Q6mkSyLX14IJmes-TQbewLvaSCtiCo-6onq93ntkJGQEch_EKId9YwpCwaNdH553NhnVGA1S0x0kjuNmOfyS38zlpdGY8RlfSJImaDoqB8uryubl60isOBPEON2ERihkRKkxxCiKUUwZc1_L8AGhQklTvaEKk3nKDXdltfxNhkAbVHXT9d2J67qvwk7nElV4IfEL7s4wJv378QRYClJ16TIVUgcqTCDKGLyGv6jgUT23SkgKx5OP_BL8MpeXv25uh86_2ygD1HxcWzUbWpQlOGTi2i-w__GQ62sWxjulMIOBpsu_KMfYr-C9LsZ9_6CRUr6rWVGdBBGVEGoqOPqKpjKDzx5hCu5qvid13w-Q0JBBkApFc9Vx-5N-yzc7KSQauPkdjbCrzuNN_If0zTxcbNY38GjzuN1N1fcGaf2Uqk3FG59ju1UrcrBbmINfOO4ftPKQ7rMwKT_NJDbMPUAaZCo_Co_X0ykk0QoYK_weVXg5X0YvZXtX0WRg5ooB4Vm6G7W2HLxC51mFi_yIBK5AHjfRz3kORILS5k4XyVzIfFq_X42H9nZaq9Pxo3oKs8KJFjIIpVBqRdh97Zj7IOuNsblFg4WOVRARbfqQNjVfB1oEQi4Ip0_kg5l-vxJv_CJELq3r0DbF-9WJ23gJckloZLpp0eNmuOh9M-yaxwhikjE9w6ZRGJRkWtyueYhdbfZiG0uRLRLsxvmebOMsNeZhTIkpxnL7NiX8txDVf4ioqdC3sn8XbbxgsPuMH7HbGfa79rndP3fss6F93recNl5j1z7tWs5g6Jw6jjXoDR3rpY2fiqhWt2-Zztizzmzb0M6Z8_IHVGx2Qw) + 
[![speeds_stop_times_mermaid](https://mermaid.ink/img/pako:eNqlVV1v2jAU_SuWJwSTgPERCqTapEJh07buYUV7WEDITW7AmmNHttOWVvz3OU5YkkK7Lx7A-J577_Hxsf2IfREAdnGr1VpyTTUDF31doGvYRMA1uo4BAoUID-ysFjFa0AjUktuEWu1xyRGinGoX2SFCdb2FCOouqt8QBfVmefYbkZTcMFD1X3ATiiWNiNxNBRMyzXs1G8wH84tDaoFYwL0uUJ1O5xgyETIA-RyIUQ7PxRT4ggdVHvP5cDYpYTRITSuQMAzrWXif_pivfa225EseMnHnb4nUaDHJACq52UgSb5HKxKUPcCiMkM-IUpcQoiBEIWXMzUU4gVC-pLE-oCzFU6isyWIXQw6dT-f98bSJlJbiB7gp9ZToIfOi2_D8RK-V2eR1nq3a8W712nXdvGerhYqE9EMYQ9qEkM3SqTUM6N0T1IVXKboyBYPwvMBMTGsFDPy_6C44ZJ1jkEhtSQx1VWJRAJXIpmNCpXGyBGT2WVGlU38XMOJLoZQtaZdQYufZ8lVqq_OydtkIeHCYLPbaHqB1TGNIzVdOmjY8DoaP0utbw1rYBkdLfqrlZcOj3PgwFozonBSRkt4S9tvcWcMr45VtmhKsZp6jA82TK0ovgVyGfHlFl3kjj6y1Md7bimS2QcmVx_ymJU_MbPx9wyO3IMkGDmUywsfWKG3YB08KxhLTmfINg4Ds3uQTUcI0Nf9L7sj2R4rEiBlQ-YJJM0Ivi2MuyVMe_FiVReY-t6h_keVTSZYkSu-9P5Dl83_IsipRuPIK8us7qrcpwJxdKvjKsps9WcMXT_lbCBIG68rKizOUS4mbOAIZERqYF8m-D0ts340lds0wgJAYpktsLlkDJYkW1zvuY1fLBJo4iQNzIi4pMVsRYTc0BjezEFAt5FX2ytnHroljwrH7iO-x2xr12v1xvzd2-sNRf9zrOk28w27_rN11BiPnzHG6g87I6e6b-EEIU7Xb7nXNu9HpDvt9g3aGji333QZty_1PICdHvQ?type=png)](https://mermaid.live/edit#pako:eNqlVV1v2jAU_SuWJwSTgPERCqTapEJh07buYUV7WEDITW7AmmNHttOWVvz3OU5YkkK7Lx7A-J577_Hxsf2IfREAdnGr1VpyTTUDF31doGvYRMA1uo4BAoUID-ysFjFa0AjUktuEWu1xyRGinGoX2SFCdb2FCOouqt8QBfVmefYbkZTcMFD1X3ATiiWNiNxNBRMyzXs1G8wH84tDaoFYwL0uUJ1O5xgyETIA-RyIUQ7PxRT4ggdVHvP5cDYpYTRITSuQMAzrWXif_pivfa225EseMnHnb4nUaDHJACq52UgSb5HKxKUPcCiMkM-IUpcQoiBEIWXMzUU4gVC-pLE-oCzFU6isyWIXQw6dT-f98bSJlJbiB7gp9ZToIfOi2_D8RK-V2eR1nq3a8W712nXdvGerhYqE9EMYQ9qEkM3SqTUM6N0T1IVXKboyBYPwvMBMTGsFDPy_6C44ZJ1jkEhtSQx1VWJRAJXIpmNCpXGyBGT2WVGlU38XMOJLoZQtaZdQYufZ8lVqq_OydtkIeHCYLPbaHqB1TGNIzVdOmjY8DoaP0utbw1rYBkdLfqrlZcOj3PgwFozonBSRkt4S9tvcWcMr45VtmhKsZp6jA82TK0ovgVyGfHlFl3kjj6y1Md7bimS2QcmVx_ymJU_MbPx9wyO3IMkGDmUywsfWKG3YB08KxhLTmfINg4Ds3uQTUcI0Nf9L7sj2R4rEiBlQ-YJJM0Ivi2MuyVMe_FiVReY-t6h_keVTSZYkSu-9P5Dl83_IsipRuPIK8us7qrcpwJxdKvjKsps9WcMXT_lbCBIG68rKizOUS4mbOAIZERqYF8m-D0ts340lds0wgJAYpktsLlkDJYkW1zvuY1fLBJo4iQNzIi4pMVsRYTc0BjezEFAt5FX2ytnHroljwrH7iO-x2xr12v1xvzd2-sNRf9zrOk28w27_rN11BiPnzHG6g87I6e6b-EEIU7Xb7nXNu9HpDvt9g3aGji333QZty_1PICdHvQ) From a5baf6c211e7d5e55497f3cca02dcde431bf1b03 Mon Sep 17 00:00:00 2001 From: tiffanychu90 Date: Fri, 27 Dec 2024 22:13:25 +0000 Subject: [PATCH 11/11] add oct2024 to readme table --- gtfs_funnel/README.md | 2 +- gtfs_funnel/mermaid.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/gtfs_funnel/README.md b/gtfs_funnel/README.md index 9ef241385..10121d69e 100644 --- a/gtfs_funnel/README.md +++ b/gtfs_funnel/README.md @@ -26,5 +26,5 @@ Use `update_vars` and input one or several days to download. | | | pipeline and workstream outputs available | |---|---|---| | Sampled Wednesdays Each Month for Time-Series
[rt_dates.py](../_shared_utils/shared_utils/rt_dates.py) | Mar 2023 - present | downloaded schedule tables (trips, shapes, stops, stop_times)
downloaded vehicle positions (vp)

`gtfs_funnel`: intermediate outputs for schedule and vp
* crosswalk
* schedule-only metrics related to service availability
* operator-aggregated metrics from schedule data
* route typologies


`rt_segment_speeds`: vp interpreted as speeds against
various segment types
* segment types:
(1) `stop segments` (shape-stop segments,
most common shape selected for a route-direction and all
trips aggregated to that shape)
(2) `rt_stop_times` (trip-stop segments, most granular,
cannot be aggregated, but used for rt_stop_times table)
(3) `speedmap segments`
(4) `road segments` (1 km road segments with all
transit across operators aggregated to the same physical
road space, currently WIP)
* interpolated stop arrivals
* speeds by trip
* segment and summary speeds for a single day

`rt_vs_schedule`:
* RT vs schedule metrics
* rt_stop_times table (companion to scheduled stop_times)

`gtfs_digest`:
* downstream data product using all the outputs created in
gtfs_funnel, rt_segment_speeds, rt_vs_schedule. | -| Full Week for Weekly Averages
April / October each year | Apr 2023
Oct 2023
Apr 2024 | rt_segment_speeds:
* segment and summary speeds for a week

gtfs_digest
* service hours by hour for weekday / Saturday / Sunday | +| Full Week for Weekly Averages
April / October each year | Apr 2023
Oct 2023
Apr 2024
Oct 2024 | rt_segment_speeds:
* segment and summary speeds for a week

gtfs_digest
* service hours by hour for weekday / Saturday / Sunday | | | | | \ No newline at end of file diff --git a/gtfs_funnel/mermaid.md b/gtfs_funnel/mermaid.md index 1f002d51f..0cfd40ce0 100644 --- a/gtfs_funnel/mermaid.md +++ b/gtfs_funnel/mermaid.md @@ -126,7 +126,7 @@ flowchart TB end subgraph RT stop_times - J(segment_type=rt_stop_times):::segmentType --> + J(segment_type=rt_stop_times
speedmap_segments):::segmentType --> C; E --> K([average_summary_speeds.py]):::script --> L[rollup_singleday/rollup_multiday