Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎇 BLE matching in the pipeline #965

Merged
merged 6 commits into from
May 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions emission/analysis/configs/dynamic_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import sys
import os
import logging

import json
import requests

# Name of the deployment config to load; can be overridden via the environment.
STUDY_CONFIG = os.getenv('STUDY_CONFIG', "stage-program")

# Seconds to wait on the config server. `requests` applies no timeout by
# default, so without this a stalled connection would block the caller forever.
CONFIG_DOWNLOAD_TIMEOUT_SECS = 30

# Module-level cache, populated by the first successful download.
dynamic_config = None


def get_dynamic_config():
    """Return the dynamic config for STUDY_CONFIG, downloading it on first use.

    The parsed config is cached in the module-level ``dynamic_config`` so that
    later calls in the same process do not hit the network again.

    Returns:
        The parsed JSON config downloaded for STUDY_CONFIG.

    Raises:
        requests.HTTPError: if the server answers with a status other than 200.
        requests.RequestException: propagated from ``requests.get`` on
            connection problems or timeouts.
        ValueError: if the response body is not valid JSON.
    """
    global dynamic_config
    if dynamic_config is not None:
        logging.debug("Returning cached dynamic config for %s at version %s",
                      STUDY_CONFIG, dynamic_config.get('version'))
        return dynamic_config

    logging.debug("No cached dynamic config for %s, downloading from server", STUDY_CONFIG)
    download_url = "https://raw.githubusercontent.com/e-mission/nrel-openpath-deploy-configs/main/configs/" + STUDY_CONFIG + ".nrel-op.json"
    logging.debug("About to download config from %s", download_url)
    r = requests.get(download_url, timeout=CONFIG_DOWNLOAD_TIMEOUT_SECS)
    if r.status_code != 200:
        # Per code review: raise instead of terminating (or silently returning
        # None). The pipeline is expected to catch the exception and mark the
        # stage as incomplete, so unprocessed data is picked up on the next run.
        raise requests.HTTPError(
            f"Unable to download study config from {download_url}, "
            f"status code: {r.status_code}",
            response=r)

    config = json.loads(r.text)
    # Look up the logged fields defensively: a config that lacks one of these
    # optional-looking keys should not fail just because of a debug message.
    deployment_name = (config.get('intro', {})
                             .get('translated_text', {})
                             .get('en', {})
                             .get('deployment_name'))
    connect_url = config.get('server', {}).get('connectUrl')
    logging.debug("Successfully downloaded config with version %s for %s and data collection URL %s",
                  config.get('version'), deployment_name, connect_url)
    dynamic_config = config
    return dynamic_config
17 changes: 15 additions & 2 deletions emission/analysis/intake/segmentation/section_segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import logging

# Our imports
import emission.analysis.configs.dynamic_config as eadc
import emission.storage.pipeline_queries as epq
import emission.storage.decorations.analysis_timeseries_queries as esda

Expand All @@ -22,6 +23,7 @@
import emission.core.wrapper.entry as ecwe

import emission.core.common as ecc
import emcommon.bluetooth.ble_matching as emcble

class SectionSegmentationMethod(object):
def segment_into_sections(self, timeseries, distance_from_place, time_query):
Expand Down Expand Up @@ -64,6 +66,7 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source):
ts = esta.TimeSeries.get_time_series(user_id)
time_query = esda.get_time_query_for_trip_like(esda.RAW_TRIP_KEY, trip_entry.get_id())
distance_from_place = _get_distance_from_start_place_to_end(trip_entry)
ble_entries_during_trip = ts.find_entries(["background/bluetooth_ble"], time_query)

if (trip_source == "DwellSegmentationTimeFilter"):
import emission.analysis.intake.segmentation.section_segmentation_methods.smoothed_high_confidence_motion as shcm
Expand Down Expand Up @@ -118,7 +121,16 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source):
# Particularly in this case, if we don't do this, then the trip end may overshoot the section end
end_loc = trip_end_loc

fill_section(section, start_loc, end_loc, sensed_mode)
# ble_sensed_mode represents the vehicle that was sensed via BLE beacon during the section.
# For now, we are going to rely on the current segmentation implementation and then fill in
# ble_sensed_mode by looking at scans within the timestamp range of the section.
# Later, we may want to actually use BLE sensor data as part of the basis for segmentation
dynamic_config = eadc.get_dynamic_config()
ble_sensed_mode = emcble.get_ble_sensed_vehicle_for_section(
ble_entries_during_trip, start_loc.ts, end_loc.ts, dynamic_config
)

fill_section(section, start_loc, end_loc, sensed_mode, ble_sensed_mode)
# We create the entry after filling in the section so that we know
# that the data is included properly
section_entry = ecwe.Entry.create_entry(user_id, esda.RAW_SECTION_KEY,
Expand All @@ -143,7 +155,7 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source):
prev_section_entry = section_entry


def fill_section(section, start_loc, end_loc, sensed_mode):
def fill_section(section, start_loc, end_loc, sensed_mode, ble_sensed_mode=None):
section.start_ts = start_loc.ts
section.start_local_dt = start_loc.local_dt
section.start_fmt_time = start_loc.fmt_time
Expand All @@ -161,6 +173,7 @@ def fill_section(section, start_loc, end_loc, sensed_mode):
section.duration = end_loc.ts - start_loc.ts
section.source = "SmoothedHighConfidenceMotion"
section.sensed_mode = sensed_mode
section.ble_sensed_mode = ble_sensed_mode


