Skip to content

Commit

Permalink
Merge pull request #1325 from cal-itp/refactor--nn-interpolation
Browse files Browse the repository at this point in the history
Refactor nearest neighbor and interpolation steps
  • Loading branch information
tiffanychu90 authored Dec 18, 2024
2 parents 6aa1172 + 406035a commit 61e0bd9
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 212 deletions.
5 changes: 1 addition & 4 deletions _shared_utils/shared_utils/gtfs_analytics_data.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,14 @@ stop_segments:
dir: ${gcs_paths.SEGMENT_GCS}
stage1: ${speeds_tables.vp_dwell}
stage2: "nearest/nearest_vp_shape_segments"
stage2b: "nearest/nearest2_vp_shape_segments"
stage3: "stop_arrivals"
stage4: "speeds_stop_segments"
trip_stop_cols: ["trip_instance_key", "stop_sequence"]
shape_stop_cols: ["shape_array_key", "shape_id", "stop_sequence"]
stop_pair_cols: ["stop_pair", "stop_pair_name"]
route_dir_cols: ["route_id", "direction_id"]
segment_cols: ["route_id", "direction_id", "stop_pair", "geometry"]
shape_stop_single_segment: "rollup_singleday/speeds_shape_stop_segments" #-- stop after Oct 2024
#shape_stop_single_segment: "rollup_singleday/speeds_shape_stop_segments" #-- stop after Oct 2024
route_dir_single_segment: "rollup_singleday/speeds_route_dir_segments"
route_dir_single_segment_detail: "rollup_singleday/speeds_route_dir_segments_detail" # interim for speedmaps
route_dir_multi_segment: "rollup_multiday/speeds_route_dir_segments"
Expand All @@ -94,7 +93,6 @@ rt_stop_times:
dir: ${gcs_paths.SEGMENT_GCS}
stage1: ${speeds_tables.vp_dwell}
stage2: "nearest/nearest_vp_rt_stop_times"
stage2b: "nearest/nearest2_vp_rt_stop_times"
stage3: "rt_stop_times/stop_arrivals"
stage4: "rt_stop_times/speeds"
trip_stop_cols: ["trip_instance_key", "stop_sequence"]
Expand All @@ -115,7 +113,6 @@ speedmap_segments:
stage1: ${speeds_tables.vp_dwell}
proxy_stop_times: "stop_time_expansion/speedmap_stop_times"
stage2: "nearest/nearest_vp_speedmap_proxy"
stage2b: "nearest/nearest2_vp_speedmap_proxy"
stage3: "speedmap/stop_arrivals_proxy"
stage3b: "speedmap/stop_arrivals"
stage4: "speedmap/speeds"
Expand Down
2 changes: 2 additions & 0 deletions open_data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
1. [HQTA Stops](https://gis.data.ca.gov/datasets/f6c30480f0e84be699383192c099a6a4_0): metadata [feature server](https://gisdata.dot.ca.gov/arcgis/rest/services/CHrailroad/CA_HQ_Transit_Stops/FeatureServer) or [map server](https://gisdata.dot.ca.gov/arcgis/rest/services/CHrailroad/CA_HQ_Transit_Stops/MapServer)
1. [CA Transit Routes](https://gis.data.ca.gov/datasets/dd7cb74665a14859a59b8c31d3bc5a3e_0): metadata [feature server](https://gisdata.dot.ca.gov/arcgis/rest/services/CHrailroad/CA_Transit_Routes/FeatureServer) or [map server](https://gisdata.dot.ca.gov/arcgis/rest/services/CHrailroad/CA_Transit_Routes/MapServer)
1. [CA Transit Stops](https://gis.data.ca.gov/datasets/900992cc94ab49dbbb906d8f147c2a72_0): metadata [feature server](https://gisdata.dot.ca.gov/arcgis/rest/services/CHrailroad/CA_Transit_Stops/FeatureServer) or [map server](https://gisdata.dot.ca.gov/arcgis/rest/services/CHrailroad/CA_Transit_Stops/MapServer)
1. [CA Average Transit Speeds by Stop-to-Stop Segments](https://gis.data.ca.gov/datasets/4937eeb59fdb4e56ae75e64688c7f2c0_0/): metadata [feature server](https://caltrans-gis.dot.ca.gov/arcgis/rest/services/CHrailroad/Speeds_by_Stop_Segments/FeatureServer/0) or [map server](https://caltrans-gis.dot.ca.gov/arcgis/rest/services/CHrailroad/Speeds_by_Stop_Segments/MapServer/0)
1. [CA Average Transit Speeds by Route and Time of Day](https://gis.data.ca.gov/datasets/071df783099f4224b7ebb54839eae007_0/): metadata [feature server](https://caltrans-gis.dot.ca.gov/arcgis/rest/services/CHrailroad/Speeds_by_Route_Time_of_Day/FeatureServer/0) or [map server](https://caltrans-gis.dot.ca.gov/arcgis/rest/services/CHrailroad/Speeds_by_Route_Time_of_Day/MapServer/0)
1. All GTFS datasets [metadata/data dictionary](https://data.ca.gov/dataset/cal-itp-gtfs-ingest-pipeline-dataset/resource/e26bf6ee-419d-4a95-8e4c-e2b13d5de793)

## GTFS Schedule Routes & Stops Geospatial Data
Expand Down
6 changes: 6 additions & 0 deletions rt_segment_speeds/scripts/cut_stop_segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ def stop_times_with_shape(
subset="geometry"
).reset_index(drop=True).set_geometry("geometry")

    # gtfs_segments.create_segments errors when arrival_time is NaT,
    # so add a dummy non-null arrival_time. Use zero.
df = df.assign(
arrival_time = 0
)

return df


Expand Down
102 changes: 30 additions & 72 deletions rt_segment_speeds/scripts/interpolate_stop_arrival.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
"""
Interpolate stop arrival
based on where the nearest 2 vp are
when stop position is between the 2 vp.
Stop and vp geometries should be projected along the shape geometry;
use `stop_meters`, `prior_vp_meters`, `subseq_vp_meters`.
"""
import datetime
import geopandas as gpd
Expand Down Expand Up @@ -33,62 +39,6 @@ def get_vp_timestamps(
return vp


def consolidate_surrounding_vp(
    df: pd.DataFrame,
    group_cols: list,
) -> pd.DataFrame:
    """
    Reshape the df from long to wide so that each stop position
    carries both a prior and a subsequent timestamp
    (renamed to vp_timestamp_local).
    """
    # Number the (at most 2) vp surrounding each stop: 0 = before, 1 = after.
    df = df.assign(
        obs = (df.sort_values(group_cols + ["vp_idx"])
               .groupby(group_cols,
                        observed=True, group_keys=False, dropna=False)
               .cumcount()
              )
    )

    merge_keys = group_cols + ["stop_meters"]
    renamed_cols = ["vp_idx", "shape_meters"]
    ts_cols = ["location_timestamp_local", "moving_timestamp_local"]

    # shape_meters may decrease as time progresses (bus moving back
    # toward the origin of the shape), so the smaller shape_meters is
    # not necessarily the earlier timestamp. All we know is that
    # stop_meters falls between the 2 shape_meters values; obs already
    # reflects timestamp order (0, then 1).
    vp_before_stop = df.loc[df.obs == 0, merge_keys + renamed_cols + ts_cols]
    vp_after_stop = df.loc[df.obs == 1, merge_keys + renamed_cols + ts_cols]

    # For the vp before the stop, keep the later timestamp:
    # moving_timestamp_local (after it has dwelled).
    vp_before_stop = vp_before_stop.assign(
        prior_vp_timestamp_local = vp_before_stop.moving_timestamp_local,
    ).rename(
        columns = {c: f"prior_{c}" for c in renamed_cols}
    ).drop(columns = ts_cols)

    # For the vp after the stop, keep the earlier timestamp:
    # location_timestamp_local (before it dwells).
    vp_after_stop = vp_after_stop.assign(
        subseq_vp_timestamp_local = vp_after_stop.location_timestamp_local,
    ).rename(
        columns = {c: f"subseq_{c}" for c in renamed_cols}
    ).drop(columns = ts_cols)

    return vp_before_stop.merge(
        vp_after_stop,
        on = merge_keys,
        how = "inner"
    )


def add_arrival_time(
nearest_vp_input_file: str,
vp_timestamp_file: str,
Expand All @@ -106,7 +56,11 @@ def add_arrival_time(
f"{SEGMENT_GCS}{nearest_vp_input_file}_{analysis_date}.parquet"
)

subset_vp = vp_filtered.vp_idx.unique()
subset_vp = np.unique(
np.concatenate(
(vp_filtered.prior_vp_idx.unique(),
vp_filtered.subseq_vp_idx.unique())
)).tolist()

vp_timestamps = get_vp_timestamps(
vp_timestamp_file,
Expand All @@ -116,10 +70,14 @@ def add_arrival_time(

df = pd.merge(
vp_filtered,
vp_timestamps,
on = "vp_idx",
vp_timestamps.add_prefix("prior_"),
on = "prior_vp_idx",
how = "inner"
).pipe(consolidate_surrounding_vp, group_cols)
).merge(
vp_timestamps.add_prefix("subseq_"),
on = "subseq_vp_idx",
how = "inner"
)

arrival_time_series = []

Expand All @@ -128,16 +86,15 @@ def add_arrival_time(
stop_position = getattr(row, "stop_meters")

projected_points = np.asarray([
getattr(row, "prior_shape_meters"),
getattr(row, "subseq_shape_meters")
getattr(row, "prior_vp_meters"),
getattr(row, "subseq_vp_meters")
])

timestamp_arr = np.asarray([
getattr(row, "prior_vp_timestamp_local"),
getattr(row, "subseq_vp_timestamp_local"),
getattr(row, "prior_moving_timestamp_local"),
getattr(row, "subseq_location_timestamp_local"),
])


interpolated_arrival = segment_calcs.interpolate_stop_arrival_time(
stop_position, projected_points, timestamp_arr)

Expand Down Expand Up @@ -215,12 +172,13 @@ def enforce_monotonicity_and_interpolate_across_stops(
)

# Subset to trips that have at least 1 obs that violates monotonicity
trips_with_one_false = (df.groupby("trip_instance_key")
.agg({"arrival_time_sec_monotonic": "min"})
.reset_index()
.query('arrival_time_sec_monotonic==0')
.trip_instance_key
)
trips_with_one_false = (
df.groupby("trip_instance_key")
.agg({"arrival_time_sec_monotonic": "min"})
.reset_index()
.query('arrival_time_sec_monotonic==0')
.trip_instance_key
)

# Set arrival times to NaT if it's not monotonically increasing
mask = df.arrival_time_sec_monotonic == False
Expand Down Expand Up @@ -254,7 +212,7 @@ def interpolate_stop_arrivals(
dict_inputs = config_path[segment_type]
trip_stop_cols = [*dict_inputs["trip_stop_cols"]]
USABLE_VP_FILE = dict_inputs["stage1"]
INPUT_FILE = dict_inputs["stage2b"]
INPUT_FILE = dict_inputs["stage2"]
STOP_ARRIVALS_FILE = dict_inputs["stage3"]

start = datetime.datetime.now()
Expand Down
50 changes: 29 additions & 21 deletions rt_segment_speeds/scripts/nearest_vp_to_stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from typing import Literal, Optional

from calitp_data_analysis.geography_utils import WGS84
from calitp_data_analysis import utils
from segment_speed_utils import helpers, neighbor
from update_vars import SEGMENT_GCS, GTFS_DATA_DICT
from segment_speed_utils.project_vars import SEGMENT_TYPES
Expand Down Expand Up @@ -155,31 +154,40 @@ def nearest_neighbor_for_stop(
else:
print(f"{segment_type} is not valid")

gdf = neighbor.merge_stop_vp_for_nearest_neighbor(
stop_times, analysis_date)

results = neighbor.add_nearest_neighbor_result_array(gdf, analysis_date)

# Keep columns from results that are consistent across segment types
# use trip_stop_cols as a way to uniquely key into a row
keep_cols = trip_stop_cols + [
"shape_array_key",
"stop_geometry",
"nearest_vp_arr"
]

utils.geoparquet_gcs_export(
results[keep_cols],
SEGMENT_GCS,
EXPORT_FILE,
gdf = neighbor.merge_stop_vp_for_nearest_neighbor(stop_times, analysis_date)

vp_before, vp_after, vp_before_meters, vp_after_meters = np.vectorize(
neighbor.subset_arrays_to_valid_directions
)(
gdf.vp_primary_direction,
gdf.vp_geometry,
gdf.vp_idx,
gdf.stop_geometry,
gdf.stop_primary_direction,
gdf.shape_geometry,
gdf.stop_meters
)

gdf2 = gdf.assign(
prior_vp_idx = vp_before,
subseq_vp_idx = vp_after,
prior_vp_meters = vp_before_meters,
subseq_vp_meters = vp_after_meters
)[trip_stop_cols + [
"shape_array_key", "stop_meters",
"prior_vp_idx", "subseq_vp_idx",
"prior_vp_meters", "subseq_vp_meters"]
]

del gdf, stop_times

gdf2.to_parquet(f"{SEGMENT_GCS}{EXPORT_FILE}.parquet")

end = datetime.datetime.now()
logger.info(f"nearest neighbor for {segment_type} "
f"{analysis_date}: {end - start}")

del gdf, stop_times, results

f"{analysis_date}: {end - start}")

return

'''
Expand Down
17 changes: 1 addition & 16 deletions rt_segment_speeds/scripts/pipeline_rt_stop_times.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from loguru import logger

from nearest_vp_to_stop import nearest_neighbor_for_stop
from vp_around_stops import filter_to_nearest_two_vp
from interpolate_stop_arrival import interpolate_stop_arrivals
from stop_arrivals_to_speed import calculate_speed_from_stop_arrivals
from update_vars import GTFS_DATA_DICT
Expand All @@ -36,21 +35,7 @@
]

[compute(i)[0] for i in delayed_dfs]

del delayed_dfs

delayed_dfs = [
delayed(filter_to_nearest_two_vp)(
analysis_date = analysis_date,
segment_type = segment_type,
config_path = GTFS_DATA_DICT
) for analysis_date in analysis_date_list
]

[compute(i)[0] for i in delayed_dfs]

del delayed_dfs


logger.remove()


Expand Down
14 changes: 0 additions & 14 deletions rt_segment_speeds/scripts/pipeline_segment_speeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from loguru import logger

from nearest_vp_to_stop import nearest_neighbor_for_stop
from vp_around_stops import filter_to_nearest_two_vp
from interpolate_stop_arrival import interpolate_stop_arrivals
from stop_arrivals_to_speed import calculate_speed_from_stop_arrivals
from update_vars import GTFS_DATA_DICT
Expand Down Expand Up @@ -40,19 +39,6 @@

del delayed_dfs


delayed_dfs = [
delayed(filter_to_nearest_two_vp)(
analysis_date = analysis_date,
segment_type = segment_type,
config_path = GTFS_DATA_DICT
) for analysis_date in analysis_date_list
]

[compute(i)[0] for i in delayed_dfs]

del delayed_dfs

logger.remove()

LOG_FILE = "../logs/interpolate_stop_arrival.log"
Expand Down
12 changes: 0 additions & 12 deletions rt_segment_speeds/scripts/pipeline_speedmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from update_vars import SEGMENT_GCS, GTFS_DATA_DICT

from nearest_vp_to_stop import nearest_neighbor_for_stop
from vp_around_stops import filter_to_nearest_two_vp
from interpolate_stop_arrival import interpolate_stop_arrivals
from stop_arrivals_to_speed import calculate_speed_from_stop_arrivals

Expand Down Expand Up @@ -88,17 +87,6 @@ def concatenate_speedmap_proxy_arrivals_with_remaining(
]

[compute(i)[0] for i in delayed_dfs]


delayed_dfs = [
delayed(filter_to_nearest_two_vp)(
analysis_date = analysis_date,
segment_type = segment_type,
config_path = GTFS_DATA_DICT
) for analysis_date in analysis_date_list
]

[compute(i)[0] for i in delayed_dfs]

logger.remove()

Expand Down
Loading

0 comments on commit 61e0bd9

Please sign in to comment.