From ccb98e963e51c6cf9a2d7e26fffc593a48608c8a Mon Sep 17 00:00:00 2001 From: Shankari Date: Thu, 17 Dec 2020 22:44:21 -0800 Subject: [PATCH] Implement the initial pipeline state to match incoming user inputs with existing confirmed trips + remove the no-op and create our familiar pipeline architecture where we track the last processed input + implement the code to find the matching trip, update its user_input field, and store it back + minor fixes to existing code to support this - pull out the metadata key -> name function so it can be used for both matches - add a new key for confirmed trips + add unit test which tests the same inputs but with the user input being post-loaded Testing done: - All tests pass --- emission/analysis/userinput/matcher.py | 39 +++++++++++++++++-- .../analysis_timeseries_queries.py | 1 + emission/storage/decorations/trip_queries.py | 2 +- .../userInputTests/TestUserInput.py | 14 +++---- 4 files changed, 45 insertions(+), 11 deletions(-) diff --git a/emission/analysis/userinput/matcher.py b/emission/analysis/userinput/matcher.py index 081f48d6d..df4875a1f 100644 --- a/emission/analysis/userinput/matcher.py +++ b/emission/analysis/userinput/matcher.py @@ -10,10 +10,43 @@ import emission.storage.pipeline_queries as epq import emission.core.wrapper.entry as ecwe +obj_to_dict_key = lambda key: key.split("/")[1] + def match_incoming_user_inputs(user_id): time_query = epq.get_time_range_for_incoming_userinput_match(user_id) - print("this is currently a no-op") - epq.mark_incoming_userinput_match_done(user_id, None) + try: + last_user_input_done = match_incoming_inputs(user_id, time_query) + if last_user_input_done is None: + logging.debug("after run, last_user_input_done == None, must be early return") + epq.mark_incoming_userinput_match_done(user_id, None) + else: + epq.mark_incoming_userinput_match_done(user_id, last_user_input_done.metadata.write_ts) + except: + logging.exception("Error while matching incoming user inputs, timestamp is unchanged") + epq.mark_incoming_userinput_match_failed(user_id) + +def match_incoming_inputs(user_id, timerange): + ts = esta.TimeSeries.get_time_series(user_id) + input_key_list = eac.get_config()["userinput.keylist"] + toMatchInputs = [ecwe.Entry(e) for e in ts.find_entries(input_key_list, time_query=timerange)] + logging.debug("Matching %d inputs to trips" % len(toMatchInputs)) + lastInputProcessed = None + if len(toMatchInputs) == 0: + logging.debug("len(toMatchInputs) == 0, early return") + return None + for ui in toMatchInputs: + confirmed_trip = esdt.get_trip_for_user_input_obj(ts, ui) + if confirmed_trip is not None: + input_name = obj_to_dict_key(ui.metadata.key) + confirmed_trip["data"]["user_input"][input_name] = ui.data.label + import emission.storage.timeseries.builtin_timeseries as estbt + estbt.BuiltinTimeSeries.update(confirmed_trip) + else: + logging.warn("No match found for user input %s, moving forward anyway" % ui) + lastInputProcessed = ui + + return lastInputProcessed + def create_confirmed_objects(user_id): time_query = epq.get_time_range_for_confirmed_object_creation(user_id) @@ -59,7 +92,7 @@ def get_user_input_dict(ts, tct, input_key_list): for ikey in input_key_list: matched_userinput = esdt.get_user_input_for_trip_object(ts, tct, ikey) if matched_userinput is not None: - ikey_name = ikey.split("/")[1] + ikey_name = obj_to_dict_key(ikey) tct_userinput[ikey_name] = matched_userinput.data.label logging.debug("for trip %s, returning user input dict %s" % (tct.get_id(), tct_userinput)) return tct_userinput diff --git a/emission/storage/decorations/analysis_timeseries_queries.py b/emission/storage/decorations/analysis_timeseries_queries.py index d4ad9a073..c7def2790 100644 --- a/emission/storage/decorations/analysis_timeseries_queries.py +++ b/emission/storage/decorations/analysis_timeseries_queries.py @@ -25,6 +25,7 @@ CLEANED_STOP_KEY = "analysis/cleaned_stop" CLEANED_UNTRACKED_KEY = "analysis/cleaned_untracked" CLEANED_LOCATION_KEY = "analysis/recreated_location" +CONFIRMED_TRIP_KEY = "analysis/confirmed_trip" METRICS_DAILY_USER_COUNT = "metrics/daily_user_count" METRICS_DAILY_MEAN_COUNT = "metrics/daily_mean_count" METRICS_DAILY_USER_DISTANCE = "metrics/daily_user_distance" diff --git a/emission/storage/decorations/trip_queries.py b/emission/storage/decorations/trip_queries.py index d56445b96..9487b604a 100644 --- a/emission/storage/decorations/trip_queries.py +++ b/emission/storage/decorations/trip_queries.py @@ -146,7 +146,7 @@ def final_candidate(filter_fn, potential_candidates): sorted_pc = sorted(extra_filtered_potential_candidates, key=lambda c:c["metadata"]["write_ts"]) entry_detail = lambda c: c.data.label if "label" in c.data else c.data.start_fmt_time logging.debug("sorted candidates are %s" % - [(c.metadata.write_fmt_time, entry_detail(c)) for c in sorted_pc]) + [{"write_fmt_time": c.metadata.write_fmt_time, "detail": entry_detail(c)} for c in sorted_pc]) most_recent_entry = sorted_pc[-1] logging.debug("most recent entry is %s, %s" % (most_recent_entry.metadata.write_fmt_time, entry_detail(most_recent_entry))) diff --git a/emission/tests/analysisTests/userInputTests/TestUserInput.py b/emission/tests/analysisTests/userInputTests/TestUserInput.py index a4b2938e7..df0288614 100644 --- a/emission/tests/analysisTests/userInputTests/TestUserInput.py +++ b/emission/tests/analysisTests/userInputTests/TestUserInput.py @@ -132,13 +132,13 @@ def testJun20Preload(self): ld = ecwl.LocalDate({'year': 2016, 'month': 6, 'day': 20}) self.checkConfirmedTripsAndSections(dataFile, ld, preload=True) -# def testJun20Postload(self): -# # Same as testJun20Preload, except that the user input arrives after the -# # pipeline is run for the first time, and the matching happens on the -# # next pipeline run -# dataFile = "emission/tests/data/real_examples/shankari_2016-06-20" -# ld = ecwl.LocalDate({'year': 2016, 'month': 6, 'day': 20}) -# self.checkConfirmedTripsAndSections(dataFile, ld, preload=False) + def testJun20Postload(self): + # Same as testJun20Preload, except that the user input arrives after the + # pipeline is run for the first time, and the matching happens on the + # next pipeline run + dataFile = "emission/tests/data/real_examples/shankari_2016-06-20" + ld = ecwl.LocalDate({'year': 2016, 'month': 6, 'day': 20}) + self.checkConfirmedTripsAndSections(dataFile, ld, preload=False) if __name__ == '__main__': etc.configLogging()