def stitch_together(ending_section_entry, stop_entry, starting_section_entry):
Expand Down
18 changes: 11 additions & 7 deletions emission/analysis/userinput/matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,13 @@ def create_and_link_timeline(ts, timeline, last_confirmed_place):
confirmed_places, confirmed_trips)
return confirmed_tl

def get_section_summary(ts, cleaned_trip, section_key):
def get_section_summary(ts, cleaned_trip, section_key, mode_prop="sensed_mode"):
"""
Returns the proportions of the distance, duration and count for each mode
in this trip. Note that sections are unimodal by definition.
cleaned_trip: the cleaned trip object associated with the sections
section_key: 'inferred_section' or 'cleaned_section'
mode_prop: the section property used as the basis for mode: 'sensed_mode' or 'ble_sensed_mode'.
"""
logging.debug(f"get_section_summary({cleaned_trip['_id']}, {section_key}) called")
sections = esdt.get_sections_for_trip(key = section_key,
Expand All @@ -207,12 +208,14 @@ def get_section_summary(ts, cleaned_trip, section_key):
logging.warning("While getting section summary, section length = 0. This should never happen, but let's not crash if it does")
return {"distance": {}, "duration": {}, "count": {}}
sections_df = ts.to_data_df(section_key, sections)
cleaned_section_mapper = lambda sm: ecwm.MotionTypes(sm).name
inferred_section_mapper = lambda sm: ecwmp.PredictedModeTypes(sm).name
sel_section_mapper = cleaned_section_mapper \
if section_key == "analysis/cleaned_section" else inferred_section_mapper
sections_df["sensed_mode_str"] = sections_df["sensed_mode"].apply(sel_section_mapper)
grouped_section_df = sections_df.groupby("sensed_mode_str")
if mode_prop == "ble_sensed_mode":
mapper = lambda bsm: bsm['baseMode'] if bsm is not None else ecwm.MotionTypes.UNKNOWN.name
elif section_key == "analysis/cleaned_section":
mapper = lambda sm: ecwm.MotionTypes(sm).name
else:
mapper = lambda sm: ecwmp.PredictedModeTypes(sm).name
sections_df[mode_prop + "_str"] = sections_df[mode_prop].apply(mapper)
grouped_section_df = sections_df.groupby(mode_prop + "_str")
retVal = {
"distance": grouped_section_df.distance.sum().to_dict(),
"duration": grouped_section_df.duration.sum().to_dict(),
Expand All @@ -233,6 +236,7 @@ def create_confirmed_entry(ts, tce, confirmed_key, input_key_list):
tce["data"]["cleaned_trip"])
confirmed_object_data['inferred_section_summary'] = get_section_summary(ts, cleaned_trip, "analysis/inferred_section")
confirmed_object_data['cleaned_section_summary'] = get_section_summary(ts, cleaned_trip, "analysis/cleaned_section")
confirmed_object_data['ble_sensed_summary'] = get_section_summary(ts, cleaned_trip, "analysis/inferred_section", mode_prop="ble_sensed_mode")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would assume that you would want to use analysis/cleaned_section here, since the inferred section is not based on raw/smoothed data from sensors but is instead produced by running an ML pipeline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sensed data goes with sensor data, makes sense
TODO

elif (confirmed_key == esda.CONFIRMED_PLACE_KEY):
confirmed_object_data["cleaned_place"] = tce.get_id()
confirmed_object_data["user_input"] = \
Expand Down
1 change: 1 addition & 0 deletions emission/core/wrapper/confirmedtrip.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Confirmedtrip(ecwt.Trip):
"expected_trip": ecwb.WrapperBase.Access.WORM,
"inferred_section_summary": ecwb.WrapperBase.Access.WORM,
"cleaned_section_summary": ecwb.WrapperBase.Access.WORM,
"ble_sensed_summary": ecwb.WrapperBase.Access.WORM,
# the user input will have all `manual/*` entries
# let's make that be somewhat flexible instead of hardcoding into the data model
"user_input": ecwb.WrapperBase.Access.WORM,
Expand Down
1 change: 1 addition & 0 deletions emission/core/wrapper/section.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Section(ecwb.WrapperBase):
"end_loc": ecwb.WrapperBase.Access.WORM, # location of end point in geojson format
"duration": ecwb.WrapperBase.Access.WORM, # duration of the trip in secs
"sensed_mode": ecwb.WrapperBase.Access.WORM, # the sensed mode used for the segmentation
"ble_sensed_mode": ecwb.WrapperBase.Access.WORM, # the mode sensed from BLE beacon scans
"source": ecwb.WrapperBase.Access.WORM} # the method used to generate this trip

enums = {"sensed_mode": ecwm.MotionTypes}
Expand Down
2 changes: 2 additions & 0 deletions emission/tests/analysisTests/userInputTests/TestUserInput.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ def compare_confirmed_objs_result(self, result_dicts, expect_dicts, manual_keys
self.assertEqual(rt.data["inferred_section_summary"], et.data["inferred_section_summary"])
if "cleaned_section_summary" in et.data:
self.assertEqual(rt.data["cleaned_section_summary"], et.data["cleaned_section_summary"])
if 'ble_sensed_summary' in et.data:
self.assertEqual(rt.data["ble_sensed_summary"], et.data["ble_sensed_summary"])
logging.debug(20 * "=")

def compare_section_result(self, result, expect):
Expand Down
Loading
Loading