Commit
Implement the initial pipeline state to match incoming user inputs with existing confirmed trips

+ remove the no-op and create our familiar pipeline architecture where we track the last processed input
+ implement the code to find the matching trip, update its user_input field, and store it back (sketched below)
+ minor fixes to existing code to support this
    - pull out the metadata key -> name function so it can be used for both matches
    - add a new key for confirmed trips
+ add unit test which tests the same inputs but with the user input being post-loaded
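
In data terms, the matching step copies each incoming input's label onto the matched confirmed trip, keyed by the short form of the input key. A minimal sketch of that transformation (the "manual/mode_confirm" key and "bike" label are illustrative values, not taken from this commit):

    # Hypothetical incoming user input
    ui_key = "manual/mode_confirm"      # metadata.key of the input entry
    ui_label = "bike"                   # data.label chosen by the user

    # Same mapping as the new obj_to_dict_key helper in matcher.py below
    input_name = ui_key.split("/")[1]   # -> "mode_confirm"

    # Stand-in for the matched confirmed trip; the real entry is written back
    # through BuiltinTimeSeries.update() as shown in the matcher.py diff
    confirmed_trip = {"data": {"user_input": {}}}
    confirmed_trip["data"]["user_input"][input_name] = ui_label
    print(confirmed_trip["data"]["user_input"])  # {'mode_confirm': 'bike'}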

Testing done:
- All tests pass
shankari committed Dec 18, 2020
1 parent 986a534 commit ccb98e9
Showing 4 changed files with 45 additions and 11 deletions.

emission/analysis/userinput/matcher.py (36 additions, 3 deletions)
@@ -10,10 +10,43 @@
 import emission.storage.pipeline_queries as epq
 import emission.core.wrapper.entry as ecwe

+obj_to_dict_key = lambda key: key.split("/")[1]
+
 def match_incoming_user_inputs(user_id):
     time_query = epq.get_time_range_for_incoming_userinput_match(user_id)
-    print("this is currently a no-op")
-    epq.mark_incoming_userinput_match_done(user_id, None)
+    try:
+        last_user_input_done = match_incoming_inputs(user_id, time_query)
+        if last_user_input_done is None:
+            logging.debug("after run, last_user_input_done == None, must be early return")
+            epq.mark_incoming_userinput_match_done(user_id, None)
+        else:
+            epq.mark_incoming_userinput_match_done(user_id, last_user_input_done.metadata.write_ts)
+    except:
+        logging.exception("Error while matching incoming user inputs, timestamp is unchanged")
+        epq.mark_incoming_userinput_match_failed(user_id)
+
+def match_incoming_inputs(user_id, timerange):
+    ts = esta.TimeSeries.get_time_series(user_id)
+    input_key_list = eac.get_config()["userinput.keylist"]
+    toMatchInputs = [ecwe.Entry(e) for e in ts.find_entries(input_key_list, time_query=timerange)]
+    logging.debug("Matching %d inputs to trips" % len(toMatchInputs))
+    lastInputProcessed = None
+    if len(toMatchInputs) == 0:
+        logging.debug("len(toMatchInputs) == 0, early return")
+        return None
+    for ui in toMatchInputs:
+        confirmed_trip = esdt.get_trip_for_user_input_obj(ts, ui)
+        if confirmed_trip is not None:
+            input_name = obj_to_dict_key(ui.metadata.key)
+            confirmed_trip["data"]["user_input"][input_name] = ui.data.label
+            import emission.storage.timeseries.builtin_timeseries as estbt
+            estbt.BuiltinTimeSeries.update(confirmed_trip)
+        else:
+            logging.warn("No match found for user input %s, moving forward anyway" % ui)
+        lastInputProcessed = ui
+
+    return lastInputProcessed
+

 def create_confirmed_objects(user_id):
     time_query = epq.get_time_range_for_confirmed_object_creation(user_id)
@@ -59,7 +92,7 @@ def get_user_input_dict(ts, tct, input_key_list):
     for ikey in input_key_list:
         matched_userinput = esdt.get_user_input_for_trip_object(ts, tct, ikey)
         if matched_userinput is not None:
-            ikey_name = ikey.split("/")[1]
+            ikey_name = obj_to_dict_key(ikey)
             tct_userinput[ikey_name] = matched_userinput.data.label
     logging.debug("for trip %s, returning user input dict %s" % (tct.get_id(), tct_userinput))
     return tct_userinput

Analysis key constants (1 addition, 0 deletions)
@@ -25,6 +25,7 @@
 CLEANED_STOP_KEY = "analysis/cleaned_stop"
 CLEANED_UNTRACKED_KEY = "analysis/cleaned_untracked"
 CLEANED_LOCATION_KEY = "analysis/recreated_location"
+CONFIRMED_TRIP_KEY = "analysis/confirmed_trip"
 METRICS_DAILY_USER_COUNT = "metrics/daily_user_count"
 METRICS_DAILY_MEAN_COUNT = "metrics/daily_mean_count"
 METRICS_DAILY_USER_DISTANCE = "metrics/daily_user_distance"
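
A quick sketch of how the new key can be read back, reusing the TimeSeries and Entry calls that already appear in the matcher.py hunk above (the abstract_timeseries import behind the esta alias and fetching all entries with time_query=None are assumptions made here for illustration):

    import emission.storage.timeseries.abstract_timeseries as esta
    import emission.core.wrapper.entry as ecwe

    ts = esta.TimeSeries.get_time_series(user_id)
    confirmed_trips = [ecwe.Entry(e)
                       for e in ts.find_entries(["analysis/confirmed_trip"], time_query=None)]
    for ct in confirmed_trips:
        # user_input (assumed attribute access via the Entry wrapper) should now
        # carry any labels matched by the new pipeline stage
        print(ct.get_id(), ct.data.user_input)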

emission/storage/decorations/trip_queries.py (1 addition, 1 deletion)
@@ -146,7 +146,7 @@ def final_candidate(filter_fn, potential_candidates):
     sorted_pc = sorted(extra_filtered_potential_candidates, key=lambda c:c["metadata"]["write_ts"])
     entry_detail = lambda c: c.data.label if "label" in c.data else c.data.start_fmt_time
     logging.debug("sorted candidates are %s" %
-        [(c.metadata.write_fmt_time, entry_detail(c)) for c in sorted_pc])
+        [{"write_fmt_time": c.metadata.write_fmt_time, "detail": entry_detail(c)} for c in sorted_pc])
     most_recent_entry = sorted_pc[-1]
     logging.debug("most recent entry is %s, %s" %
         (most_recent_entry.metadata.write_fmt_time, entry_detail(most_recent_entry)))
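
The only change here is the shape of one debug log line: each sorted candidate is now logged as a small labeled dict instead of an anonymous tuple. Illustrative values, not from this commit:

    # Old representation of a sorted candidate in the debug log
    old_style = ("2016-06-20T17:15:09-07:00", "bike")
    # New representation, with explicit field names
    new_style = {"write_fmt_time": "2016-06-20T17:15:09-07:00", "detail": "bike"}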

emission/tests/analysisTests/userInputTests/TestUserInput.py (7 additions, 7 deletions)
@@ -132,13 +132,13 @@ def testJun20Preload(self):
         ld = ecwl.LocalDate({'year': 2016, 'month': 6, 'day': 20})
         self.checkConfirmedTripsAndSections(dataFile, ld, preload=True)

-    # def testJun20Postload(self):
-    #     # Same as testJun20Preload, except that the user input arrives after the
-    #     # pipeline is run for the first time, and the matching happens on the
-    #     # next pipeline run
-    #     dataFile = "emission/tests/data/real_examples/shankari_2016-06-20"
-    #     ld = ecwl.LocalDate({'year': 2016, 'month': 6, 'day': 20})
-    #     self.checkConfirmedTripsAndSections(dataFile, ld, preload=False)
+    def testJun20Postload(self):
+        # Same as testJun20Preload, except that the user input arrives after the
+        # pipeline is run for the first time, and the matching happens on the
+        # next pipeline run
+        dataFile = "emission/tests/data/real_examples/shankari_2016-06-20"
+        ld = ecwl.LocalDate({'year': 2016, 'month': 6, 'day': 20})
+        self.checkConfirmedTripsAndSections(dataFile, ld, preload=False)

 if __name__ == '__main__':
     etc.configLogging()
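
To run just the re-enabled test, a standard unittest invocation along these lines should work (the TestUserInput class name and the usual e-mission test setup, such as a local test database, are assumptions here):

    # Sketch: load and run only the re-enabled post-load test
    import unittest

    suite = unittest.defaultTestLoader.loadTestsFromName(
        "emission.tests.analysisTests.userInputTests.TestUserInput"
        ".TestUserInput.testJun20Postload")
    unittest.TextTestRunner(verbosity=2).run(suite)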
