-
Notifications
You must be signed in to change notification settings - Fork 119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🎇 BLE matching in the pipeline #965
Changes from all commits
c05b8bf
bfb20ae
f63c1d2
c0c8d83
8bca1fe
2059f9a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import sys | ||
import os | ||
import logging | ||
|
||
import json | ||
import requests | ||
|
||
STUDY_CONFIG = os.getenv('STUDY_CONFIG', "stage-program") | ||
|
||
dynamic_config = None | ||
def get_dynamic_config(): | ||
global dynamic_config | ||
if dynamic_config is not None: | ||
logging.debug("Returning cached dynamic config for %s at version %s" % (STUDY_CONFIG, dynamic_config['version'])) | ||
return dynamic_config | ||
logging.debug("No cached dynamic config for %s, downloading from server" % STUDY_CONFIG) | ||
download_url = "https://raw.githubusercontent.com/e-mission/nrel-openpath-deploy-configs/main/configs/" + STUDY_CONFIG + ".nrel-op.json" | ||
logging.debug("About to download config from %s" % download_url) | ||
r = requests.get(download_url) | ||
if r.status_code != 200: | ||
logging.debug(f"Unable to download study config, status code: {r.status_code}") | ||
# sys.exit(1) | ||
# TODO what to do here? What if Github is down or something? | ||
# If we terminate, will the pipeline just try again later? | ||
else: | ||
dynamic_config = json.loads(r.text) | ||
logging.debug(f"Successfully downloaded config with version {dynamic_config['version']} "\ | ||
f"for {dynamic_config['intro']['translated_text']['en']['deployment_name']} "\ | ||
f"and data collection URL {dynamic_config['server']['connectUrl']}") | ||
return dynamic_config |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -193,12 +193,13 @@ def create_and_link_timeline(ts, timeline, last_confirmed_place): | |
confirmed_places, confirmed_trips) | ||
return confirmed_tl | ||
|
||
def get_section_summary(ts, cleaned_trip, section_key): | ||
def get_section_summary(ts, cleaned_trip, section_key, mode_prop="sensed_mode"): | ||
""" | ||
Returns the proportions of the distance, duration and count for each mode | ||
in this trip. Note that sections are unimodal by definition. | ||
cleaned_trip: the cleaned trip object associated with the sections | ||
section_key: 'inferred_section' or 'cleaned_section' | ||
mode_prop: the section property used as the basis for mode: 'sensed_mode' or 'ble_sensed_mode'. | ||
""" | ||
logging.debug(f"get_section_summary({cleaned_trip['_id']}, {section_key}) called") | ||
sections = esdt.get_sections_for_trip(key = section_key, | ||
|
@@ -207,12 +208,14 @@ def get_section_summary(ts, cleaned_trip, section_key): | |
logging.warning("While getting section summary, section length = 0. This should never happen, but let's not crash if it does") | ||
return {"distance": {}, "duration": {}, "count": {}} | ||
sections_df = ts.to_data_df(section_key, sections) | ||
cleaned_section_mapper = lambda sm: ecwm.MotionTypes(sm).name | ||
inferred_section_mapper = lambda sm: ecwmp.PredictedModeTypes(sm).name | ||
sel_section_mapper = cleaned_section_mapper \ | ||
if section_key == "analysis/cleaned_section" else inferred_section_mapper | ||
sections_df["sensed_mode_str"] = sections_df["sensed_mode"].apply(sel_section_mapper) | ||
grouped_section_df = sections_df.groupby("sensed_mode_str") | ||
if mode_prop == "ble_sensed_mode": | ||
mapper = lambda bsm: bsm['baseMode'] if bsm is not None else ecwm.MotionTypes.UNKNOWN.name | ||
elif section_key == "analysis/cleaned_section": | ||
mapper = lambda sm: ecwm.MotionTypes(sm).name | ||
else: | ||
mapper = lambda sm: ecwmp.PredictedModeTypes(sm).name | ||
sections_df[mode_prop + "_str"] = sections_df[mode_prop].apply(mapper) | ||
grouped_section_df = sections_df.groupby(mode_prop + "_str") | ||
retVal = { | ||
"distance": grouped_section_df.distance.sum().to_dict(), | ||
"duration": grouped_section_df.duration.sum().to_dict(), | ||
|
@@ -233,6 +236,7 @@ def create_confirmed_entry(ts, tce, confirmed_key, input_key_list): | |
tce["data"]["cleaned_trip"]) | ||
confirmed_object_data['inferred_section_summary'] = get_section_summary(ts, cleaned_trip, "analysis/inferred_section") | ||
confirmed_object_data['cleaned_section_summary'] = get_section_summary(ts, cleaned_trip, "analysis/cleaned_section") | ||
confirmed_object_data['ble_sensed_summary'] = get_section_summary(ts, cleaned_trip, "analysis/inferred_section", mode_prop="ble_sensed_mode") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would assume that you would want to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sensed data goes with sensor data, makes sense |
||
elif (confirmed_key == esda.CONFIRMED_PLACE_KEY): | ||
confirmed_object_data["cleaned_place"] = tce.get_id() | ||
confirmed_object_data["user_input"] = \ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should throw an exception instead of terminating. That will cause the pipeline to catch the exception and mark that state as incomplete, which will ensure that any unprocessed data will be processed in the next run.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO