diff --git a/bus_service_increase/service_increase_vars.py b/bus_service_increase/service_increase_vars.py new file mode 100644 index 000000000..bfc178a4c --- /dev/null +++ b/bus_service_increase/service_increase_vars.py @@ -0,0 +1,10 @@ +from bus_service_utils import utils as bus_utils +from shared_utils import rt_dates + +DATA_PATH = f"{bus_utils.GCS_FILE_PATH}2023_Oct/" + +dates = { + "wed": rt_dates.DATES["oct2023"], + "sat": rt_dates.DATES["oct2023a"], + "sun": rt_dates.DATES["oct2023b"], +} diff --git a/rt_segment_speeds/25_interpolation_issues.ipynb b/rt_segment_speeds/25_interpolation_issues.ipynb new file mode 100644 index 000000000..ac63da271 --- /dev/null +++ b/rt_segment_speeds/25_interpolation_issues.ipynb @@ -0,0 +1,746 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0a0bbc47-639d-47f7-a051-d4a2f19f7dbc", + "metadata": {}, + "source": [ + "# Interpolation method - break down steps to debug\n", + "* When finding nearest vp, there are stops that have later arrival times coming before earlier arrivals.\n", + " * Stop 1 arrives after stop 2; this can't happen in real life.\n", + " * Filter down to these stops and try again.\n", + " * How do we distinguish whether stop 1 or stop 2 is the \"correct\" one?\n", + "* Want to add direction to deal with loop or inlining shapes --> maybe this can be expanded to all shapes\n", + " * Do not even allow opposite direction vp to be selected as nearest vp" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ef830f30-e979-4916-857e-08abe9762888", + "metadata": {}, + "outputs": [], + "source": [ + "import dask.dataframe as dd\n", + "import numpy as np\n", + "import pandas as pd\n", + "\n", + "from segment_speed_utils import helpers, wrangle_shapes\n", + "from segment_speed_utils.project_vars import SEGMENT_GCS, PROJECT_CRS \n", + " \n", + "from shared_utils import rt_dates\n", + "\n", + "analysis_date = rt_dates.DATES[\"sep2023\"]\n", + "\n", + "STOP_SEG_DICT = helpers.get_parameters(\n", + " \"./scripts/config.yml\", \"stop_segments\")" + ] + }, + { + "cell_type": "markdown", + "id": "c5f369bb-68bf-46a2-86ad-6279872859b1", + "metadata": {}, + "source": [ + "## Between stops: how to find stops not behaving as expected\n", + "There are erroneous calculations here.\n", + "\n", + "A prior stop's arrival time can't take place **after** the current stop's arrival time. \n", + "\n", + "Ex: `trip_instance_key == \"0000412e74a0e91993ce66bcbc4e3e73\"`, stop 1's nearest position is after stop 2's nearest position.\n", + "\n", + "Handle these along with loop or inlining shapes."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "19614462-03ca-4a84-a119-3a38ca719b0d", + "metadata": {}, + "outputs": [], + "source": [ + "NEAREST_VP = f\"{STOP_SEG_DICT['stage2']}_error_{analysis_date}\"\n", + "STOP_ARRIVALS = f\"{STOP_SEG_DICT['stage3']}_{analysis_date}\"\n", + "\n", + "df = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}projection/{NEAREST_VP}.parquet\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ccf433cf-69e7-476c-a64a-8c999a53858b", + "metadata": {}, + "outputs": [], + "source": [ + "stop_arrivals = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}{STOP_ARRIVALS}.parquet\",\n", + " columns = [\"trip_instance_key\", \"stop_sequence\", \"arrival_time\"]\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3077ade-87c1-4b9d-8cf7-bbb743a03823", + "metadata": {}, + "outputs": [], + "source": [ + "df.error_arrival_order.value_counts()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3d0373a4-80fc-49e1-bac3-2edd8c5ae4d0", + "metadata": {}, + "outputs": [], + "source": [ + "df.error_same_endpoints.value_counts()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11d2a032-83db-43b7-a7fb-9254a10ae524", + "metadata": {}, + "outputs": [], + "source": [ + "df[(df.error_same_endpoints==1) & \n", + " (df.error_arrival_order==1)].shape" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2ea55941-8d10-49f2-8765-6b2faba4080d", + "metadata": {}, + "outputs": [], + "source": [ + "trip_stats = (df.groupby(\"trip_instance_key\", \n", + " observed=True, group_keys=False)\n", + " .agg({\n", + " \"error_same_endpoints\": \"mean\",\n", + " \"error_arrival_order\": \"mean\"\n", + " }).reset_index()\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "83fde1e4-29b1-43ab-b30c-f98ec63a87c8", + "metadata": {}, + "outputs": [], + "source": [ + "# Very few trips are completely error-free\n", + "trip_stats[(trip_stats.error_same_endpoints==0) & \n", + " (trip_stats.error_arrival_order==0)].shape" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1bd9d1b1-48f0-46e1-9875-181ea20df66a", + "metadata": {}, + "outputs": [], + "source": [ + "#trip_stats.sample(10).trip_instance_key.unique()\n", + "subset_trip_keys = [\n", + " '9fad69264acd8387150f45b27d4b2d09',\n", + " '44a55d2fa2588a479065ef7702475ef1',\n", + " '36070a2428e62b96368d072eb2a8fc1b',\n", + " '7f665900c6b0879f4b9bda43b93fefe3',\n", + " '8e8ba9993d52388539d06a46710c1dbc',\n", + " 'b301c2170c1ca49bbc1a9b600cccf643',\n", + " '9373f5b0de977a718dea50fd90443619',\n", + " '8415b3949147c9dc3d5ceb37863440b1',\n", + " '984f598419c1d0830ef4618d495c1bd7',\n", + " '815e4dd921cdcb61ad2dbb1ca5f08a39'\n", + "]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "212eaa5d-735c-4332-b25d-e1883ee48f15", + "metadata": {}, + "outputs": [], + "source": [ + "def check_if_surrounding_points_are_ok(df: pd.DataFrame):\n", + " grouped_df = df.groupby(\"trip_instance_key\", \n", + " observed=True, group_keys=False\n", + " )\n", + " df = df.assign(\n", + " prior_error = (grouped_df\n", + " .error_arrival_order\n", + " .shift(1)\n", + " ),\n", + " subseq_error = (grouped_df\n", + " .error_arrival_order\n", + " .shift(-1)\n", + " )\n", + " )\n", + " \n", + " df = df.assign(\n", + " can_be_fixed = df.apply(\n", + " lambda x:\n", + " 1 if (x.error_arrival_order==1) and\n", + " (x.prior_error==0) and (x.subseq_error==0)\n", + " else 0, axis=1\n", + " )\n", + " )\n", 
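+ "\n", + "    # At this point, can_be_fixed == 1 only for stops that err on\n", + "    # arrival order while both neighbors are clean, so their arrivals\n", + "    # can be re-interpolated from those clean neighbors.\n",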
+ "\n", + " return df\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5a219a60-4ea2-45b0-9fa9-3a11f326b8a0", + "metadata": {}, + "outputs": [], + "source": [ + "df2 = pd.merge(\n", + " df,\n", + " stop_arrivals,\n", + " on = [\"trip_instance_key\", \"stop_sequence\"],\n", + " how = \"inner\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c5b1f434-cb45-4425-aa8f-7a85c87d3e8d", + "metadata": {}, + "outputs": [], + "source": [ + "df3 = check_if_surrounding_points_are_ok(df2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4c47138d-9129-43ff-b73a-4c494f5be58a", + "metadata": {}, + "outputs": [], + "source": [ + "df3[df3.error_arrival_order==1].shape" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d913f05d-2df8-4a92-bc6c-3dd3d2e78a37", + "metadata": {}, + "outputs": [], + "source": [ + "df3[(df3.error_arrival_order==1) & \n", + " (df3.prior_error==0) & \n", + " (df3.subseq_error==0)\n", + " ].shape" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e979265a-780d-496e-b3b0-195cc5058d2b", + "metadata": {}, + "outputs": [], + "source": [ + "df3[df3.can_be_fixed==1].trip_instance_key.unique()[:5]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6dbe0625-8cdf-42b5-bada-f7fa6ad167e5", + "metadata": {}, + "outputs": [], + "source": [ + "df3[df3.trip_instance_key==\"00019686e6c7bf335148c8d290feb285\"]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4fe66a39-cb78-43c1-85bf-e002b3e077f0", + "metadata": {}, + "outputs": [], + "source": [ + "df3[df3.trip_instance_key==\"0001ad7e1ef246cf6d68599de0fdcaad\"\n", + " ].tail(10)" + ] + }, + { + "cell_type": "markdown", + "id": "9ba0285d-119d-4d37-9122-f96f280e0f53", + "metadata": {}, + "source": [ + "Nearly half of the arrival errors are surrounded by stops where arrival time is correct.\n", + "\n", + "Let's hit this first and then deal with endpoints." + ] + }, + { + "cell_type": "markdown", + "id": "f546357f-fdeb-4ece-bbd6-f3b9b5f53c30", + "metadata": {}, + "source": [ + "If almost every trip has errors, it matters where the arrival order error comes in. Must take the minimum stop sequence of a clear monotonic period and set that as the max to filter out vp_idx that occur afterwards." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f172f124-8221-4d65-9a06-f5cdb2e15120", + "metadata": {}, + "outputs": [], + "source": [ + "import altair as alt\n", + "\n", + "for t in subset_trip_keys:\n", + "    \n", + "    subset_df = df[df.trip_instance_key==t]\n", + "    \n", + "    chart = (alt.Chart(subset_df)\n", + "             .mark_line()\n", + "             .encode(\n", + "                 x=\"stop_sequence\",\n", + "                 y=\"error_arrival_order\"\n", + "             ).properties(title=f\"{t}\")\n", + "            )\n", + "    display(chart)\n", + "    \n", + "    chart2 = (alt.Chart(subset_df[subset_df.error_arrival_order == 0])\n", + "             .mark_line()\n", + "             .encode(\n", + "                 x=\"stop_sequence\",\n", + "                 y=\"error_same_endpoints\"\n", + "             )\n", + "            )\n", + "    display(chart2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bcdc9773-fed6-4b81-9ccb-c868dca01535", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "350bf1c7-5163-44ad-bc32-b8a1c9919481", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "91d897a3-b1c1-453c-82de-f4a0610c5e02", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6ae9e22b-255c-4114-aef4-3c6a6ff22225", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "24a28b40-0548-412e-b21e-de7c455e7708", + "metadata": {}, + "outputs": [], + "source": [ + "nearest_array = df.groupby(\"trip_instance_key\").agg(\n", + "    {\"nearest_vp_idx\": lambda x: list(x)}).reset_index()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3543a028-58a2-42e8-9bdb-cda4d6e88f45", + "metadata": {}, + "outputs": [], + "source": [ + "nearest_array = nearest_array.assign(\n", + "    array_diff = nearest_array.apply(\n", + "        lambda x: \n", + "        np.ediff1d(np.asarray(x.nearest_vp_idx)),\n", + "        axis=1)\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "702ea4a1-92f9-4adf-940b-b43505692613", + "metadata": {}, + "outputs": [], + "source": [ + "nearest_array = nearest_array.assign(\n", + "    wrong_times = nearest_array.apply(\n", + "        lambda x: 1 if len(np.where(x.array_diff < 0)[0]) > 0\n", + "        else 0, axis=1\n", + "    )\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a14c62ce-3f27-4c09-bb67-91113ba0afd4", + "metadata": {}, + "outputs": [], + "source": [ + "nearest_array.wrong_times.value_counts()" + ] + }, + { + "cell_type": "markdown", + "id": "6324f8c7-4b80-4f36-801d-babc482dc1e3", + "metadata": {}, + "source": [ + "## Index into specific portions of array" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "90e7c452-9054-4f61-8c1f-c78526a5fd23", + "metadata": {}, + "outputs": [], + "source": [ + "one_trip = \"bf87a17838cdaff5ba78fb70edd4f1bb\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e4779281-aa0d-4f12-82d8-7f8654ae491d", + "metadata": {}, + "outputs": [], + "source": [ + "def rt_trips_to_shape(analysis_date: str) -> pd.DataFrame:\n", + "    \"\"\"\n", + "    Filter down trip_instance_keys from schedule to \n", + "    trips present in vp.\n", + "    Provide shape_array_key associated with trip_instance_key.\n", + "    \"\"\"\n", + "    # Get RT trips\n", + "    rt_trips = pd.read_parquet(\n", + "        f\"{SEGMENT_GCS}vp_usable_{analysis_date}\",\n", + "        filters = [[(\"trip_instance_key\", \"==\", one_trip)]],\n", + "        columns = [\"trip_instance_key\"]\n", + "    
).drop_duplicates()\n", + "\n", + " # Find the shape_array_key for RT trips\n", + " trip_to_shape = helpers.import_scheduled_trips(\n", + " analysis_date,\n", + " columns = [\"trip_instance_key\", \"shape_array_key\"],\n", + " get_pandas = True\n", + " ).merge(\n", + " rt_trips,\n", + " on = \"trip_instance_key\",\n", + " how = \"inner\"\n", + " )\n", + "\n", + " # Find whether it's loop or inlining\n", + " shapes_loop_inlining = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}stops_projected_{analysis_date}.parquet\",\n", + " columns = [\n", + " \"shape_array_key\", \"loop_or_inlining\", \n", + " \"stop_primary_direction\", \n", + " ]\n", + " ).drop_duplicates().merge(\n", + " trip_to_shape,\n", + " on = \"shape_array_key\",\n", + " how = \"inner\"\n", + " )\n", + " \n", + " return shapes_loop_inlining" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0f3e202c-2854-4874-8f09-8074c3025f47", + "metadata": {}, + "outputs": [], + "source": [ + "trip_shape_crosswalk = rt_trips_to_shape(analysis_date)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c332ddfa-90d9-4a3f-bc81-005d1e277422", + "metadata": {}, + "outputs": [], + "source": [ + "vp = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}vp_usable_{analysis_date}\",\n", + " filters = [[(\"trip_instance_key\", \"==\", one_trip)]],\n", + " columns = [\"trip_instance_key\", \"vp_idx\", \n", + " \"vp_primary_direction\", \n", + " ]\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ae73eb2d-ac83-405a-9aab-9143ec8874b3", + "metadata": {}, + "outputs": [], + "source": [ + "subset_vp = vp.vp_idx.unique()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3172933b-6d1b-4ef7-af50-5137c638b72b", + "metadata": {}, + "outputs": [], + "source": [ + "projected_shape_meters = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}projection/vp_projected_{analysis_date}.parquet\",\n", + " filters = [[(\"vp_idx\", \"in\", subset_vp)]]\n", + ")\n", + "\n", + "vp_with_projection = pd.merge(\n", + " vp,\n", + " projected_shape_meters,\n", + " on = \"vp_idx\",\n", + " how = \"inner\"\n", + ").merge(\n", + " trip_shape_crosswalk,\n", + " on = \"trip_instance_key\",\n", + " how = \"inner\"\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c8687b8b-5550-484b-a080-aa8f787e95be", + "metadata": {}, + "outputs": [], + "source": [ + "shape_keys = trip_shape_crosswalk.shape_array_key.unique()\n", + "\n", + "stops_projected = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}stops_projected_{analysis_date}.parquet\",\n", + " filters = [[(\"shape_array_key\", \"in\", shape_keys)]],\n", + " columns = [\"shape_array_key\", \"stop_sequence\", \"stop_id\", \n", + " \"shape_meters\", \"stop_primary_direction\"]\n", + ").rename(columns = {\"shape_meters\": \"stop_meters\"})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "edac7256-c5ca-4bef-bf71-573b3a4d30c3", + "metadata": {}, + "outputs": [], + "source": [ + "trip_shape_cols = [\"trip_instance_key\", \"shape_array_key\"]\n", + "\n", + "trip_info = (\n", + " vp_with_projection\n", + " .groupby(trip_shape_cols, \n", + " observed=True, group_keys=False)\n", + " .agg({\n", + " \"vp_idx\": lambda x: list(x),\n", + " \"shape_meters\": lambda x: list(x),\n", + " \"vp_primary_direction\": lambda x: list(x),\n", + " })\n", + " .reset_index()\n", + " .rename(columns = {\n", + " \"vp_idx\": \"vp_idx_arr\",\n", + " \"shape_meters\": \"shape_meters_arr\",\n", + " \"vp_primary_direction\": \"vp_dir_arr\"\n", + " })\n", + 
")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ecfba640-5c25-4866-9b6c-3c32dcb8ec63", + "metadata": {}, + "outputs": [], + "source": [ + "vp_to_stop = pd.merge(\n", + " trip_info,\n", + " stops_projected,\n", + " on = \"shape_array_key\",\n", + " how = \"inner\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5aeaad1d-a191-49d9-b0d1-c5fe14f4ade5", + "metadata": {}, + "outputs": [], + "source": [ + "this_stop_direction = vp_to_stop.stop_primary_direction.iloc[0]\n", + "this_stop_meters = vp_to_stop.stop_meters.iloc[0]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "28e1a479-79fe-4ae1-8921-02d872257213", + "metadata": {}, + "outputs": [], + "source": [ + "this_stop_direction" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11360fa7-de28-4f62-aa43-da65c61b1670", + "metadata": {}, + "outputs": [], + "source": [ + "opposite_to_stop_direction = wrangle_shapes.OPPOSITE_DIRECTIONS[\n", + " this_stop_direction]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "19fde734-15f8-4968-9637-014f6f80f469", + "metadata": {}, + "outputs": [], + "source": [ + "opposite_to_stop_direction" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "008612d6-e8db-4ea3-b2ff-64d94976381a", + "metadata": {}, + "outputs": [], + "source": [ + "vp_dir_array = np.asarray(vp_to_stop.vp_dir_arr.iloc[0])\n", + "shape_meters_array = np.asarray(vp_to_stop.shape_meters_arr.iloc[0])\n", + "vp_meters_array = np.asarray(vp_to_stop.vp_idx_arr.iloc[0])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e668c5c3-2d0b-4039-b1f9-6f1cfc2896fd", + "metadata": {}, + "outputs": [], + "source": [ + "#https://stackoverflow.com/questions/16094563/numpy-get-index-where-value-is-true\n", + "valid_vp_idx_indices = (vp_dir_array != opposite_to_stop_direction).nonzero()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1ebd663-254e-44ae-87ae-2cc90e527e62", + "metadata": {}, + "outputs": [], + "source": [ + "vp_meters_array[valid_vp_idx_indices]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1e6c7bad-c77d-4695-b685-f5de12374490", + "metadata": {}, + "outputs": [], + "source": [ + "for row in vp_to_stop.tail(1).itertuples():\n", + " this_stop_meters = np.asarray(getattr(row, \"vp_idx_arr\"))[valid_vp_idx_indices]\n", + " #valid_stop_meters = this_stop_meters\n", + " #print(valid_stop_meters)\n", + " print(this_stop_meters)" + ] + }, + { + "cell_type": "markdown", + "id": "039ea93e-3950-467d-bdcf-398d004fbaac", + "metadata": {}, + "source": [ + "## Plot interpolated arrivals -> speed" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "680172b7-7d24-42ab-8ef7-5336e33b9d41", + "metadata": {}, + "outputs": [], + "source": [ + "df = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}stop_arrivals_speed_{analysis_date}_2.parquet\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aba87999-00da-46b7-b839-d521fcb0eb7e", + "metadata": {}, + "outputs": [], + "source": [ + "bins = range(0, 75, 5)\n", + "\n", + "df[df.speed_mph < 80].hist(\n", + " \"speed_mph\", \n", + " bins = bins)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0216fe6d-5e2a-4ba4-8aea-68a1c029593b", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + 
"codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/rt_segment_speeds/logs/interpolate_stop_arrival.log b/rt_segment_speeds/logs/interpolate_stop_arrival.log index a105b2850..0f93c12b4 100644 --- a/rt_segment_speeds/logs/interpolate_stop_arrival.log +++ b/rt_segment_speeds/logs/interpolate_stop_arrival.log @@ -1,3 +1,4 @@ -2023-10-24 13:53:30.034 | INFO | __main__::122 - set up df with nearest / subseq vp info: 0:01:06.871897 -2023-10-24 13:54:09.757 | INFO | __main__::127 - interpolate stop arrival: 0:00:39.722841 -2023-10-24 13:54:17.449 | INFO | __main__::133 - execution time: 0:01:54.286578 +2023-10-31 12:12:52.626 | INFO | __main__::99 - Analysis date: 2023-09-13 +2023-10-31 12:14:03.894 | INFO | __main__::134 - set up df with nearest / subseq vp info: 0:01:11.267039 +2023-10-31 12:14:57.365 | INFO | __main__::139 - interpolate stop arrival: 0:00:53.471494 +2023-10-31 12:15:05.266 | INFO | __main__::145 - execution time: 0:02:12.638916 diff --git a/rt_segment_speeds/logs/nearest_vp.log b/rt_segment_speeds/logs/nearest_vp.log index 51e6ecf91..34f9dc5b3 100644 --- a/rt_segment_speeds/logs/nearest_vp.log +++ b/rt_segment_speeds/logs/nearest_vp.log @@ -1,4 +1,8 @@ -2023-10-24 12:14:34.459 | INFO | __main__::203 - Analysis date: 2023-09-13 -2023-10-24 12:18:28.007 | INFO | __main__::231 - map partitions to transform vp: 0:03:53.547561 -2023-10-24 12:18:28.823 | INFO | __main__::262 - map partitions to find nearest vp to stop: 0:00:00.816213 -2023-10-24 12:20:42.144 | INFO | __main__::293 - execution time: 0:06:07.685094 +2023-10-31 at 09:34:59 | INFO | Analysis date: 2023-09-13 +2023-10-31 09:39:45.702 | INFO | __main__::261 - map partitions to transform vp: 0:04:46.103748 +2023-10-31 at 09:39:45 | INFO | map partitions to transform vp: 0:04:46.103748 +2023-10-31 09:39:46.981 | INFO | __main__::293 - map partitions to find nearest vp to stop: 0:00:01.279908 +2023-10-31 at 09:39:46 | INFO | map partitions to find nearest vp to stop: 0:00:01.2799082023-10-31 09:46:23.878 | INFO | __main__::316 - Analysis date: 2023-09-13 +2023-10-31 09:51:11.125 | INFO | __main__:find_nearest_vp_to_stop:261 - map partitions to transform vp: 0:04:47.246718 +2023-10-31 09:51:11.894 | INFO | __main__:find_nearest_vp_to_stop:293 - map partitions to find nearest vp to stop: 0:00:00.768417 +2023-10-31 09:57:34.934 | INFO | __main__::323 - execution time: 0:11:11.055258 diff --git a/rt_segment_speeds/logs/speeds_by_segment_trip.log b/rt_segment_speeds/logs/speeds_by_segment_trip.log index da5ed7639..d8226e495 100644 --- a/rt_segment_speeds/logs/speeds_by_segment_trip.log +++ b/rt_segment_speeds/logs/speeds_by_segment_trip.log @@ -1,101 +1,3 @@ -2023-08-13 21:07:14.840 | INFO | __main__::138 - Analysis date: 2023-07-12 -2023-08-13 21:07:20.671 | INFO | __main__:linear_referencing_and_speed_by_segment:80 - set up merged vp with segments: 0:00:05.827339 -2023-08-13 21:07:20.701 | INFO | __main__:linear_referencing_and_speed_by_segment:94 - linear referencing: 0:00:00.029502 -2023-08-13 21:07:20.706 | INFO | __main__:linear_referencing_and_speed_by_segment:122 - calculate speeds: 0:00:00.005830 -2023-08-13 21:10:49.771 | INFO | __main__::149 - speeds for stop segments: 0:03:34.931170 -2023-08-13 21:10:49.773 | INFO | __main__::150 - execution time: 0:03:34.932409 -2023-08-13 
21:46:16.611 | INFO | __main__::138 - Analysis date: 2023-06-14 -2023-08-13 21:46:24.867 | INFO | __main__:linear_referencing_and_speed_by_segment:80 - set up merged vp with segments: 0:00:08.248141 -2023-08-13 21:46:24.926 | INFO | __main__:linear_referencing_and_speed_by_segment:94 - linear referencing: 0:00:00.059536 -2023-08-13 21:46:24.940 | INFO | __main__:linear_referencing_and_speed_by_segment:122 - calculate speeds: 0:00:00.014299 -2023-08-13 21:49:55.505 | INFO | __main__::149 - speeds for stop segments: 0:03:38.892609 -2023-08-13 21:49:55.506 | INFO | __main__::150 - execution time: 0:03:38.893893 -2023-08-13 22:30:03.918 | INFO | __main__::138 - Analysis date: 2023-05-17 -2023-08-13 22:30:10.204 | INFO | __main__:linear_referencing_and_speed_by_segment:80 - set up merged vp with segments: 0:00:06.280071 -2023-08-13 22:30:10.238 | INFO | __main__:linear_referencing_and_speed_by_segment:94 - linear referencing: 0:00:00.034310 -2023-08-13 22:30:10.245 | INFO | __main__:linear_referencing_and_speed_by_segment:122 - calculate speeds: 0:00:00.006647 -2023-08-13 22:33:29.899 | INFO | __main__::149 - speeds for stop segments: 0:03:25.979982 -2023-08-13 22:33:29.901 | INFO | __main__::150 - execution time: 0:03:25.981118 -2023-08-13 23:06:48.959 | INFO | __main__::138 - Analysis date: 2023-04-12 -2023-08-13 23:06:54.755 | INFO | __main__:linear_referencing_and_speed_by_segment:80 - set up merged vp with segments: 0:00:05.791592 -2023-08-13 23:06:54.783 | INFO | __main__:linear_referencing_and_speed_by_segment:94 - linear referencing: 0:00:00.027643 -2023-08-13 23:06:54.789 | INFO | __main__:linear_referencing_and_speed_by_segment:122 - calculate speeds: 0:00:00.006092 -2023-08-13 23:10:35.876 | INFO | __main__::149 - speeds for stop segments: 0:03:46.915330 -2023-08-13 23:10:35.877 | INFO | __main__::150 - execution time: 0:03:46.916034 -2023-08-14 08:29:20.811 | INFO | __main__::138 - Analysis date: 2023-03-15 -2023-08-14 08:29:27.627 | INFO | __main__:linear_referencing_and_speed_by_segment:80 - set up merged vp with segments: 0:00:06.775133 -2023-08-14 08:29:27.663 | INFO | __main__:linear_referencing_and_speed_by_segment:94 - linear referencing: 0:00:00.036027 -2023-08-14 08:29:27.670 | INFO | __main__:linear_referencing_and_speed_by_segment:122 - calculate speeds: 0:00:00.007424 -2023-08-14 08:35:27.714 | INFO | __main__::149 - speeds for stop segments: 0:06:06.866192 -2023-08-14 08:35:27.715 | INFO | __main__::150 - execution time: 0:06:06.867710 -2023-08-18 14:43:56.666 | INFO | __main__::139 - Analysis date: 2023-08-16 -2023-08-18 14:44:03.327 | INFO | __main__:linear_referencing_and_speed_by_segment:81 - set up merged vp with segments: 0:00:06.640459 -2023-08-18 14:44:03.352 | INFO | __main__:linear_referencing_and_speed_by_segment:95 - linear referencing: 0:00:00.024816 -2023-08-18 14:44:03.357 | INFO | __main__:linear_referencing_and_speed_by_segment:123 - calculate speeds: 0:00:00.005369 -2023-08-18 14:47:45.494 | INFO | __main__::150 - speeds for stop segments: 0:03:48.810752 -2023-08-18 14:47:45.494 | INFO | __main__::151 - execution time: 0:03:48.811523 -2023-08-24 14:51:41.446 | INFO | __main__::139 - Analysis date: 2023-08-15 -2023-08-24 14:51:47.786 | INFO | __main__:linear_referencing_and_speed_by_segment:81 - set up merged vp with segments: 0:00:06.310920 -2023-08-24 14:51:47.822 | INFO | __main__:linear_referencing_and_speed_by_segment:95 - linear referencing: 0:00:00.035842 -2023-08-24 14:51:47.828 | INFO | __main__:linear_referencing_and_speed_by_segment:123 - 
calculate speeds: 0:00:00.006706 -2023-08-24 14:56:14.082 | INFO | __main__::150 - speeds for stop segments: 0:04:32.610802 -2023-08-24 14:56:14.083 | INFO | __main__::151 - execution time: 0:04:32.611824 -2023-09-21 11:53:11.705 | INFO | __main__::367 - Analysis date: 2023-09-13 -2023-09-21 11:53:18.285 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.152710 -2023-09-21 11:53:18.322 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.037063 -2023-09-21 11:57:20.821 | INFO | __main__:linear_referencing_and_speed_by_segment:300 - linear referencing: 0:04:09.111393 -2023-09-21 12:03:20.231 | INFO | __main__:linear_referencing_and_speed_by_segment:311 - make wide and get initial speeds: 0:05:59.409953 -2023-09-21 12:05:33.476 | INFO | __main__:linear_referencing_and_speed_by_segment:352 - recalculate speeds and get final: 0:02:13.244950 -2023-09-21 12:05:46.785 | INFO | __main__::375 - speeds for stop segments: 0:12:35.079026 -2023-09-21 12:05:46.788 | INFO | __main__::376 - execution time: 0:12:35.081785 -2023-09-21 12:28:12.385 | INFO | __main__::367 - Analysis date: 2023-08-15 -2023-09-21 12:28:19.027 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.162699 -2023-09-21 12:28:19.063 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.036557 -2023-09-21 12:32:49.165 | INFO | __main__:linear_referencing_and_speed_by_segment:300 - linear referencing: 0:04:36.761005 -2023-09-21 12:39:56.447 | INFO | __main__:linear_referencing_and_speed_by_segment:311 - make wide and get initial speeds: 0:07:07.282236 -2023-09-21 12:42:14.396 | INFO | __main__:linear_referencing_and_speed_by_segment:352 - recalculate speeds and get final: 0:02:17.948959 -2023-09-21 12:42:27.893 | INFO | __main__::375 - speeds for stop segments: 0:14:15.493395 -2023-09-21 12:42:27.894 | INFO | __main__::376 - execution time: 0:14:15.494456 -2023-09-21 13:16:38.177 | INFO | __main__::367 - Analysis date: 2023-07-12 -2023-09-21 13:16:44.655 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.156084 -2023-09-21 13:16:44.692 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.037555 -2023-09-21 13:20:41.231 | INFO | __main__:linear_referencing_and_speed_by_segment:300 - linear referencing: 0:04:03.035504 -2023-09-21 13:26:57.794 | INFO | __main__:linear_referencing_and_speed_by_segment:311 - make wide and get initial speeds: 0:06:16.562615 -2023-09-21 13:29:08.771 | INFO | __main__:linear_referencing_and_speed_by_segment:352 - recalculate speeds and get final: 0:02:10.977540 -2023-09-21 13:29:21.791 | INFO | __main__::375 - speeds for stop segments: 0:12:43.599595 -2023-09-21 13:29:21.792 | INFO | __main__::376 - execution time: 0:12:43.600710 -2023-09-21 15:36:54.009 | INFO | __main__::369 - Analysis date: 2023-06-14 -2023-09-21 15:36:59.961 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.156330 -2023-09-21 15:37:00.016 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.055084 -2023-09-21 15:40:58.894 | INFO | __main__:linear_referencing_and_speed_by_segment:302 - linear referencing: 0:04:04.880927 -2023-09-21 15:46:35.541 | INFO | __main__:linear_referencing_and_speed_by_segment:313 - make wide and get initial speeds: 0:05:36.646767 -2023-09-21 15:48:52.854 | INFO | 
__main__:linear_referencing_and_speed_by_segment:354 - recalculate speeds and get final: 0:02:17.313285 -2023-09-21 15:49:04.953 | INFO | __main__::377 - speeds for stop segments: 0:12:10.943637 -2023-09-21 15:49:04.954 | INFO | __main__::378 - execution time: 0:12:10.944518 -2023-09-21 17:41:01.371 | INFO | __main__::369 - Analysis date: 2023-05-17 -2023-09-21 17:41:07.436 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.155348 -2023-09-21 17:41:07.465 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.028611 -2023-09-21 17:45:03.306 | INFO | __main__:linear_referencing_and_speed_by_segment:302 - linear referencing: 0:04:01.914759 -2023-09-21 17:51:13.000 | INFO | __main__:linear_referencing_and_speed_by_segment:313 - make wide and get initial speeds: 0:06:09.693714 -2023-09-21 17:53:38.147 | INFO | __main__:linear_referencing_and_speed_by_segment:354 - recalculate speeds and get final: 0:02:25.147277 -2023-09-21 17:53:49.471 | INFO | __main__::377 - speeds for stop segments: 0:12:48.084151 -2023-09-21 17:53:49.474 | INFO | __main__::378 - execution time: 0:12:48.087006 -2023-09-21 18:19:33.226 | INFO | __main__::369 - Analysis date: 2023-04-12 -2023-09-21 18:19:39.961 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.173198 -2023-09-21 18:19:39.999 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.037629 -2023-09-21 18:23:57.932 | INFO | __main__:linear_referencing_and_speed_by_segment:302 - linear referencing: 0:04:24.702955 -2023-09-21 18:30:30.758 | INFO | __main__:linear_referencing_and_speed_by_segment:313 - make wide and get initial speeds: 0:06:32.825103 -2023-09-21 18:32:55.850 | INFO | __main__:linear_referencing_and_speed_by_segment:354 - recalculate speeds and get final: 0:02:25.091989 -2023-09-21 18:33:07.969 | INFO | __main__::377 - speeds for stop segments: 0:13:34.743167 -2023-09-21 18:33:07.972 | INFO | __main__::378 - execution time: 0:13:34.745381 -2023-09-21 19:14:16.648 | INFO | __main__::369 - Analysis date: 2023-03-15 -2023-09-21 19:14:22.748 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.170221 -2023-09-21 19:14:22.792 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.044492 -2023-09-21 19:18:37.589 | INFO | __main__:linear_referencing_and_speed_by_segment:302 - linear referencing: 0:04:20.919458 -2023-09-21 19:24:39.102 | INFO | __main__:linear_referencing_and_speed_by_segment:313 - make wide and get initial speeds: 0:06:01.513090 -2023-09-21 19:28:06.672 | INFO | __main__:linear_referencing_and_speed_by_segment:354 - recalculate speeds and get final: 0:03:27.569825 -2023-09-21 19:28:23.294 | INFO | __main__::377 - speeds for stop segments: 0:14:06.628501 -2023-09-21 19:28:23.296 | INFO | __main__::378 - execution time: 0:14:06.630732 2023-10-17 18:24:17.885 | INFO | __main__::369 - Analysis date: 2023-10-11 2023-10-17 18:24:23.381 | INFO | __main__:linear_referencing_vp_against_line:58 - set up merged vp with segments: 0:00:00.132646 2023-10-17 18:24:23.430 | INFO | __main__:linear_referencing_vp_against_line:76 - linear referencing: 0:00:00.049441 @@ -104,3 +6,5 @@ 2023-10-17 18:34:24.377 | INFO | __main__:linear_referencing_and_speed_by_segment:354 - recalculate speeds and get final: 0:02:16.135938 2023-10-17 18:34:34.838 | INFO | __main__::377 - speeds for stop segments: 0:10:16.927674 2023-10-17 18:34:34.838 
| INFO | __main__::378 - execution time: 0:10:16.928330 +2023-10-31 12:29:06.200 | INFO | __main__::23 - Analysis date: 2023-09-13 +2023-10-31 12:29:29.129 | INFO | __main__::69 - execution time: 0:00:22.926565 diff --git a/rt_segment_speeds/prep_comparison.py b/rt_segment_speeds/prep_comparison.py index 3a011f0f1..b1adc6720 100644 --- a/rt_segment_speeds/prep_comparison.py +++ b/rt_segment_speeds/prep_comparison.py @@ -111,6 +111,46 @@ def prep_tiff_data( return df_tiff +def prep_tiff_interpolated_data( + analysis_date: str, + subset_df: gpd.GeoDataFrame +) -> gpd.GeoDataFrame: + + shape_trips = subset_df[["shape_id", "trip_id"]].drop_duplicates() + + scheduled_trips = helpers.import_scheduled_trips( + analysis_date, + columns = [ + "gtfs_dataset_key", "name", + "trip_id", "trip_instance_key", + "shape_id", "shape_array_key", + "route_id", "direction_id"], + get_pandas = True + ).rename(columns = {"gtfs_dataset_key": "schedule_gtfs_dataset_key"}) + + # Grab the trip_instance_keys we need and use it + # to filter the speeds parquet down + subset_trips = scheduled_trips.merge( + shape_trips, + on = ["shape_id", "trip_id"], + how = "inner" + ) + + trip_instances = subset_trips.trip_instance_key.unique().tolist() + subset_shapes = subset_trips.shape_array_key.unique().tolist() + + filtered_trip_speeds = pd.read_parquet( + f"{SEGMENT_GCS}stop_arrivals_speed_{analysis_date}.parquet", + filters = [[("trip_instance_key", "in", trip_instances)]] + ).merge( + subset_trips, + on = ["trip_instance_key", "shape_array_key"], + how = "inner" + ) + + return filtered_trip_speeds + + def map_one_trip(gdf: gpd.GeoDataFrame, one_trip: str): gdf2 = gdf[gdf.trip_id==one_trip] @@ -123,8 +163,10 @@ def map_one_trip(gdf: gpd.GeoDataFrame, one_trip: str): return m1 if __name__ == "__main__": + df_eric = prep_eric_data(analysis_date) df_tiff = prep_tiff_data(analysis_date, df_eric) + df_tiff_interp = prep_tiff_interpolated_data(analysis_date, df_eric) utils.geoparquet_gcs_export( df_eric, @@ -138,6 +180,9 @@ def map_one_trip(gdf: gpd.GeoDataFrame, one_trip: str): f"speeds_tiff_{analysis_date}" ) + df_tiff_interp.to_parquet( + f"{SEGMENT_GCS}speeds_tiff_interp_{analysis_date}.parquet") + # stop_sequence doesn't exactly merge, but that's fine, # since Eric cuts shorter segments, so stop_sequence can have # values like 1.25, 1.50, etc. 
@@ -146,7 +191,6 @@ def map_one_trip(gdf: gpd.GeoDataFrame, one_trip: str): "trip_id", "shape_id", "stop_id", "stop_sequence", "route_id", "direction_id", ] - speed_df = pd.merge( df_eric[identifier_cols + ["speed_mph"]].rename( @@ -156,6 +200,12 @@ def map_one_trip(gdf: gpd.GeoDataFrame, one_trip: str): on = identifier_cols, how = "left", indicator = True + ).merge( + df_tiff_interp[identifier_cols + ["speed_mph"]].rename( + columns = {"speed_mph": "tiff_interp_speed_mph"}), + on = identifier_cols, + how = "left", ) - speed_df.to_parquet(f"{SEGMENT_GCS}speeds_comparison_{analysis_date}.parquet") \ No newline at end of file + speed_df.to_parquet( + f"{SEGMENT_GCS}speeds_comparison_{analysis_date}.parquet") \ No newline at end of file diff --git a/rt_segment_speeds/scripts/Makefile b/rt_segment_speeds/scripts/Makefile index baf76147b..8f79d19d8 100644 --- a/rt_segment_speeds/scripts/Makefile +++ b/rt_segment_speeds/scripts/Makefile @@ -21,8 +21,11 @@ speeds_pipeline: new_pipeline: python prep_stop_segments.py python shapely_project_vp.py - python shapely_interpolate.py - #python A3_valid_vehicle_positions.py + python nearest_vp_to_stop.py + python interpolate_stop_arrival.py + python stop_arrivals_to_speed.py + #python handle_common_errors.py + download_roads: diff --git a/rt_segment_speeds/scripts/config.yml b/rt_segment_speeds/scripts/config.yml index 165bbf56b..ac1d96bdb 100644 --- a/rt_segment_speeds/scripts/config.yml +++ b/rt_segment_speeds/scripts/config.yml @@ -1,27 +1,19 @@ stop_segments: - stage0: "vp" stage1: "vp_usable" - stage2: "vp_stop_segment" - stage3: "vp_pared_stops" - stage4: "speeds_stop_segments" - stage5: "avg_speeds_stop_segments" - segments_file: "stop_segments" - trip_grouping_cols: ["shape_array_key"] - grouping_col: "shape_array_key" + stage2: "nearest_vp" + stage3: "stop_arrivals" + stage4: "speed_stop_segments" segment_identifier_cols: ["shape_array_key", "stop_sequence"] timestamp_col: "location_timestamp_local" time_min_cutoff: 10 pct_segment_minimum: 0.3 road_segments: - stage0: "vp" stage1: "vp_usable" stage2: "vp_road_segment" stage3: "vp_pared_roads" stage4: "speeds_road_segments" stage5: "avg_speeds_road_segments" segments_file: "road_segments" - trip_grouping_cols: ["trip_instance_key"] - grouping_col: None segment_identifier_cols: ["linearid", "mtfcc", "primary_direction", "segment_sequence"] timestamp_col: "location_timestamp_local" time_min_cutoff: 10 diff --git a/rt_segment_speeds/scripts/handle_common_errors.py b/rt_segment_speeds/scripts/handle_common_errors.py new file mode 100644 index 000000000..5f1840b2b --- /dev/null +++ b/rt_segment_speeds/scripts/handle_common_errors.py @@ -0,0 +1,70 @@ +""" +Tag rows where we know errors are occurring when +we find the nearest vp. + +Error 1: nearest_vp_idx = subseq_vp_idx. +We cannot interpolate if the 2 endpoints are the same points. + +Error 2: change from the prior vp_idx value is negative. +We expect monotonically increasing vp_idx across stops, +which then translate to arrival times that are increasing. + +For loop/inlining shapes, esp near origin +(which is also destination), we can select a vp that's happening +at the end of the trip and attach to the origin stop. +This results in an arrival time for stop 1 that occurs after arrival time +for stop 2. 
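+ +(For example, nearest_vp_idx values of [120, 118, 121] across stops 1-3 +give diff_subseq values of [-2, 3, NaN], so stop 1 gets tagged +with error_arrival_order = 1.)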
+Bad picks like this often happen because vp_primary_direction might be Unknown. + +""" +import dask.dataframe as dd +import numpy as np +import pandas as pd + +from segment_speed_utils import helpers, wrangle_shapes +from segment_speed_utils.project_vars import SEGMENT_GCS, PROJECT_CRS, CONFIG_PATH +from shared_utils import rt_dates + +def tag_common_errors(df: pd.DataFrame) -> pd.DataFrame: +    """ +    Flag each row for the two common errors described above: +    error_same_endpoints and error_arrival_order. +    """ + +    df = df.assign( +        # Find where we select same points and try to interpolate in between +        error_same_endpoints = df.apply( +            lambda x: 1 if x.nearest_vp_idx == x.subseq_vp_idx +            else 0, axis=1 +        ), +        # Find the difference in nearest_vp_idx by comparing +        # against the previous row and subsequent row +        # If there's a big drop-off after a stop, we want to flag this current one +        diff_subseq = (df.groupby("trip_instance_key", +                                  observed=True, group_keys=False) +                       .nearest_vp_idx +                       .apply(lambda x: x.shift(-1) - x) +                      ), +    ) + +    df = df.assign( +        error_arrival_order = df.apply( +            lambda x: 1 if x.diff_subseq < 0 +            else 0, axis=1 +        ) +    ) + +    return df + + +if __name__ == "__main__": + +    analysis_date = rt_dates.DATES["sep2023"] +    STOP_SEG_DICT = helpers.get_parameters(CONFIG_PATH, "stop_segments") + +    NEAREST_VP = f"{STOP_SEG_DICT['stage2']}_{analysis_date}" + +    df = pd.read_parquet( +        f"{SEGMENT_GCS}projection/{NEAREST_VP}.parquet" +    ).pipe(tag_common_errors) + +    df.to_parquet( +        f"{SEGMENT_GCS}projection/nearest_vp_error_{analysis_date}.parquet") diff --git a/rt_segment_speeds/scripts/interpolate_stop_arrival.py b/rt_segment_speeds/scripts/interpolate_stop_arrival.py index 0d4aefc05..4460d8538 100644 --- a/rt_segment_speeds/scripts/interpolate_stop_arrival.py +++ b/rt_segment_speeds/scripts/interpolate_stop_arrival.py @@ -9,12 +9,11 @@ from loguru import logger -from segment_speed_utils import helpers, segment_calcs -from segment_speed_utils.project_vars import SEGMENT_GCS, PROJECT_CRS +from segment_speed_utils import helpers +from segment_speed_utils.project_vars import (SEGMENT_GCS, + PROJECT_CRS, CONFIG_PATH) from shared_utils import rt_dates -analysis_date = rt_dates.DATES["sep2023"] - def attach_vp_shape_meters_with_timestamp( analysis_date: str, **kwargs @@ -73,7 +72,12 @@ def get_stop_arrivals(df: pd.DataFrame) -> pd.DataFrame: df = df.assign( arrival_time = stop_arrival_series, -    ).astype({"arrival_time": "datetime64[s]"}) +    ).astype( +        {"arrival_time": "datetime64[s]"} +    ).sort_values( +        ["trip_instance_key", +         "shape_array_key", "stop_sequence"] +    ).reset_index(drop=True) return df @@ -87,15 +91,21 @@ def get_stop_arrivals(df: pd.DataFrame) -> pd.DataFrame: level="INFO") analysis_date = rt_dates.DATES["sep2023"] +    STOP_SEG_DICT = helpers.get_parameters(CONFIG_PATH, "stop_segments") + +    NEAREST_VP = f"{STOP_SEG_DICT['stage2']}_{analysis_date}" +    STOP_ARRIVALS_FILE = f"{STOP_SEG_DICT['stage3']}_{analysis_date}" logger.info(f"Analysis date: {analysis_date}") start = datetime.datetime.now() vp_pared = pd.read_parquet( -        f"{SEGMENT_GCS}projection/nearest_vp_normal_{analysis_date}.parquet", +        f"{SEGMENT_GCS}projection/{NEAREST_VP}.parquet", ) -    +    +    # If we filter down pd.read_parquet, we can use np arrays +    # If we filter down dd.read_parquet, we have to use lists subset_vp = np.union1d( vp_pared.nearest_vp_idx.unique(), vp_pared.subseq_vp_idx.unique() @@ -129,7 +139,7 @@ def get_stop_arrivals(df: pd.DataFrame) -> pd.DataFrame: logger.info(f"interpolate stop arrival: {time2 - time1}") stop_arrivals_df.to_parquet( -        f"{SEGMENT_GCS}stop_arrivals_{analysis_date}.parquet") +
f"{SEGMENT_GCS}{STOP_ARRIVALS_FILE}.parquet") end = datetime.datetime.now() logger.info(f"execution time: {end - start}") diff --git a/rt_segment_speeds/scripts/nearest_vp_to_stop.py b/rt_segment_speeds/scripts/nearest_vp_to_stop.py index da00c7223..1868772e2 100644 --- a/rt_segment_speeds/scripts/nearest_vp_to_stop.py +++ b/rt_segment_speeds/scripts/nearest_vp_to_stop.py @@ -1,8 +1,9 @@ """ -Handle normal vs loopy shapes separately. +Find the nearest vp to a stop cutpoint. -For normal shapes, find the nearest vp_idx before a stop, -and the vp_idx after. +Factor in direction. Only search within the valid vp idx +values (ones that are not running in the opposite direction +as the stop is going). Of these, find the nearest vp to each stop. """ import dask.dataframe as dd import datetime @@ -12,8 +13,9 @@ from loguru import logger -from segment_speed_utils import helpers, segment_calcs -from segment_speed_utils.project_vars import SEGMENT_GCS, PROJECT_CRS +from segment_speed_utils import helpers, segment_calcs, wrangle_shapes +from segment_speed_utils.project_vars import (SEGMENT_GCS, + PROJECT_CRS, CONFIG_PATH) from shared_utils import rt_dates @@ -43,7 +45,9 @@ def rt_trips_to_shape(analysis_date: str) -> pd.DataFrame: # Find whether it's loop or inlining shapes_loop_inlining = pd.read_parquet( f"{SEGMENT_GCS}stops_projected_{analysis_date}.parquet", - columns = ["shape_array_key", "loop_or_inlining"] + columns = [ + "shape_array_key", "loop_or_inlining", + ] ).drop_duplicates().merge( trip_to_shape, on = "shape_array_key", @@ -54,23 +58,21 @@ def rt_trips_to_shape(analysis_date: str) -> pd.DataFrame: def vp_with_shape_meters( - analysis_date: str, - subset_trips: list + vp_file_name: str, + **kwargs ) -> dd.DataFrame: """ Subset vp_usable down based on list of trip_instance_keys. For these trips, attach the projected shape meters. 
""" vp = dd.read_parquet( - f"{SEGMENT_GCS}vp_usable_{analysis_date}", - filters = [[("trip_instance_key", "in", subset_trips)]], + f"{SEGMENT_GCS}{vp_file_name}", + **kwargs, columns = ["trip_instance_key", "vp_idx", - "location_timestamp_local"] + "vp_primary_direction", + ] ) - vp = segment_calcs.convert_timestamp_to_seconds( - vp, ["location_timestamp_local"]).drop(columns = "location_timestamp_local") - projected_shape_meters = pd.read_parquet( f"{SEGMENT_GCS}projection/vp_projected_{analysis_date}.parquet", ) @@ -99,11 +101,15 @@ def transform_vp(vp: dd.DataFrame) -> dd.DataFrame: observed=True, group_keys=False) .agg({ "vp_idx": lambda x: list(x), - "shape_meters": lambda x: list(x)}) + "shape_meters": lambda x: list(x), + "vp_primary_direction": lambda x: list(x), + }) .reset_index() .rename(columns = { "vp_idx": "vp_idx_arr", - "shape_meters": "shape_meters_arr"}) + "shape_meters": "shape_meters_arr", + "vp_primary_direction": "vp_dir_arr" + }) ) return trip_info @@ -128,9 +134,33 @@ def find_vp_nearest_stop_position( # https://github.com/cal-itp/data-analyses/blob/main/rt_delay/rt_analysis/rt_parser.py#L270-L271 # Don't forget to subtract 1 for proper index + # Use this as a mask to hide the values that are not valid + # https://stackoverflow.com/questions/16094563/numpy-get-index-where-value-is-true + for row in df.itertuples(): + + this_stop_direction = getattr(row, "stop_primary_direction") + opposite_to_stop_direction = wrangle_shapes.OPPOSITE_DIRECTIONS[ + this_stop_direction] + + # Start with the array of all the vp primary direction + vp_dir_array = np.asarray(getattr(row, "vp_dir_arr")) + + # Hide the array indices where it's the opposite direction + # Keep the ones running in all other directions + valid_indices = (vp_dir_array != opposite_to_stop_direction).nonzero() + + shape_meters_array = getattr(row, "shape_meters_arr") + vp_idx_array = getattr(row, "vp_idx_arr") + + # Make sure these are arrays so we can search within the valid values + valid_shape_meters_array = np.asarray(shape_meters_array)[valid_indices] + valid_vp_idx_array = np.asarray(vp_idx_array)[valid_indices] + + this_stop_meters = getattr(row, "stop_meters") + idx = np.searchsorted( - getattr(row, "shape_meters_arr"), + valid_shape_meters_array, getattr(row, "stop_meters"), side="right" # want our stop_meters value to be < vp_shape_meters, @@ -139,8 +169,9 @@ def find_vp_nearest_stop_position( # For the next value, if there's nothing to index into, # just set it to the same position - # if we set subseq_value = getattr(row, )[idx], we might not get a consecutive vp - nearest_value = getattr(row, "vp_idx_arr")[idx-1] + # if we set subseq_value = getattr(row, )[idx], + # we might not get a consecutive vp + nearest_value = valid_vp_idx_array[idx-1] subseq_value = nearest_value + 1 nearest_vp_idx.append(nearest_value) @@ -162,12 +193,12 @@ def find_vp_nearest_stop_position( def fix_out_of_bound_results( df: pd.DataFrame, - analysis_date: str + vp_file_name: str ) -> pd.DataFrame: - + # Merge in usable bounds usable_bounds = dd.read_parquet( - f"{SEGMENT_GCS}vp_usable_{analysis_date}" + f"{SEGMENT_GCS}{vp_file_name}" ).pipe(segment_calcs.get_usable_vp_bounds_by_trip) results_with_bounds = pd.merge( @@ -186,34 +217,31 @@ def fix_out_of_bound_results( fixed_results = pd.concat( [correct_results, incorrect_results], axis=0 - ).drop(columns = ["min_vp_idx", "max_vp_idx"]).sort_index() + ).drop( + columns = ["min_vp_idx", "max_vp_idx"] + ).sort_values(["trip_instance_key", "shape_array_key", "stop_sequence"] + 
).reset_index(drop=True) return fixed_results -if __name__ == "__main__": - - LOG_FILE = "../logs/nearest_vp.log" - logger.add(LOG_FILE, retention="3 months") - logger.add(sys.stderr, - format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", - level="INFO") - - analysis_date = rt_dates.DATES["sep2023"] - - logger.info(f"Analysis date: {analysis_date}") - +def find_nearest_vp_to_stop( + analysis_date: str, + dict_inputs: dict = {} +): start = datetime.datetime.now() - normal_shape_trips = rt_trips_to_shape(analysis_date).query('loop_or_inlining==0') - normal_trip_keys = normal_shape_trips.trip_instance_key.tolist() - normal_shapes = normal_shape_trips.shape_array_key.unique().tolist() + USABLE_VP = f'{dict_inputs["stage1"]}_{analysis_date}' + SEGMENT_IDENTIFIER_COLS = dict_inputs["segment_identifier_cols"] + EXPORT_FILE = f'{dict_inputs["stage2"]}_{analysis_date}' + + shape_trip_crosswalk = rt_trips_to_shape(analysis_date) + shape_keys = shape_trip_crosswalk.shape_array_key.unique().tolist() vp = vp_with_shape_meters( - analysis_date, - normal_trip_keys + USABLE_VP, ).merge( - normal_shape_trips, + shape_trip_crosswalk, on = "trip_instance_key", how = "inner" ) @@ -223,7 +251,8 @@ def fix_out_of_bound_results( meta = {"trip_instance_key": "object", "shape_array_key": "object", "vp_idx_arr": "object", - "shape_meters_arr": "object" + "shape_meters_arr": "object", + "vp_dir_arr": "object" }, align_dataframes = False ).persist() @@ -233,12 +262,13 @@ def fix_out_of_bound_results( stops_projected = pd.read_parquet( f"{SEGMENT_GCS}stops_projected_{analysis_date}.parquet", - filters = [[("shape_array_key", "in", normal_shapes)]], - columns = ["shape_array_key", "stop_sequence", "stop_id", "shape_meters"] + filters = [[("shape_array_key", "in", shape_keys)]], + columns = SEGMENT_IDENTIFIER_COLS + ["stop_id", + "shape_meters", "stop_primary_direction"] ).rename(columns = {"shape_meters": "stop_meters"}) existing_stop_cols = stops_projected[ - ["shape_array_key", "stop_sequence", "stop_id", "stop_meters"]].dtypes.to_dict() + SEGMENT_IDENTIFIER_COLS + ["stop_id", "stop_meters"]].dtypes.to_dict() existing_vp_cols = vp_wide[["trip_instance_key"]].dtypes.to_dict() vp_to_stop = dd.merge( @@ -264,13 +294,30 @@ def fix_out_of_bound_results( result = result.compute() - fixed_results = fix_out_of_bound_results(result, analysis_date) + fixed_results = fix_out_of_bound_results(result, USABLE_VP) fixed_results.to_parquet( - f"{SEGMENT_GCS}projection/nearest_vp_normal_{analysis_date}.parquet") + f"{SEGMENT_GCS}projection/{EXPORT_FILE}.parquet") + + + + +if __name__ == "__main__": + LOG_FILE = "../logs/nearest_vp.log" + logger.add(LOG_FILE, retention="3 months") + logger.add(sys.stderr, + format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", + level="INFO") + + analysis_date = rt_dates.DATES["sep2023"] + STOP_SEG_DICT = helpers.get_parameters(CONFIG_PATH, "stop_segments") + + logger.info(f"Analysis date: {analysis_date}") + + start = datetime.datetime.now() + + find_nearest_vp_to_stop(analysis_date, STOP_SEG_DICT) + end = datetime.datetime.now() logger.info(f"execution time: {end - start}") - - # https://stackoverflow.com/questions/10226551/whats-the-most-pythonic-way-to-calculate-percentage-changes-on-a-list-of-numbers - diff --git a/rt_segment_speeds/scripts/shapely_project_vp.py b/rt_segment_speeds/scripts/shapely_project_vp.py index a8c93df95..5539fb31c 100644 --- a/rt_segment_speeds/scripts/shapely_project_vp.py +++ b/rt_segment_speeds/scripts/shapely_project_vp.py @@ -10,23 +10,25 @@ from 
loguru import logger +from calitp_data_analysis.geography_utils import WGS84 from shared_utils import rt_dates from segment_speed_utils import helpers from segment_speed_utils.project_vars import (SEGMENT_GCS, - PROJECT_CRS) - -analysis_date = rt_dates.DATES["sep2023"] + PROJECT_CRS, CONFIG_PATH) def project_vp_to_shape( vp: dd.DataFrame, shapes: gpd.GeoDataFrame ): + """ + shapely.project vp point geom onto shape_geometry. + """ shapes = shapes.rename(columns = {"geometry": "shape_geometry"}) vp_gdf = gpd.GeoDataFrame( vp, geometry = gpd.points_from_xy(vp.x, vp.y), - crs = "EPSG:4326" + crs = WGS84 ).to_crs(PROJECT_CRS).drop(columns = ["x", "y"]) gdf = pd.merge( @@ -47,8 +49,13 @@ def project_vp_to_shape( if __name__ == "__main__": + analysis_date = rt_dates.DATES["sep2023"] + STOP_SEG_DICT = helpers.get_parameters(CONFIG_PATH, "stop_segments") + start = datetime.datetime.now() + USABLE_VP = f'{STOP_SEG_DICT["stage1"]}_{analysis_date}' + trips = helpers.import_scheduled_trips( analysis_date, columns = ["trip_instance_key", "shape_array_key"], @@ -56,7 +63,7 @@ def project_vp_to_shape( ) vp = dd.read_parquet( - f"{SEGMENT_GCS}vp_usable_{analysis_date}", + f"{SEGMENT_GCS}{USABLE_VP}", columns = ["trip_instance_key", "vp_idx", "x", "y"] ).merge( trips, @@ -65,7 +72,7 @@ def project_vp_to_shape( ) subset_shapes = pd.read_parquet( - f"{SEGMENT_GCS}vp_usable_{analysis_date}", + f"{SEGMENT_GCS}{USABLE_VP}", columns = ["trip_instance_key"] ).drop_duplicates().merge( trips, @@ -97,5 +104,4 @@ def project_vp_to_shape( f"{SEGMENT_GCS}projection/vp_projected_{analysis_date}.parquet") end = datetime.datetime.now() - logger.info(f"compute and export: {end - time1}") - + logger.info(f"compute and export: {end - time1}") \ No newline at end of file diff --git a/rt_segment_speeds/scripts/stop_arrivals_to_speed.py b/rt_segment_speeds/scripts/stop_arrivals_to_speed.py new file mode 100644 index 000000000..be9b0e677 --- /dev/null +++ b/rt_segment_speeds/scripts/stop_arrivals_to_speed.py @@ -0,0 +1,69 @@ +""" +Convert stop-to-stop arrivals into speeds. 
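+ +Speed for each stop comes from the distance and time elapsed since the +prior stop arrival within the same trip: +    meters_elapsed = stop_meters - prior_stop_meters +    sec_elapsed = arrival_time_sec - prior_arrival_time_sec +segment_calcs.derive_speed then turns those into a speed column.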
+""" +import datetime +import pandas as pd +import sys + +from loguru import logger + +from shared_utils import rt_dates +from segment_speed_utils import helpers, segment_calcs +from segment_speed_utils.project_vars import SEGMENT_GCS, CONFIG_PATH + +if __name__ == "__main__": + + LOG_FILE = "../logs/speeds_by_segment_trip.log" + logger.add(LOG_FILE, retention="3 months") + logger.add(sys.stderr, + format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", + level="INFO") + + analysis_date = rt_dates.DATES["sep2023"] + logger.info(f"Analysis date: {analysis_date}") + + + STOP_SEG_DICT = helpers.get_parameters(CONFIG_PATH, "stop_segments") + + STOP_ARRIVALS_FILE = f"{STOP_SEG_DICT['stage3']}_{analysis_date}" + SPEED_FILE = f"{STOP_SEG_DICT['stage4']}_{analysis_date}" + + start = datetime.datetime.now() + + df = pd.read_parquet( + f"{SEGMENT_GCS}stop_arrivals_{analysis_date}.parquet" + ) + + trip_stop_cols = ["trip_instance_key", "stop_sequence"] + + df = segment_calcs.convert_timestamp_to_seconds( + df, ["arrival_time"] + ).sort_values(trip_stop_cols).reset_index(drop=True) + + df = df.assign( + prior_arrival_time_sec = (df.groupby("trip_instance_key", + observed=True, group_keys=False) + .arrival_time_sec + .shift(1) + ), + prior_stop_meters = (df.groupby("trip_instance_key", + observed=True, group_keys=False) + .stop_meters + .shift(1) + ) + ) + + speed = df.assign( + meters_elapsed = df.stop_meters - df.prior_stop_meters, + sec_elapsed = df.arrival_time_sec - df.prior_arrival_time_sec, + ).pipe( + segment_calcs.derive_speed, + ("prior_stop_meters", "stop_meters"), + ("prior_arrival_time_sec", "arrival_time_sec") + ) + + speed.to_parquet( + f"{SEGMENT_GCS}{SPEED_FILE}.parquet") + + end = datetime.datetime.now() + logger.info(f"execution time: {end - start}") \ No newline at end of file diff --git a/rt_segment_speeds/segment_speed_utils/wrangle_shapes.py b/rt_segment_speeds/segment_speed_utils/wrangle_shapes.py index a5d7f89a8..acc7f1448 100644 --- a/rt_segment_speeds/segment_speed_utils/wrangle_shapes.py +++ b/rt_segment_speeds/segment_speed_utils/wrangle_shapes.py @@ -30,6 +30,7 @@ "Southbound": "Northbound", "Eastbound": "Westbound", "Westbound": "Eastbound", + "Unknown": "", } def interpolate_projected_points( diff --git a/rt_segment_speeds/test_loop_interline_rts.py b/rt_segment_speeds/test_loop_interline_rts.py deleted file mode 100644 index dfe38500a..000000000 --- a/rt_segment_speeds/test_loop_interline_rts.py +++ /dev/null @@ -1,10 +0,0 @@ -import datetime - -LOOP_INTERLINE_RTS = [ - {'service_date': datetime.date(2023, 2, 7), - 'feed_key': '99fca48844312a4b68af992be4f7ee43', - 'name': 'Kings Schedule', - 'route_id': '12', - 'route_key': '8907271777a255870b3992251710f005', - 'route_short_name': '12'} -] \ No newline at end of file